// lepaoAutoScheduleRedis.js
  // Project Redis wrapper; this module only uses its sendCommand(argv) method.
  const Redis = require('../DataBase/Redis')
  /**
   * Sorted-set key that holds delayed messages.
   * Original note: once due, a process-level timer delivers entries to
   * runforge_task_queue; nothing is lost on restart because the data lives in Redis.
   */
  const SCHEDULE_KEY = 'lepao:mq:scheduled'

  /**
   * Lua script executed through EVAL.
   * KEYS[1] = sorted-set key, ARGV[1] = upper score bound (now), ARGV[2] = max items.
   * Reads up to `limit` members with score <= now, removes each one from the
   * set, and returns the members that were read.
   */
  const POP_DUE_LUA = `
  local key = KEYS[1]
  local now = tonumber(ARGV[1])
  local limit = tonumber(ARGV[2])
  local items = redis.call('ZRANGEBYSCORE', key, '-inf', now, 'LIMIT', 0, limit)
  for i = 1, #items do
  redis.call('ZREM', key, items[i])
  end
  return items
  `
  /**
   * Return a shallow copy of a message without its `_scheduleMeta` property.
   * Falsy values and non-objects are returned unchanged; the input object is
   * never mutated.
   * @param {*} msg Stored message (or any other value).
   * @returns {*} Copy without `_scheduleMeta`, or `msg` itself when it is not an object.
   */
  function stripScheduleMeta(msg) {
    if (!msg || typeof msg !== 'object') return msg
    // Rest-destructuring copies every own enumerable property except the meta field.
    const { _scheduleMeta, ...rest } = msg
    return rest
  }
  /**
   * Delayed delivery: store the whole message as JSON in the SCHEDULE_KEY
   * sorted set with score = fireAt (millisecond timestamp).
   *
   * When `meta` is given, the stored payload additionally carries
   * `_scheduleMeta = { ...meta, fireAt }`; `fireAt` is written last, so it
   * overrides any `fireAt` already present in `meta`.
   *
   * @param {number} fireAt Due time, used as the sorted-set score.
   * @param {object} messageObject Message to store.
   * @param {object|null} [meta] Optional, for admin display: { name, account, delayMs }.
   * @returns {Promise<void>}
   */
  async function scheduleDelayedRunforgeTask(fireAt, messageObject, meta = null) {
    let payload = messageObject
    if (meta != null) {
      payload = { ...messageObject, _scheduleMeta: { ...meta, fireAt } }
    }
    const member = JSON.stringify(payload)
    await Redis.sendCommand(['ZADD', SCHEDULE_KEY, String(fireAt), member])
  }
  /**
   * Put a message back into the SCHEDULE_KEY sorted set with score = fireAt.
   * The message is serialized as-is; no `_scheduleMeta` is added here.
   * @param {number} fireAt Score to store the message under.
   * @param {object} messageObject Message to serialize and store.
   * @returns {Promise<void>}
   */
  async function requeueAt(fireAt, messageObject) {
    const serialized = JSON.stringify(messageObject)
    const command = ['ZADD', SCHEDULE_KEY, String(fireAt), serialized]
    await Redis.sendCommand(command)
  }
  /**
   * Take due members out of the schedule in a single EVAL call of POP_DUE_LUA
   * and return them as raw JSON strings.
   * @param {number} [now] Upper score bound passed to the script; defaults to Date.now().
   * @param {number} [limit] Maximum number of members passed to the script; defaults to 100.
   * @returns {Promise<string[]>} Popped members; empty when the reply is not an array.
   */
  async function popDueMessages(now = Date.now(), limit = 100) {
    const command = ['EVAL', POP_DUE_LUA, '1', SCHEDULE_KEY, String(now), String(limit)]
    const reply = await Redis.sendCommand(command)
    if (!Array.isArray(reply)) return []
    // Reply entries may be Buffers or other values; normalize everything to strings.
    return reply.map((entry) => (Buffer.isBuffer(entry) ? entry.toString('utf8') : String(entry)))
  }
  /**
   * Clean up entries that were never popped for too long (abnormal situations):
   * removes every member of SCHEDULE_KEY whose score is <= beforeScore.
   * @param {number} beforeScore Inclusive upper score bound of the range to delete.
   * @param {number} [now] Not used by this function; kept so existing call sites keep working.
   * @returns {Promise<void>}
   */
  async function pruneStaleScheduled(beforeScore, now = Date.now()) {
    const upperBound = String(beforeScore)
    await Redis.sendCommand(['ZREMRANGEBYSCORE', SCHEDULE_KEY, '-inf', upperBound])
  }
  /**
   * Admin view: schedules that are not yet due (score > now).
   *
   * Side effect: before listing, entries with score <= now - 48h are removed
   * via pruneStaleScheduled.
   *
   * @param {number} [now] Reference time in ms; defaults to Date.now().
   * @param {number} [limitTotal] Maximum number of entries requested from Redis; defaults to 800.
   * @returns {Promise<{items: object[], note: string}>} One item per pending entry plus a fixed note.
   */
  async function listPendingScheduledForAdmin(now = Date.now(), limitTotal = 800) {
    await pruneStaleScheduled(now - 48 * 3600 * 1000, now)
    const reply = await Redis.sendCommand([
      'ZRANGEBYSCORE',
      SCHEDULE_KEY,
      `(${String(now)}`, // leading "(" makes the lower bound exclusive
      '+inf',
      'WITHSCORES',
      'LIMIT',
      '0',
      String(limitTotal)
    ])
    // Same guard as popDueMessages: a non-array reply is treated as "nothing pending".
    const raw = Array.isArray(reply) ? reply : []
    const items = []
    // The reply is consumed as a flat list: member, score, member, score, ...
    for (let i = 0; i < raw.length; i += 2) {
      const score = Number(raw[i + 1])
      const value = Buffer.isBuffer(raw[i]) ? raw[i].toString('utf8') : raw[i]
      let parsed
      try {
        parsed = JSON.parse(value)
      } catch {
        parsed = { raw: value }
      }
      // JSON "null" parses successfully but has no properties to read; show it
      // like an unparseable member instead of throwing below.
      if (parsed === null) parsed = { raw: value }
      const meta = parsed._scheduleMeta || {}
      const fireAt = meta.fireAt ?? score
      items.push({
        taskId: parsed.id,
        type: parsed.type,
        account: parsed.data?.account,
        name: meta.name,
        fireAt,
        score,
        delayMs: meta.delayMs,
        remainMs: Math.max(0, fireAt - now),
        payloadPreview: stripScheduleMeta(parsed)
      })
    }
    return {
      items,
      note: 'Redis 调度:未到 fireAt 前不会进入 runforge_task_queue;到期由本机定时任务写入 MQ。'
    }
  }
  /**
   * Count schedule entries that are not yet due (score strictly greater than now).
   * @param {number} [now] Reference time in ms; defaults to Date.now().
   * @returns {Promise<number>} The ZCOUNT reply as a number; 0 when the reply is not numeric.
   */
  async function countPendingScheduled(now = Date.now()) {
    const exclusiveNow = `(${String(now)}` // leading "(" excludes entries with score === now
    const reply = await Redis.sendCommand(['ZCOUNT', SCHEDULE_KEY, exclusiveNow, '+inf'])
    return Number(reply) || 0
  }
  114. module.exports = {
  115. SCHEDULE_KEY,
  116. stripScheduleMeta,
  117. scheduleDelayedRunforgeTask,
  118. requeueAt,
  119. popDueMessages,
  120. listPendingScheduledForAdmin,
  121. countPendingScheduled
  122. }