import {
  Observable,
  ReplaySubject,
  Subject,
  distinctUntilChanged,
  filter,
  map,
  merge,
  shareReplay,
  takeUntil
} from 'rxjs';
import { WorkerSynchronizerContract } from './worker-synchronizer-contract';

/**
 * Envelope for the messages a `WorkerSynchronizer` exchanges through its messenger.
 *
 * @typeParam T - Type of the synchronized value.
 */
type WorkerSynchronizerMessage<T> = {
  /** The synchronized value carried by this message. */
  payload: T;
  /** Constant marker that `isBroadcastMessage` checks to recognise synchronizer traffic. */
  action: 'worker-synchronizer';
  /**
   * Purpose of the message:
   * - `standard`: a value published through `WorkerSynchronizer.next()`;
   * - `send-current-value`: a synchronizer announcing the value it currently holds;
   * - `get-current-value`: a request asking other synchronizers for their current value.
   */
  type: 'standard' | 'send-current-value' | 'get-current-value';
  /** Name of the channel the message belongs to; other channels' messages are filtered out. */
  channelName: string;
  /**
   * Milliseconds since the epoch, stamped when `WorkerSynchronizer.next()` builds a
   * `standard` message. `message$` treats consecutive messages with the same timestamp
   * as duplicates.
   */
  timestamp: number;
};

/**
 * Request a `WorkerSynchronizer` posts while initialising to ask for the value currently
 * held on `channelName`. Unlike `WorkerSynchronizerMessage`, it carries neither a payload
 * nor a timestamp.
 */
type WorkerSynchronizerChannelGetCurrentValue = {
  /** Constant marker identifying synchronizer traffic. */
  action: 'worker-synchronizer';
  /** Always `get-current-value` for this request. */
  type: 'get-current-value';
  /** Name of the channel whose current value is requested. */
  channelName: string;
};

/**
 * Builds a callback that publishes every value it receives through the given synchronizer.
 *
 * Handy wherever a plain `(value) => void` function is expected, for example as the
 * `next` handler of a subscription.
 *
 * @param workerSynchronizer - Synchronizer whose `next` method receives the values.
 * @returns A function forwarding its single argument to `workerSynchronizer.next`.
 */
export function transferWorkerSynchronizerNext<T>(workerSynchronizer: WorkerSynchronizer<T>) {
  const forwardValue = (value: T): void => {
    return workerSynchronizer.next(value);
  };

  return forwardValue;
}

/**
 * Creates a type guard that accepts only synchronizer messages addressed to one channel.
 *
 * @param channelName - Channel the returned guard should match.
 * @returns A predicate that is `true` when the message's `action` is
 * `'worker-synchronizer'` and its `channelName` equals the requested one.
 * Nullish input is rejected rather than throwing.
 */
function isBroadcastMessage(channelName: string) {
  return function belongsToChannel<T>(message: Record<string, any>): message is WorkerSynchronizerMessage<T> {
    // Anything not tagged as synchronizer traffic (including null/undefined) is ignored.
    if (message?.action !== 'worker-synchronizer') {
      return false;
    }
    return message?.channelName === channelName;
  };
}

/**
 * Keeps one value of type `T` synchronized between every party that shares a messenger
 * and uses the same channel name.
 *
 * Protocol, as implemented below:
 * - On construction the instance starts listening for `send-current-value` and
 *   `get-current-value` messages and posts a `get-current-value` request.
 * - `next()` stores the value, announces it as `send-current-value`, posts it as a
 *   `standard` message and pushes it to the local replay subject.
 * - Whenever a `get-current-value` request arrives and a value is known, the instance
 *   answers with a `send-current-value` message.
 * - `message$` merges the stored value, incoming `standard` messages and the local
 *   replay subject, dropping consecutive messages that share a timestamp.
 *
 * @typeParam T - Type of the synchronized value.
 */
export class WorkerSynchronizer<T> {
  /** Lazily created shared stream returned by `message$`. */
  private _messageShareRef$?: Observable<T>;

  /** Latest known value envelope; `undefined` until a value was published or received. */
  private _currentValue?: WorkerSynchronizerMessage<T>;
  /** Replays the latest known value envelope to late subscribers of `message$`. */
  private _currentValue$: ReplaySubject<WorkerSynchronizerMessage<T>> = new ReplaySubject<WorkerSynchronizerMessage<T>>(
    1
  );
  /** Emits once in `destroy()` to end the internal subscriptions. */
  private _destroy$: Subject<void> = new Subject<void>();

  /**
   * @param _channelName - Channel this instance publishes to and listens on.
   * @param _messenger - Transport used to post messages and to observe incoming ones.
   * @param _localReplaySubject - Subject that receives every value published through
   * `next()` and is merged into `message$`.
   * @param _debug - Accepted for callers' convenience; not read anywhere in this class.
   */
  constructor(
    private _channelName: string,
    private _messenger: WorkerSynchronizerContract,
    private _localReplaySubject: ReplaySubject<WorkerSynchronizerMessage<T>>,
    private _debug?: boolean
  ) {
    this._init();
  }

  /** Starts both internal listeners, then asks the channel for its current value. */
  private _init(): void {
    this._listenForCurrentValue();
    this._listenForGetCurrentValue();
    const requestCurrentValueMessage: WorkerSynchronizerChannelGetCurrentValue = {
      action: 'worker-synchronizer',
      type: 'get-current-value',
      channelName: this._channelName
    };
    this._messenger.postMessage(requestCurrentValueMessage);
  }

  /**
   * Publishes a new value on the channel.
   *
   * @param message - Value to publish; it becomes this instance's current value.
   */
  public next(message: T): void {
    const timestamp: number = Date.now();
    const outcomeMessage: WorkerSynchronizerMessage<T> = {
      action: 'worker-synchronizer',
      payload: message,
      type: 'standard',
      channelName: this._channelName,
      timestamp
    };
    this._setCurrentValue(outcomeMessage);
    // Announce the new value as `send-current-value` as well: that is the only message
    // type `_listenForCurrentValue` stores as the current value.
    this._sendCurrentValue();
    this._messenger.postMessage(outcomeMessage);
    this._localReplaySubject.next(outcomeMessage);
  }

  /**
   * Stream of synchronized values. The underlying pipeline is created on first access
   * and shared afterwards; the latest value is replayed to new subscribers.
   */
  public get message$(): Observable<T> {
    return (this._messageShareRef$ ??= merge(
      this._currentValue$,
      this._getMessage().pipe(filter((message: WorkerSynchronizerMessage<T>) => message.type === 'standard')),
      this._localReplaySubject
    ).pipe(
      // The same value can arrive through several of the merged sources;
      // consecutive messages with an identical timestamp are passed on only once.
      distinctUntilChanged(
        (previous: WorkerSynchronizerMessage<T>, current: WorkerSynchronizerMessage<T>) =>
          previous.timestamp === current.timestamp
      ),
      map((message: WorkerSynchronizerMessage<T>) => message.payload),
      shareReplay({ refCount: true, bufferSize: 1 })
    ));
  }

  /** Stores every `send-current-value` message as the current value until `destroy()`. */
  private _listenForCurrentValue(): void {
    this._getMessage()
      .pipe(
        filter((message: WorkerSynchronizerMessage<T>) => message.type === 'send-current-value'),
        takeUntil(this._destroy$)
      )
      .subscribe((message: WorkerSynchronizerMessage<T>) => this._setCurrentValue(message));
  }

  /** Answers every `get-current-value` request with the stored value until `destroy()`. */
  private _listenForGetCurrentValue(): void {
    this._getMessage()
      .pipe(
        filter((message: WorkerSynchronizerMessage<T>) => message.type === 'get-current-value'),
        takeUntil(this._destroy$)
      )
      .subscribe(() => this._sendCurrentValue());
  }

  /** Incoming messenger traffic restricted to synchronizer messages of this channel. */
  private _getMessage(): Observable<WorkerSynchronizerMessage<T>> {
    return this._messenger
      .getMessage<WorkerSynchronizerMessage<T>>()
      .pipe(filter(isBroadcastMessage(this._channelName)));
  }

  /** Remembers `value` as the current value and pushes it to `_currentValue$`. */
  private _setCurrentValue(value: WorkerSynchronizerMessage<T>): void {
    this._currentValue = value;
    this._currentValue$.next(value);
  }

  /**
   * Posts the stored value as a `send-current-value` message; does nothing while no
   * value is known.
   *
   * The reply carries the stored value's own timestamp. A `get-current-value` request has
   * no timestamp, so copying it from the request would produce `undefined` and defeat the
   * timestamp-based de-duplication in `message$`.
   */
  private _sendCurrentValue(): void {
    const current: WorkerSynchronizerMessage<T> | undefined = this._currentValue;
    if (current === undefined) {
      return;
    }
    const currentValueOutcomeMessage: WorkerSynchronizerMessage<T> = {
      action: 'worker-synchronizer',
      payload: current.payload,
      type: 'send-current-value',
      channelName: this._channelName,
      timestamp: current.timestamp
    };
    this._messenger.postMessage(currentValueOutcomeMessage);
  }

  /** Ends the internal listeners started at construction. */
  public destroy(): void {
    this._destroy$.next();
    this._destroy$.complete();
  }
}
