import * as d from "ts-decoders/decoders";
import { SetOptional } from "type-fest";
import {
  Subject,
  map,
  filter,
  share,
  takeUntil,
  Observable,
  firstValueFrom,
  concat,
  toArray,
  switchMap,
  from,
  retry,
  throwError,
  timer,
} from "rxjs";
import { areDecoderErrors } from "ts-decoders";
import { isNonNullable } from "libs/predicates";
import { DeferredPromise } from "libs/promise-utils";
import { uuid } from "libs/uuid";
import { Logger } from "libs/logger";
import { AbortError } from "libs/errors";

/**
 * Error rebuilt on the receiving side from a plain-object error payload.
 *
 * The payload's `name`, `stack` and `cause` are copied onto the instance when present so that
 * the rebuilt error resembles the one that was serialized.
 */
export class SerializedError extends Error {
  /** Name from the payload, or `"SerializedError"` when the payload has none. */
  name: string;
  /** `true` when `stack` was taken from the payload rather than captured where this instance was built. */
  isSerializedStack: boolean;

  /**
   * @param error Plain-object form of the original error. `message` is preferred over `msg`;
   * when both are missing or empty a generic message is used.
   */
  constructor(error: { name?: string; message?: string; msg?: string; stack?: string; cause?: any }) {
    super(error.message || error.msg || "unknown error serialized from message port");

    const { name, stack, cause } = error;

    this.name = name || "SerializedError";
    this.isSerializedStack = Boolean(stack);

    // Only replace the locally captured stack when the payload actually carries one.
    if (stack) {
      this.stack = stack;
    }

    this.cause = cause;
  }
}

/**
 * Error signalling that the recipient of a message is not (or is no longer) connected.
 */
export class DisconnectError extends Error {
  /**
   * Type guard for the plain-object form of a `DisconnectError`: any non-null object whose
   * `name` is `"DisconnectError"`.
   */
  static isSerializedDisconnectError(
    input: unknown,
  ): input is { name: "DisconnectError"; stack?: string; cause?: unknown } {
    if (typeof input !== "object" || input === null) {
      return false;
    }

    return (input as any).name === "DisconnectError";
  }

  name = "DisconnectError";

  /**
   * @param serializedError Optional plain-object form of a `DisconnectError`. Its `cause` is
   * passed on as this error's cause and its `stack`, when non-empty, replaces the local stack.
   */
  constructor(serializedError?: { stack?: string; cause?: any }) {
    super("Recipient disconnected", { cause: serializedError?.cause });

    const serializedStack = serializedError?.stack;

    if (serializedStack) {
      this.stack = serializedStack;
    }
  }
}

/**
 * Error describing a failed `postMessage` call on a port. Within this file it is always
 * created with the underlying exception attached as its `cause`.
 */
export class PostMessageError extends Error {
  name: string = "PostMessageError";
}

/**
 * Envelope for every message exchanged through a `MessagePortService`.
 *
 * - `type`: the kind of message; error responses use `"ERROR"`.
 * - `id`: identifier of this message. Responses refer back to it through `replyTo`.
 * - `from` / `to`: ids of the sending and receiving contexts. `to` is what the service uses to
 *   look up the port a message is posted to.
 * - `replyTo`: `null` for requests; for responses, the `id` of the request being answered.
 * - `data`: the payload.
 * - `transfer`: objects to transfer along with the message. The service passes the message
 *   itself as the options argument of `postMessage`, so this array is read as the transfer list.
 */
export type Message<T extends string = string, D = any> = {
  type: T;
  id: string;
  from: string;
  to: string;
  replyTo: string | null;
  /**
   * Optional property only applicable when observing results. Signals
   * that the result is complete and no more results will be emitted. The `data`
   * property must be `undefined` when `isComplete` is `true`.
   */
  isComplete?: boolean;
  data: D;
  transfer?: Transferable[];
};

/**
 * Maps each request type name to the shape of its request message and of its response message.
 * `RequestType`, `Request` and `Response` are all derived from this map.
 *
 * NOTE(review): declared as an interface, presumably so other modules can add their own
 * message types through declaration merging — confirm.
 */
export interface MessageTypeMap {
  PING: {
    request: Message<"PING", null>;
    response: Message<"PING", null>;
  };
}

/** Response message reporting a failure; `data` carries the error (or its plain-object form). */
export type ErrorMessage = Message<"ERROR", unknown>;

/** Union of all request type names declared in `MessageTypeMap`. */
export type RequestType = keyof MessageTypeMap;
/** Request message shape for request type `T` (all request types by default). */
export type Request<T extends RequestType = RequestType> = MessageTypeMap[T]["request"];
/** Response message shape for request type `T` (all request types by default). */
export type Response<T extends RequestType = RequestType> = MessageTypeMap[T]["response"];

/**
 * Runtime decoder for the `Message` envelope. It checks the envelope fields only: `data` and
 * the entries of `transfer` are accepted as-is.
 */
export const messageD = d.objectD<Message>({
  type: d.stringD(),
  id: d.stringD(),
  from: d.stringD(),
  to: d.stringD(),
  replyTo: d.nullableD(d.stringD()),
  isComplete: d.undefinableD(d.booleanD()),
  data: d.anyD(),
  transfer: d.undefinableD(d.arrayD(d.anyD())),
});

/**
 * Bookkeeping entry for one connected recipient.
 *
 * - `id`: the id the recipient was registered under.
 * - `port`: the port used to post messages to the recipient.
 * - `disconnect$`: emits once and completes when the recipient is disconnected.
 */
export type RecipientData = {
  id: string;
  port: RecipientPort;
  disconnect$: Subject<void>;
};

/**
 * The subset of the `MessagePort` interface the service relies on. `start` and `close` are
 * optional so that port-like objects without them can be used as well; the service calls
 * them only when they exist.
 */
export type RecipientPort = Pick<MessagePort, "addEventListener" | "postMessage"> & {
  start?: () => void;
  close?: () => void;
};

/**
 * The MessagePortService facilitates communication between different contexts using the MessagePort API.
 *
 * Each connected context (a "recipient") is registered through `onConnect` under an id, together
 * with the port used to reach it. Messages received on any registered port are decoded and exposed
 * through `requests$` (messages without a `replyTo`) and `responses$` (messages with a `replyTo`).
 * Outgoing requests are sent with `sendRequest` / `observeRequest` and answered with
 * `sendResponse` / `sendError`.
 */
export class MessagePortService {
  /**
   * Fixed ids of well-known contexts.
   *
   * NOTE(review): presumably used as recipient ids / context lock names by callers — confirm.
   */
  static readonly uniqueContexts = {
    SHARED_WORKER: "shared-worker",
    PERSISTED_DB_WORKER: "persisted-database-worker",
    LEADER: "comms-leader",
  } as const;

  /**
   * Requests the Web Lock named `contextId` and, once granted, holds it for as long as this
   * context is alive (the lock callback returns a promise that never settles).
   *
   * NOTE(review): when `options.ifAvailable` is set and the lock is unavailable, the Web Locks
   * API invokes the callback with `null`; this method still resolves in that case.
   *
   * @param contextId Name of the lock to acquire.
   * @param options Options forwarded to `navigator.locks.request`.
   * @returns A promise that resolves once the lock callback has been invoked. It rejects if the
   * lock request itself rejects (e.g. it was aborted through `options.signal`) instead of
   * staying pending forever.
   */
  static acquireContextLock(contextId: string, options: LockManagerRequestOptions = {}) {
    return new Promise<void>((lockAcquired, lockRequestFailed) => {
      navigator.locks
        .request(
          contextId,
          options,
          () =>
            // Never settles on purpose: the lock is released only when this context is destroyed.
            new Promise<void>((_) => {
              lockAcquired();
            }),
        )
        // Calling this after `lockAcquired()` is a no-op, so it only reports failed requests.
        .catch(lockRequestFailed);
    });
  }

  /**
   * In order to inform others when we are no longer subscribing to this query, we hold a
   * lock on the request id that we release when we unsubscribe.
   * @returns A function that releases the lock.
   */
  static acquireSubscriberLock(requestId: string) {
    const deferred = new DeferredPromise<void>();
    navigator.locks.request(requestId, () => deferred.promise);
    return () => deferred.resolve();
  }

  /**
   * Returns an observable which emits once and then completes when no one is subscribed to the
   * given request id.
   *
   * The shared lock is requested immediately when this method is called, not on subscription.
   * The returned stream is backed by a `Subject`, so subscribe right away.
   */
  static watchSubscriberLock(requestId: string) {
    const complete$ = new Subject<void>();

    navigator.locks.request(requestId, { mode: "shared" }, () => {
      complete$.next();
      complete$.complete();
    });

    return complete$.asObservable();
  }

  /**
   * Builds a request message of the given type.
   *
   * A fresh `id` is generated unless `message` provides one. `type` always comes from the
   * first argument and `replyTo` is always `null`.
   */
  static buildRequest<T extends RequestType>(
    // Type inference is better if we pass type as a separate argument
    type: T,
    message: SetOptional<Request<T>, "id" | "replyTo" | "type">,
  ): Request<T> {
    const request = {
      id: uuid(),
      ...message,
      type,
      replyTo: null,
    } as Request<T>;

    return request;
  }

  /**
   * Builds a response message.
   *
   * When `request` is given, the response defaults to the request's `type`, swaps its
   * `from` / `to` and sets `replyTo` to the request's `id`; any of these can be overridden by
   * `response`. When `request` is `null`, `response` must be a complete message.
   *
   * For `"ERROR"` responses whose `data` is an `Error` instance, the error is replaced by a
   * plain object holding its `name`, `message`, `stack` and `cause`.
   */
  static buildResponse<T extends RequestType>(
    request: Request<T>,
    response:
      | SetOptional<Response<T>, "id" | "type" | "from" | "to" | "replyTo">
      | SetOptional<ErrorMessage, "id" | "type" | "from" | "to" | "replyTo">,
  ): Response<T>;
  static buildResponse<T extends RequestType>(request: null, response: Response<T> | ErrorMessage): Response<T>;
  static buildResponse(
    request: Request | null,
    response:
      | SetOptional<Response, "id" | "type" | "from" | "to" | "replyTo">
      | SetOptional<ErrorMessage, "id" | "type" | "from" | "to" | "replyTo">
      | Response
      | ErrorMessage,
  ): Response<any> {
    const message = {
      id: uuid(),
      type: request?.type,
      from: request?.to,
      to: request?.from,
      replyTo: request?.id,
      ...response,
    } as Message;

    if (message.type === "ERROR") {
      const error = message.data;

      message.data =
        error instanceof Error ?
          { name: error.name, message: error.message, stack: error.stack, cause: error.cause }
        : error;
    }

    return message;
  }

  /** Emits once and completes when `activate()` is called. */
  activated$ = new Subject<void>();
  /** Every `message` event received on any connected port. */
  protected rawMessages$ = new Subject<MessageEvent<any>>();

  /**
   * The messages stream emits decoded messages received from the rawMessages$ stream
   * but otherwise doesn't do any processing. The `requests$` and `responses$` streams
   * should be preferred over the messages stream.
   *
   * Events whose payload fails to decode are logged and dropped.
   */
  _messages$ = this.rawMessages$.pipe(
    map((event) => {
      const result = messageD.decode(event.data);

      if (areDecoderErrors(result)) {
        this.logger.error({ error: result[0] }, "Failed to decode message");
        return null;
      }

      return result.value;
    }),
    filter(isNonNullable),
    share(),
  );

  /**
   * When the service is created, we immediately start listening for messages from clients however we buffer
   * the requests without emitting them until the shared worker setup is complete and it's activated. At
   * that time we emit all the buffered requests and then start emitting new requests as they come in.
   */
  requests$ = concat(
    this._messages$.pipe(
      takeUntil(this.activated$),
      // collect the messages in a buffer
      toArray(),
      // unwind the buffer and emit each message individually
      switchMap((messages) => from(messages)),
    ),
    // once the service is activated, we emit all new messages as they come in.
    this._messages$,
  ).pipe(
    filter((m): m is Request => !m.replyTo),
    share(),
  );

  /** Decoded messages that answer an earlier request (i.e. have a `replyTo`). Not buffered. */
  responses$ = this._messages$.pipe(
    filter((m): m is Response | ErrorMessage => !!m.replyTo),
    share(),
  );

  /** Currently connected recipients, keyed by recipient id. */
  recipients = new Map<string, RecipientData>();
  /** Emits whenever a recipient is added to or removed from `recipients`. */
  recipientsChanges$ = new Subject<void>();
  serviceName: string;
  /** Id of this context; used as the `from` of outgoing requests. */
  senderId: string;
  logger: Logger;

  /**
   * Abort controller of each connected recipient's disconnect watcher, keyed by recipient id.
   * Aborting a controller cancels that recipient's pending lock request (if one was started)
   * and turns its not-yet-invoked `watchForDisconnect` callback into a no-op.
   */
  protected disconnectWatchControllers = new Map<string, AbortController>();

  /** Resolves the recipient a message should be posted to. Defaults to a lookup in `recipients`. */
  protected getRecipient: (recipientId: string) => RecipientData | undefined;
  /** Whether requests are retried after a disconnect when the caller does not say otherwise. */
  protected defaultRetryOnDisconnect: boolean;

  constructor(props: {
    serviceName: string;
    senderId: string;
    logger: Logger;
    defaultRetryOnDisconnect: boolean;
    getRecipient?: (this: MessagePortService, recipientId: string) => RecipientData | undefined;
  }) {
    this.serviceName = props.serviceName;
    this.senderId = props.senderId;
    this.logger = props.logger.child({ name: props.serviceName, senderId: props.senderId });
    this.defaultRetryOnDisconnect = props.defaultRetryOnDisconnect;
    this.getRecipient = props.getRecipient || ((recipientId) => this.recipients.get(recipientId));
    // We subscribe to requests here so that we start buffering them immediately. If there are no subscribers and
    // a new message comes in, then the request won't be buffered.
    this.requests$.subscribe();
  }

  /**
   * Incoming requests will be delayed until after this service is activated. Note that this
   * service can still send outgoing requests.
   */
  activate() {
    this.activated$.next();
    this.activated$.complete();
  }

  /**
   * Registers a recipient and starts listening for messages on its port.
   *
   * If a recipient with the same id is already registered it is disconnected first, and its
   * disconnect watcher is cancelled so that it cannot tear down the new connection later.
   *
   * @param recipientId Id under which the recipient is registered.
   * @param port Port used to exchange messages with the recipient.
   * @param options `lockId` is the name of the lock the recipient holds while it is alive;
   * defaults to `recipientId`.
   * @returns A callback function to start watching for disconnects.
   */
  onConnect(recipientId: string, port: RecipientPort, options: { lockId?: string } = {}) {
    // If, for some reason, we already have a recipient registered with the same ID, we'll
    // disconnect it before connecting the new recipient.
    if (this.recipients.has(recipientId)) {
      this.logger.warn({ recipientId }, `[onConnect] recipient already connected. Disconnecting.`);
      this.onDisconnect(recipientId);
    }

    // Cancel whatever watcher a previous connection with this id left behind. It is the *old*
    // watcher that has to be aborted; the watcher of the connection being set up must stay usable.
    this.abortDisconnectWatch(recipientId);

    const abortController = new AbortController();
    this.disconnectWatchControllers.set(recipientId, abortController);

    this.logger.notice(
      { recipientId, existingRecipientIds: Array.from(this.recipients.keys()) },
      `[onConnect] recipient connected`,
    );

    this.recipients.set(recipientId, { id: recipientId, port, disconnect$: new Subject<void>() });

    port.addEventListener("message", (event) => this.onMessage(event as MessageEvent<any>));
    port.addEventListener("messageerror", (event) => {
      this.logger.error({ recipientId, event }, "[onConnect] messageerror");
    });

    port.start?.();

    this.recipientsChanges$.next();

    let watchForDisconnectSetup = false;

    // We need to return a handler to watch for disconnects (instead of just watching for disconnects
    // immediately) because some services create a worker and then immediately connect to it before
    // the other end has had a chance to initialize and acquire a context lock.
    const watchForDisconnect = () => {
      if (watchForDisconnectSetup || abortController.signal.aborted) return;
      watchForDisconnectSetup = true;
      const { lockId = recipientId } = options;
      // There isn't an API to learn when a client is destroyed / disconnects. To work around this,
      // before connecting to the shared worker, clients will acquire an exclusive lock on their
      // own ID and never release it. The lock will then only become available if the client is destroyed.
      // Other contexts can then learn when a client is destroyed by waiting for the lock to
      // become available.
      navigator.locks
        .request(lockId, { mode: "shared", signal: abortController.signal }, () => {
          this.onDisconnect(recipientId);
        })
        .catch((error: unknown) => {
          // Aborting a pending lock request rejects it with the abort reason; that is expected.
          if (abortController.signal.aborted && error === abortController.signal.reason) return;

          this.logger.error({ recipientId, error }, "[onConnect] failed to watch for disconnect");
        });
    };

    return watchForDisconnect;
  }

  /**
   * Removes a recipient: cancels its disconnect watcher, completes its `disconnect$` and closes
   * its port. Does nothing besides logging when the recipient is not registered.
   */
  onDisconnect(recipientId: string) {
    this.logger.notice(
      { recipientId, existingRecipientIds: Array.from(this.recipients.keys()) },
      `[onDisconnect] recipient disconnected`,
    );

    // The connection is gone, so its watcher has nothing left to report. This has no effect when
    // we are being called by that very watcher, as its lock request has already been granted.
    this.abortDisconnectWatch(recipientId);

    const recipient = this.recipients.get(recipientId);

    if (!recipient) return;

    this.recipients.delete(recipientId);
    recipient.disconnect$.next();
    recipient.disconnect$.complete();
    recipient.port.close?.();
    this.recipientsChanges$.next();
  }

  /**
   * Sends a request and resolves with the `data` of the first response.
   *
   * @param options `retryOnDisconnect` overrides the service-wide default.
   * @returns The first response's `data`.
   * @throws Whatever the response stream fails with: a `DisconnectError` when the recipient is
   * not connected (and retrying is off), a `PostMessageError` when posting fails, or the error
   * carried by an `"ERROR"` response. Everything but `DisconnectError` is logged first.
   */
  async sendRequest<T extends RequestType>(
    // Type inference is better if we pass type as a separate argument
    type: T,
    message: SetOptional<Request<T>, "id" | "replyTo" | "type" | "from">,
    options: { retryOnDisconnect?: boolean } = {},
  ): Promise<Response<T>["data"]> {
    const request = MessagePortService.buildRequest(type, {
      ...message,
      from: this.senderId,
    } as any);

    this.logger.debug({ request }, `[sendRequest] request`);

    const observable = this.innerObserveRequest(request, {
      retryOnDisconnect: options.retryOnDisconnect ?? this.defaultRetryOnDisconnect,
      acquireSubscriberLock: false,
    });

    try {
      return await firstValueFrom(observable);
    } catch (error) {
      if (error instanceof DisconnectError) {
        throw error;
      } else if (error instanceof PostMessageError) {
        this.logger.error({ request, error }, `[sendRequest] failed to send request`);
      } else {
        this.logger.error({ request, error }, `[sendRequest] error response`);
      }

      throw error;
    }
  }

  /**
   * Sends a request and returns a stream of the `data` of every response to it.
   *
   * The request is posted each time the returned observable is subscribed to. While
   * subscribed, a lock named after the request id is held so that the other side can detect
   * when nobody is listening anymore (see `watchSubscriberLock`).
   *
   * @param options `retryOnDisconnect` overrides the service-wide default.
   */
  observeRequest<T extends RequestType>(
    // Type inference is better if we pass type as a separate argument
    type: T,
    message: SetOptional<Request<T>, "id" | "replyTo" | "type" | "from">,
    options: { retryOnDisconnect?: boolean } = {},
  ): Observable<Response<T>["data"]> {
    const request = {
      id: uuid(),
      from: this.senderId,
      ...message,
      type,
      replyTo: null,
    } as Request<T>;

    return this.innerObserveRequest(request, {
      retryOnDisconnect: options.retryOnDisconnect ?? this.defaultRetryOnDisconnect,
      acquireSubscriberLock: true,
    });
  }

  /**
   * Passes a message on, unchanged, to the recipient named in `message.to`.
   *
   * When that recipient is not connected and the message is a request, an `"ERROR"` response
   * carrying a `DisconnectError` is posted back to the sender instead. Undeliverable replies
   * are dropped.
   *
   * @throws PostMessageError when posting to the recipient (or back to the sender) fails.
   */
  forwardMessage(message: Message): void {
    const recipient = this.getRecipient(message.to);

    if (recipient) {
      this.logger.debug({ message }, `[${this.serviceName}] [forwardMessage] forwarding message`);

      try {
        // The message doubles as the options argument so that `message.transfer` is honoured.
        recipient.port.postMessage(message, message);
      } catch (error) {
        this.logger.error({ message, error }, `[forwardMessage] recipient.port.postMessage error`);

        throw new PostMessageError(`[${this.serviceName}] [forwardMessage] recipient.port.postMessage error `, {
          cause: error,
        });
      }

      return;
    }

    this.logger.debug({ message, isReply: !!message.replyTo }, `[forwardMessage] cannot find recipient`);

    // If the message being forwarded is a reply and the original requestor is no longer connected,
    // then we don't need to worry about sending an error back.
    if (message.replyTo) return;

    const sender = this.getRecipient(message.from);

    if (sender) {
      const response = MessagePortService.buildResponse(message as Request, {
        type: "ERROR",
        data: new DisconnectError(),
      });

      try {
        sender.port.postMessage(response, response);
      } catch (error) {
        this.logger.error({ message, error }, `[forwardMessage] sender.port.postMessage error`);

        throw new PostMessageError(`[${this.serviceName}] [forwardMessage] sender.port.postMessage error `, {
          cause: error,
        });
      }

      return;
    }

    this.logger.error({ message }, `[forwardMessage] unhandled failure to forward message`);
  }

  /**
   * Builds a response (see `buildResponse`) and posts it to the recipient it is addressed to.
   *
   * @throws DisconnectError when that recipient is not connected.
   * @throws PostMessageError when posting fails.
   */
  sendResponse<T extends RequestType>(
    request: Request<T>,
    response:
      | SetOptional<Response<T>, "id" | "type" | "from" | "to" | "replyTo">
      | SetOptional<ErrorMessage, "id" | "from" | "to" | "replyTo">,
  ): void;
  sendResponse<T extends RequestType>(request: null, response: Response<T> | ErrorMessage): void;
  sendResponse(
    request: Request | null,
    response:
      | SetOptional<Response, "id" | "type" | "from" | "to" | "replyTo">
      | SetOptional<ErrorMessage, "id" | "from" | "to" | "replyTo">
      | Response
      | ErrorMessage,
  ): void {
    const message = MessagePortService.buildResponse(request as any, response);

    const client = this.getRecipient(message.to);

    if (!client) {
      throw new DisconnectError();
    }

    this.logger.debug({ message }, `[sendResponse] sending response`);

    try {
      client.port.postMessage(message, message);
    } catch (error) {
      throw new PostMessageError(`[${this.serviceName}] [sendResponse] client.port.postMessage error `, {
        cause: error,
      });
    }
  }

  /** Answers `request` with an `"ERROR"` response carrying `error`. */
  sendError<T extends RequestType>(request: Request<T>, error: unknown) {
    this.sendResponse(request, {
      type: "ERROR",
      data: error,
    });
  }

  /** Feeds a `message` event received on a connected port into the raw message stream. */
  protected onMessage(event: MessageEvent<any>) {
    this.rawMessages$.next(event);
  }

  /**
   * Aborts and forgets the disconnect watcher registered for `recipientId`, if any.
   */
  protected abortDisconnectWatch(recipientId: string) {
    const controller = this.disconnectWatchControllers.get(recipientId);

    if (!controller) return;

    this.disconnectWatchControllers.delete(recipientId);
    controller.abort(new AbortError());
  }

  /**
   * Shared implementation of `sendRequest` and `observeRequest`.
   *
   * On every subscription the request is posted to its recipient and the responses addressed
   * to it are relayed: `"ERROR"` responses fail the stream, other responses emit their `data`,
   * and a response flagged `isComplete` completes the stream. The stream fails with a
   * `DisconnectError` when the recipient is not connected or disconnects while subscribed.
   *
   * @param options `retryOnDisconnect`: resubscribe (and therefore re-post the request) 50ms
   * after a `DisconnectError`, without a retry limit. `acquireSubscriberLock`: hold a lock
   * named after the request id for the duration of each subscription.
   */
  protected innerObserveRequest<T extends RequestType>(
    request: Request<T>,
    options: { retryOnDisconnect: boolean; acquireSubscriberLock: boolean },
  ): Observable<Response<T>["data"]> {
    return new Observable((subscriber) => {
      const recipient = this.getRecipient(request.to);

      if (!recipient) {
        subscriber.error(new DisconnectError());
        return;
      }

      const { port, disconnect$ } = recipient;

      try {
        port.postMessage(request, request);
      } catch (error) {
        subscriber.error(
          new PostMessageError(`[${this.serviceName}] [innerObserveRequest] port.postMessage error`, { cause: error }),
        );

        return;
      }

      const disconnectSub = disconnect$.subscribe(() => {
        subscriber.error(new DisconnectError());
      });

      let unsubscribe: (() => void) | undefined;

      // While in theory we could just acquire a subscriber lock for every query, in practice we'll be creating thousands
      // of queries and only a few will be observed. I expect that acquiring a subscriber lock for every query would cause
      // a performance hit.
      if (options.acquireSubscriberLock) {
        // In order to inform others when we are no longer subscribing to this query, we hold a
        // lock on the request id that we release when we unsubscribe.
        unsubscribe = MessagePortService.acquireSubscriberLock(request.id);
      }

      const responseSub = this.responses$
        .pipe(
          filter((response) => response.replyTo === request.id),
          takeUntil(disconnect$),
        )
        .subscribe((response) => {
          const { data } = response;

          if (response.type === "ERROR") {
            subscriber.error(reconstructSerializedError(data));
          } else if (
            !response.isComplete ||
            // If the response is marked as complete, we only emit a value if the data payload is not undefined.
            data !== undefined
          ) {
            subscriber.next(data as any);
          }

          if (response.isComplete) {
            subscriber.complete();
          }
        });

      return () => {
        unsubscribe?.();
        disconnectSub.unsubscribe();
        responseSub.unsubscribe();
      };
    }).pipe(
      retry({
        delay: (error) => {
          // Only disconnects are retried, and only when the caller asked for it.
          if (options.retryOnDisconnect && error instanceof DisconnectError) {
            return timer(50);
          }

          return throwError(() => error);
        },
      }),
    );
  }
}

/* -----------------------------------------------------------------------------------------------*/

/**
 * Turns the `data` of an `"ERROR"` response back into something that can be thrown.
 *
 * Non-object payloads (primitives, `null`, `undefined`) are returned untouched. An object
 * named `"DisconnectError"` becomes a `DisconnectError`; any other object becomes a
 * `SerializedError`.
 */
function reconstructSerializedError(error: unknown) {
  if (error === null || typeof error !== "object") {
    return error;
  }

  return DisconnectError.isSerializedDisconnectError(error) ?
      new DisconnectError(error)
    : new SerializedError(error);
}

/* -----------------------------------------------------------------------------------------------*/
