import { ClientPubsubMessage, ParsedServerPubsubMessage, serverPubSubMessageD } from "libs/PubSubTypes";
import { Logger } from "libs/logger";
import { DeferredPromise, getAbortSignalPromise, promiseRetryOnError, wait } from "libs/promise-utils";
import { areDecoderErrors } from "ts-decoders";
import { BehaviorSubject, Subject, distinctUntilChanged, filter, firstValueFrom, map } from "rxjs";
import { PointerWithRecord } from "libs/schema";
import { Simplify } from "type-fest";
import { AbortError } from "libs/errors";
import { isNonNullable } from "libs/predicates";
import { config } from "./config";
import type { AuthService } from "./user.service";

/**
 * Object-type alias for `WebsocketPubsubClient`, produced by type-fest's `Simplify`.
 *
 * NOTE(review): presumably this exists so consumers can depend on (or mock) the
 * client's member shape rather than the concrete class — confirm against callers.
 */
export type PubsubClientApi = Simplify<WebsocketPubsubClient>;

/**
 * Client for the PubSub websocket endpoint (`/api/connectToPubSub`).
 *
 * Responsibilities visible in this class:
 * - Opens a websocket whenever `_mode$` is not `"FORCE_OFFLINE"` and closes it
 *   when the mode switches to `"FORCE_OFFLINE"`.
 * - Reconnects after a close, backing off exponentially (2s, 4s, ... capped at 64s).
 * - Decodes incoming message batches and emits them on `messages$` (which is
 *   also forwarded to `props.onMessage`).
 * - Relays local, optimistic updates to other tabs through a `BroadcastChannel`
 *   named `"pubsub"`; messages received on that channel go to `props.onLocalMessage`.
 */
export class WebsocketPubsubClient {
  /** Shared across all instances; its `onmessage` handler is (re)assigned in the constructor. */
  private static broadcastChannel = new BroadcastChannel("pubsub");
  /** The current socket, or `undefined` when there is none (never connected, closed, or disconnected). */
  private ws: WebSocket | undefined;
  /** Exponent for the reconnect backoff (`2 ** reconnectAttempt` seconds). Reset to 1 on a successful open. */
  private reconnectAttempt = 1;

  logger: Logger;
  /** Use NetworkService#mode$ instead */
  _mode$ = new BehaviorSubject<"NORMAL" | "FORCE_OFFLINE">("NORMAL");
  /**
   * Resolves after the client has completed the first websocket connection attempt,
   * regardless of whether the connection was successful or not.
   */
  initialConnectionAttempt = new DeferredPromise<void>();
  /** `true` between a socket's `open` event and its `close` event. */
  isConnected$ = new BehaviorSubject(false);

  private _messages$ = new Subject<ParsedServerPubsubMessage[]>();
  /**
   * An observable which emits whenever the pubsub connection receives a new batch of messages.
   * Note that this observable will never error or complete.
   */
  messages$ = this._messages$.asObservable();

  /** Whether outgoing messages may be sent directly instead of being buffered until `login()` succeeds. */
  private loggedIn: boolean;
  /** Messages queued by `send()` while not logged in; flushed at the end of a successful `login()`. */
  private loginMessageBuffer: ClientPubsubMessage[] = [];

  /**
   * Wires up the mode subscription (which triggers the initial connection because
   * `_mode$` starts as `"NORMAL"`), the cross-tab broadcast listener, and the
   * forwarding of `messages$` to `props.onMessage`.
   */
  constructor(
    private props: {
      logger: Logger;
      auth: AuthService;
      onMessage: (messages: ParsedServerPubsubMessage[]) => void;
      onStart: (props: { abortSignal: Promise<never> }) => void;
      /** Called in response to an optimistic update from another tab */
      onLocalMessage: (messages: ParsedServerPubsubMessage[]) => void;
    },
  ) {
    this.logger = props.logger.child({ name: "WebsocketPubsubClient" });
    // If a user id is already available we treat the client as logged in from the start.
    this.loggedIn = !!this.props.auth.getCurrentUserId();

    const shouldConnect$ = this._mode$.pipe(
      map((mode) => mode !== "FORCE_OFFLINE"),
      distinctUntilChanged(),
    );

    shouldConnect$.subscribe((shouldConnect) => {
      if (shouldConnect) {
        this.connect();
      } else {
        this.disconnect();
      }
    });

    // For broadcasting local, optimistic updates to other tabs.
    WebsocketPubsubClient.broadcastChannel.onmessage = (event) => {
      const { messages } = event.data as {
        messages: ParsedServerPubsubMessage[];
      };

      this.props.onLocalMessage(messages);
    };

    this.messages$.subscribe(this.props.onMessage);
  }

  /**
   * Opens a new websocket unless one already exists or the client is in
   * `"FORCE_OFFLINE"` mode (in which case the backoff counter is reset instead).
   */
  private connect() {
    if (this.ws) return;

    const noConnect = this._mode$.getValue() === "FORCE_OFFLINE";

    if (noConnect) {
      this.reconnectAttempt = 1;
      return;
    }

    const url = new URL(config.dev.apiServerOrigin || location.href);

    url.protocol = url.protocol === "https:" ? "wss:" : "ws:";
    url.pathname = "/api/connectToPubSub";

    this.logger.debug({ url: url.toString() }, "[websocket] connecting...");

    // Note that vite will not proxy this to the api server so we need
    // the URL to point directly at the API server.
    //
    // We keep a local reference so that the handlers below can tell whether
    // *this* socket is still the client's current one. `disconnect()` clears
    // `this.ws` synchronously while the `close` event is delivered later, so a
    // new socket may have been created by the time `onclose` runs.
    const ws = new WebSocket(url);
    this.ws = ws;

    /** Allows us to abort the onStart promise if desired */
    const onStartAbortController = new AbortController();

    ws.onopen = () => {
      this.logger.debug("[websocket] connected!");
      this.isConnected$.next(true);
      this.reconnectAttempt = 1;
      this.props.onStart({
        abortSignal: getAbortSignalPromise(onStartAbortController.signal),
      });

      this.initialConnectionAttempt.resolve();
    };

    ws.onmessage = (event) => {
      let rawMessages: unknown;

      try {
        rawMessages = JSON.parse(event.data);
      } catch (error) {
        // A malformed (or non-text) frame must not throw out of the event
        // handler; log it and drop the frame.
        this.logger.error({ error, data: event.data }, "[websocket] failed to parse message");
        return;
      }

      if (rawMessages === "KEEP_ALIVE") {
        return;
      } else if (!Array.isArray(rawMessages)) {
        this.logger.error({ rawMessages }, "[websocket] unknown message");
        return;
      }

      // Messages which fail to decode are logged and skipped; the rest of the
      // batch is still delivered.
      const messages: ParsedServerPubsubMessage[] = [];

      for (const message of rawMessages) {
        const decoded = serverPubSubMessageD.decode(message);

        if (areDecoderErrors(decoded)) {
          this.logger.error({ decoded }, "[websocket] unknown message");
          continue;
        }

        messages.push(decoded.value);
      }

      this._messages$.next(messages);
    };

    ws.onerror = (error) => {
      // In the event of an error, a close event will be emitted after
      // error event. Note that websocket "errors" are not error objects but
      // rather events which contain *very little* (no?) useful information.
      this.logger.error({ error }, "[websocket] unknown error");
    };

    ws.onclose = () => {
      this.logger.debug("[websocket] closed");

      // If a newer socket has already replaced this one *and* is open, the
      // client is still connected and `isConnected$` must stay `true`.
      if (this.ws?.readyState !== WebSocket.OPEN) {
        this.isConnected$.next(false);
      }

      onStartAbortController.abort(new AbortError("Connection closed"));
      this.initialConnectionAttempt.resolve();

      // This socket was superseded (disconnect() followed by connect() before
      // this close event was delivered). The newer socket owns `this.ws` and
      // will schedule its own reconnect, so don't clobber it or open a
      // duplicate connection.
      if (this.ws !== undefined && this.ws !== ws) return;

      this.ws = undefined;
      void this.attemptReconnect();
    };
  }

  /**
   * Closes the current socket, if any. The socket's `onclose` handler runs
   * later (asynchronously) and performs the remaining cleanup.
   */
  private disconnect() {
    if (!this.ws) return;
    this.ws.close();
    this.ws = undefined;
  }

  /**
   * Waits `2 ** reconnectAttempt` seconds and then calls `connect()`.
   * The attempt counter stops growing at 6, which caps the delay at 64 seconds.
   */
  private async attemptReconnect() {
    const waiting = 2 ** this.reconnectAttempt * 1000;

    // We cap the max amount of time between reconnection attempts
    if (this.reconnectAttempt < 6) {
      this.reconnectAttempt += 1;
    }

    this.logger.debug({ waiting }, `[websocket] attempting reconnect...`);

    await wait(waiting);

    this.connect();
  }

  /**
   * Sends a message over the socket.
   *
   * - If the socket is not open the message is dropped (and logged at debug level).
   * - If the client is not logged in, the message is queued until `login()`
   *   succeeds, unless `options.skipBuffer` is set.
   */
  private send(message: ClientPubsubMessage, options?: { skipBuffer?: boolean }) {
    if (this.ws?.readyState !== WebSocket.OPEN) {
      this.logger.debug({ message, options }, "[websocket] Send request ignored. Websocket not connected.");

      return;
    }

    if (this.loggedIn || options?.skipBuffer) {
      this.ws.send(JSON.stringify(message));
    } else {
      this.loginMessageBuffer.push(message);
    }
  }

  /** Sends a `SUBSCRIBE` message for `key`. */
  subscribe(key: string) {
    this.send({ type: "SUBSCRIBE", key });
  }

  /** Sends an `UNSUBSCRIBE` message for `key`. */
  unsubscribe(key: string) {
    this.send({ type: "UNSUBSCRIBE", key });
  }

  /**
   * Inform the PubSub server that the client has successfully logged in.
   *
   * Sends a `LOGIN` message and waits for a `LOGIN_SUCCESS` message, giving up
   * on an attempt after 4 seconds or when the connection drops. Attempts are
   * run through `promiseRetryOnError` (configured with `attempts: 3`,
   * `waitMs: 100`; `AbortError`s are excluded by `errorFilter`). On failure the
   * error is logged and the socket is disconnected. On success, any messages
   * buffered while logged out are flushed.
   */
  async login(token: string) {
    try {
      await promiseRetryOnError(
        {
          waitMs: 100,
          attempts: 3,
          errorFilter: (error) => !(error instanceof AbortError),
        },
        async () => {
          // Nothing to do while offline; the post-retry check below bails out too.
          if (!this.isConnected()) return;

          // Subscribe *before* sending so the confirmation cannot be missed.
          const confirmationPromise = firstValueFrom(
            this.messages$.pipe(
              map((messages) => messages.find((m) => m.type === "LOGIN_SUCCESS")),
              filter(isNonNullable),
            ),
          );

          const timeoutPromise = wait(4000).then(() => {
            throw new Error("timeout");
          });

          const disconnectPromise = firstValueFrom(this.isConnected$.pipe(filter((isConn) => !isConn))).then(() => {
            throw new AbortError("disconnected");
          });

          // LOGIN must bypass the buffer: the buffer is only flushed after login succeeds.
          this.send({ type: "LOGIN", token }, { skipBuffer: true });

          await Promise.race([confirmationPromise, timeoutPromise, disconnectPromise]);
        },
      );
    } catch (error) {
      this.logger.error({ error }, "[websocket] login failed");
      this.disconnect();
      return;
    }

    if (!this.isConnected()) return;

    this.loggedIn = true;
    this.loginMessageBuffer.forEach((message) => this.send(message));
    this.loginMessageBuffer = [];
  }

  /** Synchronous snapshot of `isConnected$`. */
  isConnected() {
    return this.isConnected$.getValue();
  }

  /**
   * Broadcasts a `RECORD_UPDATE` message for each given record to other tabs
   * via the `"pubsub"` BroadcastChannel. Does nothing for an empty list.
   */
  emitLocalMessage(pointersWithRecord: PointerWithRecord[]) {
    if (pointersWithRecord.length === 0) return;

    WebsocketPubsubClient.broadcastChannel.postMessage({
      messages: pointersWithRecord.map(({ table, id, record }): ParsedServerPubsubMessage => {
        return {
          type: "RECORD_UPDATE",
          table,
          id,
          version: record.version,
        };
      }),
    });
  }
}
