From d90d9917109f16a5bec1dd268f6d9826c52ffc0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=88=90?= Date: Mon, 24 Nov 2025 13:38:40 +0800 Subject: [PATCH] 1 --- api/middleware/schedule/scheduledJobs.js | 100 +++++++++++++++++++++++ api/middleware/schedule/taskQueue.js | 61 ++++++++++++-- 2 files changed, 154 insertions(+), 7 deletions(-) diff --git a/api/middleware/schedule/scheduledJobs.js b/api/middleware/schedule/scheduledJobs.js index 735cac9..1ec34a8 100644 --- a/api/middleware/schedule/scheduledJobs.js +++ b/api/middleware/schedule/scheduledJobs.js @@ -33,6 +33,16 @@ class ScheduledJobs { this.jobs.push(monitoringJob); + // 启动离线设备任务清理定时任务(每分钟检查一次) + const cleanupOfflineTasksJob = node_schedule.scheduleJob(config.schedules.monitoringInterval, async () => { + await this.cleanupOfflineDeviceTasks().catch(error => { + console.error('[定时任务] 清理离线设备任务失败:', error); + }); + }); + + this.jobs.push(cleanupOfflineTasksJob); + console.log('[定时任务] 已启动离线设备任务清理任务'); + // 执行自动投递任务 const autoDeliverJob = node_schedule.scheduleJob(config.schedules.autoDeliver, () => { @@ -74,6 +84,96 @@ class ScheduledJobs { } } + /** + * 清理离线设备任务 + * 检查离线超过10分钟的设备,取消其所有pending/running状态的任务 + */ + async cleanupOfflineDeviceTasks() { + try { + const Sequelize = require('sequelize'); + const { device_status, task_status, op } = db.models; + + // 离线阈值:10分钟 + const offlineThreshold = 10 * 60 * 1000; // 10分钟 + const now = new Date(); + const thresholdTime = new Date(now.getTime() - offlineThreshold); + + // 查询离线超过10分钟的设备 + const offlineDevices = await device_status.findAll({ + where: { + isOnline: false, + lastHeartbeatTime: { + [op.lt]: thresholdTime + } + }, + attributes: ['sn_code', 'lastHeartbeatTime'] + }); + + if (!offlineDevices || offlineDevices.length === 0) { + return; + } + + const offlineSnCodes = offlineDevices.map(dev => dev.sn_code); + console.log(`[清理离线任务] 发现 ${offlineSnCodes.length} 个离线超过10分钟的设备: ${offlineSnCodes.join(', ')}`); + + let totalCancelled = 0; + + // 为每个离线设备取消任务 + for (const sn_code of offlineSnCodes) { + try { + // 查询该设备的所有pending/running任务 + const pendingTasks = await task_status.findAll({ + where: { + sn_code: sn_code, + status: ['pending', 'running'] + }, + attributes: ['id'] + }); + + if (pendingTasks.length === 0) { + continue; + } + + // 更新任务状态为cancelled + const updateResult = await task_status.update( + { + status: 'cancelled', + endTime: new Date(), + result: JSON.stringify({ + reason: '设备离线超过10分钟,任务已自动取消', + offlineTime: offlineDevices.find(d => d.sn_code === sn_code)?.lastHeartbeatTime + }) + }, + { + where: { + sn_code: sn_code, + status: ['pending', 'running'] + } + } + ); + + const cancelledCount = Array.isArray(updateResult) ? updateResult[0] : updateResult; + totalCancelled += cancelledCount; + + // 从内存队列中移除任务 + if (this.taskQueue && typeof this.taskQueue.cancelDeviceTasks === 'function') { + await this.taskQueue.cancelDeviceTasks(sn_code); + } + + console.log(`[清理离线任务] 设备 ${sn_code} 已取消 ${cancelledCount} 个任务`); + } catch (error) { + console.error(`[清理离线任务] 取消设备 ${sn_code} 的任务失败:`, error); + } + } + + if (totalCancelled > 0) { + console.log(`[清理离线任务] 共取消 ${totalCancelled} 个离线设备的任务`); + } + } catch (error) { + console.error('[清理离线任务] 执行失败:', error); + } + } + /** * 自动投递任务 */ diff --git a/api/middleware/schedule/taskQueue.js b/api/middleware/schedule/taskQueue.js index 431a270..d153695 100644 --- a/api/middleware/schedule/taskQueue.js +++ b/api/middleware/schedule/taskQueue.js @@ -209,7 +209,7 @@ class TaskQueue { } /** - * 扫描所有设备的队列并尝试执行任务(过滤未启用的账号) + * 扫描所有设备的队列并尝试执行任务(过滤未启用的账号和不在线的设备) */ async scanAndProcessQueues() { try { @@ -230,11 +230,28 @@ class TaskQueue { const enabledSnCodes = new Set(enabledAccounts.map(acc => acc.sn_code)); + // 检查设备在线状态(需要同时满足:isOnline = true 且心跳未超时) + const device_status = db.getModel('device_status'); + const heartbeatTimeout = require('./config.js').monitoring.heartbeatTimeout; + const now = new Date(); + const heartbeatThreshold = new Date(now.getTime() - heartbeatTimeout); + + const onlineDevices = await device_status.findAll({ + where: { + isOnline: true, + lastHeartbeatTime: { + [Sequelize.Op.gte]: heartbeatThreshold // 心跳时间在阈值内 + } + }, + attributes: ['sn_code'] + }); + const onlineSnCodes = new Set(onlineDevices.map(dev => dev.sn_code)); + let processedCount = 0; let queuedCount = 0; let skippedCount = 0; - // 遍历所有设备的队列,只处理启用账号的设备 + // 遍历所有设备的队列,只处理启用账号且在线设备 for (const [sn_code, queue] of this.deviceQueues.entries()) { // 跳过未启用的账号 if (!enabledSnCodes.has(sn_code)) { @@ -242,6 +259,12 @@ class TaskQueue { continue; } + // 跳过不在线的设备 + if (!onlineSnCodes.has(sn_code)) { + skippedCount++; + continue; + } + const queueSize = queue.size(); if (queueSize > 0) { queuedCount += queueSize; @@ -254,7 +277,7 @@ class TaskQueue { } if (queuedCount > 0) { - console.log(`[任务队列] 扫描完成: ${processedCount} 个设备有任务,共 ${queuedCount} 个待执行任务`); + console.log(`[任务队列] 扫描完成: ${processedCount} 个设备有任务,共 ${queuedCount} 个待执行任务,跳过 ${skippedCount} 个设备`); } } catch (error) { console.error('[任务队列] 扫描队列失败:', error); @@ -274,9 +297,9 @@ class TaskQueue { * 查找设备是否已有相同类型的任务 * @param {string} sn_code - 设备SN码 * @param {string} taskType - 任务类型 - * @returns {object|null} 现有任务或null + * @returns {Promise} 现有任务或null */ - findExistingTask(sn_code, taskType) { + async findExistingTask(sn_code, taskType) { // 检查当前正在执行的任务 const deviceStatus = this.deviceStatus.get(sn_code); if (deviceStatus && deviceStatus.currentTask && deviceStatus.currentTask.taskType === taskType) { @@ -286,12 +309,36 @@ class TaskQueue { // 检查队列中等待的任务 const queue = this.deviceQueues.get(sn_code); if (queue) { - const existingTask = queue.find(task => task.taskType === taskType && task.status === 'pending'); + const existingTask = queue.find(task => task.taskType === taskType && (task.status === 'pending' || !task.status)); if (existingTask) { return existingTask; } } + // 检查数据库中的pending/running任务(防止系统重启后重复添加) + try { + const Sequelize = require('sequelize'); + const taskStatusModel = db.getModel('task_status'); + const existingDbTask = await taskStatusModel.findOne({ + where: { + sn_code: sn_code, + taskType: taskType, + status: ['pending', 'running'] + }, + order: [['id', 'DESC']] + }); + + if (existingDbTask) { + return { + id: existingDbTask.id, + taskType: existingDbTask.taskType, + status: existingDbTask.status + }; + } + } catch (error) { + console.error(`[任务队列] 检查数据库现有任务失败:`, error); + } + return null; } @@ -342,7 +389,7 @@ class TaskQueue { } // 检查是否已有相同类型的任务在队列中或正在执行 - const existingTask = this.findExistingTask(sn_code, taskConfig.taskType); + const existingTask = await this.findExistingTask(sn_code, taskConfig.taskType); if (existingTask) { console.log(`[任务队列] 设备 ${sn_code} 已有 ${taskConfig.taskType} 任务在执行或等待中,跳过添加`); return existingTask.id;