| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- const mq = require('./index')
- const { publishRunforgeTask } = require('./runforgeTaskMq')
- const {
- popDueMessages,
- stripScheduleMeta,
- requeueAt,
- SCHEDULE_KEY
- } = require('./lepaoAutoScheduleRedis')
- let intervalHandle = null
- /**
- * 定时将 Redis 中已到期的乐跑任务写入 runforge_task_queue
- */
- function startLepaoSchedulePublisher(options = {}) {
- const logger = options.logger || console
- const intervalMs = options.intervalMs ?? 2000
- const batch = options.batch ?? 100
- if (intervalHandle) return
- intervalHandle = setInterval(async () => {
- try {
- const now = Date.now()
- const rawList = await popDueMessages(now, batch)
- if (!rawList.length) return
- const channel = await mq.getChannel('lepao_schedule_tick')
- for (const raw of rawList) {
- let msg
- try {
- msg = JSON.parse(raw)
- } catch (e) {
- logger.error?.(
- `[LepaoSchedule] 调度 JSON 无效已丢弃 key=${SCHEDULE_KEY}: ${String(raw).slice(0, 120)}`
- )
- continue
- }
- try {
- publishRunforgeTask(channel, stripScheduleMeta(msg))
- } catch (e) {
- logger.error?.(`[LepaoSchedule] MQ 投递失败,5s 后重试: ${e.message || e}`)
- const retryAt = now + 5000
- try {
- await requeueAt(retryAt, msg)
- } catch (re2) {
- logger.error?.(`[LepaoSchedule] 写回 Redis 失败: ${re2.message || re2}`)
- }
- }
- }
- } catch (e) {
- logger.error?.(`[LepaoSchedule] tick 异常: ${e.message || e}`)
- }
- }, intervalMs)
- logger.info?.(
- `[LepaoSchedule] 已启动 Redis→MQ 调度(间隔 ${intervalMs}ms,每批最多 ${batch} 条,key=${SCHEDULE_KEY})`
- )
- }
- module.exports = { startLepaoSchedulePublisher }
|