const db = require('../DataBase/db')
const path = require('path')
const Logger = require('../../lib/Logger')
const mq = require('.')

/**
 * Own-property check that also works on prototype-less objects.
 *
 * @param {object} obj - Object to inspect.
 * @param {string} key - Property name to look up.
 * @returns {boolean} True when `key` is an own property of `obj`.
 */
const hasOwn = (obj, key) => Object.prototype.hasOwnProperty.call(obj, key)

/**
 * Best-effort message of a thrown value. Handlers may throw non-Error values
 * (strings, null, ...), so `.message` cannot be read unconditionally.
 *
 * @param {*} err - Thrown value.
 * @returns {string} `err.message` when it is a string, otherwise `String(err)`.
 */
const errorMessage = (err) =>
  err != null && typeof err.message === 'string' ? err.message : String(err)

/**
 * Best-effort stack trace of a thrown value (see `errorMessage`).
 *
 * @param {*} err - Thrown value.
 * @returns {string} `err.stack` when it is a string, otherwise `String(err)`.
 */
const errorStack = (err) =>
  err != null && typeof err.stack === 'string' ? err.stack : String(err)

/**
 * Queue worker: consumes tasks from `taskQueue`, dispatches each one to the
 * handler registered for its `type`, and publishes the outcome to
 * `resultQueue`.
 *
 * NOTE(review): the channel calls used here (prefetch / assertQueue /
 * consume / ack / sendToQueue) look like the amqplib channel API; the `mq`
 * module that provides the channel is not visible from this file — confirm.
 */
class Worker {
  constructor() {
    this.logger = new Logger(
      path.join(__dirname, '../logs/Worker.log'),
      'INFO'
    )

    // Prototype-less dictionary: task types arrive in queue messages, so a
    // type such as "constructor" or "toString" must never resolve to an
    // inherited Object.prototype member.
    this.handlers = Object.create(null)
    this.running = false

    // Queue names
    this.taskQueue = 'task_queue'
    this.resultQueue = 'task_result_queue'

    // Channel name (kept distinct to avoid clashing with other modules)
    this.channelName = 'worker_channel'
  }

  /**
   * Register a task handler. Registering the same type again replaces the
   * previous handler.
   *
   * @param {string} type - Task type, matched against the `type` field of incoming messages.
   * @param {Function} handler - Called as `handler(data, { db, logger })`; may be async.
   */
  register(type, handler) {
    this.handlers[type] = handler
    this.logger.info(`注册处理器: ${type}`)
  }

  /**
   * Start the worker: obtain the channel, make sure both queues exist and
   * begin consuming. Does nothing when the worker is already running.
   *
   * Startup errors are logged, not thrown. On failure `running` is reset so
   * that a later `start()` call can retry.
   *
   * @returns {Promise<void>}
   */
  async start() {
    if (this.running) return

    this.running = true
    this.logger.info('Worker 启动中...')

    try {
      const channel = await mq.getChannel(this.channelName)

      // Limit concurrency (important)
      await channel.prefetch(5)

      // Make sure the queues exist
      await channel.assertQueue(this.taskQueue, { durable: true })
      await channel.assertQueue(this.resultQueue, { durable: true })

      // Start consuming; messages are acknowledged explicitly in handleMessage
      await channel.consume(
        this.taskQueue,
        (msg) => this.handleMessage(channel, msg),
        { noAck: false }
      )

      this.logger.info('Worker 启动成功')
    } catch (err) {
      // Without this reset the guard at the top would block every retry.
      this.running = false
      this.logger.error('Worker 启动失败: ' + errorStack(err))
    }
  }

  /**
   * Process one delivery: parse it, run the matching handler, publish the
   * result and acknowledge the message. Never rejects; unexpected errors are
   * logged so the consumer callback cannot cause an unhandled rejection.
   *
   * @private
   * @param {object} channel - Channel the message was delivered on.
   * @param {object|null} msg - Delivered message; falsy deliveries are ignored.
   * @returns {Promise<void>}
   */
  async handleMessage(channel, msg) {
    if (!msg) return

    try {
      let content
      try {
        content = JSON.parse(msg.content.toString())
      } catch (err) {
        this.logger.error('消息解析失败: ' + errorMessage(err))
        channel.ack(msg)
        return
      }

      // JSON.parse also accepts `null` and primitives; destructuring `null`
      // would throw, so reject anything that is not an object up front.
      if (content === null || typeof content !== 'object') {
        this.logger.error('消息格式无效: 期望 JSON 对象')
        channel.ack(msg)
        return
      }

      const { id, type, data } = content
      this.logger.info(`收到任务: ${id} 类型: ${type}`)

      // Own-property lookup only, so inherited members are never invoked.
      const handler = hasOwn(this.handlers, type)
        ? this.handlers[type]
        : undefined
      if (!handler) {
        this.logger.error(`未找到处理器: ${type}`)
        channel.ack(msg)
        return
      }

      // Only the handler runs inside this try: an error while publishing or
      // acking must not be reported as a task failure.
      let outcome
      try {
        const result = await handler(data, { db, logger: this.logger })
        this.logger.info(`任务完成: ${id}`)
        outcome = { id, success: true, result }
      } catch (err) {
        this.logger.error(`任务失败: ${id} - ${errorStack(err)}`)
        outcome = { id, success: false, error: errorMessage(err) }
      }

      await this.sendResult(channel, outcome)

      // Simple strategy: ack even when the task failed (avoids an endless
      // redelivery loop)
      channel.ack(msg)
    } catch (err) {
      // Last resort, e.g. ack() throwing; log instead of rejecting.
      this.logger.error('消息处理异常: ' + errorStack(err))
    }
  }

  /**
   * Publish a task result to `resultQueue` as a persistent JSON message.
   * Serialization and publish errors are logged, not thrown.
   *
   * @param {object} channel - Channel to publish on.
   * @param {object} data - Result payload; must be JSON-serializable.
   * @returns {Promise<void>}
   */
  async sendResult(channel, data) {
    try {
      channel.sendToQueue(
        this.resultQueue,
        Buffer.from(JSON.stringify(data)),
        { persistent: true }
      )
    } catch (err) {
      this.logger.error('结果发送失败: ' + errorMessage(err))
    }
  }

  /**
   * Stop the worker: clear the running flag and close the connection through
   * the `mq` module. Errors from `mq.close()` propagate to the caller.
   *
   * @returns {Promise<void>}
   */
  async stop() {
    this.running = false

    await mq.close()

    this.logger.info('Worker 已停止')
  }
}

module.exports = Worker