lepaoSchedulePublisher.js 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. const mq = require('./index')
  2. const { publishRunforgeTask } = require('./runforgeTaskMq')
  3. const {
  4. popDueMessages,
  5. stripScheduleMeta,
  6. requeueAt,
  7. SCHEDULE_KEY
  8. } = require('./lepaoAutoScheduleRedis')
  9. const mqNames = require('./jkesMqNames')
  10. let intervalHandle = null
  11. /**
  12. * 定时将 Redis 中已到期的 JKES 乐跑任务写入 jkes_runforge_task_queue
  13. */
  14. function startLepaoSchedulePublisher(options = {}) {
  15. const logger = options.logger || console
  16. const intervalMs = options.intervalMs ?? 2000
  17. const batch = options.batch ?? 100
  18. if (intervalHandle) return
  19. intervalHandle = setInterval(async () => {
  20. try {
  21. const now = Date.now()
  22. const rawList = await popDueMessages(now, batch)
  23. if (!rawList.length) return
  24. const channel = await mq.getChannel(mqNames.channelScheduleTick)
  25. for (const raw of rawList) {
  26. let msg
  27. try {
  28. msg = JSON.parse(raw)
  29. } catch (e) {
  30. logger.error?.(
  31. `[LepaoSchedule] 调度 JSON 无效已丢弃: ${String(raw).slice(0, 120)}`
  32. )
  33. continue
  34. }
  35. try {
  36. publishRunforgeTask(channel, stripScheduleMeta(msg))
  37. } catch (e) {
  38. logger.error?.(`[LepaoSchedule] MQ 投递失败,5s 后重试: ${e.message || e}`)
  39. const retryAt = now + 5000
  40. try {
  41. await requeueAt(retryAt, msg)
  42. } catch (re2) {
  43. logger.error?.(`[LepaoSchedule] 写回 Redis 失败: ${re2.message || re2}`)
  44. }
  45. }
  46. }
  47. } catch (e) {
  48. logger.error?.(`[LepaoSchedule] tick 异常: ${e.message || e}`)
  49. }
  50. }, intervalMs)
  51. logger.info?.(
  52. `[LepaoSchedule] 已启动 Redis→MQ(JKES)调度(间隔 ${intervalMs}ms,每批最多 ${batch} 条,key=${SCHEDULE_KEY})`
  53. )
  54. }
  55. module.exports = { startLepaoSchedulePublisher }