QgProxyManager.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. const axios = require('axios')
  2. const path = require('path')
  3. const config = require('../../config.json')
  4. const db = require('../../plugin/DataBase/db')
  5. const Redis = require('../../plugin/DataBase/Redis')
  6. const Logger = require('../Logger')
  7. const QG_GET_URL = 'https://share.proxy.qg.net/get'
  8. const QG_RESOURCES_URL = 'https://share.proxy.qg.net/resources'
  9. const REDIS_CURRENT = 'lepao:qg:current'
  10. const REDIS_LOCK = 'lepao:qg:fetch_lock'
  11. /** 早于 deadline 提前刷新,尽量减少「边检边过期」(参考 [文档](https://www.qg.net/doc/1850.html)) */
  12. const DEADLINE_MARGIN_MS = 18_000
  13. /** 仅覆盖单次 /get(含 axios 超时),避免长持锁阻塞其它任务 */
  14. const LOCK_TTL_SEC = 45
  15. const LOCK_WAIT_ROUNDS = 40
  16. const LOCK_WAIT_MS = 150
  17. let warnedTlsRejectUnauthorized = false
  18. const logger = new Logger(path.join(__dirname, '../logs/QgProxyManager.log'), 'INFO')
  19. function sleep(ms) {
  20. return new Promise(r => setTimeout(r, ms))
  21. }
  22. function getQgConfig() {
  23. const q = config.qgChannelProxy
  24. if (!q || typeof q !== 'object') return {}
  25. return {
  26. extractKey: String(q.extractKey || '').trim(),
  27. authUser: String(q.authUser || '').trim(),
  28. authPassword: String(q.authPassword || '').trim()
  29. }
  30. }
  31. function hasExtractCredentials() {
  32. const { extractKey } = getQgConfig()
  33. return extractKey.length > 0
  34. }
  35. function hasProxyAuth() {
  36. const { authUser, authPassword } = getQgConfig()
  37. return authUser.length > 0 && authPassword.length > 0
  38. }
  39. async function ensureSettingsRow() {
  40. const now = Date.now()
  41. await db.query(
  42. `INSERT IGNORE INTO lepao_proxy_settings (id, proxy_enabled, area, area_ex, isp, distinct_extract, updated_at)
  43. VALUES (1, 0, '', '', NULL, 1, ?)`,
  44. [now]
  45. )
  46. }
  47. async function loadSettings() {
  48. await ensureSettingsRow()
  49. const rows = await db.query(
  50. `SELECT proxy_enabled, area, area_ex, isp, distinct_extract, updated_at FROM lepao_proxy_settings WHERE id = 1`
  51. )
  52. return rows?.[0] || null
  53. }
  54. function parseDeadlineMs(deadlineStr) {
  55. if (!deadlineStr || typeof deadlineStr !== 'string') return null
  56. const isoish = deadlineStr.trim().replace(' ', 'T')
  57. const ms = Date.parse(isoish)
  58. return Number.isFinite(ms) ? ms : null
  59. }
  60. function axiosProxyOptsFromServer(server, useAuth) {
  61. if (!server || typeof server !== 'string') return { proxy: false }
  62. const parts = server.trim().split(':')
  63. const host = parts[0]
  64. const portNum = Number(parts[1])
  65. if (!host || !Number.isFinite(portNum)) return { proxy: false }
  66. const opt = {
  67. proxy: {
  68. protocol: 'http',
  69. host,
  70. port: portNum
  71. }
  72. }
  73. if (useAuth) {
  74. const { authUser, authPassword } = getQgConfig()
  75. opt.proxy.auth = { username: authUser, password: authPassword }
  76. }
  77. return opt
  78. }
  79. async function recordLog({ event, server = null, deadline = null, detail = null }) {
  80. try {
  81. const detailStr =
  82. typeof detail === 'string' ? detail.slice(0, 8000) : JSON.stringify(detail || {}).slice(0, 8000)
  83. await db.query(
  84. `INSERT INTO lepao_proxy_log (created_at, event, server, deadline, detail) VALUES (?, ?, ?, ?, ?)`,
  85. [Date.now(), event, server, deadline, detailStr]
  86. )
  87. } catch (e) {
  88. logger.error(`lepao_proxy_log 写入失败: ${e.stack || e}`)
  89. }
  90. }
  91. async function getCachedParsed() {
  92. const raw = await Redis.get(REDIS_CURRENT)
  93. if (!raw) return null
  94. try {
  95. return JSON.parse(raw)
  96. } catch {
  97. return null
  98. }
  99. }
  100. function cacheStillValid(parsed) {
  101. if (!parsed || !parsed.server) return false
  102. const ms = parsed.deadlineMs || parseDeadlineMs(parsed.deadline)
  103. if (!ms) return false
  104. return Date.now() + DEADLINE_MARGIN_MS < ms
  105. }
  106. async function acquireFetchLock() {
  107. for (let i = 0; i < LOCK_WAIT_ROUNDS; i++) {
  108. const ok = await Redis.set(REDIS_LOCK, '1', { NX: true, EX: LOCK_TTL_SEC })
  109. if (ok) return true
  110. await sleep(LOCK_WAIT_MS)
  111. }
  112. return false
  113. }
  114. async function releaseFetchLock() {
  115. try {
  116. await Redis.del(REDIS_LOCK)
  117. } catch (e) {
  118. logger.warn(`释放青果 fetch 锁失败: ${e.message || e}`)
  119. }
  120. }
  121. /** 瞬时故障 / 通道释放延迟等可 backoff 再试(见青果通道提取说明)。REQUEST_LIMIT_EXCEEDED 再刷 /get 会恶化限流,不在此列。 */
  122. const RETRYABLE_EXTRACT_CODES = new Set([
  123. 'NO_AVAILABLE_CHANNEL',
  124. 'INTERNAL_ERROR',
  125. 'FAILED_OPERATION',
  126. 'NO_RESOURCE_FOUND'
  127. ])
  128. function buildGetParams(settings) {
  129. const { extractKey } = getQgConfig()
  130. const params = {
  131. key: extractKey,
  132. num: 1
  133. }
  134. const area = String(settings.area || '').trim()
  135. const areaEx = String(settings.area_ex || '').trim()
  136. if (area) params.area = area
  137. if (areaEx) params.area_ex = areaEx
  138. const isp = settings.isp
  139. if (isp === 1 || isp === 2 || isp === 3) params.isp = isp
  140. params.distinct = Number(settings.distinct_extract) === 1
  141. return params
  142. }
  143. async function extractOnce(settings) {
  144. const params = buildGetParams(settings)
  145. const res = await axios.get(QG_GET_URL, {
  146. params,
  147. timeout: 20000,
  148. proxy: false,
  149. validateStatus: () => true
  150. })
  151. const body = res.data
  152. const code = body?.code
  153. if (code === 'SUCCESS' && Array.isArray(body?.data) && body.data.length >= 1) {
  154. return { body, item: body.data[0], code }
  155. }
  156. const err = new Error(`青果提取失败: ${code || JSON.stringify(body || {}).slice(0, 200)}`)
  157. err.qgCode = code
  158. err.retryable = RETRYABLE_EXTRACT_CODES.has(code)
  159. throw err
  160. }
  161. function warnIfTlsVerifyDisabled() {
  162. if (warnedTlsRejectUnauthorized) return
  163. if (process.env.NODE_TLS_REJECT_UNAUTHORIZED === '0') {
  164. warnedTlsRejectUnauthorized = true
  165. logger.warn(
  166. '[QgProxy] 检测到 NODE_TLS_REJECT_UNAUTHORIZED=0,将关闭 TLS 证书校验,存在中间人风险;生产环境建议移除此环境变量。'
  167. )
  168. }
  169. }
  170. /**
  171. * 通道提取:[查询资源地区](https://www.qg.net/doc/1850.html) GET /resources
  172. */
  173. async function fetchResourceAreas() {
  174. const { extractKey } = getQgConfig()
  175. if (!extractKey) {
  176. throw new Error('config 未配置 qgChannelProxy.extractKey')
  177. }
  178. const res = await axios.get(QG_RESOURCES_URL, {
  179. params: { key: extractKey },
  180. timeout: 20000,
  181. proxy: false,
  182. validateStatus: () => true
  183. })
  184. const body = res.data
  185. if (body?.code !== 'SUCCESS' || !Array.isArray(body?.data)) {
  186. throw new Error(body?.code || '青果 resources 查询失败')
  187. }
  188. return body.data
  189. }
  190. /**
  191. * 是否应在业务层尝试青果(DB 开关 + config 里存在 extractKey)
  192. */
  193. async function isOutboundProxyEnabled() {
  194. if (!hasExtractCredentials()) return false
  195. const row = await loadSettings()
  196. return row && Number(row.proxy_enabled) === 1
  197. }
  198. /**
  199. * 对外:得到可合并进 axios 的代理段;未启用则 { proxy: false }
  200. * @param {{ forceRefresh?: boolean }} opt
  201. */
  202. async function getOutboundAxiosFragment(opt = {}) {
  203. const forceRefresh = opt.forceRefresh === true
  204. warnIfTlsVerifyDisabled()
  205. if (!hasExtractCredentials()) return { proxy: false }
  206. const settings = await loadSettings()
  207. if (!settings || Number(settings.proxy_enabled) !== 1) return { proxy: false }
  208. if (!forceRefresh) {
  209. const cached = await getCachedParsed()
  210. if (cacheStillValid(cached)) {
  211. return axiosProxyOptsFromServer(cached.server, hasProxyAuth())
  212. }
  213. if (cached?.server) {
  214. const ms = cached.deadlineMs || parseDeadlineMs(cached.deadline)
  215. logger.info(
  216. `[QgProxy] 缓存代理已失效或临近截止,将重新提取。原节点 server=${cached.server} proxy_ip=${cached.proxyIp ?? '—'} deadline_ms=${ms ?? '—'}`
  217. )
  218. }
  219. }
  220. const maxAttempts = 5
  221. let lastErr
  222. for (let attempt = 1; attempt <= maxAttempts; attempt++) {
  223. const locked = await acquireFetchLock()
  224. if (!locked) {
  225. logger.warn('青果提取锁等待超时,本次尝试使用缓存或直连由调用方处理')
  226. const cached = await getCachedParsed()
  227. if (cached?.server) {
  228. logger.warn(
  229. `[QgProxy] 锁超时降级使用仍为缓存记录的节点 server=${cached.server} proxy_ip=${cached.proxyIp ?? '—'}(可能已过期)`
  230. )
  231. return axiosProxyOptsFromServer(cached.server, hasProxyAuth())
  232. }
  233. return { proxy: false }
  234. }
  235. let shouldBackoff = false
  236. try {
  237. if (!forceRefresh) {
  238. const cached = await getCachedParsed()
  239. if (cacheStillValid(cached)) {
  240. return axiosProxyOptsFromServer(cached.server, hasProxyAuth())
  241. }
  242. }
  243. logger.info(`[QgProxy] 调用青果 /get 拉取新代理... forceRefresh=${forceRefresh}`)
  244. const { body, item, code } = await extractOnce(settings)
  245. const server = item.server
  246. const deadline = item.deadline
  247. const proxyIp = item.proxy_ip
  248. const requestId = body.request_id
  249. if (!server) throw new Error('青果返回无 server 字段')
  250. const deadlineMs = parseDeadlineMs(deadline)
  251. const ttlSec = deadlineMs
  252. ? Math.max(15, Math.floor((deadlineMs - Date.now()) / 1000) - 2)
  253. : 55
  254. const payload = {
  255. server,
  256. deadline: deadline || '',
  257. deadlineMs: deadlineMs || null,
  258. proxyIp: proxyIp || null,
  259. requestId: requestId || null,
  260. fetchedAt: Date.now()
  261. }
  262. await Redis.set(REDIS_CURRENT, JSON.stringify(payload), { EX: Math.min(120, Math.max(15, ttlSec)) })
  263. if (attempt > 1) {
  264. logger.info(`[QgProxy] 第 ${attempt} 次 /get 成功`)
  265. }
  266. logger.info(
  267. `[QgProxy] 已获取代理节点 server=${server} proxy_ip=${proxyIp ?? '—'} deadline=${deadline || '—'} request_id=${requestId ?? '—'}`
  268. )
  269. await recordLog({
  270. event: 'fetch',
  271. server,
  272. deadline: deadline || null,
  273. detail: { request_id: requestId, proxy_ip: proxyIp, code }
  274. })
  275. return axiosProxyOptsFromServer(server, hasProxyAuth())
  276. } catch (e) {
  277. lastErr = e
  278. shouldBackoff = e.retryable === true && attempt < maxAttempts
  279. if (shouldBackoff) {
  280. const backoff = Math.min(2000, 280 * attempt * attempt)
  281. logger.warn(`[QgProxy] /get 将重试 (${attempt}/${maxAttempts}) ${e.message},等待 ${backoff}ms`)
  282. } else {
  283. logger.error(`青果拉取异常: ${e.stack || e}`)
  284. throw e
  285. }
  286. } finally {
  287. await releaseFetchLock()
  288. }
  289. if (shouldBackoff) {
  290. const backoff = Math.min(2000, 280 * attempt * attempt)
  291. await sleep(backoff)
  292. }
  293. }
  294. throw lastErr || new Error('青果提取失败')
  295. }
  296. async function invalidateCurrent(reason, detail) {
  297. let prev = null
  298. try {
  299. prev = await getCachedParsed()
  300. await Redis.del(REDIS_CURRENT)
  301. } catch (e) {
  302. logger.warn(`清空青果缓存失败: ${e.message || e}`)
  303. }
  304. const detailObj =
  305. typeof detail === 'object' && detail !== null ? detail : { message: String(detail || '') }
  306. logger.info(
  307. `[QgProxy] 已作废当前代理缓存 reason=${reason || 'unknown'} 原server=${prev?.server ?? '(无)'} 原proxy_ip=${prev?.proxyIp ?? '—'} 原deadline=${prev?.deadline ?? '—'} detail=${JSON.stringify(detailObj)}`
  308. )
  309. await recordLog({
  310. event: 'invalidate',
  311. server: prev?.server ?? null,
  312. deadline: prev?.deadline ?? null,
  313. detail: { reason: reason || 'unknown', ...detailObj, proxy_ip: prev?.proxyIp ?? detailObj?.proxy_ip ?? null }
  314. })
  315. }
  316. async function recordFallbackDirect(detail) {
  317. const d = typeof detail === 'object' && detail !== null ? detail : { message: String(detail || '') }
  318. logger.warn(`[QgProxy] 乐跑出站回退直连 reason=${JSON.stringify(d)}`)
  319. await recordLog({
  320. event: 'fallback_direct',
  321. detail: d
  322. })
  323. }
  324. async function getStatusSnapshot() {
  325. await ensureSettingsRow()
  326. const row = await loadSettings()
  327. const cached = await getCachedParsed()
  328. let lastFetch = null
  329. try {
  330. const lr = await db.query(
  331. `SELECT server, deadline, created_at, detail FROM lepao_proxy_log WHERE event = 'fetch' ORDER BY id DESC LIMIT 1`
  332. )
  333. lastFetch = lr?.[0] || null
  334. } catch {
  335. lastFetch = null
  336. }
  337. return {
  338. proxy_enabled: row ? Number(row.proxy_enabled) === 1 : false,
  339. area: row?.area ?? '',
  340. area_ex: row?.area_ex ?? '',
  341. isp: row?.isp == null ? null : Number(row.isp),
  342. distinct_extract: row ? Number(row.distinct_extract) === 1 : true,
  343. updated_at: row?.updated_at ?? 0,
  344. extract_key_configured: hasExtractCredentials(),
  345. proxy_auth_configured: hasProxyAuth(),
  346. redis_current: cached && cacheStillValid(cached) ? cached : cached,
  347. last_fetch_log: lastFetch
  348. }
  349. }
  350. module.exports = {
  351. getQgConfig,
  352. hasExtractCredentials,
  353. hasProxyAuth,
  354. ensureSettingsRow,
  355. loadSettings,
  356. isOutboundProxyEnabled,
  357. getOutboundAxiosFragment,
  358. invalidateCurrent,
  359. recordFallbackDirect,
  360. recordLog,
  361. getStatusSnapshot,
  362. parseDeadlineMs,
  363. cacheStillValid,
  364. getCachedParsed,
  365. fetchResourceAreas,
  366. REDIS_CURRENT
  367. }