Files
autoAiWorkSys/api/services/pla_account_service.js
张成 4443d43ec1 1
2025-12-15 22:03:01 +08:00

696 lines
21 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* 平台账号服务
* 处理平台账号相关的业务逻辑
*/
const db = require('../middleware/dbProxy');
const scheduleManager = require('../middleware/schedule/index.js');
const locationService = require('./locationService');
const authorizationService = require('./authorization_service');
class PlaAccountService {
/**
* 根据ID获取账号信息
* @param {number} id - 账号ID
* @returns {Promise<Object>} 账号信息
*/
async getAccountById(id) {
const pla_account = db.getModel('pla_account');
const account = await pla_account.findByPk(id);
if (!account) {
throw new Error('账号不存在');
}
const accountData = account.get({ plain: true });
// 移除 device_status 依赖,在线状态和登录状态设为默认值
accountData.is_online = false;
accountData.is_logged_in = false;
return accountData;
}
/**
* 根据设备SN码获取账号信息
* @param {string} sn_code - 设备SN码
* @returns {Promise<Object>} 账号信息
*/
async getAccountBySnCode(sn_code) {
const pla_account = db.getModel('pla_account');
if (!sn_code) {
throw new Error('设备SN码不能为空');
}
// 根据 sn_code 查询账号,排除已删除的账号
const account = await pla_account.findOne({
where: {
sn_code: sn_code,
is_delete: 0
}
});
if (!account) {
throw new Error('账号不存在');
}
const accountData = account.get({ plain: true });
// 移除 device_status 依赖,在线状态和登录状态设为默认值
accountData.is_online = false;
accountData.is_logged_in = false;
return accountData;
}
/**
* 获取账号列表
* @param {Object} params - 查询参数
* @returns {Promise<Object>} 账号列表
*/
async getAccountList(params) {
const pla_account = db.getModel('pla_account');
const op = db.getModel('op');
const { key, value, platform_type, is_online, limit, offset } = params;
const where = { is_delete: 0 };
// 搜索条件
if (key && value) {
// 对于 sn_code 使用精确匹配,其他字段使用模糊匹配
if (key === 'sn_code') {
where[key] = value;
} else {
where[key] = { [op.like]: `%${value}%` };
}
}
// 平台筛选
if (platform_type) {
where.platform_type = platform_type;
}
// 移除 device_status 依赖is_online 筛选功能暂时禁用
// 如果需要在线状态筛选,可以在 pla_account 表中添加相应字段
const result = await pla_account.findAndCountAll({
where,
limit,
offset,
order: [['id', 'DESC']]
});
// 处理返回数据is_online 设为默认值 false
const rows = result.rows.map(account => {
const accountData = account.get({ plain: true });
accountData.is_online = false;
return accountData;
});
return {
count: result.count,
rows
};
}
/**
* 创建账号
* @param {Object} data - 账号数据
* @returns {Promise<Object>} 创建的账号
*/
async createAccount(data) {
const pla_account = db.getModel('pla_account');
const { name, sn_code, platform_type, login_name, pwd, keyword, ...otherData } = data;
if (!name || !sn_code || !platform_type || !login_name) {
throw new Error('账户名、设备SN码、平台和登录名为必填项');
}
// 将布尔字段从 true/false 转换为 0/1
const booleanFields = ['auto_deliver', 'auto_chat', 'auto_reply', 'auto_active'];
const processedData = {
name,
sn_code,
platform_type,
login_name,
pwd: pwd || '',
keyword: keyword || '',
...otherData
};
booleanFields.forEach(field => {
if (processedData[field] !== undefined && processedData[field] !== null) {
processedData[field] = processedData[field] ? 1 : 0;
}
});
const account = await pla_account.create(processedData);
return account;
}
/**
* 更新账号信息
* @param {number} id - 账号ID
* @param {Object} updateData - 更新的数据
* @returns {Promise<void>}
*/
async updateAccount(id, updateData) {
const pla_account = db.getModel('pla_account');
if (!id) {
throw new Error('账号ID不能为空');
}
const account = await pla_account.findByPk(id);
if (!account) {
throw new Error('账号不存在');
}
// 将布尔字段从 true/false 转换为 0/1确保数据库兼容性
const booleanFields = ['auto_deliver', 'auto_chat', 'auto_reply', 'auto_active'];
const processedData = { ...updateData };
booleanFields.forEach(field => {
if (processedData[field] !== undefined && processedData[field] !== null) {
processedData[field] = processedData[field] ? 1 : 0;
}
});
await pla_account.update(processedData, { where: { id } });
}
/**
* 删除账号(软删除)
* @param {number} id - 账号ID
* @returns {Promise<void>}
*/
async deleteAccount(id) {
const pla_account = db.getModel('pla_account');
if (!id) {
throw new Error('账号ID不能为空');
}
// 软删除
const result = await pla_account.update(
{ is_delete: 1 },
{ where: { id } }
);
if (result[0] === 0) {
throw new Error('账号不存在');
}
}
/**
* 获取账号的任务列表
* @param {Object} params - 查询参数
* @returns {Promise<Object>} 任务列表
*/
async getAccountTasks(params) {
const pla_account = db.getModel('pla_account');
const task_status = db.getModel('task_status');
const { id, limit, offset } = params;
if (!id) {
throw new Error('账号ID不能为空');
}
// 先获取账号信息
const account = await pla_account.findByPk(id);
if (!account) {
throw new Error('账号不存在');
}
// 通过 sn_code 查询任务列表
const result = await task_status.findAndCountAll({
where: {
sn_code: account.sn_code
},
limit,
offset,
order: [['id', 'DESC']]
});
return {
count: result.count,
rows: result.rows
};
}
/**
* 获取账号的指令列表
* @param {Object} params - 查询参数
* @returns {Promise<Object>} 指令列表
*/
async getAccountCommands(params) {
const pla_account = db.getModel('pla_account');
const task_commands = db.getModel('task_commands');
const Sequelize = require('sequelize');
const { id, limit, offset } = params;
if (!id) {
throw new Error('账号ID不能为空');
}
// 先获取账号信息
const account = await pla_account.findByPk(id);
if (!account) {
throw new Error('账号不存在');
}
// 获取 sequelize 实例
const sequelize = task_commands.sequelize;
// 使用原生 SQL JOIN 查询
const countSql = `
SELECT COUNT(DISTINCT tc.id) as count
FROM task_commands tc
INNER JOIN task_status ts ON tc.task_id = ts.id
WHERE ts.sn_code = :sn_code
`;
const dataSql = `
SELECT tc.*
FROM task_commands tc
INNER JOIN task_status ts ON tc.task_id = ts.id
WHERE ts.sn_code = :sn_code
ORDER BY tc.id DESC
LIMIT :limit OFFSET :offset
`;
// 并行执行查询和计数
const [countResult, dataResult] = await Promise.all([
sequelize.query(countSql, {
replacements: { sn_code: account.sn_code },
type: Sequelize.QueryTypes.SELECT
}),
sequelize.query(dataSql, {
replacements: {
sn_code: account.sn_code,
limit: limit,
offset: offset
},
type: Sequelize.QueryTypes.SELECT
})
]);
const count = countResult[0]?.count || 0;
// 将原始数据转换为 Sequelize 模型实例
const rows = dataResult.map(row => {
return task_commands.build(row, { isNewRecord: false });
});
return {
count: parseInt(count),
rows: rows
};
}
/**
* 执行账号指令
* @param {Object} params - 指令参数
* @returns {Promise<Object>} 执行结果
*/
async runCommand(params) {
const pla_account = db.getModel('pla_account');
const task_status = db.getModel('task_status');
const { id, commandType, commandName, commandParams } = params;
if (!id || !commandType) {
throw new Error('账号ID和指令类型不能为空');
}
// 获取账号信息
const account = await pla_account.findByPk(id);
if (!account) {
throw new Error('账号不存在');
}
// 检查授权状态
const authCheck = await authorizationService.checkAuthorization(id, 'id');
if (!authCheck.is_authorized) {
throw new Error(authCheck.message);
}
// 移除 device_status 依赖,在线状态检查暂时移除
// 如果需要在线状态检查,可以在 pla_account 表中添加相应字段
// 获取调度管理器并执行指令
if (!scheduleManager.mqttClient) {
throw new Error('MQTT客户端未初始化');
}
// 将驼峰格式转换为下划线格式用于jobManager方法调用
const toSnakeCase = (str) => {
// 如果已经是下划线格式,直接返回
if (str.includes('_')) {
return str;
}
// 驼峰转下划线
return str.replace(/([A-Z])/g, '_$1').toLowerCase();
};
// 直接使用commandType转换为下划线格式作为command_type
const commandTypeSnake = toSnakeCase(commandType);
// 构建基础参数
const baseParams = {
sn_code: account.sn_code,
platform: account.platform_type
};
// 合并参数commandParams优先
const finalParams = commandParams
? { ...baseParams, ...commandParams }
: baseParams;
// 如果有关键词相关的操作,添加关键词
if (['search_jobs', 'get_job_list'].includes(commandTypeSnake) && account.keyword) {
finalParams.keyword = account.keyword;
}
// 构建指令对象
const command = {
command_type: commandTypeSnake,
command_name: commandName || commandType,
command_params: JSON.stringify(finalParams)
};
// 创建任务记录
const task = await task_status.create({
sn_code: account.sn_code,
taskType: commandTypeSnake,
taskName: commandName || commandType,
taskParams: JSON.stringify(finalParams)
});
// 直接执行指令
const result = await scheduleManager.command.executeCommand(task.id, command, scheduleManager.mqttClient);
return result;
}
/**
* 获取指令详情
* @param {Object} params - 查询参数
* @returns {Promise<Object>} 指令详情
*/
async getCommandDetail(params) {
const task_commands = db.getModel('task_commands');
const { accountId, commandId } = params;
if (!accountId || !commandId) {
throw new Error('账号ID和指令ID不能为空');
}
// 查询指令详情
const command = await task_commands.findByPk(commandId);
if (!command) {
throw new Error('指令不存在');
}
// 解析JSON字段
const commandDetail = command.toJSON();
try {
if (commandDetail.command_params) {
commandDetail.command_params = JSON.parse(commandDetail.command_params);
}
if (commandDetail.result) {
commandDetail.result = JSON.parse(commandDetail.result);
}
} catch (error) {
console.warn('解析指令详情失败:', error);
}
return commandDetail;
}
/**
* 执行账号任务(旧接口兼容)
* @param {Object} params - 任务参数
* @returns {Promise<Object>} 执行结果
*/
async runTask(params) {
const pla_account = db.getModel('pla_account');
const { id, taskType, taskName } = params;
if (!id || !taskType) {
throw new Error('账号ID和指令类型不能为空');
}
// 获取账号信息
const account = await pla_account.findByPk(id);
if (!account) {
throw new Error('账号不存在');
}
// 检查账号是否启用
if (!account.is_enabled) {
throw new Error('账号未启用,无法执行指令');
}
// 移除 device_status 依赖,在线状态检查暂时移除
// 如果需要在线状态检查,可以在 pla_account 表中添加相应字段
// 获取调度管理器并执行指令
if (!scheduleManager.mqttClient) {
throw new Error('MQTT客户端未初始化');
}
await scheduleManager.taskQueue.addTask(account.sn_code, {
taskType: taskType,
taskName: `手动任务 - ${taskName}`,
taskParams: {
keyword: account.keyword,
platform: account.platform_type
}
});
return {
message: '任务已添加到队列',
taskId: task.id
};
}
/**
* 停止账号的所有任务
* @param {Object} params - 参数对象
* @param {number} params.id - 账号ID
* @param {string} params.sn_code - 设备SN码
* @returns {Promise<Object>} 停止结果
*/
async stopTasks(params) {
const { id, sn_code } = params;
if (!id && !sn_code) {
throw new Error('账号ID或设备SN码不能为空');
}
const pla_account = db.getModel('pla_account');
const task_status = db.getModel('task_status');
const scheduleManager = require('../middleware/schedule/index.js');
// 获取账号信息
let account;
if (id) {
account = await pla_account.findByPk(id);
} else if (sn_code) {
account = await pla_account.findOne({ where: { sn_code } });
}
if (!account) {
throw new Error('账号不存在');
}
const deviceSnCode = account.sn_code;
// 1. 从任务队列中取消该设备的所有待执行任务
const taskQueue = scheduleManager.taskQueue;
let cancelledCount = 0;
if (taskQueue && typeof taskQueue.cancelDeviceTasks === 'function') {
cancelledCount = await taskQueue.cancelDeviceTasks(deviceSnCode);
} else {
// 如果没有 cancelDeviceTasks 方法,手动取消
const tasks = await task_status.findAll({
where: {
sn_code: deviceSnCode,
status: ['pending', 'running']
}
});
for (const task of tasks) {
try {
if (taskQueue && typeof taskQueue.cancelTask === 'function') {
await taskQueue.cancelTask(task.id);
cancelledCount++;
} else {
// 直接更新数据库
await task_status.update(
{
status: 'cancelled',
endTime: new Date()
},
{ where: { id: task.id } }
);
cancelledCount++;
}
} catch (error) {
console.error(`[停止任务] 取消任务 ${task.id} 失败:`, error);
}
}
}
return {
success: true,
message: `已停止 ${cancelledCount} 个任务`,
cancelledCount: cancelledCount,
sn_code: deviceSnCode
};
}
/**
* 解析地址并更新经纬度
* @param {Object} params - 参数对象
* @param {number} params.id - 账号ID
* @param {string} params.address - 地址(可选,如果不提供则使用账号中的地址)
* @returns {Promise<Object>} 解析结果
*/
async parseLocation(params) {
const { id, address } = params;
if (!id) {
throw new Error('账号ID不能为空');
}
const pla_account = db.getModel('pla_account');
const account = await pla_account.findByPk(id);
if (!account) {
throw new Error('账号不存在');
}
// 如果提供了地址参数,使用参数中的地址;否则使用账号中的地址
const addressToParse = address || account.user_address;
if (!addressToParse || addressToParse.trim() === '') {
throw new Error('地址不能为空,请先设置用户地址');
}
try {
// 调用位置服务解析地址
const location = await locationService.getLocationByAddress(addressToParse);
if (!location || !location.lat || !location.lng) {
throw new Error('地址解析失败,未获取到经纬度信息');
}
// 更新账号的地址和经纬度
await account.update({
user_address: addressToParse,
user_longitude: String(location.lng),
user_latitude: String(location.lat)
});
return {
success: true,
message: '地址解析成功',
data: {
address: addressToParse,
longitude: location.lng,
latitude: location.lat
}
};
} catch (error) {
console.error('[账号管理] 地址解析失败:', error);
throw new Error('地址解析失败:' + (error.message || '请检查地址是否正确'));
}
}
/**
* 批量解析地址并更新经纬度
* @param {Array<number>} ids - 账号ID数组
* @returns {Promise<Object>} 批量解析结果
*/
async batchParseLocation(ids) {
if (!ids || !Array.isArray(ids) || ids.length === 0) {
throw new Error('账号ID列表不能为空');
}
const pla_account = db.getModel('pla_account');
const op = db.getModel('op');
const accounts = await pla_account.findAll({
where: {
id: { [op.in]: ids }
}
});
if (accounts.length === 0) {
throw new Error('未找到指定的账号');
}
const results = {
success: 0,
failed: 0,
details: []
};
// 逐个解析地址
for (const account of accounts) {
try {
if (!account.user_address || account.user_address.trim() === '') {
results.details.push({
id: account.id,
name: account.name,
success: false,
message: '地址为空,跳过解析'
});
results.failed++;
continue;
}
const result = await this.parseLocation({
id: account.id,
address: account.user_address
});
results.details.push({
id: account.id,
name: account.name,
success: true,
message: '解析成功',
data: result.data
});
results.success++;
} catch (error) {
results.details.push({
id: account.id,
name: account.name,
success: false,
message: error.message || '解析失败'
});
results.failed++;
}
}
return results;
}
}
// 导出单例
module.exports = new PlaAccountService();