1
This commit is contained in:
176
api/middleware/mqtt/mqttClient.js
Normal file
176
api/middleware/mqtt/mqttClient.js
Normal file
@@ -0,0 +1,176 @@
|
||||
const mqtt = require('mqtt')
|
||||
const { v4: uuidv4 } = require('uuid'); // 顶部添加
|
||||
const Framework = require('../../../framework/node-core-framework');
|
||||
const logs = require('../logProxy');
|
||||
// 获取logsService
|
||||
class MqttSyncClient {
|
||||
constructor(brokerUrl, options = {}) {
|
||||
this.client = mqtt.connect(brokerUrl, options)
|
||||
this.isConnected = false
|
||||
|
||||
this.messageListeners = []
|
||||
|
||||
this.client.on('connect', () => {
|
||||
this.isConnected = true
|
||||
|
||||
console.log('MQTT 服务端已连接')
|
||||
})
|
||||
|
||||
this.client.on('message', (topic, message) => {
|
||||
|
||||
message = JSON.parse(message.toString())
|
||||
|
||||
console.log('MQTT 收到消息', topic, message)
|
||||
|
||||
this.messageListeners.forEach(listener => listener(topic, message))
|
||||
|
||||
})
|
||||
|
||||
this.client.on('error', (err) => {
|
||||
console.warn('[MQTT] Error:', err.message)
|
||||
})
|
||||
}
|
||||
|
||||
waitForConnect(timeout = 5000) {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (this.isConnected) return resolve()
|
||||
const timer = setTimeout(() => {
|
||||
reject(new Error('MQTT connect timeout'))
|
||||
}, timeout)
|
||||
const check = () => {
|
||||
if (this.isConnected) {
|
||||
clearTimeout(timer)
|
||||
resolve()
|
||||
} else {
|
||||
setTimeout(check, 100)
|
||||
}
|
||||
}
|
||||
check()
|
||||
})
|
||||
}
|
||||
|
||||
publish(topic, message, options = {}) {
|
||||
return new Promise((resolve, reject) => {
|
||||
this.client.publish(topic, message, options, (err) => {
|
||||
if (err) reject(err)
|
||||
else resolve()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
subscribe(topic, callback = null, options = {}) {
|
||||
return new Promise((resolve, reject) => {
|
||||
this.client.subscribe(topic, options, (err, granted) => {
|
||||
if (err) {
|
||||
reject(err)
|
||||
} else {
|
||||
// 如果提供了回调函数,添加到消息监听器
|
||||
if (callback && typeof callback === 'function') {
|
||||
const messageHandler = (responseTopic, message) => {
|
||||
if (responseTopic !== topic) return
|
||||
try {
|
||||
callback(topic, message)
|
||||
} catch (error) {
|
||||
console.warn(`[MQTT] 处理订阅消息失败: ${error.message}`)
|
||||
}
|
||||
}
|
||||
|
||||
// 添加到消息监听器
|
||||
this.addMessageListener(messageHandler)
|
||||
|
||||
// 在订阅成功后返回订阅信息和消息处理器
|
||||
resolve({ granted, messageHandler, topic })
|
||||
} else {
|
||||
resolve(granted)
|
||||
}
|
||||
}
|
||||
1
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布指令并等待某个主题返回(模拟同步),通过uuid确定唯一项
|
||||
* @param {*} requestTopic
|
||||
* @param {*} requestMessage
|
||||
* @param {*} responseTopic
|
||||
* @param {*} timeout
|
||||
*/
|
||||
async publishAndWait(sn_code, requestMessage, timeout = 60 * 3 * 1000) {
|
||||
return new Promise((resolve, reject) => {
|
||||
console.log(`[MQTT指令] 发送到主题: request_${sn_code}`, requestMessage);
|
||||
|
||||
|
||||
const uuid = uuidv4();
|
||||
|
||||
// 将uuid加入消息体,建议使用JSON格式
|
||||
const msgObj = typeof requestMessage === 'object'
|
||||
? { ...requestMessage, uuid }
|
||||
: { data: requestMessage, uuid };
|
||||
|
||||
|
||||
const sendMsg = JSON.stringify(msgObj);
|
||||
|
||||
|
||||
const timer = setTimeout(() => {
|
||||
this.removeMessageListener(onMessage);
|
||||
reject(new Error('Timeout waiting for response'));
|
||||
}, timeout);
|
||||
|
||||
const onMessage = (topic, message) => {
|
||||
let { uuid: responseUuid } = message
|
||||
if (topic === 'response' && responseUuid === uuid) {
|
||||
clearTimeout(timer);
|
||||
this.removeMessageListener(onMessage);
|
||||
resolve(message)
|
||||
}
|
||||
}
|
||||
|
||||
this.addMessageListener(onMessage)
|
||||
|
||||
this.publish(`request_${sn_code}`, sendMsg).catch(err => {
|
||||
clearTimeout(timer);
|
||||
this.removeMessageListener(onMessage);
|
||||
reject(err);
|
||||
});
|
||||
|
||||
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
addMessageListener(fn) {
|
||||
this.messageListeners.push(fn)
|
||||
}
|
||||
|
||||
removeMessageListener(fn) {
|
||||
this.messageListeners = this.messageListeners.filter(f => f !== fn)
|
||||
}
|
||||
|
||||
/**
|
||||
* 取消订阅主题
|
||||
* @param {string} topic 要取消订阅的主题
|
||||
* @param {object} subscriptionInfo 订阅时返回的订阅信息
|
||||
*/
|
||||
async unsubscribe(topic, subscriptionInfo = null) {
|
||||
return new Promise((resolve, reject) => {
|
||||
this.client.unsubscribe(topic, (err) => {
|
||||
if (err) {
|
||||
reject(err)
|
||||
} else {
|
||||
// 如果提供了订阅信息,移除对应的消息处理器
|
||||
if (subscriptionInfo && subscriptionInfo.messageHandler) {
|
||||
this.removeMessageListener(subscriptionInfo.messageHandler)
|
||||
}
|
||||
resolve()
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
end(force = false) {
|
||||
this.client.end(force)
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = MqttSyncClient
|
||||
302
api/middleware/mqtt/mqttDispatcher.js
Normal file
302
api/middleware/mqtt/mqttDispatcher.js
Normal file
@@ -0,0 +1,302 @@
|
||||
const db = require('../dbProxy.js');
|
||||
const deviceManager = require('../schedule/deviceManager.js');
|
||||
|
||||
/**
|
||||
* MQTT 消息分发器
|
||||
* 处理所有 MQTT 消息的分发和响应(包括请求、心跳、响应等)
|
||||
*/
|
||||
class MqttDispatcher {
|
||||
constructor(components, mqttClient) {
|
||||
this.taskQueue = components.taskQueue;
|
||||
this.mqttClient = mqttClient;
|
||||
this.actionHandlers = new Map();
|
||||
this.subscribedTopics = new Set();
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动 MQTT 分发器
|
||||
*/
|
||||
start() {
|
||||
this.registerActionHandlers();
|
||||
console.log('[MQTT分发器] 已启动');
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册指令处理器
|
||||
*/
|
||||
registerActionHandlers() {
|
||||
// 注册各种MQTT指令处理器
|
||||
this.actionHandlers.set('manual_job', async (params) => {
|
||||
return await this.handleManualJobRequest(params);
|
||||
});
|
||||
|
||||
this.actionHandlers.set('get_status', async () => {
|
||||
return this.getSystemStatus();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理手动任务请求
|
||||
*/
|
||||
async handleManualJobRequest(params) {
|
||||
const { sn_code, data } = params;
|
||||
const { keyword, taskType } = data;
|
||||
const config = require('../schedule/config.js');
|
||||
|
||||
try {
|
||||
// addTask 内部会检查账号是否启用,这里直接调用即可
|
||||
const taskId = await this.taskQueue.addTask(sn_code, {
|
||||
taskType: taskType,
|
||||
taskName: `手动任务 - ${keyword || taskType}`,
|
||||
taskParams: data,
|
||||
priority: config.getTaskPriority(taskType, { urgent: true })
|
||||
});
|
||||
return {
|
||||
success: true,
|
||||
taskId: taskId,
|
||||
message: '任务已添加到队列'
|
||||
};
|
||||
} catch (error) {
|
||||
return {
|
||||
success: false,
|
||||
error: error.message
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取系统状态(供调度管理器使用)
|
||||
*/
|
||||
getSystemStatus() {
|
||||
return {
|
||||
systemStats: deviceManager.getSystemStats(),
|
||||
allDevices: deviceManager.getAllDevicesStatus(),
|
||||
taskQueues: this.taskQueue.getAllDeviceStatus()
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* 分发 MQTT 请求消息
|
||||
*/
|
||||
async dispatchMqttMessage(topic, messageStr, mqttClient) {
|
||||
try {
|
||||
const message = JSON.parse(messageStr);
|
||||
const { action, data, uuid, platform, sn_code } = message;
|
||||
|
||||
const deviceCode = topic.replace('request_', '') || sn_code;
|
||||
console.log(`[MQTT分发器] 收到指令 - 设备: ${deviceCode}, 动作: ${action}, UUID: ${uuid}`);
|
||||
|
||||
// 检查设备是否在线
|
||||
if (!deviceManager.isDeviceOnline(deviceCode)) {
|
||||
console.log(`[MQTT分发器] 设备 ${deviceCode} 离线,忽略指令`);
|
||||
return this.sendMqttResponse(mqttClient, uuid, {
|
||||
code: 500,
|
||||
message: '设备离线',
|
||||
data: null
|
||||
});
|
||||
}
|
||||
|
||||
// 获取对应的处理器
|
||||
const handler = this.actionHandlers.get(action);
|
||||
if (!handler) {
|
||||
console.log(`[MQTT分发器] 未找到动作 ${action} 的处理器`);
|
||||
return this.sendMqttResponse(mqttClient, uuid, {
|
||||
code: 404,
|
||||
message: `未找到动作 ${action} 的处理器`,
|
||||
data: null
|
||||
});
|
||||
}
|
||||
|
||||
// 执行处理器
|
||||
const result = await handler({
|
||||
sn_code: deviceCode,
|
||||
action,
|
||||
data,
|
||||
platform,
|
||||
mqttClient,
|
||||
uuid
|
||||
});
|
||||
|
||||
// 发送响应
|
||||
return this.sendMqttResponse(mqttClient, uuid, result);
|
||||
|
||||
} catch (error) {
|
||||
console.error('[MQTT分发器] 分发指令时出错:', error);
|
||||
|
||||
try {
|
||||
const message = JSON.parse(messageStr);
|
||||
if (message.uuid) {
|
||||
return this.sendMqttResponse(mqttClient, message.uuid, {
|
||||
code: 500,
|
||||
message: error.message,
|
||||
data: null
|
||||
});
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('[MQTT分发器] 解析消息失败');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送 MQTT 响应
|
||||
*/
|
||||
async sendMqttResponse(mqttClient, uuid, result) {
|
||||
const response = {
|
||||
uuid: uuid,
|
||||
code: result.code || 200,
|
||||
message: result.message || 'success',
|
||||
data: result.data || null,
|
||||
timestamp: Date.now()
|
||||
};
|
||||
|
||||
try {
|
||||
await mqttClient.publish('response', JSON.stringify(response));
|
||||
console.log(`[MQTT分发器] 发送响应 - UUID: ${uuid}, Code: ${response.code}`);
|
||||
} catch (error) {
|
||||
console.error('[MQTT分发器] 发送响应失败:', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建成功响应
|
||||
*/
|
||||
static createSuccessResponse(data = null, message = 'success') {
|
||||
return {
|
||||
code: 200,
|
||||
message: message,
|
||||
data: data
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建失败响应
|
||||
*/
|
||||
static createErrorResponse(message, code = 500) {
|
||||
return {
|
||||
code: code,
|
||||
message: message,
|
||||
data: null
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理心跳消息
|
||||
* @param {object|string} message - 心跳消息对象或JSON字符串
|
||||
*/
|
||||
async handleHeartbeat(message) {
|
||||
try {
|
||||
// 解析消息(如果是字符串,则解析JSON)
|
||||
let heartbeatData = message;
|
||||
if (typeof message === 'string') {
|
||||
heartbeatData = JSON.parse(message);
|
||||
}
|
||||
|
||||
const { sn_code, timestamp, status, memory, platform_status, platform_login_status } = heartbeatData;
|
||||
|
||||
if (!sn_code) {
|
||||
console.warn('[MQTT心跳] 心跳消息中未找到设备SN码');
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`[MQTT心跳] 收到设备 ${sn_code} 的心跳消息`);
|
||||
|
||||
const device_status = db.getModel('device_status');
|
||||
|
||||
// 检查设备是否存在
|
||||
let device = await device_status.findByPk(sn_code);
|
||||
|
||||
const updateData = {
|
||||
isOnline: true, // 收到心跳,设备在线
|
||||
lastHeartbeatTime: timestamp ? new Date(timestamp) : new Date(),
|
||||
missedHeartbeats: 0
|
||||
};
|
||||
|
||||
// 更新内存信息
|
||||
if (memory && memory.percent !== undefined) {
|
||||
updateData.memoryUsage = memory.percent;
|
||||
}
|
||||
// 优先使用 platform_login_status(新格式),
|
||||
const loginStatusData = platform_login_status ;
|
||||
|
||||
if (loginStatusData) {
|
||||
let isLoggedIn = false;
|
||||
let loggedInPlatform = null;
|
||||
let loggedInUsername = null;
|
||||
let loggedInUserId = null;
|
||||
let loginTime = null;
|
||||
|
||||
// 判断是新格式还是旧格式
|
||||
isLoggedIn = platform_login_status.login || false;
|
||||
loggedInPlatform = platform_login_status.platform || null;
|
||||
loggedInUsername = platform_login_status.username || '';
|
||||
loggedInUserId = platform_login_status.user_id || null;
|
||||
|
||||
if (platform_login_status.timestamp) {
|
||||
loginTime = new Date(platform_login_status.timestamp);
|
||||
}
|
||||
|
||||
// 更新登录状态
|
||||
const previousIsLoggedIn = device ? device.isLoggedIn : false;
|
||||
|
||||
updateData.isLoggedIn = isLoggedIn;
|
||||
|
||||
if (isLoggedIn) {
|
||||
updateData.platform = loggedInPlatform;
|
||||
updateData.accountName = loggedInUsername || '';
|
||||
|
||||
// 如果之前未登录,现在登录了,更新登录时间
|
||||
if (!previousIsLoggedIn) {
|
||||
updateData.loginTime = loginTime || updateData.lastHeartbeatTime;
|
||||
}
|
||||
|
||||
console.log(`[MQTT心跳] 设备 ${sn_code} 已登录 - 平台: ${loggedInPlatform}, 用户: ${loggedInUsername}, ID: ${loggedInUserId}`);
|
||||
} else {
|
||||
// 如果之前已登录,现在未登录了,清空登录相关信息
|
||||
if (previousIsLoggedIn) {
|
||||
updateData.accountName = '';
|
||||
// loginTime 保持不变,记录最后登录时间
|
||||
}
|
||||
|
||||
console.log(`[MQTT心跳] 设备 ${sn_code} 未登录`);
|
||||
}
|
||||
}
|
||||
|
||||
// 更新或创建设备记录
|
||||
if (device) {
|
||||
await device_status.update(updateData, { where: { sn_code } });
|
||||
console.log(`[MQTT心跳] 设备 ${sn_code} 状态已更新 - 在线: true, 登录: ${updateData.isLoggedIn}`);
|
||||
} else {
|
||||
// 创建新设备记录
|
||||
await device_status.create({
|
||||
sn_code,
|
||||
deviceName: `设备_${sn_code}`,
|
||||
deviceType: 'node_mqtt_client',
|
||||
...updateData,
|
||||
isRunning: false,
|
||||
taskStatus: 'idle',
|
||||
healthStatus: 'unknown',
|
||||
healthScore: 0
|
||||
});
|
||||
console.log(`[MQTT心跳] 设备 ${sn_code} 记录已创建 - 在线: true, 登录: ${updateData.isLoggedIn}`);
|
||||
}
|
||||
|
||||
// 记录心跳到设备管理器
|
||||
await deviceManager.recordHeartbeat(sn_code, heartbeatData);
|
||||
} catch (error) {
|
||||
console.error('[MQTT心跳] 处理心跳消息失败:', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理响应消息
|
||||
* @param {object|string} message - 响应消息对象或JSON字符串
|
||||
*/
|
||||
handleResponse(message) {
|
||||
// 响应消息处理逻辑(目前为空,可根据需要扩展)
|
||||
// 例如:记录响应日志、更新任务状态等
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = MqttDispatcher;
|
||||
|
||||
127
api/middleware/mqtt/mqttManager.js
Normal file
127
api/middleware/mqtt/mqttManager.js
Normal file
@@ -0,0 +1,127 @@
|
||||
const MqttSyncClient = require('./mqttClient');
|
||||
const Framework = require('../../../framework/node-core-framework');
|
||||
const logs = require('../logProxy');
|
||||
// action.js 已合并到 mqttDispatcher.js,不再需要单独引入
|
||||
|
||||
/**
|
||||
* MQTT管理器 - 单例模式
|
||||
* 负责管理MQTT连接,确保全局只有一个MQTT客户端实例
|
||||
*/
|
||||
class MqttManager {
|
||||
constructor() {
|
||||
this.client = null;
|
||||
this.isInitialized = false;
|
||||
this.config = {
|
||||
brokerUrl: 'mqtt://192.144.167.231:1883', // MQTT Broker地址
|
||||
options: {
|
||||
clientId: `mqtt_server_${Math.random().toString(16).substr(2, 8)}`,
|
||||
clean: true,
|
||||
connectTimeout: 5000,
|
||||
reconnectPeriod: 5000, // 自动重连间隔
|
||||
keepalive: 10
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取MQTT客户端实例(单例模式)
|
||||
* @param {object} config - 可选的配置覆盖
|
||||
* @returns {Promise<MqttSyncClient>} MQTT客户端实例
|
||||
*/
|
||||
async getInstance(config = {}) {
|
||||
if (this.client && this.isInitialized) {
|
||||
console.log('[MQTT管理器] 返回已存在的MQTT客户端实例');
|
||||
return this.client;
|
||||
}
|
||||
|
||||
// 合并配置
|
||||
if (config.brokerUrl) {
|
||||
this.config.brokerUrl = config.brokerUrl;
|
||||
}
|
||||
if (config.options) {
|
||||
this.config.options = { ...this.config.options, ...config.options };
|
||||
}
|
||||
|
||||
try {
|
||||
console.log('[MQTT管理器] 创建新的MQTT客户端实例');
|
||||
console.log(`[MQTT管理器] Broker地址: ${this.config.brokerUrl}`);
|
||||
|
||||
// 创建MQTT客户端
|
||||
this.client = new MqttSyncClient(this.config.brokerUrl, this.config.options);
|
||||
|
||||
// 等待连接成功
|
||||
await this.client.waitForConnect(10000);
|
||||
|
||||
// 注意:心跳和响应的订阅已移到 mqttDispatcher 中处理
|
||||
// 这里不再直接订阅,由调度系统通过 mqttDispatcher 统一管理
|
||||
console.log('[MQTT管理器] MQTT客户端连接成功,等待分发器初始化订阅');
|
||||
|
||||
this.isInitialized = true;
|
||||
console.log('[MQTT管理器] MQTT客户端初始化成功');
|
||||
|
||||
return this.client;
|
||||
} catch (error) {
|
||||
console.error('[MQTT管理器] MQTT客户端初始化失败:', error);
|
||||
this.client = null;
|
||||
this.isInitialized = false;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 重置MQTT客户端(用于重新连接)
|
||||
*/
|
||||
async reset() {
|
||||
if (this.client) {
|
||||
try {
|
||||
console.log('[MQTT管理器] 关闭现有MQTT连接');
|
||||
this.client.end(true);
|
||||
} catch (error) {
|
||||
console.error('[MQTT管理器] 关闭MQTT连接时出错:', error);
|
||||
}
|
||||
}
|
||||
|
||||
this.client = null;
|
||||
this.isInitialized = false;
|
||||
console.log('[MQTT管理器] MQTT客户端已重置');
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查MQTT客户端是否已初始化
|
||||
* @returns {boolean}
|
||||
*/
|
||||
isReady() {
|
||||
return this.isInitialized && this.client && this.client.isConnected;
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新配置
|
||||
* @param {object} config - 新的配置
|
||||
*/
|
||||
updateConfig(config) {
|
||||
if (this.isInitialized) {
|
||||
console.warn('[MQTT管理器] MQTT客户端已初始化,配置更新需要重置连接');
|
||||
}
|
||||
|
||||
if (config.brokerUrl) {
|
||||
this.config.brokerUrl = config.brokerUrl;
|
||||
}
|
||||
if (config.options) {
|
||||
this.config.options = { ...this.config.options, ...config.options };
|
||||
}
|
||||
|
||||
console.log('[MQTT管理器] 配置已更新:', this.config);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前配置
|
||||
* @returns {object}
|
||||
*/
|
||||
getConfig() {
|
||||
return { ...this.config };
|
||||
}
|
||||
}
|
||||
|
||||
// 导出单例
|
||||
const mqttManager = new MqttManager();
|
||||
module.exports = mqttManager;
|
||||
Reference in New Issue
Block a user