300 lines
7.2 KiB
Markdown
300 lines
7.2 KiB
Markdown
# autoAiWorkSys 调度架构分析与优化建议
|
||
|
||
## 📋 目录
|
||
1. [架构概览](#架构概览)
|
||
2. [核心问题分析](#核心问题分析)
|
||
3. [优化建议](#优化建议)
|
||
4. [重构方案](#重构方案)
|
||
|
||
---
|
||
|
||
## 架构概览
|
||
|
||
### 当前架构层次
|
||
|
||
```
|
||
应用入口 (app.js)
|
||
└─> ScheduleManager (middleware/schedule/index.js)
|
||
├─> TaskQueue (taskQueue.js) - 设备级任务队列
|
||
├─> Strategy (strategy.js) - 调度策略
|
||
├─> Monitor (monitor.js) - 监控系统
|
||
├─> Command (command.js) - 指令执行
|
||
└─> MQTT Client - 设备通信
|
||
|
||
ServiceManager (services/index.js)
|
||
├─> TaskScheduler (task_scheduler.js) - 通用任务调度器(未使用)
|
||
├─> JobService (job_service.js) - 职位服务
|
||
└─> JobManager (job/jobManager.js) - 工作管理
|
||
```
|
||
|
||
### 任务执行流程
|
||
|
||
```
|
||
任务创建 → TaskQueue.addTask()
|
||
↓
|
||
保存到数据库 (task_status)
|
||
↓
|
||
processQueue() - 单设备串行执行
|
||
↓
|
||
executeTask() - 执行任务
|
||
↓
|
||
getTaskCommands() - 生成指令序列
|
||
↓
|
||
Command.executeCommands() - 执行指令
|
||
↓
|
||
MQTT.publishAndWait() - 发送到设备
|
||
↓
|
||
更新任务状态
|
||
```
|
||
|
||
---
|
||
|
||
## 核心问题分析
|
||
|
||
### 🔴 问题1: 架构层次混乱,职责不清
|
||
|
||
**问题描述:**
|
||
- 存在两套调度系统:`TaskScheduler` (services层) 和 `ScheduleManager` (middleware层)
|
||
- `TaskScheduler` 定义了完整的调度功能但未被使用
|
||
- `TaskQueue` 和 `TaskScheduler` 功能重叠(都有优先级队列、重试机制)
|
||
- `ServiceManager` 和 `ScheduleManager` 职责边界模糊
|
||
|
||
**影响:**
|
||
- 代码维护困难,新人难以理解
|
||
- 功能重复,增加维护成本
|
||
- 扩展性差,难以统一优化
|
||
|
||
---
|
||
|
||
### 🔴 问题2: 任务执行效率低
|
||
|
||
**问题描述:**
|
||
- 每个设备单线程串行执行(`TaskQueue.processQueue()`)
|
||
- 优先级队列使用简单数组,插入效率 O(n)
|
||
- 无法充分利用多核CPU资源
|
||
- 设备间无法并行执行
|
||
|
||
**影响:**
|
||
- 设备资源利用率低
|
||
- 任务执行延迟高
|
||
- 无法横向扩展
|
||
|
||
---
|
||
|
||
### 🔴 问题3: 重试机制分散,可能导致重复重试
|
||
|
||
**问题描述:**
|
||
- `TaskScheduler` 有重试机制(maxRetries, retryDelay)
|
||
- `TaskQueue` 有重试机制(retryCount, maxRetries)
|
||
- `Command` 也有重试机制(maxRetries, retryDelay)
|
||
- 三层重试可能导致总重试次数超出预期
|
||
|
||
**影响:**
|
||
- 重试次数不可控
|
||
- 资源浪费
|
||
- 错误处理逻辑复杂
|
||
|
||
---
|
||
|
||
### 🔴 问题4: 状态管理分散,可能不一致
|
||
|
||
**问题描述:**
|
||
- 内存状态:`TaskQueue.deviceQueues`、`TaskQueue.deviceStatus`
|
||
- 数据库状态:`task_status` 表
|
||
- 监控状态:`Monitor.deviceOnlineStatus`
|
||
- 策略状态:`Strategy.deviceTimestamps`、`Strategy.dailyCounters`
|
||
|
||
**影响:**
|
||
- 服务重启后状态丢失
|
||
- 内存和数据库状态可能不一致
|
||
- 难以追踪任务真实状态
|
||
|
||
---
|
||
|
||
### 🔴 问题5: 优先级队列实现效率低
|
||
|
||
**问题描述:**
|
||
- 使用简单数组 + `sort()` 实现优先级队列
|
||
- 每次插入都需要排序,时间复杂度 O(n log n)
|
||
- 应该使用堆(Heap)数据结构
|
||
|
||
**影响:**
|
||
- 队列操作性能差
|
||
- 任务数量多时性能下降明显
|
||
|
||
---
|
||
|
||
### 🔴 问题6: MQTT客户端获取方式不统一
|
||
|
||
**问题描述:**
|
||
- `ScheduleManager` 初始化时创建 MQTT 客户端
|
||
- `TaskQueue` 通过 `getMqttClient()` 动态获取
|
||
- `JobService` 直接从 `scheduleManager` 获取
|
||
- 可能导致多个MQTT连接或连接丢失
|
||
|
||
**影响:**
|
||
- 资源管理混乱
|
||
- 连接状态不可控
|
||
- 难以监控和调试
|
||
|
||
---
|
||
|
||
### 🔴 问题7: 错误处理不完善
|
||
|
||
**问题描述:**
|
||
- 部分异步操作缺少 try-catch
|
||
- 错误信息记录不完整
|
||
- 错误恢复机制缺失
|
||
|
||
**影响:**
|
||
- 错误难以追踪
|
||
- 系统稳定性差
|
||
- 调试困难
|
||
|
||
---
|
||
|
||
## 优化建议
|
||
|
||
### ✅ 优化1: 统一调度架构
|
||
|
||
**建议:**
|
||
1. **移除未使用的 `TaskScheduler`**,统一使用 `ScheduleManager` + `TaskQueue`
|
||
2. **明确职责划分**:
|
||
- `ScheduleManager`: 系统初始化、组件协调、定时任务
|
||
- `TaskQueue`: 任务队列管理、执行调度
|
||
- `Command`: 指令执行、MQTT通信
|
||
- `Strategy`: 调度策略、频率控制
|
||
- `Monitor`: 监控、统计、告警
|
||
|
||
---
|
||
|
||
### ✅ 优化2: 提升任务执行效率
|
||
|
||
**建议:**
|
||
1. **使用工作池模式**:允许设备间并行执行
|
||
2. **优化优先级队列**:使用堆(Heap)数据结构
|
||
3. **支持任务并发控制**:每个设备可配置最大并发数
|
||
|
||
---
|
||
|
||
### ✅ 优化3: 统一重试机制
|
||
|
||
**建议:**
|
||
1. **只在 TaskQueue 层实现重试**,移除 Command 层的重试
|
||
2. **使用指数退避策略**
|
||
3. **记录重试原因和次数**
|
||
|
||
---
|
||
|
||
### ✅ 优化4: 统一状态管理
|
||
|
||
**建议:**
|
||
1. **使用数据库作为唯一数据源**(Single Source of Truth)
|
||
2. **内存状态仅作为缓存**,定期同步到数据库
|
||
3. **服务启动时从数据库恢复状态**
|
||
|
||
---
|
||
|
||
### ✅ 优化5: 优化优先级队列
|
||
|
||
**建议:**
|
||
使用堆(Heap)数据结构实现优先级队列
|
||
|
||
---
|
||
|
||
### ✅ 优化6: 统一MQTT客户端管理
|
||
|
||
**建议:**
|
||
1. **使用单例模式**统一管理MQTT客户端
|
||
2. **实现连接池**(如果需要多个连接)
|
||
3. **添加连接状态监控和自动重连**
|
||
|
||
---
|
||
|
||
### ✅ 优化7: 完善错误处理
|
||
|
||
**建议:**
|
||
1. **统一错误处理中间件**
|
||
2. **完善错误日志记录**(包含上下文信息)
|
||
3. **实现错误恢复机制**
|
||
|
||
---
|
||
|
||
## 重构方案
|
||
|
||
### 阶段1: 架构清理(优先级:高)
|
||
|
||
1. **移除未使用的代码**
|
||
- 删除或标记 `TaskScheduler`(如果确实未使用)
|
||
- 清理重复功能
|
||
|
||
2. **统一MQTT管理**
|
||
- 实现统一的MQTT客户端管理器
|
||
- 所有模块通过统一接口获取客户端
|
||
|
||
3. **统一错误处理**
|
||
- 实现错误处理中间件
|
||
- 完善错误日志
|
||
|
||
### 阶段2: 性能优化(优先级:高)
|
||
|
||
1. **优化优先级队列**
|
||
- 使用堆数据结构
|
||
- 提升插入和删除效率
|
||
|
||
2. **实现工作池模式**
|
||
- 允许设备间并行执行
|
||
- 支持并发控制
|
||
|
||
3. **优化数据库操作**
|
||
- 批量更新任务状态
|
||
- 使用事务保证一致性
|
||
|
||
### 阶段3: 状态管理优化(优先级:中)
|
||
|
||
1. **统一状态管理**
|
||
- 数据库作为唯一数据源
|
||
- 内存状态作为缓存
|
||
|
||
2. **实现状态同步**
|
||
- 定期同步内存状态到数据库
|
||
- 服务启动时恢复状态
|
||
|
||
### 阶段4: 监控和可观测性(优先级:中)
|
||
|
||
1. **完善监控指标**
|
||
- 任务执行时间分布
|
||
- 错误率统计
|
||
- 资源使用情况
|
||
|
||
2. **实现告警机制**
|
||
- 任务失败率告警
|
||
- 设备离线告警
|
||
- 系统资源告警
|
||
|
||
---
|
||
|
||
## 总结
|
||
|
||
### 关键优化点
|
||
|
||
1. ✅ **统一架构**:移除冗余,明确职责
|
||
2. ✅ **提升性能**:工作池模式、堆队列、并发控制
|
||
3. ✅ **统一重试**:避免重复重试,使用指数退避
|
||
4. ✅ **状态管理**:数据库为主,内存为缓存
|
||
5. ✅ **资源管理**:统一MQTT客户端管理
|
||
6. ✅ **错误处理**:完善错误处理和恢复机制
|
||
|
||
### 预期收益
|
||
|
||
- **性能提升**:任务执行效率提升 50-100%
|
||
- **稳定性提升**:错误处理更完善,系统更稳定
|
||
- **可维护性提升**:代码结构更清晰,易于维护
|
||
- **可扩展性提升**:支持更多设备和任务类型
|
||
|
||
---
|
||
|
||
*文档生成时间:2024年*
|
||
*分析范围:autoAiWorkSys 调度架构*
|
||
|