index.js 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. const amqp = require('amqplib')
  2. const path = require('path')
  3. const config = require('../../config.json')
  4. const Logger = require('../../lib/Logger')
  5. class MQManager {
  6. constructor() {
  7. this.url = config.rabbitmq.url
  8. this.connection = null
  9. this.channels = new Map()
  10. this.logger = new Logger(path.join(__dirname, '../../logs/RabbitMQ.log'), 'INFO')
  11. this.reconnecting = false
  12. this._initPromise = null
  13. this._reconnectTimer = null
  14. this._reconnectAttempt = 0
  15. }
  16. async init() {
  17. if (this.connection) return
  18. if (this._initPromise) return this._initPromise
  19. this._initPromise = (async () => {
  20. try {
  21. this.logger.info('RabbitMQ 初始化连接...')
  22. const conn = await amqp.connect(this.url)
  23. this.connection = conn
  24. this._reconnectAttempt = 0
  25. conn.on('close', () => {
  26. this.logger.warn('RabbitMQ 连接断开,准备重连')
  27. this._dropConnection()
  28. this.reconnect()
  29. })
  30. conn.on('error', (err) => {
  31. // error 事件有时会在 close 前触发;这里不要 throw,交给 close 触发的重连来恢复
  32. this.logger.error('RabbitMQ 连接错误:', err?.message || err)
  33. })
  34. this.logger.info('RabbitMQ 连接成功')
  35. } catch (e) {
  36. this.logger.error('RabbitMQ 初始化失败:', e?.message || e)
  37. this._dropConnection()
  38. this.reconnect()
  39. throw e
  40. } finally {
  41. this._initPromise = null
  42. }
  43. })()
  44. return this._initPromise
  45. }
  46. _dropConnection() {
  47. this.connection = null
  48. // 旧的 channel 失效,直接清空;调用方需重新 getChannel
  49. this.channels.clear()
  50. }
  51. async reconnect() {
  52. if (this.reconnecting) return
  53. this.reconnecting = true
  54. const attempt = ++this._reconnectAttempt
  55. const delayMs = Math.min(30000, 1000 * Math.pow(2, Math.min(attempt, 5))) // 2s..32s capped
  56. if (this._reconnectTimer) {
  57. clearTimeout(this._reconnectTimer)
  58. this._reconnectTimer = null
  59. }
  60. this._reconnectTimer = setTimeout(async () => {
  61. try {
  62. await this.init()
  63. } catch {
  64. // init() 内已记录日志并触发下一次 reconnect
  65. } finally {
  66. this.reconnecting = false
  67. }
  68. }, delayMs)
  69. }
  70. async getChannel(name = 'default') {
  71. if (!this.connection) {
  72. await this.init()
  73. }
  74. if (this.channels.has(name)) {
  75. return this.channels.get(name)
  76. }
  77. const channel = await this.connection.createChannel()
  78. this.channels.set(name, channel)
  79. channel.on('close', () => {
  80. this.logger.warn(`Channel [${name}] 已关闭`)
  81. this.channels.delete(name)
  82. })
  83. channel.on('error', (err) => {
  84. // channel error 也不应抛出到未捕获异常链路
  85. this.logger.warn(`Channel [${name}] 错误: ${err?.message || err}`)
  86. })
  87. return channel
  88. }
  89. isChannelClosedError(err) {
  90. if (!err) return false
  91. const msg = String(err.message || err).toLowerCase()
  92. return msg.includes('channel closed') || msg.includes('illegaloperationerror')
  93. }
  94. /**
  95. * 安全投递:遇到断线/Channel closed 自动等待重连并重试。
  96. * 注意:只适用于“投递端”;消费端需要重新 consume(业务层处理)。
  97. */
  98. async sendToQueueSafe(channelName, queue, content, options = {}) {
  99. let lastErr
  100. for (let i = 0; i < 3; i++) {
  101. try {
  102. const ch = await this.getChannel(channelName)
  103. return ch.sendToQueue(queue, content, options)
  104. } catch (e) {
  105. lastErr = e
  106. if (!this.isChannelClosedError(e)) throw e
  107. // 让下次循环重新取 channel(已在 close 时 delete,但保险起见这里也删)
  108. this.channels.delete(channelName)
  109. // 如果连接也掉了,触发重连并等待一小会
  110. this.reconnect()
  111. await new Promise((r) => setTimeout(r, 500 * (i + 1)))
  112. }
  113. }
  114. throw lastErr
  115. }
  116. async close() {
  117. for (const ch of this.channels.values()) {
  118. await ch.close()
  119. }
  120. this.channels.clear()
  121. if (this.connection) {
  122. await this.connection.close()
  123. this.connection = null
  124. }
  125. }
  126. }
  127. module.exports = new MQManager()