| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- const axios = require('axios')
- const db = require('../../plugin/DataBase/db')
- const { invalidateGlobalCache } = require('./lepaoOutboundProxy')
- function parseProxyLines(text) {
- const out = []
- const seen = new Set()
- const body = String(text || '')
- // 兼容纯文本、多行、HTML(含 <br> / 脚本广告)场景:
- // 从整段内容中直接抽取 IPv4:port(可带 http(s):// 前缀)。
- const re = /\b(?:(https?):\/\/)?((?:\d{1,3}\.){3}\d{1,3}):(\d{1,5})\b/gi
- let m
- while ((m = re.exec(body)) !== null) {
- const scheme = (m[1] || 'auto').toLowerCase()
- const host = m[2]
- const port = Number(m[3])
- if (!host || !Number.isFinite(port) || port <= 0 || port > 65535) continue
- const key = `${host}:${port}`
- if (seen.has(key)) continue
- seen.add(key)
- out.push({ host, port, scheme })
- }
- return out
- }
- async function getGlobalOrThrow() {
- const rows = await db.query('SELECT * FROM lepao_proxy_global WHERE id = 1 LIMIT 1')
- const g = rows?.[0]
- if (!g) throw new Error('lepao_proxy_global 未初始化')
- return g
- }
- /**
- * @param {Array<{host:string,port:number,scheme:string}>} list
- * @param {string} source manual|url|cron
- */
- async function upsertProxyRows(list, source) {
- const t = Date.now()
- let inserted = 0
- let duplicated = 0
- for (const item of list) {
- const exists = await db.query(
- 'SELECT id FROM lepao_proxy_pool WHERE host = ? AND port = ? LIMIT 1',
- [item.host, item.port]
- )
- if (exists && exists.length > 0) {
- duplicated += 1
- continue
- }
- await db.query(
- `
- INSERT INTO lepao_proxy_pool (host, port, scheme, is_active, source, created_at, updated_at)
- VALUES (?, ?, ?, 0, ?, ?, ?)
- `,
- [item.host, item.port, item.scheme, source, t, t]
- )
- inserted += 1
- }
- return { total: list.length, inserted, duplicated }
- }
- async function importFromText(rawText, source) {
- const list = parseProxyLines(rawText || '')
- if (!list.length) {
- return { ok: true, imported: 0, message: '无有效代理行(期望 ip:port 或 http(s)://ip:port)' }
- }
- const res = await upsertProxyRows(list, source || 'manual')
- return {
- ok: true,
- imported: res.inserted,
- duplicated: res.duplicated,
- totalParsed: res.total,
- message: `解析 ${res.total} 条,新增 ${res.inserted} 条,重复跳过 ${res.duplicated} 条`,
- ...res
- }
- }
- async function importFromRemoteUrl(url, source) {
- const u = String(url || '').trim()
- if (!u) throw new Error('import_url 为空')
- const res = await axios.get(u, {
- timeout: 30000,
- proxy: false,
- responseType: 'text',
- transformResponse: [(b) => b],
- validateStatus: () => true
- })
- if (typeof res.data !== 'string') {
- throw new Error('拉取内容不是文本')
- }
- if (res.status >= 400) {
- throw new Error(`拉取 HTTP ${res.status}`)
- }
- return importFromText(res.data, source || 'url')
- }
- /**
- * HEAD/GET 探针,经代理访问 probe_target_url
- * @returns {Promise<{ ok: boolean, latency_ms?: number, err?: string, scheme?: string }>}
- */
- async function probeOne(row, probeUrl, timeoutMs) {
- const tryOnce = async (proto) => {
- const start = Date.now()
- await axios.get(probeUrl, {
- proxy: {
- protocol: proto,
- host: row.host,
- port: Number(row.port)
- },
- timeout: timeoutMs,
- validateStatus: () => true,
- maxRedirects: 5
- })
- return { ok: true, latency_ms: Date.now() - start, scheme: proto }
- }
- try {
- const rawScheme = String(row.scheme || 'auto').toLowerCase()
- if (rawScheme === 'http' || rawScheme === 'https') {
- return await tryOnce(rawScheme)
- }
- // 未指定协议时自动判断:先 http 再 https
- try {
- return await tryOnce('http')
- } catch (e1) {
- try {
- return await tryOnce('https')
- } catch (e2) {
- return { ok: false, err: (e2 && e2.message) || (e1 && e1.message) || 'probe failed' }
- }
- }
- } catch (e) {
- return { ok: false, err: e.message || String(e) }
- }
- }
- async function updateProxyCheckResult(poolId, { ok, latency_ms, err, scheme }) {
- const t = Date.now()
- const normalizedScheme = scheme && (scheme === 'http' || scheme === 'https') ? scheme : null
- await db.query(
- `
- UPDATE lepao_proxy_pool
- SET
- is_active = ?,
- latency_ms = ?,
- last_check_at = ?,
- last_error = ?,
- scheme = COALESCE(?, scheme),
- updated_at = ?
- WHERE id = ?
- `,
- [ok ? 1 : 0, ok ? latency_ms : null, t, ok ? null : String(err || '').slice(0, 250), normalizedScheme, t, poolId]
- )
- }
- async function poolRowsByIds(ids) {
- if (!ids?.length) {
- const rows = await db.query('SELECT * FROM lepao_proxy_pool')
- return rows || []
- }
- const placeholders = ids.map(() => '?').join(',')
- const rows = await db.query(`SELECT * FROM lepao_proxy_pool WHERE id IN (${placeholders})`, ids)
- return rows || []
- }
- module.exports.parseProxyLines = parseProxyLines
- module.exports.importFromText = importFromText
- module.exports.importFromRemoteUrl = importFromRemoteUrl
- module.exports.probeOne = probeOne
- module.exports.updateProxyCheckResult = updateProxyCheckResult
- module.exports.poolRowsByIds = poolRowsByIds
- /**
- * @param {number[]} [ids] 为空则检测全表
- * @param {{ logger?: object }} [opts]
- */
- module.exports.batchCheckProxies = async function batchCheckProxies(ids, opts = {}) {
- const g = await getGlobalOrThrow()
- const probeUrl = String(g.probe_target_url || 'https://www.baidu.com')
- const timeoutMs = Number(g.check_timeout_ms) || 8000
- const concurrency = Math.max(1, Math.min(50, Number(g.check_concurrency) || 10))
- const rows = await poolRowsByIds(ids)
- const logger = opts.logger
- let ok = 0
- let fail = 0
- let idx = 0
- async function worker() {
- while (idx < rows.length) {
- const my = idx++
- const row = rows[my]
- if (!row) continue
- const r = await probeOne(row, probeUrl, timeoutMs)
- await updateProxyCheckResult(row.id, r)
- if (r.ok) ok += 1
- else fail += 1
- if (logger && !r.ok) {
- logger.warn?.(`[proxyCheck] id=${row.id} ${row.host}:${row.port} ${r.err}`)
- }
- }
- }
- const n = Math.min(concurrency, rows.length || 1)
- await Promise.all(Array.from({ length: n }, () => worker()))
- return {
- total: rows.length,
- ok,
- fail,
- probeUrl,
- timeoutMs,
- concurrency
- }
- }
- module.exports.getGlobalOrThrow = getGlobalOrThrow
- /**
- * @param {object} patch
- */
- module.exports.patchGlobal = async function patchGlobal(patch) {
- const allowed = [
- 'random_proxy_enabled',
- 'import_url',
- 'probe_target_url',
- 'check_timeout_ms',
- 'check_concurrency'
- ]
- const normalize = {}
- if (Object.prototype.hasOwnProperty.call(patch, 'random_proxy_enabled')) {
- normalize.random_proxy_enabled = Number(patch.random_proxy_enabled) === 1 ? 1 : 0
- }
- if (Object.prototype.hasOwnProperty.call(patch, 'import_url')) {
- const u = String(patch.import_url || '').trim().slice(0, 1024)
- if (!u) throw new Error('import_url 不能为空')
- normalize.import_url = u
- }
- if (Object.prototype.hasOwnProperty.call(patch, 'probe_target_url')) {
- const u = String(patch.probe_target_url || '').trim().slice(0, 1024)
- if (!u) throw new Error('probe_target_url 不能为空')
- normalize.probe_target_url = u
- }
- if (Object.prototype.hasOwnProperty.call(patch, 'check_timeout_ms')) {
- const n = Number(patch.check_timeout_ms)
- normalize.check_timeout_ms = Number.isFinite(n) ? Math.max(500, Math.min(120000, Math.floor(n))) : 8000
- }
- if (Object.prototype.hasOwnProperty.call(patch, 'check_concurrency')) {
- const n = Number(patch.check_concurrency)
- normalize.check_concurrency = Number.isFinite(n) ? Math.max(1, Math.min(50, Math.floor(n))) : 10
- }
- const sets = []
- const params = []
- const t = Date.now()
- for (const k of allowed) {
- if (Object.prototype.hasOwnProperty.call(normalize, k)) {
- sets.push(`${k} = ?`)
- params.push(normalize[k])
- }
- }
- if (!sets.length) {
- return { updated: 0 }
- }
- sets.push('updated_at = ?')
- params.push(t)
- params.push(1)
- await db.query(`UPDATE lepao_proxy_global SET ${sets.join(', ')} WHERE id = ?`, params)
- invalidateGlobalCache()
- return { updated: 1 }
- }
|