const API = require("../../lib/API.js") const db = require("../../plugin/DataBase/db.js") const Redis = require('../../plugin/DataBase/Redis') const { BaseStdResponse } = require("../../BaseStdResponse.js") const AccessControl = require("../../lib/AccessControl.js") const crypto = require('crypto') const config = require('../../config.json') const mq = require('../../plugin/mq') const { mq: mqName } = require('../../plugin/mq/mqPrefix') const { insertLedgerRecord } = require('../../lib/Lepao/CountLedger') const { validateCoupon, recordUsage, releaseUsageForOrder, roundMoney } = require('../../lib/CouponService') const { normalizePayBaseUrl, queryPaymentOrder } = require('../../lib/PaymentClient') const ORDER_PAYMENT_QUEUE = mqName('order_payment_check') let orderPaymentWorkerStarted = false async function writePurchaseLedger(orderId, userUuid, addCount, logger) { const delta = Number(addCount || 0) if (!orderId || !userUuid || delta === 0) return try { const userRows = await db.query( 'SELECT lepao_count FROM users WHERE uuid = ?', [userUuid] ) if (!userRows || userRows.length !== 1) return const afterCount = Number(userRows[0].lepao_count || 0) const beforeCount = afterCount - delta await insertLedgerRecord({ userUuid, delta, balanceBefore: beforeCount, balanceAfter: afterCount, bizType: 'purchase', bizId: orderId, remark: `订单号:${orderId}` }) } catch (error) { logger?.error?.(`写入购买次数流水失败 ${orderId}: ${error.stack || error}`) } } async function startOrderPaymentWorker(logger) { if (orderPaymentWorkerStarted) { return } try { const ch = await mq.getChannel('order_payment') await ch.assertQueue(ORDER_PAYMENT_QUEUE, { durable: true }) await ch.prefetch(1) logger.info(`订单支付结果轮询消费者已启动,队列:${ORDER_PAYMENT_QUEUE}`) orderPaymentWorkerStarted = true ch.consume(ORDER_PAYMENT_QUEUE, async (msg) => { if (!msg) return const content = JSON.parse(msg.content.toString() || '{}') const { orderId } = content if (!orderId) { logger.warn('收到无效的订单支付检查消息(缺少 orderId)') ch.ack(msg) return } try { await pollOrderPaymentStatus(orderId, logger) ch.ack(msg) } catch (err) { logger.error(`订单支付轮询处理失败,订单号:${orderId},错误:${err.stack || err}`) // 简单重试策略:出现异常时,Nack 并重新入队 ch.nack(msg, false, true) } }) } catch (e) { logger.error(`启动订单支付 MQ 消费者失败:${e.stack || e}`) } } async function pollOrderPaymentStatus(orderId, logger) { const paymentConfig = config.pay || {} if (!paymentConfig.pid || !paymentConfig.url || !paymentConfig.key) { logger.error('支付配置错误,无法轮询易支付状态') return } const MAX_RETRIES = 120 // 5分钟 / 5秒 const DELAY = 2500 // 5秒 const pollOrderStatus = async (retry = 0) => { if (retry >= MAX_RETRIES) { const closeRes = await db.query( 'UPDATE orders SET state = 3 WHERE orderId = ? AND state = 0', [orderId] ) if (closeRes?.affectedRows > 0) { await releaseUsageForOrder(orderId) logger.info(`订单超时未支付,自动取消,订单号:${orderId}`) } return } try { const existing = await db.query('SELECT state FROM orders WHERE orderId = ?', [orderId]) if (!existing?.length) { logger.warn(`订单不存在,停止轮询:${orderId}`) return } if (Number(existing[0].state) !== 0) { logger.info(`订单已处理(state=${existing[0].state}),停止轮询:${orderId}`) return } const queryData = await queryPaymentOrder(orderId, logger) logger.info(`轮询订单支付状态,订单号:${orderId},尝试次数:${retry + 1},查询结果:${JSON.stringify(queryData)}`) if (queryData.code == 1 && queryData.status == 1) { const { trade_no, out_trade_no, type } = queryData const time = Date.now() let sql = 'UPDATE orders SET state = 1, pay_type = ?, pay_id = ?, pay_time = ? WHERE orderId = ? AND state = 0' const result = await db.query(sql, [type, trade_no, time, out_trade_no]) if (result.affectedRows > 0) { sql = ` SELECT g.lepao_count, g.ic_count, g.vip, a.create_user FROM orders a LEFT JOIN goods g ON a.goods_id = g.id WHERE a.orderId = ? ` const rows = await db.query(sql, [out_trade_no]) if (!rows || rows.length !== 1) { logger.error(`订单商品信息异常,订单号:${out_trade_no}`) await db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [out_trade_no]) return } const { lepao_count, ic_count, vip, create_user } = rows[0] sql = 'UPDATE users SET lepao_count = lepao_count + ?, ic_count = ic_count + ?, vip = ? WHERE uuid = ?' const updateUser = await db.query(sql, [lepao_count, ic_count, vip, create_user]) if (!updateUser || updateUser.affectedRows !== 1) { logger.error(`更新用户失败,UUID: ${create_user}`) await db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [out_trade_no]) } sql = 'UPDATE orders SET state = 2 WHERE orderId = ?' await db.query(sql, [out_trade_no]) await writePurchaseLedger(out_trade_no, create_user, lepao_count, logger) logger.info(`订单处理成功:${out_trade_no}`) return } logger.info(`支付网关已确认付款,订单已由其他进程/回调处理,停止轮询:${out_trade_no}`) return } // 未支付,继续轮询 setTimeout(() => pollOrderStatus(retry + 1), DELAY) } catch (error) { logger.warn(`轮询支付状态失败,订单号:${orderId},原因:${error.message || error}`) setTimeout(() => pollOrderStatus(retry + 1), DELAY) } } logger.info(`开始轮询订单支付状态,订单号:${orderId}`) pollOrderStatus() } function generateOrderId() { const now = new Date() const pad = (n, w = 2) => n.toString().padStart(w, '0') return `${now.getFullYear()}${pad(now.getMonth() + 1)}${pad(now.getDate())}` + `${pad(now.getHours())}${pad(now.getMinutes())}${pad(now.getSeconds())}` + `${pad(now.getMilliseconds(), 3)}` } function generatePaymentSign(params, key) { const sorted = Object.keys(params).sort() const query = sorted.map(k => `${k}=${params[k]}`).join('&') + key return crypto.createHash('md5').update(query, 'utf8').digest('hex') } async function acquireCouponUsageLock(couponId) { const lockKey = `coupon:usage:${couponId}` const rows = await db.query('SELECT GET_LOCK(?, 5) AS ok', [lockKey]) return { lockKey, ok: Number(rows?.[0]?.ok || 0) === 1 } } async function releaseCouponUsageLock(lockKey) { if (!lockKey) return try { await db.query('SELECT RELEASE_LOCK(?)', [lockKey]) } catch (e) { // 释放失败仅记录,不影响主流程 } } class CreateOrder extends API { constructor() { super() this.setPath('/Order/CreateOrder') this.setMethod('POST') } async onRequest(req, res) { const { uuid, session, goods_id, pay_type, coupon_code } = req.body if ([uuid, session, goods_id, pay_type].some(v => v === '' || v === null || v === undefined)) { return res.json({ ...BaseStdResponse.MISSING_PARAMETER }) } const sessionValid = await AccessControl.checkSession(uuid, session) if (!sessionValid) { return res.status(401).json({ ...BaseStdResponse.ACCESS_DENIED }) } let couponLockKey = null try { const goodsSql = 'SELECT name, price, num, state FROM goods WHERE id = ?' const goodsRows = await db.query(goodsSql, [goods_id]) if (!goodsRows || goodsRows.length !== 1) { return res.json({ ...BaseStdResponse.ERR, msg: '商品不存在' }) } const goods = goodsRows[0] if (goods.num < 1 || goods.state !== 1) { return res.json({ ...BaseStdResponse.ERR, msg: '商品已下架或库存不足' }) } const createTime = Date.now() const orderId = generateOrderId() const originalPrice = roundMoney(goods.price) let finalPrice = originalPrice let discountAmount = 0 let couponId = null let appliedCouponCode = null if (coupon_code && String(coupon_code).trim()) { let couponResult = await validateCoupon({ code: coupon_code, userUuid: uuid, goodsId: goods_id, goodsPrice: goods.price }) if (!couponResult.ok) { return res.json({ ...BaseStdResponse.ERR, msg: couponResult.msg }) } finalPrice = couponResult.finalPrice discountAmount = couponResult.discountAmount couponId = couponResult.couponId appliedCouponCode = couponResult.code const lockRet = await acquireCouponUsageLock(couponId) if (!lockRet.ok) { return res.json({ ...BaseStdResponse.ERR, msg: '优惠码校验繁忙,请稍后重试' }) } couponLockKey = lockRet.lockKey // 拿到锁后重查一次,避免并发下单绕过“每人限用次数” couponResult = await validateCoupon({ code: coupon_code, userUuid: uuid, goodsId: goods_id, goodsPrice: goods.price }) if (!couponResult.ok) { return res.json({ ...BaseStdResponse.ERR, msg: couponResult.msg }) } finalPrice = couponResult.finalPrice discountAmount = couponResult.discountAmount couponId = couponResult.couponId appliedCouponCode = couponResult.code } const insertSql = ` INSERT INTO orders ( orderId, create_user, create_time, goods_id, price, pay_type, original_price, discount_amount, coupon_id, coupon_code ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` const result = await db.query(insertSql, [ orderId, uuid, createTime, goods_id, finalPrice, pay_type, originalPrice, discountAmount, couponId, appliedCouponCode ]) const updateSql = 'UPDATE goods SET num = num - 1 WHERE id = ?' await db.query(updateSql, [goods_id]) if (result && result.affectedRows > 0) { if (couponId) { await recordUsage(couponId, orderId, uuid, discountAmount) } const paymentConfig = config.pay || {} if (!paymentConfig.pid || !paymentConfig.url || !paymentConfig.key || !paymentConfig.return_url) { return res.json({ ...BaseStdResponse.ERR, msg: '支付配置错误,请联系管理员' }) } const payBaseUrl = normalizePayBaseUrl(paymentConfig.url) const deviceType = req.headers['device-type'] ?? '浏览器' let return_url if(deviceType === 'RunForge Uniapp Client') return_url = paymentConfig.uni_return_url + orderId else return_url = paymentConfig.return_url + orderId const payParams = { pid: paymentConfig.pid, type: pay_type, out_trade_no: orderId, notify_url: `${config.url}/Order/CallBack`, return_url, name: goods.name, money: String(finalPrice) } const sign = generatePaymentSign(payParams, paymentConfig.key) payParams.sign = sign payParams.sign_type = 'MD5' await Redis.set(`payData:${orderId}`, JSON.stringify(payParams), { EX: 300 }) // 下单成功后推送到订单支付检查队列,由当前模块的消费者进行轮询及超时取消 try { const ch = await mq.getChannel('order_payment') await ch.assertQueue(ORDER_PAYMENT_QUEUE, { durable: true }) const msg = { orderId, enqueueTime: Date.now() } ch.sendToQueue(ORDER_PAYMENT_QUEUE, Buffer.from(JSON.stringify(msg)), { persistent: true }) } catch (error) { this.logger.error(`推送订单支付检查消息到 MQ 失败,订单号:${orderId},错误:${error.stack || error}`) } res.json({ ...BaseStdResponse.OK, id: orderId, pay: { payUrl: `${payBaseUrl}/submit.php`, payData: payParams } }) } else { return res.json({ ...BaseStdResponse.ERR, msg: '创建订单失败' }) } } catch (err) { this.logger.error(`创建订单失败!${err.stack}`) return res.json({ ...BaseStdResponse.ERR, msg: "创建订单异常,请联系管理员" }) } finally { await releaseCouponUsageLock(couponLockKey) } } } module.exports.CreateOrder = CreateOrder module.exports.startOrderPaymentWorker = startOrderPaymentWorker