import { ParsedServerPubsubMessage, RecordUpdateMessage, serverPubSubMessageD } from "libs/PubSubTypes";
import { Logger } from "libs/logger";
import { getAbortSignalPromise } from "libs/promise-utils";
import { areDecoderErrors } from "ts-decoders";
import { BehaviorSubject, Subject, distinctUntilChanged, map, combineLatest } from "rxjs";
import { PointerWithRecord } from "libs/schema";
import { Simplify } from "type-fest";
import { AbortError } from "libs/errors";
import { config } from "./config";
import type { AuthServiceApi } from "./auth.service";
import { Socket, io } from "socket.io-client";
import { isEqual } from "libs/predicates";
import { debounce, uniqWith } from "lodash-comms";

export type PubsubClientApi = Simplify<WebsocketPubsubClient>;

export class WebsocketPubsubClient {
  private static broadcastChannel = new BroadcastChannel("pubsub");

  logger: Logger;

  /** Use NetworkService#mode$ instead */
  _mode$ = new BehaviorSubject<"NORMAL" | "FORCE_OFFLINE">("NORMAL");

  /**
   * The Status of the Websocket connection.
   *
   * Possible values:
   * - IDLE: No connection attempt has been made or the socket was manually disconnected.
   * - CONNECTING: The socket is attempting to connect to the server.
   * - CONNECTED: The socket is connected to the server.
   * - ERROR: The socket was connected but now the connection is an error state. Reconnection attempts will be made automatically.
   *
   * If the socket is trying to reconnect after an error, the status will remain in ERROR (i.e. not transition to CONNECTING) until
   * the connection is successful. At that point, the status will change to CONNECTED.
   */
  connectionStatus$ = new BehaviorSubject<"IDLE" | "CONNECTING" | "CONNECTED" | "ERROR">("IDLE");

  private _messages$ = new Subject<ParsedServerPubsubMessage[]>();
  /**
   * An observable which emits whenever the pubsub connection receives a new batch of messages.
   * Note that this observable will never error or complete.
   */
  messages$ = this._messages$.asObservable();

  private socket?: Socket;
  private onStartAbortController?: AbortController;
  private unprocessedPubSubMessages: ParsedServerPubsubMessage[] = [];

  constructor(
    private props: {
      logger: Logger;
      auth: AuthServiceApi;
      onMessage: (messages: ParsedServerPubsubMessage[]) => void;
      onStart: (props: { abortSignal: Promise<never> }) => void;
      /** Called in response to an optimistic update from another tab */
      onLocalMessage: (props: { messages: RecordUpdateMessage[]; forceUpdate: boolean }) => void;
    },
  ) {
    this.logger = props.logger.child({ name: "WebsocketPubsubClient" });

    const shouldConnect$ = this._mode$.pipe(
      map((mode) => mode !== "FORCE_OFFLINE"),
      distinctUntilChanged(),
    );

    combineLatest([this.props.auth.currentUserId$, shouldConnect$])
      .pipe(distinctUntilChanged(isEqual))
      .subscribe(([userId, shouldConnect]) => {
        if (userId && shouldConnect) {
          this.connect();
        } else {
          this.disconnect();
        }
      });

    // For broadcasting local, optimistic updates to other tabs.
    WebsocketPubsubClient.broadcastChannel.onmessage = (event) => {
      const { messages, forceUpdate } = event.data as {
        messages: RecordUpdateMessage[];
        forceUpdate: boolean;
      };

      this.props.onLocalMessage({ messages, forceUpdate });
    };

    this.messages$.subscribe(this.props.onMessage);
  }

  subscribe(keyOrKeys: string | string[]) {
    this.socket?.emit("subscribe", keyOrKeys);
  }

  unsubscribe(key: string) {
    this.socket?.emit("unsubscribe", key);
  }

  emitLocalMessage(props: { changes: PointerWithRecord[]; forceUpdate?: boolean }) {
    const { changes, forceUpdate = false } = props;

    if (changes.length === 0) return;

    WebsocketPubsubClient.broadcastChannel.postMessage({
      messages: changes.map(({ table, id, record }): ParsedServerPubsubMessage => {
        return {
          type: "RECORD_UPDATE",
          table,
          id,
          version: record.version,
        };
      }),
      forceUpdate,
    });
  }

  private connect() {
    this.disconnect();

    const newSocket = io(config.apiServerOrigin, {
      path: "/api/connectToPubSub",
      autoConnect: false,
      reconnection: true,
      reconnectionAttempts: Infinity,
      reconnectionDelay: 1_000,
      reconnectionDelayMax: 20_000,
      randomizationFactor: 0.5,
      // Safari of 2025 still opts to use long-polling via XHR requests even though Websocket APIs are ubiquitous.
      // So we explicitly disable that transport that method https://socket.io/docs/v4/how-it-works/#http-long-polling
      transports: ["websocket"],
      withCredentials: true,
    });

    this.socket = newSocket;
    this.setupSocketEventListeners(newSocket);

    this.onStartAbortController = new AbortController();

    this.connectionStatus$.next("CONNECTING");

    newSocket.connect();
  }

  private setupSocketConnectionEventListeners(socket: Socket) {
    // This event is emitted by the Socket instance upon connection and reconnection
    socket.on("connect", () => {
      this.connectionStatus$.next("CONNECTED");

      // Either a completely new connection or a reconnect that wasn't able to restore the session/missed events
      // https://socket.io/docs/v4/connection-state-recovery
      if (!socket.recovered) {
        if (this.onStartAbortController) {
          this.props.onStart({
            abortSignal: getAbortSignalPromise(this.onStartAbortController.signal),
          });
        }
      }
    });

    socket.on("disconnect", (reason, description) => {
      this.logger.info({ reason, description }, "Disconnected!");

      this.onStartAbortController?.abort(new AbortError("Connection closed"));
    });
  }

  private setupSocketErrorEventListeners(socket: Socket) {
    socket.on("connect_error", (error) => {
      this.logger.error(error, `Socket connection error: ${error.message}`);
    });

    socket.io.on("error", (error) => {
      this.logger.error(error, `Socket manager connection error: ${error.message}`);
    });

    socket.io.on("reconnect_error", (error) => {
      this.logger.error(error, `Socket manager reconnect error: ${error.message}`);
      this.connectionStatus$.next("ERROR");
    });

    socket.io.on("reconnect_failed", () => {
      this.logger.error(`Socket manager unable to reconnect`);
    });
  }

  private setupSocketEventListeners(socket: Socket) {
    this.setupSocketConnectionEventListeners(socket);
    this.setupSocketErrorEventListeners(socket);

    socket.on("update", (...args) => {
      // All the record updates are in the first element of the array
      if (args.length !== 1) {
        this.logger.error(
          { args },
          `The update event payload should only have a single item (unless we changed that behaviour), got ${args.length}.`,
        );
      }

      this.processWebSocketUpdateEvent(args[0]);
    });
  }

  private disconnect() {
    this.connectionStatus$.next("IDLE");

    if (!this.socket) return;

    this.socket.disconnect();
  }

  private processWebSocketUpdateEvent(data: string) {
    let rawMessages: any[];

    try {
      rawMessages = JSON.parse(data);
    } catch (e) {
      this.logger.error({ data }, "Unable to parse update event data");
      return;
    }

    const messages = rawMessages.reduce((store: ParsedServerPubsubMessage[], message: any) => {
      const decoded = serverPubSubMessageD.decode(message);

      if (areDecoderErrors(decoded)) {
        this.logger.error({ decoded }, "Unknown update event message");
        return store;
      }

      store.push(decoded.value);
      return store;
    }, []);

    this.unprocessedPubSubMessages.push(...messages);
    this.processPubSubMessages();
  }

  /*
  We use a debounce because one change on the server almost always results
  in several related updates that are sent over sockets in quick succession
   */
  private processPubSubMessages = debounce(
    () => {
      const uniqueMessages = uniqWith(this.unprocessedPubSubMessages, isEqual);
      this.unprocessedPubSubMessages = [];

      this.logger.debug(uniqueMessages, `Processing ${uniqueMessages.length} messages`);
      this._messages$.next(uniqueMessages);
    },
    500,
    { maxWait: 1000 },
  );
}
