305 lines
10 KiB
JavaScript
305 lines
10 KiB
JavaScript
const db = require('../dbProxy.js');
|
||
const deviceManager = require('../schedule/deviceManager.js');
|
||
|
||
/**
|
||
* MQTT 消息分发器
|
||
* 处理所有 MQTT 消息的分发和响应(包括请求、心跳、响应等)
|
||
*/
|
||
class MqttDispatcher {
|
||
constructor(components, mqttClient) {
|
||
this.taskQueue = components.taskQueue;
|
||
this.mqttClient = mqttClient;
|
||
this.actionHandlers = new Map();
|
||
this.subscribedTopics = new Set();
|
||
}
|
||
|
||
/**
|
||
* 启动 MQTT 分发器
|
||
*/
|
||
start() {
|
||
this.registerActionHandlers();
|
||
console.log('[MQTT分发器] 已启动');
|
||
}
|
||
|
||
/**
|
||
* 注册指令处理器
|
||
*/
|
||
registerActionHandlers() {
|
||
// 注册各种MQTT指令处理器
|
||
this.actionHandlers.set('manual_job', async (params) => {
|
||
return await this.handleManualJobRequest(params);
|
||
});
|
||
|
||
this.actionHandlers.set('get_status', async () => {
|
||
return this.getSystemStatus();
|
||
});
|
||
}
|
||
|
||
/**
|
||
* 处理手动任务请求
|
||
*/
|
||
async handleManualJobRequest(params) {
|
||
const { sn_code, data } = params;
|
||
const { keyword, taskType } = data;
|
||
const config = require('../schedule/config.js');
|
||
|
||
try {
|
||
// addTask 内部会检查账号是否启用,这里直接调用即可
|
||
const taskId = await this.taskQueue.addTask(sn_code, {
|
||
taskType: taskType,
|
||
taskName: `手动任务 - ${keyword || taskType}`,
|
||
taskParams: data,
|
||
priority: config.getTaskPriority(taskType, { urgent: true })
|
||
});
|
||
return {
|
||
success: true,
|
||
taskId: taskId,
|
||
message: '任务已添加到队列'
|
||
};
|
||
} catch (error) {
|
||
return {
|
||
success: false,
|
||
error: error.message
|
||
};
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 获取系统状态(供调度管理器使用)
|
||
*/
|
||
getSystemStatus() {
|
||
return {
|
||
systemStats: deviceManager.getSystemStats(),
|
||
allDevices: deviceManager.getAllDevicesStatus(),
|
||
taskQueues: this.taskQueue.getAllDeviceStatus()
|
||
};
|
||
}
|
||
|
||
/**
|
||
* 分发 MQTT 请求消息
|
||
*/
|
||
async dispatchMqttMessage(topic, messageStr, mqttClient) {
|
||
try {
|
||
const message = JSON.parse(messageStr);
|
||
const { action, data, uuid, platform, sn_code } = message;
|
||
|
||
const deviceCode = topic.replace('request_', '') || sn_code;
|
||
console.log(`[MQTT分发器] 收到指令 - 设备: ${deviceCode}, 动作: ${action}, UUID: ${uuid}`);
|
||
|
||
// 检查设备是否在线
|
||
if (!deviceManager.isDeviceOnline(deviceCode)) {
|
||
console.log(`[MQTT分发器] 设备 ${deviceCode} 离线,忽略指令`);
|
||
return this.sendMqttResponse(mqttClient, uuid, {
|
||
code: 500,
|
||
message: '设备离线',
|
||
data: null
|
||
});
|
||
}
|
||
|
||
// 获取对应的处理器
|
||
const handler = this.actionHandlers.get(action);
|
||
if (!handler) {
|
||
console.log(`[MQTT分发器] 未找到动作 ${action} 的处理器`);
|
||
return this.sendMqttResponse(mqttClient, uuid, {
|
||
code: 404,
|
||
message: `未找到动作 ${action} 的处理器`,
|
||
data: null
|
||
});
|
||
}
|
||
|
||
// 执行处理器
|
||
const result = await handler({
|
||
sn_code: deviceCode,
|
||
action,
|
||
data,
|
||
platform,
|
||
mqttClient,
|
||
uuid
|
||
});
|
||
|
||
// 发送响应
|
||
return this.sendMqttResponse(mqttClient, uuid, result);
|
||
|
||
} catch (error) {
|
||
console.error('[MQTT分发器] 分发指令时出错:', error);
|
||
|
||
try {
|
||
const message = JSON.parse(messageStr);
|
||
if (message.uuid) {
|
||
return this.sendMqttResponse(mqttClient, message.uuid, {
|
||
code: 500,
|
||
message: error.message,
|
||
data: null
|
||
});
|
||
}
|
||
} catch (e) {
|
||
console.error('[MQTT分发器] 解析消息失败');
|
||
}
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 发送 MQTT 响应
|
||
*/
|
||
async sendMqttResponse(mqttClient, uuid, result) {
|
||
const response = {
|
||
uuid: uuid,
|
||
code: result.code || 200,
|
||
message: result.message || 'success',
|
||
data: result.data || null,
|
||
timestamp: Date.now()
|
||
};
|
||
|
||
try {
|
||
await mqttClient.publish('response', JSON.stringify(response));
|
||
console.log(`[MQTT分发器] 发送响应 - UUID: ${uuid}, Code: ${response.code}`);
|
||
} catch (error) {
|
||
console.error('[MQTT分发器] 发送响应失败:', error);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 创建成功响应
|
||
*/
|
||
static createSuccessResponse(data = null, message = 'success') {
|
||
return {
|
||
code: 200,
|
||
message: message,
|
||
data: data
|
||
};
|
||
}
|
||
|
||
/**
|
||
* 创建失败响应
|
||
*/
|
||
static createErrorResponse(message, code = 500) {
|
||
return {
|
||
code: code,
|
||
message: message,
|
||
data: null
|
||
};
|
||
}
|
||
|
||
/**
|
||
* 处理心跳消息
|
||
* @param {object|string} message - 心跳消息对象或JSON字符串
|
||
*/
|
||
async handleHeartbeat(message) {
|
||
try {
|
||
// 解析消息(如果是字符串,则解析JSON)
|
||
let heartbeatData = message;
|
||
if (typeof message === 'string') {
|
||
heartbeatData = JSON.parse(message);
|
||
}
|
||
|
||
const { sn_code, timestamp, status, memory, platform_status, platform_login_status,clientId } = heartbeatData;
|
||
|
||
if (!sn_code) {
|
||
console.warn('[MQTT心跳] 心跳消息中未找到设备SN码');
|
||
return;
|
||
}
|
||
|
||
console.log(`[MQTT心跳] 收到设备 ${sn_code} 的心跳消息`);
|
||
|
||
const device_status = db.getModel('device_status');
|
||
|
||
// 检查设备是否存在
|
||
let device = await device_status.findByPk(sn_code);
|
||
|
||
const updateData = {
|
||
isOnline: true, // 收到心跳,设备在线
|
||
lastHeartbeatTime: timestamp ? new Date(timestamp) : new Date(),
|
||
missedHeartbeats: 0
|
||
};
|
||
|
||
// 更新内存信息
|
||
if (memory && memory.percent !== undefined) {
|
||
updateData.memoryUsage = memory.percent;
|
||
}
|
||
// 优先使用 platform_login_status(新格式),
|
||
const loginStatusData = platform_login_status ;
|
||
|
||
if (loginStatusData) {
|
||
let isLoggedIn = false;
|
||
let loggedInPlatform = null;
|
||
let loggedInUsername = null;
|
||
let loggedInUserId = null;
|
||
let loginTime = null;
|
||
|
||
// 判断是新格式还是旧格式
|
||
isLoggedIn = platform_login_status.login || false;
|
||
loggedInPlatform = platform_login_status.platform || null;
|
||
loggedInUsername = platform_login_status.username || '';
|
||
loggedInUserId = platform_login_status.user_id || null;
|
||
|
||
|
||
if (platform_login_status.timestamp) {
|
||
loginTime = new Date(platform_login_status.timestamp);
|
||
}
|
||
|
||
// 更新登录状态
|
||
const previousIsLoggedIn = device ? device.isLoggedIn : false;
|
||
|
||
updateData.isLoggedIn = isLoggedIn;
|
||
|
||
if (isLoggedIn) {
|
||
updateData.platform = loggedInPlatform;
|
||
updateData.accountName = loggedInUsername || '';
|
||
|
||
// 如果之前未登录,现在登录了,更新登录时间
|
||
if (!previousIsLoggedIn) {
|
||
updateData.loginTime = loginTime || updateData.lastHeartbeatTime;
|
||
}
|
||
|
||
console.log(`[MQTT心跳] 设备 ${sn_code} 已登录 - 平台: ${loggedInPlatform}, 用户: ${loggedInUsername}, ID: ${loggedInUserId}`);
|
||
} else {
|
||
// 如果之前已登录,现在未登录了,清空登录相关信息
|
||
if (previousIsLoggedIn) {
|
||
updateData.accountName = '';
|
||
// loginTime 保持不变,记录最后登录时间
|
||
}
|
||
|
||
console.log(`[MQTT心跳] 设备 ${sn_code} 未登录`);
|
||
}
|
||
}
|
||
|
||
// 更新或创建设备记录
|
||
if (device) {
|
||
await device_status.update(updateData, { where: { sn_code } });
|
||
console.log(`[MQTT心跳] 设备 ${sn_code} 状态已更新 - 在线: true, 登录: ${updateData.isLoggedIn}`);
|
||
} else {
|
||
// 创建新设备记录
|
||
await device_status.create({
|
||
sn_code,
|
||
device_id: clientId,
|
||
deviceName: `设备_${sn_code}`,
|
||
deviceType: 'node_mqtt_client',
|
||
...updateData,
|
||
isRunning: false,
|
||
taskStatus: 'idle',
|
||
healthStatus: 'unknown',
|
||
healthScore: 0
|
||
});
|
||
console.log(`[MQTT心跳] 设备 ${sn_code} 记录已创建 - 在线: true, 登录: ${updateData.isLoggedIn}`);
|
||
}
|
||
|
||
// 记录心跳到设备管理器
|
||
await deviceManager.recordHeartbeat(sn_code, heartbeatData);
|
||
} catch (error) {
|
||
console.error('[MQTT心跳] 处理心跳消息失败:', error);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 处理响应消息
|
||
* @param {object|string} message - 响应消息对象或JSON字符串
|
||
*/
|
||
handleResponse(message) {
|
||
// 响应消息处理逻辑(目前为空,可根据需要扩展)
|
||
// 例如:记录响应日志、更新任务状态等
|
||
}
|
||
}
|
||
|
||
module.exports = MqttDispatcher;
|
||
|