const db = require('../dbProxy.js'); const logProxy = require('../logProxy.js'); const deviceManager = require('../schedule/core/deviceManager.js'); const chatManager = require('../job/managers/chatManager'); /** * MQTT 消息分发器 * 处理所有 MQTT 消息的分发和响应(包括请求、心跳、响应等) */ class MqttDispatcher { constructor(components, mqttClient) { this.taskQueue = components.taskQueue; this.mqttClient = mqttClient; this.actionHandlers = new Map(); this.subscribedTopics = new Set(); // 去重防抖:记录最近处理过的 Boss 消息 securityId -> timestamp this.bossMessageDedupMap = new Map(); } /** * 启动 MQTT 分发器 */ start() { this.registerActionHandlers(); console.log('[MQTT分发器] 已启动'); } /** * 注册指令处理器 */ registerActionHandlers() { // 注册各种MQTT指令处理器 this.actionHandlers.set('manual_job', async (params) => { return await this.handleManualJobRequest(params); }); this.actionHandlers.set('get_status', async () => { return this.getSystemStatus(); }); } /** * 处理手动任务请求 */ async handleManualJobRequest(params) { const { sn_code, data } = params; const { keyword, taskType } = data; const config = require('../schedule/config.js'); try { // addTask 内部会检查账号是否启用,这里直接调用即可 const taskId = await this.taskQueue.addTask(sn_code, { taskType: taskType, taskName: `手动任务 - ${keyword || taskType}`, taskParams: data, priority: config.getTaskPriority(taskType, { urgent: true }) }); return { success: true, taskId: taskId, message: '任务已添加到队列' }; } catch (error) { return { success: false, error: error.message }; } } /** * 获取系统状态(供调度管理器使用) */ getSystemStatus() { return { systemStats: deviceManager.getSystemStats(), allDevices: deviceManager.getAllDevicesStatus(), taskQueues: this.taskQueue.getAllDeviceStatus() }; } /** * 分发 MQTT 请求消息 */ async dispatchMqttMessage(topic, messageStr, mqttClient) { try { const message = JSON.parse(messageStr); const { action, data, uuid, platform, sn_code } = message; const deviceCode = topic.replace('request_', '') || sn_code; console.log(`[MQTT分发器] 收到指令 - 设备: ${deviceCode}, 动作: ${action}, UUID: ${uuid}`); // 检查设备是否在线 if (!deviceManager.isDeviceOnline(deviceCode)) { console.log(`[MQTT分发器] 设备 ${deviceCode} 离线,忽略指令`); return this.sendMqttResponse(mqttClient, uuid, { code: 500, message: '设备离线', data: null }); } // 检查授权状态(对于需要授权的操作) const authorizationService = require('../../services/authorization_service'); const authCheck = await authorizationService.checkAuthorization(deviceCode, 'sn_code'); if (!authCheck.is_authorized) { console.log(`[MQTT分发器] 设备 ${deviceCode} 授权检查失败: ${authCheck.message}`); return this.sendMqttResponse(mqttClient, uuid, { code: 403, message: authCheck.message, data: null }); } // 获取对应的处理器 const handler = this.actionHandlers.get(action); if (!handler) { console.log(`[MQTT分发器] 未找到动作 ${action} 的处理器`); return this.sendMqttResponse(mqttClient, uuid, { code: 404, message: `未找到动作 ${action} 的处理器`, data: null }); } // 执行处理器 const result = await handler({ sn_code: deviceCode, action, data, platform, mqttClient, uuid }); // 发送响应 return this.sendMqttResponse(mqttClient, uuid, result); } catch (error) { console.error('[MQTT分发器] 分发指令时出错:', error); try { const message = JSON.parse(messageStr); if (message.uuid) { return this.sendMqttResponse(mqttClient, message.uuid, { code: 500, message: error.message, data: null }); } } catch (e) { console.error('[MQTT分发器] 解析消息失败'); } } } /** * 发送 MQTT 响应 */ async sendMqttResponse(mqttClient, uuid, result) { const response = { uuid: uuid, code: result.code || 200, message: result.message || 'success', data: result.data || null, timestamp: Date.now() }; try { await mqttClient.publish('response', JSON.stringify(response)); console.log(`[MQTT分发器] 发送响应 - UUID: ${uuid}, Code: ${response.code}`); } catch (error) { console.error('[MQTT分发器] 发送响应失败:', error); } } /** * 创建成功响应 */ static createSuccessResponse(data = null, message = 'success') { return { code: 200, message: message, data: data }; } /** * 创建失败响应 */ static createErrorResponse(message, code = 500) { return { code: code, message: message, data: null }; } /** * 处理心跳消息 * @param {object|string} message - 心跳消息对象或JSON字符串 */ async handleHeartbeat(message) { try { // 解析消息(如果是字符串,则解析JSON) let heartbeatData = message; if (typeof message === 'string') { heartbeatData = JSON.parse(message); } const { sn_code, timestamp, status, memory, platform_status, platform_login_status,clientId } = heartbeatData; if (!sn_code) { console.warn('[MQTT心跳] 心跳消息中未找到设备SN码'); return; } // 移除 device_status 模型依赖 // const device_status = db.getModel('device_status'); // let device = await device_status.findByPk(sn_code); let device = null; // device_status 已移除,暂时设为 null const updateData = { isOnline: true, // 收到心跳,设备在线 lastHeartbeatTime: timestamp ? new Date(timestamp) : new Date(), missedHeartbeats: 0 }; // 更新内存信息 if (memory && memory.percent !== undefined) { updateData.memoryUsage = memory.percent; } // 优先使用 platform_login_status(新格式), const loginStatusData = platform_login_status ; if (loginStatusData) { let isLoggedIn = false; let loggedInPlatform = null; let loggedInUsername = null; let loggedInUserId = null; let loginTime = null; // 判断是新格式还是旧格式 isLoggedIn = platform_login_status.login || false; loggedInPlatform = platform_login_status.platform || null; loggedInUsername = platform_login_status.username || ''; loggedInUserId = platform_login_status.user_id || null; if (platform_login_status.timestamp) { loginTime = new Date(platform_login_status.timestamp); } // 移除 device_status 依赖,previousIsLoggedIn 暂时设为 false const previousIsLoggedIn = false; // device_status 已移除 updateData.isLoggedIn = isLoggedIn; if (isLoggedIn) { updateData.platform = loggedInPlatform; updateData.accountName = loggedInUsername || ''; // 如果之前未登录,现在登录了,更新登录时间 if (!previousIsLoggedIn) { updateData.loginTime = loginTime || updateData.lastHeartbeatTime; } console.log(`[MQTT心跳] 设备 ${sn_code} 已登录 - 平台: ${loggedInPlatform}, 用户: ${loggedInUsername}, ID: ${loggedInUserId}`); } else { // 如果之前已登录,现在未登录了,清空登录相关信息 if (previousIsLoggedIn) { updateData.accountName = ''; // loginTime 保持不变,记录最后登录时间 } console.log(`[MQTT心跳] 设备 ${sn_code} 未登录`); } } // 更新 pla_account 表中的在线和登录状态 try { const pla_account = db.getModel('pla_account'); await pla_account.update( { is_online: 1, is_logged_in: updateData.isLoggedIn ? 1 : 0 }, { where: { sn_code } } ); } catch (error) { console.error(`[MQTT心跳] 更新数据库状态失败:`, error); } // 记录心跳到设备管理器(包含登录状态) const heartbeatPayload = { isLoggedIn: updateData.isLoggedIn || false, ...heartbeatData }; await deviceManager.recordHeartbeat(sn_code, heartbeatPayload); } catch (error) { console.error('[MQTT心跳] 处理心跳消息失败:', error); } } /** * 处理来自 boss-automation-nodejs 的 Boss 聊天消息 * @param {object|string} message - Boss 消息对象或 JSON 字符串 */ async handleBossMessage(message) { try { let data = message; if (typeof message === 'string') { try { data = JSON.parse(message); } catch (e) { console.warn('[MQTT Boss 消息] JSON 解析失败,按原始字符串处理'); } } const sn_code = data && data.sn_code ? data.sn_code : null; const payload = data && data.payload ? data.payload : null; if (!sn_code || !payload) { console.warn('[MQTT Boss 消息] sn_code 或 payload 缺失,忽略:', data); return; } // 按你给的结构解析:取第一条消息的文本等关键信息 const firstMsg = Array.isArray(payload.messages) && payload.messages.length > 0 ? payload.messages[0] : null; const fromUidObj = firstMsg && firstMsg.from && firstMsg.from.uid; const toUidObj = firstMsg && firstMsg.to && firstMsg.to.uid; const text = firstMsg && firstMsg.body && typeof firstMsg.body.text === 'string' ? firstMsg.body.text : null; // 兼容 uid 为数字或 { low, high } 两种格式 const toUidStr = (uid) => { if (uid == null) return null; if (typeof uid === 'number' && !Number.isNaN(uid)) return String(uid); if (typeof uid === 'object' && typeof uid.low === 'number') return String(uid.low); return null; }; const normalized = { sn_code, type: payload.type || null, version: payload.version || null, from_uid: toUidStr(fromUidObj), to_uid: toUidStr(toUidObj), text, raw: payload }; // 去重防抖:按 securityId(或 cmid)在一定时间窗口内只处理一次 const securityId = firstMsg && firstMsg.securityId; const cmidObj = firstMsg && firstMsg.cmid; const cmid = cmidObj && typeof cmidObj.low === 'number' ? `${cmidObj.high}:${cmidObj.low}` : null; const dedupKey = securityId || cmid; const now = Date.now(); const windowMs = 2 * 60 * 1000; // 2 分钟内视为重复 if (dedupKey) { const lastTs = this.bossMessageDedupMap.get(dedupKey); if (lastTs && now - lastTs < windowMs) { console.log('[MQTT Boss 消息] 检测到重复消息,跳过处理:', { sn_code, dedupKey }); return; } this.bossMessageDedupMap.set(dedupKey, now); } console.log('[MQTT Boss 消息] 解析结果:', { sn_code: normalized.sn_code, type: normalized.type, version: normalized.version, from_uid: normalized.from_uid, to_uid: normalized.to_uid, text: normalized.text }); // 落库到 chat_message 表(与自动沟通的落库格式保持一致) try { const chatMessageModel = db.getModel('chat_message'); const platform = 'boss'; const friendId = normalized.from_uid ? Number(normalized.from_uid) : 0; const mid = firstMsg && firstMsg.mid && typeof firstMsg.mid.low === 'number' ? firstMsg.mid.low : 0; const base = { sn_code, platform, friendId, encryptFriendId: '', fetch_time: new Date() }; if (friendId && chatMessageModel) { const existing = await chatMessageModel.findOne({ where: { sn_code, platform, friendId, mid } }); const row = { ...base, mid, message_data: firstMsg }; if (existing) { await existing.update(row); } else { await chatMessageModel.create(row); } } } catch (e) { console.warn('[MQTT Boss 消息] 写入 chat_message 失败:', e.message); } // 调用现有 AI 自动回复流程(基于 get_chat_detail + getReplyContentFromDetail) try { const friendIdNum = normalized.from_uid != null ? Number(normalized.from_uid) : 0; const hasValidFriendId = friendIdNum > 0 && Number.isFinite(friendIdNum); if (hasValidFriendId && this.mqttClient) { const result = await chatManager.auto_reply_with_ai(sn_code, this.mqttClient, { friendId: friendIdNum, platform: 'boss' }); console.log('[MQTT Boss 消息] AI 自动回复结果:', result); } else if (!hasValidFriendId && normalized.from_uid != null) { console.warn('[MQTT Boss 消息] 跳过 AI 回复:friendId 无效或为 0', { from_uid: normalized.from_uid, friendIdNum }); } } catch (e) { console.warn('[MQTT Boss 消息] AI 自动回复失败:', e.message); } } catch (error) { console.error('[MQTT Boss 消息] 处理 Boss 消息失败:', error); } } /** * 处理响应消息 * @param {object|string} message - 响应消息对象或JSON字符串 */ handleResponse(message) { // 响应消息处理逻辑(目前为空,可根据需要扩展) // 例如:记录响应日志、更新任务状态等 } } module.exports = MqttDispatcher;