Browse Source

🐞 fix: 增加超时处理

Pchen0 6 hours ago
parent
commit
45beb6fa42
2 changed files with 36 additions and 20 deletions
  1. 26 11
      lib/Lepao/Worker.js
  2. 10 9
      lib/Lepao/lepaoSchoolHttp.js

+ 26 - 11
lib/Lepao/Worker.js

@@ -46,8 +46,10 @@ class Worker {
         this.channelName = 'lepao_worker'
 
         this.maxRetry = 3
-        /** 青果提取 + 代理 POST 重试需略长于单次 axios,避免误报「任务超时」而死信 */
-        this.timeout = 22000
+        /** 单次学校接口 HTTP 超时(用于 this.request 内部) */
+        this.httpTimeoutMs = 30000
+        /** 单个 MQ 任务总超时(覆盖 startRun 多阶段 + 重试) */
+        this.taskTimeoutMs = 180000
         this.maxQueueLength = 2000
 
         this.defaultUserAgent = 'Mozilla/5.0 (Linux; Android 16; 2211133C Build/BP2A.250605.031.A3; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/138.0.7204.180 Mobile Safari/537.36 XWEB/1380347 MMWEBSDK/20250202 MMWEBID/1020 wxwork/5.0.6.66174 MicroMessenger/8.0.28.48(0x28001c30) MiniProgramEnv/android Luggage/3.0.2.95ef3f83 NetType/WIFI Language/zh_CN ABI/arm64'
@@ -357,11 +359,18 @@ class Worker {
         return payload.id
     }
 
-    async withTimeout(promise, name) {
+    async withTimeout(promise, name, timeoutMs, options = {}) {
+        const ms = Number.isFinite(Number(timeoutMs)) ? Number(timeoutMs) : this.taskTimeoutMs
+        const { code = 'TIMEOUT', retryable = false } = options || {}
         return Promise.race([
             promise,
             new Promise((_, reject) =>
-                setTimeout(() => reject(new Error(`${name} 超时`)), this.timeout)
+                setTimeout(() => {
+                    const err = new Error(`${name} 超时`)
+                    err.code = code
+                    err.retryable = !!retryable
+                    reject(err)
+                }, ms)
             )
         ])
     }
@@ -422,8 +431,7 @@ class Worker {
         this.logger.error(`[${traceId}] ${msg} ${err.stack || err}`)
     }
 
-    async request(traceId, name, url, raw, headers = {}) {
-        const ctx = arguments.length >= 6 ? arguments[5] : null
+    async request(traceId, name, url, raw, headers = {}, ctx = null) {
         return this.retry(async () => {
             this.log(traceId, 'REQ', name, raw)
 
@@ -446,12 +454,14 @@ class Worker {
             const res = await this.withTimeout(
                 postLepaoSchool(url, form, {
                     headers: mergedHeaders,
-                    timeout: this.timeout,
+                    timeout: this.httpTimeoutMs,
                     logger: this.logger,
                     outboundMode: ctx?.outboundMode || 'auto',
                     mqTaskId: ctx?.taskId
                 }),
-                name
+                name,
+                this.httpTimeoutMs,
+                { code: 'HTTP_TIMEOUT', retryable: true }
             )
 
             let result = res.data
@@ -773,7 +783,10 @@ class Worker {
                     await this.enqueueTask(ctx.channel, 'lepao.sendNotice', {
                         account: req.account,
                         success: false,
-                        reason: this.maskClientReason(err.message || '未知错误'),
+                        reason:
+                            err?.code === 'TASK_TIMEOUT'
+                                ? '系统繁忙,请稍后再试'
+                                : this.maskClientReason(err.message || '未知错误'),
                         traceId
                     }, { id: `${traceId}:notice:fail` })
                 }
@@ -1461,7 +1474,9 @@ class Worker {
                     const outboundMode = proxyEnabled ? 'proxy' : 'direct'
                     const result = await this.withTimeout(
                         handler(data, { traceId, channel, taskId: id, outboundMode }),
-                        type
+                        type,
+                        this.taskTimeoutMs,
+                        { code: 'TASK_TIMEOUT', retryable: true }
                     )
 
                     await this.sendResult(channel, {
@@ -1502,7 +1517,7 @@ class Worker {
                     await this.sendResult(channel, {
                         id,
                         success: false,
-                        error: err.message
+                        error: err.code === 'TASK_TIMEOUT' ? '系统繁忙,请稍后再试' : this.maskClientReason(err.message)
                     })
 
                     channel.ack(msg)

+ 10 - 9
lib/Lepao/lepaoSchoolHttp.js

@@ -115,14 +115,15 @@ function isProxyTlsHandshakeReset(err) {
 async function logSchoolOutbound(logger, phase, url, axiosMerge, opts = {}) {
     if (!logger?.info) return
     const path = briefUrlPath(url)
+    const mqPart = opts.mqTaskId ? ` mq_task_id=${opts.mqTaskId}` : ''
     if (!axiosMerge || axiosMerge.proxy === false || !axiosMerge.proxy) {
-        logger.info(`[lepaoSchoolHttp] ${phase} POST 出站=直连 path=${path}`)
+        logger.info(`[lepaoSchoolHttp] ${phase} POST 出站=直连${mqPart} path=${path}`)
         return
     }
     const conn = `${axiosMerge.proxy.host}:${axiosMerge.proxy.port}`
     if (opts.skipQgSnapshot) {
         logger.info(
-            `[lepaoSchoolHttp] ${phase} POST 出站=调试HTTP代理(非青果) 连接=${conn} path=${path}`
+            `[lepaoSchoolHttp] ${phase} POST 出站=调试HTTP代理(非青果)${mqPart} 连接=${conn} path=${path}`
         )
         return
     }
@@ -131,7 +132,7 @@ async function logSchoolOutbound(logger, phase, url, axiosMerge, opts = {}) {
     const egress = snap?.proxyIp ?? '(暂无 proxy_ip)'
     const dl = snap?.deadline ?? '—'
     logger.info(
-        `[lepaoSchoolHttp] ${phase} POST 出站=HTTP代理 节点server=${serverRecord} 连接${conn} 出口IP(proxy_ip)=${egress} deadline=${dl} path=${path}`
+        `[lepaoSchoolHttp] ${phase} POST 出站=HTTP代理${mqPart} 节点server=${serverRecord} 连接${conn} 出口IP(proxy_ip)=${egress} deadline=${dl} path=${path}`
     )
 }
 
@@ -152,7 +153,7 @@ async function postLepaoSchool(url, data, options = {}) {
 
     // 强制直连:策略 A 用(任务内固定出站,禁止中途切换)
     if (outboundMode === 'direct') {
-        await logSchoolOutbound(logger, '(强制直连)', url, { proxy: false })
+        await logSchoolOutbound(logger, '(强制直连)', url, { proxy: false }, { mqTaskId })
         return doPost({ proxy: false })
     }
 
@@ -165,7 +166,7 @@ async function postLepaoSchool(url, data, options = {}) {
 
     const qgOn = await QgProxyManager.isOutboundProxyEnabled()
     if (!qgOn) {
-        await logSchoolOutbound(logger, '(青果出站未启用)', url, { proxy: false })
+        await logSchoolOutbound(logger, '(青果出站未启用)', url, { proxy: false }, { mqTaskId })
         return doPost({ proxy: false })
     }
 
@@ -206,12 +207,12 @@ async function postLepaoSchool(url, data, options = {}) {
             err.retryable = true
             throw err
         }
-        await logSchoolOutbound(logger, '(无缓存节点→直连)', url, { proxy: false })
+        await logSchoolOutbound(logger, '(无缓存节点→直连)', url, { proxy: false }, { mqTaskId })
         await QgProxyManager.recordFallbackDirect({ reason: 'no_proxy_available', mq_task_id: mqTaskId })
         return doPost({ proxy: false })
     }
 
-    await logSchoolOutbound(logger, '首次请求', url, frag)
+    await logSchoolOutbound(logger, '首次请求', url, frag, { mqTaskId })
     try {
         const proxyFirstTimeoutMs = 20000
         return await doPost(frag, proxyFirstTimeoutMs)
@@ -234,7 +235,7 @@ async function postLepaoSchool(url, data, options = {}) {
             logger?.warn?.(
                 '[lepaoSchoolHttp] TLS 握手前经代理断开,隧道池模式直接直连(由服务商后台自动切换出口)'
             )
-            await logSchoolOutbound(logger, '(TLS隧道异常→直连)', url, { proxy: false })
+            await logSchoolOutbound(logger, '(TLS隧道异常→直连)', url, { proxy: false }, { mqTaskId })
             await QgProxyManager.recordFallbackDirect({
                 reason: 'tls_prefinish_reset_direct',
                 path: briefUrlPath(url),
@@ -244,7 +245,7 @@ async function postLepaoSchool(url, data, options = {}) {
             return doPost({ proxy: false })
         }
 
-        await logSchoolOutbound(logger, '(代理失败→直连)', url, { proxy: false })
+        await logSchoolOutbound(logger, '(代理失败→直连)', url, { proxy: false }, { mqTaskId })
         await QgProxyManager.recordFallbackDirect({
             reason: 'proxy_post_failed_then_direct',
             path: briefUrlPath(url),