const amqp = require('amqplib')
const path = require('path')
const config = require('../../config.json')
const Logger = require('../../lib/Logger')

/**
 * Process-wide RabbitMQ connection manager.
 *
 * Owns a single amqplib connection plus a cache of named channels, and
 * re-establishes the connection with exponential backoff when it drops.
 * The module exports one shared instance.
 */
class MQManager {
  constructor() {
    this.url = config.rabbitmq.url
    this.connection = null
    // name -> open channel
    this.channels = new Map()
    this.logger = new Logger(path.join(__dirname, '../../logs/RabbitMQ.log'), 'INFO')
    // true while a reconnect timer is armed
    this.reconnecting = false
    // in-flight connect promise, shared by concurrent init() callers
    this._initPromise = null
    this._reconnectTimer = null
    // number of consecutive reconnects scheduled; reset on a successful connect
    this._reconnectAttempt = 0
    // name -> in-flight channel creation, shared by concurrent getChannel() callers
    this._pendingChannels = new Map()
  }

  /**
   * Open the connection if there is none yet.
   *
   * Concurrent callers share a single connect attempt. On failure the error is
   * logged, a reconnect is scheduled, and the error is rethrown to the caller.
   *
   * @returns {Promise<void>}
   * @throws whatever `amqp.connect` rejected with
   */
  async init() {
    if (this.connection) return
    if (this._initPromise) return this._initPromise

    this._initPromise = (async () => {
      try {
        this.logger.info('RabbitMQ 初始化连接...')
        const conn = await amqp.connect(this.url)
        this.connection = conn
        this._reconnectAttempt = 0

        conn.on('close', () => {
          // Only react when the connection that closed is still the current
          // one. close() detaches the connection before shutting it down, so
          // an intentional shutdown does not schedule a reconnect.
          if (this.connection !== conn) return
          this.logger.warn('RabbitMQ 连接断开,准备重连')
          this._dropConnection()
          this.reconnect()
        })

        conn.on('error', (err) => {
          // 'error' may fire before 'close'; do not throw here, recovery is
          // left to the reconnect triggered by the 'close' handler.
          this.logger.error('RabbitMQ 连接错误:', err?.message || err)
        })

        this.logger.info('RabbitMQ 连接成功')
      } catch (e) {
        this.logger.error('RabbitMQ 初始化失败:', e?.message || e)
        this._dropConnection()
        this.reconnect()
        throw e
      } finally {
        this._initPromise = null
      }
    })()

    return this._initPromise
  }

  /**
   * Forget the current connection and every cached channel.
   * Channels of the old connection are unusable; callers must call
   * getChannel() again to obtain a fresh one.
   */
  _dropConnection() {
    this.connection = null
    this.channels.clear()
  }

  /**
   * Schedule one reconnect attempt with exponential backoff.
   *
   * The delay is 2s, 4s, 8s, 16s for attempts 1-4 and 30s (the cap) from the
   * fifth attempt on. Calling this while a reconnect is already scheduled is
   * a no-op.
   *
   * @returns {Promise<void>} resolves once the timer is armed, not once connected
   */
  async reconnect() {
    if (this.reconnecting) return
    this.reconnecting = true

    const attempt = ++this._reconnectAttempt
    const delayMs = Math.min(30000, 1000 * 2 ** Math.min(attempt, 5))

    if (this._reconnectTimer) {
      clearTimeout(this._reconnectTimer)
      this._reconnectTimer = null
    }

    this._reconnectTimer = setTimeout(() => {
      this._reconnectTimer = null
      // Release the guard BEFORE calling init(): when init() fails it calls
      // reconnect() itself, and that call must be able to arm the next timer.
      // Clearing the flag only after init() settled made that call a no-op,
      // so retries stopped after the first failed attempt.
      this.reconnecting = false
      this.init().catch(() => {
        // init() has already logged the failure and scheduled the next attempt.
      })
    }, delayMs)
  }

  /**
   * Return the cached channel registered under `name`, creating it on demand.
   *
   * Concurrent calls for the same name share a single channel creation.
   *
   * @param {string} [name='default'] cache key of the channel
   * @returns {Promise<object>} the amqplib channel
   * @throws if connecting or creating the channel fails
   */
  async getChannel(name = 'default') {
    if (!this.connection) {
      await this.init()
    }

    if (this.channels.has(name)) {
      return this.channels.get(name)
    }

    const inFlight = this._pendingChannels.get(name)
    if (inFlight) return inFlight

    const opening = this._openChannel(name)
    this._pendingChannels.set(name, opening)
    try {
      return await opening
    } finally {
      this._pendingChannels.delete(name)
    }
  }

  /**
   * Create a channel on the current connection, cache it under `name` and
   * attach its lifecycle handlers.
   *
   * @param {string} name cache key of the channel
   * @returns {Promise<object>} the newly created channel
   */
  async _openChannel(name) {
    const channel = await this.connection.createChannel()
    this.channels.set(name, channel)

    channel.on('close', () => {
      this.logger.warn(`Channel [${name}] 已关闭`)
      // Evict only this channel: a newer channel may already be registered
      // under the same name and must stay cached.
      if (this.channels.get(name) === channel) {
        this.channels.delete(name)
      }
    })

    channel.on('error', (err) => {
      // Channel errors are logged here so they do not surface as an
      // unhandled 'error' event.
      this.logger.warn(`Channel [${name}] 错误: ${err?.message || err}`)
    })

    return channel
  }

  /**
   * Tell whether `err` looks like a closed-channel / illegal-operation error.
   *
   * @param {*} err error (or error-like value) to classify
   * @returns {boolean} true when the error name is `IllegalOperationError` or
   *   its text mentions "channel closed" / "illegaloperationerror"
   */
  isChannelClosedError(err) {
    if (!err) return false
    // The error class name lives in `name`, not in `message`.
    if (err.name === 'IllegalOperationError') return true
    const msg = String(err.message || err).toLowerCase()
    return msg.includes('channel closed') || msg.includes('illegaloperationerror')
  }

  /**
   * Safe publish: on a closed-channel error, fetch a fresh channel and retry
   * (up to 3 attempts in total, waiting 500ms then 1000ms between attempts).
   *
   * Note: this is for the publishing side only; consumers have to call
   * consume again themselves (handled by the business layer).
   *
   * @param {string} channelName cache key passed to getChannel()
   * @param {string} queue target queue
   * @param {Buffer} content message body
   * @param {object} [options={}] options forwarded to `sendToQueue`
   * @returns {Promise<boolean>} the value returned by `channel.sendToQueue`
   * @throws the original error when it is not a closed-channel error, or the
   *   last closed-channel error once all attempts are used up
   */
  async sendToQueueSafe(channelName, queue, content, options = {}) {
    const maxAttempts = 3
    let lastErr

    for (let i = 0; i < maxAttempts; i++) {
      try {
        const ch = await this.getChannel(channelName)
        return ch.sendToQueue(queue, content, options)
      } catch (e) {
        lastErr = e
        if (!this.isChannelClosedError(e)) throw e

        // Make the next iteration fetch a fresh channel (the 'close' handler
        // normally removed it already; this is a safety net).
        this.channels.delete(channelName)

        // Only trigger a reconnect when the connection itself is gone;
        // otherwise the backoff counter would grow for no reason.
        if (!this.connection) {
          this.reconnect()
        }

        // No point in sleeping after the final attempt.
        if (i < maxAttempts - 1) {
          await new Promise((resolve) => setTimeout(resolve, 500 * (i + 1)))
        }
      }
    }

    throw lastErr
  }

  /**
   * Shut down: cancel any pending reconnect, close every cached channel and
   * then the connection.
   *
   * A channel that fails to close is logged and skipped so the remaining
   * channels and the connection are still closed.
   *
   * @returns {Promise<void>}
   * @throws if closing the connection itself fails
   */
  async close() {
    if (this._reconnectTimer) {
      clearTimeout(this._reconnectTimer)
      this._reconnectTimer = null
    }
    this.reconnecting = false

    // Detach state first: the connection 'close' handler compares against
    // this.connection, so the event emitted by this shutdown is ignored
    // instead of scheduling a reconnect.
    const conn = this.connection
    const openChannels = [...this.channels.values()]
    this.connection = null
    this.channels.clear()

    for (const ch of openChannels) {
      try {
        await ch.close()
      } catch (e) {
        this.logger.warn(`Channel 关闭失败: ${e?.message || e}`)
      }
    }

    if (conn) {
      await conn.close()
    }
  }
}

module.exports = new MQManager()