Files
autoAiWorkSys/api/middleware/schedule/taskHandlers.js
张成 5d7444cd65 1
2025-11-24 13:23:42 +08:00

433 lines
16 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
const db = require('../dbProxy.js');
const config = require('./config.js');
const deviceManager = require('./deviceManager.js');
const command = require('./command.js');
const jobFilterService = require('../job/job_filter_service.js');
/**
* 任务处理器(简化版)
* 处理各种类型的任务
*/
class TaskHandlers {
constructor(mqttClient) {
this.mqttClient = mqttClient;
}
/**
* 注册任务处理器到任务队列
* @param {object} taskQueue - 任务队列实例
*/
register(taskQueue) {
taskQueue.registerHandler('get_resume', async (task) => {
return await this.handleGetResumeTask(task);
});
taskQueue.registerHandler('get_job_list', async (task) => {
return await this.handleGetJobListTask(task);
});
taskQueue.registerHandler('send_chat', async (task) => {
return await this.handleSendChatTask(task);
});
taskQueue.registerHandler('apply_job', async (task) => {
return await this.handleApplyJobTask(task);
});
taskQueue.registerHandler('auto_deliver', async (task) => {
return await this.handleAutoDeliverTask(task);
});
}
/**
* 处理获取简历任务
*/
async handleGetResumeTask(task) {
const { sn_code } = task;
console.log(`[任务处理器] 获取简历任务 - 设备: ${sn_code}`);
deviceManager.recordTaskStart(sn_code, task);
const startTime = Date.now();
try {
const commands = [{
command_type: 'getOnlineResume',
command_name: '获取在线简历',
command_params: JSON.stringify({ sn_code }),
priority: config.getTaskPriority('get_resume')
}];
const result = await command.executeCommands(task.id, commands, this.mqttClient);
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, true, duration);
return result;
} catch (error) {
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, false, duration);
throw error;
}
}
/**
* 处理获取岗位列表任务
*/
async handleGetJobListTask(task) {
const { sn_code, taskParams } = task;
const { keyword, platform } = taskParams;
console.log(`[任务处理器] 获取岗位列表任务 - 设备: ${sn_code}`);
deviceManager.recordTaskStart(sn_code, task);
const startTime = Date.now();
try {
const commands = [{
command_type: 'getJobList',
command_name: '获取岗位列表',
command_params: JSON.stringify({ sn_code, keyword, platform }),
priority: config.getTaskPriority('search_jobs')
}];
const result = await command.executeCommands(task.id, commands, this.mqttClient);
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, true, duration);
return result;
} catch (error) {
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, false, duration);
throw error;
}
}
/**
* 处理发送聊天任务
*/
async handleSendChatTask(task) {
const { sn_code, taskParams } = task;
console.log(`[任务处理器] 发送聊天任务 - 设备: ${sn_code}`);
deviceManager.recordTaskStart(sn_code, task);
const startTime = Date.now();
try {
const commands = [{
command_type: 'sendChatMessage',
command_name: '发送聊天消息',
command_params: JSON.stringify(taskParams),
priority: config.getTaskPriority('chat')
}];
const result = await command.executeCommands(task.id, commands, this.mqttClient);
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, true, duration);
return result;
} catch (error) {
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, false, duration);
throw error;
}
}
/**
* 处理投递简历任务
*/
async handleApplyJobTask(task) {
const { sn_code, taskParams } = task;
console.log(`[任务处理器] 投递简历任务 - 设备: ${sn_code}`);
deviceManager.recordTaskStart(sn_code, task);
const startTime = Date.now();
try {
const commands = [{
command_type: 'applyJob',
command_name: '投递简历',
command_params: JSON.stringify(taskParams),
priority: config.getTaskPriority('apply')
}];
const result = await command.executeCommands(task.id, commands, this.mqttClient);
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, true, duration);
return result;
} catch (error) {
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, false, duration);
throw error;
}
}
/**
* 处理自动投递任务
*/
async handleAutoDeliverTask(task) {
const { sn_code, taskParams } = task;
const { keyword, platform, pageCount, maxCount } = taskParams;
console.log(`[任务处理器] 自动投递任务 - 设备: ${sn_code}, 关键词: ${keyword}`);
deviceManager.recordTaskStart(sn_code, task);
const startTime = Date.now();
try {
const job_postings = db.getModel('job_postings');
const pla_account = db.getModel('pla_account');
const resume_info = db.getModel('resume_info');
const job_types = db.getModel('job_types');
// 1. 检查并获取在线简历如果2小时内没有获取
const twoHoursAgo = new Date(Date.now() - 2 * 60 * 60 * 1000);
let resume = await resume_info.findOne({
where: {
sn_code,
platform: platform || 'boss',
isActive: true
},
order: [['last_modify_time', 'DESC']]
});
const needRefreshResume = !resume ||
!resume.last_modify_time ||
new Date(resume.last_modify_time) < twoHoursAgo;
if (needRefreshResume) {
console.log(`[任务处理器] 简历超过2小时未更新重新获取在线简历`);
try {
// 通过 command 系统获取在线简历,而不是直接调用 jobManager
const getResumeCommand = {
command_type: 'getOnlineResume',
command_name: '获取在线简历',
command_params: JSON.stringify({ sn_code, platform: platform || 'boss' }),
priority: config.getTaskPriority('get_resume') || 5
};
await command.executeCommands(task.id, [getResumeCommand], this.mqttClient);
// 重新查询简历
resume = await resume_info.findOne({
where: {
sn_code,
platform: platform || 'boss',
isActive: true
},
order: [['last_modify_time', 'DESC']]
});
} catch (error) {
console.warn(`[任务处理器] 获取在线简历失败,使用已有简历:`, error.message);
}
}
if (!resume) {
console.log(`[任务处理器] 未找到简历信息,无法进行自动投递`);
return {
success: false,
deliveredCount: 0,
message: '未找到简历信息'
};
}
// 2. 获取账号配置和职位类型配置
const account = await pla_account.findOne({
where: { sn_code, platform_type: platform || 'boss' }
});
if (!account) {
console.log(`[任务处理器] 未找到账号配置`);
return {
success: false,
deliveredCount: 0,
message: '未找到账号配置'
};
}
const accountConfig = account.toJSON();
const resumeInfo = resume.toJSON();
// 获取职位类型配置
let jobTypeConfig = null;
if (accountConfig.job_type_id) {
const jobType = await job_types.findByPk(accountConfig.job_type_id);
if (jobType) {
jobTypeConfig = jobType.toJSON();
}
}
// 获取优先级权重配置
let priorityWeights = accountConfig.is_salary_priority;
if (!Array.isArray(priorityWeights) || priorityWeights.length === 0) {
priorityWeights = [
{ key: "distance", weight: 50 },
{ key: "salary", weight: 20 },
{ key: "work_years", weight: 10 },
{ key: "education", weight: 20 }
];
}
// 3. 先获取职位列表
const getJobListCommand = {
command_type: 'getJobList',
command_name: '获取职位列表',
command_params: JSON.stringify({
sn_code: sn_code,
keyword: keyword || accountConfig.keyword || '',
platform: platform || 'boss',
pageCount: pageCount || 3
}),
priority: config.getTaskPriority('search_jobs') || 5
};
await command.executeCommands(task.id, [getJobListCommand], this.mqttClient);
// 4. 从数据库获取待投递的职位
const pendingJobs = await job_postings.findAll({
where: {
sn_code: sn_code,
platform: platform || 'boss',
applyStatus: 'pending'
},
order: [['create_time', 'DESC']],
limit: (maxCount || 10) * 3 // 获取更多职位用于筛选
});
if (!pendingJobs || pendingJobs.length === 0) {
console.log(`[任务处理器] 没有待投递的职位`);
return {
success: true,
deliveredCount: 0,
message: '没有待投递的职位'
};
}
// 5. 根据简历信息、职位类型配置和权重配置进行评分和过滤
const scoredJobs = [];
const excludeKeywords = jobTypeConfig && jobTypeConfig.excludeKeywords
? (typeof jobTypeConfig.excludeKeywords === 'string'
? JSON.parse(jobTypeConfig.excludeKeywords)
: jobTypeConfig.excludeKeywords)
: [];
// 获取一个月内已投递的公司列表(用于过滤)
const apply_records = db.getModel('apply_records');
const Sequelize = require('sequelize');
const oneMonthAgo = new Date();
oneMonthAgo.setMonth(oneMonthAgo.getMonth() - 1);
const recentApplies = await apply_records.findAll({
where: {
sn_code: sn_code,
applyTime: {
[Sequelize.Op.gte]: oneMonthAgo
}
},
attributes: ['companyName'],
group: ['companyName']
});
const recentCompanyNames = new Set(recentApplies.map(apply => apply.companyName).filter(Boolean));
for (const job of pendingJobs) {
const jobData = job.toJSON ? job.toJSON() : job;
// 排除关键词过滤
if (Array.isArray(excludeKeywords) && excludeKeywords.length > 0) {
const jobText = `${jobData.jobTitle} ${jobData.companyName} ${jobData.jobDescription || ''}`.toLowerCase();
const hasExcluded = excludeKeywords.some(kw => jobText.includes(kw.toLowerCase()));
if (hasExcluded) {
continue;
}
}
// 检查该公司是否在一个月内已投递过
if (jobData.companyName && recentCompanyNames.has(jobData.companyName)) {
console.log(`[任务处理器] 跳过一个月内已投递的公司: ${jobData.companyName}`);
continue;
}
// 使用 job_filter_service 计算评分
const scoreResult = jobFilterService.calculateJobScoreWithWeights(
jobData,
resumeInfo,
accountConfig,
jobTypeConfig,
priorityWeights
);
// 只保留总分 >= 60 的职位
if (scoreResult.totalScore >= 60) {
scoredJobs.push({
...jobData,
matchScore: scoreResult.totalScore,
scoreDetails: scoreResult.scores
});
}
}
// 按总分降序排序
scoredJobs.sort((a, b) => b.matchScore - a.matchScore);
// 取前 maxCount 个职位
const jobsToDeliver = scoredJobs.slice(0, maxCount || 10);
console.log(`[任务处理器] 职位评分完成,共 ${pendingJobs.length} 个职位,评分后 ${scoredJobs.length} 个符合条件,将投递 ${jobsToDeliver.length}`);
if (jobsToDeliver.length === 0) {
return {
success: true,
deliveredCount: 0,
message: '没有符合条件的职位'
};
}
// 6. 为每个职位创建一条独立的投递指令
const deliverCommands = [];
for (const jobData of jobsToDeliver) {
console.log(`[任务处理器] 准备投递职位: ${jobData.jobTitle} @ ${jobData.companyName}, 评分: ${jobData.matchScore}`, jobData.scoreDetails);
deliverCommands.push({
command_type: 'applyJob',
command_name: `投递简历 - ${jobData.jobTitle} @ ${jobData.companyName} (评分:${jobData.matchScore})`,
command_params: JSON.stringify({
sn_code: sn_code,
platform: platform || 'boss',
jobId: jobData.jobId,
encryptBossId: jobData.encryptBossId || '',
securityId: jobData.securityId || '',
brandName: jobData.companyName,
jobTitle: jobData.jobTitle,
companyName: jobData.companyName,
matchScore: jobData.matchScore,
scoreDetails: jobData.scoreDetails
}),
priority: config.getTaskPriority('apply') || 6
});
}
// 7. 执行所有投递指令
const result = await command.executeCommands(task.id, deliverCommands, this.mqttClient);
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, true, duration);
console.log(`[任务处理器] 自动投递任务完成 - 设备: ${sn_code}, 创建了 ${deliverCommands.length} 条投递指令, 耗时: ${duration}ms`);
return result;
} catch (error) {
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, false, duration);
console.error(`[任务处理器] 自动投递任务失败 - 设备: ${sn_code}:`, error);
throw error;
}
}
}
module.exports = TaskHandlers;