import { OnDestroy, Pipe, PipeTransform } from '@angular/core';

import { Observable, ReplaySubject } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

/**
 * Converts scalar to Observable
 */
@Pipe({
  name: 'finToObservable',
  pure: false,
})
export class ToObservablePipe<T> implements PipeTransform, OnDestroy {
  private currentValue: T;
  private sub$: ReplaySubject<T> = new ReplaySubject<T>(1);
  private obs$: Observable<T>;

  constructor() {
    this.obs$ = this.sub$.pipe(distinctUntilChanged());
  }

  ngOnDestroy(): void {
    this.sub$.complete();
  }

  transform = (a: T): Observable<T> => {
    if (a !== this.currentValue) {
      this.currentValue = a;
      if (a instanceof Observable) {
        console.error('Received Observable as input in toObservablePipe', a);
      }
      this.sub$.next(a);
    }
    return this.obs$;
  };
}
