import { BehaviorSubject, Observable, of } from 'rxjs';
import { delay, filter, map, mergeMap, scan, startWith, switchMap, take, tap } from 'rxjs/operators';

/* eslint-disable @typescript-eslint/no-this-alias */

export type ObjectOrArray<T> = T | T[];
/**
 * Operator that zips the stream with the last value.
 *
 * @param [initialValue=undefined] the value to set as the second element of the first emitted value.
 *
 * E.g.
 * With the inputs: 1, 2, 3, 4
 * Would pass on values: [1, undefined], [2, 1], [3, 2], [4, 3]
 *
 * @example
 * Observable.from([1,2,3])
 *   .pipe(zipWithLastValue())  -> [1, undefined], [2, 1], [3, 2]
 *
 * Observable.from([1])
 *   .pipe(zipWithLastValue(10))  -> [1, 10]
 */
export const zipWithLastValue =
  <T>(initialValue?: T) =>
  (o$: Observable<T>): Observable<T[]> =>
    o$.pipe(scan((previous, current) => [current, previous[0]], [initialValue]));

/**
 * Zips the stream with index.
 *
 * @example
 * <code><pre>
 * Observable.from(['a', 'b', 'c'])
 *   .pipe(zipWithIndex())
 *
 * -> ['a', 0], ['b', 1], ['c', 2]
 * </pre></code>
 */
export const zipWithIndex =
  <T>() =>
  (o$: Observable<T>): Observable<[T, number]> =>
    o$.pipe(map((obj, idx) => [obj, idx]));

/**
 * Reduces a stream of observables (`Observable<Observable<T>>`) so that
 * the execution of the next item in the stream will begin only after the previous
 * stream has _completed_.
 *
 * Ensures that the operations are run in correct order.
 *
 * @example
 * <code><pre>
 * const startTime = Date.now();
 * from([
 *   of(1).pipe(delay(2000), startWith(0)),
 *   from([2, 3]),
 *   of(4).pipe(delay(1000)),
 *   of(5).pipe(delay(1000))
 * ])
 * .pipe(triggerWhenPreviousCompletes())
 * .subscribe((v) => console.log(`${Date.now() - startTime}:\t${v}`));
 *
 * -------
 * OUTPUT FROM REAL RUN:
 * 4:    0
 * 5009: 1
 * 5012: 2
 * 5012: 3
 * 6016: 4
 * 7021: 5
 * </pre></code>
 */
export const triggerWhenPreviousCompletes =
  <T>() =>
  (o$: Observable<Observable<T>>): Observable<T> => {
    return new Observable<T>((subscriber) => {
      const indexReady$: BehaviorSubject<number> = new BehaviorSubject(0);
      const subscription = o$
        .pipe(
          zipWithIndex(),
          mergeMap(([obs, idx]) =>
            indexReady$.pipe(
              filter((i) => i === idx),
              take(1),
              switchMap(() => obs),
              tap({
                complete: () => indexReady$.next(idx + 1),
              })
            )
          )
        )
        .subscribe({
          next(value) {
            subscriber.next(value);
          },
          error(err: unknown) {
            subscriber.error(err);
          },
          complete() {
            indexReady$.complete();
            subscriber.complete();
          },
        });

      return () => {
        // eslint-disable-next-line rxjs/no-subject-unsubscribe
        indexReady$.unsubscribe();
        subscription.unsubscribe();
      };
    });
  };

interface PrevNext<T> {
  previous: T;
  next: T;
}

/**
 * Ensures execution in order with scan, i.e., the last input value is always guaranteed to be in effect.
 */
export const keepState =
  <T>(durationMs: number, test: (ms: T) => boolean = (a) => !a, initialState?: T) =>
  (o$: Observable<T>): Observable<T> =>
    o$.pipe(
      scan(
        (acc: PrevNext<T>, val: T) => {
          return {
            previous: acc.next,
            next: val,
          };
        },
        { previous: undefined, next: initialState }
      ),
      switchMap(({ previous, next }) => {
        if (!test(previous) && test(next)) {
          return of(next).pipe(delay(durationMs), startWith(previous));
        }
        return of(next);
      })
    );
