const { v4: uuidv4 } = require('uuid'); const Sequelize = require('sequelize'); const logs = require('../logProxy'); const db = require('../dbProxy'); const command = require('./command'); const PriorityQueue = require('./PriorityQueue'); const ErrorHandler = require('./ErrorHandler'); const deviceManager = require('./deviceManager'); const ScheduleUtils = require('./utils'); const ScheduleConfig = require('./config'); /** * 任务队列管理器(重构版) * - 使用优先级队列(堆)提升性能 * - 工作池模式:设备内串行执行,设备间并行执行 * - 统一重试机制 * - 统一MQTT管理 */ class TaskQueue { constructor(config = {}) { // 设备任务队列映射 { sn_code: PriorityQueue } this.deviceQueues = new Map(); // 设备执行状态 { sn_code: { isRunning, currentTask, runningCount } } this.deviceStatus = new Map(); // 任务处理器映射 { taskType: handler } this.taskHandlers = new Map(); // 工作池配置 this.config = { maxConcurrency: config.maxConcurrency || 5, // 全局最大并发数(设备数) deviceMaxConcurrency: config.deviceMaxConcurrency || 1, // 每个设备最大并发数(保持串行) ...config }; // 全局运行中的任务数 this.globalRunningCount = 0; // 全局任务队列(用于跨设备优先级调度,可选) this.globalQueue = new PriorityQueue(); // 定期扫描定时器 this.scanInterval = null; } /** * 初始化(从数据库恢复未完成的任务) */ async init() { try { console.log('[任务队列] 初始化中...'); // 从数据库加载pending和running状态的任务 const pendingTasks = await db.getModel('task_status').findAll({ where: { status: ['pending', 'running'] }, order: [['priority', 'DESC'], ['id', 'ASC']] }); // 获取所有启用的账号(移除 device_status 依赖,不再检查在线状态) const pla_account = db.getModel('pla_account'); const enabledAccounts = await pla_account.findAll({ where: { is_delete: 0, is_enabled: 1 }, attributes: ['sn_code'] }); const enabledSnCodes = new Set(enabledAccounts.map(acc => acc.sn_code)); // 移除 device_status 依赖,不再检查设备在线状态 // 如果需要在线状态检查,可以在 pla_account 表中添加相应字段 const onlineSnCodes = new Set(); // 暂时设为空,表示不再检查在线状态 let restoredCount = 0; let skippedCount = 0; for (const taskRecord of pendingTasks) { const task = taskRecord.toJSON(); const sn_code = task.sn_code; // 检查账号是否启用 if (!enabledSnCodes.has(sn_code)) { console.log(`[任务队列] 初始化时跳过任务 ${task.id}:账号 ${sn_code} 未启用`); // 标记任务为已取消 await db.getModel('task_status').update( { status: 'cancelled', endTime: new Date(), result: JSON.stringify({ error: '账号未启用,任务已取消' }) }, { where: { id: task.id } } ); skippedCount++; continue; } // 检查设备是否在线 if (!onlineSnCodes.has(sn_code)) { console.log(`[任务队列] 初始化时跳过任务 ${task.id}:设备 ${sn_code} 不在线`); // 不在线的任务仍然恢复,等待设备上线后执行 // 不取消任务,只是不立即执行 } // 初始化设备队列 if (!this.deviceQueues.has(sn_code)) { this.deviceQueues.set(sn_code, new PriorityQueue()); } // 初始化设备状态(重要:确保设备状态存在) if (!this.deviceStatus.has(sn_code)) { this.deviceStatus.set(sn_code, { isRunning: false, currentTask: null, runningCount: 0 }); } // 恢复任务对象 const taskObj = { id: task.id, sn_code: task.sn_code, taskType: task.taskType, taskName: task.taskName, taskParams: task.taskParams ? JSON.parse(task.taskParams) : {}, priority: task.priority || 5, maxRetries: task.maxRetries || 3, retryCount: task.retryCount || 0, status: 'pending', createdAt: task.create_time ? new Date(task.create_time).getTime() : Date.now() }; // 添加到设备队列 this.deviceQueues.get(sn_code).push(taskObj); restoredCount++; // 如果状态是running,重置为pending if (task.status === 'running') { await db.getModel('task_status').update( { status: 'pending' }, { where: { id: task.id } } ); } } // 恢复任务后,尝试执行所有设备的队列(只执行在线且启用的设备) for (const sn_code of this.deviceQueues.keys()) { // 只处理启用且在线的设备 if (enabledSnCodes.has(sn_code) && onlineSnCodes.has(sn_code)) { this.processQueue(sn_code).catch(error => { console.error(`[任务队列] 初始化后执行队列失败 (设备: ${sn_code}):`, error); }); } else { console.log(`[任务队列] 初始化时跳过设备 ${sn_code} 的队列执行:${!enabledSnCodes.has(sn_code) ? '账号未启用' : '设备不在线'}`); } } console.log(`[任务队列] 初始化完成,恢复 ${restoredCount} 个任务,跳过 ${skippedCount} 个未启用账号的任务`); // 启动定期扫描机制(每10秒扫描一次) this.startQueueScanner(); } catch (error) { console.error('[任务队列] 初始化失败:', error); throw error; } } /** * 启动队列扫描器(定期检查并执行队列中的任务) */ startQueueScanner() { // 如果已经启动,先清除 if (this.scanInterval) { clearInterval(this.scanInterval); } // 每10秒扫描一次所有设备的队列 this.scanInterval = setInterval(() => { this.scanAndProcessQueues(); }, 10000); // 10秒扫描一次 console.log('[任务队列] 队列扫描器已启动(每10秒扫描一次)'); } /** * 停止队列扫描器 */ stopQueueScanner() { if (this.scanInterval) { clearInterval(this.scanInterval); this.scanInterval = null; console.log('[任务队列] 队列扫描器已停止'); } } /** * 扫描所有设备的队列并尝试执行任务(过滤未启用的账号和不在线的设备) */ async scanAndProcessQueues() { try { const deviceCount = this.deviceQueues.size; if (deviceCount === 0) { return; } // 获取所有启用的账号对应的设备SN码 const pla_account = db.getModel('pla_account'); const enabledAccounts = await pla_account.findAll({ where: { is_delete: 0, is_enabled: 1 }, attributes: ['sn_code'] }); const enabledSnCodes = new Set(enabledAccounts.map(acc => acc.sn_code)); // 移除 device_status 依赖,不再检查设备在线状态 // 如果需要在线状态检查,可以从 deviceManager 获取 const deviceManager = require('./deviceManager'); const deviceStatus = deviceManager.getAllDevicesStatus(); const onlineSnCodes = new Set( Object.entries(deviceStatus) .filter(([sn_code, status]) => status.isOnline) .map(([sn_code]) => sn_code) ); // 原有代码已移除,改为使用 deviceManager /* 原有代码已注释 const device_status = db.getModel('device_status'); const heartbeatTimeout = require('./config.js').monitoring.heartbeatTimeout; const now = new Date(); const heartbeatThreshold = new Date(now.getTime() - heartbeatTimeout); const onlineDevices = await device_status.findAll({ where: { isOnline: true, lastHeartbeatTime: { [Sequelize.Op.gte]: heartbeatThreshold // 心跳时间在阈值内 } }, attributes: ['sn_code'] }); const onlineSnCodes = new Set(onlineDevices.map(dev => dev.sn_code)); */ let processedCount = 0; let queuedCount = 0; let skippedCount = 0; // 遍历所有设备的队列,只处理启用账号且在线设备 for (const [sn_code, queue] of this.deviceQueues.entries()) { // 跳过未启用的账号 if (!enabledSnCodes.has(sn_code)) { skippedCount++; continue; } // 跳过不在线的设备 if (!onlineSnCodes.has(sn_code)) { skippedCount++; continue; } const queueSize = queue.size(); if (queueSize > 0) { queuedCount += queueSize; // 尝试处理该设备的队列 this.processQueue(sn_code).catch(error => { console.error(`[任务队列] 扫描执行队列失败 (设备: ${sn_code}):`, error); }); processedCount++; } } if (queuedCount > 0) { console.log(`[任务队列] 扫描完成: ${processedCount} 个设备有任务,共 ${queuedCount} 个待执行任务,跳过 ${skippedCount} 个设备`); } } catch (error) { console.error('[任务队列] 扫描队列失败:', error); } } /** * 注册任务处理器 * @param {string} taskType - 任务类型 * @param {function} handler - 处理函数 */ registerHandler(taskType, handler) { this.taskHandlers.set(taskType, handler); } /** * 查找设备是否已有相同类型的任务 * @param {string} sn_code - 设备SN码 * @param {string} taskType - 任务类型 * @returns {Promise} 现有任务或null */ async findExistingTask(sn_code, taskType) { // 检查当前正在执行的任务 const deviceStatus = this.deviceStatus.get(sn_code); if (deviceStatus && deviceStatus.currentTask && deviceStatus.currentTask.taskType === taskType) { return deviceStatus.currentTask; } // 检查队列中等待的任务 const queue = this.deviceQueues.get(sn_code); if (queue) { const existingTask = queue.find(task => task.taskType === taskType && (task.status === 'pending' || !task.status)); if (existingTask) { return existingTask; } } // 检查数据库中的pending/running任务(防止系统重启后重复添加) try { const Sequelize = require('sequelize'); const taskStatusModel = db.getModel('task_status'); const existingDbTask = await taskStatusModel.findOne({ where: { sn_code: sn_code, taskType: taskType, status: ['pending', 'running'] }, order: [['id', 'DESC']] }); if (existingDbTask) { return { id: existingDbTask.id, taskType: existingDbTask.taskType, status: existingDbTask.status }; } } catch (error) { console.error(`[任务队列] 检查数据库现有任务失败:`, error); } return null; } /** * 检查账号是否启用 * @param {string} sn_code - 设备SN码 * @returns {Promise} 是否启用 */ async checkAccountEnabled(sn_code) { try { const pla_account = db.getModel('pla_account'); const account = await pla_account.findOne({ where: { sn_code: sn_code, is_delete: 0 }, attributes: ['is_enabled'] }); if (!account) { console.warn(`[任务队列] 设备 ${sn_code} 对应的账号不存在`); return false; } const isEnabled = Boolean(account.is_enabled); if (!isEnabled) { console.log(`[任务队列] 设备 ${sn_code} 对应的账号未启用,跳过任务`); } return isEnabled; } catch (error) { console.error(`[任务队列] 检查账号启用状态失败:`, error); return false; } } /** * 添加任务到队列 * @param {string} sn_code - 设备SN码 * @param {object} taskConfig - 任务配置 * @returns {Promise} 任务ID */ async addTask(sn_code, taskConfig) { // 检查账号是否启用 const isEnabled = await this.checkAccountEnabled(sn_code); if (!isEnabled) { throw new Error(`账号未启用,无法添加任务`); } // 检查是否已有相同类型的任务在队列中或正在执行 const existingTask = await this.findExistingTask(sn_code, taskConfig.taskType); if (existingTask) { console.log(`[任务队列] 设备 ${sn_code} 已有 ${taskConfig.taskType} 任务在执行或等待中,跳过添加`); return existingTask.id; } const task = { sn_code, taskType: taskConfig.taskType, taskName: taskConfig.taskName || taskConfig.taskType, taskParams: taskConfig.taskParams || {}, priority: taskConfig.priority || 5, maxRetries: taskConfig.maxRetries || 3, retryCount: 0, status: 'pending', createdAt: Date.now() }; // 初始化设备队列 if (!this.deviceQueues.has(sn_code)) { this.deviceQueues.set(sn_code, new PriorityQueue()); } // 初始化设备状态 if (!this.deviceStatus.has(sn_code)) { this.deviceStatus.set(sn_code, { isRunning: false, currentTask: null, runningCount: 0 }); } // 保存到数据库 let res = await db.getModel('task_status').create({ sn_code: task.sn_code, taskType: task.taskType, taskName: task.taskName, taskParams: JSON.stringify(task.taskParams), status: task.status, priority: task.priority, maxRetries: task.maxRetries, retryCount: task.retryCount, }); // 使用数据库返回的自增ID task.id = res.id; // 添加到优先级队列 const queue = this.deviceQueues.get(sn_code); queue.push(task); console.log(`[任务队列] 任务已添加到队列: ${task.taskName} (ID: ${task.id}, 优先级: ${task.priority}),等待扫描机制执行`); // 不立即执行,等待扫描机制自动执行 // 扫描机制会定期检查队列并执行任务 return res.id; } /** * 处理设备的任务队列(工作池模式) * 设备内串行执行,设备间并行执行 * @param {string} sn_code - 设备SN码 */ async processQueue(sn_code) { try { // 先检查账号是否启用 const isEnabled = await this.checkAccountEnabled(sn_code); if (!isEnabled) { // 如果账号未启用,从队列中移除所有待执行任务 const queue = this.deviceQueues.get(sn_code); if (queue && queue.size() > 0) { console.log(`[任务队列] 设备 ${sn_code} 账号未启用,清空队列中的 ${queue.size()} 个待执行任务`); // 标记所有待执行任务为已取消 const queueArray = queue.toArray(); for (const task of queueArray) { if (task.status === 'pending') { try { await db.getModel('task_status').update( { status: 'cancelled', endTime: new Date(), result: JSON.stringify({ error: '账号未启用,任务已取消' }) }, { where: { id: task.id } } ); } catch (error) { console.error(`[任务队列] 更新任务状态失败:`, error); } } } queue.clear(); } return; } const status = this.deviceStatus.get(sn_code); if (!status) { console.warn(`[任务队列] 设备 ${sn_code} 状态不存在,无法执行任务`); return; } // 检查设备是否正在执行任务(设备内串行) if (status.isRunning || status.runningCount >= this.config.deviceMaxConcurrency) { console.log(`[任务队列] 设备 ${sn_code} 正在执行任务,等待中... (isRunning: ${status.isRunning}, runningCount: ${status.runningCount})`); return; } // 检查全局并发限制(设备间并行控制) if (this.globalRunningCount >= this.config.maxConcurrency) { console.log(`[任务队列] 全局并发数已达上限 (${this.globalRunningCount}/${this.config.maxConcurrency}),等待中...`); return; } const queue = this.deviceQueues.get(sn_code); if (!queue || queue.isEmpty()) { console.log(`[任务队列] 设备 ${sn_code} 队列为空,无任务可执行`); return; } // 从优先级队列取出任务 const task = queue.pop(); if (!task) { console.warn(`[任务队列] 设备 ${sn_code} 队列非空但无法取出任务`); return; } console.log(`[任务队列] 开始执行任务: ${task.taskName} (ID: ${task.id}, 设备: ${sn_code})`); // 更新状态 status.isRunning = true; status.currentTask = task; status.runningCount++; this.globalRunningCount++; // 异步执行任务(不阻塞) this.executeTask(task).finally(() => { // 任务完成后更新状态 status.isRunning = false; status.currentTask = null; status.runningCount--; this.globalRunningCount--; console.log(`[任务队列] 任务完成,设备 ${sn_code} 状态已重置,准备处理下一个任务`); // 继续处理队列中的下一个任务(延迟一小段时间,确保状态已更新) setTimeout(() => { this.processQueue(sn_code).catch(error => { console.error(`[任务队列] processQueue 执行失败 (设备: ${sn_code}):`, error); }); }, 100); // 延迟100ms确保状态已更新 }); } catch (error) { console.error(`[任务队列] processQueue 处理失败 (设备: ${sn_code}):`, error); throw error; } } /** * 执行任务(统一重试机制 + 超时保护) * @param {object} task - 任务对象 */ async executeTask(task) { const startTime = Date.now(); task.status = 'running'; task.startTime = new Date(); try { // 执行前再次检查账号是否启用(双重保险) const isEnabled = await this.checkAccountEnabled(task.sn_code); if (!isEnabled) { // 更新任务状态为已取消 await db.getModel('task_status').update( { status: 'cancelled', endTime: new Date(), result: JSON.stringify({ error: '账号未启用,任务已取消' }) }, { where: { id: task.id } } ); throw new Error(`账号未启用,任务已取消`); } // 更新数据库状态 await db.getModel('task_status').update( { status: 'running', startTime: task.startTime }, { where: { id: task.id } } ); // 通知客户端任务状态变更 await this.notifyTaskStatusChange(task.sn_code, { taskId: task.id, taskName: task.taskName, taskType: task.taskType, status: 'running', progress: 0, startTime: task.startTime }); // 使用注册的任务处理器执行任务 const handler = this.taskHandlers.get(task.taskType); if (!handler) { throw new Error(`未找到任务类型 ${task.taskType} 的处理器,请先注册处理器`); } // 获取任务超时时间(从配置中获取,默认10分钟) const taskTimeout = ScheduleConfig.getTaskTimeout(task.taskType) || 10 * 60 * 1000; // 使用超时机制包装任务执行,防止任务卡住 const taskPromise = handler(task); const result = await ScheduleUtils.withTimeout( taskPromise, taskTimeout, `任务执行超时: ${task.taskName} (任务类型: ${task.taskType}, 超时时间: ${taskTimeout / 1000}秒)` ); // 任务成功 task.status = 'completed'; task.endTime = new Date(); task.duration = Date.now() - startTime; task.result = result; // 更新数据库 await db.getModel('task_status').update( { status: 'completed', endTime: task.endTime, duration: task.duration, result: JSON.stringify(task.result), progress: 100 }, { where: { id: task.id } } ); // 通知客户端任务状态变更 await this.notifyTaskStatusChange(task.sn_code, { taskId: task.id, taskName: task.taskName, taskType: task.taskType, status: 'completed', progress: 100, endTime: task.endTime }); console.log(`[任务队列] 设备 ${task.sn_code} 任务执行成功: ${task.taskName} (耗时: ${task.duration}ms)`); } catch (error) { // 使用统一错误处理 const errorInfo = await ErrorHandler.handleError(error, { task_id: task.id, sn_code: task.sn_code, taskType: task.taskType, taskName: task.taskName }); // 直接标记为失败(重试已禁用) task.status = 'failed'; task.endTime = new Date(); task.duration = Date.now() - startTime; task.errorMessage = errorInfo.message || error.message || '未知错误'; task.errorStack = errorInfo.stack || error.stack || ''; // 更新数据库任务状态为失败 try { await db.getModel('task_status').update( { status: 'failed', endTime: task.endTime, duration: task.duration, result: JSON.stringify({ error: task.errorMessage, stack: task.errorStack }), progress: 0 }, { where: { id: task.id } } ); // 通知客户端任务状态变更 await this.notifyTaskStatusChange(task.sn_code, { taskId: task.id, taskName: task.taskName, taskType: task.taskType, status: 'failed', progress: 0, errorMessage: task.errorMessage, endTime: task.endTime }); } catch (dbError) { console.error(`[任务队列] 更新任务失败状态到数据库失败:`, dbError); } console.error(`[任务队列] 任务执行失败: ${task.taskName} (ID: ${task.id}), 错误: ${task.errorMessage}`, { errorStack: task.errorStack, task_id: task.id, sn_code: task.sn_code, taskType: task.taskType, duration: task.duration }); } } /** * 取消任务 * @param {string} task_id - 任务ID * @returns {Promise} 是否成功取消 */ async cancelTask(task_id) { // 遍历所有设备队列查找任务 for (const [sn_code, queue] of this.deviceQueues.entries()) { const removed = queue.remove(task => task.id === task_id); if (removed) { // 检查是否正在执行 const status = this.deviceStatus.get(sn_code); if (status && status.currentTask && status.currentTask.id === task_id) { // 正在执行的任务无法取消,只能标记 console.warn(`[任务队列] 任务 ${task_id} 正在执行,无法取消`); return false; } // 更新数据库 await db.getModel('task_status').update( { status: 'cancelled', endTime: new Date() }, { where: { id: task_id } } ); console.log(`[任务队列] 任务已取消: ${task_id}`); return true; } } // 未找到可取消的任务 return false; } /** * 取消设备的所有待执行任务 * @param {string} sn_code - 设备SN码 * @returns {Promise} 取消的任务数量 */ async cancelDeviceTasks(sn_code) { let cancelledCount = 0; // 1. 从队列中移除所有待执行任务 const queue = this.deviceQueues.get(sn_code); if (queue) { const pendingTasks = []; // 获取所有待执行任务(不包括正在执行的) const status = this.deviceStatus.get(sn_code); const currentTaskId = status && status.currentTask ? status.currentTask.id : null; // 遍历队列,收集待取消的任务 const queueArray = queue.toArray(); for (const task of queueArray) { if (task.id !== currentTaskId && (task.status === 'pending' || !task.status)) { pendingTasks.push(task); } } // 从队列中移除这些任务 for (const task of pendingTasks) { queue.remove(t => t.id === task.id); cancelledCount++; } } // 2. 更新数据库中的任务状态 try { const taskStatusModel = db.getModel('task_status'); const status = this.deviceStatus.get(sn_code); const currentTaskId = status && status.currentTask ? status.currentTask.id : null; // 更新所有待执行或运行中的任务(除了当前正在执行的) const whereCondition = { sn_code: sn_code, status: ['pending', 'running'] }; if (currentTaskId) { whereCondition.id = { [Sequelize.Op.ne]: currentTaskId }; } const updateResult = await taskStatusModel.update( { status: 'cancelled', endTime: new Date() }, { where: whereCondition } ); const dbCancelledCount = Array.isArray(updateResult) ? updateResult[0] : updateResult; console.log(`[任务队列] 设备 ${sn_code} 已取消 ${cancelledCount} 个队列任务,${dbCancelledCount} 个数据库任务`); } catch (error) { console.error(`[任务队列] 更新数据库任务状态失败:`, error, { sn_code: sn_code, cancelledCount: cancelledCount }); } return cancelledCount; } /** * 获取设备队列状态 * @param {string} sn_code - 设备SN码 * @returns {object} 队列状态 */ getDeviceStatus(sn_code) { const queue = this.deviceQueues.get(sn_code); const status = this.deviceStatus.get(sn_code) || { isRunning: false, currentTask: null, runningCount: 0 }; return { sn_code, isRunning: status.isRunning, currentTask: status.currentTask, queueLength: queue ? queue.size() : 0, pendingTasks: queue ? queue.size() : 0, runningCount: status.runningCount }; } /** * 获取任务状态 * @param {string} task_id - 任务ID * @returns {Promise} 任务对象 */ async getTaskStatus(task_id) { // 先从内存中查找 for (const queue of this.deviceQueues.values()) { const task = queue.find(t => t.id === task_id); if (task) { return task; } } // 从正在执行的任务中查找 for (const status of this.deviceStatus.values()) { if (status.currentTask && status.currentTask.id === task_id) { return status.currentTask; } } // 从数据库中查找 try { const taskRecord = await db.getModel('task_status').findOne({ where: { id: task_id } }); if (taskRecord) { return taskRecord.toJSON(); } } catch (error) { console.error(`[任务队列] 查询任务状态失败:`, error, { task_id: task_id }); } return null; } /** * 清空设备队列 * @param {string} sn_code - 设备SN码 */ clearQueue(sn_code) { if (this.deviceQueues.has(sn_code)) { const queue = this.deviceQueues.get(sn_code); const count = queue.size(); queue.clear(); console.log(`[任务队列] 已清空设备 ${sn_code} 的队列,共移除 ${count} 个任务`); } } /** * 删除所有任务(从内存队列和数据库) * @returns {Promise} 删除结果 */ async deleteAllTaskFromDatabase() { try { console.log('[任务队列] 开始删除所有任务...'); let totalQueued = 0; let totalRunning = 0; // 1. 清空所有设备的内存队列 for (const [sn_code, queue] of this.deviceQueues.entries()) { const queueSize = queue.size(); totalQueued += queueSize; queue.clear(); // 重置设备状态(但保留正在执行的任务信息,稍后处理) const status = this.deviceStatus.get(sn_code); if (status && status.currentTask) { totalRunning++; // 标记正在执行的任务,但不立即取消(让它们自然完成或失败) console.warn(`[任务队列] 设备 ${sn_code} 有正在执行的任务,将在完成后清理`); } } // 2. 使用 MCP MySQL 删除所有关联数据(先删除关联表,再删除主表) // 注意:MCP MySQL 是只读的,这里使用 Sequelize 执行删除操作 // 但移除数据库层面的外键关联,避免约束问题 const taskCommandsModel = db.getModel('task_commands'); const chatRecordsModel = db.getModel('chat_records'); const applyRecordsModel = db.getModel('apply_records'); const taskStatusModel = db.getModel('task_status'); // 删除任务指令记录(所有记录) const commandsDeleted = await taskCommandsModel.destroy({ where: {}, truncate: false }); console.log(`[任务队列] 已删除任务指令记录: ${commandsDeleted} 条`); // 删除聊天记录中关联的任务记录(删除所有有 task_id 且不为空的记录) const chatRecordsDeleted = await chatRecordsModel.destroy({ where: { [Sequelize.Op.and]: [ { task_id: { [Sequelize.Op.ne]: null } }, { task_id: { [Sequelize.Op.ne]: '' } } ] }, truncate: false }); console.log(`[任务队列] 已删除聊天记录: ${chatRecordsDeleted} 条`); // 删除投递记录中关联的任务记录(删除所有有 task_id 且不为空的记录) const applyRecordsDeleted = await applyRecordsModel.destroy({ where: { [Sequelize.Op.and]: [ { task_id: { [Sequelize.Op.ne]: null } }, { task_id: { [Sequelize.Op.ne]: '' } } ] }, truncate: false }); console.log(`[任务队列] 已删除投递记录: ${applyRecordsDeleted} 条`); // 3. 删除数据库中的所有任务记录 const deleteResult = await taskStatusModel.destroy({ where: {}, truncate: false // 使用 DELETE 而不是 TRUNCATE,保留表结构 }); console.log(`[任务队列] 已删除所有任务:`); console.log(` - 内存队列任务: ${totalQueued} 个`); console.log(` - 正在执行任务: ${totalRunning} 个(将在完成后清理)`); console.log(` - 任务指令记录: ${commandsDeleted} 条`); console.log(` - 聊天记录: ${chatRecordsDeleted} 条`); console.log(` - 投递记录: ${applyRecordsDeleted} 条`); console.log(` - 数据库任务记录: ${deleteResult} 条`); return { success: true, memoryQueued: totalQueued, memoryRunning: totalRunning, commandsDeleted: commandsDeleted, chatRecordsDeleted: chatRecordsDeleted, applyRecordsDeleted: applyRecordsDeleted, databaseDeleted: deleteResult, message: `已删除所有任务及关联数据(任务: ${deleteResult} 条,指令: ${commandsDeleted} 条,聊天: ${chatRecordsDeleted} 条,投递: ${applyRecordsDeleted} 条)` }; } catch (error) { console.error('[任务队列] 删除所有任务失败:', error); throw error; } } /** * 获取所有设备的队列状态 * @returns {array} 所有设备的队列状态 */ getAllDeviceStatus() { const allStatus = []; for (const sn_code of this.deviceQueues.keys()) { allStatus.push(this.getDeviceStatus(sn_code)); } return allStatus; } /** * 获取全局统计信息 * @returns {object} 统计信息 */ getStatistics() { let totalQueued = 0; for (const queue of this.deviceQueues.values()) { totalQueued += queue.size(); } return { globalRunningCount: this.globalRunningCount, maxConcurrency: this.config.maxConcurrency, totalDevices: this.deviceQueues.size, totalQueuedTasks: totalQueued, deviceStatuses: this.getAllDeviceStatus() }; } /** * 获取MQTT客户端(统一管理) * @returns {Promise} MQTT客户端实例 */ async getMqttClient() { try { // 首先尝试从调度系统获取已初始化的MQTT客户端 const scheduleManager = require('./index'); if (scheduleManager.mqttClient) { return scheduleManager.mqttClient; } // 如果调度系统没有初始化,则直接创建 const mqttManager = require('../mqtt/mqttManager'); console.log('[任务队列] 创建新的MQTT客户端'); return await mqttManager.getInstance(); } catch (error) { console.error(`[任务队列] 获取MQTT客户端失败:`, error); return null; } } /** * 通知客户端任务状态变更 * @param {string} sn_code - 设备SN码 * @param {object} taskData - 任务数据 */ async notifyTaskStatusChange(sn_code, taskData) { try { const mqttClient = await this.getMqttClient(); if (!mqttClient) { return; // MQTT客户端不可用,静默失败 } // 通过MQTT发布任务状态变更通知 // 主题格式: task_status_{sn_code} const topic = `task_status_${sn_code}`; const message = JSON.stringify({ action: 'task_status_update', data: taskData, timestamp: new Date().toISOString() }); await mqttClient.publish(topic, message); console.log(`[任务队列] 已通知客户端任务状态变更: ${sn_code} - ${taskData.taskName || taskData.taskType || '未知任务'} (${taskData.status})`); } catch (error) { // 通知失败不影响任务执行,只记录日志 console.warn(`[任务队列] 通知客户端任务状态变更失败:`, error.message); } } /** * 获取任务状态摘要(用于同步到客户端) * @param {string} sn_code - 设备SN码 * @returns {Promise} 任务状态摘要 */ async getTaskStatusSummary(sn_code) { try { const queue = this.deviceQueues.get(sn_code); const status = this.deviceStatus.get(sn_code) || { isRunning: false, currentTask: null, runningCount: 0 }; // 获取当前执行的任务(优先从内存状态获取,如果没有则从数据库查询) let currentTask = null; if (status.currentTask) { const taskData = status.currentTask; currentTask = { taskId: taskData.id, taskName: taskData.taskName || taskData.task_name || taskData.taskType || taskData.task_type || '未知任务', taskType: taskData.taskType || taskData.task_type, status: 'running', progress: taskData.progress || 0, currentStep: taskData.currentStep || taskData.current_step || '', startTime: taskData.startTime || taskData.start_time || taskData.created_time }; } else { // 如果内存中没有,从数据库查询当前运行的任务 try { const taskStatusModel = db.getModel('task_status'); const runningTask = await taskStatusModel.findOne({ where: { sn_code: sn_code, status: 'running' }, order: [['id', 'DESC']] }); if (runningTask) { const taskData = runningTask.toJSON(); currentTask = { taskId: taskData.id, taskName: taskData.taskName || taskData.task_name || taskData.taskType || taskData.task_type || '未知任务', taskType: taskData.taskType || taskData.task_type, status: 'running', progress: taskData.progress || 0, currentStep: taskData.currentStep || taskData.current_step || '', startTime: taskData.startTime || taskData.start_time || taskData.created_time }; } } catch (error) { console.error(`[任务队列] 查询当前任务失败:`, error); } } // 获取待执行任务列表(优先从内存队列获取,如果没有则从数据库查询) const pendingTasks = []; if (queue && queue.size() > 0) { const queueArray = queue.toArray(); for (const task of queueArray.slice(0, 10)) { const taskData = task; pendingTasks.push({ taskId: taskData.id, taskName: taskData.taskName || taskData.task_name || taskData.taskType || taskData.task_type || '未知任务', taskType: taskData.taskType || taskData.task_type, status: 'pending', scheduledTime: taskData.scheduledTime || taskData.scheduled_time || taskData.created_time, priority: taskData.priority || 0 }); } } else { // 如果内存队列为空,从数据库查询待执行任务 try { const taskStatusModel = db.getModel('task_status'); const dbPendingTasks = await taskStatusModel.findAll({ where: { sn_code: sn_code, status: 'pending' }, order: [ ['priority', 'DESC'], ['id', 'ASC'] ], limit: 10 }); for (const taskRecord of dbPendingTasks) { const taskData = taskRecord.toJSON(); pendingTasks.push({ taskId: taskData.id, taskName: taskData.taskName || taskData.task_name || taskData.taskType || taskData.task_type || '未知任务', taskType: taskData.taskType || taskData.task_type, status: 'pending', scheduledTime: taskData.scheduledTime || taskData.scheduled_time || taskData.created_time, priority: taskData.priority || 0 }); } } catch (error) { console.error(`[任务队列] 查询待执行任务失败:`, error); } } // 计算下次任务执行时间(队列中第一个任务的计划时间) let nextTaskTime = null; if (queue && queue.size() > 0) { const firstTask = queue.peek(); if (firstTask && (firstTask.scheduledTime || firstTask.scheduled_time)) { nextTaskTime = firstTask.scheduledTime || firstTask.scheduled_time; } } return { sn_code, currentTask, pendingTasks, nextTaskTime, pendingCount: queue ? queue.size() : 0, mqttTopic: `task_status_${sn_code}`, timestamp: new Date().toISOString() }; } catch (error) { console.error(`[任务队列] 获取任务状态摘要失败:`, error, { sn_code }); return { sn_code, currentTask: null, pendingTasks: [], nextTaskTime: null, pendingCount: 0, mqttTopic: `task_status_${sn_code}`, timestamp: new Date().toISOString() }; } } /** * 向客户端发送任务状态摘要 * @param {string} sn_code - 设备SN码 */ async sendTaskStatusSummary(sn_code) { try { const mqttClient = await this.getMqttClient(); if (!mqttClient) { return; // MQTT客户端不可用,静默失败 } const summary = await this.getTaskStatusSummary(sn_code); // 通过MQTT发布任务状态摘要 // 主题格式: task_status_{sn_code} const topic = `task_status_${sn_code}`; const message = JSON.stringify({ action: 'task_status_summary', data: summary, timestamp: new Date().toISOString() }); await mqttClient.publish(topic, message); console.log(`[任务队列] 已发送任务状态摘要到 ${sn_code}: 当前任务=${summary.currentTask ? '有' : '无'}, 待执行=${summary.pendingCount}个`); } catch (error) { // 通知失败不影响任务执行,只记录日志 console.warn(`[任务队列] 发送任务状态摘要失败:`, error.message); } } } // 导出单例 const taskQueue = new TaskQueue(); module.exports = taskQueue;