import { Injectable } from '@angular/core'; import { IMqttMessage, IMqttServiceOptions, MqttService, IPublishOptions, } from 'ngx-mqtt'; import { IClientSubscribeOptions } from 'mqtt-browser'; import {BehaviorSubject, Observable, Subject, Subscription} from 'rxjs'; import {TOPIC_GETTING, TOPIC_INFO, TOPIC_LOG, TOPIC_SETTING} from "../../app.constants"; @Injectable({ providedIn: 'root' }) export class MqttClientService { client: MqttService | undefined; constructor(private _mqttService: MqttService) { this.client = this._mqttService; } private curSubscription: Subscription | undefined; private settingSubscription: Subscription | undefined; connection = { hostname: 'broker.hivemq.com', clean: true, path:'/mqtt', port: 8884, reconnectPeriod: 4000, protocol: 'wss', } receiveNews = ''; isConnection = false; private statusSubject = new BehaviorSubject(this.isConnection); public status$ = this.statusSubject.asObservable(); private messagesSubject$ = new BehaviorSubject(null); public messages$ = this.messagesSubject$.asObservable(); private settingSubject$ = new BehaviorSubject(null); public setting$ = this.settingSubject$.asObservable(); // 创建连接 createConnection() { try { this.client?.connect(this.connection as IMqttServiceOptions) } catch (error) { console.log('mqtt.connect error', error); } this.client?.onConnect.subscribe(() => { this.isConnection = true; this.statusSubject.next(this.isConnection); console.log('Connection succeeded!'); }); this.client?.onError.subscribe((error: any) => { this.isConnection = false; this.statusSubject.next(this.isConnection); this.clearLog(); console.log('Connection failed', error); }); this.client?.onClose.subscribe((error: any) => { this.isConnection = false; this.statusSubject.next(this.isConnection); this.clearLog(); console.log('Connection close', error); }); } // 订阅主题 getLog() { const { topic, qos } = { topic: TOPIC_LOG, qos: 0, }; this.curSubscription = this.client?.observe(topic, { qos, dup: false } as IClientSubscribeOptions).subscribe((message: IMqttMessage) => { try { let plainMessage = ""; for (var i = 0; i < message.payload.length; i++) { plainMessage += String.fromCharCode(parseInt(String(message.payload[i]))); } console.log(JSON.parse(plainMessage)); this.messagesSubject$.next(JSON.parse(plainMessage)); }catch (e) { console.log(e); } }); } // 订阅主题 getSetting() { const { topic, qos } = { topic: TOPIC_SETTING, qos: 0, }; this.settingSubscription = this.client?.observe(topic, { qos, dup: false } as IClientSubscribeOptions).subscribe((message: IMqttMessage) => { try { let plainMessage = ""; for (var i = 0; i < message.payload.length; i++) { plainMessage += String.fromCharCode(parseInt(String(message.payload[i]))); } console.log(JSON.parse(plainMessage)); this.settingSubject$.next(JSON.parse(plainMessage)); }catch (e) { console.log(e); } }); } // 取消订阅 clearLog() { this.curSubscription?.unsubscribe() this.settingSubscription?.unsubscribe(); } // 发送消息 doPublish(data: any) { this.client?.unsafePublish(TOPIC_INFO, JSON.stringify(data), {qos: 0} as IPublishOptions); } // 发送消息 sendPublish(data: any, topic = TOPIC_INFO) { this.client?.unsafePublish(topic, JSON.stringify(data), {qos: 0} as IPublishOptions); // this.client?.publish("isoft/node 4/in4", JSON.stringify(data), {qos: 0} as IPublishOptions).subscribe(); } // 断开连接 destroyConnection() { try { this.client?.disconnect(true) this.isConnection = false console.log('Successfully disconnected!') } catch (error: any) { console.log('Disconnect failed', error.toString()) } } }