import { isEqual, uniqWith } from "lodash";
import { ReactiveMap } from "libs/EventEmitter";
import {
  ApiVersionError,
  AuthenticationError,
  PermissionError,
  TransactionConflictError,
  UnreachableCaseError,
  ValidationError,
} from "libs/errors";
import { formatResponseError } from "./api";
import { Transaction, getOperationPointers, op } from "libs/transaction";
import { RecordMap, RecordPointer, RecordTable } from "libs/schema";
import { DeferredPromise, wait } from "libs/promise-utils";
import { MS_IN_MINUTE, MS_IN_SECOND } from "libs/date-helpers";
import { isDefined } from "libs/predicates";
import { ClientEnvironment } from "./ClientEnvironment";
import { LocalForagePersistedKVStore, PersistedKVStore } from "./KVStore";
import { Simplify } from "type-fest";
import { globalState } from "~/state/global.state";
import { config } from "./config";

/** A queued transaction together with the handles the queue uses to settle it. */
type Thunk = {
  // Resolved by the queue after a successful write; rejected after a rollback or error.
  deferred: DeferredPromise<void>;
  // The transaction that will be sent to the server.
  transaction: Transaction;
  // Called with the transaction when the write is rolled back or fails, before `deferred` is rejected.
  onRollback: (transaction: Transaction) => Promise<void>;
};

/** Builds the `table:id` string used to key a record pointer in the pending-writes map. */
function pointerToKey<T extends RecordTable>(pointer: RecordPointer<T>) {
  const { table, id } = pointer;
  const segments = [table, id];
  return segments.join(":");
}

/**
 * Plain object type exposing the public members of `TransactionQueue`, flattened with
 * type-fest's `Simplify`.
 */
export type TransactionQueueApi = Simplify<TransactionQueue>;

/**
 * Ordered, persisted queue of transactions waiting to be written to the server.
 *
 * Transactions are appended with `enqueue`, persisted to `TransactionQueue.store` under a
 * per-user key, and sent by `dequeue` while the network is online. The queue also tracks
 * how many queued transactions touch each record so callers can ask (or subscribe to)
 * whether a record has a pending write.
 *
 * NOTE(review): the constructor starts a `setInterval` that is never cleared, so an
 * instance is expected to live for the lifetime of the page/worker — confirm.
 */
export class TransactionQueue {
  static store = new LocalForagePersistedKVStore({
    namespace: "TransactionQueue",
  });

  // number of seconds to wait before retrying after an error (will increase exponentially)
  private errorBackoffInSeconds = 10;
  // true while a `dequeue` loop is in flight; prevents concurrent loops.
  private running = false;

  /**
   * @param env Environment services used by the queue; the logger is replaced with a
   *   child logger named "TransactionQueue".
   * @param props.onWrite Called with the server's record map after a successful write.
   * @param props.onRollback Called with a transaction whose write was rejected.
   * @param props.errorBackoffInSeconds Optional override of the retry backoff base.
   */
  constructor(
    private env: Pick<ClientEnvironment, "auth" | "api" | "network" | "logger">,
    private props: {
      onWrite: (recordMap: RecordMap) => Promise<void>;
      onRollback: (transaction: Transaction) => Promise<void>;
      errorBackoffInSeconds?: number;
    },
  ) {
    this.env = {
      ...this.env,
      logger: this.env.logger.child({ name: "TransactionQueue" }),
    };

    // Configure the backoff before any persisted transaction can start being processed.
    if (props.errorBackoffInSeconds !== undefined) {
      this.errorBackoffInSeconds = props.errorBackoffInSeconds;
    }

    // Fire-and-forget, but never leave the rejection unhandled: a storage failure
    // should be logged rather than surface as an unhandled promise rejection.
    this.loadFromOffline(this.env.auth.getCurrentUserId()).catch((error: unknown) => {
      this.env.logger.error(
        { error },
        "[TransactionQueue] failed to load persisted transactions",
      );
    });
    setInterval(this.logState.bind(this), MS_IN_MINUTE * 5);
  }

  // Transactions waiting for their debounce timer, keyed by transaction type. A newer
  // transaction of the same type replaces the stored one and restarts the timer.
  private debouncedThunks = new Map<
    string, // transaction type
    { timeoutId: number; thunk: Thunk }
  >();

  // Transactions ready to be sent, in submission order.
  private thunks: Thunk[] = [];

  // Number of enqueued-but-unsettled transactions per record key (see pointerToKey).
  private pendingWrites = new ReactiveMap<string, number>();

  /** Increments the pending-write counter of every given record. */
  private incPendingWrites(pointers: RecordPointer[]) {
    const writes = pointers.map(pointerToKey).map((key) => {
      const n = this.pendingWrites.get(key) ?? 0;
      return { key, value: n + 1 };
    });
    this.pendingWrites.write(writes);
  }

  /** Decrements the pending-write counter of every given record, removing it at zero. */
  private decPendingWrites(pointers: RecordPointer[]) {
    const writes = pointers.map(pointerToKey).map((key) => {
      const n = this.pendingWrites.get(key) ?? 0;
      if (n === 0) console.error("This should never be zero!");

      if (n > 1) return { key, value: n - 1 };
      else return { key, value: undefined };
    });
    this.pendingWrites.write(writes);
  }

  /** Returns true when at least one unsettled transaction touches the given record. */
  isPendingWrite<T extends RecordTable>(pointer: RecordPointer<T>) {
    const n = this.pendingWrites.get(pointerToKey(pointer)) ?? 0;
    return n > 0;
  }

  /** Returns true when no transaction is waiting to be sent (debounced ones are not counted). */
  isQueueEmpty() {
    return this.thunks.length === 0;
  }

  /**
   * Calls `fn` whenever the pending state of `pointer` flips between true and false.
   * @returns The unsubscribe function returned by the underlying reactive map.
   */
  subscribeIsPendingWrite<T extends RecordTable>(
    pointer: RecordPointer<T>,
    fn: (pending: boolean) => void,
  ): () => void {
    let prev = this.isPendingWrite(pointer);
    return this.pendingWrites.subscribe(pointerToKey(pointer), () => {
      const next = this.isPendingWrite(pointer);
      if (prev !== next) {
        prev = next;
        fn(next);
      }
    });
  }

  /** Persists the transactions currently in `thunks` under the current user's key. */
  private saveForOffline() {
    const currentUserId = this.env.auth.getCurrentUserId();

    return TransactionQueue.store.setItem(
      `transactions-${currentUserId}`,
      this.thunks.map((thunk) => thunk.transaction),
    );
  }

  /**
   * Persists the queue and then starts processing it. A persistence failure is logged
   * and processing is not started, so the returned promise does not reject because of it.
   */
  private async persistAndDequeue(): Promise<void> {
    try {
      await this.saveForOffline();
    } catch (error) {
      this.env.logger.error({ error }, "[TransactionQueue] failed to persist transactions");
      return;
    }

    await this.dequeue();
  }

  /**
   * Fragment of the user-facing error message describing how many updates are paused.
   * Returns an empty string when nothing is left in the queue (a bare `length && ...`
   * would render a literal "0" inside the message).
   */
  private pausedUpdatesNotice(): string {
    return this.thunks.length
      ? `Pausing further updates (updates remaining ${this.thunks.length}). `
      : "";
  }

  // we're intentionally passing the current user ID here instead of using getCurrentUserId
  // to make it clear that this method is impacted by the current user ID.
  /** Re-enqueues the transactions persisted for the given user, if any. */
  async loadFromOffline(currentUserId: string | null) {
    const key = `transactions-${currentUserId}`;

    // This code is necessary as part of the migration of the TransactionQueue to the SharedWorker.
    // The SharedWorker doesn't support localStorage so we've moved storage to localForage and
    // indexedDB. This code migrates the old transactions from localStorage to localForage.
    // This can be removed after the migration is complete.
    // John - 9/9/24
    if (typeof localStorage !== "undefined") {
      const oldKVStore = new PersistedKVStore({ namespace: "TransactionQueue" });
      const oldTransactions = oldKVStore.getItem<Transaction[]>(key);

      if (oldTransactions) {
        await TransactionQueue.store.setItem(key, oldTransactions);
        oldKVStore.removeItem(key);
      }
    }

    const transactions = await TransactionQueue.store.getItem<Transaction[]>(key);

    if (!transactions) return;

    for (const transaction of transactions) {
      // The promise returned by enqueue rejects when the transaction is rolled back or
      // fails. Nobody awaits it here and the rollback itself is handled through
      // props.onRollback, so swallow the rejection to avoid an unhandled rejection.
      this.enqueue(transaction).catch(() => undefined);
    }
  }

  /**
   * Use write(environment, transaction) instead of this function for optimistic updates.
   *
   * Adds a transaction to the queue (after its debounce delay, when it has one) and marks
   * the records it touches as pending.
   *
   * @returns A promise that resolves once the transaction has been written and rejects
   *   when it is rolled back or fails.
   * @throws Error when `transaction.debounce` is set without `transaction.type`.
   */
  enqueue(transaction: Transaction) {
    // Track which records have a pending write.
    const pointers = uniqWith(transaction.operations.flatMap(getOperationPointers), isEqual);

    this.incPendingWrites(pointers);

    // Guard so the counters are decremented exactly once, whichever path settles first.
    let decComplete = false;
    const decPendingWrites = () => {
      if (decComplete) return;
      decComplete = true;
      this.decPendingWrites(pointers);
    };

    const onRollback = (transaction: Transaction) => {
      // We want to decrement the pending writes before calling onRollback so that
      // we can check and see if a record is still pending inside the onRollback call.
      decPendingWrites();
      return this.props.onRollback(transaction);
    };

    // This promise will get resolved once it is submitted.
    let deferred: DeferredPromise<void>;

    if (isDefined(transaction.debounce)) {
      const { type, debounce } = transaction;

      if (!type) {
        throw new Error("transaction.type is required when using debounce");
      }

      const storedTransaction = this.debouncedThunks.get(type);

      if (storedTransaction) {
        // Replace the waiting transaction of the same type, reusing its deferred so
        // earlier callers are settled together with the replacement.
        clearTimeout(storedTransaction.timeoutId);
        deferred = storedTransaction.thunk.deferred;
      } else {
        deferred = new DeferredPromise();
      }

      const thunk: Thunk = { deferred, transaction, onRollback };

      const timeoutId = setTimeout(() => {
        this.debouncedThunks.delete(type);
        this.thunks.push(thunk);
        void this.persistAndDequeue();
      }, debounce) as unknown as number;

      this.debouncedThunks.set(type, { timeoutId, thunk });
    } else {
      deferred = new DeferredPromise();
      this.thunks.push({ deferred, transaction, onRollback });
      void this.persistAndDequeue();
    }

    return deferred.promise.finally(decPendingWrites);
  }

  /**
   * Sends queued transactions to the server, in order, while the network is online.
   *
   * Only one loop runs at a time; a call made while another is in flight returns
   * immediately. Unexpected exceptions are logged and reported to the user rather than
   * propagated to the caller.
   */
  async dequeue() {
    if (this.running) return;
    this.running = true;

    try {
      LOOP: while (this.thunks.length && this.env.network.isOnline()) {
        // Try writing a batch.
        const batch = this.getBatch();

        if (!batch.length) break LOOP;

        if (config.isBeta || config.mode === "test") {
          // Limiting transaction batching to the beta deployment while we work out the kinks.
          const { authorId } = batch[0]!.transaction;
          const operations = batch.flatMap(({ transaction }) => transaction.operations);
          const transaction = op.transaction({ authorId, operations });

          const batchResult = await writeTransaction(this.env, {
            transaction,
            errorBackoffInSeconds: this.errorBackoffInSeconds,
          });

          if (batchResult.type === "offline") break LOOP; // Try again later...

          if (batchResult.type === "ok") {
            await this.props.onWrite(batchResult.recordMap);
            for (const thunk of batch) thunk.deferred.resolve();
            this.thunks.splice(0, batch.length);
            await this.saveForOffline();
            continue LOOP;
          }
        }

        // If there's an error writing the batch, write each transaction one by one since
        // some of them might be fine on their own.

        for (const thunk of batch) {
          const result = await writeTransaction(this.env, {
            transaction: thunk.transaction,
            errorBackoffInSeconds: this.errorBackoffInSeconds,
          });

          if (result.type === "offline") break LOOP; // Try again later...

          // The write is settled one way or another: drop it from the queue before
          // notifying anyone so the persisted state never contains it again.
          this.thunks.shift();
          await this.saveForOffline();

          switch (result.type) {
            case "ok": {
              await this.props.onWrite(result.recordMap);
              thunk.deferred.resolve();
              break;
            }
            case "error": {
              await thunk.onRollback(thunk.transaction);

              globalState.getState().setErrorMessage(`
                An unexpected error occurred while syncing a recent change to the server. 
                ${this.pausedUpdatesNotice()}
                Reload the page to try again.
              `);

              thunk.deferred.reject(result.error);

              // We break out of the loop to avoid automatically trying the rest of the transactions.
              // The other transactions will be tried when the user reloads the page.
              break LOOP;
            }
            case "rollback": {
              await thunk.onRollback(thunk.transaction);
              thunk.deferred.reject(result.error);
              break;
            }
            default: {
              throw new UnreachableCaseError(result);
            }
          }
        }
      }
    } catch (error) {
      globalState.getState().setErrorMessage(`
        An unexpected error occurred while syncing a recent change to the server.
        ${this.pausedUpdatesNotice()}
        Reload the page to try again.
      `);

      this.env.logger.fatal({ error }, "[TransactionQueue] unexpected error in dequeue");
    } finally {
      // Always release the guard, even if reporting the error above throws; otherwise
      // every later dequeue() call would return immediately.
      this.running = false;
    }
  }

  /**
   * Returns the longest prefix of the queue whose serialized size stays within the
   * content-length budget, or just the first transaction when even that one is too big.
   */
  private getBatch() {
    const maxContentLength = 100_000;
    let batchSize = 0;
    const batch: Thunk[] = [];

    for (const thunk of this.thunks) {
      const thunkSize = JSON.stringify(thunk.transaction).length;
      if (batchSize + thunkSize > maxContentLength) break;
      batchSize += thunkSize;
      batch.push(thunk);
    }

    if (!batch.length && this.thunks.length) {
      // If we can't fit any transactions in a batch, just send the first one
      batch.push(this.thunks[0]!);
    }

    return batch;
  }

  /**
   * Logs a notice when 20 or more transactions are persisted for the current user, and
   * an error when the persisted value is not an array.
   */
  private async logState(label?: string) {
    const transactions = await TransactionQueue.store.getItem<Transaction[]>(
      `transactions-${this.env.auth.getCurrentUserId()}`,
    );

    if (!transactions) return;

    if (!Array.isArray(transactions)) {
      this.env.logger.error({ label, transactions }, "[TransactionQueue] unexpected store value");
      return;
    }

    if (transactions.length < 20) return;

    this.env.logger.notice(
      {
        label,
        transactionsCount: transactions.length,
        // Theoretically a client could build up a huge list of transactions and logging all
        // of them could take up a lot of bandwidth and resources. Instead we just log the
        // first 5 transactions. Theoretically these are the most important anyway since
        // they're processed in order. So if an error is occurring when the transactions are
        // being processed, it would be the first transaction's fault.
        transactionsSlice: transactions.slice(0, 5),
      },
      "[TransactionQueue] pending transactions",
    );
  }
}

/** Outcome of a single `writeTransaction` call. */
type WriteResponse =
  // The server accepted the transaction (status 200) and returned the resulting records.
  | { type: "ok"; recordMap: RecordMap }
  // The request got status 0; the transaction should be kept and retried later.
  | { type: "offline" }
  // The server rejected the transaction (validation, permission, or repeated conflict).
  | { type: "rollback"; error: Error }
  // An unrecognized status persisted after all backoff retries.
  | { type: "error"; error: Error };

/**
 * Sends one transaction to the server, retrying on conflicts and unexpected statuses.
 *
 * - Status 200 returns `ok`; status 0 returns `offline`.
 * - Validation and permission errors return `rollback` immediately.
 * - Conflicts are retried immediately and return `rollback` once more than 5 attempts
 *   have been made.
 * - Any other status is retried with exponential backoff while `attempts <= 4`, then
 *   returned as `error`.
 *
 * Conflict retries and backoff retries share the same attempt counter.
 *
 * @param environment Provides the API client and logger.
 * @param props.transaction The transaction to write.
 * @param props.errorBackoffInSeconds Base delay, in seconds, for the exponential backoff.
 * @throws Error when the server answers with an API-version or authentication error,
 *   which the API layer is expected to have handled already.
 */
async function writeTransaction(
  environment: Pick<ClientEnvironment, "api" | "logger">,
  props: {
    transaction: Transaction;
    errorBackoffInSeconds: number;
  },
): Promise<WriteResponse> {
  const { transaction, errorBackoffInSeconds } = props;

  environment.logger.debug({ transaction }, "[write] sending transaction to server");

  let attempts = 0;

  // Submit and retry.
  while (true) {
    attempts++;

    const response = await environment.api.write(transaction);

    if (response.status === 200) {
      return { type: "ok", recordMap: response.body.recordMap };
    }

    if (response.status === 0) {
      // Offline
      return { type: "offline" };
    }

    if (response.status === ValidationError.statusCode) {
      environment.logger.error({ transaction }, `[write] ValidationError`);
      const error = new ValidationError(formatResponseError(response));
      return { type: "rollback", error };
    }

    if (response.status === PermissionError.statusCode) {
      environment.logger.error({ transaction }, `[write] PermissionError`);
      const error = new PermissionError(formatResponseError(response));
      return { type: "rollback", error };
    }

    if (response.status === TransactionConflictError.statusCode) {
      if (attempts > 5) {
        environment.logger.warn({ transaction }, `[write] TransactionConflictError`);
        const error = new TransactionConflictError(formatResponseError(response));
        return { type: "rollback", error };
      }

      // retry immediately
      continue;
    }

    if (response.status === ApiVersionError.statusCode) {
      // The API service is expected to handle these errors before resolving the request
      // and that involves a page reload. Reaching this branch is therefore unexpected.
      environment.logger.error({ transaction }, `[write] ApiVersionError`);
      throw new Error(`[TransactionQueue] [write] unexpected ApiVersionError`);
    }

    if (response.status === AuthenticationError.statusCode) {
      // The API service is expected to handle these errors before resolving the request
      // and that involves signing out, triggering a page reload. Reaching this branch
      // is therefore unexpected.
      environment.logger.error({ transaction }, `[write] AuthenticationError`);
      throw new Error(`[TransactionQueue] [write] unexpected AuthenticationError`);
    }

    // Errors handling
    // Below are unexpected errors which we attempt to retry

    if (attempts <= 4) {
      // Will retry after errorBackoffInSeconds (increasing exponentially)
      // With the default of 10 seconds, the retries will be at 10, 20, 40 and 80 seconds
      await wait(2 ** (attempts - 1) * MS_IN_SECOND * errorBackoffInSeconds);
      continue;
    }

    // Unknown error
    // Serialize non-string bodies explicitly: concatenating an object would produce
    // "[object Object]" and hide the server's error details.
    const { body } = response;
    const bodyText = typeof body === "string" ? body : JSON.stringify(body);
    const error = new Error(response.status + ": " + bodyText);
    environment.logger.error({ transaction, error }, `[write] unknown error`);
    return { type: "error", error };
  }
}
