128 lines
3.9 KiB
JavaScript
128 lines
3.9 KiB
JavaScript
const MqttSyncClient = require('./mqttClient');
|
||
const Framework = require('../../../framework/node-core-framework');
|
||
const logs = require('../logProxy');
|
||
// action.js 已合并到 mqttDispatcher.js,不再需要单独引入
|
||
|
||
/**
|
||
* MQTT管理器 - 单例模式
|
||
* 负责管理MQTT连接,确保全局只有一个MQTT客户端实例
|
||
*/
|
||
class MqttManager {
|
||
constructor() {
|
||
this.client = null;
|
||
this.isInitialized = false;
|
||
this.config = {
|
||
brokerUrl: 'mqtt://192.144.167.231:1883', // MQTT Broker地址
|
||
options: {
|
||
clientId: `mqtt_server_${Math.random().toString(16).substr(2, 8)}`,
|
||
clean: true,
|
||
connectTimeout: 5000,
|
||
reconnectPeriod: 5000, // 自动重连间隔
|
||
keepalive: 10
|
||
}
|
||
};
|
||
}
|
||
|
||
/**
|
||
* 获取MQTT客户端实例(单例模式)
|
||
* @param {object} config - 可选的配置覆盖
|
||
* @returns {Promise<MqttSyncClient>} MQTT客户端实例
|
||
*/
|
||
async getInstance(config = {}) {
|
||
if (this.client && this.isInitialized) {
|
||
console.log('[MQTT管理器] 返回已存在的MQTT客户端实例');
|
||
return this.client;
|
||
}
|
||
|
||
// 合并配置
|
||
if (config.brokerUrl) {
|
||
this.config.brokerUrl = config.brokerUrl;
|
||
}
|
||
if (config.options) {
|
||
this.config.options = { ...this.config.options, ...config.options };
|
||
}
|
||
|
||
try {
|
||
console.log('[MQTT管理器] 创建新的MQTT客户端实例');
|
||
console.log(`[MQTT管理器] Broker地址: ${this.config.brokerUrl}`);
|
||
|
||
// 创建MQTT客户端
|
||
this.client = new MqttSyncClient(this.config.brokerUrl, this.config.options);
|
||
|
||
// 等待连接成功
|
||
await this.client.waitForConnect(10000);
|
||
|
||
// 注意:心跳和响应的订阅已移到 mqttDispatcher 中处理
|
||
// 这里不再直接订阅,由调度系统通过 mqttDispatcher 统一管理
|
||
console.log('[MQTT管理器] MQTT客户端连接成功,等待分发器初始化订阅');
|
||
|
||
this.isInitialized = true;
|
||
console.log('[MQTT管理器] MQTT客户端初始化成功');
|
||
|
||
return this.client;
|
||
} catch (error) {
|
||
console.error('[MQTT管理器] MQTT客户端初始化失败:', error);
|
||
this.client = null;
|
||
this.isInitialized = false;
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 重置MQTT客户端(用于重新连接)
|
||
*/
|
||
async reset() {
|
||
if (this.client) {
|
||
try {
|
||
console.log('[MQTT管理器] 关闭现有MQTT连接');
|
||
this.client.end(true);
|
||
} catch (error) {
|
||
console.error('[MQTT管理器] 关闭MQTT连接时出错:', error);
|
||
}
|
||
}
|
||
|
||
this.client = null;
|
||
this.isInitialized = false;
|
||
console.log('[MQTT管理器] MQTT客户端已重置');
|
||
}
|
||
|
||
/**
|
||
* 检查MQTT客户端是否已初始化
|
||
* @returns {boolean}
|
||
*/
|
||
isReady() {
|
||
return this.isInitialized && this.client && this.client.isConnected;
|
||
}
|
||
|
||
/**
|
||
* 更新配置
|
||
* @param {object} config - 新的配置
|
||
*/
|
||
updateConfig(config) {
|
||
if (this.isInitialized) {
|
||
console.warn('[MQTT管理器] MQTT客户端已初始化,配置更新需要重置连接');
|
||
}
|
||
|
||
if (config.brokerUrl) {
|
||
this.config.brokerUrl = config.brokerUrl;
|
||
}
|
||
if (config.options) {
|
||
this.config.options = { ...this.config.options, ...config.options };
|
||
}
|
||
|
||
console.log('[MQTT管理器] 配置已更新:', this.config);
|
||
}
|
||
|
||
/**
|
||
* 获取当前配置
|
||
* @returns {object}
|
||
*/
|
||
getConfig() {
|
||
return { ...this.config };
|
||
}
|
||
}
|
||
|
||
// 导出单例
|
||
const mqttManager = new MqttManager();
|
||
module.exports = mqttManager;
|