Files
autoAiWorkSys/api/middleware/schedule/command.js
张成 dcaf0cb428 1
2025-12-30 14:37:33 +08:00

482 lines
17 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
const logs = require('../logProxy');
const db = require('../dbProxy');
const jobManager = require('../job/index');
const ScheduleUtils = require('./utils');
const ScheduleConfig = require('./config');
const authorizationService = require('../../services/authorization_service');
/**
* 指令管理器
* 负责管理任务下的多个指令简化MQTT通信流程
*/
class CommandManager {
constructor() {
this.pendingCommands = new Map(); // 等待响应的指令 { command_id: { resolve, reject, timeout } }
}
/**
* 执行指令序列
* @param {number} task_id - 任务ID
* @param {Array} commands - 指令数组
* @param {object} mqttClient - MQTT客户端
* @param {object} options - 执行选项(保留用于扩展)
* @returns {Promise<object>} 执行结果
*/
async executeCommands(task_id, commands, mqttClient, options = {}) {
if (!commands || commands.length === 0) {
throw new Error('没有找到要执行的指令');
}
console.log(`[指令管理] 开始执行 ${commands.length} 个指令`);
const results = [];
const errors = [];
// 顺序执行指令,失败时继续执行后续指令
for (let i = 0; i < commands.length; i++) {
const command = commands[i];
try {
console.log(`[指令管理] 执行指令 ${i + 1}/${commands.length}: ${command.command_name || command.name}`);
const commandResult = await this.executeCommand(task_id, command, mqttClient);
results.push(commandResult);
} catch (error) {
// 指令执行失败,记录错误但继续执行后续指令
errors.push({
command: command,
error: error
});
console.error(`[指令管理] 指令执行失败: ${command.command_name || command.name}, 错误: ${error.message}`);
// 失败时继续执行后续指令
}
}
const successCount = results.length;
const errorCount = errors.length;
console.log(`[指令管理] 指令执行完成: 成功 ${successCount}/${commands.length}, 失败 ${errorCount}`);
return {
success: errorCount === 0, // 只有全部成功才算成功
results: results,
errors: errors,
totalCommands: commands.length,
successCount: successCount,
errorCount: errorCount
};
}
/**
* 执行单个指令(统一封装)
* 统一处理成功、失败、超时,统一记录数据库
* @param {number} task_id - 任务ID
* @param {object} command - 指令对象
* @param {object} mqttClient - MQTT客户端
* @returns {Promise<object>} 执行结果
*/
async executeCommand(task_id, command, mqttClient) {
const start_time = new Date();
let command_id = null;
let command_record = null;
try {
// 1. 获取任务信息
const task = await db.getModel('task_status').findByPk(task_id);
if (!task) {
throw new Error(`任务不存在: ${task_id}`);
}
// 1.5 检查账号授权状态
const authCheck = await authorizationService.checkAuthorization(task.sn_code, 'sn_code');
if (!authCheck.is_authorized) {
throw new Error(`授权检查失败: ${authCheck.message}`);
}
// 2. 获取指令信息
const command_name = command.command_name || command.name || '未知指令';
const command_type = command.command_type || command.type;
const command_params = command.command_params ?
(typeof command.command_params === 'string' ? JSON.parse(command.command_params) : command.command_params) :
{};
if (!command_type) {
throw new Error('指令类型不能为空');
}
// 3. 创建指令记录
command_record = await db.getModel('task_commands').create({
task_id: task_id,
command_type: command_type,
command_name: command_name,
command_params: JSON.stringify(command_params),
priority: command.priority || 1,
sequence: command.sequence || 1,
max_retries: command.maxRetries || command.max_retries || 3,
status: 'pending'
});
command_id = command_record.id;
console.log(`[指令管理] 创建指令记录: ${command_name} (ID: ${command_id})`);
// 4. 更新指令状态为运行中
await this._update_command_status(command_id, 'running', null, null, start_time);
// 4.5 推送指令开始执行状态
try {
const deviceWorkStatusNotifier = require('./deviceWorkStatusNotifier');
const taskQueue = require('./taskQueue');
const summary = await taskQueue.getTaskStatusSummary(task.sn_code);
await deviceWorkStatusNotifier.sendDeviceWorkStatus(task.sn_code, summary, {
currentCommand: {
command_id: command_id,
command_name: command_name,
command_type: command_type,
command_params: command_params,
progress: 0,
startTime: start_time.toISOString()
}
});
} catch (pushError) {
// 推送失败不影响指令执行
console.warn(`[指令管理] 推送设备工作状态失败:`, pushError.message);
}
// 5. 执行指令(统一封装)
const result = await this._execute_command_with_timeout(
command_id,
command_type,
command_name,
command_params,
task.sn_code,
mqttClient,
start_time
);
// 6. 记录成功结果
await this._record_command_result(command_id, 'completed', result, null, start_time);
// 6.5 推送指令完成状态
try {
const deviceWorkStatusNotifier = require('./deviceWorkStatusNotifier');
const taskQueue = require('./taskQueue');
const summary = await taskQueue.getTaskStatusSummary(task.sn_code);
await deviceWorkStatusNotifier.sendDeviceWorkStatus(task.sn_code, summary);
} catch (pushError) {
// 推送失败不影响指令执行
console.warn(`[指令管理] 推送设备工作状态失败:`, pushError.message);
}
return {
command_id: command_id,
command_name: command_name,
result: result,
duration: new Date() - start_time,
success: true
};
} catch (error) {
// 统一处理错误(失败或超时)
if (command_id) {
await this._record_command_result(
command_id,
'failed',
null,
error,
start_time
);
// 推送指令失败状态
try {
const deviceWorkStatusNotifier = require('./deviceWorkStatusNotifier');
const taskQueue = require('./taskQueue');
const summary = await taskQueue.getTaskStatusSummary(task.sn_code);
await deviceWorkStatusNotifier.sendDeviceWorkStatus(task.sn_code, summary);
} catch (pushError) {
// 推送失败不影响错误处理
console.warn(`[指令管理] 推送设备工作状态失败:`, pushError.message);
}
}
// 重新抛出错误,让调用方知道执行失败
throw error;
}
}
/**
* 执行指令(带超时保护)
* @private
*/
async _execute_command_with_timeout(command_id, command_type, command_name, command_params, sn_code, mqttClient, start_time) {
// 获取指令超时时间从配置中获取默认5分钟
const timeout = ScheduleConfig.taskTimeouts[command_type] || 5 * 60 * 1000;
// 构建指令执行 Promise
const command_promise = (async () => {
// 直接使用 command_type 调用 jobManager 的方法,不做映射
// command_type 和 jobManager 的方法名保持一致
if (jobManager[command_type]) {
return await jobManager[command_type](sn_code, mqttClient, command_params);
} else {
throw new Error(`未知的指令类型: ${command_type}, jobManager 中不存在对应方法`);
}
})();
// 使用超时机制包装
try {
const result = await ScheduleUtils.withTimeout(
command_promise,
timeout,
`指令执行超时: ${command_name} (超时时间: ${timeout / 1000}秒)`
);
return result;
} catch (error) {
// 判断是否为超时错误
if (error.message && error.message.includes('超时')) {
error.isTimeout = true;
}
throw error;
}
}
/**
* 记录指令执行结果(统一封装)
* 处理成功、失败、超时等情况,统一记录到数据库
* @private
*/
async _record_command_result(command_id, status, result, error, start_time) {
const end_time = new Date();
const duration = end_time.getTime() - start_time.getTime();
let error_message = null;
let error_stack = null;
let result_data = null;
// 处理错误信息
if (error) {
error_message = error.message || '指令执行失败';
error_stack = error.stack || '';
// 如果是超时错误,添加标识
if (error.isTimeout) {
error_message = `[超时] ${error_message}`;
}
}
// 处理结果数据
if (result && status === 'completed') {
result_data = result;
}
// 更新数据库
await this._update_command_status(
command_id,
status,
result_data,
error_message,
start_time,
end_time,
duration,
error_stack
);
// 记录日志
if (status === 'completed') {
console.log(`[指令管理] 指令执行成功: ${command_id} (耗时: ${duration}ms)`);
} else if (status === 'failed') {
const error_type = error && error.isTimeout ? '超时' : '失败';
console.error(`[指令管理] 指令执行${error_type}: ${command_id}, 错误: ${error_message}`, {
command_id: command_id,
duration: duration,
isTimeout: error && error.isTimeout
});
}
}
/**
* 更新指令状态(统一封装)
* @private
*/
async _update_command_status(command_id, status, result, error_message, start_time, end_time, duration, error_stack) {
try {
const update_data = {
status: status,
updated_at: new Date()
};
// 设置开始时间
if (status === 'running' && start_time) {
update_data.start_time = start_time;
}
// 设置结束时间和执行时长
if ((status === 'completed' || status === 'failed') && end_time) {
update_data.end_time = end_time;
if (duration !== undefined) {
update_data.duration = duration;
}
}
// 处理执行结果
if (result && status === 'completed') {
const result_str = this._format_result_for_storage(result);
update_data.result = result_str;
update_data.progress = 100;
}
// 处理错误信息
if (error_message) {
update_data.error_message = this._truncate_string(error_message, 10000);
}
if (error_stack) {
update_data.error_stack = this._truncate_string(error_stack, 50000);
}
// 更新数据库
await db.getModel('task_commands').update(update_data, {
where: { id: command_id }
});
} catch (db_error) {
logs.error(`[指令管理] 更新指令状态失败:`, db_error, {
command_id: command_id,
status: status
});
// 如果是因为数据太长导致的错误,尝试只保存错误信息
if (db_error.message && db_error.message.includes('Data too long')) {
try {
await db.getModel('task_commands').update({
status: status,
error_message: '结果数据过长,无法保存完整结果',
end_time: end_time || new Date(),
updated_at: new Date()
}, {
where: { id: command_id }
});
} catch (e) {
console.error(`[指令管理] 保存截断结果也失败:`, e);
}
}
}
}
/**
* 格式化结果数据用于存储
* @private
*/
_format_result_for_storage(result) {
try {
let result_str = JSON.stringify(result);
const max_length = 60000; // 限制为60KB
if (result_str.length > max_length) {
// 如果结果太长,尝试压缩或截断
if (typeof result === 'object' && result !== null) {
const summary = {
success: result.success !== undefined ? result.success : true,
message: result.message || '执行成功',
dataLength: result_str.length,
truncated: true,
preview: result_str.substring(0, 1000) // 保存前1000字符作为预览
};
result_str = JSON.stringify(summary);
} else {
result_str = result_str.substring(0, max_length) + '...[数据已截断]';
}
}
return result_str;
} catch (error) {
return JSON.stringify({ error: '结果序列化失败', message: error.message });
}
}
/**
* 截断字符串
* @private
*/
_truncate_string(str, max_length) {
if (!str || str.length <= max_length) {
return str;
}
return str.substring(0, max_length) + '...[已截断]';
}
/**
* 更新指令状态(兼容旧接口,内部调用统一方法)
* @param {number} command_id - 指令ID
* @param {string} status - 状态
* @param {object} result - 结果
* @param {string} error_message - 错误信息
* @deprecated 建议使用 _update_command_status 统一方法
*/
async updateCommandStatus(command_id, status, result = null, error_message = null) {
const start_time = status === 'running' ? new Date() : null;
const end_time = (status === 'completed' || status === 'failed') ? new Date() : null;
// 计算执行时长
let duration = null;
if (end_time && start_time) {
duration = end_time.getTime() - start_time.getTime();
} else if (end_time) {
// 如果没有开始时间,尝试从数据库获取
try {
const command = await db.getModel('task_commands').findByPk(command_id);
if (command && command.start_time) {
duration = end_time.getTime() - new Date(command.start_time).getTime();
}
} catch (e) {
// 忽略错误
}
}
await this._update_command_status(
command_id,
status,
result,
error_message,
start_time,
end_time,
duration,
null
);
}
/**
* 清理过期的指令记录
* @param {number} days - 保留天数
*/
async cleanupExpiredCommands(days = 30) {
try {
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - days);
const deletedCount = await db.getModel('task_commands').destroy({
where: {
create_time: {
[db.Sequelize.Op.lt]: cutoffDate
}
}
});
console.log(`[指令管理] 清理了 ${deletedCount} 条过期指令记录`);
return deletedCount;
} catch (error) {
logs.error(`[指令管理] 清理过期指令失败:`, error);
return 0;
}
}
}
module.exports = new CommandManager();