/**
 * @copyright Copyright 2021, BISSELL Homecare, Inc.
 * All Rights Reserved.
 *
 * This is UNPUBLISHED PROPRIETARY SOURCE CODE of BISSELL Homecare, Inc.
 * the contents of this file may not be disclosed to third parties, copied
 * or duplicated in any form, in whole or in part, without the prior
 * written permission of BISSELL Homecare, Inc.
 */

import { Injectable } from "@angular/core";
import { HttpClient } from "@angular/common/http";
import { Observable } from "rxjs";
import { Paho } from "ng2-mqtt/mqttws31";
import { AppConfigService } from "../../app.config.service";

@Injectable()
export class MqttService {
  private _client: any;
  private _isConnected: boolean;
  private _observable: Observable<any>;

  constructor(private http: HttpClient, private appConfig: AppConfigService) {}

  public getObservable(): Observable<Observable<any>> {
    let observable = new Observable<Observable<any>>((observer) => {
      if (!this._isConnected || !this._observable) {
        this.http
          .get(this.appConfig.getConfig().portalApiURL + "/mqttConnection")
          .subscribe((data) => {
            this.initializeClient(data);
            observer.next(this._observable);
            observer.complete();
          });
      } else {
        observer.next(this._observable);
        observer.complete();
      }
    });
    return observable;
  }

  public publish(topic: string, message: string) {
    let payload: any = new Paho.MQTT.Message(message);
    payload.destinationName = topic;
    this._client.send(payload);
  }

  public subscribe(topic: string) {
    this.ensureConnected();

    this._client.subscribe(topic);
  }

  public unsubscribe(topic: string) {
    this.ensureConnected();

    this._client.unsubscribe(topic);
  }

  private initializeClient(data: any) {
    this._client = new Paho.MQTT.Client(data.signedUrl, Date.now().toString());
    this.ensureConnected();

    // wire onMessageArrived to an observable
    // onMessageArrived will always just emit the message
    this._observable = new Observable<any>((observer) => {
      this._client.onMessageArrived = (message) => observer.next(message);
    });

    // try reconnecting on a lost connection
    this._client.onConnectionLost = () => {
      this._isConnected = false;
      this.ensureConnected();
    };
  }

  private ensureConnected() {
    if (!this._isConnected) {
      this._client.connect({
        onSuccess: () => (this._isConnected = true),
      });
    }
  }
}
