import * as Y from 'yjs';
import { fromUint8Array, toUint8Array } from 'js-base64';
import { ObservableV2 } from 'lib0/observable';
import { ProviderOptions } from './types';

/**
 * Payload passed to the `onSuccess` callback of a `request_update` event.
 *
 * Both fields are decoded with `toUint8Array` by the Provider, so they are
 * expected to be base64-encoded strings.
 */
interface OnSuccessRequestUpdateProps {
  // Encoded state vector of the server document; the Provider diffs the local
  // document against it to find out what still has to be pushed.
  serverStateVector: string;
  // Encoded Yjs update coming from the server; it is applied to the local document.
  serverUpdate: string;
}

/**
 * Events emitted by the Provider. The consumer is expected to listen to them,
 * perform the actual transport to the server and report the outcome through
 * the supplied callbacks.
 */
interface Events {
  /**
   * Emitted when local changes have to be sent to the server.
   *
   * @param update base64-encoded Yjs update
   * @param onSuccess must be called once the server accepted the update
   * @param onError must be called when the update could not be delivered
   */
  push_update: (
    update: string,
    onSuccess: () => void,
    onError: () => void
  ) => void;
  /**
   * Emitted when the Provider wants to exchange state with the server.
   *
   * @param stateVector base64-encoded state vector of the local document
   * @param onSuccess must be called with the server's state vector and update
   * @param onError must be called when the request failed; the Provider then
   *   schedules a retry
   */
  request_update: (
    stateVector: string,
    onSuccess: (props: OnSuccessRequestUpdateProps) => void,
    onError: () => void
  ) => void;
}

/**
 * Keeps a local Y.Doc in sync with a server through events.
 *
 * The provider does no networking itself. It emits `push_update` and
 * `request_update` events and relies on the listener to talk to the server
 * and to report the outcome through the callbacks passed with each event.
 *
 * Local changes are buffered and pushed either after `debounceTime` ms without
 * further changes or as soon as `maxBufferSize` changes are buffered. When a
 * push fails, the provider falls back to a full state exchange, which is
 * retried with a linearly growing delay capped at `retryMaxTime`.
 *
 * The sync status is reported by emitting `'sync'` on the Y.Doc
 * (`false` when a local change is queued, `true` once a sync succeeded).
 */
export class Provider extends ObservableV2<Events> {
  #yDoc: Y.Doc;
  // Local updates that have not been handed to `push_update` yet
  #changes: Array<Uint8Array> = [];

  #debounceTime: number;
  #maxBufferSize: number;
  #retryTime: number;
  #retryMaxTime: number;
  #retryCounter = 1;

  // True while a `push_update` started by #flushUpdates is in flight
  #isFlushing = false;
  // Pending debounce timer; null when no flush is scheduled.
  // ReturnType<typeof setTimeout> works for both Node and browser typings.
  #startFlushingTimer: ReturnType<typeof setTimeout> | null = null;

  // Set when a state exchange was requested while a flush was in flight
  #requestedStateExchange = false;
  #stateExchangeTimer: ReturnType<typeof setTimeout> | null = null;

  /**
   * Called when the local document is updated.
   * Kept as a stable function reference so that destroy() can unregister it.
   */
  readonly #onDocUpdate = (update: Uint8Array, origin: unknown): void => {
    // Updates applied by this provider carry the provider as origin and came
    // from the server, so only updates with any other origin are queued.
    if (origin !== this) {
      this.#queueUpdate(update);
    }
  };

  /**
   * @param ydoc the document to keep in sync
   * @param options optional timing/buffer settings; defaults are
   *   debounceTime 1000 ms, maxBufferSize 100, retryTime 1000 ms,
   *   retryMaxTime 30000 ms
   */
  constructor(ydoc: Y.Doc, options?: ProviderOptions) {
    super();

    this.#yDoc = ydoc;
    this.#debounceTime = options?.debounceTime ?? 1000;
    this.#maxBufferSize = options?.maxBufferSize ?? 100;
    this.#retryTime = options?.retryTime ?? 1000;
    this.#retryMaxTime = options?.retryMaxTime ?? 30000;

    // listen to an event that fires when the local document is updated
    this.#yDoc.on('update', this.#onDocUpdate);
  }

  /**
   * Called when any syncing with the server was successful
   * Can be update or state exchange
   */
  #onInSync() {
    this.#resetRetryCounter();
    this.#yDoc.emit('sync', [true]);
  }

  /**
   * Cancel the pending debounce timer, if any
   */
  #clearFlushTimer() {
    if (this.#startFlushingTimer !== null) {
      clearTimeout(this.#startFlushingTimer);
      this.#startFlushingTimer = null;
    }
  }

  /**
   * Queue an update to be sent to the server
   */
  #queueUpdate(update: Uint8Array) {
    this.#yDoc.emit('sync', [false]);
    this.#changes.push(update);

    // Debounce changes to not send each key stroke
    this.#clearFlushTimer();

    // Also flush after the user already input many changes
    if (this.#changes.length >= this.#maxBufferSize) {
      this.#flushUpdates();

      return;
    }

    this.#startFlushingTimer = setTimeout(() => {
      // Mark the timer as fired so that a finishing flush knows nothing is scheduled
      this.#startFlushingTimer = null;
      this.#flushUpdates();
    }, this.#debounceTime);
  }

  /**
   * Called when a flush finished successfully (or when there was nothing to flush)
   */
  #onSuccessfullyFlushUpdate() {
    this.#isFlushing = false;

    // If something requested a state exchange during the flushing process, we should start it now
    if (this.#requestedStateExchange) {
      this.#startStateExchange();

      return;
    }

    // Updates that were queued while the push was in flight could not be flushed
    // (#flushUpdates returns early while flushing). We are not in sync yet:
    // either the debounce timer is still pending and will flush them, or we
    // have to flush them right away because nothing else would.
    if (this.#changes.length > 0) {
      // The push itself succeeded, so the retry delay can start over
      this.#resetRetryCounter();

      if (this.#startFlushingTimer === null) this.#flushUpdates();

      return;
    }

    this.#onInSync();
  }

  /**
   * Merge all buffered updates into one and push it to the server.
   * Does nothing while another flush is still in flight.
   */
  #flushUpdates() {
    if (this.#isFlushing) return;

    // It is possible that we did a buffer flush before the timeout went out
    // in this case we just don't flush anything
    if (this.#changes.length === 0) {
      this.#onSuccessfullyFlushUpdate();

      return;
    }

    this.#isFlushing = true;

    const update = Y.mergeUpdates(this.#changes);

    this.#changes = [];

    this.#pushUpdate(update)
      .then(() => this.#onSuccessfullyFlushUpdate())
      // If the push failed, we just start a state exchange.
      // The merged update is not re-queued: the state exchange computes the
      // diff from the document itself.
      .catch(() => {
        this.#isFlushing = false;
        this.#retryStateExchange();
      });
  }

  /**
   * Called when the provider receives an update from the server.
   * updates the local document
   */
  #onUpdate(update: string) {
    const updateUint8Array = toUint8Array(update);

    // The provider is passed as origin so that #onDocUpdate does not queue this update again
    Y.applyUpdate(this.#yDoc, updateUint8Array, this);
  }

  /**
   * Start exchange of state with the server
   */
  #startStateExchange() {
    const onError = () => this.#retryStateExchange();
    const onSuccess = (props: OnSuccessRequestUpdateProps) =>
      this.#onRequestUpdateCallback(props);

    // We shouldn't start a new state exchange if we are in the middle of an update, as this can lead to issues
    // First we need to finish our update and then we can start a new state exchange
    // Therefore, we set a flag that we want to start a state exchange as soon as the update is finished
    if (this.#isFlushing) {
      this.#requestedStateExchange = true;

      return;
    }

    this.#requestedStateExchange = false;

    const stateVector = Y.encodeStateVector(this.#yDoc);
    const base64StateVector = fromUint8Array(stateVector);

    this.emit('request_update', [base64StateVector, onSuccess, onError]);
  }

  /**
   * Schedule a new state exchange after the current retry delay.
   * A retry that is already scheduled is replaced.
   */
  #retryStateExchange() {
    if (this.#stateExchangeTimer !== null) {
      clearTimeout(this.#stateExchangeTimer);
    }

    this.#stateExchangeTimer = setTimeout(() => {
      this.#stateExchangeTimer = null;
      this.#increaseRetryCounter();

      this.#startStateExchange();
    }, this.#getCurrentRetryTime());
  }

  /**
   * Push base64 state updates
   *
   * The returned promise settles when the `push_update` listener calls
   * one of the callbacks handed to it.
   */
  #pushUpdate(update: Uint8Array) {
    return new Promise<void>((resolve, reject) => {
      this.emit('push_update', [fromUint8Array(update), resolve, reject]);
    });
  }

  /**
   * Check if the update is empty
   * The update is empty if it contains no structs and the delete set is equal to the compare delete set
   *
   * Since the deleteSet is always sent in full, we can compare the delete set of the update we want to send
   * with the delete set that we got from the server.
   */
  #isEmptyUpdate(update: Uint8Array, compareUpdate: Uint8Array): boolean {
    const { structs, ds } = Y.decodeUpdate(update);
    const { ds: compareDs } = Y.decodeUpdate(compareUpdate);

    return structs.length === 0 && Y.equalDeleteSets(ds, compareDs);
  }

  /**
   * Delay before the next retry: grows linearly with the retry counter, capped at retryMaxTime
   */
  #getCurrentRetryTime() {
    return Math.min(this.#retryTime * this.#retryCounter, this.#retryMaxTime);
  }

  #increaseRetryCounter() {
    this.#retryCounter++;
  }

  #resetRetryCounter() {
    this.#retryCounter = 1;
  }

  /**
   * Callback for the request update
   * Updates the local Y state with the server state and pushes the latest changes to the server
   */
  #onRequestUpdateCallback(props: OnSuccessRequestUpdateProps) {
    const { serverStateVector, serverUpdate } = props;

    const serverStateVectorUint8Array = toUint8Array(serverStateVector);
    const serverUpdateUint8Array = toUint8Array(serverUpdate);

    // Apply the already decoded server update (origin = this, so it is not queued again)
    Y.applyUpdate(this.#yDoc, serverUpdateUint8Array, this);

    // Everything the local document has that the server's state vector does not cover
    const diff = Y.encodeStateAsUpdate(this.#yDoc, serverStateVectorUint8Array);

    // Make sure we don't send empty updates, since the server will create revisions for them
    if (this.#isEmptyUpdate(diff, serverUpdateUint8Array)) {
      // If we don't need to send anything to the server, we should be synced
      this.#onInSync();

      return;
    }

    this.#pushUpdate(diff)
      // If we successfully pushed the update, we are synced
      .then(() => this.#onInSync())
      // When we fail to push the update, we should retry the whole state exchange
      .catch(() => this.#retryStateExchange());
  }

  /**
   * Start the sync process of syncing the YDoc with the server
   */
  sync() {
    this.#startStateExchange();
  }

  /**
   * Update the local YDoc
   *
   * @param serverUpdate base64-encoded Yjs update received from the server
   */
  update(serverUpdate: string) {
    this.#onUpdate(serverUpdate);
  }

  /**
   * Stop the provider: stop listening to the document, cancel all pending
   * timers and remove all event listeners registered on the provider.
   * Pushes or state exchanges that are already in flight are not aborted.
   */
  override destroy(): void {
    this.#yDoc.off('update', this.#onDocUpdate);
    this.#clearFlushTimer();

    if (this.#stateExchangeTimer !== null) {
      clearTimeout(this.#stateExchangeTimer);
      this.#stateExchangeTimer = null;
    }

    super.destroy();
  }
}
