From 7ef0c68ad10c1b5042e5764bf484dbb22d7cedf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=88=90?= Date: Fri, 10 Apr 2026 18:45:10 +0800 Subject: [PATCH] 1 --- api/middleware/mqtt/mqttClient.js | 84 +++++++++++++++++++++++++++--- api/middleware/mqtt/mqttManager.js | 53 ++++++++++++++----- api/middleware/schedule/index.js | 11 +++- config/config.js | 15 ++++-- 4 files changed, 139 insertions(+), 24 deletions(-) diff --git a/api/middleware/mqtt/mqttClient.js b/api/middleware/mqtt/mqttClient.js index 2dbb044..29f04f3 100644 --- a/api/middleware/mqtt/mqttClient.js +++ b/api/middleware/mqtt/mqttClient.js @@ -8,15 +8,45 @@ class MqttSyncClient { constructor(brokerUrl, options = {}) { this.client = mqtt.connect(brokerUrl, options) this.isConnected = false + /** @type {string[]} 需在每次 connect(含重连)后向 Broker 幂等订阅的主题 */ + this._maintainedTopics = [] + /** 最近一次收到任意 `response` 主题消息的时间(用于超时日志关联) */ + this.lastResponseAt = null // 使用 Map 结构优化消息监听器,按 topic 分组 this.messageListeners = new Map(); // Map> this.globalListeners = new Set(); // 全局监听器(监听所有 topic) + const ts = () => new Date().toISOString() + const markDisconnected = (reason) => { + this.isConnected = false + console.warn(`[MQTT] ${ts()} 连接不可用 reason=${reason}`) + } + this.client.on('connect', () => { this.isConnected = true + console.log(`[MQTT] ${ts()} 服务端已连接(含重连后的 connect)`) + this._resubscribeMaintainedTopics() + }) - console.log('MQTT 服务端已连接') + this.client.on('reconnect', () => { + console.log(`[MQTT] ${ts()} 正在重连 Broker...`) + }) + + this.client.on('offline', () => { + markDisconnected('offline') + }) + + this.client.on('disconnect', () => { + markDisconnected('disconnect') + }) + + this.client.on('close', () => { + markDisconnected('close') + }) + + this.client.on('end', () => { + markDisconnected('end') }) this.client.on('message', (topic, message) => { @@ -29,6 +59,9 @@ class MqttSyncClient { return; } + if (topic === 'response') { + this.lastResponseAt = Date.now() + } // 1. 触发该 topic 的专用监听器 const topicListeners = this.messageListeners.get(topic); @@ -56,18 +89,52 @@ class MqttSyncClient { }) this.client.on('error', (err) => { - console.warn('[MQTT] Error:', err.message) + console.warn(`[MQTT] ${ts()} Error:`, err && err.message ? err.message : err) }) } + /** + * 与 mqtt.js 原生 connected 对齐,供单例健康检查 + */ + isBrokerConnected() { + return !!(this.client && this.client.connected) + } + + /** + * 注册需在每次 connect 后向 Broker 重新声明订阅的主题(不重复注册消息监听器) + * @param {string[]} topics + */ + setMaintainedTopics(topics) { + this._maintainedTopics = Array.isArray(topics) ? [...topics] : [] + } + + _resubscribeMaintainedTopics() { + if (!this._maintainedTopics.length) return + if (!this.client || !this.client.connected) return + const ts = new Date().toISOString() + for (const topic of this._maintainedTopics) { + this.client.subscribe(topic, { qos: 0 }, (err, granted) => { + if (err) { + console.warn(`[MQTT] ${ts} ensureSubscriptions 订阅失败 topic=${topic}`, err.message || err) + } else { + console.log(`[MQTT] ${ts} ensureSubscriptions 已订阅 topic=${topic}`, granted) + } + }) + } + } + waitForConnect(timeout = 5000) { return new Promise((resolve, reject) => { - if (this.isConnected) return resolve() + if (this.isBrokerConnected()) { + this.isConnected = true + return resolve() + } const timer = setTimeout(() => { reject(new Error('MQTT connect timeout')) }, timeout) const check = () => { - if (this.isConnected) { + if (this.isBrokerConnected()) { + this.isConnected = true clearTimeout(timer) resolve() } else { @@ -113,7 +180,6 @@ class MqttSyncClient { resolve(granted) } } - 1 }) }) } @@ -143,7 +209,12 @@ class MqttSyncClient { const timer = setTimeout(() => { this.removeMessageListener(onMessage); - reject(new Error('Timeout waiting for response')); + const last = this.lastResponseAt + const extra = last + ? ` lastResponseAt=${new Date(last).toISOString()} brokerConnected=${this.isBrokerConnected()}` + : ` brokerConnected=${this.isBrokerConnected()}` + console.warn(`[MQTT] ${new Date().toISOString()} publishAndWait 超时 uuid=${uuid} topic=request_${sn_code}${extra}`) + reject(new Error('Timeout waiting for response' + (last ? `; lastResponseAt=${new Date(last).toISOString()}` : ''))); }, timeout); const onMessage = (topic, message) => { @@ -242,6 +313,7 @@ class MqttSyncClient { } end(force = false) { + this.isConnected = false this.client.end(force) } } diff --git a/api/middleware/mqtt/mqttManager.js b/api/middleware/mqtt/mqttManager.js index 6b896ef..5a2ceed 100644 --- a/api/middleware/mqtt/mqttManager.js +++ b/api/middleware/mqtt/mqttManager.js @@ -1,8 +1,30 @@ const MqttSyncClient = require('./mqttClient'); const Framework = require('../../../framework/node-core-framework'); const logs = require('../logProxy'); +const appConfig = require('../../../config/config.js'); // action.js 已合并到 mqttDispatcher.js,不再需要单独引入 +function buildMqttManagerConfig() { + const mqttCfg = appConfig.mqtt || {}; + const brokerUrl = (mqttCfg.brokerUrl && String(mqttCfg.brokerUrl).trim()) + ? mqttCfg.brokerUrl.trim() + : `mqtt://${mqttCfg.host || '192.144.167.231'}:${mqttCfg.port != null ? mqttCfg.port : 1883}`; + const options = { + clientId: mqttCfg.clientId || `mqtt_server_${Math.random().toString(16).substr(2, 8)}`, + clean: mqttCfg.clean !== false, + connectTimeout: mqttCfg.connectTimeout != null ? mqttCfg.connectTimeout : 5000, + reconnectPeriod: mqttCfg.reconnectPeriod != null ? mqttCfg.reconnectPeriod : 5000, + keepalive: mqttCfg.keepalive != null ? mqttCfg.keepalive : 60 + }; + if (mqttCfg.username) { + options.username = mqttCfg.username; + } + if (mqttCfg.password) { + options.password = mqttCfg.password; + } + return { brokerUrl, options }; +} + /** * MQTT管理器 - 单例模式 * 负责管理MQTT连接,确保全局只有一个MQTT客户端实例 @@ -11,16 +33,7 @@ class MqttManager { constructor() { this.client = null; this.isInitialized = false; - this.config = { - brokerUrl: 'mqtt://192.144.167.231:1883', // MQTT Broker地址 - options: { - clientId: `mqtt_server_${Math.random().toString(16).substr(2, 8)}`, - clean: true, - connectTimeout: 5000, - reconnectPeriod: 5000, // 自动重连间隔 - keepalive: 10 - } - }; + this.config = buildMqttManagerConfig(); } /** @@ -30,8 +43,16 @@ class MqttManager { */ async getInstance(config = {}) { if (this.client && this.isInitialized) { - console.log('[MQTT管理器] 返回已存在的MQTT客户端实例'); - return this.client; + const brokerOk = typeof this.client.isBrokerConnected === 'function' + ? this.client.isBrokerConnected() + : this.client.isConnected; + if (!brokerOk) { + console.warn('[MQTT管理器] 单例已初始化但 Broker 未连接,重置并重建'); + await this.reset(); + } else { + console.log('[MQTT管理器] 返回已存在的MQTT客户端实例'); + return this.client; + } } // 合并配置 @@ -91,7 +112,13 @@ class MqttManager { * @returns {boolean} */ isReady() { - return this.isInitialized && this.client && this.client.isConnected; + if (!this.isInitialized || !this.client) { + return false; + } + if (typeof this.client.isBrokerConnected === 'function') { + return this.client.isBrokerConnected(); + } + return !!this.client.isConnected; } /** diff --git a/api/middleware/schedule/index.js b/api/middleware/schedule/index.js index 402fa07..dfae57f 100644 --- a/api/middleware/schedule/index.js +++ b/api/middleware/schedule/index.js @@ -141,6 +141,11 @@ class ScheduleManager { console.error('[调度管理器] 处理 Boss 消息失败:', error); } }); + + // 重连后向 Broker 幂等声明订阅(监听器仅在上方注册一次,不重复添加) + if (typeof this.mqttClient.setMaintainedTopics === 'function') { + this.mqttClient.setMaintainedTopics(['heartbeat', 'response', 'boss/message']); + } } @@ -152,7 +157,11 @@ class ScheduleManager { const status = this.mqttDispatcher ? this.mqttDispatcher.getSystemStatus() : {}; return { isInitialized: this.isInitialized, - mqttConnected: this.mqttClient && this.mqttClient.isConnected, + mqttConnected: this.mqttClient && ( + typeof this.mqttClient.isBrokerConnected === 'function' + ? this.mqttClient.isBrokerConnected() + : this.mqttClient.isConnected + ), systemStats: deviceManager.getSystemStats(), allDevices: deviceManager.getAllDevicesStatus(), taskQueues: TaskQueue.getAllDeviceStatus(), diff --git a/config/config.js b/config/config.js index dacc764..72d91af 100644 --- a/config/config.js +++ b/config/config.js @@ -71,13 +71,20 @@ module.exports = { "model": "qwen-plus" }, - // MQTT配置 + // MQTT配置(Broker 地址、保活与重连与 Broker 策略对齐时可改此处或环境变量) mqtt: { - host: process.env.MQTT_HOST || 'localhost', - port: process.env.MQTT_PORT || 1883, + /** 完整连接串,优先于 host+port */ + brokerUrl: process.env.MQTT_BROKER_URL || '', + host: process.env.MQTT_HOST || '192.144.167.231', + port: Number(process.env.MQTT_PORT || 1883), username: process.env.MQTT_USERNAME || '', password: process.env.MQTT_PASSWORD || '', - clientId: 'autowork-' + Math.random().toString(16).substr(2, 8) + clientId: process.env.MQTT_CLIENT_ID || `mqtt_server_${Math.random().toString(16).substr(2, 8)}`, + clean: true, + connectTimeout: Number(process.env.MQTT_CONNECT_TIMEOUT || 5000), + reconnectPeriod: Number(process.env.MQTT_RECONNECT_PERIOD || 5000), + /** 秒;过小易被 Broker 策略影响,过大对断线感知慢 */ + keepalive: Number(process.env.MQTT_KEEPALIVE || 60) }, // 定时任务配置