QgProxyManager.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. const axios = require('axios')
  2. const path = require('path')
  3. const config = require('../../config.json')
  4. const db = require('../../plugin/DataBase/db')
  5. const Redis = require('../../plugin/DataBase/Redis')
  6. const Logger = require('../Logger')
  7. const QG_POOL_URL = 'https://share.proxy.qg.net/pool'
  8. const QG_RESOURCES_URL = 'https://share.proxy.qg.net/resources'
  9. const REDIS_CURRENT = 'lepao:qg:current'
  10. const REDIS_LOCK = 'lepao:qg:fetch_lock'
  11. /** 隧道池入口地址缓存时长(服务商后台自动换出口,不依赖本地 deadline 刷新) */
  12. const REDIS_SERVER_TTL_SEC = 6 * 60 * 60
  13. /** 仅覆盖单次 /get(含 axios 超时),避免长持锁阻塞其它任务 */
  14. const LOCK_TTL_SEC = 45
  15. const LOCK_WAIT_ROUNDS = 40
  16. const LOCK_WAIT_MS = 150
  17. let warnedTlsRejectUnauthorized = false
  18. const logger = new Logger(path.join(__dirname, '../logs/QgProxyManager.log'), 'INFO')
  19. function sleep(ms) {
  20. return new Promise(r => setTimeout(r, ms))
  21. }
  22. function getQgConfig() {
  23. const q = config.qgChannelProxy
  24. if (!q || typeof q !== 'object') return {}
  25. return {
  26. extractKey: String(q.extractKey || '').trim(),
  27. authUser: String(q.authUser || '').trim(),
  28. authPassword: String(q.authPassword || '').trim(),
  29. tunnelServer: String(q.tunnelServer || q.server || '').trim()
  30. }
  31. }
  32. function hasExtractCredentials() {
  33. const { extractKey } = getQgConfig()
  34. return extractKey.length > 0
  35. }
  36. function hasProxyAuth() {
  37. const { authUser, authPassword } = getQgConfig()
  38. return authUser.length > 0 && authPassword.length > 0
  39. }
  40. function hasTunnelServer() {
  41. const { tunnelServer } = getQgConfig()
  42. return tunnelServer.length > 0
  43. }
  44. async function ensureSettingsRow() {
  45. const now = Date.now()
  46. await db.query(
  47. `INSERT IGNORE INTO lepao_proxy_settings (id, proxy_enabled, area, area_ex, isp, distinct_extract, updated_at)
  48. VALUES (1, 0, '', '', NULL, 1, ?)`,
  49. [now]
  50. )
  51. }
  52. async function loadSettings() {
  53. await ensureSettingsRow()
  54. const rows = await db.query(
  55. `SELECT proxy_enabled, area, area_ex, isp, distinct_extract, updated_at FROM lepao_proxy_settings WHERE id = 1`
  56. )
  57. return rows?.[0] || null
  58. }
  59. function parseDeadlineMs(deadlineStr) {
  60. if (!deadlineStr || typeof deadlineStr !== 'string') return null
  61. const isoish = deadlineStr.trim().replace(' ', 'T')
  62. const ms = Date.parse(isoish)
  63. return Number.isFinite(ms) ? ms : null
  64. }
  65. function axiosProxyOptsFromServer(server, useAuth) {
  66. if (!server || typeof server !== 'string') return { proxy: false }
  67. const parts = server.trim().split(':')
  68. const host = parts[0]
  69. const portNum = Number(parts[1])
  70. if (!host || !Number.isFinite(portNum)) return { proxy: false }
  71. const opt = {
  72. proxy: {
  73. protocol: 'http',
  74. host,
  75. port: portNum
  76. }
  77. }
  78. if (useAuth) {
  79. const { authUser, authPassword } = getQgConfig()
  80. opt.proxy.auth = { username: authUser, password: authPassword }
  81. }
  82. return opt
  83. }
  84. function isAreaCodeLike(s) {
  85. return /^\d{4,9}$/.test(String(s || '').trim())
  86. }
  87. function buildTunnelProxyOpts(settings) {
  88. const { tunnelServer, authUser, authPassword } = getQgConfig()
  89. if (!tunnelServer) return null
  90. const base = axiosProxyOptsFromServer(tunnelServer, false)
  91. if (!base.proxy) return null
  92. if (authUser && authPassword) {
  93. let password = authPassword
  94. const area = String(settings?.area || '').trim()
  95. const areaEx = String(settings?.area_ex || '').trim()
  96. /**
  97. * 参考隧道代理接入:普通模式指定地区通过 user:password:A<area>@server。
  98. * 仅当 area 为单个纯数字编码且未配置 area_ex 时尝试附加,避免把旧的多地区逗号表达式拼坏。
  99. */
  100. if (area && !areaEx && isAreaCodeLike(area)) {
  101. password = `${password}:A${area}`
  102. }
  103. base.proxy.auth = { username: authUser, password }
  104. }
  105. return base
  106. }
  107. async function recordLog({ event, server = null, deadline = null, detail = null }) {
  108. try {
  109. const detailStr =
  110. typeof detail === 'string' ? detail.slice(0, 8000) : JSON.stringify(detail || {}).slice(0, 8000)
  111. await db.query(
  112. `INSERT INTO lepao_proxy_log (created_at, event, server, deadline, detail) VALUES (?, ?, ?, ?, ?)`,
  113. [Date.now(), event, server, deadline, detailStr]
  114. )
  115. } catch (e) {
  116. logger.error(`lepao_proxy_log 写入失败: ${e.stack || e}`)
  117. }
  118. }
  119. async function getCachedParsed() {
  120. const raw = await Redis.get(REDIS_CURRENT)
  121. if (!raw) return null
  122. try {
  123. return JSON.parse(raw)
  124. } catch {
  125. return null
  126. }
  127. }
  128. function cacheStillValid(parsed) {
  129. if (!parsed || !parsed.server) return false
  130. const ms = parsed.deadlineMs || parseDeadlineMs(parsed.deadline)
  131. if (ms) return Date.now() < ms
  132. const fetchedAt = Number(parsed.fetchedAt || 0)
  133. if (!Number.isFinite(fetchedAt) || fetchedAt <= 0) return true
  134. return Date.now() - fetchedAt < REDIS_SERVER_TTL_SEC * 1000
  135. }
  136. async function acquireFetchLock() {
  137. for (let i = 0; i < LOCK_WAIT_ROUNDS; i++) {
  138. const ok = await Redis.set(REDIS_LOCK, '1', { NX: true, EX: LOCK_TTL_SEC })
  139. if (ok) return true
  140. await sleep(LOCK_WAIT_MS)
  141. }
  142. return false
  143. }
  144. async function releaseFetchLock() {
  145. try {
  146. await Redis.del(REDIS_LOCK)
  147. } catch (e) {
  148. logger.warn(`释放青果 fetch 锁失败: ${e.message || e}`)
  149. }
  150. }
  151. /** 瞬时故障 / 通道释放延迟等可 backoff 再试(见青果通道提取说明)。REQUEST_LIMIT_EXCEEDED 再刷 /get 会恶化限流,不在此列。 */
  152. const RETRYABLE_EXTRACT_CODES = new Set([
  153. 'NO_AVAILABLE_CHANNEL',
  154. 'INTERNAL_ERROR',
  155. 'FAILED_OPERATION',
  156. 'NO_RESOURCE_FOUND'
  157. ])
  158. function buildGetParams(settings) {
  159. const { extractKey } = getQgConfig()
  160. const params = {
  161. key: extractKey,
  162. num: 1
  163. }
  164. const area = String(settings.area || '').trim()
  165. const areaEx = String(settings.area_ex || '').trim()
  166. if (area) params.area = area
  167. if (areaEx) params.area_ex = areaEx
  168. const isp = settings.isp
  169. if (isp === 1 || isp === 2 || isp === 3) params.isp = isp
  170. params.distinct = Number(settings.distinct_extract) === 1
  171. return params
  172. }
  173. async function extractOnce(settings) {
  174. const params = buildGetParams(settings)
  175. const res = await axios.get(QG_POOL_URL, {
  176. params,
  177. timeout: 20000,
  178. proxy: false,
  179. validateStatus: () => true
  180. })
  181. const body = res.data
  182. const code = body?.code
  183. if (code === 'SUCCESS' && Array.isArray(body?.data) && body.data.length >= 1) {
  184. return { body, item: body.data[0], code }
  185. }
  186. const err = new Error(`青果提取失败: ${code || JSON.stringify(body || {}).slice(0, 200)}`)
  187. err.qgCode = code
  188. err.retryable = RETRYABLE_EXTRACT_CODES.has(code)
  189. throw err
  190. }
  191. function warnIfTlsVerifyDisabled() {
  192. if (warnedTlsRejectUnauthorized) return
  193. if (process.env.NODE_TLS_REJECT_UNAUTHORIZED === '0') {
  194. warnedTlsRejectUnauthorized = true
  195. logger.warn(
  196. '[QgProxy] 检测到 NODE_TLS_REJECT_UNAUTHORIZED=0,将关闭 TLS 证书校验,存在中间人风险;生产环境建议移除此环境变量。'
  197. )
  198. }
  199. }
  200. /**
  201. * 通道提取:[查询资源地区](https://www.qg.net/doc/1850.html) GET /resources
  202. */
  203. async function fetchResourceAreas() {
  204. const { extractKey } = getQgConfig()
  205. if (!extractKey) {
  206. throw new Error('config 未配置 qgChannelProxy.extractKey')
  207. }
  208. const res = await axios.get(QG_RESOURCES_URL, {
  209. params: { key: extractKey },
  210. timeout: 20000,
  211. proxy: false,
  212. validateStatus: () => true
  213. })
  214. const body = res.data
  215. if (body?.code !== 'SUCCESS' || !Array.isArray(body?.data)) {
  216. throw new Error(body?.code || '青果 resources 查询失败')
  217. }
  218. return body.data
  219. }
  220. /**
  221. * 是否应在业务层尝试青果(DB 开关 + config 里存在 extractKey)
  222. */
  223. async function isOutboundProxyEnabled() {
  224. if (!hasExtractCredentials() && !(hasTunnelServer() && hasProxyAuth())) return false
  225. const row = await loadSettings()
  226. return row && Number(row.proxy_enabled) === 1
  227. }
  228. /**
  229. * 对外:得到可合并进 axios 的代理段;未启用则 { proxy: false }。
  230. * 隧道池模式:服务商后台自动切换出口,本地仅缓存入口 server,不做频繁刷新。
  231. * @param {{ forceRefresh?: boolean }} opt
  232. */
  233. async function getOutboundAxiosFragment(opt = {}) {
  234. const forceRefresh = opt.forceRefresh === true
  235. warnIfTlsVerifyDisabled()
  236. if (!hasExtractCredentials() && !(hasTunnelServer() && hasProxyAuth())) return { proxy: false }
  237. const settings = await loadSettings()
  238. if (!settings || Number(settings.proxy_enabled) !== 1) return { proxy: false }
  239. // 隧道代理模式:直接使用隧道入口地址,不需要 /pool 提取。
  240. const tunnelFrag = buildTunnelProxyOpts(settings)
  241. if (tunnelFrag?.proxy) {
  242. const payload = {
  243. server: `${tunnelFrag.proxy.host}:${tunnelFrag.proxy.port}`,
  244. deadline: '',
  245. deadlineMs: null,
  246. proxyIp: null,
  247. requestId: null,
  248. fetchedAt: Date.now(),
  249. mode: 'tunnel'
  250. }
  251. await Redis.set(REDIS_CURRENT, JSON.stringify(payload), { EX: Math.max(60, REDIS_SERVER_TTL_SEC) })
  252. return tunnelFrag
  253. }
  254. const cachedFast = await getCachedParsed()
  255. if (cacheStillValid(cachedFast)) {
  256. if (forceRefresh) {
  257. logger.info(`[QgProxy] 隧道池忽略 forceRefresh,复用入口 server=${cachedFast.server}`)
  258. }
  259. return axiosProxyOptsFromServer(cachedFast.server, hasProxyAuth())
  260. }
  261. if (cachedFast?.server) {
  262. logger.info(`[QgProxy] 缓存入口失效,将重新提取。原server=${cachedFast.server}`)
  263. }
  264. const maxAttempts = 5
  265. let lastErr
  266. for (let attempt = 1; attempt <= maxAttempts; attempt++) {
  267. const locked = await acquireFetchLock()
  268. if (!locked) {
  269. logger.warn('青果提取锁等待超时,本次尝试使用缓存或直连由调用方处理')
  270. const cached = await getCachedParsed()
  271. if (cached?.server) {
  272. logger.warn(
  273. `[QgProxy] 锁超时降级使用仍为缓存记录的节点 server=${cached.server} proxy_ip=${cached.proxyIp ?? '—'}(可能已过期)`
  274. )
  275. return axiosProxyOptsFromServer(cached.server, hasProxyAuth())
  276. }
  277. return { proxy: false }
  278. }
  279. let shouldBackoff = false
  280. try {
  281. {
  282. const cached = await getCachedParsed()
  283. if (cacheStillValid(cached)) {
  284. return axiosProxyOptsFromServer(cached.server, hasProxyAuth())
  285. }
  286. }
  287. logger.info(`[QgProxy] 调用青果 /pool 提取代理资源... forceRefresh=${forceRefresh}`)
  288. const { body, item, code } = await extractOnce(settings)
  289. const server = item.server
  290. const deadline = item.deadline
  291. const proxyIp = item.proxy_ip
  292. const requestId = body.request_id
  293. if (!server) throw new Error('青果返回无 server 字段')
  294. const deadlineMs = parseDeadlineMs(deadline)
  295. const ttlSec = REDIS_SERVER_TTL_SEC
  296. const payload = {
  297. server,
  298. deadline: deadline || '',
  299. deadlineMs: deadlineMs || null,
  300. proxyIp: proxyIp || null,
  301. requestId: requestId || null,
  302. fetchedAt: Date.now()
  303. }
  304. await Redis.set(REDIS_CURRENT, JSON.stringify(payload), { EX: Math.max(60, ttlSec) })
  305. if (attempt > 1) {
  306. logger.info(`[QgProxy] 第 ${attempt} 次 /get 成功`)
  307. }
  308. logger.info(
  309. `[QgProxy] 已获取隧道入口 server=${server} proxy_ip=${proxyIp ?? '—'} deadline=${deadline || '—'} request_id=${requestId ?? '—'}`
  310. )
  311. await recordLog({
  312. event: 'fetch',
  313. server,
  314. deadline: deadline || null,
  315. detail: { request_id: requestId, proxy_ip: proxyIp, code }
  316. })
  317. return axiosProxyOptsFromServer(server, hasProxyAuth())
  318. } catch (e) {
  319. lastErr = e
  320. shouldBackoff = e.retryable === true && attempt < maxAttempts
  321. if (shouldBackoff) {
  322. const backoff = Math.min(2000, 280 * attempt * attempt)
  323. logger.warn(`[QgProxy] /pool 将重试 (${attempt}/${maxAttempts}) ${e.message},等待 ${backoff}ms`)
  324. } else {
  325. logger.error(`青果拉取异常: ${e.stack || e}`)
  326. throw e
  327. }
  328. } finally {
  329. await releaseFetchLock()
  330. }
  331. if (shouldBackoff) {
  332. const backoff = Math.min(2000, 280 * attempt * attempt)
  333. await sleep(backoff)
  334. }
  335. }
  336. throw lastErr || new Error('青果提取失败')
  337. }
  338. async function invalidateCurrent(reason, detail) {
  339. if (reason === 'request_fail' || reason === 'retry_round_post_fail' || reason === 'extra_round_fail') {
  340. const detailObj =
  341. typeof detail === 'object' && detail !== null ? detail : { message: String(detail || '') }
  342. logger.info(
  343. `[QgProxy] 隧道池模式忽略入口作废 reason=${reason} detail=${JSON.stringify(detailObj)}`
  344. )
  345. await recordLog({
  346. event: 'invalidate',
  347. detail: { reason, ignored_in_tunnel_pool: true, ...detailObj }
  348. })
  349. return
  350. }
  351. let prev = null
  352. try {
  353. prev = await getCachedParsed()
  354. await Redis.del(REDIS_CURRENT)
  355. } catch (e) {
  356. logger.warn(`清空青果缓存失败: ${e.message || e}`)
  357. }
  358. const detailObj =
  359. typeof detail === 'object' && detail !== null ? detail : { message: String(detail || '') }
  360. logger.info(
  361. `[QgProxy] 已作废当前代理缓存 reason=${reason || 'unknown'} 原server=${prev?.server ?? '(无)'} 原proxy_ip=${prev?.proxyIp ?? '—'} 原deadline=${prev?.deadline ?? '—'} detail=${JSON.stringify(detailObj)}`
  362. )
  363. await recordLog({
  364. event: 'invalidate',
  365. server: prev?.server ?? null,
  366. deadline: prev?.deadline ?? null,
  367. detail: { reason: reason || 'unknown', ...detailObj, proxy_ip: prev?.proxyIp ?? detailObj?.proxy_ip ?? null }
  368. })
  369. }
  370. async function recordFallbackDirect(detail) {
  371. const d = typeof detail === 'object' && detail !== null ? detail : { message: String(detail || '') }
  372. logger.warn(`[QgProxy] 乐跑出站回退直连 reason=${JSON.stringify(d)}`)
  373. await recordLog({
  374. event: 'fallback_direct',
  375. detail: d
  376. })
  377. }
  378. async function getStatusSnapshot() {
  379. await ensureSettingsRow()
  380. const row = await loadSettings()
  381. const cached = await getCachedParsed()
  382. let lastFetch = null
  383. try {
  384. const lr = await db.query(
  385. `SELECT server, deadline, created_at, detail FROM lepao_proxy_log WHERE event = 'fetch' ORDER BY id DESC LIMIT 1`
  386. )
  387. lastFetch = lr?.[0] || null
  388. } catch {
  389. lastFetch = null
  390. }
  391. return {
  392. proxy_enabled: row ? Number(row.proxy_enabled) === 1 : false,
  393. area: row?.area ?? '',
  394. area_ex: row?.area_ex ?? '',
  395. isp: row?.isp == null ? null : Number(row.isp),
  396. distinct_extract: row ? Number(row.distinct_extract) === 1 : true,
  397. updated_at: row?.updated_at ?? 0,
  398. extract_key_configured: hasExtractCredentials(),
  399. tunnel_server_configured: hasTunnelServer(),
  400. proxy_auth_configured: hasProxyAuth(),
  401. redis_current: cached && cacheStillValid(cached) ? cached : cached,
  402. last_fetch_log: lastFetch
  403. }
  404. }
  405. module.exports = {
  406. getQgConfig,
  407. hasExtractCredentials,
  408. hasProxyAuth,
  409. ensureSettingsRow,
  410. loadSettings,
  411. isOutboundProxyEnabled,
  412. getOutboundAxiosFragment,
  413. invalidateCurrent,
  414. recordFallbackDirect,
  415. recordLog,
  416. getStatusSnapshot,
  417. parseDeadlineMs,
  418. cacheStillValid,
  419. getCachedParsed,
  420. fetchResourceAreas,
  421. REDIS_CURRENT
  422. }