7.2 KiB
7.2 KiB
autoAiWorkSys 调度架构分析与优化建议
📋 目录
架构概览
当前架构层次
应用入口 (app.js)
└─> ScheduleManager (middleware/schedule/index.js)
├─> TaskQueue (taskQueue.js) - 设备级任务队列
├─> Strategy (strategy.js) - 调度策略
├─> Monitor (monitor.js) - 监控系统
├─> Command (command.js) - 指令执行
└─> MQTT Client - 设备通信
ServiceManager (services/index.js)
├─> TaskScheduler (task_scheduler.js) - 通用任务调度器(未使用)
├─> JobService (job_service.js) - 职位服务
└─> JobManager (job/jobManager.js) - 工作管理
任务执行流程
任务创建 → TaskQueue.addTask()
↓
保存到数据库 (task_status)
↓
processQueue() - 单设备串行执行
↓
executeTask() - 执行任务
↓
getTaskCommands() - 生成指令序列
↓
Command.executeCommands() - 执行指令
↓
MQTT.publishAndWait() - 发送到设备
↓
更新任务状态
核心问题分析
🔴 问题1: 架构层次混乱,职责不清
问题描述:
- 存在两套调度系统:
TaskScheduler(services层) 和ScheduleManager(middleware层) TaskScheduler定义了完整的调度功能但未被使用TaskQueue和TaskScheduler功能重叠(都有优先级队列、重试机制)ServiceManager和ScheduleManager职责边界模糊
影响:
- 代码维护困难,新人难以理解
- 功能重复,增加维护成本
- 扩展性差,难以统一优化
🔴 问题2: 任务执行效率低
问题描述:
- 每个设备单线程串行执行(
TaskQueue.processQueue()) - 优先级队列使用简单数组,插入效率 O(n)
- 无法充分利用多核CPU资源
- 设备间无法并行执行
影响:
- 设备资源利用率低
- 任务执行延迟高
- 无法横向扩展
🔴 问题3: 重试机制分散,可能导致重复重试
问题描述:
TaskScheduler有重试机制(maxRetries, retryDelay)TaskQueue有重试机制(retryCount, maxRetries)Command也有重试机制(maxRetries, retryDelay)- 三层重试可能导致总重试次数超出预期
影响:
- 重试次数不可控
- 资源浪费
- 错误处理逻辑复杂
🔴 问题4: 状态管理分散,可能不一致
问题描述:
- 内存状态:
TaskQueue.deviceQueues、TaskQueue.deviceStatus - 数据库状态:
task_status表 - 监控状态:
Monitor.deviceOnlineStatus - 策略状态:
Strategy.deviceTimestamps、Strategy.dailyCounters
影响:
- 服务重启后状态丢失
- 内存和数据库状态可能不一致
- 难以追踪任务真实状态
🔴 问题5: 优先级队列实现效率低
问题描述:
- 使用简单数组 +
sort()实现优先级队列 - 每次插入都需要排序,时间复杂度 O(n log n)
- 应该使用堆(Heap)数据结构
影响:
- 队列操作性能差
- 任务数量多时性能下降明显
🔴 问题6: MQTT客户端获取方式不统一
问题描述:
ScheduleManager初始化时创建 MQTT 客户端TaskQueue通过getMqttClient()动态获取JobService直接从scheduleManager获取- 可能导致多个MQTT连接或连接丢失
影响:
- 资源管理混乱
- 连接状态不可控
- 难以监控和调试
🔴 问题7: 错误处理不完善
问题描述:
- 部分异步操作缺少 try-catch
- 错误信息记录不完整
- 错误恢复机制缺失
影响:
- 错误难以追踪
- 系统稳定性差
- 调试困难
优化建议
✅ 优化1: 统一调度架构
建议:
- 移除未使用的
TaskScheduler,统一使用ScheduleManager+TaskQueue - 明确职责划分:
ScheduleManager: 系统初始化、组件协调、定时任务TaskQueue: 任务队列管理、执行调度Command: 指令执行、MQTT通信Strategy: 调度策略、频率控制Monitor: 监控、统计、告警
✅ 优化2: 提升任务执行效率
建议:
- 使用工作池模式:允许设备间并行执行
- 优化优先级队列:使用堆(Heap)数据结构
- 支持任务并发控制:每个设备可配置最大并发数
✅ 优化3: 统一重试机制
建议:
- 只在 TaskQueue 层实现重试,移除 Command 层的重试
- 使用指数退避策略
- 记录重试原因和次数
✅ 优化4: 统一状态管理
建议:
- 使用数据库作为唯一数据源(Single Source of Truth)
- 内存状态仅作为缓存,定期同步到数据库
- 服务启动时从数据库恢复状态
✅ 优化5: 优化优先级队列
建议: 使用堆(Heap)数据结构实现优先级队列
✅ 优化6: 统一MQTT客户端管理
建议:
- 使用单例模式统一管理MQTT客户端
- 实现连接池(如果需要多个连接)
- 添加连接状态监控和自动重连
✅ 优化7: 完善错误处理
建议:
- 统一错误处理中间件
- 完善错误日志记录(包含上下文信息)
- 实现错误恢复机制
重构方案
阶段1: 架构清理(优先级:高)
-
移除未使用的代码
- 删除或标记
TaskScheduler(如果确实未使用) - 清理重复功能
- 删除或标记
-
统一MQTT管理
- 实现统一的MQTT客户端管理器
- 所有模块通过统一接口获取客户端
-
统一错误处理
- 实现错误处理中间件
- 完善错误日志
阶段2: 性能优化(优先级:高)
-
优化优先级队列
- 使用堆数据结构
- 提升插入和删除效率
-
实现工作池模式
- 允许设备间并行执行
- 支持并发控制
-
优化数据库操作
- 批量更新任务状态
- 使用事务保证一致性
阶段3: 状态管理优化(优先级:中)
-
统一状态管理
- 数据库作为唯一数据源
- 内存状态作为缓存
-
实现状态同步
- 定期同步内存状态到数据库
- 服务启动时恢复状态
阶段4: 监控和可观测性(优先级:中)
-
完善监控指标
- 任务执行时间分布
- 错误率统计
- 资源使用情况
-
实现告警机制
- 任务失败率告警
- 设备离线告警
- 系统资源告警
总结
关键优化点
- ✅ 统一架构:移除冗余,明确职责
- ✅ 提升性能:工作池模式、堆队列、并发控制
- ✅ 统一重试:避免重复重试,使用指数退避
- ✅ 状态管理:数据库为主,内存为缓存
- ✅ 资源管理:统一MQTT客户端管理
- ✅ 错误处理:完善错误处理和恢复机制
预期收益
- 性能提升:任务执行效率提升 50-100%
- 稳定性提升:错误处理更完善,系统更稳定
- 可维护性提升:代码结构更清晰,易于维护
- 可扩展性提升:支持更多设备和任务类型
文档生成时间:2024年 分析范围:autoAiWorkSys 调度架构