Browse Source

🐞 fix: 修复订单处理进程重复消费的问题

Pchen. 4 days ago
parent
commit
928d51be85
3 changed files with 30 additions and 7 deletions
  1. 14 5
      apis/Order/CreateOrder.js
  2. 9 1
      lib/Server.js
  3. 7 1
      lib/serverRole.js

+ 14 - 5
apis/Order/CreateOrder.js

@@ -106,6 +106,16 @@ async function pollOrderPaymentStatus(orderId, logger) {
         }
 
         try {
+            const existing = await db.query('SELECT state FROM orders WHERE orderId = ?', [orderId])
+            if (!existing?.length) {
+                logger.warn(`订单不存在,停止轮询:${orderId}`)
+                return
+            }
+            if (Number(existing[0].state) !== 0) {
+                logger.info(`订单已处理(state=${existing[0].state}),停止轮询:${orderId}`)
+                return
+            }
+
             const queryData = await queryPaymentOrder(orderId, logger)
             logger.info(`轮询订单支付状态,订单号:${orderId},尝试次数:${retry + 1},查询结果:${JSON.stringify(queryData)}`)
 
@@ -154,9 +164,10 @@ async function pollOrderPaymentStatus(orderId, logger) {
 
                     logger.info(`订单处理成功:${out_trade_no}`)
                     return
-                } else {
-                    logger.warn(`订单不存在或已处理:${out_trade_no}`)
                 }
+
+                logger.info(`支付网关已确认付款,订单已由其他进程/回调处理,停止轮询:${out_trade_no}`)
+                return
             }
 
             // 未支付,继续轮询
@@ -206,9 +217,6 @@ class CreateOrder extends API {
         super()
         this.setPath('/Order/CreateOrder')
         this.setMethod('POST')
-
-        // 启动订单支付 MQ 消费者(只会启动一次)
-        startOrderPaymentWorker(this.logger)
     }
 
     async onRequest(req, res) {
@@ -406,3 +414,4 @@ class CreateOrder extends API {
 }
 
 module.exports.CreateOrder = CreateOrder
+module.exports.startOrderPaymentWorker = startOrderPaymentWorker

+ 9 - 1
lib/Server.js

@@ -14,7 +14,8 @@ const AccessControl = require('./AccessControl')
 const {
     resolveServerRole,
     shouldServeApi,
-    shouldRunLepaoWorker
+    shouldRunLepaoWorker,
+    shouldRunOrderPaymentWorker
 } = require('./serverRole')
 const { TASK_QUEUE } = require('../plugin/mq/runforgeTaskMq')
 const { PREFIX } = require('../plugin/mq/mqPrefix')
@@ -82,6 +83,13 @@ class SERVER {
                     `serverRole=api,跳过乐跑 Worker;乐跑任务将投递到 MQ 队列「${TASK_QUEUE}」(mqPrefix=${JSON.stringify(PREFIX)})`
                 )
             }
+
+            if (shouldRunOrderPaymentWorker(this.serverRole)) {
+                const { startOrderPaymentWorker } = require('../apis/Order/CreateOrder')
+                await startOrderPaymentWorker(this.logger)
+            } else if (shouldServeApi(this.serverRole)) {
+                this.logger.info('serverRole=api,跳过订单支付 MQ 消费者(由 all 进程消费)')
+            }
         } catch (e) {
             this.logger.error('❌ RabbitMQ 初始化失败')
             process.exit(1)

+ 7 - 1
lib/serverRole.js

@@ -22,8 +22,14 @@ function shouldRunLepaoWorker(role) {
     return role === 'all' || role === 'worker'
 }
 
+/** 订单支付 MQ 消费者仅在 all 进程运行;api 只入队不消费 */
+function shouldRunOrderPaymentWorker(role) {
+    return role === 'all'
+}
+
 module.exports = {
     resolveServerRole,
     shouldServeApi,
-    shouldRunLepaoWorker
+    shouldRunLepaoWorker,
+    shouldRunOrderPaymentWorker
 }