qgOssPut.js 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. /**
  2. * 乐跑轨迹/陀螺仪 OSS 上传:与 lepaoSchoolHttp 一致,优先青果 HTTP 代理,失败换节点重试后回退直连。
  3. */
  4. const OSS = require('ali-oss')
  5. const QgProxyManager = require('./QgProxyManager')
  6. const {
  7. buildAxiosOutboundConfig,
  8. getOutboundWithBackoff,
  9. isQgProxyEligibleFailure,
  10. isProxyTlsHandshakeReset,
  11. summarizeAxiosError,
  12. debugProxyEnabled,
  13. debugProxyAxiosFragment,
  14. sleep,
  15. PROXY_FIRST_TIMEOUT_MS
  16. } = require('./qgOutboundAxios')
  // Timeout (ms) passed to OSS PUTs that go through a proxy node: the shared
  // proxy timeout imported from ./qgOutboundAxios, floored at 45 seconds.
  const OSS_PROXY_FIRST_TIMEOUT_MS = Math.max(45000, PROXY_FIRST_TIMEOUT_MS)
  /**
   * Builds the prefix put in front of every log line emitted by this module.
   *
   * @param {string} [traceId] - when truthy, rendered first as `[traceId] `
   * @param {string} [taskId] - when truthy, rendered last as ` [taskId]`
   * @returns {string} e.g. `[trace] [qgOssPut] [task]`, or just `[qgOssPut]`
   */
  function ossLogLabel(traceId, taskId) {
    const tracePart = traceId ? `[${traceId}] ` : ''
    const taskPart = taskId ? ` [${taskId}]` : ''
    return `${tracePart}[qgOssPut]${taskPart}`
  }
  /**
   * Creates an ali-oss client from STS credentials.
   *
   * @param {object} sts - credentials; reads `bucket`, `region`, `AccessKeyId`,
   *   `AccessKeySecret` and `SecurityToken`
   * @param {object} [httpsAgent] - forwarded as the `httpsAgent` option only when truthy
   * @param {number} timeoutMs - forwarded as the client's `timeout` option
   * @returns {object} a new `OSS` instance
   */
  function buildOssClient(sts, httpsAgent, timeoutMs) {
    const { bucket, region, AccessKeyId, AccessKeySecret, SecurityToken } = sts
    const clientOptions = {
      bucket,
      // A falsy region falls back to the Hangzhou endpoint.
      region: region || 'oss-cn-hangzhou',
      accessKeyId: AccessKeyId,
      accessKeySecret: AccessKeySecret,
      stsToken: SecurityToken,
      secure: true,
      timeout: timeoutMs,
      // Leave the key off entirely when no agent was supplied.
      ...(httpsAgent ? { httpsAgent } : {})
    }
    return new OSS(clientOptions)
  }
  /**
   * Logs (at info level) which outbound route an OSS PUT is about to use.
   *
   * Three shapes are emitted:
   *  - direct: `fragment` is missing or its `proxy` is falsy;
   *  - debug proxy: `opts.skipQgSnapshot` is truthy, only `host:port` is printed;
   *  - HTTP proxy: additionally prints fields from the cached proxy snapshot.
   *
   * This helper only reports; a failure while reading the cached snapshot is
   * downgraded to a warning so that logging can never abort the upload that
   * the caller is about to perform.
   *
   * @param {object|null} logger - does nothing unless it has an `info` member
   * @param {string} phase - free-form phase tag placed after the label
   * @param {string} ossPath - object key, printed as `path=`
   * @param {object} [fragment] - outbound fragment; `fragment.proxy` is expected
   *   to expose `host` and `port` when truthy
   * @param {{ traceId?: string, taskId?: string, skipQgSnapshot?: boolean }} [opts]
   * @returns {Promise<void>}
   */
  async function logOssOutbound(logger, phase, ossPath, fragment, opts = {}) {
    if (!logger?.info) return
    const label = ossLogLabel(opts.traceId, opts.taskId)
    const proxy = fragment?.proxy
    if (!proxy) {
      logger.info(`${label} ${phase} PUT 出站=直连 path=${ossPath}`)
      return
    }
    const conn = `${proxy.host}:${proxy.port}`
    if (opts.skipQgSnapshot) {
      logger.info(`${label} ${phase} PUT 出站=调试HTTP代理 连接=${conn} path=${ossPath}`)
      return
    }
    let snap = null
    try {
      snap = await QgProxyManager.getCachedParsed()
    } catch (snapErr) {
      // Best effort: fall through with no snapshot and print the connection address only.
      logger.warn?.(`${label} 读取青果节点缓存快照失败,日志降级: ${snapErr?.message || snapErr}`)
    }
    const serverRecord = snap?.server ?? conn
    const egress = snap?.proxyIp ?? '(暂无 proxy_ip)'
    const dl = snap?.deadline ?? '—'
    logger.info(
      `${label} ${phase} PUT 出站=HTTP代理 节点server=${serverRecord} 连接${conn} 出口IP(proxy_ip)=${egress} deadline=${dl} path=${ossPath}`
    )
  }
  /**
   * Performs a single OSS PUT over the route described by `fragment`.
   *
   * @param {object} sts - STS credentials handed to `buildOssClient`
   * @param {string} ossPath - object key
   * @param {Buffer|string} content - payload passed to the client's `put`
   * @param {object} fragment - outbound fragment handed to `buildAxiosOutboundConfig`;
   *   only the resulting `httpsAgent` is used
   * @param {number} requestTimeoutMs - client timeout in milliseconds
   * @returns {Promise<*>} whatever the client's `put` resolves with
   */
  async function doOssPut(sts, ossPath, content, fragment, requestTimeoutMs) {
    const { httpsAgent } = buildAxiosOutboundConfig(fragment)
    const ossClient = buildOssClient(sts, httpsAgent, requestTimeoutMs)
    return ossClient.put(ossPath, content)
  }
  65. /**
  66. * @param {object} sts OSS STS 凭证
  67. * @param {string} ossPath 对象 key
  68. * @param {Buffer|string} content
  69. * @param {{ logger?: object, traceId?: string, taskId?: string, outboundMode?: 'auto'|'direct'|'proxy', timeout?: number }} options
  70. */
  71. async function putOssWithQgOutbound(sts, ossPath, content, options = {}) {
  72. const {
  73. logger = null,
  74. traceId = null,
  75. taskId = null,
  76. outboundMode = 'auto',
  77. timeout = 60000
  78. } = options
  79. const logLabel = () => ossLogLabel(traceId, taskId)
  80. const logCtx = { traceId, taskId }
  81. if (outboundMode === 'direct') {
  82. await logOssOutbound(logger, '(强制直连)', ossPath, { proxy: false }, logCtx)
  83. return doOssPut(sts, ossPath, content, { proxy: false }, timeout)
  84. }
  85. if (debugProxyEnabled()) {
  86. const dbg = debugProxyAxiosFragment()
  87. await logOssOutbound(logger, 'Charles调试代理', ossPath, dbg, { ...logCtx, skipQgSnapshot: true })
  88. logger?.info?.(`${logLabel()} 使用本地调试代理 LEPAO_DEBUG_PROXY`)
  89. return doOssPut(sts, ossPath, content, dbg, timeout)
  90. }
  91. const qgOn = await QgProxyManager.isOutboundProxyEnabled()
  92. if (!qgOn) {
  93. await logOssOutbound(logger, '(青果出站未启用)', ossPath, { proxy: false }, logCtx)
  94. return doOssPut(sts, ossPath, content, { proxy: false }, timeout)
  95. }
  96. let frag
  97. try {
  98. frag = await getOutboundWithBackoff({ forceRefresh: false }, 2)
  99. } catch (e0) {
  100. if (outboundMode === 'proxy') {
  101. const err = new Error(`代理模式提取失败: ${e0.message || e0}`)
  102. err.code = 'PROXY_REQUIRED_EXTRACT_FAILED'
  103. err.retryable = true
  104. throw err
  105. }
  106. logger?.error?.(`${logLabel()} 青果提取多次重试仍失败,OSS 改直连: ${e0.message || e0}`)
  107. await logOssOutbound(logger, '(青果提取异常→直连)', ossPath, { proxy: false }, logCtx)
  108. await QgProxyManager.recordFallbackDirect({
  109. reason: 'qg_extract_error_oss',
  110. path: ossPath,
  111. mq_task_id: taskId,
  112. trace_id: traceId,
  113. ...summarizeAxiosError(e0)
  114. })
  115. return doOssPut(sts, ossPath, content, { proxy: false }, timeout)
  116. }
  117. if (frag.proxy === false) {
  118. try {
  119. await sleep(400)
  120. frag = await getOutboundWithBackoff({ forceRefresh: true }, 2)
  121. } catch {
  122. /* keep */
  123. }
  124. }
  125. if (frag.proxy === false) {
  126. if (outboundMode === 'proxy') {
  127. const err = new Error('代理模式无可用节点')
  128. err.code = 'PROXY_REQUIRED_NO_NODE'
  129. err.retryable = true
  130. throw err
  131. }
  132. logger?.warn?.(`${logLabel()} 无可用青果节点,OSS 将直连`)
  133. await logOssOutbound(logger, '(无缓存节点→直连)', ossPath, { proxy: false }, logCtx)
  134. await QgProxyManager.recordFallbackDirect({
  135. reason: 'no_proxy_available_oss',
  136. path: ossPath,
  137. mq_task_id: taskId,
  138. trace_id: traceId
  139. })
  140. return doOssPut(sts, ossPath, content, { proxy: false }, timeout)
  141. }
  142. await logOssOutbound(logger, '首次请求', ossPath, frag, logCtx)
  143. try {
  144. return await doOssPut(sts, ossPath, content, frag, OSS_PROXY_FIRST_TIMEOUT_MS)
  145. } catch (e1) {
  146. if (outboundMode === 'proxy') {
  147. const err = new Error(`代理模式 OSS 上传失败: ${e1.message || e1}`)
  148. err.code = 'PROXY_REQUIRED_OSS_PUT_FAILED'
  149. err.retryable = true
  150. throw err
  151. }
  152. if (!isQgProxyEligibleFailure(e1)) throw e1
  153. const tls1 = isProxyTlsHandshakeReset(e1)
  154. if (!tls1) {
  155. logger?.warn?.(
  156. `${logLabel()} 经代理 OSS 首次失败,作废缓存并换节点重试。err=${e1.message || e1} ${JSON.stringify(
  157. summarizeAxiosError(e1)
  158. )}`
  159. )
  160. try {
  161. await QgProxyManager.invalidateCurrent('oss_put_proxy_fail', {
  162. path: ossPath,
  163. mq_task_id: taskId,
  164. trace_id: traceId,
  165. ...summarizeAxiosError(e1)
  166. })
  167. const frag2 = await getOutboundWithBackoff({ forceRefresh: true }, 2)
  168. if (frag2?.proxy !== false) {
  169. await logOssOutbound(logger, '(换节点重试)', ossPath, frag2, logCtx)
  170. return await doOssPut(sts, ossPath, content, frag2, OSS_PROXY_FIRST_TIMEOUT_MS)
  171. }
  172. } catch (eRetry) {
  173. if (!isQgProxyEligibleFailure(eRetry)) throw eRetry
  174. logger?.warn?.(
  175. `${logLabel()} 换节点重试仍失败: ${eRetry.message || eRetry} ${JSON.stringify(
  176. summarizeAxiosError(eRetry)
  177. )}`
  178. )
  179. }
  180. } else {
  181. logger?.warn?.(
  182. `${logLabel()} TLS 握手前经代理断开,OSS 直接回退直连(隧道池由服务商切换出口)`
  183. )
  184. }
  185. await logOssOutbound(logger, tls1 ? '(TLS隧道异常→直连)' : '(代理失败→直连)', ossPath, { proxy: false }, logCtx)
  186. await QgProxyManager.recordFallbackDirect({
  187. reason: tls1 ? 'tls_prefinish_reset_direct_oss' : 'proxy_oss_put_failed_then_direct',
  188. path: ossPath,
  189. mq_task_id: taskId,
  190. trace_id: traceId,
  191. ...summarizeAxiosError(e1)
  192. })
  193. return doOssPut(sts, ossPath, content, { proxy: false }, timeout)
  194. }
  195. }
  196. module.exports = {
  197. putOssWithQgOutbound
  198. }