const logs = require('../logProxy'); const db = require('../dbProxy'); const jobManager = require('../job/index'); const ScheduleUtils = require('./utils'); const ScheduleConfig = require('./config'); /** * 指令管理器 * 负责管理任务下的多个指令,简化MQTT通信流程 */ class CommandManager { constructor() { this.pendingCommands = new Map(); // 等待响应的指令 { commandId: { resolve, reject, timeout } } } /** * 执行指令序列 * @param {Array} commands - 指令数组 * @param {object} mqttClient - MQTT客户端 * @param {object} options - 执行选项 * @returns {Promise} 执行结果 */ async executeCommands(taskId, commands, mqttClient, options = {}) { // try { if (!commands || commands.length === 0) { throw new Error('没有找到要执行的指令'); } const { maxRetries = 1, // 最大重试次数 retryDelay = 1000 // 重试延迟(毫秒) } = options; console.log(`[指令管理] 开始执行 ${commands.length} 个指令`); const results = []; const errors = []; // 顺序执行指令,失败时停止 for (let i = 0; i < commands.length; i++) { const command = commands[i]; let retryCount = 0; let commandResult = null; // 重试逻辑 while (retryCount <= maxRetries) { console.log(`[指令管理] 执行指令 ${i + 1}/${commands.length}: ${command.command_name || command.name} (尝试 ${retryCount + 1}/${maxRetries + 1})`); commandResult = await this.executeCommand(taskId, command, mqttClient); results. push(commandResult); break; // 成功执行,跳出重试循环 } } const successCount = results.length; const errorCount = errors.length; console.log(`[指令管理] 指令执行完成: 成功 ${successCount}/${commands.length}, 失败 ${errorCount}`); return { success: errorCount === 0, // 只有全部成功才算成功 results: results, errors: errors, totalCommands: commands.length, successCount: successCount, errorCount: errorCount }; // } catch (error) { // console.error(`[指令管理] 执行指令序列失败:`, error); // throw error; // } } /** * 执行单个指令 * @param {object} command - 指令对象 * @param {object} mqttClient - MQTT客户端 * @returns {Promise} 执行结果 */ async executeCommand(taskId, command, mqttClient) { const startTime = new Date(); let commandRecord = null; const task = await db.getModel('task_status').findByPk(taskId); // 获取指令信息(支持两种格式) const commandName = command.command_name; const commandType = command.command_type; const commandParams = command.command_params ? JSON.parse(command.command_params) : {}; // 创建指令记录 commandRecord = await db.getModel('task_commands').create({ task_id: taskId, command_type: commandType, command_name: commandName, command_params: JSON.stringify(commandParams), priority: command.priority || 1, sequence: command.sequence || 1, max_retries: command.maxRetries || command.max_retries || 3, status: 'pending' }); let commandId = commandRecord.id; console.log(`[指令管理] 创建指令记录: ${commandName} (ID: ${commandId})`); // 更新指令状态为运行中 await this.updateCommandStatus(commandId, 'running'); console.log(`[指令管理] 执行指令: ${commandName} (ID: ${commandId})`); const sn_code = task.sn_code; // 将驼峰命名转换为下划线命名(如:getOnlineResume -> get_online_resume) const toSnakeCase = (str) => { // 如果已经是下划线格式,直接返回 if (str.includes('_')) { return str; } // 驼峰转下划线 return str.replace(/([A-Z])/g, '_$1').toLowerCase().replace(/^_/, ''); }; const methodName = toSnakeCase(commandType); // 获取指令超时时间(从配置中获取,默认5分钟) const timeout = ScheduleConfig.taskTimeouts[commandType] || ScheduleConfig.taskTimeouts[methodName] || 5 * 60 * 1000; let result; try { // 使用超时机制包装指令执行 const commandPromise = (async () => { if (commandType && jobManager[methodName]) { return await jobManager[methodName](sn_code, mqttClient, commandParams); } else { // 如果转换后找不到,尝试直接使用原名称 if (jobManager[commandType]) { return await jobManager[commandType](sn_code, mqttClient, commandParams); } else { throw new Error(`未知的指令类型: ${commandType} (尝试的方法名: ${methodName})`); } } })(); // 使用超时机制 result = await ScheduleUtils.withTimeout( commandPromise, timeout, `指令执行超时: ${commandName} (超时时间: ${timeout / 1000}秒)` ); } catch (error) { const endTime = new Date(); const duration = endTime - startTime; // 如果是超时错误,更新指令状态为失败 const errorMessage = error.message || '指令执行失败'; await this.updateCommandStatus(commandId, 'failed', null, errorMessage); throw error; } const endTime = new Date(); const duration = endTime - startTime; // 更新指令状态为完成 await this.updateCommandStatus(commandId, 'completed', result); return { commandId: commandId, commandName: commandName, result: result, duration: duration, success: true }; } /** * 更新指令状态 * @param {number} commandId - 指令ID * @param {string} status - 状态 * @param {object} result - 结果 * @param {string} errorMessage - 错误信息 */ async updateCommandStatus(commandId, status, result = null, errorMessage = null) { try { const updateData = { status: status, updated_at: new Date() }; if (status === 'running') { updateData.start_time = new Date(); } else if (status === 'completed' || status === 'failed') { updateData.end_time = new Date(); if (result) { // 将结果转换为JSON字符串,并限制长度(TEXT类型最大约65KB) let resultStr = JSON.stringify(result); const maxLength = 60000; // 限制为60KB,留一些余量 if (resultStr.length > maxLength) { // 如果结果太长,尝试压缩或截断 try { // 如果是对象,尝试只保存关键信息 if (typeof result === 'object' && result !== null) { const summary = { success: result.success !== undefined ? result.success : true, message: result.message || '执行成功', dataLength: resultStr.length, truncated: true, preview: resultStr.substring(0, 1000) // 保存前1000字符作为预览 }; resultStr = JSON.stringify(summary); } else { // 直接截断 resultStr = resultStr.substring(0, maxLength) + '...[数据已截断]'; } } catch (e) { // 如果处理失败,直接截断 resultStr = resultStr.substring(0, maxLength) + '...[数据已截断]'; } } updateData.result = resultStr; updateData.progress = 100; } if (errorMessage) { // 错误信息也限制长度 const maxErrorLength = 10000; // 错误信息限制10KB updateData.error_message = errorMessage.length > maxErrorLength ? errorMessage.substring(0, maxErrorLength) + '...[错误信息已截断]' : errorMessage; } // 计算执行时长 const command = await db.getModel('task_commands').findByPk(commandId); if (command && command.start_time) { const duration = new Date() - new Date(command.start_time); updateData.duration = duration; } } await db.getModel('task_commands').update(updateData, { where: { id: commandId } }); } catch (error) { logs.error(`[指令管理] 更新指令状态失败:`, error, { commandId: commandId, status: status }); // 如果是因为数据太长导致的错误,尝试只保存错误信息 if (error.message && error.message.includes('Data too long')) { try { await db.getModel('task_commands').update({ status: status, error_message: '结果数据过长,无法保存完整结果', end_time: new Date(), updated_at: new Date() }, { where: { id: commandId } }); } catch (e) { console.error(`[指令管理] 保存截断结果也失败:`, e); } } } } /** * 清理过期的指令记录 * @param {number} days - 保留天数 */ async cleanupExpiredCommands(days = 30) { try { const cutoffDate = new Date(); cutoffDate.setDate(cutoffDate.getDate() - days); const deletedCount = await db.getModel('task_commands').destroy({ where: { create_time: { [db.Sequelize.Op.lt]: cutoffDate } } }); console.log(`[指令管理] 清理了 ${deletedCount} 条过期指令记录`); return deletedCount; } catch (error) { logs.error(`[指令管理] 清理过期指令失败:`, error); return 0; } } } module.exports = new CommandManager();