| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- const db = require('../DataBase/db')
- const path = require('path')
- const Logger = require('../../lib/Logger')
- const mq = require('.')
- const { mq: mqName } = require('./mqPrefix')
- class Worker {
- constructor() {
- this.logger = new Logger(
- path.join(__dirname, '../logs/Worker.log'),
- 'INFO'
- )
- this.handlers = {}
- this.running = false
- // 队列名
- this.taskQueue = mqName('task_queue')
- this.resultQueue = mqName('task_result_queue')
- // channel 名称(避免和别的模块冲突)
- this.channelName = 'worker_channel'
- }
- /**
- * 注册任务处理器
- */
- register(type, handler) {
- this.handlers[type] = handler
- this.logger.info(`注册处理器: ${type}`)
- }
- /**
- * 启动 Worker
- */
- async start() {
- if (this.running) return
- this.running = true
- this.logger.info('Worker 启动中...')
- try {
- const channel = await mq.getChannel(this.channelName)
- // 控制并发(重要)
- await channel.prefetch(5)
- // 确保队列存在
- await channel.assertQueue(this.taskQueue, { durable: true })
- await channel.assertQueue(this.resultQueue, { durable: true })
- // 开始消费
- await channel.consume(
- this.taskQueue,
- async (msg) => {
- if (!msg) return
- let content
- try {
- content = JSON.parse(msg.content.toString())
- } catch (err) {
- this.logger.error('消息解析失败: ' + err.message)
- channel.ack(msg)
- return
- }
- const { id, type, data } = content
- this.logger.info(`收到任务: ${id} 类型: ${type}`)
- const handler = this.handlers[type]
- if (!handler) {
- this.logger.error(`未找到处理器: ${type}`)
- channel.ack(msg)
- return
- }
- try {
- const result = await handler(data, {
- db,
- logger: this.logger
- })
- this.logger.info(`任务完成: ${id}`)
- await this.sendResult(channel, {
- id,
- success: true,
- result
- })
- channel.ack(msg)
- } catch (err) {
- this.logger.error(`任务失败: ${id} - ${err.stack}`)
- await this.sendResult(channel, {
- id,
- success: false,
- error: err.message
- })
- // 简单策略:失败直接 ack(避免死循环)
- channel.ack(msg)
- }
- },
- {
- noAck: false
- }
- )
- this.logger.info('Worker 启动成功')
- } catch (err) {
- this.logger.error('Worker 启动失败: ' + err.stack)
- }
- }
- /**
- * 发送结果
- */
- async sendResult(channel, data) {
- try {
- channel.sendToQueue(
- this.resultQueue,
- Buffer.from(JSON.stringify(data)),
- { persistent: true }
- )
- } catch (err) {
- this.logger.error('结果发送失败: ' + err.message)
- }
- }
- /**
- * 停止 Worker
- */
- async stop() {
- this.running = false
- await mq.close()
- this.logger.info('Worker 已停止')
- }
- }
- module.exports = Worker
|