| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- const amqp = require('amqplib')
- const path = require('path')
- const config = require('../../config.json')
- const Logger = require('../../lib/Logger')
- class MQManager {
- constructor() {
- this.url = config.rabbitmq.url
- this.connection = null
- this.channels = new Map()
- this.logger = new Logger(path.join(__dirname, '../../logs/RabbitMQ.log'), 'INFO')
- this.reconnecting = false
- this._initPromise = null
- this._reconnectTimer = null
- this._reconnectAttempt = 0
- }
- async init() {
- if (this.connection) return
- if (this._initPromise) return this._initPromise
- this._initPromise = (async () => {
- try {
- this.logger.info('RabbitMQ 初始化连接...')
- const conn = await amqp.connect(this.url)
- this.connection = conn
- this._reconnectAttempt = 0
- conn.on('close', () => {
- this.logger.warn('RabbitMQ 连接断开,准备重连')
- this._dropConnection()
- this.reconnect()
- })
- conn.on('error', (err) => {
- // error 事件有时会在 close 前触发;这里不要 throw,交给 close 触发的重连来恢复
- this.logger.error('RabbitMQ 连接错误:', err?.message || err)
- })
- this.logger.info('RabbitMQ 连接成功')
- } catch (e) {
- this.logger.error('RabbitMQ 初始化失败:', e?.message || e)
- this._dropConnection()
- this.reconnect()
- throw e
- } finally {
- this._initPromise = null
- }
- })()
- return this._initPromise
- }
- _dropConnection() {
- this.connection = null
- // 旧的 channel 失效,直接清空;调用方需重新 getChannel
- this.channels.clear()
- }
- async reconnect() {
- if (this.reconnecting) return
- this.reconnecting = true
- const attempt = ++this._reconnectAttempt
- const delayMs = Math.min(30000, 1000 * Math.pow(2, Math.min(attempt, 5))) // 2s..32s capped
- if (this._reconnectTimer) {
- clearTimeout(this._reconnectTimer)
- this._reconnectTimer = null
- }
- this._reconnectTimer = setTimeout(async () => {
- try {
- await this.init()
- } catch {
- // init() 内已记录日志并触发下一次 reconnect
- } finally {
- this.reconnecting = false
- }
- }, delayMs)
- }
- async getChannel(name = 'default') {
- if (!this.connection) {
- await this.init()
- }
- if (this.channels.has(name)) {
- return this.channels.get(name)
- }
- const channel = await this.connection.createChannel()
- this.channels.set(name, channel)
- channel.on('close', () => {
- this.logger.warn(`Channel [${name}] 已关闭`)
- this.channels.delete(name)
- })
- channel.on('error', (err) => {
- // channel error 也不应抛出到未捕获异常链路
- this.logger.warn(`Channel [${name}] 错误: ${err?.message || err}`)
- })
- return channel
- }
- isChannelClosedError(err) {
- if (!err) return false
- const msg = String(err.message || err).toLowerCase()
- return msg.includes('channel closed') || msg.includes('illegaloperationerror')
- }
- /**
- * 安全投递:遇到断线/Channel closed 自动等待重连并重试。
- * 注意:只适用于“投递端”;消费端需要重新 consume(业务层处理)。
- */
- async sendToQueueSafe(channelName, queue, content, options = {}) {
- let lastErr
- for (let i = 0; i < 3; i++) {
- try {
- const ch = await this.getChannel(channelName)
- return ch.sendToQueue(queue, content, options)
- } catch (e) {
- lastErr = e
- if (!this.isChannelClosedError(e)) throw e
- // 让下次循环重新取 channel(已在 close 时 delete,但保险起见这里也删)
- this.channels.delete(channelName)
- // 如果连接也掉了,触发重连并等待一小会
- this.reconnect()
- await new Promise((r) => setTimeout(r, 500 * (i + 1)))
- }
- }
- throw lastErr
- }
- async close() {
- for (const ch of this.channels.values()) {
- await ch.close()
- }
- this.channels.clear()
- if (this.connection) {
- await this.connection.close()
- this.connection = null
- }
- }
- }
- module.exports = new MQManager()
|