const axios = require('axios') const path = require('path') const config = require('../../config.json') const db = require('../../plugin/DataBase/db') const Redis = require('../../plugin/DataBase/Redis') const Logger = require('../Logger') const QG_GET_URL = 'https://share.proxy.qg.net/get' const QG_RESOURCES_URL = 'https://share.proxy.qg.net/resources' const REDIS_CURRENT = 'lepao:qg:current' const REDIS_LOCK = 'lepao:qg:fetch_lock' /** 早于 deadline 提前刷新,尽量减少「边检边过期」(参考 [文档](https://www.qg.net/doc/1850.html)) */ const DEADLINE_MARGIN_MS = 18_000 /** 仅覆盖单次 /get(含 axios 超时),避免长持锁阻塞其它任务 */ const LOCK_TTL_SEC = 45 const LOCK_WAIT_ROUNDS = 40 const LOCK_WAIT_MS = 150 let warnedTlsRejectUnauthorized = false const logger = new Logger(path.join(__dirname, '../logs/QgProxyManager.log'), 'INFO') function sleep(ms) { return new Promise(r => setTimeout(r, ms)) } function getQgConfig() { const q = config.qgChannelProxy if (!q || typeof q !== 'object') return {} return { extractKey: String(q.extractKey || '').trim(), authUser: String(q.authUser || '').trim(), authPassword: String(q.authPassword || '').trim() } } function hasExtractCredentials() { const { extractKey } = getQgConfig() return extractKey.length > 0 } function hasProxyAuth() { const { authUser, authPassword } = getQgConfig() return authUser.length > 0 && authPassword.length > 0 } async function ensureSettingsRow() { const now = Date.now() await db.query( `INSERT IGNORE INTO lepao_proxy_settings (id, proxy_enabled, area, area_ex, isp, distinct_extract, updated_at) VALUES (1, 0, '', '', NULL, 1, ?)`, [now] ) } async function loadSettings() { await ensureSettingsRow() const rows = await db.query( `SELECT proxy_enabled, area, area_ex, isp, distinct_extract, updated_at FROM lepao_proxy_settings WHERE id = 1` ) return rows?.[0] || null } function parseDeadlineMs(deadlineStr) { if (!deadlineStr || typeof deadlineStr !== 'string') return null const isoish = deadlineStr.trim().replace(' ', 'T') const ms = Date.parse(isoish) return Number.isFinite(ms) ? ms : null } function axiosProxyOptsFromServer(server, useAuth) { if (!server || typeof server !== 'string') return { proxy: false } const parts = server.trim().split(':') const host = parts[0] const portNum = Number(parts[1]) if (!host || !Number.isFinite(portNum)) return { proxy: false } const opt = { proxy: { protocol: 'http', host, port: portNum } } if (useAuth) { const { authUser, authPassword } = getQgConfig() opt.proxy.auth = { username: authUser, password: authPassword } } return opt } async function recordLog({ event, server = null, deadline = null, detail = null }) { try { const detailStr = typeof detail === 'string' ? detail.slice(0, 8000) : JSON.stringify(detail || {}).slice(0, 8000) await db.query( `INSERT INTO lepao_proxy_log (created_at, event, server, deadline, detail) VALUES (?, ?, ?, ?, ?)`, [Date.now(), event, server, deadline, detailStr] ) } catch (e) { logger.error(`lepao_proxy_log 写入失败: ${e.stack || e}`) } } async function getCachedParsed() { const raw = await Redis.get(REDIS_CURRENT) if (!raw) return null try { return JSON.parse(raw) } catch { return null } } function cacheStillValid(parsed) { if (!parsed || !parsed.server) return false const ms = parsed.deadlineMs || parseDeadlineMs(parsed.deadline) if (!ms) return false return Date.now() + DEADLINE_MARGIN_MS < ms } async function acquireFetchLock() { for (let i = 0; i < LOCK_WAIT_ROUNDS; i++) { const ok = await Redis.set(REDIS_LOCK, '1', { NX: true, EX: LOCK_TTL_SEC }) if (ok) return true await sleep(LOCK_WAIT_MS) } return false } async function releaseFetchLock() { try { await Redis.del(REDIS_LOCK) } catch (e) { logger.warn(`释放青果 fetch 锁失败: ${e.message || e}`) } } /** 瞬时故障 / 通道释放延迟等可 backoff 再试(见青果通道提取说明)。REQUEST_LIMIT_EXCEEDED 再刷 /get 会恶化限流,不在此列。 */ const RETRYABLE_EXTRACT_CODES = new Set([ 'NO_AVAILABLE_CHANNEL', 'INTERNAL_ERROR', 'FAILED_OPERATION', 'NO_RESOURCE_FOUND' ]) function buildGetParams(settings) { const { extractKey } = getQgConfig() const params = { key: extractKey, num: 1 } const area = String(settings.area || '').trim() const areaEx = String(settings.area_ex || '').trim() if (area) params.area = area if (areaEx) params.area_ex = areaEx const isp = settings.isp if (isp === 1 || isp === 2 || isp === 3) params.isp = isp params.distinct = Number(settings.distinct_extract) === 1 return params } async function extractOnce(settings) { const params = buildGetParams(settings) const res = await axios.get(QG_GET_URL, { params, timeout: 20000, proxy: false, validateStatus: () => true }) const body = res.data const code = body?.code if (code === 'SUCCESS' && Array.isArray(body?.data) && body.data.length >= 1) { return { body, item: body.data[0], code } } const err = new Error(`青果提取失败: ${code || JSON.stringify(body || {}).slice(0, 200)}`) err.qgCode = code err.retryable = RETRYABLE_EXTRACT_CODES.has(code) throw err } function warnIfTlsVerifyDisabled() { if (warnedTlsRejectUnauthorized) return if (process.env.NODE_TLS_REJECT_UNAUTHORIZED === '0') { warnedTlsRejectUnauthorized = true logger.warn( '[QgProxy] 检测到 NODE_TLS_REJECT_UNAUTHORIZED=0,将关闭 TLS 证书校验,存在中间人风险;生产环境建议移除此环境变量。' ) } } /** * 通道提取:[查询资源地区](https://www.qg.net/doc/1850.html) GET /resources */ async function fetchResourceAreas() { const { extractKey } = getQgConfig() if (!extractKey) { throw new Error('config 未配置 qgChannelProxy.extractKey') } const res = await axios.get(QG_RESOURCES_URL, { params: { key: extractKey }, timeout: 20000, proxy: false, validateStatus: () => true }) const body = res.data if (body?.code !== 'SUCCESS' || !Array.isArray(body?.data)) { throw new Error(body?.code || '青果 resources 查询失败') } return body.data } /** * 是否应在业务层尝试青果(DB 开关 + config 里存在 extractKey) */ async function isOutboundProxyEnabled() { if (!hasExtractCredentials()) return false const row = await loadSettings() return row && Number(row.proxy_enabled) === 1 } /** * 对外:得到可合并进 axios 的代理段;未启用则 { proxy: false } * @param {{ forceRefresh?: boolean }} opt */ async function getOutboundAxiosFragment(opt = {}) { const forceRefresh = opt.forceRefresh === true warnIfTlsVerifyDisabled() if (!hasExtractCredentials()) return { proxy: false } const settings = await loadSettings() if (!settings || Number(settings.proxy_enabled) !== 1) return { proxy: false } if (!forceRefresh) { const cached = await getCachedParsed() if (cacheStillValid(cached)) { return axiosProxyOptsFromServer(cached.server, hasProxyAuth()) } if (cached?.server) { const ms = cached.deadlineMs || parseDeadlineMs(cached.deadline) logger.info( `[QgProxy] 缓存代理已失效或临近截止,将重新提取。原节点 server=${cached.server} proxy_ip=${cached.proxyIp ?? '—'} deadline_ms=${ms ?? '—'}` ) } } const maxAttempts = 5 let lastErr for (let attempt = 1; attempt <= maxAttempts; attempt++) { const locked = await acquireFetchLock() if (!locked) { logger.warn('青果提取锁等待超时,本次尝试使用缓存或直连由调用方处理') const cached = await getCachedParsed() if (cached?.server) { logger.warn( `[QgProxy] 锁超时降级使用仍为缓存记录的节点 server=${cached.server} proxy_ip=${cached.proxyIp ?? '—'}(可能已过期)` ) return axiosProxyOptsFromServer(cached.server, hasProxyAuth()) } return { proxy: false } } let shouldBackoff = false try { if (!forceRefresh) { const cached = await getCachedParsed() if (cacheStillValid(cached)) { return axiosProxyOptsFromServer(cached.server, hasProxyAuth()) } } logger.info(`[QgProxy] 调用青果 /get 拉取新代理... forceRefresh=${forceRefresh}`) const { body, item, code } = await extractOnce(settings) const server = item.server const deadline = item.deadline const proxyIp = item.proxy_ip const requestId = body.request_id if (!server) throw new Error('青果返回无 server 字段') const deadlineMs = parseDeadlineMs(deadline) const ttlSec = deadlineMs ? Math.max(15, Math.floor((deadlineMs - Date.now()) / 1000) - 2) : 55 const payload = { server, deadline: deadline || '', deadlineMs: deadlineMs || null, proxyIp: proxyIp || null, requestId: requestId || null, fetchedAt: Date.now() } await Redis.set(REDIS_CURRENT, JSON.stringify(payload), { EX: Math.min(120, Math.max(15, ttlSec)) }) if (attempt > 1) { logger.info(`[QgProxy] 第 ${attempt} 次 /get 成功`) } logger.info( `[QgProxy] 已获取代理节点 server=${server} proxy_ip=${proxyIp ?? '—'} deadline=${deadline || '—'} request_id=${requestId ?? '—'}` ) await recordLog({ event: 'fetch', server, deadline: deadline || null, detail: { request_id: requestId, proxy_ip: proxyIp, code } }) return axiosProxyOptsFromServer(server, hasProxyAuth()) } catch (e) { lastErr = e shouldBackoff = e.retryable === true && attempt < maxAttempts if (shouldBackoff) { const backoff = Math.min(2000, 280 * attempt * attempt) logger.warn(`[QgProxy] /get 将重试 (${attempt}/${maxAttempts}) ${e.message},等待 ${backoff}ms`) } else { logger.error(`青果拉取异常: ${e.stack || e}`) throw e } } finally { await releaseFetchLock() } if (shouldBackoff) { const backoff = Math.min(2000, 280 * attempt * attempt) await sleep(backoff) } } throw lastErr || new Error('青果提取失败') } async function invalidateCurrent(reason, detail) { let prev = null try { prev = await getCachedParsed() await Redis.del(REDIS_CURRENT) } catch (e) { logger.warn(`清空青果缓存失败: ${e.message || e}`) } const detailObj = typeof detail === 'object' && detail !== null ? detail : { message: String(detail || '') } logger.info( `[QgProxy] 已作废当前代理缓存 reason=${reason || 'unknown'} 原server=${prev?.server ?? '(无)'} 原proxy_ip=${prev?.proxyIp ?? '—'} 原deadline=${prev?.deadline ?? '—'} detail=${JSON.stringify(detailObj)}` ) await recordLog({ event: 'invalidate', server: prev?.server ?? null, deadline: prev?.deadline ?? null, detail: { reason: reason || 'unknown', ...detailObj, proxy_ip: prev?.proxyIp ?? detailObj?.proxy_ip ?? null } }) } async function recordFallbackDirect(detail) { const d = typeof detail === 'object' && detail !== null ? detail : { message: String(detail || '') } logger.warn(`[QgProxy] 乐跑出站回退直连 reason=${JSON.stringify(d)}`) await recordLog({ event: 'fallback_direct', detail: d }) } async function getStatusSnapshot() { await ensureSettingsRow() const row = await loadSettings() const cached = await getCachedParsed() let lastFetch = null try { const lr = await db.query( `SELECT server, deadline, created_at, detail FROM lepao_proxy_log WHERE event = 'fetch' ORDER BY id DESC LIMIT 1` ) lastFetch = lr?.[0] || null } catch { lastFetch = null } return { proxy_enabled: row ? Number(row.proxy_enabled) === 1 : false, area: row?.area ?? '', area_ex: row?.area_ex ?? '', isp: row?.isp == null ? null : Number(row.isp), distinct_extract: row ? Number(row.distinct_extract) === 1 : true, updated_at: row?.updated_at ?? 0, extract_key_configured: hasExtractCredentials(), proxy_auth_configured: hasProxyAuth(), redis_current: cached && cacheStillValid(cached) ? cached : cached, last_fetch_log: lastFetch } } module.exports = { getQgConfig, hasExtractCredentials, hasProxyAuth, ensureSettingsRow, loadSettings, isOutboundProxyEnabled, getOutboundAxiosFragment, invalidateCurrent, recordFallbackDirect, recordLog, getStatusSnapshot, parseDeadlineMs, cacheStillValid, getCachedParsed, fetchResourceAreas, REDIS_CURRENT }