| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- const API = require('../../../lib/API')
- const axios = require('axios')
- const mq = require('../../../plugin/mq')
- const config = require('../../../config.json')
- const AccessControl = require('../../../lib/AccessControl')
- const { BaseStdResponse } = require('../../../BaseStdResponse')
- const {
- SCHEDULE_KEY,
- listPendingScheduledForAdmin,
- countPendingScheduled
- } = require('../../../plugin/mq/lepaoAutoScheduleRedis')
- /** 允许通过管理接口查看的队列(防任意队列名探测) */
- const ALLOWED_QUEUES = [
- 'jkes_runforge_task_queue',
- 'runforge_task_result_queue',
- 'runforge_task_dead_queue',
- 'runforge_message_queue',
- 'order_payment_check',
- 'mq_health_check'
- ]
- function parseAmqpHttpBase(amqpUrl) {
- const u = new URL(String(amqpUrl).replace(/^amqp:/, 'http:'))
- return {
- user: decodeURIComponent(u.username || ''),
- password: decodeURIComponent(u.password || ''),
- hostname: u.hostname
- }
- }
- function managementEndpoint(queueName) {
- const rm = config.rabbitmq || {}
- const creds = parseAmqpHttpBase(rm.url || '')
- const base = (rm.managementBaseUrl || `http://${creds.hostname}:15672`).replace(/\/$/, '')
- const vhost = encodeURIComponent(rm.vhost != null ? rm.vhost : '/')
- const q = encodeURIComponent(queueName)
- return {
- url: `${base}/api/queues/${vhost}/${q}/get`,
- auth: {
- username: rm.managementUser || creds.user,
- password: rm.managementPassword || creds.password
- }
- }
- }
- function decodePayload(msg) {
- const enc = msg.payload_encoding || 'string'
- let raw = msg.payload
- if (enc === 'base64' && typeof raw === 'string') {
- try {
- raw = Buffer.from(raw, 'base64').toString('utf8')
- } catch {
- return { raw: msg.payload, encoding: enc, parseError: true }
- }
- }
- if (typeof raw === 'string') {
- try {
- return { body: JSON.parse(raw), encoding: enc }
- } catch {
- return { body: raw, encoding: enc }
- }
- }
- return { body: raw, encoding: enc }
- }
- /**
- * 通过 Management API 窥视队列消息(reject_requeue_true:看完后重新入队,不消费)
- */
- async function peekQueueMessages(queueName, limit) {
- const { url, auth } = managementEndpoint(queueName)
- const { data, status } = await axios.post(
- url,
- {
- count: limit,
- ackmode: 'reject_requeue_true',
- encoding: 'auto'
- },
- {
- auth,
- timeout: 12000,
- validateStatus: () => true
- }
- )
- if (status >= 400) {
- const reason =
- typeof data === 'object' && data !== null
- ? data.reason || data.error || JSON.stringify(data)
- : String(data)
- const err = new Error(reason || `Management HTTP ${status}`)
- err.status = status
- throw err
- }
- const list = Array.isArray(data) ? data : []
- return list.map((m) => {
- const decoded = decodePayload(m)
- return {
- redelivered: m.redelivered,
- routing_key: m.routing_key,
- exchange: m.exchange,
- properties: m.properties
- ? {
- messageId: m.properties.message_id,
- timestamp: m.properties.timestamp,
- contentType: m.properties.content_type,
- headers: m.properties.headers
- }
- : undefined,
- payload: decoded.body,
- payload_encoding: decoded.encoding
- }
- })
- }
- class GetQueueTasks extends API {
- constructor() {
- super()
- this.setPath('/Admin/MQ/GetQueueTasks')
- this.setMethod('get')
- }
- async onRequest(req, res) {
- const {
- uuid,
- session,
- queue,
- limit: limitStr,
- summary,
- includeScheduled,
- scheduledLimit: scheduledLimitStr
- } = req.query
- if ([uuid, session].some((v) => v === '' || v == null))
- return res.json({
- ...BaseStdResponse.MISSING_PARAMETER
- })
- if (!(await AccessControl.checkSession(uuid, session)))
- return res.status(401).json({
- ...BaseStdResponse.ACCESS_DENIED
- })
- const permission = await AccessControl.getPermission(uuid)
- if (!permission.includes('admin') && !permission.includes('service'))
- return res.json({
- ...BaseStdResponse.PERMISSION_DENIED
- })
- const wantSummary = summary === '1' || summary === 'true'
- try {
- const ch = await mq.getChannel('admin_queue_inspect')
- if (wantSummary) {
- const queues = {}
- for (const name of ALLOWED_QUEUES) {
- try {
- const info = await ch.checkQueue(name)
- queues[name] = {
- messageCount: info.messageCount,
- consumerCount: info.consumerCount
- }
- } catch (e) {
- queues[name] = {
- messageCount: null,
- consumerCount: null,
- error: e.message || String(e)
- }
- }
- }
- const slimit = Math.min(
- 2000,
- Math.max(1, parseInt(scheduledLimitStr, 10) || 800)
- )
- const pendingCount = await countPendingScheduled(Date.now())
- const scheduledMirror = await listPendingScheduledForAdmin(Date.now(), slimit)
- return res.json({
- ...BaseStdResponse.OK,
- data: {
- summary: true,
- queues,
- redisScheduler: {
- keys: [SCHEDULE_KEY],
- pendingCount,
- note: '到期任务由本服务定时写入 jkes_runforge_task_queue。'
- },
- autoRunScheduledMirror: {
- pendingCount,
- note: scheduledMirror.note,
- sample: scheduledMirror.items.slice(0, 20)
- },
- fetchedAt: Date.now()
- }
- })
- }
- const queueName = queue || 'jkes_runforge_task_queue'
- if (!ALLOWED_QUEUES.includes(queueName))
- return res.json({
- ...BaseStdResponse.ERR,
- msg: '不支持的队列名称'
- })
- let limit = parseInt(limitStr, 10)
- if (Number.isNaN(limit) || limit < 1) limit = 30
- if (limit > 100) limit = 100
- let messageCount = null
- let consumerCount = null
- try {
- const info = await ch.checkQueue(queueName)
- messageCount = info.messageCount
- consumerCount = info.consumerCount
- } catch (e) {
- return res.json({
- ...BaseStdResponse.ERR,
- msg: `无法访问队列:${e.message || e}`
- })
- }
- let tasks = []
- let managementError = null
- try {
- tasks = await peekQueueMessages(queueName, limit)
- } catch (e) {
- managementError =
- e.status === 401 || e.status === 403
- ? 'Management 鉴权失败,请在 config.json 的 rabbitmq 中配置 managementUser / managementPassword,或检查管理插件用户权限'
- : e.code === 'ECONNREFUSED' || e.code === 'ETIMEDOUT'
- ? '无法连接 RabbitMQ Management(默认 15672)。请开启管理插件并开放端口,或配置 rabbitmq.managementBaseUrl'
- : (e.message || String(e))
- this.logger.warn(`[GetQueueTasks] Management 窥视失败: ${managementError}`)
- }
- const wantScheduled =
- includeScheduled !== '0' &&
- includeScheduled !== 'false' &&
- queueName === 'jkes_runforge_task_queue'
- let autoRunScheduledMirror = null
- let pendingScheduledCount = null
- if (queueName === 'jkes_runforge_task_queue') {
- pendingScheduledCount = await countPendingScheduled(Date.now())
- if (wantScheduled) {
- const slimit = Math.min(
- 500,
- Math.max(1, parseInt(scheduledLimitStr, 10) || 200)
- )
- autoRunScheduledMirror = await listPendingScheduledForAdmin(Date.now(), slimit)
- }
- }
- const detail = {
- queue: queueName,
- messageCount,
- consumerCount,
- peekLimit: limit,
- tasks,
- managementError,
- redisScheduler:
- queueName === 'jkes_runforge_task_queue'
- ? {
- keys: [SCHEDULE_KEY],
- pendingCount: pendingScheduledCount
- }
- : undefined,
- autoRunScheduledMirror,
- fetchedAt: Date.now()
- }
- if (queueName === 'jkes_runforge_task_queue') {
- detail.peekNote =
- 'tasks:已在主队列中的消息;autoRunScheduledMirror:尚未到 fireAt、仍只在 Redis 中的调度(到期后由服务写入 jkes_runforge_task_queue)。'
- }
- return res.json({
- ...BaseStdResponse.OK,
- data: detail
- })
- } catch (error) {
- this.logger.error(`GetQueueTasks: ${error.stack || error}`)
- return res.json({
- ...BaseStdResponse.ERR,
- msg: error.message || '查询失败'
- })
- }
- }
- }
- module.exports.GetQueueTasks = GetQueueTasks
|