lepaoSchedulePublisher.js 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. const mq = require('./index')
  2. const { publishRunforgeTask } = require('./runforgeTaskMq')
  3. const {
  4. popDueMessages,
  5. stripScheduleMeta,
  6. requeueAt,
  7. SCHEDULE_KEY
  8. } = require('./lepaoAutoScheduleRedis')
  9. let intervalHandle = null
  10. /**
  11. * 定时将 Redis 中已到期的乐跑任务写入 runforge_task_queue
  12. */
  13. function startLepaoSchedulePublisher(options = {}) {
  14. const logger = options.logger || console
  15. const intervalMs = options.intervalMs ?? 2000
  16. const batch = options.batch ?? 100
  17. if (intervalHandle) return
  18. intervalHandle = setInterval(async () => {
  19. try {
  20. const now = Date.now()
  21. const rawList = await popDueMessages(now, batch)
  22. if (!rawList.length) return
  23. const channel = await mq.getChannel('lepao_schedule_tick')
  24. for (const raw of rawList) {
  25. let msg
  26. try {
  27. msg = JSON.parse(raw)
  28. } catch (e) {
  29. logger.error?.(
  30. `[LepaoSchedule] 调度 JSON 无效已丢弃 key=${SCHEDULE_KEY}: ${String(raw).slice(0, 120)}`
  31. )
  32. continue
  33. }
  34. try {
  35. publishRunforgeTask(channel, stripScheduleMeta(msg))
  36. } catch (e) {
  37. logger.error?.(`[LepaoSchedule] MQ 投递失败,5s 后重试: ${e.message || e}`)
  38. const retryAt = now + 5000
  39. try {
  40. await requeueAt(retryAt, msg)
  41. } catch (re2) {
  42. logger.error?.(`[LepaoSchedule] 写回 Redis 失败: ${re2.message || re2}`)
  43. }
  44. }
  45. }
  46. } catch (e) {
  47. logger.error?.(`[LepaoSchedule] tick 异常: ${e.message || e}`)
  48. }
  49. }, intervalMs)
  50. logger.info?.(
  51. `[LepaoSchedule] 已启动 Redis→MQ 调度(间隔 ${intervalMs}ms,每批最多 ${batch} 条,key=${SCHEDULE_KEY})`
  52. )
  53. }
  54. module.exports = { startLepaoSchedulePublisher }