GetQueueTasks.js 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. const API = require('../../../lib/API')
  2. const axios = require('axios')
  3. const mq = require('../../../plugin/mq')
  4. const config = require('../../../config.json')
  5. const AccessControl = require('../../../lib/AccessControl')
  6. const { BaseStdResponse } = require('../../../BaseStdResponse')
  7. const {
  8. SCHEDULE_KEY,
  9. listPendingScheduledForAdmin,
  10. countPendingScheduled
  11. } = require('../../../plugin/mq/lepaoAutoScheduleRedis')
  /**
   * Queues that may be inspected through this admin endpoint.
   * Acts as an allow-list so callers cannot probe arbitrary queue names.
   * Frozen so the list cannot be altered at runtime by other code in this module.
   */
  const ALLOWED_QUEUES = Object.freeze([
    'jkes_runforge_task_queue',
    'runforge_task_result_queue',
    'runforge_task_dead_queue',
    'runforge_message_queue',
    'order_payment_check',
    'mq_health_check'
  ])
  /**
   * Extract the user, password and hostname from an AMQP connection URL.
   *
   * The `amqp:` / `amqps:` scheme is rewritten to `http:` so the URL parser
   * applies its normal host handling; only the credentials and hostname are
   * read from the result.
   *
   * Never throws: a missing, empty or unparsable URL yields empty strings, and
   * a credential containing a malformed percent-escape is returned undecoded.
   *
   * @param {string} amqpUrl - AMQP connection URL, e.g. `amqp://user:pass@host:5672/vhost`.
   * @returns {{user: string, password: string, hostname: string}}
   */
  function parseAmqpHttpBase(amqpUrl) {
    const text = amqpUrl == null ? '' : String(amqpUrl).trim()
    if (text === '') return { user: '', password: '', hostname: '' }

    let parsed
    try {
      parsed = new URL(text.replace(/^amqps?:/i, 'http:'))
    } catch {
      return { user: '', password: '', hostname: '' }
    }

    // URL exposes credentials percent-encoded; a stray "%" would make
    // decodeURIComponent throw, so fall back to the raw value in that case.
    const decode = (part) => {
      try {
        return decodeURIComponent(part)
      } catch {
        return part
      }
    }

    return {
      user: decode(parsed.username || ''),
      password: decode(parsed.password || ''),
      hostname: parsed.hostname
    }
  }
  /**
   * Build the Management HTTP API "get messages" request target for a queue.
   *
   * Reads `config.rabbitmq`: `managementBaseUrl`, `managementUser` and
   * `managementPassword` take precedence; otherwise the host and credentials
   * parsed from `rabbitmq.url` are used with port 15672. The vhost defaults
   * to "/" when not configured.
   *
   * @param {string} queueName - Queue whose messages will be requested.
   * @returns {{url: string, auth: {username: string, password: string}}}
   */
  function managementEndpoint(queueName) {
    const settings = config.rabbitmq || {}
    const fromAmqp = parseAmqpHttpBase(settings.url || '')

    let root = settings.managementBaseUrl
    if (!root) root = `http://${fromAmqp.hostname}:15672`
    // Drop a single trailing slash so the path below joins cleanly.
    root = root.replace(/\/$/, '')

    let vhostName = settings.vhost
    if (vhostName == null) vhostName = '/'

    const [vhostSegment, queueSegment] = [vhostName, queueName].map((part) =>
      encodeURIComponent(part)
    )

    const auth = {
      username: settings.managementUser || fromAmqp.user,
      password: settings.managementPassword || fromAmqp.password
    }

    return {
      url: `${root}/api/queues/${vhostSegment}/${queueSegment}/get`,
      auth
    }
  }
  /**
   * Turn a message object (with `payload` and `payload_encoding` fields) into
   * a displayable payload.
   *
   * - base64 payloads are decoded to UTF-8 text when the bytes are valid UTF-8;
   * - text payloads are parsed as JSON when possible, otherwise returned as-is;
   * - base64 payloads that are NOT valid UTF-8 (binary data) are returned
   *   unchanged as base64 text and flagged with `parseError: true`, instead of
   *   being turned into replacement characters.
   *
   * @param {{payload?: *, payload_encoding?: string}} msg - Message to decode.
   * @returns {{body: *, encoding: string, raw?: string, parseError?: boolean}}
   */
  function decodePayload(msg) {
    const source = msg || {}
    const enc = source.payload_encoding || 'string'
    let raw = source.payload

    if (enc === 'base64' && typeof raw === 'string') {
      const bytes = Buffer.from(raw, 'base64')
      const text = bytes.toString('utf8')
      // Buffer decoding never throws: invalid UTF-8 is replaced with U+FFFD.
      // Re-encoding and comparing the bytes detects that lossy substitution.
      if (!Buffer.from(text, 'utf8').equals(bytes)) {
        return { body: raw, raw, encoding: enc, parseError: true }
      }
      raw = text
    }

    if (typeof raw === 'string') {
      try {
        return { body: JSON.parse(raw), encoding: enc }
      } catch {
        // Not JSON: show the text unchanged.
        return { body: raw, encoding: enc }
      }
    }

    return { body: raw, encoding: enc }
  }
  /**
   * Peek at messages in a queue through the Management HTTP API.
   *
   * The request uses ackmode `reject_requeue_true`; per the original author's
   * note this puts the messages back on the queue after reading instead of
   * consuming them.
   *
   * @param {string} queueName - Queue to inspect.
   * @param {number} limit - Maximum number of messages to request.
   * @returns {Promise<Array<object>>} One entry per message with routing info,
   *   selected properties and the decoded payload; empty when the response
   *   body is not an array.
   * @throws {Error} When the API answers with HTTP status >= 400; the error
   *   carries the status code in `err.status`. Transport errors from axios
   *   propagate unchanged.
   */
  async function peekQueueMessages(queueName, limit) {
    const endpoint = managementEndpoint(queueName)
    const requestBody = {
      count: limit,
      ackmode: 'reject_requeue_true',
      encoding: 'auto'
    }
    const requestOptions = {
      auth: endpoint.auth,
      timeout: 12000,
      // Never reject on HTTP status; the status is inspected below.
      validateStatus: () => true
    }

    const { data, status } = await axios.post(endpoint.url, requestBody, requestOptions)

    if (status >= 400) {
      let reason
      if (typeof data === 'object' && data !== null) {
        reason = data.reason || data.error || JSON.stringify(data)
      } else {
        reason = String(data)
      }
      const failure = new Error(reason || `Management HTTP ${status}`)
      failure.status = status
      throw failure
    }

    if (!Array.isArray(data)) return []

    return data.map((entry) => {
      const { body, encoding } = decodePayload(entry)
      const props = entry.properties

      let properties
      if (props) {
        properties = {
          messageId: props.message_id,
          timestamp: props.timestamp,
          contentType: props.content_type,
          headers: props.headers
        }
      }

      return {
        redelivered: entry.redelivered,
        routing_key: entry.routing_key,
        exchange: entry.exchange,
        properties,
        payload: body,
        payload_encoding: encoding
      }
    })
  }
  /**
   * Admin endpoint `GET /Admin/MQ/GetQueueTasks`.
   *
   * Query parameters:
   * - `uuid`, `session`: caller identity (required).
   * - `summary`: `'1'` or `'true'` returns message/consumer counts for every
   *   allow-listed queue plus a sample of pending Redis-scheduled tasks.
   * - `queue`: queue to inspect in detail (default `jkes_runforge_task_queue`);
   *   must be in ALLOWED_QUEUES.
   * - `limit`: number of messages to peek, default 30, capped at 100.
   * - `includeScheduled`: `'0'` or `'false'` skips the Redis schedule listing.
   * - `scheduledLimit`: size of the Redis schedule listing (clamped).
   *
   * Only callers whose permission includes `admin` or `service` are served.
   */
  class GetQueueTasks extends API {
    constructor() {
      super()
      this.setPath('/Admin/MQ/GetQueueTasks')
      this.setMethod('get')
    }

    /**
     * Handle one request and always answer with a JSON body.
     *
     * @param {object} req - Request; only `req.query` is read.
     * @param {object} res - Response; `res.json` / `res.status` are used.
     * @returns {Promise<*>} Whatever `res.json(...)` returns.
     */
    async onRequest(req, res) {
      const {
        uuid,
        session,
        queue,
        limit: limitStr,
        summary,
        includeScheduled,
        scheduledLimit: scheduledLimitStr
      } = req.query

      if ([uuid, session].some((v) => v === '' || v == null))
        return res.json({
          ...BaseStdResponse.MISSING_PARAMETER
        })

      // Authentication and authorization. Failures of the lookups themselves
      // are caught here so a rejected promise never escapes the handler, and
      // the caller only receives a generic message.
      try {
        if (!(await AccessControl.checkSession(uuid, session)))
          return res.status(401).json({
            ...BaseStdResponse.ACCESS_DENIED
          })

        const permission = await AccessControl.getPermission(uuid)
        // A missing or unusable permission value counts as "no permission".
        const hasRole = (role) =>
          permission != null &&
          typeof permission.includes === 'function' &&
          permission.includes(role)

        if (!hasRole('admin') && !hasRole('service'))
          return res.json({
            ...BaseStdResponse.PERMISSION_DENIED
          })
      } catch (error) {
        this.logger.error(`GetQueueTasks: ${error.stack || error}`)
        return res.json({
          ...BaseStdResponse.ERR,
          msg: '查询失败'
        })
      }

      // The queue that the Redis scheduler feeds; extra scheduler information
      // is only reported for it.
      const primaryQueue = 'jkes_runforge_task_queue'

      // Parse `scheduledLimit` into [1, ceiling], using `fallback` when it is
      // absent, zero or not a number.
      const scheduledLimit = (ceiling, fallback) =>
        Math.min(ceiling, Math.max(1, parseInt(scheduledLimitStr, 10) || fallback))

      const wantSummary = summary === '1' || summary === 'true'

      try {
        const ch = await mq.getChannel('admin_queue_inspect')

        if (wantSummary) {
          const queues = {}
          // NOTE(review): if `ch` is an amqplib channel, a failed checkQueue
          // closes the channel, so later queues in this loop would also report
          // errors. Confirm against plugin/mq.
          for (const name of ALLOWED_QUEUES) {
            try {
              const info = await ch.checkQueue(name)
              queues[name] = {
                messageCount: info.messageCount,
                consumerCount: info.consumerCount
              }
            } catch (e) {
              queues[name] = {
                messageCount: null,
                consumerCount: null,
                error: e.message || String(e)
              }
            }
          }

          // The two Redis reads are independent: run them together against
          // the same timestamp so the count and the listing agree.
          const now = Date.now()
          const [pendingCount, scheduledMirror] = await Promise.all([
            countPendingScheduled(now),
            listPendingScheduledForAdmin(now, scheduledLimit(2000, 800))
          ])

          return res.json({
            ...BaseStdResponse.OK,
            data: {
              summary: true,
              queues,
              redisScheduler: {
                keys: [SCHEDULE_KEY],
                pendingCount,
                note: '到期任务由本服务定时写入 jkes_runforge_task_queue。'
              },
              autoRunScheduledMirror: {
                pendingCount,
                note: scheduledMirror.note,
                sample: scheduledMirror.items.slice(0, 20)
              },
              fetchedAt: Date.now()
            }
          })
        }

        const queueName = queue || primaryQueue
        if (!ALLOWED_QUEUES.includes(queueName))
          return res.json({
            ...BaseStdResponse.ERR,
            msg: '不支持的队列名称'
          })
        const isPrimaryQueue = queueName === primaryQueue

        let limit = parseInt(limitStr, 10)
        if (Number.isNaN(limit) || limit < 1) limit = 30
        if (limit > 100) limit = 100

        let messageCount = null
        let consumerCount = null
        try {
          const info = await ch.checkQueue(queueName)
          messageCount = info.messageCount
          consumerCount = info.consumerCount
        } catch (e) {
          return res.json({
            ...BaseStdResponse.ERR,
            msg: `无法访问队列:${e.message || e}`
          })
        }

        // Peeking is best-effort: a Management API failure is reported in the
        // response instead of failing the whole request.
        let tasks = []
        let managementError = null
        try {
          tasks = await peekQueueMessages(queueName, limit)
        } catch (e) {
          if (e.status === 401 || e.status === 403) {
            managementError =
              'Management 鉴权失败,请在 config.json 的 rabbitmq 中配置 managementUser / managementPassword,或检查管理插件用户权限'
          } else if (e.code === 'ECONNREFUSED' || e.code === 'ETIMEDOUT') {
            managementError =
              '无法连接 RabbitMQ Management(默认 15672)。请开启管理插件并开放端口,或配置 rabbitmq.managementBaseUrl'
          } else {
            managementError = e.message || String(e)
          }
          this.logger.warn(`[GetQueueTasks] Management 窥视失败: ${managementError}`)
        }

        let autoRunScheduledMirror = null
        let pendingScheduledCount = null
        if (isPrimaryQueue) {
          const wantScheduled = includeScheduled !== '0' && includeScheduled !== 'false'
          const now = Date.now()
          const scheduled = await Promise.all([
            countPendingScheduled(now),
            wantScheduled ? listPendingScheduledForAdmin(now, scheduledLimit(500, 200)) : null
          ])
          pendingScheduledCount = scheduled[0]
          autoRunScheduledMirror = scheduled[1]
        }

        const detail = {
          queue: queueName,
          messageCount,
          consumerCount,
          peekLimit: limit,
          tasks,
          managementError,
          redisScheduler: isPrimaryQueue
            ? {
                keys: [SCHEDULE_KEY],
                pendingCount: pendingScheduledCount
              }
            : undefined,
          autoRunScheduledMirror,
          fetchedAt: Date.now()
        }
        if (isPrimaryQueue) {
          detail.peekNote =
            'tasks:已在主队列中的消息;autoRunScheduledMirror:尚未到 fireAt、仍只在 Redis 中的调度(到期后由服务写入 jkes_runforge_task_queue)。'
        }

        return res.json({
          ...BaseStdResponse.OK,
          data: detail
        })
      } catch (error) {
        this.logger.error(`GetQueueTasks: ${error.stack || error}`)
        return res.json({
          ...BaseStdResponse.ERR,
          msg: error.message || '查询失败'
        })
      }
    }
  }
  267. module.exports.GetQueueTasks = GetQueueTasks