const logs = require('../logProxy'); const db = require('../dbProxy'); const jobManager = require('../job/index'); const ScheduleUtils = require('./utils'); const ScheduleConfig = require('./config'); /** * 指令管理器 * 负责管理任务下的多个指令,简化MQTT通信流程 */ class CommandManager { constructor() { this.pendingCommands = new Map(); // 等待响应的指令 { command_id: { resolve, reject, timeout } } } /** * 执行指令序列 * @param {number} task_id - 任务ID * @param {Array} commands - 指令数组 * @param {object} mqttClient - MQTT客户端 * @param {object} options - 执行选项(保留用于扩展) * @returns {Promise} 执行结果 */ async executeCommands(task_id, commands, mqttClient, options = {}) { if (!commands || commands.length === 0) { throw new Error('没有找到要执行的指令'); } console.log(`[指令管理] 开始执行 ${commands.length} 个指令`); const results = []; const errors = []; // 顺序执行指令,失败时继续执行后续指令 for (let i = 0; i < commands.length; i++) { const command = commands[i]; try { console.log(`[指令管理] 执行指令 ${i + 1}/${commands.length}: ${command.command_name || command.name}`); const commandResult = await this.executeCommand(task_id, command, mqttClient); results.push(commandResult); } catch (error) { // 指令执行失败,记录错误但继续执行后续指令 errors.push({ command: command, error: error }); console.error(`[指令管理] 指令执行失败: ${command.command_name || command.name}, 错误: ${error.message}`); // 失败时继续执行后续指令 } } const successCount = results.length; const errorCount = errors.length; console.log(`[指令管理] 指令执行完成: 成功 ${successCount}/${commands.length}, 失败 ${errorCount}`); return { success: errorCount === 0, // 只有全部成功才算成功 results: results, errors: errors, totalCommands: commands.length, successCount: successCount, errorCount: errorCount }; } /** * 执行单个指令(统一封装) * 统一处理成功、失败、超时,统一记录数据库 * @param {number} task_id - 任务ID * @param {object} command - 指令对象 * @param {object} mqttClient - MQTT客户端 * @returns {Promise} 执行结果 */ async executeCommand(task_id, command, mqttClient) { const start_time = new Date(); let command_id = null; let command_record = null; try { // 1. 获取任务信息 const task = await db.getModel('task_status').findByPk(task_id); if (!task) { throw new Error(`任务不存在: ${task_id}`); } // 2. 获取指令信息 const command_name = command.command_name || command.name || '未知指令'; const command_type = command.command_type || command.type; const command_params = command.command_params ? (typeof command.command_params === 'string' ? JSON.parse(command.command_params) : command.command_params) : {}; if (!command_type) { throw new Error('指令类型不能为空'); } // 3. 创建指令记录 command_record = await db.getModel('task_commands').create({ task_id: task_id, command_type: command_type, command_name: command_name, command_params: JSON.stringify(command_params), priority: command.priority || 1, sequence: command.sequence || 1, max_retries: command.maxRetries || command.max_retries || 3, status: 'pending' }); command_id = command_record.id; console.log(`[指令管理] 创建指令记录: ${command_name} (ID: ${command_id})`); // 4. 更新指令状态为运行中 await this._update_command_status(command_id, 'running', null, null, start_time); // 5. 执行指令(统一封装) const result = await this._execute_command_with_timeout( command_id, command_type, command_name, command_params, task.sn_code, mqttClient, start_time ); // 6. 记录成功结果 await this._record_command_result(command_id, 'completed', result, null, start_time); return { command_id: command_id, command_name: command_name, result: result, duration: new Date() - start_time, success: true }; } catch (error) { // 统一处理错误(失败或超时) if (command_id) { await this._record_command_result( command_id, 'failed', null, error, start_time ); } // 重新抛出错误,让调用方知道执行失败 throw error; } } /** * 执行指令(带超时保护) * @private */ async _execute_command_with_timeout(command_id, command_type, command_name, command_params, sn_code, mqttClient, start_time) { // 将驼峰命名转换为下划线命名 const to_snake_case = (str) => { if (str.includes('_')) { return str; } return str.replace(/([A-Z])/g, '_$1').toLowerCase().replace(/^_/, ''); }; const method_name = to_snake_case(command_type); // 获取指令超时时间(从配置中获取,默认5分钟) const timeout = ScheduleConfig.taskTimeouts[command_type] || ScheduleConfig.taskTimeouts[method_name] || 5 * 60 * 1000; // 构建指令执行 Promise const command_promise = (async () => { if (command_type && jobManager[method_name]) { return await jobManager[method_name](sn_code, mqttClient, command_params); } else if (jobManager[command_type]) { return await jobManager[command_type](sn_code, mqttClient, command_params); } else { throw new Error(`未知的指令类型: ${command_type} (尝试的方法名: ${method_name})`); } })(); // 使用超时机制包装 try { const result = await ScheduleUtils.withTimeout( command_promise, timeout, `指令执行超时: ${command_name} (超时时间: ${timeout / 1000}秒)` ); return result; } catch (error) { // 判断是否为超时错误 if (error.message && error.message.includes('超时')) { error.isTimeout = true; } throw error; } } /** * 记录指令执行结果(统一封装) * 处理成功、失败、超时等情况,统一记录到数据库 * @private */ async _record_command_result(command_id, status, result, error, start_time) { const end_time = new Date(); const duration = end_time.getTime() - start_time.getTime(); let error_message = null; let error_stack = null; let result_data = null; // 处理错误信息 if (error) { error_message = error.message || '指令执行失败'; error_stack = error.stack || ''; // 如果是超时错误,添加标识 if (error.isTimeout) { error_message = `[超时] ${error_message}`; } } // 处理结果数据 if (result && status === 'completed') { result_data = result; } // 更新数据库 await this._update_command_status( command_id, status, result_data, error_message, start_time, end_time, duration, error_stack ); // 记录日志 if (status === 'completed') { console.log(`[指令管理] 指令执行成功: ${command_id} (耗时: ${duration}ms)`); } else if (status === 'failed') { const error_type = error && error.isTimeout ? '超时' : '失败'; console.error(`[指令管理] 指令执行${error_type}: ${command_id}, 错误: ${error_message}`, { command_id: command_id, duration: duration, isTimeout: error && error.isTimeout }); } } /** * 更新指令状态(统一封装) * @private */ async _update_command_status(command_id, status, result, error_message, start_time, end_time, duration, error_stack) { try { const update_data = { status: status, updated_at: new Date() }; // 设置开始时间 if (status === 'running' && start_time) { update_data.start_time = start_time; } // 设置结束时间和执行时长 if ((status === 'completed' || status === 'failed') && end_time) { update_data.end_time = end_time; if (duration !== undefined) { update_data.duration = duration; } } // 处理执行结果 if (result && status === 'completed') { const result_str = this._format_result_for_storage(result); update_data.result = result_str; update_data.progress = 100; } // 处理错误信息 if (error_message) { update_data.error_message = this._truncate_string(error_message, 10000); } if (error_stack) { update_data.error_stack = this._truncate_string(error_stack, 50000); } // 更新数据库 await db.getModel('task_commands').update(update_data, { where: { id: command_id } }); } catch (db_error) { logs.error(`[指令管理] 更新指令状态失败:`, db_error, { command_id: command_id, status: status }); // 如果是因为数据太长导致的错误,尝试只保存错误信息 if (db_error.message && db_error.message.includes('Data too long')) { try { await db.getModel('task_commands').update({ status: status, error_message: '结果数据过长,无法保存完整结果', end_time: end_time || new Date(), updated_at: new Date() }, { where: { id: command_id } }); } catch (e) { console.error(`[指令管理] 保存截断结果也失败:`, e); } } } } /** * 格式化结果数据用于存储 * @private */ _format_result_for_storage(result) { try { let result_str = JSON.stringify(result); const max_length = 60000; // 限制为60KB if (result_str.length > max_length) { // 如果结果太长,尝试压缩或截断 if (typeof result === 'object' && result !== null) { const summary = { success: result.success !== undefined ? result.success : true, message: result.message || '执行成功', dataLength: result_str.length, truncated: true, preview: result_str.substring(0, 1000) // 保存前1000字符作为预览 }; result_str = JSON.stringify(summary); } else { result_str = result_str.substring(0, max_length) + '...[数据已截断]'; } } return result_str; } catch (error) { return JSON.stringify({ error: '结果序列化失败', message: error.message }); } } /** * 截断字符串 * @private */ _truncate_string(str, max_length) { if (!str || str.length <= max_length) { return str; } return str.substring(0, max_length) + '...[已截断]'; } /** * 更新指令状态(兼容旧接口,内部调用统一方法) * @param {number} command_id - 指令ID * @param {string} status - 状态 * @param {object} result - 结果 * @param {string} error_message - 错误信息 * @deprecated 建议使用 _update_command_status 统一方法 */ async updateCommandStatus(command_id, status, result = null, error_message = null) { const start_time = status === 'running' ? new Date() : null; const end_time = (status === 'completed' || status === 'failed') ? new Date() : null; // 计算执行时长 let duration = null; if (end_time && start_time) { duration = end_time.getTime() - start_time.getTime(); } else if (end_time) { // 如果没有开始时间,尝试从数据库获取 try { const command = await db.getModel('task_commands').findByPk(command_id); if (command && command.start_time) { duration = end_time.getTime() - new Date(command.start_time).getTime(); } } catch (e) { // 忽略错误 } } await this._update_command_status( command_id, status, result, error_message, start_time, end_time, duration, null ); } /** * 清理过期的指令记录 * @param {number} days - 保留天数 */ async cleanupExpiredCommands(days = 30) { try { const cutoffDate = new Date(); cutoffDate.setDate(cutoffDate.getDate() - days); const deletedCount = await db.getModel('task_commands').destroy({ where: { create_time: { [db.Sequelize.Op.lt]: cutoffDate } } }); console.log(`[指令管理] 清理了 ${deletedCount} 条过期指令记录`); return deletedCount; } catch (error) { logs.error(`[指令管理] 清理过期指令失败:`, error); return 0; } } } module.exports = new CommandManager();