Files
autoAiWorkSys/api/services/pla_account_service.js
张成 34ebad316a 1
2025-12-19 11:40:25 +08:00

826 lines
25 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* 平台账号服务
* 处理平台账号相关的业务逻辑
*/
const db = require('../middleware/dbProxy');
const scheduleManager = require('../middleware/schedule/index.js');
const locationService = require('./locationService');
const authorizationService = require('./authorization_service');
class PlaAccountService {
/**
* 根据ID获取账号信息
* @param {number} id - 账号ID
* @returns {Promise<Object>} 账号信息
*/
async getAccountById(id) {
const pla_account = db.getModel('pla_account');
const account = await pla_account.findByPk(id);
if (!account) {
throw new Error('账号不存在');
}
const accountData = account.get({ plain: true });
// is_online 和 is_logged_in 字段已存在于数据库中,直接返回
// 如果字段不存在,设置默认值
if (accountData.is_online === undefined) {
accountData.is_online = false;
}
if (accountData.is_logged_in === undefined) {
accountData.is_logged_in = false;
}
return accountData;
}
/**
* 根据设备SN码获取账号信息
* @param {string} sn_code - 设备SN码
* @returns {Promise<Object>} 账号信息
*/
async getAccountBySnCode(sn_code) {
const pla_account = db.getModel('pla_account');
if (!sn_code) {
throw new Error('设备SN码不能为空');
}
// 根据 sn_code 查询账号,排除已删除的账号
const account = await pla_account.findOne({
where: {
sn_code: sn_code,
is_delete: 0
}
});
if (!account) {
throw new Error('账号不存在');
}
const accountData = account.get({ plain: true });
// is_online 和 is_logged_in 字段已存在于数据库中,直接返回
// 如果字段不存在,设置默认值
if (accountData.is_online === undefined) {
accountData.is_online = false;
}
if (accountData.is_logged_in === undefined) {
accountData.is_logged_in = false;
}
return accountData;
}
/**
* 获取账号列表
* @param {Object} params - 查询参数
* @returns {Promise<Object>} 账号列表
*/
async getAccountList(params) {
const pla_account = db.getModel('pla_account');
const op = db.getModel('op');
const { key, value, platform_type, is_online, limit, offset } = params;
const where = { is_delete: 0 };
// 搜索条件
if (key && value) {
// 对于 sn_code 使用精确匹配,其他字段使用模糊匹配
if (key === 'sn_code') {
where[key] = value;
} else {
where[key] = { [op.like]: `%${value}%` };
}
}
// 平台筛选
if (platform_type) {
where.platform_type = platform_type;
}
// 移除 device_status 依赖is_online 筛选功能暂时禁用
// 如果需要在线状态筛选,可以在 pla_account 表中添加相应字段
const result = await pla_account.findAndCountAll({
where,
limit,
offset,
order: [['id', 'DESC']]
});
// 处理返回数据is_online 和 is_logged_in 从数据库读取
const rows = result.rows.map(account => {
const accountData = account.get({ plain: true });
// 如果字段不存在,设置默认值
if (accountData.is_online === undefined) {
accountData.is_online = false;
}
if (accountData.is_logged_in === undefined) {
accountData.is_logged_in = false;
}
return accountData;
});
return {
count: result.count,
rows
};
}
/**
* 创建账号
* @param {Object} data - 账号数据
* @returns {Promise<Object>} 创建的账号
*/
async createAccount(data) {
const pla_account = db.getModel('pla_account');
const { name, sn_code, platform_type, login_name, pwd, keyword, ...otherData } = data;
if (!name || !sn_code || !platform_type || !login_name) {
throw new Error('账户名、设备SN码、平台和登录名为必填项');
}
// 将布尔字段从 true/false 转换为 0/1
const booleanFields = ['auto_deliver', 'auto_chat', 'auto_reply', 'auto_active'];
const processedData = {
name,
sn_code,
platform_type,
login_name,
pwd: pwd || '',
keyword: keyword || '',
...otherData
};
booleanFields.forEach(field => {
if (processedData[field] !== undefined && processedData[field] !== null) {
processedData[field] = processedData[field] ? 1 : 0;
}
});
const account = await pla_account.create(processedData);
return account;
}
/**
* 更新账号信息
* @param {number} id - 账号ID
* @param {Object} updateData - 更新的数据
* @returns {Promise<void>}
*/
async updateAccount(id, updateData) {
const pla_account = db.getModel('pla_account');
if (!id) {
throw new Error('账号ID不能为空');
}
const account = await pla_account.findByPk(id);
if (!account) {
throw new Error('账号不存在');
}
// 将布尔字段从 true/false 转换为 0/1确保数据库兼容性
const booleanFields = ['auto_deliver', 'auto_chat', 'auto_reply', 'auto_active'];
const processedData = { ...updateData };
booleanFields.forEach(field => {
if (processedData[field] !== undefined && processedData[field] !== null) {
processedData[field] = processedData[field] ? 1 : 0;
}
});
await pla_account.update(processedData, { where: { id } });
}
/**
* 删除账号(软删除)
* @param {number} id - 账号ID
* @returns {Promise<void>}
*/
async deleteAccount(id) {
const pla_account = db.getModel('pla_account');
if (!id) {
throw new Error('账号ID不能为空');
}
// 软删除
const result = await pla_account.update(
{ is_delete: 1 },
{ where: { id } }
);
if (result[0] === 0) {
throw new Error('账号不存在');
}
}
/**
* 获取账号的任务列表
* @param {Object} params - 查询参数
* @returns {Promise<Object>} 任务列表
*/
async getAccountTasks(params) {
const pla_account = db.getModel('pla_account');
const task_status = db.getModel('task_status');
const { id, limit, offset } = params;
if (!id) {
throw new Error('账号ID不能为空');
}
// 先获取账号信息
const account = await pla_account.findByPk(id);
if (!account) {
throw new Error('账号不存在');
}
// 通过 sn_code 查询任务列表
const result = await task_status.findAndCountAll({
where: {
sn_code: account.sn_code
},
limit,
offset,
order: [['id', 'DESC']]
});
// 将 Sequelize 模型实例转换为普通对象
const rows = result.rows.map(row => row.get({ plain: true }));
return {
count: result.count,
rows: rows
};
}
/**
* 获取账号的指令列表
* @param {Object} params - 查询参数
* @returns {Promise<Object>} 指令列表
*/
async getAccountCommands(params) {
const pla_account = db.getModel('pla_account');
const task_commands = db.getModel('task_commands');
const task_status = db.getModel('task_status');
const Sequelize = require('sequelize');
const Op = Sequelize.Op;
const { id, limit, offset } = params;
if (!id) {
throw new Error('账号ID不能为空');
}
// 先获取账号信息
const account = await pla_account.findByPk(id);
if (!account) {
throw new Error('账号不存在');
}
// 先查询所有匹配 sn_code 的任务 ID
const tasks = await task_status.findAll({
where: {
sn_code: account.sn_code
},
attributes: ['id']
});
const taskIds = tasks.map(task => task.id);
// 如果没有任务,直接返回空结果
if (taskIds.length === 0) {
return {
count: 0,
rows: []
};
}
// 使用 Sequelize 模型查询指令列表
const result = await task_commands.findAndCountAll({
where: {
task_id: {
[Op.in]: taskIds
}
},
limit,
offset,
order: [['id', 'DESC']]
});
// 将 Sequelize 模型实例转换为普通对象
const rows = result.rows.map(row => {
const plainRow = row.get({ plain: true });
// 添加 create_time 字段(使用 start_time 或 createdAt
if (!plainRow.create_time) {
plainRow.create_time = plainRow.start_time || plainRow.createdAt || null;
}
// 解析 JSON 字段
if (plainRow.command_params && typeof plainRow.command_params === 'string') {
try {
plainRow.command_params = JSON.parse(plainRow.command_params);
} catch (e) {
// 解析失败保持原样
}
}
if (plainRow.result && typeof plainRow.result === 'string') {
try {
plainRow.result = JSON.parse(plainRow.result);
} catch (e) {
// 解析失败保持原样
}
}
return plainRow;
});
return {
count: result.count,
rows: rows
};
}
/**
* 执行账号指令
* @param {Object} params - 指令参数
* @returns {Promise<Object>} 执行结果
*/
async runCommand(params) {
const pla_account = db.getModel('pla_account');
const task_status = db.getModel('task_status');
const { id, commandType, commandName, commandParams } = params;
if (!id || !commandType) {
throw new Error('账号ID和指令类型不能为空');
}
// 获取账号信息
const account = await pla_account.findByPk(id);
if (!account) {
throw new Error('账号不存在');
}
// 检查授权状态
const authCheck = await authorizationService.checkAuthorization(id, 'id');
if (!authCheck.is_authorized) {
throw new Error(authCheck.message);
}
// 移除 device_status 依赖,在线状态检查暂时移除
// 如果需要在线状态检查,可以在 pla_account 表中添加相应字段
// 获取调度管理器并执行指令
if (!scheduleManager.mqttClient) {
throw new Error('MQTT客户端未初始化');
}
// 将驼峰格式转换为下划线格式用于jobManager方法调用
const toSnakeCase = (str) => {
// 如果已经是下划线格式,直接返回
if (str.includes('_')) {
return str;
}
// 驼峰转下划线
return str.replace(/([A-Z])/g, '_$1').toLowerCase();
};
// 直接使用commandType转换为下划线格式作为command_type
const commandTypeSnake = toSnakeCase(commandType);
// 构建基础参数
const baseParams = {
sn_code: account.sn_code,
platform: account.platform_type
};
// 合并参数commandParams优先
const finalParams = commandParams
? { ...baseParams, ...commandParams }
: baseParams;
// 如果有关键词相关的操作,添加关键词
if (['search_jobs', 'get_job_list'].includes(commandTypeSnake) && account.keyword) {
finalParams.keyword = account.keyword;
}
// 构建指令对象
const command = {
command_type: commandTypeSnake,
command_name: commandName || commandType,
command_params: JSON.stringify(finalParams)
};
// 创建任务记录
const task = await task_status.create({
sn_code: account.sn_code,
taskType: commandTypeSnake,
taskName: commandName || commandType,
taskParams: JSON.stringify(finalParams)
});
// 直接执行指令
const result = await scheduleManager.command.executeCommand(task.id, command, scheduleManager.mqttClient);
return result;
}
/**
* 获取指令详情
* @param {Object} params - 查询参数
* @returns {Promise<Object>} 指令详情
*/
async getCommandDetail(params) {
const task_commands = db.getModel('task_commands');
const { accountId, commandId } = params;
if (!accountId || !commandId) {
throw new Error('账号ID和指令ID不能为空');
}
// 查询指令详情
const command = await task_commands.findByPk(commandId);
if (!command) {
throw new Error('指令不存在');
}
// 解析JSON字段
const commandDetail = command.toJSON();
try {
if (commandDetail.command_params) {
commandDetail.command_params = JSON.parse(commandDetail.command_params);
}
if (commandDetail.result) {
commandDetail.result = JSON.parse(commandDetail.result);
}
} catch (error) {
console.warn('解析指令详情失败:', error);
}
return commandDetail;
}
/**
* 重试指令
* @param {Object} params - 重试参数
* @param {number} params.commandId - 指令ID
* @returns {Promise<Object>} 重试结果
*/
async retryCommand(params) {
const task_commands = db.getModel('task_commands');
const task_status = db.getModel('task_status');
const scheduleManager = require('../middleware/schedule/index.js');
const { commandId } = params;
if (!commandId) {
throw new Error('指令ID不能为空');
}
// 查询指令信息
const command = await task_commands.findByPk(commandId);
if (!command) {
throw new Error('指令不存在');
}
// 检查指令状态
if (command.status !== 'failed') {
throw new Error('只能重试失败的指令');
}
// 获取任务信息
const task = await task_status.findByPk(command.task_id);
if (!task) {
throw new Error('任务不存在');
}
// 获取账号信息
const pla_account = db.getModel('pla_account');
const account = await pla_account.findOne({ where: { sn_code: task.sn_code } });
if (!account) {
throw new Error('账号不存在');
}
// 检查授权状态
const authCheck = await authorizationService.checkAuthorization(account.id, 'id');
if (!authCheck.is_authorized) {
throw new Error(authCheck.message);
}
// 检查 MQTT 客户端
if (!scheduleManager.mqttClient) {
throw new Error('MQTT客户端未初始化');
}
// 重置指令状态
await command.update({
status: 'pending',
error_message: null,
error_stack: null,
retry_count: (command.retry_count || 0) + 1,
start_time: null,
end_time: null,
duration: null,
result: null
});
// 解析指令参数
let commandParams = {};
if (command.command_params) {
try {
commandParams = typeof command.command_params === 'string'
? JSON.parse(command.command_params)
: command.command_params;
} catch (error) {
console.warn('解析指令参数失败:', error);
}
}
// 构建指令对象
const commandObj = {
command_type: command.command_type,
command_name: command.command_name,
command_params: JSON.stringify(commandParams)
};
// 执行指令
const result = await scheduleManager.command.executeCommand(task.id, commandObj, scheduleManager.mqttClient);
return {
success: true,
message: '指令重试成功',
commandId: command.id,
result: result
};
}
/**
* 执行账号任务(旧接口兼容)
* @param {Object} params - 任务参数
* @returns {Promise<Object>} 执行结果
*/
async runTask(params) {
const pla_account = db.getModel('pla_account');
const { id, taskType, taskName } = params;
if (!id || !taskType) {
throw new Error('账号ID和指令类型不能为空');
}
// 获取账号信息
const account = await pla_account.findByPk(id);
if (!account) {
throw new Error('账号不存在');
}
// 检查账号是否启用
if (!account.is_enabled) {
throw new Error('账号未启用,无法执行指令');
}
// 移除 device_status 依赖,在线状态检查暂时移除
// 如果需要在线状态检查,可以在 pla_account 表中添加相应字段
// 获取调度管理器并执行指令
if (!scheduleManager.mqttClient) {
throw new Error('MQTT客户端未初始化');
}
await scheduleManager.taskQueue.addTask(account.sn_code, {
taskType: taskType,
taskName: `手动任务 - ${taskName}`,
taskParams: {
keyword: account.keyword,
platform: account.platform_type
}
});
return {
message: '任务已添加到队列',
taskId: task.id
};
}
/**
* 停止账号的所有任务
* @param {Object} params - 参数对象
* @param {number} params.id - 账号ID
* @param {string} params.sn_code - 设备SN码
* @returns {Promise<Object>} 停止结果
*/
async stopTasks(params) {
const { id, sn_code } = params;
if (!id && !sn_code) {
throw new Error('账号ID或设备SN码不能为空');
}
const pla_account = db.getModel('pla_account');
const task_status = db.getModel('task_status');
const scheduleManager = require('../middleware/schedule/index.js');
// 获取账号信息
let account;
if (id) {
account = await pla_account.findByPk(id);
} else if (sn_code) {
account = await pla_account.findOne({ where: { sn_code } });
}
if (!account) {
throw new Error('账号不存在');
}
const deviceSnCode = account.sn_code;
// 1. 从任务队列中取消该设备的所有待执行任务
const taskQueue = scheduleManager.taskQueue;
let cancelledCount = 0;
if (taskQueue && typeof taskQueue.cancelDeviceTasks === 'function') {
cancelledCount = await taskQueue.cancelDeviceTasks(deviceSnCode);
} else {
// 如果没有 cancelDeviceTasks 方法,手动取消
const tasks = await task_status.findAll({
where: {
sn_code: deviceSnCode,
status: ['pending', 'running']
}
});
for (const task of tasks) {
try {
if (taskQueue && typeof taskQueue.cancelTask === 'function') {
await taskQueue.cancelTask(task.id);
cancelledCount++;
} else {
// 直接更新数据库
await task_status.update(
{
status: 'cancelled',
endTime: new Date()
},
{ where: { id: task.id } }
);
cancelledCount++;
}
} catch (error) {
console.error(`[停止任务] 取消任务 ${task.id} 失败:`, error);
}
}
}
return {
success: true,
message: `已停止 ${cancelledCount} 个任务`,
cancelledCount: cancelledCount,
sn_code: deviceSnCode
};
}
/**
* 解析地址并更新经纬度
* @param {Object} params - 参数对象
* @param {number} params.id - 账号ID
* @param {string} params.address - 地址(可选,如果不提供则使用账号中的地址)
* @returns {Promise<Object>} 解析结果
*/
async parseLocation(params) {
const { id, address } = params;
if (!id) {
throw new Error('账号ID不能为空');
}
const pla_account = db.getModel('pla_account');
const account = await pla_account.findByPk(id);
if (!account) {
throw new Error('账号不存在');
}
// 如果提供了地址参数,使用参数中的地址;否则使用账号中的地址
const addressToParse = address || account.user_address;
if (!addressToParse || addressToParse.trim() === '') {
throw new Error('地址不能为空,请先设置用户地址');
}
try {
// 调用位置服务解析地址
const location = await locationService.getLocationByAddress(addressToParse);
if (!location || !location.lat || !location.lng) {
throw new Error('地址解析失败,未获取到经纬度信息');
}
// 更新账号的地址和经纬度
await account.update({
user_address: addressToParse,
user_longitude: String(location.lng),
user_latitude: String(location.lat)
});
return {
success: true,
message: '地址解析成功',
data: {
address: addressToParse,
longitude: location.lng,
latitude: location.lat
}
};
} catch (error) {
console.error('[账号管理] 地址解析失败:', error);
throw new Error('地址解析失败:' + (error.message || '请检查地址是否正确'));
}
}
/**
* 批量解析地址并更新经纬度
* @param {Array<number>} ids - 账号ID数组
* @returns {Promise<Object>} 批量解析结果
*/
async batchParseLocation(ids) {
if (!ids || !Array.isArray(ids) || ids.length === 0) {
throw new Error('账号ID列表不能为空');
}
const pla_account = db.getModel('pla_account');
const op = db.getModel('op');
const accounts = await pla_account.findAll({
where: {
id: { [op.in]: ids }
}
});
if (accounts.length === 0) {
throw new Error('未找到指定的账号');
}
const results = {
success: 0,
failed: 0,
details: []
};
// 逐个解析地址
for (const account of accounts) {
try {
if (!account.user_address || account.user_address.trim() === '') {
results.details.push({
id: account.id,
name: account.name,
success: false,
message: '地址为空,跳过解析'
});
results.failed++;
continue;
}
const result = await this.parseLocation({
id: account.id,
address: account.user_address
});
results.details.push({
id: account.id,
name: account.name,
success: true,
message: '解析成功',
data: result.data
});
results.success++;
} catch (error) {
results.details.push({
id: account.id,
name: account.name,
success: false,
message: error.message || '解析失败'
});
results.failed++;
}
}
return results;
}
}
// 导出单例
module.exports = new PlaAccountService();