Browse Source

🐞 fix: 紧急修复:oss走代理

Pchen0 1 month ago
parent
commit
cba1db0a7f
3 changed files with 233 additions and 30 deletions
  1. 11 29
      lib/Lepao/Worker.js
  2. 214 0
      lib/Lepao/qgOssPut.js
  3. 8 1
      lib/Lepao/qgOutboundAxios.js

+ 11 - 29
lib/Lepao/Worker.js

@@ -1,5 +1,4 @@
 const path = require('path')
 const path = require('path')
-const OSS = require('ali-oss')
 const mq = require('../../plugin/mq')
 const mq = require('../../plugin/mq')
 const { mq: mqName } = require('../../plugin/mq/mqPrefix')
 const { mq: mqName } = require('../../plugin/mq/mqPrefix')
 const { assertRunforgeTaskIngress, TASK_QUEUE } = require('../../plugin/mq/runforgeTaskMq')
 const { assertRunforgeTaskIngress, TASK_QUEUE } = require('../../plugin/mq/runforgeTaskMq')
@@ -21,6 +20,7 @@ const {
 const generateGyrFromPath = require('../../plugin/Lepao/generateGyrFromPath')
 const generateGyrFromPath = require('../../plugin/Lepao/generateGyrFromPath')
 const { syncAccountInfo } = require('./syncAccountInfo')
 const { syncAccountInfo } = require('./syncAccountInfo')
 const { postLepaoSchool } = require('./lepaoSchoolHttp')
 const { postLepaoSchool } = require('./lepaoSchoolHttp')
+const { putOssWithQgOutbound } = require('./qgOssPut')
 const QgProxyManager = require('./QgProxyManager')
 const QgProxyManager = require('./QgProxyManager')
 const { insertLedgerRecord } = require('./CountLedger')
 const { insertLedgerRecord } = require('./CountLedger')
 
 
@@ -155,32 +155,14 @@ class Worker {
         return new Promise(r => setTimeout(r, ms))
         return new Promise(r => setTimeout(r, ms))
     }
     }
 
 
-    buildOssBaseClientConfig(sts) {
-        return {
-            bucket: sts.bucket,
-            region: sts.region || 'oss-cn-hangzhou',
-            accessKeyId: sts.AccessKeyId,
-            accessKeySecret: sts.AccessKeySecret,
-            stsToken: sts.SecurityToken,
-            secure: true
-        }
-    }
-
-    buildProxyUrlFromFragment(fragment) {
-        if (!fragment || fragment.proxy === false || !fragment.proxy) return null
-        const { host, port, auth } = fragment.proxy
-        let userPart = ''
-        if (auth && String(auth.username || '').length > 0) {
-            const u = encodeURIComponent(auth.username)
-            const p = encodeURIComponent(auth.password != null ? String(auth.password) : '')
-            userPart = `${u}:${p}@`
-        }
-        return `http://${userPart}${host}:${port}`
-    }
-
-    async putOssWithFallback(sts, ossPath, content) {
-        const directClient = new OSS(this.buildOssBaseClientConfig(sts))
-        await directClient.put(ossPath, content)
+    async putOssWithFallback(sts, ossPath, content, ctx = {}) {
+        await putOssWithQgOutbound(sts, ossPath, content, {
+            logger: this.logger,
+            traceId: ctx.traceId,
+            taskId: ctx.taskId,
+            outboundMode: ctx.outboundMode || 'auto',
+            timeout: this.httpTimeoutMs * 2
+        })
     }
     }
 
 
     isRunSuccess(bindResponse) {
     isRunSuccess(bindResponse) {
@@ -1345,7 +1327,7 @@ class Worker {
             const boundary = String(Date.now())
             const boundary = String(Date.now())
             const timestamp = String(Date.now())
             const timestamp = String(Date.now())
             const ossPath = `Public/Upload/file/run_record/${boundary.slice(-3)}/${formattedToday}/${timestamp}-${Math.floor(Math.random() * 150)}.txt`
             const ossPath = `Public/Upload/file/run_record/${boundary.slice(-3)}/${formattedToday}/${timestamp}-${Math.floor(Math.random() * 150)}.txt`
-            await this.putOssWithFallback(sts, ossPath, Buffer.from(pathResult, 'utf-8'))
+            await this.putOssWithFallback(sts, ossPath, Buffer.from(pathResult, 'utf-8'), ctx)
 
 
             return { oss_path: ossPath, point_data: point_data }
             return { oss_path: ossPath, point_data: point_data }
         })
         })
@@ -1368,7 +1350,7 @@ class Worker {
             const boundary = String(Date.now())
             const boundary = String(Date.now())
             const timestamp = String(Date.now())
             const timestamp = String(Date.now())
             const ossPath = `Public/Upload/file/run_gyroscope/${boundary.slice(-3)}/${formattedToday}/${timestamp}-${Math.floor(Math.random() * 150)}.txt`
             const ossPath = `Public/Upload/file/run_gyroscope/${boundary.slice(-3)}/${formattedToday}/${timestamp}-${Math.floor(Math.random() * 150)}.txt`
-            await this.putOssWithFallback(sts, ossPath, Buffer.from(JSON.stringify(gyrData), 'utf-8'))
+            await this.putOssWithFallback(sts, ossPath, Buffer.from(JSON.stringify(gyrData), 'utf-8'), ctx)
 
 
             return this.request(
             return this.request(
                 ctx.traceId,
                 ctx.traceId,

+ 214 - 0
lib/Lepao/qgOssPut.js

@@ -0,0 +1,214 @@
+/**
+ * 乐跑轨迹/陀螺仪 OSS 上传:与 lepaoSchoolHttp 一致,优先青果 HTTP 代理,失败换节点重试后回退直连。
+ */
+const OSS = require('ali-oss')
+const QgProxyManager = require('./QgProxyManager')
+const {
+    buildAxiosOutboundConfig,
+    getOutboundWithBackoff,
+    isQgProxyEligibleFailure,
+    isProxyTlsHandshakeReset,
+    summarizeAxiosError,
+    debugProxyEnabled,
+    debugProxyAxiosFragment,
+    sleep,
+    PROXY_FIRST_TIMEOUT_MS
+} = require('./qgOutboundAxios')
+
+const OSS_PROXY_FIRST_TIMEOUT_MS = Math.max(PROXY_FIRST_TIMEOUT_MS, 45000)
+
+function ossLogLabel(traceId, taskId) {
+    let s = ''
+    if (traceId) s += `[${traceId}] `
+    s += '[qgOssPut]'
+    if (taskId) s += ` [${taskId}]`
+    return s
+}
+
+function buildOssClient(sts, httpsAgent, timeoutMs) {
+    const opts = {
+        bucket: sts.bucket,
+        region: sts.region || 'oss-cn-hangzhou',
+        accessKeyId: sts.AccessKeyId,
+        accessKeySecret: sts.AccessKeySecret,
+        stsToken: sts.SecurityToken,
+        secure: true,
+        timeout: timeoutMs
+    }
+    if (httpsAgent) {
+        opts.httpsAgent = httpsAgent
+    }
+    return new OSS(opts)
+}
+
+async function logOssOutbound(logger, phase, ossPath, fragment, opts = {}) {
+    if (!logger?.info) return
+    const label = ossLogLabel(opts.traceId, opts.taskId)
+    if (!fragment || fragment.proxy === false || !fragment.proxy) {
+        logger.info(`${label} ${phase} PUT 出站=直连 path=${ossPath}`)
+        return
+    }
+    const conn = `${fragment.proxy.host}:${fragment.proxy.port}`
+    if (opts.skipQgSnapshot) {
+        logger.info(`${label} ${phase} PUT 出站=调试HTTP代理 连接=${conn} path=${ossPath}`)
+        return
+    }
+    const snap = await QgProxyManager.getCachedParsed()
+    const serverRecord = snap?.server ?? conn
+    const egress = snap?.proxyIp ?? '(暂无 proxy_ip)'
+    const dl = snap?.deadline ?? '—'
+    logger.info(
+        `${label} ${phase} PUT 出站=HTTP代理 节点server=${serverRecord} 连接${conn} 出口IP(proxy_ip)=${egress} deadline=${dl} path=${ossPath}`
+    )
+}
+
+async function doOssPut(sts, ossPath, content, fragment, requestTimeoutMs) {
+    const outbound = buildAxiosOutboundConfig(fragment)
+    const client = buildOssClient(sts, outbound.httpsAgent, requestTimeoutMs)
+    return client.put(ossPath, content)
+}
+
+/**
+ * @param {object} sts OSS STS 凭证
+ * @param {string} ossPath 对象 key
+ * @param {Buffer|string} content
+ * @param {{ logger?: object, traceId?: string, taskId?: string, outboundMode?: 'auto'|'direct'|'proxy', timeout?: number }} options
+ */
+async function putOssWithQgOutbound(sts, ossPath, content, options = {}) {
+    const {
+        logger = null,
+        traceId = null,
+        taskId = null,
+        outboundMode = 'auto',
+        timeout = 60000
+    } = options
+    const logLabel = () => ossLogLabel(traceId, taskId)
+    const logCtx = { traceId, taskId }
+
+    if (outboundMode === 'direct') {
+        await logOssOutbound(logger, '(强制直连)', ossPath, { proxy: false }, logCtx)
+        return doOssPut(sts, ossPath, content, { proxy: false }, timeout)
+    }
+
+    if (debugProxyEnabled()) {
+        const dbg = debugProxyAxiosFragment()
+        await logOssOutbound(logger, 'Charles调试代理', ossPath, dbg, { ...logCtx, skipQgSnapshot: true })
+        logger?.info?.(`${logLabel()} 使用本地调试代理 LEPAO_DEBUG_PROXY`)
+        return doOssPut(sts, ossPath, content, dbg, timeout)
+    }
+
+    const qgOn = await QgProxyManager.isOutboundProxyEnabled()
+    if (!qgOn) {
+        await logOssOutbound(logger, '(青果出站未启用)', ossPath, { proxy: false }, logCtx)
+        return doOssPut(sts, ossPath, content, { proxy: false }, timeout)
+    }
+
+    let frag
+    try {
+        frag = await getOutboundWithBackoff({ forceRefresh: false }, 2)
+    } catch (e0) {
+        if (outboundMode === 'proxy') {
+            const err = new Error(`代理模式提取失败: ${e0.message || e0}`)
+            err.code = 'PROXY_REQUIRED_EXTRACT_FAILED'
+            err.retryable = true
+            throw err
+        }
+        logger?.error?.(`${logLabel()} 青果提取多次重试仍失败,OSS 改直连: ${e0.message || e0}`)
+        await logOssOutbound(logger, '(青果提取异常→直连)', ossPath, { proxy: false }, logCtx)
+        await QgProxyManager.recordFallbackDirect({
+            reason: 'qg_extract_error_oss',
+            path: ossPath,
+            mq_task_id: taskId,
+            trace_id: traceId,
+            ...summarizeAxiosError(e0)
+        })
+        return doOssPut(sts, ossPath, content, { proxy: false }, timeout)
+    }
+
+    if (frag.proxy === false) {
+        try {
+            await sleep(400)
+            frag = await getOutboundWithBackoff({ forceRefresh: true }, 2)
+        } catch {
+            /* keep */
+        }
+    }
+
+    if (frag.proxy === false) {
+        if (outboundMode === 'proxy') {
+            const err = new Error('代理模式无可用节点')
+            err.code = 'PROXY_REQUIRED_NO_NODE'
+            err.retryable = true
+            throw err
+        }
+        logger?.warn?.(`${logLabel()} 无可用青果节点,OSS 将直连`)
+        await logOssOutbound(logger, '(无缓存节点→直连)', ossPath, { proxy: false }, logCtx)
+        await QgProxyManager.recordFallbackDirect({
+            reason: 'no_proxy_available_oss',
+            path: ossPath,
+            mq_task_id: taskId,
+            trace_id: traceId
+        })
+        return doOssPut(sts, ossPath, content, { proxy: false }, timeout)
+    }
+
+    await logOssOutbound(logger, '首次请求', ossPath, frag, logCtx)
+    try {
+        return await doOssPut(sts, ossPath, content, frag, OSS_PROXY_FIRST_TIMEOUT_MS)
+    } catch (e1) {
+        if (outboundMode === 'proxy') {
+            const err = new Error(`代理模式 OSS 上传失败: ${e1.message || e1}`)
+            err.code = 'PROXY_REQUIRED_OSS_PUT_FAILED'
+            err.retryable = true
+            throw err
+        }
+        if (!isQgProxyEligibleFailure(e1)) throw e1
+
+        const tls1 = isProxyTlsHandshakeReset(e1)
+        if (!tls1) {
+            logger?.warn?.(
+                `${logLabel()} 经代理 OSS 首次失败,作废缓存并换节点重试。err=${e1.message || e1} ${JSON.stringify(
+                    summarizeAxiosError(e1)
+                )}`
+            )
+            try {
+                await QgProxyManager.invalidateCurrent('oss_put_proxy_fail', {
+                    path: ossPath,
+                    mq_task_id: taskId,
+                    trace_id: traceId,
+                    ...summarizeAxiosError(e1)
+                })
+                const frag2 = await getOutboundWithBackoff({ forceRefresh: true }, 2)
+                if (frag2?.proxy !== false) {
+                    await logOssOutbound(logger, '(换节点重试)', ossPath, frag2, logCtx)
+                    return await doOssPut(sts, ossPath, content, frag2, OSS_PROXY_FIRST_TIMEOUT_MS)
+                }
+            } catch (eRetry) {
+                if (!isQgProxyEligibleFailure(eRetry)) throw eRetry
+                logger?.warn?.(
+                    `${logLabel()} 换节点重试仍失败: ${eRetry.message || eRetry} ${JSON.stringify(
+                        summarizeAxiosError(eRetry)
+                    )}`
+                )
+            }
+        } else {
+            logger?.warn?.(
+                `${logLabel()} TLS 握手前经代理断开,OSS 直接回退直连(隧道池由服务商切换出口)`
+            )
+        }
+
+        await logOssOutbound(logger, tls1 ? '(TLS隧道异常→直连)' : '(代理失败→直连)', ossPath, { proxy: false }, logCtx)
+        await QgProxyManager.recordFallbackDirect({
+            reason: tls1 ? 'tls_prefinish_reset_direct_oss' : 'proxy_oss_put_failed_then_direct',
+            path: ossPath,
+            mq_task_id: taskId,
+            trace_id: traceId,
+            ...summarizeAxiosError(e1)
+        })
+        return doOssPut(sts, ossPath, content, { proxy: false }, timeout)
+    }
+}
+
+module.exports = {
+    putOssWithQgOutbound
+}

+ 8 - 1
lib/Lepao/qgOutboundAxios.js

@@ -272,5 +272,12 @@ async function axiosWithQgOutbound(opts) {
 module.exports = {
 module.exports = {
     axiosWithQgOutbound,
     axiosWithQgOutbound,
     buildAxiosOutboundConfig,
     buildAxiosOutboundConfig,
-    getOutboundWithBackoff
+    getOutboundWithBackoff,
+    isQgProxyEligibleFailure,
+    isProxyTlsHandshakeReset,
+    summarizeAxiosError,
+    debugProxyEnabled,
+    debugProxyAxiosFragment,
+    sleep,
+    PROXY_FIRST_TIMEOUT_MS
 }
 }