452 lines
16 KiB
JavaScript
452 lines
16 KiB
JavaScript
const logs = require('../logProxy');
|
||
const db = require('../dbProxy');
|
||
const jobManager = require('../job/index');
|
||
const ScheduleUtils = require('./utils');
|
||
const ScheduleConfig = require('./config');
|
||
const authorizationService = require('../../services/authorization_service');
|
||
|
||
|
||
/**
|
||
* 指令管理器
|
||
* 负责管理任务下的多个指令,简化MQTT通信流程
|
||
*/
|
||
class CommandManager {
|
||
constructor() {
|
||
this.pendingCommands = new Map(); // 等待响应的指令 { command_id: { resolve, reject, timeout } }
|
||
}
|
||
|
||
|
||
|
||
/**
|
||
* 执行指令序列
|
||
* @param {number} task_id - 任务ID
|
||
* @param {Array} commands - 指令数组
|
||
* @param {object} mqttClient - MQTT客户端
|
||
* @param {object} options - 执行选项(保留用于扩展)
|
||
* @returns {Promise<object>} 执行结果
|
||
*/
|
||
async executeCommands(task_id, commands, mqttClient, options = {}) {
|
||
if (!commands || commands.length === 0) {
|
||
throw new Error('没有找到要执行的指令');
|
||
}
|
||
|
||
console.log(`[指令管理] 开始执行 ${commands.length} 个指令`);
|
||
|
||
const results = [];
|
||
const errors = [];
|
||
|
||
// 顺序执行指令,失败时继续执行后续指令
|
||
for (let i = 0; i < commands.length; i++) {
|
||
const command = commands[i];
|
||
|
||
try {
|
||
console.log(`[指令管理] 执行指令 ${i + 1}/${commands.length}: ${command.command_name || command.name}`);
|
||
|
||
const commandResult = await this.executeCommand(task_id, command, mqttClient);
|
||
results.push(commandResult);
|
||
|
||
} catch (error) {
|
||
// 指令执行失败,记录错误但继续执行后续指令
|
||
errors.push({
|
||
command: command,
|
||
error: error
|
||
});
|
||
console.error(`[指令管理] 指令执行失败: ${command.command_name || command.name}, 错误: ${error.message}`);
|
||
// 失败时继续执行后续指令
|
||
}
|
||
}
|
||
|
||
const successCount = results.length;
|
||
const errorCount = errors.length;
|
||
|
||
console.log(`[指令管理] 指令执行完成: 成功 ${successCount}/${commands.length}, 失败 ${errorCount}`);
|
||
|
||
return {
|
||
success: errorCount === 0, // 只有全部成功才算成功
|
||
results: results,
|
||
errors: errors,
|
||
totalCommands: commands.length,
|
||
successCount: successCount,
|
||
errorCount: errorCount
|
||
};
|
||
}
|
||
|
||
|
||
/**
|
||
* 执行单个指令(统一封装)
|
||
* 统一处理成功、失败、超时,统一记录数据库
|
||
* @param {number} task_id - 任务ID
|
||
* @param {object} command - 指令对象
|
||
* @param {object} mqttClient - MQTT客户端
|
||
* @returns {Promise<object>} 执行结果
|
||
*/
|
||
async executeCommand(task_id, command, mqttClient) {
|
||
const start_time = new Date();
|
||
let command_id = null;
|
||
let command_record = null;
|
||
|
||
try {
|
||
// 1. 获取任务信息
|
||
const task = await db.getModel('task_status').findByPk(task_id);
|
||
if (!task) {
|
||
throw new Error(`任务不存在: ${task_id}`);
|
||
}
|
||
|
||
// 1.5 检查账号授权状态
|
||
const authCheck = await authorizationService.checkAuthorization(task.sn_code, 'sn_code');
|
||
if (!authCheck.is_authorized) {
|
||
throw new Error(`授权检查失败: ${authCheck.message}`);
|
||
}
|
||
|
||
// 2. 获取指令信息
|
||
const command_name = command.command_name || command.name || '未知指令';
|
||
const command_type = command.command_type || command.type;
|
||
const command_params = command.command_params ?
|
||
(typeof command.command_params === 'string' ? JSON.parse(command.command_params) : command.command_params) :
|
||
{};
|
||
|
||
if (!command_type) {
|
||
throw new Error('指令类型不能为空');
|
||
}
|
||
|
||
// 3. 创建指令记录
|
||
command_record = await db.getModel('task_commands').create({
|
||
task_id: task_id,
|
||
command_type: command_type,
|
||
command_name: command_name,
|
||
command_params: JSON.stringify(command_params),
|
||
priority: command.priority || 1,
|
||
sequence: command.sequence || 1,
|
||
max_retries: command.maxRetries || command.max_retries || 3,
|
||
status: 'pending'
|
||
});
|
||
|
||
command_id = command_record.id;
|
||
console.log(`[指令管理] 创建指令记录: ${command_name} (ID: ${command_id})`);
|
||
|
||
// 4. 更新指令状态为运行中
|
||
await this._update_command_status(command_id, 'running', null, null, start_time);
|
||
|
||
// 5. 执行指令(统一封装)
|
||
const result = await this._execute_command_with_timeout(
|
||
command_id,
|
||
command_type,
|
||
command_name,
|
||
command_params,
|
||
task.sn_code,
|
||
mqttClient,
|
||
start_time
|
||
);
|
||
|
||
// 6. 记录成功结果
|
||
await this._record_command_result(command_id, 'completed', result, null, start_time);
|
||
|
||
return {
|
||
command_id: command_id,
|
||
command_name: command_name,
|
||
result: result,
|
||
duration: new Date() - start_time,
|
||
success: true
|
||
};
|
||
|
||
} catch (error) {
|
||
// 统一处理错误(失败或超时)
|
||
if (command_id) {
|
||
await this._record_command_result(
|
||
command_id,
|
||
'failed',
|
||
null,
|
||
error,
|
||
start_time
|
||
);
|
||
}
|
||
|
||
// 重新抛出错误,让调用方知道执行失败
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 执行指令(带超时保护)
|
||
* @private
|
||
*/
|
||
async _execute_command_with_timeout(command_id, command_type, command_name, command_params, sn_code, mqttClient, start_time) {
|
||
// 将驼峰命名转换为下划线命名
|
||
const to_snake_case = (str) => {
|
||
if (str.includes('_')) {
|
||
return str;
|
||
}
|
||
return str.replace(/([A-Z])/g, '_$1').toLowerCase().replace(/^_/, '');
|
||
};
|
||
|
||
const method_name = to_snake_case(command_type);
|
||
|
||
// 获取指令超时时间(从配置中获取,默认5分钟)
|
||
const timeout = ScheduleConfig.taskTimeouts[command_type] ||
|
||
ScheduleConfig.taskTimeouts[method_name] ||
|
||
5 * 60 * 1000;
|
||
|
||
// 构建指令执行 Promise
|
||
const command_promise = (async () => {
|
||
if (command_type && jobManager[method_name]) {
|
||
return await jobManager[method_name](sn_code, mqttClient, command_params);
|
||
} else if (jobManager[command_type]) {
|
||
return await jobManager[command_type](sn_code, mqttClient, command_params);
|
||
} else {
|
||
throw new Error(`未知的指令类型: ${command_type} (尝试的方法名: ${method_name})`);
|
||
}
|
||
})();
|
||
|
||
// 使用超时机制包装
|
||
try {
|
||
const result = await ScheduleUtils.withTimeout(
|
||
command_promise,
|
||
timeout,
|
||
`指令执行超时: ${command_name} (超时时间: ${timeout / 1000}秒)`
|
||
);
|
||
return result;
|
||
} catch (error) {
|
||
// 判断是否为超时错误
|
||
if (error.message && error.message.includes('超时')) {
|
||
error.isTimeout = true;
|
||
}
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 记录指令执行结果(统一封装)
|
||
* 处理成功、失败、超时等情况,统一记录到数据库
|
||
* @private
|
||
*/
|
||
async _record_command_result(command_id, status, result, error, start_time) {
|
||
const end_time = new Date();
|
||
const duration = end_time.getTime() - start_time.getTime();
|
||
|
||
let error_message = null;
|
||
let error_stack = null;
|
||
let result_data = null;
|
||
|
||
// 处理错误信息
|
||
if (error) {
|
||
error_message = error.message || '指令执行失败';
|
||
error_stack = error.stack || '';
|
||
|
||
// 如果是超时错误,添加标识
|
||
if (error.isTimeout) {
|
||
error_message = `[超时] ${error_message}`;
|
||
}
|
||
}
|
||
|
||
// 处理结果数据
|
||
if (result && status === 'completed') {
|
||
result_data = result;
|
||
}
|
||
|
||
// 更新数据库
|
||
await this._update_command_status(
|
||
command_id,
|
||
status,
|
||
result_data,
|
||
error_message,
|
||
start_time,
|
||
end_time,
|
||
duration,
|
||
error_stack
|
||
);
|
||
|
||
// 记录日志
|
||
if (status === 'completed') {
|
||
console.log(`[指令管理] 指令执行成功: ${command_id} (耗时: ${duration}ms)`);
|
||
} else if (status === 'failed') {
|
||
const error_type = error && error.isTimeout ? '超时' : '失败';
|
||
console.error(`[指令管理] 指令执行${error_type}: ${command_id}, 错误: ${error_message}`, {
|
||
command_id: command_id,
|
||
duration: duration,
|
||
isTimeout: error && error.isTimeout
|
||
});
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 更新指令状态(统一封装)
|
||
* @private
|
||
*/
|
||
async _update_command_status(command_id, status, result, error_message, start_time, end_time, duration, error_stack) {
|
||
try {
|
||
const update_data = {
|
||
status: status,
|
||
updated_at: new Date()
|
||
};
|
||
|
||
// 设置开始时间
|
||
if (status === 'running' && start_time) {
|
||
update_data.start_time = start_time;
|
||
}
|
||
|
||
// 设置结束时间和执行时长
|
||
if ((status === 'completed' || status === 'failed') && end_time) {
|
||
update_data.end_time = end_time;
|
||
if (duration !== undefined) {
|
||
update_data.duration = duration;
|
||
}
|
||
}
|
||
|
||
// 处理执行结果
|
||
if (result && status === 'completed') {
|
||
const result_str = this._format_result_for_storage(result);
|
||
update_data.result = result_str;
|
||
update_data.progress = 100;
|
||
}
|
||
|
||
// 处理错误信息
|
||
if (error_message) {
|
||
update_data.error_message = this._truncate_string(error_message, 10000);
|
||
}
|
||
|
||
if (error_stack) {
|
||
update_data.error_stack = this._truncate_string(error_stack, 50000);
|
||
}
|
||
|
||
// 更新数据库
|
||
await db.getModel('task_commands').update(update_data, {
|
||
where: { id: command_id }
|
||
});
|
||
|
||
} catch (db_error) {
|
||
logs.error(`[指令管理] 更新指令状态失败:`, db_error, {
|
||
command_id: command_id,
|
||
status: status
|
||
});
|
||
|
||
// 如果是因为数据太长导致的错误,尝试只保存错误信息
|
||
if (db_error.message && db_error.message.includes('Data too long')) {
|
||
try {
|
||
await db.getModel('task_commands').update({
|
||
status: status,
|
||
error_message: '结果数据过长,无法保存完整结果',
|
||
end_time: end_time || new Date(),
|
||
updated_at: new Date()
|
||
}, {
|
||
where: { id: command_id }
|
||
});
|
||
} catch (e) {
|
||
console.error(`[指令管理] 保存截断结果也失败:`, e);
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 格式化结果数据用于存储
|
||
* @private
|
||
*/
|
||
_format_result_for_storage(result) {
|
||
try {
|
||
let result_str = JSON.stringify(result);
|
||
const max_length = 60000; // 限制为60KB
|
||
|
||
if (result_str.length > max_length) {
|
||
// 如果结果太长,尝试压缩或截断
|
||
if (typeof result === 'object' && result !== null) {
|
||
const summary = {
|
||
success: result.success !== undefined ? result.success : true,
|
||
message: result.message || '执行成功',
|
||
dataLength: result_str.length,
|
||
truncated: true,
|
||
preview: result_str.substring(0, 1000) // 保存前1000字符作为预览
|
||
};
|
||
result_str = JSON.stringify(summary);
|
||
} else {
|
||
result_str = result_str.substring(0, max_length) + '...[数据已截断]';
|
||
}
|
||
}
|
||
|
||
return result_str;
|
||
} catch (error) {
|
||
return JSON.stringify({ error: '结果序列化失败', message: error.message });
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 截断字符串
|
||
* @private
|
||
*/
|
||
_truncate_string(str, max_length) {
|
||
if (!str || str.length <= max_length) {
|
||
return str;
|
||
}
|
||
return str.substring(0, max_length) + '...[已截断]';
|
||
}
|
||
|
||
|
||
/**
|
||
* 更新指令状态(兼容旧接口,内部调用统一方法)
|
||
* @param {number} command_id - 指令ID
|
||
* @param {string} status - 状态
|
||
* @param {object} result - 结果
|
||
* @param {string} error_message - 错误信息
|
||
* @deprecated 建议使用 _update_command_status 统一方法
|
||
*/
|
||
async updateCommandStatus(command_id, status, result = null, error_message = null) {
|
||
const start_time = status === 'running' ? new Date() : null;
|
||
const end_time = (status === 'completed' || status === 'failed') ? new Date() : null;
|
||
|
||
// 计算执行时长
|
||
let duration = null;
|
||
if (end_time && start_time) {
|
||
duration = end_time.getTime() - start_time.getTime();
|
||
} else if (end_time) {
|
||
// 如果没有开始时间,尝试从数据库获取
|
||
try {
|
||
const command = await db.getModel('task_commands').findByPk(command_id);
|
||
if (command && command.start_time) {
|
||
duration = end_time.getTime() - new Date(command.start_time).getTime();
|
||
}
|
||
} catch (e) {
|
||
// 忽略错误
|
||
}
|
||
}
|
||
|
||
await this._update_command_status(
|
||
command_id,
|
||
status,
|
||
result,
|
||
error_message,
|
||
start_time,
|
||
end_time,
|
||
duration,
|
||
null
|
||
);
|
||
}
|
||
|
||
|
||
/**
|
||
* 清理过期的指令记录
|
||
* @param {number} days - 保留天数
|
||
*/
|
||
async cleanupExpiredCommands(days = 30) {
|
||
try {
|
||
const cutoffDate = new Date();
|
||
cutoffDate.setDate(cutoffDate.getDate() - days);
|
||
|
||
const deletedCount = await db.getModel('task_commands').destroy({
|
||
where: {
|
||
create_time: {
|
||
[db.Sequelize.Op.lt]: cutoffDate
|
||
}
|
||
}
|
||
});
|
||
|
||
console.log(`[指令管理] 清理了 ${deletedCount} 条过期指令记录`);
|
||
return deletedCount;
|
||
|
||
} catch (error) {
|
||
logs.error(`[指令管理] 清理过期指令失败:`, error);
|
||
return 0;
|
||
}
|
||
}
|
||
}
|
||
|
||
module.exports = new CommandManager();
|