import { Injectable } from '@angular/core';
import * as signalR from '@microsoft/signalr';
import { Observable, Subject, catchError, from, throwError } from 'rxjs';
import { environment } from '../environments/environment';
import { WebSocketState } from '../models/websocket-state-type';
import { SocketModel } from './../models/socket-model';
import { AuthService } from './auth.service';

/**
 * Streams chunked data from the `/stream` SignalR hub and re-emits the chunks
 * in `chunkNumber` order.
 *
 * `streamDataObs` and `streamCompletedObs` are replaced with fresh observables
 * every time {@link closeConnection} runs (which {@link consumeStreamData}
 * does first), so read them from the service after starting a stream instead
 * of caching them across streams.
 */
@Injectable({
  providedIn: 'root',
})
export class ConnectionService<T> {
  private _connection!: signalR.HubConnection;

  private _streamDataSubject = new Subject<SocketModel<T>>();
  private _streamCompletedSubject = new Subject<boolean>();
  /** Chunks of the current stream, emitted in ascending `chunkNumber` order. */
  public streamDataObs = this._streamDataSubject.asObservable();
  /** Emits `true` once, then completes, when the current stream is closed. */
  public streamCompletedObs = this._streamCompletedSubject.asObservable();

  /** `chunkNumber` of the next chunk that may be emitted. */
  private expectedSequenceNumber = 0;
  /** Chunks that arrived ahead of `expectedSequenceNumber`, keyed by `chunkNumber`. */
  private messageBuffer = new Map<number, SocketModel<T>>();

  constructor(private _authService: AuthService) {}

  /**
   * Closes any current stream, then opens a new hub connection.
   *
   * The connection starts immediately when this method is called, not when
   * the returned observable is subscribed to.
   *
   * @returns An observable that emits once the connection has started, or
   *   errors with the start failure after the failed connection was stopped.
   */
  public consumeStreamData(): Observable<void> {
    this.closeConnection();
    const started = this.initializeConnection();
    // Capture the connection this call created: by the time a start failure
    // is reported, `this._connection` may already belong to a newer call.
    const connection = this._connection;

    return started.pipe(
      catchError((error: unknown) => {
        this.stopQuietly(connection);
        // Rethrow Error instances untouched so callers keep the original
        // type, message and stack; only wrap non-Error rejections.
        return throwError(() => (error instanceof Error ? error : new Error(String(error))));
      })
    );
  }

  /**
   * Invokes the hub's `Disconnect` method when the connection is currently
   * connected. Best effort: a failed invocation is ignored.
   */
  public sendDisconnect(): void {
    if (this._connection?.state !== signalR.HubConnectionState.Connected) {
      return;
    }

    this._connection.invoke('Disconnect').catch(() => {
      // Deliberately ignored: the caller is tearing the stream down anyway.
    });
  }

  /**
   * Stops the current connection (if any), completes the data stream,
   * signals completion on `streamCompletedObs`, and installs fresh subjects
   * for the next stream.
   */
  public closeConnection(): void {
    if (this._connection) {
      // Detach the handler first so a message delivered while `stop()` is
      // still in flight cannot leak into the subjects of the next stream.
      this._connection.off('ChunkReceive');
      this.stopQuietly(this._connection);
    }

    // Drop buffered chunks of the finished stream; they can never be emitted.
    this.resetSequencing();

    const finishedData = this._streamDataSubject;
    finishedData.complete();
    this._streamDataSubject = new Subject<SocketModel<T>>();
    this.streamDataObs = this._streamDataSubject.asObservable();

    const finishedCompleted = this._streamCompletedSubject;
    finishedCompleted.next(true);
    this._streamCompletedSubject = new Subject<boolean>();
    this.streamCompletedObs = this._streamCompletedSubject.asObservable();
    // Complete the old subject so its subscribers are released.
    finishedCompleted.complete();
  }

  /**
   * Builds a new hub connection, registers the chunk handler, and starts it.
   *
   * @returns An observable wrapping the promise returned by `start()`.
   */
  private initializeConnection(): Observable<void> {
    const connection = new signalR.HubConnectionBuilder()
      .configureLogging(signalR.LogLevel.Information)
      .withAutomaticReconnect()
      .withUrl(`${environment.apiBaseUrl}/stream`, {
        accessTokenFactory: () => this._authService.getAuthToken(),
      })
      .build();

    this._connection = connection;
    this.resetSequencing();

    connection.on('ChunkReceive', (data: SocketModel<T>) => this.handleChunk(data));

    return from(connection.start());
  }

  /**
   * Handles one incoming chunk: closes the stream on a terminal state,
   * otherwise emits the chunk if it is next in sequence, buffers it if it
   * is ahead, and drops it if it is behind.
   */
  private handleChunk(data: SocketModel<T>): void {
    const isTerminal =
      data.state === WebSocketState.Closed ||
      data.state === WebSocketState.Aborted ||
      data.state === WebSocketState.CloseReceived;

    if (isTerminal) {
      this.closeConnection();
      // Stop here: the subjects were just replaced, so processing the chunk
      // any further would emit it into the *next* stream's subject.
      // NOTE(review): the terminal chunk itself is therefore not forwarded to
      // subscribers (as before). If it can carry payload, emit it before
      // closing — confirm against the server contract.
      return;
    }

    if (data.chunkNumber === this.expectedSequenceNumber) {
      this.processInSequenceChunk(data);
    } else if (data.chunkNumber > this.expectedSequenceNumber) {
      this.bufferOutOfSequenceChunk(data);
    }
  }

  /** Emits the expected chunk, then any buffered chunks that now follow it. */
  private processInSequenceChunk(data: SocketModel<T>): void {
    // Advance before emitting so a subscriber that restarts the stream from
    // inside its callback does not have the fresh counter bumped afterwards.
    this.expectedSequenceNumber++;
    this._streamDataSubject.next(data);
    this.processBufferedChunks();
  }

  /**
   * Stores a chunk that arrived ahead of sequence. A duplicate of an already
   * buffered chunk number is ignored (the first arrival wins).
   */
  private bufferOutOfSequenceChunk(data: SocketModel<T>): void {
    if (!this.messageBuffer.has(data.chunkNumber)) {
      this.messageBuffer.set(data.chunkNumber, data);
    }
  }

  /** Emits buffered chunks for as long as the next expected one is present. */
  private processBufferedChunks(): void {
    let pending = this.messageBuffer.get(this.expectedSequenceNumber);
    while (pending !== undefined) {
      this.messageBuffer.delete(this.expectedSequenceNumber);
      this.expectedSequenceNumber++;
      this._streamDataSubject.next(pending);
      pending = this.messageBuffer.get(this.expectedSequenceNumber);
    }
  }

  /** Resets ordering state to the start of a stream. */
  private resetSequencing(): void {
    this.messageBuffer.clear();
    this.expectedSequenceNumber = 0;
  }

  /** Stops a connection without letting a rejected `stop()` go unhandled. */
  private stopQuietly(connection: signalR.HubConnection): void {
    connection.stop().catch(() => {
      // Best-effort teardown: there is nothing to recover if stopping fails.
    });
  }
}
