const amqp = require('amqplib') const path = require('path') const config = require('../../config.json') const Logger = require('../../lib/Logger') const { mq } = require('./mqPrefix') class MQManager { constructor() { this.url = config.rabbitmq.url this.connection = null this.channels = new Map() this.logger = new Logger(path.join(__dirname, '../../logs/RabbitMQ.log'), 'INFO') this.reconnecting = false } async init() { if (this.connection) return try { this.logger.info('RabbitMQ 初始化连接...') this.connection = await amqp.connect(this.url) this.connection.on('close', () => { this.logger.warn('RabbitMQ 连接断开,准备重连') this.connection = null this.channels.clear() this.reconnect() }) this.connection.on('error', (err) => { this.logger.error('RabbitMQ 连接错误:', err.message) }) this.logger.info('RabbitMQ 连接成功') } catch (e) { this.logger.error('RabbitMQ 初始化失败:', e.message) this.reconnect() throw e } } async reconnect() { if (this.reconnecting) return this.reconnecting = true setTimeout(async () => { try { await this.init() this.reconnecting = false } catch { this.reconnecting = false } }, 5000) } async getChannel(name = 'default') { if (!this.connection) { await this.init() } const key = mq(name) if (this.channels.has(key)) { return this.channels.get(key) } const channel = await this.connection.createChannel() this.channels.set(key, channel) channel.on('close', () => { this.logger.warn(`Channel [${key}] 已关闭`) this.channels.delete(key) }) return channel } async close() { for (const ch of this.channels.values()) { await ch.close() } this.channels.clear() if (this.connection) { await this.connection.close() this.connection = null } } } module.exports = new MQManager()