import {
  ChangeNotificationRecord,
  ClientSingletonRecord,
  RecordTable,
  createRecordMapFromPointersWithRecords,
  getMapRecord,
  getMapRecords,
  getPointer,
} from "libs/schema";
import {
  Observable,
  Subject,
  combineLatest,
  distinctUntilChanged,
  filter,
  fromEvent,
  map,
  merge,
  shareReplay,
} from "rxjs";
import { DeferredPromise, PromiseRetryOnErrorOptions, promiseRetryOnError } from "libs/promise-utils";
import { MS_IN_MINUTE, MS_IN_SECOND } from "libs/date-helpers";
import { isEqual } from "libs/predicates";
import { AbortError } from "libs/errors";
import { PersistedDatabaseWorkerApi } from "../persisted-database-worker/PersistedDatabaseWorkerApi";
import { SharedWorkerEnvironment } from "./SharedWorkerEnvironment";
import { startWith } from "libs/rxjs-operators";
import { SetNonNullable } from "libs/type-helpers";

/**
 * The slice of `SharedWorkerEnvironment` that the sync service depends on.
 *
 * Only the listed keys are picked. `persistedDb` is additionally passed through
 * `SetNonNullable` — presumably this makes `persistedDb` non-nullable for the
 * sync service; confirm against `libs/type-helpers`.
 */
export type SyncServiceEnvironment = SetNonNullable<
  Pick<
    SharedWorkerEnvironment,
    "api" | "logger" | "persistedDb" | "network" | "auth" | "subscriptionManager" | "connections"
  >,
  "persistedDb"
>;

/**
 * Every message shape that `SyncServiceProvider` publishes on its broadcast stream:
 * either a full-sync progress update or a change to one prop of the persisted
 * "sync_data" singleton.
 */
export type SyncServiceBroadcast = SyncServiceProgressBroadcast | SyncServiceSyncDataChangeBroadcast;

/**
 * Progress report emitted while a full sync is running.
 */
export type SyncServiceProgressBroadcast = {
  type: "SYNC_PROGRESS";
  // `false` while the full sync is processing notifications, `true` once it has finished.
  done: boolean;
  // Number of notifications processed so far by the current full sync run.
  current: number;
  // Total number of notifications to process, or `null` when the count is not known.
  total: number | null;
};

/**
 * Message announcing the current value of one prop of the "sync_data" singleton.
 *
 * Without a type argument this is the union of the messages for every prop; pass a
 * specific key as `P` to get the message shape for that prop only (with `value`
 * typed accordingly).
 */
export type SyncServiceSyncDataChangeBroadcast<
  P extends keyof SyncServiceSyncDataChangeBroadcastMap = keyof SyncServiceSyncDataChangeBroadcastMap,
> = Exclude<SyncServiceSyncDataChangeBroadcastMap[P], undefined>;

/**
 * Maps each key of the "sync_data" singleton's `data` object to the
 * "SYNC_DATA_CHANGE" message that carries that key (`prop`) and its value.
 */
type SyncServiceSyncDataChangeBroadcastMap = {
  [K in keyof ClientSingletonRecord<"sync_data">["data"]]: {
    type: "SYNC_DATA_CHANGE";
    prop: K;
    value: ClientSingletonRecord<"sync_data">["data"][K];
  };
};

/**
 * Keeps the local persisted database in sync with the server.
 *
 * On start (and again whenever the current user or the online state changes) the
 * service runs a "full sync" that walks the user's inbox notifications and preloads
 * the data for each thread, followed by an "incremental sync" that downloads records
 * referenced by change notifications newer than the stored cursor.
 *
 * Sync state is stored in the "sync_data" singleton record. Every change to it, and
 * full-sync progress, is published through `broadcasts$`.
 */
export class SyncServiceProvider {
  /** Channel used to relay sync broadcasts to other browsing contexts. */
  private static readonly broadcastChannel = new BroadcastChannel("SyncService");

  /**
   * Publishes `message` on the BroadcastChannel and on the local `_broadcasts$`
   * subject. A BroadcastChannel does not deliver a message back to the channel
   * object that posted it, so the local subject is what lets listeners in this
   * context observe the message too.
   */
  private static broadcastMessage(message: SyncServiceBroadcast) {
    SyncServiceProvider.broadcastChannel.postMessage(message);
    SyncServiceProvider._broadcasts$.next(message);
  }

  /** Messages published by this context. Prefer `broadcasts$` for consumption. */
  static readonly _broadcasts$ = new Subject<SyncServiceBroadcast>();

  /** All sync broadcasts: those published locally plus those received over the channel. */
  static readonly broadcasts$ = merge(
    SyncServiceProvider._broadcasts$,
    fromEvent<MessageEvent>(SyncServiceProvider.broadcastChannel, "message").pipe(
      map((event) => event.data as SyncServiceBroadcast),
    ),
  );

  /** Emits whether the initial full sync has completed (derived from broadcasts). */
  static get isInitialSyncComplete$() {
    return isInitialSyncComplete$;
  }

  /** Emits the latest full-sync progress (derived from broadcasts). */
  static get syncProgress$() {
    return syncProgress$;
  }

  /**
   * Broadcasts a "SYNC_DATA_CHANGE" message for every known sync_data prop so that
   * listeners can pick up the current values.
   */
  private static initializeSyncData(data: ClientSingletonRecord<"sync_data">["data"]) {
    // If the data prop is incomplete (e.g. because the client is brand new) then we'd
    // fail to emit initialization messages for the missing props. To address this, we
    // normalize the data prop to ensure all the keys are present.
    const normalizedData: {
      [K in keyof Required<ClientSingletonRecord<"sync_data">["data"]>]: unknown;
    } = {
      full_sync_completed_at: undefined,
      full_sync_last_synced_inbox_notification_id: undefined,
      full_sync_stop_at_inbox_notification_id: undefined,
      last_change_notification_id: undefined,
      ...data,
    };

    // When the client is first loaded we emit initialization events for all the data
    // props so that any listeners can grab the current value.
    for (const _prop in normalizedData) {
      const prop = _prop as keyof typeof data;

      // The cast is needed because TypeScript cannot correlate `prop` with the type
      // of `data[prop]` inside the loop.
      SyncServiceProvider.broadcastMessage({
        type: "SYNC_DATA_CHANGE",
        prop: prop,
        value: data[prop],
      } as any);
    }
  }

  /**
   * Creates the service, starts syncing, and begins re-broadcasting the current
   * sync data whenever the set of connected recipients changes.
   */
  static async create(
    environment: SyncServiceEnvironment,
    props: {
      /**
       * This function preloads some small queries that we can just load all at one time.
       * Note that the server will generate change notifications for these records so that
       * they are kept up to date.
       */
      preloadSmallQueries: (currentUserId: string) => Promise<void>;
      preloadThreadViewData: (threadId: string) => Promise<void>;
    },
  ) {
    const service = new SyncServiceProvider(environment, props);
    await service.start();

    // When new clients connect we want to send them the current sync data state
    service.env.connections.recipientsChanges$.pipe(startWith(() => null)).subscribe(async () => {
      // The promise returned by an async subscriber is ignored by RxJS, so a failure
      // here would otherwise surface only as an unhandled rejection.
      try {
        const data = await service.getMetadata();
        SyncServiceProvider.initializeSyncData(data);
      } catch (error) {
        service.env.logger.error({ error }, `[SyncService] error broadcasting sync data`);
      }
    });

    return service;
  }

  /**
   * Settles when the currently running sync (full or incremental) finishes, or
   * `null` when no sync is running. Used to collapse concurrent sync requests.
   */
  private inProgressSync: DeferredPromise<void> | null = null;

  protected constructor(
    protected env: SyncServiceEnvironment,
    private props: {
      /**
       * This function preloads some small queries that we can just load all at one time.
       * Note that the server will generate change notifications for these records so that
       * they are kept up to date.
       */
      preloadSmallQueries: (currentUserId: string) => Promise<void>;
      preloadThreadViewData: (threadId: string) => Promise<void>;
    },
  ) {}

  /** Reads the `data` of the persisted "sync_data" singleton record. */
  async getMetadata() {
    const { data } = await this.env.persistedDb.getSingletonRecord("sync_data");

    return data;
  }

  /**
   * Persists the given sync_data props and then broadcasts a "SYNC_DATA_CHANGE"
   * message for each prop that was passed.
   */
  async updateMetadata(props: Partial<ClientSingletonRecord<"sync_data">["data"]>) {
    await this.env.persistedDb.updateSingletonRecord("sync_data", props);

    for (const _prop in props) {
      const prop = _prop as keyof typeof props;
      SyncServiceProvider.broadcastMessage({
        type: "SYNC_DATA_CHANGE",
        prop: prop,
        value: props[prop],
      } as any);
    }

    this.env.logger.debug(props, `[SyncService] updated sync metadata `);
  }

  /**
   * Wires syncing to the auth and network state: each time the (user, online) pair
   * changes, the previous run is aborted and, if there is a user and we are online,
   * a full sync followed by an incremental sync is started (each with retries).
   */
  private async start() {
    const metadata = await this.getMetadata();

    this.env.logger.info({ isSynced: !!metadata.full_sync_completed_at }, `[SyncService] start`);

    let abortController: AbortController | undefined;

    combineLatest([this.env.auth.currentUserId$, this.env.network.isOnline$])
      .pipe(distinctUntilChanged(isEqual))
      .subscribe(async ([currentUserId, isOnline]) => {
        // Cancel whatever the previous (user, online) state had in flight.
        abortController?.abort(new AbortError());
        abortController = undefined;

        if (!isOnline) return;
        if (!currentUserId) return;

        abortController = new AbortController();

        const abortSignal = abortController.signal;

        // Subscribe to change notifications for the current user.
        // We are intentionally not unsubscribing here.
        // See https://linear.app/levels/issue/COM-251/we-are-sometimes-losing-the-change-notification-subscription
        const sub = this.env.subscriptionManager._subscribe(`change_notification:user_id:${currentUserId}`);

        // Unsubscribe on user changes
        // NOTE(review): this appears to contradict the comment above. `sub` is
        // registered as the abort listener, so — presumably being the unsubscribe
        // callback — it runs whenever this run is aborted, which happens on any
        // change of the (user, online) pair, not only on user changes. Confirm
        // which behavior is intended.
        abortSignal.addEventListener("abort", sub, { once: true });

        this.env.logger.debug(`[SyncService] subscribed to change notifications for ${currentUserId}`);

        const retryOptions: PromiseRetryOnErrorOptions = {
          waitMs: MS_IN_SECOND * 10,
          maxWaitMs: MS_IN_MINUTE * 5,
          // Aborts are deliberate cancellations and must not be retried.
          errorFilter: (error) => !(error instanceof AbortError),
          exponentialBackoff: true,
        };

        try {
          await promiseRetryOnError(retryOptions, () => this.fullSync({ currentUserId, abortSignal }));

          await promiseRetryOnError(retryOptions, () =>
            this.incrementalSync({
              currentUserId,
              abortSignal,
            }),
          );
        } catch (error) {
          if (error instanceof AbortError) return;
          this.env.logger.error({ error }, `[SyncService] error syncing changes`);
          throw error;
        }
      });
  }

  /**
   * Walks the user's inbox notifications (oldest first, in batches) and preloads the
   * thread view data for each one, persisting a cursor after every notification so
   * an interrupted sync can resume. Finishes by running a nested incremental sync.
   *
   * If a sync is already in progress, the in-progress promise is returned instead of
   * starting another one.
   * NOTE(review): that promise may belong to a run started with different props
   * (e.g. an aborted run for a previous user that has not settled yet) — confirm
   * that returning it here is intended.
   *
   * @param props.currentUserId - user whose inbox is synced
   * @param props.resync - when true, a completed full sync is reset and run again
   * @param props.abortSignal - aborts the sync between/during awaited steps
   */
  private async fullSync(props: { currentUserId: string; resync?: boolean; abortSignal?: AbortSignal }) {
    if (this.inProgressSync) return this.inProgressSync.promise;
    this.inProgressSync = new DeferredPromise();
    this.inProgressSync.promise.finally(() => {
      this.inProgressSync = null;
    });

    const { currentUserId, resync, abortSignal } = props;

    try {
      const withAbortSignal = getWithAbortSignalFn(abortSignal);

      const syncData = await withAbortSignal(() => this.getMetadata());

      this.env.logger.debug(syncData, `[SyncService] fullSync`);

      let lastSyncedInboxNotificationId = syncData.full_sync_last_synced_inbox_notification_id;

      let stopAtInboxNotificationId = syncData.full_sync_stop_at_inbox_notification_id;

      let isComplete = !!syncData.full_sync_completed_at;
      let batch = 1;
      const limit = 200;

      if (isComplete) {
        if (!resync) {
          this.env.logger.debug(`[SyncService] fullSync...complete!`);
          return;
        }

        // A resync starts over: clear the persisted cursors and completion marker.
        lastSyncedInboxNotificationId = null;
        stopAtInboxNotificationId = null;
        isComplete = false;

        await withAbortSignal(() =>
          this.updateMetadata({
            full_sync_last_synced_inbox_notification_id: null,
            full_sync_stop_at_inbox_notification_id: null,
            full_sync_completed_at: null,
          }),
        );
      }

      // NOTE(review): this compares against `null` while incrementalSync treats
      // `undefined` as "not set" — confirm which representation the singleton uses.
      if (!stopAtInboxNotificationId || syncData.last_change_notification_id === null) {
        const lastChangeNotificationId = await withAbortSignal(() => this.getLastChangeNotificationId());

        // Incremental sync won't start until we set a last_change_notification_id.
        await withAbortSignal(() =>
          this.updateMetadata({
            last_change_notification_id: lastChangeNotificationId,
          }),
        );

        stopAtInboxNotificationId = await withAbortSignal(() => this.getLastInboxNotificationId());

        if (!stopAtInboxNotificationId) {
          await withAbortSignal(() => this.props.preloadSmallQueries(currentUserId));

          // If there are no notifications in the inbox then we can consider the sync complete.
          await withAbortSignal(() =>
            this.updateMetadata({
              full_sync_completed_at: new Date().toISOString(),
            }),
          );

          return; // and return early
        }

        await withAbortSignal(() =>
          this.updateMetadata({
            full_sync_stop_at_inbox_notification_id: stopAtInboxNotificationId,
          }),
        );
      }

      await withAbortSignal(() => this.props.preloadSmallQueries(currentUserId));

      const response = await withAbortSignal(() =>
        this.env.api.getNotificationCount({
          startAt: lastSyncedInboxNotificationId
            ? getPointer("notification", lastSyncedInboxNotificationId)
            : undefined,
          endAt: getPointer("notification", stopAtInboxNotificationId!),
        }),
      );

      // The count is only used for progress reporting, so a failed request is tolerated.
      const totalNotificationsToSyncCount = response.status === 200 ? response.body.notificationCount : null;

      let notificationsSynced = 0;

      SyncServiceProvider.broadcastMessage({
        type: "SYNC_PROGRESS",
        done: false,
        current: notificationsSynced,
        total: totalNotificationsToSyncCount,
      });

      while (!isComplete) {
        const response = await withAbortSignal(() =>
          this.env.api.getNotifications({
            user_id: currentUserId,
            startAt: lastSyncedInboxNotificationId
              ? getPointer("notification", lastSyncedInboxNotificationId)
              : undefined,
            endAt: getPointer("notification", stopAtInboxNotificationId!),
            is_done: false,
            orderBy: "created_at",
            orderDir: "ASC",
            limit,
          }),
        );

        if (response.status !== 200) {
          this.env.logger.debug({ response }, `[SyncService] fullSync network error`);

          throw new Error(`[SyncService] fullSync network error`);
        }

        this.env.logger.debug({ ...response.body, batch }, `[SyncService] fullSync notifications`);

        const notifications = getMapRecords(response.body.recordMap, "notification");

        for (const notification of notifications) {
          await withAbortSignal(() => this.props.preloadThreadViewData(notification.thread_id));

          this.env.logger.debug(`[SyncService] fullSync...synced thread ${notification.thread_id}`);

          lastSyncedInboxNotificationId = notification.id;

          // Persist the cursor after every notification so that an interrupted sync
          // resumes from here instead of starting over.
          await withAbortSignal(() =>
            this.updateMetadata({
              full_sync_last_synced_inbox_notification_id: lastSyncedInboxNotificationId,
            }),
          );

          notificationsSynced++;

          SyncServiceProvider.broadcastMessage({
            type: "SYNC_PROGRESS",
            done: false,
            current: notificationsSynced,
            total: totalNotificationsToSyncCount,
          });
        }

        // A short page means there is nothing left to fetch.
        isComplete = limit > notifications.length;
        batch++;
      }

      await withAbortSignal(() =>
        this.updateMetadata({
          full_sync_completed_at: new Date().toISOString(),
        }),
      );

      SyncServiceProvider.broadcastMessage({
        type: "SYNC_PROGRESS",
        done: true,
        current: 0,
        total: null,
      });

      this.env.logger.debug(`[SyncService] fullSync...complete!`);

      // After completing the full sync we immediately start an incremental sync in case
      // any records were changed while the full sync was in progress.
      await this.incrementalSync({
        currentUserId,
        abortSignal,
        nested: true,
      });
    } finally {
      // Release anyone waiting on this run, whether it succeeded, failed or was aborted.
      this.inProgressSync.resolve();
    }
  }

  /**
   * Downloads all records which have been changed since the last time
   * this was called.
   *
   * Change notifications newer than the persisted `last_change_notification_id` are
   * fetched in batches. For each batch, the records that are missing locally or are
   * older than the notified version are refetched, and then the cursor is advanced
   * to the last notification of the batch.
   *
   * @param props.currentUserId - the current user (not read by this method itself)
   * @param props.abortSignal - aborts the sync between/during awaited steps
   * @param props.nested - set when called from within `fullSync`, which already owns
   *   `inProgressSync`; the in-progress bookkeeping is skipped in that case
   */
  async incrementalSync(props: { currentUserId: string; abortSignal?: AbortSignal; nested?: boolean }) {
    this.env.logger.debug(`[SyncService] incrementalSync called`);

    if (!props.nested) {
      if (this.inProgressSync) {
        return this.inProgressSync.promise;
      }

      this.inProgressSync = new DeferredPromise();
      this.inProgressSync.promise.finally(() => {
        this.inProgressSync = null;
      });
    }

    try {
      const withAbortSignal = getWithAbortSignalFn(props.abortSignal);

      const syncData = await withAbortSignal(() => this.getMetadata());

      this.env.logger.debug(syncData, `[SyncService] incrementalSync`);

      // NOTE(review): fullSync checks this prop against `null` whereas this checks
      // `undefined` — confirm which representation the singleton uses for "not set".
      if (syncData.last_change_notification_id === undefined) {
        // If we don't already have a last_change_notification_id, then that means
        // we're syncing for the first time. We don't yet support backfilling past sync
        // data so we do nothing. This client is also subscribing to change_notification
        // PubSub updates. As those come in the last_change_notification_id will be updated,
        // providing us with a baseline to start downloading records changed since last sync.
        return;
      }

      let lastChangeNotificationId = syncData.last_change_notification_id;
      let moreChanges = true;
      let batch = 1;
      const limit = 500;

      while (moreChanges) {
        const response = await withAbortSignal(() =>
          this.env.api.getChangeNotifications({
            after: { id: lastChangeNotificationId },
            limit,
          }),
        );

        if (response.status !== 200) {
          this.env.logger.debug({ response }, `[SyncService] incrementalSync network error`);

          throw new Error(`[SyncService] incrementalSync network error: ${response.status}`);
        }

        this.env.logger.debug({ ...response.body, batch }, `[SyncService] change notifications`);

        if (response.body.notifications.length === 0) {
          return;
        }

        const pointersToRefetch = await withAbortSignal(() =>
          getPointersToRefetch({
            persistedDb: this.env.persistedDb,
            changeNotifications: response.body.notifications,
          }),
        );

        this.env.logger.debug({ pointersToRefetch }, `[SyncService] pointers to refetch`);

        // When every record in this batch is already up to date locally there is
        // nothing to download, but the cursor below must still advance. Bailing out
        // here would leave `last_change_notification_id` untouched, so every later
        // run would re-read this same batch and never reach the changes behind it.
        if (pointersToRefetch.length > 0) {
          // NOTE(review): the result of getRecords is not inspected; if it can resolve
          // with a non-200 response the cursor would still advance — confirm.
          await withAbortSignal(() =>
            this.env.api.getRecords({ pointers: pointersToRefetch }, { abortSignal: props.abortSignal }),
          );
        }

        const lastNotification = response.body.notifications.at(-1)!;
        lastChangeNotificationId = lastNotification.id;

        await withAbortSignal(() =>
          this.updateMetadata({
            last_change_notification_id: lastChangeNotificationId,
          }),
        );

        moreChanges = response.body.moreChanges;
        batch++;
      }
    } finally {
      // A nested run leaves `inProgressSync` to the enclosing fullSync.
      if (!props.nested) {
        this.inProgressSync?.resolve();
      }
    }
  }

  /**
   * Fetches the id of the most recent change notification, falling back to `1`
   * when the server returns none (or a falsy id).
   *
   * @throws Error when the request does not return status 200
   */
  private async getLastChangeNotificationId() {
    const response = await this.env.api.getChangeNotifications({
      orderDir: "DESC",
      limit: 1,
    });

    if (response.status !== 200) {
      this.env.logger.debug({ response }, `[SyncService] getLastChangeNotificationId network error`);

      throw new Error(`[SyncService] getLastChangeNotificationId network error`);
    }

    return response.body.notifications[0]?.id || 1;
  }

  /**
   * Fetches the id of the current user's most recently created notification, or
   * `undefined` when the response contains no notifications.
   *
   * @throws Error when the request does not return status 200
   */
  private async getLastInboxNotificationId() {
    const response = await this.env.api.getNotifications({
      user_id: this.env.auth.getAndAssertCurrentUserId(),
      orderBy: "created_at",
      orderDir: "DESC",
      limit: 1,
    });

    if (response.status !== 200) {
      this.env.logger.debug({ response }, `[SyncService] getLastInboxNotificationId network error`);

      throw new Error(`[SyncService] getLastInboxNotificationId network error`);
    }

    const [notification] = getMapRecords(response.body.recordMap, "notification");

    return notification?.id;
  }
}

/* -----------------------------------------------------------------------------------------------*/

/**
 * Works out which records referenced by a batch of change notifications need to be
 * downloaded again.
 *
 * A record is skipped only when a local copy exists — either the record itself or
 * its "deleted_row" counterpart — whose version is greater than or equal to the
 * version announced by the notification.
 *
 * @param props.persistedDb - local database used to look up the cached records
 * @param props.changeNotifications - notifications describing changed rows
 * @returns the pointers that must be refetched, in notification order
 */
async function getPointersToRefetch(props: {
  persistedDb: PersistedDatabaseWorkerApi;
  changeNotifications: ChangeNotificationRecord[];
}) {
  // One candidate per notification: the row it points at plus the announced version.
  const candidates = props.changeNotifications.map((notification) => ({
    pointer: getPointer(notification.row_table as RecordTable, notification.row_id),
    version: notification.version,
  }));

  const [cachedPointersWithRecords] = await props.persistedDb.getRecords(
    candidates.map((candidate) => candidate.pointer),
    { withDeletedRows: true },
  );

  const localRecords = createRecordMapFromPointersWithRecords(cachedPointersWithRecords);

  const stalePointers: Array<(typeof candidates)[number]["pointer"]> = [];

  for (const { pointer, version } of candidates) {
    let localRecord = getMapRecord(localRecords, pointer);

    if (!localRecord) {
      // The row may have been deleted; in that case the local copy is stored under
      // a "deleted_row" pointer instead.
      const deletedPointer = getPointer("deleted_row", {
        row_table: pointer.table,
        row_id: pointer.id,
      });

      localRecord = getMapRecord(localRecords, deletedPointer);
    }

    if (!localRecord || localRecord.version < version) {
      stalePointers.push(pointer);
    }
  }

  return stalePointers;
}

/* -----------------------------------------------------------------------------------------------*/

/**
 * Builds a helper that runs an async step under the given abort signal.
 *
 * The returned function rejects with the signal's `reason` without invoking the
 * step when the signal is already aborted. Otherwise it starts the step and returns
 * a promise obtained from `DeferredPromise.from(..., { abortSignal })`.
 *
 * @param abortSignal - optional signal; when omitted, steps are never short-circuited here
 */
function getWithAbortSignalFn(abortSignal?: AbortSignal) {
  return function withAbortSignal<T>(promise: () => Promise<T>) {
    return abortSignal?.aborted
      ? Promise.reject(abortSignal.reason)
      : DeferredPromise.from(promise(), { abortSignal }).promise;
  };
}

/* -----------------------------------------------------------------------------------------------*/

// The SyncService is hosted by one leader tab and the other tabs access its methods
// remotely. Because of this we can't simply read an observable property off the
// SyncService instance; instead the value is derived from the broadcast stream.
const _isInitialSyncComplete$ = SyncServiceProvider.broadcasts$.pipe(
  filter(
    (message): message is SyncServiceSyncDataChangeBroadcast<"full_sync_completed_at"> =>
      message.type === "SYNC_DATA_CHANGE" && message.prop === "full_sync_completed_at",
  ),
  // Any truthy completion timestamp counts as "complete".
  map((message) => Boolean(message.value)),
  // Until a broadcast says otherwise, assume the initial sync has not finished.
  startWith(() => false),
);

const isInitialSyncComplete$ = _isInitialSyncComplete$.pipe(distinctUntilChanged(), shareReplay(1));

// Subscribe right away so the shared stream starts listening to broadcasts and
// always has the latest value ready to replay to later subscribers.
isInitialSyncComplete$.subscribe();

/* -----------------------------------------------------------------------------------------------*/

// Latest full-sync progress, derived from the broadcast stream so that it can be
// observed in any tab. Starts out as "done" until a progress broadcast arrives.
const syncProgress$: Observable<{
  done: boolean;
  current: number;
  total: number | null;
}> = SyncServiceProvider.broadcasts$.pipe(
  filter((message): message is SyncServiceProgressBroadcast => message.type === "SYNC_PROGRESS"),
  startWith(() => ({ done: true, current: 0, total: null })),
  // Every broadcast is a freshly created object, so the default reference comparison
  // would never suppress a repeat. Compare by value instead.
  distinctUntilChanged(isEqual),
  shareReplay(1),
);

// Subscribe right away so the shared stream starts listening to broadcasts and
// always has the latest value ready to replay to later subscribers.
syncProgress$.subscribe();

/* -----------------------------------------------------------------------------------------------*/
