QgProxyManager.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. const axios = require('axios')
  2. const path = require('path')
  3. const config = require('../../config.json')
  4. const db = require('../../plugin/DataBase/db')
  5. const Redis = require('../../plugin/DataBase/Redis')
  6. const Logger = require('../Logger')
  // Qingguo (qg.net) shared-proxy HTTP API endpoints.
  const QG_API_BASE = 'https://share.proxy.qg.net'
  const QG_GET_URL = `${QG_API_BASE}/get`
  const QG_RESOURCES_URL = `${QG_API_BASE}/resources`

  // Redis keys: the cached "current proxy node" record and the fetch lock.
  const REDIS_CURRENT = 'lepao:qg:current'
  const REDIS_LOCK = 'lepao:qg:fetch_lock'

  /** Refresh this long before the deadline to reduce "expired while in use" (see https://www.qg.net/doc/1850.html). */
  const DEADLINE_MARGIN_MS = 18 * 1000

  // Fetch-lock tuning: lock expiry (seconds), number of acquire attempts, pause between attempts (ms).
  const LOCK_TTL_SEC = 75
  const LOCK_WAIT_ROUNDS = 40
  const LOCK_WAIT_MS = 150
  // Module-level logger; writes to ../logs/QgProxyManager.log relative to this file.
  // The second constructor argument is 'INFO' (presumably the log level — confirm against ../Logger).
  const QG_LOG_FILE = path.join(__dirname, '../logs/QgProxyManager.log')
  const logger = new Logger(QG_LOG_FILE, 'INFO')
  /**
   * Return a promise that resolves (with no value) once a `setTimeout` of `ms` fires.
   * @param {number} ms delay handed to setTimeout
   * @returns {Promise<void>}
   */
  function sleep(ms) {
    return new Promise((resolve) => {
      setTimeout(resolve, ms)
    })
  }
  /**
   * Read the `qgChannelProxy` section of config.json and normalise it.
   *
   * Always returns all three fields as trimmed strings (empty when the section or a
   * field is missing), so callers can safely read `.length` on them. Returning a bare
   * `{}` for a missing section made `hasExtractCredentials()` / `hasProxyAuth()` throw.
   *
   * @returns {{ extractKey: string, authUser: string, authPassword: string }}
   */
  function getQgConfig() {
    const section = config.qgChannelProxy
    const source = section && typeof section === 'object' ? section : {}
    const clean = (value) => String(value || '').trim()
    return {
      extractKey: clean(source.extractKey),
      authUser: clean(source.authUser),
      authPassword: clean(source.authPassword)
    }
  }
  /**
   * Whether a non-empty `extractKey` is configured.
   * Defaults the field to '' so a config object without `extractKey` yields `false`
   * instead of throwing on `undefined.length`.
   * @returns {boolean}
   */
  function hasExtractCredentials() {
    const { extractKey = '' } = getQgConfig()
    return extractKey.length > 0
  }
  /**
   * Whether both proxy auth fields (`authUser` and `authPassword`) are non-empty.
   * Missing fields default to '' so an incomplete config yields `false` rather than
   * throwing on `undefined.length`.
   * @returns {boolean}
   */
  function hasProxyAuth() {
    const { authUser = '', authPassword = '' } = getQgConfig()
    return authUser.length > 0 && authPassword.length > 0
  }
  /**
   * Insert the default settings row (id = 1, proxy disabled) into `lepao_proxy_settings`
   * using INSERT IGNORE, stamping `updated_at` with the current time in ms.
   * @returns {Promise<void>}
   */
  async function ensureSettingsRow() {
    const updatedAt = Date.now()
    const insertDefaultsSql = `INSERT IGNORE INTO lepao_proxy_settings (id, proxy_enabled, area, area_ex, isp, distinct_extract, updated_at)\nVALUES (1, 0, '', '', NULL, 1, ?)`
    await db.query(insertDefaultsSql, [updatedAt])
  }
  /**
   * Ensure the settings row exists, then read it back.
   * @returns {Promise<object|null>} the first row of the SELECT, or null when there is none
   */
  async function loadSettings() {
    await ensureSettingsRow()
    const selectSql = `SELECT proxy_enabled, area, area_ex, isp, distinct_extract, updated_at FROM lepao_proxy_settings WHERE id = 1`
    const resultRows = await db.query(selectSql)
    const firstRow = resultRows?.[0]
    return firstRow ? firstRow : null
  }
  /**
   * Convert a deadline string such as "2024-01-02 03:04:05" into epoch milliseconds.
   *
   * The first space is replaced with 'T' before handing the text to Date.parse.
   * NOTE(review): a date-time without a UTC offset is parsed as local time — confirm the
   * server's timezone matches the one the upstream API reports deadlines in.
   *
   * @param {*} deadlineStr
   * @returns {number|null} epoch ms, or null for non-strings, empty strings and unparseable text
   */
  function parseDeadlineMs(deadlineStr) {
    if (typeof deadlineStr !== 'string' || deadlineStr === '') {
      return null
    }
    const normalized = deadlineStr.trim().replace(' ', 'T')
    const epochMs = Date.parse(normalized)
    if (!Number.isFinite(epochMs)) {
      return null
    }
    return epochMs
  }
  /**
   * Turn a "host:port" string into an axios `proxy` config fragment.
   *
   * The port must be a plain decimal integer in 1–65535. Previously `Number('')` coerced a
   * missing port ("host:") to 0, and fractional or out-of-range ports were passed through
   * to axios; all of those now yield `{ proxy: false }` like any other malformed input.
   *
   * @param {string} server "host:port" text; surrounding whitespace is ignored
   * @param {boolean} useAuth when truthy, attach authUser/authPassword from config as proxy auth
   * @returns {{ proxy: false } | { proxy: { protocol: string, host: string, port: number, auth?: { username: string, password: string } } }}
   */
  function axiosProxyOptsFromServer(server, useAuth) {
    if (!server || typeof server !== 'string') return { proxy: false }
    const [host, portText = ''] = server.trim().split(':')
    // Only digits are accepted, so '', ' ', '80.5' and '0x50' are rejected before Number().
    const port = /^\d{1,5}$/.test(portText) ? Number(portText) : NaN
    const portIsValid = Number.isInteger(port) && port >= 1 && port <= 65535
    if (!host || !portIsValid) return { proxy: false }
    const opt = {
      proxy: {
        protocol: 'http',
        host,
        port
      }
    }
    if (useAuth) {
      const { authUser, authPassword } = getQgConfig()
      opt.proxy.auth = { username: authUser, password: authPassword }
    }
    return opt
  }
  /**
   * Append one row to `lepao_proxy_log`. Never rejects: any failure is written to the
   * module logger instead.
   *
   * @param {object} entry
   * @param {string} entry.event event name stored in the `event` column
   * @param {string|null} [entry.server]
   * @param {string|null} [entry.deadline]
   * @param {*} [entry.detail] strings are stored as-is, anything else is JSON-encoded
   *   (null/falsy becomes "{}"); the stored text is cut to 8000 characters
   * @returns {Promise<void>}
   */
  async function recordLog({ event, server = null, deadline = null, detail = null }) {
    const MAX_DETAIL_CHARS = 8000
    const insertSql = `INSERT INTO lepao_proxy_log (created_at, event, server, deadline, detail) VALUES (?, ?, ?, ?, ?)`
    try {
      let detailText
      if (typeof detail === 'string') {
        detailText = detail.slice(0, MAX_DETAIL_CHARS)
      } else {
        detailText = JSON.stringify(detail || {}).slice(0, MAX_DETAIL_CHARS)
      }
      const values = [Date.now(), event, server, deadline, detailText]
      await db.query(insertSql, values)
    } catch (err) {
      logger.error(`lepao_proxy_log 写入失败: ${err.stack || err}`)
    }
  }
  /**
   * Read the value stored under REDIS_CURRENT and JSON-decode it.
   * @returns {Promise<*|null>} the decoded value, or null when the key is empty/missing
   *   or the stored text is not valid JSON
   */
  async function getCachedParsed() {
    const serialized = await Redis.get(REDIS_CURRENT)
    if (!serialized) {
      return null
    }
    let decoded = null
    try {
      decoded = JSON.parse(serialized)
    } catch {
      decoded = null
    }
    return decoded
  }
  /**
   * Decide whether a cached proxy record can still be used.
   *
   * A record is usable when it has a `server`, a deadline can be determined
   * (`deadlineMs`, else parsed from `deadline`), and now + DEADLINE_MARGIN_MS is still
   * before that deadline.
   *
   * @param {object|null} parsed cached record as returned by getCachedParsed
   * @returns {boolean}
   */
  function cacheStillValid(parsed) {
    const hasServer = Boolean(parsed && parsed.server)
    if (!hasServer) {
      return false
    }
    const deadlineMs = parsed.deadlineMs || parseDeadlineMs(parsed.deadline)
    if (!deadlineMs) {
      return false
    }
    const refreshThreshold = Date.now() + DEADLINE_MARGIN_MS
    return refreshThreshold < deadlineMs
  }
  /**
   * Try to take the fetch lock: SET REDIS_LOCK with NX and an expiry of LOCK_TTL_SEC,
   * up to LOCK_WAIT_ROUNDS times, sleeping LOCK_WAIT_MS after every failed attempt.
   * @returns {Promise<boolean>} true once a SET succeeds, false when every round failed
   */
  async function acquireFetchLock() {
    let roundsLeft = LOCK_WAIT_ROUNDS
    while (roundsLeft > 0) {
      roundsLeft -= 1
      const acquired = await Redis.set(REDIS_LOCK, '1', { NX: true, EX: LOCK_TTL_SEC })
      if (acquired) {
        return true
      }
      await sleep(LOCK_WAIT_MS)
    }
    return false
  }
  /**
   * Delete the fetch-lock key. Failures are logged as warnings and never propagate.
   * NOTE(review): the delete is unconditional — if the lock already expired and another
   * holder took it, this would remove their lock; confirm whether that matters here.
   * @returns {Promise<void>}
   */
  async function releaseFetchLock() {
    try {
      await Redis.del(REDIS_LOCK)
    } catch (err) {
      const reason = err.message || err
      logger.warn(`释放青果 fetch 锁失败: ${reason}`)
    }
  }
  /** Response codes treated as transient (momentary failure, delayed channel release, ...) and retried with backoff; see the Qingguo channel-extraction docs. */
  const RETRYABLE_CODE_LIST = [
    'NO_AVAILABLE_CHANNEL',
    'REQUEST_LIMIT_EXCEEDED',
    'INTERNAL_ERROR',
    'FAILED_OPERATION',
    'NO_RESOURCE_FOUND'
  ]
  const RETRYABLE_EXTRACT_CODES = new Set(RETRYABLE_CODE_LIST)
  /**
   * Build the query parameters for the /get (extract) request from a settings row.
   *
   * `isp` is coerced with Number() before being checked against 1/2/3, matching how the
   * other columns of the row are coerced; a driver that returns the column as the
   * string '2' previously had the filter silently dropped.
   *
   * @param {object} settings row from lepao_proxy_settings (area, area_ex, isp, distinct_extract)
   * @returns {{ key: string, num: number, area?: string, area_ex?: string, isp?: number, distinct: boolean }}
   */
  function buildGetParams(settings) {
    const { extractKey } = getQgConfig()
    const params = {
      key: extractKey,
      num: 1
    }
    const area = String(settings.area || '').trim()
    const areaEx = String(settings.area_ex || '').trim()
    if (area) params.area = area
    if (areaEx) params.area_ex = areaEx
    // Only numbers and strings are coerced, so booleans/null never turn into a valid isp.
    const rawIsp = settings.isp
    const isp = typeof rawIsp === 'number' || typeof rawIsp === 'string' ? Number(rawIsp) : NaN
    if (isp === 1 || isp === 2 || isp === 3) params.isp = isp
    params.distinct = Number(settings.distinct_extract) === 1
    return params
  }
  /**
   * Perform a single GET against QG_GET_URL (direct, no proxy, 20 s timeout, any HTTP
   * status accepted) and return the first extracted item.
   *
   * @param {object} settings settings row passed on to buildGetParams
   * @returns {Promise<{ body: object, item: object, code: string }>}
   * @throws {Error} when the response is not `code === 'SUCCESS'` with a non-empty `data`
   *   array; the error carries `qgCode` and a boolean `retryable`
   */
  async function extractOnce(settings) {
    const response = await axios.get(QG_GET_URL, {
      params: buildGetParams(settings),
      timeout: 20000,
      proxy: false,
      validateStatus: () => true
    })
    const payload = response.data
    const resultCode = payload?.code
    const items = payload?.data
    const succeeded = resultCode === 'SUCCESS' && Array.isArray(items) && items.length >= 1
    if (!succeeded) {
      // Fall back to a truncated dump of the body when the response has no code.
      const summary = resultCode || JSON.stringify(payload || {}).slice(0, 200)
      const failure = new Error(`青果提取失败: ${summary}`)
      failure.qgCode = resultCode
      failure.retryable = RETRYABLE_EXTRACT_CODES.has(resultCode)
      throw failure
    }
    return { body: payload, item: items[0], code: resultCode }
  }
  /**
   * Extract a proxy node (up to 8 attempts), cache it under REDIS_CURRENT and log it.
   *
   * Only errors flagged `retryable === true` are retried, after a quadratic backoff of
   * 380 * attempt² ms capped at 5200 ms; anything else is rethrown immediately.
   * The cache entry expires 2 s before the node's deadline, clamped to 15–120 s
   * (55 s when the deadline cannot be parsed).
   * The name suggests the caller is expected to hold the fetch lock; this function does
   * not take or check it itself.
   *
   * @param {object} settings settings row forwarded to extractOnce
   * @returns {Promise<{ server: string, deadline: string, deadlineMs: number|null, proxyIp: string|null, requestId: string|null, fetchedAt: number }>}
   * @throws {Error} the last extraction error, or an error when the item has no `server`
   */
  async function fetchFromQgUnderLock(settings) {
    const MAX_ATTEMPTS = 8
    let lastErr
    let attempt = 0
    while (attempt < MAX_ATTEMPTS) {
      attempt += 1
      try {
        const { body, item, code } = await extractOnce(settings)
        const server = item.server
        const deadline = item.deadline
        const proxyIp = item.proxy_ip
        const requestId = body.request_id
        if (!server) throw new Error('青果返回无 server 字段')

        const deadlineMs = parseDeadlineMs(deadline)
        let ttlSec = 55
        if (deadlineMs) {
          const secondsLeft = Math.floor((deadlineMs - Date.now()) / 1000)
          ttlSec = Math.max(15, secondsLeft - 2)
        }
        const payload = {
          server,
          deadline: deadline || '',
          deadlineMs: deadlineMs || null,
          proxyIp: proxyIp || null,
          requestId: requestId || null,
          fetchedAt: Date.now()
        }
        const expirySec = Math.min(120, Math.max(15, ttlSec))
        await Redis.set(REDIS_CURRENT, JSON.stringify(payload), { EX: expirySec })

        if (attempt > 1) {
          logger.info(`[QgProxy] 第 ${attempt} 次 /get 成功`)
        }
        logger.info(
          `[QgProxy] 已获取代理节点 server=${server} proxy_ip=${proxyIp ?? '—'} deadline=${deadline || '—'} request_id=${requestId ?? '—'}`
        )
        await recordLog({
          event: 'fetch',
          server,
          deadline: deadline || null,
          detail: { request_id: requestId, proxy_ip: proxyIp, code }
        })
        return payload
      } catch (err) {
        lastErr = err
        const canRetry = err.retryable === true && attempt < MAX_ATTEMPTS
        if (!canRetry) {
          throw err
        }
        const backoff = Math.min(5200, 380 * attempt * attempt)
        logger.warn(`[QgProxy] /get 将重试 (${attempt}/${MAX_ATTEMPTS}) ${err.message},等待 ${backoff}ms`)
        await sleep(backoff)
      }
    }
    throw lastErr || new Error('青果提取失败')
  }
  /**
   * Channel extraction: query resource areas via GET /resources
   * (see https://www.qg.net/doc/1850.html).
   *
   * @returns {Promise<Array>} the `data` array of a successful response
   * @throws {Error} when no extractKey is configured, or when the response is not
   *   `code === 'SUCCESS'` with an array `data` (message is the response code if present)
   */
  async function fetchResourceAreas() {
    const { extractKey } = getQgConfig()
    if (!extractKey) {
      throw new Error('config 未配置 qgChannelProxy.extractKey')
    }
    const requestOptions = {
      params: { key: extractKey },
      timeout: 20000,
      proxy: false,
      validateStatus: () => true
    }
    const response = await axios.get(QG_RESOURCES_URL, requestOptions)
    const payload = response.data
    const succeeded = payload?.code === 'SUCCESS' && Array.isArray(payload?.data)
    if (succeeded) {
      return payload.data
    }
    throw new Error(payload?.code || '青果 resources 查询失败')
  }
  /**
   * Whether the business layer should attempt to use the Qingguo proxy: requires an
   * extractKey in config and `proxy_enabled = 1` in the settings row.
   *
   * Always resolves to a real boolean; previously a missing settings row leaked `null`
   * through the `row && ...` expression.
   *
   * @returns {Promise<boolean>}
   */
  async function isOutboundProxyEnabled() {
    if (!hasExtractCredentials()) return false
    const row = await loadSettings()
    return Boolean(row) && Number(row.proxy_enabled) === 1
  }
  /**
   * Public entry point: produce the proxy fragment to merge into an axios request config.
   *
   * Flow:
   *  1. No extractKey, or proxy not enabled in settings -> `{ proxy: false }`.
   *  2. Unless `forceRefresh`, a still-valid cached node is returned straight away.
   *  3. Otherwise take the fetch lock. If the lock cannot be obtained, fall back to
   *     whatever node is cached (even if possibly expired), else `{ proxy: false }`.
   *  4. Holding the lock, re-check the cache (unless `forceRefresh`), then fetch a new
   *     node. The lock is always released afterwards.
   *
   * @param {{ forceRefresh?: boolean }} opt
   * @returns {Promise<object>} axios config fragment containing a `proxy` key
   * @throws {Error} whatever the fetch under lock throws (logged before rethrowing)
   */
  async function getOutboundAxiosFragment(opt = {}) {
    const direct = () => ({ proxy: false })
    const toFragment = (node) => axiosProxyOptsFromServer(node.server, hasProxyAuth())

    const forceRefresh = opt.forceRefresh === true
    if (!hasExtractCredentials()) return direct()

    const settings = await loadSettings()
    const enabled = settings && Number(settings.proxy_enabled) === 1
    if (!enabled) return direct()

    if (!forceRefresh) {
      const early = await getCachedParsed()
      if (cacheStillValid(early)) {
        return toFragment(early)
      }
      if (early?.server) {
        const staleDeadlineMs = early.deadlineMs || parseDeadlineMs(early.deadline)
        logger.info(
          `[QgProxy] 缓存代理已失效或临近截止,将重新提取。原节点 server=${early.server} proxy_ip=${early.proxyIp ?? '—'} deadline_ms=${staleDeadlineMs ?? '—'}`
        )
      }
    }

    const gotLock = await acquireFetchLock()
    if (!gotLock) {
      logger.warn('青果提取锁等待超时,本次尝试使用缓存或直连由调用方处理')
      const fallback = await getCachedParsed()
      if (!fallback?.server) {
        return direct()
      }
      logger.warn(
        `[QgProxy] 锁超时降级使用仍为缓存记录的节点 server=${fallback.server} proxy_ip=${fallback.proxyIp ?? '—'}(可能已过期)`
      )
      return toFragment(fallback)
    }

    try {
      if (!forceRefresh) {
        // Another holder may have refreshed the cache while this call waited for the lock.
        const recheck = await getCachedParsed()
        if (cacheStillValid(recheck)) {
          return toFragment(recheck)
        }
      }
      logger.info(`[QgProxy] 调用青果 /get 拉取新代理... forceRefresh=${forceRefresh}`)
      const fresh = await fetchFromQgUnderLock(settings)
      return toFragment(fresh)
    } catch (err) {
      logger.error(`青果拉取异常: ${err.stack || err}`)
      throw err
    } finally {
      await releaseFetchLock()
    }
  }
  /**
   * Drop the cached proxy node and record why.
   *
   * Reads the current cache entry (for logging), deletes REDIS_CURRENT, then writes an
   * info log line and an 'invalidate' row via recordLog. Redis failures are only logged
   * as warnings.
   *
   * @param {string} [reason] short reason tag; 'unknown' when falsy
   * @param {object|*} [detail] objects are used as-is, anything else becomes `{ message: String(detail || '') }`
   * @returns {Promise<void>}
   */
  async function invalidateCurrent(reason, detail) {
    let previous = null
    try {
      previous = await getCachedParsed()
      await Redis.del(REDIS_CURRENT)
    } catch (err) {
      logger.warn(`清空青果缓存失败: ${err.message || err}`)
    }

    const isObjectDetail = typeof detail === 'object' && detail !== null
    const detailObj = isObjectDetail ? detail : { message: String(detail || '') }
    const reasonText = reason || 'unknown'

    logger.info(
      `[QgProxy] 已作废当前代理缓存 reason=${reasonText} 原server=${previous?.server ?? '(无)'} 原proxy_ip=${previous?.proxyIp ?? '—'} 原deadline=${previous?.deadline ?? '—'} detail=${JSON.stringify(detailObj)}`
    )

    // The cached node's IP takes precedence over any proxy_ip supplied in detail.
    const proxyIp = previous?.proxyIp ?? detailObj?.proxy_ip ?? null
    await recordLog({
      event: 'invalidate',
      server: previous?.server ?? null,
      deadline: previous?.deadline ?? null,
      detail: { reason: reasonText, ...detailObj, proxy_ip: proxyIp }
    })
  }
  /**
   * Note that an outbound request fell back to a direct (non-proxy) connection:
   * emits a warning log line and a 'fallback_direct' row via recordLog.
   *
   * @param {object|*} detail objects are used as-is, anything else becomes `{ message: String(detail || '') }`
   * @returns {Promise<void>}
   */
  async function recordFallbackDirect(detail) {
    const isObjectDetail = typeof detail === 'object' && detail !== null
    const payload = isObjectDetail ? detail : { message: String(detail || '') }
    logger.warn(`[QgProxy] 乐跑出站回退直连 reason=${JSON.stringify(payload)}`)
    await recordLog({ event: 'fallback_direct', detail: payload })
  }
  /**
   * Collect a status overview: persisted settings, whether credentials are configured,
   * the current Redis cache entry and the most recent 'fetch' log row.
   *
   * Changes from the previous version:
   *  - no separate ensureSettingsRow() call; loadSettings() already performs it, so the
   *    INSERT IGNORE used to run twice per snapshot;
   *  - `redis_current` is simply the cached record — the old ternary returned `cached`
   *    in both branches, so the value is unchanged;
   *  - a failing log query is now reported with a warning instead of being silently
   *    swallowed (the snapshot still succeeds with `last_fetch_log: null`).
   *
   * @returns {Promise<object>} snapshot with proxy_enabled, area, area_ex, isp,
   *   distinct_extract, updated_at, extract_key_configured, proxy_auth_configured,
   *   redis_current and last_fetch_log
   */
  async function getStatusSnapshot() {
    const row = await loadSettings()
    const cached = await getCachedParsed()
    let lastFetch = null
    try {
      const lr = await db.query(
        `SELECT server, deadline, created_at, detail FROM lepao_proxy_log WHERE event = 'fetch' ORDER BY id DESC LIMIT 1`
      )
      lastFetch = lr?.[0] || null
    } catch (e) {
      logger.warn(`[QgProxy] 查询最近 fetch 日志失败: ${e.message || e}`)
      lastFetch = null
    }
    return {
      proxy_enabled: row ? Number(row.proxy_enabled) === 1 : false,
      area: row?.area ?? '',
      area_ex: row?.area_ex ?? '',
      isp: row?.isp == null ? null : Number(row.isp),
      distinct_extract: row ? Number(row.distinct_extract) === 1 : true,
      updated_at: row?.updated_at ?? 0,
      extract_key_configured: hasExtractCredentials(),
      proxy_auth_configured: hasProxyAuth(),
      // Returned even when it is no longer valid; callers can check its deadline themselves.
      redis_current: cached,
      last_fetch_log: lastFetch
    }
  }
  // Public API of this module: config helpers, settings access, the outbound proxy entry
  // points, logging helpers, status/caching utilities, and the Redis key of the current node.
  const publicApi = {
    getQgConfig,
    hasExtractCredentials,
    hasProxyAuth,
    ensureSettingsRow,
    loadSettings,
    isOutboundProxyEnabled,
    getOutboundAxiosFragment,
    invalidateCurrent,
    recordFallbackDirect,
    recordLog,
    getStatusSnapshot,
    parseDeadlineMs,
    cacheStillValid,
    getCachedParsed,
    fetchResourceAreas,
    REDIS_CURRENT
  }

  module.exports = publicApi