Files
autoAiWorkSys/api/middleware/mqtt/mqttDispatcher.js
张成 6e5c35f144 1
2025-12-15 18:36:20 +08:00

309 lines
10 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
const db = require('../dbProxy.js');
const logProxy = require('../logProxy.js');
const deviceManager = require('../schedule/deviceManager.js');
/**
* MQTT 消息分发器
* 处理所有 MQTT 消息的分发和响应(包括请求、心跳、响应等)
*/
class MqttDispatcher {
constructor(components, mqttClient) {
this.taskQueue = components.taskQueue;
this.mqttClient = mqttClient;
this.actionHandlers = new Map();
this.subscribedTopics = new Set();
}
/**
* 启动 MQTT 分发器
*/
start() {
this.registerActionHandlers();
console.log('[MQTT分发器] 已启动');
}
/**
* 注册指令处理器
*/
registerActionHandlers() {
// 注册各种MQTT指令处理器
this.actionHandlers.set('manual_job', async (params) => {
return await this.handleManualJobRequest(params);
});
this.actionHandlers.set('get_status', async () => {
return this.getSystemStatus();
});
}
/**
* 处理手动任务请求
*/
async handleManualJobRequest(params) {
const { sn_code, data } = params;
const { keyword, taskType } = data;
const config = require('../schedule/config.js');
try {
// addTask 内部会检查账号是否启用,这里直接调用即可
const taskId = await this.taskQueue.addTask(sn_code, {
taskType: taskType,
taskName: `手动任务 - ${keyword || taskType}`,
taskParams: data,
priority: config.getTaskPriority(taskType, { urgent: true })
});
return {
success: true,
taskId: taskId,
message: '任务已添加到队列'
};
} catch (error) {
return {
success: false,
error: error.message
};
}
}
/**
* 获取系统状态(供调度管理器使用)
*/
getSystemStatus() {
return {
systemStats: deviceManager.getSystemStats(),
allDevices: deviceManager.getAllDevicesStatus(),
taskQueues: this.taskQueue.getAllDeviceStatus()
};
}
/**
* 分发 MQTT 请求消息
*/
async dispatchMqttMessage(topic, messageStr, mqttClient) {
try {
const message = JSON.parse(messageStr);
const { action, data, uuid, platform, sn_code } = message;
const deviceCode = topic.replace('request_', '') || sn_code;
console.log(`[MQTT分发器] 收到指令 - 设备: ${deviceCode}, 动作: ${action}, UUID: ${uuid}`);
// 检查设备是否在线
if (!deviceManager.isDeviceOnline(deviceCode)) {
console.log(`[MQTT分发器] 设备 ${deviceCode} 离线,忽略指令`);
return this.sendMqttResponse(mqttClient, uuid, {
code: 500,
message: '设备离线',
data: null
});
}
// 检查授权状态(对于需要授权的操作)
const authorizationService = require('../../services/authorization_service');
const authCheck = await authorizationService.checkAuthorization(deviceCode, 'sn_code');
if (!authCheck.is_authorized) {
console.log(`[MQTT分发器] 设备 ${deviceCode} 授权检查失败: ${authCheck.message}`);
return this.sendMqttResponse(mqttClient, uuid, {
code: 403,
message: authCheck.message,
data: null
});
}
// 获取对应的处理器
const handler = this.actionHandlers.get(action);
if (!handler) {
console.log(`[MQTT分发器] 未找到动作 ${action} 的处理器`);
return this.sendMqttResponse(mqttClient, uuid, {
code: 404,
message: `未找到动作 ${action} 的处理器`,
data: null
});
}
// 执行处理器
const result = await handler({
sn_code: deviceCode,
action,
data,
platform,
mqttClient,
uuid
});
// 发送响应
return this.sendMqttResponse(mqttClient, uuid, result);
} catch (error) {
console.error('[MQTT分发器] 分发指令时出错:', error);
try {
const message = JSON.parse(messageStr);
if (message.uuid) {
return this.sendMqttResponse(mqttClient, message.uuid, {
code: 500,
message: error.message,
data: null
});
}
} catch (e) {
console.error('[MQTT分发器] 解析消息失败');
}
}
}
/**
* 发送 MQTT 响应
*/
async sendMqttResponse(mqttClient, uuid, result) {
const response = {
uuid: uuid,
code: result.code || 200,
message: result.message || 'success',
data: result.data || null,
timestamp: Date.now()
};
try {
await mqttClient.publish('response', JSON.stringify(response));
console.log(`[MQTT分发器] 发送响应 - UUID: ${uuid}, Code: ${response.code}`);
} catch (error) {
console.error('[MQTT分发器] 发送响应失败:', error);
}
}
/**
* 创建成功响应
*/
static createSuccessResponse(data = null, message = 'success') {
return {
code: 200,
message: message,
data: data
};
}
/**
* 创建失败响应
*/
static createErrorResponse(message, code = 500) {
return {
code: code,
message: message,
data: null
};
}
/**
* 处理心跳消息
* @param {object|string} message - 心跳消息对象或JSON字符串
*/
async handleHeartbeat(message) {
try {
// 解析消息如果是字符串则解析JSON
let heartbeatData = message;
if (typeof message === 'string') {
heartbeatData = JSON.parse(message);
}
const { sn_code, timestamp, status, memory, platform_status, platform_login_status,clientId } = heartbeatData;
if (!sn_code) {
console.warn('[MQTT心跳] 心跳消息中未找到设备SN码');
return;
}
console.log(`[MQTT心跳] 收到设备 ${sn_code} 的心跳消息`);
const device_status = db.getModel('device_status');
// 检查设备是否存在
let device = await device_status.findByPk(sn_code);
const updateData = {
isOnline: true, // 收到心跳,设备在线
lastHeartbeatTime: timestamp ? new Date(timestamp) : new Date(),
missedHeartbeats: 0
};
// 更新内存信息
if (memory && memory.percent !== undefined) {
updateData.memoryUsage = memory.percent;
}
// 优先使用 platform_login_status新格式
const loginStatusData = platform_login_status ;
if (loginStatusData) {
let isLoggedIn = false;
let loggedInPlatform = null;
let loggedInUsername = null;
let loggedInUserId = null;
let loginTime = null;
// 判断是新格式还是旧格式
isLoggedIn = platform_login_status.login || false;
loggedInPlatform = platform_login_status.platform || null;
loggedInUsername = platform_login_status.username || '';
loggedInUserId = platform_login_status.user_id || null;
if (platform_login_status.timestamp) {
loginTime = new Date(platform_login_status.timestamp);
}
// 更新登录状态
const previousIsLoggedIn = device ? device.isLoggedIn : false;
updateData.isLoggedIn = isLoggedIn;
if (isLoggedIn) {
updateData.platform = loggedInPlatform;
updateData.accountName = loggedInUsername || '';
// 如果之前未登录,现在登录了,更新登录时间
if (!previousIsLoggedIn) {
updateData.loginTime = loginTime || updateData.lastHeartbeatTime;
}
console.log(`[MQTT心跳] 设备 ${sn_code} 已登录 - 平台: ${loggedInPlatform}, 用户: ${loggedInUsername}, ID: ${loggedInUserId}`);
} else {
// 如果之前已登录,现在未登录了,清空登录相关信息
if (previousIsLoggedIn) {
updateData.accountName = '';
// loginTime 保持不变,记录最后登录时间
}
console.log(`[MQTT心跳] 设备 ${sn_code} 未登录`);
}
}
// 更新或创建设备记录
if (device) {
await device_status.update(updateData, { where: { sn_code } });
console.log(`[MQTT心跳] 设备 ${sn_code} 状态已更新 - 在线: true, 登录: ${updateData.isLoggedIn}`);
}
else
{
logProxy.error('[MQTT心跳] 设备 ${sn_code} 不存在', { sn_code });
return;
}
// 记录心跳到设备管理器
await deviceManager.recordHeartbeat(sn_code, heartbeatData);
} catch (error) {
console.error('[MQTT心跳] 处理心跳消息失败:', error);
}
}
/**
* 处理响应消息
* @param {object|string} message - 响应消息对象或JSON字符串
*/
handleResponse(message) {
// 响应消息处理逻辑(目前为空,可根据需要扩展)
// 例如:记录响应日志、更新任务状态等
}
}
module.exports = MqttDispatcher;