orderPaymentWorker.js 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. const db = require('../DataBase/db')
  2. const config = require('../../config.json')
  3. const mq = require('./index')
  4. const { mq: mqName } = require('./mqPrefix')
  5. const { insertLedgerRecord } = require('../../lib/Lepao/CountLedger')
  6. const { releaseUsageForOrder } = require('../../lib/CouponService')
  7. const { queryPaymentOrder } = require('../../lib/PaymentClient')
  // Name of the queue that carries order-payment check jobs, obtained by passing
  // 'order_payment_check' through mqName() from './mqPrefix'.
  const ORDER_PAYMENT_QUEUE = mqName('order_payment_check')
  // Module-level guard: startOrderPaymentWorker() sets this to true when it starts
  // the consumer and returns early on later calls while it is true.
  let orderPaymentWorkerStarted = false
  /**
   * Write a `purchase` ledger record for the `lepao_count` credit granted by an order.
   *
   * The user's current `lepao_count` is read from `users` and treated as the balance
   * AFTER the purchase; the balance before is derived as `after - delta`. The caller
   * is therefore expected to have already applied the credit to the user row.
   *
   * This is best-effort: nothing is written when an argument is missing, when the
   * delta is zero or not a finite number, or when the user lookup does not return
   * exactly one row. Any error is logged (if a logger is given) and swallowed so a
   * ledger failure never propagates to the caller.
   *
   * @param {string} orderId - Order identifier, stored as `bizId` and in the remark.
   * @param {string} userUuid - `users.uuid` of the account that was credited.
   * @param {number|string|null|undefined} addCount - Amount that was added to `lepao_count`.
   * @param {{ error?: Function }} [logger] - Optional logger; only `error` is used.
   * @returns {Promise<void>}
   */
  async function writePurchaseLedger(orderId, userUuid, addCount, logger) {
    const delta = Number(addCount || 0)
    // A non-numeric count converts to NaN, which is not === 0 and would otherwise
    // be written into both balance columns; treat it like "nothing to record".
    if (!orderId || !userUuid || !Number.isFinite(delta) || delta === 0) return
    try {
      const userRows = await db.query(
        'SELECT lepao_count FROM users WHERE uuid = ?',
        [userUuid]
      )
      if (!userRows || userRows.length !== 1) return
      const afterCount = Number(userRows[0].lepao_count || 0)
      const beforeCount = afterCount - delta
      await insertLedgerRecord({
        userUuid,
        delta,
        balanceBefore: beforeCount,
        balanceAfter: afterCount,
        bizType: 'purchase',
        bizId: orderId,
        remark: `订单号:${orderId}`
      })
    } catch (error) {
      logger?.error?.(`写入购买次数流水失败 ${orderId}: ${error.stack || error}`)
    }
  }
  /**
   * Start background polling of the payment gateway for one order.
   *
   * The returned promise resolves as soon as the first poll has been kicked off; it
   * does NOT wait for the order to be paid or to time out. Polling then continues on
   * timers: up to MAX_RETRIES attempts, DELAY milliseconds apart.
   *
   * Each attempt:
   *  - stops if the order row is missing or its `state` is no longer 0;
   *  - asks the gateway via queryPaymentOrder(); when both `code` and `status` equal 1
   *    the order is moved to state 1 (only if still state 0), the purchased goods are
   *    credited to the creating user, the order is moved to state 2 and a purchase
   *    ledger row is written;
   *  - if the goods lookup or the user credit fails, the order is moved to state 4
   *    and processing stops;
   *  - otherwise (or on any thrown error) the next attempt is scheduled.
   *
   * When the attempts are exhausted the order is moved to state 3 (only if still
   * state 0) and releaseUsageForOrder() is called for it.
   *
   * NOTE(review): the order/user updates are separate statements, not a transaction;
   * an error thrown after the state-1 update leaves the order at state 1, and the next
   * attempt stops because the state is no longer 0.
   *
   * @param {string} orderId - Order identifier to poll for.
   * @param {{ info: Function, warn: Function, error: Function }} logger - Logger.
   * @returns {Promise<void>} Resolves once polling has been started (or immediately
   *   when `config.pay` lacks `pid`, `url` or `key`).
   */
  async function pollOrderPaymentStatus(orderId, logger) {
    const paymentConfig = config.pay || {}
    if (!paymentConfig.pid || !paymentConfig.url || !paymentConfig.key) {
      logger.error('支付配置错误,无法轮询易支付状态')
      return
    }

    const MAX_RETRIES = 120
    const DELAY = 2500

    // Last-resort handler so a failure in a background run is logged instead of
    // surfacing as an unhandled promise rejection.
    const onPollCrash = (error) => {
      logger.error(`订单支付轮询异常终止,订单号:${orderId},错误:${error?.stack || error}`)
    }

    // Queue the next attempt after DELAY ms.
    const scheduleNext = (retry) => {
      setTimeout(() => {
        pollOrderStatus(retry).catch(onPollCrash)
      }, DELAY)
    }

    // Flag an order as failed (state 4).
    const markOrderFailed = (failedOrderId) =>
      db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [failedOrderId])

    // Cancel a still-unpaid order (state 0 -> 3) and release what it had reserved.
    const closeExpiredOrder = async () => {
      const closeRes = await db.query(
        'UPDATE orders SET state = 3 WHERE orderId = ? AND state = 0',
        [orderId]
      )
      if (closeRes?.affectedRows > 0) {
        await releaseUsageForOrder(orderId)
        logger.info(`订单超时未支付,自动取消,订单号:${orderId}`)
      }
    }

    // Apply a gateway-confirmed payment: claim the order, credit the user, finish.
    const settlePaidOrder = async ({ trade_no, out_trade_no, type }) => {
      const time = Date.now()
      // `AND state = 0` makes this an atomic claim: only one process/callback wins.
      const claim = await db.query(
        'UPDATE orders SET state = 1, pay_type = ?, pay_id = ?, pay_time = ? WHERE orderId = ? AND state = 0',
        [type, trade_no, time, out_trade_no]
      )
      if (!(claim.affectedRows > 0)) {
        logger.info(`支付网关已确认付款,订单已由其他进程/回调处理,停止轮询:${out_trade_no}`)
        return
      }

      const rows = await db.query(
        `
        SELECT g.lepao_count, g.ic_count, g.vip, a.create_user
        FROM orders a
        LEFT JOIN goods g ON a.goods_id = g.id
        WHERE a.orderId = ?
        `,
        [out_trade_no]
      )
      if (!rows || rows.length !== 1) {
        logger.error(`订单商品信息异常,订单号:${out_trade_no}`)
        await markOrderFailed(out_trade_no)
        return
      }

      const { lepao_count, ic_count, vip, create_user } = rows[0]
      const updateUser = await db.query(
        'UPDATE users SET lepao_count = lepao_count + ?, ic_count = ic_count + ?, vip = ? WHERE uuid = ?',
        [lepao_count, ic_count, vip, create_user]
      )
      if (!updateUser || updateUser.affectedRows !== 1) {
        logger.error(`更新用户失败,UUID: ${create_user}`)
        await markOrderFailed(out_trade_no)
        // Stop here: the user was not credited, so the order must stay failed and
        // no ledger entry or success log may be produced.
        return
      }

      await db.query('UPDATE orders SET state = 2 WHERE orderId = ?', [out_trade_no])
      await writePurchaseLedger(out_trade_no, create_user, lepao_count, logger)
      logger.info(`订单处理成功:${out_trade_no}`)
    }

    const pollOrderStatus = async (retry = 0) => {
      if (retry >= MAX_RETRIES) {
        await closeExpiredOrder()
        return
      }
      try {
        const existing = await db.query('SELECT state FROM orders WHERE orderId = ?', [orderId])
        if (!existing?.length) {
          logger.warn(`订单不存在,停止轮询:${orderId}`)
          return
        }
        if (Number(existing[0].state) !== 0) {
          logger.info(`订单已处理(state=${existing[0].state}),停止轮询:${orderId}`)
          return
        }

        const queryData = await queryPaymentOrder(orderId, logger)
        logger.info(`轮询订单支付状态,订单号:${orderId},尝试次数:${retry + 1},查询结果:${JSON.stringify(queryData)}`)
        // Number() accepts both numeric and string forms of the gateway fields.
        if (Number(queryData.code) === 1 && Number(queryData.status) === 1) {
          await settlePaidOrder(queryData)
          return
        }
        scheduleNext(retry + 1)
      } catch (error) {
        logger.warn(`轮询支付状态失败,订单号:${orderId},原因:${error.message || error}`)
        scheduleNext(retry + 1)
      }
    }

    logger.info(`开始轮询订单支付状态,订单号:${orderId}`)
    // Intentionally not awaited: polling runs in the background.
    pollOrderStatus().catch(onPollCrash)
  }
  /**
   * Publish a payment-check job for an order onto ORDER_PAYMENT_QUEUE.
   *
   * Obtains the 'order_payment' channel, asserts the queue as durable and sends a
   * persistent JSON message of the form `{ orderId, enqueueTime }`, where
   * `enqueueTime` is `Date.now()` at the moment of sending.
   *
   * @param {string} orderId - Identifier of the order to be checked.
   * @returns {Promise<void>}
   */
  async function enqueueOrderPaymentCheck(orderId) {
    const channel = await mq.getChannel('order_payment')
    await channel.assertQueue(ORDER_PAYMENT_QUEUE, { durable: true })

    const job = { orderId, enqueueTime: Date.now() }
    const payload = Buffer.from(JSON.stringify(job))
    channel.sendToQueue(ORDER_PAYMENT_QUEUE, payload, { persistent: true })
  }
  /**
   * Start the consumer for ORDER_PAYMENT_QUEUE (at most once per process).
   *
   * For every delivered message the JSON body is parsed and its `orderId` is handed
   * to pollOrderPaymentStatus(). The message is acked when that call resolves and
   * nacked with requeue when it rejects. Messages that cannot be parsed or that
   * carry no `orderId` are logged and acked so they are dropped rather than
   * redelivered.
   *
   * Startup errors are logged, not thrown; in that case the started flag is cleared
   * so a later call can try again.
   *
   * @param {{ info: Function, warn: Function, error: Function }} logger - Logger.
   * @returns {Promise<void>}
   */
  async function startOrderPaymentWorker(logger) {
    if (orderPaymentWorkerStarted) {
      return
    }
    // Claim the flag before the first await so overlapping calls cannot both
    // register a consumer; it is released again in the catch below on failure.
    orderPaymentWorkerStarted = true
    try {
      const ch = await mq.getChannel('order_payment')
      await ch.assertQueue(ORDER_PAYMENT_QUEUE, { durable: true })
      await ch.prefetch(1)
      // Awaited so that a failure to register the consumer reaches the catch block.
      await ch.consume(ORDER_PAYMENT_QUEUE, async (msg) => {
        if (!msg) return

        let orderId
        try {
          const content = JSON.parse(msg.content.toString() || '{}')
          orderId = content?.orderId
        } catch (parseError) {
          // A malformed body can never succeed; ack it so it does not stay
          // unacknowledged and hold up the prefetch window.
          logger.warn(`收到无法解析的订单支付检查消息,已丢弃:${parseError.message || parseError}`)
          ch.ack(msg)
          return
        }
        if (!orderId) {
          logger.warn('收到无效的订单支付检查消息(缺少 orderId)')
          ch.ack(msg)
          return
        }

        try {
          await pollOrderPaymentStatus(orderId, logger)
          ch.ack(msg)
        } catch (err) {
          logger.error(`订单支付轮询处理失败,订单号:${orderId},错误:${err.stack || err}`)
          ch.nack(msg, false, true)
        }
      })
      logger.info(`订单支付结果轮询消费者已启动,队列:${ORDER_PAYMENT_QUEUE}`)
    } catch (e) {
      orderPaymentWorkerStarted = false
      logger.error(`启动订单支付 MQ 消费者失败:${e.stack || e}`)
    }
  }
  // Public surface of this module: the queue name plus the producer-side
  // (enqueue) and consumer-side (start worker) entry points.
  const orderPaymentWorkerApi = {
    ORDER_PAYMENT_QUEUE,
    enqueueOrderPaymentCheck,
    startOrderPaymentWorker
  }

  module.exports = orderPaymentWorkerApi