Browse Source

🐞 fix: 订单轮询引入mq

Pchen. 1 month ago
parent
commit
77714ec2b3
1 changed files with 151 additions and 85 deletions
  1. 151 85
      apis/Order/CreateOrder.js

+ 151 - 85
apis/Order/CreateOrder.js

@@ -6,6 +6,137 @@ const AccessControl = require("../../lib/AccessControl.js")
 const crypto = require('crypto')
 const axios = require('axios')
 const config = require('../../config.json')
+const mq = require('../../plugin/mq')
+
+const ORDER_PAYMENT_QUEUE = 'order_payment_check'
+let orderPaymentWorkerStarted = false
+
+async function startOrderPaymentWorker(logger) {
+    if (orderPaymentWorkerStarted) {
+        return
+    }
+
+    try {
+        const ch = await mq.getChannel('order_payment')
+        await ch.assertQueue(ORDER_PAYMENT_QUEUE, {
+            durable: true
+        })
+        await ch.prefetch(1)
+
+        logger.info(`订单支付结果轮询消费者已启动,队列:${ORDER_PAYMENT_QUEUE}`)
+
+        orderPaymentWorkerStarted = true
+
+        ch.consume(ORDER_PAYMENT_QUEUE, async (msg) => {
+            if (!msg) return
+
+            const content = JSON.parse(msg.content.toString() || '{}')
+            const { orderId } = content
+
+            if (!orderId) {
+                logger.warn('收到无效的订单支付检查消息(缺少 orderId)')
+                ch.ack(msg)
+                return
+            }
+
+            try {
+                await pollOrderPaymentStatus(orderId, logger)
+                ch.ack(msg)
+            } catch (err) {
+                logger.error(`订单支付轮询处理失败,订单号:${orderId},错误:${err.stack || err}`)
+                // 简单重试策略:出现异常时,Nack 并重新入队
+                ch.nack(msg, false, true)
+            }
+        })
+    } catch (e) {
+        logger.error(`启动订单支付 MQ 消费者失败:${e.stack || e}`)
+    }
+}
+
+async function pollOrderPaymentStatus(orderId, logger) {
+    const paymentConfig = config.pay || {}
+
+    if (!paymentConfig.pid || !paymentConfig.url || !paymentConfig.key) {
+        logger.error('支付配置错误,无法轮询易支付状态')
+        return
+    }
+
+    const MAX_RETRIES = 60 // 5分钟 / 5秒
+    const DELAY = 5000 // 5秒
+
+    const queryUrl = `${paymentConfig.url}/api.php?act=order&pid=${paymentConfig.pid}&key=${paymentConfig.key}&out_trade_no=${orderId}`
+
+    const pollOrderStatus = async (retry = 0) => {
+        if (retry >= MAX_RETRIES) {
+            logger.info(`订单超时未支付,自动取消,订单号:${orderId}`)
+            await db.query('UPDATE orders SET state = 3 WHERE orderId = ?', [orderId])
+            return
+        }
+
+        try {
+            const queryRes = await axios.get(queryUrl)
+            const queryData = queryRes.data
+
+            if (queryData.code === 1 && queryData.status === 1) {
+                const { trade_no, out_trade_no, type } = queryData
+                const time = Date.now()
+
+                let sql = 'UPDATE orders SET state = 1, pay_type = ?, pay_id = ?, pay_time = ? WHERE orderId = ? AND state = 0'
+                const result = await db.query(sql, [type, trade_no, time, out_trade_no])
+
+                if (result.affectedRows > 0) {
+                    sql = `
+                                    SELECT 
+                                        g.lepao_count,
+                                        g.ic_count,
+                                        g.vip,
+                                        a.create_user
+                                    FROM 
+                                        orders a
+                                    LEFT JOIN 
+                                        goods g 
+                                    ON 
+                                        a.goods_id = g.id
+                                    WHERE 
+                                        a.orderId = ?
+                                `
+                    const rows = await db.query(sql, [out_trade_no])
+                    if (!rows || rows.length !== 1) {
+                        logger.error(`订单商品信息异常,订单号:${out_trade_no}`)
+                        await db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [out_trade_no])
+                        return
+                    }
+
+                    const { lepao_count, ic_count, vip, create_user } = rows[0]
+                    sql = 'UPDATE users SET lepao_count = lepao_count + ?, ic_count = ic_count + ?, vip = ? WHERE uuid = ?'
+                    const updateUser = await db.query(sql, [lepao_count, ic_count, vip, create_user])
+
+                    if (!updateUser || updateUser.affectedRows !== 1) {
+                        logger.error(`更新用户失败,UUID: ${create_user}`)
+                        await db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [out_trade_no])
+                    }
+
+                    sql = 'UPDATE orders SET state = 2 WHERE orderId = ?'
+                    await db.query(sql, [out_trade_no])
+
+                    logger.info(`订单处理成功:${out_trade_no}`)
+                    return
+                } else {
+                    logger.warn(`订单不存在或已处理:${out_trade_no}`)
+                }
+            }
+
+            // 未支付,继续轮询
+            setTimeout(() => pollOrderStatus(retry + 1), DELAY)
+        } catch (error) {
+            logger.warn(`轮询支付状态失败:${error.stack || error}`)
+            setTimeout(() => pollOrderStatus(retry + 1), DELAY)
+        }
+    }
+
+    logger.info(`开始轮询订单支付状态,订单号:${orderId}`)
+    pollOrderStatus()
+}
 
 function generateOrderId() {
     const now = new Date()
@@ -27,6 +158,9 @@ class CreateOrder extends API {
         super()
         this.setPath('/Order/CreateOrder')
         this.setMethod('POST')
+
+        // 启动订单支付 MQ 消费者(只会启动一次)
+        startOrderPaymentWorker(this.logger)
     }
 
     async onRequest(req, res) {
@@ -113,6 +247,23 @@ class CreateOrder extends API {
                     EX: 300
                 })
 
+                // 下单成功后推送到订单支付检查队列,由当前模块的消费者进行轮询及超时取消
+                try {
+                    const ch = await mq.getChannel('order_payment')
+                    await ch.assertQueue('order_payment_check', {
+                        durable: true
+                    })
+                    const msg = {
+                        orderId,
+                        enqueueTime: Date.now()
+                    }
+                    ch.sendToQueue('order_payment_check', Buffer.from(JSON.stringify(msg)), {
+                        persistent: true
+                    })
+                } catch (error) {
+                    this.logger.error(`推送订单支付检查消息到 MQ 失败,订单号:${orderId},错误:${error.stack || error}`)
+                }
+
                 res.json({
                     ...BaseStdResponse.OK,
                     id: orderId,
@@ -122,91 +273,6 @@ class CreateOrder extends API {
                     }
                 })
 
-                // 定时器轮询订单状态
-                try {
-                    // 定时轮询订单状态,最多持续5分钟(300秒),每次间隔5秒
-                    const MAX_RETRIES = 60 // 5分钟 / 5秒
-                    const DELAY = 5000 // 5秒
-
-                    const queryUrl = `${paymentConfig.url}/api.php?act=order&pid=${paymentConfig.pid}&key=${paymentConfig.key}&out_trade_no=${orderId}`
-
-                    const pollOrderStatus = async (retry = 0) => {
-                        if (retry >= MAX_RETRIES) {
-                            this.logger.info(`订单超时未支付,订单号:${orderId}`)
-                            await db.query('UPDATE orders SET state = 3 WHERE orderId = ?', [orderId]);
-                            return
-                        }
-
-                        try {
-                            const queryRes = await axios.get(queryUrl)
-                            const queryData = queryRes.data
-
-                            if (queryData.code === 1 && queryData.status === 1) {
-                                const { trade_no, out_trade_no, type } = queryData
-                                const time = Date.now()
-
-                                let sql = 'UPDATE orders SET state = 1, pay_type = ?, pay_id = ?, pay_time = ? WHERE orderId = ? AND state = 0'
-                                const result = await db.query(sql, [type, trade_no, time, out_trade_no])
-
-                                if (result.affectedRows > 0) {
-                                    // 查询订单与商品信息
-                                    sql = `
-                                    SELECT 
-                                        g.lepao_count,
-                                        g.ic_count,
-                                        g.vip,
-                                        a.create_user
-                                    FROM 
-                                        orders a
-                                    LEFT JOIN 
-                                        goods g 
-                                    ON 
-                                        a.goods_id = g.id
-                                    WHERE 
-                                        a.orderId = ?
-                                `
-                                    const rows = await db.query(sql, [out_trade_no])
-                                    if (!rows || rows.length !== 1) {
-                                        this.logger.error(`订单商品信息异常,订单号:${out_trade_no}`)
-                                        await db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [out_trade_no])
-                                        return
-                                    }
-
-                                    const { lepao_count, ic_count, vip, create_user } = rows[0]
-                                    sql = 'UPDATE users SET lepao_count = lepao_count + ?, ic_count = ic_count + ?, vip = ? WHERE uuid = ?'
-                                    const updateUser = await db.query(sql, [lepao_count, ic_count, vip, create_user])
-
-                                    if (!updateUser || updateUser.affectedRows !== 1) {
-                                        this.logger.error(`更新用户失败,UUID: ${create_user}`)
-                                        await db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [out_trade_no])
-                                    }
-
-                                    sql = 'UPDATE orders SET state = 2 WHERE orderId = ?'
-                                    await db.query(sql, [out_trade_no])
-
-                                    this.logger.info(`订单处理成功:${out_trade_no}`)
-                                    return // 成功处理后终止轮询
-                                } else {
-                                    this.logger.warn(`订单不存在或已处理:${out_trade_no}`)
-                                }
-                            }
-
-                            // 未支付,继续轮询
-                            setTimeout(() => pollOrderStatus(retry + 1), DELAY)
-
-                        } catch (error) {
-                            this.logger.warn(`轮询支付状态失败:${error.stack}`)
-                            setTimeout(() => pollOrderStatus(retry + 1), DELAY)
-                        }
-                    }
-
-                    // 启动轮询
-                    pollOrderStatus()
-
-                } catch {
-                    this.logger.info(`获取订单支付状态失败!${error.stack}`)
-                }
-
             } else {
                 return res.json({
                     ...BaseStdResponse.ERR,