lepaoAutoScheduleRedis.js 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. const Redis = require('../DataBase/Redis')
  2. /** JKES 延迟调度 ZSET(唯一) */
  3. const SCHEDULE_KEY = 'jkes_lepao:mq:scheduled'
  4. const POP_DUE_LUA = `
  5. local key = KEYS[1]
  6. local now = tonumber(ARGV[1])
  7. local limit = tonumber(ARGV[2])
  8. local items = redis.call('ZRANGEBYSCORE', key, '-inf', now, 'LIMIT', 0, limit)
  9. for i = 1, #items do
  10. redis.call('ZREM', key, items[i])
  11. end
  12. return items
  13. `
  14. function stripScheduleMeta(msg) {
  15. if (!msg || typeof msg !== 'object') return msg
  16. const copy = { ...msg }
  17. delete copy._scheduleMeta
  18. return copy
  19. }
  20. async function scheduleDelayedRunforgeTask(fireAt, messageObject, meta = null) {
  21. const toStore =
  22. meta != null
  23. ? {
  24. ...messageObject,
  25. _scheduleMeta: {
  26. ...meta,
  27. fireAt
  28. }
  29. }
  30. : messageObject
  31. const member = JSON.stringify(toStore)
  32. await Redis.sendCommand(['ZADD', SCHEDULE_KEY, String(fireAt), member])
  33. }
  34. async function requeueAt(fireAt, messageObject) {
  35. const member = JSON.stringify(messageObject)
  36. await Redis.sendCommand(['ZADD', SCHEDULE_KEY, String(fireAt), member])
  37. }
  38. async function popDueMessages(now = Date.now(), limit = 100) {
  39. const res = await Redis.sendCommand([
  40. 'EVAL',
  41. POP_DUE_LUA,
  42. '1',
  43. SCHEDULE_KEY,
  44. String(now),
  45. String(limit)
  46. ])
  47. if (!Array.isArray(res)) return []
  48. return res.map((x) => (Buffer.isBuffer(x) ? x.toString('utf8') : String(x)))
  49. }
  50. async function pruneStaleScheduled(beforeScore, now = Date.now()) {
  51. await Redis.sendCommand(['ZREMRANGEBYSCORE', SCHEDULE_KEY, '-inf', String(beforeScore)])
  52. }
  53. async function listPendingScheduledForAdmin(now = Date.now(), limitTotal = 800) {
  54. await pruneStaleScheduled(now - 48 * 3600 * 1000, now)
  55. const raw = await Redis.sendCommand([
  56. 'ZRANGEBYSCORE',
  57. SCHEDULE_KEY,
  58. `(${String(now)}`,
  59. '+inf',
  60. 'WITHSCORES',
  61. 'LIMIT',
  62. '0',
  63. String(limitTotal)
  64. ])
  65. const items = []
  66. for (let i = 0; i < raw.length; i += 2) {
  67. const score = Number(raw[i + 1])
  68. const value = Buffer.isBuffer(raw[i]) ? raw[i].toString('utf8') : raw[i]
  69. let parsed
  70. try {
  71. parsed = JSON.parse(value)
  72. } catch {
  73. parsed = { raw: value }
  74. }
  75. const meta = parsed._scheduleMeta || {}
  76. const fireAt = meta.fireAt != null ? meta.fireAt : score
  77. items.push({
  78. taskId: parsed.id,
  79. type: parsed.type,
  80. account: parsed.data?.account,
  81. name: meta.name,
  82. fireAt,
  83. score,
  84. delayMs: meta.delayMs,
  85. remainMs: Math.max(0, fireAt - now),
  86. payloadPreview: stripScheduleMeta(parsed)
  87. })
  88. }
  89. return {
  90. items,
  91. note: 'Redis 调度:未到 fireAt 前不会进入 jkes_runforge_task_queue;到期由定时任务写入 MQ。'
  92. }
  93. }
  94. async function countPendingScheduled(now = Date.now()) {
  95. const n = await Redis.sendCommand([
  96. 'ZCOUNT',
  97. SCHEDULE_KEY,
  98. `(${String(now)}`,
  99. '+inf'
  100. ])
  101. return Number(n) || 0
  102. }
  103. module.exports = {
  104. SCHEDULE_KEY,
  105. stripScheduleMeta,
  106. scheduleDelayedRunforgeTask,
  107. requeueAt,
  108. popDueMessages,
  109. listPendingScheduledForAdmin,
  110. countPendingScheduled
  111. }