index.js 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. const amqp = require('amqplib')
  2. const path = require('path')
  3. const config = require('../../config.json')
  4. const Logger = require('../../lib/Logger')
  5. class MQManager {
  6. constructor() {
  7. this.url = config.rabbitmq.url
  8. this.connection = null
  9. this.channels = new Map()
  10. this.logger = new Logger(path.join(__dirname, '../../logs/RabbitMQ.log'), 'INFO')
  11. this.reconnecting = false
  12. }
  13. async init() {
  14. if (this.connection) return
  15. try {
  16. this.logger.info('RabbitMQ 初始化连接...')
  17. this.connection = await amqp.connect(this.url)
  18. this.connection.on('close', () => {
  19. this.logger.warn('RabbitMQ 连接断开,准备重连')
  20. this.connection = null
  21. this.channels.clear()
  22. this.reconnect()
  23. })
  24. this.connection.on('error', (err) => {
  25. this.logger.error('RabbitMQ 连接错误:', err.message)
  26. })
  27. this.logger.info('RabbitMQ 连接成功')
  28. } catch (e) {
  29. this.logger.error('RabbitMQ 初始化失败:', e.message)
  30. this.reconnect()
  31. throw e
  32. }
  33. }
  34. async reconnect() {
  35. if (this.reconnecting) return
  36. this.reconnecting = true
  37. setTimeout(async () => {
  38. try {
  39. await this.init()
  40. this.reconnecting = false
  41. } catch {
  42. this.reconnecting = false
  43. }
  44. }, 5000)
  45. }
  46. async getChannel(name = 'default') {
  47. if (!this.connection) {
  48. await this.init()
  49. }
  50. if (this.channels.has(name)) {
  51. return this.channels.get(name)
  52. }
  53. const channel = await this.connection.createChannel()
  54. this.channels.set(name, channel)
  55. channel.on('close', () => {
  56. this.logger.warn(`Channel [${name}] 已关闭`)
  57. this.channels.delete(name)
  58. })
  59. return channel
  60. }
  61. async close() {
  62. for (const ch of this.channels.values()) {
  63. await ch.close()
  64. }
  65. this.channels.clear()
  66. if (this.connection) {
  67. await this.connection.close()
  68. this.connection = null
  69. }
  70. }
  71. }
  72. module.exports = new MQManager()