const node_schedule = require("node-schedule"); const dayjs = require('dayjs'); const config = require('./config.js'); const deviceManager = require('./deviceManager.js'); const command = require('./command.js'); const db = require('../dbProxy'); // 引入新的任务模块 const tasks = require('./tasks'); const { autoSearchTask, autoDeliverTask, autoChatTask, autoActiveTask } = tasks; const Framework = require("../../../framework/node-core-framework.js"); /** * 定时任务管理器(重构版) * 使用独立的任务模块,职责更清晰,易于维护和扩展 */ class ScheduledJobs { constructor(components, taskHandlers) { this.taskQueue = components.taskQueue; this.taskHandlers = taskHandlers; this.jobs = []; } /** * 启动所有定时任务 */ start() { console.log('[定时任务] 开始启动所有定时任务...'); // ==================== 系统维护任务 ==================== // 每天凌晨重置统计数据 const resetJob = node_schedule.scheduleJob(config.schedules.dailyReset, () => { this.resetDailyStats(); }); this.jobs.push(resetJob); console.log('[定时任务] ✓ 已启动每日统计重置任务'); // 启动心跳检查定时任务(每分钟检查一次) const monitoringJob = node_schedule.scheduleJob(config.schedules.monitoringInterval, async () => { await deviceManager.checkHeartbeatStatus().catch(error => { console.error('[定时任务] 检查心跳状态失败:', error); }); }); this.jobs.push(monitoringJob); console.log('[定时任务] ✓ 已启动心跳检查任务'); // 启动离线设备任务清理定时任务(每分钟检查一次) const cleanupOfflineTasksJob = node_schedule.scheduleJob(config.schedules.monitoringInterval, async () => { await this.cleanupOfflineDeviceTasks().catch(error => { console.error('[定时任务] 清理离线设备任务失败:', error); }); }); this.jobs.push(cleanupOfflineTasksJob); console.log('[定时任务] ✓ 已启动离线设备任务清理任务'); // 启动任务超时检查定时任务(每分钟检查一次) const timeoutCheckJob = node_schedule.scheduleJob(config.schedules.monitoringInterval, async () => { await this.checkTaskTimeouts().catch(error => { console.error('[定时任务] 检查任务超时失败:', error); }); }); this.jobs.push(timeoutCheckJob); console.log('[定时任务] ✓ 已启动任务超时检查任务'); // 启动任务状态摘要同步定时任务(每10秒发送一次) const taskSummaryJob = node_schedule.scheduleJob('*/10 * * * * *', async () => { await this.syncTaskStatusSummary().catch(error => { console.error('[定时任务] 同步任务状态摘要失败:', error); }); }); this.jobs.push(taskSummaryJob); console.log('[定时任务] ✓ 已启动任务状态摘要同步任务'); // ==================== 业务任务(使用新的任务模块) ==================== // 1. 自动搜索任务 - 每60分钟执行一次 const autoSearchJob = node_schedule.scheduleJob(config.schedules.autoSearch || '0 0 */1 * * *', () => { this.runAutoSearchTask(); }); this.jobs.push(autoSearchJob); console.log('[定时任务] ✓ 已启动自动搜索任务 (每60分钟)'); // 2. 自动投递任务 - 每1分钟检查一次 const autoDeliverJob = node_schedule.scheduleJob(config.schedules.autoDeliver, () => { this.runAutoDeliverTask(); }); this.jobs.push(autoDeliverJob); console.log('[定时任务] ✓ 已启动自动投递任务 (每1分钟)'); // 3. 自动沟通任务 - 每15分钟执行一次 const autoChatJob = node_schedule.scheduleJob(config.schedules.autoChat || '0 */15 * * * *', () => { this.runAutoChatTask(); }); this.jobs.push(autoChatJob); console.log('[定时任务] ✓ 已启动自动沟通任务 (每15分钟)'); // 4. 自动活跃任务 - 每2小时执行一次 const autoActiveJob = node_schedule.scheduleJob(config.schedules.autoActive || '0 0 */2 * * *', () => { this.runAutoActiveTask(); }); this.jobs.push(autoActiveJob); console.log('[定时任务] ✓ 已启动自动活跃任务 (每2小时)'); // 立即执行一次业务任务(可选) setTimeout(() => { console.log('[定时任务] 立即执行一次初始化任务...'); this.runAutoDeliverTask(); this.runAutoChatTask(); }, 3000); // 延迟3秒,等待系统初始化完成 console.log('[定时任务] 所有定时任务启动完成!'); } // ==================== 业务任务执行方法(使用新的任务模块) ==================== /** * 运行自动搜索任务 * 为所有启用自动搜索的账号添加搜索任务 */ async runAutoSearchTask() { try { const accounts = await this.getEnabledAccounts('auto_search'); if (accounts.length === 0) { return; } console.log(`[自动搜索调度] 找到 ${accounts.length} 个启用自动搜索的账号`); let successCount = 0; let failedCount = 0; for (const account of accounts) { const result = await autoSearchTask.addToQueue(account.sn_code, this.taskQueue); if (result.success) { successCount++; } else { failedCount++; } } if (successCount > 0 || failedCount > 0) { console.log(`[自动搜索调度] 完成: 成功 ${successCount} 个, 失败/跳过 ${failedCount} 个`); } } catch (error) { console.error('[自动搜索调度] 执行失败:', error); } } /** * 运行自动投递任务 * 为所有启用自动投递的账号添加投递任务 */ async runAutoDeliverTask() { try { const accounts = await this.getEnabledAccounts('auto_deliver'); if (accounts.length === 0) { return; } console.log(`[自动投递调度] 找到 ${accounts.length} 个启用自动投递的账号`); let successCount = 0; let failedCount = 0; for (const account of accounts) { const result = await autoDeliverTask.addToQueue(account.sn_code, this.taskQueue); if (result.success) { successCount++; } else { failedCount++; } } if (successCount > 0 || failedCount > 0) { console.log(`[自动投递调度] 完成: 成功 ${successCount} 个, 失败/跳过 ${failedCount} 个`); } } catch (error) { console.error('[自动投递调度] 执行失败:', error); } } /** * 运行自动沟通任务 * 为所有启用自动沟通的账号添加沟通任务 */ async runAutoChatTask() { try { const accounts = await this.getEnabledAccounts('auto_chat'); if (accounts.length === 0) { return; } console.log(`[自动沟通调度] 找到 ${accounts.length} 个启用自动沟通的账号`); let successCount = 0; let failedCount = 0; for (const account of accounts) { const result = await autoChatTask.addToQueue(account.sn_code, this.taskQueue); if (result.success) { successCount++; } else { failedCount++; } } if (successCount > 0 || failedCount > 0) { console.log(`[自动沟通调度] 完成: 成功 ${successCount} 个, 失败/跳过 ${failedCount} 个`); } } catch (error) { console.error('[自动沟通调度] 执行失败:', error); } } /** * 运行自动活跃任务 * 为所有启用自动活跃的账号添加活跃任务 */ async runAutoActiveTask() { try { const accounts = await this.getEnabledAccounts('auto_active'); if (accounts.length === 0) { return; } console.log(`[自动活跃调度] 找到 ${accounts.length} 个启用自动活跃的账号`); let successCount = 0; let failedCount = 0; for (const account of accounts) { const result = await autoActiveTask.addToQueue(account.sn_code, this.taskQueue); if (result.success) { successCount++; } else { failedCount++; } } if (successCount > 0 || failedCount > 0) { console.log(`[自动活跃调度] 完成: 成功 ${successCount} 个, 失败/跳过 ${failedCount} 个`); } } catch (error) { console.error('[自动活跃调度] 执行失败:', error); } } /** * 获取启用指定功能的账号列表 * @param {string} featureType - 功能类型: auto_search, auto_deliver, auto_chat, auto_active */ async getEnabledAccounts(featureType) { try { const { pla_account } = db.models; const accounts = await pla_account.findAll({ where: { is_delete: 0, is_enabled: 1, [featureType]: 1 }, attributes: ['sn_code', 'name', 'keyword', 'platform_type'] }); return accounts.map(acc => acc.toJSON()); } catch (error) { console.error(`[获取账号列表] 失败 (${featureType}):`, error); return []; } } // ==================== 系统维护方法(保留原有逻辑) ==================== /** * 重置每日统计 */ resetDailyStats() { console.log('[定时任务] 重置每日统计数据'); try { deviceManager.resetAllDailyCounters(); console.log('[定时任务] 每日统计重置完成'); } catch (error) { console.error('[定时任务] 重置统计失败:', error); } } /** * 清理过期数据 */ cleanupCaches() { console.log('[定时任务] 开始清理过期数据'); try { deviceManager.cleanupOfflineDevices(config.monitoring.offlineThreshold); command.cleanupExpiredCommands(30); console.log('[定时任务] 数据清理完成'); } catch (error) { console.error('[定时任务] 数据清理失败:', error); } } /** * 清理离线设备任务 * 检查离线超过10分钟的设备,取消其所有pending/running状态的任务 */ async cleanupOfflineDeviceTasks() { try { // 离线阈值:10分钟 const offlineThreshold = 10 * 60 * 1000; const now = Date.now(); const thresholdTime = now - offlineThreshold; // 获取所有启用的账号 const pla_account = db.getModel('pla_account'); const accounts = await pla_account.findAll({ where: { is_delete: 0, is_enabled: 1 }, attributes: ['sn_code'] }); if (!accounts || accounts.length === 0) { return; } // 通过 deviceManager 检查哪些设备离线超过10分钟 const offlineSnCodes = []; const offlineDevicesInfo = []; for (const account of accounts) { const sn_code = account.sn_code; const device = deviceManager.devices.get(sn_code); if (!device) { offlineSnCodes.push(sn_code); offlineDevicesInfo.push({ sn_code: sn_code, lastHeartbeatTime: null }); } else { const lastHeartbeat = device.lastHeartbeat || 0; if (lastHeartbeat < thresholdTime || !device.isOnline) { offlineSnCodes.push(sn_code); offlineDevicesInfo.push({ sn_code: sn_code, lastHeartbeatTime: lastHeartbeat ? new Date(lastHeartbeat) : null }); } } } if (offlineSnCodes.length === 0) { return; } console.log(`[清理离线任务] 发现 ${offlineSnCodes.length} 个离线超过10分钟的设备`); let totalCancelled = 0; const task_status = db.getModel('task_status'); for (const sn_code of offlineSnCodes) { try { const deviceInfo = offlineDevicesInfo.find(d => d.sn_code === sn_code); const updateResult = await task_status.update( { status: 'cancelled', endTime: new Date(), result: JSON.stringify({ reason: '设备离线超过10分钟,任务已自动取消', offlineTime: deviceInfo?.lastHeartbeatTime }) }, { where: { sn_code: sn_code, status: ['pending', 'running'] } } ); const cancelledCount = Array.isArray(updateResult) ? updateResult[0] : updateResult; totalCancelled += cancelledCount; if (this.taskQueue && typeof this.taskQueue.cancelDeviceTasks === 'function') { await this.taskQueue.cancelDeviceTasks(sn_code); } if (cancelledCount > 0) { console.log(`[清理离线任务] 设备 ${sn_code} 已取消 ${cancelledCount} 个任务`); } } catch (error) { console.error(`[清理离线任务] 取消设备 ${sn_code} 的任务失败:`, error); } } if (totalCancelled > 0) { console.log(`[清理离线任务] 共取消 ${totalCancelled} 个离线设备的任务`); } } catch (error) { console.error('[清理离线任务] 执行失败:', error); } } /** * 同步任务状态摘要到客户端 */ async syncTaskStatusSummary() { try { const { pla_account } = await Framework.getModels(); const accounts = await pla_account.findAll({ where: { is_delete: 0, is_enabled: 1 }, attributes: ['sn_code'] }); if (!accounts || accounts.length === 0) { return; } const offlineThreshold = 3 * 60 * 1000; const now = Date.now(); for (const account of accounts) { const sn_code = account.sn_code; const device = deviceManager.devices.get(sn_code); if (!device) { continue; } const lastHeartbeat = device.lastHeartbeat || 0; const isOnline = device.isOnline && (now - lastHeartbeat < offlineThreshold); if (!isOnline) { continue; } try { const deviceWorkStatusNotifier = require('./deviceWorkStatusNotifier'); const summary = await this.taskQueue.getTaskStatusSummary(sn_code); await deviceWorkStatusNotifier.sendDeviceWorkStatus(sn_code, summary, { currentCommand: summary.currentCommand || null }); } catch (error) { console.error(`[设备工作状态同步] 设备 ${sn_code} 同步失败:`, error.message); } } } catch (error) { console.error('[任务状态同步] 执行失败:', error); } } /** * 检查任务超时并强制标记为失败 */ async checkTaskTimeouts() { try { const Sequelize = require('sequelize'); const { task_status, op } = db.models; const runningTasks = await task_status.findAll({ where: { status: 'running' }, attributes: ['id', 'sn_code', 'taskType', 'taskName', 'startTime', 'create_time'] }); if (!runningTasks || runningTasks.length === 0) { return; } const now = new Date(); let timeoutCount = 0; for (const task of runningTasks) { const taskData = task.toJSON(); const startTime = taskData.startTime ? new Date(taskData.startTime) : (taskData.create_time ? new Date(taskData.create_time) : null); if (!startTime) { continue; } const taskTimeout = config.getTaskTimeout(taskData.taskType) || 10 * 60 * 1000; const maxAllowedTime = taskTimeout * 1.2; const elapsedTime = now.getTime() - startTime.getTime(); if (elapsedTime > maxAllowedTime) { try { await task_status.update( { status: 'failed', endTime: now, duration: elapsedTime, result: JSON.stringify({ error: `任务执行超时(运行时间: ${Math.round(elapsedTime / 1000)}秒,超时限制: ${Math.round(maxAllowedTime / 1000)}秒)`, timeout: true, taskType: taskData.taskType, startTime: startTime.toISOString() }), progress: 0 }, { where: { id: taskData.id } } ); timeoutCount++; console.warn(`[任务超时检查] 任务 ${taskData.id} (${taskData.taskName}) 运行时间过长,已强制标记为失败`); if (this.taskQueue && typeof this.taskQueue.deviceStatus !== 'undefined') { const deviceStatus = this.taskQueue.deviceStatus.get(taskData.sn_code); if (deviceStatus && deviceStatus.currentTask && deviceStatus.currentTask.id === taskData.id) { deviceStatus.isRunning = false; deviceStatus.currentTask = null; deviceStatus.runningCount = Math.max(0, deviceStatus.runningCount - 1); this.taskQueue.globalRunningCount = Math.max(0, this.taskQueue.globalRunningCount - 1); console.log(`[任务超时检查] 已重置设备 ${taskData.sn_code} 的状态`); setTimeout(() => { this.taskQueue.processQueue(taskData.sn_code).catch(error => { console.error(`[任务超时检查] 继续处理队列失败:`, error); }); }, 100); } } } catch (error) { console.error(`[任务超时检查] 更新超时任务状态失败:`, error); } } } if (timeoutCount > 0) { console.log(`[任务超时检查] 共检测到 ${timeoutCount} 个超时任务`); } } catch (error) { console.error('[任务超时检查] 执行失败:', error); } } /** * 停止所有定时任务 */ stop() { console.log('[定时任务] 停止所有定时任务...'); for (const job of this.jobs) { if (job) { job.cancel(); } } this.jobs = []; console.log('[定时任务] 所有定时任务已停止'); } } module.exports = ScheduledJobs;