| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- const mq = require('./index')
- const { publishRunforgeTask } = require('./runforgeTaskMq')
- const {
- popDueMessages,
- stripScheduleMeta,
- requeueAt,
- SCHEDULE_KEY
- } = require('./lepaoAutoScheduleRedis')
- const mqNames = require('./jkesMqNames')
// Handle of the running publisher timer; non-null once the publisher has been started.
let intervalHandle = null

/**
 * Periodically moves due JKES Lepao tasks from the Redis schedule into the
 * RunForge task queue (jkes_runforge_task_queue).
 *
 * Each tick pops up to `batch` due messages via `popDueMessages`, parses them as
 * JSON, strips the schedule metadata and publishes them on the schedule-tick
 * channel. Popped messages are assumed to be already removed from Redis (the
 * publish-failure path writes them back with `requeueAt`), so every failure that
 * happens after the pop — including failing to obtain the MQ channel — writes the
 * message back for a retry 5 seconds later instead of dropping it.
 *
 * Calling this function again while the timer is already running is a no-op.
 *
 * @param {object} [options]
 * @param {object} [options.logger=console] - Logger; `info` and `error` are called if present.
 * @param {number} [options.intervalMs=2000] - Delay between ticks in milliseconds.
 * @param {number} [options.batch=100] - Maximum number of messages popped per tick.
 * @returns {void}
 */
function startLepaoSchedulePublisher(options = {}) {
  const logger = options.logger || console
  const intervalMs = options.intervalMs ?? 2000
  const batch = options.batch ?? 100
  if (intervalHandle) return

  // Delay before a message that could not be delivered becomes due again.
  const retryDelayMs = 5000

  // Writes a message back to the Redis schedule; a failure here is logged only,
  // because there is nowhere else left to put the message.
  const requeueForRetry = async (retryAt, msg) => {
    try {
      await requeueAt(retryAt, msg)
    } catch (err) {
      logger.error?.(`[LepaoSchedule] 写回 Redis 失败: ${err.message || err}`)
    }
  }

  intervalHandle = setInterval(async () => {
    try {
      const now = Date.now()
      const rawList = await popDueMessages(now, batch)
      if (!rawList.length) return

      const retryAt = now + retryDelayMs

      // The batch has already been popped, so a channel failure must not abort
      // the tick: remember it and requeue every valid message below.
      let channel = null
      try {
        channel = await mq.getChannel(mqNames.channelScheduleTick)
      } catch (err) {
        logger.error?.(`[LepaoSchedule] 获取 MQ channel 失败,5s 后重试: ${err.message || err}`)
      }

      for (const raw of rawList) {
        let msg
        try {
          msg = JSON.parse(raw)
        } catch {
          // Unparseable payloads can never succeed; drop them after logging a prefix.
          logger.error?.(
            `[LepaoSchedule] 调度 JSON 无效已丢弃: ${String(raw).slice(0, 120)}`
          )
          continue
        }

        if (!channel) {
          await requeueForRetry(retryAt, msg)
          continue
        }

        try {
          // Awaited so that an asynchronous rejection (if the publisher returns a
          // promise) is caught here too; awaiting a plain value is harmless.
          await publishRunforgeTask(channel, stripScheduleMeta(msg))
        } catch (err) {
          logger.error?.(`[LepaoSchedule] MQ 投递失败,5s 后重试: ${err.message || err}`)
          await requeueForRetry(retryAt, msg)
        }
      }
    } catch (err) {
      logger.error?.(`[LepaoSchedule] tick 异常: ${err.message || err}`)
    }
  }, intervalMs)

  logger.info?.(
    `[LepaoSchedule] 已启动 Redis→MQ(JKES)调度(间隔 ${intervalMs}ms,每批最多 ${batch} 条,key=${SCHEDULE_KEY})`
  )
}
- module.exports = { startLepaoSchedulePublisher }
|