const axios = require('axios')
const db = require('../../plugin/DataBase/db')
const { invalidateGlobalCache } = require('./lepaoOutboundProxy')

/**
 * Extracts proxy endpoints from free-form text.
 *
 * Every `IPv4:port` occurrence (optionally prefixed with `http://` or
 * `https://`) is collected; entries are de-duplicated by `host:port`, keeping
 * the first occurrence.
 *
 * @param {string} text Raw text (plain list, multi-line, HTML, ...).
 * @returns {Array<{host:string,port:number,scheme:string}>} `scheme` is
 *   'http', 'https' or 'auto' when the text carried no scheme prefix.
 */
function parseProxyLines(text) {
  const out = []
  const seen = new Set()
  const body = String(text || '')
  // Tolerates plain text, multi-line input and HTML (markup / script ads mixed in):
  // IPv4:port pairs are pulled straight out of the whole body.
  const re = /\b(?:(https?):\/\/)?((?:\d{1,3}\.){3}\d{1,3}):(\d{1,5})\b/gi
  let m
  while ((m = re.exec(body)) !== null) {
    const scheme = (m[1] || 'auto').toLowerCase()
    const host = m[2]
    const port = Number(m[3])
    if (!host || !Number.isFinite(port) || port <= 0 || port > 65535) continue
    // The pattern only limits each octet to three digits; reject values such as 999.
    if (!host.split('.').every((octet) => Number(octet) <= 255)) continue
    const key = `${host}:${port}`
    if (seen.has(key)) continue
    seen.add(key)
    out.push({ host, port, scheme })
  }
  return out
}

/**
 * Loads the row with id = 1 from `lepao_proxy_global`.
 *
 * @returns {Promise<object>} The settings row.
 * @throws {Error} When the row does not exist.
 */
async function getGlobalOrThrow() {
  const rows = await db.query('SELECT * FROM lepao_proxy_global WHERE id = 1 LIMIT 1')
  const g = rows?.[0]
  if (!g) throw new Error('lepao_proxy_global 未初始化')
  return g
}

/**
 * Inserts the given proxies into `lepao_proxy_pool`, skipping any host/port
 * pair that is already present. New rows are stored with `is_active = 0`.
 *
 * @param {Array<{host:string,port:number,scheme:string}>} list
 * @param {string} source manual|url|cron
 * @returns {Promise<{total:number,inserted:number,duplicated:number}>}
 */
async function upsertProxyRows(list, source) {
  const t = Date.now()
  let inserted = 0
  let duplicated = 0
  for (const item of list) {
    const exists = await db.query(
      'SELECT id FROM lepao_proxy_pool WHERE host = ? AND port = ? LIMIT 1',
      [item.host, item.port]
    )
    if (exists && exists.length > 0) {
      duplicated += 1
      continue
    }
    await db.query(
      `
      INSERT INTO lepao_proxy_pool
        (host, port, scheme, is_active, source, created_at, updated_at)
      VALUES (?, ?, ?, 0, ?, ?, ?)
      `,
      [item.host, item.port, item.scheme, source, t, t]
    )
    inserted += 1
  }
  return { total: list.length, inserted, duplicated }
}

/**
 * Parses `rawText` and stores every new proxy found in it.
 *
 * @param {string} rawText Text to scan for proxies.
 * @param {string} [source] Origin tag stored with new rows; defaults to 'manual'.
 * @returns {Promise<object>} Summary with `ok`, `imported` and `message`; when
 *   at least one proxy was parsed it also carries `duplicated`, `totalParsed`,
 *   `total` and `inserted`.
 */
async function importFromText(rawText, source) {
  const list = parseProxyLines(rawText || '')
  if (!list.length) {
    return {
      ok: true,
      imported: 0,
      message: '无有效代理行(期望 ip:port 或 http(s)://ip:port)'
    }
  }
  const res = await upsertProxyRows(list, source || 'manual')
  return {
    ok: true,
    imported: res.inserted,
    duplicated: res.duplicated,
    totalParsed: res.total,
    message: `解析 ${res.total} 条,新增 ${res.inserted} 条,重复跳过 ${res.duplicated} 条`,
    ...res
  }
}

/**
 * Downloads `url` as raw text (without going through a proxy) and imports the
 * proxies found in the response body.
 *
 * @param {string} url Address of the proxy list.
 * @param {string} [source] Origin tag stored with new rows; defaults to 'url'.
 * @returns {Promise<object>} Same summary as `importFromText`.
 * @throws {Error} When `url` is empty, the body is not a string, or the HTTP
 *   status is >= 400.
 */
async function importFromRemoteUrl(url, source) {
  const u = String(url || '').trim()
  if (!u) throw new Error('import_url 为空')
  const res = await axios.get(u, {
    timeout: 30000,
    proxy: false,
    responseType: 'text',
    // Keep the body exactly as received instead of letting axios parse it.
    transformResponse: [(b) => b],
    // Never throw on status; it is checked explicitly below.
    validateStatus: () => true
  })
  if (typeof res.data !== 'string') {
    throw new Error('拉取内容不是文本')
  }
  if (res.status >= 400) {
    throw new Error(`拉取 HTTP ${res.status}`)
  }
  return importFromText(res.data, source || 'url')
}

/**
 * Probes one proxy by issuing a GET to `probeUrl` through it.
 *
 * Any HTTP status counts as success (`validateStatus` always returns true);
 * only a rejected request (e.g. connection error or timeout) is a failure.
 * When the row's scheme is neither 'http' nor 'https', 'http' is tried first
 * and 'https' second.
 *
 * @param {{host:string,port:number|string,scheme?:string}} row Pool row.
 * @param {string} probeUrl Target URL to fetch through the proxy.
 * @param {number} timeoutMs Per-request timeout in milliseconds.
 * @returns {Promise<{ ok: boolean, latency_ms?: number, err?: string, scheme?: string }>}
 */
async function probeOne(row, probeUrl, timeoutMs) {
  const tryOnce = async (proto) => {
    const start = Date.now()
    await axios.get(probeUrl, {
      proxy: { protocol: proto, host: row.host, port: Number(row.port) },
      timeout: timeoutMs,
      validateStatus: () => true,
      maxRedirects: 5
    })
    return { ok: true, latency_ms: Date.now() - start, scheme: proto }
  }

  try {
    const rawScheme = String(row.scheme || 'auto').toLowerCase()
    if (rawScheme === 'http' || rawScheme === 'https') {
      return await tryOnce(rawScheme)
    }
    // No explicit protocol: detect it automatically, http first, then https.
    try {
      return await tryOnce('http')
    } catch (e1) {
      try {
        return await tryOnce('https')
      } catch (e2) {
        return {
          ok: false,
          err: (e2 && e2.message) || (e1 && e1.message) || 'probe failed'
        }
      }
    }
  } catch (e) {
    return { ok: false, err: e.message || String(e) }
  }
}

/**
 * Persists the outcome of a probe on the pool row `poolId`.
 *
 * On success the latency is stored and `last_error` is cleared; on failure the
 * latency is cleared and the error text (truncated to 250 characters) is
 * stored. The row's scheme is overwritten only when the probe reported 'http'
 * or 'https'.
 *
 * @param {number} poolId Id of the `lepao_proxy_pool` row.
 * @param {{ ok?: boolean, latency_ms?: number, err?: string, scheme?: string }} [result]
 * @returns {Promise<void>}
 */
async function updateProxyCheckResult(poolId, { ok, latency_ms, err, scheme } = {}) {
  const t = Date.now()
  const normalizedScheme = scheme && (scheme === 'http' || scheme === 'https') ? scheme : null
  await db.query(
    `
    UPDATE lepao_proxy_pool
    SET is_active = ?,
        latency_ms = ?,
        last_check_at = ?,
        last_error = ?,
        scheme = COALESCE(?, scheme),
        updated_at = ?
    WHERE id = ?
    `,
    [
      ok ? 1 : 0,
      // Never hand `undefined` to the driver as a bound value.
      ok ? (latency_ms ?? null) : null,
      t,
      ok ? null : String(err || '').slice(0, 250),
      normalizedScheme,
      t,
      poolId
    ]
  )
}

/**
 * Fetches pool rows by id.
 *
 * @param {number[]} [ids] Row ids; when missing or empty the whole table is returned.
 * @returns {Promise<object[]>} Matching rows (empty array when the query yields nothing).
 */
async function poolRowsByIds(ids) {
  if (!ids?.length) {
    const rows = await db.query('SELECT * FROM lepao_proxy_pool')
    return rows || []
  }
  const placeholders = ids.map(() => '?').join(',')
  const rows = await db.query(
    `SELECT * FROM lepao_proxy_pool WHERE id IN (${placeholders})`,
    ids
  )
  return rows || []
}

module.exports.parseProxyLines = parseProxyLines
module.exports.importFromText = importFromText
module.exports.importFromRemoteUrl = importFromRemoteUrl
module.exports.probeOne = probeOne
module.exports.updateProxyCheckResult = updateProxyCheckResult
module.exports.poolRowsByIds = poolRowsByIds

/**
 * Probes the selected proxies and stores each result.
 *
 * Settings come from the global row: `probe_target_url` (falls back to
 * 'https://www.baidu.com'), `check_timeout_ms` (falls back to 8000) and
 * `check_concurrency` (falls back to 10, clamped to 1..50).
 *
 * @param {number[]} [ids] Rows to check; when empty the whole table is checked.
 * @param {{ logger?: object }} [opts] Optional logger; `logger.warn` is called
 *   for every failed probe when available.
 * @returns {Promise<{total:number,ok:number,fail:number,probeUrl:string,timeoutMs:number,concurrency:number}>}
 */
module.exports.batchCheckProxies = async function batchCheckProxies(ids, opts = {}) {
  const g = await getGlobalOrThrow()
  const probeUrl = String(g.probe_target_url || 'https://www.baidu.com')
  const timeoutMs = Number(g.check_timeout_ms) || 8000
  const concurrency = Math.max(1, Math.min(50, Number(g.check_concurrency) || 10))
  const rows = await poolRowsByIds(ids)
  const logger = opts.logger
  let ok = 0
  let fail = 0
  let idx = 0

  // Each worker repeatedly claims the next unprocessed index from the shared cursor.
  async function worker() {
    while (idx < rows.length) {
      const my = idx++
      const row = rows[my]
      if (!row) continue
      const r = await probeOne(row, probeUrl, timeoutMs)
      await updateProxyCheckResult(row.id, r)
      if (r.ok) ok += 1
      else fail += 1
      if (logger && !r.ok) {
        logger.warn?.(`[proxyCheck] id=${row.id} ${row.host}:${row.port} ${r.err}`)
      }
    }
  }

  const n = Math.min(concurrency, rows.length || 1)
  await Promise.all(Array.from({ length: n }, () => worker()))
  return { total: rows.length, ok, fail, probeUrl, timeoutMs, concurrency }
}

module.exports.getGlobalOrThrow = getGlobalOrThrow

/**
 * Updates whitelisted columns of the global settings row (id = 1) and then
 * calls `invalidateGlobalCache()`.
 *
 * Normalisation applied per field:
 * - `random_proxy_enabled`: 1 when the value converts to the number 1, else 0.
 * - `import_url` / `probe_target_url`: trimmed, cut to 1024 characters, must be non-empty.
 * - `check_timeout_ms`: floored and clamped to 500..120000; 8000 when not finite.
 * - `check_concurrency`: floored and clamped to 1..50; 10 when not finite.
 *
 * @param {object} patch Fields to change; unknown keys are ignored. A missing
 *   or non-object patch is treated as empty.
 * @returns {Promise<{updated:number}>} `{ updated: 0 }` when nothing applicable
 *   was supplied, otherwise `{ updated: 1 }`.
 * @throws {Error} When `import_url` or `probe_target_url` is supplied but empty.
 */
module.exports.patchGlobal = async function patchGlobal(patch) {
  const allowed = [
    'random_proxy_enabled',
    'import_url',
    'probe_target_url',
    'check_timeout_ms',
    'check_concurrency'
  ]
  // A null/undefined/non-object patch would make hasOwnProperty.call throw.
  const src = patch !== null && typeof patch === 'object' ? patch : {}
  const has = (key) => Object.prototype.hasOwnProperty.call(src, key)
  const normalize = {}

  if (has('random_proxy_enabled')) {
    normalize.random_proxy_enabled = Number(src.random_proxy_enabled) === 1 ? 1 : 0
  }
  if (has('import_url')) {
    const u = String(src.import_url || '').trim().slice(0, 1024)
    if (!u) throw new Error('import_url 不能为空')
    normalize.import_url = u
  }
  if (has('probe_target_url')) {
    const u = String(src.probe_target_url || '').trim().slice(0, 1024)
    if (!u) throw new Error('probe_target_url 不能为空')
    normalize.probe_target_url = u
  }
  if (has('check_timeout_ms')) {
    const n = Number(src.check_timeout_ms)
    normalize.check_timeout_ms = Number.isFinite(n)
      ? Math.max(500, Math.min(120000, Math.floor(n)))
      : 8000
  }
  if (has('check_concurrency')) {
    const n = Number(src.check_concurrency)
    normalize.check_concurrency = Number.isFinite(n)
      ? Math.max(1, Math.min(50, Math.floor(n)))
      : 10
  }

  const sets = []
  const params = []
  const t = Date.now()
  for (const k of allowed) {
    if (Object.prototype.hasOwnProperty.call(normalize, k)) {
      sets.push(`${k} = ?`)
      params.push(normalize[k])
    }
  }
  if (!sets.length) {
    return { updated: 0 }
  }
  sets.push('updated_at = ?')
  params.push(t)
  // Last placeholder is the WHERE id = ? value.
  params.push(1)
  await db.query(`UPDATE lepao_proxy_global SET ${sets.join(', ')} WHERE id = ?`, params)
  invalidateGlobalCache()
  return { updated: 1 }
}