Worker.js 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. const db = require('../DataBase/db')
  2. const path = require('path')
  3. const Logger = require('../../lib/Logger')
  4. const mq = require('.')
  /**
   * Queue worker: consumes task messages from `taskQueue`, dispatches each one
   * to the handler registered for its `type`, and publishes the outcome to
   * `resultQueue`.
   *
   * Expected task payload (JSON): `{ id, type, data }`.
   * Published result payload (JSON): `{ id, success: true, result }` or
   * `{ id, success: false, error }`.
   */
  class Worker {
    constructor() {
      this.logger = new Logger(
        path.join(__dirname, '../logs/Worker.log'),
        'INFO'
      )
      // Registered handlers, keyed by task type.
      this.handlers = {}
      this.running = false
      // Queue names.
      this.taskQueue = 'task_queue'
      this.resultQueue = 'task_result_queue'
      // Channel name (kept distinct to avoid clashing with other modules).
      this.channelName = 'worker_channel'
    }

    /**
     * Register the handler for a task type. A later registration for the same
     * type replaces the earlier one.
     *
     * @param {string} type - Task type, matched against the `type` field of incoming messages.
     * @param {Function} handler - Called as `handler(data, { db, logger })`; may be async.
     *   Its resolved value becomes the `result` field of the published outcome.
     */
    register(type, handler) {
      this.handlers[type] = handler
      this.logger.info(`注册处理器: ${type}`)
    }

    /**
     * Start consuming tasks. Calling it while already running is a no-op.
     *
     * Startup errors are logged rather than thrown; in that case `running` is
     * reset to `false` so that a later `start()` call can retry.
     *
     * @returns {Promise<void>}
     */
    async start() {
      if (this.running) return
      this.running = true
      this.logger.info('Worker 启动中...')

      try {
        const channel = await mq.getChannel(this.channelName)

        // Cap concurrency (important).
        await channel.prefetch(5)

        // Make sure both queues exist.
        await channel.assertQueue(this.taskQueue, { durable: true })
        await channel.assertQueue(this.resultQueue, { durable: true })

        // Start consuming; acknowledgements are sent manually per message.
        await channel.consume(
          this.taskQueue,
          (msg) => this._handleMessage(channel, msg),
          { noAck: false }
        )

        this.logger.info('Worker 启动成功')
      } catch (err) {
        // Without this reset the guard at the top would turn every retry into a no-op.
        this.running = false
        this.logger.error('Worker 启动失败: ' + err.stack)
      }
    }

    /**
     * Process one delivered message: parse it, run the matching handler,
     * publish the outcome, then acknowledge. Never rejects for message-level
     * problems; they are logged and the message is acknowledged.
     *
     * @param {object} channel - Channel the message was delivered on.
     * @param {object|null} msg - Delivered message; its `content` holds the JSON payload.
     * @returns {Promise<void>}
     */
    async _handleMessage(channel, msg) {
      // NOTE(review): a falsy message presumably signals consumer cancellation
      // (amqplib-style) — there is nothing to process or acknowledge.
      if (!msg) return

      let content
      try {
        content = JSON.parse(msg.content.toString())
      } catch (err) {
        this.logger.error('消息解析失败: ' + err.message)
        this._safeAck(channel, msg)
        return
      }

      // Valid JSON is not necessarily an object (`null`, `42`, `"x"`);
      // destructuring `null` would throw outside of any try/catch.
      if (content === null || typeof content !== 'object') {
        this.logger.error('消息格式无效: 消息体不是对象')
        this._safeAck(channel, msg)
        return
      }

      const { id, type, data } = content
      this.logger.info(`收到任务: ${id} 类型: ${type}`)

      // Own-property lookup only: `type` comes from the message, and names such
      // as "toString" or "constructor" must not resolve to Object.prototype members.
      const isRegistered = Object.prototype.hasOwnProperty.call(this.handlers, type)
      const handler = isRegistered ? this.handlers[type] : undefined
      if (!handler) {
        this.logger.error(`未找到处理器: ${type}`)
        this._safeAck(channel, msg)
        return
      }

      // Only the handler call is guarded here, so a failure while publishing or
      // acknowledging can never be misreported as a task failure.
      let outcome
      try {
        const result = await handler(data, {
          db,
          logger: this.logger
        })
        this.logger.info(`任务完成: ${id}`)
        outcome = { id, success: true, result }
      } catch (err) {
        // Handlers may throw non-Error values (strings, null, ...).
        const failure = err instanceof Error ? err : new Error(String(err))
        this.logger.error(`任务失败: ${id} - ${failure.stack}`)
        outcome = { id, success: false, error: failure.message }
      }

      await this.sendResult(channel, outcome)

      // Simple policy: acknowledge even on failure, to avoid an endless redelivery loop.
      this._safeAck(channel, msg)
    }

    /**
     * Acknowledge a message, logging instead of throwing if the ack fails.
     *
     * @param {object} channel - Channel the message was delivered on.
     * @param {object} msg - Message to acknowledge.
     */
    _safeAck(channel, msg) {
      try {
        channel.ack(msg)
      } catch (err) {
        this.logger.error('消息确认失败: ' + (err && err.message))
      }
    }

    /**
     * Publish a task outcome to the result queue as persistent JSON.
     * Failures (including serialization errors) are logged, not thrown.
     *
     * @param {object} channel - Channel used to publish.
     * @param {object} data - Outcome payload; must be JSON-serializable.
     * @returns {Promise<void>}
     */
    async sendResult(channel, data) {
      try {
        channel.sendToQueue(
          this.resultQueue,
          Buffer.from(JSON.stringify(data)),
          { persistent: true }
        )
      } catch (err) {
        this.logger.error('结果发送失败: ' + err.message)
      }
    }

    /**
     * Stop the worker: clear the running flag and close the message-queue module.
     *
     * @returns {Promise<void>}
     */
    async stop() {
      this.running = false
      await mq.close()
      this.logger.info('Worker 已停止')
    }
  }

  module.exports = Worker
  116. module.exports = Worker