const API = require('../../../lib/API')
const axios = require('axios')
const mq = require('../../../plugin/mq')
const config = require('../../../config.json')
const AccessControl = require('../../../lib/AccessControl')
const { BaseStdResponse } = require('../../../BaseStdResponse')
const {
    SCHEDULE_KEY,
    listPendingScheduledForAdmin,
    countPendingScheduled
} = require('../../../plugin/mq/lepaoAutoScheduleRedis')

/** Queues that may be inspected through this admin endpoint (prevents probing of arbitrary queue names). */
const ALLOWED_QUEUES = [
    'runforge_task_queue',
    'runforge_task_result_queue',
    'runforge_task_dead_queue',
    'runforge_message_queue',
    'order_payment_check',
    'mq_health_check'
]

/**
 * Extract credentials and host name from an AMQP connection URL.
 *
 * The scheme is rewritten to `http:` so the URL is parsed with regular
 * host/credential rules. A missing or unparsable URL does not throw; it yields
 * empty fields so callers can still fall back to explicit management settings.
 *
 * @param {string} amqpUrl - AMQP(S) connection URL, may be empty.
 * @returns {{user: string, password: string, hostname: string}} Decoded parts
 *   (empty strings when unavailable).
 */
function parseAmqpHttpBase(amqpUrl) {
    let parsed
    try {
        parsed = new URL(String(amqpUrl).replace(/^amqps?:/i, 'http:'))
    } catch {
        // No usable AMQP URL: let the caller decide whether explicit config is enough.
        return { user: '', password: '', hostname: '' }
    }
    return {
        user: decodeURIComponent(parsed.username || ''),
        password: decodeURIComponent(parsed.password || ''),
        hostname: parsed.hostname
    }
}

/**
 * Build the RabbitMQ Management "get messages" URL and basic-auth credentials
 * for a queue, from `config.rabbitmq`.
 *
 * Explicit `managementBaseUrl` / `managementUser` / `managementPassword` win;
 * otherwise host and credentials are derived from `rabbitmq.url` (port 15672).
 *
 * @param {string} queueName - Queue to address.
 * @returns {{url: string, auth: {username: string, password: string}}}
 * @throws {Error} When neither `managementBaseUrl` nor a parsable `url` host is configured.
 */
function managementEndpoint(queueName) {
    const rm = config.rabbitmq || {}
    const creds = parseAmqpHttpBase(rm.url || '')
    if (!rm.managementBaseUrl && !creds.hostname) {
        throw new Error('未配置 RabbitMQ Management 地址:请在 config.json 的 rabbitmq 中配置 managementBaseUrl 或有效的 url')
    }
    const base = (rm.managementBaseUrl || `http://${creds.hostname}:15672`).replace(/\/$/, '')
    // NOTE(review): the vhost comes only from rabbitmq.vhost (default '/'); a vhost
    // encoded in the path of rabbitmq.url is not considered here — confirm that
    // matches how the AMQP connection itself is opened.
    const vhost = encodeURIComponent(rm.vhost != null ? rm.vhost : '/')
    const q = encodeURIComponent(queueName)
    return {
        url: `${base}/api/queues/${vhost}/${q}/get`,
        auth: {
            username: rm.managementUser || creds.user,
            password: rm.managementPassword || creds.password
        }
    }
}

/**
 * Decode the payload of a message returned by the Management API.
 *
 * Base64 payloads are converted to a UTF-8 string first; string payloads are
 * then parsed as JSON when possible and returned verbatim otherwise.
 *
 * @param {object} msg - Message object carrying `payload` and optionally `payload_encoding`.
 * @returns {{body?: *, raw?: *, encoding: string, parseError?: boolean}} `body`
 *   holds the decoded payload; on a base64 decoding failure the result carries
 *   `raw` and `parseError: true` instead of `body`.
 */
function decodePayload(msg) {
    const enc = msg.payload_encoding || 'string'
    let raw = msg.payload
    if (enc === 'base64' && typeof raw === 'string') {
        try {
            raw = Buffer.from(raw, 'base64').toString('utf8')
        } catch {
            return { raw: msg.payload, encoding: enc, parseError: true }
        }
    }
    if (typeof raw === 'string') {
        try {
            return { body: JSON.parse(raw), encoding: enc }
        } catch {
            // Not JSON: hand back the plain text.
            return { body: raw, encoding: enc }
        }
    }
    return { body: raw, encoding: enc }
}

/**
 * Peek at queue messages through the Management API
 * (`reject_requeue_true`: messages are requeued after being read, not consumed).
 *
 * @param {string} queueName - Queue to peek into.
 * @param {number} limit - Maximum number of messages to request.
 * @returns {Promise<object[]>} Simplified message descriptors with decoded payloads.
 * @throws {Error} With a `status` property when the Management API answers with HTTP >= 400;
 *   network-level axios errors propagate unchanged.
 */
async function peekQueueMessages(queueName, limit) {
    const { url, auth } = managementEndpoint(queueName)
    const { data, status } = await axios.post(
        url,
        { count: limit, ackmode: 'reject_requeue_true', encoding: 'auto' },
        // Accept every status so error bodies can be inspected below.
        { auth, timeout: 12000, validateStatus: () => true }
    )
    if (status >= 400) {
        const reason =
            typeof data === 'object' && data !== null
                ? data.reason || data.error || JSON.stringify(data)
                : String(data)
        const err = new Error(reason || `Management HTTP ${status}`)
        err.status = status
        throw err
    }
    const list = Array.isArray(data) ? data : []
    return list.map((m) => {
        const decoded = decodePayload(m)
        return {
            redelivered: m.redelivered,
            routing_key: m.routing_key,
            exchange: m.exchange,
            properties: m.properties
                ? {
                    messageId: m.properties.message_id,
                    timestamp: m.properties.timestamp,
                    contentType: m.properties.content_type,
                    headers: m.properties.headers
                }
                : undefined,
            payload: decoded.body,
            payload_encoding: decoded.encoding
        }
    })
}

/**
 * Turn a failure of {@link peekQueueMessages} into a message for the admin UI.
 *
 * @param {Error & {status?: number, code?: string}} e - Error thrown while peeking.
 * @returns {string} Hint for auth failures and connectivity problems, otherwise the raw message.
 */
function describeManagementError(e) {
    if (e.status === 401 || e.status === 403) {
        return 'Management 鉴权失败,请在 config.json 的 rabbitmq 中配置 managementUser / managementPassword,或检查管理插件用户权限'
    }
    // ECONNABORTED is the code axios uses by default when its own `timeout` elapses.
    if (e.code === 'ECONNREFUSED' || e.code === 'ETIMEDOUT' || e.code === 'ECONNABORTED') {
        return '无法连接 RabbitMQ Management(默认 15672)。请开启管理插件并开放端口,或配置 rabbitmq.managementBaseUrl'
    }
    return e.message || String(e)
}

/**
 * Collect message/consumer counts for every allowed queue.
 *
 * A failing check does not abort the loop; the queue is reported with null
 * counts and the error text.
 *
 * @param {object} ch - Channel exposing `checkQueue(name)`.
 * @returns {Promise<Object<string, {messageCount: ?number, consumerCount: ?number, error?: string}>>}
 */
async function collectQueueSummary(ch) {
    const queues = {}
    for (const name of ALLOWED_QUEUES) {
        try {
            const info = await ch.checkQueue(name)
            queues[name] = {
                messageCount: info.messageCount,
                consumerCount: info.consumerCount
            }
        } catch (e) {
            // NOTE(review): with amqplib a failed checkQueue closes the channel, so
            // later queues in this loop may also report an error — confirm how
            // mq.getChannel hands out / recovers channels.
            queues[name] = {
                messageCount: null,
                consumerCount: null,
                error: e.message || String(e)
            }
        }
    }
    return queues
}

/**
 * Admin endpoint `GET /Admin/MQ/GetQueueTasks`.
 *
 * Query parameters:
 * - `uuid`, `session`: required credentials; caller needs `admin` or `service` permission.
 * - `summary` (`1`/`true`): return counts for all allowed queues plus the Redis scheduler overview.
 * - `queue`: queue to inspect in detail (default `runforge_task_queue`, must be in the allow list).
 * - `limit`: number of messages to peek (default 30, capped at 100).
 * - `includeScheduled` (`0`/`false` to disable) and `scheduledLimit`: control the
 *   pending-schedule listing returned for `runforge_task_queue`.
 */
class GetQueueTasks extends API {
    constructor() {
        super()
        this.setPath('/Admin/MQ/GetQueueTasks')
        this.setMethod('get')
    }

    /**
     * Handle the request: authenticate, then answer with either the summary or
     * the per-queue detail view.
     *
     * @param {object} req - Request whose `query` carries the parameters listed on the class.
     * @param {object} res - Response used to send the JSON result.
     * @returns {Promise<*>} Whatever `res.json` returns.
     */
    async onRequest(req, res) {
        const {
            uuid,
            session,
            queue,
            limit: limitStr,
            summary,
            includeScheduled,
            scheduledLimit: scheduledLimitStr
        } = req.query

        if ([uuid, session].some((v) => v === '' || v == null))
            return res.json({ ...BaseStdResponse.MISSING_PARAMETER })

        if (!(await AccessControl.checkSession(uuid, session)))
            return res.status(401).json({ ...BaseStdResponse.ACCESS_DENIED })

        const permission = await AccessControl.getPermission(uuid)
        if (!permission.includes('admin') && !permission.includes('service'))
            return res.json({ ...BaseStdResponse.PERMISSION_DENIED })

        const wantSummary = summary === '1' || summary === 'true'

        try {
            const ch = await mq.getChannel('admin_queue_inspect')

            if (wantSummary) {
                const queues = await collectQueueSummary(ch)
                const slimit = Math.min(
                    2000,
                    Math.max(1, parseInt(scheduledLimitStr, 10) || 800)
                )
                const pendingCount = await countPendingScheduled(Date.now())
                const scheduledMirror = await listPendingScheduledForAdmin(Date.now(), slimit)
                return res.json({
                    ...BaseStdResponse.OK,
                    data: {
                        summary: true,
                        queues,
                        redisScheduler: {
                            key: SCHEDULE_KEY,
                            pendingCount,
                            note: '到期任务由本服务定时写入 runforge_task_queue;多实例共享同一 Redis ZSET。'
                        },
                        autoRunScheduledMirror: {
                            pendingCount,
                            note: scheduledMirror.note,
                            // Only the first 20 entries are returned as a sample.
                            sample: scheduledMirror.items.slice(0, 20)
                        },
                        fetchedAt: Date.now()
                    }
                })
            }

            const queueName = queue || 'runforge_task_queue'
            if (!ALLOWED_QUEUES.includes(queueName))
                return res.json({ ...BaseStdResponse.ERR, msg: '不支持的队列名称' })

            // Peek size: default 30 for missing/invalid values, capped at 100.
            let limit = parseInt(limitStr, 10)
            if (Number.isNaN(limit) || limit < 1) limit = 30
            if (limit > 100) limit = 100

            let messageCount = null
            let consumerCount = null
            try {
                const info = await ch.checkQueue(queueName)
                messageCount = info.messageCount
                consumerCount = info.consumerCount
            } catch (e) {
                return res.json({
                    ...BaseStdResponse.ERR,
                    msg: `无法访问队列:${e.message || e}`
                })
            }

            // A Management API failure is reported in the response instead of failing the request.
            let tasks = []
            let managementError = null
            try {
                tasks = await peekQueueMessages(queueName, limit)
            } catch (e) {
                managementError = describeManagementError(e)
                this.logger.warn(`[GetQueueTasks] Management 窥视失败: ${managementError}`)
            }

            const isTaskQueue = queueName === 'runforge_task_queue'
            const wantScheduled =
                includeScheduled !== '0' && includeScheduled !== 'false' && isTaskQueue

            let autoRunScheduledMirror = null
            let pendingScheduledCount = null
            if (isTaskQueue) {
                pendingScheduledCount = await countPendingScheduled(Date.now())
                if (wantScheduled) {
                    const slimit = Math.min(
                        500,
                        Math.max(1, parseInt(scheduledLimitStr, 10) || 200)
                    )
                    autoRunScheduledMirror = await listPendingScheduledForAdmin(Date.now(), slimit)
                }
            }

            const detail = {
                queue: queueName,
                messageCount,
                consumerCount,
                peekLimit: limit,
                tasks,
                managementError,
                redisScheduler: isTaskQueue
                    ? { key: SCHEDULE_KEY, pendingCount: pendingScheduledCount }
                    : undefined,
                autoRunScheduledMirror,
                fetchedAt: Date.now()
            }
            if (isTaskQueue) {
                detail.peekNote =
                    'tasks:已在主队列中的消息;autoRunScheduledMirror:尚未到 fireAt、仍只在 Redis 中的调度(到期后由服务写入 MQ)。'
            }

            return res.json({ ...BaseStdResponse.OK, data: detail })
        } catch (error) {
            this.logger.error(`GetQueueTasks: ${error.stack || error}`)
            return res.json({
                ...BaseStdResponse.ERR,
                msg: error.message || '查询失败'
            })
        }
    }
}

module.exports.GetQueueTasks = GetQueueTasks