CreateOrder.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. const API = require("../../lib/API.js")
  2. const db = require("../../plugin/DataBase/db.js")
  3. const Redis = require('../../plugin/DataBase/Redis')
  4. const { BaseStdResponse } = require("../../BaseStdResponse.js")
  5. const AccessControl = require("../../lib/AccessControl.js")
  6. const crypto = require('crypto')
  7. const axios = require('axios')
  8. const config = require('../../config.json')
  9. const mq = require('../../plugin/mq')
  10. const { mq: mqName } = require('../../plugin/mq/mqPrefix')
  11. const { insertLedgerRecord } = require('../../lib/Lepao/CountLedger')
  // Name of the MQ queue that carries "check this order's payment" messages,
  // produced by passing 'order_payment_check' through the project's mq-name helper.
  const ORDER_PAYMENT_QUEUE = mqName('order_payment_check')
  // Module-level flag read and set by startOrderPaymentWorker so the consumer
  // is only registered once for this module instance.
  let orderPaymentWorkerStarted = false
  /**
   * Best-effort: record a "purchase" ledger entry for the count a paid order added.
   *
   * The user's current `lepao_count` is read and treated as the balance AFTER
   * the purchase; the balance before is derived by subtracting the added amount.
   * Any failure is logged through `logger.error` (when available) and swallowed,
   * so this function never rejects.
   *
   * @param {string} orderId  Order identifier, stored as the ledger `bizId`.
   * @param {string} userUuid UUID of the user whose balance changed.
   * @param {number|string} addCount Amount that was added; falsy values count as 0.
   * @param {object} [logger] Optional logger exposing `error(message)`.
   * @returns {Promise<void>}
   */
  async function writePurchaseLedger(orderId, userUuid, addCount, logger) {
    const purchased = Number(addCount || 0)
    const hasIdentifiers = Boolean(orderId) && Boolean(userUuid)
    // Nothing to record without both identifiers or when no count was added.
    if (!hasIdentifiers || purchased === 0) {
      return
    }

    try {
      const matches = await db.query(
        'SELECT lepao_count FROM users WHERE uuid = ?',
        [userUuid]
      )
      const foundExactlyOne = Boolean(matches) && matches.length === 1
      if (!foundExactlyOne) {
        return
      }

      // The caller has already applied the increment, so the stored value is the "after" balance.
      const balanceAfter = Number(matches[0].lepao_count || 0)
      const ledgerEntry = {
        userUuid,
        delta: purchased,
        balanceBefore: balanceAfter - purchased,
        balanceAfter,
        bizType: 'purchase',
        bizId: orderId,
        remark: `订单号:${orderId}`
      }
      await insertLedgerRecord(ledgerEntry)
    } catch (error) {
      logger?.error?.(`写入购买次数流水失败 ${orderId}: ${error.stack || error}`)
    }
  }
  /**
   * Start (at most once) the MQ consumer that reacts to order payment-check messages.
   *
   * Each message is expected to be JSON containing `orderId`. For a valid message
   * the consumer calls `pollOrderPaymentStatus` and acks once that call resolves;
   * if it rejects, the message is nacked and requeued. Messages that cannot be
   * parsed or that lack `orderId` are acked and dropped, because leaving them
   * unacked would block the queue (the channel uses a prefetch of 1).
   *
   * Start-up failures are logged and swallowed; the started flag is reset so a
   * later call can try again.
   *
   * @param {object} logger Logger exposing `info`, `warn` and `error`.
   * @returns {Promise<void>}
   */
  async function startOrderPaymentWorker(logger) {
    if (orderPaymentWorkerStarted) {
      return
    }
    // Claim the flag before the first await so overlapping calls cannot register two consumers.
    orderPaymentWorkerStarted = true
    try {
      const ch = await mq.getChannel('order_payment')
      await ch.assertQueue(ORDER_PAYMENT_QUEUE, {
        durable: true
      })
      await ch.prefetch(1)
      await ch.consume(ORDER_PAYMENT_QUEUE, async (msg) => {
        if (!msg) return

        let orderId
        try {
          // Parsing is guarded: a throw here would otherwise escape the callback
          // as an unhandled rejection and leave the message unacked forever.
          const content = JSON.parse(msg.content.toString() || '{}')
          orderId = content?.orderId
        } catch (parseError) {
          logger.warn(`收到无法解析的订单支付检查消息,已丢弃:${parseError.message}`)
          ch.ack(msg)
          return
        }

        if (!orderId) {
          logger.warn('收到无效的订单支付检查消息(缺少 orderId)')
          ch.ack(msg)
          return
        }

        try {
          await pollOrderPaymentStatus(orderId, logger)
          ch.ack(msg)
        } catch (err) {
          logger.error(`订单支付轮询处理失败,订单号:${orderId},错误:${err.stack || err}`)
          // Simple retry strategy: on an exception, nack and requeue the message.
          ch.nack(msg, false, true)
        }
      })
      logger.info(`订单支付结果轮询消费者已启动,队列:${ORDER_PAYMENT_QUEUE}`)
    } catch (e) {
      // Allow a later call to retry the start-up.
      orderPaymentWorkerStarted = false
      logger.error(`启动订单支付 MQ 消费者失败:${e.stack || e}`)
    }
  }
  /**
   * Begin background polling of the payment gateway for one order.
   *
   * The gateway's order-query endpoint is called every 2.5 seconds, up to 120
   * times (about 5 minutes). When the gateway reports the order as paid, the
   * order is marked paid (state 1), the buyer is credited with the goods'
   * counts, the order is marked completed (state 2) and a purchase ledger entry
   * is written. Anomalies mark the order as state 4. If the order is still in
   * state 0 when the attempts run out, it is moved to state 3.
   *
   * The returned promise resolves as soon as polling has been STARTED, not when
   * it finishes. The MQ consumer in this file acks its message after this call
   * resolves and uses a prefetch of 1, so awaiting the whole loop here would
   * process orders strictly one at a time.
   *
   * @param {string} orderId Order number (the gateway's `out_trade_no`).
   * @param {object} logger  Logger exposing `info`, `warn` and `error`.
   * @returns {Promise<void>}
   */
  async function pollOrderPaymentStatus(orderId, logger) {
    const paymentConfig = config.pay || {}
    if (!paymentConfig.pid || !paymentConfig.url || !paymentConfig.key) {
      logger.error('支付配置错误,无法轮询易支付状态')
      return
    }

    const MAX_RETRIES = 120 // 5 minutes / 2.5 seconds
    const DELAY = 2500 // 2.5 seconds between attempts
    const REQUEST_TIMEOUT = 10000 // a hung gateway request must not stall the loop

    // Build the query string with proper escaping instead of raw concatenation.
    const queryParams = new URLSearchParams({
      act: 'order',
      pid: paymentConfig.pid,
      key: paymentConfig.key,
      out_trade_no: orderId
    })
    const queryUrl = `${paymentConfig.url}/api.php?${queryParams}`

    const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms))
    const markAbnormal = (tradeOrderId) =>
      db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [tradeOrderId])

    // Apply a confirmed payment. Returns normally in every terminal outcome; it
    // throws only on an unexpected error so the caller can retry.
    // NOTE(review): these statements are not wrapped in a DB transaction, so a
    // failure part-way through can leave the order in state 1 — confirm whether
    // the db helper offers transactions.
    const settlePaidOrder = async ({ trade_no, out_trade_no = orderId, type }) => {
      const paidAt = Date.now()
      const claimed = await db.query(
        'UPDATE orders SET state = 1, pay_type = ?, pay_id = ?, pay_time = ? WHERE orderId = ? AND state = 0',
        [type, trade_no, paidAt, out_trade_no]
      )
      if (!(claimed?.affectedRows > 0)) {
        // Someone else already handled this order (or it does not exist): stop, nothing to do.
        logger.warn(`订单不存在或已处理:${out_trade_no}`)
        return
      }

      const rows = await db.query(
        `
        SELECT
          g.id AS goods_row_id,
          g.lepao_count,
          g.ic_count,
          g.vip,
          a.create_user
        FROM
          orders a
        LEFT JOIN
          goods g
        ON
          a.goods_id = g.id
        WHERE
          a.orderId = ?
        `,
        [out_trade_no]
      )
      // With a LEFT JOIN a missing goods row still yields one row whose goods
      // columns are NULL; crediting `count + NULL` would corrupt the user's balance.
      const goodsMissing = rows?.length === 1 && (rows[0].goods_row_id === null || rows[0].goods_row_id === undefined)
      if (!rows || rows.length !== 1 || goodsMissing) {
        logger.error(`订单商品信息异常,订单号:${out_trade_no}`)
        await markAbnormal(out_trade_no)
        return
      }

      const { lepao_count, ic_count, vip, create_user } = rows[0]
      // NOTE(review): `vip` is overwritten with the goods' value rather than merged — confirm intended.
      const updateUser = await db.query(
        'UPDATE users SET lepao_count = lepao_count + ?, ic_count = ic_count + ?, vip = ? WHERE uuid = ?',
        [lepao_count, ic_count, vip, create_user]
      )
      if (!updateUser || updateUser.affectedRows !== 1) {
        logger.error(`更新用户失败,UUID: ${create_user}`)
        await markAbnormal(out_trade_no)
        // Stop here: the order must stay abnormal and no ledger entry may be written.
        return
      }

      await db.query('UPDATE orders SET state = 2 WHERE orderId = ?', [out_trade_no])
      await writePurchaseLedger(out_trade_no, create_user, lepao_count, logger)
      logger.info(`订单处理成功:${out_trade_no}`)
    }

    const pollOrderStatus = async () => {
      for (let retry = 0; retry < MAX_RETRIES; retry++) {
        try {
          const { data: queryData } = await axios.get(queryUrl, { timeout: REQUEST_TIMEOUT })
          logger.info(`轮询订单支付状态,订单号:${orderId},尝试次数:${retry + 1},查询结果:${JSON.stringify(queryData)}`)
          // The gateway may send these flags as numbers or numeric strings.
          if (Number(queryData?.code) === 1 && Number(queryData?.status) === 1) {
            await settlePaidOrder(queryData)
            return
          }
        } catch (error) {
          logger.warn(`轮询支付状态失败:${error.stack || error}`)
        }
        // Not paid yet (or the attempt failed): wait, then try again.
        await wait(DELAY)
      }

      // Attempts exhausted: cancel the order only if it is still unpaid.
      const closeRes = await db.query(
        'UPDATE orders SET state = 3 WHERE orderId = ? AND state = 0',
        [orderId]
      )
      if (closeRes?.affectedRows > 0) {
        logger.info(`订单超时未支付,自动取消,订单号:${orderId}`)
      }
    }

    logger.info(`开始轮询订单支付状态,订单号:${orderId}`)
    // Deliberately not awaited (see the function doc); the catch keeps a late
    // failure from surfacing as an unhandled rejection.
    pollOrderStatus().catch((error) => {
      logger.error(`订单支付轮询异常终止,订单号:${orderId},错误:${error.stack || error}`)
    })
  }
  /**
   * Build an order number from the current local time.
   *
   * Layout: year, month, day, hour, minute, second (two digits each after the
   * year) followed by three-digit milliseconds — 17 digits for a four-digit year.
   *
   * NOTE(review): the value is only as unique as the millisecond clock; two
   * calls within the same millisecond return the same ID.
   *
   * @returns {string} Timestamp-based order number.
   */
  function generateOrderId() {
    const stamp = new Date()
    const zeroPad = (value, width = 2) => String(value).padStart(width, '0')

    const twoDigitFields = [
      stamp.getMonth() + 1, // getMonth() is zero-based
      stamp.getDate(),
      stamp.getHours(),
      stamp.getMinutes(),
      stamp.getSeconds()
    ].map((field) => zeroPad(field))

    return [
      String(stamp.getFullYear()),
      ...twoDigitFields,
      zeroPad(stamp.getMilliseconds(), 3)
    ].join('')
  }
  /**
   * Compute the MD5 signature for a payment-gateway request.
   *
   * Parameters are sorted by name, joined as `name=value` pairs with `&`, the
   * merchant key is appended directly, and the result is MD5-hashed to lowercase hex.
   * Following the gateway's signing rule, `sign`, `sign_type` and parameters
   * whose value is empty (`''`, `null`, `undefined`) are left out, so the
   * function is also safe to call on a parameter set that already carries a signature.
   *
   * @param {Object<string, *>} params Request parameters.
   * @param {string} key Merchant secret key.
   * @returns {string} 32-character lowercase hex MD5 digest.
   */
  function generatePaymentSign(params, key) {
    const isSigned = (name) => {
      if (name === 'sign' || name === 'sign_type') return false
      const value = params[name]
      return value !== '' && value !== null && value !== undefined
    }
    const query = Object.keys(params)
      .filter(isSigned)
      .sort()
      .map((name) => `${name}=${params[name]}`)
      .join('&')
    return crypto.createHash('md5').update(query + key, 'utf8').digest('hex')
  }
  /**
   * POST /Order/CreateOrder — create an order for one unit of a goods item and
   * return the signed parameters the client needs to submit to the payment gateway.
   */
  class CreateOrder extends API {
    constructor() {
      super()
      this.setPath('/Order/CreateOrder')
      this.setMethod('POST')
      // Start the order-payment MQ consumer (it guards itself against starting twice).
      startOrderPaymentWorker(this.logger)
    }

    /**
     * Handle an order-creation request.
     *
     * Body fields: `uuid`, `session`, `goods_id`, `pay_type` (all required).
     * On success responds with the order id plus the gateway submit URL and
     * signed form data; the same form data is cached in Redis for 300 seconds
     * and a payment-check message is pushed to the MQ queue.
     *
     * @param {object} req Request object; reads `body` and the `device-type` header.
     * @param {object} res Response object; `json` / `status` are used to answer.
     * @returns {Promise<*>}
     */
    async onRequest(req, res) {
      // A missing body is reported as missing parameters instead of throwing.
      const { uuid, session, goods_id, pay_type } = req.body ?? {}
      if ([uuid, session, goods_id, pay_type].some(v => v === '' || v === null || v === undefined)) {
        return res.json({
          ...BaseStdResponse.MISSING_PARAMETER
        })
      }

      const sessionValid = await AccessControl.checkSession(uuid, session)
      if (!sessionValid) {
        return res.status(401).json({
          ...BaseStdResponse.ACCESS_DENIED
        })
      }

      try {
        const goodsSql = 'SELECT name, price, num, state FROM goods WHERE id = ?'
        const goodsRows = await db.query(goodsSql, [goods_id])
        if (!goodsRows || goodsRows.length !== 1) {
          return res.json({
            ...BaseStdResponse.ERR,
            msg: '商品不存在'
          })
        }

        const goods = goodsRows[0]
        if (goods.num < 1 || goods.state !== 1) {
          return res.json({
            ...BaseStdResponse.ERR,
            msg: '商品已下架或库存不足'
          })
        }

        // Validate the payment configuration BEFORE writing anything, so a
        // misconfiguration cannot leave an unpayable order with consumed stock.
        const paymentConfig = config.pay || {}
        if (!paymentConfig.pid || !paymentConfig.url || !paymentConfig.key || !paymentConfig.return_url) {
          return res.json({
            ...BaseStdResponse.ERR,
            msg: '支付配置错误,请联系管理员'
          })
        }

        // Reserve one unit atomically: the WHERE clause re-checks stock and state,
        // so concurrent requests cannot oversell or push the stock below zero.
        const reserved = await db.query(
          'UPDATE goods SET num = num - 1 WHERE id = ? AND num > 0 AND state = 1',
          [goods_id]
        )
        if (!reserved || reserved.affectedRows !== 1) {
          return res.json({
            ...BaseStdResponse.ERR,
            msg: '商品已下架或库存不足'
          })
        }

        // Give the reserved unit back when the order row cannot be created.
        const releaseStock = async () => {
          try {
            await db.query('UPDATE goods SET num = num + 1 WHERE id = ?', [goods_id])
          } catch (releaseError) {
            this.logger.error(`回滚商品库存失败,商品ID:${goods_id},错误:${releaseError.stack || releaseError}`)
          }
        }

        const createTime = Date.now()
        const orderId = generateOrderId()
        const insertSql = `
          INSERT INTO orders (orderId, create_user, create_time, goods_id, price, pay_type)
          VALUES (?, ?, ?, ?, ?, ?)
        `
        let result
        try {
          result = await db.query(insertSql, [
            orderId, uuid, createTime, goods_id, goods.price, pay_type
          ])
        } catch (insertError) {
          await releaseStock()
          throw insertError
        }

        if (!(result && result.affectedRows > 0)) {
          await releaseStock()
          return res.json({
            ...BaseStdResponse.ERR,
            msg: '创建订单失败'
          })
        }

        const deviceType = req.headers['device-type'] ?? '浏览器'
        // Fall back to the regular return URL when no app-specific one is
        // configured, instead of producing "undefined<orderId>".
        const returnBase = deviceType === 'RunForge Uniapp Client'
          ? (paymentConfig.uni_return_url || paymentConfig.return_url)
          : paymentConfig.return_url
        const return_url = returnBase + orderId

        const payParams = {
          pid: paymentConfig.pid,
          type: pay_type,
          out_trade_no: orderId,
          notify_url: `${config.url}/Order/CallBack`,
          return_url,
          name: goods.name,
          money: goods.price
        }
        // The signature is computed before sign / sign_type are attached.
        payParams.sign = generatePaymentSign(payParams, paymentConfig.key)
        payParams.sign_type = 'MD5'

        await Redis.set(`payData:${orderId}`, JSON.stringify(payParams), {
          EX: 300
        })

        // After the order is created, push it to the payment-check queue; this
        // module's consumer polls the gateway and cancels on timeout. A failure
        // to enqueue is logged but does not fail the request.
        try {
          const ch = await mq.getChannel('order_payment')
          await ch.assertQueue(ORDER_PAYMENT_QUEUE, {
            durable: true
          })
          const msg = {
            orderId,
            enqueueTime: Date.now()
          }
          ch.sendToQueue(ORDER_PAYMENT_QUEUE, Buffer.from(JSON.stringify(msg)), {
            persistent: true
          })
        } catch (error) {
          this.logger.error(`推送订单支付检查消息到 MQ 失败,订单号:${orderId},错误:${error.stack || error}`)
        }

        return res.json({
          ...BaseStdResponse.OK,
          id: orderId,
          pay: {
            payUrl: `${paymentConfig.url}/submit.php`,
            payData: payParams
          }
        })
      } catch (err) {
        this.logger.error(`创建订单失败!${err.stack}`)
        return res.json({
          ...BaseStdResponse.ERR,
          msg: "创建订单异常,请联系管理员"
        })
      }
    }
  }
  277. module.exports.CreateOrder = CreateOrder