207 lines
6.3 KiB
JavaScript
207 lines
6.3 KiB
JavaScript
const mqttManager = require("../mqtt/mqttManager.js");
|
||
|
||
// 导入调度模块(简化版)
|
||
const TaskQueue = require('./taskQueue.js');
|
||
const Command = require('./command.js');
|
||
const deviceManager = require('./deviceManager.js');
|
||
const config = require('./config.js');
|
||
const utils = require('./utils.js');
|
||
|
||
// 导入新的模块
|
||
const TaskHandlers = require('./taskHandlers.js');
|
||
const MqttDispatcher = require('../mqtt/mqttDispatcher.js');
|
||
const ScheduledJobs = require('./scheduledJobs.js');
|
||
const DeviceWorkStatusNotifier = require('./deviceWorkStatusNotifier.js');
|
||
|
||
/**
|
||
* 调度系统管理器
|
||
* 统一管理整个调度系统的生命周期
|
||
*/
|
||
class ScheduleManager {
|
||
constructor() {
|
||
this.mqttClient = null;
|
||
this.isInitialized = false;
|
||
this.startTime = new Date();
|
||
|
||
// 子模块
|
||
this.taskHandlers = null;
|
||
this.mqttDispatcher = null;
|
||
this.scheduledJobs = null;
|
||
}
|
||
|
||
/**
|
||
* 初始化调度系统
|
||
*/
|
||
async init() {
|
||
try {
|
||
console.log('[调度管理器] 开始初始化...');
|
||
|
||
// 1. 初始化MQTT管理器
|
||
await this.initMqttClient();
|
||
console.log('[调度管理器] MQTT管理器已初始化');
|
||
|
||
// 2. 初始化各个组件
|
||
await this.initComponents();
|
||
console.log('[调度管理器] 组件已初始化');
|
||
|
||
|
||
// 3. 初始化子模块
|
||
this.initSubModules();
|
||
console.log('[调度管理器] 子模块已初始化');
|
||
|
||
// 4. 启动心跳监听
|
||
this.startHeartbeatListener();
|
||
console.log('[调度管理器] 心跳监听已启动');
|
||
|
||
// 5. 启动定时任务
|
||
// this.scheduledJobs.start();
|
||
// console.log('[调度管理器] 定时任务已启动');
|
||
|
||
this.isInitialized = true;
|
||
|
||
} catch (error) {
|
||
console.error('[调度管理器] 初始化失败:', error);
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 初始化MQTT客户端
|
||
*/
|
||
async initMqttClient() {
|
||
this.mqttClient = await mqttManager.getInstance();
|
||
// 设置设备工作状态推送服务的 MQTT 客户端
|
||
DeviceWorkStatusNotifier.setMqttClient(this.mqttClient);
|
||
}
|
||
|
||
/**
|
||
* 初始化各个组件(简化版)
|
||
*/
|
||
async initComponents() {
|
||
// 初始化设备管理器
|
||
await deviceManager.init();
|
||
|
||
// 初始化任务队列
|
||
await TaskQueue.init?.();
|
||
}
|
||
|
||
/**
|
||
* 初始化子模块
|
||
*/
|
||
initSubModules() {
|
||
// 初始化任务处理器
|
||
this.taskHandlers = new TaskHandlers(this.mqttClient);
|
||
this.taskHandlers.register(TaskQueue);
|
||
|
||
// 初始化 MQTT 分发器(简化:不再需要 components)
|
||
this.mqttDispatcher = new MqttDispatcher({ deviceManager, taskQueue: TaskQueue }, this.mqttClient);
|
||
this.mqttDispatcher.start();
|
||
|
||
// 初始化定时任务管理器
|
||
this.scheduledJobs = new ScheduledJobs({ deviceManager, taskQueue: TaskQueue }, this.taskHandlers);
|
||
}
|
||
|
||
/**
|
||
* 启动心跳监听
|
||
*/
|
||
startHeartbeatListener() {
|
||
// 订阅心跳主题,使用 mqttDispatcher 处理
|
||
this.mqttClient.subscribe("heartbeat", async (topic, message) => {
|
||
try {
|
||
await this.mqttDispatcher.handleHeartbeat(message);
|
||
} catch (error) {
|
||
console.error('[调度管理器] 处理心跳消息失败:', error);
|
||
}
|
||
});
|
||
|
||
// 订阅响应主题
|
||
this.mqttClient.subscribe("response", async (topic, message) => {
|
||
try {
|
||
if (this.mqttDispatcher) {
|
||
this.mqttDispatcher.handleResponse(message);
|
||
}
|
||
} catch (error) {
|
||
console.error('[调度管理器] 处理响应消息失败:', error);
|
||
}
|
||
});
|
||
}
|
||
|
||
/**
|
||
* 手动执行找工作流程(已废弃,full_flow 不再使用)
|
||
* @deprecated 请使用其他任务类型,如 auto_deliver
|
||
*/
|
||
async manualExecuteJobFlow(sn_code, keyword = '前端') {
|
||
console.warn(`[手动执行] manualExecuteJobFlow 已废弃,full_flow 不再使用`);
|
||
throw new Error('full_flow 任务类型已废弃,请使用其他任务类型');
|
||
}
|
||
|
||
/**
|
||
* 获取系统状态
|
||
*/
|
||
getSystemStatus() {
|
||
const status = this.mqttDispatcher ? this.mqttDispatcher.getSystemStatus() : {};
|
||
return {
|
||
isInitialized: this.isInitialized,
|
||
mqttConnected: this.mqttClient && this.mqttClient.isConnected,
|
||
systemStats: deviceManager.getSystemStats(),
|
||
allDevices: deviceManager.getAllDevicesStatus(),
|
||
taskQueues: TaskQueue.getAllDeviceStatus(),
|
||
uptime: utils.formatDuration(Date.now() - this.startTime.getTime()),
|
||
startTime: utils.formatTimestamp(this.startTime),
|
||
...status
|
||
};
|
||
}
|
||
|
||
/**
|
||
* 停止调度系统
|
||
*/
|
||
stop() {
|
||
console.log('[调度管理器] 正在停止...');
|
||
|
||
// 停止所有定时任务
|
||
if (this.scheduledJobs) {
|
||
this.scheduledJobs.stop();
|
||
}
|
||
|
||
// 停止任务队列扫描器
|
||
if (TaskQueue && typeof TaskQueue.stopQueueScanner === 'function') {
|
||
TaskQueue.stopQueueScanner();
|
||
}
|
||
|
||
// 关闭MQTT连接
|
||
if (this.mqttClient) {
|
||
this.mqttClient.end();
|
||
}
|
||
|
||
this.isInitialized = false;
|
||
console.log('[调度管理器] 已停止');
|
||
}
|
||
}
|
||
|
||
// 创建调度管理器实例
|
||
const scheduleManager = new ScheduleManager();
|
||
|
||
// 导出兼容的接口,保持与原有代码的一致性
|
||
module.exports = {
|
||
// 初始化方法
|
||
init: () => scheduleManager.init(),
|
||
|
||
// 手动执行任务
|
||
manualExecuteJobFlow: (sn_code, keyword) => scheduleManager.manualExecuteJobFlow(sn_code, keyword),
|
||
|
||
// 获取系统状态
|
||
getSystemStatus: () => scheduleManager.getSystemStatus(),
|
||
|
||
// 停止系统
|
||
stop: () => scheduleManager.stop(),
|
||
|
||
// 访问各个组件(为了兼容性)
|
||
get mqttClient() { return scheduleManager.mqttClient; },
|
||
get isInitialized() { return scheduleManager.isInitialized; },
|
||
|
||
// 访问各个组件实例(简化版)
|
||
get taskQueue() { return TaskQueue; },
|
||
get command() { return Command; },
|
||
get deviceManager() { return deviceManager; }
|
||
};
|