lepaoProxyPoolService.js 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. const axios = require('axios')
  2. const db = require('../../plugin/DataBase/db')
  3. const { invalidateGlobalCache } = require('./lepaoOutboundProxy')
  4. function parseProxyLines(text) {
  5. const out = []
  6. const seen = new Set()
  7. const body = String(text || '')
  8. // 兼容纯文本、多行、HTML(含 <br> / 脚本广告)场景:
  9. // 从整段内容中直接抽取 IPv4:port(可带 http(s):// 前缀)。
  10. const re = /\b(?:(https?):\/\/)?((?:\d{1,3}\.){3}\d{1,3}):(\d{1,5})\b/gi
  11. let m
  12. while ((m = re.exec(body)) !== null) {
  13. const scheme = (m[1] || 'auto').toLowerCase()
  14. const host = m[2]
  15. const port = Number(m[3])
  16. if (!host || !Number.isFinite(port) || port <= 0 || port > 65535) continue
  17. const key = `${host}:${port}`
  18. if (seen.has(key)) continue
  19. seen.add(key)
  20. out.push({ host, port, scheme })
  21. }
  22. return out
  23. }
  24. async function getGlobalOrThrow() {
  25. const rows = await db.query('SELECT * FROM lepao_proxy_global WHERE id = 1 LIMIT 1')
  26. const g = rows?.[0]
  27. if (!g) throw new Error('lepao_proxy_global 未初始化')
  28. return g
  29. }
  30. /**
  31. * @param {Array<{host:string,port:number,scheme:string}>} list
  32. * @param {string} source manual|url|cron
  33. */
  34. async function upsertProxyRows(list, source) {
  35. const t = Date.now()
  36. let inserted = 0
  37. let duplicated = 0
  38. for (const item of list) {
  39. const exists = await db.query(
  40. 'SELECT id FROM lepao_proxy_pool WHERE host = ? AND port = ? LIMIT 1',
  41. [item.host, item.port]
  42. )
  43. if (exists && exists.length > 0) {
  44. duplicated += 1
  45. continue
  46. }
  47. await db.query(
  48. `
  49. INSERT INTO lepao_proxy_pool (host, port, scheme, is_active, source, created_at, updated_at)
  50. VALUES (?, ?, ?, 0, ?, ?, ?)
  51. `,
  52. [item.host, item.port, item.scheme, source, t, t]
  53. )
  54. inserted += 1
  55. }
  56. return { total: list.length, inserted, duplicated }
  57. }
  58. async function importFromText(rawText, source) {
  59. const list = parseProxyLines(rawText || '')
  60. if (!list.length) {
  61. return { ok: true, imported: 0, message: '无有效代理行(期望 ip:port 或 http(s)://ip:port)' }
  62. }
  63. const res = await upsertProxyRows(list, source || 'manual')
  64. return {
  65. ok: true,
  66. imported: res.inserted,
  67. duplicated: res.duplicated,
  68. totalParsed: res.total,
  69. message: `解析 ${res.total} 条,新增 ${res.inserted} 条,重复跳过 ${res.duplicated} 条`,
  70. ...res
  71. }
  72. }
  73. async function importFromRemoteUrl(url, source) {
  74. const u = String(url || '').trim()
  75. if (!u) throw new Error('import_url 为空')
  76. const res = await axios.get(u, {
  77. timeout: 30000,
  78. proxy: false,
  79. responseType: 'text',
  80. transformResponse: [(b) => b],
  81. validateStatus: () => true
  82. })
  83. if (typeof res.data !== 'string') {
  84. throw new Error('拉取内容不是文本')
  85. }
  86. if (res.status >= 400) {
  87. throw new Error(`拉取 HTTP ${res.status}`)
  88. }
  89. return importFromText(res.data, source || 'url')
  90. }
  91. /**
  92. * HEAD/GET 探针,经代理访问 probe_target_url
  93. * @returns {Promise<{ ok: boolean, latency_ms?: number, err?: string, scheme?: string }>}
  94. */
  95. async function probeOne(row, probeUrl, timeoutMs) {
  96. const tryOnce = async (proto) => {
  97. const start = Date.now()
  98. await axios.get(probeUrl, {
  99. proxy: {
  100. protocol: proto,
  101. host: row.host,
  102. port: Number(row.port)
  103. },
  104. timeout: timeoutMs,
  105. validateStatus: () => true,
  106. maxRedirects: 5
  107. })
  108. return { ok: true, latency_ms: Date.now() - start, scheme: proto }
  109. }
  110. try {
  111. const rawScheme = String(row.scheme || 'auto').toLowerCase()
  112. if (rawScheme === 'http' || rawScheme === 'https') {
  113. return await tryOnce(rawScheme)
  114. }
  115. // 未指定协议时自动判断:先 http 再 https
  116. try {
  117. return await tryOnce('http')
  118. } catch (e1) {
  119. try {
  120. return await tryOnce('https')
  121. } catch (e2) {
  122. return { ok: false, err: (e2 && e2.message) || (e1 && e1.message) || 'probe failed' }
  123. }
  124. }
  125. } catch (e) {
  126. return { ok: false, err: e.message || String(e) }
  127. }
  128. }
  129. async function updateProxyCheckResult(poolId, { ok, latency_ms, err, scheme }) {
  130. const t = Date.now()
  131. const normalizedScheme = scheme && (scheme === 'http' || scheme === 'https') ? scheme : null
  132. await db.query(
  133. `
  134. UPDATE lepao_proxy_pool
  135. SET
  136. is_active = ?,
  137. latency_ms = ?,
  138. last_check_at = ?,
  139. last_error = ?,
  140. scheme = COALESCE(?, scheme),
  141. updated_at = ?
  142. WHERE id = ?
  143. `,
  144. [ok ? 1 : 0, ok ? latency_ms : null, t, ok ? null : String(err || '').slice(0, 250), normalizedScheme, t, poolId]
  145. )
  146. }
  147. async function poolRowsByIds(ids) {
  148. if (!ids?.length) {
  149. const rows = await db.query('SELECT * FROM lepao_proxy_pool')
  150. return rows || []
  151. }
  152. const placeholders = ids.map(() => '?').join(',')
  153. const rows = await db.query(`SELECT * FROM lepao_proxy_pool WHERE id IN (${placeholders})`, ids)
  154. return rows || []
  155. }
  156. module.exports.parseProxyLines = parseProxyLines
  157. module.exports.importFromText = importFromText
  158. module.exports.importFromRemoteUrl = importFromRemoteUrl
  159. module.exports.probeOne = probeOne
  160. module.exports.updateProxyCheckResult = updateProxyCheckResult
  161. module.exports.poolRowsByIds = poolRowsByIds
  162. /**
  163. * @param {number[]} [ids] 为空则检测全表
  164. * @param {{ logger?: object }} [opts]
  165. */
  166. module.exports.batchCheckProxies = async function batchCheckProxies(ids, opts = {}) {
  167. const g = await getGlobalOrThrow()
  168. const probeUrl = String(g.probe_target_url || 'https://www.baidu.com')
  169. const timeoutMs = Number(g.check_timeout_ms) || 8000
  170. const concurrency = Math.max(1, Math.min(50, Number(g.check_concurrency) || 10))
  171. const rows = await poolRowsByIds(ids)
  172. const logger = opts.logger
  173. let ok = 0
  174. let fail = 0
  175. let idx = 0
  176. async function worker() {
  177. while (idx < rows.length) {
  178. const my = idx++
  179. const row = rows[my]
  180. if (!row) continue
  181. const r = await probeOne(row, probeUrl, timeoutMs)
  182. await updateProxyCheckResult(row.id, r)
  183. if (r.ok) ok += 1
  184. else fail += 1
  185. if (logger && !r.ok) {
  186. logger.warn?.(`[proxyCheck] id=${row.id} ${row.host}:${row.port} ${r.err}`)
  187. }
  188. }
  189. }
  190. const n = Math.min(concurrency, rows.length || 1)
  191. await Promise.all(Array.from({ length: n }, () => worker()))
  192. return {
  193. total: rows.length,
  194. ok,
  195. fail,
  196. probeUrl,
  197. timeoutMs,
  198. concurrency
  199. }
  200. }
  201. module.exports.getGlobalOrThrow = getGlobalOrThrow
  202. /**
  203. * @param {object} patch
  204. */
  205. module.exports.patchGlobal = async function patchGlobal(patch) {
  206. const allowed = [
  207. 'random_proxy_enabled',
  208. 'import_url',
  209. 'probe_target_url',
  210. 'check_timeout_ms',
  211. 'check_concurrency'
  212. ]
  213. const normalize = {}
  214. if (Object.prototype.hasOwnProperty.call(patch, 'random_proxy_enabled')) {
  215. normalize.random_proxy_enabled = Number(patch.random_proxy_enabled) === 1 ? 1 : 0
  216. }
  217. if (Object.prototype.hasOwnProperty.call(patch, 'import_url')) {
  218. const u = String(patch.import_url || '').trim().slice(0, 1024)
  219. if (!u) throw new Error('import_url 不能为空')
  220. normalize.import_url = u
  221. }
  222. if (Object.prototype.hasOwnProperty.call(patch, 'probe_target_url')) {
  223. const u = String(patch.probe_target_url || '').trim().slice(0, 1024)
  224. if (!u) throw new Error('probe_target_url 不能为空')
  225. normalize.probe_target_url = u
  226. }
  227. if (Object.prototype.hasOwnProperty.call(patch, 'check_timeout_ms')) {
  228. const n = Number(patch.check_timeout_ms)
  229. normalize.check_timeout_ms = Number.isFinite(n) ? Math.max(500, Math.min(120000, Math.floor(n))) : 8000
  230. }
  231. if (Object.prototype.hasOwnProperty.call(patch, 'check_concurrency')) {
  232. const n = Number(patch.check_concurrency)
  233. normalize.check_concurrency = Number.isFinite(n) ? Math.max(1, Math.min(50, Math.floor(n))) : 10
  234. }
  235. const sets = []
  236. const params = []
  237. const t = Date.now()
  238. for (const k of allowed) {
  239. if (Object.prototype.hasOwnProperty.call(normalize, k)) {
  240. sets.push(`${k} = ?`)
  241. params.push(normalize[k])
  242. }
  243. }
  244. if (!sets.length) {
  245. return { updated: 0 }
  246. }
  247. sets.push('updated_at = ?')
  248. params.push(t)
  249. params.push(1)
  250. await db.query(`UPDATE lepao_proxy_global SET ${sets.join(', ')} WHERE id = ?`, params)
  251. invalidateGlobalCache()
  252. return { updated: 1 }
  253. }