const node_schedule = require("node-schedule"); const dayjs = require('dayjs'); const config = require('./config.js'); const deviceManager = require('./deviceManager.js'); const command = require('./command.js'); const db = require('../dbProxy'); const Framework = require("../../../framework/node-core-framework.js"); /** * 检查当前时间是否在指定的时间范围内 * @param {Object} timeRange - 时间范围配置 {start_time: '09:00', end_time: '18:00', workdays_only: 1} * @returns {Object} {allowed: boolean, reason: string} */ function checkTimeRange(timeRange) { if (!timeRange || !timeRange.start_time || !timeRange.end_time) { return { allowed: true, reason: '未配置时间范围' }; } const now = new Date(); const currentHour = now.getHours(); const currentMinute = now.getMinutes(); const currentTime = currentHour * 60 + currentMinute; // 转换为分钟数 // 解析开始时间和结束时间 const [startHour, startMinute] = timeRange.start_time.split(':').map(Number); const [endHour, endMinute] = timeRange.end_time.split(':').map(Number); const startTime = startHour * 60 + startMinute; const endTime = endHour * 60 + endMinute; // 检查是否仅工作日 if (timeRange.workdays_only === 1) { const dayOfWeek = now.getDay(); // 0=周日, 1=周一, ..., 6=周六 if (dayOfWeek === 0 || dayOfWeek === 6) { return { allowed: false, reason: '当前是周末,不在允许的时间范围内' }; } } // 检查当前时间是否在时间范围内 if (startTime <= endTime) { // 正常情况:09:00 - 18:00 if (currentTime < startTime || currentTime >= endTime) { return { allowed: false, reason: `当前时间 ${now.getHours().toString().padStart(2, '0')}:${now.getMinutes().toString().padStart(2, '0')} 不在允许的时间范围内 (${timeRange.start_time} - ${timeRange.end_time})` }; } } else { // 跨天情况:22:00 - 06:00 if (currentTime < startTime && currentTime >= endTime) { return { allowed: false, reason: `当前时间 ${now.getHours().toString().padStart(2, '0')}:${now.getMinutes().toString().padStart(2, '0')} 不在允许的时间范围内 (${timeRange.start_time} - ${timeRange.end_time})` }; } } return { allowed: true, reason: '在允许的时间范围内' }; } /** * 定时任务管理器(简化版) * 管理所有定时任务的创建和销毁 */ class ScheduledJobs { constructor(components, taskHandlers) { this.taskQueue = components.taskQueue; this.taskHandlers = taskHandlers; this.jobs = []; } /** * 启动所有定时任务 */ start() { // 每天凌晨重置统计数据 const resetJob = node_schedule.scheduleJob(config.schedules.dailyReset, () => { this.resetDailyStats(); }); this.jobs.push(resetJob); // 启动心跳检查定时任务(每分钟检查一次) const monitoringJob = node_schedule.scheduleJob(config.schedules.monitoringInterval, async () => { await deviceManager.checkHeartbeatStatus().catch(error => { console.error('[定时任务] 检查心跳状态失败:', error); }); }); this.jobs.push(monitoringJob); // 启动离线设备任务清理定时任务(每分钟检查一次) const cleanupOfflineTasksJob = node_schedule.scheduleJob(config.schedules.monitoringInterval, async () => { await this.cleanupOfflineDeviceTasks().catch(error => { console.error('[定时任务] 清理离线设备任务失败:', error); }); }); this.jobs.push(cleanupOfflineTasksJob); console.log('[定时任务] 已启动离线设备任务清理任务'); // 启动任务超时检查定时任务(每分钟检查一次) const timeoutCheckJob = node_schedule.scheduleJob(config.schedules.monitoringInterval, async () => { await this.checkTaskTimeouts().catch(error => { console.error('[定时任务] 检查任务超时失败:', error); }); }); this.jobs.push(timeoutCheckJob); console.log('[定时任务] 已启动任务超时检查任务'); // 启动任务状态摘要同步定时任务(每10秒发送一次) const taskSummaryJob = node_schedule.scheduleJob('*/10 * * * * *', async () => { await this.syncTaskStatusSummary().catch(error => { console.error('[定时任务] 同步任务状态摘要失败:', error); }); }); this.jobs.push(taskSummaryJob); console.log('[定时任务] 已启动任务状态摘要同步任务'); // 执行自动投递任务 const autoDeliverJob = node_schedule.scheduleJob(config.schedules.autoDeliver, () => { this.autoDeliverTask(); }); // 立即执行一次自动投递任务 this.autoDeliverTask(); this.jobs.push(autoDeliverJob); console.log('[定时任务] 已启动自动投递任务'); // 执行自动沟通任务 const autoChatJob = node_schedule.scheduleJob(config.schedules.autoChat || '0 */15 * * * *', () => { this.autoChatTask(); }); // 立即执行一次自动沟通任务 this.autoChatTask(); this.jobs.push(autoChatJob); console.log('[定时任务] 已启动自动沟通任务'); } /** * 重置每日统计 */ resetDailyStats() { console.log('[定时任务] 重置每日统计数据'); try { deviceManager.resetAllDailyCounters(); console.log('[定时任务] 每日统计重置完成'); } catch (error) { console.error('[定时任务] 重置统计失败:', error); } } /** * 清理过期数据 */ cleanupCaches() { console.log('[定时任务] 开始清理过期数据'); try { deviceManager.cleanupOfflineDevices(config.monitoring.offlineThreshold); command.cleanupExpiredCommands(30); console.log('[定时任务] 数据清理完成'); } catch (error) { console.error('[定时任务] 数据清理失败:', error); } } /** * 清理离线设备任务 * 检查离线超过10分钟的设备,取消其所有pending/running状态的任务 */ async cleanupOfflineDeviceTasks() { try { // 离线阈值:10分钟 const offlineThreshold = 10 * 60 * 1000; // 10分钟 const now = Date.now(); const thresholdTime = now - offlineThreshold; // 获取所有启用的账号 const pla_account = db.getModel('pla_account'); const accounts = await pla_account.findAll({ where: { is_delete: 0, is_enabled: 1 }, attributes: ['sn_code'] }); if (!accounts || accounts.length === 0) { return; } // 通过 deviceManager 检查哪些设备离线超过10分钟 const offlineSnCodes = []; const offlineDevicesInfo = []; for (const account of accounts) { const sn_code = account.sn_code; const device = deviceManager.devices.get(sn_code); if (!device) { // 设备从未发送过心跳,视为离线 offlineSnCodes.push(sn_code); offlineDevicesInfo.push({ sn_code: sn_code, lastHeartbeatTime: null }); } else { // 检查最后心跳时间 const lastHeartbeat = device.lastHeartbeat || 0; if (lastHeartbeat < thresholdTime || !device.isOnline) { offlineSnCodes.push(sn_code); offlineDevicesInfo.push({ sn_code: sn_code, lastHeartbeatTime: lastHeartbeat ? new Date(lastHeartbeat) : null }); } } } if (offlineSnCodes.length === 0) { return; } console.log(`[清理离线任务] 发现 ${offlineSnCodes.length} 个离线超过10分钟的设备: ${offlineSnCodes.join(', ')}`); let totalCancelled = 0; // 为每个离线设备取消任务 const task_status = db.getModel('task_status'); for (const sn_code of offlineSnCodes) { try { // 查询该设备的所有pending/running任务 const pendingTasks = await task_status.findAll({ where: { sn_code: sn_code, status: ['pending', 'running'] }, attributes: ['id'] }); if (pendingTasks.length === 0) { continue; } const deviceInfo = offlineDevicesInfo.find(d => d.sn_code === sn_code); // 更新任务状态为cancelled const updateResult = await task_status.update( { status: 'cancelled', endTime: new Date(), result: JSON.stringify({ reason: '设备离线超过10分钟,任务已自动取消', offlineTime: deviceInfo?.lastHeartbeatTime }) }, { where: { sn_code: sn_code, status: ['pending', 'running'] } } ); const cancelledCount = Array.isArray(updateResult) ? updateResult[0] : updateResult; totalCancelled += cancelledCount; // 从内存队列中移除任务 if (this.taskQueue && typeof this.taskQueue.cancelDeviceTasks === 'function') { await this.taskQueue.cancelDeviceTasks(sn_code); } console.log(`[清理离线任务] 设备 ${sn_code} 已取消 ${cancelledCount} 个任务`); } catch (error) { console.error(`[清理离线任务] 取消设备 ${sn_code} 的任务失败:`, error); } } if (totalCancelled > 0) { console.log(`[清理离线任务] 共取消 ${totalCancelled} 个离线设备的任务`); } } catch (error) { console.error('[清理离线任务] 执行失败:', error); } } /** * 同步任务状态摘要到客户端 * 定期向所有在线设备发送任务状态摘要(当前任务、待执行任务、下次执行时间等) */ async syncTaskStatusSummary() { try { const { pla_account } = await Framework.getModels(); // 获取所有启用的账号 const accounts = await pla_account.findAll({ where: { is_delete: 0, is_enabled: 1 }, attributes: ['sn_code'] }); if (!accounts || accounts.length === 0) { return; } // 离线阈值:3分钟 const offlineThreshold = 3 * 60 * 1000; // 3分钟 const now = Date.now(); // 为每个在线设备发送任务状态摘要 for (const account of accounts) { const sn_code = account.sn_code; // 检查设备是否在线 const device = deviceManager.devices.get(sn_code); if (!device) { // 设备从未发送过心跳,视为离线,跳过 continue; } // 检查最后心跳时间 const lastHeartbeat = device.lastHeartbeat || 0; const isOnline = device.isOnline && (now - lastHeartbeat < offlineThreshold); if (!isOnline) { // 设备离线,跳过 continue; } // 设备在线,推送设备工作状态 try { const deviceWorkStatusNotifier = require('./deviceWorkStatusNotifier'); const summary = await this.taskQueue.getTaskStatusSummary(sn_code); await deviceWorkStatusNotifier.sendDeviceWorkStatus(sn_code, summary, { currentCommand: summary.currentCommand || null }); } catch (error) { console.error(`[设备工作状态同步] 设备 ${sn_code} 同步失败:`, error.message); } } } catch (error) { console.error('[任务状态同步] 执行失败:', error); } } /** * 检查任务超时并强制标记为失败 * 检测长时间运行的任务(可能是卡住的),强制标记为失败,释放资源 */ async checkTaskTimeouts() { try { const Sequelize = require('sequelize'); const { task_status, op } = db.models; // 查询所有运行中的任务 const runningTasks = await task_status.findAll({ where: { status: 'running' }, attributes: ['id', 'sn_code', 'taskType', 'taskName', 'startTime', 'create_time'] }); if (!runningTasks || runningTasks.length === 0) { return; } const now = new Date(); let timeoutCount = 0; for (const task of runningTasks) { const taskData = task.toJSON(); const startTime = taskData.startTime ? new Date(taskData.startTime) : (taskData.create_time ? new Date(taskData.create_time) : null); if (!startTime) { continue; } // 获取任务类型的超时时间(默认10分钟) const taskTimeout = config.getTaskTimeout(taskData.taskType) || 10 * 60 * 1000; // 允许额外20%的缓冲时间 const maxAllowedTime = taskTimeout * 1.2; const elapsedTime = now.getTime() - startTime.getTime(); // 如果任务运行时间超过最大允许时间,标记为超时失败 if (elapsedTime > maxAllowedTime) { try { await task_status.update( { status: 'failed', endTime: now, duration: elapsedTime, result: JSON.stringify({ error: `任务执行超时(运行时间: ${Math.round(elapsedTime / 1000)}秒,超时限制: ${Math.round(maxAllowedTime / 1000)}秒)`, timeout: true, taskType: taskData.taskType, startTime: startTime.toISOString() }), progress: 0 }, { where: { id: taskData.id } } ); timeoutCount++; console.warn(`[任务超时检查] 任务 ${taskData.id} (${taskData.taskName}) 运行时间过长,已强制标记为失败`, { task_id: taskData.id, sn_code: taskData.sn_code, taskType: taskData.taskType, elapsedTime: Math.round(elapsedTime / 1000) + '秒', maxAllowedTime: Math.round(maxAllowedTime / 1000) + '秒' }); // 如果任务队列中有这个任务,也需要从内存中清理 if (this.taskQueue && typeof this.taskQueue.deviceStatus !== 'undefined') { const deviceStatus = this.taskQueue.deviceStatus.get(taskData.sn_code); if (deviceStatus && deviceStatus.currentTask && deviceStatus.currentTask.id === taskData.id) { // 重置设备状态,允许继续执行下一个任务 deviceStatus.isRunning = false; deviceStatus.currentTask = null; deviceStatus.runningCount = Math.max(0, deviceStatus.runningCount - 1); this.taskQueue.globalRunningCount = Math.max(0, this.taskQueue.globalRunningCount - 1); console.log(`[任务超时检查] 已重置设备 ${taskData.sn_code} 的状态,可以继续执行下一个任务`); // 尝试继续处理该设备的队列 setTimeout(() => { this.taskQueue.processQueue(taskData.sn_code).catch(error => { console.error(`[任务超时检查] 继续处理队列失败 (设备: ${taskData.sn_code}):`, error); }); }, 100); } } } catch (error) { console.error(`[任务超时检查] 更新超时任务状态失败 (任务ID: ${taskData.id}):`, error); } } } if (timeoutCount > 0) { console.log(`[任务超时检查] 共检测到 ${timeoutCount} 个超时任务,已强制标记为失败`); } } catch (error) { console.error('[任务超时检查] 执行失败:', error); } } /** * 自动投递任务 */ async autoDeliverTask() { const now = new Date(); console.log(`[自动投递] ${now.toLocaleString()} 开始执行自动投递任务`); // 检查是否在工作时间 if (!config.isWorkingHours()) { console.log(`[自动投递] 非工作时间,跳过执行`); return; } try { // 移除 device_status 依赖,改为直接从 pla_account 查询启用且开启自动投递的账号 const models = db.models; const { pla_account, op } = models; // 直接从 pla_account 查询启用且开启自动投递的账号 // 注意:不再检查在线状态,因为 device_status 已移除 const pla_users = await pla_account.findAll({ where: { is_delete: 0, is_enabled: 1, // 只获取启用的账号 auto_deliver: 1 } }); if (!pla_users || pla_users.length === 0) { console.log('[自动投递] 没有启用且开启自动投递的账号'); return; } console.log(`[自动投递] 找到 ${pla_users.length} 个可用账号`); // 获取 task_status 模型用于查询上次投递时间 const { task_status } = models; // 为每个设备添加自动投递任务到队列 for (const pl_user of pla_users) { const userData = pl_user.toJSON(); // 检查设备是否在线(离线阈值:3分钟) const offlineThreshold = 3 * 60 * 1000; // 3分钟 const now = Date.now(); const device = deviceManager.devices.get(userData.sn_code); if (!device) { // 设备从未发送过心跳,视为离线 console.log(`[自动投递] 设备 ${userData.sn_code} 离线(从未发送心跳),跳过添加任务`); continue; } // 检查最后心跳时间 const lastHeartbeat = device.lastHeartbeat || 0; const isOnline = device.isOnline && (now - lastHeartbeat < offlineThreshold); if (!isOnline) { const offlineMinutes = lastHeartbeat ? Math.round((now - lastHeartbeat) / (60 * 1000)) : '未知'; console.log(`[自动投递] 设备 ${userData.sn_code} 离线(最后心跳: ${offlineMinutes}分钟前),跳过添加任务`); continue; } // 检查设备调度策略 const canExecute = deviceManager.canExecuteOperation(userData.sn_code, 'deliver'); if (!canExecute.allowed) { console.log(`[自动投递] 设备 ${userData.sn_code} 不满足执行条件: ${canExecute.reason}`); continue; } // 获取投递配置,如果不存在则使用默认值 let deliver_config = userData.deliver_config; if (typeof deliver_config === 'string') { try { deliver_config = JSON.parse(deliver_config); } catch (e) { deliver_config = {}; } } deliver_config = deliver_config || { deliver_interval: 30, min_salary: 0, max_salary: 0, page_count: 3, max_deliver: 10, filter_keywords: [], exclude_keywords: [] }; // 检查投递时间范围 if (deliver_config.time_range) { const timeCheck = checkTimeRange(deliver_config.time_range); if (!timeCheck.allowed) { console.log(`[自动投递] 设备 ${userData.sn_code} ${timeCheck.reason}`); continue; } } // 检查投递间隔时间 const deliver_interval = deliver_config.deliver_interval || 30; // 默认30分钟 const interval_ms = deliver_interval * 60 * 1000; // 转换为毫秒 // 查询该账号最近一次成功完成的自动投递任务 const lastDeliverTask = await task_status.findOne({ where: { sn_code: userData.sn_code, taskType: 'auto_deliver', status: 'completed' }, order: [['endTime', 'DESC']], attributes: ['endTime'] }); // 如果存在上次投递记录,检查是否已经过了间隔时间 if (lastDeliverTask && lastDeliverTask.endTime) { const lastDeliverTime = new Date(lastDeliverTask.endTime); const elapsedTime = new Date().getTime() - lastDeliverTime.getTime(); if (elapsedTime < interval_ms) { const remainingMinutes = Math.ceil((interval_ms - elapsedTime) / (60 * 1000)); console.log(`[自动投递] 设备 ${userData.sn_code} 距离上次投递仅 ${Math.round(elapsedTime / (60 * 1000))} 分钟,还需等待 ${remainingMinutes} 分钟(间隔: ${deliver_interval} 分钟)`); continue; } } // 添加自动投递任务到队列 await this.taskQueue.addTask(userData.sn_code, { taskType: 'auto_deliver', taskName: `自动投递 - ${userData.keyword || ''}`, taskParams: { keyword: userData.keyword || '', platform: userData.platform_type || 'boss', pageCount: deliver_config.page_count || 3, maxCount: deliver_config.max_deliver || 10, filterRules: { minSalary: deliver_config.min_salary || 0, maxSalary: deliver_config.max_salary || 0, keywords: deliver_config.filter_keywords || [], excludeKeywords: deliver_config.exclude_keywords || [] } }, priority: config.getTaskPriority('auto_deliver') || 6 }); console.log(`[自动投递] 已为设备 ${userData.sn_code} 添加自动投递任务,关键词: ${userData.keyword || '默认'},投递间隔: ${deliver_interval} 分钟`); } console.log('[自动投递] 任务添加完成'); } catch (error) { console.error('[自动投递] 执行失败:', error); } } /** * 自动沟通任务 */ async autoChatTask() { const now = new Date(); console.log(`[自动沟通] ${now.toLocaleString()} 开始执行自动沟通任务`); // 检查是否在工作时间 if (!config.isWorkingHours()) { console.log(`[自动沟通] 非工作时间,跳过执行`); return; } try { // 移除 device_status 依赖,改为直接从 pla_account 查询启用且开启自动沟通的账号 const models = db.models; const { pla_account, op } = models; // 直接从 pla_account 查询启用且开启自动沟通的账号 // 注意:不再检查在线状态,因为 device_status 已移除 const pla_users = await pla_account.findAll({ where: { is_delete: 0, is_enabled: 1, // 只获取启用的账号 auto_chat: 1 } }); if (!pla_users || pla_users.length === 0) { console.log('[自动沟通] 没有启用且开启自动沟通的账号'); return; } console.log(`[自动沟通] 找到 ${pla_users.length} 个可用账号`); // 获取 task_status 模型用于查询上次沟通时间 const { task_status } = models; // 为每个设备添加自动沟通任务到队列 for (const pl_user of pla_users) { const userData = pl_user.toJSON(); // 检查设备是否在线(离线阈值:3分钟) const offlineThreshold = 3 * 60 * 1000; // 3分钟 const now = Date.now(); const device = deviceManager.devices.get(userData.sn_code); if (!device) { // 设备从未发送过心跳,视为离线 console.log(`[自动沟通] 设备 ${userData.sn_code} 离线(从未发送心跳),跳过添加任务`); continue; } // 检查最后心跳时间 const lastHeartbeat = device.lastHeartbeat || 0; const isOnline = device.isOnline && (now - lastHeartbeat < offlineThreshold); if (!isOnline) { const offlineMinutes = lastHeartbeat ? Math.round((now - lastHeartbeat) / (60 * 1000)) : '未知'; console.log(`[自动沟通] 设备 ${userData.sn_code} 离线(最后心跳: ${offlineMinutes}分钟前),跳过添加任务`); continue; } // 检查设备调度策略 const canExecute = deviceManager.canExecuteOperation(userData.sn_code, 'chat'); if (!canExecute.allowed) { console.log(`[自动沟通] 设备 ${userData.sn_code} 不满足执行条件: ${canExecute.reason}`); continue; } // 获取沟通策略配置 let chatStrategy = {}; if (userData.chat_strategy) { chatStrategy = typeof userData.chat_strategy === 'string' ? JSON.parse(userData.chat_strategy) : userData.chat_strategy; } // 检查沟通时间范围 if (chatStrategy.time_range) { const timeCheck = checkTimeRange(chatStrategy.time_range); if (!timeCheck.allowed) { console.log(`[自动沟通] 设备 ${userData.sn_code} ${timeCheck.reason}`); continue; } } // 检查沟通间隔时间 const chat_interval = chatStrategy.chat_interval || 30; // 默认30分钟 const interval_ms = chat_interval * 60 * 1000; // 转换为毫秒 // 查询该账号最近一次成功完成的自动沟通任务 const lastChatTask = await task_status.findOne({ where: { sn_code: userData.sn_code, taskType: 'auto_chat', status: 'completed' }, order: [['endTime', 'DESC']], attributes: ['endTime'] }); // 如果存在上次沟通记录,检查是否已经过了间隔时间 if (lastChatTask && lastChatTask.endTime) { const lastChatTime = new Date(lastChatTask.endTime); const elapsedTime = now.getTime() - lastChatTime.getTime(); if (elapsedTime < interval_ms) { const remainingMinutes = Math.ceil((interval_ms - elapsedTime) / (60 * 1000)); console.log(`[自动沟通] 设备 ${userData.sn_code} 距离上次沟通仅 ${Math.round(elapsedTime / (60 * 1000))} 分钟,还需等待 ${remainingMinutes} 分钟(间隔: ${chat_interval} 分钟)`); continue; } } // 添加自动沟通任务到队列 await this.taskQueue.addTask(userData.sn_code, { taskType: 'auto_chat', taskName: `自动沟通 - ${userData.name || '默认'}`, taskParams: { platform: userData.platform_type || 'boss' }, priority: config.getTaskPriority('auto_chat') || 6 }); console.log(`[自动沟通] 已为设备 ${userData.sn_code} 添加自动沟通任务,沟通间隔: ${chat_interval} 分钟`); } console.log('[自动沟通] 任务添加完成'); } catch (error) { console.error('[自动沟通] 执行失败:', error); } } } module.exports = ScheduledJobs;