const path = require('path') const axios = require('axios') const OSS = require('ali-oss') const mq = require('../../plugin/mq') const db = require('../../plugin/DataBase/db') const Redis = require('../../plugin/DataBase/Redis') const EmailTemplate = require('../../plugin/Email/emailTemplate') const { URLSearchParams } = require('url') const { getPathData, selectCheckpoints, generateCadence } = require('../../plugin/Lepao/Path') const { dataEncrypt, dataDecrypt, dataSign } = require('../../plugin/Lepao/Crypto') const Logger = require('../Logger') class Worker { constructor() { this.logger = new Logger( path.join(__dirname, '../logs/LepaoWorker.log'), 'INFO' ) this.handlers = {} this.running = false this.baseUrl = 'https://lepao.ctbu.edu.cn/v3/api.php' this.taskQueue = 'runforge_task_queue' this.resultQueue = 'runforge_task_result_queue' this.deadQueue = 'runforge_task_dead_queue' this.noticeQueue = 'runforge_message_queue' this.channelName = 'lepao_worker' this.maxRetry = 3 this.timeout = 15000 this.defaultUserAgent = 'Mozilla/5.0 (Linux; Android 16; 2211133C Build/BP2A.250605.031.A3; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/138.0.7204.180 Mobile Safari/537.36 XWEB/1380347 MMWEBSDK/20250202 MMWEBID/1020 wxwork/5.0.6.66174 MicroMessenger/8.0.28.48(0x28001c30) MiniProgramEnv/android Luggage/3.0.2.95ef3f83 NetType/WIFI Language/zh_CN ABI/arm64' // 调试模式:将 axios 请求走本地代理(例如 charles/fiddler) // 开启方式:设置环境变量 LEPAO_DEBUG_PROXY=1 this.debugProxyEnabled = String(process.env.LEPAO_DEBUG_PROXY || '').trim() === '1' this.debugProxyHost = process.env.LEPAO_DEBUG_PROXY_HOST || '127.0.0.1' this.debugProxyPort = Number(process.env.LEPAO_DEBUG_PROXY_PORT || 9000) } /* ================= 工具 ================= */ api(path) { return this.baseUrl + path } traceId() { return Date.now() + '_' + Math.random().toString(36).slice(2, 8) } sleep(ms) { return new Promise(r => setTimeout(r, ms)) } isRunSuccess(bindResponse) { const payload = bindResponse?.data if (!bindResponse || bindResponse.status !== 1 || !payload) { return { ok: false, reason: bindResponse?.info || '系统繁忙,请联系客服或稍后再试' } } const failedReason = payload.record_failed_reason || '' if (failedReason === '' || failedReason === '自动确认有效') { return { ok: true, payload } } return { ok: false, reason: failedReason, payload } } extractApiErrorMessage(name, result) { if (!result) { this.logger.error(`${name} 接口无响应数据: ${this.safeStringify(result)}`) return `系统繁忙,请联系客服或稍后再试` } const candidates = [ result.info, result.msg, result.message, result?.data?.info, result?.data?.msg, result?.data?.message, result?.data?.record_failed_reason ] const reason = candidates.find(v => typeof v === 'string' && v.trim() !== '') if (reason) { return reason } if (result.code !== undefined || result.status !== undefined) { this.logger.error(`${name} 接口返回异常: ${this.safeStringify(result)}`) return `系统繁忙,请联系客服或稍后再试` } return `系统繁忙,请联系客服或稍后再试` } async markLoginExpired(account) { if (!account) return try { const sql = 'UPDATE lepao_account SET state = 0 WHERE student_num = ?' await db.query(sql, [account]) this.logger.warn(`${account} 登录状态已失效,已自动更新为未登录`) } catch (error) { this.logger.error(`更新账号登录状态失败:${error.stack || error}`) } } async writeSuccessRedis(account) { if (!account) return try { const now = new Date() const tomorrow = new Date().setHours(24, 0, 0, 0) const exp = Math.floor((tomorrow - now) / 1000) await Redis.set(`lepaoSuccess:${account}`, account, { EX: exp }) } catch (error) { this.logger.error(`写入乐跑成功缓存失败: ${error.stack || error}`) } } async addLepaoRecord(uuid, account, result, pathId, pointData) { if (!uuid || !account || !result || !pathId) return try { const time = Date.now() const sql = 'INSERT INTO lepao_record (uuid, time, lepao_account, result, path_id, point_data) VALUES (?, ?, ?, ?, ?, ?)' await db.query(sql, [uuid, time, account, result, pathId, JSON.stringify(pointData || [])]) } catch (error) { this.logger.error(`写入乐跑记录失败: ${error.stack || error}`) } } lepaoTimestamp() { return Number((Date.now() / 1000).toFixed(3)) } axiosProxyConfig() { if (!this.debugProxyEnabled) { return { proxy: false } } this.logger.info(`使用本地代理: ${this.debugProxyHost}:${this.debugProxyPort}`) return { proxy: { host: this.debugProxyHost, port: this.debugProxyPort, protocol: 'http' } } } async enqueueTask(channel, type, data, options = {}) { const payload = { id: options.id || this.traceId(), type, data, retry: options.retry ?? 0 } await channel.sendToQueue( this.taskQueue, Buffer.from(JSON.stringify(payload)), { persistent: true, contentType: 'application/json' } ) return payload.id } async withTimeout(promise, name) { return Promise.race([ promise, new Promise((_, reject) => setTimeout(() => reject(new Error(`${name} 超时`)), this.timeout) ) ]) } async retry(fn, name) { let lastErr for (let i = 0; i < this.maxRetry; i++) { try { return await fn() } catch (err) { lastErr = err if (!this.isRetryableTaskError(err)) { throw err } this.logger.warn(`[RETRY] ${name} 第${i + 1}次失败`) await this.sleep(1000 * (i + 1)) // 指数退避 } } throw lastErr } isNetworkError(err) { if (!err) return false if (err.code && ['ECONNRESET', 'ECONNABORTED', 'ETIMEDOUT', 'ENOTFOUND', 'EAI_AGAIN'].includes(err.code)) { return true } if (err.isAxiosError && !err.response) return true const msg = (err.message || '').toLowerCase() return msg.includes('timeout') || msg.includes('network') } isRetryableTaskError(err) { if (!err) return false if (err.retryable === true) return true if (this.isNetworkError(err)) return true return ['PATH_SELECT_FAILED', 'CHECKPOINT_FETCH_FAILED', 'CHECKPOINT_INSUFFICIENT'].includes(err.code) } safeStringify(obj) { const seen = new WeakSet(); return JSON.stringify(obj, (key, value) => { if (typeof value === 'object' && value !== null) { if (seen.has(value)) return '[Circular]'; seen.add(value); } return value; }) } log(traceId, type, msg, data) { this.logger.info(`[${traceId}] [${type}] ${msg} ${data ? this.safeStringify(data) : ''}`) } logErr(traceId, msg, err) { this.logger.error(`[${traceId}] ${msg} ${err.stack || err}`) } async request(traceId, name, url, raw, headers = {}) { return this.retry(async () => { this.log(traceId, 'REQ', name, raw) const mergedHeaders = { 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': '*/*', 'Accept-Language': 'zh-CN,zh-Hans;q=0.9', 'Accept-Encoding': 'gzip, deflate, br', 'Referer': 'https://servicewechat.com/wxf94c4ddb63d87ede/32/page-frame.html', ...headers } if (!mergedHeaders['User-Agent']) { mergedHeaders['User-Agent'] = this.defaultUserAgent } const form = new URLSearchParams() form.append('ostype', '5') form.append('data', dataEncrypt(JSON.stringify(raw))) const res = await this.withTimeout( axios.post( url, form, { headers: mergedHeaders, ...this.axiosProxyConfig() } ), name ) let result = res.data if (result?.data && result?.is_encrypt === 1) { result.data = JSON.parse(dataDecrypt(result.data)) } this.log(traceId, 'RES', name, result) // 除 bindData 外,其余调用若接口已明确返回失败,直接抛出该失败原因 // bindData 需要保留完整响应由 isRunSuccess 统一判定。 if (name !== 'bindData') { const hasCode = result && Object.prototype.hasOwnProperty.call(result, 'code') const hasStatus = result && Object.prototype.hasOwnProperty.call(result, 'status') const failedByCode = hasCode && Number(result.code) !== 1 && Number(result.code) !== 200 const failedByStatus = hasStatus && Number(result.status) !== 1 if (failedByCode || failedByStatus) { const message = this.extractApiErrorMessage(name, result) const err = new Error(message) // 学习 Lepao.js:若明确提示重新登录,自动标记账号失效 if (message.includes('重新登录')) { await this.markLoginExpired(raw?.student_num) } // 接口已返回业务错误,禁止重试 err.retryable = false throw err } } return result }, name) } register(type, handler) { this.handlers[type] = handler this.logger.info(`注册任务: ${type}`) } /* ================= 业务 ================= */ initHandlers() { /* ---------------- 开始乐跑 ---------------- */ this.register('lepao.startRun', async (req, ctx) => { const traceId = ctx.traceId const maxPathRetry = 999 // 自动获取路径失败最大重试次数 let pathRetry = 0 let pointData = null let ossPath = null let userData = null let pathId = null let runZoneId = 0 let bindRes = null try { userData = await this.handlers['lepao.getUserData'](req, ctx) // 进入乐跑进程后写入进行中缓存 const progressKey = `lepaoProgress:${req.account}` const inProgress = await Redis.get(progressKey) if (inProgress) { throw new Error('该账号已进入乐跑任务队列,请等待乐跑完成后再进行乐跑操作') } await Redis.set(progressKey, req.account, { EX: 1800 }) // 晚上10点后提前 let run_end_time = Math.floor(Date.now() / 1000) - 300 // 提前5分钟 let hour = new Date().getHours() if (hour < 7) throw new Error('当前不在有效乐跑时间范围内。RunForge支持乐跑时间段为7:00~24:00') if (hour >= 22) { this.logger.info(`${req.account}当前时间为${hour}点,调整run_end_time提前5小时`) run_end_time -= 18000 } req = { ...req, ...userData, run_end_time, student_id: req.account } // 1.5️⃣ 乐跑开始前扣减次数(失败会返还,且有幂等保护) await this.handlers['lepao.consumeCount']({ account: req.account, uuid: userData?.create_user }, ctx) while (pathRetry < maxPathRetry) { try { // 2️⃣ 获取路径(仅路径选择失败时重试) const pathRes = await this.handlers['lepao.getPath'](req, ctx) pathId = pathRes.path_id // 3️⃣ 切换跑区 const zoneRes = await this.handlers['lepao.setZone']({ ...req, random_id: pathId }, ctx) runZoneId = zoneRes?.run_zone_id || 0 // 4️⃣ 上传 OSS 文件、生成打卡点 const uploadRes = await this.handlers['lepao.uploadOssFile']({ ...req, random_id: pathId }, ctx) ossPath = uploadRes.oss_path pointData = uploadRes.point_data if (!pointData) { pathRetry++ this.logger.warn(`[${traceId}] 打卡点不满足要求,重新获取路径 第${pathRetry}次`) continue } // 打卡点符合要求,跳出循环 break } catch (err) { if (!this.isRetryableTaskError(err)) { throw err } this.logger.warn(`[${traceId}] 可重试错误,重新获取路径 第${pathRetry + 1}次,原因:${err.message}`) pathRetry++ await this.sleep(1000 * pathRetry) } } if (!pointData) { throw new Error('打卡点获取失败,乐跑任务终止') } // 5️⃣ 提交跑步数据 bindRes = await this.handlers['lepao.bindData']({ ...req, random_id: pathId, run_zone_id: runZoneId, record_file: ossPath, point_data: pointData }, ctx) // 绑定接口有返回即入库(无论成功或失败) if (bindRes && bindRes.data) { await this.addLepaoRecord(userData?.create_user, req.account, bindRes.data, pathId, pointData) } // 使用旧版 Lepao.js 的规则判断“是否真正乐跑成功” const runResult = this.isRunSuccess(bindRes) if (runResult.ok || runResult.reason === '当天关联成绩次数已达到上限') { await this.writeSuccessRedis(req.account) } if (!runResult.ok) { throw new Error(runResult.reason) } // 6️⃣ 发送通知 if (ctx.channel) { await this.enqueueTask(ctx.channel, 'lepao.sendNotice', { account: req.account, success: true, data: runResult.payload, traceId }, { id: `${traceId}:notice:success` }) } return { traceId, ossPath, pointData, bindRes } } catch (err) { this.logger.error(`[${traceId}] 乐跑流程失败:`, err) // 若已扣减次数,则失败时返还(幂等) try { await this.handlers['lepao.refundCount']({ account: req.account, uuid: userData?.create_user }, ctx) } catch (e) { this.logger.error(`[${traceId}] 返还乐跑次数失败:${e.stack || e}`) } if (ctx.channel) { await this.enqueueTask(ctx.channel, 'lepao.sendNotice', { account: req.account, success: false, reason: err.message || '未知错误', traceId }, { id: `${traceId}:notice:fail` }) } // 将失败消息发送到结果队列或死信队列 if (ctx.channel) { await this.sendResult(ctx.channel, { id: req.taskId, success: false, error: err.message }) await ctx.channel.sendToQueue( this.deadQueue, Buffer.from(JSON.stringify({ ...req, error: err.message })), { persistent: true } ) } throw err } finally { await Redis.del(`lepaoProgress:${req.account}`) } }) /* ---------------- 发送通知(独立 MQ 任务) ---------------- */ this.register('lepao.sendNotice', async (req, ctx) => { const { account, success, data, reason, traceId } = req || {} if (!account) { throw new Error('发送通知失败:缺少 account') } const emailSql = ` SELECT a.name, a.email, a.target_count, a.notice_type, e.bot_umo FROM lepao_account a LEFT JOIN lepao_extra e ON a.student_num = e.student_num WHERE a.student_num = ? ` const rows = await db.query(emailSql, [account]) if (!rows || rows.length === 0) { throw new Error('发送通知失败:未找到用户通知配置') } const user = rows[0] const noticeType = user.notice_type || 'none' const payload = success ? { ...(data && typeof data === 'object' ? data : {}), type: 'lepao_success', umo: user.bot_umo, // 沿用原 Lepao.js 字段:term_num 实际传的是 target_count term_num: user.target_count ?? 0, name: user.name, account, traceId } : { type: 'lepao_fail', umo: user.bot_umo, name: user.name, account, reason, traceId } if (noticeType === 'bot' && user.bot_umo) { const ch = await mq.getChannel(this.noticeQueue) await ch.assertQueue(this.noticeQueue, { durable: true }) ch.sendToQueue( this.noticeQueue, Buffer.from(JSON.stringify(payload)), { persistent: true, contentType: 'application/json' } ) return { delivered: true, via: 'bot' } } if (noticeType === 'email' && user.email) { if (success) { await EmailTemplate.lepaoSuccess(user.email, payload) return { delivered: true, via: 'email' } } await EmailTemplate.lepaoFail(user.email, { name: user.name, account, reason: reason || '系统繁忙,请联系客服或稍后再试', traceId }) return { delivered: true, via: 'email' } } return { delivered: false, via: 'none' } }) /* ---------------- 扣减次数 ---------------- */ this.register('lepao.consumeCount', async (req, ctx) => { const account = req?.account const uuid = req?.uuid if (!uuid) { throw new Error('扣减乐跑次数失败:缺少 uuid') } // 幂等:同一 taskId 只扣一次 const consumeKey = `lepao:consume:${ctx?.taskId || ctx?.traceId || account || uuid}` const existed = await Redis.get(consumeKey) if (existed) { return true } this.logger.info(`${account || uuid}开始扣减乐跑次数`) const useLepaoCountSql = 'UPDATE users SET lepao_count = lepao_count - 1 WHERE uuid = ?' const r = await db.query(useLepaoCountSql, [uuid]) if (!r || r.affectedRows !== 1) { throw new Error('扣减乐跑次数失败:数据库更新失败') } this.logger.info(`${account || uuid}扣减乐跑次数完成`) await Redis.set(consumeKey, '1', { EX: 3600 }) return true }) /* ---------------- 返还次数(失败时执行) ---------------- */ this.register('lepao.refundCount', async (req, ctx) => { const account = req?.account const uuid = req?.uuid if (!uuid) { return true } const baseKey = `${ctx?.taskId || ctx?.traceId || account || uuid}` const consumeKey = `lepao:consume:${baseKey}` const refundKey = `lepao:refund:${baseKey}` const consumed = await Redis.get(consumeKey) if (!consumed) { return true } const refunded = await Redis.get(refundKey) if (refunded) { return true } this.logger.info(`${account || uuid}开始返还乐跑次数`) const sql = 'UPDATE users SET lepao_count = lepao_count + 1 WHERE uuid = ?' const r = await db.query(sql, [uuid]) if (!r || r.affectedRows !== 1) { throw new Error('返还乐跑次数失败:数据库更新失败') } this.logger.info(`${account || uuid}返还乐跑次数完成`) await Redis.set(refundKey, '1', { EX: 3600 }) return true }) this.register('lepao.getUserData', async (req, ctx) => { const account = req.account this.logger.info(`${account}开始获取用户数据`) const accountSql = ` SELECT u.uuid, u.lepao_count, l.create_user, l.name, l.student_num, l.area, l.sex, l.state, l.token, l.uid, l.school_id, l.userAgent, l.deviceModel, l.notice_type, l.email, e.bot_account FROM lepao_account l LEFT JOIN users u ON l.create_user = u.uuid LEFT JOIN lepao_extra e ON l.student_num = e.student_num WHERE l.student_num = ? ` const rows = await db.query(accountSql, [account]) if (!rows || rows.length === 0) { this.logger.error(`${account}无法获取账号数据`) throw new Error('无法获取账号数据,请联系客服或稍后再试') } let userData = rows[0] if (!userData.create_user || !userData.uuid) { this.logger.warn(`${account}账号状态异常`) throw new Error('当前账号状态异常,请联系客服') } if (userData.state !== 1) { this.logger.warn(`${account}登录状态异常 state=${userData.state}`) throw new Error('乐跑账号登录已过期,请尝试使用登录器重新登录') } if (userData.lepao_count < 1) { this.logger.warn(`${account}乐跑次数不足`) throw new Error('用户乐跑次数不足,请购买乐跑次数后重试!') } if (!userData.userAgent) userData.userAgent = 'Mozilla/5.0 (Linux; Android 16; 2211133C Build/BP2A.250605.031.A3; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/138.0.7204.180 Mobile Safari/537.36 XWEB/1380347 MMWEBSDK/20250202 MMWEBID/1020 wxwork/5.0.6.66174 MicroMessenger/8.0.28.48(0x28001c30) MiniProgramEnv/android Luggage/3.0.2.95ef3f83 NetType/WIFI Language/zh_CN ABI/arm64' if (!userData.deviceModel) userData.deviceModel = '2211133C' return userData }) this.register('lepao.getPath', async (req, ctx) => { const account = req.account this.logger.info(`${account}开始获取路径`) const accountSql = 'SELECT area, sex FROM lepao_account WHERE student_num = ?' const rows = await db.query(accountSql, [account]) if (!rows || rows.length === 0) { this.logger.error(`${account}无法获取账号数据`) throw new Error('无法获取账号数据') } const { area, sex } = rows[0] let max = 4.00 let min = 2.00 if (sex === 2) { max = 2.00 min = 1.60 } this.logger.info(`${account}路径参数: area=${area ?? '随机'}, max_distance=${max}, min_distance=${min}`) let pathSql = 'SELECT id FROM path_data WHERE state = 1 AND distance < ? AND distance > ? ' const pathParams = [max, min] if (area) { pathSql += ' AND run_zone_name = ?' pathParams.push(area) } pathSql += ' ORDER BY count ASC LIMIT 1' const paths = await db.query(pathSql, pathParams) if (!paths || paths.length === 0) { this.logger.error(`${account}未找到符合条件的路线`) const err = new Error('未找到符合条件的路线,请改变路径选择条件') err.code = 'PATH_SELECT_FAILED' err.retryable = true throw err } const randomPath = paths[0] const updateSql = 'UPDATE path_data SET count = count + 1 WHERE id = ?' await db.query(updateSql, [randomPath.id]) this.logger.info(`${account}路径选中id=${randomPath.id},计数加1成功`) return { path_id: randomPath.id } }) /* ---------------- 获取跑步记录 ---------------- */ this.register('lepao.getRecord', async (req, ctx) => { const now = this.lepaoTimestamp() const raw = { uid: req.uid, token: req.token, school_id: req.school_id, term_id: 0, course_id: 0, class_id: 0, student_num: req.student_id, card_id: req.student_id, timestamp: now, version: 1, nonce: String(Math.floor(Math.random() * 900000 + 100000)), ostype: 5 } raw.sign = dataSign(raw) return this.request( ctx.traceId, 'getRecord', this.api('/Run2/beforeRunV260'), raw, { 'User-Agent': req.userAgent, 'charset': 'utf-8', 'Referer': 'https://servicewechat.com/wxf94c4ddb63d87ede/32/page-frame.html', } ) }) /* ---------------- 切换跑区 ---------------- */ this.register('lepao.setZone', async (req, ctx) => { const runZoneMap = { '兰花湖校区跑区': 2, '主校区北跑区': 3, '主校区南跑区': 5, '重庆工商大学茶园校区': 6 } const record = await db.query( 'SELECT run_zone_name FROM path_data WHERE id = ?', [req.random_id] ) if (!record || record.length === 0) { throw new Error('跑区不存在') } const runZoneId = runZoneMap[record[0].run_zone_name] if (!runZoneId) throw new Error('跑区不存在') const raw = { uid: req.uid, token: req.token, school_id: req.school_id, term_id: 0, course_id: 0, class_id: 0, student_num: req.student_id, card_id: req.student_id, timestamp: this.lepaoTimestamp(), version: 1, nonce: String(Math.floor(Math.random() * 900000 + 100000)), ostype: 5, run_zone_id: String(runZoneId) } raw.sign = dataSign(raw) await this.request( ctx.traceId, 'setZone', this.api('/Run/setRunZone'), raw ) return { run_zone_id: runZoneId } }) /* ---------------- 获取 OSS STS ---------------- */ this.register('lepao.getOssSts', async (req, ctx) => { const raw = { uid: req.uid, token: req.token, school_id: req.school_id, term_id: 0, course_id: 0, class_id: 0, student_num: req.student_id, card_id: req.student_id, timestamp: this.lepaoTimestamp(), version: 1, nonce: String(Math.floor(Math.random() * 900000 + 100000)), ostype: 5 } raw.sign = dataSign(raw) const res = await this.request( ctx.traceId, 'getOssSts', this.api('/WpIndex/getOssSts'), raw ) return res.data }) /* ---------------- 上传 OSS 文件 ---------------- */ this.register('lepao.uploadOssFile', async (req, ctx) => { const pathRow = await db.query( 'SELECT * FROM path_data WHERE id=?', [req.random_id] ) if (!pathRow || pathRow.length === 0) { throw new Error('路径数据不存在') } const pathData = pathRow[0] // 处理跑步路径 const newPathData = getPathData(pathData.data, req.run_end_time, pathData.time) const pathResult = dataEncrypt(JSON.stringify(newPathData)) // 获取跑步规则参数 const runRule = await this.handlers['lepao.getRecord'](req, ctx) const ruleData = runRule?.data if (!ruleData?.run_line_info?.point_list || !ruleData?.time_rule_arr?.length) { const err = new Error('获取打卡点规则失败') err.code = 'CHECKPOINT_FETCH_FAILED' err.retryable = true throw err } const check_points = ruleData.run_line_info.point_list let min_log_num = ruleData.time_rule_arr[0]?.min_log_num || 4 const point_update_distance = parseFloat(ruleData.run_line_info.point_update_distance || 0) * 1000 const log_max_distance = Number(ruleData.run_line_info.log_max_distance || 0) // 生成打卡点 const point_data = selectCheckpoints(newPathData, check_points, min_log_num, point_update_distance, log_max_distance, req.run_end_time, pathData.time) if (!point_data) { this.logger.warn(`[RETRY] 打卡点数量不足,重新更换路径`) const err = new Error('打卡点数量不足') err.code = 'CHECKPOINT_INSUFFICIENT' err.retryable = true throw err } const sts = await this.handlers['lepao.getOssSts'](req, ctx) if (!sts?.bucket || !sts?.AccessKeyId || !sts?.AccessKeySecret || !sts?.SecurityToken) { throw new Error('获取 OSS STS 失败') } const now = new Date() const yyyy = now.getFullYear() const mm = String(now.getMonth() + 1).padStart(2, '0') const dd = String(now.getDate()).padStart(2, '0') const formattedToday = `${yyyy}-${mm}-${dd}` const boundary = String(Date.now()) const timestamp = String(Date.now()) const ossPath = `Public/Upload/file/run_record/${boundary.slice(-3)}/${formattedToday}/${timestamp}-${Math.floor(Math.random() * 150)}.txt` const client = new OSS({ bucket: sts.bucket, region: sts.region || 'oss-cn-hangzhou', accessKeyId: sts.AccessKeyId, accessKeySecret: sts.AccessKeySecret, stsToken: sts.SecurityToken, secure: true }) await client.put(ossPath, Buffer.from(pathResult, 'utf-8')) return { oss_path: ossPath, point_data: point_data } }) /* ---------------- 提交跑步数据 ---------------- */ this.register('lepao.bindData', async (req, ctx) => { if (req?.random_id === undefined || req?.random_id === null || req?.random_id === '') { throw new Error('提交跑步数据失败:缺少 random_id') } const pathRow = await db.query( 'SELECT * FROM path_data WHERE id=?', [req.random_id] ) if (!pathRow || pathRow.length === 0) { throw new Error(`提交跑步数据失败:未找到路径数据(random_id=${req.random_id})`) } const pathData = pathRow[0] const distance = Number(Number(pathData.distance || 0).toFixed(2)) const stepData = generateCadence(distance, pathData.time) const stepInfo = JSON.stringify({ interval: 60, list: stepData.cadence_list }) let points = req.point_data.map(({ address, jingwei, ...rest }) => rest) points = JSON.stringify(points) const data = { uid: req.uid, token: req.token, school_id: req.school_id, term_id: 1, course_id: 0, class_id: 0, student_num: req.student_id, card_id: req.student_id, timestamp: this.lepaoTimestamp(), version: 1, nonce: String(Math.floor(Math.random() * 900000 + 100000)), ostype: 5, game_id: String(req.run_zone_id || 0), start_time: req.run_end_time - Number(pathData.time), end_time: req.run_end_time, distance, record_img: "", log_data: points, file_img: "", is_running_area_valid: 1, mobileDeviceId: 1, mobileModel: req.deviceModel, step_info: stepInfo, step_num: stepData.total_steps, used_time: pathData.time, mobileOsVersion: 1, record_file: req.record_file } data.sign = dataSign(data) return this.request( ctx.traceId, 'bindData', this.api('/Run/stopRunV278'), data ) }) } /* ================= Worker核心 ================= */ async start() { if (this.running) return this.running = true this.logger.info('Worker 启动中...') try { this.initHandlers() const channel = await mq.getChannel(this.channelName) await channel.prefetch(5) await channel.assertQueue(this.taskQueue, { durable: true }) await channel.assertQueue(this.resultQueue, { durable: true }) await channel.assertQueue(this.deadQueue, { durable: true }) await channel.consume(this.taskQueue, async (msg) => { if (!msg) return let content try { content = JSON.parse(msg.content.toString()) } catch { return channel.ack(msg) } const { id, type, data, retry = 0 } = content const traceId = this.traceId() const handler = this.handlers[type] if (!handler) { this.log(traceId, 'ERROR', '未知任务', { type }) return channel.ack(msg) } try { const result = await this.withTimeout( handler(data, { traceId, channel, taskId: id }), type ) await this.sendResult(channel, { id, success: true, result }) this.log(traceId, 'DONE', `任务完成 ${type}`) channel.ack(msg) } catch (err) { this.logErr(traceId, `任务失败 ${type}`, err) if (retry < this.maxRetry && this.isRetryableTaskError(err)) { // 重试 await channel.sendToQueue( this.taskQueue, Buffer.from(JSON.stringify({ ...content, retry: retry + 1 })), { persistent: true } ) this.log(traceId, 'RETRY', `重试第${retry + 1}次`) } else { // 死信 await channel.sendToQueue( this.deadQueue, Buffer.from(JSON.stringify(content)), { persistent: true } ) this.log(traceId, 'DEAD', '进入死信队列') } await this.sendResult(channel, { id, success: false, error: err.message }) channel.ack(msg) } }) this.logger.info('RunForge Worker 启动成功') } catch (err) { this.logger.error('RunForge Worker 启动失败: ' + err.stack) } } async sendResult(channel, data) { channel.sendToQueue( this.resultQueue, Buffer.from(JSON.stringify(data)), { persistent: true } ) } async stop() { this.running = false await mq.close() this.logger.info('RunForge Worker 已停止') } } module.exports = Worker