CreateOrder.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. const API = require("../../lib/API.js")
  2. const db = require("../../plugin/DataBase/db.js")
  3. const Redis = require('../../plugin/DataBase/Redis')
  4. const { BaseStdResponse } = require("../../BaseStdResponse.js")
  5. const AccessControl = require("../../lib/AccessControl.js")
  6. const crypto = require('crypto')
  7. const axios = require('axios')
  8. const config = require('../../config.json')
  9. const mq = require('../../plugin/mq')
  10. const ORDER_PAYMENT_QUEUE = 'order_payment_check'
  11. let orderPaymentWorkerStarted = false
  12. async function startOrderPaymentWorker(logger) {
  13. if (orderPaymentWorkerStarted) {
  14. return
  15. }
  16. try {
  17. const ch = await mq.getChannel('order_payment')
  18. await ch.assertQueue(ORDER_PAYMENT_QUEUE, {
  19. durable: true
  20. })
  21. await ch.prefetch(1)
  22. logger.info(`订单支付结果轮询消费者已启动,队列:${ORDER_PAYMENT_QUEUE}`)
  23. orderPaymentWorkerStarted = true
  24. ch.consume(ORDER_PAYMENT_QUEUE, async (msg) => {
  25. if (!msg) return
  26. const content = JSON.parse(msg.content.toString() || '{}')
  27. const { orderId } = content
  28. if (!orderId) {
  29. logger.warn('收到无效的订单支付检查消息(缺少 orderId)')
  30. ch.ack(msg)
  31. return
  32. }
  33. try {
  34. await pollOrderPaymentStatus(orderId, logger)
  35. ch.ack(msg)
  36. } catch (err) {
  37. logger.error(`订单支付轮询处理失败,订单号:${orderId},错误:${err.stack || err}`)
  38. // 简单重试策略:出现异常时,Nack 并重新入队
  39. ch.nack(msg, false, true)
  40. }
  41. })
  42. } catch (e) {
  43. logger.error(`启动订单支付 MQ 消费者失败:${e.stack || e}`)
  44. }
  45. }
  46. async function pollOrderPaymentStatus(orderId, logger) {
  47. const paymentConfig = config.pay || {}
  48. if (!paymentConfig.pid || !paymentConfig.url || !paymentConfig.key) {
  49. logger.error('支付配置错误,无法轮询易支付状态')
  50. return
  51. }
  52. const MAX_RETRIES = 60 // 5分钟 / 5秒
  53. const DELAY = 5000 // 5秒
  54. const queryUrl = `${paymentConfig.url}/api.php?act=order&pid=${paymentConfig.pid}&key=${paymentConfig.key}&out_trade_no=${orderId}`
  55. const pollOrderStatus = async (retry = 0) => {
  56. if (retry >= MAX_RETRIES) {
  57. logger.info(`订单超时未支付,自动取消,订单号:${orderId}`)
  58. await db.query('UPDATE orders SET state = 3 WHERE orderId = ?', [orderId])
  59. return
  60. }
  61. try {
  62. const queryRes = await axios.get(queryUrl)
  63. const queryData = queryRes.data
  64. if (queryData.code === 1 && queryData.status === 1) {
  65. const { trade_no, out_trade_no, type } = queryData
  66. const time = Date.now()
  67. let sql = 'UPDATE orders SET state = 1, pay_type = ?, pay_id = ?, pay_time = ? WHERE orderId = ? AND state = 0'
  68. const result = await db.query(sql, [type, trade_no, time, out_trade_no])
  69. if (result.affectedRows > 0) {
  70. sql = `
  71. SELECT
  72. g.lepao_count,
  73. g.ic_count,
  74. g.vip,
  75. a.create_user
  76. FROM
  77. orders a
  78. LEFT JOIN
  79. goods g
  80. ON
  81. a.goods_id = g.id
  82. WHERE
  83. a.orderId = ?
  84. `
  85. const rows = await db.query(sql, [out_trade_no])
  86. if (!rows || rows.length !== 1) {
  87. logger.error(`订单商品信息异常,订单号:${out_trade_no}`)
  88. await db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [out_trade_no])
  89. return
  90. }
  91. const { lepao_count, ic_count, vip, create_user } = rows[0]
  92. sql = 'UPDATE users SET lepao_count = lepao_count + ?, ic_count = ic_count + ?, vip = ? WHERE uuid = ?'
  93. const updateUser = await db.query(sql, [lepao_count, ic_count, vip, create_user])
  94. if (!updateUser || updateUser.affectedRows !== 1) {
  95. logger.error(`更新用户失败,UUID: ${create_user}`)
  96. await db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [out_trade_no])
  97. }
  98. sql = 'UPDATE orders SET state = 2 WHERE orderId = ?'
  99. await db.query(sql, [out_trade_no])
  100. logger.info(`订单处理成功:${out_trade_no}`)
  101. return
  102. } else {
  103. logger.warn(`订单不存在或已处理:${out_trade_no}`)
  104. }
  105. }
  106. // 未支付,继续轮询
  107. setTimeout(() => pollOrderStatus(retry + 1), DELAY)
  108. } catch (error) {
  109. logger.warn(`轮询支付状态失败:${error.stack || error}`)
  110. setTimeout(() => pollOrderStatus(retry + 1), DELAY)
  111. }
  112. }
  113. logger.info(`开始轮询订单支付状态,订单号:${orderId}`)
  114. pollOrderStatus()
  115. }
  116. function generateOrderId() {
  117. const now = new Date()
  118. const pad = (n, w = 2) => n.toString().padStart(w, '0')
  119. return `${now.getFullYear()}${pad(now.getMonth() + 1)}${pad(now.getDate())}` +
  120. `${pad(now.getHours())}${pad(now.getMinutes())}${pad(now.getSeconds())}` +
  121. `${pad(now.getMilliseconds(), 3)}`
  122. }
  123. function generatePaymentSign(params, key) {
  124. const sorted = Object.keys(params).sort()
  125. const query = sorted.map(k => `${k}=${params[k]}`).join('&') + key
  126. return crypto.createHash('md5').update(query, 'utf8').digest('hex')
  127. }
  128. class CreateOrder extends API {
  129. constructor() {
  130. super()
  131. this.setPath('/Order/CreateOrder')
  132. this.setMethod('POST')
  133. // 启动订单支付 MQ 消费者(只会启动一次)
  134. startOrderPaymentWorker(this.logger)
  135. }
  136. async onRequest(req, res) {
  137. const { uuid, session, goods_id, pay_type } = req.body
  138. if ([uuid, session, goods_id, pay_type].some(v => v === '' || v === null || v === undefined)) {
  139. return res.json({
  140. ...BaseStdResponse.MISSING_PARAMETER
  141. })
  142. }
  143. const sessionValid = await AccessControl.checkSession(uuid, session)
  144. if (!sessionValid) {
  145. return res.status(401).json({
  146. ...BaseStdResponse.ACCESS_DENIED
  147. })
  148. }
  149. try {
  150. const goodsSql = 'SELECT name, price, num, state FROM goods WHERE id = ?'
  151. const goodsRows = await db.query(goodsSql, [goods_id])
  152. if (!goodsRows || goodsRows.length !== 1) {
  153. return res.json({
  154. ...BaseStdResponse.ERR,
  155. msg: '商品不存在'
  156. })
  157. }
  158. const goods = goodsRows[0]
  159. if (goods.num < 1 || goods.state !== 1) {
  160. return res.json({
  161. ...BaseStdResponse.ERR,
  162. msg: '商品已下架或库存不足'
  163. })
  164. }
  165. const createTime = Date.now()
  166. const orderId = generateOrderId()
  167. const insertSql = `
  168. INSERT INTO orders (orderId, create_user, create_time, goods_id, price, pay_type)
  169. VALUES (?, ?, ?, ?, ?, ?)
  170. `
  171. const result = await db.query(insertSql, [
  172. orderId, uuid, createTime, goods_id, goods.price, pay_type
  173. ])
  174. const updateSql = 'UPDATE goods SET num = num - 1 WHERE id = ?'
  175. await db.query(updateSql, [goods_id])
  176. if (result && result.affectedRows > 0) {
  177. const paymentConfig = config.pay || {}
  178. if (!paymentConfig.pid || !paymentConfig.url || !paymentConfig.key || !paymentConfig.return_url) {
  179. return res.json({
  180. ...BaseStdResponse.ERR,
  181. msg: '支付配置错误,请联系管理员'
  182. })
  183. }
  184. const deviceType = req.headers['device-type'] ?? '浏览器'
  185. let return_url
  186. if(deviceType === 'RunForge Uniapp Client')
  187. return_url = paymentConfig.uni_return_url + orderId
  188. else
  189. return_url = paymentConfig.return_url + orderId
  190. const payParams = {
  191. pid: paymentConfig.pid,
  192. type: pay_type,
  193. out_trade_no: orderId,
  194. notify_url: `${config.url}/Order/CallBack`,
  195. return_url,
  196. name: goods.name,
  197. money: goods.price
  198. }
  199. const sign = generatePaymentSign(payParams, paymentConfig.key)
  200. payParams.sign = sign
  201. payParams.sign_type = 'MD5'
  202. await Redis.set(`payData:${orderId}`, JSON.stringify(payParams), {
  203. EX: 300
  204. })
  205. // 下单成功后推送到订单支付检查队列,由当前模块的消费者进行轮询及超时取消
  206. try {
  207. const ch = await mq.getChannel('order_payment')
  208. await ch.assertQueue('order_payment_check', {
  209. durable: true
  210. })
  211. const msg = {
  212. orderId,
  213. enqueueTime: Date.now()
  214. }
  215. ch.sendToQueue('order_payment_check', Buffer.from(JSON.stringify(msg)), {
  216. persistent: true
  217. })
  218. } catch (error) {
  219. this.logger.error(`推送订单支付检查消息到 MQ 失败,订单号:${orderId},错误:${error.stack || error}`)
  220. }
  221. res.json({
  222. ...BaseStdResponse.OK,
  223. id: orderId,
  224. pay: {
  225. payUrl: `${paymentConfig.url}/submit.php`,
  226. payData: payParams
  227. }
  228. })
  229. } else {
  230. return res.json({
  231. ...BaseStdResponse.ERR,
  232. msg: '创建订单失败'
  233. })
  234. }
  235. } catch (err) {
  236. this.logger.error(`创建订单失败!${err.stack}`)
  237. return res.json({
  238. ...BaseStdResponse.ERR,
  239. msg: "创建订单异常,请联系管理员"
  240. })
  241. }
  242. }
  243. }
  244. module.exports.CreateOrder = CreateOrder