| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- const Redis = require('../DataBase/Redis')
/**
 * Redis sorted-set key that holds delayed messages.
 * Once a message is due, an in-process timer delivers it to
 * runforge_task_queue (entries survive a restart; depends on Redis).
 */
const SCHEDULE_KEY = 'lepao:mq:scheduled'

/**
 * Lua script that pops due members from a sorted set in one EVAL call.
 *   KEYS[1] - sorted-set key
 *   ARGV[1] - upper score bound ("now"); members with score <= now are due
 *   ARGV[2] - maximum number of members to take
 * Each selected member is removed with ZREM and the selected members are
 * returned to the caller.
 */
const POP_DUE_LUA = `
local key = KEYS[1]
local now = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local items = redis.call('ZRANGEBYSCORE', key, '-inf', now, 'LIMIT', 0, limit)
for i = 1, #items do
redis.call('ZREM', key, items[i])
end
return items
`
/**
 * Return a shallow copy of a message without its `_scheduleMeta` property.
 *
 * @param {*} msg Message to clean. Falsy values and non-objects are returned
 *   unchanged.
 * @returns {*} A new object holding every own enumerable property of `msg`
 *   except `_scheduleMeta`; the input itself is never mutated.
 */
function stripScheduleMeta(msg) {
  const isObjectLike = Boolean(msg) && typeof msg === 'object'
  if (!isObjectLike) {
    return msg
  }
  // Rest-destructuring copies everything except the key being dropped.
  const { _scheduleMeta: _dropped, ...withoutMeta } = msg
  return withoutMeta
}
/**
 * Delayed delivery: store the whole message in the schedule ZSET with
 * score = fireAt (millisecond timestamp).
 *
 * @param {number} fireAt Timestamp (ms) used as the ZSET score.
 * @param {object} messageObject Message to serialise as the ZSET member.
 * @param {object} [meta] Optional, for admin display: { name, account, delayMs }.
 *   When given, it is stored under `_scheduleMeta` together with `fireAt`.
 * @returns {Promise<void>}
 */
async function scheduleDelayedRunforgeTask(fireAt, messageObject, meta = null) {
  let payload = messageObject
  if (meta != null) {
    // fireAt is written last so it overrides any fireAt carried by meta.
    const scheduleMeta = { ...meta, fireAt }
    payload = { ...messageObject, _scheduleMeta: scheduleMeta }
  }
  const command = ['ZADD', SCHEDULE_KEY, String(fireAt), JSON.stringify(payload)]
  await Redis.sendCommand(command)
}
/**
 * Put a message into the schedule ZSET as-is, scored by `fireAt`.
 *
 * @param {number} fireAt Timestamp used as the ZSET score.
 * @param {object} messageObject Message serialised with JSON.stringify; no
 *   `_scheduleMeta` is added or removed here.
 * @returns {Promise<void>}
 */
async function requeueAt(fireAt, messageObject) {
  const score = String(fireAt)
  const serialized = JSON.stringify(messageObject)
  await Redis.sendCommand(['ZADD', SCHEDULE_KEY, score, serialized])
}
/**
 * Atomically take out the members that are already due.
 *
 * Runs POP_DUE_LUA through EVAL against the schedule ZSET.
 *
 * @param {number} [now=Date.now()] Upper score bound passed to the script.
 * @param {number} [limit=100] Maximum number of members to pop.
 * @returns {Promise<string[]>} Raw JSON strings of the popped members; an
 *   empty array when the reply is not an array.
 */
async function popDueMessages(now = Date.now(), limit = 100) {
  const command = ['EVAL', POP_DUE_LUA, '1', SCHEDULE_KEY, String(now), String(limit)]
  const reply = await Redis.sendCommand(command)
  if (!Array.isArray(reply)) {
    return []
  }
  // Members may arrive as Buffers or as strings; normalise to strings.
  const decode = (entry) => {
    if (Buffer.isBuffer(entry)) {
      return entry.toString('utf8')
    }
    return String(entry)
  }
  return reply.map((entry) => decode(entry))
}
/**
 * Clean up entries that have gone un-popped for too long (abnormal case).
 *
 * Removes every member of the schedule ZSET whose score is <= `beforeScore`.
 *
 * @param {number} beforeScore Inclusive upper score bound of the removal.
 * @param {number} [now=Date.now()] Accepted for callers but not used here.
 * @returns {Promise<void>}
 */
async function pruneStaleScheduled(beforeScore, now = Date.now()) {
  const upperBound = String(beforeScore)
  const command = ['ZREMRANGEBYSCORE', SCHEDULE_KEY, '-inf', upperBound]
  await Redis.sendCommand(command)
}
/**
 * Admin view: schedules that are not yet due (score > now).
 *
 * Side effect: before listing, entries whose score is at least 48 hours
 * older than `now` are pruned from the schedule ZSET.
 *
 * @param {number} [now=Date.now()] Reference time in ms; only members with a
 *   strictly greater score are listed.
 * @param {number} [limitTotal=800] Maximum number of members to fetch.
 * @returns {Promise<{items: object[], note: string}>} One item per member
 *   with taskId, type, account, name, fireAt, score, delayMs, remainMs and
 *   payloadPreview (the stored message without `_scheduleMeta`). Members that
 *   are not a JSON object (unparseable text, `null`, primitives) are reported
 *   with payloadPreview `{ raw: <member text> }`.
 */
async function listPendingScheduledForAdmin(now = Date.now(), limitTotal = 800) {
  await pruneStaleScheduled(now - 48 * 3600 * 1000, now)
  const raw = await Redis.sendCommand([
    'ZRANGEBYSCORE',
    SCHEDULE_KEY,
    `(${String(now)}`,
    '+inf',
    'WITHSCORES',
    'LIMIT',
    '0',
    String(limitTotal)
  ])
  // Same guard as popDueMessages: a non-array reply means nothing to list.
  const pairs = Array.isArray(raw) ? raw : []
  const toText = (entry) => (Buffer.isBuffer(entry) ? entry.toString('utf8') : entry)
  const items = []
  // The reply alternates member, score; a dangling member without a score is skipped.
  for (let i = 0; i + 1 < pairs.length; i += 2) {
    const value = toText(pairs[i])
    const score = Number(toText(pairs[i + 1]))
    let parsed
    try {
      parsed = JSON.parse(value)
    } catch {
      parsed = null
    }
    // JSON.parse can yield null or a primitive; property access on null
    // would throw, so anything that is not an object is wrapped like
    // unparseable text.
    if (parsed === null || typeof parsed !== 'object') {
      parsed = { raw: value }
    }
    const meta = parsed._scheduleMeta || {}
    const fireAt = meta.fireAt ?? score
    items.push({
      taskId: parsed.id,
      type: parsed.type,
      account: parsed.data?.account,
      name: meta.name,
      fireAt,
      score,
      delayMs: meta.delayMs,
      remainMs: Math.max(0, fireAt - now),
      payloadPreview: stripScheduleMeta(parsed)
    })
  }
  return {
    items,
    note: 'Redis 调度:未到 fireAt 前不会进入 runforge_task_queue;到期由本机定时任务写入 MQ。'
  }
}
/**
 * Count schedules that are not yet due (score strictly greater than `now`).
 *
 * @param {number} [now=Date.now()] Reference time; used as an exclusive
 *   lower score bound for ZCOUNT.
 * @returns {Promise<number>} The count, or 0 when the reply does not convert
 *   to a non-zero number.
 */
async function countPendingScheduled(now = Date.now()) {
  // A leading "(" makes the ZCOUNT bound exclusive.
  const exclusiveMin = '(' + String(now)
  const reply = await Redis.sendCommand(['ZCOUNT', SCHEDULE_KEY, exclusiveMin, '+inf'])
  const total = Number(reply)
  return total || 0
}
- module.exports = {
- SCHEDULE_KEY,
- stripScheduleMeta,
- scheduleDelayedRunforgeTask,
- requeueAt,
- popDueMessages,
- listPendingScheduledForAdmin,
- countPendingScheduled
- }
|