Browse Source

✨ feat: 引入rabbitmq

Pchen. 3 months ago
parent
commit
bcf641fb06
3 changed files with 150 additions and 42 deletions
  1. 61 42
      lib/Server.js
  2. 1 0
      package.json
  3. 88 0
      plugin/mq/index.js

+ 61 - 42
lib/Server.js

@@ -1,96 +1,115 @@
-const express = require('express');
-const cors = require('cors');
-const path = require('path');
-const fs = require('fs');
-const config = require('../config.json');
-const Logger = require('./Logger');
-const MySQL = require('../plugin/DataBase/MySQL');
+const express = require('express')
+const cors = require('cors')
+const path = require('path')
+const fs = require('fs')
+const config = require('../config.json')
+const Logger = require('./Logger')
+const MySQL = require('../plugin/DataBase/MySQL')
+const mq = require('../plugin/mq')
 
 
 class SERVER {
 class SERVER {
     constructor() {
     constructor() {
-        this.app = express();
-        this.port = config.port || 3000;
-        this.apiDirectory = path.join(__dirname, '../apis'); // API 文件存放目录
+        this.app = express()
+        this.port = config.port || 3000
+        this.apiDirectory = path.join(__dirname, '../apis') // API 文件存放目录
 
 
-        this.logger = new Logger(path.join(__dirname, '../logs/Server.log'), 'INFO');
+        this.logger = new Logger(path.join(__dirname, '../logs/Server.log'), 'INFO')
 
 
         // 解析 JSON 请求体
         // 解析 JSON 请求体
-        this.app.use(express.json());
+        this.app.use(express.json())
 
 
         //解决cors跨域
         //解决cors跨域
-        this.app.use(cors());
+        this.app.use(cors())
 
 
         //使用静态资源
         //使用静态资源
-        this.app.use('/uploads',express.static('./uploads'))
-        this.app.use('/models',express.static('./models'))
+        this.app.use('/uploads', express.static('./uploads'))
+        this.app.use('/models', express.static('./models'))
 
 
         // 初始化数据库连接
         // 初始化数据库连接
-        this.db = new MySQL();
+        this.db = new MySQL()
 
 
         // 加载 API 路由
         // 加载 API 路由
-        this.loadAPIs(this.apiDirectory);
+        this.loadAPIs(this.apiDirectory)
     }
     }
 
 
     // 测试数据库连接
     // 测试数据库连接
     async initDB() {
     async initDB() {
         try {
         try {
-            this.logger.info('正在测试数据库连接');
-            await this.db.connect();
-            await this.db.close();
+            this.logger.info('正在测试数据库连接')
+            await this.db.connect()
+            await this.db.close()
         } catch (error) {
         } catch (error) {
-            this.logger.error(`数据库连接失败: ${error.stack}`);
-            process.exit(1);
+            this.logger.error(`数据库连接失败: ${error.stack}`)
+            process.exit(1)
+        }
+    }
+
+    // 测试MQ连接
+    async initMQ() {
+        try {
+            await mq.init()
+            const ch = await mq.getChannel('health')
+
+            await ch.assertQueue('mq_health_check', { durable: false })
+            this.logger.info('✅ RabbitMQ 初始化 & 测试成功')
+        } catch (e) {
+            this.logger.error('❌ RabbitMQ 初始化失败')
+            process.exit(1)
         }
         }
     }
     }
 
 
     loadAPIs(directory) {
     loadAPIs(directory) {
-        const items = fs.readdirSync(directory);
+        const items = fs.readdirSync(directory)
 
 
         items.forEach(item => {
         items.forEach(item => {
-            const itemPath = path.join(directory, item);
-            const stats = fs.statSync(itemPath);
+            const itemPath = path.join(directory, item)
+            const stats = fs.statSync(itemPath)
 
 
             if (stats.isDirectory()) {
             if (stats.isDirectory()) {
                 // 如果是目录,递归调用
                 // 如果是目录,递归调用
-                this.loadAPIs(itemPath);
+                this.loadAPIs(itemPath)
             } else if (stats.isFile() && itemPath.endsWith('.js')) {
             } else if (stats.isFile() && itemPath.endsWith('.js')) {
                 // 如果是文件且是 JavaScript 文件
                 // 如果是文件且是 JavaScript 文件
-                this.loadAPIFile(itemPath);
+                this.loadAPIFile(itemPath)
             }
             }
-        });
+        })
     }
     }
 
 
     // 加载单个 API 文件
     // 加载单个 API 文件
     loadAPIFile(filePath) {
     loadAPIFile(filePath) {
         try {
         try {
-            const APIClass = require(filePath);
+            const APIClass = require(filePath)
 
 
             for (const key in APIClass) {
             for (const key in APIClass) {
                 if (APIClass.hasOwnProperty(key)) {
                 if (APIClass.hasOwnProperty(key)) {
-                    const apiInstance = new APIClass[key]();
-                    apiInstance.setupRoute();
-                    this.app.use('/', apiInstance.getRouter());
-                    this.logger.info(`已加载API:${apiInstance.path} 类型:${apiInstance.method}`);
+                    const apiInstance = new APIClass[key]()
+                    apiInstance.setupRoute()
+                    this.app.use('/', apiInstance.getRouter())
+                    this.logger.info(`已加载API:${apiInstance.path} 类型:${apiInstance.method}`)
                 }
                 }
             }
             }
         } catch (error) {
         } catch (error) {
-            this.logger.error(`加载API文件失败: ${filePath},错误: ${error.stack}`);
+            this.logger.error(`加载API文件失败: ${filePath},错误: ${error.stack}`)
         }
         }
     }
     }
 
 
     start() {
     start() {
-        this.logger.info('============正在启动服务器============');
+        this.logger.info('============正在启动服务器============')
 
 
         // 初始化数据库连接
         // 初始化数据库连接
         this.initDB().then(() => {
         this.initDB().then(() => {
-            this.app.listen(this.port, () => {
-                this.logger.info(`==========服务器正在 ${this.port} 端口上运行==========`);
-            });
+            this.initMQ().then(() => {
+                this.app.listen(this.port, () => {
+                    this.logger.info(`==========服务器正在 ${this.port} 端口上运行==========`)
+                })
+            }).catch(err => {
+                this.logger.error(`启动服务器失败: ${err.message}`)
+            })
         }).catch(err => {
         }).catch(err => {
-            this.logger.error(`启动服务器失败: ${err.message}`);
-            process.exit(1); // 启动失败时退出进程
-        });
+            this.logger.error(`启动服务器失败: ${err.message}`)
+            process.exit(1) // 启动失败时退出进程
+        })
     }
     }
 }
 }
 
 
-module.exports = SERVER;
+module.exports = SERVER

+ 1 - 0
package.json

@@ -10,6 +10,7 @@
   "license": "ISC",
   "license": "ISC",
   "dependencies": {
   "dependencies": {
     "ali-oss": "^6.x",
     "ali-oss": "^6.x",
+    "amqplib": "^0.10.9",
     "axios": "^1.7.4",
     "axios": "^1.7.4",
     "bcryptjs": "^2.4.3",
     "bcryptjs": "^2.4.3",
     "body-parser": "^1.20.2",
     "body-parser": "^1.20.2",

+ 88 - 0
plugin/mq/index.js

@@ -0,0 +1,88 @@
+const amqp = require('amqplib')
+const path = require('path')
+const config = require('../../config.json')
+const Logger = require('../../lib/Logger')
+
+class MQManager {
+    constructor() {
+        this.url = config.rabbitmq.url
+        this.connection = null
+        this.channels = new Map()
+        this.logger = new Logger(path.join(__dirname, '../../logs/RabbitMQ.log'), 'INFO')
+        this.reconnecting = false
+    }
+
+    async init() {
+        if (this.connection) return
+
+        try {
+            this.logger.info('RabbitMQ 初始化连接...')
+            this.connection = await amqp.connect(this.url)
+
+            this.connection.on('close', () => {
+                this.logger.warn('RabbitMQ 连接断开,准备重连')
+                this.connection = null
+                this.channels.clear()
+                this.reconnect()
+            })
+
+            this.connection.on('error', (err) => {
+                this.logger.error('RabbitMQ 连接错误:', err.message)
+            })
+
+            this.logger.info('RabbitMQ 连接成功')
+        } catch (e) {
+            this.logger.error('RabbitMQ 初始化失败:', e.message)
+            this.reconnect()
+            throw e
+        }
+    }
+
+    async reconnect() {
+        if (this.reconnecting) return
+        this.reconnecting = true
+
+        setTimeout(async () => {
+            try {
+                await this.init()
+                this.reconnecting = false
+            } catch {
+                this.reconnecting = false
+            }
+        }, 5000)
+    }
+
+    async getChannel(name = 'default') {
+        if (!this.connection) {
+            await this.init()
+        }
+
+        if (this.channels.has(name)) {
+            return this.channels.get(name)
+        }
+
+        const channel = await this.connection.createChannel()
+        this.channels.set(name, channel)
+
+        channel.on('close', () => {
+            this.logger.warn(`Channel [${name}] 已关闭`)
+            this.channels.delete(name)
+        })
+
+        return channel
+    }
+
+    async close() {
+        for (const ch of this.channels.values()) {
+            await ch.close()
+        }
+        this.channels.clear()
+
+        if (this.connection) {
+            await this.connection.close()
+            this.connection = null
+        }
+    }
+}
+
+module.exports = new MQManager()