Browse Source

🐞 fix: 修复订单处理香港问题

Pchen. 3 days ago
parent
commit
e50ce5158a
3 changed files with 187 additions and 192 deletions
  1. 5 191
      apis/Order/CreateOrder.js
  2. 1 1
      lib/Server.js
  3. 181 0
      plugin/mq/orderPaymentWorker.js

+ 5 - 191
apis/Order/CreateOrder.js

@@ -5,182 +5,9 @@ const { BaseStdResponse } = require("../../BaseStdResponse.js")
 const AccessControl = require("../../lib/AccessControl.js")
 const AccessControl = require("../../lib/AccessControl.js")
 const crypto = require('crypto')
 const crypto = require('crypto')
 const config = require('../../config.json')
 const config = require('../../config.json')
-const mq = require('../../plugin/mq')
-const { mq: mqName } = require('../../plugin/mq/mqPrefix')
-const { insertLedgerRecord } = require('../../lib/Lepao/CountLedger')
-const { validateCoupon, recordUsage, releaseUsageForOrder, roundMoney } = require('../../lib/CouponService')
-const { normalizePayBaseUrl, queryPaymentOrder } = require('../../lib/PaymentClient')
-
-const ORDER_PAYMENT_QUEUE = mqName('order_payment_check')
-let orderPaymentWorkerStarted = false
-
-async function writePurchaseLedger(orderId, userUuid, addCount, logger) {
-    const delta = Number(addCount || 0)
-    if (!orderId || !userUuid || delta === 0) return
-    try {
-        const userRows = await db.query(
-            'SELECT lepao_count FROM users WHERE uuid = ?',
-            [userUuid]
-        )
-        if (!userRows || userRows.length !== 1) return
-        const afterCount = Number(userRows[0].lepao_count || 0)
-        const beforeCount = afterCount - delta
-        await insertLedgerRecord({
-            userUuid,
-            delta,
-            balanceBefore: beforeCount,
-            balanceAfter: afterCount,
-            bizType: 'purchase',
-            bizId: orderId,
-            remark: `订单号:${orderId}`
-        })
-    } catch (error) {
-        logger?.error?.(`写入购买次数流水失败 ${orderId}: ${error.stack || error}`)
-    }
-}
-
-async function startOrderPaymentWorker(logger) {
-    if (orderPaymentWorkerStarted) {
-        return
-    }
-
-    try {
-        const ch = await mq.getChannel('order_payment')
-        await ch.assertQueue(ORDER_PAYMENT_QUEUE, {
-            durable: true
-        })
-        await ch.prefetch(1)
-
-        logger.info(`订单支付结果轮询消费者已启动,队列:${ORDER_PAYMENT_QUEUE}`)
-
-        orderPaymentWorkerStarted = true
-
-        ch.consume(ORDER_PAYMENT_QUEUE, async (msg) => {
-            if (!msg) return
-
-            const content = JSON.parse(msg.content.toString() || '{}')
-            const { orderId } = content
-
-            if (!orderId) {
-                logger.warn('收到无效的订单支付检查消息(缺少 orderId)')
-                ch.ack(msg)
-                return
-            }
-
-            try {
-                await pollOrderPaymentStatus(orderId, logger)
-                ch.ack(msg)
-            } catch (err) {
-                logger.error(`订单支付轮询处理失败,订单号:${orderId},错误:${err.stack || err}`)
-                // 简单重试策略:出现异常时,Nack 并重新入队
-                ch.nack(msg, false, true)
-            }
-        })
-    } catch (e) {
-        logger.error(`启动订单支付 MQ 消费者失败:${e.stack || e}`)
-    }
-}
-
-async function pollOrderPaymentStatus(orderId, logger) {
-    const paymentConfig = config.pay || {}
-
-    if (!paymentConfig.pid || !paymentConfig.url || !paymentConfig.key) {
-        logger.error('支付配置错误,无法轮询易支付状态')
-        return
-    }
-
-    const MAX_RETRIES = 120 // 5分钟 / 5秒
-    const DELAY = 2500 // 5秒
-
-    const pollOrderStatus = async (retry = 0) => {
-        if (retry >= MAX_RETRIES) {
-            const closeRes = await db.query(
-                'UPDATE orders SET state = 3 WHERE orderId = ? AND state = 0',
-                [orderId]
-            )
-            if (closeRes?.affectedRows > 0) {
-                await releaseUsageForOrder(orderId)
-                logger.info(`订单超时未支付,自动取消,订单号:${orderId}`)
-            }
-            return
-        }
-
-        try {
-            const existing = await db.query('SELECT state FROM orders WHERE orderId = ?', [orderId])
-            if (!existing?.length) {
-                logger.warn(`订单不存在,停止轮询:${orderId}`)
-                return
-            }
-            if (Number(existing[0].state) !== 0) {
-                logger.info(`订单已处理(state=${existing[0].state}),停止轮询:${orderId}`)
-                return
-            }
-
-            const queryData = await queryPaymentOrder(orderId, logger)
-            logger.info(`轮询订单支付状态,订单号:${orderId},尝试次数:${retry + 1},查询结果:${JSON.stringify(queryData)}`)
-
-            if (queryData.code == 1 && queryData.status == 1) {
-                const { trade_no, out_trade_no, type } = queryData
-                const time = Date.now()
-
-                let sql = 'UPDATE orders SET state = 1, pay_type = ?, pay_id = ?, pay_time = ? WHERE orderId = ? AND state = 0'
-                const result = await db.query(sql, [type, trade_no, time, out_trade_no])
-
-                if (result.affectedRows > 0) {
-                    sql = `
-                                    SELECT 
-                                        g.lepao_count,
-                                        g.ic_count,
-                                        g.vip,
-                                        a.create_user
-                                    FROM 
-                                        orders a
-                                    LEFT JOIN 
-                                        goods g 
-                                    ON 
-                                        a.goods_id = g.id
-                                    WHERE 
-                                        a.orderId = ?
-                                `
-                    const rows = await db.query(sql, [out_trade_no])
-                    if (!rows || rows.length !== 1) {
-                        logger.error(`订单商品信息异常,订单号:${out_trade_no}`)
-                        await db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [out_trade_no])
-                        return
-                    }
-
-                    const { lepao_count, ic_count, vip, create_user } = rows[0]
-                    sql = 'UPDATE users SET lepao_count = lepao_count + ?, ic_count = ic_count + ?, vip = ? WHERE uuid = ?'
-                    const updateUser = await db.query(sql, [lepao_count, ic_count, vip, create_user])
-
-                    if (!updateUser || updateUser.affectedRows !== 1) {
-                        logger.error(`更新用户失败,UUID: ${create_user}`)
-                        await db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [out_trade_no])
-                    }
-
-                    sql = 'UPDATE orders SET state = 2 WHERE orderId = ?'
-                    await db.query(sql, [out_trade_no])
-                    await writePurchaseLedger(out_trade_no, create_user, lepao_count, logger)
-
-                    logger.info(`订单处理成功:${out_trade_no}`)
-                    return
-                }
-
-                logger.info(`支付网关已确认付款,订单已由其他进程/回调处理,停止轮询:${out_trade_no}`)
-                return
-            }
-
-            // 未支付,继续轮询
-            setTimeout(() => pollOrderStatus(retry + 1), DELAY)
-        } catch (error) {
-            logger.warn(`轮询支付状态失败,订单号:${orderId},原因:${error.message || error}`)
-            setTimeout(() => pollOrderStatus(retry + 1), DELAY)
-        }
-    }
-
-    logger.info(`开始轮询订单支付状态,订单号:${orderId}`)
-    pollOrderStatus()
-}
+const { validateCoupon, recordUsage, roundMoney } = require('../../lib/CouponService')
+const { normalizePayBaseUrl } = require('../../lib/PaymentClient')
+const { enqueueOrderPaymentCheck } = require('../../plugin/mq/orderPaymentWorker')
 
 
 function generateOrderId() {
 function generateOrderId() {
     const now = new Date()
     const now = new Date()
@@ -288,7 +115,6 @@ class CreateOrder extends API {
                 }
                 }
                 couponLockKey = lockRet.lockKey
                 couponLockKey = lockRet.lockKey
 
 
-                // 拿到锁后重查一次,避免并发下单绕过“每人限用次数”
                 couponResult = await validateCoupon({
                 couponResult = await validateCoupon({
                     code: coupon_code,
                     code: coupon_code,
                     userUuid: uuid,
                     userUuid: uuid,
@@ -346,7 +172,7 @@ class CreateOrder extends API {
 
 
                 const deviceType = req.headers['device-type'] ?? '浏览器'
                 const deviceType = req.headers['device-type'] ?? '浏览器'
                 let return_url
                 let return_url
-                if(deviceType === 'RunForge Uniapp Client') 
+                if (deviceType === 'RunForge Uniapp Client')
                     return_url = paymentConfig.uni_return_url + orderId
                     return_url = paymentConfig.uni_return_url + orderId
                 else
                 else
                     return_url = paymentConfig.return_url + orderId
                     return_url = paymentConfig.return_url + orderId
@@ -369,19 +195,8 @@ class CreateOrder extends API {
                     EX: 300
                     EX: 300
                 })
                 })
 
 
-                // 下单成功后推送到订单支付检查队列,由当前模块的消费者进行轮询及超时取消
                 try {
                 try {
-                    const ch = await mq.getChannel('order_payment')
-                    await ch.assertQueue(ORDER_PAYMENT_QUEUE, {
-                        durable: true
-                    })
-                    const msg = {
-                        orderId,
-                        enqueueTime: Date.now()
-                    }
-                    ch.sendToQueue(ORDER_PAYMENT_QUEUE, Buffer.from(JSON.stringify(msg)), {
-                        persistent: true
-                    })
+                    await enqueueOrderPaymentCheck(orderId)
                 } catch (error) {
                 } catch (error) {
                     this.logger.error(`推送订单支付检查消息到 MQ 失败,订单号:${orderId},错误:${error.stack || error}`)
                     this.logger.error(`推送订单支付检查消息到 MQ 失败,订单号:${orderId},错误:${error.stack || error}`)
                 }
                 }
@@ -414,4 +229,3 @@ class CreateOrder extends API {
 }
 }
 
 
 module.exports.CreateOrder = CreateOrder
 module.exports.CreateOrder = CreateOrder
-module.exports.startOrderPaymentWorker = startOrderPaymentWorker

+ 1 - 1
lib/Server.js

@@ -85,7 +85,7 @@ class SERVER {
             }
             }
 
 
             if (shouldRunOrderPaymentWorker(this.serverRole)) {
             if (shouldRunOrderPaymentWorker(this.serverRole)) {
-                const { startOrderPaymentWorker } = require('../apis/Order/CreateOrder')
+                const { startOrderPaymentWorker } = require('../plugin/mq/orderPaymentWorker')
                 await startOrderPaymentWorker(this.logger)
                 await startOrderPaymentWorker(this.logger)
             } else if (shouldServeApi(this.serverRole)) {
             } else if (shouldServeApi(this.serverRole)) {
                 this.logger.info('serverRole=api,跳过订单支付 MQ 消费者(由 all 进程消费)')
                 this.logger.info('serverRole=api,跳过订单支付 MQ 消费者(由 all 进程消费)')

+ 181 - 0
plugin/mq/orderPaymentWorker.js

@@ -0,0 +1,181 @@
+const db = require('../DataBase/db')
+const config = require('../../config.json')
+const mq = require('./index')
+const { mq: mqName } = require('./mqPrefix')
+const { insertLedgerRecord } = require('../../lib/Lepao/CountLedger')
+const { releaseUsageForOrder } = require('../../lib/CouponService')
+const { queryPaymentOrder } = require('../../lib/PaymentClient')
+
+const ORDER_PAYMENT_QUEUE = mqName('order_payment_check')
+let orderPaymentWorkerStarted = false
+
+async function writePurchaseLedger(orderId, userUuid, addCount, logger) {
+    const delta = Number(addCount || 0)
+    if (!orderId || !userUuid || delta === 0) return
+    try {
+        const userRows = await db.query(
+            'SELECT lepao_count FROM users WHERE uuid = ?',
+            [userUuid]
+        )
+        if (!userRows || userRows.length !== 1) return
+        const afterCount = Number(userRows[0].lepao_count || 0)
+        const beforeCount = afterCount - delta
+        await insertLedgerRecord({
+            userUuid,
+            delta,
+            balanceBefore: beforeCount,
+            balanceAfter: afterCount,
+            bizType: 'purchase',
+            bizId: orderId,
+            remark: `订单号:${orderId}`
+        })
+    } catch (error) {
+        logger?.error?.(`写入购买次数流水失败 ${orderId}: ${error.stack || error}`)
+    }
+}
+
+async function pollOrderPaymentStatus(orderId, logger) {
+    const paymentConfig = config.pay || {}
+
+    if (!paymentConfig.pid || !paymentConfig.url || !paymentConfig.key) {
+        logger.error('支付配置错误,无法轮询易支付状态')
+        return
+    }
+
+    const MAX_RETRIES = 120
+    const DELAY = 2500
+
+    const pollOrderStatus = async (retry = 0) => {
+        if (retry >= MAX_RETRIES) {
+            const closeRes = await db.query(
+                'UPDATE orders SET state = 3 WHERE orderId = ? AND state = 0',
+                [orderId]
+            )
+            if (closeRes?.affectedRows > 0) {
+                await releaseUsageForOrder(orderId)
+                logger.info(`订单超时未支付,自动取消,订单号:${orderId}`)
+            }
+            return
+        }
+
+        try {
+            const existing = await db.query('SELECT state FROM orders WHERE orderId = ?', [orderId])
+            if (!existing?.length) {
+                logger.warn(`订单不存在,停止轮询:${orderId}`)
+                return
+            }
+            if (Number(existing[0].state) !== 0) {
+                logger.info(`订单已处理(state=${existing[0].state}),停止轮询:${orderId}`)
+                return
+            }
+
+            const queryData = await queryPaymentOrder(orderId, logger)
+            logger.info(`轮询订单支付状态,订单号:${orderId},尝试次数:${retry + 1},查询结果:${JSON.stringify(queryData)}`)
+
+            if (queryData.code == 1 && queryData.status == 1) {
+                const { trade_no, out_trade_no, type } = queryData
+                const time = Date.now()
+
+                let sql = 'UPDATE orders SET state = 1, pay_type = ?, pay_id = ?, pay_time = ? WHERE orderId = ? AND state = 0'
+                const result = await db.query(sql, [type, trade_no, time, out_trade_no])
+
+                if (result.affectedRows > 0) {
+                    sql = `
+                        SELECT g.lepao_count, g.ic_count, g.vip, a.create_user
+                        FROM orders a
+                        LEFT JOIN goods g ON a.goods_id = g.id
+                        WHERE a.orderId = ?
+                    `
+                    const rows = await db.query(sql, [out_trade_no])
+                    if (!rows || rows.length !== 1) {
+                        logger.error(`订单商品信息异常,订单号:${out_trade_no}`)
+                        await db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [out_trade_no])
+                        return
+                    }
+
+                    const { lepao_count, ic_count, vip, create_user } = rows[0]
+                    sql = 'UPDATE users SET lepao_count = lepao_count + ?, ic_count = ic_count + ?, vip = ? WHERE uuid = ?'
+                    const updateUser = await db.query(sql, [lepao_count, ic_count, vip, create_user])
+
+                    if (!updateUser || updateUser.affectedRows !== 1) {
+                        logger.error(`更新用户失败,UUID: ${create_user}`)
+                        await db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [out_trade_no])
+                    }
+
+                    sql = 'UPDATE orders SET state = 2 WHERE orderId = ?'
+                    await db.query(sql, [out_trade_no])
+                    await writePurchaseLedger(out_trade_no, create_user, lepao_count, logger)
+
+                    logger.info(`订单处理成功:${out_trade_no}`)
+                    return
+                }
+
+                logger.info(`支付网关已确认付款,订单已由其他进程/回调处理,停止轮询:${out_trade_no}`)
+                return
+            }
+
+            setTimeout(() => pollOrderStatus(retry + 1), DELAY)
+        } catch (error) {
+            logger.warn(`轮询支付状态失败,订单号:${orderId},原因:${error.message || error}`)
+            setTimeout(() => pollOrderStatus(retry + 1), DELAY)
+        }
+    }
+
+    logger.info(`开始轮询订单支付状态,订单号:${orderId}`)
+    pollOrderStatus()
+}
+
+async function enqueueOrderPaymentCheck(orderId) {
+    const ch = await mq.getChannel('order_payment')
+    await ch.assertQueue(ORDER_PAYMENT_QUEUE, { durable: true })
+    ch.sendToQueue(
+        ORDER_PAYMENT_QUEUE,
+        Buffer.from(JSON.stringify({ orderId, enqueueTime: Date.now() })),
+        { persistent: true }
+    )
+}
+
+async function startOrderPaymentWorker(logger) {
+    if (orderPaymentWorkerStarted) {
+        return
+    }
+
+    try {
+        const ch = await mq.getChannel('order_payment')
+        await ch.assertQueue(ORDER_PAYMENT_QUEUE, { durable: true })
+        await ch.prefetch(1)
+
+        logger.info(`订单支付结果轮询消费者已启动,队列:${ORDER_PAYMENT_QUEUE}`)
+
+        orderPaymentWorkerStarted = true
+
+        ch.consume(ORDER_PAYMENT_QUEUE, async (msg) => {
+            if (!msg) return
+
+            const content = JSON.parse(msg.content.toString() || '{}')
+            const { orderId } = content
+
+            if (!orderId) {
+                logger.warn('收到无效的订单支付检查消息(缺少 orderId)')
+                ch.ack(msg)
+                return
+            }
+
+            try {
+                await pollOrderPaymentStatus(orderId, logger)
+                ch.ack(msg)
+            } catch (err) {
+                logger.error(`订单支付轮询处理失败,订单号:${orderId},错误:${err.stack || err}`)
+                ch.nack(msg, false, true)
+            }
+        })
+    } catch (e) {
+        logger.error(`启动订单支付 MQ 消费者失败:${e.stack || e}`)
+    }
+}
+
+module.exports = {
+    ORDER_PAYMENT_QUEUE,
+    enqueueOrderPaymentCheck,
+    startOrderPaymentWorker
+}