import {
  Observable,
  OperatorFunction,
  concat,
  interval,
  take,
  skip,
  catchError,
  switchMap,
  NEVER,
  Subscription,
  finalize,
  defer,
  shareReplay,
  tap,
} from "rxjs";
import { throwUnreachableCaseError } from "./errors";

/**
 * rxjs has a built-in `startWith` operator however the
 * built in operator is seriously flawed. The built-in
 * operator accepts a static value _on observable creation_
 * and uses that static value when the observable is
 * subscribed to. This is surprising because, in practice,
 * you ~~generally~~ always
 * want to get the startWith value when the observable is
 * _subscribed to_, not when it is created.
 *
 * I actually opened an issue about this years ago and the rxjs
 * maintainers have acknowledged the problems with `startWith()`
 * but they indicated that they aren't interested changing
 * startWith because it would be a large-ish breaking change
 * which they don't feel is worthwhile. We should always use
 * this custom startwith operator in our app.
 *
 * @param fns a spread of functions which will be called, in
 *   order, when the observable is subscribed to to get the
 *   initial values for the subscription.
 * @returns rxjs operator
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function startWith<T extends any[]>(...fns: ReadonlyArray<() => never[]>): OperatorFunction<T, T>;
export function startWith<T, A = T>(...fns: ReadonlyArray<() => A>): OperatorFunction<T, T | A>;
export function startWith<T, A = T>(...fns: ReadonlyArray<() => A>): OperatorFunction<T, T | A> {
  return (source) => {
    return new Observable((observer) => {
      fns.forEach((fn) => {
        try {
          observer.next(fn());
        } catch (e) {
          observer.error(e);
        }
      });

      return source.subscribe(observer);
    });
  };
}

/**
 * @param msDelay e.g. 1000
 * @returns observable that will complete without emitting after
 *   the specified delay
 */
export function wait$(msDelay: number): Observable<never>;
/**
 * @param msDelay e.g. 1000
 * @param observable observable that will be subscribed
 *   to after the specified delay
 * @returns observable that will be subscribed
 *   to after the specified delay
 */
export function wait$<T>(msDelay: number, observable: Observable<T>): Observable<T>;
export function wait$<T>(msDelay: number, observable?: Observable<T>) {
  const wait = interval(msDelay).pipe(take(1), skip(1)) as Observable<never>;

  return observable ? concat(wait, observable) : wait;
}

export class FailedPredicateError<T> extends Error {
  constructor(
    public source$: Observable<T>,
    public predicate: (value: T) => boolean,
    public label?: string,
  ) {
    super(`Failed ${label} assertPredicate check`);
  }
}

export function assertPredicate<T, P extends T>(
  name: string,
  predicate: (value: T) => value is P,
): OperatorFunction<T, P>;
export function assertPredicate<T>(name: string, predicate: (value: T) => boolean): OperatorFunction<T, T>;
export function assertPredicate<T>(name: string, predicate: (value: T) => boolean): OperatorFunction<T, T> {
  return (source) =>
    new Observable((observer) => {
      return source.subscribe({
        next(value) {
          if (!predicate(value)) {
            observer.error(new FailedPredicateError(source, predicate, name));
          } else {
            observer.next(value);
          }
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        },
      });
    });
}

export function catchFailedPredicateError<I, T>(
  name: string,
  defaultValue: () => T,
): (
  source: Observable<I>,
) => I extends unknown[] ? (T extends never[] ? Observable<I> : Observable<I | T>) : Observable<I | T>;
export function catchFailedPredicateError<I>(
  name: string,
  defaultValue: () => unknown,
): (source: Observable<I>) => Observable<I>;
export function catchFailedPredicateError<I, T>(
  name: string,
  defaultValue: () => T,
): (source: Observable<I>) => Observable<I | T> {
  return (source$: Observable<I>) => {
    function handleError(err: unknown): Observable<I | T> {
      if (err instanceof FailedPredicateError && name === err.label) {
        return defaultValue
          ? err.source$.pipe(
              switchMap((v) => (err.predicate(v) ? source$ : NEVER.pipe(startWith(defaultValue)))),
              catchError(handleError),
            )
          : err.source$.pipe(
              switchMap((v) => (err.predicate(v) ? source$ : NEVER)),
              catchError(handleError),
            );
      }

      throw err;
    }

    return source$.pipe(catchError(handleError));
  };
}

/**
 * This operator is like shareReplay except, after all subscribers have
 * unsubscribed, it will maintain the replay buffer for a specified
 * amount of time before clearing it. This is useful for cases where
 * you want to maintain a replay buffer for a period of time
 * after the last subscriber has unsubscribed.
 */
export function cacheReplayForTime(options: {
  timeMs: number;
  bufferSize?: number;
  onInit?: () => void;
  onCleanup?: () => void;
}) {
  return <T>(source: Observable<T>) => {
    let cachedSubscription: Subscription | undefined;
    let timeout: ReturnType<typeof setTimeout> | undefined;

    const shared = source.pipe(
      shareReplay({
        refCount: true,
        bufferSize: options.bufferSize || 1,
      }),
    );

    let subscriberCount = 0;

    return new Observable<T>((observer) => {
      if (!cachedSubscription) {
        options.onInit?.();
        cachedSubscription = shared.subscribe();
      }

      if (timeout) {
        clearTimeout(timeout);
        timeout = undefined;
      }

      subscriberCount++;
      const sub = shared.subscribe(observer);
      let teardownCalled = false;

      return () => {
        if (teardownCalled) return;
        teardownCalled = true;
        sub.unsubscribe();
        subscriberCount--;

        if (subscriberCount <= 0) {
          timeout = setTimeout(() => {
            cachedSubscription?.unsubscribe();
            cachedSubscription = undefined;
            options.onCleanup?.();
          }, options.timeMs);
        }
      };
    });
  };
}

/**
 * This operator is like the builtin `finalize` operator except the most recently
 * emitted value of the observable is provided as an argument to the callback.
 */
// See https://stackoverflow.com/a/70722380/5490505
// > The use of defer here is to keep separate state for each subscriber. Because finalize
//   is called not just when the observable completes but also when the subscriber
//   unsubscribes, this ensures--for the latter case--the callback will be called for each
//   subscriber with the current value at the time of each unsubscription.
export function finalizeWithValue<T>(callback: (value: T) => void) {
  return (source: Observable<T>) =>
    defer(() => {
      let lastValue: T;
      return source.pipe(
        tap((value) => (lastValue = value)),
        finalize(() => callback(lastValue)),
      );
    });
}

/**
 * Similar to the `tap` operator except the callback receives the source observable as
 * an argument and is expected to return an observable
 * which will be subscribed to. The emissions of this tap observable will be ignored
 * except for errors, which will be forwarded to the observer.
 */
export function tapObservable<T>(callback: (source: Observable<T>) => Observable<unknown>) {
  return (source: Observable<T>) =>
    new Observable<T>((observer) => {
      const subscription = callback(source).subscribe({
        error: (error) => {
          observer.error(error);
        },
      });

      subscription.add(source.subscribe(observer));
      return subscription;
    });
}

/**
 * This operator will buffer the source observable values if the operator is "closed" and will
 * let values pass through if the operator is "open". When switching from "closed" to "open",
 * all buffered values will be synchronously emitted one by one and then future values will be
 * emitted as they arrive. Default initial state is "open".
 */
export function bufferWhile<T>(
  control$: Observable<"open" | "closed">, // Observable emitting boolean values to control the buffer state
  initialState: "open" | "closed" = "open", // Initial state of the buffer (true = closed, false = open)
): OperatorFunction<T, T> {
  const isBufferOpen = (state: "open" | "closed") =>
    state === "open" ? true : state === "closed" ? false : throwUnreachableCaseError(state);

  return (source: Observable<T>) =>
    new Observable<T>((subscriber) => {
      let bufferOpen = isBufferOpen(initialState); // Tracks whether the buffer is open or closed
      let buffer: T[] = []; // Stores the buffered values when the buffer is closed

      const controlSubscription = control$.subscribe({
        next: (state: "open" | "closed") => {
          bufferOpen = isBufferOpen(state);

          if (bufferOpen && buffer.length > 0) {
            // Emit all buffered values when buffer is opened
            buffer.forEach((value) => subscriber.next(value));
            buffer = []; // Clear the buffer after emitting
          }
        },
        error: (err) => subscriber.error(err),
        complete: () => subscriber.complete(),
      });

      const sourceSubscription = source.subscribe({
        next: (value: T) => {
          if (bufferOpen) {
            subscriber.next(value); // If buffer is open, emit value immediately
          } else {
            buffer.push(value); // Otherwise, buffer the value
          }
        },
        error: (err) => subscriber.error(err),
        complete: () => {
          if (buffer.length > 0) {
            buffer.forEach((value) => subscriber.next(value)); // Emit any remaining buffered values
          }

          subscriber.complete();
        },
      });

      return () => {
        // Clean up both subscriptions
        controlSubscription.unsubscribe();
        sourceSubscription.unsubscribe();
      };
    });
}
