import { Injectable } from '@angular/core'; import { webSocket, WebSocketSubject } from 'rxjs/webSocket'; import { BehaviorSubject,Subject } from "rxjs"; import { config } from "../../../assets/config/config"; @Injectable({ providedIn: 'root' }) export class SocketService { gateway = config.gateway; websocket$: WebSocketSubject | undefined ; private isConnected: boolean = false; private messagesSubject$ = new Subject(); public messages$ = this.messagesSubject$.asObservable(); private statusSubject = new BehaviorSubject(this.isConnected); public status$ = this.statusSubject.asObservable(); constructor() { } public connect(cfg: { reconnect: boolean } = { reconnect: false }): void { if (cfg.reconnect || !this.websocket$ || this.websocket$.closed) { console.log(cfg.reconnect ? 'Reconnecting WebSocket…' : 'Trying to open a WebSocket connection…'); this.websocket$ = this.getNewWebSocket(); this.websocket$.subscribe((messages) => { this.messagesSubject$.next(messages); }); } } close() { this.websocket$?.complete(); this.websocket$ = undefined; } sendMessage(msg: any) { this.websocket$?.next(msg); } private getNewWebSocket() { return webSocket({ url: this.gateway, openObserver: { next: () => { console.log('Connection ok'); this.isConnected = true; this.statusSubject.next(this.isConnected); } }, closeObserver: { next: () => { console.log('Connection closed'); this.websocket$ = undefined; this.isConnected = false; this.connect({reconnect: true}); this.statusSubject.next(this.isConnected); } }, }); } }