Browse Source

✨ feat: 新增队列查看功能

Pchen. 1 month ago
parent
commit
587ab52949
1 changed files with 231 additions and 0 deletions
  1. 231 0
      apis/User/Admin/GetQueueTasks.js

+ 231 - 0
apis/User/Admin/GetQueueTasks.js

@@ -0,0 +1,231 @@
+const API = require('../../../lib/API')
+const axios = require('axios')
+const mq = require('../../../plugin/mq')
+const config = require('../../../config.json')
+const AccessControl = require('../../../lib/AccessControl')
+const { BaseStdResponse } = require('../../../BaseStdResponse')
+
+/** 允许通过管理接口查看的队列(防任意队列名探测) */
+const ALLOWED_QUEUES = [
+    'runforge_task_queue',
+    'runforge_task_result_queue',
+    'runforge_task_dead_queue',
+    'runforge_message_queue',
+    'order_payment_check',
+    'mq_health_check'
+]
+
+function parseAmqpHttpBase(amqpUrl) {
+    const u = new URL(String(amqpUrl).replace(/^amqp:/, 'http:'))
+    return {
+        user: decodeURIComponent(u.username || ''),
+        password: decodeURIComponent(u.password || ''),
+        hostname: u.hostname
+    }
+}
+
+function managementEndpoint(queueName) {
+    const rm = config.rabbitmq || {}
+    const creds = parseAmqpHttpBase(rm.url || '')
+    const base = (rm.managementBaseUrl || `http://${creds.hostname}:15672`).replace(/\/$/, '')
+    const vhost = encodeURIComponent(rm.vhost != null ? rm.vhost : '/')
+    const q = encodeURIComponent(queueName)
+    return {
+        url: `${base}/api/queues/${vhost}/${q}/get`,
+        auth: {
+            username: rm.managementUser || creds.user,
+            password: rm.managementPassword || creds.password
+        }
+    }
+}
+
+function decodePayload(msg) {
+    const enc = msg.payload_encoding || 'string'
+    let raw = msg.payload
+    if (enc === 'base64' && typeof raw === 'string') {
+        try {
+            raw = Buffer.from(raw, 'base64').toString('utf8')
+        } catch {
+            return { raw: msg.payload, encoding: enc, parseError: true }
+        }
+    }
+    if (typeof raw === 'string') {
+        try {
+            return { body: JSON.parse(raw), encoding: enc }
+        } catch {
+            return { body: raw, encoding: enc }
+        }
+    }
+    return { body: raw, encoding: enc }
+}
+
+/**
+ * 通过 Management API 窥视队列消息(reject_requeue_true:看完后重新入队,不消费)
+ */
+async function peekQueueMessages(queueName, limit) {
+    const { url, auth } = managementEndpoint(queueName)
+    const { data, status } = await axios.post(
+        url,
+        {
+            count: limit,
+            ackmode: 'reject_requeue_true',
+            encoding: 'auto'
+        },
+        {
+            auth,
+            timeout: 12000,
+            validateStatus: () => true
+        }
+    )
+
+    if (status >= 400) {
+        const reason =
+            typeof data === 'object' && data !== null
+                ? data.reason || data.error || JSON.stringify(data)
+                : String(data)
+        const err = new Error(reason || `Management HTTP ${status}`)
+        err.status = status
+        throw err
+    }
+
+    const list = Array.isArray(data) ? data : []
+    return list.map((m) => {
+        const decoded = decodePayload(m)
+        return {
+            redelivered: m.redelivered,
+            routing_key: m.routing_key,
+            exchange: m.exchange,
+            properties: m.properties
+                ? {
+                      messageId: m.properties.message_id,
+                      timestamp: m.properties.timestamp,
+                      contentType: m.properties.content_type,
+                      headers: m.properties.headers
+                  }
+                : undefined,
+            payload: decoded.body,
+            payload_encoding: decoded.encoding
+        }
+    })
+}
+
+class GetQueueTasks extends API {
+    constructor() {
+        super()
+
+        this.setPath('/Admin/MQ/GetQueueTasks')
+        this.setMethod('get')
+    }
+
+    async onRequest(req, res) {
+        const { uuid, session, queue, limit: limitStr, summary } = req.query
+
+        if ([uuid, session].some((v) => v === '' || v == null))
+            return res.json({
+                ...BaseStdResponse.MISSING_PARAMETER
+            })
+
+        if (!(await AccessControl.checkSession(uuid, session)))
+            return res.status(401).json({
+                ...BaseStdResponse.ACCESS_DENIED
+            })
+
+        const permission = await AccessControl.getPermission(uuid)
+        if (!permission.includes('admin') && !permission.includes('service'))
+            return res.json({
+                ...BaseStdResponse.PERMISSION_DENIED
+            })
+
+        const wantSummary = summary === '1' || summary === 'true'
+
+        try {
+            const ch = await mq.getChannel('admin_queue_inspect')
+
+            if (wantSummary) {
+                const queues = {}
+                for (const name of ALLOWED_QUEUES) {
+                    try {
+                        const info = await ch.checkQueue(name)
+                        queues[name] = {
+                            messageCount: info.messageCount,
+                            consumerCount: info.consumerCount
+                        }
+                    } catch (e) {
+                        queues[name] = {
+                            messageCount: null,
+                            consumerCount: null,
+                            error: e.message || String(e)
+                        }
+                    }
+                }
+                return res.json({
+                    ...BaseStdResponse.OK,
+                    data: {
+                        summary: true,
+                        queues,
+                        fetchedAt: Date.now()
+                    }
+                })
+            }
+
+            const queueName = queue || 'runforge_task_queue'
+            if (!ALLOWED_QUEUES.includes(queueName))
+                return res.json({
+                    ...BaseStdResponse.ERR,
+                    msg: '不支持的队列名称'
+                })
+
+            let limit = parseInt(limitStr, 10)
+            if (Number.isNaN(limit) || limit < 1) limit = 30
+            if (limit > 100) limit = 100
+
+            let messageCount = null
+            let consumerCount = null
+            try {
+                const info = await ch.checkQueue(queueName)
+                messageCount = info.messageCount
+                consumerCount = info.consumerCount
+            } catch (e) {
+                return res.json({
+                    ...BaseStdResponse.ERR,
+                    msg: `无法访问队列:${e.message || e}`
+                })
+            }
+
+            let tasks = []
+            let managementError = null
+            try {
+                tasks = await peekQueueMessages(queueName, limit)
+            } catch (e) {
+                managementError =
+                    e.status === 401 || e.status === 403
+                        ? 'Management 鉴权失败,请在 config.json 的 rabbitmq 中配置 managementUser / managementPassword,或检查管理插件用户权限'
+                        : e.code === 'ECONNREFUSED' || e.code === 'ETIMEDOUT'
+                          ? '无法连接 RabbitMQ Management(默认 15672)。请开启管理插件并开放端口,或配置 rabbitmq.managementBaseUrl'
+                          : (e.message || String(e))
+                this.logger.warn(`[GetQueueTasks] Management 窥视失败: ${managementError}`)
+            }
+
+            return res.json({
+                ...BaseStdResponse.OK,
+                data: {
+                    queue: queueName,
+                    messageCount,
+                    consumerCount,
+                    peekLimit: limit,
+                    tasks,
+                    managementError,
+                    fetchedAt: Date.now()
+                }
+            })
+        } catch (error) {
+            this.logger.error(`GetQueueTasks: ${error.stack || error}`)
+            return res.json({
+                ...BaseStdResponse.ERR,
+                msg: error.message || '查询失败'
+            })
+        }
+    }
+}
+
+module.exports.GetQueueTasks = GetQueueTasks