Files
autoAiWorkSys/api/middleware/mqtt/mqttClient.js
张成 5d7444cd65 1
2025-11-24 13:23:42 +08:00

177 lines
5.5 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
const mqtt = require('mqtt')
const { v4: uuidv4 } = require('uuid'); // 顶部添加
const Framework = require('../../../framework/node-core-framework');
const logs = require('../logProxy');
// 获取logsService
class MqttSyncClient {
constructor(brokerUrl, options = {}) {
this.client = mqtt.connect(brokerUrl, options)
this.isConnected = false
this.messageListeners = []
this.client.on('connect', () => {
this.isConnected = true
console.log('MQTT 服务端已连接')
})
this.client.on('message', (topic, message) => {
message = JSON.parse(message.toString())
console.log('MQTT 收到消息', topic, message)
this.messageListeners.forEach(listener => listener(topic, message))
})
this.client.on('error', (err) => {
console.warn('[MQTT] Error:', err.message)
})
}
waitForConnect(timeout = 5000) {
return new Promise((resolve, reject) => {
if (this.isConnected) return resolve()
const timer = setTimeout(() => {
reject(new Error('MQTT connect timeout'))
}, timeout)
const check = () => {
if (this.isConnected) {
clearTimeout(timer)
resolve()
} else {
setTimeout(check, 100)
}
}
check()
})
}
publish(topic, message, options = {}) {
return new Promise((resolve, reject) => {
this.client.publish(topic, message, options, (err) => {
if (err) reject(err)
else resolve()
})
})
}
subscribe(topic, callback = null, options = {}) {
return new Promise((resolve, reject) => {
this.client.subscribe(topic, options, (err, granted) => {
if (err) {
reject(err)
} else {
// 如果提供了回调函数,添加到消息监听器
if (callback && typeof callback === 'function') {
const messageHandler = (responseTopic, message) => {
if (responseTopic !== topic) return
try {
callback(topic, message)
} catch (error) {
console.warn(`[MQTT] 处理订阅消息失败: ${error.message}`)
}
}
// 添加到消息监听器
this.addMessageListener(messageHandler)
// 在订阅成功后返回订阅信息和消息处理器
resolve({ granted, messageHandler, topic })
} else {
resolve(granted)
}
}
1
})
})
}
/**
* 发布指令并等待某个主题返回模拟同步通过uuid确定唯一项
* @param {*} requestTopic
* @param {*} requestMessage
* @param {*} responseTopic
* @param {*} timeout
*/
async publishAndWait(sn_code, requestMessage, timeout = 60 * 3 * 1000) {
return new Promise((resolve, reject) => {
console.log(`[MQTT指令] 发送到主题: request_${sn_code}`, requestMessage);
const uuid = uuidv4();
// 将uuid加入消息体建议使用JSON格式
const msgObj = typeof requestMessage === 'object'
? { ...requestMessage, uuid }
: { data: requestMessage, uuid };
const sendMsg = JSON.stringify(msgObj);
const timer = setTimeout(() => {
this.removeMessageListener(onMessage);
reject(new Error('Timeout waiting for response'));
}, timeout);
const onMessage = (topic, message) => {
let { uuid: responseUuid } = message
if (topic === 'response' && responseUuid === uuid) {
clearTimeout(timer);
this.removeMessageListener(onMessage);
resolve(message)
}
}
this.addMessageListener(onMessage)
this.publish(`request_${sn_code}`, sendMsg).catch(err => {
clearTimeout(timer);
this.removeMessageListener(onMessage);
reject(err);
});
});
}
addMessageListener(fn) {
this.messageListeners.push(fn)
}
removeMessageListener(fn) {
this.messageListeners = this.messageListeners.filter(f => f !== fn)
}
/**
* 取消订阅主题
* @param {string} topic 要取消订阅的主题
* @param {object} subscriptionInfo 订阅时返回的订阅信息
*/
async unsubscribe(topic, subscriptionInfo = null) {
return new Promise((resolve, reject) => {
this.client.unsubscribe(topic, (err) => {
if (err) {
reject(err)
} else {
// 如果提供了订阅信息,移除对应的消息处理器
if (subscriptionInfo && subscriptionInfo.messageHandler) {
this.removeMessageListener(subscriptionInfo.messageHandler)
}
resolve()
}
})
})
}
end(force = false) {
this.client.end(force)
}
}
module.exports = MqttSyncClient