| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
- const axios = require('axios')
- const path = require('path')
- const config = require('../../config.json')
- const db = require('../../plugin/DataBase/db')
- const Redis = require('../../plugin/DataBase/Redis')
- const Logger = require('../Logger')
- const QG_GET_URL = 'https://share.proxy.qg.net/get'
- const QG_RESOURCES_URL = 'https://share.proxy.qg.net/resources'
- const REDIS_CURRENT = 'lepao:qg:current'
- const REDIS_LOCK = 'lepao:qg:fetch_lock'
- /** 早于 deadline 提前刷新,尽量减少「边检边过期」(参考 [文档](https://www.qg.net/doc/1850.html)) */
- const DEADLINE_MARGIN_MS = 18_000
- const LOCK_TTL_SEC = 75
- const LOCK_WAIT_ROUNDS = 40
- const LOCK_WAIT_MS = 150
- const logger = new Logger(path.join(__dirname, '../logs/QgProxyManager.log'), 'INFO')
- function sleep(ms) {
- return new Promise(r => setTimeout(r, ms))
- }
- function getQgConfig() {
- const q = config.qgChannelProxy
- if (!q || typeof q !== 'object') return {}
- return {
- extractKey: String(q.extractKey || '').trim(),
- authUser: String(q.authUser || '').trim(),
- authPassword: String(q.authPassword || '').trim()
- }
- }
- function hasExtractCredentials() {
- const { extractKey } = getQgConfig()
- return extractKey.length > 0
- }
- function hasProxyAuth() {
- const { authUser, authPassword } = getQgConfig()
- return authUser.length > 0 && authPassword.length > 0
- }
- async function ensureSettingsRow() {
- const now = Date.now()
- await db.query(
- `INSERT IGNORE INTO lepao_proxy_settings (id, proxy_enabled, area, area_ex, isp, distinct_extract, updated_at)
- VALUES (1, 0, '', '', NULL, 1, ?)`,
- [now]
- )
- }
- async function loadSettings() {
- await ensureSettingsRow()
- const rows = await db.query(
- `SELECT proxy_enabled, area, area_ex, isp, distinct_extract, updated_at FROM lepao_proxy_settings WHERE id = 1`
- )
- return rows?.[0] || null
- }
- function parseDeadlineMs(deadlineStr) {
- if (!deadlineStr || typeof deadlineStr !== 'string') return null
- const isoish = deadlineStr.trim().replace(' ', 'T')
- const ms = Date.parse(isoish)
- return Number.isFinite(ms) ? ms : null
- }
- function axiosProxyOptsFromServer(server, useAuth) {
- if (!server || typeof server !== 'string') return { proxy: false }
- const parts = server.trim().split(':')
- const host = parts[0]
- const portNum = Number(parts[1])
- if (!host || !Number.isFinite(portNum)) return { proxy: false }
- const opt = {
- proxy: {
- protocol: 'http',
- host,
- port: portNum
- }
- }
- if (useAuth) {
- const { authUser, authPassword } = getQgConfig()
- opt.proxy.auth = { username: authUser, password: authPassword }
- }
- return opt
- }
- async function recordLog({ event, server = null, deadline = null, detail = null }) {
- try {
- const detailStr =
- typeof detail === 'string' ? detail.slice(0, 8000) : JSON.stringify(detail || {}).slice(0, 8000)
- await db.query(
- `INSERT INTO lepao_proxy_log (created_at, event, server, deadline, detail) VALUES (?, ?, ?, ?, ?)`,
- [Date.now(), event, server, deadline, detailStr]
- )
- } catch (e) {
- logger.error(`lepao_proxy_log 写入失败: ${e.stack || e}`)
- }
- }
- async function getCachedParsed() {
- const raw = await Redis.get(REDIS_CURRENT)
- if (!raw) return null
- try {
- return JSON.parse(raw)
- } catch {
- return null
- }
- }
- function cacheStillValid(parsed) {
- if (!parsed || !parsed.server) return false
- const ms = parsed.deadlineMs || parseDeadlineMs(parsed.deadline)
- if (!ms) return false
- return Date.now() + DEADLINE_MARGIN_MS < ms
- }
- async function acquireFetchLock() {
- for (let i = 0; i < LOCK_WAIT_ROUNDS; i++) {
- const ok = await Redis.set(REDIS_LOCK, '1', { NX: true, EX: LOCK_TTL_SEC })
- if (ok) return true
- await sleep(LOCK_WAIT_MS)
- }
- return false
- }
- async function releaseFetchLock() {
- try {
- await Redis.del(REDIS_LOCK)
- } catch (e) {
- logger.warn(`释放青果 fetch 锁失败: ${e.message || e}`)
- }
- }
- /** 瞬时故障 / 通道释放延迟等可 backoff 再试(见青果通道提取说明) */
- const RETRYABLE_EXTRACT_CODES = new Set([
- 'NO_AVAILABLE_CHANNEL',
- 'REQUEST_LIMIT_EXCEEDED',
- 'INTERNAL_ERROR',
- 'FAILED_OPERATION',
- 'NO_RESOURCE_FOUND'
- ])
- function buildGetParams(settings) {
- const { extractKey } = getQgConfig()
- const params = {
- key: extractKey,
- num: 1
- }
- const area = String(settings.area || '').trim()
- const areaEx = String(settings.area_ex || '').trim()
- if (area) params.area = area
- if (areaEx) params.area_ex = areaEx
- const isp = settings.isp
- if (isp === 1 || isp === 2 || isp === 3) params.isp = isp
- params.distinct = Number(settings.distinct_extract) === 1
- return params
- }
- async function extractOnce(settings) {
- const params = buildGetParams(settings)
- const res = await axios.get(QG_GET_URL, {
- params,
- timeout: 20000,
- proxy: false,
- validateStatus: () => true
- })
- const body = res.data
- const code = body?.code
- if (code === 'SUCCESS' && Array.isArray(body?.data) && body.data.length >= 1) {
- return { body, item: body.data[0], code }
- }
- const err = new Error(`青果提取失败: ${code || JSON.stringify(body || {}).slice(0, 200)}`)
- err.qgCode = code
- err.retryable = RETRYABLE_EXTRACT_CODES.has(code)
- throw err
- }
- async function fetchFromQgUnderLock(settings) {
- let lastErr
- const maxAttempts = 8
- for (let attempt = 1; attempt <= maxAttempts; attempt++) {
- try {
- const { body, item, code } = await extractOnce(settings)
- const server = item.server
- const deadline = item.deadline
- const proxyIp = item.proxy_ip
- const requestId = body.request_id
- if (!server) throw new Error('青果返回无 server 字段')
- const deadlineMs = parseDeadlineMs(deadline)
- const ttlSec = deadlineMs
- ? Math.max(15, Math.floor((deadlineMs - Date.now()) / 1000) - 2)
- : 55
- const payload = {
- server,
- deadline: deadline || '',
- deadlineMs: deadlineMs || null,
- proxyIp: proxyIp || null,
- requestId: requestId || null,
- fetchedAt: Date.now()
- }
- await Redis.set(REDIS_CURRENT, JSON.stringify(payload), { EX: Math.min(120, Math.max(15, ttlSec)) })
- if (attempt > 1) {
- logger.info(`[QgProxy] 第 ${attempt} 次 /get 成功`)
- }
- logger.info(
- `[QgProxy] 已获取代理节点 server=${server} proxy_ip=${proxyIp ?? '—'} deadline=${deadline || '—'} request_id=${requestId ?? '—'}`
- )
- await recordLog({
- event: 'fetch',
- server,
- deadline: deadline || null,
- detail: { request_id: requestId, proxy_ip: proxyIp, code }
- })
- return payload
- } catch (e) {
- lastErr = e
- const retryable = e.retryable === true
- if (!retryable || attempt >= maxAttempts) {
- throw e
- }
- const backoff = Math.min(5200, 380 * attempt * attempt)
- logger.warn(`[QgProxy] /get 将重试 (${attempt}/${maxAttempts}) ${e.message},等待 ${backoff}ms`)
- await sleep(backoff)
- }
- }
- throw lastErr || new Error('青果提取失败')
- }
- /**
- * 通道提取:[查询资源地区](https://www.qg.net/doc/1850.html) GET /resources
- */
- async function fetchResourceAreas() {
- const { extractKey } = getQgConfig()
- if (!extractKey) {
- throw new Error('config 未配置 qgChannelProxy.extractKey')
- }
- const res = await axios.get(QG_RESOURCES_URL, {
- params: { key: extractKey },
- timeout: 20000,
- proxy: false,
- validateStatus: () => true
- })
- const body = res.data
- if (body?.code !== 'SUCCESS' || !Array.isArray(body?.data)) {
- throw new Error(body?.code || '青果 resources 查询失败')
- }
- return body.data
- }
- /**
- * 是否应在业务层尝试青果(DB 开关 + config 里存在 extractKey)
- */
- async function isOutboundProxyEnabled() {
- if (!hasExtractCredentials()) return false
- const row = await loadSettings()
- return row && Number(row.proxy_enabled) === 1
- }
- /**
- * 对外:得到可合并进 axios 的代理段;未启用则 { proxy: false }
- * @param {{ forceRefresh?: boolean }} opt
- */
- async function getOutboundAxiosFragment(opt = {}) {
- const forceRefresh = opt.forceRefresh === true
- if (!hasExtractCredentials()) return { proxy: false }
- const settings = await loadSettings()
- if (!settings || Number(settings.proxy_enabled) !== 1) return { proxy: false }
- if (!forceRefresh) {
- const cached = await getCachedParsed()
- if (cacheStillValid(cached)) {
- return axiosProxyOptsFromServer(cached.server, hasProxyAuth())
- }
- if (cached?.server) {
- const ms = cached.deadlineMs || parseDeadlineMs(cached.deadline)
- logger.info(
- `[QgProxy] 缓存代理已失效或临近截止,将重新提取。原节点 server=${cached.server} proxy_ip=${cached.proxyIp ?? '—'} deadline_ms=${ms ?? '—'}`
- )
- }
- }
- const locked = await acquireFetchLock()
- if (!locked) {
- logger.warn('青果提取锁等待超时,本次尝试使用缓存或直连由调用方处理')
- const cached = await getCachedParsed()
- if (cached?.server) {
- logger.warn(
- `[QgProxy] 锁超时降级使用仍为缓存记录的节点 server=${cached.server} proxy_ip=${cached.proxyIp ?? '—'}(可能已过期)`
- )
- return axiosProxyOptsFromServer(cached.server, hasProxyAuth())
- }
- return { proxy: false }
- }
- try {
- if (!forceRefresh) {
- const cached = await getCachedParsed()
- if (cacheStillValid(cached)) {
- return axiosProxyOptsFromServer(cached.server, hasProxyAuth())
- }
- }
- logger.info(`[QgProxy] 调用青果 /get 拉取新代理... forceRefresh=${forceRefresh}`)
- const fresh = await fetchFromQgUnderLock(settings)
- return axiosProxyOptsFromServer(fresh.server, hasProxyAuth())
- } catch (e) {
- logger.error(`青果拉取异常: ${e.stack || e}`)
- throw e
- } finally {
- await releaseFetchLock()
- }
- }
- async function invalidateCurrent(reason, detail) {
- let prev = null
- try {
- prev = await getCachedParsed()
- await Redis.del(REDIS_CURRENT)
- } catch (e) {
- logger.warn(`清空青果缓存失败: ${e.message || e}`)
- }
- const detailObj =
- typeof detail === 'object' && detail !== null ? detail : { message: String(detail || '') }
- logger.info(
- `[QgProxy] 已作废当前代理缓存 reason=${reason || 'unknown'} 原server=${prev?.server ?? '(无)'} 原proxy_ip=${prev?.proxyIp ?? '—'} 原deadline=${prev?.deadline ?? '—'} detail=${JSON.stringify(detailObj)}`
- )
- await recordLog({
- event: 'invalidate',
- server: prev?.server ?? null,
- deadline: prev?.deadline ?? null,
- detail: { reason: reason || 'unknown', ...detailObj, proxy_ip: prev?.proxyIp ?? detailObj?.proxy_ip ?? null }
- })
- }
- async function recordFallbackDirect(detail) {
- const d = typeof detail === 'object' && detail !== null ? detail : { message: String(detail || '') }
- logger.warn(`[QgProxy] 乐跑出站回退直连 reason=${JSON.stringify(d)}`)
- await recordLog({
- event: 'fallback_direct',
- detail: d
- })
- }
- async function getStatusSnapshot() {
- await ensureSettingsRow()
- const row = await loadSettings()
- const cached = await getCachedParsed()
- let lastFetch = null
- try {
- const lr = await db.query(
- `SELECT server, deadline, created_at, detail FROM lepao_proxy_log WHERE event = 'fetch' ORDER BY id DESC LIMIT 1`
- )
- lastFetch = lr?.[0] || null
- } catch {
- lastFetch = null
- }
- return {
- proxy_enabled: row ? Number(row.proxy_enabled) === 1 : false,
- area: row?.area ?? '',
- area_ex: row?.area_ex ?? '',
- isp: row?.isp == null ? null : Number(row.isp),
- distinct_extract: row ? Number(row.distinct_extract) === 1 : true,
- updated_at: row?.updated_at ?? 0,
- extract_key_configured: hasExtractCredentials(),
- proxy_auth_configured: hasProxyAuth(),
- redis_current: cached && cacheStillValid(cached) ? cached : cached,
- last_fetch_log: lastFetch
- }
- }
- module.exports = {
- getQgConfig,
- hasExtractCredentials,
- hasProxyAuth,
- ensureSettingsRow,
- loadSettings,
- isOutboundProxyEnabled,
- getOutboundAxiosFragment,
- invalidateCurrent,
- recordFallbackDirect,
- recordLog,
- getStatusSnapshot,
- parseDeadlineMs,
- cacheStillValid,
- getCachedParsed,
- fetchResourceAreas,
- REDIS_CURRENT
- }
|