import type { Database } from "@local/sqlite-wasm-patch";
import {
  iterateRecordMap,
  PointerWithRecord,
  RecordMap,
  RecordPointer,
  RecordTable,
  RecordValue,
  setMapRecord,
  TABLE_NAMES,
  virtualTables,
  generateRecordId,
  getPointer,
  getMapRecords,
  createRecordMapFromPointersWithRecords,
  ClientSingletonRecord,
  ClientSingletonRecordName,
} from "libs/schema";
import { getSqlToUpsertRecord } from "libs/getSqlToUpsertRecord";
import { hasIntersection, isEqual, isNonNullable } from "libs/predicates";
import { Logger } from "libs/logger";
import { isDecoderSuccess } from "ts-decoders";
import { SqlValue, Statement, sql } from "libs/sql-statement";
import { TransactionConflictError, ValidationError } from "libs/errors";
import { groupBy, memoize, merge, uniq, uniqWith } from "lodash-comms";
import { GetRecordResult, GetRecordsResult, Query, QueryResult } from "libs/database";
import { ClientDatabaseAdapterApi, DatabaseChange } from "./ClientDatabaseAdapterApi";
import { Observable, combineLatest, from, map, of, share, shareReplay, switchMap, throttle } from "rxjs";
import { astVisitor, parse, Statement as PgAstStatement } from "pgsql-ast-parser";
import { cacheReplayForTime, startWith } from "libs/rxjs-operators";
import { fromDatabaseDecoders, recordToDatabaseFnMap } from "libs/schema/client/decoders";
import { PartialDeep } from "type-fest";

export class ClientDatabaseAdapter implements ClientDatabaseAdapterApi {
  protected changeSubscriptions = new Set<(change: DatabaseChange) => void>();
  protected observeRecordCache = new Map<string, Observable<GetRecordResult<RecordTable>>>();

  constructor(
    private db: Database,
    protected logger: Logger,
  ) {}

  query(statement: Statement, transaction?: Database) {
    const db = transaction || this.db;

    try {
      const rows = db.exec({
        sql: statement.text,
        bind: statement.values,
        returnValue: "resultRows",
        rowMode: "object",
      });

      return { rows } as {
        rows: Array<{ [columnName: string]: SqlValue }>;
      };
    } catch (error) {
      this.logger.error({ error, statement }, "[query] error");
      throw error;
    }
  }

  /** Will return -1 if there is no schema version */
  getSchemaVersion(): number {
    const { rows: tables } = this.query(sql`
      SELECT
        name
      FROM
        sqlite_master
      WHERE
        type = 'table'
      AND
        name = 'migration'
    `);

    if (!tables[0]) return -1;

    const { rows: migrations } = this.query(sql`
      SELECT
        "id"
      FROM
        "migration"
      ORDER BY
        "id" DESC
      LIMIT 1
    `);

    return (migrations[0]?.id as number | undefined) ?? -1;
  }

  getRecord<T extends RecordTable>(table: T, id: string): GetRecordResult<T>;
  getRecord<T extends RecordTable>(pointer: RecordPointer<T>): GetRecordResult<T>;
  getRecord<T extends RecordTable>(a: T | RecordPointer<T>, b?: string): GetRecordResult<T> {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const pointer = typeof a === "string" ? { table: a, id: b! } : a;

    if (!TABLE_NAMES.includes(pointer.table)) {
      throw new ValidationError(`getRecord: invalid record table "${pointer.table}"`);
    }

    const statement = sql`
      SELECT
        * 
      FROM 
        "${sql.raw(pointer.table)}"
      WHERE 
        "${sql.raw(pointer.table)}".id = ${pointer.id}
      LIMIT 1`;

    const {
      rows: [row],
    } = this.query(statement);

    if (!row) return [null];

    return [this.decodeRecordFromDatabase(pointer.table, row)];
  }

  /**
   * Provide the `withDeletedRows` option to return deleted_row results for any of the
   * provided pointers which are deleted. Even if withDeletedRows is falsey, if a provided
   * pointer has `table === "deleted_row"` then the result will include the deleted_row record
   * if it exists.
   */
  getRecords<Table extends RecordTable>(
    pointers: RecordPointer<Table>[],
    options?: { withDeletedRows?: false },
  ): GetRecordsResult<Table>;
  getRecords<Table extends RecordTable>(
    pointers: RecordPointer<Table>[],
    options: { withDeletedRows: boolean },
  ): GetRecordsResult<Table | "deleted_row">;
  getRecords<Table extends RecordTable>(
    pointers: RecordPointer<Table>[],
    options: { withDeletedRows?: boolean } = {},
  ): GetRecordsResult<Table> {
    const distinctPointers = uniqWith(pointers, isEqual).filter(({ table }) => TABLE_NAMES.includes(table));

    const groupedPointersMap = groupBy(distinctPointers, (p) => p.table);

    Object.keys(virtualTables).forEach((table) => {
      delete groupedPointersMap[table];
    });

    const groupedPointers = Object.entries(groupedPointersMap);

    if (groupedPointers.length === 0) {
      return [[]];
    }

    const { statements, deletedRowIds } = groupedPointers.reduce(
      (store, [table, pointers]) => {
        const recordIds = pointers.map((p) => p.id);

        if (table === "deleted_row") {
          store.deletedRowIds.push(...recordIds);
        } else {
          store.statements.push({
            table: table as RecordTable,
            statement: sql`
              SELECT
                * 
              FROM 
                "${sql.raw(table)}"
              WHERE 
                "${sql.raw(table)}"."id" IN (${sql.join(recordIds)});
            `,
          });

          if (options.withDeletedRows) {
            store.deletedRowIds.push(
              ...pointers.map((p) =>
                generateRecordId("deleted_row", {
                  row_table: p.table,
                  row_id: p.id,
                }),
              ),
            );
          }
        }

        return store;
      },
      {
        deletedRowIds: [] as string[],
        statements: [] as { table: RecordTable; statement: Statement }[],
      },
    );

    if (deletedRowIds.length > 0) {
      statements.push({
        table: "deleted_row",
        statement: sql`
          SELECT
            * 
          FROM 
            "deleted_row"
          WHERE 
            "deleted_row"."id" IN (${sql.join(deletedRowIds)});
        `,
      });
    }

    try {
      const pointersWithRecord = this.db.transaction((db) => {
        const pointersWithRecord = statements.flatMap(({ table, statement }) => {
          const { rows } = this.query(statement, db);

          return rows.map((row) => {
            return {
              table,
              id: row.id as string,
              record: this.decodeRecordFromDatabase(table, row),
            } as PointerWithRecord;
          });
        });

        return pointersWithRecord;
      });

      return [pointersWithRecord as PointerWithRecord<Table>[]];
    } catch (error) {
      this.logger.error({ error, pointers, options }, "[getRecords] transaction error");

      throw error;
    }
  }

  getQuery<Table extends RecordTable>(query: Query<Table>): QueryResult<Table> {
    const results = this.db.transaction((db) => {
      return query.statements().map((s) => this.query(s, db).rows);
    });

    const parsedResults = query.parseQueryResults(results);

    const pointersWithRecord = parsedResults.flat(2) as PointerWithRecord[];

    const recordMap = createRecordMapFromPointersWithRecords(pointersWithRecord);

    const records = getMapRecords(recordMap, query.primaryTable);

    return [records, { recordMap }] as unknown as QueryResult<Table>;
  }

  observeRecord<T extends RecordTable>(pointer: RecordPointer<T>): Observable<GetRecordResult<T>> {
    const { table, id } = pointer;

    const cacheKey = `${table}:${id}`;
    const cachedQuery = this.observeRecordCache.get(cacheKey);

    if (cachedQuery) {
      return cachedQuery as Observable<GetRecordResult<T>>;
    }

    if (!TABLE_NAMES.includes(table)) {
      throw new ValidationError(`liveRecord: invalid record table "${table}"`);
    }

    const runQuery = () => this.getRecord(pointer);

    const deletedRecordId = generateRecordId("deleted_row", {
      row_id: pointer.id,
      row_table: pointer.table,
    });

    const subscribe = (onChange: () => void) =>
      this.subscribeToRecordChanges(({ changes }) => {
        const wasRecordChanged = !!changes[table]?.[id] || !!changes["deleted_row"]?.[deletedRecordId];

        if (!wasRecordChanged) return;

        onChange();
      });

    const observable = getObservableForQuery<GetRecordResult<T>>({
      runQuery,
      subscribe,
    }).pipe(
      cacheReplayForTime({
        timeMs: 100,
        onInit: () => {
          this.observeRecordCache.set(cacheKey, observable);
        },
        onCleanup: () => {
          this.observeRecordCache.delete(cacheKey);
        },
      }),
    );

    return observable;
  }

  observeRecords<Table extends RecordTable>(pointers: RecordPointer<Table>[]): Observable<GetRecordsResult<Table>> {
    if (pointers.length === 0) {
      return of([[]]);
    }

    return combineLatest(
      pointers.map((pointer) =>
        this.observeRecord(pointer).pipe(
          map(([record]) => {
            if (!record) return null;
            return { ...pointer, record } as PointerWithRecord<Table>;
          }),
        ),
      ),
    ).pipe(map((records) => [records.filter(isNonNullable)]));
  }

  observeQuery<Table extends RecordTable>(query: Query<Table>): Observable<QueryResult<Table>> {
    const affectedTableNames = uniq(query.statements().flatMap((s) => parseTableNames(s.text)));

    if (affectedTableNames.length === 0) {
      throw new Error("Could not calculate selected table names");
    }

    const runQuery = () => this.getQuery(query);

    const subscribe = (onChange: () => void) =>
      this.subscribeToRecordChanges(({ tableNames }) => {
        if (!hasIntersection(affectedTableNames, tableNames)) return;
        onChange();
      });

    return getObservableForQuery({
      runQuery,
      subscribe,
    });
  }

  writeRecordMap(
    recordMap: RecordMap | PointerWithRecord[],
    options: { forceUpdate?: boolean; throwOnVersionMismatch?: boolean } = {},
  ) {
    const pointersWithRecord = Array.isArray(recordMap) ? recordMap : Array.from(iterateRecordMap(recordMap));

    if (pointersWithRecord.length === 0) return {};

    const { forceUpdate = false, throwOnVersionMismatch = false } = options;

    this.logger.debug({ pointersWithRecord }, "writeRecordMap");

    const changeRecordMap: RecordMap = {};

    try {
      this.db.transaction((db) => {
        for (const pointerWithRecord of pointersWithRecord) {
          if (virtualTables[pointerWithRecord.table]) continue;

          const existingRecord =
            pointerWithRecord.table === "deleted_row"
              ? this.getRecordVersion({
                  table: pointerWithRecord.record.row_table,
                  id: pointerWithRecord.record.row_id,
                } as RecordPointer)
              : this.getRecordVersion(pointerWithRecord);

          const oldVersion = existingRecord?.version as number | undefined;

          if (forceUpdate || oldVersion === undefined || pointerWithRecord.record.version > oldVersion) {
            setMapRecord(changeRecordMap, pointerWithRecord, pointerWithRecord.record);

            const record = this.encodeRecordForDatabase(pointerWithRecord);

            const { text, values } = getSqlToUpsertRecord(
              {
                table: pointerWithRecord.table,
                id: pointerWithRecord.id,
                record,
              } as any,
              forceUpdate,
            );

            db.exec({
              sql: text,
              bind: values as SqlValue[],
              returnValue: "resultRows",
              rowMode: "object",
            });

            if (pointerWithRecord.table === "deleted_row") {
              const statement = sql`
              DELETE FROM
                "${sql.raw(pointerWithRecord.record.row_table)}"
              WHERE
                "${sql.raw(pointerWithRecord.record.row_table)}"."id" = ${pointerWithRecord.record.row_id}
            `;

              db.exec({
                sql: statement.text,
                bind: statement.values as SqlValue[],
                returnValue: "resultRows",
                rowMode: "object",
              });
            } else {
              const statement = sql`
              DELETE FROM
                "deleted_row"
              WHERE
                "deleted_row"."row_table" = ${pointerWithRecord.table}
              AND
                "deleted_row"."row_id" = ${pointerWithRecord.id}
            `;

              db.exec({
                sql: statement.text,
                bind: statement.values as SqlValue[],
                returnValue: "resultRows",
                rowMode: "object",
              });
            }
          } else if (throwOnVersionMismatch && pointerWithRecord.record.version <= oldVersion) {
            this.logger.warn({ existingRecord, pointerWithRecord }, `writeRecordMap: record version conflict`);

            throw new TransactionConflictError(`writeRecordMap: record version conflict`);
          } else {
            this.logger.debug(
              {
                forceUpdate,
                throwOnVersionMismatch,
                isOldVersionSmaller:
                  oldVersion !== undefined ? pointerWithRecord.record.version > oldVersion : "oldVersion is undefined",
                pointerWithRecord,
                existingRecord,
              },
              "writeRecordMap ignoring recordWrite",
            );
          }
        }

        // Regardless of whether or not we write the new records to the database,
        // we mark the records as having been read for garbage collection.
        this.markRecordsRead(pointersWithRecord, db);
      });
    } catch (error) {
      this.logger.error({ error, recordMap, options }, "[writeRecordMap] transaction error");

      throw error;
    }

    const baseChangedTables = Object.keys(changeRecordMap);

    const deletedRecords = changeRecordMap["deleted_row"] ? Object.values(changeRecordMap["deleted_row"]) : [];

    const changedTables = uniq([...baseChangedTables, ...deletedRecords.map((record) => record.row_table)]);

    if (changedTables.length > 0) {
      this.emitDatabaseChange({
        tableNames: changedTables,
        changes: changeRecordMap,
      });
    }

    return changeRecordMap;
  }

  deletePointers(pointers: RecordPointer[], options: { suppressChangeNotification?: boolean } = {}) {
    const changeRecordMap: RecordMap = {};

    try {
      this.db.transaction((db) => {
        for (const pointer of pointers) {
          if (virtualTables[pointer.table]) continue;

          if (!TABLE_NAMES.includes(pointer.table)) {
            throw new ValidationError(`deletePointers: invalid record table "${pointer.table}"`);
          }

          const statement = sql`
          SELECT
            * 
          FROM 
            "${sql.raw(pointer.table)}"
          WHERE 
            "${sql.raw(pointer.table)}"."id" = ${pointer.id}
          LIMIT
            1`;

          const {
            rows: [row],
          } = this.query(statement, db);

          const record = !row ? null : this.decodeRecordFromDatabase(pointer.table, row);

          if (record) {
            setMapRecord(changeRecordMap, pointer, record);

            const removeRecord = sql`
            DELETE FROM
              "${sql.raw(pointer.table)}"
            WHERE
              "${sql.raw(pointer.table)}"."id" = ${pointer.id};
          `;

            this.query(removeRecord, db);
          }

          if (pointer.table !== "deleted_row") {
            const statement = sql`
            SELECT
              * 
            FROM 
              "deleted_row"
            WHERE 
              "row_table" = ${pointer.table}
            AND
              "row_id" = ${pointer.id}
            LIMIT
              1`;

            const {
              rows: [row],
            } = this.query(statement, db);

            const record = !row ? null : this.decodeRecordFromDatabase("deleted_row", row);

            if (record) {
              const deletedRowPointer = getPointer("deleted_row", {
                row_table: pointer.table,
                row_id: pointer.id,
              });

              setMapRecord(changeRecordMap, deletedRowPointer, record);

              const removeRecord = sql`
              DELETE FROM
                "deleted_row"
              WHERE
                "id" = ${deletedRowPointer.id};
            `;

              this.query(removeRecord, db);
            }
          }

          const removeClientRowRead = sql`
          DELETE FROM
            "client_row_read"
          WHERE
            "row_table" = ${pointer.table}
          AND
            "row_id" = ${pointer.id};
        `;

          this.query(removeClientRowRead, db);
        }
      });
    } catch (error) {
      this.logger.error({ error, pointers, options }, "[deletePointers] transaction error");

      throw error;
    }

    const baseChangedTables = Object.keys(changeRecordMap);

    const deletedRecords = changeRecordMap["deleted_row"] ? Object.values(changeRecordMap["deleted_row"]) : [];

    const changedTables = uniq([...baseChangedTables, ...deletedRecords.map((record) => record.row_table)]);

    if (changedTables.length > 0 && !options.suppressChangeNotification) {
      this.emitDatabaseChange({
        tableNames: changedTables,
        changes: changeRecordMap,
      });
    }

    return changeRecordMap;
  }

  /** Emissions may contain records which are not changed. */
  subscribeToRecordChanges(callback: (change: DatabaseChange) => void) {
    this.changeSubscriptions.add(callback);

    return () => {
      this.changeSubscriptions.delete(callback);
    };
  }

  markRecordsRead(pointers: RecordPointer[], tx?: Database): void {
    const nonVirtualPointers = pointers.filter((p) => !virtualTables[p.table]);

    if (nonVirtualPointers.length === 0) return;

    const now = Date.now();

    const statement = sql`
      INSERT INTO "client_row_read" 
        ("row_table", "row_id", "read_at")
      VALUES 
        ${sql.bulk(nonVirtualPointers.map(({ table, id }) => [table, id, now]))}
      ON CONFLICT ("row_table", "row_id")
      DO UPDATE SET
        "read_at" = ${now}
    `;

    this.query(statement, tx);
  }

  getRecordsLastReadBefore(time: number): RecordPointer[] {
    const statement = sql`
      SELECT
        *
      FROM
        "client_row_read"
      WHERE
        "client_row_read"."read_at" < ${time}
    `;

    const { rows } = this.query(statement);

    return rows.map((row) => getPointer(row.row_table as RecordTable, row.row_id as string));
  }

  getSingletonRecord<Name extends ClientSingletonRecordName>(name: Name, tx?: Database): ClientSingletonRecord<Name> {
    const operation = (tx: Database) => {
      const { rows } = this.query(
        sql`
          SELECT
            *
          FROM
            "singletons"
          WHERE
            "name" = ${name}
          LIMIT
            1  
        `,
        tx,
      );

      let record = rows[0];

      if (!record) {
        record = {
          name,
          data: JSON.stringify({}),
          version: 1,
          updated_at: new Date().toISOString(),
        };

        this.query(
          sql`
            INSERT INTO "singletons" (
              "name",
              "data",
              "version",
              "updated_at"
            ) VALUES (
              ${record.name},
              ${record.data},
              ${record.version},
              ${record.updated_at}
            )
          `,
          tx,
        );
      }

      return {
        ...record,
        data: JSON.parse(record.data as string),
      } as ClientSingletonRecord<Name>;
    };

    try {
      if (tx) return operation(tx);
      return this.db.transaction(operation);
    } catch (error) {
      this.logger.error({ error, singletonName: name }, "[getSingletonRecord] transaction error");

      throw error;
    }
  }

  updateSingletonRecord<Name extends ClientSingletonRecordName>(
    name: Name,
    update: PartialDeep<ClientSingletonRecord<Name>["data"]>,
  ): ClientSingletonRecord<Name> {
    try {
      return this.db.transaction((tx) => {
        const existing = this.getSingletonRecord(name, tx);

        const data = merge(existing.data, update);

        const newRecord = {
          name,
          data,
          version: existing.version + 1,
          updated_at: new Date().toISOString(),
        };

        this.query(
          sql`
          UPDATE "singletons" 
          SET
            "data" = ${JSON.stringify(newRecord.data)},
            "version" = ${newRecord.version},
            "updated_at" = ${newRecord.updated_at}
          WHERE
            "name" = ${newRecord.name};
        `,
          tx,
        );

        return newRecord;
      });
    } catch (error) {
      this.logger.error({ error, singletonName: name, update }, "[updateSingletonRecord] transaction error");

      throw error;
    }
  }

  logDatabaseState() {
    try {
      const tables = this.db.transaction((db) => {
        const tables = db
          .exec("SELECT name FROM sqlite_master WHERE type='table';", {
            returnValue: "resultRows",
            rowMode: "object",
          })
          .map((row) => row.name as string);

        return tables.map((table) => {
          return {
            table,
            rows: this.query(sql`SELECT * FROM "${sql.raw(table)}";`, db).rows,
          };
        });
      });

      this.logger.debug({ tables }, "logDatabaseState");
    } catch (error) {
      this.logger.error({ error }, "[logDatabaseState] transaction error");
    }
  }

  close(): void {
    this.db.close();
  }

  protected emitDatabaseChange(change: DatabaseChange) {
    this.logger.debug({ change }, "emitDatabaseChange");

    for (const callback of this.changeSubscriptions) {
      callback(change);
    }
  }

  protected encodeRecordForDatabase<Table extends RecordTable>(
    pointerWithRecord: PointerWithRecord<Table>,
  ): { [columnName: string]: SqlValue } {
    const mapRecordToDatabase = recordToDatabaseFnMap[pointerWithRecord.table];

    if (!mapRecordToDatabase) {
      throw new Error(`writeRecordMap: could not find record mapper for ${pointerWithRecord.table}`);
    }

    try {
      return mapRecordToDatabase(pointerWithRecord.record);
    } catch (e) {
      this.logger.error({ pointerWithRecord, error: e }, "Error encoding record for database");
      throw e;
    }
  }

  protected decodeRecordFromDatabase<Table extends RecordTable>(table: Table, row: any): RecordValue<Table> | null {
    const decoder = fromDatabaseDecoders[table];

    if (!decoder) {
      this.logger.error(`decodeRecord: could not find decoder for ${table}`);
      return null;
    }

    const decoded = decoder.decode(row);

    if (isDecoderSuccess(decoded)) {
      return decoded.value as RecordValue<Table>;
    }

    this.logger.error({ decoded }, `decodeRecord error`);

    return null;
  }

  protected getRecordVersion(
    pointer: RecordPointer,
    tx?: Database,
  ): {
    table: RecordTable;
    id: string;
    version: number;
    is_deleted: boolean;
  } | null {
    const db = tx || this.db;

    const existingSmt = sql`
      SELECT
        version
      FROM
        "${sql.raw(pointer.table)}"
      WHERE
        "${sql.raw(pointer.table)}".id = ${pointer.id}
    `;

    const [existingRecord] = db.exec({
      sql: existingSmt.text,
      bind: existingSmt.values as any[],
      returnValue: "resultRows",
      rowMode: "object",
    });

    if (existingRecord) {
      return {
        table: pointer.table,
        id: pointer.id,
        version: existingRecord.version as number,
        is_deleted: false,
      };
    }

    if (pointer.table === "deleted_row") return null;

    const deletedStm = sql`
      SELECT
        version
      FROM
        "deleted_row"
      WHERE
        "deleted_row"."row_table" = ${pointer.table}
      AND
        "deleted_row"."row_id" = ${pointer.id}
    `;

    const [deletedRecord] = db.exec({
      sql: deletedStm.text,
      bind: deletedStm.values as any[],
      returnValue: "resultRows",
      rowMode: "object",
    });

    if (!deletedRecord) return null;

    return {
      table: pointer.table,
      id: pointer.id,
      version: deletedRecord.version as number,
      is_deleted: true,
    };
  }
}

/**
 * Note that this function internally uses a SQL parser
 * made for Postgres' SQL syntax. It's possible we might
 * run into edge cases in the future where a SQLite query
 * causes this to error.
 */
// Parsing a sql statement is relatively expensive (e.g. taking 20ms on a
// large query), so we memoize the results.
const parseTableNames = memoize((sqlQuery: string): string[] => {
  let statements: PgAstStatement[];

  try {
    statements = parse(sqlQuery);
  } catch (e) {
    console.error(`parseTableNames: Error parsing SQL`, sqlQuery, e);
    throw e;
  }

  if (statements.length !== 1) {
    throw new Error(`parseTableNames: Must receive exactly one SQL statement`);
  }

  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const statement = statements[0]!;

  const tables = new Set<string>();

  const visitor = astVisitor(() => ({
    tableRef: (t) => tables.add(t.name),
  }));

  visitor.statement(statement);

  const tableNames = Array.from(tables);

  return tableNames;
});

function getObservableForQuery<T>(args: {
  runQuery: () => T;
  subscribe: (onChanges: () => void) => () => void;
}): Observable<T> {
  const { runQuery, subscribe } = args;

  return new Observable<T>((subscriber) => {
    const unsubscribe = subscribe(() => subscriber.next(runQuery()));

    return () => {
      unsubscribe();
    };
  }).pipe(
    // Note that we're using share & startWith instead of just using shareReply as a small memory optimization.
    // This way, we don't need to store an additional copy of the query results in memory.
    // The downside is that we need to rerun the query whenever a new subscriber subscribers (i.e. additional
    // CPU overhead).
    share({ resetOnRefCountZero: true }),
    startWith(runQuery),
  );
}

function getObservableForAsyncQuery<T>(args: {
  runQuery: () => Promise<T>;
  subscribe: (onChanges: () => void) => () => void;
}): Observable<T> {
  const { runQuery, subscribe } = args;

  let query: Promise<T>;

  return new Observable((subscriber) => {
    const unsubscribe = subscribe(async () => subscriber.next(null));
    subscriber.next(null);

    return () => {
      unsubscribe();
    };
  }).pipe(
    // We need to throttle values because running the query can take an unknown
    // amount of time. If updates triggered faster than the query could run, then
    // the query would never resolve. This throttle ensures, once triggered, we
    // wait for the query to resolve before running it again.
    throttle(
      () => {
        // This works because we have `leading: true` so emitted values
        // first pass through `throttle` (triggering switchMap) before
        // this factory function is called (confirmed expirimentally).
        // Additionally, it's important that we're using `share()` so
        // that there's only one subscriber updating the `query`
        // variable.
        return from(query);
      },
      {
        leading: true,
        trailing: true,
      },
    ),
    switchMap(() => {
      query = runQuery();
      return query;
    }),
    shareReplay({ bufferSize: 1, refCount: true }),
  );
}
