const axios = require('axios') const path = require('path') const config = require('../../config.json') const db = require('../../plugin/DataBase/db') const Redis = require('../../plugin/DataBase/Redis') const Logger = require('../Logger') const QG_POOL_URL = 'https://share.proxy.qg.net/pool' const QG_RESOURCES_URL = 'https://share.proxy.qg.net/resources' const REDIS_CURRENT = 'lepao:qg:current' const REDIS_LOCK = 'lepao:qg:fetch_lock' /** 隧道池入口地址缓存时长(服务商后台自动换出口,不依赖本地 deadline 刷新) */ const REDIS_SERVER_TTL_SEC = 6 * 60 * 60 /** 仅覆盖单次 /get(含 axios 超时),避免长持锁阻塞其它任务 */ const LOCK_TTL_SEC = 45 const LOCK_WAIT_ROUNDS = 40 const LOCK_WAIT_MS = 150 let warnedTlsRejectUnauthorized = false const logger = new Logger(path.join(__dirname, '../logs/QgProxyManager.log'), 'INFO') function sleep(ms) { return new Promise(r => setTimeout(r, ms)) } function getQgConfig() { const q = config.qgChannelProxy if (!q || typeof q !== 'object') return {} return { extractKey: String(q.extractKey || '').trim(), authUser: String(q.authUser || '').trim(), authPassword: String(q.authPassword || '').trim(), tunnelServer: String(q.tunnelServer || q.server || '').trim() } } function hasExtractCredentials() { const { extractKey } = getQgConfig() return extractKey.length > 0 } function hasProxyAuth() { const { authUser, authPassword } = getQgConfig() return authUser.length > 0 && authPassword.length > 0 } function hasTunnelServer() { const { tunnelServer } = getQgConfig() return tunnelServer.length > 0 } async function ensureSettingsRow() { const now = Date.now() await db.query( `INSERT IGNORE INTO lepao_proxy_settings (id, proxy_enabled, area, area_ex, isp, distinct_extract, updated_at) VALUES (1, 0, '', '', NULL, 1, ?)`, [now] ) } async function loadSettings() { await ensureSettingsRow() const rows = await db.query( `SELECT proxy_enabled, area, area_ex, isp, distinct_extract, updated_at FROM lepao_proxy_settings WHERE id = 1` ) return rows?.[0] || null } function parseDeadlineMs(deadlineStr) { if (!deadlineStr || typeof deadlineStr !== 'string') return null const isoish = deadlineStr.trim().replace(' ', 'T') const ms = Date.parse(isoish) return Number.isFinite(ms) ? ms : null } function axiosProxyOptsFromServer(server, useAuth) { if (!server || typeof server !== 'string') return { proxy: false } const parts = server.trim().split(':') const host = parts[0] const portNum = Number(parts[1]) if (!host || !Number.isFinite(portNum)) return { proxy: false } const opt = { proxy: { protocol: 'http', host, port: portNum } } if (useAuth) { const { authUser, authPassword } = getQgConfig() opt.proxy.auth = { username: authUser, password: authPassword } } return opt } function isAreaCodeLike(s) { return /^\d{4,9}$/.test(String(s || '').trim()) } function buildTunnelProxyOpts(settings) { const { tunnelServer, authUser, authPassword } = getQgConfig() if (!tunnelServer) return null const base = axiosProxyOptsFromServer(tunnelServer, false) if (!base.proxy) return null if (authUser && authPassword) { let password = authPassword const area = String(settings?.area || '').trim() const areaEx = String(settings?.area_ex || '').trim() /** * 参考隧道代理接入:普通模式指定地区通过 user:password:A@server。 * 仅当 area 为单个纯数字编码且未配置 area_ex 时尝试附加,避免把旧的多地区逗号表达式拼坏。 */ if (area && !areaEx && isAreaCodeLike(area)) { password = `${password}:A${area}` } base.proxy.auth = { username: authUser, password } } return base } async function recordLog({ event, server = null, deadline = null, detail = null }) { try { const detailStr = typeof detail === 'string' ? detail.slice(0, 8000) : JSON.stringify(detail || {}).slice(0, 8000) await db.query( `INSERT INTO lepao_proxy_log (created_at, event, server, deadline, detail) VALUES (?, ?, ?, ?, ?)`, [Date.now(), event, server, deadline, detailStr] ) } catch (e) { logger.error(`lepao_proxy_log 写入失败: ${e.stack || e}`) } } async function getCachedParsed() { const raw = await Redis.get(REDIS_CURRENT) if (!raw) return null try { return JSON.parse(raw) } catch { return null } } function cacheStillValid(parsed) { if (!parsed || !parsed.server) return false const ms = parsed.deadlineMs || parseDeadlineMs(parsed.deadline) if (ms) return Date.now() < ms const fetchedAt = Number(parsed.fetchedAt || 0) if (!Number.isFinite(fetchedAt) || fetchedAt <= 0) return true return Date.now() - fetchedAt < REDIS_SERVER_TTL_SEC * 1000 } async function acquireFetchLock() { for (let i = 0; i < LOCK_WAIT_ROUNDS; i++) { const ok = await Redis.set(REDIS_LOCK, '1', { NX: true, EX: LOCK_TTL_SEC }) if (ok) return true await sleep(LOCK_WAIT_MS) } return false } async function releaseFetchLock() { try { await Redis.del(REDIS_LOCK) } catch (e) { logger.warn(`释放青果 fetch 锁失败: ${e.message || e}`) } } /** 瞬时故障 / 通道释放延迟等可 backoff 再试(见青果通道提取说明)。REQUEST_LIMIT_EXCEEDED 再刷 /get 会恶化限流,不在此列。 */ const RETRYABLE_EXTRACT_CODES = new Set([ 'NO_AVAILABLE_CHANNEL', 'INTERNAL_ERROR', 'FAILED_OPERATION', 'NO_RESOURCE_FOUND' ]) function buildGetParams(settings) { const { extractKey } = getQgConfig() const params = { key: extractKey, num: 1 } const area = String(settings.area || '').trim() const areaEx = String(settings.area_ex || '').trim() if (area) params.area = area if (areaEx) params.area_ex = areaEx const isp = settings.isp if (isp === 1 || isp === 2 || isp === 3) params.isp = isp params.distinct = Number(settings.distinct_extract) === 1 return params } async function extractOnce(settings) { const params = buildGetParams(settings) const res = await axios.get(QG_POOL_URL, { params, timeout: 20000, proxy: false, validateStatus: () => true }) const body = res.data const code = body?.code if (code === 'SUCCESS' && Array.isArray(body?.data) && body.data.length >= 1) { return { body, item: body.data[0], code } } const err = new Error(`青果提取失败: ${code || JSON.stringify(body || {}).slice(0, 200)}`) err.qgCode = code err.retryable = RETRYABLE_EXTRACT_CODES.has(code) throw err } function warnIfTlsVerifyDisabled() { if (warnedTlsRejectUnauthorized) return if (process.env.NODE_TLS_REJECT_UNAUTHORIZED === '0') { warnedTlsRejectUnauthorized = true logger.warn( '[QgProxy] 检测到 NODE_TLS_REJECT_UNAUTHORIZED=0,将关闭 TLS 证书校验,存在中间人风险;生产环境建议移除此环境变量。' ) } } /** * 通道提取:[查询资源地区](https://www.qg.net/doc/1850.html) GET /resources */ async function fetchResourceAreas() { const { extractKey } = getQgConfig() if (!extractKey) { throw new Error('config 未配置 qgChannelProxy.extractKey') } const res = await axios.get(QG_RESOURCES_URL, { params: { key: extractKey }, timeout: 20000, proxy: false, validateStatus: () => true }) const body = res.data if (body?.code !== 'SUCCESS' || !Array.isArray(body?.data)) { throw new Error(body?.code || '青果 resources 查询失败') } return body.data } /** * 是否应在业务层尝试青果(DB 开关 + config 里存在 extractKey) */ async function isOutboundProxyEnabled() { if (!hasExtractCredentials() && !(hasTunnelServer() && hasProxyAuth())) return false const row = await loadSettings() return row && Number(row.proxy_enabled) === 1 } /** * 对外:得到可合并进 axios 的代理段;未启用则 { proxy: false }。 * 隧道池模式:服务商后台自动切换出口,本地仅缓存入口 server,不做频繁刷新。 * @param {{ forceRefresh?: boolean }} opt */ async function getOutboundAxiosFragment(opt = {}) { const forceRefresh = opt.forceRefresh === true warnIfTlsVerifyDisabled() if (!hasExtractCredentials() && !(hasTunnelServer() && hasProxyAuth())) return { proxy: false } const settings = await loadSettings() if (!settings || Number(settings.proxy_enabled) !== 1) return { proxy: false } // 隧道代理模式:直接使用隧道入口地址,不需要 /pool 提取。 const tunnelFrag = buildTunnelProxyOpts(settings) if (tunnelFrag?.proxy) { const payload = { server: `${tunnelFrag.proxy.host}:${tunnelFrag.proxy.port}`, deadline: '', deadlineMs: null, proxyIp: null, requestId: null, fetchedAt: Date.now(), mode: 'tunnel' } await Redis.set(REDIS_CURRENT, JSON.stringify(payload), { EX: Math.max(60, REDIS_SERVER_TTL_SEC) }) return tunnelFrag } const cachedFast = await getCachedParsed() if (cacheStillValid(cachedFast)) { if (forceRefresh) { logger.info(`[QgProxy] 隧道池忽略 forceRefresh,复用入口 server=${cachedFast.server}`) } return axiosProxyOptsFromServer(cachedFast.server, hasProxyAuth()) } if (cachedFast?.server) { logger.info(`[QgProxy] 缓存入口失效,将重新提取。原server=${cachedFast.server}`) } const maxAttempts = 5 let lastErr for (let attempt = 1; attempt <= maxAttempts; attempt++) { const locked = await acquireFetchLock() if (!locked) { logger.warn('青果提取锁等待超时,本次尝试使用缓存或直连由调用方处理') const cached = await getCachedParsed() if (cached?.server) { logger.warn( `[QgProxy] 锁超时降级使用仍为缓存记录的节点 server=${cached.server} proxy_ip=${cached.proxyIp ?? '—'}(可能已过期)` ) return axiosProxyOptsFromServer(cached.server, hasProxyAuth()) } return { proxy: false } } let shouldBackoff = false try { { const cached = await getCachedParsed() if (cacheStillValid(cached)) { return axiosProxyOptsFromServer(cached.server, hasProxyAuth()) } } logger.info(`[QgProxy] 调用青果 /pool 提取代理资源... forceRefresh=${forceRefresh}`) const { body, item, code } = await extractOnce(settings) const server = item.server const deadline = item.deadline const proxyIp = item.proxy_ip const requestId = body.request_id if (!server) throw new Error('青果返回无 server 字段') const deadlineMs = parseDeadlineMs(deadline) const ttlSec = REDIS_SERVER_TTL_SEC const payload = { server, deadline: deadline || '', deadlineMs: deadlineMs || null, proxyIp: proxyIp || null, requestId: requestId || null, fetchedAt: Date.now() } await Redis.set(REDIS_CURRENT, JSON.stringify(payload), { EX: Math.max(60, ttlSec) }) if (attempt > 1) { logger.info(`[QgProxy] 第 ${attempt} 次 /get 成功`) } logger.info( `[QgProxy] 已获取隧道入口 server=${server} proxy_ip=${proxyIp ?? '—'} deadline=${deadline || '—'} request_id=${requestId ?? '—'}` ) await recordLog({ event: 'fetch', server, deadline: deadline || null, detail: { request_id: requestId, proxy_ip: proxyIp, code } }) return axiosProxyOptsFromServer(server, hasProxyAuth()) } catch (e) { lastErr = e shouldBackoff = e.retryable === true && attempt < maxAttempts if (shouldBackoff) { const backoff = Math.min(2000, 280 * attempt * attempt) logger.warn(`[QgProxy] /pool 将重试 (${attempt}/${maxAttempts}) ${e.message},等待 ${backoff}ms`) } else { logger.error(`青果拉取异常: ${e.stack || e}`) throw e } } finally { await releaseFetchLock() } if (shouldBackoff) { const backoff = Math.min(2000, 280 * attempt * attempt) await sleep(backoff) } } throw lastErr || new Error('青果提取失败') } async function invalidateCurrent(reason, detail) { if (reason === 'request_fail' || reason === 'retry_round_post_fail' || reason === 'extra_round_fail') { const detailObj = typeof detail === 'object' && detail !== null ? detail : { message: String(detail || '') } logger.info( `[QgProxy] 隧道池模式忽略入口作废 reason=${reason} detail=${JSON.stringify(detailObj)}` ) await recordLog({ event: 'invalidate', detail: { reason, ignored_in_tunnel_pool: true, ...detailObj } }) return } let prev = null try { prev = await getCachedParsed() await Redis.del(REDIS_CURRENT) } catch (e) { logger.warn(`清空青果缓存失败: ${e.message || e}`) } const detailObj = typeof detail === 'object' && detail !== null ? detail : { message: String(detail || '') } logger.info( `[QgProxy] 已作废当前代理缓存 reason=${reason || 'unknown'} 原server=${prev?.server ?? '(无)'} 原proxy_ip=${prev?.proxyIp ?? '—'} 原deadline=${prev?.deadline ?? '—'} detail=${JSON.stringify(detailObj)}` ) await recordLog({ event: 'invalidate', server: prev?.server ?? null, deadline: prev?.deadline ?? null, detail: { reason: reason || 'unknown', ...detailObj, proxy_ip: prev?.proxyIp ?? detailObj?.proxy_ip ?? null } }) } async function recordFallbackDirect(detail) { const d = typeof detail === 'object' && detail !== null ? detail : { message: String(detail || '') } logger.warn(`[QgProxy] 乐跑出站回退直连 reason=${JSON.stringify(d)}`) await recordLog({ event: 'fallback_direct', detail: d }) } async function getStatusSnapshot() { await ensureSettingsRow() const row = await loadSettings() const cached = await getCachedParsed() let lastFetch = null try { const lr = await db.query( `SELECT server, deadline, created_at, detail FROM lepao_proxy_log WHERE event = 'fetch' ORDER BY id DESC LIMIT 1` ) lastFetch = lr?.[0] || null } catch { lastFetch = null } return { proxy_enabled: row ? Number(row.proxy_enabled) === 1 : false, area: row?.area ?? '', area_ex: row?.area_ex ?? '', isp: row?.isp == null ? null : Number(row.isp), distinct_extract: row ? Number(row.distinct_extract) === 1 : true, updated_at: row?.updated_at ?? 0, extract_key_configured: hasExtractCredentials(), tunnel_server_configured: hasTunnelServer(), proxy_auth_configured: hasProxyAuth(), redis_current: cached && cacheStillValid(cached) ? cached : cached, last_fetch_log: lastFetch } } module.exports = { getQgConfig, hasExtractCredentials, hasProxyAuth, ensureSettingsRow, loadSettings, isOutboundProxyEnabled, getOutboundAxiosFragment, invalidateCurrent, recordFallbackDirect, recordLog, getStatusSnapshot, parseDeadlineMs, cacheStillValid, getCachedParsed, fetchResourceAreas, REDIS_CURRENT }