Browse Source

✨ feat: 乐跑执行方式改为mq

Pchen. 1 month ago
parent
commit
8412512714
8 changed files with 1392 additions and 9 deletions
  1. 21 3
      apis/Corn/StartAutoLepao.js
  2. 21 3
      apis/Corn/StartLepao.js
  3. 23 3
      apis/Lepao/SingleRun.js
  4. 921 0
      lib/Lepao/Worker.js
  5. 10 0
      lib/Server.js
  6. 89 0
      plugin/Lepao/Crypto.js
  7. 164 0
      plugin/Lepao/Path.js
  8. 143 0
      plugin/mq/Worker.js

+ 21 - 3
apis/Corn/StartAutoLepao.js

@@ -1,7 +1,7 @@
 const API = require("../../lib/API");
 const API = require("../../lib/API");
 const db = require('../../plugin/DataBase/db')
 const db = require('../../plugin/DataBase/db')
 const Redis = require('../../plugin/DataBase/Redis')
 const Redis = require('../../plugin/DataBase/Redis')
-const lepao = require("../../lib/Lepao/Lepao.js").lepao
+const mq = require('../../plugin/mq')
 
 
 const { BaseStdResponse } = require("../../BaseStdResponse");
 const { BaseStdResponse } = require("../../BaseStdResponse");
 
 
@@ -45,8 +45,26 @@ class StartAutoLepao extends API {
                 }
                 }
 
 
                 try {
                 try {
-                    await lepao.beginLepao(create_user, student_num, token, uid, school_id, state)
-                    this.logger.info(`${name}(${student_num})乐跑完成`)
+                    const channel = await mq.getChannel('lepao_corn')
+                    await channel.assertQueue('task_queue', { durable: true })
+
+                    const taskId = `lepao:auto:${Date.now()}:${student_num}`
+                    const payload = {
+                        id: taskId,
+                        type: 'lepao.startRun',
+                        data: {
+                            taskId,
+                            account: student_num
+                        },
+                        retry: 0
+                    }
+
+                    channel.sendToQueue(
+                        'task_queue',
+                        Buffer.from(JSON.stringify(payload)),
+                        { persistent: true, contentType: 'application/json' }
+                    )
+                    this.logger.info(`${name}(${student_num})已投递自动乐跑任务`)
                 } catch (err) {
                 } catch (err) {
                     this.logger.error(`${name}(${student_num})乐跑失败:${err.message || err}`)
                     this.logger.error(`${name}(${student_num})乐跑失败:${err.message || err}`)
                 }
                 }

+ 21 - 3
apis/Corn/StartLepao.js

@@ -1,7 +1,7 @@
 const API = require("../../lib/API");
 const API = require("../../lib/API");
 const db = require('../../plugin/DataBase/db')
 const db = require('../../plugin/DataBase/db')
 const Redis = require('../../plugin/DataBase/Redis')
 const Redis = require('../../plugin/DataBase/Redis')
-const lepao = require("../../lib/Lepao/Lepao.js").lepao
+const mq = require('../../plugin/mq')
 const { BaseStdResponse } = require("../../BaseStdResponse")
 const { BaseStdResponse } = require("../../BaseStdResponse")
 
 
 // 出现异常情况时补充乐跑
 // 出现异常情况时补充乐跑
@@ -52,8 +52,26 @@ class StartLepao extends API {
                 }
                 }
 
 
                 try {
                 try {
-                    await lepao.beginLepao(create_user, student_num, token, uid, school_id, state)
-                    this.logger.info(`${name}(${student_num})补充乐跑完成`)
+                    const channel = await mq.getChannel('lepao_corn')
+                    await channel.assertQueue('task_queue', { durable: true })
+
+                    const taskId = `lepao:repair:${Date.now()}:${student_num}`
+                    const payload = {
+                        id: taskId,
+                        type: 'lepao.startRun',
+                        data: {
+                            taskId,
+                            account: student_num
+                        },
+                        retry: 0
+                    }
+
+                    channel.sendToQueue(
+                        'task_queue',
+                        Buffer.from(JSON.stringify(payload)),
+                        { persistent: true, contentType: 'application/json' }
+                    )
+                    this.logger.info(`${name}(${student_num})已投递补充乐跑任务`)
                 } catch (err) {
                 } catch (err) {
                     this.logger.error(`${name}(${student_num})补充乐跑失败:${err.message || err}`)
                     this.logger.error(`${name}(${student_num})补充乐跑失败:${err.message || err}`)
                 }
                 }

+ 23 - 3
apis/Lepao/SingleRun.js

@@ -3,12 +3,14 @@ const Redis = require('../../plugin/DataBase/Redis')
 const db = require("../../plugin/DataBase/db.js")
 const db = require("../../plugin/DataBase/db.js")
 const { BaseStdResponse } = require("../../BaseStdResponse.js")
 const { BaseStdResponse } = require("../../BaseStdResponse.js")
 const AccessControl = require("../../lib/AccessControl.js")
 const AccessControl = require("../../lib/AccessControl.js")
-const lepao = require("../../lib/Lepao/Lepao.js").lepao
+const mq = require('../../plugin/mq')
 
 
 // 单次乐跑
 // 单次乐跑
 class SingleRun extends API {
 class SingleRun extends API {
     constructor() {
     constructor() {
-        super();
+        super()
+
+        
 
 
         this.setPath('/Lepao/SingleRun')
         this.setPath('/Lepao/SingleRun')
         this.setMethod('GET')
         this.setMethod('GET')
@@ -86,7 +88,25 @@ class SingleRun extends API {
             })
             })
 
 
             try {
             try {
-                await lepao.beginLepao(selectRows[0].create_user, student_num, rows[0].token, rows[0].uid, rows[0].school_id, rows[0].state)
+                const channel = await mq.getChannel('lepao_api')
+                await channel.assertQueue('task_queue', { durable: true })
+
+                const taskId = `lepao:${Date.now()}:${student_num}`
+                const payload = {
+                    id: taskId,
+                    type: 'lepao.startRun',
+                    data: {
+                        taskId,
+                        account: student_num
+                    },
+                    retry: 0
+                }
+
+                channel.sendToQueue(
+                    'task_queue',
+                    Buffer.from(JSON.stringify(payload)),
+                    { persistent: true, contentType: 'application/json' }
+                )
             } catch (err) {
             } catch (err) {
                 this.logger.error(`后台乐跑任务异常:${err.stack}`)
                 this.logger.error(`后台乐跑任务异常:${err.stack}`)
             }
             }

+ 921 - 0
lib/Lepao/Worker.js

@@ -0,0 +1,921 @@
+const path = require('path')
+const axios = require('axios')
+const OSS = require('ali-oss')
+const mq = require('../../plugin/mq')
+const db = require('../../plugin/DataBase/db')
+const Redis = require('../../plugin/DataBase/Redis')
+const EmailTemplate = require('../../plugin/Email/emailTemplate')
+const { URLSearchParams } = require('url')
+const {
+    getPathData,
+    selectCheckpoints,
+    generateCadence
+} = require('../../plugin/Lepao/Path')
+const {
+    dataEncrypt,
+    dataDecrypt,
+    dataSign
+} = require('../../plugin/Lepao/Crypto')
+
+const Logger = require('../Logger')
+
+class Worker {
+    constructor() {
+        this.logger = new Logger(
+            path.join(__dirname, '../logs/LepaoWorker.log'),
+            'INFO'
+        )
+
+        this.handlers = {}
+        this.running = false
+
+        this.baseUrl = 'https://lepao.ctbu.edu.cn/v3/api.php'
+
+        this.taskQueue = 'task_queue'
+        this.resultQueue = 'task_result_queue'
+        this.deadQueue = 'task_dead_queue'
+        this.noticeQueue = 'runforge_message_queue'
+
+        this.channelName = 'lepao_worker'
+
+        this.maxRetry = 3
+        this.timeout = 15000
+
+        this.defaultUserAgent = 'Mozilla/5.0 (Linux; Android 16; 2211133C Build/BP2A.250605.031.A3; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/138.0.7204.180 Mobile Safari/537.36 XWEB/1380347 MMWEBSDK/20250202 MMWEBID/1020 wxwork/5.0.6.66174 MicroMessenger/8.0.28.48(0x28001c30) MiniProgramEnv/android Luggage/3.0.2.95ef3f83 NetType/WIFI Language/zh_CN ABI/arm64'
+
+        // 调试模式:将 axios 请求走本地代理(例如 charles/fiddler)
+        // 开启方式:设置环境变量 LEPAO_DEBUG_PROXY=1
+        this.debugProxyEnabled = String(process.env.LEPAO_DEBUG_PROXY || '').trim() === '1'
+        this.debugProxyHost = process.env.LEPAO_DEBUG_PROXY_HOST || '127.0.0.1'
+        this.debugProxyPort = Number(process.env.LEPAO_DEBUG_PROXY_PORT || 9000)
+    }
+
+    /* ================= 工具 ================= */
+
+    api(path) {
+        return this.baseUrl + path
+    }
+
+    traceId() {
+        return Date.now() + '_' + Math.random().toString(36).slice(2, 8)
+    }
+
+    sleep(ms) {
+        return new Promise(r => setTimeout(r, ms))
+    }
+
+    lepaoTimestamp() {
+        return (Date.now() / 1000).toFixed(3)
+    }
+
+    axiosProxyConfig() {
+        if (!this.debugProxyEnabled) {
+            return { proxy: false }
+        }
+        this.logger.info(`使用本地代理: ${this.debugProxyHost}:${this.debugProxyPort}`)
+        return {
+            proxy: {
+                host: this.debugProxyHost,
+                port: this.debugProxyPort,
+                protocol: 'http'
+            }
+        }
+    }
+
+    async enqueueTask(channel, type, data, options = {}) {
+        const payload = {
+            id: options.id || this.traceId(),
+            type,
+            data,
+            retry: options.retry ?? 0
+        }
+
+        await channel.sendToQueue(
+            this.taskQueue,
+            Buffer.from(JSON.stringify(payload)),
+            { persistent: true, contentType: 'application/json' }
+        )
+
+        return payload.id
+    }
+
+    async withTimeout(promise, name) {
+        return Promise.race([
+            promise,
+            new Promise((_, reject) =>
+                setTimeout(() => reject(new Error(`${name} 超时`)), this.timeout)
+            )
+        ])
+    }
+
+    async retry(fn, name) {
+        let lastErr
+
+        for (let i = 0; i < this.maxRetry; i++) {
+            try {
+                return await fn()
+            } catch (err) {
+                lastErr = err
+                this.logger.warn(`[RETRY] ${name} 第${i + 1}次失败`)
+
+                await this.sleep(1000 * (i + 1)) // 指数退避
+            }
+        }
+
+        throw lastErr
+    }
+
+    isNetworkError(err) {
+        if (!err) return false
+        if (err.code && ['ECONNRESET', 'ECONNABORTED', 'ETIMEDOUT', 'ENOTFOUND', 'EAI_AGAIN'].includes(err.code)) {
+            return true
+        }
+        if (err.isAxiosError && !err.response) return true
+        const msg = (err.message || '').toLowerCase()
+        return msg.includes('timeout') || msg.includes('network')
+    }
+
+    isRetryableTaskError(err) {
+        if (!err) return false
+        if (err.retryable === true) return true
+        if (this.isNetworkError(err)) return true
+        return ['PATH_SELECT_FAILED', 'CHECKPOINT_FETCH_FAILED', 'CHECKPOINT_INSUFFICIENT'].includes(err.code)
+    }
+
+    log(traceId, type, msg, data) {
+        this.logger.info(`[${traceId}] [${type}] ${msg} ${data ? JSON.stringify(data) : ''}`)
+    }
+
+    logErr(traceId, msg, err) {
+        this.logger.error(`[${traceId}] ${msg} ${err.stack || err}`)
+    }
+
+    async request(traceId, name, url, raw, headers = {}) {
+        return this.retry(async () => {
+            this.log(traceId, 'REQ', name, raw)
+
+            const mergedHeaders = {
+                'Content-Type': 'application/x-www-form-urlencoded',
+                'Accept': '*/*',
+                'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
+                'Accept-Encoding': 'gzip, deflate, br',
+                'Referer': 'https://servicewechat.com/wxf94c4ddb63d87ede/32/page-frame.html',
+                ...headers
+            }
+            if (!mergedHeaders['User-Agent']) {
+                mergedHeaders['User-Agent'] = this.defaultUserAgent
+            }
+
+            const form = new URLSearchParams()
+            form.append('ostype', '5')
+            form.append('data', dataEncrypt(JSON.stringify(raw)))
+
+            const res = await this.withTimeout(
+                axios.post(
+                    url,
+                    form,
+                    {
+                        headers: mergedHeaders,
+                        ...this.axiosProxyConfig()
+                    }
+                ),
+                name
+            )
+
+            this.log(traceId, 'RES', name, res)
+
+            let result = res.data
+
+            if (result?.data && result?.is_encrypt === 1) {
+                result.data = JSON.parse(dataDecrypt(result.data))
+            }
+
+            this.log(traceId, 'RES', name, result)
+
+            return result
+        }, name)
+    }
+
+    register(type, handler) {
+        this.handlers[type] = handler
+        this.logger.info(`注册任务: ${type}`)
+    }
+
+    /* ================= 业务 ================= */
+
+    initHandlers() {
+        /* ---------------- 开始乐跑 ---------------- */
+        this.register('lepao.startRun', async (req, ctx) => {
+            const traceId = ctx.traceId
+            const maxPathRetry = 999  // 自动获取路径失败最大重试次数
+            let pathRetry = 0
+            let pointData = null
+            let ossPath = null
+            let userData = null
+            let pathId = null
+            let runZoneId = 0
+
+            try {
+                userData = await this.handlers['lepao.getUserData'](req, ctx)
+                req = {
+                    ...req,
+                    ...userData,
+                    student_id: req.account
+                }
+
+                // 1.5️⃣ 乐跑开始前扣减次数(失败会返还,且有幂等保护)
+                await this.handlers['lepao.consumeCount']({
+                    account: req.account,
+                    uuid: userData?.create_user
+                }, ctx)
+
+                while (pathRetry < maxPathRetry) {
+                    try {
+                        // 2️⃣ 获取路径(仅路径选择失败时重试)
+                        const pathRes = await this.handlers['lepao.getPath'](req, ctx)
+                        pathId = pathRes.path_id
+
+                        // 3️⃣ 切换跑区
+                        const zoneRes = await this.handlers['lepao.setZone']({ ...req, random_id: pathId }, ctx)
+                        runZoneId = zoneRes?.run_zone_id || 0
+
+                        // 4️⃣ 上传 OSS 文件、生成打卡点
+                        const uploadRes = await this.handlers['lepao.uploadOssFile']({ ...req, random_id: pathId }, ctx)
+                        ossPath = uploadRes.oss_path
+                        pointData = uploadRes.point_data
+
+                        if (!pointData) {
+                            pathRetry++
+                            this.logger.warn(`[${traceId}] 打卡点不满足要求,重新获取路径 第${pathRetry}次`)
+                            continue
+                        }
+
+                        // 打卡点符合要求,跳出循环
+                        break
+                    } catch (err) {
+                        if (!this.isRetryableTaskError(err)) {
+                            throw err
+                        }
+                        this.logger.warn(`[${traceId}] 可重试错误,重新获取路径 第${pathRetry + 1}次,原因:${err.message}`)
+                        pathRetry++
+                        await this.sleep(1000 * pathRetry)
+                    }
+                }
+
+                if (!pointData) {
+                    throw new Error('打卡点生成失败,乐跑任务终止')
+                }
+
+                // 5️⃣ 提交跑步数据
+                const bindRes = await this.handlers['lepao.bindData']({
+                    ...req,
+                    run_zone_id: runZoneId,
+                    record_file: ossPath,
+                    point_data: pointData
+                }, ctx)
+
+                // 6️⃣ 发送通知
+                if (ctx.channel) {
+                    await this.enqueueTask(ctx.channel, 'lepao.sendNotice', {
+                        account: req.account,
+                        success: true,
+                        data: bindRes?.data ?? bindRes,
+                        traceId
+                    }, { id: `${traceId}:notice:success` })
+                }
+
+                return { traceId, ossPath, pointData, bindRes }
+
+            } catch (err) {
+                this.logger.error(`[${traceId}] 乐跑流程失败:`, err)
+
+                // 若已扣减次数,则失败时返还(幂等)
+                try {
+                    await this.handlers['lepao.refundCount']({
+                        account: req.account,
+                        uuid: userData?.create_user
+                    }, ctx)
+                } catch (e) {
+                    this.logger.error(`[${traceId}] 返还乐跑次数失败:${e.stack || e}`)
+                }
+
+                if (ctx.channel) {
+                    await this.enqueueTask(ctx.channel, 'lepao.sendNotice', {
+                        account: req.account,
+                        success: false,
+                        reason: err.message || '未知错误',
+                        traceId
+                    }, { id: `${traceId}:notice:fail` })
+                }
+
+                // 将失败消息发送到结果队列或死信队列
+                if (ctx.channel) {
+                    await this.sendResult(ctx.channel, {
+                        id: req.taskId,
+                        success: false,
+                        error: err.message
+                    })
+                    await ctx.channel.sendToQueue(
+                        this.deadQueue,
+                        Buffer.from(JSON.stringify({ ...req, error: err.message })),
+                        { persistent: true }
+                    )
+                }
+                throw err
+            }
+        })
+
+        /* ---------------- 发送通知(独立 MQ 任务) ---------------- */
+        this.register('lepao.sendNotice', async (req, ctx) => {
+            const { account, success, data, reason, traceId } = req || {}
+
+            if (!account) {
+                throw new Error('发送通知失败:缺少 account')
+            }
+
+            const emailSql = `
+                SELECT 
+                    a.name, 
+                    a.email, 
+                    a.target_count,
+                    a.notice_type,
+                    e.bot_umo
+                FROM 
+                    lepao_account a
+                LEFT JOIN
+                    lepao_extra e
+                ON 
+                    a.student_num = e.student_num
+                WHERE
+                    a.student_num = ?
+            `
+            const rows = await db.query(emailSql, [account])
+            if (!rows || rows.length === 0) {
+                throw new Error('发送通知失败:未找到用户通知配置')
+            }
+
+            const user = rows[0]
+            const noticeType = user.notice_type || 'none'
+
+            const payload = success ? {
+                ...(data && typeof data === 'object' ? data : {}),
+                type: 'lepao_success',
+                umo: user.bot_umo,
+                // 沿用原 Lepao.js 字段:term_num 实际传的是 target_count
+                term_num: user.target_count ?? 0,
+                name: user.name,
+                account,
+                traceId
+            } : {
+                type: 'lepao_fail',
+                umo: user.bot_umo,
+                name: user.name,
+                account,
+                reason,
+                traceId
+            }
+
+            if (noticeType === 'bot' && user.bot_umo) {
+                const ch = await mq.getChannel(this.noticeQueue)
+                await ch.assertQueue(this.noticeQueue, { durable: true })
+                ch.sendToQueue(
+                    this.noticeQueue,
+                    Buffer.from(JSON.stringify(payload)),
+                    {
+                        persistent: true,
+                        contentType: 'application/json'
+                    }
+                )
+                return { delivered: true, via: 'bot' }
+            }
+
+            if (noticeType === 'email' && user.email) {
+                if (success) {
+                    await EmailTemplate.lepaoSuccess(user.email, payload)
+                    return { delivered: true, via: 'email' }
+                }
+
+                await EmailTemplate.lepaoFail(user.email, {
+                    name: user.name,
+                    account,
+                    reason: reason || '系统繁忙,请联系客服或稍后再试',
+                    traceId
+                })
+                return { delivered: true, via: 'email' }
+            }
+
+            return { delivered: false, via: 'none' }
+        })
+
+        /* ---------------- 扣减次数(仅成功时执行) ---------------- */
+        this.register('lepao.consumeCount', async (req, ctx) => {
+            const account = req?.account
+            const uuid = req?.uuid
+            if (!uuid) {
+                throw new Error('扣减乐跑次数失败:缺少 uuid')
+            }
+
+            // 幂等:同一 taskId 只扣一次
+            const consumeKey = `lepao:consume:${ctx?.taskId || ctx?.traceId || account || uuid}`
+            const existed = await Redis.get(consumeKey)
+            if (existed) {
+                return true
+            }
+
+            this.logger.info(`${account || uuid}开始扣减乐跑次数`)
+            const useLepaoCountSql = 'UPDATE users SET lepao_count = lepao_count - 1 WHERE uuid  = ?'
+            const r = await db.query(useLepaoCountSql, [uuid])
+            if (!r || r.affectedRows !== 1) {
+                throw new Error('扣减乐跑次数失败:数据库更新失败')
+            }
+            this.logger.info(`${account || uuid}扣减乐跑次数完成`)
+
+            await Redis.set(consumeKey, '1', { EX: 3600 })
+            return true
+        })
+
+        /* ---------------- 返还次数(失败时执行) ---------------- */
+        this.register('lepao.refundCount', async (req, ctx) => {
+            const account = req?.account
+            const uuid = req?.uuid
+            if (!uuid) {
+                return true
+            }
+
+            const baseKey = `${ctx?.taskId || ctx?.traceId || account || uuid}`
+            const consumeKey = `lepao:consume:${baseKey}`
+            const refundKey = `lepao:refund:${baseKey}`
+
+            const consumed = await Redis.get(consumeKey)
+            if (!consumed) {
+                return true
+            }
+            const refunded = await Redis.get(refundKey)
+            if (refunded) {
+                return true
+            }
+
+            this.logger.info(`${account || uuid}开始返还乐跑次数`)
+            const sql = 'UPDATE users SET lepao_count = lepao_count + 1 WHERE uuid  = ?'
+            const r = await db.query(sql, [uuid])
+            if (!r || r.affectedRows !== 1) {
+                throw new Error('返还乐跑次数失败:数据库更新失败')
+            }
+            this.logger.info(`${account || uuid}返还乐跑次数完成`)
+            await Redis.set(refundKey, '1', { EX: 3600 })
+            return true
+        })
+
+        this.register('lepao.getUserData', async (req, ctx) => {
+            const account = req.account
+            this.logger.info(`${account}开始获取用户数据`)
+            const accountSql = `
+                SELECT
+                    u.uuid, 
+                    u.lepao_count,
+                    l.create_user,
+                    l.name, 
+                    l.student_num,
+                    l.area, 
+                    l.sex, 
+                    l.state,  
+                    l.token, 
+                    l.uid, 
+                    l.school_id, 
+                    l.userAgent,
+                    l.deviceModel,
+                    l.notice_type, 
+                    l.email,
+                    e.bot_account
+                FROM 
+                    lepao_account l
+                LEFT JOIN
+                    users u
+                ON
+                    l.create_user = u.uuid
+                LEFT JOIN
+                    lepao_extra e
+                ON 
+                    l.student_num = e.student_num
+                WHERE 
+                    l.student_num = ?
+            `
+            const rows = await db.query(accountSql, [account])
+            if (!rows || rows.length === 0) {
+                this.logger.error(`${account}无法获取账号数据`)
+                throw new Error('无法获取账号数据,请联系客服或稍后再试')
+            }
+
+            let userData = rows[0]
+            
+            if (!userData.create_user || !userData.uuid) {
+                this.logger.warn(`${account}账号状态异常`)
+                throw new Error('当前账号状态异常,请联系客服')
+            }
+
+            if (userData.state !== 1) {
+                this.logger.warn(`${account}登录状态异常 state=${userData.state}`)
+                throw new Error('乐跑账号登录已过期,请尝试使用登录器重新登录')
+            }
+
+            if (userData.lepao_count < 1) {
+                this.logger.warn(`${account}乐跑次数不足`)
+                throw new Error('用户乐跑次数不足,请购买乐跑次数后重试!')
+            }
+
+            if(!userData.userAgent)
+                userData.userAgent = 'Mozilla/5.0 (Linux; Android 16; 2211133C Build/BP2A.250605.031.A3; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/138.0.7204.180 Mobile Safari/537.36 XWEB/1380347 MMWEBSDK/20250202 MMWEBID/1020 wxwork/5.0.6.66174 MicroMessenger/8.0.28.48(0x28001c30) MiniProgramEnv/android Luggage/3.0.2.95ef3f83 NetType/WIFI Language/zh_CN ABI/arm64'
+
+            if(!userData.deviceModel)
+                userData.deviceModel = '2211133C'
+
+            return userData
+        })
+
+        this.register('lepao.getPath', async (req, ctx) => {
+            const account = req.account
+            this.logger.info(`${account}开始获取路径`)
+            const accountSql = 'SELECT area, sex FROM lepao_account WHERE student_num = ?'
+            const rows = await db.query(accountSql, [account])
+            if (!rows || rows.length === 0) {
+                this.logger.error(`${account}无法获取账号数据`)
+                throw new Error('无法获取账号数据')
+            }
+
+            const { area, sex } = rows[0]
+
+            let max = 4.00
+            let min = 2.00
+            if (sex === 2) {
+                max = 2.00
+                min = 1.60
+            }
+
+            this.logger.info(`${account}路径参数: area=${area ?? '随机'}, max_distance=${max}, min_distance=${min}`)
+
+            let pathSql = 'SELECT id FROM path_data WHERE state = 1 AND distance < ? AND distance > ? '
+            const pathParams = [max, min]
+
+            if (area) {
+                pathSql += ' AND run_zone_name = ?'
+                pathParams.push(area)
+            }
+
+            pathSql += ' ORDER BY count ASC LIMIT 1'
+
+            const paths = await db.query(pathSql, pathParams)
+            if (!paths || paths.length === 0) {
+                this.logger.error(`${account}未找到符合条件的路线`)
+                const err = new Error('未找到符合条件的路线,请改变路径选择条件')
+                err.code = 'PATH_SELECT_FAILED'
+                err.retryable = true
+                throw err
+            }
+
+            const randomPath = paths[0]
+
+            const updateSql = 'UPDATE path_data SET count = count + 1 WHERE id = ?'
+            await db.query(updateSql, [randomPath.id])
+
+            this.logger.info(`${account}路径选中id=${randomPath.id},计数加1成功`)
+
+            return { path_id: randomPath.id }
+        })
+
+        /* ---------------- 获取跑步记录 ---------------- */
+        this.register('lepao.getRecord', async (req, ctx) => {
+            const now = this.lepaoTimestamp()
+            const raw = {
+                uid: req.uid,
+                token: req.token,
+                school_id: req.school_id,
+                term_id: 0,
+                course_id: 0,
+                class_id: 0,
+                student_num: req.student_id,
+                card_id: req.student_id,
+                timestamp: now,
+                version: 1,
+                nonce: String(Math.floor(Math.random() * 900000 + 100000)),
+                ostype: 5
+            }
+            raw.sign = dataSign(raw)
+
+            return this.request(
+                ctx.traceId,
+                'getRecord',
+                this.api('/Run2/beforeRunV260'),
+                raw,
+                {
+                    'User-Agent': req.userAgent,
+                    'charset': 'utf-8',
+                    'Referer': 'https://servicewechat.com/wxf94c4ddb63d87ede/32/page-frame.html',
+                }
+            )
+        })
+
+        /* ---------------- 切换跑区 ---------------- */
+        this.register('lepao.setZone', async (req, ctx) => {
+            const runZoneMap = {
+                '兰花湖校区跑区': 2,
+                '主校区北跑区': 3,
+                '主校区南跑区': 5,
+                '重庆工商大学茶园校区': 6
+            }
+
+            const record = await db.query(
+                'SELECT run_zone_name FROM path_data WHERE id = ?',
+                [req.random_id]
+            )
+            if (!record || record.length === 0) {
+                throw new Error('跑区不存在')
+            }
+
+            const runZoneId = runZoneMap[record[0].run_zone_name]
+            if (!runZoneId) throw new Error('跑区不存在')
+
+            const raw = {
+                uid: req.uid,
+                token: req.token,
+                school_id: req.school_id,
+                term_id: 0,
+                course_id: 0,
+                class_id: 0,
+                student_num: req.student_id,
+                card_id: req.student_id,
+                timestamp: this.lepaoTimestamp(),
+                version: 1,
+                nonce: String(Math.floor(Math.random() * 900000 + 100000)),
+                ostype: 5,
+                run_zone_id: String(runZoneId)
+            }
+            raw.sign = dataSign(raw)
+
+            await this.request(
+                ctx.traceId,
+                'setZone',
+                this.api('/Run/setRunZone'),
+                raw
+            )
+            return { run_zone_id: runZoneId }
+        })
+
+        /* ---------------- 获取 OSS STS ---------------- */
+        this.register('lepao.getOssSts', async (req, ctx) => {
+            const raw = {
+                uid: req.uid,
+                token: req.token,
+                school_id: req.school_id,
+                term_id: 0,
+                course_id: 0,
+                class_id: 0,
+                student_num: req.student_id,
+                card_id: req.student_id,
+                timestamp: this.lepaoTimestamp(),
+                version: 1,
+                nonce: String(Math.floor(Math.random() * 900000 + 100000)),
+                ostype: 5
+            }
+            raw.sign = dataSign(raw)
+
+            const res = await this.request(
+                ctx.traceId,
+                'getOssSts',
+                this.api('/WpIndex/getOssSts'),
+                raw
+            )
+
+            return res.data
+        })
+
+        /* ---------------- 上传 OSS 文件 ---------------- */
+        this.register('lepao.uploadOssFile', async (req, ctx) => {
+            const pathRow = await db.query(
+                'SELECT * FROM path_data WHERE id=?',
+                [req.random_id]
+            )
+            if (!pathRow || pathRow.length === 0) {
+                throw new Error('路径数据不存在')
+            }
+            const pathData = pathRow[0]
+
+            // 处理跑步路径
+            const newPathData = getPathData(pathData.data, req.run_end_time, pathData.time)
+            const pathResult = dataEncrypt(JSON.stringify(newPathData))
+
+            // 获取跑步规则参数
+            const runRule = await this.handlers['lepao.getRecord'](req, ctx)
+            const ruleData = runRule?.data
+            if (!ruleData?.run_line_info?.point_list || !ruleData?.time_rule_arr?.length) {
+                const err = new Error('获取打卡点规则失败')
+                err.code = 'CHECKPOINT_FETCH_FAILED'
+                err.retryable = true
+                throw err
+            }
+            const check_points = ruleData.run_line_info.point_list
+            let min_log_num = ruleData.time_rule_arr[0]?.min_log_num || 4
+            const point_update_distance = parseFloat(ruleData.run_line_info.point_update_distance || 0) * 1000
+            const log_max_distance = Number(ruleData.run_line_info.log_max_distance || 0)
+
+            // 生成打卡点
+            const point_data = selectCheckpoints(newPathData, check_points, min_log_num, point_update_distance, log_max_distance, req.run_end_time, pathData.time)
+            if (!point_data) {
+                this.logger.warn(`[RETRY] 打卡点数量不足,重新更换路径`)
+                const err = new Error('打卡点数量不足')
+                err.code = 'CHECKPOINT_INSUFFICIENT'
+                err.retryable = true
+                throw err
+            }
+
+            const sts = await this.handlers['lepao.getOssSts'](req, ctx)
+            if (!sts?.bucket || !sts?.AccessKeyId || !sts?.AccessKeySecret || !sts?.SecurityToken) {
+                throw new Error('获取 OSS STS 失败')
+            }
+
+            const now = new Date()
+            const yyyy = now.getFullYear()
+            const mm = String(now.getMonth() + 1).padStart(2, '0')
+            const dd = String(now.getDate()).padStart(2, '0')
+            const formattedToday = `${yyyy}-${mm}-${dd}`
+            const boundary = String(Date.now())
+            const timestamp = String(Date.now())
+            const ossPath = `Public/Upload/file/run_record/${boundary.slice(-3)}/${formattedToday}/${timestamp}-${Math.floor(Math.random() * 150)}.txt`
+            const client = new OSS({
+                bucket: sts.bucket,
+                region: sts.region || 'oss-cn-hangzhou',
+                accessKeyId: sts.AccessKeyId,
+                accessKeySecret: sts.AccessKeySecret,
+                stsToken: sts.SecurityToken,
+                secure: true
+            })
+            await client.put(ossPath, Buffer.from(pathResult, 'utf-8'))
+
+            return { oss_path: ossPath, point_data: point_data }
+        })
+
+        /* ---------------- 提交跑步数据 ---------------- */
+        this.register('lepao.bindData', async (req, ctx) => {
+            const pathRow = await db.query(
+                'SELECT * FROM path_data WHERE id=?',
+                [req.random_id]
+            )
+            const pathData = pathRow[0]
+
+            const stepData = generateCadence(pathData.distance, pathData.time)
+            const stepInfo = JSON.stringify({ interval: 60, list: stepData.cadence_list })
+
+            const data = {
+                uid: req.uid,
+                token: req.token,
+                school_id: req.school_id,
+                term_id: 1,
+                course_id: 0,
+                class_id: 0,
+                student_num: req.student_id,
+                card_id: req.student_id,
+                timestamp: this.lepaoTimestamp(),
+                version: 1,
+                nonce: String(Math.floor(Math.random() * 900000 + 100000)),
+                ostype: 5,
+                game_id: req.run_zone_id || 0,
+                start_time: req.run_end_time - Number(pathData.time),
+                end_time: req.run_end_time,
+                distance: pathData.distance,
+                record_img: "",
+                log_data: req.point_data,
+                file_img: "",
+                is_running_area_valid: 1,
+                mobileDeviceId: 1,
+                mobileModel: req.deviceModel,
+                step_info: stepInfo,
+                step_num: stepData.total_steps,
+                used_time: pathData.time,
+                mobileOsVersion: 1,
+                record_file: req.record_file
+            }
+
+            data.sign = dataSign(data)
+
+            return this.request(
+                ctx.traceId,
+                'bindData',
+                this.api('/Run/stopRunV278'),
+                data
+            )
+        })
+    }
+
+    /* ================= Worker核心 ================= */
+
+    async start() {
+        if (this.running) return
+        this.running = true
+
+        this.logger.info('Worker 启动中...')
+
+        try {
+            this.initHandlers()
+
+            const channel = await mq.getChannel(this.channelName)
+
+            await channel.prefetch(5)
+
+            await channel.assertQueue(this.taskQueue, { durable: true })
+            await channel.assertQueue(this.resultQueue, { durable: true })
+            await channel.assertQueue(this.deadQueue, { durable: true })
+
+            await channel.consume(this.taskQueue, async (msg) => {
+                if (!msg) return
+
+                let content
+
+                try {
+                    content = JSON.parse(msg.content.toString())
+                } catch {
+                    return channel.ack(msg)
+                }
+
+                const { id, type, data, retry = 0 } = content
+
+                const traceId = this.traceId()
+
+                const handler = this.handlers[type]
+
+                if (!handler) {
+                    this.log(traceId, 'ERROR', '未知任务', { type })
+                    return channel.ack(msg)
+                }
+
+                try {
+                    const result = await this.withTimeout(
+                        handler(data, { traceId, channel, taskId: id }),
+                        type
+                    )
+
+                    await this.sendResult(channel, {
+                        id,
+                        success: true,
+                        result
+                    })
+
+                    this.log(traceId, 'DONE', `任务完成 ${type}`)
+                    channel.ack(msg)
+
+                } catch (err) {
+                    this.logErr(traceId, `任务失败 ${type}`, err)
+
+                    if (retry < this.maxRetry && this.isRetryableTaskError(err)) {
+                        // 重试
+                        await channel.sendToQueue(
+                            this.taskQueue,
+                            Buffer.from(JSON.stringify({
+                                ...content,
+                                retry: retry + 1
+                            })),
+                            { persistent: true }
+                        )
+
+                        this.log(traceId, 'RETRY', `重试第${retry + 1}次`)
+                    } else {
+                        // 死信
+                        await channel.sendToQueue(
+                            this.deadQueue,
+                            Buffer.from(JSON.stringify(content)),
+                            { persistent: true }
+                        )
+
+                        this.log(traceId, 'DEAD', '进入死信队列')
+                    }
+
+                    await this.sendResult(channel, {
+                        id,
+                        success: false,
+                        error: err.message
+                    })
+
+                    channel.ack(msg)
+                }
+            })
+
+            this.logger.info('RunForge Worker 启动成功')
+        } catch (err) {
+            this.logger.error('RunForge Worker 启动失败: ' + err.stack)
+        }
+    }
+
+    async sendResult(channel, data) {
+        channel.sendToQueue(
+            this.resultQueue,
+            Buffer.from(JSON.stringify(data)),
+            { persistent: true }
+        )
+    }
+
+    async stop() {
+        this.running = false
+        await mq.close()
+        this.logger.info('RunForge Worker 已停止')
+    }
+}
+
+module.exports = Worker

+ 10 - 0
lib/Server.js

@@ -5,6 +5,7 @@ const fs = require('fs')
 const config = require('../config.json')
 const config = require('../config.json')
 const Logger = require('./Logger')
 const Logger = require('./Logger')
 const MySQL = require('../plugin/DataBase/MySQL')
 const MySQL = require('../plugin/DataBase/MySQL')
+const Worker = require('./Lepao/Worker')
 const mq = require('../plugin/mq')
 const mq = require('../plugin/mq')
 
 
 class SERVER {
 class SERVER {
@@ -52,6 +53,15 @@ class SERVER {
 
 
             await ch.assertQueue('mq_health_check', { durable: false })
             await ch.assertQueue('mq_health_check', { durable: false })
             this.logger.info('✅ RabbitMQ 初始化 & 测试成功')
             this.logger.info('✅ RabbitMQ 初始化 & 测试成功')
+
+            const worker = new Worker()
+            try {
+                await worker.start()
+                this.logger.info('RunForge Worker 已启动,正在监听 MQ 任务...')
+            } catch (err) {
+                console.error('RunForge Worker 启动失败:', err)
+                process.exit(1)
+            }
         } catch (e) {
         } catch (e) {
             this.logger.error('❌ RabbitMQ 初始化失败')
             this.logger.error('❌ RabbitMQ 初始化失败')
             process.exit(1)
             process.exit(1)

+ 89 - 0
plugin/Lepao/Crypto.js

@@ -0,0 +1,89 @@
+const crypto = require('crypto')
+
+const KEY_STR = "Wet2C8d34f62ndi3"
+const IV_STR = "K6iv85jBD8jgf32D"
+const SALT = "rDJiNB9j7vD2"
+
+const KEY = Buffer.from(KEY_STR, 'utf-8')
+const IV = Buffer.from(IV_STR, 'utf-8')
+
+/**
+ * AES 加密
+ */
+function dataEncrypt(plainText) {
+    try {
+        const cipher = crypto.createCipheriv('aes-128-cbc', KEY, IV)
+        cipher.setAutoPadding(true)
+
+        let encrypted = cipher.update(plainText, 'utf-8')
+        encrypted = Buffer.concat([encrypted, cipher.final()])
+
+        return encrypted.toString('base64')
+    } catch (err) {
+        console.error('加密失败:', err.message)
+        return null
+    }
+}
+
+/**
+ * AES 解密
+ */
+function dataDecrypt(encryptedBase64Text) {
+    try {
+        const encryptedBuffer = Buffer.from(encryptedBase64Text, 'base64')
+
+        const decipher = crypto.createDecipheriv('aes-128-cbc', KEY, IV)
+        decipher.setAutoPadding(true)
+
+        let decrypted = decipher.update(encryptedBuffer)
+        decrypted = Buffer.concat([decrypted, decipher.final()])
+
+        return decrypted.toString('utf-8')
+    } catch (err) {
+        console.error('解密失败:', err.message)
+        return null
+    }
+}
+
+/**
+ * MD5 签名
+ */
+function dataSign(dataObj) {
+    if (typeof dataObj !== 'object') {
+        throw new TypeError('data must be object')
+    }
+
+    const sortedKeys = Object.keys(dataObj).sort()
+
+    let str = ''
+    for (const key of sortedKeys) {
+        str += key + String(dataObj[key])
+    }
+
+    str += SALT
+
+    return crypto.createHash('md5').update(str, 'utf-8').digest('hex')
+}
+
+/**
+ * OSS POST 签名
+ */
+function ossPostSign(accessKeySecret, policyDocument) {
+    const policyStr = JSON.stringify(policyDocument)
+
+    const base64Policy = Buffer.from(policyStr).toString('base64')
+
+    const signature = crypto
+        .createHmac('sha1', accessKeySecret)
+        .update(base64Policy)
+        .digest('base64')
+
+    return [base64Policy, signature]
+}
+
+module.exports = {
+    dataEncrypt,
+    dataDecrypt,
+    dataSign,
+    ossPostSign
+}

+ 164 - 0
plugin/Lepao/Path.js

@@ -0,0 +1,164 @@
+// 随机扰动函数
+function randomPerturbation(scale = 1e-7) {
+    return (Math.random() * 2 - 1) * scale
+}
+
+// Haversine 公式计算两点距离(米)
+function haversine(lat1, lon1, lat2, lon2) {
+    const R = 6371000
+    const toRad = (deg) => deg * Math.PI / 180
+    const phi1 = toRad(lat1)
+    const phi2 = toRad(lat2)
+    const dphi = toRad(lat2 - lat1)
+    const dlambda = toRad(lon2 - lon1)
+
+    const a = Math.sin(dphi / 2) ** 2 + Math.cos(phi1) * Math.cos(phi2) * Math.sin(dlambda / 2) ** 2
+    return 2 * R * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a))
+}
+
+/**
+ * 根据原路径时间,重新生成新的时间路径数据
+ */
+function getPathData(pathlist, runEndTime, useTime) {
+    const startTime = (runEndTime - useTime) * 1000
+    const oldStartTime = parseInt(pathlist[0].d.split(' ')[0])
+
+    const newPathlist = pathlist.map((item, i) => {
+        const newItem = { ...item }
+        const [oldTimeStr, suffix] = item.d.split(' ', 2)
+        const oldTime = parseInt(oldTimeStr)
+
+        if (i === 0) {
+            newItem.d = `${startTime} ${suffix}`
+        } else {
+            const newTime = startTime + oldTime - oldStartTime
+            newItem.d = `${newTime} ${suffix}`
+            newItem.a += randomPerturbation()
+            newItem.o += randomPerturbation()
+        }
+        return newItem
+    })
+
+    console.log(newPathlist)
+
+    return newPathlist
+}
+
+/**
+ * 选择打卡点
+ */
+function selectCheckpoints(path, checkpoints, runLogNum, pointUpdateDistance, logMaxDistance, runEndTime, pathTime) {
+    const results = []
+    let totalDistance = 0
+    let lastCheckpointDistance = 0
+
+    // 筛选有效打卡点
+    const filteredCheckpoints = checkpoints
+        .filter(cp => cp.is_del === "0" && cp.is_online === 1 && cp.ctrl_status === "1")
+    // 打乱顺序
+    for (let i = filteredCheckpoints.length - 1; i > 0; i--) {
+        const j = Math.floor(Math.random() * (i + 1))
+        [filteredCheckpoints[i], filteredCheckpoints[j]] = [filteredCheckpoints[j], filteredCheckpoints[i]]
+    }
+
+    const usedCheckpoints = new Set()
+
+    for (const p of path) {
+        const parts = p.d.split(' ')
+        const tsMs = parseInt(parts[0])
+        const stepDistance = parseFloat(parts[1].split('_')[0]) || 0
+
+        totalDistance += stepDistance
+
+        for (const cp of filteredCheckpoints.slice(0, -1)) {
+            if (usedCheckpoints.has(cp.id)) continue
+
+            const [lat, lon] = cp.jingwei.split(',').map(Number)
+            const dist = haversine(p.a, p.o, lat, lon)
+
+            if (dist < logMaxDistance && (totalDistance - lastCheckpointDistance) > 200) {
+                console.log(`选中打卡点 ${cp.id} ${cp.address},距离:${dist}`)
+                results.push({
+                    point_id: cp.id,
+                    distance: +(totalDistance / 1000).toFixed(2),
+                    longitude: p.o,
+                    longtitude: p.o, // 保留原字段
+                    latitude: p.a,
+                    address: cp.address,
+                    jingwei: cp.jingwei.split(','),
+                    time: Math.floor(tsMs / 1000)
+                })
+                usedCheckpoints.add(cp.id)
+                lastCheckpointDistance = totalDistance
+                break
+            }
+        }
+    }
+
+    if (results.length >= runLogNum) {
+        // 随机抽取 runLogNum 个打卡点
+        const indices = []
+        while (indices.length < runLogNum) {
+            const idx = Math.floor(Math.random() * results.length)
+            if (!indices.includes(idx)) indices.push(idx)
+        }
+        indices.sort((a, b) => a - b)
+        const selectedResults = indices.map(i => results[i])
+
+        const n = selectedResults.length
+        let d = 0
+        for (let i = 0; i < n; i++) {
+            if (selectedResults[i].time >= runEndTime - 5) {
+                console.log("打卡点时间异常,重新分配时间")
+                for (let j = 0; j < n; j++) {
+                    selectedResults[j].time = Math.floor(runEndTime - (pathTime / n) * (n - j))
+                    selectedResults[j].distance = d + 0.3 * (j + 1)
+                }
+                break
+            }
+        }
+        console.log(`选中打卡点:${JSON.stringify(selectedResults)}`)
+        return selectedResults
+    } else {
+        console.log(`选中打卡点数量不足:${JSON.stringify(results)}`)
+        return false
+    }
+}
+
+/**
+ * 根据距离和用时生成步频数据
+ */
+function generateCadence(distanceKm, usedTime) {
+    const paceSec = usedTime / distanceKm
+    const paceMin = paceSec / 60
+
+    let spmRange
+    if (paceMin >= 8) spmRange = [85, 110]
+    else if (paceMin >= 5) spmRange = [150, 170]
+    else spmRange = [170, 190]
+
+    const minutes = Math.ceil(usedTime / 60)
+    const cadenceList = []
+
+    for (let i = 0; i < minutes; i++) {
+        let spm = Math.floor(Math.random() * (spmRange[1] - spmRange[0] + 1) + spmRange[0])
+        if (i === minutes - 1) {
+            const lastDuration = usedTime - (minutes - 1) * 60
+            spm = Math.round(spm * (lastDuration / 60))
+        }
+        cadenceList.push(spm)
+    }
+
+    const totalSteps = cadenceList.reduce((a, b) => a + b, 0)
+
+    return {
+        cadence_list: cadenceList,
+        total_steps: totalSteps
+    }
+}
+
+module.exports = {
+    getPathData,
+    selectCheckpoints,
+    generateCadence
+}

+ 143 - 0
plugin/mq/Worker.js

@@ -0,0 +1,143 @@
+const db = require('../DataBase/db')
+const path = require('path')
+const Logger = require('../../lib/Logger')
+const mq = require('.')
+
+class Worker {
+    constructor() {
+        this.logger = new Logger(
+            path.join(__dirname, '../logs/Worker.log'),
+            'INFO'
+        )
+
+        this.handlers = {}
+        this.running = false
+
+        // 队列名
+        this.taskQueue = 'task_queue'
+        this.resultQueue = 'task_result_queue'
+
+        // channel 名称(避免和别的模块冲突)
+        this.channelName = 'worker_channel'
+    }
+
+    /**
+     * 注册任务处理器
+     */
+    register(type, handler) {
+        this.handlers[type] = handler
+        this.logger.info(`注册处理器: ${type}`)
+    }
+
+    /**
+     * 启动 Worker
+     */
+    async start() {
+        if (this.running) return
+        this.running = true
+
+        this.logger.info('Worker 启动中...')
+
+        try {
+            const channel = await mq.getChannel(this.channelName)
+
+            // 控制并发(重要)
+            await channel.prefetch(5)
+
+            // 确保队列存在
+            await channel.assertQueue(this.taskQueue, { durable: true })
+            await channel.assertQueue(this.resultQueue, { durable: true })
+
+            // 开始消费
+            await channel.consume(
+                this.taskQueue,
+                async (msg) => {
+                    if (!msg) return
+
+                    let content
+
+                    try {
+                        content = JSON.parse(msg.content.toString())
+                    } catch (err) {
+                        this.logger.error('消息解析失败: ' + err.message)
+                        channel.ack(msg)
+                        return
+                    }
+
+                    const { id, type, data } = content
+
+                    this.logger.info(`收到任务: ${id} 类型: ${type}`)
+
+                    const handler = this.handlers[type]
+
+                    if (!handler) {
+                        this.logger.error(`未找到处理器: ${type}`)
+                        channel.ack(msg)
+                        return
+                    }
+
+                    try {
+                        const result = await handler(data, {
+                            db,
+                            logger: this.logger
+                        })
+
+                        this.logger.info(`任务完成: ${id}`)
+
+                        await this.sendResult(channel, {
+                            id,
+                            success: true,
+                            result
+                        })
+
+                        channel.ack(msg)
+                    } catch (err) {
+                        this.logger.error(`任务失败: ${id} - ${err.stack}`)
+
+                        await this.sendResult(channel, {
+                            id,
+                            success: false,
+                            error: err.message
+                        })
+
+                        // 简单策略:失败直接 ack(避免死循环)
+                        channel.ack(msg)
+                    }
+                },
+                {
+                    noAck: false
+                }
+            )
+
+            this.logger.info('Worker 启动成功')
+        } catch (err) {
+            this.logger.error('Worker 启动失败: ' + err.stack)
+        }
+    }
+
+    /**
+     * 发送结果
+     */
+    async sendResult(channel, data) {
+        try {
+            channel.sendToQueue(
+                this.resultQueue,
+                Buffer.from(JSON.stringify(data)),
+                { persistent: true }
+            )
+        } catch (err) {
+            this.logger.error('结果发送失败: ' + err.message)
+        }
+    }
+
+    /**
+     * 停止 Worker
+     */
+    async stop() {
+        this.running = false
+        await mq.close()
+        this.logger.info('Worker 已停止')
+    }
+}
+
+module.exports = Worker