QgProxyManager.js 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534
  1. const axios = require('axios')
  2. const path = require('path')
  3. const config = require('../../config.json')
  4. const db = require('../../plugin/DataBase/db')
  5. const Redis = require('../../plugin/DataBase/Redis')
  6. const Logger = require('../Logger')
  // HTTP endpoints queried by extractOnce() and fetchResourceAreas().
  const QG_POOL_URL = 'https://share.proxy.qg.net/pool'
  const QG_RESOURCES_URL = 'https://share.proxy.qg.net/resources'

  // Redis keys: the cached entry-server record and the extraction lock.
  const REDIS_CURRENT = 'lepao:qg:current'
  const REDIS_LOCK = 'lepao:qg:fetch_lock'

  /**
   * How long the tunnel-pool entry address is cached (the provider rotates the
   * exit on its side, so refreshing does not depend on the local deadline).
   */
  const REDIS_SERVER_TTL_SEC = 6 * 60 * 60

  /**
   * Lock lifetime. Only covers a single /get call (including the axios timeout)
   * so that a long-held lock does not block other tasks.
   */
  const LOCK_TTL_SEC = 45
  // acquireFetchLock() polls this many times, sleeping LOCK_WAIT_MS between polls.
  const LOCK_WAIT_ROUNDS = 40
  const LOCK_WAIT_MS = 150

  // Module-level one-shot flags.
  let warnedTlsRejectUnauthorized = false
  let projectSettingsTableEnsured = false

  const logger = new Logger(path.join(__dirname, '../logs/QgProxyManager.log'), 'INFO')
  /**
   * Resolve after roughly `ms` milliseconds.
   * @param {number} ms Delay handed to setTimeout.
   * @returns {Promise<void>}
   */
  function sleep(ms) {
    return new Promise((resolve) => {
      setTimeout(resolve, ms)
    })
  }
  /**
   * Read `config.qgChannelProxy` in normalized form.
   *
   * Every field is always present as a trimmed string (empty when not
   * configured), including when the config section is missing or is not an
   * object. Returning a bare `{}` in that case made callers that read
   * `.length` on a field throw a TypeError.
   *
   * @returns {{extractKey: string, authUser: string, authPassword: string, tunnelServer: string}}
   */
  function getQgConfig() {
    const raw = config.qgChannelProxy
    const q = raw && typeof raw === 'object' ? raw : {}
    return {
      extractKey: String(q.extractKey || '').trim(),
      authUser: String(q.authUser || '').trim(),
      authPassword: String(q.authPassword || '').trim(),
      // `server` is accepted as an alternative key for the tunnel entry address.
      tunnelServer: String(q.tunnelServer || q.server || '').trim()
    }
  }
  /**
   * Whether an extraction key is configured.
   * The destructuring default keeps this from throwing when getQgConfig()
   * returns an object without `extractKey`.
   * @returns {boolean}
   */
  function hasExtractCredentials() {
    const { extractKey = '' } = getQgConfig()
    return extractKey.length > 0
  }
  /**
   * Whether both the proxy user name and password are configured.
   * The destructuring defaults keep this from throwing when getQgConfig()
   * returns an object without those fields.
   * @returns {boolean}
   */
  function hasProxyAuth() {
    const { authUser = '', authPassword = '' } = getQgConfig()
    return authUser.length > 0 && authPassword.length > 0
  }
  /**
   * Whether a tunnel entry address is configured.
   * The destructuring default keeps this from throwing when getQgConfig()
   * returns an object without `tunnelServer`.
   * @returns {boolean}
   */
  function hasTunnelServer() {
    const { tunnelServer = '' } = getQgConfig()
    return tunnelServer.length > 0
  }
  /**
   * Turn an arbitrary value into a scope key: stringify (falsy becomes ''),
   * trim, replace every character outside word chars, U+4E00..U+9FA5, ':',
   * '.' and '-' with '_', then cap the length at 128.
   * @param {*} raw
   * @returns {string}
   */
  function normalizeProjectKey(raw) {
    const text = String(raw || '').trim()
    const sanitized = text.replace(/[^\w\u4e00-\u9fa5:.-]/g, '_')
    return sanitized.slice(0, 128)
  }
  /**
   * Resolve the scope key of this project: the first candidate that is
   * non-empty after normalizeProjectKey(), falling back to 'default'.
   * Candidates in priority order: env RUNFORGE_PROXY_SCOPE, config.qgProxyScope,
   * config.qgChannelProxy.scopeKey, config.server, config.port.
   * @returns {string}
   */
  function getProjectKey() {
    const candidates = [
      process.env.RUNFORGE_PROXY_SCOPE,
      config.qgProxyScope,
      config.qgChannelProxy?.scopeKey,
      config.server,
      config.port
    ]
    for (const candidate of candidates) {
      const key = normalizeProjectKey(candidate)
      if (key) return key
    }
    return 'default'
  }
  /**
   * Create the per-project settings table if it does not exist yet.
   * The CREATE statement is skipped once it has succeeded in this process
   * (tracked by the module-level flag `projectSettingsTableEnsured`).
   * @returns {Promise<void>}
   */
  async function ensureProjectSettingsTable() {
    if (!projectSettingsTableEnsured) {
      const ddl = `CREATE TABLE IF NOT EXISTS lepao_proxy_project_settings (
        scope_key VARCHAR(128) NOT NULL PRIMARY KEY,
        proxy_enabled TINYINT NOT NULL,
        updated_at BIGINT NOT NULL
      )`
      await db.query(ddl)
      // Only flip the flag after the query resolved, so a failure is retried next call.
      projectSettingsTableEnsured = true
    }
  }
  /**
   * Make sure the default settings row (id = 1) and this project's row exist.
   * Both inserts use INSERT IGNORE; the project row copies `proxy_enabled`
   * from the default row.
   * @returns {Promise<void>}
   */
  async function ensureSettingsRow() {
    const timestamp = Date.now()

    const insertDefaultSql = `INSERT IGNORE INTO lepao_proxy_settings (id, proxy_enabled, area, area_ex, isp, distinct_extract, updated_at)
      VALUES (1, 0, '', '', NULL, 1, ?)`
    await db.query(insertDefaultSql, [timestamp])

    await ensureProjectSettingsTable()

    const insertProjectSql = `INSERT IGNORE INTO lepao_proxy_project_settings (scope_key, proxy_enabled, updated_at)
      SELECT ?, proxy_enabled, ? FROM lepao_proxy_settings WHERE id = 1`
    await db.query(insertProjectSql, [getProjectKey(), timestamp])
  }
  /**
   * Load the default settings row merged with this project's override.
   *
   * @returns {Promise<object|null>} The default row plus `project_scope_key`,
   *   `project_proxy_enabled` and `project_updated_at` (the latter two fall back
   *   to the default row's values when no project row is found), or null when
   *   the default row is missing.
   */
  async function loadSettings() {
    await ensureSettingsRow()

    const defaultRows = await db.query(
      `SELECT proxy_enabled, area, area_ex, isp, distinct_extract, updated_at FROM lepao_proxy_settings WHERE id = 1`
    )
    const defaults = defaultRows?.[0] || null
    if (!defaults) return null

    const scopeKey = getProjectKey()
    const projectRows = await db.query(
      `SELECT proxy_enabled, updated_at FROM lepao_proxy_project_settings WHERE scope_key = ? LIMIT 1`,
      [scopeKey]
    )
    const projectRow = projectRows?.[0] || null

    return {
      ...defaults,
      project_scope_key: scopeKey,
      project_proxy_enabled: projectRow?.proxy_enabled ?? defaults.proxy_enabled,
      project_updated_at: projectRow?.updated_at ?? defaults.updated_at
    }
  }
  /**
   * Whether the project-level switch in a loadSettings() result is on.
   * @param {object|null|undefined} settings
   * @returns {boolean} true only when `project_proxy_enabled` coerces to 1.
   */
  function getProjectProxyEnabledFromSettings(settings) {
    return Boolean(settings) && Number(settings.project_proxy_enabled) === 1
  }
  /**
   * Parse a deadline string into epoch milliseconds.
   * The first space is replaced with 'T' before handing the text to Date.parse.
   * @param {*} deadlineStr
   * @returns {number|null} null for non-strings, empty strings and unparsable text.
   */
  function parseDeadlineMs(deadlineStr) {
    if (typeof deadlineStr !== 'string' || deadlineStr === '') return null
    const parsed = Date.parse(deadlineStr.trim().replace(' ', 'T'))
    if (!Number.isFinite(parsed)) return null
    return parsed
  }
  /**
   * Build the axios `proxy` option from a "host:port" string.
   *
   * The port must be an integer in 1..65535. The previous `Number.isFinite`
   * check accepted "host:" as port 0 (because `Number('')` is 0) and also let
   * negative, fractional and out-of-range ports through.
   *
   * @param {string} server Entry address in "host:port" form.
   * @param {boolean} useAuth When truthy, attach the configured user/password.
   * @returns {{proxy: false} | {proxy: {protocol: string, host: string, port: number, auth?: {username: string, password: string}}}}
   *   `{ proxy: false }` when `server` is not a usable "host:port" string.
   */
  function axiosProxyOptsFromServer(server, useAuth) {
    if (!server || typeof server !== 'string') return { proxy: false }

    const [host, portText = ''] = server.trim().split(':')
    // An empty port segment must not be coerced to 0.
    const portNum = portText.trim() === '' ? NaN : Number(portText)
    const portOk = Number.isInteger(portNum) && portNum >= 1 && portNum <= 65535
    if (!host || !portOk) return { proxy: false }

    const opt = {
      proxy: {
        protocol: 'http',
        host,
        port: portNum
      }
    }
    if (useAuth) {
      const { authUser, authPassword } = getQgConfig()
      opt.proxy.auth = { username: authUser, password: authPassword }
    }
    return opt
  }
  /**
   * Whether the value, stringified and trimmed, is 4 to 9 decimal digits.
   * @param {*} s
   * @returns {boolean}
   */
  function isAreaCodeLike(s) {
    const text = String(s || '').trim()
    return /^\d{4,9}$/.test(text)
  }
  /**
   * Build the axios proxy fragment for tunnel mode from the configured
   * tunnel entry address.
   *
   * @param {object|null|undefined} settings loadSettings() result; only
   *   `area` and `area_ex` are read.
   * @returns {object|null} null when no tunnel server is configured or it
   *   cannot be parsed; otherwise the fragment, with `auth` attached only when
   *   both user and password are configured.
   */
  function buildTunnelProxyOpts(settings) {
    const qg = getQgConfig()
    if (!qg.tunnelServer) return null

    const fragment = axiosProxyOptsFromServer(qg.tunnelServer, false)
    if (!fragment.proxy) return null
    if (!qg.authUser || !qg.authPassword) return fragment

    const area = String(settings?.area || '').trim()
    const areaEx = String(settings?.area_ex || '').trim()
    /**
     * Per the tunnel-proxy integration notes, normal mode selects a region via
     * user:password:A<area>@server. The suffix is only appended when `area` is
     * a single all-digit code and `area_ex` is not set, so that legacy
     * comma-separated multi-region expressions are not mangled.
     */
    const appendArea = Boolean(area) && !areaEx && isAreaCodeLike(area)
    const password = appendArea ? `${qg.authPassword}:A${area}` : qg.authPassword

    fragment.proxy.auth = { username: qg.authUser, password }
    return fragment
  }
  /**
   * Append one row to `lepao_proxy_log`. Never throws: any failure is written
   * to the file logger instead.
   *
   * @param {{event: string, server?: string|null, deadline?: string|null, detail?: *}} entry
   *   `detail` is stored as-is when it is a string, otherwise JSON-encoded;
   *   either way it is cut to 8000 characters.
   * @returns {Promise<void>}
   */
  async function recordLog({ event, server = null, deadline = null, detail = null }) {
    try {
      const text = typeof detail === 'string' ? detail : JSON.stringify(detail || {})
      const detailStr = text.slice(0, 8000)
      const values = [Date.now(), event, server, deadline, detailStr]
      await db.query(
        `INSERT INTO lepao_proxy_log (created_at, event, server, deadline, detail) VALUES (?, ?, ?, ?, ?)`,
        values
      )
    } catch (e) {
      logger.error(`lepao_proxy_log 写入失败: ${e.stack || e}`)
    }
  }
  /**
   * Read and JSON-decode the record stored under REDIS_CURRENT.
   * @returns {Promise<*|null>} null when the key is empty or holds invalid JSON.
   */
  async function getCachedParsed() {
    const raw = await Redis.get(REDIS_CURRENT)
    let parsed = null
    if (raw) {
      try {
        parsed = JSON.parse(raw)
      } catch {
        // Corrupt payloads are treated the same as a missing key.
        parsed = null
      }
    }
    return parsed
  }
  /**
   * Decide whether a cached entry record may still be used.
   *
   * With a deadline (`deadlineMs`, or one parsed from `deadline`) the record is
   * valid until that instant. Without one it is valid for
   * REDIS_SERVER_TTL_SEC after `fetchedAt`; a record without a usable
   * `fetchedAt` is treated as valid.
   *
   * @param {object|null|undefined} parsed Record from getCachedParsed().
   * @returns {boolean}
   */
  function cacheStillValid(parsed) {
    if (!parsed?.server) return false

    const deadlineMs = parsed.deadlineMs || parseDeadlineMs(parsed.deadline)
    if (deadlineMs) return deadlineMs > Date.now()

    const fetchedAt = Number(parsed.fetchedAt || 0)
    const hasFetchedAt = Number.isFinite(fetchedAt) && fetchedAt > 0
    if (!hasFetchedAt) return true

    const ageMs = Date.now() - fetchedAt
    return ageMs < REDIS_SERVER_TTL_SEC * 1000
  }
  /**
   * Try to take the extraction lock (Redis SET with NX and an EX of
   * LOCK_TTL_SEC), polling up to LOCK_WAIT_ROUNDS times with LOCK_WAIT_MS
   * between attempts.
   * @returns {Promise<boolean>} true once the lock is taken, false if every round failed.
   */
  async function acquireFetchLock() {
    let roundsLeft = LOCK_WAIT_ROUNDS
    while (roundsLeft > 0) {
      const acquired = await Redis.set(REDIS_LOCK, '1', { NX: true, EX: LOCK_TTL_SEC })
      if (acquired) return true
      await sleep(LOCK_WAIT_MS)
      roundsLeft -= 1
    }
    return false
  }
  /**
   * Delete the extraction lock key. Failures are logged as warnings and not
   * rethrown.
   * NOTE(review): the key is deleted unconditionally, without checking who
   * currently holds the lock — confirm that is acceptable given LOCK_TTL_SEC.
   * @returns {Promise<void>}
   */
  async function releaseFetchLock() {
    try {
      await Redis.del(REDIS_LOCK)
    } catch (err) {
      const reason = err.message || err
      logger.warn(`释放青果 fetch 锁失败: ${reason}`)
    }
  }
  /**
   * Response codes worth retrying with backoff: transient failures, delayed
   * channel release and the like (see the Qingguo channel-extraction notes).
   * REQUEST_LIMIT_EXCEEDED is deliberately absent — calling /get again would
   * only make the rate limiting worse.
   */
  const RETRYABLE_EXTRACT_CODES = new Set([
    'NO_AVAILABLE_CHANNEL',
    'INTERNAL_ERROR',
    'FAILED_OPERATION',
    'NO_RESOURCE_FOUND'
  ])
  /**
   * Build the query parameters for the extraction request.
   *
   * `isp` is coerced with Number() before being checked, matching how this
   * file treats the other DB-sourced fields (`distinct_extract`,
   * `project_proxy_enabled`). With the previous strict comparison against
   * numeric literals, a value delivered as the string '2' was silently dropped.
   *
   * @param {object} settings loadSettings() result; reads `area`, `area_ex`,
   *   `isp` and `distinct_extract`.
   * @returns {{key: string, num: number, area?: string, area_ex?: string, isp?: number, distinct: boolean}}
   */
  function buildGetParams(settings) {
    const { extractKey } = getQgConfig()
    const params = {
      key: extractKey,
      num: 1
    }

    const area = String(settings.area || '').trim()
    const areaEx = String(settings.area_ex || '').trim()
    if (area) params.area = area
    if (areaEx) params.area_ex = areaEx

    // Only 1, 2 and 3 are forwarded; null/undefined/anything else omits the filter.
    const isp = settings.isp == null ? NaN : Number(settings.isp)
    if (isp === 1 || isp === 2 || isp === 3) params.isp = isp

    params.distinct = Number(settings.distinct_extract) === 1
    return params
  }
  /**
   * Perform one extraction request against QG_POOL_URL (direct connection,
   * 20 s timeout, every HTTP status accepted).
   *
   * @param {object} settings Passed through to buildGetParams().
   * @returns {Promise<{body: object, item: object, code: string}>} The response
   *   body, its first data item and the response code.
   * @throws {Error} When the body is not a SUCCESS with at least one item. The
   *   error carries `qgCode` and `retryable` (whether the code is in
   *   RETRYABLE_EXTRACT_CODES).
   */
  async function extractOnce(settings) {
    const response = await axios.get(QG_POOL_URL, {
      params: buildGetParams(settings),
      timeout: 20000,
      proxy: false,
      validateStatus: () => true
    })
    const body = response.data
    const code = body?.code
    const succeeded = code === 'SUCCESS' && Array.isArray(body?.data) && body.data.length >= 1

    if (!succeeded) {
      const failure = new Error(`青果提取失败: ${code || JSON.stringify(body || {}).slice(0, 200)}`)
      failure.qgCode = code
      failure.retryable = RETRYABLE_EXTRACT_CODES.has(code)
      throw failure
    }
    return { body, item: body.data[0], code }
  }
  /**
   * Log a warning, at most once per process, when the environment variable
   * NODE_TLS_REJECT_UNAUTHORIZED is set to '0'.
   */
  function warnIfTlsVerifyDisabled() {
    const verifyDisabled = process.env.NODE_TLS_REJECT_UNAUTHORIZED === '0'
    if (warnedTlsRejectUnauthorized || !verifyDisabled) return

    warnedTlsRejectUnauthorized = true
    logger.warn(
      '[QgProxy] 检测到 NODE_TLS_REJECT_UNAUTHORIZED=0,将关闭 TLS 证书校验,存在中间人风险;生产环境建议移除此环境变量。'
    )
  }
  /**
   * Channel extraction: query the available resource regions
   * (https://www.qg.net/doc/1850.html), GET /resources.
   *
   * @returns {Promise<Array>} The `data` array of the response.
   * @throws {Error} When no extraction key is configured, or when the response
   *   is not a SUCCESS carrying an array.
   */
  async function fetchResourceAreas() {
    const { extractKey } = getQgConfig()
    if (!extractKey) {
      throw new Error('config 未配置 qgChannelProxy.extractKey')
    }

    const requestOptions = {
      params: { key: extractKey },
      timeout: 20000,
      proxy: false,
      validateStatus: () => true
    }
    const { data: body } = await axios.get(QG_RESOURCES_URL, requestOptions)

    const succeeded = body?.code === 'SUCCESS' && Array.isArray(body?.data)
    if (!succeeded) {
      throw new Error(body?.code || '青果 resources 查询失败')
    }
    return body.data
  }
  /**
   * Whether the business layer should attempt the Qingguo proxy at all:
   * config must hold either an extraction key or a tunnel server plus proxy
   * credentials, and the project switch stored in the DB must be on.
   * @returns {Promise<boolean>}
   */
  async function isOutboundProxyEnabled() {
    const configured = hasExtractCredentials() || (hasTunnelServer() && hasProxyAuth())
    if (!configured) return false

    const settings = await loadSettings()
    return getProjectProxyEnabledFromSettings(settings)
  }
  /**
   * Public entry point: return a proxy fragment that can be merged into an
   * axios request config, or `{ proxy: false }` when the proxy is not
   * configured or not enabled for this project.
   *
   * Tunnel mode (a tunnel server is configured): the fragment is built purely
   * from local config. A status record is written to Redis on a best-effort
   * basis; a Redis failure there is logged and no longer aborts the call,
   * since nothing in the returned fragment depends on that write.
   *
   * Pool mode: the entry server is cached in Redis and reused while
   * cacheStillValid() accepts it (the provider rotates exits on its side, so
   * there is no frequent local refresh and `forceRefresh` is only logged).
   * Otherwise a new entry is extracted under the fetch lock, with up to five
   * attempts and quadratic backoff (capped at 2 s) for retryable errors.
   *
   * @param {{ forceRefresh?: boolean }} opt
   * @returns {Promise<object>} axios config fragment containing `proxy`.
   * @throws {Error} When extraction fails with a non-retryable error or the
   *   attempts are exhausted.
   */
  async function getOutboundAxiosFragment(opt = {}) {
    const forceRefresh = opt?.forceRefresh === true
    warnIfTlsVerifyDisabled()
    if (!hasExtractCredentials() && !(hasTunnelServer() && hasProxyAuth())) return { proxy: false }
    const settings = await loadSettings()
    if (!getProjectProxyEnabledFromSettings(settings)) return { proxy: false }

    // Tunnel mode: use the tunnel entry address directly, no /pool extraction needed.
    const tunnelFrag = buildTunnelProxyOpts(settings)
    if (tunnelFrag?.proxy) {
      const payload = {
        server: `${tunnelFrag.proxy.host}:${tunnelFrag.proxy.port}`,
        deadline: '',
        deadlineMs: null,
        proxyIp: null,
        requestId: null,
        fetchedAt: Date.now(),
        mode: 'tunnel'
      }
      try {
        await Redis.set(REDIS_CURRENT, JSON.stringify(payload), { EX: Math.max(60, REDIS_SERVER_TTL_SEC) })
      } catch (e) {
        // The record is informational in this mode; do not lose the proxy over it.
        logger.warn(`[QgProxy] 隧道入口状态写入 Redis 失败(不影响代理使用): ${e.message || e}`)
      }
      return tunnelFrag
    }

    // Pool mode, fast path: reuse the cached entry without taking the lock.
    const cachedFast = await getCachedParsed()
    if (cacheStillValid(cachedFast)) {
      if (forceRefresh) {
        logger.info(`[QgProxy] 隧道池忽略 forceRefresh,复用入口 server=${cachedFast.server}`)
      }
      return axiosProxyOptsFromServer(cachedFast.server, hasProxyAuth())
    }
    if (cachedFast?.server) {
      logger.info(`[QgProxy] 缓存入口失效,将重新提取。原server=${cachedFast.server}`)
    }

    const maxAttempts = 5
    let lastErr
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      const locked = await acquireFetchLock()
      if (!locked) {
        // Could not get the lock in time: fall back to whatever is cached, even if stale.
        logger.warn('青果提取锁等待超时,本次尝试使用缓存或直连由调用方处理')
        const cached = await getCachedParsed()
        if (cached?.server) {
          logger.warn(
            `[QgProxy] 锁超时降级使用仍为缓存记录的节点 server=${cached.server} proxy_ip=${cached.proxyIp ?? '—'}(可能已过期)`
          )
          return axiosProxyOptsFromServer(cached.server, hasProxyAuth())
        }
        return { proxy: false }
      }

      // 0 means "do not retry"; set in the catch block for retryable failures.
      let backoffMs = 0
      try {
        // Re-check under the lock: another holder may have refreshed the cache meanwhile.
        const cached = await getCachedParsed()
        if (cacheStillValid(cached)) {
          return axiosProxyOptsFromServer(cached.server, hasProxyAuth())
        }

        logger.info(`[QgProxy] 调用青果 /pool 提取代理资源... forceRefresh=${forceRefresh}`)
        const { body, item, code } = await extractOnce(settings)
        const server = item.server
        const deadline = item.deadline
        const proxyIp = item.proxy_ip
        const requestId = body.request_id
        if (!server) throw new Error('青果返回无 server 字段')

        const deadlineMs = parseDeadlineMs(deadline)
        const payload = {
          server,
          deadline: deadline || '',
          deadlineMs: deadlineMs || null,
          proxyIp: proxyIp || null,
          requestId: requestId || null,
          fetchedAt: Date.now()
        }
        await Redis.set(REDIS_CURRENT, JSON.stringify(payload), { EX: Math.max(60, REDIS_SERVER_TTL_SEC) })
        if (attempt > 1) {
          logger.info(`[QgProxy] 第 ${attempt} 次 /get 成功`)
        }
        logger.info(
          `[QgProxy] 已获取隧道入口 server=${server} proxy_ip=${proxyIp ?? '—'} deadline=${deadline || '—'} request_id=${requestId ?? '—'}`
        )
        await recordLog({
          event: 'fetch',
          server,
          deadline: deadline || null,
          detail: { request_id: requestId, proxy_ip: proxyIp, code }
        })
        return axiosProxyOptsFromServer(server, hasProxyAuth())
      } catch (e) {
        lastErr = e
        const willRetry = e.retryable === true && attempt < maxAttempts
        if (!willRetry) {
          logger.error(`青果拉取异常: ${e.stack || e}`)
          throw e
        }
        backoffMs = Math.min(2000, 280 * attempt * attempt)
        logger.warn(`[QgProxy] /pool 将重试 (${attempt}/${maxAttempts}) ${e.message},等待 ${backoffMs}ms`)
      } finally {
        // Always release, including on the early returns and the rethrow above.
        await releaseFetchLock()
      }

      // Sleep outside the try/finally so the lock is not held during the backoff.
      if (backoffMs > 0) {
        await sleep(backoffMs)
      }
    }
    throw lastErr || new Error('青果提取失败')
  }
  /**
   * Invalidate the cached entry record and log the event.
   *
   * For the reasons 'request_fail', 'retry_round_post_fail' and
   * 'extra_round_fail' the cache is left untouched; the call is only logged
   * and recorded with `ignored_in_tunnel_pool: true`. For any other reason the
   * REDIS_CURRENT key is deleted (a Redis failure is logged, not thrown) and
   * the previous record is included in the log entry.
   *
   * @param {string} [reason]
   * @param {*} [detail] Objects are spread into the log detail; anything else
   *   is wrapped as `{ message: String(detail || '') }`.
   * @returns {Promise<void>}
   */
  async function invalidateCurrent(reason, detail) {
    const detailObj =
      detail !== null && typeof detail === 'object' ? detail : { message: String(detail || '') }

    const ignoredReasons = ['request_fail', 'retry_round_post_fail', 'extra_round_fail']
    if (ignoredReasons.includes(reason)) {
      logger.info(
        `[QgProxy] 隧道池模式忽略入口作废 reason=${reason} detail=${JSON.stringify(detailObj)}`
      )
      await recordLog({
        event: 'invalidate',
        detail: { reason, ignored_in_tunnel_pool: true, ...detailObj }
      })
      return
    }

    let previous = null
    try {
      previous = await getCachedParsed()
      await Redis.del(REDIS_CURRENT)
    } catch (e) {
      logger.warn(`清空青果缓存失败: ${e.message || e}`)
    }

    const reasonLabel = reason || 'unknown'
    logger.info(
      `[QgProxy] 已作废当前代理缓存 reason=${reasonLabel} 原server=${previous?.server ?? '(无)'} 原proxy_ip=${previous?.proxyIp ?? '—'} 原deadline=${previous?.deadline ?? '—'} detail=${JSON.stringify(detailObj)}`
    )
    await recordLog({
      event: 'invalidate',
      server: previous?.server ?? null,
      deadline: previous?.deadline ?? null,
      detail: { reason: reasonLabel, ...detailObj, proxy_ip: previous?.proxyIp ?? detailObj?.proxy_ip ?? null }
    })
  }
  /**
   * Log (warning level) and record a 'fallback_direct' event.
   * When the detail carries `trace_id` or `mq_task_id`, the log line is
   * prefixed with it in square brackets.
   *
   * @param {*} detail Objects are used as-is; anything else is wrapped as
   *   `{ message: String(detail || '') }`.
   * @returns {Promise<void>}
   */
  async function recordFallbackDirect(detail) {
    const info = detail !== null && typeof detail === 'object' ? detail : { message: String(detail || '') }
    const traceId = info.trace_id || info.mq_task_id
    const prefix = traceId ? `[${traceId}] ` : ''
    logger.warn(`${prefix}[QgProxy] 乐跑出站回退直连 reason=${JSON.stringify(info)}`)
    await recordLog({ event: 'fallback_direct', detail: info })
  }
  /**
   * Collect a status snapshot: merged settings, which credentials are
   * configured, the current Redis record and the latest 'fetch' log row.
   *
   * loadSettings() already ensures the settings rows exist, so there is no
   * separate ensureSettingsRow() call here (it only repeated the same
   * INSERT IGNORE statements on every snapshot).
   *
   * @returns {Promise<object>} Snapshot object; `last_fetch_log` is null when
   *   there is no row or the log query fails.
   */
  async function getStatusSnapshot() {
    const row = await loadSettings()
    const cached = await getCachedParsed()

    let lastFetch = null
    try {
      const lr = await db.query(
        `SELECT server, deadline, created_at, detail FROM lepao_proxy_log WHERE event = 'fetch' ORDER BY id DESC LIMIT 1`
      )
      lastFetch = lr?.[0] || null
    } catch {
      // The log row is optional in the snapshot; a failing query just omits it.
      lastFetch = null
    }

    return {
      project_scope_key: getProjectKey(),
      proxy_enabled: getProjectProxyEnabledFromSettings(row),
      proxy_enabled_default: row ? Number(row.proxy_enabled) === 1 : false,
      project_proxy_updated_at: row?.project_updated_at ?? 0,
      area: row?.area ?? '',
      area_ex: row?.area_ex ?? '',
      isp: row?.isp == null ? null : Number(row.isp),
      distinct_extract: row ? Number(row.distinct_extract) === 1 : true,
      updated_at: row?.updated_at ?? 0,
      extract_key_configured: hasExtractCredentials(),
      tunnel_server_configured: hasTunnelServer(),
      proxy_auth_configured: hasProxyAuth(),
      // The cached record is reported whether or not it is still valid (the
      // former ternary returned `cached` on both branches).
      // NOTE(review): confirm that exposing an expired record here is intended.
      redis_current: cached,
      last_fetch_log: lastFetch
    }
  }
  460. module.exports = {
  461. getQgConfig,
  462. hasExtractCredentials,
  463. hasProxyAuth,
  464. getProjectKey,
  465. getProjectProxyEnabledFromSettings,
  466. ensureSettingsRow,
  467. loadSettings,
  468. isOutboundProxyEnabled,
  469. getOutboundAxiosFragment,
  470. invalidateCurrent,
  471. recordFallbackDirect,
  472. recordLog,
  473. getStatusSnapshot,
  474. parseDeadlineMs,
  475. cacheStillValid,
  476. getCachedParsed,
  477. fetchResourceAreas,
  478. REDIS_CURRENT
  479. }