const express = require('express')
const cors = require('cors')
const path = require('path')
const fs = require('fs')
const config = require('../config.json')
const Logger = require('./Logger')
const MySQL = require('../plugin/DataBase/MySQL')
const Worker = require('./Lepao/Worker')
const mq = require('../plugin/mq')
const { mq: mqName, PREFIX } = require('../plugin/mq/mqPrefix')
const { startLepaoSchedulePublisher } = require('../plugin/mq/lepaoSchedulePublisher')
const OneBotV11 = require('../plugin/OneBot/OneBotV11')
const AccessControl = require('./AccessControl')
const {
  resolveServerRole,
  shouldServeApi,
  shouldRunLepaoWorker,
  shouldRunOrderPaymentWorker
} = require('./serverRole')
const { TASK_QUEUE } = require('../plugin/mq/runforgeTaskMq')

/**
 * Process bootstrapper.
 *
 * Depending on the role returned by `resolveServerRole()`, an instance
 * verifies the database connection, initializes RabbitMQ, optionally starts
 * the MQ consumers, and optionally serves the HTTP API built from the files
 * under `../apis`.
 */
class SERVER {
  /**
   * Resolves the server role, creates the logger and database handle and,
   * when the role serves the API, builds the Express app and mounts every
   * API module found under `this.apiDirectory`.
   */
  constructor() {
    this.serverRole = resolveServerRole()
    // `||` is kept on purpose: any falsy configured port falls back to 3000.
    this.port = config.port || 3000
    this.apiDirectory = path.join(__dirname, '../apis')

    const logName = this.serverRole === 'worker' ? 'WorkerServer.log' : 'Server.log'
    this.logger = new Logger(path.join(__dirname, '../logs', logName), 'INFO')
    this.db = new MySQL()

    if (shouldServeApi(this.serverRole)) {
      this.app = express()
      this.app.use(express.json())
      this.app.use(cors())
      // NOTE: these relative paths are resolved by express.static against the
      // directory the node process is launched from, not against this file.
      this.app.use('/uploads', express.static('./uploads'))
      this.app.use('/models', express.static('./models'))
      this.loadAPIs(this.apiDirectory)
    }
  }

  /**
   * Smoke-tests the database by opening and immediately closing a connection.
   * Logs the failure and terminates the process with exit code 1 on error.
   *
   * @returns {Promise<void>}
   */
  async initDB() {
    try {
      this.logger.info('正在测试数据库连接')
      await this.db.connect()
      await this.db.close()
    } catch (error) {
      this.logger.error(`数据库连接失败: ${error.stack}`)
      process.exit(1)
    }
  }

  /**
   * Starts the Lepao task worker and then the schedule publisher, using the
   * tick interval and batch size from `config.rabbitmq` (defaults: 2000 / 100).
   *
   * @returns {Promise<void>} Rejects if the worker fails to start.
   */
  async startLepaoWorker() {
    const worker = new Worker()
    await worker.start()
    this.logger.info('RunForge Worker 已启动,正在监听 MQ 任务...')

    startLepaoSchedulePublisher({
      logger: this.logger,
      intervalMs: config.rabbitmq?.lepaoScheduleTickMs ?? 2000,
      batch: config.rabbitmq?.lepaoScheduleBatch ?? 100
    })
  }

  /**
   * Initializes RabbitMQ, asserts a health-check queue, and starts whichever
   * consumers the current role is responsible for. Any failure is logged with
   * its details and terminates the process with exit code 1.
   *
   * @returns {Promise<void>}
   */
  async initMQ() {
    try {
      await mq.init()
      const ch = await mq.getChannel('health')
      await ch.assertQueue(mqName('mq_health_check'), { durable: false })
      this.logger.info('✅ RabbitMQ 初始化 & 测试成功')

      if (shouldRunLepaoWorker(this.serverRole)) {
        try {
          await this.startLepaoWorker()
        } catch (err) {
          // Report on stderr and in the log file so the cause is visible in both.
          console.error('RunForge Worker 启动失败:', err)
          this.logger.error(`RunForge Worker 启动失败: ${err?.stack || err}`)
          process.exit(1)
        }
      } else {
        this.logger.info(
          `serverRole=api,跳过乐跑 Worker;乐跑任务将投递到 MQ 队列「${TASK_QUEUE}」(mqPrefix=${JSON.stringify(PREFIX)})`
        )
      }

      if (shouldRunOrderPaymentWorker(this.serverRole)) {
        // Required lazily so the consumer module is only loaded when needed.
        const { startOrderPaymentWorker } = require('../plugin/mq/orderPaymentWorker')
        await startOrderPaymentWorker(this.logger)
      } else if (shouldServeApi(this.serverRole)) {
        this.logger.info('serverRole=api,跳过订单支付 MQ 消费者(由 all 进程消费)')
      }
    } catch (e) {
      // Include the underlying error; a bare message makes MQ outages undiagnosable.
      this.logger.error(`❌ RabbitMQ 初始化失败: ${e?.stack || e}`)
      process.exit(1)
    }
  }

  /**
   * Initializes the OneBot v11 WebSocket integration. Failures are logged and
   * never propagated, so startup continues without it.
   *
   * @returns {Promise<void>}
   */
  async initOneBot() {
    try {
      const ok = await OneBotV11.initOneBotWs()
      if (ok) {
        this.logger.info('OneBot v11 ws 初始化成功,已开始监听消息')
      } else {
        this.logger.info('OneBot v11 ws 未初始化(可能未启用)')
      }
    } catch (err) {
      this.logger.error(`OneBot v11 ws 初始化失败: ${err.message}`)
    }
  }

  /**
   * Delegates to `AccessControl.ensurePermissionSchema()`.
   *
   * @returns {Promise<void>}
   */
  async initAccessControlSchema() {
    await AccessControl.ensurePermissionSchema()
  }

  /**
   * Recursively walks `directory` and loads every `.js` file as an API module.
   *
   * @param {string} directory - Directory to scan.
   */
  loadAPIs(directory) {
    for (const item of fs.readdirSync(directory)) {
      const itemPath = path.join(directory, item)
      const stats = fs.statSync(itemPath)

      if (stats.isDirectory()) {
        this.loadAPIs(itemPath)
      } else if (stats.isFile() && itemPath.endsWith('.js')) {
        this.loadAPIFile(itemPath)
      }
    }
  }

  /**
   * Requires one API module, instantiates each of its own enumerable exports,
   * and mounts the resulting router at `/`. A failure is logged and does not
   * stop other files from loading.
   *
   * @param {string} filePath - Path of the module to load.
   */
  loadAPIFile(filePath) {
    try {
      const exported = require(filePath)

      // Object.keys yields own enumerable keys only, without depending on the
      // export object providing a `hasOwnProperty` method.
      for (const key of Object.keys(exported ?? {})) {
        const apiInstance = new exported[key]()
        apiInstance.setupRoute()
        this.app.use('/', apiInstance.getRouter())
        this.logger.info(`已加载API:${apiInstance.path} 类型:${apiInstance.method}`)
      }
    } catch (error) {
      this.logger.error(`加载API文件失败: ${filePath},错误: ${error.stack}`)
    }
  }

  /**
   * Starts the HTTP listener on `this.port` and logs once it is listening.
   */
  listenHttp() {
    this.app.listen(this.port, () => {
      this.logger.info(
        `==========服务器正在 ${this.port} 端口上运行 (role=${this.serverRole})==========`
      )
    })
  }

  /**
   * Runs the API-only startup steps: permission schema, OneBot, then the HTTP
   * listener. The first two are best-effort; their errors are logged and the
   * listener is started regardless.
   *
   * @returns {Promise<void>}
   */
  async startApiServices() {
    try {
      await this.initAccessControlSchema()
    } catch (err) {
      this.logger.error(`权限模型初始化异常: ${err.message}`)
    }

    try {
      await this.initOneBot()
    } catch (err) {
      this.logger.error(`OneBot 初始化异常: ${err.message}`)
    }

    this.listenHttp()
  }

  /**
   * Entry point: database check, then MQ initialization, then the API services
   * when the role serves the API. Any unexpected rejection is logged and
   * terminates the process with exit code 1.
   */
  start() {
    this.logger.info(
      `============正在启动 (role=${this.serverRole}, lepaoQueue=${TASK_QUEUE}, mqPrefix=${JSON.stringify(PREFIX)})============`
    )

    this.initDB()
      .then(() => this.initMQ())
      .then(async () => {
        if (!shouldServeApi(this.serverRole)) {
          this.logger.info('serverRole=worker,未启动 HTTP API,进程将保持运行以消费乐跑任务')
          return
        }
        await this.startApiServices()
      })
      .catch((err) => {
        this.logger.error(`启动失败: ${err.message || err}`)
        if (err.stack) this.logger.error(err.stack)
        process.exit(1)
      })
  }
}

module.exports = SERVER