Worker.js 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. const db = require('../DataBase/db')
  2. const path = require('path')
  3. const Logger = require('../../lib/Logger')
  4. const mq = require('.')
  5. const { mq: mqName } = require('./mqPrefix')
  6. class Worker {
  7. constructor() {
  8. this.logger = new Logger(
  9. path.join(__dirname, '../logs/Worker.log'),
  10. 'INFO'
  11. )
  12. this.handlers = {}
  13. this.running = false
  14. // 队列名
  15. this.taskQueue = mqName('task_queue')
  16. this.resultQueue = mqName('task_result_queue')
  17. // channel 名称(避免和别的模块冲突)
  18. this.channelName = 'worker_channel'
  19. }
  20. /**
  21. * 注册任务处理器
  22. */
  23. register(type, handler) {
  24. this.handlers[type] = handler
  25. this.logger.info(`注册处理器: ${type}`)
  26. }
  27. /**
  28. * 启动 Worker
  29. */
  30. async start() {
  31. if (this.running) return
  32. this.running = true
  33. this.logger.info('Worker 启动中...')
  34. try {
  35. const channel = await mq.getChannel(this.channelName)
  36. // 控制并发(重要)
  37. await channel.prefetch(5)
  38. // 确保队列存在
  39. await channel.assertQueue(this.taskQueue, { durable: true })
  40. await channel.assertQueue(this.resultQueue, { durable: true })
  41. // 开始消费
  42. await channel.consume(
  43. this.taskQueue,
  44. async (msg) => {
  45. if (!msg) return
  46. let content
  47. try {
  48. content = JSON.parse(msg.content.toString())
  49. } catch (err) {
  50. this.logger.error('消息解析失败: ' + err.message)
  51. channel.ack(msg)
  52. return
  53. }
  54. const { id, type, data } = content
  55. this.logger.info(`收到任务: ${id} 类型: ${type}`)
  56. const handler = this.handlers[type]
  57. if (!handler) {
  58. this.logger.error(`未找到处理器: ${type}`)
  59. channel.ack(msg)
  60. return
  61. }
  62. try {
  63. const result = await handler(data, {
  64. db,
  65. logger: this.logger
  66. })
  67. this.logger.info(`任务完成: ${id}`)
  68. await this.sendResult(channel, {
  69. id,
  70. success: true,
  71. result
  72. })
  73. channel.ack(msg)
  74. } catch (err) {
  75. this.logger.error(`任务失败: ${id} - ${err.stack}`)
  76. await this.sendResult(channel, {
  77. id,
  78. success: false,
  79. error: err.message
  80. })
  81. // 简单策略:失败直接 ack(避免死循环)
  82. channel.ack(msg)
  83. }
  84. },
  85. {
  86. noAck: false
  87. }
  88. )
  89. this.logger.info('Worker 启动成功')
  90. } catch (err) {
  91. this.logger.error('Worker 启动失败: ' + err.stack)
  92. }
  93. }
  94. /**
  95. * 发送结果
  96. */
  97. async sendResult(channel, data) {
  98. try {
  99. channel.sendToQueue(
  100. this.resultQueue,
  101. Buffer.from(JSON.stringify(data)),
  102. { persistent: true }
  103. )
  104. } catch (err) {
  105. this.logger.error('结果发送失败: ' + err.message)
  106. }
  107. }
  108. /**
  109. * 停止 Worker
  110. */
  111. async stop() {
  112. this.running = false
  113. await mq.close()
  114. this.logger.info('Worker 已停止')
  115. }
  116. }
  117. module.exports = Worker