Files
autoAiWorkSys/api/middleware/mqtt/mqttManager.js
张成 5d7444cd65 1
2025-11-24 13:23:42 +08:00

128 lines
3.9 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
const MqttSyncClient = require('./mqttClient');
const Framework = require('../../../framework/node-core-framework');
const logs = require('../logProxy');
// action.js 已合并到 mqttDispatcher.js不再需要单独引入
/**
* MQTT管理器 - 单例模式
* 负责管理MQTT连接确保全局只有一个MQTT客户端实例
*/
class MqttManager {
constructor() {
this.client = null;
this.isInitialized = false;
this.config = {
brokerUrl: 'mqtt://192.144.167.231:1883', // MQTT Broker地址
options: {
clientId: `mqtt_server_${Math.random().toString(16).substr(2, 8)}`,
clean: true,
connectTimeout: 5000,
reconnectPeriod: 5000, // 自动重连间隔
keepalive: 10
}
};
}
/**
* 获取MQTT客户端实例单例模式
* @param {object} config - 可选的配置覆盖
* @returns {Promise<MqttSyncClient>} MQTT客户端实例
*/
async getInstance(config = {}) {
if (this.client && this.isInitialized) {
console.log('[MQTT管理器] 返回已存在的MQTT客户端实例');
return this.client;
}
// 合并配置
if (config.brokerUrl) {
this.config.brokerUrl = config.brokerUrl;
}
if (config.options) {
this.config.options = { ...this.config.options, ...config.options };
}
try {
console.log('[MQTT管理器] 创建新的MQTT客户端实例');
console.log(`[MQTT管理器] Broker地址: ${this.config.brokerUrl}`);
// 创建MQTT客户端
this.client = new MqttSyncClient(this.config.brokerUrl, this.config.options);
// 等待连接成功
await this.client.waitForConnect(10000);
// 注意:心跳和响应的订阅已移到 mqttDispatcher 中处理
// 这里不再直接订阅,由调度系统通过 mqttDispatcher 统一管理
console.log('[MQTT管理器] MQTT客户端连接成功等待分发器初始化订阅');
this.isInitialized = true;
console.log('[MQTT管理器] MQTT客户端初始化成功');
return this.client;
} catch (error) {
console.error('[MQTT管理器] MQTT客户端初始化失败:', error);
this.client = null;
this.isInitialized = false;
throw error;
}
}
/**
* 重置MQTT客户端用于重新连接
*/
async reset() {
if (this.client) {
try {
console.log('[MQTT管理器] 关闭现有MQTT连接');
this.client.end(true);
} catch (error) {
console.error('[MQTT管理器] 关闭MQTT连接时出错:', error);
}
}
this.client = null;
this.isInitialized = false;
console.log('[MQTT管理器] MQTT客户端已重置');
}
/**
* 检查MQTT客户端是否已初始化
* @returns {boolean}
*/
isReady() {
return this.isInitialized && this.client && this.client.isConnected;
}
/**
* 更新配置
* @param {object} config - 新的配置
*/
updateConfig(config) {
if (this.isInitialized) {
console.warn('[MQTT管理器] MQTT客户端已初始化配置更新需要重置连接');
}
if (config.brokerUrl) {
this.config.brokerUrl = config.brokerUrl;
}
if (config.options) {
this.config.options = { ...this.config.options, ...config.options };
}
console.log('[MQTT管理器] 配置已更新:', this.config);
}
/**
* 获取当前配置
* @returns {object}
*/
getConfig() {
return { ...this.config };
}
}
// 导出单例
const mqttManager = new MqttManager();
module.exports = mqttManager;