This commit is contained in:
张成
2025-11-26 15:00:14 +08:00
parent 7858459118
commit e27c0dc41a
9 changed files with 1046 additions and 358 deletions

View File

@@ -0,0 +1,354 @@
# 任务与指令的区别说明
## 📋 概述
在调度系统中,**任务Task** 和 **指令Command** 是两个不同层次的概念,它们的关系是:**一个任务可以包含多个指令**。
### ⚠️ 重要说明
**当前系统实际情况**
- **真正的任务**:目前只有 `auto_deliver`(自动投递任务)是真正的任务,它包含多个步骤和指令
- **伪任务**:虽然代码中有 `get_resume``get_job_list``send_chat``apply_job` 等任务处理器,但它们实际上只是包装了单个指令,本质上就是直接执行指令
**为什么会有伪任务**
1. 统一的任务追踪和日志记录
2. 保持接口的一致性
3. 未来可能扩展为真正的任务(包含多个步骤)
## 🔄 层级关系
```
任务Task
├── 指令1Command
├── 指令2Command
└── 指令3Command
```
## 📊 详细对比
| 维度 | 任务Task | 指令Command |
|------|------------|----------------|
| **概念层级** | 业务层 | 执行层 |
| **数据库表** | `task_status` | `task_commands` |
| **管理模块** | TaskQueue任务队列 | CommandManager指令管理器 |
| **处理模块** | TaskHandlers任务处理器 | jobManager业务管理器 |
| **粒度** | 粗粒度(业务流程) | 细粒度(具体操作) |
| **包含关系** | 包含多个指令 | 属于某个任务 |
| **执行方式** | 由任务队列调度 | 由指令管理器执行 |
| **通信方式** | 内部调度 | 通过 MQTT 发送到客户端 |
## 🎯 任务Task
### 定义
任务是业务层面的概念,代表一个完整的业务流程或工作单元。
### 特点
- **业务导向**:代表一个完整的业务目标
- **可包含多个步骤**:一个任务可以包含多个指令
- **有生命周期**pending → running → completed/failed
- **有优先级**:可以设置任务优先级
- **有超时机制**:任务级别有超时保护
### 任务类型示例
**真正的任务(包含多个步骤)**
- `auto_deliver` - 自动投递任务(包含多个子操作:获取简历、获取岗位列表、筛选职位、批量投递)
- `auto_chat` - 自动沟通任务待实现自动与HR进行沟通回复消息等
- `auto_active_account` - 自动活跃账号任务(待实现:自动执行操作保持账号活跃度)
**注意**:目前系统中只有 `auto_deliver` 是已实现的真正任务,`auto_chat``auto_active_account` 是待实现的任务框架。
### 任务表结构task_status
```javascript
{
id: 1,
sn_code: 'GHJU',
taskType: 'auto_deliver',
taskName: '自动投递 - 前端开发',
status: 'running',
priority: 7,
taskParams: { keyword: '前端', platform: 'boss' },
result: {},
startTime: '2024-01-01 10:00:00',
endTime: null,
duration: 0
}
```
### 任务执行流程
```javascript
// 1. 添加任务到队列
await taskQueue.addTask(sn_code, {
taskType: 'auto_deliver',
taskName: '自动投递',
taskParams: { keyword: '前端' }
});
// 2. 任务队列调度执行
// 3. 任务处理器处理任务
// 4. 任务处理器创建并执行指令
```
## ⚙️ 指令Command
### 定义
指令是执行层面的概念,代表一个具体的操作,通过 MQTT 发送到客户端执行。
### 特点
- **执行导向**:代表一个具体的操作
- **原子性**:一个指令是一个不可分割的操作
- **有执行顺序**:指令可以按顺序执行
- **有超时机制**:指令级别有超时保护
- **MQTT 通信**:通过 MQTT 发送到客户端
### 指令类型示例
- `getOnlineResume` - 获取在线简历
- `getJobList` - 获取岗位列表
- `applyJob` - 投递简历
- `sendChatMessage` - 发送聊天消息
- `getLoginQrCode` - 获取登录二维码
### 指令表结构task_commands
```javascript
{
id: 1,
task_id: 100, // 关联的任务ID
command_type: 'getOnlineResume',
command_name: '获取在线简历',
command_params: '{"sn_code":"GHJU"}',
status: 'completed',
sequence: 1,
priority: 9,
start_time: '2024-01-01 10:00:00',
end_time: '2024-01-01 10:00:30',
duration: 30000
}
```
### 指令执行流程
```javascript
// 1. 任务处理器创建指令
const commands = [{
command_type: 'getOnlineResume',
command_name: '获取在线简历',
command_params: JSON.stringify({ sn_code })
}];
// 2. 指令管理器执行指令
await command.executeCommands(taskId, commands, mqttClient);
// 3. 通过 MQTT 发送到客户端
// 4. 客户端执行并返回结果
```
## 🔗 关系示例
### 示例1自动投递任务
**任务**`auto_deliver`(自动投递任务)
**包含的指令**
1. `getOnlineResume` - 获取在线简历
2. `getJobList` - 获取岗位列表
3. `applyJob` - 投递简历(可能多个)
```javascript
// 任务处理器创建多个指令
async handleAutoDeliverTask(task) {
// 1. 获取简历指令
const getResumeCommand = {
command_type: 'getOnlineResume',
command_name: '获取在线简历',
...
};
// 2. 获取岗位列表指令
const getJobListCommand = {
command_type: 'getJobList',
command_name: '获取岗位列表',
...
};
// 3. 投递指令(可能多个)
const applyCommands = jobs.map(job => ({
command_type: 'applyJob',
command_name: `投递简历 - ${job.jobTitle}`,
...
}));
// 执行所有指令
await command.executeCommands(task.id, [
getResumeCommand,
getJobListCommand,
...applyCommands
], mqttClient);
}
```
### 示例2获取简历伪任务实际是指令
**说明**:虽然代码中有 `get_resume` 任务处理器,但它实际上只是包装了单个指令,本质上就是直接执行指令。
**任务**`get_resume`(获取简历任务)
**包含的指令**
1. `getOnlineResume` - 获取在线简历
```javascript
async handleGetResumeTask(task) {
// 实际上只是创建一个指令并执行
const commands = [{
command_type: 'getOnlineResume',
command_name: '获取在线简历',
command_params: JSON.stringify({ sn_code: task.sn_code })
}];
await command.executeCommands(task.id, commands, this.mqttClient);
}
```
**注意**:这种"任务"实际上可以直接作为指令执行,不需要通过任务队列。它们存在的原因可能是为了:
1. 统一的任务追踪和日志记录
2. 未来可能扩展为真正的任务(包含多个步骤)
3. 保持接口的一致性
## 📈 执行流程图
```
┌─────────────────┐
│ 任务队列 │
│ (TaskQueue) │
└────────┬────────┘
│ 调度任务
┌─────────────────┐
│ 任务处理器 │
│ (TaskHandlers) │
└────────┬────────┘
│ 创建指令
┌─────────────────┐
│ 指令管理器 │
│ (CommandManager)│
└────────┬────────┘
│ 执行指令
┌─────────────────┐
│ 业务管理器 │
│ (jobManager) │
└────────┬────────┘
│ MQTT 发送
┌─────────────────┐
│ 客户端设备 │
│ (Python Client)│
└─────────────────┘
```
## 🎨 设计优势
### 1. **职责分离**
- **任务层**:负责业务逻辑和流程编排
- **指令层**:负责具体操作和 MQTT 通信
### 2. **灵活性**
- 一个任务可以包含不同数量的指令
- 可以根据业务需求动态创建指令
### 3. **可追踪性**
- 任务级别:可以追踪整个业务流程
- 指令级别:可以追踪每个具体操作
### 4. **错误处理**
- 任务级别:处理业务逻辑错误
- 指令级别:处理执行错误和超时
## 📝 代码示例
### 任务处理器创建指令
```javascript
// api/middleware/schedule/taskHandlers.js
async handleAutoDeliverTask(task) {
const { sn_code, taskParams } = task;
// 1. 创建获取简历指令
const getResumeCommand = {
command_type: 'getOnlineResume',
command_name: '获取在线简历',
command_params: JSON.stringify({ sn_code, platform: 'boss' })
};
// 2. 创建获取岗位列表指令
const getJobListCommand = {
command_type: 'getJobList',
command_name: '获取岗位列表',
command_params: JSON.stringify({
sn_code,
keyword: taskParams.keyword,
platform: 'boss'
})
};
// 3. 执行指令序列
const result = await command.executeCommands(
task.id,
[getResumeCommand, getJobListCommand],
this.mqttClient
);
return result;
}
```
### 指令管理器执行指令
```javascript
// api/middleware/schedule/command.js
async executeCommand(taskId, command, mqttClient) {
// 1. 创建指令记录
const commandRecord = await db.getModel('task_commands').create({
task_id: taskId,
command_type: command.command_type,
command_name: command.command_name,
status: 'pending'
});
// 2. 调用业务管理器执行
const result = await jobManager[commandType](
sn_code,
mqttClient,
commandParams
);
// 3. 更新指令状态
await this.updateCommandStatus(commandId, 'completed', result);
return result;
}
```
## 🔍 总结
- **任务Task**:业务层面的工作单元,代表一个完整的业务流程
- **真正的任务**:包含多个步骤/指令,如 `auto_deliver`
- **伪任务**:虽然叫任务,但实际只是包装了单个指令,如 `get_resume``get_job_list`
- **指令Command**:执行层面的操作单元,代表一个具体的操作
- 通过 MQTT 发送到客户端执行
- 如:`getOnlineResume``getJobList``applyJob`
- **关系**
- 真正的任务包含多个指令,指令按顺序执行
- 伪任务只是指令的包装,本质上就是直接执行指令
- **管理**:任务由任务队列管理,指令由指令管理器管理
- **通信**:任务在服务端内部调度,指令通过 MQTT 发送到客户端
- **当前状态**
- 目前系统中只有 `auto_deliver` 是真正的任务(包含多个步骤)
- 其他如 `get_resume``get_job_list``send_chat``apply_job` 虽然叫任务,但实际只是指令的包装
这种设计实现了业务逻辑和执行逻辑的分离,提高了系统的灵活性和可维护性。伪任务的存在可能是为了统一的任务追踪和未来扩展。

View File

@@ -0,0 +1,123 @@
# 指令和任务模式适配检查报告
## 📋 检查范围
检查 `api/middleware` 目录下的代码是否适用于新的指令和任务模式。
## ✅ 已适配的部分
### 1. **任务处理器 (taskHandlers.js)**
- ✅ 正确使用 `command.executeCommands()` 执行指令
- ✅ 在 `handleAutoDeliverTask` 中创建指令并执行
- ✅ 指令类型使用驼峰命名(`getOnlineResume`, `getJobList`, `applyJob`
### 2. **指令管理器 (command.js)**
- ✅ 已重构完成,统一封装指令执行
- ✅ 统一处理成功、失败、超时
- ✅ 统一记录数据库
- ✅ 支持驼峰转下划线的命名转换
### 3. **任务队列 (taskQueue.js)**
- ✅ 正确使用任务处理器
- ✅ 通过 `taskHandlers` 执行任务
## ⚠️ 需要修复的问题
### 1. **方法命名不一致**
**问题描述**
- 指令类型使用驼峰命名:`getOnlineResume`, `getJobList`, `applyJob`
- 大部分方法使用下划线命名:`get_online_resume`, `get_job_list`
-`applyJob` 方法名是驼峰命名,与指令类型一致
**当前转换逻辑**
```javascript
// command.js 中的转换
const to_snake_case = (str) => {
if (str.includes('_')) return str;
return str.replace(/([A-Z])/g, '_$1').toLowerCase().replace(/^_/, '');
};
// getOnlineResume -> get_online_resume ✓
// getJobList -> get_job_list ✓
// applyJob -> apply_job ✗ (但实际方法名是 applyJob)
```
**解决方案**
1. **方案1推荐**:统一使用下划线命名,将 `applyJob` 改为 `apply_job`
2. **方案2**:保持现状,`command.js` 中已支持两种命名方式(先尝试下划线,再尝试原名称)
**当前状态**方案2已实现代码可以正常工作但命名不统一。
### 2. **sendChatMessage 方法**
**问题描述**
- `chatManager.js` 中的方法是 `sendChatMessage`(驼峰命名)
- 如果指令类型是 `sendChatMessage`,转换后会变成 `send_chat_message`,但实际方法名是 `sendChatMessage`
**当前状态**`command.js` 中已支持回退机制,如果下划线命名找不到,会尝试原名称,所以可以正常工作。
## 📊 方法命名对照表
| 指令类型 (command_type) | 转换后方法名 | 实际方法名 | 状态 |
|------------------------|-------------|-----------|------|
| `getOnlineResume` | `get_online_resume` | `get_online_resume` | ✅ 匹配 |
| `getJobList` | `get_job_list` | `get_job_list` | ✅ 匹配 |
| `applyJob` | `apply_job` | `applyJob` | ⚠️ 不匹配(但可工作) |
| `sendChatMessage` | `send_chat_message` | `sendChatMessage` | ⚠️ 不匹配(但可工作) |
## 🔧 建议修复
### 方案1统一使用下划线命名推荐
**修改文件**
1. `api/middleware/job/jobManager.js`:将 `applyJob` 改为 `apply_job`
2. `api/middleware/job/chatManager.js`:将 `sendChatMessage` 改为 `send_chat_message`
3. `api/middleware/schedule/taskHandlers.js`:将指令类型改为下划线命名
**优点**
- 命名统一,符合项目规范
- 代码更清晰,减少混淆
**缺点**
- 需要修改多个文件
- 可能影响其他调用方
### 方案2保持现状当前方案
**优点**
- 不需要修改现有代码
- `command.js` 已支持两种命名方式
**缺点**
- 命名不统一,容易混淆
- 代码可读性稍差
## 📝 其他检查项
### 1. **deviceManager.js**
- ✅ 不直接涉及指令和任务,主要用于设备状态管理
- ✅ 与任务系统配合良好
### 2. **job/index.js**
- ✅ 正确导出所有方法
- ✅ 支持下划线命名规范
### 3. **MQTT 相关**
- ✅ 通过 `mqttClient.publishAndWait` 发送指令
- ✅ 与指令系统配合良好
## ✅ 总结
**整体适配情况****良好** ✅
1. ✅ 核心功能已正确适配新的指令和任务模式
2. ✅ 指令执行统一封装,处理逻辑完善
3. ⚠️ 存在命名不一致问题,但不影响功能(有回退机制)
4. 💡 建议统一命名规范,提高代码可维护性
## 🎯 下一步行动
1. **可选**:统一方法命名规范(下划线命名)
2. **可选**:添加单元测试验证指令执行流程
3. **可选**:完善错误处理和日志记录

View File

@@ -30,7 +30,7 @@ class ErrorHandler {
stack: error.stack || '', stack: error.stack || '',
code: error.code || '', code: error.code || '',
context: { context: {
taskId: context.taskId, task_id: context.task_id,
sn_code: context.sn_code, sn_code: context.sn_code,
taskType: context.taskType, taskType: context.taskType,
...context ...context

View File

@@ -11,52 +11,48 @@ const ScheduleConfig = require('./config');
*/ */
class CommandManager { class CommandManager {
constructor() { constructor() {
this.pendingCommands = new Map(); // 等待响应的指令 { commandId: { resolve, reject, timeout } } this.pendingCommands = new Map(); // 等待响应的指令 { command_id: { resolve, reject, timeout } }
} }
/** /**
* 执行指令序列 * 执行指令序列
* @param {number} task_id - 任务ID
* @param {Array} commands - 指令数组 * @param {Array} commands - 指令数组
* @param {object} mqttClient - MQTT客户端 * @param {object} mqttClient - MQTT客户端
* @param {object} options - 执行选项 * @param {object} options - 执行选项(保留用于扩展)
* @returns {Promise<object>} 执行结果 * @returns {Promise<object>} 执行结果
*/ */
async executeCommands(taskId, commands, mqttClient, options = {}) { async executeCommands(task_id, commands, mqttClient, options = {}) {
// try {
if (!commands || commands.length === 0) { if (!commands || commands.length === 0) {
throw new Error('没有找到要执行的指令'); throw new Error('没有找到要执行的指令');
} }
const {
maxRetries = 1, // 最大重试次数
retryDelay = 1000 // 重试延迟(毫秒)
} = options;
console.log(`[指令管理] 开始执行 ${commands.length} 个指令`); console.log(`[指令管理] 开始执行 ${commands.length} 个指令`);
const results = []; const results = [];
const errors = []; const errors = [];
// 顺序执行指令,失败时停止 // 顺序执行指令,失败时继续执行后续指令
for (let i = 0; i < commands.length; i++) { for (let i = 0; i < commands.length; i++) {
const command = commands[i]; const command = commands[i];
let retryCount = 0;
let commandResult = null; try {
console.log(`[指令管理] 执行指令 ${i + 1}/${commands.length}: ${command.command_name || command.name}`);
// 重试逻辑 const commandResult = await this.executeCommand(task_id, command, mqttClient);
while (retryCount <= maxRetries) { results.push(commandResult);
console.log(`[指令管理] 执行指令 ${i + 1}/${commands.length}: ${command.command_name || command.name} (尝试 ${retryCount + 1}/${maxRetries + 1})`);
commandResult = await this.executeCommand(taskId, command, mqttClient);
results. push(commandResult);
break; // 成功执行,跳出重试循环
} catch (error) {
// 指令执行失败,记录错误但继续执行后续指令
errors.push({
command: command,
error: error
});
console.error(`[指令管理] 指令执行失败: ${command.command_name || command.name}, 错误: ${error.message}`);
// 失败时继续执行后续指令
} }
} }
const successCount = results.length; const successCount = results.length;
@@ -72,203 +68,260 @@ class CommandManager {
successCount: successCount, successCount: successCount,
errorCount: errorCount errorCount: errorCount
}; };
// } catch (error) {
// console.error(`[指令管理] 执行指令序列失败:`, error);
// throw error;
// }
} }
/** /**
* 执行单个指令 * 执行单个指令(统一封装)
* 统一处理成功、失败、超时,统一记录数据库
* @param {number} task_id - 任务ID
* @param {object} command - 指令对象 * @param {object} command - 指令对象
* @param {object} mqttClient - MQTT客户端 * @param {object} mqttClient - MQTT客户端
* @returns {Promise<object>} 执行结果 * @returns {Promise<object>} 执行结果
*/ */
async executeCommand(taskId, command, mqttClient) { async executeCommand(task_id, command, mqttClient) {
const startTime = new Date(); const start_time = new Date();
let commandRecord = null; let command_id = null;
let command_record = null;
const task = await db.getModel('task_status').findByPk(taskId); try {
// 1. 获取任务信息
const task = await db.getModel('task_status').findByPk(task_id);
if (!task) {
throw new Error(`任务不存在: ${task_id}`);
}
// 获取指令信息(支持两种格式) // 2. 获取指令信息
const commandName = command.command_name; const command_name = command.command_name || command.name || '未知指令';
const commandType = command.command_type; const command_type = command.command_type || command.type;
const commandParams = command.command_params ? JSON.parse(command.command_params) : {}; const command_params = command.command_params ?
(typeof command.command_params === 'string' ? JSON.parse(command.command_params) : command.command_params) :
{};
// 创建指令记录 if (!command_type) {
commandRecord = await db.getModel('task_commands').create({ throw new Error('指令类型不能为空');
task_id: taskId, }
command_type: commandType,
command_name: commandName,
command_params: JSON.stringify(commandParams),
priority: command.priority || 1,
sequence: command.sequence || 1,
max_retries: command.maxRetries || command.max_retries || 3,
status: 'pending'
});
let commandId = commandRecord.id; // 3. 创建指令记录
console.log(`[指令管理] 创建指令记录: ${commandName} (ID: ${commandId})`); command_record = await db.getModel('task_commands').create({
task_id: task_id,
command_type: command_type,
command_name: command_name,
command_params: JSON.stringify(command_params),
priority: command.priority || 1,
sequence: command.sequence || 1,
max_retries: command.maxRetries || command.max_retries || 3,
status: 'pending'
});
command_id = command_record.id;
console.log(`[指令管理] 创建指令记录: ${command_name} (ID: ${command_id})`);
// 更新指令状态为运行中 // 4. 更新指令状态为运行中
await this.updateCommandStatus(commandId, 'running'); await this._update_command_status(command_id, 'running', null, null, start_time);
// 5. 执行指令(统一封装)
const result = await this._execute_command_with_timeout(
command_id,
command_type,
command_name,
command_params,
task.sn_code,
mqttClient,
start_time
);
console.log(`[指令管理] 执行指令: ${commandName} (ID: ${commandId})`); // 6. 记录成功结果
await this._record_command_result(command_id, 'completed', result, null, start_time);
const sn_code = task.sn_code; return {
command_id: command_id,
command_name: command_name,
result: result,
duration: new Date() - start_time,
success: true
};
// 将驼峰命名转换为下划线命名getOnlineResume -> get_online_resume } catch (error) {
const toSnakeCase = (str) => { // 统一处理错误(失败或超时)
// 如果已经是下划线格式,直接返回 if (command_id) {
await this._record_command_result(
command_id,
'failed',
null,
error,
start_time
);
}
// 重新抛出错误,让调用方知道执行失败
throw error;
}
}
/**
* 执行指令(带超时保护)
* @private
*/
async _execute_command_with_timeout(command_id, command_type, command_name, command_params, sn_code, mqttClient, start_time) {
// 将驼峰命名转换为下划线命名
const to_snake_case = (str) => {
if (str.includes('_')) { if (str.includes('_')) {
return str; return str;
} }
// 驼峰转下划线
return str.replace(/([A-Z])/g, '_$1').toLowerCase().replace(/^_/, ''); return str.replace(/([A-Z])/g, '_$1').toLowerCase().replace(/^_/, '');
}; };
const methodName = toSnakeCase(commandType); const method_name = to_snake_case(command_type);
// 获取指令超时时间从配置中获取默认5分钟 // 获取指令超时时间从配置中获取默认5分钟
const timeout = ScheduleConfig.taskTimeouts[commandType] || ScheduleConfig.taskTimeouts[methodName] || 5 * 60 * 1000; const timeout = ScheduleConfig.taskTimeouts[command_type] ||
ScheduleConfig.taskTimeouts[method_name] ||
5 * 60 * 1000;
let result; // 构建指令执行 Promise
try { const command_promise = (async () => {
// 使用超时机制包装指令执行 if (command_type && jobManager[method_name]) {
const commandPromise = (async () => { return await jobManager[method_name](sn_code, mqttClient, command_params);
if (commandType && jobManager[methodName]) { } else if (jobManager[command_type]) {
return await jobManager[methodName](sn_code, mqttClient, commandParams); return await jobManager[command_type](sn_code, mqttClient, command_params);
} else { } else {
// 如果转换后找不到,尝试直接使用原名称 throw new Error(`未知的指令类型: ${command_type} (尝试的方法名: ${method_name})`);
if (jobManager[commandType]) { }
return await jobManager[commandType](sn_code, mqttClient, commandParams); })();
} else {
throw new Error(`未知的指令类型: ${commandType} (尝试的方法名: ${methodName})`);
}
}
})();
// 使用超时机制 // 使用超时机制包装
result = await ScheduleUtils.withTimeout( try {
commandPromise, const result = await ScheduleUtils.withTimeout(
command_promise,
timeout, timeout,
`指令执行超时: ${commandName} (超时时间: ${timeout / 1000}秒)` `指令执行超时: ${command_name} (超时时间: ${timeout / 1000}秒)`
); );
return result;
} catch (error) { } catch (error) {
const endTime = new Date(); // 判断是否为超时错误
const duration = endTime - startTime; if (error.message && error.message.includes('超时')) {
error.isTimeout = true;
// 如果是超时错误,更新指令状态为失败 }
const errorMessage = error.message || '指令执行失败';
await this.updateCommandStatus(commandId, 'failed', null, errorMessage);
throw error; throw error;
} }
const endTime = new Date();
const duration = endTime - startTime;
// 更新指令状态为完成
await this.updateCommandStatus(commandId, 'completed', result);
return {
commandId: commandId,
commandName: commandName,
result: result,
duration: duration,
success: true
};
} }
/**
* 记录指令执行结果(统一封装)
* 处理成功、失败、超时等情况,统一记录到数据库
* @private
*/
async _record_command_result(command_id, status, result, error, start_time) {
const end_time = new Date();
const duration = end_time.getTime() - start_time.getTime();
let error_message = null;
let error_stack = null;
let result_data = null;
// 处理错误信息
if (error) {
error_message = error.message || '指令执行失败';
error_stack = error.stack || '';
// 如果是超时错误,添加标识
if (error.isTimeout) {
error_message = `[超时] ${error_message}`;
}
}
// 处理结果数据
if (result && status === 'completed') {
result_data = result;
}
// 更新数据库
await this._update_command_status(
command_id,
status,
result_data,
error_message,
start_time,
end_time,
duration,
error_stack
);
// 记录日志
if (status === 'completed') {
console.log(`[指令管理] 指令执行成功: ${command_id} (耗时: ${duration}ms)`);
} else if (status === 'failed') {
const error_type = error && error.isTimeout ? '超时' : '失败';
console.error(`[指令管理] 指令执行${error_type}: ${command_id}, 错误: ${error_message}`, {
command_id: command_id,
duration: duration,
isTimeout: error && error.isTimeout
});
}
}
/** /**
* 更新指令状态 * 更新指令状态(统一封装)
* @param {number} commandId - 指令ID * @private
* @param {string} status - 状态
* @param {object} result - 结果
* @param {string} errorMessage - 错误信息
*/ */
async updateCommandStatus(commandId, status, result = null, errorMessage = null) { async _update_command_status(command_id, status, result, error_message, start_time, end_time, duration, error_stack) {
try { try {
const updateData = { const update_data = {
status: status, status: status,
updated_at: new Date() updated_at: new Date()
}; };
if (status === 'running') { // 设置开始时间
updateData.start_time = new Date(); if (status === 'running' && start_time) {
} else if (status === 'completed' || status === 'failed') { update_data.start_time = start_time;
updateData.end_time = new Date(); }
if (result) {
// 将结果转换为JSON字符串并限制长度TEXT类型最大约65KB // 设置结束时间和执行时长
let resultStr = JSON.stringify(result); if ((status === 'completed' || status === 'failed') && end_time) {
const maxLength = 60000; // 限制为60KB留一些余量 update_data.end_time = end_time;
if (duration !== undefined) {
if (resultStr.length > maxLength) { update_data.duration = duration;
// 如果结果太长,尝试压缩或截断
try {
// 如果是对象,尝试只保存关键信息
if (typeof result === 'object' && result !== null) {
const summary = {
success: result.success !== undefined ? result.success : true,
message: result.message || '执行成功',
dataLength: resultStr.length,
truncated: true,
preview: resultStr.substring(0, 1000) // 保存前1000字符作为预览
};
resultStr = JSON.stringify(summary);
} else {
// 直接截断
resultStr = resultStr.substring(0, maxLength) + '...[数据已截断]';
}
} catch (e) {
// 如果处理失败,直接截断
resultStr = resultStr.substring(0, maxLength) + '...[数据已截断]';
}
}
updateData.result = resultStr;
updateData.progress = 100;
}
if (errorMessage) {
// 错误信息也限制长度
const maxErrorLength = 10000; // 错误信息限制10KB
updateData.error_message = errorMessage.length > maxErrorLength
? errorMessage.substring(0, maxErrorLength) + '...[错误信息已截断]'
: errorMessage;
}
// 计算执行时长
const command = await db.getModel('task_commands').findByPk(commandId);
if (command && command.start_time) {
const duration = new Date() - new Date(command.start_time);
updateData.duration = duration;
} }
} }
await db.getModel('task_commands').update(updateData, { // 处理执行结果
where: { id: commandId } if (result && status === 'completed') {
const result_str = this._format_result_for_storage(result);
update_data.result = result_str;
update_data.progress = 100;
}
// 处理错误信息
if (error_message) {
update_data.error_message = this._truncate_string(error_message, 10000);
}
if (error_stack) {
update_data.error_stack = this._truncate_string(error_stack, 50000);
}
// 更新数据库
await db.getModel('task_commands').update(update_data, {
where: { id: command_id }
}); });
} catch (error) { } catch (db_error) {
logs.error(`[指令管理] 更新指令状态失败:`, error, { logs.error(`[指令管理] 更新指令状态失败:`, db_error, {
commandId: commandId, command_id: command_id,
status: status status: status
}); });
// 如果是因为数据太长导致的错误,尝试只保存错误信息 // 如果是因为数据太长导致的错误,尝试只保存错误信息
if (error.message && error.message.includes('Data too long')) { if (db_error.message && db_error.message.includes('Data too long')) {
try { try {
await db.getModel('task_commands').update({ await db.getModel('task_commands').update({
status: status, status: status,
error_message: '结果数据过长,无法保存完整结果', error_message: '结果数据过长,无法保存完整结果',
end_time: new Date(), end_time: end_time || new Date(),
updated_at: new Date() updated_at: new Date()
}, { }, {
where: { id: commandId } where: { id: command_id }
}); });
} catch (e) { } catch (e) {
console.error(`[指令管理] 保存截断结果也失败:`, e); console.error(`[指令管理] 保存截断结果也失败:`, e);
@@ -277,6 +330,89 @@ class CommandManager {
} }
} }
/**
* 格式化结果数据用于存储
* @private
*/
_format_result_for_storage(result) {
try {
let result_str = JSON.stringify(result);
const max_length = 60000; // 限制为60KB
if (result_str.length > max_length) {
// 如果结果太长,尝试压缩或截断
if (typeof result === 'object' && result !== null) {
const summary = {
success: result.success !== undefined ? result.success : true,
message: result.message || '执行成功',
dataLength: result_str.length,
truncated: true,
preview: result_str.substring(0, 1000) // 保存前1000字符作为预览
};
result_str = JSON.stringify(summary);
} else {
result_str = result_str.substring(0, max_length) + '...[数据已截断]';
}
}
return result_str;
} catch (error) {
return JSON.stringify({ error: '结果序列化失败', message: error.message });
}
}
/**
* 截断字符串
* @private
*/
_truncate_string(str, max_length) {
if (!str || str.length <= max_length) {
return str;
}
return str.substring(0, max_length) + '...[已截断]';
}
/**
* 更新指令状态(兼容旧接口,内部调用统一方法)
* @param {number} command_id - 指令ID
* @param {string} status - 状态
* @param {object} result - 结果
* @param {string} error_message - 错误信息
* @deprecated 建议使用 _update_command_status 统一方法
*/
async updateCommandStatus(command_id, status, result = null, error_message = null) {
const start_time = status === 'running' ? new Date() : null;
const end_time = (status === 'completed' || status === 'failed') ? new Date() : null;
// 计算执行时长
let duration = null;
if (end_time && start_time) {
duration = end_time.getTime() - start_time.getTime();
} else if (end_time) {
// 如果没有开始时间,尝试从数据库获取
try {
const command = await db.getModel('task_commands').findByPk(command_id);
if (command && command.start_time) {
duration = end_time.getTime() - new Date(command.start_time).getTime();
}
} catch (e) {
// 忽略错误
}
}
await this._update_command_status(
command_id,
status,
result,
error_message,
start_time,
end_time,
duration,
null
);
}
/** /**
* 清理过期的指令记录 * 清理过期的指令记录

View File

@@ -29,21 +29,16 @@ class ScheduleConfig {
// 任务超时配置(毫秒) // 任务超时配置(毫秒)
this.taskTimeouts = { this.taskTimeouts = {
get_login_qr_code: 30 * 1000, // 登录检查30秒 auto_deliver: 30 * 60 * 1000, // 自动投递任务30分钟包含多个子任务
get_resume: 60 * 1000, // 获取简历1分钟 auto_chat: 15 * 60 * 1000, // 自动沟通任务15分钟
search_jobs: 5 * 60 * 1000, // 搜索岗位5分钟 auto_active_account: 10 * 60 * 1000 // 自动活跃账号任务10分钟
chat: 30 * 1000, // 聊天30秒
apply: 30 * 1000 // 投递30秒
}; };
// 任务优先级配置 // 任务优先级配置
this.taskPriorities = { this.taskPriorities = {
get_login_qr_code: 10, // 最高优先级
get_resume: 9,
apply: 8,
auto_deliver: 7, // 自动投递任务 auto_deliver: 7, // 自动投递任务
search_jobs: 6, auto_chat: 6, // 自动沟通任务
chat: 5, auto_active_account: 5, // 自动活跃账号任务
cleanup: 1 cleanup: 1
}; };

View File

@@ -92,7 +92,7 @@ class DeviceManager {
/** /**
* 检查是否可以执行操作 * 检查是否可以执行操作
*/ */
canExecuteOperation(sn_code, operationType) { canExecuteOperation(sn_code, operation_type) {
// 检查工作时间 // 检查工作时间
if (!config.isWorkingHours()) { if (!config.isWorkingHours()) {
return { allowed: false, reason: '不在工作时间内' }; return { allowed: false, reason: '不在工作时间内' };
@@ -101,8 +101,8 @@ class DeviceManager {
// 检查频率限制 // 检查频率限制
const device = this.devices.get(sn_code); const device = this.devices.get(sn_code);
if (device) { if (device) {
const lastTime = device[`last${operationType.charAt(0).toUpperCase() + operationType.slice(1)}`] || 0; const lastTime = device[`last${operation_type.charAt(0).toUpperCase() + operation_type.slice(1)}`] || 0;
const interval = config.getRateLimit(operationType); const interval = config.getRateLimit(operation_type);
if (Date.now() - lastTime < interval) { if (Date.now() - lastTime < interval) {
return { allowed: false, reason: '操作过于频繁' }; return { allowed: false, reason: '操作过于频繁' };
} }
@@ -114,11 +114,11 @@ class DeviceManager {
if (device.dailyCounts.date !== today) { if (device.dailyCounts.date !== today) {
device.dailyCounts = { date: today, searchCount: 0, applyCount: 0, chatCount: 0 }; device.dailyCounts = { date: today, searchCount: 0, applyCount: 0, chatCount: 0 };
} }
const countKey = `${operationType}Count`; const countKey = `${operation_type}Count`;
const current = device.dailyCounts[countKey] || 0; const current = device.dailyCounts[countKey] || 0;
const max = config.getDailyLimit(operationType); const max = config.getDailyLimit(operation_type);
if (current >= max) { if (current >= max) {
return { allowed: false, reason: `今日${operationType}操作已达上限` }; return { allowed: false, reason: `今日${operation_type}操作已达上限` };
} }
} }
@@ -128,12 +128,12 @@ class DeviceManager {
/** /**
* 记录操作 * 记录操作
*/ */
recordOperation(sn_code, operationType) { recordOperation(sn_code, operation_type) {
const device = this.devices.get(sn_code) || {}; const device = this.devices.get(sn_code) || {};
device[`last${operationType.charAt(0).toUpperCase() + operationType.slice(1)}`] = Date.now(); device[`last${operation_type.charAt(0).toUpperCase() + operation_type.slice(1)}`] = Date.now();
if (device.dailyCounts) { if (device.dailyCounts) {
const countKey = `${operationType}Count`; const countKey = `${operation_type}Count`;
device.dailyCounts[countKey] = (device.dailyCounts[countKey] || 0) + 1; device.dailyCounts[countKey] = (device.dailyCounts[countKey] || 0) + 1;
} }

View File

@@ -43,6 +43,16 @@ class ScheduledJobs {
this.jobs.push(cleanupOfflineTasksJob); this.jobs.push(cleanupOfflineTasksJob);
console.log('[定时任务] 已启动离线设备任务清理任务'); console.log('[定时任务] 已启动离线设备任务清理任务');
// 启动任务超时检查定时任务(每分钟检查一次)
const timeoutCheckJob = node_schedule.scheduleJob(config.schedules.monitoringInterval, async () => {
await this.checkTaskTimeouts().catch(error => {
console.error('[定时任务] 检查任务超时失败:', error);
});
});
this.jobs.push(timeoutCheckJob);
console.log('[定时任务] 已启动任务超时检查任务');
// 执行自动投递任务 // 执行自动投递任务
const autoDeliverJob = node_schedule.scheduleJob(config.schedules.autoDeliver, () => { const autoDeliverJob = node_schedule.scheduleJob(config.schedules.autoDeliver, () => {
@@ -178,6 +188,108 @@ class ScheduledJobs {
} }
} }
/**
* 检查任务超时并强制标记为失败
* 检测长时间运行的任务(可能是卡住的),强制标记为失败,释放资源
*/
async checkTaskTimeouts() {
try {
const Sequelize = require('sequelize');
const { task_status, op } = db.models;
// 查询所有运行中的任务
const runningTasks = await task_status.findAll({
where: {
status: 'running'
},
attributes: ['id', 'sn_code', 'taskType', 'taskName', 'startTime', 'create_time']
});
if (!runningTasks || runningTasks.length === 0) {
return;
}
const now = new Date();
let timeoutCount = 0;
for (const task of runningTasks) {
const taskData = task.toJSON();
const startTime = taskData.startTime ? new Date(taskData.startTime) : (taskData.create_time ? new Date(taskData.create_time) : null);
if (!startTime) {
continue;
}
// 获取任务类型的超时时间默认10分钟
const taskTimeout = config.getTaskTimeout(taskData.taskType) || 10 * 60 * 1000;
// 允许额外20%的缓冲时间
const maxAllowedTime = taskTimeout * 1.2;
const elapsedTime = now.getTime() - startTime.getTime();
// 如果任务运行时间超过最大允许时间,标记为超时失败
if (elapsedTime > maxAllowedTime) {
try {
await task_status.update(
{
status: 'failed',
endTime: now,
duration: elapsedTime,
result: JSON.stringify({
error: `任务执行超时(运行时间: ${Math.round(elapsedTime / 1000)}秒,超时限制: ${Math.round(maxAllowedTime / 1000)}秒)`,
timeout: true,
taskType: taskData.taskType,
startTime: startTime.toISOString()
}),
progress: 0
},
{
where: { id: taskData.id }
}
);
timeoutCount++;
console.warn(`[任务超时检查] 任务 ${taskData.id} (${taskData.taskName}) 运行时间过长,已强制标记为失败`, {
task_id: taskData.id,
sn_code: taskData.sn_code,
taskType: taskData.taskType,
elapsedTime: Math.round(elapsedTime / 1000) + '秒',
maxAllowedTime: Math.round(maxAllowedTime / 1000) + '秒'
});
// 如果任务队列中有这个任务,也需要从内存中清理
if (this.taskQueue && typeof this.taskQueue.deviceStatus !== 'undefined') {
const deviceStatus = this.taskQueue.deviceStatus.get(taskData.sn_code);
if (deviceStatus && deviceStatus.currentTask && deviceStatus.currentTask.id === taskData.id) {
// 重置设备状态,允许继续执行下一个任务
deviceStatus.isRunning = false;
deviceStatus.currentTask = null;
deviceStatus.runningCount = Math.max(0, deviceStatus.runningCount - 1);
this.taskQueue.globalRunningCount = Math.max(0, this.taskQueue.globalRunningCount - 1);
console.log(`[任务超时检查] 已重置设备 ${taskData.sn_code} 的状态,可以继续执行下一个任务`);
// 尝试继续处理该设备的队列
setTimeout(() => {
this.taskQueue.processQueue(taskData.sn_code).catch(error => {
console.error(`[任务超时检查] 继续处理队列失败 (设备: ${taskData.sn_code}):`, error);
});
}, 100);
}
}
} catch (error) {
console.error(`[任务超时检查] 更新超时任务状态失败 (任务ID: ${taskData.id}):`, error);
}
}
}
if (timeoutCount > 0) {
console.log(`[任务超时检查] 共检测到 ${timeoutCount} 个超时任务,已强制标记为失败`);
}
} catch (error) {
console.error('[任务超时检查] 执行失败:', error);
}
}
/** /**
* 自动投递任务 * 自动投递任务
*/ */

View File

@@ -19,152 +19,22 @@ class TaskHandlers {
* @param {object} taskQueue - 任务队列实例 * @param {object} taskQueue - 任务队列实例
*/ */
register(taskQueue) { register(taskQueue) {
taskQueue.registerHandler('get_resume', async (task) => { // 自动投递任务
return await this.handleGetResumeTask(task);
});
taskQueue.registerHandler('get_job_list', async (task) => {
return await this.handleGetJobListTask(task);
});
taskQueue.registerHandler('send_chat', async (task) => {
return await this.handleSendChatTask(task);
});
taskQueue.registerHandler('apply_job', async (task) => {
return await this.handleApplyJobTask(task);
});
taskQueue.registerHandler('auto_deliver', async (task) => { taskQueue.registerHandler('auto_deliver', async (task) => {
return await this.handleAutoDeliverTask(task); return await this.handleAutoDeliverTask(task);
}); });
// 自动沟通任务(待实现)
taskQueue.registerHandler('auto_chat', async (task) => {
return await this.handleAutoChatTask(task);
});
// 自动活跃账号任务(待实现)
taskQueue.registerHandler('auto_active_account', async (task) => {
return await this.handleAutoActiveAccountTask(task);
});
} }
/**
* 处理获取简历任务
*/
async handleGetResumeTask(task) {
const { sn_code } = task;
console.log(`[任务处理器] 获取简历任务 - 设备: ${sn_code}`);
deviceManager.recordTaskStart(sn_code, task);
const startTime = Date.now();
try {
const commands = [{
command_type: 'getOnlineResume',
command_name: '获取在线简历',
command_params: JSON.stringify({ sn_code }),
priority: config.getTaskPriority('get_resume')
}];
const result = await command.executeCommands(task.id, commands, this.mqttClient);
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, true, duration);
return result;
} catch (error) {
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, false, duration);
throw error;
}
}
/**
* 处理获取岗位列表任务
*/
async handleGetJobListTask(task) {
const { sn_code, taskParams } = task;
const { keyword, platform } = taskParams;
console.log(`[任务处理器] 获取岗位列表任务 - 设备: ${sn_code}`);
deviceManager.recordTaskStart(sn_code, task);
const startTime = Date.now();
try {
const commands = [{
command_type: 'getJobList',
command_name: '获取岗位列表',
command_params: JSON.stringify({ sn_code, keyword, platform }),
priority: config.getTaskPriority('search_jobs')
}];
const result = await command.executeCommands(task.id, commands, this.mqttClient);
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, true, duration);
return result;
} catch (error) {
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, false, duration);
throw error;
}
}
/**
* 处理发送聊天任务
*/
async handleSendChatTask(task) {
const { sn_code, taskParams } = task;
console.log(`[任务处理器] 发送聊天任务 - 设备: ${sn_code}`);
deviceManager.recordTaskStart(sn_code, task);
const startTime = Date.now();
try {
const commands = [{
command_type: 'sendChatMessage',
command_name: '发送聊天消息',
command_params: JSON.stringify(taskParams),
priority: config.getTaskPriority('chat')
}];
const result = await command.executeCommands(task.id, commands, this.mqttClient);
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, true, duration);
return result;
} catch (error) {
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, false, duration);
throw error;
}
}
/**
* 处理投递简历任务
*/
async handleApplyJobTask(task) {
const { sn_code, taskParams } = task;
console.log(`[任务处理器] 投递简历任务 - 设备: ${sn_code}`);
deviceManager.recordTaskStart(sn_code, task);
const startTime = Date.now();
try {
const commands = [{
command_type: 'applyJob',
command_name: '投递简历',
command_params: JSON.stringify(taskParams),
priority: config.getTaskPriority('apply')
}];
const result = await command.executeCommands(task.id, commands, this.mqttClient);
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, true, duration);
return result;
} catch (error) {
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, false, duration);
throw error;
}
}
@@ -426,6 +296,76 @@ class TaskHandlers {
throw error; throw error;
} }
} }
/**
* 处理自动沟通任务(待实现)
* 功能自动与HR进行沟通回复消息等
*/
async handleAutoChatTask(task) {
const { sn_code, taskParams } = task;
console.log(`[任务处理器] 自动沟通任务 - 设备: ${sn_code}`);
deviceManager.recordTaskStart(sn_code, task);
const startTime = Date.now();
try {
// TODO: 实现自动沟通逻辑
// 1. 获取待回复的聊天列表
// 2. 根据消息内容生成回复
// 3. 发送回复消息
// 4. 记录沟通结果
console.log(`[任务处理器] 自动沟通任务 - 逻辑待实现`);
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, true, duration);
return {
success: true,
message: '自动沟通任务框架已就绪,逻辑待实现',
chatCount: 0
};
} catch (error) {
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, false, duration);
throw error;
}
}
/**
* 处理自动活跃账号任务(待实现)
* 功能:自动执行一些操作来保持账号活跃度,如浏览职位、搜索等
*/
async handleAutoActiveAccountTask(task) {
const { sn_code, taskParams } = task;
console.log(`[任务处理器] 自动活跃账号任务 - 设备: ${sn_code}`);
deviceManager.recordTaskStart(sn_code, task);
const startTime = Date.now();
try {
// TODO: 实现自动活跃账号逻辑
// 1. 随机搜索一些职位
// 2. 浏览职位详情
// 3. 查看公司信息
// 4. 执行一些模拟用户行为
console.log(`[任务处理器] 自动活跃账号任务 - 逻辑待实现`);
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, true, duration);
return {
success: true,
message: '自动活跃账号任务框架已就绪,逻辑待实现',
actionCount: 0
};
} catch (error) {
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, false, duration);
throw error;
}
}
} }
module.exports = TaskHandlers; module.exports = TaskHandlers;

View File

@@ -6,6 +6,8 @@ const command = require('./command');
const PriorityQueue = require('./PriorityQueue'); const PriorityQueue = require('./PriorityQueue');
const ErrorHandler = require('./ErrorHandler'); const ErrorHandler = require('./ErrorHandler');
const deviceManager = require('./deviceManager'); const deviceManager = require('./deviceManager');
const ScheduleUtils = require('./utils');
const ScheduleConfig = require('./config');
/** /**
* 任务队列管理器(重构版) * 任务队列管理器(重构版)
@@ -548,7 +550,7 @@ class TaskQueue {
} }
/** /**
* 执行任务(统一重试机制) * 执行任务(统一重试机制 + 超时保护
* @param {object} task - 任务对象 * @param {object} task - 任务对象
*/ */
async executeTask(task) { async executeTask(task) {
@@ -587,8 +589,16 @@ class TaskQueue {
throw new Error(`未找到任务类型 ${task.taskType} 的处理器,请先注册处理器`); throw new Error(`未找到任务类型 ${task.taskType} 的处理器,请先注册处理器`);
} }
// 执行任务处理器 // 获取任务超时时间从配置中获取默认10分钟
const result = await handler(task); const taskTimeout = ScheduleConfig.getTaskTimeout(task.taskType) || 10 * 60 * 1000;
// 使用超时机制包装任务执行,防止任务卡住
const taskPromise = handler(task);
const result = await ScheduleUtils.withTimeout(
taskPromise,
taskTimeout,
`任务执行超时: ${task.taskName} (任务类型: ${task.taskType}, 超时时间: ${taskTimeout / 1000}秒)`
);
// 任务成功 // 任务成功
task.status = 'completed'; task.status = 'completed';
@@ -613,46 +623,64 @@ class TaskQueue {
} catch (error) { } catch (error) {
// 使用统一错误处理 // 使用统一错误处理
const errorInfo = await ErrorHandler.handleError(error, { const errorInfo = await ErrorHandler.handleError(error, {
taskId: task.id, task_id: task.id,
sn_code: task.sn_code, sn_code: task.sn_code,
taskType: task.taskType, taskType: task.taskType,
taskName: task.taskName taskName: task.taskName
}); });
// 直接标记为失败(重试已禁用)
// 直接标记为失败(重试已禁用) task.status = 'failed';
task.status = 'failed'; task.endTime = new Date();
task.endTime = new Date(); task.duration = Date.now() - startTime;
task.duration = Date.now() - startTime; task.errorMessage = errorInfo.message || error.message || '未知错误';
task.errorMessage = errorInfo.message || error.message || '未知错误'; task.errorStack = errorInfo.stack || error.stack || '';
task.errorStack = errorInfo.stack || error.stack || '';
console.error(`[任务队列] 任务执行失败: ${task.taskName} (ID: ${task.id}), 错误: ${task.errorMessage}`, { // 更新数据库任务状态为失败
errorStack: task.errorStack, try {
taskId: task.id, await db.getModel('task_status').update(
sn_code: task.sn_code, {
taskType: task.taskType status: 'failed',
}); endTime: task.endTime,
duration: task.duration,
result: JSON.stringify({
error: task.errorMessage,
stack: task.errorStack
}),
progress: 0
},
{ where: { id: task.id } }
);
} catch (dbError) {
console.error(`[任务队列] 更新任务失败状态到数据库失败:`, dbError);
}
console.error(`[任务队列] 任务执行失败: ${task.taskName} (ID: ${task.id}), 错误: ${task.errorMessage}`, {
errorStack: task.errorStack,
task_id: task.id,
sn_code: task.sn_code,
taskType: task.taskType,
duration: task.duration
});
} }
} }
/** /**
* 取消任务 * 取消任务
* @param {string} taskId - 任务ID * @param {string} task_id - 任务ID
* @returns {Promise<boolean>} 是否成功取消 * @returns {Promise<boolean>} 是否成功取消
*/ */
async cancelTask(taskId) { async cancelTask(task_id) {
// 遍历所有设备队列查找任务 // 遍历所有设备队列查找任务
for (const [sn_code, queue] of this.deviceQueues.entries()) { for (const [sn_code, queue] of this.deviceQueues.entries()) {
const removed = queue.remove(task => task.id === taskId); const removed = queue.remove(task => task.id === task_id);
if (removed) { if (removed) {
// 检查是否正在执行 // 检查是否正在执行
const status = this.deviceStatus.get(sn_code); const status = this.deviceStatus.get(sn_code);
if (status && status.currentTask && status.currentTask.id === taskId) { if (status && status.currentTask && status.currentTask.id === task_id) {
// 正在执行的任务无法取消,只能标记 // 正在执行的任务无法取消,只能标记
console.warn(`[任务队列] 任务 ${taskId} 正在执行,无法取消`); console.warn(`[任务队列] 任务 ${task_id} 正在执行,无法取消`);
return false; return false;
} }
@@ -662,10 +690,10 @@ class TaskQueue {
status: 'cancelled', status: 'cancelled',
endTime: new Date() endTime: new Date()
}, },
{ where: { id: taskId } } { where: { id: task_id } }
); );
console.log(`[任务队列] 任务已取消: ${taskId}`); console.log(`[任务队列] 任务已取消: ${task_id}`);
return true; return true;
} }
} }
@@ -766,13 +794,13 @@ class TaskQueue {
/** /**
* 获取任务状态 * 获取任务状态
* @param {string} taskId - 任务ID * @param {string} task_id - 任务ID
* @returns {Promise<object|null>} 任务对象 * @returns {Promise<object|null>} 任务对象
*/ */
async getTaskStatus(taskId) { async getTaskStatus(task_id) {
// 先从内存中查找 // 先从内存中查找
for (const queue of this.deviceQueues.values()) { for (const queue of this.deviceQueues.values()) {
const task = queue.find(t => t.id === taskId); const task = queue.find(t => t.id === task_id);
if (task) { if (task) {
return task; return task;
} }
@@ -780,7 +808,7 @@ class TaskQueue {
// 从正在执行的任务中查找 // 从正在执行的任务中查找
for (const status of this.deviceStatus.values()) { for (const status of this.deviceStatus.values()) {
if (status.currentTask && status.currentTask.id === taskId) { if (status.currentTask && status.currentTask.id === task_id) {
return status.currentTask; return status.currentTask;
} }
} }
@@ -788,7 +816,7 @@ class TaskQueue {
// 从数据库中查找 // 从数据库中查找
try { try {
const taskRecord = await db.getModel('task_status').findOne({ const taskRecord = await db.getModel('task_status').findOne({
where: { id: taskId } where: { id: task_id }
}); });
if (taskRecord) { if (taskRecord) {
@@ -796,7 +824,7 @@ class TaskQueue {
} }
} catch (error) { } catch (error) {
console.error(`[任务队列] 查询任务状态失败:`, error, { console.error(`[任务队列] 查询任务状态失败:`, error, {
taskId: taskId task_id: task_id
}); });
} }
@@ -858,24 +886,24 @@ class TaskQueue {
}); });
console.log(`[任务队列] 已删除任务指令记录: ${commandsDeleted}`); console.log(`[任务队列] 已删除任务指令记录: ${commandsDeleted}`);
// 删除聊天记录中关联的任务记录(删除所有有 taskId 且不为空的记录) // 删除聊天记录中关联的任务记录(删除所有有 task_id 且不为空的记录)
const chatRecordsDeleted = await chatRecordsModel.destroy({ const chatRecordsDeleted = await chatRecordsModel.destroy({
where: { where: {
[Sequelize.Op.and]: [ [Sequelize.Op.and]: [
{ taskId: { [Sequelize.Op.ne]: null } }, { task_id: { [Sequelize.Op.ne]: null } },
{ taskId: { [Sequelize.Op.ne]: '' } } { task_id: { [Sequelize.Op.ne]: '' } }
] ]
}, },
truncate: false truncate: false
}); });
console.log(`[任务队列] 已删除聊天记录: ${chatRecordsDeleted}`); console.log(`[任务队列] 已删除聊天记录: ${chatRecordsDeleted}`);
// 删除投递记录中关联的任务记录(删除所有有 taskId 且不为空的记录) // 删除投递记录中关联的任务记录(删除所有有 task_id 且不为空的记录)
const applyRecordsDeleted = await applyRecordsModel.destroy({ const applyRecordsDeleted = await applyRecordsModel.destroy({
where: { where: {
[Sequelize.Op.and]: [ [Sequelize.Op.and]: [
{ taskId: { [Sequelize.Op.ne]: null } }, { task_id: { [Sequelize.Op.ne]: null } },
{ taskId: { [Sequelize.Op.ne]: '' } } { task_id: { [Sequelize.Op.ne]: '' } }
] ]
}, },
truncate: false truncate: false