| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326 |
- const API = require("../../lib/API.js")
- const db = require("../../plugin/DataBase/db.js")
- const Redis = require('../../plugin/DataBase/Redis')
- const { BaseStdResponse } = require("../../BaseStdResponse.js")
- const AccessControl = require("../../lib/AccessControl.js")
- const crypto = require('crypto')
- const axios = require('axios')
- const config = require('../../config.json')
- const mq = require('../../plugin/mq')
- const { mq: mqName } = require('../../plugin/mq/mqPrefix')
- const { insertLedgerRecord } = require('../../lib/Lepao/CountLedger')
- const ORDER_PAYMENT_QUEUE = mqName('order_payment_check')
- let orderPaymentWorkerStarted = false
- async function writePurchaseLedger(orderId, userUuid, addCount, logger) {
- const delta = Number(addCount || 0)
- if (!orderId || !userUuid || delta === 0) return
- try {
- const userRows = await db.query(
- 'SELECT lepao_count FROM users WHERE uuid = ?',
- [userUuid]
- )
- if (!userRows || userRows.length !== 1) return
- const afterCount = Number(userRows[0].lepao_count || 0)
- const beforeCount = afterCount - delta
- await insertLedgerRecord({
- userUuid,
- delta,
- balanceBefore: beforeCount,
- balanceAfter: afterCount,
- bizType: 'purchase',
- bizId: orderId,
- remark: `订单号:${orderId}`
- })
- } catch (error) {
- logger?.error?.(`写入购买次数流水失败 ${orderId}: ${error.stack || error}`)
- }
- }
- async function startOrderPaymentWorker(logger) {
- if (orderPaymentWorkerStarted) {
- return
- }
- try {
- const ch = await mq.getChannel('order_payment')
- await ch.assertQueue(ORDER_PAYMENT_QUEUE, {
- durable: true
- })
- await ch.prefetch(1)
- logger.info(`订单支付结果轮询消费者已启动,队列:${ORDER_PAYMENT_QUEUE}`)
- orderPaymentWorkerStarted = true
- ch.consume(ORDER_PAYMENT_QUEUE, async (msg) => {
- if (!msg) return
- const content = JSON.parse(msg.content.toString() || '{}')
- const { orderId } = content
- if (!orderId) {
- logger.warn('收到无效的订单支付检查消息(缺少 orderId)')
- ch.ack(msg)
- return
- }
- try {
- await pollOrderPaymentStatus(orderId, logger)
- ch.ack(msg)
- } catch (err) {
- logger.error(`订单支付轮询处理失败,订单号:${orderId},错误:${err.stack || err}`)
- // 简单重试策略:出现异常时,Nack 并重新入队
- ch.nack(msg, false, true)
- }
- })
- } catch (e) {
- logger.error(`启动订单支付 MQ 消费者失败:${e.stack || e}`)
- }
- }
- async function pollOrderPaymentStatus(orderId, logger) {
- const paymentConfig = config.pay || {}
- if (!paymentConfig.pid || !paymentConfig.url || !paymentConfig.key) {
- logger.error('支付配置错误,无法轮询易支付状态')
- return
- }
- const MAX_RETRIES = 120 // 5分钟 / 5秒
- const DELAY = 2500 // 5秒
- const queryUrl = `${paymentConfig.url}/api.php?act=order&pid=${paymentConfig.pid}&key=${paymentConfig.key}&out_trade_no=${orderId}`
- const pollOrderStatus = async (retry = 0) => {
- if (retry >= MAX_RETRIES) {
- const closeRes = await db.query(
- 'UPDATE orders SET state = 3 WHERE orderId = ? AND state = 0',
- [orderId]
- )
- if (closeRes?.affectedRows > 0) {
- logger.info(`订单超时未支付,自动取消,订单号:${orderId}`)
- }
- return
- }
- try {
- const queryRes = await axios.get(queryUrl)
- const queryData = queryRes.data
- console.log(`轮询订单支付状态,订单号:${orderId},尝试次数:${retry + 1},查询结果:${JSON.stringify(queryData)}`)
- if (queryData.code == 1 && queryData.status == 1) {
- const { trade_no, out_trade_no, type } = queryData
- const time = Date.now()
- let sql = 'UPDATE orders SET state = 1, pay_type = ?, pay_id = ?, pay_time = ? WHERE orderId = ? AND state = 0'
- const result = await db.query(sql, [type, trade_no, time, out_trade_no])
- if (result.affectedRows > 0) {
- sql = `
- SELECT
- g.lepao_count,
- g.ic_count,
- g.vip,
- a.create_user
- FROM
- orders a
- LEFT JOIN
- goods g
- ON
- a.goods_id = g.id
- WHERE
- a.orderId = ?
- `
- const rows = await db.query(sql, [out_trade_no])
- if (!rows || rows.length !== 1) {
- logger.error(`订单商品信息异常,订单号:${out_trade_no}`)
- await db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [out_trade_no])
- return
- }
- const { lepao_count, ic_count, vip, create_user } = rows[0]
- sql = 'UPDATE users SET lepao_count = lepao_count + ?, ic_count = ic_count + ?, vip = ? WHERE uuid = ?'
- const updateUser = await db.query(sql, [lepao_count, ic_count, vip, create_user])
- if (!updateUser || updateUser.affectedRows !== 1) {
- logger.error(`更新用户失败,UUID: ${create_user}`)
- await db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [out_trade_no])
- }
- sql = 'UPDATE orders SET state = 2 WHERE orderId = ?'
- await db.query(sql, [out_trade_no])
- await writePurchaseLedger(out_trade_no, create_user, lepao_count, logger)
- logger.info(`订单处理成功:${out_trade_no}`)
- return
- } else {
- logger.warn(`订单不存在或已处理:${out_trade_no}`)
- }
- }
- // 未支付,继续轮询
- setTimeout(() => pollOrderStatus(retry + 1), DELAY)
- } catch (error) {
- logger.warn(`轮询支付状态失败:${error.stack || error}`)
- setTimeout(() => pollOrderStatus(retry + 1), DELAY)
- }
- }
- logger.info(`开始轮询订单支付状态,订单号:${orderId}`)
- pollOrderStatus()
- }
- function generateOrderId() {
- const now = new Date()
- const pad = (n, w = 2) => n.toString().padStart(w, '0')
- return `${now.getFullYear()}${pad(now.getMonth() + 1)}${pad(now.getDate())}` +
- `${pad(now.getHours())}${pad(now.getMinutes())}${pad(now.getSeconds())}` +
- `${pad(now.getMilliseconds(), 3)}`
- }
- function generatePaymentSign(params, key) {
- const sorted = Object.keys(params).sort()
- const query = sorted.map(k => `${k}=${params[k]}`).join('&') + key
- return crypto.createHash('md5').update(query, 'utf8').digest('hex')
- }
- class CreateOrder extends API {
- constructor() {
- super()
- this.setPath('/Order/CreateOrder')
- this.setMethod('POST')
- // 启动订单支付 MQ 消费者(只会启动一次)
- startOrderPaymentWorker(this.logger)
- }
- async onRequest(req, res) {
- const { uuid, session, goods_id, pay_type } = req.body
- if ([uuid, session, goods_id, pay_type].some(v => v === '' || v === null || v === undefined)) {
- return res.json({
- ...BaseStdResponse.MISSING_PARAMETER
- })
- }
- const sessionValid = await AccessControl.checkSession(uuid, session)
- if (!sessionValid) {
- return res.status(401).json({
- ...BaseStdResponse.ACCESS_DENIED
- })
- }
- try {
- const goodsSql = 'SELECT name, price, num, state FROM goods WHERE id = ?'
- const goodsRows = await db.query(goodsSql, [goods_id])
- if (!goodsRows || goodsRows.length !== 1) {
- return res.json({
- ...BaseStdResponse.ERR,
- msg: '商品不存在'
- })
- }
- const goods = goodsRows[0]
- if (goods.num < 1 || goods.state !== 1) {
- return res.json({
- ...BaseStdResponse.ERR,
- msg: '商品已下架或库存不足'
- })
- }
- const createTime = Date.now()
- const orderId = generateOrderId()
- const insertSql = `
- INSERT INTO orders (orderId, create_user, create_time, goods_id, price, pay_type)
- VALUES (?, ?, ?, ?, ?, ?)
- `
- const result = await db.query(insertSql, [
- orderId, uuid, createTime, goods_id, goods.price, pay_type
- ])
- const updateSql = 'UPDATE goods SET num = num - 1 WHERE id = ?'
- await db.query(updateSql, [goods_id])
- if (result && result.affectedRows > 0) {
- const paymentConfig = config.pay || {}
- if (!paymentConfig.pid || !paymentConfig.url || !paymentConfig.key || !paymentConfig.return_url) {
- return res.json({
- ...BaseStdResponse.ERR,
- msg: '支付配置错误,请联系管理员'
- })
- }
- const deviceType = req.headers['device-type'] ?? '浏览器'
- let return_url
- if(deviceType === 'RunForge Uniapp Client')
- return_url = paymentConfig.uni_return_url + orderId
- else
- return_url = paymentConfig.return_url + orderId
- const payParams = {
- pid: paymentConfig.pid,
- type: pay_type,
- out_trade_no: orderId,
- notify_url: `${config.url}/Order/CallBack`,
- return_url,
- name: goods.name,
- money: goods.price
- }
- const sign = generatePaymentSign(payParams, paymentConfig.key)
- payParams.sign = sign
- payParams.sign_type = 'MD5'
- await Redis.set(`payData:${orderId}`, JSON.stringify(payParams), {
- EX: 300
- })
- // 下单成功后推送到订单支付检查队列,由当前模块的消费者进行轮询及超时取消
- try {
- const ch = await mq.getChannel('order_payment')
- await ch.assertQueue(ORDER_PAYMENT_QUEUE, {
- durable: true
- })
- const msg = {
- orderId,
- enqueueTime: Date.now()
- }
- ch.sendToQueue(ORDER_PAYMENT_QUEUE, Buffer.from(JSON.stringify(msg)), {
- persistent: true
- })
- } catch (error) {
- this.logger.error(`推送订单支付检查消息到 MQ 失败,订单号:${orderId},错误:${error.stack || error}`)
- }
- res.json({
- ...BaseStdResponse.OK,
- id: orderId,
- pay: {
- payUrl: `${paymentConfig.url}/submit.php`,
- payData: payParams
- }
- })
- } else {
- return res.json({
- ...BaseStdResponse.ERR,
- msg: '创建订单失败'
- })
- }
- } catch (err) {
- this.logger.error(`创建订单失败!${err.stack}`)
- return res.json({
- ...BaseStdResponse.ERR,
- msg: "创建订单异常,请联系管理员"
- })
- }
- }
- }
- module.exports.CreateOrder = CreateOrder
|