index.js 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. const amqp = require('amqplib')
  2. const path = require('path')
  3. const config = require('../../config.json')
  4. const Logger = require('../../lib/Logger')
  5. const { mq } = require('./mqPrefix')
  /**
   * Manages one RabbitMQ connection and a cache of named channels on top of it.
   *
   * - `init()` opens the connection lazily; overlapping callers share a single
   *   connection attempt instead of each opening their own.
   * - A failed attempt or an unexpected disconnect schedules a retry every
   *   `reconnectDelayMs` milliseconds until a connection is established.
   * - `close()` is an intentional shutdown: it cancels any pending retry and the
   *   resulting connection 'close' event does not trigger a reconnect.
   */
  class MQManager {
    constructor() {
      this.url = config.rabbitmq.url
      this.connection = null
      // Open channels, keyed by mq(name).
      this.channels = new Map()
      this.logger = new Logger(path.join(__dirname, '../../logs/RabbitMQ.log'), 'INFO')
      // True while a reconnect timer is pending.
      this.reconnecting = false
      // Delay between reconnect attempts, in milliseconds.
      this.reconnectDelayMs = 5000
      // Handle of the pending reconnect timer so close() can cancel it.
      this.reconnectTimer = null
      // Promise of the connection attempt currently in flight, if any.
      this.connecting = null
      // Set by close() so the connection 'close' event is not treated as an outage.
      this.closing = false
    }

    /**
     * Ensure a connection exists. Resolves immediately when already connected;
     * joins the in-flight attempt when one is running.
     *
     * @returns {Promise<void>}
     * @throws the error raised by `amqp.connect` (a retry is scheduled first)
     */
    async init() {
      if (this.connection) return
      // Share the attempt already in flight; a second amqp.connect() here would
      // leave one of the two connections orphaned.
      if (this.connecting) return this.connecting

      // An explicit init() after close() re-enables automatic reconnection.
      this.closing = false
      this.connecting = this._connect()
      try {
        await this.connecting
      } finally {
        this.connecting = null
      }
    }

    /**
     * Open the connection and attach its lifecycle handlers (internal helper of init()).
     *
     * @returns {Promise<void>}
     */
    async _connect() {
      try {
        this.logger.info('RabbitMQ 初始化连接...')
        const connection = await amqp.connect(this.url)
        connection.on('close', () => {
          // Ignore events from a connection that is no longer the active one.
          if (this.connection !== connection) return
          this.connection = null
          this.channels.clear()
          // close() was called on purpose: do not log an outage or reconnect.
          if (this.closing) return
          this.logger.warn('RabbitMQ 连接断开,准备重连')
          this.reconnect()
        })
        connection.on('error', (err) => {
          this.logger.error('RabbitMQ 连接错误:', err.message)
        })
        this.connection = connection
        this.logger.info('RabbitMQ 连接成功')
      } catch (e) {
        this.logger.error('RabbitMQ 初始化失败:', e.message)
        this.reconnect()
        throw e
      }
    }

    /**
     * Schedule one reconnect attempt after `reconnectDelayMs`. Does nothing when
     * an attempt is already scheduled or the manager is being closed.
     *
     * @returns {Promise<void>}
     */
    async reconnect() {
      if (this.reconnecting || this.closing) return
      this.reconnecting = true
      this.reconnectTimer = setTimeout(async () => {
        this.reconnectTimer = null
        // Clear the flag BEFORE calling init(): when this attempt fails, init()'s
        // failure path calls reconnect() again, and that call must be allowed to
        // schedule the next attempt. Clearing it afterwards ends the retry loop
        // after a single failed retry.
        this.reconnecting = false
        if (this.closing) return
        try {
          await this.init()
        } catch {
          // Already logged by init(), which also scheduled the next attempt.
        }
      }, this.reconnectDelayMs)
    }

    /**
     * Return the cached channel for `name`, creating it when missing or closed.
     *
     * @param {string} [name='default'] logical channel name; the cache key is mq(name)
     * @returns {Promise<object>} the channel returned by `connection.createChannel()`
     */
    async getChannel(name = 'default') {
      if (!this.connection) {
        await this.init()
      }
      const key = mq(name)
      const cached = this.channels.get(key)
      if (cached) {
        if (!cached.__runforgeClosed) return cached
        this.channels.delete(key)
      }
      const channel = await this.connection.createChannel()
      channel.__runforgeClosed = false
      this.channels.set(key, channel)

      // Only evict the cache entry if it still points at THIS channel; a channel
      // that was invalidated and replaced must not evict its replacement.
      const evict = () => {
        channel.__runforgeClosed = true
        if (this.channels.get(key) === channel) this.channels.delete(key)
      }
      channel.on('close', () => {
        evict()
        this.logger.warn(`Channel [${key}] 已关闭`)
      })
      channel.on('error', (err) => {
        evict()
        this.logger.error(`Channel [${key}] 错误: ${err.message || err}`)
      })
      return channel
    }

    /**
     * Drop the cached channel for `name` so the next getChannel() creates a new
     * one. The channel itself is only flagged as closed, not closed on the broker.
     *
     * @param {string} [name='default']
     */
    invalidateChannel(name = 'default') {
      const key = mq(name)
      const channel = this.channels.get(key)
      if (channel) {
        channel.__runforgeClosed = true
        this.channels.delete(key)
      }
    }

    /**
     * Intentionally shut down: cancel any pending reconnect, close every cached
     * channel, then close the connection.
     *
     * @returns {Promise<void>}
     */
    async close() {
      this.closing = true
      if (this.reconnectTimer) {
        clearTimeout(this.reconnectTimer)
        this.reconnectTimer = null
      }
      this.reconnecting = false

      // Snapshot first: the channels' own 'close' handlers touch the map.
      const open = Array.from(this.channels.entries())
      this.channels.clear()
      for (const [key, ch] of open) {
        try {
          await ch.close()
        } catch (err) {
          // A channel that is already closed must not prevent the connection
          // below from being closed.
          this.logger.warn(`Channel [${key}] 关闭失败: ${err.message || err}`)
        }
      }

      const connection = this.connection
      if (connection) {
        try {
          await connection.close()
        } finally {
          this.connection = null
        }
      }
    }
  }
  91. module.exports = new MQManager()