Browse Source

🦄 refactor: 重构自动乐跑逻辑

Pchen. 1 month ago
parent
commit
5f6f528189

+ 32 - 23
apis/Corn/StartAutoLepao.js

@@ -2,6 +2,8 @@ const API = require("../../lib/API");
 const db = require('../../plugin/DataBase/db')
 const Redis = require('../../plugin/DataBase/Redis')
 const mq = require('../../plugin/mq')
+const { assertRunforgeTaskIngress, publishRunforgeTask } = require('../../plugin/mq/runforgeTaskMq')
+const { scheduleDelayedRunforgeTask } = require('../../plugin/mq/lepaoAutoScheduleRedis')
 
 const { BaseStdResponse } = require("../../BaseStdResponse");
 
@@ -43,9 +45,9 @@ class StartAutoLepao extends API {
             let channel
             try {
                 channel = await mq.getChannel('lepao_corn')
-                await channel.assertQueue('runforge_task_queue', { durable: true })
+                await assertRunforgeTaskIngress(channel, this.logger)
             } catch (err) {
-                this.logger.error(`自动乐跑:连接 MQ 失败:${err.message || err}`)
+                this.logger.error(`自动乐跑:连接 MQ 或声明拓扑失败:${err.message || err}`)
                 return
             }
 
@@ -60,32 +62,39 @@ class StartAutoLepao extends API {
 
                 const delayMs = spreadWindowMs > 0 ? Math.floor(Math.random() * spreadWindowMs) : 0
                 const fireAt = nowMs + delayMs
+                const taskId = `lepao:auto:${fireAt}:${student_num}`
+                const payload = {
+                    id: taskId,
+                    type: 'lepao.startRun',
+                    data: {
+                        taskId,
+                        account: student_num
+                    },
+                    retry: 0
+                }
 
-                this.logger.info(`${name}(${student_num})将加入自动乐跑队列(延迟约 ${Math.round(delayMs / 1000)}s)`)
-
-                setTimeout(() => {
+                if (delayMs > 0) {
                     try {
-                        const taskId = `lepao:auto:${fireAt}:${student_num}`
-                        const payload = {
-                            id: taskId,
-                            type: 'lepao.startRun',
-                            data: {
-                                taskId,
-                                account: student_num
-                            },
-                            retry: 0
-                        }
-
-                        channel.sendToQueue(
-                            'runforge_task_queue',
-                            Buffer.from(JSON.stringify(payload)),
-                            { persistent: true, contentType: 'application/json' }
+                        await scheduleDelayedRunforgeTask(fireAt, payload, {
+                            name,
+                            account: student_num,
+                            delayMs
+                        })
+                        this.logger.info(
+                            `${name}(${student_num})已写入 Redis 调度(约 ${Math.round(delayMs / 1000)}s 后进 MQ 主队列)`
                         )
-                        this.logger.info(`${name}(${student_num})已投递自动乐跑任务(延迟约 ${Math.round(delayMs / 1000)}s)`)
                     } catch (err) {
-                        this.logger.error(`${name}(${student_num})乐跑投递失败:${err.message || err}`)
+                        this.logger.error(`${name}(${student_num})Redis 调度失败:${err.message || err}`)
                     }
-                }, delayMs)
+                    continue
+                }
+
+                try {
+                    publishRunforgeTask(channel, payload)
+                    this.logger.info(`${name}(${student_num})已投递自动乐跑任务`)
+                } catch (err) {
+                    this.logger.error(`${name}(${student_num})乐跑投递失败:${err.message || err}`)
+                }
             }
         } catch (error) {
             this.logger.error(error)

+ 6 - 6
apis/Corn/StartLepao.js

@@ -2,6 +2,10 @@ const API = require("../../lib/API");
 const db = require('../../plugin/DataBase/db')
 const Redis = require('../../plugin/DataBase/Redis')
 const mq = require('../../plugin/mq')
+const {
+    assertRunforgeTaskIngress,
+    publishRunforgeTask
+} = require('../../plugin/mq/runforgeTaskMq')
 const { BaseStdResponse } = require("../../BaseStdResponse")
 
 // 出现异常情况时补充乐跑
@@ -53,7 +57,7 @@ class StartLepao extends API {
 
                 try {
                     const channel = await mq.getChannel('lepao_corn')
-                    await channel.assertQueue('runforge_task_queue', { durable: true })
+                    await assertRunforgeTaskIngress(channel, this.logger)
 
                     const taskId = `lepao:repair:${Date.now()}:${student_num}`
                     const payload = {
@@ -66,11 +70,7 @@ class StartLepao extends API {
                         retry: 0
                     }
 
-                    channel.sendToQueue(
-                        'runforge_task_queue',
-                        Buffer.from(JSON.stringify(payload)),
-                        { persistent: true, contentType: 'application/json' }
-                    )
+                    publishRunforgeTask(channel, payload)
                     this.logger.info(`${name}(${student_num})已投递补充乐跑任务`)
                 } catch (err) {
                     this.logger.error(`${name}(${student_num})补充乐跑失败:${err.message || err}`)

+ 6 - 6
apis/Lepao/SingleRun.js

@@ -4,6 +4,10 @@ const db = require("../../plugin/DataBase/db.js")
 const { BaseStdResponse } = require("../../BaseStdResponse.js")
 const AccessControl = require("../../lib/AccessControl.js")
 const mq = require('../../plugin/mq')
+const {
+    assertRunforgeTaskIngress,
+    publishRunforgeTask
+} = require('../../plugin/mq/runforgeTaskMq')
 
 // 单次乐跑
 class SingleRun extends API {
@@ -87,7 +91,7 @@ class SingleRun extends API {
 
             try {
                 const channel = await mq.getChannel('lepao_api')
-                await channel.assertQueue('runforge_task_queue', { durable: true })
+                await assertRunforgeTaskIngress(channel, this.logger)
 
                 const taskId = `lepao:${Date.now()}:${student_num}`
                 const payload = {
@@ -100,11 +104,7 @@ class SingleRun extends API {
                     retry: 0
                 }
 
-                channel.sendToQueue(
-                    'runforge_task_queue',
-                    Buffer.from(JSON.stringify(payload)),
-                    { persistent: true, contentType: 'application/json' }
-                )
+                publishRunforgeTask(channel, payload)
             } catch (err) {
                 this.logger.error(`后台乐跑任务异常:${err.stack}`)
             }

+ 2 - 2
apis/Public/GetAppVersion.js

@@ -13,8 +13,8 @@ class GetAppVersion extends API {
         res.json({
             ...BaseStdResponse.OK,
             data: {
-                version: '1.9',
-                msg: '\n更新内容:\n修复部分bug,优化用户体验'
+                version: '2.0',
+                msg: '\n更新内容:\n重构乐跑逻辑 优化乐跑任务调度功能'
             }
         })
     }

+ 73 - 10
apis/User/Admin/GetQueueTasks.js

@@ -4,6 +4,11 @@ const mq = require('../../../plugin/mq')
 const config = require('../../../config.json')
 const AccessControl = require('../../../lib/AccessControl')
 const { BaseStdResponse } = require('../../../BaseStdResponse')
+const {
+    SCHEDULE_KEY,
+    listPendingScheduledForAdmin,
+    countPendingScheduled
+} = require('../../../plugin/mq/lepaoAutoScheduleRedis')
 
 /** 允许通过管理接口查看的队列(防任意队列名探测) */
 const ALLOWED_QUEUES = [
@@ -118,7 +123,15 @@ class GetQueueTasks extends API {
     }
 
     async onRequest(req, res) {
-        const { uuid, session, queue, limit: limitStr, summary } = req.query
+        const {
+            uuid,
+            session,
+            queue,
+            limit: limitStr,
+            summary,
+            includeScheduled,
+            scheduledLimit: scheduledLimitStr
+        } = req.query
 
         if ([uuid, session].some((v) => v === '' || v == null))
             return res.json({
@@ -158,11 +171,29 @@ class GetQueueTasks extends API {
                         }
                     }
                 }
+
+                const slimit = Math.min(
+                    2000,
+                    Math.max(1, parseInt(scheduledLimitStr, 10) || 800)
+                )
+                const pendingCount = await countPendingScheduled(Date.now())
+                const scheduledMirror = await listPendingScheduledForAdmin(Date.now(), slimit)
+
                 return res.json({
                     ...BaseStdResponse.OK,
                     data: {
                         summary: true,
                         queues,
+                        redisScheduler: {
+                            key: SCHEDULE_KEY,
+                            pendingCount,
+                            note: '到期任务由本服务定时写入 runforge_task_queue;多实例共享同一 Redis ZSET。'
+                        },
+                        autoRunScheduledMirror: {
+                            pendingCount,
+                            note: scheduledMirror.note,
+                            sample: scheduledMirror.items.slice(0, 20)
+                        },
                         fetchedAt: Date.now()
                     }
                 })
@@ -206,17 +237,49 @@ class GetQueueTasks extends API {
                 this.logger.warn(`[GetQueueTasks] Management 窥视失败: ${managementError}`)
             }
 
+            const wantScheduled =
+                includeScheduled !== '0' &&
+                includeScheduled !== 'false' &&
+                queueName === 'runforge_task_queue'
+
+            let autoRunScheduledMirror = null
+            let pendingScheduledCount = null
+            if (queueName === 'runforge_task_queue') {
+                pendingScheduledCount = await countPendingScheduled(Date.now())
+                if (wantScheduled) {
+                    const slimit = Math.min(
+                        500,
+                        Math.max(1, parseInt(scheduledLimitStr, 10) || 200)
+                    )
+                    autoRunScheduledMirror = await listPendingScheduledForAdmin(Date.now(), slimit)
+                }
+            }
+
+            const detail = {
+                queue: queueName,
+                messageCount,
+                consumerCount,
+                peekLimit: limit,
+                tasks,
+                managementError,
+                redisScheduler:
+                    queueName === 'runforge_task_queue'
+                        ? {
+                              key: SCHEDULE_KEY,
+                              pendingCount: pendingScheduledCount
+                          }
+                        : undefined,
+                autoRunScheduledMirror,
+                fetchedAt: Date.now()
+            }
+            if (queueName === 'runforge_task_queue') {
+                detail.peekNote =
+                    'tasks:已在主队列中的消息;autoRunScheduledMirror:尚未到 fireAt、仍只在 Redis 中的调度(到期后由服务写入 MQ)。'
+            }
+
             return res.json({
                 ...BaseStdResponse.OK,
-                data: {
-                    queue: queueName,
-                    messageCount,
-                    consumerCount,
-                    peekLimit: limit,
-                    tasks,
-                    managementError,
-                    fetchedAt: Date.now()
-                }
+                data: detail
             })
         } catch (error) {
             this.logger.error(`GetQueueTasks: ${error.stack || error}`)

+ 7 - 1
lib/Lepao/Worker.js

@@ -2,6 +2,7 @@ const path = require('path')
 const axios = require('axios')
 const OSS = require('ali-oss')
 const mq = require('../../plugin/mq')
+const { assertRunforgeTaskIngress } = require('../../plugin/mq/runforgeTaskMq')
 const db = require('../../plugin/DataBase/db')
 const Redis = require('../../plugin/DataBase/Redis')
 const EmailTemplate = require('../../plugin/Email/emailTemplate')
@@ -353,6 +354,11 @@ class Worker {
             let bindRes = null
 
             try {
+                // 检查redis是否存在当天乐跑成功记录
+                const isSuccess = await Redis.get(`lepaoSuccess:${req.account}`)
+                if (isSuccess)
+                    throw new Error('该账号当天已乐跑成功!请勿重复乐跑')
+
                 userData = await this.handlers['lepao.getUserData'](req, ctx)
 
                 // 进入乐跑进程后写入进行中缓存
@@ -1009,7 +1015,7 @@ class Worker {
 
             await channel.prefetch(5)
 
-            await channel.assertQueue(this.taskQueue, { durable: true })
+            await assertRunforgeTaskIngress(channel, this.logger)
             await channel.assertQueue(this.resultQueue, { durable: true })
             await channel.assertQueue(this.deadQueue, { durable: true })
 

+ 6 - 0
lib/Server.js

@@ -7,6 +7,7 @@ const Logger = require('./Logger')
 const MySQL = require('../plugin/DataBase/MySQL')
 const Worker = require('./Lepao/Worker')
 const mq = require('../plugin/mq')
+const { startLepaoSchedulePublisher } = require('../plugin/mq/lepaoSchedulePublisher')
 
 class SERVER {
     constructor() {
@@ -58,6 +59,11 @@ class SERVER {
             try {
                 await worker.start()
                 this.logger.info('RunForge Worker 已启动,正在监听 MQ 任务...')
+                startLepaoSchedulePublisher({
+                    logger: this.logger,
+                    intervalMs: config.rabbitmq?.lepaoScheduleTickMs ?? 2000,
+                    batch: config.rabbitmq?.lepaoScheduleBatch ?? 100
+                })
             } catch (err) {
                 console.error('RunForge Worker 启动失败:', err)
                 process.exit(1)

+ 135 - 0
plugin/mq/lepaoAutoScheduleRedis.js

@@ -0,0 +1,135 @@
+const Redis = require('../DataBase/Redis')
+
+/** 到期后由进程定时器投递到 runforge_task_queue(重启不丢,依赖 Redis) */
+const SCHEDULE_KEY = 'lepao:mq:scheduled'
+
+const POP_DUE_LUA = `
+local key = KEYS[1]
+local now = tonumber(ARGV[1])
+local limit = tonumber(ARGV[2])
+local items = redis.call('ZRANGEBYSCORE', key, '-inf', now, 'LIMIT', 0, limit)
+for i = 1, #items do
+  redis.call('ZREM', key, items[i])
+end
+return items
+`
+
+function stripScheduleMeta(msg) {
+    if (!msg || typeof msg !== 'object') return msg
+    const copy = { ...msg }
+    delete copy._scheduleMeta
+    return copy
+}
+
+/**
+ * 延迟投递:整消息入 ZSET,score = fireAt(毫秒时间戳)
+ * @param {object} meta 可选,管理端展示用:{ name, account, delayMs }
+ */
+async function scheduleDelayedRunforgeTask(fireAt, messageObject, meta = null) {
+    const toStore =
+        meta != null
+            ? {
+                  ...messageObject,
+                  _scheduleMeta: {
+                      ...meta,
+                      fireAt
+                  }
+              }
+            : messageObject
+    const member = JSON.stringify(toStore)
+    await Redis.sendCommand(['ZADD', SCHEDULE_KEY, String(fireAt), member])
+}
+
+async function requeueAt(fireAt, messageObject) {
+    const member = JSON.stringify(messageObject)
+    await Redis.sendCommand(['ZADD', SCHEDULE_KEY, String(fireAt), member])
+}
+
+/**
+ * 原子取出已到期的成员(原始 JSON 字符串数组)
+ */
+async function popDueMessages(now = Date.now(), limit = 100) {
+    const res = await Redis.sendCommand([
+        'EVAL',
+        POP_DUE_LUA,
+        '1',
+        SCHEDULE_KEY,
+        String(now),
+        String(limit)
+    ])
+    if (!Array.isArray(res)) return []
+    return res.map((x) => (Buffer.isBuffer(x) ? x.toString('utf8') : String(x)))
+}
+
+/** 清理过久未弹出项(异常场景) */
+async function pruneStaleScheduled(beforeScore, now = Date.now()) {
+    await Redis.sendCommand(['ZREMRANGEBYSCORE', SCHEDULE_KEY, '-inf', String(beforeScore)])
+}
+
+/**
+ * 管理端:尚未到期的调度(score > now)
+ */
+async function listPendingScheduledForAdmin(now = Date.now(), limitTotal = 800) {
+    await pruneStaleScheduled(now - 48 * 3600 * 1000, now)
+
+    const raw = await Redis.sendCommand([
+        'ZRANGEBYSCORE',
+        SCHEDULE_KEY,
+        `(${String(now)}`,
+        '+inf',
+        'WITHSCORES',
+        'LIMIT',
+        '0',
+        String(limitTotal)
+    ])
+
+    const items = []
+    for (let i = 0; i < raw.length; i += 2) {
+        const score = Number(raw[i + 1])
+        const value = Buffer.isBuffer(raw[i]) ? raw[i].toString('utf8') : raw[i]
+        let parsed
+        try {
+            parsed = JSON.parse(value)
+        } catch {
+            parsed = { raw: value }
+        }
+        const meta = parsed._scheduleMeta || {}
+        const fireAt = meta.fireAt != null ? meta.fireAt : score
+        items.push({
+            taskId: parsed.id,
+            type: parsed.type,
+            account: parsed.data?.account,
+            name: meta.name,
+            fireAt,
+            score,
+            delayMs: meta.delayMs,
+            remainMs: Math.max(0, fireAt - now),
+            payloadPreview: stripScheduleMeta(parsed)
+        })
+    }
+
+    return {
+        items,
+        note: 'Redis 调度:未到 fireAt 前不会进入 runforge_task_queue;到期由本机定时任务写入 MQ。'
+    }
+}
+
+async function countPendingScheduled(now = Date.now()) {
+    const n = await Redis.sendCommand([
+        'ZCOUNT',
+        SCHEDULE_KEY,
+        `(${String(now)}`,
+        '+inf'
+    ])
+    return Number(n) || 0
+}
+
+module.exports = {
+    SCHEDULE_KEY,
+    stripScheduleMeta,
+    scheduleDelayedRunforgeTask,
+    requeueAt,
+    popDueMessages,
+    listPendingScheduledForAdmin,
+    countPendingScheduled
+}

+ 63 - 0
plugin/mq/lepaoSchedulePublisher.js

@@ -0,0 +1,63 @@
+const mq = require('./index')
+const { publishRunforgeTask } = require('./runforgeTaskMq')
+const {
+    popDueMessages,
+    stripScheduleMeta,
+    requeueAt,
+    SCHEDULE_KEY
+} = require('./lepaoAutoScheduleRedis')
+
+let intervalHandle = null
+
+/**
+ * 定时将 Redis 中已到期的乐跑任务写入 runforge_task_queue
+ */
+function startLepaoSchedulePublisher(options = {}) {
+    const logger = options.logger || console
+    const intervalMs = options.intervalMs ?? 2000
+    const batch = options.batch ?? 100
+
+    if (intervalHandle) return
+
+    intervalHandle = setInterval(async () => {
+        try {
+            const now = Date.now()
+            const rawList = await popDueMessages(now, batch)
+            if (!rawList.length) return
+
+            const channel = await mq.getChannel('lepao_schedule_tick')
+
+            for (const raw of rawList) {
+                let msg
+                try {
+                    msg = JSON.parse(raw)
+                } catch (e) {
+                    logger.error?.(
+                        `[LepaoSchedule] 调度 JSON 无效已丢弃 key=${SCHEDULE_KEY}: ${String(raw).slice(0, 120)}`
+                    )
+                    continue
+                }
+
+                try {
+                    publishRunforgeTask(channel, stripScheduleMeta(msg))
+                } catch (e) {
+                    logger.error?.(`[LepaoSchedule] MQ 投递失败,5s 后重试: ${e.message || e}`)
+                    const retryAt = now + 5000
+                    try {
+                        await requeueAt(retryAt, msg)
+                    } catch (re2) {
+                        logger.error?.(`[LepaoSchedule] 写回 Redis 失败: ${re2.message || re2}`)
+                    }
+                }
+            }
+        } catch (e) {
+            logger.error?.(`[LepaoSchedule] tick 异常: ${e.message || e}`)
+        }
+    }, intervalMs)
+
+    logger.info?.(
+        `[LepaoSchedule] 已启动 Redis→MQ 调度(间隔 ${intervalMs}ms,每批最多 ${batch} 条,key=${SCHEDULE_KEY})`
+    )
+}
+
+module.exports = { startLepaoSchedulePublisher }

+ 26 - 0
plugin/mq/runforgeTaskMq.js

@@ -0,0 +1,26 @@
+const TASK_QUEUE = 'runforge_task_queue'
+
+/**
+ * 声明乐跑任务主队列(无 RabbitMQ 插件)
+ */
+async function assertRunforgeTaskIngress(channel, logger) {
+    await channel.assertQueue(TASK_QUEUE, { durable: true })
+    return { mode: 'direct', queue: TASK_QUEUE }
+}
+
+/**
+ * 投递乐跑任务 JSON 消息体(与 Worker 消费格式一致)
+ */
+function publishRunforgeTask(channel, messageObject) {
+    const body = Buffer.from(JSON.stringify(messageObject))
+    channel.sendToQueue(TASK_QUEUE, body, {
+        persistent: true,
+        contentType: 'application/json'
+    })
+}
+
+module.exports = {
+    TASK_QUEUE,
+    assertRunforgeTaskIngress,
+    publishRunforgeTask
+}