GetQueueTasks.js 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. const API = require('../../../lib/API')
  2. const axios = require('axios')
  3. const mq = require('../../../plugin/mq')
  4. const config = require('../../../config.json')
  5. const AccessControl = require('../../../lib/AccessControl')
  6. const { BaseStdResponse } = require('../../../BaseStdResponse')
  7. /** 允许通过管理接口查看的队列(防任意队列名探测) */
  8. const ALLOWED_QUEUES = [
  9. 'runforge_task_queue',
  10. 'runforge_task_result_queue',
  11. 'runforge_task_dead_queue',
  12. 'runforge_message_queue',
  13. 'order_payment_check',
  14. 'mq_health_check'
  15. ]
  16. function parseAmqpHttpBase(amqpUrl) {
  17. const u = new URL(String(amqpUrl).replace(/^amqp:/, 'http:'))
  18. return {
  19. user: decodeURIComponent(u.username || ''),
  20. password: decodeURIComponent(u.password || ''),
  21. hostname: u.hostname
  22. }
  23. }
  24. function managementEndpoint(queueName) {
  25. const rm = config.rabbitmq || {}
  26. const creds = parseAmqpHttpBase(rm.url || '')
  27. const base = (rm.managementBaseUrl || `http://${creds.hostname}:15672`).replace(/\/$/, '')
  28. const vhost = encodeURIComponent(rm.vhost != null ? rm.vhost : '/')
  29. const q = encodeURIComponent(queueName)
  30. return {
  31. url: `${base}/api/queues/${vhost}/${q}/get`,
  32. auth: {
  33. username: rm.managementUser || creds.user,
  34. password: rm.managementPassword || creds.password
  35. }
  36. }
  37. }
  38. function decodePayload(msg) {
  39. const enc = msg.payload_encoding || 'string'
  40. let raw = msg.payload
  41. if (enc === 'base64' && typeof raw === 'string') {
  42. try {
  43. raw = Buffer.from(raw, 'base64').toString('utf8')
  44. } catch {
  45. return { raw: msg.payload, encoding: enc, parseError: true }
  46. }
  47. }
  48. if (typeof raw === 'string') {
  49. try {
  50. return { body: JSON.parse(raw), encoding: enc }
  51. } catch {
  52. return { body: raw, encoding: enc }
  53. }
  54. }
  55. return { body: raw, encoding: enc }
  56. }
  57. /**
  58. * 通过 Management API 窥视队列消息(reject_requeue_true:看完后重新入队,不消费)
  59. */
  60. async function peekQueueMessages(queueName, limit) {
  61. const { url, auth } = managementEndpoint(queueName)
  62. const { data, status } = await axios.post(
  63. url,
  64. {
  65. count: limit,
  66. ackmode: 'reject_requeue_true',
  67. encoding: 'auto'
  68. },
  69. {
  70. auth,
  71. timeout: 12000,
  72. validateStatus: () => true
  73. }
  74. )
  75. if (status >= 400) {
  76. const reason =
  77. typeof data === 'object' && data !== null
  78. ? data.reason || data.error || JSON.stringify(data)
  79. : String(data)
  80. const err = new Error(reason || `Management HTTP ${status}`)
  81. err.status = status
  82. throw err
  83. }
  84. const list = Array.isArray(data) ? data : []
  85. return list.map((m) => {
  86. const decoded = decodePayload(m)
  87. return {
  88. redelivered: m.redelivered,
  89. routing_key: m.routing_key,
  90. exchange: m.exchange,
  91. properties: m.properties
  92. ? {
  93. messageId: m.properties.message_id,
  94. timestamp: m.properties.timestamp,
  95. contentType: m.properties.content_type,
  96. headers: m.properties.headers
  97. }
  98. : undefined,
  99. payload: decoded.body,
  100. payload_encoding: decoded.encoding
  101. }
  102. })
  103. }
  104. class GetQueueTasks extends API {
  105. constructor() {
  106. super()
  107. this.setPath('/Admin/MQ/GetQueueTasks')
  108. this.setMethod('get')
  109. }
  110. async onRequest(req, res) {
  111. const { uuid, session, queue, limit: limitStr, summary } = req.query
  112. if ([uuid, session].some((v) => v === '' || v == null))
  113. return res.json({
  114. ...BaseStdResponse.MISSING_PARAMETER
  115. })
  116. if (!(await AccessControl.checkSession(uuid, session)))
  117. return res.status(401).json({
  118. ...BaseStdResponse.ACCESS_DENIED
  119. })
  120. const permission = await AccessControl.getPermission(uuid)
  121. if (!permission.includes('admin') && !permission.includes('service'))
  122. return res.json({
  123. ...BaseStdResponse.PERMISSION_DENIED
  124. })
  125. const wantSummary = summary === '1' || summary === 'true'
  126. try {
  127. const ch = await mq.getChannel('admin_queue_inspect')
  128. if (wantSummary) {
  129. const queues = {}
  130. for (const name of ALLOWED_QUEUES) {
  131. try {
  132. const info = await ch.checkQueue(name)
  133. queues[name] = {
  134. messageCount: info.messageCount,
  135. consumerCount: info.consumerCount
  136. }
  137. } catch (e) {
  138. queues[name] = {
  139. messageCount: null,
  140. consumerCount: null,
  141. error: e.message || String(e)
  142. }
  143. }
  144. }
  145. return res.json({
  146. ...BaseStdResponse.OK,
  147. data: {
  148. summary: true,
  149. queues,
  150. fetchedAt: Date.now()
  151. }
  152. })
  153. }
  154. const queueName = queue || 'runforge_task_queue'
  155. if (!ALLOWED_QUEUES.includes(queueName))
  156. return res.json({
  157. ...BaseStdResponse.ERR,
  158. msg: '不支持的队列名称'
  159. })
  160. let limit = parseInt(limitStr, 10)
  161. if (Number.isNaN(limit) || limit < 1) limit = 30
  162. if (limit > 100) limit = 100
  163. let messageCount = null
  164. let consumerCount = null
  165. try {
  166. const info = await ch.checkQueue(queueName)
  167. messageCount = info.messageCount
  168. consumerCount = info.consumerCount
  169. } catch (e) {
  170. return res.json({
  171. ...BaseStdResponse.ERR,
  172. msg: `无法访问队列:${e.message || e}`
  173. })
  174. }
  175. let tasks = []
  176. let managementError = null
  177. try {
  178. tasks = await peekQueueMessages(queueName, limit)
  179. } catch (e) {
  180. managementError =
  181. e.status === 401 || e.status === 403
  182. ? 'Management 鉴权失败,请在 config.json 的 rabbitmq 中配置 managementUser / managementPassword,或检查管理插件用户权限'
  183. : e.code === 'ECONNREFUSED' || e.code === 'ETIMEDOUT'
  184. ? '无法连接 RabbitMQ Management(默认 15672)。请开启管理插件并开放端口,或配置 rabbitmq.managementBaseUrl'
  185. : (e.message || String(e))
  186. this.logger.warn(`[GetQueueTasks] Management 窥视失败: ${managementError}`)
  187. }
  188. return res.json({
  189. ...BaseStdResponse.OK,
  190. data: {
  191. queue: queueName,
  192. messageCount,
  193. consumerCount,
  194. peekLimit: limit,
  195. tasks,
  196. managementError,
  197. fetchedAt: Date.now()
  198. }
  199. })
  200. } catch (error) {
  201. this.logger.error(`GetQueueTasks: ${error.stack || error}`)
  202. return res.json({
  203. ...BaseStdResponse.ERR,
  204. msg: error.message || '查询失败'
  205. })
  206. }
  207. }
  208. }
  209. module.exports.GetQueueTasks = GetQueueTasks