Files
autoAiWorkSys/api/middleware/schedule/taskHandlers.js
张成 130167acc9 1
2025-12-10 12:53:38 +08:00

455 lines
18 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
const db = require('../dbProxy.js');
const config = require('./config.js');
const deviceManager = require('./deviceManager.js');
const command = require('./command.js');
const jobFilterService = require('../job/job_filter_service.js');
/**
* 任务处理器(简化版)
* 处理各种类型的任务
*/
class TaskHandlers {
constructor(mqttClient) {
this.mqttClient = mqttClient;
}
/**
* 注册任务处理器到任务队列
* @param {object} taskQueue - 任务队列实例
*/
register(taskQueue) {
// 自动投递任务
taskQueue.registerHandler('auto_deliver', async (task) => {
return await this.handleAutoDeliverTask(task);
});
// 自动沟通任务(待实现)
taskQueue.registerHandler('auto_chat', async (task) => {
return await this.handleAutoChatTask(task);
});
// 自动活跃账号任务(待实现)
taskQueue.registerHandler('auto_active_account', async (task) => {
return await this.handleAutoActiveAccountTask(task);
});
}
/**
* 处理自动投递任务
*/
async handleAutoDeliverTask(task) {
const { sn_code, taskParams } = task;
const { keyword, platform, pageCount, maxCount, filterRules = {} } = taskParams;
console.log(`[任务处理器] 自动投递任务 - 设备: ${sn_code}, 关键词: ${keyword}`);
deviceManager.recordTaskStart(sn_code, task);
const startTime = Date.now();
try {
const job_postings = db.getModel('job_postings');
const pla_account = db.getModel('pla_account');
const resume_info = db.getModel('resume_info');
const job_types = db.getModel('job_types');
const apply_records = db.getModel('apply_records');
const Sequelize = require('sequelize');
const { Op } = Sequelize;
// 检查今日投递次数限制
const currentPlatform = platform || 'boss';
const dailyLimit = config.getDailyLimit('apply', currentPlatform);
// 获取今日开始时间00:00:00
const today = new Date();
today.setHours(0, 0, 0, 0);
// 查询今日已投递次数
const todayApplyCount = await apply_records.count({
where: {
sn_code: sn_code,
platform: currentPlatform,
applyTime: {
[Op.gte]: today
}
}
});
console.log(`[任务处理器] 今日已投递 ${todayApplyCount} 次,限制: ${dailyLimit}`);
// 如果已达到每日投递上限,则跳过
if (todayApplyCount >= dailyLimit) {
console.log(`[任务处理器] 已达到每日投递上限(${dailyLimit}次),跳过投递`);
return {
success: false,
deliveredCount: 0,
message: `已达到每日投递上限(${dailyLimit}次),今日已投递 ${todayApplyCount}`
};
}
// 计算本次可投递的数量(不超过剩余限额)
const remainingQuota = dailyLimit - todayApplyCount;
const actualMaxCount = Math.min(maxCount || 10, remainingQuota);
if (actualMaxCount < (maxCount || 10)) {
console.log(`[任务处理器] 受每日投递上限限制,本次最多投递 ${actualMaxCount} 个职位(剩余限额: ${remainingQuota}`);
}
// 1. 检查并获取在线简历如果2小时内没有获取
const twoHoursAgo = new Date(Date.now() - 2 * 60 * 60 * 1000);
let resume = await resume_info.findOne({
where: {
sn_code,
platform: platform || 'boss',
isActive: true
},
order: [['last_modify_time', 'DESC']]
});
const needRefreshResume = !resume ||
!resume.last_modify_time ||
new Date(resume.last_modify_time) < twoHoursAgo;
if (needRefreshResume) {
console.log(`[任务处理器] 简历超过2小时未更新重新获取在线简历`);
try {
// 通过 command 系统获取在线简历,而不是直接调用 jobManager
const getResumeCommand = {
command_type: 'getOnlineResume',
command_name: '获取在线简历',
command_params: JSON.stringify({ sn_code, platform: platform || 'boss' }),
priority: config.getTaskPriority('get_resume') || 5
};
await command.executeCommands(task.id, [getResumeCommand], this.mqttClient);
// 重新查询简历
resume = await resume_info.findOne({
where: {
sn_code,
platform: platform || 'boss',
isActive: true
},
order: [['last_modify_time', 'DESC']]
});
} catch (error) {
console.warn(`[任务处理器] 获取在线简历失败,使用已有简历:`, error.message);
}
}
if (!resume) {
console.log(`[任务处理器] 未找到简历信息,无法进行自动投递`);
return {
success: false,
deliveredCount: 0,
message: '未找到简历信息'
};
}
// 2. 获取账号配置和职位类型配置
const account = await pla_account.findOne({
where: { sn_code, platform_type: platform || 'boss' }
});
if (!account) {
console.log(`[任务处理器] 未找到账号配置`);
return {
success: false,
deliveredCount: 0,
message: '未找到账号配置'
};
}
const accountConfig = account.toJSON();
const resumeInfo = resume.toJSON();
// 获取职位类型配置
let jobTypeConfig = null;
if (accountConfig.job_type_id) {
const jobType = await job_types.findByPk(accountConfig.job_type_id);
if (jobType) {
jobTypeConfig = jobType.toJSON();
}
}
// 获取优先级权重配置
let priorityWeights = accountConfig.is_salary_priority;
if (!Array.isArray(priorityWeights) || priorityWeights.length === 0) {
priorityWeights = [
{ key: "distance", weight: 50 },
{ key: "salary", weight: 20 },
{ key: "work_years", weight: 10 },
{ key: "education", weight: 20 }
];
}
// 3. 先获取职位列表
const getJobListCommand = {
command_type: 'getJobList',
command_name: '获取职位列表',
command_params: JSON.stringify({
sn_code: sn_code,
keyword: keyword || accountConfig.keyword || '',
platform: platform || 'boss',
pageCount: pageCount || 3
}),
priority: config.getTaskPriority('search_jobs') || 5
};
await command.executeCommands(task.id, [getJobListCommand], this.mqttClient);
// 4. 从数据库获取待投递的职位
const pendingJobs = await job_postings.findAll({
where: {
sn_code: sn_code,
platform: platform || 'boss',
applyStatus: 'pending'
},
order: [['create_time', 'DESC']],
limit: actualMaxCount * 3 // 获取更多职位用于筛选(受每日投递上限限制)
});
if (!pendingJobs || pendingJobs.length === 0) {
console.log(`[任务处理器] 没有待投递的职位`);
return {
success: true,
deliveredCount: 0,
message: '没有待投递的职位'
};
}
// 5. 根据简历信息、职位类型配置和权重配置进行评分和过滤
const scoredJobs = [];
// 合并排除关键词:从职位类型配置和任务参数中获取
const jobTypeExcludeKeywords = jobTypeConfig && jobTypeConfig.excludeKeywords
? (typeof jobTypeConfig.excludeKeywords === 'string'
? JSON.parse(jobTypeConfig.excludeKeywords)
: jobTypeConfig.excludeKeywords)
: [];
const taskExcludeKeywords = filterRules.excludeKeywords || [];
const excludeKeywords = [...jobTypeExcludeKeywords, ...taskExcludeKeywords];
// 获取过滤关键词(用于优先匹配)
const filterKeywords = filterRules.keywords || [];
// 获取薪资范围过滤
const minSalary = filterRules.minSalary || 0;
const maxSalary = filterRules.maxSalary || 0;
// 获取一个月内已投递的公司列表(用于过滤)
// 注意apply_records 和 Sequelize 已在方法开头定义
const oneMonthAgo = new Date();
oneMonthAgo.setMonth(oneMonthAgo.getMonth() - 1);
const recentApplies = await apply_records.findAll({
where: {
sn_code: sn_code,
applyTime: {
[Sequelize.Op.gte]: oneMonthAgo
}
},
attributes: ['companyName'],
group: ['companyName']
});
const recentCompanyNames = new Set(recentApplies.map(apply => apply.companyName).filter(Boolean));
for (const job of pendingJobs) {
const jobData = job.toJSON ? job.toJSON() : job;
// 薪资范围过滤
if (minSalary > 0 || maxSalary > 0) {
const jobSalaryMin = jobData.salaryMin || 0;
const jobSalaryMax = jobData.salaryMax || 0;
// 如果职位薪资范围与过滤范围没有交集,则跳过
if (minSalary > 0 && jobSalaryMax > 0 && minSalary > jobSalaryMax) {
continue;
}
if (maxSalary > 0 && jobSalaryMin > 0 && maxSalary < jobSalaryMin) {
continue;
}
}
// 排除关键词过滤
if (Array.isArray(excludeKeywords) && excludeKeywords.length > 0) {
const jobText = `${jobData.jobTitle} ${jobData.companyName} ${jobData.jobDescription || ''}`.toLowerCase();
const hasExcluded = excludeKeywords.some(kw => jobText.includes(kw.toLowerCase()));
if (hasExcluded) {
continue;
}
}
// 检查该公司是否在一个月内已投递过
if (jobData.companyName && recentCompanyNames.has(jobData.companyName)) {
console.log(`[任务处理器] 跳过一个月内已投递的公司: ${jobData.companyName}`);
continue;
}
// 使用 job_filter_service 计算评分
const scoreResult = jobFilterService.calculateJobScoreWithWeights(
jobData,
resumeInfo,
accountConfig,
jobTypeConfig,
priorityWeights
);
// 如果配置了过滤关键词,给包含这些关键词的职位加分
let keywordBonus = 0;
if (Array.isArray(filterKeywords) && filterKeywords.length > 0) {
const jobText = `${jobData.jobTitle} ${jobData.companyName} ${jobData.jobDescription || ''}`.toLowerCase();
const matchedKeywords = filterKeywords.filter(kw => jobText.includes(kw.toLowerCase()));
if (matchedKeywords.length > 0) {
// 每匹配一个关键词加5分最多加20分
keywordBonus = Math.min(matchedKeywords.length * 5, 20);
}
}
const finalScore = scoreResult.totalScore + keywordBonus;
// 只保留总分 >= 60 的职位
if (finalScore >= 60) {
scoredJobs.push({
...jobData,
matchScore: finalScore,
scoreDetails: {
...scoreResult.scores,
keywordBonus: keywordBonus
}
});
}
}
// 按总分降序排序
scoredJobs.sort((a, b) => b.matchScore - a.matchScore);
// 取前 actualMaxCount 个职位(受每日投递上限限制)
const jobsToDeliver = scoredJobs.slice(0, actualMaxCount);
console.log(`[任务处理器] 职位评分完成,共 ${pendingJobs.length} 个职位,评分后 ${scoredJobs.length} 个符合条件,将投递 ${jobsToDeliver.length}`);
if (jobsToDeliver.length === 0) {
return {
success: true,
deliveredCount: 0,
message: '没有符合条件的职位'
};
}
// 6. 为每个职位创建一条独立的投递指令
const deliverCommands = [];
for (const jobData of jobsToDeliver) {
console.log(`[任务处理器] 准备投递职位: ${jobData.jobTitle} @ ${jobData.companyName}, 评分: ${jobData.matchScore}`, jobData.scoreDetails);
deliverCommands.push({
command_type: 'applyJob',
command_name: `投递简历 - ${jobData.jobTitle} @ ${jobData.companyName} (评分:${jobData.matchScore})`,
command_params: JSON.stringify({
sn_code: sn_code,
platform: platform || 'boss',
jobId: jobData.jobId,
encryptBossId: jobData.encryptBossId || '',
securityId: jobData.securityId || '',
brandName: jobData.companyName,
jobTitle: jobData.jobTitle,
companyName: jobData.companyName,
matchScore: jobData.matchScore,
scoreDetails: jobData.scoreDetails
}),
priority: config.getTaskPriority('apply') || 6
});
}
// 7. 执行所有投递指令
const result = await command.executeCommands(task.id, deliverCommands, this.mqttClient);
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, true, duration);
console.log(`[任务处理器] 自动投递任务完成 - 设备: ${sn_code}, 创建了 ${deliverCommands.length} 条投递指令, 耗时: ${duration}ms`);
return result;
} catch (error) {
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, false, duration);
console.error(`[任务处理器] 自动投递任务失败 - 设备: ${sn_code}:`, error);
throw error;
}
}
/**
* 处理自动沟通任务(待实现)
* 功能自动与HR进行沟通回复消息等
*/
async handleAutoChatTask(task) {
const { sn_code, taskParams } = task;
console.log(`[任务处理器] 自动沟通任务 - 设备: ${sn_code}`);
deviceManager.recordTaskStart(sn_code, task);
const startTime = Date.now();
try {
// TODO: 实现自动沟通逻辑
// 1. 获取待回复的聊天列表
// 2. 根据消息内容生成回复
// 3. 发送回复消息
// 4. 记录沟通结果
console.log(`[任务处理器] 自动沟通任务 - 逻辑待实现`);
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, true, duration);
return {
success: true,
message: '自动沟通任务框架已就绪,逻辑待实现',
chatCount: 0
};
} catch (error) {
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, false, duration);
throw error;
}
}
/**
* 处理自动活跃账号任务(待实现)
* 功能:自动执行一些操作来保持账号活跃度,如浏览职位、搜索等
*/
async handleAutoActiveAccountTask(task) {
const { sn_code, taskParams } = task;
console.log(`[任务处理器] 自动活跃账号任务 - 设备: ${sn_code}`);
deviceManager.recordTaskStart(sn_code, task);
const startTime = Date.now();
try {
// TODO: 实现自动活跃账号逻辑
// 1. 随机搜索一些职位
// 2. 浏览职位详情
// 3. 查看公司信息
// 4. 执行一些模拟用户行为
console.log(`[任务处理器] 自动活跃账号任务 - 逻辑待实现`);
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, true, duration);
return {
success: true,
message: '自动活跃账号任务框架已就绪,逻辑待实现',
actionCount: 0
};
} catch (error) {
const duration = Date.now() - startTime;
deviceManager.recordTaskComplete(sn_code, task, false, duration);
throw error;
}
}
}
module.exports = TaskHandlers;