Browse Source

🐞 fix: 修复mq连接被关闭的问题

Pchen0 3 weeks ago
parent
commit
7f063ea2a1
1 changed files with 25 additions and 4 deletions
  1. 25 4
      lib/Lepao/Worker.js

+ 25 - 4
lib/Lepao/Worker.js

@@ -958,11 +958,23 @@ class Worker {
                 if (!msg) return
                 if (!msg) return
 
 
                 let content
                 let content
+                let acked = false
+                const safeAck = () => {
+                    if (!msg || acked) return true
+                    try {
+                        channel.ack(msg)
+                        acked = true
+                        return true
+                    } catch (e) {
+                        this.logger.warn(`消息 ack 失败(可能 channel 已关闭):${e?.message || e}`)
+                        return false
+                    }
+                }
 
 
                 try {
                 try {
                     content = JSON.parse(msg.content.toString())
                     content = JSON.parse(msg.content.toString())
                 } catch {
                 } catch {
-                    return channel.ack(msg)
+                    return safeAck()
                 }
                 }
 
 
                 const { id, type, data, retry = 0 } = content
                 const { id, type, data, retry = 0 } = content
@@ -973,7 +985,7 @@ class Worker {
 
 
                 if (!handler) {
                 if (!handler) {
                     this.log(traceId, 'ERROR', '未知任务', { type })
                     this.log(traceId, 'ERROR', '未知任务', { type })
-                    return channel.ack(msg)
+                    return safeAck()
                 }
                 }
 
 
                 try {
                 try {
@@ -983,6 +995,15 @@ class Worker {
                             : type === 'lepao.finalizeRunSync'
                             : type === 'lepao.finalizeRunSync'
                                 ? 4 * 3600000
                                 ? 4 * 3600000
                                 : undefined
                                 : undefined
+                    /**
+                     * RabbitMQ consumer_timeout 默认常见为 30min。
+                     * lepao.startRun / finalizeRunSync 可能长时间运行,若一直不 ack 会触发 PRECONDITION_FAILED 并关闭 channel。
+                     * 对长任务先 ack,再由本地重试/死信逻辑托底,避免进程因 channel closed 崩溃。
+                     */
+                    const needEarlyAck = Number.isFinite(runMs) && runMs >= 25 * 60 * 1000
+                    if (needEarlyAck) {
+                        safeAck()
+                    }
                     const result = await this.withTimeout(
                     const result = await this.withTimeout(
                         handler(data, { traceId, channel, taskId: id }),
                         handler(data, { traceId, channel, taskId: id }),
                         type,
                         type,
@@ -996,7 +1017,7 @@ class Worker {
                     })
                     })
 
 
                     this.log(traceId, 'DONE', `任务完成 ${type}`)
                     this.log(traceId, 'DONE', `任务完成 ${type}`)
-                    channel.ack(msg)
+                    safeAck()
                 } catch (err) {
                 } catch (err) {
                     this.logErr(traceId, `任务失败 ${type}`, err)
                     this.logErr(traceId, `任务失败 ${type}`, err)
 
 
@@ -1054,7 +1075,7 @@ class Worker {
                         )
                         )
                     }
                     }
 
 
-                    channel.ack(msg)
+                    safeAck()
                 }
                 }
             }
             }