This commit is contained in:
张成
2025-12-30 15:46:18 +08:00
parent d14f89e008
commit 65833dd32d
29 changed files with 2416 additions and 1048 deletions

View File

@@ -0,0 +1,570 @@
const node_schedule = require("node-schedule");
const dayjs = require('dayjs');
const config = require('../infrastructure/config.js');
const deviceManager = require('./deviceManager.js');
const command = require('./command.js');
const db = require('../../dbProxy');
// 引入新的任务模块
const tasks = require('../tasks');
const { autoSearchTask, autoDeliverTask, autoChatTask, autoActiveTask } = tasks;
const Framework = require("../../../../framework/node-core-framework.js");
/**
* 定时任务管理器(重构版)
* 使用独立的任务模块,职责更清晰,易于维护和扩展
*/
class ScheduledJobs {
constructor(components, taskHandlers) {
this.taskQueue = components.taskQueue;
this.taskHandlers = taskHandlers;
this.jobs = [];
}
/**
* 启动所有定时任务
*/
start() {
console.log('[定时任务] 开始启动所有定时任务...');
// ==================== 系统维护任务 ====================
// 每天凌晨重置统计数据
const resetJob = node_schedule.scheduleJob(config.schedules.dailyReset, () => {
this.resetDailyStats();
});
this.jobs.push(resetJob);
console.log('[定时任务] ✓ 已启动每日统计重置任务');
// 启动心跳检查定时任务(每分钟检查一次)
const monitoringJob = node_schedule.scheduleJob(config.schedules.monitoringInterval, async () => {
await deviceManager.checkHeartbeatStatus().catch(error => {
console.error('[定时任务] 检查心跳状态失败:', error);
});
});
this.jobs.push(monitoringJob);
console.log('[定时任务] ✓ 已启动心跳检查任务');
// 启动离线设备任务清理定时任务(每分钟检查一次)
const cleanupOfflineTasksJob = node_schedule.scheduleJob(config.schedules.monitoringInterval, async () => {
await this.cleanupOfflineDeviceTasks().catch(error => {
console.error('[定时任务] 清理离线设备任务失败:', error);
});
});
this.jobs.push(cleanupOfflineTasksJob);
console.log('[定时任务] ✓ 已启动离线设备任务清理任务');
// 启动任务超时检查定时任务(每分钟检查一次)
const timeoutCheckJob = node_schedule.scheduleJob(config.schedules.monitoringInterval, async () => {
await this.checkTaskTimeouts().catch(error => {
console.error('[定时任务] 检查任务超时失败:', error);
});
});
this.jobs.push(timeoutCheckJob);
console.log('[定时任务] ✓ 已启动任务超时检查任务');
// 启动任务状态摘要同步定时任务每10秒发送一次
const taskSummaryJob = node_schedule.scheduleJob('*/10 * * * * *', async () => {
await this.syncTaskStatusSummary().catch(error => {
console.error('[定时任务] 同步任务状态摘要失败:', error);
});
});
this.jobs.push(taskSummaryJob);
console.log('[定时任务] ✓ 已启动任务状态摘要同步任务');
// ==================== 业务任务(使用新的任务模块) ====================
// 1. 自动搜索任务 - 每60分钟执行一次
const autoSearchJob = node_schedule.scheduleJob(config.schedules.autoSearch || '0 0 */1 * * *', () => {
this.runAutoSearchTask();
});
this.jobs.push(autoSearchJob);
console.log('[定时任务] ✓ 已启动自动搜索任务 (每60分钟)');
// 2. 自动投递任务 - 每1分钟检查一次
const autoDeliverJob = node_schedule.scheduleJob(config.schedules.autoDeliver, () => {
this.runAutoDeliverTask();
});
this.jobs.push(autoDeliverJob);
console.log('[定时任务] ✓ 已启动自动投递任务 (每1分钟)');
// 3. 自动沟通任务 - 每15分钟执行一次
const autoChatJob = node_schedule.scheduleJob(config.schedules.autoChat || '0 */15 * * * *', () => {
this.runAutoChatTask();
});
this.jobs.push(autoChatJob);
console.log('[定时任务] ✓ 已启动自动沟通任务 (每15分钟)');
// 4. 自动活跃任务 - 每2小时执行一次
const autoActiveJob = node_schedule.scheduleJob(config.schedules.autoActive || '0 0 */2 * * *', () => {
this.runAutoActiveTask();
});
this.jobs.push(autoActiveJob);
console.log('[定时任务] ✓ 已启动自动活跃任务 (每2小时)');
// 立即执行一次业务任务(可选)
setTimeout(() => {
console.log('[定时任务] 立即执行一次初始化任务...');
this.runAutoDeliverTask();
this.runAutoChatTask();
}, 3000); // 延迟3秒,等待系统初始化完成
console.log('[定时任务] 所有定时任务启动完成!');
}
// ==================== 业务任务执行方法(使用新的任务模块) ====================
/**
* 运行自动搜索任务
* 为所有启用自动搜索的账号添加搜索任务
*/
async runAutoSearchTask() {
try {
const accounts = await this.getEnabledAccounts('auto_search');
if (accounts.length === 0) {
return;
}
console.log(`[自动搜索调度] 找到 ${accounts.length} 个启用自动搜索的账号`);
let successCount = 0;
let failedCount = 0;
for (const account of accounts) {
const result = await autoSearchTask.addToQueue(account.sn_code, this.taskQueue);
if (result.success) {
successCount++;
} else {
failedCount++;
}
}
if (successCount > 0 || failedCount > 0) {
console.log(`[自动搜索调度] 完成: 成功 ${successCount} 个, 失败/跳过 ${failedCount}`);
}
} catch (error) {
console.error('[自动搜索调度] 执行失败:', error);
}
}
/**
* 运行自动投递任务
* 为所有启用自动投递的账号添加投递任务
*/
async runAutoDeliverTask() {
try {
const accounts = await this.getEnabledAccounts('auto_deliver');
if (accounts.length === 0) {
return;
}
console.log(`[自动投递调度] 找到 ${accounts.length} 个启用自动投递的账号`);
let successCount = 0;
let failedCount = 0;
for (const account of accounts) {
const result = await autoDeliverTask.addToQueue(account.sn_code, this.taskQueue);
if (result.success) {
successCount++;
} else {
failedCount++;
}
}
if (successCount > 0 || failedCount > 0) {
console.log(`[自动投递调度] 完成: 成功 ${successCount} 个, 失败/跳过 ${failedCount}`);
}
} catch (error) {
console.error('[自动投递调度] 执行失败:', error);
}
}
/**
* 运行自动沟通任务
* 为所有启用自动沟通的账号添加沟通任务
*/
async runAutoChatTask() {
try {
const accounts = await this.getEnabledAccounts('auto_chat');
if (accounts.length === 0) {
return;
}
console.log(`[自动沟通调度] 找到 ${accounts.length} 个启用自动沟通的账号`);
let successCount = 0;
let failedCount = 0;
for (const account of accounts) {
const result = await autoChatTask.addToQueue(account.sn_code, this.taskQueue);
if (result.success) {
successCount++;
} else {
failedCount++;
}
}
if (successCount > 0 || failedCount > 0) {
console.log(`[自动沟通调度] 完成: 成功 ${successCount} 个, 失败/跳过 ${failedCount}`);
}
} catch (error) {
console.error('[自动沟通调度] 执行失败:', error);
}
}
/**
* 运行自动活跃任务
* 为所有启用自动活跃的账号添加活跃任务
*/
async runAutoActiveTask() {
try {
const accounts = await this.getEnabledAccounts('auto_active');
if (accounts.length === 0) {
return;
}
console.log(`[自动活跃调度] 找到 ${accounts.length} 个启用自动活跃的账号`);
let successCount = 0;
let failedCount = 0;
for (const account of accounts) {
const result = await autoActiveTask.addToQueue(account.sn_code, this.taskQueue);
if (result.success) {
successCount++;
} else {
failedCount++;
}
}
if (successCount > 0 || failedCount > 0) {
console.log(`[自动活跃调度] 完成: 成功 ${successCount} 个, 失败/跳过 ${failedCount}`);
}
} catch (error) {
console.error('[自动活跃调度] 执行失败:', error);
}
}
/**
* 获取启用指定功能的账号列表
* @param {string} featureType - 功能类型: auto_search, auto_deliver, auto_chat, auto_active
*/
async getEnabledAccounts(featureType) {
try {
const { pla_account } = db.models;
const accounts = await pla_account.findAll({
where: {
is_delete: 0,
is_enabled: 1,
[featureType]: 1
},
attributes: ['sn_code', 'name', 'keyword', 'platform_type']
});
return accounts.map(acc => acc.toJSON());
} catch (error) {
console.error(`[获取账号列表] 失败 (${featureType}):`, error);
return [];
}
}
// ==================== 系统维护方法(保留原有逻辑) ====================
/**
* 重置每日统计
*/
resetDailyStats() {
console.log('[定时任务] 重置每日统计数据');
try {
deviceManager.resetAllDailyCounters();
console.log('[定时任务] 每日统计重置完成');
} catch (error) {
console.error('[定时任务] 重置统计失败:', error);
}
}
/**
* 清理过期数据
*/
cleanupCaches() {
console.log('[定时任务] 开始清理过期数据');
try {
deviceManager.cleanupOfflineDevices(config.monitoring.offlineThreshold);
command.cleanupExpiredCommands(30);
console.log('[定时任务] 数据清理完成');
} catch (error) {
console.error('[定时任务] 数据清理失败:', error);
}
}
/**
* 清理离线设备任务
* 检查离线超过10分钟的设备取消其所有pending/running状态的任务
*/
async cleanupOfflineDeviceTasks() {
try {
// 离线阈值10分钟
const offlineThreshold = 10 * 60 * 1000;
const now = Date.now();
const thresholdTime = now - offlineThreshold;
// 获取所有启用的账号
const pla_account = db.getModel('pla_account');
const accounts = await pla_account.findAll({
where: {
is_delete: 0,
is_enabled: 1
},
attributes: ['sn_code']
});
if (!accounts || accounts.length === 0) {
return;
}
// 通过 deviceManager 检查哪些设备离线超过10分钟
const offlineSnCodes = [];
const offlineDevicesInfo = [];
for (const account of accounts) {
const sn_code = account.sn_code;
const device = deviceManager.devices.get(sn_code);
if (!device) {
offlineSnCodes.push(sn_code);
offlineDevicesInfo.push({
sn_code: sn_code,
lastHeartbeatTime: null
});
} else {
const lastHeartbeat = device.lastHeartbeat || 0;
if (lastHeartbeat < thresholdTime || !device.isOnline) {
offlineSnCodes.push(sn_code);
offlineDevicesInfo.push({
sn_code: sn_code,
lastHeartbeatTime: lastHeartbeat ? new Date(lastHeartbeat) : null
});
}
}
}
if (offlineSnCodes.length === 0) {
return;
}
console.log(`[清理离线任务] 发现 ${offlineSnCodes.length} 个离线超过10分钟的设备`);
let totalCancelled = 0;
const task_status = db.getModel('task_status');
for (const sn_code of offlineSnCodes) {
try {
const deviceInfo = offlineDevicesInfo.find(d => d.sn_code === sn_code);
const updateResult = await task_status.update(
{
status: 'cancelled',
endTime: new Date(),
result: JSON.stringify({
reason: '设备离线超过10分钟任务已自动取消',
offlineTime: deviceInfo?.lastHeartbeatTime
})
},
{
where: {
sn_code: sn_code,
status: ['pending', 'running']
}
}
);
const cancelledCount = Array.isArray(updateResult) ? updateResult[0] : updateResult;
totalCancelled += cancelledCount;
if (this.taskQueue && typeof this.taskQueue.cancelDeviceTasks === 'function') {
await this.taskQueue.cancelDeviceTasks(sn_code);
}
if (cancelledCount > 0) {
console.log(`[清理离线任务] 设备 ${sn_code} 已取消 ${cancelledCount} 个任务`);
}
} catch (error) {
console.error(`[清理离线任务] 取消设备 ${sn_code} 的任务失败:`, error);
}
}
if (totalCancelled > 0) {
console.log(`[清理离线任务] 共取消 ${totalCancelled} 个离线设备的任务`);
}
} catch (error) {
console.error('[清理离线任务] 执行失败:', error);
}
}
/**
* 同步任务状态摘要到客户端
*/
async syncTaskStatusSummary() {
try {
const { pla_account } = await Framework.getModels();
const accounts = await pla_account.findAll({
where: {
is_delete: 0,
is_enabled: 1
},
attributes: ['sn_code']
});
if (!accounts || accounts.length === 0) {
return;
}
const offlineThreshold = 3 * 60 * 1000;
const now = Date.now();
for (const account of accounts) {
const sn_code = account.sn_code;
const device = deviceManager.devices.get(sn_code);
if (!device) {
continue;
}
const lastHeartbeat = device.lastHeartbeat || 0;
const isOnline = device.isOnline && (now - lastHeartbeat < offlineThreshold);
if (!isOnline) {
continue;
}
try {
const deviceWorkStatusNotifier = require('../notifiers/deviceWorkStatusNotifier');
const summary = await this.taskQueue.getTaskStatusSummary(sn_code);
await deviceWorkStatusNotifier.sendDeviceWorkStatus(sn_code, summary, {
currentCommand: summary.currentCommand || null
});
} catch (error) {
console.error(`[设备工作状态同步] 设备 ${sn_code} 同步失败:`, error.message);
}
}
} catch (error) {
console.error('[任务状态同步] 执行失败:', error);
}
}
/**
* 检查任务超时并强制标记为失败
*/
async checkTaskTimeouts() {
try {
const Sequelize = require('sequelize');
const { task_status, op } = db.models;
const runningTasks = await task_status.findAll({
where: {
status: 'running'
},
attributes: ['id', 'sn_code', 'taskType', 'taskName', 'startTime', 'create_time']
});
if (!runningTasks || runningTasks.length === 0) {
return;
}
const now = new Date();
let timeoutCount = 0;
for (const task of runningTasks) {
const taskData = task.toJSON();
const startTime = taskData.startTime ? new Date(taskData.startTime) : (taskData.create_time ? new Date(taskData.create_time) : null);
if (!startTime) {
continue;
}
const taskTimeout = config.getTaskTimeout(taskData.taskType) || 10 * 60 * 1000;
const maxAllowedTime = taskTimeout * 1.2;
const elapsedTime = now.getTime() - startTime.getTime();
if (elapsedTime > maxAllowedTime) {
try {
await task_status.update(
{
status: 'failed',
endTime: now,
duration: elapsedTime,
result: JSON.stringify({
error: `任务执行超时(运行时间: ${Math.round(elapsedTime / 1000)}秒,超时限制: ${Math.round(maxAllowedTime / 1000)}秒)`,
timeout: true,
taskType: taskData.taskType,
startTime: startTime.toISOString()
}),
progress: 0
},
{
where: { id: taskData.id }
}
);
timeoutCount++;
console.warn(`[任务超时检查] 任务 ${taskData.id} (${taskData.taskName}) 运行时间过长,已强制标记为失败`);
if (this.taskQueue && typeof this.taskQueue.deviceStatus !== 'undefined') {
const deviceStatus = this.taskQueue.deviceStatus.get(taskData.sn_code);
if (deviceStatus && deviceStatus.currentTask && deviceStatus.currentTask.id === taskData.id) {
deviceStatus.isRunning = false;
deviceStatus.currentTask = null;
deviceStatus.runningCount = Math.max(0, deviceStatus.runningCount - 1);
this.taskQueue.globalRunningCount = Math.max(0, this.taskQueue.globalRunningCount - 1);
console.log(`[任务超时检查] 已重置设备 ${taskData.sn_code} 的状态`);
setTimeout(() => {
this.taskQueue.processQueue(taskData.sn_code).catch(error => {
console.error(`[任务超时检查] 继续处理队列失败:`, error);
});
}, 100);
}
}
} catch (error) {
console.error(`[任务超时检查] 更新超时任务状态失败:`, error);
}
}
}
if (timeoutCount > 0) {
console.log(`[任务超时检查] 共检测到 ${timeoutCount} 个超时任务`);
}
} catch (error) {
console.error('[任务超时检查] 执行失败:', error);
}
}
/**
* 停止所有定时任务
*/
stop() {
console.log('[定时任务] 停止所有定时任务...');
for (const job of this.jobs) {
if (job) {
job.cancel();
}
}
this.jobs = [];
console.log('[定时任务] 所有定时任务已停止');
}
}
module.exports = ScheduledJobs;