You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

104 lines
3.1KB

  1. import { Injectable } from '@angular/core';
  2. import {
  3. IMqttMessage,
  4. IMqttServiceOptions,
  5. MqttService,
  6. IPublishOptions,
  7. } from 'ngx-mqtt';
  8. import { IClientSubscribeOptions } from 'mqtt-browser';
  9. import {BehaviorSubject, Observable, Subject, Subscription} from 'rxjs';
  /**
   * Root-scoped wrapper around the injected ngx-mqtt `MqttService`.
   *
   * It opens and closes the broker connection described by `connection`,
   * exposes the connection state through `status$`, republishes JSON messages
   * received on the log topic through `messages$`, and publishes JSON payloads
   * to the input topic.
   */
  @Injectable({
    providedIn: 'root'
  })
  export class MqttClientService {
    /** The injected ngx-mqtt service; every broker operation goes through it. */
    client: MqttService | undefined;

    constructor(private _mqttService: MqttService) {
      this.client = this._mqttService;
    }

    /** Active subscription to the log topic, if `getLog()` has been called. */
    private curSubscription: Subscription | undefined;

    /** Combined `onConnect` / `onError` subscription, created at most once. */
    private connectionEvents: Subscription | undefined;

    /** Options handed to `MqttService.connect()` by `createConnection()`. */
    connection = {
      hostname: 'broker.hivemq.com',
      clean: true,
      path: '/mqtt',
      port: 8884,
      reconnectPeriod: 4000,
      protocol: 'wss',
    };

    receiveNews = '';

    /** Last known connection state; mirrored to `status$` on every change. */
    isConnection = false;

    private statusSubject = new BehaviorSubject<boolean>(this.isConnection);
    /** Emits the current connection state and every subsequent change. */
    public status$ = this.statusSubject.asObservable();

    // NOTE(review): the element type is inferred as `null` from the seed value,
    // although parsed JSON is pushed later. Left unchanged so the public type
    // of `messages$` stays what existing consumers compile against.
    private messagesSubject$ = new BehaviorSubject(null);
    /** Emits `null` initially, then every JSON message parsed by `getLog()`. */
    public messages$ = this.messagesSubject$.asObservable();

    /**
     * Create the connection.
     *
     * Calls `connect()` with the `connection` options and makes sure the
     * connection state is tracked. A synchronous failure of `connect()` is
     * logged and does not propagate to the caller.
     */
    createConnection(): void {
      try {
        this.client?.connect(this.connection as IMqttServiceOptions);
      } catch (error) {
        console.log('mqtt.connect error', error);
      }
      this.watchConnectionEvents();
      // this.client?.onMessage.subscribe((packet: any) => {
      //   this.receiveNews = this.receiveNews.concat(packet.payload.toString())
      //   console.log(`Received message ${packet.payload.toString()} from topic ${packet.topic}`);
      //   this.messagesSubject$.next(packet);
      // })
    }

    /**
     * Subscribe to the log topic.
     *
     * Each received payload is decoded as UTF-8 text, parsed as JSON, logged and
     * pushed to `messages$`. Payloads that are not valid JSON are logged and
     * skipped. Any previous log subscription is released first, so calling this
     * repeatedly does not deliver each message several times.
     */
    getLog(): void {
      const topic = 'isoft/node 4/log4';
      const options = { qos: 0, dup: false } as IClientSubscribeOptions;

      this.curSubscription?.unsubscribe();
      // Keep the handle: clearLog() needs it to actually stop the stream.
      this.curSubscription = this.client
        ?.observe(topic, options)
        .subscribe((message: IMqttMessage) => {
          try {
            const parsed = JSON.parse(this.decodePayload(message.payload));
            console.log(parsed);
            this.messagesSubject$.next(parsed);
          } catch (e) {
            console.log(e);
          }
        });
    }

    /** Unsubscribe from the log topic; a no-op when nothing is subscribed. */
    clearLog(): void {
      this.curSubscription?.unsubscribe();
      this.curSubscription = undefined;
    }

    /**
     * Send a message.
     *
     * @param data Value serialised with `JSON.stringify` and published to the
     *             input topic with QoS 0.
     */
    doPublish(data: unknown): void {
      this.client?.unsafePublish('isoft/node 4/in4', JSON.stringify(data), { qos: 0 } as IPublishOptions);
    }

    /**
     * Send a message. Alias of `doPublish`, kept for existing callers.
     *
     * @param data Value serialised with `JSON.stringify` and published to the
     *             input topic with QoS 0.
     */
    sendPublish(data: unknown): void {
      this.doPublish(data);
      // this.client?.publish("isoft/node 4/in4", JSON.stringify(data), {qos: 0} as IPublishOptions).subscribe();
    }

    /**
     * Disconnect.
     *
     * Forces the client to disconnect and publishes the new state on `status$`.
     * A failure is logged and does not propagate to the caller.
     */
    destroyConnection(): void {
      try {
        this.client?.disconnect(true);
        // Emit as well as store, otherwise status$ subscribers stay on `true`.
        this.setConnected(false);
        console.log('Successfully disconnected!');
      } catch (error: unknown) {
        // String() also copes with null/undefined throwables.
        console.log('Disconnect failed', String(error));
      }
    }

    /**
     * Attach the `onConnect` / `onError` handlers exactly once.
     *
     * The emitters belong to the injected service instance, so re-subscribing
     * on every `createConnection()` call would only add duplicate handlers.
     */
    private watchConnectionEvents(): void {
      if (this.connectionEvents || !this.client) {
        return;
      }

      const events = new Subscription();
      events.add(
        this.client.onConnect.subscribe(() => {
          this.setConnected(true);
          console.log('Connection succeeded!');
        })
      );
      events.add(
        this.client.onError.subscribe((error) => {
          this.setConnected(false);
          console.log('Connection failed', error);
        })
      );
      this.connectionEvents = events;
    }

    /** Store the connection state and publish it on `status$`. */
    private setConnected(connected: boolean): void {
      this.isConnection = connected;
      this.statusSubject.next(connected);
    }

    /**
     * Turn a message payload into text.
     *
     * Bytes are decoded as UTF-8 in one pass; mapping each byte to a character
     * code individually would corrupt every multi-byte character.
     */
    private decodePayload(payload: Uint8Array | string): string {
      return typeof payload === 'string' ? payload : new TextDecoder('utf-8').decode(payload);
    }
  }