Worker.js 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. const db = require('../DataBase/db')
  2. const path = require('path')
  3. const Logger = require('../../lib/Logger')
  4. const mq = require('.')
  /**
   * Queue worker: consumes JSON task messages from `task_queue`, dispatches
   * each one to the handler registered for its `type`, and publishes the
   * outcome to `task_result_queue`.
   *
   * Expected task payload: `{ id, type, data }`.
   * Published result: `{ id, success: true, result }` or
   * `{ id, success: false, error }`.
   */
  class Worker {
    constructor() {
      this.logger = new Logger(
        path.join(__dirname, '../logs/Worker.log'),
        'INFO'
      )
      // Task type -> handler function. Lookups are own-property only (see
      // _handleMessage), so inherited Object members are never dispatched.
      this.handlers = {}
      this.running = false
      // Queue names
      this.taskQueue = 'task_queue'
      this.resultQueue = 'task_result_queue'
      // Channel name (kept distinct to avoid clashing with other modules)
      this.channelName = 'worker_channel'
      // Pending auto-restart timer, so stop() can cancel it.
      this._restartTimer = null
    }

    /**
     * Register a task handler.
     *
     * @param {string} type - Task type carried in the message's `type` field.
     * @param {Function} handler - Called as `handler(data, { db, logger })`;
     *   its (awaited) return value becomes the `result` of the published outcome.
     */
    register(type, handler) {
      this.handlers[type] = handler
      this.logger.info(`注册处理器: ${type}`)
    }

    /**
     * Start consuming tasks. A no-op when already running.
     *
     * Startup errors are logged, not thrown; `running` is reset to `false` in
     * that case so a later `start()` call can retry.
     *
     * @returns {Promise<void>}
     */
    async start() {
      if (this.running) return
      this.running = true
      this.logger.info('Worker 启动中...')

      try {
        const channel = await mq.getChannel(this.channelName)

        channel.on('close', () => {
          // A close triggered by stop() (running already false) must not restart.
          if (!this.running) return
          this.logger.warn('Worker channel 已关闭,准备重启消费')
          this.running = false
          this._restartTimer = setTimeout(() => {
            this._restartTimer = null
            this.start().catch((e) => {
              this.logger.error('重启 Worker 失败: ' + (e?.stack || e))
            })
          }, 1000)
        })

        // Cap unacknowledged deliveries at 5 to bound concurrency (important).
        await channel.prefetch(5)

        // Make sure both queues exist.
        await channel.assertQueue(this.taskQueue, { durable: true })
        await channel.assertQueue(this.resultQueue, { durable: true })

        // Start consuming with manual acknowledgements.
        await channel.consume(
          this.taskQueue,
          (msg) => this._handleMessage(channel, msg),
          { noAck: false }
        )

        this.logger.info('Worker 启动成功')
      } catch (err) {
        // Without this reset the `running` guard above would block every retry.
        this.running = false
        this.logger.error('Worker 启动失败: ' + (err?.stack || err))
      }
    }

    /**
     * Process one delivery: parse, dispatch, publish the outcome, then ack.
     * Every path acks the message (failed tasks are not requeued, to avoid a
     * redelivery loop).
     *
     * @param {object} channel - Channel the message was delivered on.
     * @param {object|null} msg - Delivery with a `content` buffer; falsy values are ignored.
     * @returns {Promise<void>}
     */
    async _handleMessage(channel, msg) {
      if (!msg) return

      let content
      try {
        content = JSON.parse(msg.content.toString())
      } catch (err) {
        this.logger.error('消息解析失败: ' + err.message)
        this._ack(channel, msg)
        return
      }

      // JSON.parse accepts bodies such as `null` or `42`; destructuring null
      // would throw, so reject anything that is not an object up front.
      if (content === null || typeof content !== 'object') {
        this.logger.error('消息格式无效: 消息体不是对象')
        this._ack(channel, msg)
        return
      }

      const { id, type, data } = content
      this.logger.info(`收到任务: ${id} 类型: ${type}`)

      // Own-property lookup: a type like "constructor" or "toString" must not
      // resolve to a function inherited from Object.prototype.
      const isRegistered = Object.prototype.hasOwnProperty.call(this.handlers, type)
      const handler = isRegistered ? this.handlers[type] : undefined
      if (typeof handler !== 'function') {
        this.logger.error(`未找到处理器: ${type}`)
        this._ack(channel, msg)
        return
      }

      let outcome
      try {
        const result = await handler(data, {
          db,
          logger: this.logger
        })
        this.logger.info(`任务完成: ${id}`)
        outcome = { id, success: true, result }
      } catch (err) {
        // Handlers may throw non-Error values (even null), so read defensively.
        this.logger.error(`任务失败: ${id} - ${err?.stack || String(err)}`)
        outcome = { id, success: false, error: err?.message ?? String(err) }
      }

      // Publishing and acking happen outside the try above so that an ack
      // failure cannot turn a finished task into a second, "failed" result.
      await this.sendResult(channel, outcome)

      // Simple policy: ack even on failure (avoids an endless retry loop).
      this._ack(channel, msg)
    }

    /**
     * Acknowledge a message, logging instead of throwing if the ack fails.
     *
     * @param {object} channel - Channel the message was delivered on.
     * @param {object} msg - Message to acknowledge.
     */
    _ack(channel, msg) {
      try {
        channel.ack(msg)
      } catch (err) {
        this.logger.error('消息确认失败: ' + (err?.message || err))
      }
    }

    /**
     * Publish a task outcome to the result queue. Failures are logged and
     * swallowed (best effort), so this never rejects because of a send error.
     *
     * @param {object} channel - Unused; kept for interface compatibility.
     *   Publishing goes through `mq.sendToQueueSafe` using `this.channelName`.
     * @param {object} data - JSON-serializable outcome payload.
     * @returns {Promise<void>}
     */
    async sendResult(channel, data) {
      try {
        await mq.sendToQueueSafe(
          this.channelName,
          this.resultQueue,
          Buffer.from(JSON.stringify(data)),
          { persistent: true, contentType: 'application/json' }
        )
      } catch (err) {
        this.logger.error('结果发送失败: ' + (err?.message || err))
      }
    }

    /**
     * Stop the worker: cancel any pending auto-restart and close the MQ layer.
     *
     * @returns {Promise<void>}
     */
    async stop() {
      this.running = false
      // A restart scheduled by an earlier channel close must not fire after stop.
      clearTimeout(this._restartTimer)
      this._restartTimer = null
      await mq.close()
      this.logger.info('Worker 已停止')
    }
  }
  127. module.exports = Worker