index.js 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. const amqp = require('amqplib')
  2. const path = require('path')
  3. const config = require('../../config.json')
  4. const Logger = require('../../lib/Logger')
  5. const { mq } = require('./mqPrefix')
  6. class MQManager {
  7. constructor() {
  8. this.url = config.rabbitmq.url
  9. this.connection = null
  10. this.channels = new Map()
  11. this.logger = new Logger(path.join(__dirname, '../../logs/RabbitMQ.log'), 'INFO')
  12. this.reconnecting = false
  13. }
  14. async init() {
  15. if (this.connection) return
  16. try {
  17. this.logger.info('RabbitMQ 初始化连接...')
  18. this.connection = await amqp.connect(this.url)
  19. this.connection.on('close', () => {
  20. this.logger.warn('RabbitMQ 连接断开,准备重连')
  21. this.connection = null
  22. this.channels.clear()
  23. this.reconnect()
  24. })
  25. this.connection.on('error', (err) => {
  26. this.logger.error('RabbitMQ 连接错误:', err.message)
  27. })
  28. this.logger.info('RabbitMQ 连接成功')
  29. } catch (e) {
  30. this.logger.error('RabbitMQ 初始化失败:', e.message)
  31. this.reconnect()
  32. throw e
  33. }
  34. }
  35. async reconnect() {
  36. if (this.reconnecting) return
  37. this.reconnecting = true
  38. setTimeout(async () => {
  39. try {
  40. await this.init()
  41. this.reconnecting = false
  42. } catch {
  43. this.reconnecting = false
  44. }
  45. }, 5000)
  46. }
  47. async getChannel(name = 'default') {
  48. if (!this.connection) {
  49. await this.init()
  50. }
  51. const key = mq(name)
  52. if (this.channels.has(key)) {
  53. return this.channels.get(key)
  54. }
  55. const channel = await this.connection.createChannel()
  56. this.channels.set(key, channel)
  57. channel.on('close', () => {
  58. this.logger.warn(`Channel [${key}] 已关闭`)
  59. this.channels.delete(key)
  60. })
  61. return channel
  62. }
  63. async close() {
  64. for (const ch of this.channels.values()) {
  65. await ch.close()
  66. }
  67. this.channels.clear()
  68. if (this.connection) {
  69. await this.connection.close()
  70. this.connection = null
  71. }
  72. }
  73. }
  74. module.exports = new MQManager()