import { Observable, ReplaySubject, Subject } from 'rxjs';
import { concatMap, map, share, takeWhile, tap } from 'rxjs/operators';
import { ObservableOperation } from './observable-operation';
import { OperationObserverService } from './operation-observer.service';
import { IOperationOptions, isFinalState, Operation } from './operation-state';

// Capacity of the operation history ring buffer: the write cursor wraps back
// to slot 0 after this many distinct operations have been recorded.
const HISTORY_SIZE = 10;
/**
 * Serialises observable-backed operations: queued operations are subscribed
 * to one after another (via `concatMap`) and every state they emit is
 * recorded in a small ring-buffer history and re-published on
 * `statusChanges()`.
 */
export class OperationQueue {
  /** Id of the operation currently in progress, or `undefined` when there is none. */
  get currentJobKey() {
    return this._currentJob?.id;
  }

  /** Latest non-final state of the running operation, or `null` when nothing is running. */
  get currentJob() {
    return this._currentJob;
  }

  /**
   * The operation most recently added to the history (the slot just before
   * the write cursor), or `null` when the history is empty.
   */
  get lastJob() {
    if (this._history.length === 0) {
      return null;
    }

    // A cursor of 0 with a non-empty history means the ring buffer has
    // wrapped, so the newest entry sits in the last slot.
    const lastIndex =
      this._historyCursor <= 0 ? this._history.length - 1 : this._historyCursor - 1;

    return this._history[lastIndex];
  }

  /** Whether the current operation reports a loading state; `false` when idle. */
  get isLoading() {
    return this._currentJob?.isLoadingState() ?? false;
  }

  /** The recorded operation history (the live internal array, not a copy). */
  get jobQueue() {
    return this._history;
  }

  // Operations waiting to be executed; fed by `queue()`.
  private _queuePusher$ = new ReplaySubject<ObservableOperation>();
  // Every recorded state (initial and subsequent) of every operation.
  private _stateChanges$ = new Subject<Operation>();

  // Next slot of `_history` to be written for a not-yet-seen operation.
  protected _historyCursor = 0;
  protected _history: Array<Operation> = [];
  protected _currentJob: Operation | null = null;

  // The processing pipeline. Nothing is executed until `observe()` has a
  // subscriber; `share()` makes all subscribers use one pipeline instance.
  private _source$ = new Observable<Operation>((subscriber) => {
    const pipeline = this._queuePusher$
      .pipe(
        // Record the operation as soon as it is dequeued, before it runs.
        tap((queued) => {
          this.updateHistory(queued.operation);
          this._stateChanges$.next(queued.operation);
        }),
        // Run operations strictly one after another.
        concatMap((queued) => queued),
        tap((operation) => {
          subscriber.next(operation);

          // A final state frees the "current job" slot.
          this._currentJob = isFinalState(operation) ? null : operation;

          this.updateHistory(operation);
          this._stateChanges$.next(operation);
        }),
      )
      .subscribe();

    // Teardown: runs once the shared source is unsubscribed.
    return () => {
      pipeline.unsubscribe();
      this.destroy();
    };
  }).pipe(share());

  constructor(private operationService: OperationObserverService) {}

  /**
   * Stream of every state emitted by the queued operations. Subscribing
   * starts the queue processing.
   */
  observe(): Observable<Operation> {
    return this._source$;
  }

  /**
   * Wraps `observable` into an operation and schedules it on the queue.
   *
   * @param observable Work to run when the operation reaches the front of the queue.
   * @param options Forwarded to `OperationObserverService.observe`.
   * @returns A stream of this operation's state that completes after the
   *   first final state (which is still emitted).
   */
  queue<T = any>(observable: Observable<T>, options?: IOperationOptions): Observable<Operation<T>> {
    const operation$ = this.operationService.observe(observable, options);

    // Enqueue asynchronously — presumably so the caller can subscribe to the
    // returned stream before the first state is published (TODO confirm).
    setTimeout(() => {
      this._queuePusher$.next(operation$);
    }, 2);

    // NOTE(review): this re-reads `operation$.operation` on every state change
    // of ANY operation in the queue, not only this one — confirm that is intended.
    return this._stateChanges$.pipe(
      map(() => operation$.operation),
      takeWhile((state) => !isFinalState(state), true),
    );
  }

  /** Stream of every recorded operation state, across all queued operations. */
  statusChanges(): Observable<Operation> {
    return this._stateChanges$;
  }

  /**
   * Completes the internal subjects and clears all tracked state. Also
   * invoked by the teardown of the `observe()` pipeline.
   */
  destroy() {
    this._queuePusher$.complete();
    this._stateChanges$.complete();
    this._historyCursor = 0;
    this._history = [];
    this._currentJob = null;
  }

  /**
   * Stores `operation` in the history: a known id is updated in place, a new
   * id is written at the cursor, overwriting the oldest slot once the buffer
   * has wrapped.
   */
  private updateHistory(operation: Operation) {
    const existing = this._history.findIndex((op) => op.id === operation.id);

    if (existing === -1) {
      this._history[this._historyCursor] = operation;
      this._historyCursor = (this._historyCursor + 1) % HISTORY_SIZE;
    } else {
      this._history[existing] = operation;
    }
  }
}
