import { isEqual, uniqWith, once } from "lodash-comms";
import {
  ApiVersionError,
  AuthenticationError,
  PermissionError,
  TransactionConflictError,
  UnreachableCaseError,
  ValidationError,
} from "libs/errors";
import { formatResponseError } from "./api.service";
import { Transaction, getOperationPointers, op, squashTransactions } from "libs/transaction";
import { getPointer, RecordMap, RecordPointer } from "libs/schema";
import { DeferredPromise, onlyCallFnOnceWhilePreviousCallIsPending, throttleAsyncFn, wait } from "libs/promise-utils";
import { MS_IN_MINUTE, MS_IN_SECOND } from "libs/date-helpers";
import { ClientEnvironment } from "./ClientEnvironment";
import { globalState } from "~/state/global.state";
import {
  BehaviorSubject,
  distinctUntilChanged,
  firstValueFrom,
  map,
  Observable,
  pairwise,
  share,
  switchMap,
} from "rxjs";
import { startWith } from "libs/rxjs-operators";
import { arrayChange } from "libs/array-utils";
import { LocalForageKVStore } from "~/utils/KVStore";
import { isDefined } from "libs/predicates";

export type QueuedTx = {
  transaction: Transaction;
  deferred: DeferredPromise<void>;
};

export type DebouncedTx = {
  toQueue: QueuedTx;
  debounceTimeoutId: ReturnType<typeof setTimeout>;
  debounceMs: number;
  maxWaitTimeoutId: ReturnType<typeof setTimeout>;
  maxWaitMs: number;
};

export type EnqueueOptions = {
  debounce?: {
    key: string;
    /** Pass 0 to dequeue the transaction immediately and clear any existing debounce */
    debounceMs: number;
    /**
     * Pass 0 to dequeue the transaction immediately and clear any existing debounce. Otherwise,
     * the existing maxWaitMs, if any, will be preferred over this one.
     */
    maxWaitMs?: number;
  };
};

export type TransactionQueueApi = Pick<
  TransactionQueue,
  "init" | "dequeue" | "enqueue" | "isPendingWrite" | "isQueueEmpty" | "isQueueEmpty$" | "flush"
>;

export class TransactionQueue {
  /** Key to use with the isLoading service to indicate that the tx queue is loading */
  static readonly isLoadingKey = "TransactionQueue";
  /**
   * Key to use with the isLoading service to indicate that the tx queue is loading something
   * that is applicable in offline mode. This is a subset of the isLoadingKey.
   */
  static readonly isLoadingKeyOffline = "TransactionQueue-offline";

  // We use localForage here, instead of localStorage, because it also works inside worker's
  static store = new LocalForageKVStore({
    namespace: "TransactionQueue",
  });

  // number of seconds to wait before retrying after on error (will increase exponentially)
  private errorBackoffInSeconds: number;

  /** Thunks which are ready to be processed */
  queue: QueuedTx[] = [];

  debouncedTxs = new Map<
    string, // debounce key
    DebouncedTx
  >();

  // Each record is allowed a single debounced tx type at a time. This is intentional since allowing
  // multiple types of debounced txs for a single record seems like it would be susceptible to bugs.
  pointersWithDebouncedTx = new Map<
    string, // record key
    string // debounce key
  >();

  // Note that sometimes we might increment pending writes for the same record more times
  // than we have pending operations for that record. I.e. the pending writes count should
  // be thought of as "Are there any pending writes? Yes or no."
  pendingWrites$ = new BehaviorSubject<Map<string, number>>(new Map());
  get pendingWrites() {
    return this.pendingWrites$.getValue();
  }

  transactionStoreChanges$: Observable<Transaction[] | undefined>;

  constructor(
    public env: Pick<ClientEnvironment, "auth" | "api" | "network" | "logger" | "isLoading">,
    public props: {
      onWrite: (props: { recordMap: RecordMap }) => Promise<void>;
      onRollback: (props: { transaction: Transaction }) => Promise<void>;
      errorBackoffInSeconds?: number;
    },
  ) {
    this.env = {
      ...this.env,
      logger: this.env.logger.child({ name: "TransactionQueue" }),
    };

    this.errorBackoffInSeconds = props.errorBackoffInSeconds ?? 10;

    this.transactionStoreChanges$ = this.env.auth.currentUserId$.pipe(
      switchMap((currentUserId) => TransactionQueue.store.getItem$<Transaction[]>(`transactions-${currentUserId}`)),
      share(),
    );
  }

  init() {
    setInterval(this.logState.bind(this), MS_IN_MINUTE * 5);

    this.env.auth.currentUserId$.subscribe((currentUserId) => {
      this.loadFromOffline(currentUserId);
    });

    // Subscribe to storage changes so that we can be aware of pending writes in other tabs.
    // Note that if we increment pending writes for a transaction initiated in this tab multiple
    // times (e.g. once here and once below), that's ok.
    this.transactionStoreChanges$
      .pipe(
        map((transactions = []) => transactions.flatMap((tx) => tx.operations.flatMap(getOperationPointers))),
        startWith(() => []),
        pairwise(),
        map(([prev, next]) => arrayChange(next, prev, isEqual)),
      )
      .subscribe(({ added, removed }) => {
        this.incPendingWrites(added);
        this.decPendingWrites(removed);
      });
  }

  isQueueEmpty() {
    return this.pendingWrites.size === 0;
  }

  isQueueEmpty$() {
    return this.pendingWrites$.pipe(
      map((pendingWrites) => pendingWrites.size === 0),
      distinctUntilChanged(),
    );
  }

  isPendingWrite(pointer: RecordPointer<any>) {
    const n = this.pendingWrites.get(pointerToKey(pointer)) || 0;
    return n > 0;
  }

  /**
   * Used to flush debounced transactions and immediately start processing them. If
   * debounceKeys are supplied then only a debounced transaction matching that key will
   * be flushed. Otherwise, all debounced transactions will be flushed.
   *
   * When flush is called, it will synchronously push debounced transactions to the queue
   * (updating `queue`, `debouncedTxs`, and `pointersWithDebouncedTx`) but it won't resolve
   * until the tx manager's offline store has been updated.
   */
  async flush(pointers?: RecordPointer[]) {
    if (pointers) {
      const debouncedKeys = new Set<string>();

      for (const pointer of pointers) {
        const pointerKey = pointerToKey(pointer);
        const debounceKey = this.pointersWithDebouncedTx.get(pointerKey);
        if (debounceKey) debouncedKeys.add(debounceKey);
        this.pointersWithDebouncedTx.delete(pointerKey);
      }

      if (!debouncedKeys.size) return;

      for (const debouncedKey of debouncedKeys) {
        const data = this.debouncedTxs.get(debouncedKey);

        if (!data) {
          this.env.logger.warn({ debouncedKey }, `[flush] missing debouncedKey data`);
          continue;
        }

        this.queue.push(data.toQueue);
        clearTimeout(data.debounceTimeoutId);
        clearTimeout(data.maxWaitTimeoutId);
        this.debouncedTxs.delete(debouncedKey);
      }
    } else {
      if (!this.debouncedTxs.size) return;

      for (const data of this.debouncedTxs.values()) {
        clearTimeout(data.debounceTimeoutId);
        clearTimeout(data.maxWaitTimeoutId);
        this.queue.push(data.toQueue);
      }

      this.debouncedTxs.clear();
      this.pointersWithDebouncedTx.clear();
    }

    // We save for offline again because the order of this thunk may now be different vs
    // before.
    await this.saveState();
    this.dequeue();
  }

  /** Use write(environment, transaction) instead of this function for optimistic updates. */
  async enqueue(transaction: Transaction, options?: EnqueueOptions) {
    const pointers = uniqWith(transaction.operations.flatMap(getOperationPointers), isEqual);

    let deferred: DeferredPromise<void>;

    if (!options?.debounce) {
      deferred = new DeferredPromise();
      this.incPendingWrites(pointers);
      this.env.isLoading.add(deferred.promise, { key: TransactionQueue.isLoadingKey });

      // It's important that we flush any debounced updates before pushing this new tx onto the
      // queue so that those updates happen before this tx.
      this.flush(pointers);
      this.queue.push({ deferred, transaction });

      await this.saveState();
      this.dequeue();
      return deferred.promise;
    }

    const { debounce } = options;

    // Flush any pointers which have a pending tx associated with a different debounce key
    const pointersToBeFlushed = pointers.filter((pointer) => {
      const pointerKey = pointerToKey(pointer);
      const existingDebounceKey = this.pointersWithDebouncedTx.get(pointerKey);
      return isDefined(existingDebounceKey) && existingDebounceKey !== debounce.key;
    });

    // No need to await saving the new state as we're about to update the state again
    this.flush(pointersToBeFlushed);

    pointers.forEach((pointer) => {
      const pointerKey = pointerToKey(pointer);
      // Mark this pointer as having a debounced tx with the new key.
      this.pointersWithDebouncedTx.set(pointerKey, debounce.key);
    });

    const debouncedTx = this.debouncedTxs.get(debounce.key);
    const onTimeout = once(() => this.flush(pointers));
    let maxWaitTimeoutId: ReturnType<typeof setTimeout>;
    let maxWaitMs: number;

    if (debouncedTx) {
      transaction = squashTransactions([debouncedTx.toQueue.transaction, transaction]);
      maxWaitMs = debouncedTx.maxWaitMs;
      maxWaitTimeoutId = debouncedTx.maxWaitTimeoutId;
      clearTimeout(debouncedTx.debounceTimeoutId);
      deferred = debouncedTx.toQueue.deferred;
    } else {
      maxWaitMs = debounce.maxWaitMs || MS_IN_SECOND * 30;
      maxWaitTimeoutId = setTimeout(onTimeout, maxWaitMs);
      deferred = new DeferredPromise();
      this.incPendingWrites(pointers);
    }

    this.env.isLoading.add(deferred.promise, { key: TransactionQueue.isLoadingKey });

    const thunk: QueuedTx = { deferred, transaction };

    const debounceTimeoutId = setTimeout(onTimeout, debounce.debounceMs);

    this.debouncedTxs.set(debounce.key, {
      toQueue: thunk,
      debounceTimeoutId,
      debounceMs: debounce.debounceMs,
      maxWaitTimeoutId,
      maxWaitMs,
    });

    if (debounce.debounceMs === 0 || debounce.maxWaitMs === 0) {
      await this.flush(pointers);
    } else {
      await this.saveState();
    }

    return deferred.promise;
  }

  dequeue = onlyCallFnOnceWhilePreviousCallIsPending(async () => {
    try {
      LOOP: while (this.queue.length && this.env.network.isOnline()) {
        // Try writing a batch.
        const batch = this.getBatch();

        if (!batch.length) break LOOP;

        const { authorId } = batch[0]!.transaction;
        const operations = batch.flatMap(({ transaction }) => transaction.operations);
        const transaction = op.transaction({ authorId, operations });

        const batchResult = await writeTransaction(this.env, {
          transaction,
          errorBackoffInSeconds: this.errorBackoffInSeconds,
          maxAttempts: 3,
        });

        if (batchResult.type === "offline") break LOOP; // Try again later...

        if (batchResult.type === "ok") {
          this.queue.splice(0, batch.length);
          await this.saveState();
          await this.resolve(batch, batchResult.recordMap);
          continue LOOP;
        }

        // If there's an error writing the batch, write each transaction one by one since
        // some of them might be fine on their own.

        for (const thunk of batch) {
          const result = await writeTransaction(this.env, {
            transaction: thunk.transaction,
            errorBackoffInSeconds: this.errorBackoffInSeconds,
            maxAttempts: 5,
          });

          if (result.type === "offline") break LOOP; // Try again later...

          this.queue.shift();
          await this.saveState();

          switch (result.type) {
            case "ok": {
              await this.resolve([thunk], result.recordMap);
              break;
            }
            case "error": {
              this.env.logger.fatal({ error: result.error }, "[TransactionQueue] unexpected error in dequeue");

              globalState.getState().setError({
                message: getErrorMessage(this.queue.length),
                code: "TQ01",
              });

              await this.rollback(thunk, result.error);

              // We break out of the loop to avoid automatically trying the rest of the transactions.
              // The other transactions will be tried when the user reloads the page.
              break LOOP;
            }
            case "rollback": {
              await this.rollback(thunk, result.error);
              break;
            }
            default: {
              throw new UnreachableCaseError(result);
            }
          }
        }
      }
    } catch (error) {
      globalState.getState().setError({
        message: getErrorMessage(this.queue.length),
        code: "TQ02",
      });

      this.env.logger.fatal({ error }, "[TransactionQueue] unexpected error in dequeue");
    }
  });

  // we're intentionally passing the current user ID here instead of using getCurrentUserId
  // to make it clear that this method is impacted by the current user ID.
  async loadFromOffline(currentUserId: string | null) {
    const key = `transactions-${currentUserId}`;

    const transactions = await TransactionQueue.store.getItem<Transaction[]>(key);

    if (!transactions) return;

    if (this.queue.length || this.debouncedTxs.size) {
      this.env.logger.error(
        { readyThunksCount: this.queue.length, debouncedThunksCount: this.debouncedTxs.size },
        `[loadFromOffline] existing transactions in memory already`,
      );
    }

    // When loading offline transactions, we first clear any existing ones which are pending.
    this.queue = [];
    this.debouncedTxs = new Map();

    for (const transaction of transactions) {
      this.enqueue(transaction);
    }
  }

  saveState = (() => {
    const save = throttleAsyncFn(async () => {
      const currentUserId = this.env.auth.getCurrentUserId();

      // When saving transactions for offline, we ignore the debounced state.
      // Under normal usage, these transactions will be executed from memory, respecting the
      // debounced state. But if the client is closed and reopened, then we forget the debounced
      // state and attempt to execute all saved transactions immediately.
      const transactions = [
        ...this.queue.map((thunk) => thunk.transaction),
        ...Array.from(this.debouncedTxs.values()).map(({ toQueue }) => toQueue.transaction),
      ];

      try {
        const nextChange = firstValueFrom(this.transactionStoreChanges$);
        const changed = await TransactionQueue.store.setItem(`transactions-${currentUserId}`, transactions);
        // Wait to resolve until after we've processed the emission of `TransactionQueue.store.getItem$` above.
        // This is important so that, when rolling back a transaction, we process the rollback after we've decremented
        // any pending writes related to the transaction.
        if (changed) await nextChange;
      } catch (error) {
        this.env.logger.error({ error }, `[saveForOffline] error`);
      }
    });

    // If we called `isLoading.add()` inside the `save()` function, then the previous call would resolve,
    // setting the `TransactionQueue.isLoadingKeyOffline` loading key to false, before the next call would
    // then be processed. The result would be that the `TransactionQueue.isLoadingKeyOffline` key would
    // "flash" from `true -> false -> true`, which could potentially cause unexpected behavior if trigger
    // an event as soon as they key becomes `false`.
    return () =>
      this.env.isLoading.add(save(), {
        key: [TransactionQueue.isLoadingKey, TransactionQueue.isLoadingKeyOffline],
      });
  })();

  getBatch() {
    const maxContentLength = 100_000;
    let batchSize = 0;
    const batch: QueuedTx[] = [];

    for (const thunk of this.queue) {
      const thunkSize = JSON.stringify(thunk.transaction).length;
      if (batchSize + thunkSize > maxContentLength) break;
      batchSize += thunkSize;
      batch.push(thunk);
    }

    if (!batch.length && this.queue.length) {
      // If we can't fit any transactions in a batch, just send the first one
      batch.push(this.queue[0]!);
    }

    return batch;
  }

  async resolve(queued: QueuedTx[], recordMap: RecordMap) {
    await this.props.onWrite({ recordMap });

    for (const data of queued) {
      const pointers = uniqWith(data.transaction.operations.flatMap(getOperationPointers), isEqual);
      this.decPendingWrites(pointers);
      data.deferred.resolve();
      this.env.logger.debug({ transaction: data.transaction }, `transaction successfully sent to server`);
    }
  }

  async rollback(queued: QueuedTx, error: Error) {
    const { transaction, deferred } = queued;
    // We want to decrement the pending writes before calling onRollback so that
    // we can check and see if a record still has pending writes inside the onRollback call.
    const pointers = uniqWith(transaction.operations.flatMap(getOperationPointers), isEqual);
    this.decPendingWrites(pointers);

    try {
      this.env.logger.warn({ transaction, error }, `transaction rejected by server`);
      await this.props.onRollback({ transaction });
      deferred.reject(error);
    } catch (error) {
      this.env.logger.fatal({ error }, "[rollback] error during rollback");

      globalState.getState().setError({
        message: "An unexpected error occurred while rolling back a transaction.",
        code: "TQ03",
      });

      // Note that we're intentionally not rejecting the promise in the case of an error on rollback.
      // This is so that any followup logic doesn't run.
    }
  }

  incPendingWrites(pointers: RecordPointer[]) {
    pointers.forEach((pointer) => {
      const key = pointerToKey(pointer);
      const n = this.pendingWrites.get(key) || 0;
      this.pendingWrites.set(key, n + 1);
    });

    this.pendingWrites$.next(this.pendingWrites);
  }

  decPendingWrites(pointers: RecordPointer[]) {
    pointers.forEach((pointer) => {
      const key = pointerToKey(pointer);
      const n = this.pendingWrites.get(key) || 0;

      if (n <= 1) {
        this.pendingWrites.delete(key);
      } else {
        this.pendingWrites.set(key, n - 1);
      }
    });

    this.pendingWrites$.next(this.pendingWrites);
  }

  async logState(label?: string) {
    const transactions = await TransactionQueue.store.getItem<Transaction[]>(
      `transactions-${this.env.auth.getCurrentUserId()}`,
    );

    if (!transactions) return;

    if (!Array.isArray(transactions)) {
      this.env.logger.error({ label, transactions }, "[TransactionQueue] unexpected store value");
      return;
    }

    if (transactions.length < 20) return;

    this.env.logger.notice(
      {
        label,
        transactionsCount: transactions.length,
        // Theoretically a client could build up a huge list of transactions and logging all
        // of them could take up a lot of bandwidth and resources. Instead we just log the
        // first 5 transactions. Theoretically these are the most important anyway since
        // they're processed in order. So if an error is occurring when the transactions are
        // being processed, it would be the first transaction's fault.
        transactionsSlice: transactions.slice(0, 5),
      },
      "[TransactionQueue] pending transactions",
    );
  }
}

/* -----------------------------------------------------------------------------------------------*/

type WriteResponse =
  | { type: "ok"; recordMap: RecordMap }
  | { type: "offline" }
  | { type: "rollback"; error: Error }
  | { type: "error"; error: Error };

async function writeTransaction(
  environment: Pick<ClientEnvironment, "api" | "logger">,
  props: {
    transaction: Transaction;
    errorBackoffInSeconds: number;
    maxAttempts: number;
  },
): Promise<WriteResponse> {
  const { transaction, errorBackoffInSeconds, maxAttempts } = props;

  environment.logger.debug({ transaction }, "[write] sending transaction to server");

  let attempts = 0;

  // Submit and retry.
  while (true) {
    attempts++;

    const response = await environment.api.write(transaction);

    if (response.status === 200) {
      return { type: "ok", recordMap: response.body.recordMap };
    }

    if (response.status === 0) {
      // Offline
      return { type: "offline" };
    }

    if (response.status === ValidationError.statusCode) {
      environment.logger.error({ transaction }, `[write] ValidationError`);
      const error = new ValidationError(formatResponseError(response));
      return { type: "rollback", error };
    }

    if (response.status === PermissionError.statusCode) {
      environment.logger.error({ transaction }, `[write] PermissionError`);
      const error = new PermissionError(formatResponseError(response));
      return { type: "rollback", error };
    }

    if (response.status === TransactionConflictError.statusCode) {
      if (attempts > maxAttempts) {
        environment.logger.warn({ transaction }, `[write] TransactionConflictError`);
        const error = new TransactionConflictError(formatResponseError(response));
        return { type: "rollback", error };
      }

      // retry immediately
      continue;
    }

    if (response.status === ApiVersionError.statusCode) {
      // The API service is expected to handle these errors before resolving the request
      // and that involves a page reload. Reaching this branch is therefore unexpected.
      environment.logger.error({ transaction }, `[write] ApiVersionError`);
      throw new Error(`[TransactionQueue] [write] unexpected ApiVersionError`);
    }

    if (response.status === AuthenticationError.statusCode) {
      // The API service is expected to handle these errors before resolving the request
      // and that involves signing out, triggering a page reload. Reaching this branch
      // is therefore unexpected.
      environment.logger.error({ transaction }, `[write] AuthenticationError`);
      throw new Error(`[TransactionQueue] [write] unexpected AuthenticationError`);
    }

    // Errors handling
    // Below are unexpected errors which we attempt to retry

    if (attempts <= maxAttempts) {
      // Will retry after errorBackoffInSeconds (increasing exponentially)
      // With the default of 10 seconds, the retries will be at 10, 20, 40 and 80 seconds
      await wait(2 ** (attempts - 1) * MS_IN_SECOND * errorBackoffInSeconds);
      continue;
    }

    // Unknown error
    const error = new Error(response.status + ": " + response.body);
    environment.logger.error({ transaction, error }, `[write] unknown error`);
    return { type: "error", error };
  }
}

/* -----------------------------------------------------------------------------------------------*/

function getErrorMessage(thunkCount: number) {
  return `
    An unexpected error occurred while syncing a recent change to the server.
    ${thunkCount && `Pausing further updates (updates remaining ${thunkCount}). `}
    Reload the page to try again.
  `;
}

function pointerToKey(pointer: RecordPointer<any>) {
  return `${pointer.table}:${pointer.id}`;
}

/* -----------------------------------------------------------------------------------------------*/
