const mqtt = require('mqtt') const { v4: uuidv4 } = require('uuid'); // 顶部添加 const Framework = require('../../../framework/node-core-framework'); const logs = require('../logProxy'); // 获取logsService class MqttSyncClient { constructor(brokerUrl, options = {}) { this.client = mqtt.connect(brokerUrl, options) this.isConnected = false /** @type {string[]} 需在每次 connect(含重连)后向 Broker 幂等订阅的主题 */ this._maintainedTopics = [] /** 最近一次收到任意 `response` 主题消息的时间(用于超时日志关联) */ this.lastResponseAt = null // 使用 Map 结构优化消息监听器,按 topic 分组 this.messageListeners = new Map(); // Map> this.globalListeners = new Set(); // 全局监听器(监听所有 topic) const ts = () => new Date().toISOString() const markDisconnected = (reason) => { this.isConnected = false console.warn(`[MQTT] ${ts()} 连接不可用 reason=${reason}`) } this.client.on('connect', () => { this.isConnected = true console.log(`[MQTT] ${ts()} 服务端已连接(含重连后的 connect)`) this._resubscribeMaintainedTopics() }) this.client.on('reconnect', () => { console.log(`[MQTT] ${ts()} 正在重连 Broker...`) }) this.client.on('offline', () => { markDisconnected('offline') }) this.client.on('disconnect', () => { markDisconnected('disconnect') }) this.client.on('close', () => { markDisconnected('close') }) this.client.on('end', () => { markDisconnected('end') }) this.client.on('message', (topic, message) => { let messageObj; try { messageObj = JSON.parse(message.toString()); } catch (error) { console.warn('[MQTT] 消息解析失败:', error.message); return; } if (topic === 'response') { this.lastResponseAt = Date.now() } // 1. 触发该 topic 的专用监听器 const topicListeners = this.messageListeners.get(topic); if (topicListeners && topicListeners.size > 0) { topicListeners.forEach(listener => { try { listener(topic, messageObj); } catch (error) { console.error('[MQTT] Topic监听器执行失败:', error.message); } }); } // 2. 触发全局监听器 if (this.globalListeners.size > 0) { this.globalListeners.forEach(listener => { try { listener(topic, messageObj); } catch (error) { console.error('[MQTT] 全局监听器执行失败:', error.message); } }); } }) this.client.on('error', (err) => { console.warn(`[MQTT] ${ts()} Error:`, err && err.message ? err.message : err) }) } /** * 与 mqtt.js 原生 connected 对齐,供单例健康检查 */ isBrokerConnected() { return !!(this.client && this.client.connected) } /** * 注册需在每次 connect 后向 Broker 重新声明订阅的主题(不重复注册消息监听器) * @param {string[]} topics */ setMaintainedTopics(topics) { this._maintainedTopics = Array.isArray(topics) ? [...topics] : [] } _resubscribeMaintainedTopics() { if (!this._maintainedTopics.length) return if (!this.client || !this.client.connected) return const ts = new Date().toISOString() for (const topic of this._maintainedTopics) { this.client.subscribe(topic, { qos: 0 }, (err, granted) => { if (err) { console.warn(`[MQTT] ${ts} ensureSubscriptions 订阅失败 topic=${topic}`, err.message || err) } else { console.log(`[MQTT] ${ts} ensureSubscriptions 已订阅 topic=${topic}`, granted) } }) } } waitForConnect(timeout = 5000) { return new Promise((resolve, reject) => { if (this.isBrokerConnected()) { this.isConnected = true return resolve() } const timer = setTimeout(() => { reject(new Error('MQTT connect timeout')) }, timeout) const check = () => { if (this.isBrokerConnected()) { this.isConnected = true clearTimeout(timer) resolve() } else { setTimeout(check, 100) } } check() }) } publish(topic, message, options = {}) { return new Promise((resolve, reject) => { this.client.publish(topic, message, options, (err) => { if (err) reject(err) else resolve() }) }) } subscribe(topic, callback = null, options = {}) { return new Promise((resolve, reject) => { this.client.subscribe(topic, options, (err, granted) => { if (err) { reject(err) } else { // 如果提供了回调函数,添加到消息监听器 if (callback && typeof callback === 'function') { const messageHandler = (responseTopic, message) => { if (responseTopic !== topic) return try { callback(topic, message) } catch (error) { console.warn(`[MQTT] 处理订阅消息失败: ${error.message}`) } } // 添加到消息监听器 this.addMessageListener(messageHandler) // 在订阅成功后返回订阅信息和消息处理器 resolve({ granted, messageHandler, topic }) } else { resolve(granted) } } }) }) } /** * 发布指令并等待某个主题返回(模拟同步),通过uuid确定唯一项 * @param {*} requestTopic * @param {*} requestMessage * @param {*} responseTopic * @param {*} timeout */ async publishAndWait(sn_code, requestMessage, timeout = 60 * 3 * 1000) { return new Promise((resolve, reject) => { console.log(`[MQTT指令] 发送到主题: request_${sn_code}`, requestMessage); const uuid = uuidv4(); // 将uuid加入消息体,建议使用JSON格式 const msgObj = typeof requestMessage === 'object' ? { ...requestMessage, uuid } : { data: requestMessage, uuid }; const sendMsg = JSON.stringify(msgObj); const timer = setTimeout(() => { this.removeMessageListener(onMessage); const last = this.lastResponseAt const extra = last ? ` lastResponseAt=${new Date(last).toISOString()} brokerConnected=${this.isBrokerConnected()}` : ` brokerConnected=${this.isBrokerConnected()}` console.warn(`[MQTT] ${new Date().toISOString()} publishAndWait 超时 uuid=${uuid} topic=request_${sn_code}${extra}`) reject(new Error('Timeout waiting for response' + (last ? `; lastResponseAt=${new Date(last).toISOString()}` : ''))); }, timeout); const onMessage = (topic, message) => { let { uuid: responseUuid } = message if (topic === 'response' && responseUuid === uuid) { clearTimeout(timer); this.removeMessageListener(onMessage); resolve(message) } } this.addMessageListener(onMessage) this.publish(`request_${sn_code}`, sendMsg).catch(err => { clearTimeout(timer); this.removeMessageListener(onMessage); reject(err); }); }); } /** * 添加消息监听器 * @param {Function} fn - 监听器函数 * @param {string} topic - 可选,指定监听的 topic,不指定则监听所有 */ addMessageListener(fn, topic = null) { if (typeof fn !== 'function') { throw new Error('监听器必须是函数'); } if (topic) { // 添加到特定 topic 的监听器 if (!this.messageListeners.has(topic)) { this.messageListeners.set(topic, new Set()); } this.messageListeners.get(topic).add(fn); } else { // 添加到全局监听器 this.globalListeners.add(fn); } } /** * 移除消息监听器 * @param {Function} fn - 监听器函数 * @param {string} topic - 可选,指定从哪个 topic 移除 */ removeMessageListener(fn, topic = null) { if (topic) { // 从特定 topic 移除 const topicListeners = this.messageListeners.get(topic); if (topicListeners) { topicListeners.delete(fn); // 如果该 topic 没有监听器了,删除整个 Set if (topicListeners.size === 0) { this.messageListeners.delete(topic); } } } else { // 从全局监听器移除 this.globalListeners.delete(fn); // 也尝试从所有 topic 中移除(兼容旧代码) for (const [topicKey, listeners] of this.messageListeners.entries()) { listeners.delete(fn); if (listeners.size === 0) { this.messageListeners.delete(topicKey); } } } } /** * 取消订阅主题 * @param {string} topic 要取消订阅的主题 * @param {object} subscriptionInfo 订阅时返回的订阅信息 */ async unsubscribe(topic, subscriptionInfo = null) { return new Promise((resolve, reject) => { this.client.unsubscribe(topic, (err) => { if (err) { reject(err) } else { // 如果提供了订阅信息,移除对应的消息处理器 if (subscriptionInfo && subscriptionInfo.messageHandler) { this.removeMessageListener(subscriptionInfo.messageHandler) } resolve() } }) }) } end(force = false) { this.isConnected = false this.client.end(force) } } module.exports = MqttSyncClient