const db = require('../DataBase/db') const config = require('../../config.json') const mq = require('./index') const { mq: mqName } = require('./mqPrefix') const { insertLedgerRecord } = require('../../lib/Lepao/CountLedger') const { releaseUsageForOrder } = require('../../lib/CouponService') const { queryPaymentOrder } = require('../../lib/PaymentClient') const ORDER_PAYMENT_QUEUE = mqName('order_payment_check') let orderPaymentWorkerStarted = false async function writePurchaseLedger(orderId, userUuid, addCount, logger) { const delta = Number(addCount || 0) if (!orderId || !userUuid || delta === 0) return try { const userRows = await db.query( 'SELECT lepao_count FROM users WHERE uuid = ?', [userUuid] ) if (!userRows || userRows.length !== 1) return const afterCount = Number(userRows[0].lepao_count || 0) const beforeCount = afterCount - delta await insertLedgerRecord({ userUuid, delta, balanceBefore: beforeCount, balanceAfter: afterCount, bizType: 'purchase', bizId: orderId, remark: `订单号:${orderId}` }) } catch (error) { logger?.error?.(`写入购买次数流水失败 ${orderId}: ${error.stack || error}`) } } async function pollOrderPaymentStatus(orderId, logger) { const paymentConfig = config.pay || {} if (!paymentConfig.pid || !paymentConfig.url || !paymentConfig.key) { logger.error('支付配置错误,无法轮询易支付状态') return } const MAX_RETRIES = 120 const DELAY = 2500 const pollOrderStatus = async (retry = 0) => { if (retry >= MAX_RETRIES) { const closeRes = await db.query( 'UPDATE orders SET state = 3 WHERE orderId = ? AND state = 0', [orderId] ) if (closeRes?.affectedRows > 0) { await releaseUsageForOrder(orderId) logger.info(`订单超时未支付,自动取消,订单号:${orderId}`) } return } try { const existing = await db.query('SELECT state FROM orders WHERE orderId = ?', [orderId]) if (!existing?.length) { logger.warn(`订单不存在,停止轮询:${orderId}`) return } if (Number(existing[0].state) !== 0) { logger.info(`订单已处理(state=${existing[0].state}),停止轮询:${orderId}`) return } const queryData = await queryPaymentOrder(orderId, logger) logger.info(`轮询订单支付状态,订单号:${orderId},尝试次数:${retry + 1},查询结果:${JSON.stringify(queryData)}`) if (queryData.code == 1 && queryData.status == 1) { const { trade_no, out_trade_no, type } = queryData const time = Date.now() let sql = 'UPDATE orders SET state = 1, pay_type = ?, pay_id = ?, pay_time = ? WHERE orderId = ? AND state = 0' const result = await db.query(sql, [type, trade_no, time, out_trade_no]) if (result.affectedRows > 0) { sql = ` SELECT g.lepao_count, g.ic_count, g.vip, a.create_user FROM orders a LEFT JOIN goods g ON a.goods_id = g.id WHERE a.orderId = ? ` const rows = await db.query(sql, [out_trade_no]) if (!rows || rows.length !== 1) { logger.error(`订单商品信息异常,订单号:${out_trade_no}`) await db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [out_trade_no]) return } const { lepao_count, ic_count, vip, create_user } = rows[0] sql = 'UPDATE users SET lepao_count = lepao_count + ?, ic_count = ic_count + ?, vip = ? WHERE uuid = ?' const updateUser = await db.query(sql, [lepao_count, ic_count, vip, create_user]) if (!updateUser || updateUser.affectedRows !== 1) { logger.error(`更新用户失败,UUID: ${create_user}`) await db.query('UPDATE orders SET state = 4 WHERE orderId = ?', [out_trade_no]) } sql = 'UPDATE orders SET state = 2 WHERE orderId = ?' await db.query(sql, [out_trade_no]) await writePurchaseLedger(out_trade_no, create_user, lepao_count, logger) logger.info(`订单处理成功:${out_trade_no}`) return } logger.info(`支付网关已确认付款,订单已由其他进程/回调处理,停止轮询:${out_trade_no}`) return } setTimeout(() => pollOrderStatus(retry + 1), DELAY) } catch (error) { logger.warn(`轮询支付状态失败,订单号:${orderId},原因:${error.message || error}`) setTimeout(() => pollOrderStatus(retry + 1), DELAY) } } logger.info(`开始轮询订单支付状态,订单号:${orderId}`) pollOrderStatus() } async function enqueueOrderPaymentCheck(orderId) { const ch = await mq.getChannel('order_payment') await ch.assertQueue(ORDER_PAYMENT_QUEUE, { durable: true }) ch.sendToQueue( ORDER_PAYMENT_QUEUE, Buffer.from(JSON.stringify({ orderId, enqueueTime: Date.now() })), { persistent: true } ) } async function startOrderPaymentWorker(logger) { if (orderPaymentWorkerStarted) { return } try { const ch = await mq.getChannel('order_payment') await ch.assertQueue(ORDER_PAYMENT_QUEUE, { durable: true }) await ch.prefetch(1) logger.info(`订单支付结果轮询消费者已启动,队列:${ORDER_PAYMENT_QUEUE}`) orderPaymentWorkerStarted = true ch.consume(ORDER_PAYMENT_QUEUE, async (msg) => { if (!msg) return const content = JSON.parse(msg.content.toString() || '{}') const { orderId } = content if (!orderId) { logger.warn('收到无效的订单支付检查消息(缺少 orderId)') ch.ack(msg) return } try { await pollOrderPaymentStatus(orderId, logger) ch.ack(msg) } catch (err) { logger.error(`订单支付轮询处理失败,订单号:${orderId},错误:${err.stack || err}`) ch.nack(msg, false, true) } }) } catch (e) { logger.error(`启动订单支付 MQ 消费者失败:${e.stack || e}`) } } module.exports = { ORDER_PAYMENT_QUEUE, enqueueOrderPaymentCheck, startOrderPaymentWorker }