/** * 乐跑轨迹/陀螺仪 OSS 上传:与 lepaoSchoolHttp 一致,优先青果 HTTP 代理,失败换节点重试后回退直连。 */ const OSS = require('ali-oss') const QgProxyManager = require('./QgProxyManager') const { buildAxiosOutboundConfig, getOutboundWithBackoff, isQgProxyEligibleFailure, isProxyTlsHandshakeReset, summarizeAxiosError, debugProxyEnabled, debugProxyAxiosFragment, sleep, PROXY_FIRST_TIMEOUT_MS } = require('./qgOutboundAxios') const OSS_PROXY_FIRST_TIMEOUT_MS = Math.max(PROXY_FIRST_TIMEOUT_MS, 45000) function ossLogLabel(traceId, taskId) { let s = '' if (traceId) s += `[${traceId}] ` s += '[qgOssPut]' if (taskId) s += ` [${taskId}]` return s } function buildOssClient(sts, httpsAgent, timeoutMs) { const opts = { bucket: sts.bucket, region: sts.region || 'oss-cn-hangzhou', accessKeyId: sts.AccessKeyId, accessKeySecret: sts.AccessKeySecret, stsToken: sts.SecurityToken, secure: true, timeout: timeoutMs } if (httpsAgent) { opts.httpsAgent = httpsAgent } return new OSS(opts) } async function logOssOutbound(logger, phase, ossPath, fragment, opts = {}) { if (!logger?.info) return const label = ossLogLabel(opts.traceId, opts.taskId) if (!fragment || fragment.proxy === false || !fragment.proxy) { logger.info(`${label} ${phase} PUT 出站=直连 path=${ossPath}`) return } const conn = `${fragment.proxy.host}:${fragment.proxy.port}` if (opts.skipQgSnapshot) { logger.info(`${label} ${phase} PUT 出站=调试HTTP代理 连接=${conn} path=${ossPath}`) return } const snap = await QgProxyManager.getCachedParsed() const serverRecord = snap?.server ?? conn const egress = snap?.proxyIp ?? '(暂无 proxy_ip)' const dl = snap?.deadline ?? '—' logger.info( `${label} ${phase} PUT 出站=HTTP代理 节点server=${serverRecord} 连接${conn} 出口IP(proxy_ip)=${egress} deadline=${dl} path=${ossPath}` ) } async function doOssPut(sts, ossPath, content, fragment, requestTimeoutMs) { const outbound = buildAxiosOutboundConfig(fragment) const client = buildOssClient(sts, outbound.httpsAgent, requestTimeoutMs) return client.put(ossPath, content) } /** * @param {object} sts OSS STS 凭证 * @param {string} ossPath 对象 key * @param {Buffer|string} content * @param {{ logger?: object, traceId?: string, taskId?: string, outboundMode?: 'auto'|'direct'|'proxy', timeout?: number }} options */ async function putOssWithQgOutbound(sts, ossPath, content, options = {}) { const { logger = null, traceId = null, taskId = null, outboundMode = 'auto', timeout = 60000 } = options const logLabel = () => ossLogLabel(traceId, taskId) const logCtx = { traceId, taskId } if (outboundMode === 'direct') { await logOssOutbound(logger, '(强制直连)', ossPath, { proxy: false }, logCtx) return doOssPut(sts, ossPath, content, { proxy: false }, timeout) } if (debugProxyEnabled()) { const dbg = debugProxyAxiosFragment() await logOssOutbound(logger, 'Charles调试代理', ossPath, dbg, { ...logCtx, skipQgSnapshot: true }) logger?.info?.(`${logLabel()} 使用本地调试代理 LEPAO_DEBUG_PROXY`) return doOssPut(sts, ossPath, content, dbg, timeout) } const qgOn = await QgProxyManager.isOutboundProxyEnabled() if (!qgOn) { await logOssOutbound(logger, '(青果出站未启用)', ossPath, { proxy: false }, logCtx) return doOssPut(sts, ossPath, content, { proxy: false }, timeout) } let frag try { frag = await getOutboundWithBackoff({ forceRefresh: false }, 2) } catch (e0) { if (outboundMode === 'proxy') { const err = new Error(`代理模式提取失败: ${e0.message || e0}`) err.code = 'PROXY_REQUIRED_EXTRACT_FAILED' err.retryable = true throw err } logger?.error?.(`${logLabel()} 青果提取多次重试仍失败,OSS 改直连: ${e0.message || e0}`) await logOssOutbound(logger, '(青果提取异常→直连)', ossPath, { proxy: false }, logCtx) await QgProxyManager.recordFallbackDirect({ reason: 'qg_extract_error_oss', path: ossPath, mq_task_id: taskId, trace_id: traceId, ...summarizeAxiosError(e0) }) return doOssPut(sts, ossPath, content, { proxy: false }, timeout) } if (frag.proxy === false) { try { await sleep(400) frag = await getOutboundWithBackoff({ forceRefresh: true }, 2) } catch { /* keep */ } } if (frag.proxy === false) { if (outboundMode === 'proxy') { const err = new Error('代理模式无可用节点') err.code = 'PROXY_REQUIRED_NO_NODE' err.retryable = true throw err } logger?.warn?.(`${logLabel()} 无可用青果节点,OSS 将直连`) await logOssOutbound(logger, '(无缓存节点→直连)', ossPath, { proxy: false }, logCtx) await QgProxyManager.recordFallbackDirect({ reason: 'no_proxy_available_oss', path: ossPath, mq_task_id: taskId, trace_id: traceId }) return doOssPut(sts, ossPath, content, { proxy: false }, timeout) } await logOssOutbound(logger, '首次请求', ossPath, frag, logCtx) try { return await doOssPut(sts, ossPath, content, frag, OSS_PROXY_FIRST_TIMEOUT_MS) } catch (e1) { if (outboundMode === 'proxy') { const err = new Error(`代理模式 OSS 上传失败: ${e1.message || e1}`) err.code = 'PROXY_REQUIRED_OSS_PUT_FAILED' err.retryable = true throw err } if (!isQgProxyEligibleFailure(e1)) throw e1 const tls1 = isProxyTlsHandshakeReset(e1) if (!tls1) { logger?.warn?.( `${logLabel()} 经代理 OSS 首次失败,作废缓存并换节点重试。err=${e1.message || e1} ${JSON.stringify( summarizeAxiosError(e1) )}` ) try { await QgProxyManager.invalidateCurrent('oss_put_proxy_fail', { path: ossPath, mq_task_id: taskId, trace_id: traceId, ...summarizeAxiosError(e1) }) const frag2 = await getOutboundWithBackoff({ forceRefresh: true }, 2) if (frag2?.proxy !== false) { await logOssOutbound(logger, '(换节点重试)', ossPath, frag2, logCtx) return await doOssPut(sts, ossPath, content, frag2, OSS_PROXY_FIRST_TIMEOUT_MS) } } catch (eRetry) { if (!isQgProxyEligibleFailure(eRetry)) throw eRetry logger?.warn?.( `${logLabel()} 换节点重试仍失败: ${eRetry.message || eRetry} ${JSON.stringify( summarizeAxiosError(eRetry) )}` ) } } else { logger?.warn?.( `${logLabel()} TLS 握手前经代理断开,OSS 直接回退直连(隧道池由服务商切换出口)` ) } await logOssOutbound(logger, tls1 ? '(TLS隧道异常→直连)' : '(代理失败→直连)', ossPath, { proxy: false }, logCtx) await QgProxyManager.recordFallbackDirect({ reason: tls1 ? 'tls_prefinish_reset_direct_oss' : 'proxy_oss_put_failed_then_direct', path: ossPath, mq_task_id: taskId, trace_id: traceId, ...summarizeAxiosError(e1) }) return doOssPut(sts, ossPath, content, { proxy: false }, timeout) } } module.exports = { putOssWithQgOutbound }