Files
autoAiWorkSys/api/middleware/mqtt/mqttClient.js
张成 52876229a8 1
2025-12-30 17:06:14 +08:00

250 lines
8.2 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
const mqtt = require('mqtt')
const { v4: uuidv4 } = require('uuid'); // 顶部添加
const Framework = require('../../../framework/node-core-framework');
const logs = require('../logProxy');
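// Obtain logsService (NOTE(review): `logs` is imported above but is not referenced in the visible code — confirm whether it is still needed)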
/**
 * Promise-based wrapper around an MQTT.js client.
 *
 * Every incoming payload is parsed as JSON and fanned out to listeners that
 * were registered either for one exact topic or for all topics. On top of
 * that, `publishAndWait` offers a request/response helper that correlates a
 * reply with its request through a generated uuid.
 */
class MqttSyncClient {
  /**
   * Connect to the broker and wire up the event handlers.
   *
   * @param {string} brokerUrl - Passed unchanged to `mqtt.connect`.
   * @param {object} [options] - Passed unchanged to `mqtt.connect`.
   */
  constructor(brokerUrl, options = {}) {
    this.client = mqtt.connect(brokerUrl, options);
    this.isConnected = false;
    // Map<topic, Set<listener>>: listeners bound to one exact topic.
    this.messageListeners = new Map();
    // Listeners that receive every message regardless of topic.
    this.globalListeners = new Set();

    this.client.on('connect', () => {
      this.isConnected = true;
      console.log('MQTT 服务端已连接');
    });

    // Reset the flag when the connection goes away; otherwise it would stay
    // `true` forever after the first connect and `waitForConnect` would
    // resolve immediately even while the client is disconnected.
    this.client.on('close', () => {
      this.isConnected = false;
    });

    this.client.on('message', (topic, message) => {
      let messageObj;
      try {
        messageObj = JSON.parse(message.toString());
      } catch (error) {
        // Non-JSON payloads are dropped; listeners only ever see parsed values.
        console.warn('[MQTT] 消息解析失败:', error.message);
        return;
      }

      // Run every listener in isolation so one failing listener cannot
      // prevent the remaining ones from being called.
      const notify = (listeners, failureLabel) => {
        listeners.forEach((listener) => {
          try {
            listener(topic, messageObj);
          } catch (error) {
            console.error(failureLabel, error.message);
          }
        });
      };

      // 1. Listeners registered for exactly this topic.
      const topicListeners = this.messageListeners.get(topic);
      if (topicListeners) {
        notify(topicListeners, '[MQTT] Topic监听器执行失败:');
      }
      // 2. Listeners registered for all topics.
      notify(this.globalListeners, '[MQTT] 全局监听器执行失败:');
    });

    this.client.on('error', (err) => {
      console.warn('[MQTT] Error:', err.message);
    });
  }

  /**
   * Resolve once the client is connected.
   *
   * @param {number} [timeout=5000] - Maximum time to wait, in milliseconds.
   * @returns {Promise<void>} Resolves when `isConnected` is true; rejects with
   *   `Error('MQTT connect timeout')` when the timeout elapses first.
   */
  waitForConnect(timeout = 5000) {
    return new Promise((resolve, reject) => {
      if (this.isConnected) {
        resolve();
        return;
      }

      let pollTimer = null;
      const deadline = setTimeout(() => {
        // Stop polling as well: a poll that keeps re-arming itself after the
        // rejection would leak and keep the process alive indefinitely.
        clearInterval(pollTimer);
        reject(new Error('MQTT connect timeout'));
      }, timeout);

      pollTimer = setInterval(() => {
        if (!this.isConnected) return;
        clearInterval(pollTimer);
        clearTimeout(deadline);
        resolve();
      }, 100);
    });
  }

  /**
   * Publish a message.
   *
   * @param {string} topic - Topic to publish to.
   * @param {string|Buffer} message - Payload handed to the client's `publish`.
   * @param {object} [options] - Publish options handed to the client.
   * @returns {Promise<void>} Rejects with the error reported by the client.
   */
  publish(topic, message, options = {}) {
    return new Promise((resolve, reject) => {
      this.client.publish(topic, message, options, (err) => {
        if (err) {
          reject(err);
        } else {
          resolve();
        }
      });
    });
  }

  /**
   * Subscribe to a topic, optionally attaching a message callback.
   *
   * NOTE: the callback is matched against incoming topics with strict string
   * equality, so a wildcard filter (`+` / `#`) never triggers the callback.
   *
   * @param {string} topic - Topic to subscribe to.
   * @param {Function|null} [callback] - Called as `callback(topic, parsedMessage)`
   *   for every message whose topic equals `topic`.
   * @param {object} [options] - Subscribe options handed to the client.
   * @returns {Promise<object>} With a callback: `{ granted, messageHandler, topic }`
   *   (pass this object to `unsubscribe` to detach the handler). Without a
   *   callback: the `granted` value reported by the client.
   */
  subscribe(topic, callback = null, options = {}) {
    return new Promise((resolve, reject) => {
      this.client.subscribe(topic, options, (err, granted) => {
        if (err) {
          reject(err);
          return;
        }
        if (typeof callback !== 'function') {
          resolve(granted);
          return;
        }

        const messageHandler = (responseTopic, message) => {
          if (responseTopic !== topic) return;
          try {
            callback(topic, message);
          } catch (error) {
            console.warn(`[MQTT] 处理订阅消息失败: ${error.message}`);
          }
        };
        // Registered as a global listener; the handler filters by topic itself.
        this.addMessageListener(messageHandler);
        resolve({ granted, messageHandler, topic });
      });
    });
  }

  /**
   * Publish a request and wait for the matching response, emulating a
   * synchronous call. A generated uuid is embedded in the request, and the
   * first message on topic `response` carrying the same `uuid` settles the
   * promise.
   *
   * This method does not subscribe to `response` itself — presumably that
   * subscription is made elsewhere; verify against the caller.
   *
   * @param {string} sn_code - Suffix of the request topic (`request_<sn_code>`).
   * @param {*} requestMessage - Objects are spread into the payload next to
   *   `uuid`; any other value is sent as `{ data, uuid }`.
   * @param {number} [timeout=180000] - Maximum wait, in milliseconds.
   * @returns {Promise<object>} The parsed response message. Rejects with
   *   `Error('Timeout waiting for response')` on timeout, or with the publish
   *   error when publishing fails.
   */
  async publishAndWait(sn_code, requestMessage, timeout = 60 * 3 * 1000) {
    return new Promise((resolve, reject) => {
      console.log(`[MQTT指令] 发送到主题: request_${sn_code}`, requestMessage);
      const uuid = uuidv4();
      const msgObj = typeof requestMessage === 'object'
        ? { ...requestMessage, uuid }
        : { data: requestMessage, uuid };
      const sendMsg = JSON.stringify(msgObj);

      let timer = null;
      // Shared teardown for all three exits: reply, timeout, publish failure.
      const cleanup = () => {
        clearTimeout(timer);
        this.removeMessageListener(onMessage);
      };

      const onMessage = (topic, message) => {
        // A JSON `null` payload is valid and must simply be ignored rather
        // than throwing while reading `uuid` from it.
        if (topic !== 'response' || message == null || message.uuid !== uuid) {
          return;
        }
        cleanup();
        resolve(message);
      };

      timer = setTimeout(() => {
        cleanup();
        reject(new Error('Timeout waiting for response'));
      }, timeout);

      this.addMessageListener(onMessage);
      this.publish(`request_${sn_code}`, sendMsg).catch((err) => {
        cleanup();
        reject(err);
      });
    });
  }

  /**
   * Register a message listener.
   *
   * @param {Function} fn - Called as `fn(topic, parsedMessage)`.
   * @param {string|null} [topic] - Exact topic to listen on; when omitted the
   *   listener receives messages from all topics.
   * @throws {Error} When `fn` is not a function.
   */
  addMessageListener(fn, topic = null) {
    if (typeof fn !== 'function') {
      throw new Error('监听器必须是函数');
    }
    if (!topic) {
      this.globalListeners.add(fn);
      return;
    }
    if (!this.messageListeners.has(topic)) {
      this.messageListeners.set(topic, new Set());
    }
    this.messageListeners.get(topic).add(fn);
  }

  /**
   * Remove a message listener.
   *
   * @param {Function} fn - Listener to remove.
   * @param {string|null} [topic] - When given, remove only from that topic;
   *   when omitted, remove from the global set and from every topic
   *   (kept for compatibility with older callers).
   */
  removeMessageListener(fn, topic = null) {
    if (topic) {
      const topicListeners = this.messageListeners.get(topic);
      if (!topicListeners) return;
      topicListeners.delete(fn);
      // Drop the empty Set so the Map does not accumulate dead topics.
      if (topicListeners.size === 0) {
        this.messageListeners.delete(topic);
      }
      return;
    }

    this.globalListeners.delete(fn);
    for (const [topicKey, listeners] of this.messageListeners.entries()) {
      listeners.delete(fn);
      if (listeners.size === 0) {
        this.messageListeners.delete(topicKey);
      }
    }
  }

  /**
   * Unsubscribe from a topic.
   *
   * @param {string} topic - Topic to unsubscribe from.
   * @param {object|null} [subscriptionInfo] - Object returned by `subscribe`;
   *   when it carries a `messageHandler`, that handler is detached as well.
   * @returns {Promise<void>} Rejects with the error reported by the client.
   */
  async unsubscribe(topic, subscriptionInfo = null) {
    return new Promise((resolve, reject) => {
      this.client.unsubscribe(topic, (err) => {
        if (err) {
          reject(err);
          return;
        }
        if (subscriptionInfo && subscriptionInfo.messageHandler) {
          this.removeMessageListener(subscriptionInfo.messageHandler);
        }
        resolve();
      });
    });
  }

  /**
   * Close the underlying client.
   *
   * @param {boolean} [force=false] - Forwarded to the client's `end`.
   */
  end(force = false) {
    this.client.end(force);
  }
}
// Expose the class itself (not an instance) as this module's CommonJS export.
module.exports = MqttSyncClient