import { ClientEnvironment } from "./ClientEnvironment";
import { FetchStrategy, QueryCache } from "./QueryCache";
import { createClientApi } from "./api.service";
import { SubscriptionManager } from "./SubscriptionManager";
import { PubsubClientApi, WebsocketPubsubClient } from "./WebsocketPubsubClient";
import {
  deleteMapRecord,
  getMapRecord,
  getPointer,
  iterateRecordMap,
  PointerWithRecord,
  RecordMap,
  RecordPointer,
  RecordTable,
  TABLE_NAMES,
} from "libs/schema";
import { TransactionQueue } from "./TransactionQueue";
import { uniqWith, groupBy } from "lodash-comms";
import { isEqual } from "libs/predicates";
import { GetOptions, createRecordLoader } from "./RecordLoader";
import { isUuid } from "libs/uuid";
import { MS_IN_MINUTE } from "libs/date-helpers";
import { isNonNullable } from "libs/predicates";
import { getOperationPointers } from "libs/transaction";
import { createInMemoryClientDatabaseAdapter } from "libs/database/client/createInMemoryClientDatabaseAdapter";
import { ClientDatabaseApi, createClientDatabase } from "libs/database/client";
import { NEVER, combineLatest, filter, switchMap } from "rxjs";
import { LoginSuccessMessage, ParsedServerPubsubMessage } from "libs/PubSubTypes";
import { NetworkService } from "./NetworkService";
import { ApiVersionError, UnreachableCaseError } from "libs/errors";
import { preloadImgsInHTML } from "~/utils/preloadImgsInHTML";
import { Logger } from "libs/logger";
import { ClientMetadata } from "./ClientMetadata";
import { API_VERSION, SCHEMA_VERSION } from "libs/shared-constants";
import { globalState } from "../state/global.state";
import { datadogLogs } from "@datadog/browser-logs";
import type { AuthServiceApi } from "./auth.service";
import { navigatorIsOnline } from "~/utils/navigatorIsOnline";
import { PersistedDatabaseWorkerApi } from "./persisted-database-worker/PersistedDatabaseWorkerApi";
import { observeNormalizedUserSettings } from "~/observables/observeNormalizedUserSettings";
import { observeInboxEntries } from "~/observables/observeInboxEntries";
import { observeMentionExtensionData } from "~/components/forms/message-editor/extensions/mention/observeMentionExtensionData";
import { IsLoadingService } from "./is-loading.service";

export async function createEnvironmentBase(props: {
  logger: Logger;
  envLogger: Pick<Logger, "debug">;
  auth: AuthServiceApi;
  clientId: string;
  isLoading: IsLoadingService;
  persistedDb: PersistedDatabaseWorkerApi | null;
  subscribeToAlwaysAvailableRecordsFetchStrategy: FetchStrategy;
  onApiAuthenticationError: () => Promise<void>;
  onApiVersionError: (error: ApiVersionError | null) => Promise<void>;
  onPubsubMessage?: (messages: ParsedServerPubsubMessage[]) => void;
}) {
  const { logger, auth, persistedDb, clientId, isLoading, envLogger } = props;

  const inMemoryDbAdapter = await createInMemoryClientDatabaseAdapter({ logger });
  envLogger.debug({ inMemoryDbAdapter }, "Created InMemoryClientDatabaseAdapter");

  const db = createClientDatabase({ adapter: inMemoryDbAdapter, logger });
  envLogger.debug({ db }, "Created ClientDatabaseApi");

  const info: ClientMetadata = {
    clientId,
    currentUserId: auth.getCurrentUserId(),
    isOnline: navigatorIsOnline(),
    version: {
      api: API_VERSION,
      schema: {
        expected: SCHEMA_VERSION,
        // This value is supplied later, if the persistedDb is available.
        // Note that the persistedDb checks it's schema version against the expected `SCHEMA_VERSION`
        // value during intialization and automatically resets the database schema, if necessary. As such
        // we can safely use the expected `SCHEMA_VERSION` value above, when this value is `null`. This
        // value is useful for logging / debugging purposes, not for detecting breaking changes.
        actual: null,
      },
    },
    datadogSessionId: datadogLogs.getInternalContext()?.session_id,
  };

  envLogger.debug({ info }, "Created ClientMetadata");

  const pubsub = new WebsocketPubsubClient({
    logger,
    auth,
    async onStart() {
      transactionQueue.dequeue();
      const keys = subscriptionManager.keys();
      if (keys.length === 0) return;
      queryCache.clearCache();
      for (const key of keys) pubsub.subscribe(key);
      recordLoader.reloadActiveQueries();
    },
    async onMessage(messages) {
      const pointersToRefetch = getPointersToRefetch({ db }, { messages });

      pubsub.logger.verbose({ messages, pointersToRefetch }, "onMessage");

      if (pointersToRefetch.length > 0) {
        const deferred = queryCache.loadRecords({
          pointers: pointersToRefetch,
          fetchStrategy: "server",
        });

        deferred.promise.catch((error) => {
          pubsub.logger.error({ error }, `[PubSub] onMessage queryCache.loadRecords error`);
        });

        preloadImgsInRefetchedMessages(environment, pointersToRefetch).catch((error) => {
          pubsub.logger.error({ error }, `[PubSub] onMessage preloadImgsInRefetchedMessages error`);
        });
      }

      props.onPubsubMessage?.(messages);
    },
    async onLocalMessage({ messages, forceUpdate }) {
      const pointersToRefetch = forceUpdate ? messages.map(getPointer) : getPointersToRefetch({ db }, { messages });

      pubsub.logger.debug({ messages, pointersToRefetch }, "onLocalMessage");

      if (pointersToRefetch.length === 0) return;
      if (!persistedDb) return;

      const [pointersWithRecord] = await persistedDb.getRecords(pointersToRefetch, { includeSoftDeletes: true });

      pubsub.logger.debug({ pointersWithRecord }, "onLocalMessage writeRecordMap");

      db.writeRecordMap(pointersWithRecord, { forceUpdate });
    },
  });

  envLogger.debug({ pubsub }, "Created WebsocketPubsubClient");

  const writeRecordMap = createWriteRecordMapFn({ db, logger, persistedDb, pubsub });

  const network = new NetworkService({ logger, pubsub });
  envLogger.debug({ network }, "Created NetworkService");

  const api = createClientApi(
    { info, logger, network },
    {
      onAuthenticationError: props.onApiAuthenticationError,
      onApiVersionError: props.onApiVersionError,
    },
  );

  envLogger.debug({ api }, "Created Api");

  const transactionQueue = new TransactionQueue(
    { auth, logger, api, network, isLoading },
    {
      async onWrite({ recordMap }) {
        logger.debug({ recordMap }, "[TransactionQueue] onWrite");
        await writeRecordMap(recordMap);
      },
      async onRollback({ transaction }) {
        logger.notice({ transaction }, "[TransactionQueue] onRollback");

        const pointers = uniqWith(transaction.operations.flatMap(getOperationPointers), isEqual);

        const { pointersWithoutPendingWrites } = groupPointersByPendingWrite({ transactionQueue }, { pointers });

        // Get the latest records from the server.
        const response = await api.getRecords({ pointers: pointersWithoutPendingWrites });

        if (response.status === 200) {
          // Force update the cache.
          const { recordMap } = response.body;

          if (!recordMap) {
            throw new Error("[TransactionQueue] [onRollback] No recordMap in response");
          }

          const deletePointers: RecordPointer[] = [];

          for (const pointer of pointersWithoutPendingWrites) {
            if (transactionQueue.isPendingWrite(pointer)) {
              // If there is a pending write for one of these pointers, we don't want to
              // overwrite them with the server's response.
              deleteMapRecord(recordMap, pointer);
              continue;
            }

            const record = getMapRecord(recordMap, pointer);

            if (record) continue;

            deletePointers.push(pointer);
          }

          await Promise.all([
            db.deletePointers(deletePointers),
            persistedDb?.deletePointers(deletePointers),
            writeRecordMap(recordMap, { forceUpdate: true }),
          ]);
        } else if (response.status === 0) {
          logger.warn(`[TransactionQueue] [onRollback] offline`);
        } else {
          logger.error({ ...response }, `[TransactionQueue] [onRollback] Network error: ${response.status}`);
          throw new Error("[TransactionQueue] [onRollback] Network error");
        }
      },
    },
  );

  transactionQueue.init();

  envLogger.debug({ transactionQueue }, "Created TransactionQueue");

  const queryCache = new QueryCache({
    api,
    db,
    logger,
    persistedDb,
    transactionQueue,
    writeRecordMap,
  });

  envLogger.debug({ queryCache }, "Created QueryCache");

  const subscriptionManager = new SubscriptionManager({
    logger,
    onSubscribe: (key) => {
      subscriptionManager.logger.verbose(
        {
          key,
          subscriptionCount: subscriptionManager.keys().length,
        },
        "onSubscribe",
      );

      pubsub.subscribe(key);
    },
    onUnsubscribe: (key) => {
      subscriptionManager.logger.verbose(
        {
          key,
          subscriptionCount: subscriptionManager.keys().length,
        },
        "onUnsubscribe",
      );

      pubsub.unsubscribe(key);

      const querykeys = [key];

      const queryKeysSet = subscriptionManager.subscriptionKeyToQueryCacheKeyMap.get(key);

      if (queryKeysSet) {
        querykeys.push(...queryKeysSet);
      }

      subscriptionManager.subscriptionKeyToQueryCacheKeyMap.delete(key);

      unloadQueries(querykeys, true);
    },
  });

  envLogger.debug({ subscriptionManager }, "Created SubscriptionManager");

  const recordLoader = createRecordLoader({
    db,
    queryCache,
    subscriptionManager,
    persistedDb,
    logger,
  });

  envLogger.debug({ recordLoader }, "Created RecordLoader");

  function unloadQueries(queryKeys: string[], deletePointers: boolean) {
    // Note that this map function is also mutating the apiLoader cache.
    const pointers = queryKeys
      .map((queryKey) => {
        queryCache.deleteCachedPromise(queryKey);

        const queryKeyParts = queryKey.split(":");

        const isKeyForRecord = queryKeyParts.length === 2;

        if (!isKeyForRecord) return;

        const [possibleTable, possibleId] = queryKeyParts;

        if (!TABLE_NAMES.includes(possibleTable as never)) return;
        if (!isUuid(possibleId || "")) return;

        const pointer = getPointer(possibleTable as RecordTable, possibleId as string);

        return pointer;
      })
      .filter(isNonNullable);

    if (pointers.length === 0) return;

    if (deletePointers) {
      // These are pointers that are no longer being used by any active queries.
      db.deletePointers(pointers, { suppressChangeNotifications: true });
    } else {
      // If we're not deleting the records now, update the last read time
      // for the records to allow garbage collection to clean it up later.
      db.markRecordsRead(pointers);
    }
  }

  const GARBAGE_COLLECTION_INTERVAL = MS_IN_MINUTE * 10;

  function garbageCollectRecordCache() {
    const lastGarbageCollectionTime = new Date().valueOf() - GARBAGE_COLLECTION_INTERVAL;

    const potentialPointers = db.getRecordsLastReadBefore(lastGarbageCollectionTime);

    const activeSubscriptions = subscriptionManager.keys();

    const keys = potentialPointers
      .map((pointer) => `${pointer.table}:${pointer.id}`)
      .filter((key) => !activeSubscriptions.includes(key));

    logger.debug({ keys }, "garbage collect keys");
    // logger.debug("database state before garbage collection");
    // (db as any).logDatabaseState();

    unloadQueries(keys, true);

    // logger.debug("database state after garbage collection");
    // (db as any).logDatabaseState();
  }

  // Periodically run garbage collection.
  setInterval(garbageCollectRecordCache, GARBAGE_COLLECTION_INTERVAL);

  const subscribeToAlwaysAvailableRecords = (currentUserId: string) => {
    const options: GetOptions = {
      fetchStrategy: props.subscribeToAlwaysAvailableRecordsFetchStrategy,
    };

    const environment = { recordLoader, db, logger, subscriptionManager, isLoading };

    // We also preload this information as part of the sync service's initial sync,
    // but not all clients use the sync service so we're subscribing to it here as well.
    // Arguably we could stop preloading the drafts in the sync service initial sync now,
    // but we're continuing to do that so that we can remove this query here in the
    // future without worrying about breaking the sync service's initial sync.
    environment.recordLoader.observeGetDrafts({ currentUserId }, options).subscribe();

    observeNormalizedUserSettings(environment, { userId: currentUserId }, options).subscribe();

    environment.recordLoader.observeGetTagsUserHasAccessTo({ currentUserId }, options).subscribe();

    // Note that we also load all the tag subscriptions in "preloadSmallQueries" below.
    // We need to get the tag subscriptions here because, even if we aren't syncing the
    // inbox, Comms still relies on the queries in "subscribeToAlwaysAvailableRecords"
    // always being subscribed to. So arguably we could stop preloading the tag
    // subscriptions in "preloadSmallQueries", but it's so easy to just preload them
    // there it seems like we might as well.
    environment.recordLoader.observeGetTagSubscriptions({ user_id: currentUserId }, options).subscribe();

    observeMentionExtensionData(environment, { currentUserId }, options).subscribe();

    environment.recordLoader.observeGetUserOrganizationProfiles({ userId: currentUserId }, options).subscribe();

    environment.recordLoader
      .observeGetInboxSections({ currentUserId }, options)
      .pipe(
        switchMap(([[defaultInboxSection]]) => {
          if (!defaultInboxSection) return NEVER;

          return observeInboxEntries(
            environment,
            {
              userId: currentUserId,
              inboxSectionId: defaultInboxSection.id,
              limit: 100,
            },
            options,
          );
        }),
      )
      .subscribe();
  };

  // If we're not initializing the sync service, then we still want to subscribe to these
  // queries after the user has logged in.
  auth.currentUserId$.pipe(filter(isNonNullable)).subscribe((currentUserId) => {
    subscribeToAlwaysAvailableRecords(currentUserId);
  });

  const environment = {
    api,
    auth,
    clientId,
    db,
    globalState,
    info,
    isLoading,
    logger,
    network,
    persistedDb,
    pubsub,
    queryCache,
    recordLoader,
    subscriptionManager,
    transactionQueue,
    writeRecordMap,
  };

  // Handle logging in on pubsub
  combineLatest([auth.currentUserId$, network.isOnline$]).subscribe(([currentUserId, isOnline]) => {
    if (!currentUserId || !isOnline) return;
    // If login fails then we'll be disconnected from pubsub which will cause isOnline to
    // become false which will allow us to retry the login. Note that sending these login
    // messages when we're already "logged in" to pubsub on the server is fine.
    pubsub.login(currentUserId);
  });

  network.isOnline$.subscribe((isOnline) => {
    info.isOnline = isOnline;
  });

  return environment;
}

/* -----------------------------------------------------------------------------------------------*/

export function createWriteRecordMapFn(env: {
  db: ClientDatabaseApi;
  logger: Logger;
  persistedDb: PersistedDatabaseWorkerApi | null;
  pubsub: PubsubClientApi;
}): ClientEnvironment["writeRecordMap"] {
  const { db, logger, persistedDb, pubsub } = env;

  // We intentionally don't use an async function here so that errors are thrown by
  // `db.writeRecordMap` synchronously. E.g. the `write` function expects this.
  return (recordMap, options) => {
    // Remove possible proxy objects from the recordMap since they're not serializable
    // across the persistedDb worker boundary.
    const clonedRecordMap = JSON.parse(JSON.stringify(recordMap)) as RecordMap;

    // write to the in-memory database synchronously
    try {
      db.writeRecordMap(clonedRecordMap, options);
    } catch (error) {
      logger.fatal({ error }, "[writeRecordMap] Writing to in-memory database failed.");
      globalState.getState().setError({ message: "Writing to in-memory database failed.", code: "04" });
      throw error;
    }

    if (!persistedDb) return Promise.resolve();

    // write to the persisted database asynchronously
    return persistedDb
      .writeRecordMap(clonedRecordMap, options)
      .then((changes) => {
        const changedPointers = Array.from(iterateRecordMap(changes));
        pubsub.emitLocalMessage({ changes: changedPointers, forceUpdate: options?.forceUpdate });
      })
      .catch((error) => {
        logger.fatal({ error }, "[writeRecordMap] Writing to persisted database failed.");
        globalState.getState().setError({ message: "Writing to persisted database failed.", code: "05" });
        throw error;
      });
  };
}

/* -----------------------------------------------------------------------------------------------*/

function getPointersToRefetch(env: Pick<ClientEnvironment, "db">, props: { messages: ParsedServerPubsubMessage[] }) {
  const { messages } = props;

  const msgPointersWithVersion = uniqWith(
    messages
      .filter((msg): msg is Exclude<ParsedServerPubsubMessage, LoginSuccessMessage> => msg.type !== "LOGIN_SUCCESS")
      .map((msg) => {
        switch (msg.type) {
          case "RECORD_UPDATE": {
            return { pointer: getPointer(msg), version: msg.version };
          }
          case "CHANGE_NOTIFICATION": {
            return {
              pointer: getPointer(msg.row_table, msg.row_id),
              version: msg.version,
            };
          }
          default: {
            throw new UnreachableCaseError(msg);
          }
        }
      }),
    isEqual,
  );

  const [existingPointersWithRecords] = env.db.getRecords(
    msgPointersWithVersion.map((m) => m.pointer),
    { includeSoftDeletes: true },
  );

  const pointersToRefetch: RecordPointer[] = [];

  for (const { pointer, version } of msgPointersWithVersion) {
    const existingRecord = existingPointersWithRecords.find(({ table, id }) => isEqual(pointer, { table, id }));

    // We don't need to refetch a record if we already have it stored locally AND
    // if the local version is greater than the incoming version.
    if (!existingRecord || existingRecord.record.version < version) {
      pointersToRefetch.push(pointer);
    }
  }

  return pointersToRefetch;
}

/* -----------------------------------------------------------------------------------------------*/

function preloadImgsInRefetchedMessages(
  environment: { logger: Logger; db: ClientDatabaseApi },
  pointers: RecordPointer[],
) {
  const messageAndDraftPointers = pointers.filter(
    (p): p is RecordPointer<"message" | "draft"> => p.table === "message" || p.table === "draft",
  );

  if (messageAndDraftPointers.length === 0) return Promise.resolve();

  const [pointersWithRecord] = environment.db.getRecords(messageAndDraftPointers);

  const promises = pointersWithRecord.map(({ record }) => preloadImgsInHTML(environment, record.body_html));

  return Promise.allSettled(promises);
}

/* -----------------------------------------------------------------------------------------------*/

function groupPointersByPendingWrite<T extends RecordPointer>(
  env: Pick<ClientEnvironment, "transactionQueue">,
  props: {
    pointers: T[];
  },
) {
  const { pointers } = props;

  const { true: pointersWithPendingWrites = [], false: pointersWithoutPendingWrites = [] } = groupBy(
    pointers,
    (pointer) => env.transactionQueue.isPendingWrite(pointer),
  );

  return { pointersWithPendingWrites, pointersWithoutPendingWrites };
}

/* -----------------------------------------------------------------------------------------------*/
