| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- const express = require('express')
- const cors = require('cors')
- const path = require('path')
- const fs = require('fs')
- const config = require('../config.json')
- const Logger = require('./Logger')
- const MySQL = require('../plugin/DataBase/MySQL')
- const Worker = require('./Lepao/Worker')
- const mq = require('../plugin/mq')
- const { mq: mqName } = require('../plugin/mq/mqPrefix')
- const { startLepaoSchedulePublisher } = require('../plugin/mq/lepaoSchedulePublisher')
- const OneBotV11 = require('../plugin/OneBot/OneBotV11')
- const AccessControl = require('./AccessControl')
- const {
- resolveServerRole,
- shouldServeApi,
- shouldRunLepaoWorker,
- shouldRunOrderPaymentWorker
- } = require('./serverRole')
- const { TASK_QUEUE } = require('../plugin/mq/runforgeTaskMq')
- const { PREFIX } = require('../plugin/mq/mqPrefix')
- class SERVER {
- constructor() {
- this.serverRole = resolveServerRole()
- this.port = config.port || 3000
- this.apiDirectory = path.join(__dirname, '../apis')
- const logName = this.serverRole === 'worker' ? 'WorkerServer.log' : 'Server.log'
- this.logger = new Logger(path.join(__dirname, '../logs', logName), 'INFO')
- this.db = new MySQL()
- if (shouldServeApi(this.serverRole)) {
- this.app = express()
- this.app.use(express.json())
- this.app.use(cors())
- this.app.use('/uploads', express.static('./uploads'))
- this.app.use('/models', express.static('./models'))
- this.loadAPIs(this.apiDirectory)
- }
- }
- async initDB() {
- try {
- this.logger.info('正在测试数据库连接')
- await this.db.connect()
- await this.db.close()
- } catch (error) {
- this.logger.error(`数据库连接失败: ${error.stack}`)
- process.exit(1)
- }
- }
- async startLepaoWorker() {
- const worker = new Worker()
- await worker.start()
- this.logger.info('RunForge Worker 已启动,正在监听 MQ 任务...')
- startLepaoSchedulePublisher({
- logger: this.logger,
- intervalMs: config.rabbitmq?.lepaoScheduleTickMs ?? 2000,
- batch: config.rabbitmq?.lepaoScheduleBatch ?? 100
- })
- }
- async initMQ() {
- try {
- await mq.init()
- const ch = await mq.getChannel('health')
- await ch.assertQueue(mqName('mq_health_check'), { durable: false })
- this.logger.info('✅ RabbitMQ 初始化 & 测试成功')
- if (shouldRunLepaoWorker(this.serverRole)) {
- try {
- await this.startLepaoWorker()
- } catch (err) {
- console.error('RunForge Worker 启动失败:', err)
- process.exit(1)
- }
- } else {
- this.logger.info(
- `serverRole=api,跳过乐跑 Worker;乐跑任务将投递到 MQ 队列「${TASK_QUEUE}」(mqPrefix=${JSON.stringify(PREFIX)})`
- )
- }
- if (shouldRunOrderPaymentWorker(this.serverRole)) {
- const { startOrderPaymentWorker } = require('../apis/Order/CreateOrder')
- await startOrderPaymentWorker(this.logger)
- } else if (shouldServeApi(this.serverRole)) {
- this.logger.info('serverRole=api,跳过订单支付 MQ 消费者(由 all 进程消费)')
- }
- } catch (e) {
- this.logger.error('❌ RabbitMQ 初始化失败')
- process.exit(1)
- }
- }
- async initOneBot() {
- try {
- const ok = await OneBotV11.initOneBotWs()
- if (ok) {
- this.logger.info('OneBot v11 ws 初始化成功,已开始监听消息')
- } else {
- this.logger.info('OneBot v11 ws 未初始化(可能未启用)')
- }
- } catch (err) {
- this.logger.error(`OneBot v11 ws 初始化失败: ${err.message}`)
- }
- }
- async initAccessControlSchema() {
- await AccessControl.ensurePermissionSchema()
- }
- loadAPIs(directory) {
- const items = fs.readdirSync(directory)
- items.forEach(item => {
- const itemPath = path.join(directory, item)
- const stats = fs.statSync(itemPath)
- if (stats.isDirectory()) {
- this.loadAPIs(itemPath)
- } else if (stats.isFile() && itemPath.endsWith('.js')) {
- this.loadAPIFile(itemPath)
- }
- })
- }
- loadAPIFile(filePath) {
- try {
- const APIClass = require(filePath)
- for (const key in APIClass) {
- if (APIClass.hasOwnProperty(key)) {
- const apiInstance = new APIClass[key]()
- apiInstance.setupRoute()
- this.app.use('/', apiInstance.getRouter())
- this.logger.info(`已加载API:${apiInstance.path} 类型:${apiInstance.method}`)
- }
- }
- } catch (error) {
- this.logger.error(`加载API文件失败: ${filePath},错误: ${error.stack}`)
- }
- }
- listenHttp() {
- this.app.listen(this.port, () => {
- this.logger.info(`==========服务器正在 ${this.port} 端口上运行 (role=${this.serverRole})==========`)
- })
- }
- async startApiServices() {
- try {
- await this.initAccessControlSchema()
- } catch (err) {
- this.logger.error(`权限模型初始化异常: ${err.message}`)
- }
- try {
- await this.initOneBot()
- } catch (err) {
- this.logger.error(`OneBot 初始化异常: ${err.message}`)
- }
- this.listenHttp()
- }
- start() {
- this.logger.info(
- `============正在启动 (role=${this.serverRole}, lepaoQueue=${TASK_QUEUE}, mqPrefix=${JSON.stringify(PREFIX)})============`
- )
- this.initDB()
- .then(() => this.initMQ())
- .then(async () => {
- if (!shouldServeApi(this.serverRole)) {
- this.logger.info('serverRole=worker,未启动 HTTP API,进程将保持运行以消费乐跑任务')
- return
- }
- await this.startApiServices()
- })
- .catch((err) => {
- this.logger.error(`启动失败: ${err.message || err}`)
- if (err.stack) this.logger.error(err.stack)
- process.exit(1)
- })
- }
- }
- module.exports = SERVER
|