1169 lines
44 KiB
JavaScript
1169 lines
44 KiB
JavaScript
const { v4: uuidv4 } = require('uuid');
|
||
const Sequelize = require('sequelize');
|
||
const logs = require('../logProxy');
|
||
const db = require('../dbProxy');
|
||
const command = require('./command');
|
||
const PriorityQueue = require('./PriorityQueue');
|
||
const ErrorHandler = require('./ErrorHandler');
|
||
const deviceManager = require('./deviceManager');
|
||
const ScheduleUtils = require('./utils');
|
||
const ScheduleConfig = require('./config');
|
||
|
||
/**
|
||
* 任务队列管理器(重构版)
|
||
* - 使用优先级队列(堆)提升性能
|
||
* - 工作池模式:设备内串行执行,设备间并行执行
|
||
* - 统一重试机制
|
||
* - 统一MQTT管理
|
||
*/
|
||
class TaskQueue {
|
||
constructor(config = {}) {
|
||
// 设备任务队列映射 { sn_code: PriorityQueue }
|
||
this.deviceQueues = new Map();
|
||
|
||
// 设备执行状态 { sn_code: { isRunning, currentTask, runningCount } }
|
||
this.deviceStatus = new Map();
|
||
|
||
// 任务处理器映射 { taskType: handler }
|
||
this.taskHandlers = new Map();
|
||
|
||
// 工作池配置
|
||
this.config = {
|
||
maxConcurrency: config.maxConcurrency || 5, // 全局最大并发数(设备数)
|
||
deviceMaxConcurrency: config.deviceMaxConcurrency || 1, // 每个设备最大并发数(保持串行)
|
||
...config
|
||
};
|
||
|
||
// 全局运行中的任务数
|
||
this.globalRunningCount = 0;
|
||
|
||
// 全局任务队列(用于跨设备优先级调度,可选)
|
||
this.globalQueue = new PriorityQueue();
|
||
|
||
// 定期扫描定时器
|
||
this.scanInterval = null;
|
||
}
|
||
|
||
/**
|
||
* 初始化(从数据库恢复未完成的任务)
|
||
*/
|
||
async init() {
|
||
try {
|
||
console.log('[任务队列] 初始化中...');
|
||
|
||
// 从数据库加载pending和running状态的任务
|
||
const pendingTasks = await db.getModel('task_status').findAll({
|
||
where: {
|
||
status: ['pending', 'running']
|
||
},
|
||
order: [['priority', 'DESC'], ['id', 'ASC']]
|
||
});
|
||
|
||
// 获取所有启用的账号(移除 device_status 依赖,不再检查在线状态)
|
||
const pla_account = db.getModel('pla_account');
|
||
|
||
const enabledAccounts = await pla_account.findAll({
|
||
where: {
|
||
is_delete: 0,
|
||
is_enabled: 1
|
||
},
|
||
attributes: ['sn_code']
|
||
});
|
||
const enabledSnCodes = new Set(enabledAccounts.map(acc => acc.sn_code));
|
||
|
||
// 移除 device_status 依赖,不再检查设备在线状态
|
||
// 如果需要在线状态检查,可以在 pla_account 表中添加相应字段
|
||
const onlineSnCodes = new Set(); // 暂时设为空,表示不再检查在线状态
|
||
|
||
|
||
let restoredCount = 0;
|
||
let skippedCount = 0;
|
||
|
||
for (const taskRecord of pendingTasks) {
|
||
const task = taskRecord.toJSON();
|
||
const sn_code = task.sn_code;
|
||
|
||
// 检查账号是否启用
|
||
if (!enabledSnCodes.has(sn_code)) {
|
||
console.log(`[任务队列] 初始化时跳过任务 ${task.id}:账号 ${sn_code} 未启用`);
|
||
// 标记任务为已取消
|
||
await db.getModel('task_status').update(
|
||
{
|
||
status: 'cancelled',
|
||
endTime: new Date(),
|
||
result: JSON.stringify({ error: '账号未启用,任务已取消' })
|
||
},
|
||
{ where: { id: task.id } }
|
||
);
|
||
skippedCount++;
|
||
continue;
|
||
}
|
||
|
||
// 检查设备是否在线
|
||
if (!onlineSnCodes.has(sn_code)) {
|
||
console.log(`[任务队列] 初始化时跳过任务 ${task.id}:设备 ${sn_code} 不在线`);
|
||
// 不在线的任务仍然恢复,等待设备上线后执行
|
||
// 不取消任务,只是不立即执行
|
||
}
|
||
|
||
// 初始化设备队列
|
||
if (!this.deviceQueues.has(sn_code)) {
|
||
this.deviceQueues.set(sn_code, new PriorityQueue());
|
||
}
|
||
|
||
// 初始化设备状态(重要:确保设备状态存在)
|
||
if (!this.deviceStatus.has(sn_code)) {
|
||
this.deviceStatus.set(sn_code, {
|
||
isRunning: false,
|
||
currentTask: null,
|
||
runningCount: 0
|
||
});
|
||
}
|
||
|
||
// 恢复任务对象
|
||
const taskObj = {
|
||
id: task.id,
|
||
sn_code: task.sn_code,
|
||
taskType: task.taskType,
|
||
taskName: task.taskName,
|
||
taskParams: task.taskParams ? JSON.parse(task.taskParams) : {},
|
||
priority: task.priority || 5,
|
||
maxRetries: task.maxRetries || 3,
|
||
retryCount: task.retryCount || 0,
|
||
status: 'pending',
|
||
createdAt: task.create_time ? new Date(task.create_time).getTime() : Date.now()
|
||
};
|
||
|
||
// 添加到设备队列
|
||
this.deviceQueues.get(sn_code).push(taskObj);
|
||
restoredCount++;
|
||
|
||
// 如果状态是running,重置为pending
|
||
if (task.status === 'running') {
|
||
await db.getModel('task_status').update(
|
||
{ status: 'pending' },
|
||
{ where: { id: task.id } }
|
||
);
|
||
}
|
||
}
|
||
|
||
// 恢复任务后,尝试执行所有设备的队列(只执行在线且启用的设备)
|
||
for (const sn_code of this.deviceQueues.keys()) {
|
||
// 只处理启用且在线的设备
|
||
if (enabledSnCodes.has(sn_code) && onlineSnCodes.has(sn_code)) {
|
||
this.processQueue(sn_code).catch(error => {
|
||
console.error(`[任务队列] 初始化后执行队列失败 (设备: ${sn_code}):`, error);
|
||
});
|
||
} else {
|
||
console.log(`[任务队列] 初始化时跳过设备 ${sn_code} 的队列执行:${!enabledSnCodes.has(sn_code) ? '账号未启用' : '设备不在线'}`);
|
||
}
|
||
}
|
||
|
||
console.log(`[任务队列] 初始化完成,恢复 ${restoredCount} 个任务,跳过 ${skippedCount} 个未启用账号的任务`);
|
||
|
||
// 启动定期扫描机制(每10秒扫描一次)
|
||
this.startQueueScanner();
|
||
} catch (error) {
|
||
console.error('[任务队列] 初始化失败:', error);
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 启动队列扫描器(定期检查并执行队列中的任务)
|
||
*/
|
||
startQueueScanner() {
|
||
// 如果已经启动,先清除
|
||
if (this.scanInterval) {
|
||
clearInterval(this.scanInterval);
|
||
}
|
||
|
||
// 每10秒扫描一次所有设备的队列
|
||
this.scanInterval = setInterval(() => {
|
||
this.scanAndProcessQueues();
|
||
}, 10000); // 10秒扫描一次
|
||
|
||
console.log('[任务队列] 队列扫描器已启动(每10秒扫描一次)');
|
||
}
|
||
|
||
/**
|
||
* 停止队列扫描器
|
||
*/
|
||
stopQueueScanner() {
|
||
if (this.scanInterval) {
|
||
clearInterval(this.scanInterval);
|
||
this.scanInterval = null;
|
||
console.log('[任务队列] 队列扫描器已停止');
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 扫描所有设备的队列并尝试执行任务(过滤未启用的账号和不在线的设备)
|
||
*/
|
||
async scanAndProcessQueues() {
|
||
try {
|
||
const deviceCount = this.deviceQueues.size;
|
||
if (deviceCount === 0) {
|
||
return;
|
||
}
|
||
|
||
// 获取所有启用的账号对应的设备SN码
|
||
const pla_account = db.getModel('pla_account');
|
||
const enabledAccounts = await pla_account.findAll({
|
||
where: {
|
||
is_delete: 0,
|
||
is_enabled: 1
|
||
},
|
||
attributes: ['sn_code']
|
||
});
|
||
|
||
const enabledSnCodes = new Set(enabledAccounts.map(acc => acc.sn_code));
|
||
|
||
// 移除 device_status 依赖,不再检查设备在线状态
|
||
// 如果需要在线状态检查,可以从 deviceManager 获取
|
||
const deviceManager = require('./deviceManager');
|
||
const deviceStatus = deviceManager.getAllDevicesStatus();
|
||
const onlineSnCodes = new Set(
|
||
Object.entries(deviceStatus)
|
||
.filter(([sn_code, status]) => status.isOnline)
|
||
.map(([sn_code]) => sn_code)
|
||
);
|
||
|
||
// 原有代码已移除,改为使用 deviceManager
|
||
/* 原有代码已注释
|
||
const device_status = db.getModel('device_status');
|
||
const heartbeatTimeout = require('./config.js').monitoring.heartbeatTimeout;
|
||
const now = new Date();
|
||
const heartbeatThreshold = new Date(now.getTime() - heartbeatTimeout);
|
||
|
||
const onlineDevices = await device_status.findAll({
|
||
where: {
|
||
isOnline: true,
|
||
lastHeartbeatTime: {
|
||
[Sequelize.Op.gte]: heartbeatThreshold // 心跳时间在阈值内
|
||
}
|
||
},
|
||
attributes: ['sn_code']
|
||
});
|
||
const onlineSnCodes = new Set(onlineDevices.map(dev => dev.sn_code));
|
||
*/
|
||
|
||
let processedCount = 0;
|
||
let queuedCount = 0;
|
||
let skippedCount = 0;
|
||
|
||
// 遍历所有设备的队列,只处理启用账号且在线设备
|
||
for (const [sn_code, queue] of this.deviceQueues.entries()) {
|
||
// 跳过未启用的账号
|
||
if (!enabledSnCodes.has(sn_code)) {
|
||
skippedCount++;
|
||
continue;
|
||
}
|
||
|
||
// 跳过不在线的设备
|
||
if (!onlineSnCodes.has(sn_code)) {
|
||
skippedCount++;
|
||
continue;
|
||
}
|
||
|
||
const queueSize = queue.size();
|
||
if (queueSize > 0) {
|
||
queuedCount += queueSize;
|
||
// 尝试处理该设备的队列
|
||
this.processQueue(sn_code).catch(error => {
|
||
console.error(`[任务队列] 扫描执行队列失败 (设备: ${sn_code}):`, error);
|
||
});
|
||
processedCount++;
|
||
}
|
||
}
|
||
|
||
if (queuedCount > 0) {
|
||
console.log(`[任务队列] 扫描完成: ${processedCount} 个设备有任务,共 ${queuedCount} 个待执行任务,跳过 ${skippedCount} 个设备`);
|
||
}
|
||
} catch (error) {
|
||
console.error('[任务队列] 扫描队列失败:', error);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 注册任务处理器
|
||
* @param {string} taskType - 任务类型
|
||
* @param {function} handler - 处理函数
|
||
*/
|
||
registerHandler(taskType, handler) {
|
||
this.taskHandlers.set(taskType, handler);
|
||
}
|
||
|
||
/**
|
||
* 查找设备是否已有相同类型的任务
|
||
* @param {string} sn_code - 设备SN码
|
||
* @param {string} taskType - 任务类型
|
||
* @returns {Promise<object|null>} 现有任务或null
|
||
*/
|
||
async findExistingTask(sn_code, taskType) {
|
||
// 检查当前正在执行的任务
|
||
const deviceStatus = this.deviceStatus.get(sn_code);
|
||
if (deviceStatus && deviceStatus.currentTask && deviceStatus.currentTask.taskType === taskType) {
|
||
return deviceStatus.currentTask;
|
||
}
|
||
|
||
// 检查队列中等待的任务
|
||
const queue = this.deviceQueues.get(sn_code);
|
||
if (queue) {
|
||
const existingTask = queue.find(task => task.taskType === taskType && (task.status === 'pending' || !task.status));
|
||
if (existingTask) {
|
||
return existingTask;
|
||
}
|
||
}
|
||
|
||
// 检查数据库中的pending/running任务(防止系统重启后重复添加)
|
||
try {
|
||
const Sequelize = require('sequelize');
|
||
const taskStatusModel = db.getModel('task_status');
|
||
const existingDbTask = await taskStatusModel.findOne({
|
||
where: {
|
||
sn_code: sn_code,
|
||
taskType: taskType,
|
||
status: ['pending', 'running']
|
||
},
|
||
order: [['id', 'DESC']]
|
||
});
|
||
|
||
if (existingDbTask) {
|
||
return {
|
||
id: existingDbTask.id,
|
||
taskType: existingDbTask.taskType,
|
||
status: existingDbTask.status
|
||
};
|
||
}
|
||
} catch (error) {
|
||
console.error(`[任务队列] 检查数据库现有任务失败:`, error);
|
||
}
|
||
|
||
return null;
|
||
}
|
||
|
||
/**
|
||
* 检查账号是否启用
|
||
* @param {string} sn_code - 设备SN码
|
||
* @returns {Promise<boolean>} 是否启用
|
||
*/
|
||
async checkAccountEnabled(sn_code) {
|
||
try {
|
||
const pla_account = db.getModel('pla_account');
|
||
const account = await pla_account.findOne({
|
||
where: {
|
||
sn_code: sn_code,
|
||
is_delete: 0
|
||
},
|
||
attributes: ['is_enabled']
|
||
});
|
||
|
||
if (!account) {
|
||
console.warn(`[任务队列] 设备 ${sn_code} 对应的账号不存在`);
|
||
return false;
|
||
}
|
||
|
||
const isEnabled = Boolean(account.is_enabled);
|
||
if (!isEnabled) {
|
||
console.log(`[任务队列] 设备 ${sn_code} 对应的账号未启用,跳过任务`);
|
||
}
|
||
|
||
return isEnabled;
|
||
} catch (error) {
|
||
console.error(`[任务队列] 检查账号启用状态失败:`, error);
|
||
return false;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 添加任务到队列
|
||
* @param {string} sn_code - 设备SN码
|
||
* @param {object} taskConfig - 任务配置
|
||
* @returns {Promise<string>} 任务ID
|
||
*/
|
||
async addTask(sn_code, taskConfig) {
|
||
// 检查账号是否启用
|
||
const isEnabled = await this.checkAccountEnabled(sn_code);
|
||
if (!isEnabled) {
|
||
throw new Error(`账号未启用,无法添加任务`);
|
||
}
|
||
|
||
// 检查是否已有相同类型的任务在队列中或正在执行
|
||
const existingTask = await this.findExistingTask(sn_code, taskConfig.taskType);
|
||
if (existingTask) {
|
||
console.log(`[任务队列] 设备 ${sn_code} 已有 ${taskConfig.taskType} 任务在执行或等待中,跳过添加`);
|
||
return existingTask.id;
|
||
}
|
||
|
||
const task = {
|
||
sn_code,
|
||
taskType: taskConfig.taskType,
|
||
taskName: taskConfig.taskName || taskConfig.taskType,
|
||
taskParams: taskConfig.taskParams || {},
|
||
priority: taskConfig.priority || 5,
|
||
maxRetries: taskConfig.maxRetries || 3,
|
||
retryCount: 0,
|
||
status: 'pending',
|
||
createdAt: Date.now()
|
||
};
|
||
|
||
// 初始化设备队列
|
||
if (!this.deviceQueues.has(sn_code)) {
|
||
this.deviceQueues.set(sn_code, new PriorityQueue());
|
||
}
|
||
|
||
// 初始化设备状态
|
||
if (!this.deviceStatus.has(sn_code)) {
|
||
this.deviceStatus.set(sn_code, {
|
||
isRunning: false,
|
||
currentTask: null,
|
||
runningCount: 0
|
||
});
|
||
}
|
||
|
||
// 保存到数据库
|
||
let res = await db.getModel('task_status').create({
|
||
sn_code: task.sn_code,
|
||
taskType: task.taskType,
|
||
taskName: task.taskName,
|
||
taskParams: JSON.stringify(task.taskParams),
|
||
status: task.status,
|
||
priority: task.priority,
|
||
maxRetries: task.maxRetries,
|
||
retryCount: task.retryCount,
|
||
});
|
||
|
||
// 使用数据库返回的自增ID
|
||
task.id = res.id;
|
||
|
||
// 添加到优先级队列
|
||
const queue = this.deviceQueues.get(sn_code);
|
||
queue.push(task);
|
||
|
||
console.log(`[任务队列] 任务已添加到队列: ${task.taskName} (ID: ${task.id}, 优先级: ${task.priority}),等待扫描机制执行`);
|
||
|
||
// 不立即执行,等待扫描机制自动执行
|
||
// 扫描机制会定期检查队列并执行任务
|
||
|
||
return res.id;
|
||
}
|
||
|
||
/**
|
||
* 处理设备的任务队列(工作池模式)
|
||
* 设备内串行执行,设备间并行执行
|
||
* @param {string} sn_code - 设备SN码
|
||
*/
|
||
async processQueue(sn_code) {
|
||
try {
|
||
// 先检查账号是否启用
|
||
const isEnabled = await this.checkAccountEnabled(sn_code);
|
||
if (!isEnabled) {
|
||
// 如果账号未启用,从队列中移除所有待执行任务
|
||
const queue = this.deviceQueues.get(sn_code);
|
||
if (queue && queue.size() > 0) {
|
||
console.log(`[任务队列] 设备 ${sn_code} 账号未启用,清空队列中的 ${queue.size()} 个待执行任务`);
|
||
// 标记所有待执行任务为已取消
|
||
const queueArray = queue.toArray();
|
||
for (const task of queueArray) {
|
||
if (task.status === 'pending') {
|
||
try {
|
||
await db.getModel('task_status').update(
|
||
{
|
||
status: 'cancelled',
|
||
endTime: new Date(),
|
||
result: JSON.stringify({ error: '账号未启用,任务已取消' })
|
||
},
|
||
{ where: { id: task.id } }
|
||
);
|
||
} catch (error) {
|
||
console.error(`[任务队列] 更新任务状态失败:`, error);
|
||
}
|
||
}
|
||
}
|
||
queue.clear();
|
||
}
|
||
return;
|
||
}
|
||
|
||
const status = this.deviceStatus.get(sn_code);
|
||
if (!status) {
|
||
console.warn(`[任务队列] 设备 ${sn_code} 状态不存在,无法执行任务`);
|
||
return;
|
||
}
|
||
|
||
// 检查设备是否正在执行任务(设备内串行)
|
||
if (status.isRunning || status.runningCount >= this.config.deviceMaxConcurrency) {
|
||
console.log(`[任务队列] 设备 ${sn_code} 正在执行任务,等待中... (isRunning: ${status.isRunning}, runningCount: ${status.runningCount})`);
|
||
return;
|
||
}
|
||
|
||
// 检查全局并发限制(设备间并行控制)
|
||
if (this.globalRunningCount >= this.config.maxConcurrency) {
|
||
console.log(`[任务队列] 全局并发数已达上限 (${this.globalRunningCount}/${this.config.maxConcurrency}),等待中...`);
|
||
return;
|
||
}
|
||
|
||
const queue = this.deviceQueues.get(sn_code);
|
||
if (!queue || queue.isEmpty()) {
|
||
console.log(`[任务队列] 设备 ${sn_code} 队列为空,无任务可执行`);
|
||
return;
|
||
}
|
||
|
||
// 从优先级队列取出任务
|
||
const task = queue.pop();
|
||
if (!task) {
|
||
console.warn(`[任务队列] 设备 ${sn_code} 队列非空但无法取出任务`);
|
||
return;
|
||
}
|
||
|
||
console.log(`[任务队列] 开始执行任务: ${task.taskName} (ID: ${task.id}, 设备: ${sn_code})`);
|
||
|
||
// 更新状态
|
||
status.isRunning = true;
|
||
status.currentTask = task;
|
||
status.runningCount++;
|
||
this.globalRunningCount++;
|
||
|
||
// 异步执行任务(不阻塞)
|
||
this.executeTask(task).finally(() => {
|
||
// 任务完成后更新状态
|
||
status.isRunning = false;
|
||
status.currentTask = null;
|
||
status.runningCount--;
|
||
this.globalRunningCount--;
|
||
|
||
console.log(`[任务队列] 任务完成,设备 ${sn_code} 状态已重置,准备处理下一个任务`);
|
||
|
||
// 继续处理队列中的下一个任务(延迟一小段时间,确保状态已更新)
|
||
setTimeout(() => {
|
||
this.processQueue(sn_code).catch(error => {
|
||
console.error(`[任务队列] processQueue 执行失败 (设备: ${sn_code}):`, error);
|
||
});
|
||
}, 100); // 延迟100ms确保状态已更新
|
||
});
|
||
} catch (error) {
|
||
console.error(`[任务队列] processQueue 处理失败 (设备: ${sn_code}):`, error);
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 执行任务(统一重试机制 + 超时保护)
|
||
* @param {object} task - 任务对象
|
||
*/
|
||
async executeTask(task) {
|
||
const startTime = Date.now();
|
||
task.status = 'running';
|
||
task.startTime = new Date();
|
||
|
||
try {
|
||
// 执行前再次检查账号是否启用(双重保险)
|
||
const isEnabled = await this.checkAccountEnabled(task.sn_code);
|
||
if (!isEnabled) {
|
||
// 更新任务状态为已取消
|
||
await db.getModel('task_status').update(
|
||
{
|
||
status: 'cancelled',
|
||
endTime: new Date(),
|
||
result: JSON.stringify({ error: '账号未启用,任务已取消' })
|
||
},
|
||
{ where: { id: task.id } }
|
||
);
|
||
throw new Error(`账号未启用,任务已取消`);
|
||
}
|
||
|
||
// 更新数据库状态
|
||
await db.getModel('task_status').update(
|
||
{
|
||
status: 'running',
|
||
startTime: task.startTime
|
||
},
|
||
{ where: { id: task.id } }
|
||
);
|
||
|
||
// 通知客户端任务状态变更
|
||
await this.notifyTaskStatusChange(task.sn_code, {
|
||
taskId: task.id,
|
||
taskName: task.taskName,
|
||
taskType: task.taskType,
|
||
status: 'running',
|
||
progress: 0,
|
||
startTime: task.startTime
|
||
});
|
||
|
||
// 使用注册的任务处理器执行任务
|
||
const handler = this.taskHandlers.get(task.taskType);
|
||
if (!handler) {
|
||
throw new Error(`未找到任务类型 ${task.taskType} 的处理器,请先注册处理器`);
|
||
}
|
||
|
||
// 获取任务超时时间(从配置中获取,默认10分钟)
|
||
const taskTimeout = ScheduleConfig.getTaskTimeout(task.taskType) || 10 * 60 * 1000;
|
||
|
||
// 使用超时机制包装任务执行,防止任务卡住
|
||
const taskPromise = handler(task);
|
||
const result = await ScheduleUtils.withTimeout(
|
||
taskPromise,
|
||
taskTimeout,
|
||
`任务执行超时: ${task.taskName} (任务类型: ${task.taskType}, 超时时间: ${taskTimeout / 1000}秒)`
|
||
);
|
||
|
||
// 任务成功
|
||
task.status = 'completed';
|
||
task.endTime = new Date();
|
||
task.duration = Date.now() - startTime;
|
||
task.result = result;
|
||
|
||
// 更新数据库
|
||
await db.getModel('task_status').update(
|
||
{
|
||
status: 'completed',
|
||
endTime: task.endTime,
|
||
duration: task.duration,
|
||
result: JSON.stringify(task.result),
|
||
progress: 100
|
||
},
|
||
{ where: { id: task.id } }
|
||
);
|
||
|
||
// 通知客户端任务状态变更
|
||
await this.notifyTaskStatusChange(task.sn_code, {
|
||
taskId: task.id,
|
||
taskName: task.taskName,
|
||
taskType: task.taskType,
|
||
status: 'completed',
|
||
progress: 100,
|
||
endTime: task.endTime
|
||
});
|
||
|
||
console.log(`[任务队列] 设备 ${task.sn_code} 任务执行成功: ${task.taskName} (耗时: ${task.duration}ms)`);
|
||
|
||
} catch (error) {
|
||
// 使用统一错误处理
|
||
const errorInfo = await ErrorHandler.handleError(error, {
|
||
task_id: task.id,
|
||
sn_code: task.sn_code,
|
||
taskType: task.taskType,
|
||
taskName: task.taskName
|
||
});
|
||
|
||
// 直接标记为失败(重试已禁用)
|
||
task.status = 'failed';
|
||
task.endTime = new Date();
|
||
task.duration = Date.now() - startTime;
|
||
task.errorMessage = errorInfo.message || error.message || '未知错误';
|
||
task.errorStack = errorInfo.stack || error.stack || '';
|
||
|
||
// 更新数据库任务状态为失败
|
||
try {
|
||
await db.getModel('task_status').update(
|
||
{
|
||
status: 'failed',
|
||
endTime: task.endTime,
|
||
duration: task.duration,
|
||
result: JSON.stringify({
|
||
error: task.errorMessage,
|
||
stack: task.errorStack
|
||
}),
|
||
progress: 0
|
||
},
|
||
{ where: { id: task.id } }
|
||
);
|
||
|
||
// 通知客户端任务状态变更
|
||
await this.notifyTaskStatusChange(task.sn_code, {
|
||
taskId: task.id,
|
||
taskName: task.taskName,
|
||
taskType: task.taskType,
|
||
status: 'failed',
|
||
progress: 0,
|
||
errorMessage: task.errorMessage,
|
||
endTime: task.endTime
|
||
});
|
||
} catch (dbError) {
|
||
console.error(`[任务队列] 更新任务失败状态到数据库失败:`, dbError);
|
||
}
|
||
|
||
console.error(`[任务队列] 任务执行失败: ${task.taskName} (ID: ${task.id}), 错误: ${task.errorMessage}`, {
|
||
errorStack: task.errorStack,
|
||
task_id: task.id,
|
||
sn_code: task.sn_code,
|
||
taskType: task.taskType,
|
||
duration: task.duration
|
||
});
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 取消任务
|
||
* @param {string} task_id - 任务ID
|
||
* @returns {Promise<boolean>} 是否成功取消
|
||
*/
|
||
async cancelTask(task_id) {
|
||
// 遍历所有设备队列查找任务
|
||
for (const [sn_code, queue] of this.deviceQueues.entries()) {
|
||
const removed = queue.remove(task => task.id === task_id);
|
||
|
||
if (removed) {
|
||
// 检查是否正在执行
|
||
const status = this.deviceStatus.get(sn_code);
|
||
if (status && status.currentTask && status.currentTask.id === task_id) {
|
||
// 正在执行的任务无法取消,只能标记
|
||
console.warn(`[任务队列] 任务 ${task_id} 正在执行,无法取消`);
|
||
return false;
|
||
}
|
||
|
||
// 更新数据库
|
||
await db.getModel('task_status').update(
|
||
{
|
||
status: 'cancelled',
|
||
endTime: new Date()
|
||
},
|
||
{ where: { id: task_id } }
|
||
);
|
||
|
||
console.log(`[任务队列] 任务已取消: ${task_id}`);
|
||
return true;
|
||
}
|
||
}
|
||
|
||
// 未找到可取消的任务
|
||
return false;
|
||
}
|
||
|
||
/**
|
||
* 取消设备的所有待执行任务
|
||
* @param {string} sn_code - 设备SN码
|
||
* @returns {Promise<number>} 取消的任务数量
|
||
*/
|
||
async cancelDeviceTasks(sn_code) {
|
||
let cancelledCount = 0;
|
||
|
||
// 1. 从队列中移除所有待执行任务
|
||
const queue = this.deviceQueues.get(sn_code);
|
||
if (queue) {
|
||
const pendingTasks = [];
|
||
// 获取所有待执行任务(不包括正在执行的)
|
||
const status = this.deviceStatus.get(sn_code);
|
||
const currentTaskId = status && status.currentTask ? status.currentTask.id : null;
|
||
|
||
// 遍历队列,收集待取消的任务
|
||
const queueArray = queue.toArray();
|
||
for (const task of queueArray) {
|
||
if (task.id !== currentTaskId && (task.status === 'pending' || !task.status)) {
|
||
pendingTasks.push(task);
|
||
}
|
||
}
|
||
|
||
// 从队列中移除这些任务
|
||
for (const task of pendingTasks) {
|
||
queue.remove(t => t.id === task.id);
|
||
cancelledCount++;
|
||
}
|
||
}
|
||
|
||
// 2. 更新数据库中的任务状态
|
||
try {
|
||
const taskStatusModel = db.getModel('task_status');
|
||
const status = this.deviceStatus.get(sn_code);
|
||
const currentTaskId = status && status.currentTask ? status.currentTask.id : null;
|
||
|
||
// 更新所有待执行或运行中的任务(除了当前正在执行的)
|
||
const whereCondition = {
|
||
sn_code: sn_code,
|
||
status: ['pending', 'running']
|
||
};
|
||
|
||
if (currentTaskId) {
|
||
whereCondition.id = { [Sequelize.Op.ne]: currentTaskId };
|
||
}
|
||
|
||
const updateResult = await taskStatusModel.update(
|
||
{
|
||
status: 'cancelled',
|
||
endTime: new Date()
|
||
},
|
||
{ where: whereCondition }
|
||
);
|
||
|
||
const dbCancelledCount = Array.isArray(updateResult) ? updateResult[0] : updateResult;
|
||
console.log(`[任务队列] 设备 ${sn_code} 已取消 ${cancelledCount} 个队列任务,${dbCancelledCount} 个数据库任务`);
|
||
} catch (error) {
|
||
console.error(`[任务队列] 更新数据库任务状态失败:`, error, {
|
||
sn_code: sn_code,
|
||
cancelledCount: cancelledCount
|
||
});
|
||
}
|
||
|
||
return cancelledCount;
|
||
}
|
||
|
||
/**
|
||
* 获取设备队列状态
|
||
* @param {string} sn_code - 设备SN码
|
||
* @returns {object} 队列状态
|
||
*/
|
||
getDeviceStatus(sn_code) {
|
||
const queue = this.deviceQueues.get(sn_code);
|
||
const status = this.deviceStatus.get(sn_code) || {
|
||
isRunning: false,
|
||
currentTask: null,
|
||
runningCount: 0
|
||
};
|
||
|
||
return {
|
||
sn_code,
|
||
isRunning: status.isRunning,
|
||
currentTask: status.currentTask,
|
||
queueLength: queue ? queue.size() : 0,
|
||
pendingTasks: queue ? queue.size() : 0,
|
||
runningCount: status.runningCount
|
||
};
|
||
}
|
||
|
||
/**
|
||
* 获取任务状态
|
||
* @param {string} task_id - 任务ID
|
||
* @returns {Promise<object|null>} 任务对象
|
||
*/
|
||
async getTaskStatus(task_id) {
|
||
// 先从内存中查找
|
||
for (const queue of this.deviceQueues.values()) {
|
||
const task = queue.find(t => t.id === task_id);
|
||
if (task) {
|
||
return task;
|
||
}
|
||
}
|
||
|
||
// 从正在执行的任务中查找
|
||
for (const status of this.deviceStatus.values()) {
|
||
if (status.currentTask && status.currentTask.id === task_id) {
|
||
return status.currentTask;
|
||
}
|
||
}
|
||
|
||
// 从数据库中查找
|
||
try {
|
||
const taskRecord = await db.getModel('task_status').findOne({
|
||
where: { id: task_id }
|
||
});
|
||
|
||
if (taskRecord) {
|
||
return taskRecord.toJSON();
|
||
}
|
||
} catch (error) {
|
||
console.error(`[任务队列] 查询任务状态失败:`, error, {
|
||
task_id: task_id
|
||
});
|
||
}
|
||
|
||
return null;
|
||
}
|
||
|
||
/**
|
||
* 清空设备队列
|
||
* @param {string} sn_code - 设备SN码
|
||
*/
|
||
clearQueue(sn_code) {
|
||
if (this.deviceQueues.has(sn_code)) {
|
||
const queue = this.deviceQueues.get(sn_code);
|
||
const count = queue.size();
|
||
queue.clear();
|
||
console.log(`[任务队列] 已清空设备 ${sn_code} 的队列,共移除 ${count} 个任务`);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 删除所有任务(从内存队列和数据库)
|
||
* @returns {Promise<object>} 删除结果
|
||
*/
|
||
async deleteAllTaskFromDatabase() {
|
||
try {
|
||
console.log('[任务队列] 开始删除所有任务...');
|
||
|
||
let totalQueued = 0;
|
||
let totalRunning = 0;
|
||
|
||
// 1. 清空所有设备的内存队列
|
||
for (const [sn_code, queue] of this.deviceQueues.entries()) {
|
||
const queueSize = queue.size();
|
||
totalQueued += queueSize;
|
||
queue.clear();
|
||
|
||
// 重置设备状态(但保留正在执行的任务信息,稍后处理)
|
||
const status = this.deviceStatus.get(sn_code);
|
||
if (status && status.currentTask) {
|
||
totalRunning++;
|
||
// 标记正在执行的任务,但不立即取消(让它们自然完成或失败)
|
||
console.warn(`[任务队列] 设备 ${sn_code} 有正在执行的任务,将在完成后清理`);
|
||
}
|
||
}
|
||
|
||
// 2. 使用 MCP MySQL 删除所有关联数据(先删除关联表,再删除主表)
|
||
// 注意:MCP MySQL 是只读的,这里使用 Sequelize 执行删除操作
|
||
// 但移除数据库层面的外键关联,避免约束问题
|
||
|
||
const taskCommandsModel = db.getModel('task_commands');
|
||
const chatRecordsModel = db.getModel('chat_records');
|
||
const applyRecordsModel = db.getModel('apply_records');
|
||
const taskStatusModel = db.getModel('task_status');
|
||
|
||
// 删除任务指令记录(所有记录)
|
||
const commandsDeleted = await taskCommandsModel.destroy({
|
||
where: {},
|
||
truncate: false
|
||
});
|
||
console.log(`[任务队列] 已删除任务指令记录: ${commandsDeleted} 条`);
|
||
|
||
// 删除聊天记录中关联的任务记录(删除所有有 task_id 且不为空的记录)
|
||
const chatRecordsDeleted = await chatRecordsModel.destroy({
|
||
where: {
|
||
[Sequelize.Op.and]: [
|
||
{ task_id: { [Sequelize.Op.ne]: null } },
|
||
{ task_id: { [Sequelize.Op.ne]: '' } }
|
||
]
|
||
},
|
||
truncate: false
|
||
});
|
||
console.log(`[任务队列] 已删除聊天记录: ${chatRecordsDeleted} 条`);
|
||
|
||
// 删除投递记录中关联的任务记录(删除所有有 task_id 且不为空的记录)
|
||
const applyRecordsDeleted = await applyRecordsModel.destroy({
|
||
where: {
|
||
[Sequelize.Op.and]: [
|
||
{ task_id: { [Sequelize.Op.ne]: null } },
|
||
{ task_id: { [Sequelize.Op.ne]: '' } }
|
||
]
|
||
},
|
||
truncate: false
|
||
});
|
||
console.log(`[任务队列] 已删除投递记录: ${applyRecordsDeleted} 条`);
|
||
|
||
// 3. 删除数据库中的所有任务记录
|
||
const deleteResult = await taskStatusModel.destroy({
|
||
where: {},
|
||
truncate: false // 使用 DELETE 而不是 TRUNCATE,保留表结构
|
||
});
|
||
|
||
console.log(`[任务队列] 已删除所有任务:`);
|
||
console.log(` - 内存队列任务: ${totalQueued} 个`);
|
||
console.log(` - 正在执行任务: ${totalRunning} 个(将在完成后清理)`);
|
||
console.log(` - 任务指令记录: ${commandsDeleted} 条`);
|
||
console.log(` - 聊天记录: ${chatRecordsDeleted} 条`);
|
||
console.log(` - 投递记录: ${applyRecordsDeleted} 条`);
|
||
console.log(` - 数据库任务记录: ${deleteResult} 条`);
|
||
|
||
return {
|
||
success: true,
|
||
memoryQueued: totalQueued,
|
||
memoryRunning: totalRunning,
|
||
commandsDeleted: commandsDeleted,
|
||
chatRecordsDeleted: chatRecordsDeleted,
|
||
applyRecordsDeleted: applyRecordsDeleted,
|
||
databaseDeleted: deleteResult,
|
||
message: `已删除所有任务及关联数据(任务: ${deleteResult} 条,指令: ${commandsDeleted} 条,聊天: ${chatRecordsDeleted} 条,投递: ${applyRecordsDeleted} 条)`
|
||
};
|
||
} catch (error) {
|
||
console.error('[任务队列] 删除所有任务失败:', error);
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 获取所有设备的队列状态
|
||
* @returns {array} 所有设备的队列状态
|
||
*/
|
||
getAllDeviceStatus() {
|
||
const allStatus = [];
|
||
|
||
for (const sn_code of this.deviceQueues.keys()) {
|
||
allStatus.push(this.getDeviceStatus(sn_code));
|
||
}
|
||
|
||
return allStatus;
|
||
}
|
||
|
||
/**
|
||
* 获取全局统计信息
|
||
* @returns {object} 统计信息
|
||
*/
|
||
getStatistics() {
|
||
let totalQueued = 0;
|
||
for (const queue of this.deviceQueues.values()) {
|
||
totalQueued += queue.size();
|
||
}
|
||
|
||
return {
|
||
globalRunningCount: this.globalRunningCount,
|
||
maxConcurrency: this.config.maxConcurrency,
|
||
totalDevices: this.deviceQueues.size,
|
||
totalQueuedTasks: totalQueued,
|
||
deviceStatuses: this.getAllDeviceStatus()
|
||
};
|
||
}
|
||
|
||
|
||
/**
|
||
* 获取MQTT客户端(统一管理)
|
||
* @returns {Promise<object|null>} MQTT客户端实例
|
||
*/
|
||
async getMqttClient() {
|
||
try {
|
||
// 首先尝试从调度系统获取已初始化的MQTT客户端
|
||
const scheduleManager = require('./index');
|
||
if (scheduleManager.mqttClient) {
|
||
return scheduleManager.mqttClient;
|
||
}
|
||
|
||
// 如果调度系统没有初始化,则直接创建
|
||
const mqttManager = require('../mqtt/mqttManager');
|
||
console.log('[任务队列] 创建新的MQTT客户端');
|
||
return await mqttManager.getInstance();
|
||
} catch (error) {
|
||
console.error(`[任务队列] 获取MQTT客户端失败:`, error);
|
||
return null;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 通知客户端任务状态变更
|
||
* @param {string} sn_code - 设备SN码
|
||
* @param {object} taskData - 任务数据
|
||
*/
|
||
async notifyTaskStatusChange(sn_code, taskData) {
|
||
try {
|
||
const mqttClient = await this.getMqttClient();
|
||
if (!mqttClient) {
|
||
return; // MQTT客户端不可用,静默失败
|
||
}
|
||
|
||
// 通过MQTT发布任务状态变更通知
|
||
// 主题格式: task_status_{sn_code}
|
||
const topic = `task_status_${sn_code}`;
|
||
const message = JSON.stringify({
|
||
action: 'task_status_update',
|
||
data: taskData,
|
||
timestamp: new Date().toISOString()
|
||
});
|
||
|
||
await mqttClient.publish(topic, message);
|
||
console.log(`[任务队列] 已通知客户端任务状态变更: ${sn_code} - ${taskData.taskName || taskData.taskType || '未知任务'} (${taskData.status})`);
|
||
} catch (error) {
|
||
// 通知失败不影响任务执行,只记录日志
|
||
console.warn(`[任务队列] 通知客户端任务状态变更失败:`, error.message);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 获取任务状态摘要(用于同步到客户端)
|
||
* @param {string} sn_code - 设备SN码
|
||
* @returns {Promise<object>} 任务状态摘要
|
||
*/
|
||
async getTaskStatusSummary(sn_code) {
|
||
try {
|
||
const queue = this.deviceQueues.get(sn_code);
|
||
const status = this.deviceStatus.get(sn_code) || {
|
||
isRunning: false,
|
||
currentTask: null,
|
||
runningCount: 0
|
||
};
|
||
|
||
// 获取当前执行的任务
|
||
let currentTask = null;
|
||
if (status.currentTask) {
|
||
const taskData = status.currentTask;
|
||
currentTask = {
|
||
taskId: taskData.id,
|
||
taskName: taskData.taskName || taskData.task_name || taskData.taskType || taskData.task_type || '未知任务',
|
||
taskType: taskData.taskType || taskData.task_type,
|
||
status: 'running',
|
||
progress: taskData.progress || 0,
|
||
currentStep: taskData.currentStep || taskData.current_step || '',
|
||
startTime: taskData.startTime || taskData.start_time || taskData.created_time
|
||
};
|
||
}
|
||
|
||
// 获取待执行任务列表(最多10个)
|
||
const pendingTasks = [];
|
||
if (queue && queue.size() > 0) {
|
||
const queueArray = queue.toArray();
|
||
for (const task of queueArray.slice(0, 10)) {
|
||
const taskData = task;
|
||
pendingTasks.push({
|
||
taskId: taskData.id,
|
||
taskName: taskData.taskName || taskData.task_name || taskData.taskType || taskData.task_type || '未知任务',
|
||
taskType: taskData.taskType || taskData.task_type,
|
||
status: 'pending',
|
||
scheduledTime: taskData.scheduledTime || taskData.scheduled_time || taskData.created_time,
|
||
priority: taskData.priority || 0
|
||
});
|
||
}
|
||
}
|
||
|
||
// 计算下次任务执行时间(队列中第一个任务的计划时间)
|
||
let nextTaskTime = null;
|
||
if (queue && queue.size() > 0) {
|
||
const firstTask = queue.peek();
|
||
if (firstTask && (firstTask.scheduledTime || firstTask.scheduled_time)) {
|
||
nextTaskTime = firstTask.scheduledTime || firstTask.scheduled_time;
|
||
}
|
||
}
|
||
|
||
return {
|
||
sn_code,
|
||
currentTask,
|
||
pendingTasks,
|
||
nextTaskTime,
|
||
pendingCount: queue ? queue.size() : 0,
|
||
mqttTopic: `task_status_${sn_code}`,
|
||
timestamp: new Date().toISOString()
|
||
};
|
||
} catch (error) {
|
||
console.error(`[任务队列] 获取任务状态摘要失败:`, error, { sn_code });
|
||
return {
|
||
sn_code,
|
||
currentTask: null,
|
||
pendingTasks: [],
|
||
nextTaskTime: null,
|
||
pendingCount: 0,
|
||
mqttTopic: `task_status_${sn_code}`,
|
||
timestamp: new Date().toISOString()
|
||
};
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 向客户端发送任务状态摘要
|
||
* @param {string} sn_code - 设备SN码
|
||
*/
|
||
async sendTaskStatusSummary(sn_code) {
|
||
try {
|
||
const mqttClient = await this.getMqttClient();
|
||
if (!mqttClient) {
|
||
return; // MQTT客户端不可用,静默失败
|
||
}
|
||
|
||
const summary = await this.getTaskStatusSummary(sn_code);
|
||
|
||
// 通过MQTT发布任务状态摘要
|
||
// 主题格式: task_status_{sn_code}
|
||
const topic = `task_status_${sn_code}`;
|
||
const message = JSON.stringify({
|
||
action: 'task_status_summary',
|
||
data: summary,
|
||
timestamp: new Date().toISOString()
|
||
});
|
||
|
||
await mqttClient.publish(topic, message);
|
||
} catch (error) {
|
||
// 通知失败不影响任务执行,只记录日志
|
||
console.warn(`[任务队列] 发送任务状态摘要失败:`, error.message);
|
||
}
|
||
}
|
||
}
|
||
|
||
// 导出单例
|
||
const taskQueue = new TaskQueue();
|
||
module.exports = taskQueue;
|