const path = require('path') const mq = require('../../plugin/mq') const { assertRunforgeTaskIngress, TASK_QUEUE } = require('../../plugin/mq/runforgeTaskMq') const db = require('../../plugin/DataBase/db') const Redis = require('../../plugin/DataBase/Redis') const EmailTemplate = require('../../plugin/Email/emailTemplate') const jkesRedisKeys = require('../../plugin/jkes/redisKeys') const Logger = require('../Logger') class Worker { constructor() { this.logger = new Logger( path.join(__dirname, '../logs/LepaoWorker.log'), 'INFO' ) this.handlers = {} this.running = false this.taskQueue = TASK_QUEUE this.resultQueue = 'runforge_task_result_queue' this.deadQueue = 'runforge_task_dead_queue' this.noticeQueue = 'runforge_message_queue' this.channelName = 'lepao_worker' this.maxRetry = 3 this.timeout = 15000 this._lepaoRecordPathColumn = null } roundKm(n) { const v = Number(n) if (!Number.isFinite(v)) return 0 return Math.round(v * 100) / 100 } traceId() { return Date.now() + '_' + Math.random().toString(36).slice(2, 8) } sleep(ms) { return new Promise((r) => setTimeout(r, ms)) } async markLoginExpired(account) { if (!account) return try { const sql = 'UPDATE lepao_account SET state = 0 WHERE student_num = ?' await db.query(sql, [account]) try { await Redis.del(jkesRedisKeys.runnerFlag(account)) } catch (e) { this.logger.warn(`${account} 清理 jkes_runner 标记失败:${e.message || e}`) } this.logger.warn(`${account} 登录状态已失效,已自动更新账号状态`) } catch (error) { this.logger.error(`更新账号登录状态失败:${error.stack || error}`) } } async writeSuccessRedis(account) { if (!account) return try { const now = new Date() const tomorrow = new Date().setHours(24, 0, 0, 0) const exp = Math.floor((tomorrow - now) / 1000) await Redis.set(jkesRedisKeys.lepaoSuccess(account), account, { EX: exp }) } catch (error) { this.logger.error(`写入乐跑成功缓存失败: ${error.stack || error}`) } } async getLepaoRecordPathColumn() { if (this._lepaoRecordPathColumn) return this._lepaoRecordPathColumn try { await db.query('SELECT path_data FROM lepao_record LIMIT 1') this._lepaoRecordPathColumn = 'path_data' } catch (e) { if ((e?.message || '').includes("Unknown column 'path_data'")) { this._lepaoRecordPathColumn = 'point_data' } else { throw e } } return this._lepaoRecordPathColumn } async createLepaoRecord({ uuid, account, result = {}, pathId = null, pathData = [], state = 0 }) { if (!uuid || !account) return null const pathCol = await this.getLepaoRecordPathColumn() const sql = `INSERT INTO lepao_record (uuid, time, lepao_account, result, path_id, ${pathCol}, state) VALUES (?, ?, ?, ?, ?, ?, ?)` const r = await db.query(sql, [ uuid, Date.now(), account, JSON.stringify(result || {}), pathId, JSON.stringify(pathData || []), state ]) return r?.insertId || null } async updateLepaoRecord(id, { result, pathData, state } = {}) { if (!id) return const pathCol = await this.getLepaoRecordPathColumn() const sets = [] const params = [] if (result !== undefined) { sets.push('result = ?') params.push(JSON.stringify(result || {})) } if (pathData !== undefined) { sets.push(`${pathCol} = ?`) params.push(JSON.stringify(pathData || [])) } if (state !== undefined) { sets.push('state = ?') params.push(state) } if (!sets.length) return params.push(id) await db.query(`UPDATE lepao_record SET ${sets.join(', ')} WHERE id = ?`, params) } async syncJkesRunCount(req) { const sid = req?.student_id || req?.account const token = req?.token if (!sid || !token) return const { fetchJkesMonthKm, fetchJkesTotalKm } = require('../../plugin/jkes/stats') const { readState, writeState } = require('../../plugin/jkes/monthPolicy') const now = new Date() const y = now.getFullYear() const m = now.getMonth() + 1 const monthKm = await fetchJkesMonthKm(token, y, m) const totalKm = await fetchJkesTotalKm(token) const sql = 'UPDATE lepao_account SET term_num = ?, total_num = ? WHERE student_num = ?' const rows = await db.query(sql, [monthKm, totalKm, sid]) if (!rows || rows.affectedRows !== 1) { this.logger.warn(`${sid} JKES 更新里程字段失败`) } else { this.logger.info(`${sid} JKES 同步里程 本月=${monthKm} 累计=${totalKm}`) } const prevLocal = await readState(sid, now) await writeState(sid, { km: monthKm, doubles: prevLocal.doubles }, now) } async syncRunCount(req) { try { await this.syncJkesRunCount(req) } catch (error) { this.logger.warn(`${req?.account || 'unknown'}同步乐跑里程失败: ${error.message || error}`) } } async enqueueTask(channel, type, data, options = {}) { const payload = { id: options.id || this.traceId(), type, data, retry: options.retry ?? 0 } await channel.sendToQueue( this.taskQueue, Buffer.from(JSON.stringify(payload)), { persistent: true, contentType: 'application/json' } ) return payload.id } async withTimeout(promise, name, ms) { const limit = typeof ms === 'number' && ms > 0 ? ms : this.timeout return Promise.race([ promise, new Promise((_, reject) => setTimeout(() => reject(new Error(`${name} 超时`)), limit) ) ]) } async retry(fn, name) { let lastErr for (let i = 0; i < this.maxRetry; i++) { try { return await fn() } catch (err) { lastErr = err if (!this.isRetryableTaskError(err)) { throw err } this.logger.warn(`[RETRY] ${name} 第${i + 1}次失败`) await this.sleep(1000 * (i + 1)) } } throw lastErr } isNetworkError(err) { if (!err) return false if ( err.code && ['ECONNRESET', 'ECONNABORTED', 'ETIMEDOUT', 'ENOTFOUND', 'EAI_AGAIN'].includes(err.code) ) { return true } if (err.isAxiosError && !err.response) return true const msg = (err.message || '').toLowerCase() return msg.includes('timeout') || msg.includes('network') } isRetryableTaskError(err) { if (!err) return false if (err.retryable === true) return true if (this.isNetworkError(err)) return true return ['PATH_SELECT_FAILED', 'CHECKPOINT_FETCH_FAILED', 'CHECKPOINT_INSUFFICIENT'].includes( err.code ) } safeStringify(obj) { const seen = new WeakSet() return JSON.stringify(obj, (key, value) => { if (typeof value === 'object' && value !== null) { if (seen.has(value)) return '[Circular]' seen.add(value) } return value }) } log(traceId, type, msg, data) { this.logger.info(`[${traceId}] [${type}] ${msg} ${data ? this.safeStringify(data) : ''}`) } logErr(traceId, msg, err) { this.logger.error(`[${traceId}] ${msg} ${err.stack || err}`) } register(type, handler) { this.handlers[type] = handler this.logger.info(`注册任务: ${type}`) } /** * 仅选取轨迹几何(闭合环路),实际跑步距离与配速由 runJkesRecord 的 distanceM / paceSecPerKm 决定,不再按 path_data.distance 筛选 */ async selectJkesPathRow(account) { const accountSql = 'SELECT area FROM lepao_account WHERE student_num = ?' const rows = await db.query(accountSql, [account]) const area = rows?.[0]?.area let pathSql = 'SELECT id, data FROM path_data WHERE state = 1 ' const pathParams = [] if (area) { pathSql += ' AND run_zone_name = ?' pathParams.push(area) } pathSql += ' ORDER BY count ASC LIMIT 1' const paths = await db.query(pathSql, pathParams) if (!paths || paths.length === 0) { const err = new Error('未找到符合条件的路线,请改变路径选择条件') err.code = 'PATH_SELECT_FAILED' err.retryable = true throw err } const picked = paths[0] await db.query('UPDATE path_data SET count = count + 1 WHERE id = ?', [picked.id]) return picked } initHandlers() { this.register('lepao.startRun', async (req, ctx) => { const traceId = ctx.traceId const maxPathRetry = 5 let pathRetry = 0 let userData = null let recordDbId = null let deductedKm = 0 try { // const isSuccess = await Redis.get(jkesRedisKeys.lepaoSuccess(req.account)) // if (isSuccess) throw new Error('该账号当天已乐跑成功!请勿重复乐跑') userData = await this.handlers['lepao.getUserData'](req, ctx) req = { ...req, ...userData, student_id: req.account } const progressKey = jkesRedisKeys.lepaoProgress(req.account) const inProgress = await Redis.get(progressKey) if (inProgress) { throw new Error('该账号已进入乐跑任务队列,请等待乐跑完成后再进行乐跑操作') } await Redis.set(progressKey, req.account, { EX: 1800 }) const hour = new Date().getHours() // if (hour < 7) { // throw new Error('当前不在有效乐跑时间范围内。哪吒乐跑支持乐跑时间段为7:00~24:00') // } const { runJkesRecord } = require('../../plugin/jkes/runRecord') const { getJkesSettings } = require('../../plugin/jkes/jkesSettings') const { randomPaceSecPerKm } = require('../../plugin/jkes/paceUtils') const { isJkesRecordValidInCampus } = require('../../plugin/jkes/stats') const manual = req.manual === true || req.manual === 'true' const jkesSettings = getJkesSettings() let targetKm = Number(req.targetKm) if (!Number.isFinite(targetKm) || targetKm < 1) targetKm = 1 const maxAutoKm = Math.max(2, Number(jkesSettings.autoSingleRunMaxKm) || 5) if (manual) { if (targetKm > 5) targetKm = 5 } else if (targetKm > maxAutoKm) { targetKm = maxAutoKm } targetKm = this.roundKm(targetKm) const distanceM = targetKm * 1000 deductedKm = targetKm let pace if (manual) { const p = Number(req.paceSecPerKm) if (!Number.isFinite(p) || p < 180 || p > 600) { throw new Error('手动乐跑任务缺少有效配速 paceSecPerKm(3:00–10:00/km)') } pace = p } else { pace = randomPaceSecPerKm( jkesSettings.paceRandomMinSecPerKm, jkesSettings.paceRandomMaxSecPerKm ) } await this.handlers['lepao.consumeCount']( { account: req.account, uuid: userData?.create_user, amountKm: deductedKm }, ctx ) let jkesPathId = null let jkesEnd = null while (pathRetry < maxPathRetry) { try { const pathRow = await this.selectJkesPathRow(req.account) jkesPathId = pathRow.id let rawData = pathRow.data if (typeof rawData === 'string') { rawData = JSON.parse(rawData) } if (!Array.isArray(rawData) || rawData.length === 0) { pathRetry++ this.logger.warn(`[${traceId}] JKES 轨迹数据无效,换路径 第${pathRetry}次`) continue } if (!recordDbId) { recordDbId = await this.createLepaoRecord({ uuid: userData?.create_user, account: req.account, pathId: jkesPathId, pathData: [], result: { phase: 'running', planned_km: targetKm, pace_sec_per_km: pace }, state: 0 }) } jkesEnd = await runJkesRecord({ token: req.token, recordDbId, pathPoints: rawData, distanceM, paceSecPerKm: pace, log: (msg) => this.logger.info(`[${traceId}] ${msg}`) }) break } catch (err) { if (!this.isRetryableTaskError(err)) { throw err } this.logger.warn(`[${traceId}] JKES 可重试错误 第${pathRetry + 1}次:${err.message}`) pathRetry++ await this.sleep(1000 * pathRetry) } } if (!jkesEnd) { throw new Error('JKES 乐跑失败:未获得有效跑步结果') } const info = jkesEnd.endJson?.data?.info const infoWithMeta = info ? { ...info, planned_km: targetKm, deducted_km: deductedKm, pace_sec_per_km: pace } : null if (!recordDbId) { recordDbId = await this.createLepaoRecord({ uuid: userData?.create_user, account: req.account, pathId: jkesPathId, pathData: jkesEnd.uploadedPayloadPoints || [], result: infoWithMeta || jkesEnd.endJson?.data || {}, state: 1 }) } else { await this.updateLepaoRecord(recordDbId, { result: infoWithMeta || jkesEnd.endJson?.data || {}, pathData: jkesEnd.uploadedPayloadPoints || [], state: 1 }) } const ok = isJkesRecordValidInCampus(info || {}) if (!ok) { const reason = info?.dataStatus?.label || info?.status?.label || '跑步记录未记作校内有效' throw new Error(reason) } await this.writeSuccessRedis(req.account) const autoDoubleSlot = !manual && (req.autoDoubleSlot === true || req.autoDoubleSlot === 'true') const finalizeTask = { account: req.account, token: req.token, uuid: userData?.create_user, recordDbId, jkesRecordId: jkesEnd.recordId || info?.id, deductedKm, targetKm, autoDoubleSlot, traceId } if (ctx.channel) { try { await this.enqueueTask(ctx.channel, 'lepao.finalizeRunSync', finalizeTask, { id: `${traceId}:finalize:${req.account}` }) } catch (e) { this.logger.warn(`[${traceId}] finalize 任务投递失败,改为同步执行:${e.message || e}`) await this.handlers['lepao.finalizeRunSync'](finalizeTask, ctx) } } else { await this.handlers['lepao.finalizeRunSync'](finalizeTask, ctx) } return { traceId, jkes: true, endJson: jkesEnd.endJson } } catch (err) { this.logger.error(`[${traceId}] 乐跑流程失败:`, err) try { await this.handlers['lepao.refundCount']( { account: req.account, uuid: userData?.create_user, amountKm: deductedKm }, ctx ) } catch (e) { this.logger.error(`[${traceId}] 返还乐跑公里失败:${e.stack || e}`) } if (recordDbId) { try { await this.updateLepaoRecord(recordDbId, { result: { phase: 'error', reason: err.message || '未知错误' }, state: 3 }) } catch (e) { this.logger.error(`[${traceId}] 更新乐跑记录失败:${e.stack || e}`) } } if (ctx.channel) { await this.enqueueTask(ctx.channel, 'lepao.sendNotice', { account: req.account, success: false, reason: err.message || '未知错误', traceId }, { id: `${traceId}:notice:fail` }) } throw err } finally { if (req?.account) { await Redis.del(jkesRedisKeys.lepaoProgress(req.account)) } } }) this.register('lepao.finalizeRunSync', async (req, ctx) => { const traceId = req?.traceId || ctx?.traceId || this.traceId() const { account, token, uuid, recordDbId, jkesRecordId, deductedKm, targetKm, autoDoubleSlot } = req || {} if (!account || !token || !recordDbId || !jkesRecordId) { throw new Error('finalizeRunSync 参数缺失') } const { fetchJkesRecordById, isJkesRecordFullySynced, isJkesRecordValidInCampus, recordDistanceKm } = require('../../plugin/jkes/stats') const { recordSuccess } = require('../../plugin/jkes/monthPolicy') const pollCount = 40 const pollIntervalMs = 90 * 1000 let latest = null let synced = false for (let i = 1; i <= pollCount; i++) { latest = await fetchJkesRecordById(token, jkesRecordId, 10) if (latest && isJkesRecordFullySynced(latest)) { synced = true break } if (i < pollCount) { await this.sleep(pollIntervalMs) } } if (!latest || !isJkesRecordValidInCampus(latest)) { await this.updateLepaoRecord(recordDbId, { result: latest || { phase: 'finalize_failed', reason: '记录未校内有效' }, state: 3 }) await this.handlers['lepao.refundCount']( { account, uuid, amountKm: this.roundKm(deductedKm || targetKm || 0) }, { ...ctx, taskId: `${ctx?.taskId || traceId}:finalize_refund_full` } ) if (ctx.channel) { await this.enqueueTask(ctx.channel, 'lepao.sendNotice', { account, success: false, reason: '乐跑记录未同步为校内有效,已退还预扣公里', traceId }, { id: `${traceId}:notice:fail:finalize` }) } return { traceId, finalized: false, refunded: true } } if (!synced) { await this.updateLepaoRecord(recordDbId, { result: latest, state: 1 }) const err = new Error('官方记录尚未完全同步(distance/speed)') err.retryable = true throw err } const officialKmRaw = this.roundKm(recordDistanceKm(latest)) const officialKm = officialKmRaw > 0 ? officialKmRaw : this.roundKm(targetKm || 0) const preDeduct = this.roundKm(deductedKm || targetKm || 0) const refundKm = this.roundKm(Math.max(0, preDeduct - officialKm)) if (refundKm > 0) { await this.handlers['lepao.refundCount']( { account, uuid, amountKm: refundKm }, { ...ctx, taskId: `${ctx?.taskId || traceId}:finalize_refund_over` } ) } await this.updateLepaoRecord(recordDbId, { result: { ...(latest || {}), planned_km: this.roundKm(targetKm || 0), deducted_km: preDeduct, official_km: officialKm, refunded_km: refundKm }, state: 2 }) await recordSuccess(account, officialKm, { autoDoubleSlot: officialKm >= 2 && !!autoDoubleSlot }) await this.syncRunCount({ account, student_id: account, token }) // 达成本月预设目标后:关闭自动乐跑并发送目标完成邮件(target_count=0 表示不限制) try { const accRows = await db.query( 'SELECT name, email, notice_type, auto_run, target_count, term_num, total_num FROM lepao_account WHERE student_num = ?', [account] ) if (accRows && accRows.length) { const acc = accRows[0] const targetKm = Number(acc.target_count) || 0 const monthKm = Number(acc.term_num) || 0 if (acc.auto_run === 1 && targetKm !== 0 && monthKm >= targetKm) { await db.query('UPDATE lepao_account SET auto_run = 0 WHERE student_num = ?', [account]) if (acc.notice_type === 'email' && acc.email) { await EmailTemplate.lepaoOver(acc.email, { name: acc.name || account, month_km: monthKm }) } } } } catch (e) { this.logger.warn(`[${traceId}] 目标达成处理失败:${e.message || e}`) } if (ctx.channel) { await this.enqueueTask(ctx.channel, 'lepao.sendNotice', { account, success: true, data: { distance: officialKm, time: Number(latest.useTime) || 0, record_failed_reason: '', refundedKm: refundKm }, traceId }, { id: `${traceId}:notice:success:finalize` }) } return { traceId, finalized: true, officialKm, refundKm } }) this.register('lepao.sendNotice', async (req, ctx) => { const { account, success, data, reason, traceId } = req || {} if (!account) { throw new Error('发送通知失败:缺少 account') } const emailSql = ` SELECT a.name, a.email, a.target_count, a.term_num, a.total_num, a.notice_type, e.bot_umo FROM lepao_account a LEFT JOIN lepao_extra e ON a.student_num = e.student_num WHERE a.student_num = ? ` const rows = await db.query(emailSql, [account]) if (!rows || rows.length === 0) { throw new Error('发送通知失败:未找到用户通知配置') } const user = rows[0] const noticeType = user.notice_type || 'none' let runZoneName = data?.run_zone_name if (!runZoneName && success) { try { const z = await db.query( ` SELECT p.run_zone_name FROM lepao_record r LEFT JOIN path_data p ON r.path_id = p.id WHERE r.lepao_account = ? ORDER BY r.id DESC LIMIT 1 `, [account] ) runZoneName = z?.length ? z[0].run_zone_name : null } catch (e) { this.logger.warn(`[${traceId}] 查询跑区失败:${e.message || e}`) } } const payload = success ? { ...(data && typeof data === 'object' ? data : {}), type: 'lepao_success', umo: user.bot_umo, run_zone_name: runZoneName, month_km: Number(user.term_num) || 0, total_km: Number(user.total_num) || 0, target_km: Number(user.target_count) || 0, name: user.name, account, traceId } : { type: 'lepao_fail', umo: user.bot_umo, name: user.name, account, reason, traceId } if (noticeType === 'bot' && user.bot_umo) { const ch = await mq.getChannel(this.noticeQueue) await ch.assertQueue(this.noticeQueue, { durable: true }) ch.sendToQueue( this.noticeQueue, Buffer.from(JSON.stringify(payload)), { persistent: true, contentType: 'application/json' } ) return { delivered: true, via: 'bot' } } if (noticeType === 'email' && user.email) { if (success) { await EmailTemplate.lepaoSuccess(user.email, payload) return { delivered: true, via: 'email' } } await EmailTemplate.lepaoFail(user.email, { name: user.name, account, reason: reason || '系统繁忙,请联系客服或稍后再试', traceId }) return { delivered: true, via: 'email' } } return { delivered: false, via: 'none' } }) this.register('lepao.consumeCount', async (req, ctx) => { const account = req?.account const uuid = req?.uuid const amountKm = this.roundKm(req?.amountKm ?? 1) if (!uuid) { throw new Error('扣减乐跑公里失败:缺少 uuid') } if (!(amountKm > 0)) { throw new Error('扣减乐跑公里失败:amountKm 无效') } const baseKey = `${ctx?.taskId || ctx?.traceId || account || uuid}` const consumeKey = jkesRedisKeys.consume(baseKey) const existed = await Redis.get(consumeKey) if (existed) { return true } this.logger.info(`${account || uuid}开始扣减乐跑公里 ${amountKm}km`) const useLepaoCountSql = 'UPDATE users SET lepao_count = ROUND(lepao_count - ?, 2) WHERE uuid = ? AND lepao_count >= ?' const r = await db.query(useLepaoCountSql, [amountKm, uuid, amountKm]) if (!r || r.affectedRows !== 1) { throw new Error(`扣减乐跑公里失败:余额不足(需 ${amountKm}km)`) } this.logger.info(`${account || uuid}扣减乐跑公里完成`) await Redis.set(consumeKey, '1', { EX: 3600 }) return true }) this.register('lepao.refundCount', async (req, ctx) => { const account = req?.account const uuid = req?.uuid const amountKm = this.roundKm(req?.amountKm ?? 0) if (!uuid) { return true } if (!(amountKm > 0)) return true const baseKey = `${ctx?.taskId || ctx?.traceId || account || uuid}` const consumeKey = jkesRedisKeys.consume(baseKey) const refundKey = jkesRedisKeys.refund(baseKey) const consumed = await Redis.get(consumeKey) if (!consumed) { return true } const refunded = await Redis.get(refundKey) if (refunded) { return true } this.logger.info(`${account || uuid}开始返还乐跑公里 ${amountKm}km`) const sql = 'UPDATE users SET lepao_count = ROUND(lepao_count + ?, 2) WHERE uuid = ?' const r = await db.query(sql, [amountKm, uuid]) if (!r || r.affectedRows !== 1) { throw new Error('返还乐跑公里失败:数据库更新失败') } this.logger.info(`${account || uuid}返还乐跑公里完成`) await Redis.set(refundKey, '1', { EX: 3600 }) return true }) this.register('lepao.getUserData', async (req, ctx) => { const account = req.account this.logger.info(`${account}开始获取用户数据`) const accountSql = ` SELECT u.uuid, u.lepao_count, l.create_user, l.name, l.student_num, l.area, l.sex, l.state, l.token, l.userAgent, l.deviceModel, l.notice_type, l.email, e.bot_account FROM lepao_account l LEFT JOIN users u ON l.create_user = u.uuid LEFT JOIN lepao_extra e ON l.student_num = e.student_num WHERE l.student_num = ? ` const rows = await db.query(accountSql, [account]) if (!rows || rows.length === 0) { this.logger.error(`${account}无法获取账号数据`) throw new Error('无法获取账号数据,请联系客服或稍后再试') } let userData = rows[0] if (!userData.create_user || !userData.uuid) { this.logger.warn(`${account}账号状态异常`) throw new Error('当前账号状态异常,请联系客服') } if (userData.state !== 1) { this.logger.warn(`${account}登录状态异常 state=${userData.state}`) throw new Error('乐跑账号登录已过期,请尝试使用登录器重新登录') } if (Number(userData.lepao_count) <= 0) { this.logger.warn(`${account}乐跑公里余额不足`) throw new Error('用户乐跑公里余额不足,请购买后重试!') } if (!userData.userAgent) { userData.userAgent = 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.69 NetType/WIFI Language/zh_CN' } if (!userData.deviceModel) userData.deviceModel = 'unknown' return userData }) } async start() { if (this.running) return this.running = true this.logger.info('Worker 启动中(JKES)...') try { this.initHandlers() const channel = await mq.getChannel(this.channelName) await channel.prefetch(5) await assertRunforgeTaskIngress(channel, this.logger) await channel.assertQueue(this.resultQueue, { durable: true }) await channel.assertQueue(this.deadQueue, { durable: true }) const handleTaskMessage = async (msg) => { if (!msg) return let content try { content = JSON.parse(msg.content.toString()) } catch { return channel.ack(msg) } const { id, type, data, retry = 0 } = content const traceId = this.traceId() const handler = this.handlers[type] if (!handler) { this.log(traceId, 'ERROR', '未知任务', { type }) return channel.ack(msg) } try { const runMs = type === 'lepao.startRun' ? 3600000 : type === 'lepao.finalizeRunSync' ? 4 * 3600000 : undefined const result = await this.withTimeout( handler(data, { traceId, channel, taskId: id }), type, runMs ) await this.sendResult(channel, { id, success: true, result }) this.log(traceId, 'DONE', `任务完成 ${type}`) channel.ack(msg) } catch (err) { this.logErr(traceId, `任务失败 ${type}`, err) if (err?.loginExpired) { const account = data?.account || data?.student_num || data?.studentNum await this.markLoginExpired(account) } if (retry < this.maxRetry && this.isRetryableTaskError(err)) { await channel.sendToQueue( this.taskQueue, Buffer.from( JSON.stringify({ ...content, retry: retry + 1 }) ), { persistent: true } ) this.log(traceId, 'RETRY', `重试第${retry + 1}次`) } else { await channel.sendToQueue( this.deadQueue, Buffer.from(JSON.stringify(content)), { persistent: true } ) this.log(traceId, 'DEAD', '进入死信队列') } await this.sendResult(channel, { id, success: false, error: err.message }) channel.ack(msg) } } await channel.consume(this.taskQueue, handleTaskMessage, { noAck: false }) this.logger.info('哪吒乐跑 Worker 启动成功(JKES)') } catch (err) { this.logger.error('哪吒乐跑 Worker 启动失败: ' + err.stack) } } async sendResult(channel, data) { channel.sendToQueue( this.resultQueue, Buffer.from(JSON.stringify(data)), { persistent: true } ) } async stop() { this.running = false await mq.close() this.logger.info('哪吒乐跑 Worker 已停止') } } module.exports = Worker