| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- const amqp = require('amqplib')
- const path = require('path')
- const config = require('../../config.json')
- const Logger = require('../../lib/Logger')
- const { mq } = require('./mqPrefix')
- class MQManager {
- constructor() {
- this.url = config.rabbitmq.url
- this.connection = null
- this.channels = new Map()
- this.logger = new Logger(path.join(__dirname, '../../logs/RabbitMQ.log'), 'INFO')
- this.reconnecting = false
- }
- async init() {
- if (this.connection) return
- try {
- this.logger.info('RabbitMQ 初始化连接...')
- this.connection = await amqp.connect(this.url)
- this.connection.on('close', () => {
- this.logger.warn('RabbitMQ 连接断开,准备重连')
- this.connection = null
- this.channels.clear()
- this.reconnect()
- })
- this.connection.on('error', (err) => {
- this.logger.error('RabbitMQ 连接错误:', err.message)
- })
- this.logger.info('RabbitMQ 连接成功')
- } catch (e) {
- this.logger.error('RabbitMQ 初始化失败:', e.message)
- this.reconnect()
- throw e
- }
- }
- async reconnect() {
- if (this.reconnecting) return
- this.reconnecting = true
- setTimeout(async () => {
- try {
- await this.init()
- this.reconnecting = false
- } catch {
- this.reconnecting = false
- }
- }, 5000)
- }
- async getChannel(name = 'default') {
- if (!this.connection) {
- await this.init()
- }
- const key = mq(name)
- if (this.channels.has(key)) {
- return this.channels.get(key)
- }
- const channel = await this.connection.createChannel()
- this.channels.set(key, channel)
- channel.on('close', () => {
- this.logger.warn(`Channel [${key}] 已关闭`)
- this.channels.delete(key)
- })
- return channel
- }
- async close() {
- for (const ch of this.channels.values()) {
- await ch.close()
- }
- this.channels.clear()
- if (this.connection) {
- await this.connection.close()
- this.connection = null
- }
- }
- }
- module.exports = new MQManager()
|