Browse Source

🐞 fix: 修复使用旧channel发布任务的bug

Pchen. 3 days ago
parent
commit
1f0a29a381
2 changed files with 169 additions and 66 deletions
  1. 149 65
      lib/Lepao/Worker.js
  2. 20 1
      plugin/mq/index.js

+ 149 - 65
lib/Lepao/Worker.js

@@ -44,6 +44,8 @@ class Worker {
         this.noticeQueue = mqName('runforge_message_queue')
         this.noticeQueue = mqName('runforge_message_queue')
 
 
         this.channelName = 'lepao_worker'
         this.channelName = 'lepao_worker'
+        this.stopping = false
+        this.restartTimer = null
 
 
         this.maxRetry = 5
         this.maxRetry = 5
         /** 单次学校接口 HTTP 超时(用于 this.request 内部) */
         /** 单次学校接口 HTTP 超时(用于 this.request 内部) */
@@ -313,7 +315,7 @@ class Worker {
         return Number((Date.now() / 1000).toFixed(3))
         return Number((Date.now() / 1000).toFixed(3))
     }
     }
 
 
-    async enqueueTask(channel, type, data, options = {}) {
+    async enqueueTask(_channel, type, data, options = {}) {
         const payload = {
         const payload = {
             id: options.id || this.traceId(),
             id: options.id || this.traceId(),
             type,
             type,
@@ -321,15 +323,98 @@ class Worker {
             retry: options.retry ?? 0
             retry: options.retry ?? 0
         }
         }
 
 
-        await channel.sendToQueue(
-            this.taskQueue,
-            Buffer.from(JSON.stringify(payload)),
-            { persistent: true, contentType: 'application/json' }
-        )
+        await this.publishJson(this.taskQueue, payload, {
+            channelName: 'lepao_worker_publish_task',
+            contentType: 'application/json'
+        })
 
 
         return payload.id
         return payload.id
     }
     }
 
 
+    async sleep(ms) {
+        return new Promise(resolve => setTimeout(resolve, ms))
+    }
+
+    async preparePublishQueue(channel, queue) {
+        if (queue === this.taskQueue) {
+            await assertRunforgeTaskIngress(channel, this.logger)
+            return
+        }
+        if (queue === this.resultQueue || queue === this.deadQueue) {
+            await channel.assertQueue(queue, {
+                durable: true,
+                arguments: {
+                    'x-max-length': this.maxQueueLength
+                }
+            })
+            return
+        }
+        await channel.assertQueue(queue, { durable: true })
+    }
+
+    async publishJson(queue, data, options = {}) {
+        const channelName = options.channelName || `lepao_publish_${queue}`
+        const contentType = options.contentType || 'application/json'
+        const body = Buffer.from(JSON.stringify(data))
+        let lastErr
+
+        for (let attempt = 1; attempt <= 2; attempt++) {
+            try {
+                const ch = await mq.getChannel(channelName)
+                await this.preparePublishQueue(ch, queue)
+                const ok = ch.sendToQueue(queue, body, {
+                    persistent: true,
+                    contentType
+                })
+                if (!ok) {
+                    const err = new Error(`MQ 背压,未能写入队列 ${queue}`)
+                    err.code = 'MQ_BACKPRESSURE'
+                    throw err
+                }
+                return
+            } catch (e) {
+                lastErr = e
+                mq.invalidateChannel(channelName)
+                if (attempt < 2) {
+                    this.logger.warn(`MQ 发布失败,准备重试 queue=${queue}: ${e.message || e}`)
+                    await this.sleep(500)
+                }
+            }
+        }
+
+        throw lastErr
+    }
+
+    safeAck(channel, msg, traceId) {
+        try {
+            if (!channel?.__runforgeClosed) channel.ack(msg)
+        } catch (e) {
+            this.logger.warn(`[${traceId || 'unknown'}] MQ ack 失败,可能连接已断开: ${e.message || e}`)
+        }
+    }
+
+    safeNack(channel, msg, traceId, requeue = true) {
+        try {
+            if (!channel?.__runforgeClosed) channel.nack(msg, false, requeue)
+        } catch (e) {
+            this.logger.warn(`[${traceId || 'unknown'}] MQ nack 失败,可能连接已断开: ${e.message || e}`)
+        }
+    }
+
+    scheduleRestart(reason) {
+        if (this.stopping || this.restartTimer) return
+        this.running = false
+        this.logger.warn(`RunForge Worker 消费通道已不可用,准备重启消费器: ${reason || 'unknown'}`)
+        mq.invalidateChannel(this.channelName)
+        this.restartTimer = setTimeout(() => {
+            this.restartTimer = null
+            this.start().catch(err => {
+                this.logger.error(`RunForge Worker 重启失败: ${err.stack || err}`)
+                this.scheduleRestart('restart_failed')
+            })
+        }, 5000)
+    }
+
     async withTimeout(promise, name, timeoutMs, options = {}) {
     async withTimeout(promise, name, timeoutMs, options = {}) {
         const ms = Number.isFinite(Number(timeoutMs)) ? Number(timeoutMs) : this.taskTimeoutMs
         const ms = Number.isFinite(Number(timeoutMs)) ? Number(timeoutMs) : this.taskTimeoutMs
         const { code = 'TIMEOUT', retryable = false } = options || {}
         const { code = 'TIMEOUT', retryable = false } = options || {}
@@ -520,16 +605,9 @@ class Worker {
 
 
         if (noticeType === 'bot' && user.bot_umo) {
         if (noticeType === 'bot' && user.bot_umo) {
             try {
             try {
-                const ch = await mq.getChannel(this.noticeQueue)
-                await ch.assertQueue(this.noticeQueue, { durable: true })
-                ch.sendToQueue(
-                    this.noticeQueue,
-                    Buffer.from(JSON.stringify(overPayload)),
-                    {
-                        persistent: true,
-                        contentType: 'application/json'
-                    }
-                )
+                await this.publishJson(this.noticeQueue, overPayload, {
+                    channelName: 'lepao_worker_publish_notice'
+                })
             } catch (e) {
             } catch (e) {
                 this.logger.error(`lepao_over Bot 通知失败: ${e.message || e}`)
                 this.logger.error(`lepao_over Bot 通知失败: ${e.message || e}`)
             }
             }
@@ -864,16 +942,9 @@ class Worker {
             }
             }
 
 
             if (noticeType === 'bot' && user.bot_umo) {
             if (noticeType === 'bot' && user.bot_umo) {
-                const ch = await mq.getChannel(this.noticeQueue)
-                await ch.assertQueue(this.noticeQueue, { durable: true })
-                ch.sendToQueue(
-                    this.noticeQueue,
-                    Buffer.from(JSON.stringify(payload)),
-                    {
-                        persistent: true,
-                        contentType: 'application/json'
-                    }
-                )
+                await this.publishJson(this.noticeQueue, payload, {
+                    channelName: 'lepao_worker_publish_notice'
+                })
                 await afterSuccessNotify()
                 await afterSuccessNotify()
                 return { delivered: true, via: 'bot' }
                 return { delivered: true, via: 'bot' }
             }
             }
@@ -1439,6 +1510,7 @@ class Worker {
 
 
     async start() {
     async start() {
         if (this.running) return
         if (this.running) return
+        this.stopping = false
         this.running = true
         this.running = true
 
 
         this.logger.info('Worker 启动中...')
         this.logger.info('Worker 启动中...')
@@ -1448,6 +1520,13 @@ class Worker {
 
 
             const channel = await mq.getChannel(this.channelName)
             const channel = await mq.getChannel(this.channelName)
 
 
+            channel.once('close', () => {
+                this.scheduleRestart('consumer_channel_close')
+            })
+            channel.once('error', (err) => {
+                this.scheduleRestart(`consumer_channel_error:${err.message || err}`)
+            })
+
             await channel.prefetch(5)
             await channel.prefetch(5)
 
 
             await assertRunforgeTaskIngress(channel, this.logger)
             await assertRunforgeTaskIngress(channel, this.logger)
@@ -1472,7 +1551,7 @@ class Worker {
                 try {
                 try {
                     content = JSON.parse(msg.content.toString())
                     content = JSON.parse(msg.content.toString())
                 } catch {
                 } catch {
-                    return channel.ack(msg)
+                    return this.safeAck(channel, msg, 'bad_json')
                 }
                 }
 
 
                 const { id, type, data, retry = 0 } = content
                 const { id, type, data, retry = 0 } = content
@@ -1483,7 +1562,7 @@ class Worker {
 
 
                 if (!handler) {
                 if (!handler) {
                     this.log(traceId, 'ERROR', '未知任务', { type })
                     this.log(traceId, 'ERROR', '未知任务', { type })
-                    return channel.ack(msg)
+                    return this.safeAck(channel, msg, traceId)
                 }
                 }
 
 
                 try {
                 try {
@@ -1503,48 +1582,49 @@ class Worker {
                         { code: 'TASK_TIMEOUT', retryable: true }
                         { code: 'TASK_TIMEOUT', retryable: true }
                     )
                     )
 
 
-                    await this.sendResult(channel, {
-                        id,
-                        success: true,
-                        result
-                    })
+                    try {
+                        await this.sendResult(null, {
+                            id,
+                            success: true,
+                            result
+                        })
+                    } catch (resultErr) {
+                        this.logger.error(`[${traceId}] 任务已完成但结果投递失败: ${resultErr.stack || resultErr}`)
+                    }
 
 
                     this.log(traceId, 'DONE', `任务完成 ${type}`)
                     this.log(traceId, 'DONE', `任务完成 ${type}`)
-                    channel.ack(msg)
+                    this.safeAck(channel, msg, traceId)
 
 
                 } catch (err) {
                 } catch (err) {
                     this.logErr(traceId, `任务失败 ${type}`, err)
                     this.logErr(traceId, `任务失败 ${type}`, err)
 
 
-                    if (retry < this.maxRetry && this.isRetryableTaskError(err)) {
-                        // 重试
-                        await channel.sendToQueue(
-                            this.taskQueue,
-                            Buffer.from(JSON.stringify({
+                    try {
+                        if (retry < this.maxRetry && this.isRetryableTaskError(err)) {
+                            await this.publishJson(this.taskQueue, {
                                 ...content,
                                 ...content,
                                 retry: retry + 1
                                 retry: retry + 1
-                            })),
-                            { persistent: true }
-                        )
+                            }, { channelName: 'lepao_worker_publish_task' })
 
 
-                        this.log(traceId, 'RETRY', `重试第${retry + 1}次`)
-                    } else {
-                        // 死信
-                        await channel.sendToQueue(
-                            this.deadQueue,
-                            Buffer.from(JSON.stringify(content)),
-                            { persistent: true }
-                        )
-
-                        this.log(traceId, 'DEAD', '进入死信队列')
-                    }
+                            this.log(traceId, 'RETRY', `重试第${retry + 1}次`)
+                        } else {
+                            await this.publishJson(this.deadQueue, content, {
+                                channelName: 'lepao_worker_publish_dead'
+                            })
 
 
-                    await this.sendResult(channel, {
-                        id,
-                        success: false,
-                        error: err.code === 'TASK_TIMEOUT' ? '系统繁忙,请稍后再试' : this.maskClientReason(err.message)
-                    })
+                            this.log(traceId, 'DEAD', '进入死信队列')
+                        }
+
+                        await this.sendResult(null, {
+                            id,
+                            success: false,
+                            error: err.code === 'TASK_TIMEOUT' ? '系统繁忙,请稍后再试' : this.maskClientReason(err.message)
+                        })
 
 
-                    channel.ack(msg)
+                        this.safeAck(channel, msg, traceId)
+                    } catch (publishErr) {
+                        this.logger.error(`[${traceId}] MQ 失败处理投递失败,原消息保留等待重投: ${publishErr.stack || publishErr}`)
+                        this.safeNack(channel, msg, traceId, true)
+                    }
                 }
                 }
             })
             })
 
 
@@ -1556,15 +1636,19 @@ class Worker {
         }
         }
     }
     }
 
 
-    async sendResult(channel, data) {
-        channel.sendToQueue(
-            this.resultQueue,
-            Buffer.from(JSON.stringify(data)),
-            { persistent: true }
-        )
+    async sendResult(_channel, data) {
+        await this.publishJson(this.resultQueue, data, {
+            channelName: 'lepao_worker_publish_result',
+            contentType: 'application/json'
+        })
     }
     }
 
 
     async stop() {
     async stop() {
+        this.stopping = true
+        if (this.restartTimer) {
+            clearTimeout(this.restartTimer)
+            this.restartTimer = null
+        }
         this.running = false
         this.running = false
         await mq.close()
         await mq.close()
         this.logger.info('RunForge Worker 已停止')
         this.logger.info('RunForge Worker 已停止')

+ 20 - 1
plugin/mq/index.js

@@ -60,20 +60,39 @@ class MQManager {
 
 
         const key = mq(name)
         const key = mq(name)
         if (this.channels.has(key)) {
         if (this.channels.has(key)) {
-            return this.channels.get(key)
+            const cached = this.channels.get(key)
+            if (!cached.__runforgeClosed) return cached
+            this.channels.delete(key)
         }
         }
 
 
         const channel = await this.connection.createChannel()
         const channel = await this.connection.createChannel()
+        channel.__runforgeClosed = false
         this.channels.set(key, channel)
         this.channels.set(key, channel)
 
 
         channel.on('close', () => {
         channel.on('close', () => {
+            channel.__runforgeClosed = true
             this.logger.warn(`Channel [${key}] 已关闭`)
             this.logger.warn(`Channel [${key}] 已关闭`)
             this.channels.delete(key)
             this.channels.delete(key)
         })
         })
 
 
+        channel.on('error', (err) => {
+            channel.__runforgeClosed = true
+            this.logger.error(`Channel [${key}] 错误: ${err.message || err}`)
+            this.channels.delete(key)
+        })
+
         return channel
         return channel
     }
     }
 
 
+    invalidateChannel(name = 'default') {
+        const key = mq(name)
+        const channel = this.channels.get(key)
+        if (channel) {
+            channel.__runforgeClosed = true
+            this.channels.delete(key)
+        }
+    }
+
     async close() {
     async close() {
         for (const ch of this.channels.values()) {
         for (const ch of this.channels.values()) {
             await ch.close()
             await ch.close()