const db = require('../dbProxy.js'); const deviceManager = require('../schedule/deviceManager.js'); /** * MQTT 消息分发器 * 处理所有 MQTT 消息的分发和响应(包括请求、心跳、响应等) */ class MqttDispatcher { constructor(components, mqttClient) { this.taskQueue = components.taskQueue; this.mqttClient = mqttClient; this.actionHandlers = new Map(); this.subscribedTopics = new Set(); } /** * 启动 MQTT 分发器 */ start() { this.registerActionHandlers(); console.log('[MQTT分发器] 已启动'); } /** * 注册指令处理器 */ registerActionHandlers() { // 注册各种MQTT指令处理器 this.actionHandlers.set('manual_job', async (params) => { return await this.handleManualJobRequest(params); }); this.actionHandlers.set('get_status', async () => { return this.getSystemStatus(); }); } /** * 处理手动任务请求 */ async handleManualJobRequest(params) { const { sn_code, data } = params; const { keyword, taskType } = data; const config = require('../schedule/config.js'); try { // addTask 内部会检查账号是否启用,这里直接调用即可 const taskId = await this.taskQueue.addTask(sn_code, { taskType: taskType, taskName: `手动任务 - ${keyword || taskType}`, taskParams: data, priority: config.getTaskPriority(taskType, { urgent: true }) }); return { success: true, taskId: taskId, message: '任务已添加到队列' }; } catch (error) { return { success: false, error: error.message }; } } /** * 获取系统状态(供调度管理器使用) */ getSystemStatus() { return { systemStats: deviceManager.getSystemStats(), allDevices: deviceManager.getAllDevicesStatus(), taskQueues: this.taskQueue.getAllDeviceStatus() }; } /** * 分发 MQTT 请求消息 */ async dispatchMqttMessage(topic, messageStr, mqttClient) { try { const message = JSON.parse(messageStr); const { action, data, uuid, platform, sn_code } = message; const deviceCode = topic.replace('request_', '') || sn_code; console.log(`[MQTT分发器] 收到指令 - 设备: ${deviceCode}, 动作: ${action}, UUID: ${uuid}`); // 检查设备是否在线 if (!deviceManager.isDeviceOnline(deviceCode)) { console.log(`[MQTT分发器] 设备 ${deviceCode} 离线,忽略指令`); return this.sendMqttResponse(mqttClient, uuid, { code: 500, message: '设备离线', data: null }); } // 获取对应的处理器 const handler = this.actionHandlers.get(action); if (!handler) { console.log(`[MQTT分发器] 未找到动作 ${action} 的处理器`); return this.sendMqttResponse(mqttClient, uuid, { code: 404, message: `未找到动作 ${action} 的处理器`, data: null }); } // 执行处理器 const result = await handler({ sn_code: deviceCode, action, data, platform, mqttClient, uuid }); // 发送响应 return this.sendMqttResponse(mqttClient, uuid, result); } catch (error) { console.error('[MQTT分发器] 分发指令时出错:', error); try { const message = JSON.parse(messageStr); if (message.uuid) { return this.sendMqttResponse(mqttClient, message.uuid, { code: 500, message: error.message, data: null }); } } catch (e) { console.error('[MQTT分发器] 解析消息失败'); } } } /** * 发送 MQTT 响应 */ async sendMqttResponse(mqttClient, uuid, result) { const response = { uuid: uuid, code: result.code || 200, message: result.message || 'success', data: result.data || null, timestamp: Date.now() }; try { await mqttClient.publish('response', JSON.stringify(response)); console.log(`[MQTT分发器] 发送响应 - UUID: ${uuid}, Code: ${response.code}`); } catch (error) { console.error('[MQTT分发器] 发送响应失败:', error); } } /** * 创建成功响应 */ static createSuccessResponse(data = null, message = 'success') { return { code: 200, message: message, data: data }; } /** * 创建失败响应 */ static createErrorResponse(message, code = 500) { return { code: code, message: message, data: null }; } /** * 处理心跳消息 * @param {object|string} message - 心跳消息对象或JSON字符串 */ async handleHeartbeat(message) { try { // 解析消息(如果是字符串,则解析JSON) let heartbeatData = message; if (typeof message === 'string') { heartbeatData = JSON.parse(message); } const { sn_code, timestamp, status, memory, platform_status, platform_login_status } = heartbeatData; if (!sn_code) { console.warn('[MQTT心跳] 心跳消息中未找到设备SN码'); return; } console.log(`[MQTT心跳] 收到设备 ${sn_code} 的心跳消息`); const device_status = db.getModel('device_status'); // 检查设备是否存在 let device = await device_status.findByPk(sn_code); const updateData = { isOnline: true, // 收到心跳,设备在线 lastHeartbeatTime: timestamp ? new Date(timestamp) : new Date(), missedHeartbeats: 0 }; // 更新内存信息 if (memory && memory.percent !== undefined) { updateData.memoryUsage = memory.percent; } // 优先使用 platform_login_status(新格式), const loginStatusData = platform_login_status ; if (loginStatusData) { let isLoggedIn = false; let loggedInPlatform = null; let loggedInUsername = null; let loggedInUserId = null; let loginTime = null; // 判断是新格式还是旧格式 isLoggedIn = platform_login_status.login || false; loggedInPlatform = platform_login_status.platform || null; loggedInUsername = platform_login_status.username || ''; loggedInUserId = platform_login_status.user_id || null; if (platform_login_status.timestamp) { loginTime = new Date(platform_login_status.timestamp); } // 更新登录状态 const previousIsLoggedIn = device ? device.isLoggedIn : false; updateData.isLoggedIn = isLoggedIn; if (isLoggedIn) { updateData.platform = loggedInPlatform; updateData.accountName = loggedInUsername || ''; // 如果之前未登录,现在登录了,更新登录时间 if (!previousIsLoggedIn) { updateData.loginTime = loginTime || updateData.lastHeartbeatTime; } console.log(`[MQTT心跳] 设备 ${sn_code} 已登录 - 平台: ${loggedInPlatform}, 用户: ${loggedInUsername}, ID: ${loggedInUserId}`); } else { // 如果之前已登录,现在未登录了,清空登录相关信息 if (previousIsLoggedIn) { updateData.accountName = ''; // loginTime 保持不变,记录最后登录时间 } console.log(`[MQTT心跳] 设备 ${sn_code} 未登录`); } } // 更新或创建设备记录 if (device) { await device_status.update(updateData, { where: { sn_code } }); console.log(`[MQTT心跳] 设备 ${sn_code} 状态已更新 - 在线: true, 登录: ${updateData.isLoggedIn}`); } else { // 创建新设备记录 await device_status.create({ sn_code, deviceName: `设备_${sn_code}`, deviceType: 'node_mqtt_client', ...updateData, isRunning: false, taskStatus: 'idle', healthStatus: 'unknown', healthScore: 0 }); console.log(`[MQTT心跳] 设备 ${sn_code} 记录已创建 - 在线: true, 登录: ${updateData.isLoggedIn}`); } // 记录心跳到设备管理器 await deviceManager.recordHeartbeat(sn_code, heartbeatData); } catch (error) { console.error('[MQTT心跳] 处理心跳消息失败:', error); } } /** * 处理响应消息 * @param {object|string} message - 响应消息对象或JSON字符串 */ handleResponse(message) { // 响应消息处理逻辑(目前为空,可根据需要扩展) // 例如:记录响应日志、更新任务状态等 } } module.exports = MqttDispatcher;