const mq = require('./index') const { publishRunforgeTask } = require('./runforgeTaskMq') const { popDueMessages, stripScheduleMeta, requeueAt, SCHEDULE_KEY } = require('./lepaoAutoScheduleRedis') let intervalHandle = null /** * 定时将 Redis 中已到期的乐跑任务写入 runforge_task_queue */ function startLepaoSchedulePublisher(options = {}) { const logger = options.logger || console const intervalMs = options.intervalMs ?? 2000 const batch = options.batch ?? 100 if (intervalHandle) return intervalHandle = setInterval(async () => { try { const now = Date.now() const rawList = await popDueMessages(now, batch) if (!rawList.length) return const channel = await mq.getChannel('lepao_schedule_tick') for (const raw of rawList) { let msg try { msg = JSON.parse(raw) } catch (e) { logger.error?.( `[LepaoSchedule] 调度 JSON 无效已丢弃 key=${SCHEDULE_KEY}: ${String(raw).slice(0, 120)}` ) continue } try { publishRunforgeTask(channel, stripScheduleMeta(msg)) } catch (e) { logger.error?.(`[LepaoSchedule] MQ 投递失败,5s 后重试: ${e.message || e}`) const retryAt = now + 5000 try { await requeueAt(retryAt, msg) } catch (re2) { logger.error?.(`[LepaoSchedule] 写回 Redis 失败: ${re2.message || re2}`) } } } } catch (e) { logger.error?.(`[LepaoSchedule] tick 异常: ${e.message || e}`) } }, intervalMs) logger.info?.( `[LepaoSchedule] 已启动 Redis→MQ 调度(间隔 ${intervalMs}ms,每批最多 ${batch} 条,key=${SCHEDULE_KEY})` ) } module.exports = { startLepaoSchedulePublisher }