This commit is contained in:
张成
2025-12-19 22:24:23 +08:00
parent abe2ae3c3a
commit 10aff2f266
12 changed files with 1101 additions and 147 deletions

View File

@@ -2,13 +2,16 @@ const mqtt = require('mqtt')
const { v4: uuidv4 } = require('uuid'); // 顶部添加
const Framework = require('../../../framework/node-core-framework');
const logs = require('../logProxy');
// 获取logsService
class MqttSyncClient {
constructor(brokerUrl, options = {}) {
this.client = mqtt.connect(brokerUrl, options)
this.isConnected = false
this.messageListeners = []
// 使用 Map 结构优化消息监听器,按 topic 分组
this.messageListeners = new Map(); // Map<topic, Set<listener>>
this.globalListeners = new Set(); // 全局监听器(监听所有 topic
this.client.on('connect', () => {
this.isConnected = true
@@ -18,11 +21,42 @@ class MqttSyncClient {
this.client.on('message', (topic, message) => {
message = JSON.parse(message.toString())
let messageObj;
try {
messageObj = JSON.parse(message.toString());
} catch (error) {
console.warn('[MQTT] 消息解析失败:', error.message);
return;
}
console.log('MQTT 收到消息', topic, message)
// 记录日志但不包含敏感信息
const { maskSensitiveData } = require('../../utils/crypto_utils');
const safeMessage = maskSensitiveData(messageObj, ['password', 'pwd', 'token', 'secret', 'key', 'cookie']);
console.log('[MQTT] 收到消息', topic, '类型:', messageObj.action || messageObj.type || 'unknown');
this.messageListeners.forEach(listener => listener(topic, message))
// 优化:只通知相关 topic 的监听器,而不是所有监听器
// 1. 触发该 topic 的专用监听器
const topicListeners = this.messageListeners.get(topic);
if (topicListeners && topicListeners.size > 0) {
topicListeners.forEach(listener => {
try {
listener(topic, messageObj);
} catch (error) {
console.error('[MQTT] Topic监听器执行失败:', error.message);
}
});
}
// 2. 触发全局监听器
if (this.globalListeners.size > 0) {
this.globalListeners.forEach(listener => {
try {
listener(topic, messageObj);
} catch (error) {
console.error('[MQTT] 全局监听器执行失败:', error.message);
}
});
}
})
@@ -139,12 +173,56 @@ class MqttSyncClient {
});
}
addMessageListener(fn) {
this.messageListeners.push(fn)
/**
* 添加消息监听器
* @param {Function} fn - 监听器函数
* @param {string} topic - 可选,指定监听的 topic不指定则监听所有
*/
addMessageListener(fn, topic = null) {
if (typeof fn !== 'function') {
throw new Error('监听器必须是函数');
}
if (topic) {
// 添加到特定 topic 的监听器
if (!this.messageListeners.has(topic)) {
this.messageListeners.set(topic, new Set());
}
this.messageListeners.get(topic).add(fn);
} else {
// 添加到全局监听器
this.globalListeners.add(fn);
}
}
removeMessageListener(fn) {
this.messageListeners = this.messageListeners.filter(f => f !== fn)
/**
* 移除消息监听器
* @param {Function} fn - 监听器函数
* @param {string} topic - 可选,指定从哪个 topic 移除
*/
removeMessageListener(fn, topic = null) {
if (topic) {
// 从特定 topic 移除
const topicListeners = this.messageListeners.get(topic);
if (topicListeners) {
topicListeners.delete(fn);
// 如果该 topic 没有监听器了,删除整个 Set
if (topicListeners.size === 0) {
this.messageListeners.delete(topic);
}
}
} else {
// 从全局监听器移除
this.globalListeners.delete(fn);
// 也尝试从所有 topic 中移除(兼容旧代码)
for (const [topicKey, listeners] of this.messageListeners.entries()) {
listeners.delete(fn);
if (listeners.size === 0) {
this.messageListeners.delete(topicKey);
}
}
}
}
/**

View File

@@ -4,6 +4,8 @@ const config = require('./config.js');
const deviceManager = require('./deviceManager.js');
const command = require('./command.js');
const db = require('../dbProxy');
const authorizationService = require('../../services/authorization_service.js');
const Framework = require("../../../framework/node-core-framework.js");
/**
* 检查当前时间是否在指定的时间范围内
@@ -111,7 +113,7 @@ class ScheduledJobs {
// 执行自动投递任务
const autoDeliverJob = node_schedule.scheduleJob(config.schedules.autoDeliver, () => {
const autoDeliverJob = node_schedule.scheduleJob(config.schedules.autoDeliver, () => {
this.autoDeliverTask();
});
@@ -197,7 +199,7 @@ class ScheduledJobs {
for (const account of accounts) {
const sn_code = account.sn_code;
const device = deviceManager.devices.get(sn_code);
if (!device) {
// 设备从未发送过心跳,视为离线
offlineSnCodes.push(sn_code);
@@ -250,7 +252,7 @@ class ScheduledJobs {
{
status: 'cancelled',
endTime: new Date(),
result: JSON.stringify({
result: JSON.stringify({
reason: '设备离线超过10分钟任务已自动取消',
offlineTime: deviceInfo?.lastHeartbeatTime
})
@@ -292,7 +294,7 @@ class ScheduledJobs {
async syncTaskStatusSummary() {
try {
const { pla_account } = await Framework.getModels();
// 获取所有启用的账号
const accounts = await pla_account.findAll({
where: {
@@ -313,19 +315,19 @@ class ScheduledJobs {
// 为每个在线设备发送任务状态摘要
for (const account of accounts) {
const sn_code = account.sn_code;
// 检查设备是否在线
const device = deviceManager.devices.get(sn_code);
if (!device) {
// 设备从未发送过心跳,视为离线,跳过
continue;
}
// 检查最后心跳时间
const lastHeartbeat = device.lastHeartbeat || 0;
const isOnline = device.isOnline && (now - lastHeartbeat < offlineThreshold);
if (!isOnline) {
// 设备离线,跳过
continue;
@@ -355,7 +357,7 @@ class ScheduledJobs {
try {
const Sequelize = require('sequelize');
const { task_status, op } = db.models;
// 查询所有运行中的任务
const runningTasks = await task_status.findAll({
where: {
@@ -374,7 +376,7 @@ class ScheduledJobs {
for (const task of runningTasks) {
const taskData = task.toJSON();
const startTime = taskData.startTime ? new Date(taskData.startTime) : (taskData.create_time ? new Date(taskData.create_time) : null);
if (!startTime) {
continue;
}
@@ -424,9 +426,9 @@ class ScheduledJobs {
deviceStatus.currentTask = null;
deviceStatus.runningCount = Math.max(0, deviceStatus.runningCount - 1);
this.taskQueue.globalRunningCount = Math.max(0, this.taskQueue.globalRunningCount - 1);
console.log(`[任务超时检查] 已重置设备 ${taskData.sn_code} 的状态,可以继续执行下一个任务`);
// 尝试继续处理该设备的队列
setTimeout(() => {
this.taskQueue.processQueue(taskData.sn_code).catch(error => {
@@ -466,7 +468,7 @@ class ScheduledJobs {
// 移除 device_status 依赖,改为直接从 pla_account 查询启用且开启自动投递的账号
const models = db.models;
const { pla_account, op } = models;
// 直接从 pla_account 查询启用且开启自动投递的账号
// 注意:不再检查在线状态,因为 device_status 已移除
const pla_users = await pla_account.findAll({
@@ -477,7 +479,7 @@ class ScheduledJobs {
}
});
if (!pla_users || pla_users.length === 0) {
console.log('[自动投递] 没有启用且开启自动投递的账号');
@@ -497,17 +499,29 @@ class ScheduledJobs {
const offlineThreshold = 3 * 60 * 1000; // 3分钟
const now = Date.now();
const device = deviceManager.devices.get(userData.sn_code);
// 检查用户授权天数 是否够
const authorization = await authorizationService.checkAuthorization(userData.sn_code);
if (!authorization.is_authorized) {
console.log(`[自动投递] 设备 ${userData.sn_code} 授权天数不足,跳过添加任务`);
continue;
}
if (!device) {
// 设备从未发送过心跳,视为离线
console.log(`[自动投递] 设备 ${userData.sn_code} 离线(从未发送心跳),跳过添加任务`);
continue;
}
// 检查最后心跳时间
const lastHeartbeat = device.lastHeartbeat || 0;
const isOnline = device.isOnline && (now - lastHeartbeat < offlineThreshold);
if (!isOnline) {
const offlineMinutes = lastHeartbeat ? Math.round((now - lastHeartbeat) / (60 * 1000)) : '未知';
console.log(`[自动投递] 设备 ${userData.sn_code} 离线(最后心跳: ${offlineMinutes}分钟前),跳过添加任务`);
@@ -569,7 +583,7 @@ class ScheduledJobs {
const lastDeliverTime = new Date(lastDeliverTask.endTime);
const elapsedTime = new Date().getTime() - lastDeliverTime.getTime();
if (elapsedTime < interval_ms) {
const remainingMinutes = Math.ceil((interval_ms - elapsedTime) / (60 * 1000));
const elapsedMinutes = Math.round(elapsedTime / (60 * 1000));
@@ -651,7 +665,7 @@ class ScheduledJobs {
// 移除 device_status 依赖,改为直接从 pla_account 查询启用且开启自动沟通的账号
const models = db.models;
const { pla_account, op } = models;
// 直接从 pla_account 查询启用且开启自动沟通的账号
// 注意:不再检查在线状态,因为 device_status 已移除
const pla_users = await pla_account.findAll({
@@ -680,17 +694,17 @@ class ScheduledJobs {
const offlineThreshold = 3 * 60 * 1000; // 3分钟
const now = Date.now();
const device = deviceManager.devices.get(userData.sn_code);
if (!device) {
// 设备从未发送过心跳,视为离线
console.log(`[自动沟通] 设备 ${userData.sn_code} 离线(从未发送心跳),跳过添加任务`);
continue;
}
// 检查最后心跳时间
const lastHeartbeat = device.lastHeartbeat || 0;
const isOnline = device.isOnline && (now - lastHeartbeat < offlineThreshold);
if (!isOnline) {
const offlineMinutes = lastHeartbeat ? Math.round((now - lastHeartbeat) / (60 * 1000)) : '未知';
console.log(`[自动沟通] 设备 ${userData.sn_code} 离线(最后心跳: ${offlineMinutes}分钟前),跳过添加任务`);
@@ -740,7 +754,7 @@ class ScheduledJobs {
if (lastChatTask && lastChatTask.endTime) {
const lastChatTime = new Date(lastChatTask.endTime);
const elapsedTime = now.getTime() - lastChatTime.getTime();
if (elapsedTime < interval_ms) {
const remainingMinutes = Math.ceil((interval_ms - elapsedTime) / (60 * 1000));
console.log(`[自动沟通] 设备 ${userData.sn_code} 距离上次沟通仅 ${Math.round(elapsedTime / (60 * 1000))} 分钟,还需等待 ${remainingMinutes} 分钟(间隔: ${chat_interval} 分钟)`);

View File

@@ -377,6 +377,70 @@ class TaskQueue {
}
}
/**
* 检查账号授权状态(剩余天数)
* @param {string} sn_code - 设备SN码
* @returns {Promise<{authorized: boolean, remaining_days: number, message: string}>}
*/
async checkAccountAuthorization(sn_code) {
try {
const pla_account = db.getModel('pla_account');
const account = await pla_account.findOne({
where: {
sn_code: sn_code,
is_delete: 0
},
attributes: ['authorization_date', 'authorization_days']
});
if (!account) {
return {
authorized: false,
remaining_days: 0,
message: '账号不存在'
};
}
const accountData = account.toJSON();
const authDate = accountData.authorization_date;
const authDays = accountData.authorization_days || 0;
// 使用工具函数计算剩余天数
const { calculateRemainingDays } = require('../../utils/account_utils');
const remaining_days = calculateRemainingDays(authDate, authDays);
// 如果没有授权信息或剩余天数 <= 0不允许创建任务
if (!authDate || authDays <= 0 || remaining_days <= 0) {
if (!authDate || authDays <= 0) {
return {
authorized: false,
remaining_days: 0,
message: '账号未授权,请购买使用权限后使用'
};
} else {
return {
authorized: false,
remaining_days: 0,
message: '账号使用权限已到期,请充值续费后使用'
};
}
}
return {
authorized: true,
remaining_days: remaining_days,
message: `授权有效,剩余 ${remaining_days}`
};
} catch (error) {
console.error(`[任务队列] 检查账号授权状态失败:`, error);
return {
authorized: false,
remaining_days: 0,
message: '检查授权状态失败'
};
}
}
/**
* 添加任务到队列
* @param {string} sn_code - 设备SN码
@@ -390,6 +454,12 @@ class TaskQueue {
throw new Error(`账号未启用,无法添加任务`);
}
// 检查账号授权状态(剩余天数)
const authResult = await this.checkAccountAuthorization(sn_code);
if (!authResult.authorized) {
throw new Error(authResult.message);
}
// 检查是否已有相同类型的任务在队列中或正在执行
const existingTask = await this.findExistingTask(sn_code, taskConfig.taskType);
if (existingTask) {