309 lines
11 KiB
JavaScript
309 lines
11 KiB
JavaScript
const logs = require('../logProxy');
|
||
const db = require('../dbProxy');
|
||
const jobManager = require('../job/index');
|
||
const ScheduleUtils = require('./utils');
|
||
const ScheduleConfig = require('./config');
|
||
|
||
|
||
/**
|
||
* 指令管理器
|
||
* 负责管理任务下的多个指令,简化MQTT通信流程
|
||
*/
|
||
class CommandManager {
|
||
constructor() {
|
||
this.pendingCommands = new Map(); // 等待响应的指令 { commandId: { resolve, reject, timeout } }
|
||
}
|
||
|
||
|
||
|
||
/**
|
||
* 执行指令序列
|
||
* @param {Array} commands - 指令数组
|
||
* @param {object} mqttClient - MQTT客户端
|
||
* @param {object} options - 执行选项
|
||
* @returns {Promise<object>} 执行结果
|
||
*/
|
||
async executeCommands(taskId, commands, mqttClient, options = {}) {
|
||
// try {
|
||
if (!commands || commands.length === 0) {
|
||
throw new Error('没有找到要执行的指令');
|
||
}
|
||
|
||
const {
|
||
maxRetries = 1, // 最大重试次数
|
||
retryDelay = 1000 // 重试延迟(毫秒)
|
||
} = options;
|
||
|
||
console.log(`[指令管理] 开始执行 ${commands.length} 个指令`);
|
||
|
||
const results = [];
|
||
const errors = [];
|
||
|
||
// 顺序执行指令,失败时停止
|
||
for (let i = 0; i < commands.length; i++) {
|
||
const command = commands[i];
|
||
let retryCount = 0;
|
||
let commandResult = null;
|
||
|
||
// 重试逻辑
|
||
while (retryCount <= maxRetries) {
|
||
|
||
console.log(`[指令管理] 执行指令 ${i + 1}/${commands.length}: ${command.command_name || command.name} (尝试 ${retryCount + 1}/${maxRetries + 1})`);
|
||
|
||
commandResult = await this.executeCommand(taskId, command, mqttClient);
|
||
|
||
results. push(commandResult);
|
||
break; // 成功执行,跳出重试循环
|
||
|
||
}
|
||
|
||
}
|
||
|
||
const successCount = results.length;
|
||
const errorCount = errors.length;
|
||
|
||
console.log(`[指令管理] 指令执行完成: 成功 ${successCount}/${commands.length}, 失败 ${errorCount}`);
|
||
|
||
return {
|
||
success: errorCount === 0, // 只有全部成功才算成功
|
||
results: results,
|
||
errors: errors,
|
||
totalCommands: commands.length,
|
||
successCount: successCount,
|
||
errorCount: errorCount
|
||
};
|
||
|
||
// } catch (error) {
|
||
// console.error(`[指令管理] 执行指令序列失败:`, error);
|
||
// throw error;
|
||
// }
|
||
}
|
||
|
||
|
||
/**
|
||
* 执行单个指令
|
||
* @param {object} command - 指令对象
|
||
* @param {object} mqttClient - MQTT客户端
|
||
* @returns {Promise<object>} 执行结果
|
||
*/
|
||
async executeCommand(taskId, command, mqttClient) {
|
||
const startTime = new Date();
|
||
let commandRecord = null;
|
||
|
||
const task = await db.getModel('task_status').findByPk(taskId);
|
||
|
||
// 获取指令信息(支持两种格式)
|
||
const commandName = command.command_name;
|
||
const commandType = command.command_type;
|
||
const commandParams = command.command_params ? JSON.parse(command.command_params) : {};
|
||
|
||
// 创建指令记录
|
||
commandRecord = await db.getModel('task_commands').create({
|
||
task_id: taskId,
|
||
command_type: commandType,
|
||
command_name: commandName,
|
||
command_params: JSON.stringify(commandParams),
|
||
priority: command.priority || 1,
|
||
sequence: command.sequence || 1,
|
||
max_retries: command.maxRetries || command.max_retries || 3,
|
||
status: 'pending'
|
||
});
|
||
|
||
let commandId = commandRecord.id;
|
||
console.log(`[指令管理] 创建指令记录: ${commandName} (ID: ${commandId})`);
|
||
|
||
|
||
// 更新指令状态为运行中
|
||
await this.updateCommandStatus(commandId, 'running');
|
||
|
||
|
||
console.log(`[指令管理] 执行指令: ${commandName} (ID: ${commandId})`);
|
||
|
||
const sn_code = task.sn_code;
|
||
|
||
// 将驼峰命名转换为下划线命名(如:getOnlineResume -> get_online_resume)
|
||
const toSnakeCase = (str) => {
|
||
// 如果已经是下划线格式,直接返回
|
||
if (str.includes('_')) {
|
||
return str;
|
||
}
|
||
// 驼峰转下划线
|
||
return str.replace(/([A-Z])/g, '_$1').toLowerCase().replace(/^_/, '');
|
||
};
|
||
|
||
const methodName = toSnakeCase(commandType);
|
||
|
||
// 获取指令超时时间(从配置中获取,默认5分钟)
|
||
const timeout = ScheduleConfig.taskTimeouts[commandType] || ScheduleConfig.taskTimeouts[methodName] || 5 * 60 * 1000;
|
||
|
||
let result;
|
||
try {
|
||
// 使用超时机制包装指令执行
|
||
const commandPromise = (async () => {
|
||
if (commandType && jobManager[methodName]) {
|
||
return await jobManager[methodName](sn_code, mqttClient, commandParams);
|
||
} else {
|
||
// 如果转换后找不到,尝试直接使用原名称
|
||
if (jobManager[commandType]) {
|
||
return await jobManager[commandType](sn_code, mqttClient, commandParams);
|
||
} else {
|
||
throw new Error(`未知的指令类型: ${commandType} (尝试的方法名: ${methodName})`);
|
||
}
|
||
}
|
||
})();
|
||
|
||
// 使用超时机制
|
||
result = await ScheduleUtils.withTimeout(
|
||
commandPromise,
|
||
timeout,
|
||
`指令执行超时: ${commandName} (超时时间: ${timeout / 1000}秒)`
|
||
);
|
||
} catch (error) {
|
||
const endTime = new Date();
|
||
const duration = endTime - startTime;
|
||
|
||
// 如果是超时错误,更新指令状态为失败
|
||
const errorMessage = error.message || '指令执行失败';
|
||
await this.updateCommandStatus(commandId, 'failed', null, errorMessage);
|
||
|
||
throw error;
|
||
}
|
||
|
||
const endTime = new Date();
|
||
const duration = endTime - startTime;
|
||
|
||
// 更新指令状态为完成
|
||
await this.updateCommandStatus(commandId, 'completed', result);
|
||
|
||
return {
|
||
commandId: commandId,
|
||
commandName: commandName,
|
||
result: result,
|
||
duration: duration,
|
||
success: true
|
||
};
|
||
|
||
|
||
}
|
||
|
||
|
||
/**
|
||
* 更新指令状态
|
||
* @param {number} commandId - 指令ID
|
||
* @param {string} status - 状态
|
||
* @param {object} result - 结果
|
||
* @param {string} errorMessage - 错误信息
|
||
*/
|
||
async updateCommandStatus(commandId, status, result = null, errorMessage = null) {
|
||
try {
|
||
const updateData = {
|
||
status: status,
|
||
updated_at: new Date()
|
||
};
|
||
|
||
if (status === 'running') {
|
||
updateData.start_time = new Date();
|
||
} else if (status === 'completed' || status === 'failed') {
|
||
updateData.end_time = new Date();
|
||
if (result) {
|
||
// 将结果转换为JSON字符串,并限制长度(TEXT类型最大约65KB)
|
||
let resultStr = JSON.stringify(result);
|
||
const maxLength = 60000; // 限制为60KB,留一些余量
|
||
|
||
if (resultStr.length > maxLength) {
|
||
// 如果结果太长,尝试压缩或截断
|
||
try {
|
||
// 如果是对象,尝试只保存关键信息
|
||
if (typeof result === 'object' && result !== null) {
|
||
const summary = {
|
||
success: result.success !== undefined ? result.success : true,
|
||
message: result.message || '执行成功',
|
||
dataLength: resultStr.length,
|
||
truncated: true,
|
||
preview: resultStr.substring(0, 1000) // 保存前1000字符作为预览
|
||
};
|
||
resultStr = JSON.stringify(summary);
|
||
} else {
|
||
// 直接截断
|
||
resultStr = resultStr.substring(0, maxLength) + '...[数据已截断]';
|
||
}
|
||
} catch (e) {
|
||
// 如果处理失败,直接截断
|
||
resultStr = resultStr.substring(0, maxLength) + '...[数据已截断]';
|
||
}
|
||
}
|
||
updateData.result = resultStr;
|
||
updateData.progress = 100;
|
||
}
|
||
if (errorMessage) {
|
||
// 错误信息也限制长度
|
||
const maxErrorLength = 10000; // 错误信息限制10KB
|
||
updateData.error_message = errorMessage.length > maxErrorLength
|
||
? errorMessage.substring(0, maxErrorLength) + '...[错误信息已截断]'
|
||
: errorMessage;
|
||
}
|
||
// 计算执行时长
|
||
const command = await db.getModel('task_commands').findByPk(commandId);
|
||
if (command && command.start_time) {
|
||
const duration = new Date() - new Date(command.start_time);
|
||
updateData.duration = duration;
|
||
}
|
||
}
|
||
|
||
await db.getModel('task_commands').update(updateData, {
|
||
where: { id: commandId }
|
||
});
|
||
|
||
} catch (error) {
|
||
logs.error(`[指令管理] 更新指令状态失败:`, error, {
|
||
commandId: commandId,
|
||
status: status
|
||
});
|
||
// 如果是因为数据太长导致的错误,尝试只保存错误信息
|
||
if (error.message && error.message.includes('Data too long')) {
|
||
try {
|
||
await db.getModel('task_commands').update({
|
||
status: status,
|
||
error_message: '结果数据过长,无法保存完整结果',
|
||
end_time: new Date(),
|
||
updated_at: new Date()
|
||
}, {
|
||
where: { id: commandId }
|
||
});
|
||
} catch (e) {
|
||
console.error(`[指令管理] 保存截断结果也失败:`, e);
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
|
||
/**
|
||
* 清理过期的指令记录
|
||
* @param {number} days - 保留天数
|
||
*/
|
||
async cleanupExpiredCommands(days = 30) {
|
||
try {
|
||
const cutoffDate = new Date();
|
||
cutoffDate.setDate(cutoffDate.getDate() - days);
|
||
|
||
const deletedCount = await db.getModel('task_commands').destroy({
|
||
where: {
|
||
create_time: {
|
||
[db.Sequelize.Op.lt]: cutoffDate
|
||
}
|
||
}
|
||
});
|
||
|
||
console.log(`[指令管理] 清理了 ${deletedCount} 条过期指令记录`);
|
||
return deletedCount;
|
||
|
||
} catch (error) {
|
||
logs.error(`[指令管理] 清理过期指令失败:`, error);
|
||
return 0;
|
||
}
|
||
}
|
||
}
|
||
|
||
module.exports = new CommandManager();
|