Browse Source

🎈 perf: 加强代码健壮性

Pchen. 1 month ago
parent
commit
55f36ec3af
5 changed files with 195 additions and 75 deletions
  1. 6 3
      apis/Order/CreateOrder.js
  2. 85 40
      lib/Lepao/Worker.js
  3. 7 9
      plugin/jkes/updateAccountCore.js
  4. 14 2
      plugin/mq/Worker.js
  5. 83 21
      plugin/mq/index.js

+ 6 - 3
apis/Order/CreateOrder.js

@@ -258,9 +258,12 @@ class CreateOrder extends API {
                         orderId,
                         enqueueTime: Date.now()
                     }
-                    ch.sendToQueue('order_payment_check', Buffer.from(JSON.stringify(msg)), {
-                        persistent: true
-                    })
+                    await mq.sendToQueueSafe(
+                        'order_payment',
+                        'order_payment_check',
+                        Buffer.from(JSON.stringify(msg)),
+                        { persistent: true, contentType: 'application/json' }
+                    )
                 } catch (error) {
                     this.logger.error(`推送订单支付检查消息到 MQ 失败,订单号:${orderId},错误:${error.stack || error}`)
                 }

+ 85 - 40
lib/Lepao/Worker.js

@@ -17,6 +17,8 @@ class Worker {
 
         this.handlers = {}
         this.running = false
+        this._consuming = false
+        this._consumeTag = null
 
         this.taskQueue = TASK_QUEUE
         this.resultQueue = 'runforge_task_result_queue'
@@ -166,7 +168,9 @@ class Worker {
             retry: options.retry ?? 0
         }
 
-        await channel.sendToQueue(
+        // 这里不要直接用传入的 channel:断线后它可能已 close
+        await mq.sendToQueueSafe(
+            this.channelName,
             this.taskQueue,
             Buffer.from(JSON.stringify(payload)),
             { persistent: true, contentType: 'application/json' }
@@ -732,13 +736,11 @@ class Worker {
             if (noticeType === 'bot' && user.bot_umo) {
                 const ch = await mq.getChannel(this.noticeQueue)
                 await ch.assertQueue(this.noticeQueue, { durable: true })
-                ch.sendToQueue(
+                await mq.sendToQueueSafe(
+                    this.noticeQueue,
                     this.noticeQueue,
                     Buffer.from(JSON.stringify(payload)),
-                    {
-                        persistent: true,
-                        contentType: 'application/json'
-                    }
+                    { persistent: true, contentType: 'application/json' }
                 )
                 return { delivered: true, via: 'bot' }
             }
@@ -900,15 +902,40 @@ class Worker {
         try {
             this.initHandlers()
 
-            const channel = await mq.getChannel(this.channelName)
+            await this.startConsumeLoop()
 
-            await channel.prefetch(5)
+            this.logger.info('哪吒乐跑 Worker 启动成功(JKES)')
+        } catch (err) {
+            this.logger.error('哪吒乐跑 Worker 启动失败: ' + (err.stack || err))
+        }
+    }
 
-            await assertRunforgeTaskIngress(channel, this.logger)
-            await channel.assertQueue(this.resultQueue, { durable: true })
-            await channel.assertQueue(this.deadQueue, { durable: true })
+    async startConsumeLoop() {
+        if (!this.running) return
+        if (this._consuming) return
+        this._consuming = true
+
+        const channel = await mq.getChannel(this.channelName)
+
+        channel.on('close', () => {
+            // close 事件可能重复触发;这里仅触发一次重启
+            if (!this.running) return
+            this._consuming = false
+            this.logger.warn('Worker channel 已关闭,准备重启消费')
+            setTimeout(() => {
+                this.startConsumeLoop().catch((e) => {
+                    this.logger.error('重启 Worker 消费失败: ' + (e?.stack || e))
+                })
+            }, 1000)
+        })
+
+        await channel.prefetch(5)
 
-            const handleTaskMessage = async (msg) => {
+        await assertRunforgeTaskIngress(channel, this.logger)
+        await channel.assertQueue(this.resultQueue, { durable: true })
+        await channel.assertQueue(this.deadQueue, { durable: true })
+
+        const handleTaskMessage = async (msg) => {
                 if (!msg) return
 
                 let content
@@ -960,51 +987,69 @@ class Worker {
                     }
 
                     if (retry < this.maxRetry && this.isRetryableTaskError(err)) {
-                        await channel.sendToQueue(
-                            this.taskQueue,
-                            Buffer.from(
-                                JSON.stringify({
-                                    ...content,
-                                    retry: retry + 1
-                                })
-                            ),
-                            { persistent: true }
-                        )
+                        try {
+                            await mq.sendToQueueSafe(
+                                this.channelName,
+                                this.taskQueue,
+                                Buffer.from(
+                                    JSON.stringify({
+                                        ...content,
+                                        retry: retry + 1
+                                    })
+                                ),
+                                { persistent: true, contentType: 'application/json' }
+                            )
+                        } catch (e) {
+                            this.logger.error(
+                                `[${traceId}] 重试消息投递失败(将直接 ack,避免进程崩溃):${e?.message || e}`
+                            )
+                        }
 
                         this.log(traceId, 'RETRY', `重试第${retry + 1}次`)
                     } else {
-                        await channel.sendToQueue(
-                            this.deadQueue,
-                            Buffer.from(JSON.stringify(content)),
-                            { persistent: true }
-                        )
+                        try {
+                            await mq.sendToQueueSafe(
+                                this.channelName,
+                                this.deadQueue,
+                                Buffer.from(JSON.stringify(content)),
+                                { persistent: true, contentType: 'application/json' }
+                            )
+                        } catch (e) {
+                            this.logger.error(
+                                `[${traceId}] 死信投递失败(将直接 ack,避免进程崩溃):${e?.message || e}`
+                            )
+                        }
 
                         this.log(traceId, 'DEAD', '进入死信队列')
                     }
 
-                    await this.sendResult(channel, {
-                        id,
-                        success: false,
-                        error: err.message
-                    })
+                    try {
+                        await this.sendResult(channel, {
+                            id,
+                            success: false,
+                            error: err.message
+                        })
+                    } catch (e) {
+                        this.logger.error(
+                            `[${traceId}] 结果投递失败(忽略):${e?.message || e}`
+                        )
+                    }
 
                     channel.ack(msg)
                 }
             }
 
-            await channel.consume(this.taskQueue, handleTaskMessage, { noAck: false })
-
-            this.logger.info('哪吒乐跑 Worker 启动成功(JKES)')
-        } catch (err) {
-            this.logger.error('哪吒乐跑 Worker 启动失败: ' + err.stack)
-        }
+        const ok = await channel.consume(this.taskQueue, handleTaskMessage, { noAck: false })
+        this._consumeTag = ok?.consumerTag || null
     }
 
     async sendResult(channel, data) {
-        channel.sendToQueue(
+        // 结果队列同样可能因断线导致 channel 关闭,这里用安全投递兜底
+        await mq.sendToQueueSafe(
+            this.channelName,
             this.resultQueue,
             Buffer.from(JSON.stringify(data)),
-            { persistent: true }
+            { persistent: true, contentType: 'application/json' }
         )
     }
 

+ 7 - 9
plugin/jkes/updateAccountCore.js

@@ -69,15 +69,13 @@ async function executeLepaoTokenUpdate(ctx, req, res) {
         if (findRows[0].notice_type === 'bot' && findRows[0].bot_umo) {
             logger.info(`${student_num}发送乐跑更新Bot通知,UMO=${findRows[0].bot_umo}`)
             const ch = await mq.getChannel(messageQueue)
-
-            await ch.assertQueue(messageQueue, {
-                durable: true
-            })
-
-            ch.sendToQueue(messageQueue, Buffer.from(JSON.stringify(emailData)), {
-                persistent: true,
-                contentType: 'application/json'
-            })
+            await ch.assertQueue(messageQueue, { durable: true })
+            await mq.sendToQueueSafe(
+                messageQueue,
+                messageQueue,
+                Buffer.from(JSON.stringify(emailData)),
+                { persistent: true, contentType: 'application/json' }
+            )
 
             logger.info(`${student_num}乐跑更新Bot通知发送完成`)
         } else if (findRows[0].notice_type === 'email' && findRows[0].email) {

+ 14 - 2
plugin/mq/Worker.js

@@ -41,6 +41,17 @@ class Worker {
         try {
             const channel = await mq.getChannel(this.channelName)
 
+            channel.on('close', () => {
+                if (!this.running) return
+                this.logger.warn('Worker channel 已关闭,准备重启消费')
+                this.running = false
+                setTimeout(() => {
+                    this.start().catch((e) => {
+                        this.logger.error('重启 Worker 失败: ' + (e?.stack || e))
+                    })
+                }, 1000)
+            })
+
             // 控制并发(重要)
             await channel.prefetch(5)
 
@@ -120,10 +131,11 @@ class Worker {
      */
     async sendResult(channel, data) {
         try {
-            channel.sendToQueue(
+            await mq.sendToQueueSafe(
+                this.channelName,
                 this.resultQueue,
                 Buffer.from(JSON.stringify(data)),
-                { persistent: true }
+                { persistent: true, contentType: 'application/json' }
             )
         } catch (err) {
             this.logger.error('结果发送失败: ' + err.message)

+ 83 - 21
plugin/mq/index.js

@@ -10,46 +10,74 @@ class MQManager {
         this.channels = new Map()
         this.logger = new Logger(path.join(__dirname, '../../logs/RabbitMQ.log'), 'INFO')
         this.reconnecting = false
+        this._initPromise = null
+        this._reconnectTimer = null
+        this._reconnectAttempt = 0
     }
 
     async init() {
         if (this.connection) return
+        if (this._initPromise) return this._initPromise
 
-        try {
-            this.logger.info('RabbitMQ 初始化连接...')
-            this.connection = await amqp.connect(this.url)
-
-            this.connection.on('close', () => {
-                this.logger.warn('RabbitMQ 连接断开,准备重连')
-                this.connection = null
-                this.channels.clear()
+        this._initPromise = (async () => {
+            try {
+                this.logger.info('RabbitMQ 初始化连接...')
+                const conn = await amqp.connect(this.url)
+                this.connection = conn
+                this._reconnectAttempt = 0
+
+                conn.on('close', () => {
+                    this.logger.warn('RabbitMQ 连接断开,准备重连')
+                    this._dropConnection()
+                    this.reconnect()
+                })
+
+                conn.on('error', (err) => {
+                    // error 事件有时会在 close 前触发;这里不要 throw,交给 close 触发的重连来恢复
+                    this.logger.error('RabbitMQ 连接错误:', err?.message || err)
+                })
+
+                this.logger.info('RabbitMQ 连接成功')
+            } catch (e) {
+                this.logger.error('RabbitMQ 初始化失败:', e?.message || e)
+                this._dropConnection()
                 this.reconnect()
-            })
+                throw e
+            } finally {
+                this._initPromise = null
+            }
+        })()
 
-            this.connection.on('error', (err) => {
-                this.logger.error('RabbitMQ 连接错误:', err.message)
-            })
+        return this._initPromise
+    }
 
-            this.logger.info('RabbitMQ 连接成功')
-        } catch (e) {
-            this.logger.error('RabbitMQ 初始化失败:', e.message)
-            this.reconnect()
-            throw e
-        }
+    _dropConnection() {
+        this.connection = null
+        // 旧的 channel 失效,直接清空;调用方需重新 getChannel
+        this.channels.clear()
     }
 
     async reconnect() {
         if (this.reconnecting) return
         this.reconnecting = true
 
-        setTimeout(async () => {
+        const attempt = ++this._reconnectAttempt
+        const delayMs = Math.min(30000, 1000 * Math.pow(2, Math.min(attempt, 5))) // 2s, 4s, 8s, 16s, then capped at 30s
+
+        if (this._reconnectTimer) {
+            clearTimeout(this._reconnectTimer)
+            this._reconnectTimer = null
+        }
+
+        this._reconnectTimer = setTimeout(async () => {
             try {
                 await this.init()
-                this.reconnecting = false
             } catch {
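+                // NOTE(review): init() has logged the failure, but the reconnect() it calls returns early (this.reconnecting is still true until the finally below), so no further retry is scheduled from here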
+            } finally {
                 this.reconnecting = false
             }
-        }, 5000)
+        }, delayMs)
     }
 
     async getChannel(name = 'default') {
@@ -69,9 +97,43 @@ class MQManager {
             this.channels.delete(name)
         })
 
+        channel.on('error', (err) => {
+            // channel error 也不应抛出到未捕获异常链路
+            this.logger.warn(`Channel [${name}] 错误: ${err?.message || err}`)
+        })
+
         return channel
     }
 
+    isChannelClosedError(err) {
+        if (!err) return false
+        const msg = String(err.message || err).toLowerCase()
+        return msg.includes('channel closed') || msg.includes('illegaloperationerror')
+    }
+
+    /**
+     * 安全投递:遇到断线/Channel closed 自动等待重连并重试。
+     * 注意:只适用于“投递端”;消费端需要重新 consume(业务层处理)。
+     */
+    async sendToQueueSafe(channelName, queue, content, options = {}) {
+        let lastErr
+        for (let i = 0; i < 3; i++) {
+            try {
+                const ch = await this.getChannel(channelName)
+                return ch.sendToQueue(queue, content, options)
+            } catch (e) {
+                lastErr = e
+                if (!this.isChannelClosedError(e)) throw e
+                // 让下次循环重新取 channel(已在 close 时 delete,但保险起见这里也删)
+                this.channels.delete(channelName)
+                // 如果连接也掉了,触发重连并等待一小会
+                this.reconnect()
+                await new Promise((r) => setTimeout(r, 500 * (i + 1)))
+            }
+        }
+        throw lastErr
+    }
+
     async close() {
         for (const ch of this.channels.values()) {
             await ch.close()