const Redis = require('../DataBase/Redis') /** 到期后由进程定时器投递到 runforge_task_queue(重启不丢,依赖 Redis) */ const SCHEDULE_KEY = 'lepao:mq:scheduled' const POP_DUE_LUA = ` local key = KEYS[1] local now = tonumber(ARGV[1]) local limit = tonumber(ARGV[2]) local items = redis.call('ZRANGEBYSCORE', key, '-inf', now, 'LIMIT', 0, limit) for i = 1, #items do redis.call('ZREM', key, items[i]) end return items ` function stripScheduleMeta(msg) { if (!msg || typeof msg !== 'object') return msg const copy = { ...msg } delete copy._scheduleMeta return copy } /** * 延迟投递:整消息入 ZSET,score = fireAt(毫秒时间戳) * @param {object} meta 可选,管理端展示用:{ name, account, delayMs } */ async function scheduleDelayedRunforgeTask(fireAt, messageObject, meta = null) { const toStore = meta != null ? { ...messageObject, _scheduleMeta: { ...meta, fireAt } } : messageObject const member = JSON.stringify(toStore) await Redis.sendCommand(['ZADD', SCHEDULE_KEY, String(fireAt), member]) } async function requeueAt(fireAt, messageObject) { const member = JSON.stringify(messageObject) await Redis.sendCommand(['ZADD', SCHEDULE_KEY, String(fireAt), member]) } /** * 原子取出已到期的成员(原始 JSON 字符串数组) */ async function popDueMessages(now = Date.now(), limit = 100) { const res = await Redis.sendCommand([ 'EVAL', POP_DUE_LUA, '1', SCHEDULE_KEY, String(now), String(limit) ]) if (!Array.isArray(res)) return [] return res.map((x) => (Buffer.isBuffer(x) ? x.toString('utf8') : String(x))) } /** 清理过久未弹出项(异常场景) */ async function pruneStaleScheduled(beforeScore, now = Date.now()) { await Redis.sendCommand(['ZREMRANGEBYSCORE', SCHEDULE_KEY, '-inf', String(beforeScore)]) } /** * 管理端:尚未到期的调度(score > now) */ async function listPendingScheduledForAdmin(now = Date.now(), limitTotal = 800) { await pruneStaleScheduled(now - 48 * 3600 * 1000, now) const raw = await Redis.sendCommand([ 'ZRANGEBYSCORE', SCHEDULE_KEY, `(${String(now)}`, '+inf', 'WITHSCORES', 'LIMIT', '0', String(limitTotal) ]) const items = [] for (let i = 0; i < raw.length; i += 2) { const score = Number(raw[i + 1]) const value = Buffer.isBuffer(raw[i]) ? raw[i].toString('utf8') : raw[i] let parsed try { parsed = JSON.parse(value) } catch { parsed = { raw: value } } const meta = parsed._scheduleMeta || {} const fireAt = meta.fireAt != null ? meta.fireAt : score items.push({ taskId: parsed.id, type: parsed.type, account: parsed.data?.account, name: meta.name, fireAt, score, delayMs: meta.delayMs, remainMs: Math.max(0, fireAt - now), payloadPreview: stripScheduleMeta(parsed) }) } return { items, note: 'Redis 调度:未到 fireAt 前不会进入 runforge_task_queue;到期由本机定时任务写入 MQ。' } } async function countPendingScheduled(now = Date.now()) { const n = await Redis.sendCommand([ 'ZCOUNT', SCHEDULE_KEY, `(${String(now)}`, '+inf' ]) return Number(n) || 0 } module.exports = { SCHEDULE_KEY, stripScheduleMeta, scheduleDelayedRunforgeTask, requeueAt, popDueMessages, listPendingScheduledForAdmin, countPendingScheduled }