import { BehaviorSubject, Observable, Subscription } from 'rxjs';
import { OperationObserverService } from './operation-observer.service';
import {
  AnyOperationState,
  isFinalState,
  Operation,
  OperationFinalState,
  OperationLoadingState,
} from './operation-state';

/** Optional behaviour switches for an `OperationSubject`. */
export interface OperationSubjectOptions {
  /**
   * When truthy, `activate()` rejects instead of starting a new operation
   * while the subject's current value is an `OperationLoadingState`.
   */
  preventParallelOperation?: boolean;
}

/**
 * A `BehaviorSubject` holding the latest state of a re-runnable operation.
 *
 * The subject starts with `Operation.create()` as its value. Each call to
 * `activate()` invokes the `operation` factory, passes the resulting
 * observable through `OperationObserverService.observe()` and pushes every
 * state that stream emits into this subject.
 *
 * @typeParam T - value type produced by the operation.
 * @typeParam A - argument type accepted by the operation factory.
 */
export class OperationSubject<T = any, A = undefined> extends BehaviorSubject<
  Operation<T, AnyOperationState<T>>
> {
  /** Subscription created by the most recent `activate()` call, if any. */
  currentOperationSub?: Subscription;

  /**
   * @param operation - factory producing the observable to run for given args.
   * @param observerService - service whose `observe()` wraps that observable;
   *   presumably it maps it to a stream of operation states (verify against
   *   the service implementation).
   * @param options - optional behaviour switches.
   */
  constructor(
    private operation: (args: A) => Observable<T>,
    private observerService: OperationObserverService,
    private options?: OperationSubjectOptions,
  ) {
    super(Operation.create());
  }

  /** Status of the subject's current value. */
  get status() {
    return this.getValue().status;
  }

  /** Whether the current value's status is `'loading'`. */
  get isLoading(): boolean {
    return this.getValue().status === 'loading';
  }

  /**
   * Starts the operation with `args`, replacing any previous subscription.
   *
   * @param args - arguments forwarded to the operation factory.
   * @returns a promise that resolves with the first state for which
   *   `isFinalState` is true. It rejects when
   *   - `preventParallelOperation` is set and the current value is an
   *     `OperationLoadingState`,
   *   - the factory, `observe()` or `subscribe()` throws synchronously, or
   *   - the observed stream emits an error notification.
   *
   * NOTE(review): a promise returned by an earlier `activate()` call is left
   * pending when that call's subscription is replaced here or cancelled via
   * `deactivate()` before a final state arrived; the same holds if the stream
   * completes without a final state.
   */
  activate(args: A): Promise<Operation<T, OperationFinalState<T>>> {
    // Refuse to start a second run while one is still reported as loading.
    if (
      this.getValue() instanceof OperationLoadingState &&
      this.options?.preventParallelOperation
    ) {
      return Promise.reject(
        new Error('OperationSubject cancelled activation to prevent parallel operation'),
      );
    }

    // Drop the previous run so its states no longer reach this subject.
    this.currentOperationSub?.unsubscribe();

    return new Promise((resolve, reject) => {
      try {
        this.currentOperationSub = this.observerService
          .observe(this.operation(args))
          .subscribe({
            next: (state) => {
              this.next(state);
              if (isFinalState(state)) {
                resolve(state);
              }
            },
            // Without an error callback a failing stream would leave the
            // promise pending forever and surface only as an unhandled error.
            error: (err: unknown) => reject(err),
          });
      } catch (e) {
        // Synchronous failures while building or subscribing to the stream.
        reject(e);
      }
    });
  }

  /**
   * Unsubscribes from the current operation, if one was started.
   *
   * The subject's value is not changed. NOTE(review): if the value is an
   * `OperationLoadingState` at this point and `preventParallelOperation` is
   * set, later `activate()` calls keep being rejected until another value is
   * pushed into the subject — confirm whether callers rely on this.
   */
  deactivate(): void {
    this.currentOperationSub?.unsubscribe();
  }
}
