From e27c0dc41ae14c16bc7848383b28fd1479a29099 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=88=90?= Date: Wed, 26 Nov 2025 15:00:14 +0800 Subject: [PATCH] 1 --- _doc/任务与指令的区别说明.md | 354 +++++++++++++++++ _doc/指令和任务模式适配检查报告.md | 123 ++++++ api/middleware/schedule/ErrorHandler.js | 2 +- api/middleware/schedule/command.js | 460 +++++++++++++++-------- api/middleware/schedule/config.js | 15 +- api/middleware/schedule/deviceManager.js | 18 +- api/middleware/schedule/scheduledJobs.js | 112 ++++++ api/middleware/schedule/taskHandlers.js | 218 ++++------- api/middleware/schedule/taskQueue.js | 102 +++-- 9 files changed, 1046 insertions(+), 358 deletions(-) create mode 100644 _doc/任务与指令的区别说明.md create mode 100644 _doc/指令和任务模式适配检查报告.md diff --git a/_doc/任务与指令的区别说明.md b/_doc/任务与指令的区别说明.md new file mode 100644 index 0000000..6030bae --- /dev/null +++ b/_doc/任务与指令的区别说明.md @@ -0,0 +1,354 @@ +# 任务与指令的区别说明 + +## 📋 概述 + +在调度系统中,**任务(Task)** 和 **指令(Command)** 是两个不同层次的概念,它们的关系是:**一个任务可以包含多个指令**。 + +### ⚠️ 重要说明 + +**当前系统实际情况**: +- **真正的任务**:目前只有 `auto_deliver`(自动投递任务)是真正的任务,它包含多个步骤和指令 +- **伪任务**:虽然代码中有 `get_resume`、`get_job_list`、`send_chat`、`apply_job` 等任务处理器,但它们实际上只是包装了单个指令,本质上就是直接执行指令 + +**为什么会有伪任务**: +1. 统一的任务追踪和日志记录 +2. 保持接口的一致性 +3. 未来可能扩展为真正的任务(包含多个步骤) + +## 🔄 层级关系 + +``` +任务(Task) + ├── 指令1(Command) + ├── 指令2(Command) + └── 指令3(Command) +``` + +## 📊 详细对比 + +| 维度 | 任务(Task) | 指令(Command) | +|------|------------|----------------| +| **概念层级** | 业务层 | 执行层 | +| **数据库表** | `task_status` | `task_commands` | +| **管理模块** | TaskQueue(任务队列) | CommandManager(指令管理器) | +| **处理模块** | TaskHandlers(任务处理器) | jobManager(业务管理器) | +| **粒度** | 粗粒度(业务流程) | 细粒度(具体操作) | +| **包含关系** | 包含多个指令 | 属于某个任务 | +| **执行方式** | 由任务队列调度 | 由指令管理器执行 | +| **通信方式** | 内部调度 | 通过 MQTT 发送到客户端 | + +## 🎯 任务(Task) + +### 定义 +任务是业务层面的概念,代表一个完整的业务流程或工作单元。 + +### 特点 +- **业务导向**:代表一个完整的业务目标 +- **可包含多个步骤**:一个任务可以包含多个指令 +- **有生命周期**:pending → running → completed/failed +- **有优先级**:可以设置任务优先级 +- **有超时机制**:任务级别有超时保护 + +### 任务类型示例 + +**真正的任务(包含多个步骤)**: +- `auto_deliver` - 自动投递任务(包含多个子操作:获取简历、获取岗位列表、筛选职位、批量投递) +- `auto_chat` - 自动沟通任务(待实现:自动与HR进行沟通,回复消息等) +- `auto_active_account` - 自动活跃账号任务(待实现:自动执行操作保持账号活跃度) + +**注意**:目前系统中只有 `auto_deliver` 是已实现的真正任务,`auto_chat` 和 `auto_active_account` 是待实现的任务框架。 + +### 任务表结构(task_status) +```javascript +{ + id: 1, + sn_code: 'GHJU', + taskType: 'auto_deliver', + taskName: '自动投递 - 前端开发', + status: 'running', + priority: 7, + taskParams: { keyword: '前端', platform: 'boss' }, + result: {}, + startTime: '2024-01-01 10:00:00', + endTime: null, + duration: 0 +} +``` + +### 任务执行流程 +```javascript +// 1. 添加任务到队列 +await taskQueue.addTask(sn_code, { + taskType: 'auto_deliver', + taskName: '自动投递', + taskParams: { keyword: '前端' } +}); + +// 2. 任务队列调度执行 +// 3. 任务处理器处理任务 +// 4. 任务处理器创建并执行指令 +``` + +## ⚙️ 指令(Command) + +### 定义 +指令是执行层面的概念,代表一个具体的操作,通过 MQTT 发送到客户端执行。 + +### 特点 +- **执行导向**:代表一个具体的操作 +- **原子性**:一个指令是一个不可分割的操作 +- **有执行顺序**:指令可以按顺序执行 +- **有超时机制**:指令级别有超时保护 +- **MQTT 通信**:通过 MQTT 发送到客户端 + +### 指令类型示例 +- `getOnlineResume` - 获取在线简历 +- `getJobList` - 获取岗位列表 +- `applyJob` - 投递简历 +- `sendChatMessage` - 发送聊天消息 +- `getLoginQrCode` - 获取登录二维码 + +### 指令表结构(task_commands) +```javascript +{ + id: 1, + task_id: 100, // 关联的任务ID + command_type: 'getOnlineResume', + command_name: '获取在线简历', + command_params: '{"sn_code":"GHJU"}', + status: 'completed', + sequence: 1, + priority: 9, + start_time: '2024-01-01 10:00:00', + end_time: '2024-01-01 10:00:30', + duration: 30000 +} +``` + +### 指令执行流程 +```javascript +// 1. 任务处理器创建指令 +const commands = [{ + command_type: 'getOnlineResume', + command_name: '获取在线简历', + command_params: JSON.stringify({ sn_code }) +}]; + +// 2. 指令管理器执行指令 +await command.executeCommands(taskId, commands, mqttClient); + +// 3. 通过 MQTT 发送到客户端 +// 4. 客户端执行并返回结果 +``` + +## 🔗 关系示例 + +### 示例1:自动投递任务 + +**任务**:`auto_deliver`(自动投递任务) + +**包含的指令**: +1. `getOnlineResume` - 获取在线简历 +2. `getJobList` - 获取岗位列表 +3. `applyJob` - 投递简历(可能多个) + +```javascript +// 任务处理器创建多个指令 +async handleAutoDeliverTask(task) { + // 1. 获取简历指令 + const getResumeCommand = { + command_type: 'getOnlineResume', + command_name: '获取在线简历', + ... + }; + + // 2. 获取岗位列表指令 + const getJobListCommand = { + command_type: 'getJobList', + command_name: '获取岗位列表', + ... + }; + + // 3. 投递指令(可能多个) + const applyCommands = jobs.map(job => ({ + command_type: 'applyJob', + command_name: `投递简历 - ${job.jobTitle}`, + ... + })); + + // 执行所有指令 + await command.executeCommands(task.id, [ + getResumeCommand, + getJobListCommand, + ...applyCommands + ], mqttClient); +} +``` + +### 示例2:获取简历(伪任务,实际是指令) + +**说明**:虽然代码中有 `get_resume` 任务处理器,但它实际上只是包装了单个指令,本质上就是直接执行指令。 + +**任务**:`get_resume`(获取简历任务) + +**包含的指令**: +1. `getOnlineResume` - 获取在线简历 + +```javascript +async handleGetResumeTask(task) { + // 实际上只是创建一个指令并执行 + const commands = [{ + command_type: 'getOnlineResume', + command_name: '获取在线简历', + command_params: JSON.stringify({ sn_code: task.sn_code }) + }]; + + await command.executeCommands(task.id, commands, this.mqttClient); +} +``` + +**注意**:这种"任务"实际上可以直接作为指令执行,不需要通过任务队列。它们存在的原因可能是为了: +1. 统一的任务追踪和日志记录 +2. 未来可能扩展为真正的任务(包含多个步骤) +3. 保持接口的一致性 + +## 📈 执行流程图 + +``` +┌─────────────────┐ +│ 任务队列 │ +│ (TaskQueue) │ +└────────┬────────┘ + │ 调度任务 + ↓ +┌─────────────────┐ +│ 任务处理器 │ +│ (TaskHandlers) │ +└────────┬────────┘ + │ 创建指令 + ↓ +┌─────────────────┐ +│ 指令管理器 │ +│ (CommandManager)│ +└────────┬────────┘ + │ 执行指令 + ↓ +┌─────────────────┐ +│ 业务管理器 │ +│ (jobManager) │ +└────────┬────────┘ + │ MQTT 发送 + ↓ +┌─────────────────┐ +│ 客户端设备 │ +│ (Python Client)│ +└─────────────────┘ +``` + +## 🎨 设计优势 + +### 1. **职责分离** +- **任务层**:负责业务逻辑和流程编排 +- **指令层**:负责具体操作和 MQTT 通信 + +### 2. **灵活性** +- 一个任务可以包含不同数量的指令 +- 可以根据业务需求动态创建指令 + +### 3. **可追踪性** +- 任务级别:可以追踪整个业务流程 +- 指令级别:可以追踪每个具体操作 + +### 4. **错误处理** +- 任务级别:处理业务逻辑错误 +- 指令级别:处理执行错误和超时 + +## 📝 代码示例 + +### 任务处理器创建指令 + +```javascript +// api/middleware/schedule/taskHandlers.js +async handleAutoDeliverTask(task) { + const { sn_code, taskParams } = task; + + // 1. 创建获取简历指令 + const getResumeCommand = { + command_type: 'getOnlineResume', + command_name: '获取在线简历', + command_params: JSON.stringify({ sn_code, platform: 'boss' }) + }; + + // 2. 创建获取岗位列表指令 + const getJobListCommand = { + command_type: 'getJobList', + command_name: '获取岗位列表', + command_params: JSON.stringify({ + sn_code, + keyword: taskParams.keyword, + platform: 'boss' + }) + }; + + // 3. 执行指令序列 + const result = await command.executeCommands( + task.id, + [getResumeCommand, getJobListCommand], + this.mqttClient + ); + + return result; +} +``` + +### 指令管理器执行指令 + +```javascript +// api/middleware/schedule/command.js +async executeCommand(taskId, command, mqttClient) { + // 1. 创建指令记录 + const commandRecord = await db.getModel('task_commands').create({ + task_id: taskId, + command_type: command.command_type, + command_name: command.command_name, + status: 'pending' + }); + + // 2. 调用业务管理器执行 + const result = await jobManager[commandType]( + sn_code, + mqttClient, + commandParams + ); + + // 3. 更新指令状态 + await this.updateCommandStatus(commandId, 'completed', result); + + return result; +} +``` + +## 🔍 总结 + +- **任务(Task)**:业务层面的工作单元,代表一个完整的业务流程 + - **真正的任务**:包含多个步骤/指令,如 `auto_deliver` + - **伪任务**:虽然叫任务,但实际只是包装了单个指令,如 `get_resume`、`get_job_list` 等 + +- **指令(Command)**:执行层面的操作单元,代表一个具体的操作 + - 通过 MQTT 发送到客户端执行 + - 如:`getOnlineResume`、`getJobList`、`applyJob` 等 + +- **关系**: + - 真正的任务包含多个指令,指令按顺序执行 + - 伪任务只是指令的包装,本质上就是直接执行指令 + +- **管理**:任务由任务队列管理,指令由指令管理器管理 + +- **通信**:任务在服务端内部调度,指令通过 MQTT 发送到客户端 + +- **当前状态**: + - 目前系统中只有 `auto_deliver` 是真正的任务(包含多个步骤) + - 其他如 `get_resume`、`get_job_list`、`send_chat`、`apply_job` 虽然叫任务,但实际只是指令的包装 + +这种设计实现了业务逻辑和执行逻辑的分离,提高了系统的灵活性和可维护性。伪任务的存在可能是为了统一的任务追踪和未来扩展。 + diff --git a/_doc/指令和任务模式适配检查报告.md b/_doc/指令和任务模式适配检查报告.md new file mode 100644 index 0000000..a62e6ee --- /dev/null +++ b/_doc/指令和任务模式适配检查报告.md @@ -0,0 +1,123 @@ +# 指令和任务模式适配检查报告 + +## 📋 检查范围 +检查 `api/middleware` 目录下的代码是否适用于新的指令和任务模式。 + +## ✅ 已适配的部分 + +### 1. **任务处理器 (taskHandlers.js)** +- ✅ 正确使用 `command.executeCommands()` 执行指令 +- ✅ 在 `handleAutoDeliverTask` 中创建指令并执行 +- ✅ 指令类型使用驼峰命名(`getOnlineResume`, `getJobList`, `applyJob`) + +### 2. **指令管理器 (command.js)** +- ✅ 已重构完成,统一封装指令执行 +- ✅ 统一处理成功、失败、超时 +- ✅ 统一记录数据库 +- ✅ 支持驼峰转下划线的命名转换 + +### 3. **任务队列 (taskQueue.js)** +- ✅ 正确使用任务处理器 +- ✅ 通过 `taskHandlers` 执行任务 + +## ⚠️ 需要修复的问题 + +### 1. **方法命名不一致** + +**问题描述**: +- 指令类型使用驼峰命名:`getOnlineResume`, `getJobList`, `applyJob` +- 大部分方法使用下划线命名:`get_online_resume`, `get_job_list` +- 但 `applyJob` 方法名是驼峰命名,与指令类型一致 + +**当前转换逻辑**: +```javascript +// command.js 中的转换 +const to_snake_case = (str) => { + if (str.includes('_')) return str; + return str.replace(/([A-Z])/g, '_$1').toLowerCase().replace(/^_/, ''); +}; + +// getOnlineResume -> get_online_resume ✓ +// getJobList -> get_job_list ✓ +// applyJob -> apply_job ✗ (但实际方法名是 applyJob) +``` + +**解决方案**: +1. **方案1(推荐)**:统一使用下划线命名,将 `applyJob` 改为 `apply_job` +2. **方案2**:保持现状,`command.js` 中已支持两种命名方式(先尝试下划线,再尝试原名称) + +**当前状态**:方案2已实现,代码可以正常工作,但命名不统一。 + +### 2. **sendChatMessage 方法** + +**问题描述**: +- `chatManager.js` 中的方法是 `sendChatMessage`(驼峰命名) +- 如果指令类型是 `sendChatMessage`,转换后会变成 `send_chat_message`,但实际方法名是 `sendChatMessage` + +**当前状态**:`command.js` 中已支持回退机制,如果下划线命名找不到,会尝试原名称,所以可以正常工作。 + +## 📊 方法命名对照表 + +| 指令类型 (command_type) | 转换后方法名 | 实际方法名 | 状态 | +|------------------------|-------------|-----------|------| +| `getOnlineResume` | `get_online_resume` | `get_online_resume` | ✅ 匹配 | +| `getJobList` | `get_job_list` | `get_job_list` | ✅ 匹配 | +| `applyJob` | `apply_job` | `applyJob` | ⚠️ 不匹配(但可工作) | +| `sendChatMessage` | `send_chat_message` | `sendChatMessage` | ⚠️ 不匹配(但可工作) | + +## 🔧 建议修复 + +### 方案1:统一使用下划线命名(推荐) + +**修改文件**: +1. `api/middleware/job/jobManager.js`:将 `applyJob` 改为 `apply_job` +2. `api/middleware/job/chatManager.js`:将 `sendChatMessage` 改为 `send_chat_message` +3. `api/middleware/schedule/taskHandlers.js`:将指令类型改为下划线命名 + +**优点**: +- 命名统一,符合项目规范 +- 代码更清晰,减少混淆 + +**缺点**: +- 需要修改多个文件 +- 可能影响其他调用方 + +### 方案2:保持现状(当前方案) + +**优点**: +- 不需要修改现有代码 +- `command.js` 已支持两种命名方式 + +**缺点**: +- 命名不统一,容易混淆 +- 代码可读性稍差 + +## 📝 其他检查项 + +### 1. **deviceManager.js** +- ✅ 不直接涉及指令和任务,主要用于设备状态管理 +- ✅ 与任务系统配合良好 + +### 2. **job/index.js** +- ✅ 正确导出所有方法 +- ✅ 支持下划线命名规范 + +### 3. **MQTT 相关** +- ✅ 通过 `mqttClient.publishAndWait` 发送指令 +- ✅ 与指令系统配合良好 + +## ✅ 总结 + +**整体适配情况**:**良好** ✅ + +1. ✅ 核心功能已正确适配新的指令和任务模式 +2. ✅ 指令执行统一封装,处理逻辑完善 +3. ⚠️ 存在命名不一致问题,但不影响功能(有回退机制) +4. 💡 建议统一命名规范,提高代码可维护性 + +## 🎯 下一步行动 + +1. **可选**:统一方法命名规范(下划线命名) +2. **可选**:添加单元测试验证指令执行流程 +3. **可选**:完善错误处理和日志记录 + diff --git a/api/middleware/schedule/ErrorHandler.js b/api/middleware/schedule/ErrorHandler.js index 9b66bea..7e13e63 100644 --- a/api/middleware/schedule/ErrorHandler.js +++ b/api/middleware/schedule/ErrorHandler.js @@ -30,7 +30,7 @@ class ErrorHandler { stack: error.stack || '', code: error.code || '', context: { - taskId: context.taskId, + task_id: context.task_id, sn_code: context.sn_code, taskType: context.taskType, ...context diff --git a/api/middleware/schedule/command.js b/api/middleware/schedule/command.js index fb0ca84..4ce2f58 100644 --- a/api/middleware/schedule/command.js +++ b/api/middleware/schedule/command.js @@ -11,52 +11,48 @@ const ScheduleConfig = require('./config'); */ class CommandManager { constructor() { - this.pendingCommands = new Map(); // 等待响应的指令 { commandId: { resolve, reject, timeout } } + this.pendingCommands = new Map(); // 等待响应的指令 { command_id: { resolve, reject, timeout } } } /** * 执行指令序列 + * @param {number} task_id - 任务ID * @param {Array} commands - 指令数组 * @param {object} mqttClient - MQTT客户端 - * @param {object} options - 执行选项 + * @param {object} options - 执行选项(保留用于扩展) * @returns {Promise} 执行结果 */ - async executeCommands(taskId, commands, mqttClient, options = {}) { - // try { + async executeCommands(task_id, commands, mqttClient, options = {}) { if (!commands || commands.length === 0) { throw new Error('没有找到要执行的指令'); } - const { - maxRetries = 1, // 最大重试次数 - retryDelay = 1000 // 重试延迟(毫秒) - } = options; - console.log(`[指令管理] 开始执行 ${commands.length} 个指令`); const results = []; const errors = []; - // 顺序执行指令,失败时停止 + // 顺序执行指令,失败时继续执行后续指令 for (let i = 0; i < commands.length; i++) { const command = commands[i]; - let retryCount = 0; - let commandResult = null; + + try { + console.log(`[指令管理] 执行指令 ${i + 1}/${commands.length}: ${command.command_name || command.name}`); - // 重试逻辑 - while (retryCount <= maxRetries) { - - console.log(`[指令管理] 执行指令 ${i + 1}/${commands.length}: ${command.command_name || command.name} (尝试 ${retryCount + 1}/${maxRetries + 1})`); - - commandResult = await this.executeCommand(taskId, command, mqttClient); - - results. push(commandResult); - break; // 成功执行,跳出重试循环 + const commandResult = await this.executeCommand(task_id, command, mqttClient); + results.push(commandResult); + } catch (error) { + // 指令执行失败,记录错误但继续执行后续指令 + errors.push({ + command: command, + error: error + }); + console.error(`[指令管理] 指令执行失败: ${command.command_name || command.name}, 错误: ${error.message}`); + // 失败时继续执行后续指令 } - } const successCount = results.length; @@ -72,203 +68,260 @@ class CommandManager { successCount: successCount, errorCount: errorCount }; - - // } catch (error) { - // console.error(`[指令管理] 执行指令序列失败:`, error); - // throw error; - // } } /** - * 执行单个指令 + * 执行单个指令(统一封装) + * 统一处理成功、失败、超时,统一记录数据库 + * @param {number} task_id - 任务ID * @param {object} command - 指令对象 * @param {object} mqttClient - MQTT客户端 * @returns {Promise} 执行结果 */ - async executeCommand(taskId, command, mqttClient) { - const startTime = new Date(); - let commandRecord = null; + async executeCommand(task_id, command, mqttClient) { + const start_time = new Date(); + let command_id = null; + let command_record = null; - const task = await db.getModel('task_status').findByPk(taskId); + try { + // 1. 获取任务信息 + const task = await db.getModel('task_status').findByPk(task_id); + if (!task) { + throw new Error(`任务不存在: ${task_id}`); + } - // 获取指令信息(支持两种格式) - const commandName = command.command_name; - const commandType = command.command_type; - const commandParams = command.command_params ? JSON.parse(command.command_params) : {}; + // 2. 获取指令信息 + const command_name = command.command_name || command.name || '未知指令'; + const command_type = command.command_type || command.type; + const command_params = command.command_params ? + (typeof command.command_params === 'string' ? JSON.parse(command.command_params) : command.command_params) : + {}; - // 创建指令记录 - commandRecord = await db.getModel('task_commands').create({ - task_id: taskId, - command_type: commandType, - command_name: commandName, - command_params: JSON.stringify(commandParams), - priority: command.priority || 1, - sequence: command.sequence || 1, - max_retries: command.maxRetries || command.max_retries || 3, - status: 'pending' - }); + if (!command_type) { + throw new Error('指令类型不能为空'); + } - let commandId = commandRecord.id; - console.log(`[指令管理] 创建指令记录: ${commandName} (ID: ${commandId})`); + // 3. 创建指令记录 + command_record = await db.getModel('task_commands').create({ + task_id: task_id, + command_type: command_type, + command_name: command_name, + command_params: JSON.stringify(command_params), + priority: command.priority || 1, + sequence: command.sequence || 1, + max_retries: command.maxRetries || command.max_retries || 3, + status: 'pending' + }); + command_id = command_record.id; + console.log(`[指令管理] 创建指令记录: ${command_name} (ID: ${command_id})`); - // 更新指令状态为运行中 - await this.updateCommandStatus(commandId, 'running'); + // 4. 更新指令状态为运行中 + await this._update_command_status(command_id, 'running', null, null, start_time); + // 5. 执行指令(统一封装) + const result = await this._execute_command_with_timeout( + command_id, + command_type, + command_name, + command_params, + task.sn_code, + mqttClient, + start_time + ); - console.log(`[指令管理] 执行指令: ${commandName} (ID: ${commandId})`); + // 6. 记录成功结果 + await this._record_command_result(command_id, 'completed', result, null, start_time); - const sn_code = task.sn_code; + return { + command_id: command_id, + command_name: command_name, + result: result, + duration: new Date() - start_time, + success: true + }; - // 将驼峰命名转换为下划线命名(如:getOnlineResume -> get_online_resume) - const toSnakeCase = (str) => { - // 如果已经是下划线格式,直接返回 + } catch (error) { + // 统一处理错误(失败或超时) + if (command_id) { + await this._record_command_result( + command_id, + 'failed', + null, + error, + start_time + ); + } + + // 重新抛出错误,让调用方知道执行失败 + throw error; + } + } + + /** + * 执行指令(带超时保护) + * @private + */ + async _execute_command_with_timeout(command_id, command_type, command_name, command_params, sn_code, mqttClient, start_time) { + // 将驼峰命名转换为下划线命名 + const to_snake_case = (str) => { if (str.includes('_')) { return str; } - // 驼峰转下划线 return str.replace(/([A-Z])/g, '_$1').toLowerCase().replace(/^_/, ''); }; - const methodName = toSnakeCase(commandType); + const method_name = to_snake_case(command_type); // 获取指令超时时间(从配置中获取,默认5分钟) - const timeout = ScheduleConfig.taskTimeouts[commandType] || ScheduleConfig.taskTimeouts[methodName] || 5 * 60 * 1000; + const timeout = ScheduleConfig.taskTimeouts[command_type] || + ScheduleConfig.taskTimeouts[method_name] || + 5 * 60 * 1000; - let result; - try { - // 使用超时机制包装指令执行 - const commandPromise = (async () => { - if (commandType && jobManager[methodName]) { - return await jobManager[methodName](sn_code, mqttClient, commandParams); - } else { - // 如果转换后找不到,尝试直接使用原名称 - if (jobManager[commandType]) { - return await jobManager[commandType](sn_code, mqttClient, commandParams); - } else { - throw new Error(`未知的指令类型: ${commandType} (尝试的方法名: ${methodName})`); - } - } - })(); + // 构建指令执行 Promise + const command_promise = (async () => { + if (command_type && jobManager[method_name]) { + return await jobManager[method_name](sn_code, mqttClient, command_params); + } else if (jobManager[command_type]) { + return await jobManager[command_type](sn_code, mqttClient, command_params); + } else { + throw new Error(`未知的指令类型: ${command_type} (尝试的方法名: ${method_name})`); + } + })(); - // 使用超时机制 - result = await ScheduleUtils.withTimeout( - commandPromise, + // 使用超时机制包装 + try { + const result = await ScheduleUtils.withTimeout( + command_promise, timeout, - `指令执行超时: ${commandName} (超时时间: ${timeout / 1000}秒)` + `指令执行超时: ${command_name} (超时时间: ${timeout / 1000}秒)` ); + return result; } catch (error) { - const endTime = new Date(); - const duration = endTime - startTime; - - // 如果是超时错误,更新指令状态为失败 - const errorMessage = error.message || '指令执行失败'; - await this.updateCommandStatus(commandId, 'failed', null, errorMessage); - + // 判断是否为超时错误 + if (error.message && error.message.includes('超时')) { + error.isTimeout = true; + } throw error; } - - const endTime = new Date(); - const duration = endTime - startTime; - - // 更新指令状态为完成 - await this.updateCommandStatus(commandId, 'completed', result); - - return { - commandId: commandId, - commandName: commandName, - result: result, - duration: duration, - success: true - }; - - } + /** + * 记录指令执行结果(统一封装) + * 处理成功、失败、超时等情况,统一记录到数据库 + * @private + */ + async _record_command_result(command_id, status, result, error, start_time) { + const end_time = new Date(); + const duration = end_time.getTime() - start_time.getTime(); + + let error_message = null; + let error_stack = null; + let result_data = null; + + // 处理错误信息 + if (error) { + error_message = error.message || '指令执行失败'; + error_stack = error.stack || ''; + + // 如果是超时错误,添加标识 + if (error.isTimeout) { + error_message = `[超时] ${error_message}`; + } + } + + // 处理结果数据 + if (result && status === 'completed') { + result_data = result; + } + + // 更新数据库 + await this._update_command_status( + command_id, + status, + result_data, + error_message, + start_time, + end_time, + duration, + error_stack + ); + + // 记录日志 + if (status === 'completed') { + console.log(`[指令管理] 指令执行成功: ${command_id} (耗时: ${duration}ms)`); + } else if (status === 'failed') { + const error_type = error && error.isTimeout ? '超时' : '失败'; + console.error(`[指令管理] 指令执行${error_type}: ${command_id}, 错误: ${error_message}`, { + command_id: command_id, + duration: duration, + isTimeout: error && error.isTimeout + }); + } + } /** - * 更新指令状态 - * @param {number} commandId - 指令ID - * @param {string} status - 状态 - * @param {object} result - 结果 - * @param {string} errorMessage - 错误信息 + * 更新指令状态(统一封装) + * @private */ - async updateCommandStatus(commandId, status, result = null, errorMessage = null) { + async _update_command_status(command_id, status, result, error_message, start_time, end_time, duration, error_stack) { try { - const updateData = { + const update_data = { status: status, updated_at: new Date() }; - if (status === 'running') { - updateData.start_time = new Date(); - } else if (status === 'completed' || status === 'failed') { - updateData.end_time = new Date(); - if (result) { - // 将结果转换为JSON字符串,并限制长度(TEXT类型最大约65KB) - let resultStr = JSON.stringify(result); - const maxLength = 60000; // 限制为60KB,留一些余量 - - if (resultStr.length > maxLength) { - // 如果结果太长,尝试压缩或截断 - try { - // 如果是对象,尝试只保存关键信息 - if (typeof result === 'object' && result !== null) { - const summary = { - success: result.success !== undefined ? result.success : true, - message: result.message || '执行成功', - dataLength: resultStr.length, - truncated: true, - preview: resultStr.substring(0, 1000) // 保存前1000字符作为预览 - }; - resultStr = JSON.stringify(summary); - } else { - // 直接截断 - resultStr = resultStr.substring(0, maxLength) + '...[数据已截断]'; - } - } catch (e) { - // 如果处理失败,直接截断 - resultStr = resultStr.substring(0, maxLength) + '...[数据已截断]'; - } - } - updateData.result = resultStr; - updateData.progress = 100; - } - if (errorMessage) { - // 错误信息也限制长度 - const maxErrorLength = 10000; // 错误信息限制10KB - updateData.error_message = errorMessage.length > maxErrorLength - ? errorMessage.substring(0, maxErrorLength) + '...[错误信息已截断]' - : errorMessage; - } - // 计算执行时长 - const command = await db.getModel('task_commands').findByPk(commandId); - if (command && command.start_time) { - const duration = new Date() - new Date(command.start_time); - updateData.duration = duration; + // 设置开始时间 + if (status === 'running' && start_time) { + update_data.start_time = start_time; + } + + // 设置结束时间和执行时长 + if ((status === 'completed' || status === 'failed') && end_time) { + update_data.end_time = end_time; + if (duration !== undefined) { + update_data.duration = duration; } } - await db.getModel('task_commands').update(updateData, { - where: { id: commandId } + // 处理执行结果 + if (result && status === 'completed') { + const result_str = this._format_result_for_storage(result); + update_data.result = result_str; + update_data.progress = 100; + } + + // 处理错误信息 + if (error_message) { + update_data.error_message = this._truncate_string(error_message, 10000); + } + + if (error_stack) { + update_data.error_stack = this._truncate_string(error_stack, 50000); + } + + // 更新数据库 + await db.getModel('task_commands').update(update_data, { + where: { id: command_id } }); - } catch (error) { - logs.error(`[指令管理] 更新指令状态失败:`, error, { - commandId: commandId, + } catch (db_error) { + logs.error(`[指令管理] 更新指令状态失败:`, db_error, { + command_id: command_id, status: status }); + // 如果是因为数据太长导致的错误,尝试只保存错误信息 - if (error.message && error.message.includes('Data too long')) { + if (db_error.message && db_error.message.includes('Data too long')) { try { await db.getModel('task_commands').update({ status: status, error_message: '结果数据过长,无法保存完整结果', - end_time: new Date(), + end_time: end_time || new Date(), updated_at: new Date() }, { - where: { id: commandId } + where: { id: command_id } }); } catch (e) { console.error(`[指令管理] 保存截断结果也失败:`, e); @@ -277,6 +330,89 @@ class CommandManager { } } + /** + * 格式化结果数据用于存储 + * @private + */ + _format_result_for_storage(result) { + try { + let result_str = JSON.stringify(result); + const max_length = 60000; // 限制为60KB + + if (result_str.length > max_length) { + // 如果结果太长,尝试压缩或截断 + if (typeof result === 'object' && result !== null) { + const summary = { + success: result.success !== undefined ? result.success : true, + message: result.message || '执行成功', + dataLength: result_str.length, + truncated: true, + preview: result_str.substring(0, 1000) // 保存前1000字符作为预览 + }; + result_str = JSON.stringify(summary); + } else { + result_str = result_str.substring(0, max_length) + '...[数据已截断]'; + } + } + + return result_str; + } catch (error) { + return JSON.stringify({ error: '结果序列化失败', message: error.message }); + } + } + + /** + * 截断字符串 + * @private + */ + _truncate_string(str, max_length) { + if (!str || str.length <= max_length) { + return str; + } + return str.substring(0, max_length) + '...[已截断]'; + } + + + /** + * 更新指令状态(兼容旧接口,内部调用统一方法) + * @param {number} command_id - 指令ID + * @param {string} status - 状态 + * @param {object} result - 结果 + * @param {string} error_message - 错误信息 + * @deprecated 建议使用 _update_command_status 统一方法 + */ + async updateCommandStatus(command_id, status, result = null, error_message = null) { + const start_time = status === 'running' ? new Date() : null; + const end_time = (status === 'completed' || status === 'failed') ? new Date() : null; + + // 计算执行时长 + let duration = null; + if (end_time && start_time) { + duration = end_time.getTime() - start_time.getTime(); + } else if (end_time) { + // 如果没有开始时间,尝试从数据库获取 + try { + const command = await db.getModel('task_commands').findByPk(command_id); + if (command && command.start_time) { + duration = end_time.getTime() - new Date(command.start_time).getTime(); + } + } catch (e) { + // 忽略错误 + } + } + + await this._update_command_status( + command_id, + status, + result, + error_message, + start_time, + end_time, + duration, + null + ); + } + /** * 清理过期的指令记录 diff --git a/api/middleware/schedule/config.js b/api/middleware/schedule/config.js index 9331218..4c19d0c 100644 --- a/api/middleware/schedule/config.js +++ b/api/middleware/schedule/config.js @@ -29,21 +29,16 @@ class ScheduleConfig { // 任务超时配置(毫秒) this.taskTimeouts = { - get_login_qr_code: 30 * 1000, // 登录检查:30秒 - get_resume: 60 * 1000, // 获取简历:1分钟 - search_jobs: 5 * 60 * 1000, // 搜索岗位:5分钟 - chat: 30 * 1000, // 聊天:30秒 - apply: 30 * 1000 // 投递:30秒 + auto_deliver: 30 * 60 * 1000, // 自动投递任务:30分钟(包含多个子任务) + auto_chat: 15 * 60 * 1000, // 自动沟通任务:15分钟 + auto_active_account: 10 * 60 * 1000 // 自动活跃账号任务:10分钟 }; // 任务优先级配置 this.taskPriorities = { - get_login_qr_code: 10, // 最高优先级 - get_resume: 9, - apply: 8, auto_deliver: 7, // 自动投递任务 - search_jobs: 6, - chat: 5, + auto_chat: 6, // 自动沟通任务 + auto_active_account: 5, // 自动活跃账号任务 cleanup: 1 }; diff --git a/api/middleware/schedule/deviceManager.js b/api/middleware/schedule/deviceManager.js index d07eac9..e13a280 100644 --- a/api/middleware/schedule/deviceManager.js +++ b/api/middleware/schedule/deviceManager.js @@ -92,7 +92,7 @@ class DeviceManager { /** * 检查是否可以执行操作 */ - canExecuteOperation(sn_code, operationType) { + canExecuteOperation(sn_code, operation_type) { // 检查工作时间 if (!config.isWorkingHours()) { return { allowed: false, reason: '不在工作时间内' }; @@ -101,8 +101,8 @@ class DeviceManager { // 检查频率限制 const device = this.devices.get(sn_code); if (device) { - const lastTime = device[`last${operationType.charAt(0).toUpperCase() + operationType.slice(1)}`] || 0; - const interval = config.getRateLimit(operationType); + const lastTime = device[`last${operation_type.charAt(0).toUpperCase() + operation_type.slice(1)}`] || 0; + const interval = config.getRateLimit(operation_type); if (Date.now() - lastTime < interval) { return { allowed: false, reason: '操作过于频繁' }; } @@ -114,11 +114,11 @@ class DeviceManager { if (device.dailyCounts.date !== today) { device.dailyCounts = { date: today, searchCount: 0, applyCount: 0, chatCount: 0 }; } - const countKey = `${operationType}Count`; + const countKey = `${operation_type}Count`; const current = device.dailyCounts[countKey] || 0; - const max = config.getDailyLimit(operationType); + const max = config.getDailyLimit(operation_type); if (current >= max) { - return { allowed: false, reason: `今日${operationType}操作已达上限` }; + return { allowed: false, reason: `今日${operation_type}操作已达上限` }; } } @@ -128,12 +128,12 @@ class DeviceManager { /** * 记录操作 */ - recordOperation(sn_code, operationType) { + recordOperation(sn_code, operation_type) { const device = this.devices.get(sn_code) || {}; - device[`last${operationType.charAt(0).toUpperCase() + operationType.slice(1)}`] = Date.now(); + device[`last${operation_type.charAt(0).toUpperCase() + operation_type.slice(1)}`] = Date.now(); if (device.dailyCounts) { - const countKey = `${operationType}Count`; + const countKey = `${operation_type}Count`; device.dailyCounts[countKey] = (device.dailyCounts[countKey] || 0) + 1; } diff --git a/api/middleware/schedule/scheduledJobs.js b/api/middleware/schedule/scheduledJobs.js index eebc653..2f8d212 100644 --- a/api/middleware/schedule/scheduledJobs.js +++ b/api/middleware/schedule/scheduledJobs.js @@ -43,6 +43,16 @@ class ScheduledJobs { this.jobs.push(cleanupOfflineTasksJob); console.log('[定时任务] 已启动离线设备任务清理任务'); + // 启动任务超时检查定时任务(每分钟检查一次) + const timeoutCheckJob = node_schedule.scheduleJob(config.schedules.monitoringInterval, async () => { + await this.checkTaskTimeouts().catch(error => { + console.error('[定时任务] 检查任务超时失败:', error); + }); + }); + + this.jobs.push(timeoutCheckJob); + console.log('[定时任务] 已启动任务超时检查任务'); + // 执行自动投递任务 const autoDeliverJob = node_schedule.scheduleJob(config.schedules.autoDeliver, () => { @@ -178,6 +188,108 @@ class ScheduledJobs { } } + /** + * 检查任务超时并强制标记为失败 + * 检测长时间运行的任务(可能是卡住的),强制标记为失败,释放资源 + */ + async checkTaskTimeouts() { + try { + const Sequelize = require('sequelize'); + const { task_status, op } = db.models; + + // 查询所有运行中的任务 + const runningTasks = await task_status.findAll({ + where: { + status: 'running' + }, + attributes: ['id', 'sn_code', 'taskType', 'taskName', 'startTime', 'create_time'] + }); + + if (!runningTasks || runningTasks.length === 0) { + return; + } + + const now = new Date(); + let timeoutCount = 0; + + for (const task of runningTasks) { + const taskData = task.toJSON(); + const startTime = taskData.startTime ? new Date(taskData.startTime) : (taskData.create_time ? new Date(taskData.create_time) : null); + + if (!startTime) { + continue; + } + + // 获取任务类型的超时时间(默认10分钟) + const taskTimeout = config.getTaskTimeout(taskData.taskType) || 10 * 60 * 1000; + // 允许额外20%的缓冲时间 + const maxAllowedTime = taskTimeout * 1.2; + const elapsedTime = now.getTime() - startTime.getTime(); + + // 如果任务运行时间超过最大允许时间,标记为超时失败 + if (elapsedTime > maxAllowedTime) { + try { + await task_status.update( + { + status: 'failed', + endTime: now, + duration: elapsedTime, + result: JSON.stringify({ + error: `任务执行超时(运行时间: ${Math.round(elapsedTime / 1000)}秒,超时限制: ${Math.round(maxAllowedTime / 1000)}秒)`, + timeout: true, + taskType: taskData.taskType, + startTime: startTime.toISOString() + }), + progress: 0 + }, + { + where: { id: taskData.id } + } + ); + + timeoutCount++; + console.warn(`[任务超时检查] 任务 ${taskData.id} (${taskData.taskName}) 运行时间过长,已强制标记为失败`, { + task_id: taskData.id, + sn_code: taskData.sn_code, + taskType: taskData.taskType, + elapsedTime: Math.round(elapsedTime / 1000) + '秒', + maxAllowedTime: Math.round(maxAllowedTime / 1000) + '秒' + }); + + // 如果任务队列中有这个任务,也需要从内存中清理 + if (this.taskQueue && typeof this.taskQueue.deviceStatus !== 'undefined') { + const deviceStatus = this.taskQueue.deviceStatus.get(taskData.sn_code); + if (deviceStatus && deviceStatus.currentTask && deviceStatus.currentTask.id === taskData.id) { + // 重置设备状态,允许继续执行下一个任务 + deviceStatus.isRunning = false; + deviceStatus.currentTask = null; + deviceStatus.runningCount = Math.max(0, deviceStatus.runningCount - 1); + this.taskQueue.globalRunningCount = Math.max(0, this.taskQueue.globalRunningCount - 1); + + console.log(`[任务超时检查] 已重置设备 ${taskData.sn_code} 的状态,可以继续执行下一个任务`); + + // 尝试继续处理该设备的队列 + setTimeout(() => { + this.taskQueue.processQueue(taskData.sn_code).catch(error => { + console.error(`[任务超时检查] 继续处理队列失败 (设备: ${taskData.sn_code}):`, error); + }); + }, 100); + } + } + } catch (error) { + console.error(`[任务超时检查] 更新超时任务状态失败 (任务ID: ${taskData.id}):`, error); + } + } + } + + if (timeoutCount > 0) { + console.log(`[任务超时检查] 共检测到 ${timeoutCount} 个超时任务,已强制标记为失败`); + } + } catch (error) { + console.error('[任务超时检查] 执行失败:', error); + } + } + /** * 自动投递任务 */ diff --git a/api/middleware/schedule/taskHandlers.js b/api/middleware/schedule/taskHandlers.js index cb7e4e1..c5a70f2 100644 --- a/api/middleware/schedule/taskHandlers.js +++ b/api/middleware/schedule/taskHandlers.js @@ -19,152 +19,22 @@ class TaskHandlers { * @param {object} taskQueue - 任务队列实例 */ register(taskQueue) { - taskQueue.registerHandler('get_resume', async (task) => { - return await this.handleGetResumeTask(task); - }); - - taskQueue.registerHandler('get_job_list', async (task) => { - return await this.handleGetJobListTask(task); - }); - - taskQueue.registerHandler('send_chat', async (task) => { - return await this.handleSendChatTask(task); - }); - - taskQueue.registerHandler('apply_job', async (task) => { - return await this.handleApplyJobTask(task); - }); - + // 自动投递任务 taskQueue.registerHandler('auto_deliver', async (task) => { return await this.handleAutoDeliverTask(task); }); + // 自动沟通任务(待实现) + taskQueue.registerHandler('auto_chat', async (task) => { + return await this.handleAutoChatTask(task); + }); + // 自动活跃账号任务(待实现) + taskQueue.registerHandler('auto_active_account', async (task) => { + return await this.handleAutoActiveAccountTask(task); + }); } - /** - * 处理获取简历任务 - */ - async handleGetResumeTask(task) { - const { sn_code } = task; - console.log(`[任务处理器] 获取简历任务 - 设备: ${sn_code}`); - - deviceManager.recordTaskStart(sn_code, task); - const startTime = Date.now(); - - try { - const commands = [{ - command_type: 'getOnlineResume', - command_name: '获取在线简历', - command_params: JSON.stringify({ sn_code }), - priority: config.getTaskPriority('get_resume') - }]; - - const result = await command.executeCommands(task.id, commands, this.mqttClient); - const duration = Date.now() - startTime; - deviceManager.recordTaskComplete(sn_code, task, true, duration); - - return result; - } catch (error) { - const duration = Date.now() - startTime; - deviceManager.recordTaskComplete(sn_code, task, false, duration); - throw error; - } - } - - /** - * 处理获取岗位列表任务 - */ - async handleGetJobListTask(task) { - const { sn_code, taskParams } = task; - const { keyword, platform } = taskParams; - - console.log(`[任务处理器] 获取岗位列表任务 - 设备: ${sn_code}`); - - deviceManager.recordTaskStart(sn_code, task); - const startTime = Date.now(); - - try { - const commands = [{ - command_type: 'getJobList', - command_name: '获取岗位列表', - command_params: JSON.stringify({ sn_code, keyword, platform }), - priority: config.getTaskPriority('search_jobs') - }]; - - const result = await command.executeCommands(task.id, commands, this.mqttClient); - const duration = Date.now() - startTime; - deviceManager.recordTaskComplete(sn_code, task, true, duration); - - return result; - } catch (error) { - const duration = Date.now() - startTime; - deviceManager.recordTaskComplete(sn_code, task, false, duration); - throw error; - } - } - - /** - * 处理发送聊天任务 - */ - async handleSendChatTask(task) { - const { sn_code, taskParams } = task; - - console.log(`[任务处理器] 发送聊天任务 - 设备: ${sn_code}`); - - deviceManager.recordTaskStart(sn_code, task); - const startTime = Date.now(); - - try { - const commands = [{ - command_type: 'sendChatMessage', - command_name: '发送聊天消息', - command_params: JSON.stringify(taskParams), - priority: config.getTaskPriority('chat') - }]; - - const result = await command.executeCommands(task.id, commands, this.mqttClient); - const duration = Date.now() - startTime; - deviceManager.recordTaskComplete(sn_code, task, true, duration); - - return result; - } catch (error) { - const duration = Date.now() - startTime; - deviceManager.recordTaskComplete(sn_code, task, false, duration); - throw error; - } - } - - /** - * 处理投递简历任务 - */ - async handleApplyJobTask(task) { - const { sn_code, taskParams } = task; - - console.log(`[任务处理器] 投递简历任务 - 设备: ${sn_code}`); - - deviceManager.recordTaskStart(sn_code, task); - const startTime = Date.now(); - - try { - const commands = [{ - command_type: 'applyJob', - command_name: '投递简历', - command_params: JSON.stringify(taskParams), - priority: config.getTaskPriority('apply') - }]; - - const result = await command.executeCommands(task.id, commands, this.mqttClient); - const duration = Date.now() - startTime; - deviceManager.recordTaskComplete(sn_code, task, true, duration); - - return result; - } catch (error) { - const duration = Date.now() - startTime; - deviceManager.recordTaskComplete(sn_code, task, false, duration); - throw error; - } - } @@ -426,6 +296,76 @@ class TaskHandlers { throw error; } } + + /** + * 处理自动沟通任务(待实现) + * 功能:自动与HR进行沟通,回复消息等 + */ + async handleAutoChatTask(task) { + const { sn_code, taskParams } = task; + console.log(`[任务处理器] 自动沟通任务 - 设备: ${sn_code}`); + + deviceManager.recordTaskStart(sn_code, task); + const startTime = Date.now(); + + try { + // TODO: 实现自动沟通逻辑 + // 1. 获取待回复的聊天列表 + // 2. 根据消息内容生成回复 + // 3. 发送回复消息 + // 4. 记录沟通结果 + + console.log(`[任务处理器] 自动沟通任务 - 逻辑待实现`); + + const duration = Date.now() - startTime; + deviceManager.recordTaskComplete(sn_code, task, true, duration); + + return { + success: true, + message: '自动沟通任务框架已就绪,逻辑待实现', + chatCount: 0 + }; + } catch (error) { + const duration = Date.now() - startTime; + deviceManager.recordTaskComplete(sn_code, task, false, duration); + throw error; + } + } + + /** + * 处理自动活跃账号任务(待实现) + * 功能:自动执行一些操作来保持账号活跃度,如浏览职位、搜索等 + */ + async handleAutoActiveAccountTask(task) { + const { sn_code, taskParams } = task; + console.log(`[任务处理器] 自动活跃账号任务 - 设备: ${sn_code}`); + + deviceManager.recordTaskStart(sn_code, task); + const startTime = Date.now(); + + try { + // TODO: 实现自动活跃账号逻辑 + // 1. 随机搜索一些职位 + // 2. 浏览职位详情 + // 3. 查看公司信息 + // 4. 执行一些模拟用户行为 + + console.log(`[任务处理器] 自动活跃账号任务 - 逻辑待实现`); + + const duration = Date.now() - startTime; + deviceManager.recordTaskComplete(sn_code, task, true, duration); + + return { + success: true, + message: '自动活跃账号任务框架已就绪,逻辑待实现', + actionCount: 0 + }; + } catch (error) { + const duration = Date.now() - startTime; + deviceManager.recordTaskComplete(sn_code, task, false, duration); + throw error; + } + } } module.exports = TaskHandlers; diff --git a/api/middleware/schedule/taskQueue.js b/api/middleware/schedule/taskQueue.js index d153695..c94f3ce 100644 --- a/api/middleware/schedule/taskQueue.js +++ b/api/middleware/schedule/taskQueue.js @@ -6,6 +6,8 @@ const command = require('./command'); const PriorityQueue = require('./PriorityQueue'); const ErrorHandler = require('./ErrorHandler'); const deviceManager = require('./deviceManager'); +const ScheduleUtils = require('./utils'); +const ScheduleConfig = require('./config'); /** * 任务队列管理器(重构版) @@ -548,7 +550,7 @@ class TaskQueue { } /** - * 执行任务(统一重试机制) + * 执行任务(统一重试机制 + 超时保护) * @param {object} task - 任务对象 */ async executeTask(task) { @@ -587,8 +589,16 @@ class TaskQueue { throw new Error(`未找到任务类型 ${task.taskType} 的处理器,请先注册处理器`); } - // 执行任务处理器 - const result = await handler(task); + // 获取任务超时时间(从配置中获取,默认10分钟) + const taskTimeout = ScheduleConfig.getTaskTimeout(task.taskType) || 10 * 60 * 1000; + + // 使用超时机制包装任务执行,防止任务卡住 + const taskPromise = handler(task); + const result = await ScheduleUtils.withTimeout( + taskPromise, + taskTimeout, + `任务执行超时: ${task.taskName} (任务类型: ${task.taskType}, 超时时间: ${taskTimeout / 1000}秒)` + ); // 任务成功 task.status = 'completed'; @@ -613,46 +623,64 @@ class TaskQueue { } catch (error) { // 使用统一错误处理 const errorInfo = await ErrorHandler.handleError(error, { - taskId: task.id, + task_id: task.id, sn_code: task.sn_code, taskType: task.taskType, taskName: task.taskName }); - - // 直接标记为失败(重试已禁用) - task.status = 'failed'; - task.endTime = new Date(); - task.duration = Date.now() - startTime; - task.errorMessage = errorInfo.message || error.message || '未知错误'; - task.errorStack = errorInfo.stack || error.stack || ''; + // 直接标记为失败(重试已禁用) + task.status = 'failed'; + task.endTime = new Date(); + task.duration = Date.now() - startTime; + task.errorMessage = errorInfo.message || error.message || '未知错误'; + task.errorStack = errorInfo.stack || error.stack || ''; - console.error(`[任务队列] 任务执行失败: ${task.taskName} (ID: ${task.id}), 错误: ${task.errorMessage}`, { - errorStack: task.errorStack, - taskId: task.id, - sn_code: task.sn_code, - taskType: task.taskType - }); - + // 更新数据库任务状态为失败 + try { + await db.getModel('task_status').update( + { + status: 'failed', + endTime: task.endTime, + duration: task.duration, + result: JSON.stringify({ + error: task.errorMessage, + stack: task.errorStack + }), + progress: 0 + }, + { where: { id: task.id } } + ); + } catch (dbError) { + console.error(`[任务队列] 更新任务失败状态到数据库失败:`, dbError); + } + + console.error(`[任务队列] 任务执行失败: ${task.taskName} (ID: ${task.id}), 错误: ${task.errorMessage}`, { + errorStack: task.errorStack, + task_id: task.id, + sn_code: task.sn_code, + taskType: task.taskType, + duration: task.duration + }); } } /** * 取消任务 - * @param {string} taskId - 任务ID + * @param {string} task_id - 任务ID * @returns {Promise} 是否成功取消 */ - async cancelTask(taskId) { + async cancelTask(task_id) { // 遍历所有设备队列查找任务 for (const [sn_code, queue] of this.deviceQueues.entries()) { - const removed = queue.remove(task => task.id === taskId); + const removed = queue.remove(task => task.id === task_id); if (removed) { // 检查是否正在执行 const status = this.deviceStatus.get(sn_code); - if (status && status.currentTask && status.currentTask.id === taskId) { + if (status && status.currentTask && status.currentTask.id === task_id) { // 正在执行的任务无法取消,只能标记 - console.warn(`[任务队列] 任务 ${taskId} 正在执行,无法取消`); + console.warn(`[任务队列] 任务 ${task_id} 正在执行,无法取消`); return false; } @@ -662,10 +690,10 @@ class TaskQueue { status: 'cancelled', endTime: new Date() }, - { where: { id: taskId } } + { where: { id: task_id } } ); - console.log(`[任务队列] 任务已取消: ${taskId}`); + console.log(`[任务队列] 任务已取消: ${task_id}`); return true; } } @@ -766,13 +794,13 @@ class TaskQueue { /** * 获取任务状态 - * @param {string} taskId - 任务ID + * @param {string} task_id - 任务ID * @returns {Promise} 任务对象 */ - async getTaskStatus(taskId) { + async getTaskStatus(task_id) { // 先从内存中查找 for (const queue of this.deviceQueues.values()) { - const task = queue.find(t => t.id === taskId); + const task = queue.find(t => t.id === task_id); if (task) { return task; } @@ -780,7 +808,7 @@ class TaskQueue { // 从正在执行的任务中查找 for (const status of this.deviceStatus.values()) { - if (status.currentTask && status.currentTask.id === taskId) { + if (status.currentTask && status.currentTask.id === task_id) { return status.currentTask; } } @@ -788,7 +816,7 @@ class TaskQueue { // 从数据库中查找 try { const taskRecord = await db.getModel('task_status').findOne({ - where: { id: taskId } + where: { id: task_id } }); if (taskRecord) { @@ -796,7 +824,7 @@ class TaskQueue { } } catch (error) { console.error(`[任务队列] 查询任务状态失败:`, error, { - taskId: taskId + task_id: task_id }); } @@ -858,24 +886,24 @@ class TaskQueue { }); console.log(`[任务队列] 已删除任务指令记录: ${commandsDeleted} 条`); - // 删除聊天记录中关联的任务记录(删除所有有 taskId 且不为空的记录) + // 删除聊天记录中关联的任务记录(删除所有有 task_id 且不为空的记录) const chatRecordsDeleted = await chatRecordsModel.destroy({ where: { [Sequelize.Op.and]: [ - { taskId: { [Sequelize.Op.ne]: null } }, - { taskId: { [Sequelize.Op.ne]: '' } } + { task_id: { [Sequelize.Op.ne]: null } }, + { task_id: { [Sequelize.Op.ne]: '' } } ] }, truncate: false }); console.log(`[任务队列] 已删除聊天记录: ${chatRecordsDeleted} 条`); - // 删除投递记录中关联的任务记录(删除所有有 taskId 且不为空的记录) + // 删除投递记录中关联的任务记录(删除所有有 task_id 且不为空的记录) const applyRecordsDeleted = await applyRecordsModel.destroy({ where: { [Sequelize.Op.and]: [ - { taskId: { [Sequelize.Op.ne]: null } }, - { taskId: { [Sequelize.Op.ne]: '' } } + { task_id: { [Sequelize.Op.ne]: null } }, + { task_id: { [Sequelize.Op.ne]: '' } } ] }, truncate: false