Worker.js 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921
  1. const path = require('path')
  2. const axios = require('axios')
  3. const OSS = require('ali-oss')
  4. const mq = require('../../plugin/mq')
  5. const db = require('../../plugin/DataBase/db')
  6. const Redis = require('../../plugin/DataBase/Redis')
  7. const EmailTemplate = require('../../plugin/Email/emailTemplate')
  8. const { URLSearchParams } = require('url')
  9. const {
  10. getPathData,
  11. selectCheckpoints,
  12. generateCadence
  13. } = require('../../plugin/Lepao/Path')
  14. const {
  15. dataEncrypt,
  16. dataDecrypt,
  17. dataSign
  18. } = require('../../plugin/Lepao/Crypto')
  19. const Logger = require('../Logger')
  /**
   * Message-queue worker that executes "lepao" (campus running) tasks.
   *
   * Tasks are consumed from `taskQueue`, dispatched to the handler registered
   * for their `type`, and the outcome is published to `resultQueue`. Failed
   * tasks are either re-queued (retryable errors) or sent to `deadQueue`.
   */
  class Worker {
    /**
     * Configures logging, queue names, retry/timeout limits and the optional
     * debug proxy. Nothing is connected here; call start() to begin consuming.
     */
    constructor() {
      this.logger = new Logger(
        path.join(__dirname, '../logs/LepaoWorker.log'),
        'INFO'
      )
      this.handlers = {}
      this.running = false
      this.baseUrl = 'https://lepao.ctbu.edu.cn/v3/api.php'
      this.taskQueue = 'task_queue'
      this.resultQueue = 'task_result_queue'
      this.deadQueue = 'task_dead_queue'
      this.noticeQueue = 'runforge_message_queue'
      this.channelName = 'lepao_worker'
      this.maxRetry = 3
      this.timeout = 15000
      this.defaultUserAgent = 'Mozilla/5.0 (Linux; Android 16; 2211133C Build/BP2A.250605.031.A3; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/138.0.7204.180 Mobile Safari/537.36 XWEB/1380347 MMWEBSDK/20250202 MMWEBID/1020 wxwork/5.0.6.66174 MicroMessenger/8.0.28.48(0x28001c30) MiniProgramEnv/android Luggage/3.0.2.95ef3f83 NetType/WIFI Language/zh_CN ABI/arm64'
      // Debug mode: route axios traffic through a local proxy (e.g. Charles/Fiddler).
      // Enable it with the environment variable LEPAO_DEBUG_PROXY=1.
      this.debugProxyEnabled = String(process.env.LEPAO_DEBUG_PROXY || '').trim() === '1'
      this.debugProxyHost = process.env.LEPAO_DEBUG_PROXY_HOST || '127.0.0.1'
      this.debugProxyPort = Number(process.env.LEPAO_DEBUG_PROXY_PORT || 9000)
    }

    /* ================= Utilities ================= */

    /**
     * Builds an absolute API URL.
     * @param {string} path - Path appended verbatim to `baseUrl`.
     * @returns {string}
     */
    api(path) {
      return this.baseUrl + path
    }

    /**
     * Generates a log-correlation id: millisecond timestamp plus a short random suffix.
     * @returns {string}
     */
    traceId() {
      return Date.now() + '_' + Math.random().toString(36).slice(2, 8)
    }

    /**
     * Resolves after `ms` milliseconds.
     * @param {number} ms
     * @returns {Promise<void>}
     */
    sleep(ms) {
      return new Promise(r => setTimeout(r, ms))
    }

    /**
     * Current time in seconds as a string with three decimals.
     * @returns {string}
     */
    lepaoTimestamp() {
      return (Date.now() / 1000).toFixed(3)
    }

    /**
     * Six-digit random nonce sent with every signed request.
     * @returns {string}
     */
    nonce() {
      return String(Math.floor(Math.random() * 900000 + 100000))
    }

    /**
     * Fields shared by every signed request. Property order is kept exactly as
     * the individual handlers used to spell it out, in case `dataSign` is
     * order-sensitive (its implementation is not visible here).
     * @param {object} req - Task data merged with the account row.
     * @param {number} [termId=0] - Value sent as `term_id`.
     * @returns {object}
     */
    baseParams(req, termId = 0) {
      return {
        uid: req.uid,
        token: req.token,
        school_id: req.school_id,
        term_id: termId,
        course_id: 0,
        class_id: 0,
        student_num: req.student_id,
        card_id: req.student_id,
        timestamp: this.lepaoTimestamp(),
        version: 1,
        nonce: this.nonce(),
        ostype: 5
      }
    }

    /**
     * axios proxy options: the configured local proxy in debug mode, otherwise
     * `proxy: false`.
     * @returns {{proxy: (false|{host: string, port: number, protocol: string})}}
     */
    axiosProxyConfig() {
      if (!this.debugProxyEnabled) {
        return { proxy: false }
      }
      this.logger.info(`使用本地代理: ${this.debugProxyHost}:${this.debugProxyPort}`)
      return {
        proxy: {
          host: this.debugProxyHost,
          port: this.debugProxyPort,
          protocol: 'http'
        }
      }
    }

    /**
     * Publishes a new task message to `taskQueue`.
     * @param {object} channel - Queue channel exposing `sendToQueue`.
     * @param {string} type - Handler name the task is routed to.
     * @param {*} data - Payload handed to the handler.
     * @param {{id?: string, retry?: number}} [options]
     * @returns {Promise<string>} The id of the enqueued task.
     */
    async enqueueTask(channel, type, data, options = {}) {
      const payload = {
        id: options.id || this.traceId(),
        type,
        data,
        retry: options.retry ?? 0
      }
      await channel.sendToQueue(
        this.taskQueue,
        Buffer.from(JSON.stringify(payload)),
        { persistent: true, contentType: 'application/json' }
      )
      return payload.id
    }

    /**
     * Races `promise` against a `this.timeout` millisecond deadline.
     *
     * The deadline timer is always cleared once the race settles, so a fast
     * operation does not leave a pending timer behind. The underlying
     * operation itself is NOT cancelled when the deadline wins.
     * @param {Promise<*>} promise
     * @param {string} name - Used in the timeout error message.
     * @returns {Promise<*>} The value of `promise`.
     * @throws {Error} `<name> 超时` when the deadline elapses first.
     */
    async withTimeout(promise, name) {
      let timer
      const deadline = new Promise((_, reject) => {
        timer = setTimeout(() => reject(new Error(`${name} 超时`)), this.timeout)
      })
      try {
        return await Promise.race([promise, deadline])
      } finally {
        clearTimeout(timer)
      }
    }

    /**
     * Runs `fn` up to `this.maxRetry` times with a linear backoff
     * (1s, 2s, ...) between attempts. No delay is added after the last attempt.
     * @param {function(): Promise<*>} fn
     * @param {string} name - Label used in warning logs.
     * @returns {Promise<*>} The first successful result.
     * @throws The error of the last failed attempt.
     */
    async retry(fn, name) {
      let lastErr
      for (let attempt = 1; attempt <= this.maxRetry; attempt++) {
        try {
          return await fn()
        } catch (err) {
          lastErr = err
          this.logger.warn(`[RETRY] ${name} 第${attempt}次失败`)
          if (attempt < this.maxRetry) {
            await this.sleep(1000 * attempt)
          }
        }
      }
      throw lastErr
    }

    /**
     * Heuristic for transport-level failures: known socket error codes, axios
     * errors without a response, or a message mentioning "timeout"/"network".
     * @param {*} err
     * @returns {boolean}
     */
    isNetworkError(err) {
      if (!err) return false
      if (err.code && ['ECONNRESET', 'ECONNABORTED', 'ETIMEDOUT', 'ENOTFOUND', 'EAI_AGAIN'].includes(err.code)) {
        return true
      }
      if (err.isAxiosError && !err.response) return true
      const msg = (err.message || '').toLowerCase()
      return msg.includes('timeout') || msg.includes('network')
    }

    /**
     * Whether a failed task (or path-selection attempt) may be retried.
     * @param {*} err
     * @returns {boolean}
     */
    isRetryableTaskError(err) {
      if (!err) return false
      if (err.retryable === true) return true
      if (this.isNetworkError(err)) return true
      return ['PATH_SELECT_FAILED', 'CHECKPOINT_FETCH_FAILED', 'CHECKPOINT_INSUFFICIENT'].includes(err.code)
    }

    /**
     * Renders optional log data as text. Objects are JSON-serialised (they used
     * to print as "[object Object]") with credential fields masked; values that
     * cannot be serialised (e.g. circular structures) fall back to String().
     * @param {*} data
     * @returns {string} Empty string for falsy input.
     */
    formatLogData(data) {
      if (!data) return ''
      if (typeof data !== 'object') return String(data)
      const sensitiveKeys = ['token', 'AccessKeySecret', 'SecurityToken']
      try {
        return JSON.stringify(data, (key, value) => (sensitiveKeys.includes(key) ? '***' : value))
      } catch {
        return String(data)
      }
    }

    /**
     * Writes an info line tagged with the trace id and an event type.
     * @param {string} traceId
     * @param {string} type - Short tag such as REQ, RES, DONE, RETRY.
     * @param {string} msg
     * @param {*} [data] - Optional payload, see formatLogData().
     */
    log(traceId, type, msg, data) {
      this.logger.info(`[${traceId}] [${type}] ${msg} ${this.formatLogData(data)}`)
    }

    /**
     * Writes an error line including the stack trace when available.
     * @param {string} traceId
     * @param {string} msg
     * @param {*} err
     */
    logErr(traceId, msg, err) {
      this.logger.error(`[${traceId}] ${msg} ${err?.stack || err}`)
    }

    /**
     * Sends one encrypted, form-encoded POST to the remote API, with retry and
     * a per-attempt deadline.
     * @param {string} traceId
     * @param {string} name - Label for logs and timeout errors.
     * @param {string} url
     * @param {object} raw - Plain request parameters; encrypted into the `data` field.
     * @param {object} [headers] - Extra headers; they override the defaults.
     * @returns {Promise<object>} Response body, with `data` decrypted and parsed
     *   when the server marks it `is_encrypt === 1`.
     */
    async request(traceId, name, url, raw, headers = {}) {
      return this.retry(async () => {
        this.log(traceId, 'REQ', name, raw)
        const mergedHeaders = {
          'Content-Type': 'application/x-www-form-urlencoded',
          'Accept': '*/*',
          'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
          'Accept-Encoding': 'gzip, deflate, br',
          'Referer': 'https://servicewechat.com/wxf94c4ddb63d87ede/32/page-frame.html',
          ...headers
        }
        // A caller may pass an undefined User-Agent; fall back to the default.
        if (!mergedHeaders['User-Agent']) {
          mergedHeaders['User-Agent'] = this.defaultUserAgent
        }
        const form = new URLSearchParams()
        form.append('ostype', '5')
        form.append('data', dataEncrypt(JSON.stringify(raw)))
        const res = await this.withTimeout(
          axios.post(
            url,
            form,
            {
              headers: mergedHeaders,
              ...this.axiosProxyConfig()
            }
          ),
          name
        )
        // Log the status only: the full axios response is circular and noisy.
        this.log(traceId, 'RES', name, { status: res.status })
        const result = res.data
        if (result?.data && result?.is_encrypt === 1) {
          result.data = JSON.parse(dataDecrypt(result.data))
        }
        this.log(traceId, 'RES', name, result)
        return result
      }, name)
    }

    /**
     * Registers (or replaces) the handler for a task type.
     * @param {string} type
     * @param {function(object, object): Promise<*>} handler - Called as `(data, ctx)`.
     */
    register(type, handler) {
      this.handlers[type] = handler
      this.logger.info(`注册任务: ${type}`)
    }

    /* ================= Business handlers ================= */

    /**
     * Registers every `lepao.*` task handler. Handlers receive `(req, ctx)`
     * where `ctx` carries `traceId` and, when run by the consumer, `channel`
     * and `taskId`.
     */
    initHandlers() {
      /* ---------------- Start a run (full flow) ---------------- */
      this.register('lepao.startRun', async (req, ctx) => {
        const traceId = ctx.traceId
        const maxPathRetry = 999 // upper bound on automatic path re-selection
        let pathRetry = 0
        let pointData = null
        let ossPath = null
        let userData = null
        let pathId = null
        let runZoneId = 0
        try {
          // 1. Load the account and merge it into the request.
          userData = await this.handlers['lepao.getUserData'](req, ctx)
          req = {
            ...req,
            ...userData,
            student_id: req.account
          }
          // 1.5 Deduct one run up front (refunded on failure, idempotent per task).
          await this.handlers['lepao.consumeCount']({
            account: req.account,
            uuid: userData?.create_user
          }, ctx)
          while (pathRetry < maxPathRetry) {
            try {
              // 2. Pick a path (only retryable failures loop again).
              const pathRes = await this.handlers['lepao.getPath'](req, ctx)
              pathId = pathRes.path_id
              // 3. Switch the run zone.
              const zoneRes = await this.handlers['lepao.setZone']({ ...req, random_id: pathId }, ctx)
              runZoneId = zoneRes?.run_zone_id || 0
              // 4. Upload the track file to OSS and build the checkpoints.
              const uploadRes = await this.handlers['lepao.uploadOssFile']({ ...req, random_id: pathId }, ctx)
              ossPath = uploadRes.oss_path
              pointData = uploadRes.point_data
              if (!pointData) {
                pathRetry++
                this.logger.warn(`[${traceId}] 打卡点不满足要求,重新获取路径 第${pathRetry}次`)
                continue
              }
              // Checkpoints are acceptable: leave the loop.
              break
            } catch (err) {
              if (!this.isRetryableTaskError(err)) {
                throw err
              }
              this.logger.warn(`[${traceId}] 可重试错误,重新获取路径 第${pathRetry + 1}次,原因:${err.message}`)
              pathRetry++
              await this.sleep(1000 * pathRetry)
            }
          }
          if (!pointData) {
            throw new Error('打卡点生成失败,乐跑任务终止')
          }
          // 5. Submit the run. `random_id` must be the path that was just
          // uploaded, because bindData looks the path row up by it.
          const bindRes = await this.handlers['lepao.bindData']({
            ...req,
            random_id: pathId,
            run_zone_id: runZoneId,
            record_file: ossPath,
            point_data: pointData
          }, ctx)
          // 6. Queue the success notification.
          if (ctx.channel) {
            await this.enqueueTask(ctx.channel, 'lepao.sendNotice', {
              account: req.account,
              success: true,
              data: bindRes?.data ?? bindRes,
              traceId
            }, { id: `${traceId}:notice:success` })
          }
          return { traceId, ossPath, pointData, bindRes }
        } catch (err) {
          this.logger.error(`[${traceId}] 乐跑流程失败: ${err?.stack || err}`)
          // Give the run back if it was deducted (idempotent).
          try {
            await this.handlers['lepao.refundCount']({
              account: req.account,
              uuid: userData?.create_user
            }, ctx)
          } catch (e) {
            this.logger.error(`[${traceId}] 返还乐跑次数失败:${e.stack || e}`)
          }
          if (ctx.channel) {
            await this.enqueueTask(ctx.channel, 'lepao.sendNotice', {
              account: req.account,
              success: false,
              reason: err.message || '未知错误',
              traceId
            }, { id: `${traceId}:notice:fail` })
          }
          // Publish the failure to the result queue and the dead-letter queue.
          // NOTE(review): the queue consumer also publishes a result and
          // retries/dead-letters after this handler throws, so these messages
          // are duplicated there — confirm which side downstream relies on.
          if (ctx.channel) {
            await this.sendResult(ctx.channel, {
              // The consumer passes the task id via ctx, not in the payload.
              id: ctx.taskId ?? req.taskId,
              success: false,
              error: err.message
            })
            await ctx.channel.sendToQueue(
              this.deadQueue,
              Buffer.from(JSON.stringify({ ...req, error: err.message })),
              { persistent: true }
            )
          }
          throw err
        }
      })

      /* ---------------- Send notification (standalone queue task) ---------------- */
      this.register('lepao.sendNotice', async (req, ctx) => {
        const { account, success, data, reason, traceId } = req || {}
        if (!account) {
          throw new Error('发送通知失败:缺少 account')
        }
        const emailSql = `
          SELECT
            a.name,
            a.email,
            a.target_count,
            a.notice_type,
            e.bot_umo
          FROM
            lepao_account a
          LEFT JOIN
            lepao_extra e
          ON
            a.student_num = e.student_num
          WHERE
            a.student_num = ?
        `
        const rows = await db.query(emailSql, [account])
        if (!rows || rows.length === 0) {
          throw new Error('发送通知失败:未找到用户通知配置')
        }
        const user = rows[0]
        const noticeType = user.notice_type || 'none'
        const payload = success ? {
          ...(data && typeof data === 'object' ? data : {}),
          type: 'lepao_success',
          umo: user.bot_umo,
          // Kept from the original Lepao.js: term_num actually carries target_count.
          term_num: user.target_count ?? 0,
          name: user.name,
          account,
          traceId
        } : {
          type: 'lepao_fail',
          umo: user.bot_umo,
          name: user.name,
          account,
          reason,
          traceId
        }
        if (noticeType === 'bot' && user.bot_umo) {
          const ch = await mq.getChannel(this.noticeQueue)
          await ch.assertQueue(this.noticeQueue, { durable: true })
          ch.sendToQueue(
            this.noticeQueue,
            Buffer.from(JSON.stringify(payload)),
            {
              persistent: true,
              contentType: 'application/json'
            }
          )
          return { delivered: true, via: 'bot' }
        }
        if (noticeType === 'email' && user.email) {
          if (success) {
            await EmailTemplate.lepaoSuccess(user.email, payload)
            return { delivered: true, via: 'email' }
          }
          await EmailTemplate.lepaoFail(user.email, {
            name: user.name,
            account,
            reason: reason || '系统繁忙,请联系客服或稍后再试',
            traceId
          })
          return { delivered: true, via: 'email' }
        }
        return { delivered: false, via: 'none' }
      })

      /* ---------------- Deduct one run (before the run starts) ---------------- */
      // The Redis key `lepao:consume:<task>` is a small state machine:
      //   absent      -> nothing deducted yet
      //   'refunded'  -> deducted and given back; a retry must deduct again
      //   other value -> deducted and currently held
      // Previously a refund left the "consumed" marker in place, so a retried
      // task skipped the deduction and ran for free.
      this.register('lepao.consumeCount', async (req, ctx) => {
        const account = req?.account
        const uuid = req?.uuid
        if (!uuid) {
          throw new Error('扣减乐跑次数失败:缺少 uuid')
        }
        // Idempotency: one deduction per task id while it is held.
        const consumeKey = `lepao:consume:${ctx?.taskId || ctx?.traceId || account || uuid}`
        const state = await Redis.get(consumeKey)
        if (state && String(state) !== 'refunded') {
          return true
        }
        this.logger.info(`${account || uuid}开始扣减乐跑次数`)
        // `lepao_count > 0` keeps concurrent tasks from driving the quota negative.
        const useLepaoCountSql = 'UPDATE users SET lepao_count = lepao_count - 1 WHERE uuid = ? AND lepao_count > 0'
        const r = await db.query(useLepaoCountSql, [uuid])
        if (!r || r.affectedRows !== 1) {
          throw new Error('扣减乐跑次数失败:数据库更新失败')
        }
        this.logger.info(`${account || uuid}扣减乐跑次数完成`)
        await Redis.set(consumeKey, '1', { EX: 3600 })
        return true
      })

      /* ---------------- Refund one run (on failure) ---------------- */
      this.register('lepao.refundCount', async (req, ctx) => {
        const account = req?.account
        const uuid = req?.uuid
        if (!uuid) {
          return true
        }
        const consumeKey = `lepao:consume:${ctx?.taskId || ctx?.traceId || account || uuid}`
        const state = await Redis.get(consumeKey)
        // Nothing held for this task: never deducted, or already refunded.
        if (!state || String(state) === 'refunded') {
          return true
        }
        this.logger.info(`${account || uuid}开始返还乐跑次数`)
        const sql = 'UPDATE users SET lepao_count = lepao_count + 1 WHERE uuid = ?'
        const r = await db.query(sql, [uuid])
        if (!r || r.affectedRows !== 1) {
          throw new Error('返还乐跑次数失败:数据库更新失败')
        }
        this.logger.info(`${account || uuid}返还乐跑次数完成`)
        // Flip the marker so a later retry of the same task deducts again.
        await Redis.set(consumeKey, 'refunded', { EX: 3600 })
        return true
      })

      /* ---------------- Load and validate the account ---------------- */
      this.register('lepao.getUserData', async (req, ctx) => {
        const account = req.account
        this.logger.info(`${account}开始获取用户数据`)
        const accountSql = `
          SELECT
            u.uuid,
            u.lepao_count,
            l.create_user,
            l.name,
            l.student_num,
            l.area,
            l.sex,
            l.state,
            l.token,
            l.uid,
            l.school_id,
            l.userAgent,
            l.deviceModel,
            l.notice_type,
            l.email,
            e.bot_account
          FROM
            lepao_account l
          LEFT JOIN
            users u
          ON
            l.create_user = u.uuid
          LEFT JOIN
            lepao_extra e
          ON
            l.student_num = e.student_num
          WHERE
            l.student_num = ?
        `
        const rows = await db.query(accountSql, [account])
        if (!rows || rows.length === 0) {
          this.logger.error(`${account}无法获取账号数据`)
          throw new Error('无法获取账号数据,请联系客服或稍后再试')
        }
        const userData = rows[0]
        if (!userData.create_user || !userData.uuid) {
          this.logger.warn(`${account}账号状态异常`)
          throw new Error('当前账号状态异常,请联系客服')
        }
        if (userData.state !== 1) {
          this.logger.warn(`${account}登录状态异常 state=${userData.state}`)
          throw new Error('乐跑账号登录已过期,请尝试使用登录器重新登录')
        }
        if (userData.lepao_count < 1) {
          this.logger.warn(`${account}乐跑次数不足`)
          throw new Error('用户乐跑次数不足,请购买乐跑次数后重试!')
        }
        // Fall back to the worker-wide defaults when the account has none stored.
        if (!userData.userAgent) {
          userData.userAgent = this.defaultUserAgent
        }
        if (!userData.deviceModel) {
          userData.deviceModel = '2211133C'
        }
        return userData
      })

      /* ---------------- Select a path ---------------- */
      this.register('lepao.getPath', async (req, ctx) => {
        const account = req.account
        this.logger.info(`${account}开始获取路径`)
        const accountSql = 'SELECT area, sex FROM lepao_account WHERE student_num = ?'
        const rows = await db.query(accountSql, [account])
        if (!rows || rows.length === 0) {
          this.logger.error(`${account}无法获取账号数据`)
          throw new Error('无法获取账号数据')
        }
        const { area, sex } = rows[0]
        // Distance window; accounts with sex === 2 get a shorter one.
        let max = 4.00
        let min = 2.00
        if (sex === 2) {
          max = 2.00
          min = 1.60
        }
        this.logger.info(`${account}路径参数: area=${area ?? '随机'}, max_distance=${max}, min_distance=${min}`)
        let pathSql = 'SELECT id FROM path_data WHERE state = 1 AND distance < ? AND distance > ? '
        const pathParams = [max, min]
        if (area) {
          pathSql += ' AND run_zone_name = ?'
          pathParams.push(area)
        }
        // Prefer the least-used path.
        pathSql += ' ORDER BY count ASC LIMIT 1'
        const paths = await db.query(pathSql, pathParams)
        if (!paths || paths.length === 0) {
          this.logger.error(`${account}未找到符合条件的路线`)
          const err = new Error('未找到符合条件的路线,请改变路径选择条件')
          err.code = 'PATH_SELECT_FAILED'
          err.retryable = true
          throw err
        }
        const randomPath = paths[0]
        const updateSql = 'UPDATE path_data SET count = count + 1 WHERE id = ?'
        await db.query(updateSql, [randomPath.id])
        this.logger.info(`${account}路径选中id=${randomPath.id},计数加1成功`)
        return { path_id: randomPath.id }
      })

      /* ---------------- Fetch run rules / record ---------------- */
      this.register('lepao.getRecord', async (req, ctx) => {
        const raw = this.baseParams(req)
        raw.sign = dataSign(raw)
        return this.request(
          ctx.traceId,
          'getRecord',
          this.api('/Run2/beforeRunV260'),
          raw,
          {
            'User-Agent': req.userAgent,
            'charset': 'utf-8',
            'Referer': 'https://servicewechat.com/wxf94c4ddb63d87ede/32/page-frame.html',
          }
        )
      })

      /* ---------------- Switch run zone ---------------- */
      this.register('lepao.setZone', async (req, ctx) => {
        // Zone name stored with the path -> remote zone id.
        const runZoneMap = {
          '兰花湖校区跑区': 2,
          '主校区北跑区': 3,
          '主校区南跑区': 5,
          '重庆工商大学茶园校区': 6
        }
        const record = await db.query(
          'SELECT run_zone_name FROM path_data WHERE id = ?',
          [req.random_id]
        )
        if (!record || record.length === 0) {
          throw new Error('跑区不存在')
        }
        const runZoneId = runZoneMap[record[0].run_zone_name]
        if (!runZoneId) throw new Error('跑区不存在')
        const raw = {
          ...this.baseParams(req),
          run_zone_id: String(runZoneId)
        }
        raw.sign = dataSign(raw)
        await this.request(
          ctx.traceId,
          'setZone',
          this.api('/Run/setRunZone'),
          raw
        )
        return { run_zone_id: runZoneId }
      })

      /* ---------------- Fetch OSS STS credentials ---------------- */
      this.register('lepao.getOssSts', async (req, ctx) => {
        const raw = this.baseParams(req)
        raw.sign = dataSign(raw)
        const res = await this.request(
          ctx.traceId,
          'getOssSts',
          this.api('/WpIndex/getOssSts'),
          raw
        )
        return res.data
      })

      /* ---------------- Upload the track file to OSS ---------------- */
      this.register('lepao.uploadOssFile', async (req, ctx) => {
        const pathRow = await db.query(
          'SELECT * FROM path_data WHERE id=?',
          [req.random_id]
        )
        if (!pathRow || pathRow.length === 0) {
          throw new Error('路径数据不存在')
        }
        const pathData = pathRow[0]
        // Build the track for this run and encrypt it for upload.
        const newPathData = getPathData(pathData.data, req.run_end_time, pathData.time)
        const pathResult = dataEncrypt(JSON.stringify(newPathData))
        // Fetch the server-side run rules.
        const runRule = await this.handlers['lepao.getRecord'](req, ctx)
        const ruleData = runRule?.data
        if (!ruleData?.run_line_info?.point_list || !ruleData?.time_rule_arr?.length) {
          const err = new Error('获取打卡点规则失败')
          err.code = 'CHECKPOINT_FETCH_FAILED'
          err.retryable = true
          throw err
        }
        const check_points = ruleData.run_line_info.point_list
        const min_log_num = ruleData.time_rule_arr[0]?.min_log_num || 4
        // The rule value is multiplied by 1000 before use (presumably km -> m; verify).
        const point_update_distance = parseFloat(ruleData.run_line_info.point_update_distance || 0) * 1000
        const log_max_distance = Number(ruleData.run_line_info.log_max_distance || 0)
        // Generate the checkpoints.
        const point_data = selectCheckpoints(newPathData, check_points, min_log_num, point_update_distance, log_max_distance, req.run_end_time, pathData.time)
        if (!point_data) {
          this.logger.warn(`[RETRY] 打卡点数量不足,重新更换路径`)
          const err = new Error('打卡点数量不足')
          err.code = 'CHECKPOINT_INSUFFICIENT'
          err.retryable = true
          throw err
        }
        const sts = await this.handlers['lepao.getOssSts'](req, ctx)
        if (!sts?.bucket || !sts?.AccessKeyId || !sts?.AccessKeySecret || !sts?.SecurityToken) {
          throw new Error('获取 OSS STS 失败')
        }
        // Object key: .../<last 3 digits of ms clock>/<YYYY-MM-DD>/<ms clock>-<0..149>.txt
        const now = new Date()
        const yyyy = now.getFullYear()
        const mm = String(now.getMonth() + 1).padStart(2, '0')
        const dd = String(now.getDate()).padStart(2, '0')
        const formattedToday = `${yyyy}-${mm}-${dd}`
        const boundary = String(Date.now())
        const timestamp = String(Date.now())
        const ossPath = `Public/Upload/file/run_record/${boundary.slice(-3)}/${formattedToday}/${timestamp}-${Math.floor(Math.random() * 150)}.txt`
        const client = new OSS({
          bucket: sts.bucket,
          region: sts.region || 'oss-cn-hangzhou',
          accessKeyId: sts.AccessKeyId,
          accessKeySecret: sts.AccessKeySecret,
          stsToken: sts.SecurityToken,
          secure: true
        })
        await client.put(ossPath, Buffer.from(pathResult, 'utf-8'))
        return { oss_path: ossPath, point_data: point_data }
      })

      /* ---------------- Submit the run ---------------- */
      this.register('lepao.bindData', async (req, ctx) => {
        const pathRow = await db.query(
          'SELECT * FROM path_data WHERE id=?',
          [req.random_id]
        )
        // Fail with a clear message instead of a TypeError on a missing row.
        if (!pathRow || pathRow.length === 0) {
          throw new Error('路径数据不存在')
        }
        const pathData = pathRow[0]
        const stepData = generateCadence(pathData.distance, pathData.time)
        const stepInfo = JSON.stringify({ interval: 60, list: stepData.cadence_list })
        const data = {
          // This endpoint is sent term_id = 1; the other requests send 0.
          ...this.baseParams(req, 1),
          game_id: req.run_zone_id || 0,
          start_time: req.run_end_time - Number(pathData.time),
          end_time: req.run_end_time,
          distance: pathData.distance,
          record_img: "",
          log_data: req.point_data,
          file_img: "",
          is_running_area_valid: 1,
          mobileDeviceId: 1,
          mobileModel: req.deviceModel,
          step_info: stepInfo,
          step_num: stepData.total_steps,
          used_time: pathData.time,
          mobileOsVersion: 1,
          record_file: req.record_file
        }
        data.sign = dataSign(data)
        return this.request(
          ctx.traceId,
          'bindData',
          this.api('/Run/stopRunV278'),
          data
        )
      })
    }

    /* ================= Worker core ================= */

    /**
     * Registers the handlers, declares the queues and starts consuming
     * `taskQueue`. Start-up failures are logged, not thrown; `running` is
     * reset in that case so start() can be called again.
     * @returns {Promise<void>}
     */
    async start() {
      if (this.running) return
      this.running = true
      this.logger.info('Worker 启动中...')
      try {
        this.initHandlers()
        const channel = await mq.getChannel(this.channelName)
        await channel.prefetch(5)
        await channel.assertQueue(this.taskQueue, { durable: true })
        await channel.assertQueue(this.resultQueue, { durable: true })
        await channel.assertQueue(this.deadQueue, { durable: true })
        await channel.consume(this.taskQueue, (msg) => this.handleMessage(channel, msg))
        this.logger.info('RunForge Worker 启动成功')
      } catch (err) {
        this.running = false
        this.logger.error('RunForge Worker 启动失败: ' + err.stack)
      }
    }

    /**
     * Processes one queue message: parse, dispatch to the handler, publish the
     * result, then acknowledge. Retryable failures are re-queued with
     * `retry + 1` (up to `maxRetry`); everything else goes to `deadQueue`.
     *
     * NOTE(review): every handler, including the multi-step `lepao.startRun`,
     * runs under the same `this.timeout` deadline, and a timed-out handler is
     * not cancelled — confirm that this is intended.
     * @param {object} channel - Queue channel used for ack and publishing.
     * @param {object|null} msg - Raw queue message; null is ignored.
     * @returns {Promise<*>}
     */
    async handleMessage(channel, msg) {
      if (!msg) return
      let content
      try {
        content = JSON.parse(msg.content.toString())
      } catch (err) {
        this.logger.warn(`丢弃无法解析的消息: ${err.message}`)
        return channel.ack(msg)
      }
      // Valid JSON that is not an object (null, number, ...) cannot be a task.
      if (!content || typeof content !== 'object') {
        this.logger.warn('丢弃格式非法的消息')
        return channel.ack(msg)
      }
      const { id, type, data, retry = 0 } = content
      const traceId = this.traceId()
      const handler = this.handlers[type]
      if (!handler) {
        this.log(traceId, 'ERROR', '未知任务', { type })
        return channel.ack(msg)
      }
      try {
        const result = await this.withTimeout(
          handler(data, { traceId, channel, taskId: id }),
          type
        )
        await this.sendResult(channel, {
          id,
          success: true,
          result
        })
        this.log(traceId, 'DONE', `任务完成 ${type}`)
        channel.ack(msg)
      } catch (err) {
        this.logErr(traceId, `任务失败 ${type}`, err)
        if (retry < this.maxRetry && this.isRetryableTaskError(err)) {
          // Re-queue the same task with an incremented retry counter.
          await channel.sendToQueue(
            this.taskQueue,
            Buffer.from(JSON.stringify({
              ...content,
              retry: retry + 1
            })),
            { persistent: true }
          )
          this.log(traceId, 'RETRY', `重试第${retry + 1}次`)
        } else {
          // Dead letter.
          await channel.sendToQueue(
            this.deadQueue,
            Buffer.from(JSON.stringify(content)),
            { persistent: true }
          )
          this.log(traceId, 'DEAD', '进入死信队列')
        }
        await this.sendResult(channel, {
          id,
          success: false,
          error: err.message
        })
        channel.ack(msg)
      }
    }

    /**
     * Publishes a task outcome to `resultQueue`.
     * @param {object} channel
     * @param {object} data - JSON-serialisable result message.
     * @returns {Promise<void>}
     */
    async sendResult(channel, data) {
      channel.sendToQueue(
        this.resultQueue,
        Buffer.from(JSON.stringify(data)),
        { persistent: true }
      )
    }

    /**
     * Marks the worker as stopped and closes the message-queue connection.
     * @returns {Promise<void>}
     */
    async stop() {
      this.running = false
      await mq.close()
      this.logger.info('RunForge Worker 已停止')
    }
  }
  781. module.exports = Worker