| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- /**
- * 乐跑轨迹/陀螺仪 OSS 上传:与 lepaoSchoolHttp 一致,优先青果 HTTP 代理,失败换节点重试后回退直连。
- */
- const OSS = require('ali-oss')
- const QgProxyManager = require('./QgProxyManager')
- const {
- buildAxiosOutboundConfig,
- getOutboundWithBackoff,
- isQgProxyEligibleFailure,
- isProxyTlsHandshakeReset,
- summarizeAxiosError,
- debugProxyEnabled,
- debugProxyAxiosFragment,
- sleep,
- PROXY_FIRST_TIMEOUT_MS
- } = require('./qgOutboundAxios')
- const OSS_PROXY_FIRST_TIMEOUT_MS = Math.max(PROXY_FIRST_TIMEOUT_MS, 45000)
- function ossLogLabel(traceId, taskId) {
- let s = ''
- if (traceId) s += `[${traceId}] `
- s += '[qgOssPut]'
- if (taskId) s += ` [${taskId}]`
- return s
- }
- function buildOssClient(sts, httpsAgent, timeoutMs) {
- const opts = {
- bucket: sts.bucket,
- region: sts.region || 'oss-cn-hangzhou',
- accessKeyId: sts.AccessKeyId,
- accessKeySecret: sts.AccessKeySecret,
- stsToken: sts.SecurityToken,
- secure: true,
- timeout: timeoutMs
- }
- if (httpsAgent) {
- opts.httpsAgent = httpsAgent
- }
- return new OSS(opts)
- }
- async function logOssOutbound(logger, phase, ossPath, fragment, opts = {}) {
- if (!logger?.info) return
- const label = ossLogLabel(opts.traceId, opts.taskId)
- if (!fragment || fragment.proxy === false || !fragment.proxy) {
- logger.info(`${label} ${phase} PUT 出站=直连 path=${ossPath}`)
- return
- }
- const conn = `${fragment.proxy.host}:${fragment.proxy.port}`
- if (opts.skipQgSnapshot) {
- logger.info(`${label} ${phase} PUT 出站=调试HTTP代理 连接=${conn} path=${ossPath}`)
- return
- }
- const snap = await QgProxyManager.getCachedParsed()
- const serverRecord = snap?.server ?? conn
- const egress = snap?.proxyIp ?? '(暂无 proxy_ip)'
- const dl = snap?.deadline ?? '—'
- logger.info(
- `${label} ${phase} PUT 出站=HTTP代理 节点server=${serverRecord} 连接${conn} 出口IP(proxy_ip)=${egress} deadline=${dl} path=${ossPath}`
- )
- }
- async function doOssPut(sts, ossPath, content, fragment, requestTimeoutMs) {
- const outbound = buildAxiosOutboundConfig(fragment)
- const client = buildOssClient(sts, outbound.httpsAgent, requestTimeoutMs)
- return client.put(ossPath, content)
- }
- /**
- * @param {object} sts OSS STS 凭证
- * @param {string} ossPath 对象 key
- * @param {Buffer|string} content
- * @param {{ logger?: object, traceId?: string, taskId?: string, outboundMode?: 'auto'|'direct'|'proxy', timeout?: number }} options
- */
- async function putOssWithQgOutbound(sts, ossPath, content, options = {}) {
- const {
- logger = null,
- traceId = null,
- taskId = null,
- outboundMode = 'auto',
- timeout = 60000
- } = options
- const logLabel = () => ossLogLabel(traceId, taskId)
- const logCtx = { traceId, taskId }
- if (outboundMode === 'direct') {
- await logOssOutbound(logger, '(强制直连)', ossPath, { proxy: false }, logCtx)
- return doOssPut(sts, ossPath, content, { proxy: false }, timeout)
- }
- if (debugProxyEnabled()) {
- const dbg = debugProxyAxiosFragment()
- await logOssOutbound(logger, 'Charles调试代理', ossPath, dbg, { ...logCtx, skipQgSnapshot: true })
- logger?.info?.(`${logLabel()} 使用本地调试代理 LEPAO_DEBUG_PROXY`)
- return doOssPut(sts, ossPath, content, dbg, timeout)
- }
- const qgOn = await QgProxyManager.isOutboundProxyEnabled()
- if (!qgOn) {
- await logOssOutbound(logger, '(青果出站未启用)', ossPath, { proxy: false }, logCtx)
- return doOssPut(sts, ossPath, content, { proxy: false }, timeout)
- }
- let frag
- try {
- frag = await getOutboundWithBackoff({ forceRefresh: false }, 2)
- } catch (e0) {
- if (outboundMode === 'proxy') {
- const err = new Error(`代理模式提取失败: ${e0.message || e0}`)
- err.code = 'PROXY_REQUIRED_EXTRACT_FAILED'
- err.retryable = true
- throw err
- }
- logger?.error?.(`${logLabel()} 青果提取多次重试仍失败,OSS 改直连: ${e0.message || e0}`)
- await logOssOutbound(logger, '(青果提取异常→直连)', ossPath, { proxy: false }, logCtx)
- await QgProxyManager.recordFallbackDirect({
- reason: 'qg_extract_error_oss',
- path: ossPath,
- mq_task_id: taskId,
- trace_id: traceId,
- ...summarizeAxiosError(e0)
- })
- return doOssPut(sts, ossPath, content, { proxy: false }, timeout)
- }
- if (frag.proxy === false) {
- try {
- await sleep(400)
- frag = await getOutboundWithBackoff({ forceRefresh: true }, 2)
- } catch {
- /* keep */
- }
- }
- if (frag.proxy === false) {
- if (outboundMode === 'proxy') {
- const err = new Error('代理模式无可用节点')
- err.code = 'PROXY_REQUIRED_NO_NODE'
- err.retryable = true
- throw err
- }
- logger?.warn?.(`${logLabel()} 无可用青果节点,OSS 将直连`)
- await logOssOutbound(logger, '(无缓存节点→直连)', ossPath, { proxy: false }, logCtx)
- await QgProxyManager.recordFallbackDirect({
- reason: 'no_proxy_available_oss',
- path: ossPath,
- mq_task_id: taskId,
- trace_id: traceId
- })
- return doOssPut(sts, ossPath, content, { proxy: false }, timeout)
- }
- await logOssOutbound(logger, '首次请求', ossPath, frag, logCtx)
- try {
- return await doOssPut(sts, ossPath, content, frag, OSS_PROXY_FIRST_TIMEOUT_MS)
- } catch (e1) {
- if (outboundMode === 'proxy') {
- const err = new Error(`代理模式 OSS 上传失败: ${e1.message || e1}`)
- err.code = 'PROXY_REQUIRED_OSS_PUT_FAILED'
- err.retryable = true
- throw err
- }
- if (!isQgProxyEligibleFailure(e1)) throw e1
- const tls1 = isProxyTlsHandshakeReset(e1)
- if (!tls1) {
- logger?.warn?.(
- `${logLabel()} 经代理 OSS 首次失败,作废缓存并换节点重试。err=${e1.message || e1} ${JSON.stringify(
- summarizeAxiosError(e1)
- )}`
- )
- try {
- await QgProxyManager.invalidateCurrent('oss_put_proxy_fail', {
- path: ossPath,
- mq_task_id: taskId,
- trace_id: traceId,
- ...summarizeAxiosError(e1)
- })
- const frag2 = await getOutboundWithBackoff({ forceRefresh: true }, 2)
- if (frag2?.proxy !== false) {
- await logOssOutbound(logger, '(换节点重试)', ossPath, frag2, logCtx)
- return await doOssPut(sts, ossPath, content, frag2, OSS_PROXY_FIRST_TIMEOUT_MS)
- }
- } catch (eRetry) {
- if (!isQgProxyEligibleFailure(eRetry)) throw eRetry
- logger?.warn?.(
- `${logLabel()} 换节点重试仍失败: ${eRetry.message || eRetry} ${JSON.stringify(
- summarizeAxiosError(eRetry)
- )}`
- )
- }
- } else {
- logger?.warn?.(
- `${logLabel()} TLS 握手前经代理断开,OSS 直接回退直连(隧道池由服务商切换出口)`
- )
- }
- await logOssOutbound(logger, tls1 ? '(TLS隧道异常→直连)' : '(代理失败→直连)', ossPath, { proxy: false }, logCtx)
- await QgProxyManager.recordFallbackDirect({
- reason: tls1 ? 'tls_prefinish_reset_direct_oss' : 'proxy_oss_put_failed_then_direct',
- path: ossPath,
- mq_task_id: taskId,
- trace_id: traceId,
- ...summarizeAxiosError(e1)
- })
- return doOssPut(sts, ossPath, content, { proxy: false }, timeout)
- }
- }
- module.exports = {
- putOssWithQgOutbound
- }
|