import { get, isEqual, set, unset, update, isPlainObject, pick, omit } from "lodash-comms";
import {
  RecordMap,
  RecordPointer,
  RecordTable,
  getMapRecord,
  setMapRecord,
  RecordValue,
  timelineEntryOrderEncoders,
  iterateRecordMap,
  getPointer,
} from "libs/schema";
import { IterableElement, Merge, SetOptional, Simplify, SetRequired } from "type-fest";
import { uuid } from "libs/uuid";
import { UnreachableCaseError, ValidationError } from "./errors";
import { isDefined } from "./predicates";

/* -------------------------------------------------------------------------------------------------
 * Transactions
 * -------------------------------------------------------------------------------------------------
 * Transactions in Comms are "last write wins". This means that, if person A is offline and attempts
 * an update and if person B is online and performs an update after person A, when person A comes
 * back online their update will be applied last and will overwrite person B's update. Note though
 * that if person A lost permission to perform a write while they were offline, their write will be
 * rejected when they come back online.
 */

export type Transaction = {
  txId: string;
  authorId: string;
  operations: Operation[];
  /** Only used for logging purposes */
  label?: string; // e.g. "createDraft"
  type?: string; // e.g. "createDraft-1234"
  /** Type is also required when using debounce */
  debounce?: number;
  // onOptimisticWrite?: () => void;
  // onServerResponse?: (props: { error?: unknown }) => void;
};

/* -------------------------------------------------------------------------------------------------
 * Operations
 * -------------------------------------------------------------------------------------------------
 */

export type Operation =
  | SetOperation
  | UpdateOperation
  | UpsertOperation
  | DeleteOperation
  | AssertVersionOperation
  | ListInsertOperation
  | ListRemoveOperation;

export type SetOperation = {
  type: "set";
  table: string;
  id: string;
  key: string[];
  value: any;
};

export type UpdateOperation = {
  type: "update";
  table: string;
  id: string;
  key: string[];
  value: any;
};

export type UpsertOperation = {
  type: "upsert";
  table: string;
  id: string;
  /**
   * onCreate operations can be operations on different
   * records.
   */
  onCreate?: Operation[];
  /**
   * onUpdate operations can be operations on different
   * records.
   */
  onUpdate?: Array<Operation | UpsertOnUpdateConditionalOp>;
};

export type UpsertOnUpdateConditionalOp = {
  type: "upsert_on_update";
  operations: Operation[];
  /**
   * The key value pairs of this `where` record must match the
   * record associated with the upsert operation in order for these
   * operations to be applied.
   */
  where: Record<string, ConditionalOpExp | undefined>;
};

export type ConditionalOpExp<T = unknown> = { eq: T };

export type DeleteOperation = {
  type: "delete";
  table: string;
  id: string;
  version?: number;
};

export type AssertVersionOperation = {
  type: "assert_version";
  table: RecordTable;
  id: string;
  version: number;
};

export type ListInsertOperation = {
  type: "list_insert";
  table: string;
  id: string;
  key: string[];
  value: any;
  /** Defaults to append */
  where?: "prepend" | "append" | { before: any } | { after: any };
};

export type ListRemoveOperation = {
  type: "list_remove";
  table: string;
  id: string;
  key: string[];
  value: any;
};

export type CreateRecord<T extends RecordTable | RecordValue> = T extends RecordTable
  ? // @ts-expect-error upset because the AutoField params don't necessarily exist in the RecordValue interface
    SetOptional<RecordValue<T>, AutoField | "deleted_at" | "deleted_by_user_id">
  : Omit<T, AutoField | "deleted_at" | "deleted_by_user_id">;

export type AutoField = "created_at" | "updated_at" | "server_updated_at" | "version";

type RecordBase = {
  version: number;
  created_at: string;
  updated_at: string;
  server_updated_at: string | null;
  deleted_at: string | null;
  deleted_by_user_id: string | null;
};

/* -----------------------------------------------------------------------------------------------*/

function setOp<T extends RecordTable>(
  table: T | RecordPointer<T>,
  value: WithSentinelValues<CreateRecord<T>>,
): SetOperation;
function setOp<T extends RecordTable, A extends keyof Omit<RecordValue<T>, "id" | AutoField>>(
  pointer: RecordPointer<T>,
  keys: [A],
  value: WithSentinelValues<RecordValue<T>[A]>,
): SetOperation;
function setOp<
  T extends RecordTable,
  A extends keyof Omit<RecordValue<T>, "id" | AutoField>,
  B extends keyof RecordValue<T>[A],
>(pointer: RecordPointer<T>, keys: [A, B], value: WithSentinelValues<RecordValue<T>[A][B]>): SetOperation;
function setOp<T extends RecordTable>(
  a: T | RecordPointer<T>,
  b: string[] | CreateRecord<T>,
  c?: unknown,
): SetOperation {
  const pointer = (
    typeof a === "string"
      ? {
          table: a,
          id: (b as unknown as { id: string }).id,
        }
      : a
  ) as RecordPointer;

  const keys = Array.isArray(b) ? b : [];

  const value = Array.isArray(b) ? c : b;

  return {
    type: "set",
    ...pointer,
    key: keys,
    value: value,
  } as SetOperation;
}

/* -----------------------------------------------------------------------------------------------*/

function upsertOp<T extends RecordTable>(
  pointer: RecordPointer<T>,
  props: {
    /**
     * An onCreate operation could be an operation for a different
     * record, not the record associated with the pointer.
     */
    onCreate?: Operation[];
    /**
     * An onUpdate operation could be an operation for a different
     * record, not the record associated with the pointer.
     */
    onUpdate?: Array<
      | Operation
      | Merge<
          UpsertOnUpdateConditionalOp,
          {
            /**
             * The key value pairs of this `where` record must match the
             * record associated with the upsert operation in order for these
             * operations to be applied.
             */
            where: {
              [K in keyof RecordValue<T>]?: ConditionalOpExp<RecordValue<T>[K]>;
            };
          }
        >
    >;
  },
): UpsertOperation {
  return {
    type: "upsert",
    ...pointer,
    onCreate: props.onCreate,
    onUpdate: props.onUpdate,
  };
}

/* -----------------------------------------------------------------------------------------------*/

function updateOp<T extends RecordTable>(
  pointer: RecordPointer<T>,
  value: Partial<WithSentinelValues<Omit<RecordValue<T>, "id" | AutoField>>>,
): UpdateOperation;
function updateOp<T extends RecordTable, K extends keyof Omit<RecordValue<T>, "id" | AutoField>>(
  pointer: RecordPointer<T>,
  key: K,
  value: Partial<WithSentinelValues<RecordValue<T>[K]>>,
): UpdateOperation;
function updateOp<T extends RecordTable, K extends keyof Omit<RecordValue<T>, "id" | AutoField>>(
  pointer: RecordPointer<T>,
  a: K | Partial<RecordValue<T>>,
  b?: Partial<RecordValue<T>[K]>,
) {
  if (typeof a === "string") {
    const key = a;
    const value = b;

    return {
      type: "update",
      ...pointer,
      key: [key],
      value: value,
    } as UpdateOperation;
  }

  return {
    type: "update",
    ...pointer,
    key: [],
    value: a,
  } as UpdateOperation;
}

/* -----------------------------------------------------------------------------------------------*/

function deleteOp<T extends RecordTable>(table: T, record: { id: string }): DeleteOperation;
function deleteOp<T extends RecordTable>(table: T, id: string): DeleteOperation;
function deleteOp<T extends RecordTable>(pointer: RecordPointer<T>): DeleteOperation;
function deleteOp<T extends RecordTable>(a: T | RecordPointer<T>, b?: string | { id: string }): DeleteOperation {
  if (typeof a === "string") {
    if (!b) {
      throw new Error("deleteOp: Must provide an id when providing a table.");
    }

    return {
      type: "delete",
      table: a,
      id: typeof b === "string" ? b : b.id,
    };
  }

  return {
    type: "delete",
    table: a.table,
    id: a.id,
  };
}

/* -----------------------------------------------------------------------------------------------*/

function assertVersionOp<T extends RecordTable>(
  table: T,
  record: { id: string; version: number },
): AssertVersionOperation {
  return {
    type: "assert_version",
    table,
    id: record.id,
    version: record.version,
  };
}

/* -----------------------------------------------------------------------------------------------*/

function listInsertOp<T extends RecordTable, K extends keyof Omit<RecordValue<T>, "id" | AutoField> & string>(
  pointer: RecordPointer<T>,
  key: K,
  value: IterableElement<RecordValue<T>[K]>,
  options: { where?: ListInsertOperation["where"] } = {},
): ListInsertOperation {
  return {
    type: "list_insert",
    table: pointer.table,
    id: pointer.id,
    key: [key],
    value,
    where: options.where,
  };
}

/* -----------------------------------------------------------------------------------------------*/

function listRemoveOp<T extends RecordTable, K extends keyof Omit<RecordValue<T>, "id" | AutoField> & string>(
  pointer: RecordPointer<T>,
  key: K,
  value: IterableElement<RecordValue<T>[K]>,
): ListRemoveOperation {
  return {
    type: "list_remove",
    table: pointer.table,
    id: pointer.id,
    key: [key],
    value,
  };
}

/** ------------------------------------------------------------------------------------------------
 * Sentinel field values
 * -------------------------------------------------------------------------------------------------
 * Sentinel field values are special values that can be used in operations. These values are dynamically
 * replaced when the operation is applied with a new value which depends on the sentinel type. For
 * example, the `SERVER_TIMESTAMP` sentinel value will be replaced with the current timestamp when the
 * operation is applied.
 */

const fieldValue = {
  SERVER_TIMESTAMP: () => ({
    __SENTINEL_VALUE__: "SERVER_TIMESTAMP",
  }),
  FUTURE_TIME_OR_SERVER_TIMESTAMP: (props: { futureTimestamp: string }) => ({
    __SENTINEL_VALUE__: "FUTURE_TIME_OR_SERVER_TIMESTAMP",
    futureTimestamp: props.futureTimestamp,
  }),
  BRANCHED_DRAFT_TIMELINE_ORDER: (props: {
    branchedFromMessageId: string;
    draftCreatedAt: SentinelValue<"SERVER_TIMESTAMP">;
  }) => {
    return {
      __SENTINEL_VALUE__: "BRANCHED_DRAFT_TIMELINE_ORDER",
      branchedFromMessageTimelineOrder: _getValueFromRecordSentinel({
        table: "message",
        id: props.branchedFromMessageId,
        path: ["timeline_order"],
      }),
      draftCreatedAt: props.draftCreatedAt,
    };
  },
  BRANCHED_THREAD_TIMELINE_ORDER: (props: {
    branchedFromMessageId: string;
    newMessageTimelineOrder: SentinelValue<"MESSAGE_TIMELINE_ORDER">;
  }) => {
    return {
      __SENTINEL_VALUE__: "BRANCHED_THREAD_TIMELINE_ORDER",
      branchedFromMessageTimelineOrder: _getValueFromRecordSentinel({
        table: "message",
        id: props.branchedFromMessageId,
        path: ["timeline_order"],
      }),
      newMessageTimelineOrder: props.newMessageTimelineOrder,
    };
  },
  MESSAGE_TIMELINE_ORDER: (props: { scheduledToBeSentAt: string }) => ({
    __SENTINEL_VALUE__: "MESSAGE_TIMELINE_ORDER",
    scheduledToBeSentAt: props.scheduledToBeSentAt,
  }),
  VALUE_FROM_RECORD: _getValueFromRecordSentinel,
} satisfies {
  [K in keyof SentinelPropMap]: (...args: any[]) => SentinelValue<K>;
};

function _getValueFromRecordSentinel<T extends RecordTable>(props: {
  table: T;
  id: string;
  path: [keyof RecordValue<T>];
}): SentinelValue<"VALUE_FROM_RECORD">;
function _getValueFromRecordSentinel<
  T extends RecordTable,
  A extends keyof RecordValue<T>,
  B extends keyof RecordValue<T>[A],
>(props: { table: T; id: string; path: [A, B] }): SentinelValue<"VALUE_FROM_RECORD">;
function _getValueFromRecordSentinel(props: any): SentinelValue<"VALUE_FROM_RECORD"> {
  return {
    __SENTINEL_VALUE__: "VALUE_FROM_RECORD",
    table: props.table,
    id: props.id,
    path: props.path,
  };
}

type SentinelPropMap = {
  /** Will be replaced with the current timestamp by the server */
  SERVER_TIMESTAMP: undefined;
  /**
   * When the server receives this value, if the provided futureTimestamp is
   * greater than the current server timestamp, the server will use the provided
   * futureTimestamp. If the provided futureTimestamp is less than the current
   * server timestamp, the server will use the current server timestamp.
   */
  FUTURE_TIME_OR_SERVER_TIMESTAMP: {
    futureTimestamp: string;
  };
  BRANCHED_DRAFT_TIMELINE_ORDER: {
    branchedFromMessageTimelineOrder: SentinelValue<"VALUE_FROM_RECORD">;
    draftCreatedAt: SentinelValue<"SERVER_TIMESTAMP">;
  };
  BRANCHED_THREAD_TIMELINE_ORDER: {
    branchedFromMessageTimelineOrder: SentinelValue<"VALUE_FROM_RECORD">;
    newMessageTimelineOrder: SentinelValue<"MESSAGE_TIMELINE_ORDER">;
  };
  MESSAGE_TIMELINE_ORDER: {
    scheduledToBeSentAt: string;
  };
  /**
   * When evaluated on the server, the specified record will be fetched and the
   * provided path of record keys will be followed to get a value. This value
   * will be used in place of the `VALUE_FROM_RECORD` sentinel value. If the
   * specified record does not exist, the operation will fail with a validation
   * error. Example:
   *
   * ```ts
   * // server psuedo code
   * const valueFromRecord = { table: "tag", id: "1", path: ["data", "some_value"] };
   * const tagRecord = { id: "1", data: { some_value: "hello" } };
   * const resolvedValue = valueFromRecord.path.reduce((record, key) => record?.[key], tagRecord);
   * ```
   */
  VALUE_FROM_RECORD: {
    table: string;
    id: string;
    path: string[];
  };
};

type SentinelToValueMap = {
  [K in keyof SentinelPropMap]: SentinelPropMap[K] extends undefined
    ? { __SENTINEL_VALUE__: K }
    : Simplify<{ __SENTINEL_VALUE__: K } & SentinelPropMap[K]>;
};

export type WithSentinelValues<T, Props extends keyof T = keyof T> =
  T extends Record<string, unknown>
    ? { [K in keyof T]: K extends Props ? WithSentinelValues<T[K]> : T[K] }
    : T | SentinelValue;

export type SentinelValue<T extends keyof SentinelPropMap = keyof SentinelPropMap> = SentinelToValueMap[T];

/* -------------------------------------------------------------------------------------------------
 *  deeplyReplaceSentinelValues
 * -------------------------------------------------------------------------------------------------
 */

export type SentinelContext = {
  currentTimestamp: string;
  recordMap: RecordMap;
  /**
   * A set of record cache keys (i.e. strings in the form "table:id"). Records found
   * in this set can no longer be mutated.
   *
   * See applyTransaction definition for more info.
   */
  frozenRecords?: Set<string>;
};

/** Returns a new object and does not mutate input */
function deeplyReplaceSentinelValues<T>(input: T, context: SentinelContext): T {
  if (isSentinelValue(input)) {
    return getSentinelValue(input, context);
  } else if (Array.isArray(input)) {
    return input.map((item) => deeplyReplaceSentinelValues(item, context)) as T;
  } else if (isPlainObject(input)) {
    return Object.fromEntries(
      Object.entries(input as Record<string, unknown>).map(([key, value]) => {
        return [key, deeplyReplaceSentinelValues(value, context)];
      }),
    ) as T;
  } else {
    return input;
  }
}

/* -----------------------------------------------------------------------------------------------*/

function getSentinelValue(value: SentinelValue, context: SentinelContext): any {
  switch (value.__SENTINEL_VALUE__) {
    case "SERVER_TIMESTAMP": {
      return context.currentTimestamp;
    }
    case "FUTURE_TIME_OR_SERVER_TIMESTAMP": {
      if (value.futureTimestamp > context.currentTimestamp) {
        return value.futureTimestamp;
      } else {
        return context.currentTimestamp;
      }
    }
    case "BRANCHED_DRAFT_TIMELINE_ORDER": {
      const branchedFromMessageTimelineOrder = isSentinelValue(value.branchedFromMessageTimelineOrder)
        ? getSentinelValue(value.branchedFromMessageTimelineOrder, context)
        : value.branchedFromMessageTimelineOrder;

      const draftCreatedAt = isSentinelValue(value.draftCreatedAt)
        ? getSentinelValue(value.draftCreatedAt, context)
        : value.draftCreatedAt;

      return timelineEntryOrderEncoders.BRANCHED_DRAFT({
        branchedFromMessageTimelineOrder,
        draftCreatedAt,
      });
    }
    case "BRANCHED_THREAD_TIMELINE_ORDER": {
      const branchedFromMessageTimelineOrder = isSentinelValue(value.branchedFromMessageTimelineOrder)
        ? getSentinelValue(value.branchedFromMessageTimelineOrder, context)
        : value.branchedFromMessageTimelineOrder;

      const newMessageTimelineOrder = isSentinelValue(value.newMessageTimelineOrder)
        ? getSentinelValue(value.newMessageTimelineOrder, context)
        : value.newMessageTimelineOrder;

      return timelineEntryOrderEncoders.BRANCHED_THREAD({
        branchedFromMessageTimelineOrder,
        newMessageTimelineOrder,
      });
    }
    case "MESSAGE_TIMELINE_ORDER": {
      const sentAt =
        value.scheduledToBeSentAt > context.currentTimestamp ? value.scheduledToBeSentAt : context.currentTimestamp;

      return timelineEntryOrderEncoders.MESSAGE({
        sent_at: sentAt,
        scheduled_to_be_sent_at: value.scheduledToBeSentAt,
      });
    }
    case "VALUE_FROM_RECORD": {
      const { table, id } = value;
      const pointer = { table, id } as RecordPointer;
      const record = getMapRecord(context.recordMap, pointer);

      if (!record) {
        throw new ValidationError(`VALUE_FROM_RECORD cannot find "${table}:${id}".`);
      }

      context.frozenRecords?.add(`${table}:${id}`);

      return get(record, value.path);
    }
    default: {
      throw new UnreachableCaseError(value);
    }
  }
}

/* -----------------------------------------------------------------------------------------------*/

function deeplyGetReferencedSentinels(
  input: unknown,
  /** Implementation detail. Do not provide!! */
  store: SentinelValue[] = [],
): SentinelValue[] {
  if (isSentinelValue(input)) {
    store.push(input);
  } else if (Array.isArray(input)) {
    input.forEach((item) => deeplyGetReferencedSentinels(item, store));
  } else if (isPlainObject(input)) {
    Object.values(input as Record<string, unknown>).forEach((item) => {
      deeplyGetReferencedSentinels(item, store);
    });
  } else {
    // do nothing
  }

  return store;
}

/* -----------------------------------------------------------------------------------------------*/

function getPointersReferencedBySentinel(value: SentinelValue): RecordPointer[] {
  switch (value.__SENTINEL_VALUE__) {
    case "BRANCHED_DRAFT_TIMELINE_ORDER": {
      return [
        ...getPointersReferencedBySentinel(value.branchedFromMessageTimelineOrder),
        ...getPointersReferencedBySentinel(value.draftCreatedAt),
      ];
    }
    case "BRANCHED_THREAD_TIMELINE_ORDER": {
      return [
        ...getPointersReferencedBySentinel(value.branchedFromMessageTimelineOrder),
        ...getPointersReferencedBySentinel(value.newMessageTimelineOrder),
      ];
    }
    case "FUTURE_TIME_OR_SERVER_TIMESTAMP": {
      return [];
    }
    case "MESSAGE_TIMELINE_ORDER": {
      return [];
    }
    case "SERVER_TIMESTAMP": {
      return [];
    }
    case "VALUE_FROM_RECORD": {
      return [{ table: value.table, id: value.id } as RecordPointer];
    }
    default: {
      throw new UnreachableCaseError(value);
    }
  }
}

/* -----------------------------------------------------------------------------------------------*/

function isSentinelValue(value: unknown): value is SentinelValue {
  return isPlainObject(value) && "__SENTINEL_VALUE__" in (value as {});
}

/* -------------------------------------------------------------------------------------------------
 *  op
 * -------------------------------------------------------------------------------------------------
 */

export const op = {
  transaction(props: SetRequired<Partial<Transaction>, "authorId">) {
    return {
      txId: uuid(),
      operations: [],
      ...props,
    } as Transaction;
  },
  set: setOp,
  update: updateOp,
  upsert: upsertOp,
  delete: deleteOp,
  assertVersion: assertVersionOp,
  listInsert: listInsertOp,
  listRemove: listRemoveOp,
  fieldValue,
} as const;

/* -------------------------------------------------------------------------------------------------
 *  getOperationPointers
 * -------------------------------------------------------------------------------------------------
 */

/**
 * Given an operation, returns pointers for all the records needed to apply that operation.
 * Note that not all of these pointers will necessarily be changed by the operation.
 */
export function getOperationPointers(operation: Operation): RecordPointer[] {
  switch (operation.type) {
    case "assert_version": {
      return [{ table: operation.table, id: operation.id } as RecordPointer];
    }
    case "delete":
    case "list_insert":
    case "list_remove": {
      // delete, list insert and list remove are currently not allowed to include sentinels
      return [{ table: operation.table, id: operation.id } as RecordPointer];
    }
    case "set":
    case "update": {
      const pointers = [{ table: operation.table, id: operation.id } as RecordPointer];

      for (const sentinel of deeplyGetReferencedSentinels(operation.value)) {
        for (const pointer of getPointersReferencedBySentinel(sentinel)) {
          pointers.push(pointer);
        }
      }

      return pointers;
    }
    case "upsert": {
      const pointers = [{ table: operation.table, id: operation.id } as RecordPointer];

      if (operation.onCreate) {
        const newPointers = operation.onCreate.flatMap(getOperationPointers);

        pointers.push(...newPointers);
      }

      if (operation.onUpdate) {
        const newPointers = operation.onUpdate.flatMap((op) => {
          if (op.type === "upsert_on_update") {
            return op.operations.flatMap(getOperationPointers);
          }

          return getOperationPointers(op);
        });

        pointers.push(...newPointers);
      }

      return pointers;
    }
    default: {
      throw new UnreachableCaseError(operation, `getOperationPointers: unknown operation type.`);
    }
  }
}

/* -------------------------------------------------------------------------------------------------
 *  applyTransaction
 * -------------------------------------------------------------------------------------------------
 */

export function applyTransaction(props: {
  recordMap: RecordMap;
  transaction: Transaction;
  currentTimestamp: string;
  isServer: boolean;
}) {
  const { recordMap, transaction, currentTimestamp, isServer } = props;

  /**
   * A set of record cache keys (i.e. strings in the form "table:id"). Records found
   * in this set can no longer be mutated.
   *
   * These are records which, for whatever reason, can no longer be mutated in this
   * transaction. At time of writing, the reason for this would be because the
   * VALUE_FROM_RECORD sentinel referenced the record (but in the future there might
   * also be other reasons). The VALUE_FROM_RECORD sentinel
   * expects to get the most recent value of the specified record. If the record is
   * changed in the same transaction after the sentinel access it, then the value which
   * the sentinel previously extracted from the record might now be out-of-date.
   * Thus, after a sentinel accesses a record we freeze that record for the remainder
   * of the transaction and throw an ValidationError if an operation attempts to mutate
   * the record later in the same transaction. If you're running into this error,
   * you might be able to resolve it by simply reordering the operations in the
   * transaction so that no operations mutate a record used by a VALUE_FROM_RECORD
   * sentinel after the sentinel runs.
   */
  const frozenRecords = new Set<string>();

  // Keep track of the previous version so we can assert on write.
  for (const { record } of iterateRecordMap(recordMap)) {
    type RecordValueWithLastVersion = RecordValue & { __last_version: number };
    (record as RecordValueWithLastVersion).__last_version = record.version;
  }

  for (const operation of transaction.operations) {
    applyOperation({
      recordMap,
      operation,
      authorId: transaction.authorId,
      currentTimestamp,
      frozenRecords,
      isServer,
    });
  }

  for (const pointerWithRecord of iterateRecordMap(recordMap)) {
    type RecordValueWithLastVersion = RecordValue & {
      __last_version?: number | undefined;
    };

    const record = pointerWithRecord.record as RecordValueWithLastVersion;

    if (!("__last_version" in record)) {
      // For any new records which were created by an operation, make sure they have
      // a defined "__last_version" prop that is equal to `undefined`. The
      // writeRecordMap database method expects every record to have a "__last_version"
      // prop defined.
      record.__last_version = undefined;
    }

    if (isServer && record.version !== record.__last_version) {
      // We increment the version of the records that the server produces by 1 more than
      // the version of the records that the client produces. This is because, at minimum,
      // we only set the server_updated_at value on the server, so the version of the record
      // that the server produces looks different than the version that the client produces.
      // This can also hold true for sentinal values used in operations. This ensures that
      // the client re-downloads the records from the server after an update. Note that we
      // only want to increment the version of the record by 1 more than whatever the client
      // reached. This is because the client might have further optimistic updates that it
      // has applied to the record. We don't want to overwrite those optimistic updates with
      // the server's version until those updates have been reconciled with the server. If
      // two updates are made to the same record by different users, then yes it's possible
      // for the client's optimistic updates to be temporarily overridden by the server's
      // version. But when that client's operations are sent to the server the client and
      // server will eventually be consistent.
      record.version += 1;
    }
  }
}

/* -------------------------------------------------------------------------------------------------
 *  applyOperation
 * -------------------------------------------------------------------------------------------------
 */

export type ApplyOperationProps<O extends Operation = Operation> = {
  recordMap: RecordMap;
  operation: O;
  authorId: string;
  currentTimestamp: string;
  isServer: boolean;
  /**
   * A set of record cache keys (i.e. strings in the form "table:id"). Records found
   * in this set can no longer be mutated.
   *
   * See applyTransaction definition for more info.
   */
  frozenRecords?: Set<string>;
};

export function applyOperation(props: ApplyOperationProps) {
  switch (props.operation.type) {
    case "set": {
      return applySetOperation(props as ApplyOperationProps<SetOperation>);
    }
    case "update": {
      return applyUpdateOperation(props as ApplyOperationProps<UpdateOperation>);
    }
    case "upsert": {
      return applyUpsertOperation(props as ApplyOperationProps<UpsertOperation>);
    }
    case "delete": {
      return applyDeleteOperation(props as ApplyOperationProps<DeleteOperation>);
    }
    case "list_insert": {
      return applyListInsertOperation(props as ApplyOperationProps<ListInsertOperation>);
    }
    case "list_remove": {
      return applyListRemoveOperation(props as ApplyOperationProps<ListRemoveOperation>);
    }
    case "assert_version": {
      // This operation is just used by the database.write method to assert.
      return;
    }
    default: {
      throw new ValidationError("Unknown operation type.");
    }
  }
}

/* -------------------------------------------------------------------------------------------------
 *  invertOperation
 * -------------------------------------------------------------------------------------------------
 */

export type InvertOperationProps<O extends Operation = Operation> = {
  recordMap: RecordMap;
  operation: O;
  authorId: string;
};

export function invertOperation(props: InvertOperationProps) {
  switch (props.operation.type) {
    case "set": {
      return invertSetOperation(props as InvertOperationProps<SetOperation>);
    }
    case "update": {
      return invertUpdateOperation(props as InvertOperationProps<UpdateOperation>);
    }
    case "upsert": {
      return invertUpsertOperation(props as InvertOperationProps<UpsertOperation>);
    }
    case "delete": {
      return invertDeleteOperation(props as InvertOperationProps<DeleteOperation>);
    }
    case "list_insert": {
      return invertListInsertOperation(props as InvertOperationProps<ListInsertOperation>);
    }
    case "list_remove": {
      return invertListRemoveOperation(props as InvertOperationProps<ListRemoveOperation>);
    }
    case "assert_version": {
      // This operation is just used by the database.write method to assert.
      return;
    }
    default: {
      throw new ValidationError("Unknown operation type.");
    }
  }
}

/* -------------------------------------------------------------------------------------------------
 *  applySetOperation
 * -------------------------------------------------------------------------------------------------
 */

function applySetOperation(props: ApplyOperationProps<SetOperation>) {
  const { recordMap, operation, currentTimestamp } = props;
  const { table, id, key } = operation;
  const pointer = { table, id } as RecordPointer;

  if (props.frozenRecords?.has(`${table}:${id}`)) {
    throw new ValidationError(
      `Transaction attempts to mutate record "${table}:${id}" after that record ` +
        `was referenced by a sentinel value.`,
    );
  }

  const record = getMapRecord(recordMap, pointer);

  if (record) {
    const newRecord = structuredClone(record);

    if (key.length) {
      if (operation.value === undefined) {
        unset(newRecord, key);
      } else {
        set(newRecord, key, deeplyReplaceSentinelValues(operation.value, props));
      }
    } else {
      Object.assign(
        newRecord,
        omit(deeplyReplaceSentinelValues(operation.value, props), "version", "updated_at", "server_updated_at"),
      );

      // When setting a record which happens to already exist, we assume the user wants to "undelete" the
      // record unless they explicitely set the deleted_at field to a value.
      if (!isDefined(operation.value.deleted_at)) newRecord.deleted_at = null;
      if (!isDefined(operation.value.deleted_by_user_id)) newRecord.deleted_by_user_id = null;
    }

    newRecord.updated_at = currentTimestamp;
    if (props.isServer) newRecord.server_updated_at = currentTimestamp;
    newRecord.version += 1;

    setMapRecord(recordMap, pointer, newRecord);
  } else {
    if (key.length !== 0) throw new ValidationError("Record does not exist.");

    const record = deeplyReplaceSentinelValues<Partial<RecordBase>>({ ...operation.value, version: 1 }, props);

    record.created_at = currentTimestamp;
    record.updated_at = currentTimestamp;
    record.server_updated_at = props.isServer ? currentTimestamp : null;

    if (!operation.value.deleted_at) record.deleted_at = null;
    if (!operation.value.deleted_by_user_id) record.deleted_by_user_id = null;

    setMapRecord(recordMap, pointer, record);
  }
}

// /* -------------------------------------------------------------------------------------------------
//  *  invertSetOperation
//  * -------------------------------------------------------------------------------------------------
//  */

function invertSetOperation(props: InvertOperationProps<SetOperation>) {
  const { recordMap, operation } = props;
  const { table, id, key } = operation;
  const pointer = { table, id } as RecordPointer;

  const record = getMapRecord(recordMap, pointer);

  if (!record) {
    if (key.length !== 0) throw new ValidationError("Record does not exist.");

    const op: DeleteOperation = {
      type: "delete",
      table,
      id,
    };

    return op;
  }

  const value = key.length ? get(record, key) : omit(record, "version", "updated_at", "server_updated_at");

  const op: SetOperation = {
    type: "set",
    table,
    id,
    key,
    value,
  };

  return op;
}

/* -------------------------------------------------------------------------------------------------
 *  applyUpdateOperation
 * -------------------------------------------------------------------------------------------------
 */

function applyUpdateOperation(props: ApplyOperationProps<UpdateOperation>) {
  const { recordMap, operation, currentTimestamp } = props;
  const { table, id, key } = operation;
  const pointer = { table, id } as RecordPointer;

  if (props.frozenRecords?.has(`${table}:${id}`)) {
    throw new ValidationError(
      `Transaction attempts to mutate record "${table}:${id}" after that record ` +
        `was referenced by a sentinel value.`,
    );
  }

  const record = getMapRecord(recordMap, pointer);

  if (!record) {
    throw new ValidationError(`applyUpdateOperation: Record ${table}:${id} does not exist.`);
  }

  const newRecord = structuredClone(record);

  if (key.length > 0) {
    const oldValue = get(newRecord, key);

    let newValue: any;

    if (isPlainObject(oldValue)) {
      if (typeof operation.value !== "object") {
        throw new ValidationError(
          `[applyUpdateOperation] ${table}.${key} expects an object but received a non-object value.`,
        );
      }

      newValue = { ...oldValue };
      assignIgnoringUndefined(newValue, operation.value);
    } else {
      newValue = operation.value;
    }

    set(newRecord, key, deeplyReplaceSentinelValues(newValue, props));
  } else {
    if (typeof operation.value !== "object") {
      throw new ValidationError(`[applyUpdateOperation] expects an object but received a non-object value.`);
    }

    assignIgnoringUndefined(
      newRecord,
      omit(deeplyReplaceSentinelValues(operation.value, props), "version", "updated_at", "server_updated_at"),
    );
  }

  newRecord.updated_at = currentTimestamp;
  if (props.isServer) newRecord.server_updated_at = currentTimestamp;
  newRecord.version += 1;

  setMapRecord(recordMap, pointer, newRecord);
}

/* -------------------------------------------------------------------------------------------------
 *  invertUpdateOperation
 * -------------------------------------------------------------------------------------------------
 */

function invertUpdateOperation(props: InvertOperationProps<UpdateOperation>) {
  const { recordMap, operation } = props;
  const { table, id, key } = operation;
  const pointer = { table, id } as RecordPointer;

  const record = getMapRecord(recordMap, pointer);

  if (!record) {
    throw new ValidationError("Record does not exist.");
  }

  let newValue: any;

  if (key.length > 0) {
    const existingValue = get(record, key);

    if (isPlainObject(existingValue)) {
      if (typeof operation.value !== "object") {
        throw new ValidationError(
          `[applyUpdateOperation] ${table}.${key} expects an object but received a non-object value.`,
        );
      }

      newValue = pick(existingValue, Object.keys(operation.value));
    } else {
      newValue = existingValue;
    }
  } else {
    if (typeof operation.value !== "object") {
      throw new ValidationError(`[applyUpdateOperation] expects an object but received a non-object value.`);
    }

    newValue = pick(record, Object.keys(operation.value));
  }

  const op: UpdateOperation = {
    type: "update",
    table,
    id,
    key,
    value: structuredClone(newValue),
  };

  return op;
}

/* -------------------------------------------------------------------------------------------------
 *  applyUpsertOperation
 * -------------------------------------------------------------------------------------------------
 */

function applyUpsertOperation(props: ApplyOperationProps<UpsertOperation>) {
  const operations = getOperationsInUpsert(props);

  for (const op of operations) {
    applyOperation({ ...props, operation: op });
  }
}

/* -----------------------------------------------------------------------------------------------*/

function getOperationsInUpsert(props: { recordMap: RecordMap; operation: UpsertOperation }) {
  const { recordMap, operation } = props;
  const { table, id } = operation;
  const pointer = { table, id } as RecordPointer;

  // Upsert operations don't directly cause any mutations. It's the onCreate
  // or onUpdate portions of the upsert operation that cause mutations.
  //
  // if (props.frozenRecords?.has(`${table}:${id}`)) {
  //   throw new ValidationError();
  // }

  const record = getMapRecord(recordMap, pointer);
  let operations: Operation[] | undefined;

  if (record && !record.deleted_at) {
    if (operation.onUpdate) {
      operations = [];

      for (const op of operation.onUpdate) {
        if (op.type !== "upsert_on_update") {
          operations.push(op);
          continue;
        }

        const isWhereConditionSatisfied = Object.entries(op.where).every(([key, exp]) => {
          return matchConditionalExp(record, key, exp!);
        });

        if (!isWhereConditionSatisfied) continue;

        operations.push(...op.operations);
      }
    }
  } else {
    if (operation.onCreate) {
      operations = operation.onCreate;
    }
  }

  return operations || [];
}

/* -----------------------------------------------------------------------------------------------*/

function matchConditionalExp(record: Record<string, unknown>, key: string, exp: ConditionalOpExp) {
  if ("eq" in exp) {
    return isEqual(record[key], exp.eq);
  } else {
    throw new UnreachableCaseError(exp);
  }
}

/* -------------------------------------------------------------------------------------------------
 *  invertUpsertOperation
 * -------------------------------------------------------------------------------------------------
 */

function invertUpsertOperation(props: InvertOperationProps<UpsertOperation>) {
  const operations = getOperationsInUpsert(props);
  const invertedOperations: Operation[] = [];

  for (const op of operations) {
    const inverted = invertOperation({ ...props, operation: op });
    if (inverted) invertedOperations.push(inverted);
  }
}

/* -------------------------------------------------------------------------------------------------
 *  applyDeleteOperation
 * -------------------------------------------------------------------------------------------------
 */

function applyDeleteOperation(props: ApplyOperationProps<DeleteOperation>) {
  const pointer = getPointer(props.operation as RecordPointer);

  applyUpdateOperation({
    ...props,
    operation: op.update(pointer, {
      deleted_at: props.currentTimestamp,
      deleted_by_user_id: props.authorId,
    }),
  });
}

/* -------------------------------------------------------------------------------------------------
 *  invertDeleteOperation
 * -------------------------------------------------------------------------------------------------
 */

function invertDeleteOperation(props: InvertOperationProps<DeleteOperation>) {
  const pointer = getPointer(props.operation as RecordPointer);

  return invertUpdateOperation({
    ...props,
    operation: op.update(pointer, {
      deleted_at: fieldValue.SERVER_TIMESTAMP(),
      deleted_by_user_id: props.authorId,
    }),
  });
}

/* -------------------------------------------------------------------------------------------------
 *  applyListInsertOperation
 * -------------------------------------------------------------------------------------------------
 */

function applyListInsertOperation(props: ApplyOperationProps<ListInsertOperation>) {
  const { recordMap, operation, currentTimestamp } = props;
  const { key, table, id, value, where } = operation;

  if (isSentinelValue(value)) {
    // We want this function to be idempotent and it's not clear how we can do that
    // with sentinel values.
    throw new ValidationError("List insert operations currently don't support sentinel values.");
  }

  if (props.frozenRecords?.has(`${table}:${id}`)) {
    throw new ValidationError(
      `Transaction attempts to mutate record "${table}:${id}" after that record ` +
        `was referenced by a sentinel value.`,
    );
  }

  const pointer = { table, id } as RecordPointer;

  const record = getMapRecord(recordMap, pointer);

  if (!record) throw new ValidationError("Record does not exist.");

  const newRecord = structuredClone(record);

  update(newRecord, key, (list) => {
    if (list === null || list === undefined) {
      return [value];
    }

    if (!Array.isArray(list)) {
      throw new ValidationError("Cannot insert on a non-list.");
    }

    // Disallow duplicate items in a list so that this function is idempotent!
    list = list.filter((item) => item !== value);

    if (where === undefined || where === "append") {
      return [...list, value];
    }

    if (where === "prepend") {
      return [value, ...list];
    }

    if ("before" in where) {
      const i = deepEqualIndexOf(list, where.before);
      if (i === -1) return [value, ...list];
      return [...list.slice(0, i), value, ...list.slice(i)];
    }

    if ("after" in where) {
      const i = deepEqualIndexOf(list, where.after);
      if (i === -1) return [...list, value];
      return [...list.slice(0, i + 1), value, ...list.slice(i + 1)];
    }
  });

  newRecord.updated_at = currentTimestamp;
  if (props.isServer) newRecord.server_updated_at = currentTimestamp;
  newRecord.version += 1;

  setMapRecord(recordMap, pointer, newRecord);
}

/* -------------------------------------------------------------------------------------------------
 *  invertListInsertOperation
 * -------------------------------------------------------------------------------------------------
 */

function invertListInsertOperation(props: InvertOperationProps<ListInsertOperation>) {
  const { recordMap, operation } = props;
  const { type, key, table, id, value, where } = operation;
  const pointer = { table, id } as RecordPointer;

  if (isSentinelValue(value)) {
    // We want this function to be idempotent and it's not clear how we can do that
    // with sentinel values.
    throw new ValidationError("List insert operations currently don't support sentinel values.");
  }

  const record = getMapRecord(recordMap, pointer);
  if (!record) throw new ValidationError("Record does not exist.");

  const list = get(record, key);

  if (Array.isArray(list)) {
    const index = list.indexOf(value);
    // Restore position in the list.
    if (index !== -1) {
      if (index === 0) {
        const op: ListInsertOperation = {
          type,
          key,
          table,
          id,
          value,
          where: "prepend",
        };

        return op;
      } else {
        const prev = list[index - 1];
        const op: ListInsertOperation = {
          type,
          key,
          table,
          id,
          value,
          where: { after: prev },
        };

        return op;
      }
    }
  }

  // Remove from the list.
  const op: ListRemoveOperation = {
    type: "list_remove",
    key,
    table,
    id,
    value,
  };

  return op;
}

/* -------------------------------------------------------------------------------------------------
 *  applyListRemoveOperation
 * -------------------------------------------------------------------------------------------------
 */

function applyListRemoveOperation(props: ApplyOperationProps<ListRemoveOperation>) {
  const { recordMap, operation, currentTimestamp } = props;
  const { table, id, value } = operation;
  const pointer = { table, id } as RecordPointer;

  if (isSentinelValue(value)) {
    // We want this function to be idempotent and it's not clear how we can do that
    // with sentinel values.
    throw new ValidationError("List remove operations currently don't support sentinel values.");
  }

  if (props.frozenRecords?.has(`${table}:${id}`)) {
    throw new ValidationError(
      `Transaction attempts to mutate record "${table}:${id}" after that record ` +
        `was referenced by a sentinel value.`,
    );
  }

  const record = getMapRecord(recordMap, pointer);
  if (!record) throw new ValidationError("Record does not exist.");

  const newRecord = structuredClone(record);

  update(newRecord, operation.key, (list) => {
    if (list === null || list === undefined) {
      return list;
    }
    if (!Array.isArray(list)) {
      throw new ValidationError("Cannot remove from a non-list.");
    }

    return list.filter((item) => item !== value);
  });

  newRecord.updated_at = currentTimestamp;
  if (props.isServer) newRecord.server_updated_at = currentTimestamp;
  newRecord.version += 1;

  setMapRecord(recordMap, pointer, newRecord);
}

/* -------------------------------------------------------------------------------------------------
 *  invertListRemoveOperation
 * -------------------------------------------------------------------------------------------------
 */

function invertListRemoveOperation(props: InvertOperationProps<ListRemoveOperation>) {
  const { recordMap, operation } = props;
  const { key, table, id, value } = operation;
  const pointer = { table, id } as RecordPointer;

  if (isSentinelValue(value)) {
    // We want this function to be idempotent and it's not clear how we can do that
    // with sentinel values.
    throw new ValidationError("List remove operations currently don't support sentinel values.");
  }

  const record = getMapRecord(recordMap, pointer);
  if (!record) throw new ValidationError("Record does not exist.");

  const list = get(record, key);

  if (Array.isArray(list)) {
    const index = list.indexOf(value);
    // Restore position in the list.
    if (index !== -1) {
      if (index === 0) {
        const op: ListInsertOperation = {
          type: "list_insert",
          key,
          table,
          id,
          value,
          where: "prepend",
        };

        return op;
      } else {
        const prev = list[index - 1];
        const op: ListInsertOperation = {
          type: "list_insert",
          key,
          table,
          id,
          value,
          where: { after: prev },
        };

        return op;
      }
    }
  }
}

/* -------------------------------------------------------------------------------------------------
 *  utils
 * -------------------------------------------------------------------------------------------------
 */

function deepEqualIndexOf<T>(list: T[], value: T) {
  for (let i = 0; i < list.length; i++) {
    if (isEqual(list[i], value)) return i;
  }

  return -1;
}

function assignIgnoringUndefined<T>(base: T, patch: Partial<T>) {
  for (const key in patch) {
    if (patch[key] === undefined) continue;
    base[key] = patch[key] as any;
  }
}
