// Worker.js
  1. const path = require('path')
  2. const mq = require('../../plugin/mq')
  3. const { assertRunforgeTaskIngress, TASK_QUEUE } = require('../../plugin/mq/runforgeTaskMq')
  4. const db = require('../../plugin/DataBase/db')
  5. const Redis = require('../../plugin/DataBase/Redis')
  6. const EmailTemplate = require('../../plugin/Email/emailTemplate')
  7. const jkesRedisKeys = require('../../plugin/jkes/redisKeys')
  8. const Logger = require('../Logger')
  /**
   * Queue-driven worker for "lepao" running tasks that are executed through the JKES plugin.
   *
   * The worker consumes JSON task messages of the form `{ id, type, data, retry }` from
   * `taskQueue`, dispatches them to the handler registered for `type`, publishes a
   * `{ id, success, result | error }` message to `resultQueue`, and re-queues retryable
   * failures (up to `maxRetry`) or parks them in `deadQueue`.
   */
  class Worker {
    /** Sets up the file logger, queue names, retry limit and default handler timeout. */
    constructor() {
      this.logger = new Logger(
        path.join(__dirname, '../logs/LepaoWorker.log'),
        'INFO'
      )
      this.handlers = {}
      this.running = false
      this.taskQueue = TASK_QUEUE
      this.resultQueue = 'runforge_task_result_queue'
      this.deadQueue = 'runforge_task_dead_queue'
      this.noticeQueue = 'runforge_message_queue'
      this.channelName = 'lepao_worker'
      this.maxRetry = 3
      // Default handler timeout in milliseconds (see withTimeout).
      this.timeout = 15000
      // Cached name of the track column in lepao_record (see getLepaoRecordPathColumn).
      this._lepaoRecordPathColumn = null
    }

    /**
     * Rounds a kilometre value to two decimals.
     * @param {*} n - anything coercible with Number().
     * @returns {number} the rounded value, or 0 when the input is not a finite number.
     */
    roundKm(n) {
      const v = Number(n)
      if (!Number.isFinite(v)) return 0
      return Math.round(v * 100) / 100
    }

    /**
     * Builds a log correlation id from the current timestamp and a short random suffix.
     * @returns {string}
     */
    traceId() {
      return Date.now() + '_' + Math.random().toString(36).slice(2, 8)
    }

    /**
     * @param {number} ms - delay in milliseconds.
     * @returns {Promise<void>} resolves after the delay.
     */
    sleep(ms) {
      return new Promise((r) => setTimeout(r, ms))
    }

    /**
     * Marks an account as logged out (state = 0) and removes its runner flag from Redis.
     * Never throws: failures are logged only.
     * @param {string} account - student number.
     */
    async markLoginExpired(account) {
      if (!account) return
      try {
        const sql = 'UPDATE lepao_account SET state = 0 WHERE student_num = ?'
        await db.query(sql, [account])
        try {
          await Redis.del(jkesRedisKeys.runnerFlag(account))
        } catch (e) {
          this.logger.warn(`${account} 清理 jkes_runner 标记失败:${e.message || e}`)
        }
        this.logger.warn(`${account} 登录状态已失效,已自动更新账号状态`)
      } catch (error) {
        this.logger.error(`更新账号登录状态失败:${error.stack || error}`)
      }
    }

    /**
     * Caches a "ran successfully today" marker that expires at the next local midnight.
     * Never throws: failures are logged only.
     * @param {string} account - student number.
     */
    async writeSuccessRedis(account) {
      if (!account) return
      try {
        const now = new Date()
        const nextMidnight = new Date(now).setHours(24, 0, 0, 0)
        // Clamp to 1s: within the last second of the day the floor would yield EX: 0.
        const ttlSec = Math.max(1, Math.floor((nextMidnight - now.getTime()) / 1000))
        await Redis.set(jkesRedisKeys.lepaoSuccess(account), account, { EX: ttlSec })
      } catch (error) {
        this.logger.error(`写入乐跑成功缓存失败: ${error.stack || error}`)
      }
    }

    /**
     * Resolves which column of `lepao_record` stores the uploaded track: `path_data` when a
     * probe query succeeds, `point_data` when the probe fails with "Unknown column 'path_data'".
     * The answer is cached on the instance.
     * @returns {Promise<string>} the column name.
     * @throws any other database error raised by the probe.
     */
    async getLepaoRecordPathColumn() {
      if (this._lepaoRecordPathColumn) return this._lepaoRecordPathColumn
      try {
        await db.query('SELECT path_data FROM lepao_record LIMIT 1')
        this._lepaoRecordPathColumn = 'path_data'
      } catch (e) {
        if ((e?.message || '').includes("Unknown column 'path_data'")) {
          this._lepaoRecordPathColumn = 'point_data'
        } else {
          throw e
        }
      }
      return this._lepaoRecordPathColumn
    }

    /**
     * Inserts a row into `lepao_record`.
     * @param {object} record
     * @param {string} record.uuid - owner user id.
     * @param {string} record.account - student number.
     * @param {object} [record.result] - stored as JSON.
     * @param {*} [record.pathId] - id of the chosen path_data row.
     * @param {Array} [record.pathData] - stored as JSON in the track column.
     * @param {number} [record.state]
     * @returns {Promise<number|null>} the insert id, or null when uuid/account is missing.
     */
    async createLepaoRecord({ uuid, account, result = {}, pathId = null, pathData = [], state = 0 }) {
      if (!uuid || !account) return null
      // pathCol is one of two hard-coded names, so interpolating it into SQL is safe.
      const pathCol = await this.getLepaoRecordPathColumn()
      const sql = `INSERT INTO lepao_record (uuid, time, lepao_account, result, path_id, ${pathCol}, state) VALUES (?, ?, ?, ?, ?, ?, ?)`
      const r = await db.query(sql, [
        uuid,
        Date.now(),
        account,
        JSON.stringify(result || {}),
        pathId,
        JSON.stringify(pathData || []),
        state
      ])
      return r?.insertId || null
    }

    /**
     * Updates the given fields of a `lepao_record` row; fields left undefined are not touched.
     * @param {number} id - record id; nothing happens when falsy.
     * @param {{ result?: object, pathData?: Array, state?: number }} [fields]
     */
    async updateLepaoRecord(id, { result, pathData, state } = {}) {
      if (!id) return
      const pathCol = await this.getLepaoRecordPathColumn()
      const sets = []
      const params = []
      if (result !== undefined) {
        sets.push('result = ?')
        params.push(JSON.stringify(result || {}))
      }
      if (pathData !== undefined) {
        sets.push(`${pathCol} = ?`)
        params.push(JSON.stringify(pathData || []))
      }
      if (state !== undefined) {
        sets.push('state = ?')
        params.push(state)
      }
      if (!sets.length) return
      params.push(id)
      await db.query(`UPDATE lepao_record SET ${sets.join(', ')} WHERE id = ?`, params)
    }

    /**
     * Fetches the current-month and total distance from JKES, stores them in
     * `lepao_account.term_num` / `total_num`, and writes the month figure into the local
     * month-policy state while keeping its `doubles` value.
     * @param {{ student_id?: string, account?: string, token?: string }} req
     */
    async syncJkesRunCount(req) {
      const sid = req?.student_id || req?.account
      const token = req?.token
      if (!sid || !token) return
      const { fetchJkesMonthKm, fetchJkesTotalKm } = require('../../plugin/jkes/stats')
      const { readState, writeState } = require('../../plugin/jkes/monthPolicy')
      const now = new Date()
      const y = now.getFullYear()
      const m = now.getMonth() + 1
      const monthKm = await fetchJkesMonthKm(token, y, m)
      const totalKm = await fetchJkesTotalKm(token)
      const sql = 'UPDATE lepao_account SET term_num = ?, total_num = ? WHERE student_num = ?'
      const rows = await db.query(sql, [monthKm, totalKm, sid])
      if (!rows || rows.affectedRows !== 1) {
        this.logger.warn(`${sid} JKES 更新里程字段失败`)
      } else {
        this.logger.info(`${sid} JKES 同步里程 本月=${monthKm} 累计=${totalKm}`)
      }
      const prevLocal = await readState(sid, now)
      await writeState(sid, { km: monthKm, doubles: prevLocal.doubles }, now)
    }

    /**
     * Best-effort wrapper around syncJkesRunCount: failures are logged, never thrown.
     * @param {object} req - see syncJkesRunCount.
     */
    async syncRunCount(req) {
      try {
        await this.syncJkesRunCount(req)
      } catch (error) {
        this.logger.warn(`${req?.account || 'unknown'}同步乐跑里程失败: ${error.message || error}`)
      }
    }

    /**
     * Publishes a task message to `taskQueue`.
     * @param {object} channel - channel object exposing sendToQueue.
     * @param {string} type - handler name.
     * @param {*} data - handler payload.
     * @param {{ id?: string, retry?: number }} [options]
     * @returns {Promise<string>} the task id that was used.
     */
    async enqueueTask(channel, type, data, options = {}) {
      const payload = {
        id: options.id || this.traceId(),
        type,
        data,
        retry: options.retry ?? 0
      }
      await channel.sendToQueue(
        this.taskQueue,
        Buffer.from(JSON.stringify(payload)),
        { persistent: true, contentType: 'application/json' }
      )
      return payload.id
    }

    /**
     * Enqueues a `lepao.sendNotice` task without ever throwing, so that a notification
     * problem cannot replace the outcome of the task that wanted to notify.
     * @param {object|undefined} channel - when missing, nothing is sent.
     * @param {object} notice - payload for the sendNotice handler.
     * @param {string} id - task id for the notice message.
     * @returns {Promise<boolean>} true when the notice task was published.
     */
    async enqueueNoticeBestEffort(channel, notice, id) {
      if (!channel) return false
      try {
        await this.enqueueTask(channel, 'lepao.sendNotice', notice, { id })
        return true
      } catch (e) {
        this.logger.warn(`[${notice?.traceId}] 通知任务投递失败:${e?.message || e}`)
        return false
      }
    }

    /**
     * Races a promise against a deadline.
     * Note: the underlying work is not cancelled when the deadline wins; only the caller
     * stops waiting for it.
     * @param {Promise} promise - work to wait for.
     * @param {string} name - used in the timeout error message.
     * @param {number} [ms] - deadline in milliseconds; falls back to `this.timeout`.
     * @returns {Promise<*>} the settled value of `promise`.
     * @throws {Error} `${name} 超时` when the deadline elapses first.
     */
    async withTimeout(promise, name, ms) {
      const limit = typeof ms === 'number' && ms > 0 ? ms : this.timeout
      let timer
      const deadline = new Promise((_, reject) => {
        timer = setTimeout(() => reject(new Error(`${name} 超时`)), limit)
      })
      try {
        return await Promise.race([promise, deadline])
      } finally {
        // Without this, every finished task would leave a timer pending for up to `limit` ms.
        clearTimeout(timer)
      }
    }

    /**
     * Calls `fn` up to `maxRetry` times, backing off 1s, 2s, ... between attempts.
     * Non-retryable errors (see isRetryableTaskError) are rethrown immediately.
     * @param {Function} fn - async function to run.
     * @param {string} name - label used in log lines.
     * @returns {Promise<*>} the first successful result.
     * @throws the last error once all attempts are used up.
     */
    async retry(fn, name) {
      let lastErr
      for (let i = 0; i < this.maxRetry; i++) {
        try {
          return await fn()
        } catch (err) {
          lastErr = err
          if (!this.isRetryableTaskError(err)) {
            throw err
          }
          this.logger.warn(`[RETRY] ${name} 第${i + 1}次失败`)
          // No point in waiting after the last attempt: we are about to give up.
          if (i < this.maxRetry - 1) {
            await this.sleep(1000 * (i + 1))
          }
        }
      }
      throw lastErr
    }

    /**
     * @param {*} err
     * @returns {boolean} true for well-known socket/DNS error codes, axios errors without a
     *   response, or messages mentioning "timeout" / "network".
     */
    isNetworkError(err) {
      if (!err) return false
      if (
        err.code &&
        ['ECONNRESET', 'ECONNABORTED', 'ETIMEDOUT', 'ENOTFOUND', 'EAI_AGAIN'].includes(err.code)
      ) {
        return true
      }
      if (err.isAxiosError && !err.response) return true
      const msg = (err.message || '').toLowerCase()
      return msg.includes('timeout') || msg.includes('network')
    }

    /**
     * @param {*} err
     * @returns {boolean} true when the error is flagged `retryable`, is a network error, or
     *   carries one of the path/checkpoint error codes.
     */
    isRetryableTaskError(err) {
      if (!err) return false
      if (err.retryable === true) return true
      if (this.isNetworkError(err)) return true
      return ['PATH_SELECT_FAILED', 'CHECKPOINT_FETCH_FAILED', 'CHECKPOINT_INSUFFICIENT'].includes(
        err.code
      )
    }

    /**
     * JSON.stringify that replaces true cycles with the string '[Circular]'.
     * An object that merely appears twice (without being its own ancestor) is serialised
     * normally both times.
     * @param {*} obj
     * @returns {string|undefined} same as JSON.stringify.
     */
    safeStringify(obj) {
      // Chain of objects from the root down to the holder of the key being visited.
      const ancestors = []
      return JSON.stringify(obj, function (key, value) {
        if (typeof value !== 'object' || value === null) return value
        // `this` is the object holding `key`; drop everything below it in the chain.
        while (ancestors.length > 0 && ancestors[ancestors.length - 1] !== this) {
          ancestors.pop()
        }
        if (ancestors.includes(value)) return '[Circular]'
        ancestors.push(value)
        return value
      })
    }

    /** Writes an info line tagged with trace id and type, plus optional JSON data. */
    log(traceId, type, msg, data) {
      this.logger.info(`[${traceId}] [${type}] ${msg} ${data ? this.safeStringify(data) : ''}`)
    }

    /** Writes an error line tagged with the trace id; tolerates a missing or non-Error value. */
    logErr(traceId, msg, err) {
      this.logger.error(`[${traceId}] ${msg} ${err?.stack || err}`)
    }

    /**
     * Registers (or replaces) the handler for a task type.
     * @param {string} type
     * @param {Function} handler - async (data, ctx) => result.
     */
    register(type, handler) {
      this.handlers[type] = handler
      this.logger.info(`注册任务: ${type}`)
    }

    /**
     * Picks the least-used enabled path (restricted to the account's area when one is set)
     * and bumps its usage counter.
     * Only the track geometry is selected here; the distance and pace actually run are
     * decided by the distanceM / paceSecPerKm passed to runJkesRecord, so rows are not
     * filtered by distance.
     * @param {string} account - student number.
     * @returns {Promise<{ id: *, data: * }>} the chosen path row.
     * @throws {Error} code 'PATH_SELECT_FAILED' (retryable) when no path matches.
     */
    async selectJkesPathRow(account) {
      const accountSql = 'SELECT area FROM lepao_account WHERE student_num = ?'
      const rows = await db.query(accountSql, [account])
      const area = rows?.[0]?.area
      let pathSql = 'SELECT id, data FROM path_data WHERE state = 1 '
      const pathParams = []
      if (area) {
        pathSql += ' AND run_zone_name = ?'
        pathParams.push(area)
      }
      pathSql += ' ORDER BY count ASC LIMIT 1'
      const paths = await db.query(pathSql, pathParams)
      if (!paths || paths.length === 0) {
        const err = new Error('未找到符合条件的路线,请改变路径选择条件')
        err.code = 'PATH_SELECT_FAILED'
        err.retryable = true
        throw err
      }
      const picked = paths[0]
      await db.query('UPDATE path_data SET count = count + 1 WHERE id = ?', [picked.id])
      return picked
    }

    /** Registers every task handler this worker understands. */
    initHandlers() {
      /**
       * lepao.startRun — runs one session for `req.account`.
       * Flow: load account data -> take the per-account progress lock -> clamp distance and
       * pick a pace -> pre-deduct kilometres -> pick a path and run (up to 5 path attempts)
       * -> store the record -> hand over to lepao.finalizeRunSync.
       * On any failure the pre-deducted kilometres are returned, the record (if created) is
       * marked state 3 and a failure notice is queued.
       */
      this.register('lepao.startRun', async (req, ctx) => {
        const traceId = ctx.traceId
        const maxPathRetry = 5
        let pathRetry = 0
        let userData = null
        let recordDbId = null
        let deductedKm = 0
        // True only once THIS task has written the progress key.
        let lockAcquired = false
        try {
          // NOTE: the "already succeeded today" check (lepaoSuccess key) is currently disabled.
          userData = await this.handlers['lepao.getUserData'](req, ctx)
          req = {
            ...req,
            ...userData,
            student_id: req.account
          }
          const progressKey = jkesRedisKeys.lepaoProgress(req.account)
          const inProgress = await Redis.get(progressKey)
          if (inProgress) {
            throw new Error('该账号已进入乐跑任务队列,请等待乐跑完成后再进行乐跑操作')
          }
          // NOTE(review): get-then-set is not atomic; two tasks racing here can both pass.
          await Redis.set(progressKey, req.account, { EX: 1800 })
          lockAcquired = true
          // NOTE: the time-window check (07:00–24:00) is currently disabled.
          const { runJkesRecord } = require('../../plugin/jkes/runRecord')
          const { getJkesSettings } = require('../../plugin/jkes/jkesSettings')
          const { randomPaceSecPerKm } = require('../../plugin/jkes/paceUtils')
          const { isJkesRecordValidInCampus } = require('../../plugin/jkes/stats')
          const manual = req.manual === true || req.manual === 'true'
          const jkesSettings = getJkesSettings()

          // Distance: at least 1 km; at most 5 km for manual runs, or the configured
          // auto limit (never below 2 km, default 5 km) otherwise.
          let targetKm = Number(req.targetKm)
          if (!Number.isFinite(targetKm) || targetKm < 1) targetKm = 1
          const maxAutoKm = Math.max(2, Number(jkesSettings.autoSingleRunMaxKm) || 5)
          if (manual) {
            if (targetKm > 5) targetKm = 5
          } else if (targetKm > maxAutoKm) {
            targetKm = maxAutoKm
          }
          targetKm = this.roundKm(targetKm)
          const distanceM = targetKm * 1000
          deductedKm = targetKm

          // Pace: manual runs must supply 180–600 s/km; automatic runs draw a random one.
          let pace
          if (manual) {
            const p = Number(req.paceSecPerKm)
            if (!Number.isFinite(p) || p < 180 || p > 600) {
              throw new Error('手动乐跑任务缺少有效配速 paceSecPerKm(3:00–10:00/km)')
            }
            pace = p
          } else {
            pace = randomPaceSecPerKm(
              jkesSettings.paceRandomMinSecPerKm,
              jkesSettings.paceRandomMaxSecPerKm
            )
          }

          await this.handlers['lepao.consumeCount'](
            {
              account: req.account,
              uuid: userData?.create_user,
              amountKm: deductedKm
            },
            ctx
          )

          let jkesPathId = null
          let jkesEnd = null
          while (pathRetry < maxPathRetry) {
            try {
              const pathRow = await this.selectJkesPathRow(req.account)
              jkesPathId = pathRow.id
              let rawData = pathRow.data
              if (typeof rawData === 'string') {
                rawData = JSON.parse(rawData)
              }
              if (!Array.isArray(rawData) || rawData.length === 0) {
                pathRetry++
                this.logger.warn(`[${traceId}] JKES 轨迹数据无效,换路径 第${pathRetry}次`)
                continue
              }
              // The record row is created once and reused across path attempts.
              if (!recordDbId) {
                recordDbId = await this.createLepaoRecord({
                  uuid: userData?.create_user,
                  account: req.account,
                  pathId: jkesPathId,
                  pathData: [],
                  result: {
                    phase: 'running',
                    planned_km: targetKm,
                    pace_sec_per_km: pace
                  },
                  state: 0
                })
              }
              jkesEnd = await runJkesRecord({
                token: req.token,
                recordDbId,
                pathPoints: rawData,
                distanceM,
                paceSecPerKm: pace,
                log: (msg) => this.logger.info(`[${traceId}] ${msg}`)
              })
              break
            } catch (err) {
              if (!this.isRetryableTaskError(err)) {
                throw err
              }
              this.logger.warn(`[${traceId}] JKES 可重试错误 第${pathRetry + 1}次:${err.message}`)
              pathRetry++
              await this.sleep(1000 * pathRetry)
            }
          }
          if (!jkesEnd) {
            throw new Error('JKES 乐跑失败:未获得有效跑步结果')
          }

          const info = jkesEnd.endJson?.data?.info
          const infoWithMeta = info
            ? {
                ...info,
                planned_km: targetKm,
                deducted_km: deductedKm,
                pace_sec_per_km: pace
              }
            : null
          const storedResult = infoWithMeta || jkesEnd.endJson?.data || {}
          const uploadedPoints = jkesEnd.uploadedPayloadPoints || []
          if (!recordDbId) {
            recordDbId = await this.createLepaoRecord({
              uuid: userData?.create_user,
              account: req.account,
              pathId: jkesPathId,
              pathData: uploadedPoints,
              result: storedResult,
              state: 1
            })
          } else {
            await this.updateLepaoRecord(recordDbId, {
              result: storedResult,
              pathData: uploadedPoints,
              state: 1
            })
          }

          const ok = isJkesRecordValidInCampus(info || {})
          if (!ok) {
            const reason =
              info?.dataStatus?.label || info?.status?.label || '跑步记录未记作校内有效'
            throw new Error(reason)
          }
          await this.writeSuccessRedis(req.account)

          const autoDoubleSlot =
            !manual && (req.autoDoubleSlot === true || req.autoDoubleSlot === 'true')
          const finalizeTask = {
            account: req.account,
            token: req.token,
            uuid: userData?.create_user,
            recordDbId,
            jkesRecordId: jkesEnd.recordId || info?.id,
            deductedKm,
            targetKm,
            autoDoubleSlot,
            traceId
          }
          // Prefer finalising as a separate queued task; fall back to running it inline.
          // NOTE(review): an inline finalize that throws after it already refunded part of
          // the deduction is followed by the full refund in the catch block below.
          if (ctx.channel) {
            try {
              await this.enqueueTask(ctx.channel, 'lepao.finalizeRunSync', finalizeTask, {
                id: `${traceId}:finalize:${req.account}`
              })
            } catch (e) {
              this.logger.warn(`[${traceId}] finalize 任务投递失败,改为同步执行:${e.message || e}`)
              await this.handlers['lepao.finalizeRunSync'](finalizeTask, ctx)
            }
          } else {
            await this.handlers['lepao.finalizeRunSync'](finalizeTask, ctx)
          }
          return { traceId, jkes: true, endJson: jkesEnd.endJson }
        } catch (err) {
          this.logger.error(`[${traceId}] 乐跑流程失败:${err?.stack || err}`)
          try {
            await this.handlers['lepao.refundCount'](
              {
                account: req.account,
                uuid: userData?.create_user,
                amountKm: deductedKm
              },
              ctx
            )
          } catch (e) {
            this.logger.error(`[${traceId}] 返还乐跑公里失败:${e.stack || e}`)
          }
          if (recordDbId) {
            try {
              await this.updateLepaoRecord(recordDbId, {
                result: { phase: 'error', reason: err.message || '未知错误' },
                state: 3
              })
            } catch (e) {
              this.logger.error(`[${traceId}] 更新乐跑记录失败:${e.stack || e}`)
            }
          }
          // Best effort: a broken channel must not hide the real failure reason.
          await this.enqueueNoticeBestEffort(
            ctx.channel,
            {
              account: req.account,
              success: false,
              reason: err.message || '未知错误',
              traceId
            },
            `${traceId}:notice:fail`
          )
          throw err
        } finally {
          // Release the lock only if this task took it; otherwise we would free the lock of
          // the task that is still running for the same account.
          if (lockAcquired && req?.account) {
            try {
              await Redis.del(jkesRedisKeys.lepaoProgress(req.account))
            } catch (e) {
              this.logger.warn(`[${traceId}] 清理乐跑进行中标记失败:${e?.message || e}`)
            }
          }
        }
      })

      /**
       * lepao.finalizeRunSync — waits for the official JKES record to be fully synced
       * (up to 40 polls, 90 s apart), then settles the pre-deducted kilometres against the
       * official distance, stores the final record (state 2), updates the monthly
       * statistics and queues the success notice.
       * When the record is missing or not valid in campus, the full deduction is refunded
       * and the record is marked state 3. When it is valid but still not synced, a
       * retryable error is thrown so the task is re-queued.
       */
      this.register('lepao.finalizeRunSync', async (req, ctx) => {
        const traceId = req?.traceId || ctx?.traceId || this.traceId()
        const {
          account,
          token,
          uuid,
          recordDbId,
          jkesRecordId,
          deductedKm,
          targetKm,
          autoDoubleSlot
        } = req || {}
        if (!account || !token || !recordDbId || !jkesRecordId) {
          throw new Error('finalizeRunSync 参数缺失')
        }
        const {
          fetchJkesRecordById,
          isJkesRecordFullySynced,
          isJkesRecordValidInCampus,
          recordDistanceKm
        } = require('../../plugin/jkes/stats')
        const { recordSuccess } = require('../../plugin/jkes/monthPolicy')

        const pollCount = 40
        const pollIntervalMs = 90 * 1000
        let latest = null
        let synced = false
        for (let i = 1; i <= pollCount; i++) {
          latest = await fetchJkesRecordById(token, jkesRecordId, 10)
          if (latest && isJkesRecordFullySynced(latest)) {
            synced = true
            break
          }
          if (i < pollCount) {
            await this.sleep(pollIntervalMs)
          }
        }

        if (!latest || !isJkesRecordValidInCampus(latest)) {
          await this.updateLepaoRecord(recordDbId, {
            result: latest || { phase: 'finalize_failed', reason: '记录未校内有效' },
            state: 3
          })
          await this.handlers['lepao.refundCount'](
            {
              account,
              uuid,
              amountKm: this.roundKm(deductedKm || targetKm || 0),
              // The consume marker lives under the startRun task id, not under this
              // suffixed id, so the default marker check would skip the refund.
              requireConsumed: false
            },
            { ...ctx, taskId: `${ctx?.taskId || traceId}:finalize_refund_full` }
          )
          await this.enqueueNoticeBestEffort(
            ctx?.channel,
            {
              account,
              success: false,
              reason: '乐跑记录未同步为校内有效,已退还预扣公里',
              traceId
            },
            `${traceId}:notice:fail:finalize`
          )
          return { traceId, finalized: false, refunded: true }
        }

        if (!synced) {
          await this.updateLepaoRecord(recordDbId, {
            result: latest,
            state: 1
          })
          const err = new Error('官方记录尚未完全同步(distance/speed)')
          err.retryable = true
          throw err
        }

        // Settle: charge the official distance and give back whatever was over-deducted.
        const officialKmRaw = this.roundKm(recordDistanceKm(latest))
        const officialKm = officialKmRaw > 0 ? officialKmRaw : this.roundKm(targetKm || 0)
        const preDeduct = this.roundKm(deductedKm || targetKm || 0)
        const refundKm = this.roundKm(Math.max(0, preDeduct - officialKm))
        if (refundKm > 0) {
          await this.handlers['lepao.refundCount'](
            {
              account,
              uuid,
              amountKm: refundKm,
              requireConsumed: false
            },
            { ...ctx, taskId: `${ctx?.taskId || traceId}:finalize_refund_over` }
          )
        }
        await this.updateLepaoRecord(recordDbId, {
          result: {
            ...(latest || {}),
            planned_km: this.roundKm(targetKm || 0),
            deducted_km: preDeduct,
            official_km: officialKm,
            refunded_km: refundKm
          },
          state: 2
        })
        await recordSuccess(account, officialKm, { autoDoubleSlot: officialKm >= 2 && !!autoDoubleSlot })
        await this.syncRunCount({ account, student_id: account, token })

        // Once the monthly target is reached, switch auto-run off and send the
        // "target completed" e-mail (target_count = 0 means no limit).
        try {
          const accRows = await db.query(
            'SELECT name, email, notice_type, auto_run, target_count, term_num, total_num FROM lepao_account WHERE student_num = ?',
            [account]
          )
          if (accRows && accRows.length) {
            const acc = accRows[0]
            const monthlyTargetKm = Number(acc.target_count) || 0
            const monthKm = Number(acc.term_num) || 0
            if (acc.auto_run === 1 && monthlyTargetKm !== 0 && monthKm >= monthlyTargetKm) {
              await db.query('UPDATE lepao_account SET auto_run = 0 WHERE student_num = ?', [account])
              if (acc.notice_type === 'email' && acc.email) {
                await EmailTemplate.lepaoOver(acc.email, {
                  name: acc.name || account,
                  month_km: monthKm
                })
              }
            }
          }
        } catch (e) {
          this.logger.warn(`[${traceId}] 目标达成处理失败:${e.message || e}`)
        }

        await this.enqueueNoticeBestEffort(
          ctx?.channel,
          {
            account,
            success: true,
            data: {
              distance: officialKm,
              time: Number(latest.useTime) || 0,
              record_failed_reason: '',
              refundedKm: refundKm
            },
            traceId
          },
          `${traceId}:notice:success:finalize`
        )
        return { traceId, finalized: true, officialKm, refundKm }
      })

      /**
       * lepao.sendNotice — delivers a success/failure notice through the channel configured
       * for the account: 'bot' (publishes to noticeQueue) or 'email'.
       * @returns {{ delivered: boolean, via: 'bot'|'email'|'none' }}
       */
      this.register('lepao.sendNotice', async (req) => {
        const { account, success, data, reason, traceId } = req || {}
        if (!account) {
          throw new Error('发送通知失败:缺少 account')
        }
        const emailSql = `
          SELECT
            a.name,
            a.email,
            a.target_count,
            a.term_num,
            a.total_num,
            a.notice_type,
            e.bot_umo
          FROM
            lepao_account a
          LEFT JOIN
            lepao_extra e
          ON
            a.student_num = e.student_num
          WHERE
            a.student_num = ?
        `
        const rows = await db.query(emailSql, [account])
        if (!rows || rows.length === 0) {
          throw new Error('发送通知失败:未找到用户通知配置')
        }
        const user = rows[0]
        const noticeType = user.notice_type || 'none'

        // For success notices without a zone name, use the zone of the latest record's path.
        let runZoneName = data?.run_zone_name
        if (!runZoneName && success) {
          try {
            const z = await db.query(
              `
                SELECT p.run_zone_name
                FROM lepao_record r
                LEFT JOIN path_data p ON r.path_id = p.id
                WHERE r.lepao_account = ?
                ORDER BY r.id DESC
                LIMIT 1
              `,
              [account]
            )
            runZoneName = z?.length ? z[0].run_zone_name : null
          } catch (e) {
            this.logger.warn(`[${traceId}] 查询跑区失败:${e.message || e}`)
          }
        }

        const payload = success
          ? {
              ...(data && typeof data === 'object' ? data : {}),
              type: 'lepao_success',
              umo: user.bot_umo,
              run_zone_name: runZoneName,
              month_km: Number(user.term_num) || 0,
              total_km: Number(user.total_num) || 0,
              target_km: Number(user.target_count) || 0,
              name: user.name,
              account,
              traceId
            }
          : {
              type: 'lepao_fail',
              umo: user.bot_umo,
              name: user.name,
              account,
              reason,
              traceId
            }

        if (noticeType === 'bot' && user.bot_umo) {
          const ch = await mq.getChannel(this.noticeQueue)
          await ch.assertQueue(this.noticeQueue, { durable: true })
          ch.sendToQueue(
            this.noticeQueue,
            Buffer.from(JSON.stringify(payload)),
            {
              persistent: true,
              contentType: 'application/json'
            }
          )
          return { delivered: true, via: 'bot' }
        }
        if (noticeType === 'email' && user.email) {
          if (success) {
            await EmailTemplate.lepaoSuccess(user.email, payload)
            return { delivered: true, via: 'email' }
          }
          await EmailTemplate.lepaoFail(user.email, {
            name: user.name,
            account,
            reason: reason || '系统繁忙,请联系客服或稍后再试',
            traceId
          })
          return { delivered: true, via: 'email' }
        }
        return { delivered: false, via: 'none' }
      })

      /**
       * lepao.consumeCount — deducts `amountKm` (default 1) from `users.lepao_count`.
       * A Redis marker keyed by the task id makes a repeated call for the same task a no-op.
       * NOTE(review): the marker stays for an hour even after a refund, so a re-queued task
       * with the same id appears to skip the deduction — confirm whether that is intended.
       * @throws {Error} when uuid is missing, the amount is invalid or the balance is too low.
       */
      this.register('lepao.consumeCount', async (req, ctx) => {
        const account = req?.account
        const uuid = req?.uuid
        const amountKm = this.roundKm(req?.amountKm ?? 1)
        if (!uuid) {
          throw new Error('扣减乐跑公里失败:缺少 uuid')
        }
        if (!(amountKm > 0)) {
          throw new Error('扣减乐跑公里失败:amountKm 无效')
        }
        const baseKey = `${ctx?.taskId || ctx?.traceId || account || uuid}`
        const consumeKey = jkesRedisKeys.consume(baseKey)
        const existed = await Redis.get(consumeKey)
        if (existed) {
          return true
        }
        this.logger.info(`${account || uuid}开始扣减乐跑公里 ${amountKm}km`)
        // The balance condition lives in the WHERE clause, so check and deduction are one statement.
        const useLepaoCountSql =
          'UPDATE users SET lepao_count = ROUND(lepao_count - ?, 2) WHERE uuid = ? AND lepao_count >= ?'
        const r = await db.query(useLepaoCountSql, [amountKm, uuid, amountKm])
        if (!r || r.affectedRows !== 1) {
          throw new Error(`扣减乐跑公里失败:余额不足(需 ${amountKm}km)`)
        }
        this.logger.info(`${account || uuid}扣减乐跑公里完成`)
        await Redis.set(consumeKey, '1', { EX: 3600 })
        return true
      })

      /**
       * lepao.refundCount — adds `amountKm` back to `users.lepao_count`.
       * Silently succeeds (returns true without touching the database) when uuid is missing,
       * the amount is not positive, a refund marker for this task id already exists, or —
       * unless `req.requireConsumed === false` — no consume marker exists for this task id.
       * `requireConsumed: false` is for callers whose task id differs from the one the
       * deduction was recorded under.
       * @throws {Error} when the database update does not affect exactly one row.
       */
      this.register('lepao.refundCount', async (req, ctx) => {
        const account = req?.account
        const uuid = req?.uuid
        const amountKm = this.roundKm(req?.amountKm ?? 0)
        const requireConsumed = req?.requireConsumed !== false
        if (!uuid) {
          return true
        }
        if (!(amountKm > 0)) return true
        const baseKey = `${ctx?.taskId || ctx?.traceId || account || uuid}`
        const consumeKey = jkesRedisKeys.consume(baseKey)
        const refundKey = jkesRedisKeys.refund(baseKey)
        if (requireConsumed) {
          const consumed = await Redis.get(consumeKey)
          if (!consumed) {
            return true
          }
        }
        const refunded = await Redis.get(refundKey)
        if (refunded) {
          return true
        }
        this.logger.info(`${account || uuid}开始返还乐跑公里 ${amountKm}km`)
        const sql = 'UPDATE users SET lepao_count = ROUND(lepao_count + ?, 2) WHERE uuid = ?'
        const r = await db.query(sql, [amountKm, uuid])
        if (!r || r.affectedRows !== 1) {
          throw new Error('返还乐跑公里失败:数据库更新失败')
        }
        this.logger.info(`${account || uuid}返还乐跑公里完成`)
        await Redis.set(refundKey, '1', { EX: 3600 })
        return true
      })

      /**
       * lepao.getUserData — loads the account row joined with its owner and extra settings,
       * and validates it (owner present, state === 1, positive balance).
       * Missing userAgent / deviceModel values are replaced by defaults.
       * @returns {object} the joined row.
       * @throws {Error} when the account is missing or fails one of the checks.
       */
      this.register('lepao.getUserData', async (req) => {
        const account = req.account
        this.logger.info(`${account}开始获取用户数据`)
        const accountSql = `
          SELECT
            u.uuid,
            u.lepao_count,
            l.create_user,
            l.name,
            l.student_num,
            l.area,
            l.sex,
            l.state,
            l.token,
            l.userAgent,
            l.deviceModel,
            l.notice_type,
            l.email,
            e.bot_account
          FROM
            lepao_account l
          LEFT JOIN
            users u
          ON
            l.create_user = u.uuid
          LEFT JOIN
            lepao_extra e
          ON
            l.student_num = e.student_num
          WHERE
            l.student_num = ?
        `
        const rows = await db.query(accountSql, [account])
        if (!rows || rows.length === 0) {
          this.logger.error(`${account}无法获取账号数据`)
          throw new Error('无法获取账号数据,请联系客服或稍后再试')
        }
        const userData = rows[0]
        if (!userData.create_user || !userData.uuid) {
          this.logger.warn(`${account}账号状态异常`)
          throw new Error('当前账号状态异常,请联系客服')
        }
        if (userData.state !== 1) {
          this.logger.warn(`${account}登录状态异常 state=${userData.state}`)
          throw new Error('乐跑账号登录已过期,请尝试使用登录器重新登录')
        }
        if (Number(userData.lepao_count) <= 0) {
          this.logger.warn(`${account}乐跑公里余额不足`)
          throw new Error('用户乐跑公里余额不足,请购买后重试!')
        }
        if (!userData.userAgent) {
          userData.userAgent =
            'Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.69 NetType/WIFI Language/zh_CN'
        }
        if (!userData.deviceModel) userData.deviceModel = 'unknown'
        return userData
      })
    }

    /**
     * Processes one message from the task queue. Never rejects, so it is safe to use as a
     * consumer callback whose returned promise nobody awaits.
     * Invalid or unknown messages are acknowledged and dropped. A failed task is re-queued
     * with `retry + 1` while it is retryable and below `maxRetry`, otherwise it is copied
     * to the dead queue; in both cases a failure result is published and the message acked.
     * @param {object} channel - channel the message arrived on.
     * @param {object|null} msg - delivered message; null is ignored.
     */
    async handleTaskMessage(channel, msg) {
      if (!msg) return
      let content
      try {
        content = JSON.parse(msg.content.toString())
      } catch (e) {
        this.logger.warn(`丢弃无法解析的任务消息:${e?.message || e}`)
        return channel.ack(msg)
      }
      // Valid JSON that is not an object (null, number, string) cannot be a task.
      if (content === null || typeof content !== 'object') {
        this.logger.warn('丢弃格式无效的任务消息')
        return channel.ack(msg)
      }
      const { id, type, data, retry = 0 } = content
      const traceId = this.traceId()
      // Own-property check: a type such as "toString" must not resolve to an inherited function.
      const handler = Object.prototype.hasOwnProperty.call(this.handlers, type)
        ? this.handlers[type]
        : undefined
      if (!handler) {
        this.log(traceId, 'ERROR', '未知任务', { type })
        return channel.ack(msg)
      }
      try {
        // Long-running task types get their own limit; everything else uses this.timeout.
        const runMs =
          type === 'lepao.startRun'
            ? 3600000
            : type === 'lepao.finalizeRunSync'
              ? 4 * 3600000
              : undefined
        const result = await this.withTimeout(
          handler(data, { traceId, channel, taskId: id }),
          type,
          runMs
        )
        await this.sendResult(channel, {
          id,
          success: true,
          result
        })
        this.log(traceId, 'DONE', `任务完成 ${type}`)
        channel.ack(msg)
      } catch (err) {
        this.logErr(traceId, `任务失败 ${type}`, err)
        try {
          if (err?.loginExpired) {
            const account = data?.account || data?.student_num || data?.studentNum
            await this.markLoginExpired(account)
          }
          if (retry < this.maxRetry && this.isRetryableTaskError(err)) {
            await channel.sendToQueue(
              this.taskQueue,
              Buffer.from(
                JSON.stringify({
                  ...content,
                  retry: retry + 1
                })
              ),
              { persistent: true }
            )
            this.log(traceId, 'RETRY', `重试第${retry + 1}次`)
          } else {
            await channel.sendToQueue(
              this.deadQueue,
              Buffer.from(JSON.stringify(content)),
              { persistent: true }
            )
            this.log(traceId, 'DEAD', '进入死信队列')
          }
          await this.sendResult(channel, {
            id,
            success: false,
            error: err?.message || String(err)
          })
          channel.ack(msg)
        } catch (followUpErr) {
          // The message stays unacknowledged here; the error is only logged so it does
          // not surface as an unhandled promise rejection.
          this.logErr(traceId, `任务失败后处理异常 ${type}`, followUpErr)
        }
      }
    }

    /**
     * Registers the handlers, declares the queues and starts consuming `taskQueue`
     * (prefetch 5, manual ack). A startup failure is logged and leaves the worker
     * stopped so start() can be called again.
     */
    async start() {
      if (this.running) return
      this.running = true
      this.logger.info('Worker 启动中(JKES)...')
      try {
        this.initHandlers()
        const channel = await mq.getChannel(this.channelName)
        await channel.prefetch(5)
        await assertRunforgeTaskIngress(channel, this.logger)
        await channel.assertQueue(this.resultQueue, { durable: true })
        await channel.assertQueue(this.deadQueue, { durable: true })
        await channel.consume(
          this.taskQueue,
          (msg) => this.handleTaskMessage(channel, msg),
          { noAck: false }
        )
        this.logger.info('哪吒乐跑 Worker 启动成功(JKES)')
      } catch (err) {
        this.running = false
        this.logger.error('哪吒乐跑 Worker 启动失败: ' + (err?.stack || err))
      }
    }

    /**
     * Publishes a task result to `resultQueue`.
     * @param {object} channel
     * @param {{ id: *, success: boolean, result?: *, error?: string }} data
     */
    async sendResult(channel, data) {
      channel.sendToQueue(
        this.resultQueue,
        Buffer.from(JSON.stringify(data)),
        { persistent: true }
      )
    }

    /** Marks the worker as stopped and closes the message-queue connection. */
    async stop() {
      this.running = false
      await mq.close()
      this.logger.info('哪吒乐跑 Worker 已停止')
    }
  }
  // CommonJS export: the module's value is the Worker class itself.
  module.exports = Worker