GetQueueTasks.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. const API = require('../../../lib/API')
  2. const axios = require('axios')
  3. const mq = require('../../../plugin/mq')
  4. const config = require('../../../config.json')
  5. const AccessControl = require('../../../lib/AccessControl')
  6. const { BaseStdResponse } = require('../../../BaseStdResponse')
  7. const {
  8. SCHEDULE_KEY,
  9. listPendingScheduledForAdmin,
  10. countPendingScheduled
  11. } = require('../../../plugin/mq/lepaoAutoScheduleRedis')
  12. const { mq: mqPrefixName, PREFIX } = require('../../../plugin/mq/mqPrefix')
  13. const { TASK_QUEUE } = require('../../../plugin/mq/runforgeTaskMq')
  /**
   * Un-prefixed base names of the queues this endpoint knows about.
   * Used to derive ALLOWED_QUEUES; per the original author's note, applying
   * the mq prefix keeps these queues isolated from production.
   */
  const QUEUE_BASE_NAMES = [
    'runforge_task_queue',
    'runforge_task_result_queue',
    'runforge_task_dead_queue',
    'runforge_message_queue',
    'order_payment_check',
    'mq_health_check'
  ]

  /**
   * Queues that may be inspected through the admin API (guards against
   * probing arbitrary queue names).
   *
   * The helper is wrapped in an arrow function so that Array.prototype.map's
   * extra (index, array) arguments are never forwarded to mqPrefixName.
   */
  const ALLOWED_QUEUES = QUEUE_BASE_NAMES.map((baseName) => mqPrefixName(baseName))
  /**
   * Resolve a caller-supplied queue name to the name used on the broker.
   *
   * - A falsy input falls back to 'runforge_task_queue'.
   * - A name already present in ALLOWED_QUEUES is returned unchanged.
   * - When PREFIX is truthy and the input is one of QUEUE_BASE_NAMES, the
   *   result of mqPrefixName(name) is returned.
   * - Anything else is returned as-is; the caller is expected to validate it
   *   against ALLOWED_QUEUES.
   *
   * @param {string} [q] queue name taken from the request
   * @returns {string} resolved queue name
   */
  function canonicalQueueName(q) {
    const requested = q ? q : 'runforge_task_queue'

    if (ALLOWED_QUEUES.includes(requested)) {
      return requested
    }

    const isBareBaseName = Boolean(PREFIX) && QUEUE_BASE_NAMES.includes(requested)
    return isBareBaseName ? mqPrefixName(requested) : requested
  }
  /**
   * Extract the user name, password and host name from an AMQP connection URL.
   *
   * The `amqp:` scheme is rewritten to `http:` before parsing with the WHATWG
   * URL class. A missing, empty or unparseable URL yields empty strings
   * instead of throwing, so that callers which pass `rm.url || ''` can still
   * rely on explicitly configured management settings.
   *
   * @param {string} amqpUrl e.g. `amqp://user:pass@host:5672/vhost`
   * @returns {{ user: string, password: string, hostname: string }}
   */
  function parseAmqpHttpBase(amqpUrl) {
    const emptyResult = () => ({ user: '', password: '', hostname: '' })

    const text = amqpUrl == null ? '' : String(amqpUrl).trim()
    if (text === '') return emptyResult()

    let parsed
    try {
      parsed = new URL(text.replace(/^amqp:/, 'http:'))
    } catch {
      // Not a valid URL: report "nothing known" rather than crash the caller.
      return emptyResult()
    }

    // Credentials containing a literal '%' are not valid percent-encoding;
    // decodeURIComponent would throw URIError, so fall back to the raw text.
    const safeDecode = (part) => {
      try {
        return decodeURIComponent(part)
      } catch {
        return part
      }
    }

    return {
      user: safeDecode(parsed.username || ''),
      password: safeDecode(parsed.password || ''),
      hostname: parsed.hostname
    }
  }
  /**
   * Build the RabbitMQ Management "get messages" URL and the basic-auth
   * credentials for a queue.
   *
   * Settings are read from `config.rabbitmq`:
   * - `managementBaseUrl` (one trailing slash is stripped); otherwise
   *   `http://<host of rabbitmq.url>:15672`
   * - `vhost`, defaulting to '/' when null or undefined
   * - `managementUser` / `managementPassword`, falling back to the
   *   credentials embedded in `rabbitmq.url`
   *
   * @param {string} queueName queue to address (URL-encoded here)
   * @returns {{ url: string, auth: { username: string, password: string } }}
   */
  function managementEndpoint(queueName) {
    const settings = config.rabbitmq || {}
    const amqp = parseAmqpHttpBase(settings.url || '')

    const configuredBase = settings.managementBaseUrl || `http://${amqp.hostname}:15672`
    const baseUrl = configuredBase.replace(/\/$/, '')

    let vhostName = '/'
    if (settings.vhost != null) {
      vhostName = settings.vhost
    }
    const encodedVhost = encodeURIComponent(vhostName)
    const encodedQueue = encodeURIComponent(queueName)

    const username = settings.managementUser || amqp.user
    const password = settings.managementPassword || amqp.password

    return {
      url: `${baseUrl}/api/queues/${encodedVhost}/${encodedQueue}/get`,
      auth: { username, password }
    }
  }
  /**
   * Decode the payload of a message returned by the Management API.
   *
   * String payloads flagged as `base64` are first decoded to UTF-8 text.
   * The text is then parsed as JSON when possible; otherwise the text itself
   * is returned. Non-string payloads are passed through untouched.
   *
   * @param {{ payload: *, payload_encoding?: string }} msg raw message
   * @returns {{ body: *, encoding: string } |
   *            { raw: *, encoding: string, parseError: true }}
   *   `encoding` is the reported encoding (default 'string'); the second
   *   shape is returned only if base64 decoding throws.
   */
  function decodePayload(msg) {
    const encoding = msg.payload_encoding || 'string'
    let text = msg.payload

    // Only strings can be base64-decoded or JSON-parsed.
    if (typeof text !== 'string') {
      return { body: text, encoding }
    }

    if (encoding === 'base64') {
      try {
        text = Buffer.from(text, 'base64').toString('utf8')
      } catch {
        return { raw: msg.payload, encoding, parseError: true }
      }
    }

    let body
    try {
      body = JSON.parse(text)
    } catch {
      // Not JSON: expose the plain text instead.
      body = text
    }
    return { body, encoding }
  }
  /**
   * Peek at messages in a queue through the RabbitMQ Management HTTP API.
   *
   * The request uses ackmode `reject_requeue_true`; per the original author's
   * note the messages are put back on the queue after being read rather than
   * consumed.
   *
   * @param {string} queueName queue to inspect
   * @param {number} limit maximum number of messages to request
   * @returns {Promise<Array<object>>} one summary per message: redelivered,
   *   routing_key, exchange, selected properties, decoded payload and the
   *   reported payload encoding
   * @throws {Error} when the Management API answers with status >= 400; the
   *   error carries the HTTP status in `err.status`. Transport errors raised
   *   by axios propagate unchanged.
   */
  async function peekQueueMessages(queueName, limit) {
    const endpoint = managementEndpoint(queueName)

    const requestBody = {
      count: limit,
      ackmode: 'reject_requeue_true',
      encoding: 'auto'
    }
    const requestOptions = {
      auth: endpoint.auth,
      timeout: 12000,
      // Never let axios throw on HTTP status; it is inspected explicitly below.
      validateStatus: () => true
    }

    const response = await axios.post(endpoint.url, requestBody, requestOptions)
    const { data, status } = response

    if (status >= 400) {
      let reason
      if (typeof data === 'object' && data !== null) {
        reason = data.reason || data.error || JSON.stringify(data)
      } else {
        reason = String(data)
      }
      const failure = new Error(reason || `Management HTTP ${status}`)
      failure.status = status
      throw failure
    }

    if (!Array.isArray(data)) {
      return []
    }

    return data.map((message) => {
      const { body, encoding } = decodePayload(message)
      const props = message.properties

      let properties
      if (props) {
        properties = {
          messageId: props.message_id,
          timestamp: props.timestamp,
          contentType: props.content_type,
          headers: props.headers
        }
      }

      return {
        redelivered: message.redelivered,
        routing_key: message.routing_key,
        exchange: message.exchange,
        properties,
        payload: body,
        payload_encoding: encoding
      }
    })
  }
  /**
   * Error codes meaning the Management HTTP endpoint could not be reached.
   * ECONNABORTED is what axios reports by default when its `timeout` option
   * elapses, so it must be treated like ETIMEDOUT.
   */
  const MANAGEMENT_UNREACHABLE_CODES = [
    'ECONNREFUSED',
    'ETIMEDOUT',
    'ECONNABORTED',
    'ENOTFOUND',
    'EHOSTUNREACH'
  ]

  /**
   * Turn an error thrown while peeking via the Management API into the
   * operator-facing hint returned in `managementError`.
   *
   * @param {Error & { status?: number, code?: string }} e
   * @returns {string}
   */
  function describeManagementError(e) {
    if (e.status === 401 || e.status === 403) {
      return 'Management 鉴权失败,请在 config.json 的 rabbitmq 中配置 managementUser / managementPassword,或检查管理插件用户权限'
    }
    if (MANAGEMENT_UNREACHABLE_CODES.includes(e.code)) {
      return '无法连接 RabbitMQ Management(默认 15672)。请开启管理插件并开放端口,或配置 rabbitmq.managementBaseUrl'
    }
    return e.message || String(e)
  }

  /**
   * Admin endpoint `GET /Admin/MQ/GetQueueTasks`.
   *
   * Query parameters:
   * - `uuid`, `session`: required credentials; the caller must hold the
   *   `admin` or `service` permission.
   * - `summary` ('1' | 'true'): return message/consumer counts for every
   *   queue in ALLOWED_QUEUES plus the scheduler state, instead of one queue.
   * - `queue`: queue to inspect (defaults via canonicalQueueName); must
   *   resolve to a member of ALLOWED_QUEUES.
   * - `limit`: number of messages to peek, 1..100, default 30.
   * - `includeScheduled` ('0' | 'false' to disable) and `scheduledLimit`:
   *   control the listing of pending scheduled entries for TASK_QUEUE.
   */
  class GetQueueTasks extends API {
    constructor() {
      super()
      this.setPath('/Admin/MQ/GetQueueTasks')
      this.setMethod('get')
    }

    /**
     * Handle the request; every outcome is reported as a JSON body.
     *
     * @param {object} req request carrying the query parameters listed above
     * @param {object} res response used to send the JSON result
     * @returns {Promise<*>} whatever `res.json` returns
     */
    async onRequest(req, res) {
      const {
        uuid,
        session,
        queue,
        limit: limitStr,
        summary,
        includeScheduled,
        scheduledLimit: scheduledLimitStr
      } = req.query

      if ([uuid, session].some((v) => v === '' || v == null))
        return res.json({
          ...BaseStdResponse.MISSING_PARAMETER
        })

      if (!(await AccessControl.checkSession(uuid, session)))
        return res.status(401).json({
          ...BaseStdResponse.ACCESS_DENIED
        })

      const permission = await AccessControl.getPermission(uuid)
      if (!permission.includes('admin') && !permission.includes('service'))
        return res.json({
          ...BaseStdResponse.PERMISSION_DENIED
        })

      const wantSummary = summary === '1' || summary === 'true'

      try {
        // NOTE(review): with amqplib, checkQueue on a missing queue closes the
        // channel, so later checks on the same `ch` would also fail — confirm
        // how mq.getChannel behaves before relying on per-queue errors below.
        const ch = await mq.getChannel('admin_queue_inspect')

        if (wantSummary) {
          const queues = {}
          for (const name of ALLOWED_QUEUES) {
            try {
              const info = await ch.checkQueue(name)
              queues[name] = {
                messageCount: info.messageCount,
                consumerCount: info.consumerCount
              }
            } catch (e) {
              // Keep going: one unreadable queue must not hide the others.
              queues[name] = {
                messageCount: null,
                consumerCount: null,
                error: e.message || String(e)
              }
            }
          }

          const slimit = Math.min(
            2000,
            Math.max(1, parseInt(scheduledLimitStr, 10) || 800)
          )
          // The two lookups are independent; run them together against the
          // same reference time.
          const now = Date.now()
          const [pendingCount, scheduledMirror] = await Promise.all([
            countPendingScheduled(now),
            listPendingScheduledForAdmin(now, slimit)
          ])

          return res.json({
            ...BaseStdResponse.OK,
            data: {
              summary: true,
              queues,
              redisScheduler: {
                key: SCHEDULE_KEY,
                pendingCount,
                note: '到期任务由本服务定时写入 runforge_task_queue;多实例共享同一 Redis ZSET。'
              },
              autoRunScheduledMirror: {
                pendingCount,
                note: scheduledMirror.note,
                sample: scheduledMirror.items.slice(0, 20)
              },
              fetchedAt: Date.now()
            }
          })
        }

        const queueName = canonicalQueueName(queue)
        if (!ALLOWED_QUEUES.includes(queueName))
          return res.json({
            ...BaseStdResponse.ERR,
            msg: '不支持的队列名称'
          })

        let limit = parseInt(limitStr, 10)
        if (Number.isNaN(limit) || limit < 1) limit = 30
        if (limit > 100) limit = 100

        let messageCount = null
        let consumerCount = null
        try {
          const info = await ch.checkQueue(queueName)
          messageCount = info.messageCount
          consumerCount = info.consumerCount
        } catch (e) {
          return res.json({
            ...BaseStdResponse.ERR,
            msg: `无法访问队列:${e.message || e}`
          })
        }

        // Peeking is best-effort: a Management failure is reported alongside
        // the counts instead of failing the whole request.
        let tasks = []
        let managementError = null
        try {
          tasks = await peekQueueMessages(queueName, limit)
        } catch (e) {
          managementError = describeManagementError(e)
          this.logger.warn(`[GetQueueTasks] Management 窥视失败: ${managementError}`)
        }

        const isTaskQueue = queueName === TASK_QUEUE
        const wantScheduled =
          isTaskQueue &&
          includeScheduled !== '0' &&
          includeScheduled !== 'false'

        let autoRunScheduledMirror = null
        let pendingScheduledCount = null
        if (isTaskQueue) {
          const now = Date.now()
          const slimit = Math.min(
            500,
            Math.max(1, parseInt(scheduledLimitStr, 10) || 200)
          )
          const [count, mirror] = await Promise.all([
            countPendingScheduled(now),
            wantScheduled ? listPendingScheduledForAdmin(now, slimit) : null
          ])
          pendingScheduledCount = count
          autoRunScheduledMirror = mirror
        }

        const detail = {
          queue: queueName,
          messageCount,
          consumerCount,
          peekLimit: limit,
          tasks,
          managementError,
          redisScheduler: isTaskQueue
            ? {
                key: SCHEDULE_KEY,
                pendingCount: pendingScheduledCount
              }
            : undefined,
          autoRunScheduledMirror,
          fetchedAt: Date.now()
        }
        if (isTaskQueue) {
          detail.peekNote =
            'tasks:已在主队列中的消息;autoRunScheduledMirror:尚未到 fireAt、仍只在 Redis 中的调度(到期后由服务写入 MQ)。'
        }

        return res.json({
          ...BaseStdResponse.OK,
          data: detail
        })
      } catch (error) {
        this.logger.error(`GetQueueTasks: ${error.stack || error}`)
        return res.json({
          ...BaseStdResponse.ERR,
          msg: error.message || '查询失败'
        })
      }
    }
  }

  module.exports.GetQueueTasks = GetQueueTasks