import { concat, Observable, Subscriber, of, OperatorFunction, timer } from 'rxjs';
import { NgZone } from '@angular/core';
import { reduce, delay, exhaustMap, map, startWith } from 'rxjs/operators';

/**
 * Synchronously return the value of Observable which can return value exactly after subscribe
 * (BehaviourSubject, ReplaySubject, publishReplay operator and etc...).
 */
export function getReplayObservableSyncValue<T = any>(observable$: Observable<T>): T | null {
  let valueToReturn: T = null;
  const subscription = observable$.subscribe((v) => (valueToReturn = v));
  subscription.unsubscribe();
  return valueToReturn;
}

export function concatSequentialObservableResults<TObservableResult = any>(
  observables: Observable<TObservableResult>[],
): Observable<TObservableResult[]> {
  return concat(...observables).pipe(
    reduce((acc: TObservableResult[], v) => {
      acc.push(v);
      return acc;
    }, []),
  );
}

export function executeSideObservableOperator(obs$: Observable<any>) {
  return <T>(source$: Observable<T>): Observable<T> => {
    return new Observable((subscriber: Subscriber<T>) => {
      subscriber.add(obs$.subscribe());
      return source$.subscribe({
        next: (v) => subscriber.next(v),
        error: (e) => subscriber.error(e),
        complete: () => subscriber.complete(),
      });
    });
  };
}

export function invertBooleanValueOperator(source$: Observable<any>): Observable<boolean> {
  return source$.pipe(map((s) => !s));
}

export function combineArrayAsEveryToBooleanOperator(source$: Observable<any>): Observable<boolean> {
  return source$.pipe(
    map((value) => {
      if (value instanceof Array) {
        return value.every((v) => !!v);
      }
      return !!value;
    }),
  );
}

export function combineArrayAsSomeToBooleanOperator(source$: Observable<any>): Observable<boolean> {
  return source$.pipe(
    map((value) => {
      if (value instanceof Array) {
        return value.some((v) => !!v);
      }
      return !!value;
    }),
  );
}

export function runInZoneOperator<T>(zone: NgZone): OperatorFunction<T, T> {
  return (source) => {
    return new Observable((observer) => {
      const onNext = (value: T) => zone.run(() => observer.next(value));
      const onError = (e: any) => zone.run(() => observer.error(e));
      const onComplete = () => zone.run(() => observer.complete());
      return source.subscribe(onNext, onError, onComplete);
    });
  };
}

interface IEveryMinuteObservableParameters {
  fireInitial?: boolean;
}

export function everyMinute$(params?: IEveryMinuteObservableParameters): Observable<Date> {
  const minute = 60000;
  const msUntilNextMinute = () => minute - (Date.now() % minute);

  const nextMinuteStart$ = () => of(null).pipe(delay(msUntilNextMinute()));
  const fireInitialIfRequested = (source): Observable<void> =>
    params?.fireInitial ? source.pipe(startWith<null, null>(null)) : source;

  return timer(0, minute / 2).pipe(
    exhaustMap(() => nextMinuteStart$()),
    fireInitialIfRequested,
    map(() => new Date()),
  );
}
