This commit is contained in:
张成
2025-12-17 17:30:03 +08:00
parent 663a5f7fa6
commit 735fa0c621
6 changed files with 495 additions and 124 deletions

View File

@@ -127,6 +127,26 @@ class CommandManager {
// 4. 更新指令状态为运行中
await this._update_command_status(command_id, 'running', null, null, start_time);
// 4.5 推送指令开始执行状态
try {
const deviceWorkStatusNotifier = require('./deviceWorkStatusNotifier');
const taskQueue = require('./taskQueue');
const summary = await taskQueue.getTaskStatusSummary(task.sn_code);
await deviceWorkStatusNotifier.sendDeviceWorkStatus(task.sn_code, summary, {
currentCommand: {
command_id: command_id,
command_name: command_name,
command_type: command_type,
command_params: command_params,
progress: 0,
startTime: start_time.toISOString()
}
});
} catch (pushError) {
// 推送失败不影响指令执行
console.warn(`[指令管理] 推送设备工作状态失败:`, pushError.message);
}
// 5. 执行指令(统一封装)
const result = await this._execute_command_with_timeout(
command_id,
@@ -141,6 +161,17 @@ class CommandManager {
// 6. 记录成功结果
await this._record_command_result(command_id, 'completed', result, null, start_time);
// 6.5 推送指令完成状态
try {
const deviceWorkStatusNotifier = require('./deviceWorkStatusNotifier');
const taskQueue = require('./taskQueue');
const summary = await taskQueue.getTaskStatusSummary(task.sn_code);
await deviceWorkStatusNotifier.sendDeviceWorkStatus(task.sn_code, summary);
} catch (pushError) {
// 推送失败不影响指令执行
console.warn(`[指令管理] 推送设备工作状态失败:`, pushError.message);
}
return {
command_id: command_id,
command_name: command_name,
@@ -159,6 +190,17 @@ class CommandManager {
error,
start_time
);
// 推送指令失败状态
try {
const deviceWorkStatusNotifier = require('./deviceWorkStatusNotifier');
const taskQueue = require('./taskQueue');
const summary = await taskQueue.getTaskStatusSummary(task.sn_code);
await deviceWorkStatusNotifier.sendDeviceWorkStatus(task.sn_code, summary);
} catch (pushError) {
// 推送失败不影响错误处理
console.warn(`[指令管理] 推送设备工作状态失败:`, pushError.message);
}
}
// 重新抛出错误,让调用方知道执行失败

View File

@@ -0,0 +1,253 @@
/**
* 设备工作状态推送服务
* 负责向客户端推送设备当前工作状态(任务、指令等)
*/
const db = require('../dbProxy');
class DeviceWorkStatusNotifier {
constructor() {
this.mqttClient = null;
}
/**
* 设置 MQTT 客户端
* @param {object} mqttClient - MQTT 客户端实例
*/
setMqttClient(mqttClient) {
this.mqttClient = mqttClient;
}
/**
* 获取 MQTT 客户端
* @returns {Promise<object>} MQTT 客户端实例
*/
async getMqttClient() {
if (this.mqttClient) {
return this.mqttClient;
}
// 从调度系统获取 MQTT 客户端(与 taskQueue 保持一致)
try {
const scheduleManager = require('./index');
if (scheduleManager.mqttClient) {
return scheduleManager.mqttClient;
}
} catch (error) {
// 调度系统未初始化
}
return null;
}
/**
* 发送设备工作状态(统一方法,包含任务和指令状态)
* @param {string} sn_code - 设备SN码
* @param {object} taskStatusSummary - 任务状态摘要(从 taskQueue.getTaskStatusSummary 获取)
* @param {object} options - 可选参数
* @param {object} options.currentCommand - 当前执行的指令信息(可选)
*/
async sendDeviceWorkStatus(sn_code, taskStatusSummary, options = {}) {
try {
const mqttClient = await this.getMqttClient();
if (!mqttClient) {
return; // MQTT客户端不可用静默失败
}
// 构建设备工作状态数据
const workStatus = {
sn_code: taskStatusSummary.sn_code || sn_code,
timestamp: new Date().toISOString(),
// 当前活动(任务或指令)
currentActivity: null,
// 待执行队列
pendingQueue: {
count: taskStatusSummary.pendingCount || 0,
totalCount: taskStatusSummary.totalPendingCount || 0,
nextExecuteTime: taskStatusSummary.nextTaskTime ? new Date(taskStatusSummary.nextTaskTime).toISOString() : null,
tasks: taskStatusSummary.pendingTasks || []
},
// 设备状态(从 deviceManager 获取)
deviceStatus: {
isOnline: true, // 默认在线,实际应从 deviceManager 获取
workMode: 'auto'
}
};
// 如果有当前执行的指令,优先显示指令状态
if (options.currentCommand) {
const cmd = options.currentCommand;
workStatus.currentActivity = {
type: 'command',
id: cmd.command_id || cmd.id,
name: cmd.command_name || cmd.name || '执行指令',
description: this._formatCommandDescription(cmd),
status: 'running',
progress: cmd.progress || 0,
currentStep: cmd.currentStep || '',
startTime: cmd.startTime || new Date().toISOString()
};
}
// 如果有当前执行的任务,显示任务状态
else if (taskStatusSummary.currentTask) {
const task = taskStatusSummary.currentTask;
workStatus.currentActivity = {
type: 'task',
id: task.taskId,
name: task.taskName || task.taskType || '未知任务',
description: this._formatTaskDescription(task),
status: task.status || 'running',
progress: task.progress || 0,
currentStep: task.currentStep || '',
startTime: task.startTime ? new Date(task.startTime).toISOString() : new Date().toISOString()
};
}
// 格式化显示文案(服务端统一处理,客户端直接显示)
workStatus.displayText = this._formatDisplayText(workStatus);
// 格式化下次执行时间显示文案
if (workStatus.pendingQueue.nextExecuteTime) {
const nextTime = new Date(workStatus.pendingQueue.nextExecuteTime);
const now = new Date();
const diff = nextTime - now;
if (diff < 0) {
workStatus.pendingQueue.nextExecuteTimeText = '即将执行';
} else if (diff < 60000) {
workStatus.pendingQueue.nextExecuteTimeText = `${Math.floor(diff / 1000)}秒后`;
} else if (diff < 3600000) {
workStatus.pendingQueue.nextExecuteTimeText = `${Math.floor(diff / 60000)}分钟后`;
} else if (diff < 86400000) {
workStatus.pendingQueue.nextExecuteTimeText = `${Math.floor(diff / 3600000)}小时后`;
} else {
workStatus.pendingQueue.nextExecuteTimeText = nextTime.toLocaleString('zh-CN', {
month: '2-digit',
day: '2-digit',
hour: '2-digit',
minute: '2-digit'
});
}
} else {
workStatus.pendingQueue.nextExecuteTimeText = '暂无';
}
// 通过MQTT发布设备工作状态
// 主题格式: device_work_status_{sn_code}
const topic = `device_work_status_${sn_code}`;
const message = JSON.stringify({
action: 'device_work_status',
data: workStatus,
timestamp: new Date().toISOString()
});
await mqttClient.publish(topic, message);
// 输出日志
console.log(`[设备工作状态] 已推送到 ${sn_code}: ${workStatus.displayText}`);
} catch (error) {
// 通知失败不影响任务执行,只记录日志
console.warn(`[设备工作状态] 推送失败:`, error.message);
}
}
/**
* 格式化任务描述
* @private
*/
_formatTaskDescription(task) {
if (task.jobTitle || task.companyName) {
const jobInfo = [];
if (task.jobTitle) {
jobInfo.push(task.jobTitle);
}
if (task.companyName) {
const companyName = task.companyName.length > 20
? task.companyName.substring(0, 20) + '...'
: task.companyName;
jobInfo.push(companyName);
}
if (jobInfo.length > 0) {
return `投递职位: ${jobInfo.join(' @ ')}`;
}
}
return task.taskName || task.taskType || '未知任务';
}
/**
* 格式化指令描述
* @private
*/
_formatCommandDescription(command) {
const params = command.command_params || command.params || {};
let parsedParams = {};
if (typeof params === 'string') {
try {
parsedParams = JSON.parse(params);
} catch (e) {
// 解析失败,忽略
}
} else {
parsedParams = params;
}
// 根据指令类型格式化描述
const commandType = command.command_type || command.type || '';
const commandName = command.command_name || command.name || '';
if (parsedParams.jobTitle && parsedParams.companyName) {
const companyName = parsedParams.companyName.length > 20
? parsedParams.companyName.substring(0, 20) + '...'
: parsedParams.companyName;
return `投递职位: ${parsedParams.jobTitle} @ ${companyName}`;
} else if (parsedParams.jobTitle) {
return `投递职位: ${parsedParams.jobTitle}`;
} else if (commandType === 'applyJob' || commandName.includes('投递')) {
return '投递简历';
} else if (commandType === 'searchJobs' || commandName.includes('搜索')) {
return `搜索职位: ${parsedParams.keyword || '默认关键词'}`;
} else if (commandType === 'sendChatMessage' || commandName.includes('沟通')) {
return '发送消息';
} else if (commandName) {
return commandName;
}
return '执行指令';
}
/**
* 格式化整体显示文案(服务端统一处理,客户端直接显示)
* @private
*/
_formatDisplayText(workStatus) {
const parts = [];
// 当前活动
if (workStatus.currentActivity) {
const activity = workStatus.currentActivity;
const typeText = activity.type === 'command' ? '指令' : '任务';
const statusText = activity.status === 'running' ? '执行中' :
activity.status === 'completed' ? '已完成' :
activity.status === 'failed' ? '失败' : '未知';
parts.push(`${typeText}: ${activity.description || activity.name} (${statusText}, 进度: ${activity.progress}%)`);
} else {
parts.push('当前活动: 无');
}
// 待执行数量
parts.push(`待执行: ${workStatus.pendingQueue.totalCount}`);
// 下次执行时间
parts.push(`下次执行: ${workStatus.pendingQueue.nextExecuteTimeText || '暂无'}`);
return parts.join(' | ');
}
}
// 导出单例
const deviceWorkStatusNotifier = new DeviceWorkStatusNotifier();
module.exports = deviceWorkStatusNotifier;

View File

@@ -11,6 +11,7 @@ const utils = require('./utils.js');
const TaskHandlers = require('./taskHandlers.js');
const MqttDispatcher = require('../mqtt/mqttDispatcher.js');
const ScheduledJobs = require('./scheduledJobs.js');
const DeviceWorkStatusNotifier = require('./deviceWorkStatusNotifier.js');
/**
* 调度系统管理器
@@ -69,6 +70,8 @@ class ScheduleManager {
*/
async initMqttClient() {
this.mqttClient = await mqttManager.getInstance();
// 设置设备工作状态推送服务的 MQTT 客户端
DeviceWorkStatusNotifier.setMqttClient(this.mqttClient);
}
/**

View File

@@ -331,11 +331,13 @@ class ScheduledJobs {
continue;
}
// 设备在线,发送任务状态摘要
// 设备在线,推送设备工作状态
try {
await this.taskQueue.sendTaskStatusSummary(sn_code);
const deviceWorkStatusNotifier = require('./deviceWorkStatusNotifier');
const summary = await this.taskQueue.getTaskStatusSummary(sn_code);
await deviceWorkStatusNotifier.sendDeviceWorkStatus(sn_code, summary);
} catch (error) {
console.error(`[任务状态同步] 设备 ${sn_code} 同步失败:`, error.message);
console.error(`[设备工作状态同步] 设备 ${sn_code} 同步失败:`, error.message);
}
}
} catch (error) {

View File

@@ -582,15 +582,10 @@ class TaskQueue {
{ where: { id: task.id } }
);
// 通知客户端任务状态变更
await this.notifyTaskStatusChange(task.sn_code, {
taskId: task.id,
taskName: task.taskName,
taskType: task.taskType,
status: 'running',
progress: 0,
startTime: task.startTime
});
// 推送设备工作状态(任务开始执行)
const deviceWorkStatusNotifier = require('./deviceWorkStatusNotifier');
const summary = await this.getTaskStatusSummary(task.sn_code);
await deviceWorkStatusNotifier.sendDeviceWorkStatus(task.sn_code, summary);
// 使用注册的任务处理器执行任务
const handler = this.taskHandlers.get(task.taskType);
@@ -627,15 +622,10 @@ class TaskQueue {
{ where: { id: task.id } }
);
// 通知客户端任务状态变更
await this.notifyTaskStatusChange(task.sn_code, {
taskId: task.id,
taskName: task.taskName,
taskType: task.taskType,
status: 'completed',
progress: 100,
endTime: task.endTime
});
// 推送设备工作状态(任务完成)
const deviceWorkStatusNotifier = require('./deviceWorkStatusNotifier');
const summary = await this.getTaskStatusSummary(task.sn_code);
await deviceWorkStatusNotifier.sendDeviceWorkStatus(task.sn_code, summary);
console.log(`[任务队列] 设备 ${task.sn_code} 任务执行成功: ${task.taskName} (耗时: ${task.duration}ms)`);
@@ -671,16 +661,10 @@ class TaskQueue {
{ where: { id: task.id } }
);
// 通知客户端任务状态变更
await this.notifyTaskStatusChange(task.sn_code, {
taskId: task.id,
taskName: task.taskName,
taskType: task.taskType,
status: 'failed',
progress: 0,
errorMessage: task.errorMessage,
endTime: task.endTime
});
// 推送设备工作状态(任务失败)
const deviceWorkStatusNotifier = require('./deviceWorkStatusNotifier');
const summary = await this.getTaskStatusSummary(task.sn_code);
await deviceWorkStatusNotifier.sendDeviceWorkStatus(task.sn_code, summary);
} catch (dbError) {
console.error(`[任务队列] 更新任务失败状态到数据库失败:`, dbError);
}
@@ -1026,34 +1010,7 @@ class TaskQueue {
}
}
/**
* 通知客户端任务状态变更
* @param {string} sn_code - 设备SN码
* @param {object} taskData - 任务数据
*/
async notifyTaskStatusChange(sn_code, taskData) {
try {
const mqttClient = await this.getMqttClient();
if (!mqttClient) {
return; // MQTT客户端不可用静默失败
}
// 通过MQTT发布任务状态变更通知
// 主题格式: task_status_{sn_code}
const topic = `task_status_${sn_code}`;
const message = JSON.stringify({
action: 'task_status_update',
data: taskData,
timestamp: new Date().toISOString()
});
await mqttClient.publish(topic, message);
console.log(`[任务队列] 已通知客户端任务状态变更: ${sn_code} - ${taskData.taskName || taskData.taskType || '未知任务'} (${taskData.status})`);
} catch (error) {
// 通知失败不影响任务执行,只记录日志
console.warn(`[任务队列] 通知客户端任务状态变更失败:`, error.message);
}
}
// notifyTaskStatusChange 方法已移除,统一使用 deviceWorkStatusNotifier.sendDeviceWorkStatus
/**
* 获取任务状态摘要(用于同步到客户端)
@@ -1260,74 +1217,12 @@ class TaskQueue {
}
/**
* 向客户端发送任务状态摘要
* 发送设备工作状态(统一方法,包含任务和指令状态)
* @param {string} sn_code - 设备SN码
* @param {object} options - 可选参数
* @param {object} options.currentCommand - 当前执行的指令信息(可选)
*/
async sendTaskStatusSummary(sn_code) {
try {
const mqttClient = await this.getMqttClient();
if (!mqttClient) {
return; // MQTT客户端不可用静默失败
}
const summary = await this.getTaskStatusSummary(sn_code);
// 通过MQTT发布任务状态摘要
// 主题格式: task_status_{sn_code}
const topic = `task_status_${sn_code}`;
const message = JSON.stringify({
action: 'task_status_summary',
data: summary,
timestamp: new Date().toISOString()
});
await mqttClient.publish(topic, message);
// 改进日志输出,显示更详细的任务信息
if (summary.currentTask) {
const task = summary.currentTask;
// 构建任务标识信息
let taskIdentifier = task.taskName || task.taskType || '未知任务';
// 如果是投递简历任务,显示具体的职位和公司信息
if (task.jobTitle || task.companyName) {
const jobInfo = [];
if (task.jobTitle) {
jobInfo.push(task.jobTitle);
}
if (task.companyName) {
// 截断过长的公司名称
const companyName = task.companyName.length > 20
? task.companyName.substring(0, 20) + '...'
: task.companyName;
jobInfo.push(companyName);
}
if (jobInfo.length > 0) {
taskIdentifier = `投递职位: ${jobInfo.join(' @ ')}`;
}
}
const progressInfo = task.progress !== null && task.progress !== undefined
? `进度: ${task.progress}%`
: '';
const stepInfo = task.currentStep
? `步骤: ${task.currentStep}`
: '';
const detailInfo = [progressInfo, stepInfo].filter(Boolean).join(', ');
const detailStr = detailInfo ? ` (${detailInfo})` : '';
// 使用总待执行数(包括当前任务的剩余步骤)
const totalCount = summary.totalPendingCount !== undefined ? summary.totalPendingCount : summary.pendingCount;
console.log(`[任务队列] 已发送任务状态摘要到 ${sn_code}: 当前任务=${taskIdentifier}${detailStr}, 待执行=${totalCount}`);
} else {
console.log(`[任务队列] 已发送任务状态摘要到 ${sn_code}: 当前任务=无, 待执行=${summary.pendingCount}`);
}
} catch (error) {
// 通知失败不影响任务执行,只记录日志
console.warn(`[任务队列] 发送任务状态摘要失败:`, error.message);
}
}
}
// 导出单例