runforgeTaskMq.js 972 B

123456789101112131415161718192021222324252627282930313233
  1. const { mq } = require('./mqPrefix')
  // Queue name shared by the declare/publish helpers in this file. It is built by
  // passing the base name through mq() from ./mqPrefix (presumably that adds an
  // environment/namespace prefix — confirm in mqPrefix).
  2. const TASK_QUEUE = mq('runforge_task_queue')
  /**
   * Declares the main runforge task queue as a plain durable queue
   * (no RabbitMQ plugin is involved).
   *
   * @param {object} channel - Channel exposing `assertQueue(name, options)`.
   * @param {object} [logger] - Accepted to keep the signature stable; not used here.
   * @returns {Promise<{mode: string, queue: string}>} Ingress descriptor with
   *   `mode` set to 'direct' and `queue` set to the declared queue name.
   */
  async function assertRunforgeTaskIngress(channel, logger) {
    const queueOptions = { durable: true }
    await channel.assertQueue(TASK_QUEUE, queueOptions)

    const ingress = { mode: 'direct', queue: TASK_QUEUE }
    return ingress
  }
  /**
   * Publishes a runforge task as a persistent JSON message to the task queue
   * (the body format matches what the Worker consumes).
   *
   * @param {object} channel - Channel exposing `sendToQueue(queue, content, options)`.
   * @param {object} messageObject - JSON-serializable task payload; its `id` and
   *   `type` fields are included in the log line when present.
   * @param {object} [logger] - Optional logger; `info` / `warn` are called when available.
   * @returns {boolean} The value returned by `sendToQueue`. `false` is a
   *   back-pressure signal (wait for the channel's 'drain' event before sending
   *   much more), not a delivery failure.
   * @throws {TypeError} If `messageObject` has no JSON text representation
   *   (e.g. `undefined` or a function), or if `JSON.stringify` itself throws.
   */
  function publishRunforgeTask(channel, messageObject, logger) {
    const json = JSON.stringify(messageObject)
    // JSON.stringify yields undefined for undefined/functions/symbols; fail with a
    // clear message instead of the opaque Buffer.from argument-type error.
    if (typeof json !== 'string') {
      throw new TypeError('publishRunforgeTask: messageObject is not JSON-serializable')
    }

    const body = Buffer.from(json)
    const ok = channel.sendToQueue(TASK_QUEUE, body, {
      persistent: true,
      contentType: 'application/json'
    })

    if (!ok) {
      // NOTE(review): assumes amqplib semantics — a false return means the write
      // buffer is full, yet the message has already been buffered and will be sent.
      // Throwing here made callers treat an accepted message as failed, so a retry
      // would publish the same task twice. Surface the signal without failing.
      logger?.warn?.(
        `MQ 背压,消息已进入发送缓冲区,等待 drain queue=${TASK_QUEUE} id=${messageObject?.id} type=${messageObject?.type}`
      )
    }

    logger?.info?.(
      `乐跑任务已投递 MQ queue=${TASK_QUEUE} id=${messageObject?.id} type=${messageObject?.type}`
    )
    return ok
  }
  // Public API of this module: the queue name plus the declare and publish helpers.
  26. module.exports = {
  27. TASK_QUEUE,
  28. assertRunforgeTaskIngress,
  29. publishRunforgeTask
  30. }