| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443 |
- const path = require('path')
- const WebSocket = require('ws')
- const config = require('../../config.json')
- const Logger = require('../../lib/Logger')
- const db = require('../DataBase/db.js')
- const EmailTemplate = require('../Email/emailTemplate')
- const logger = new Logger(path.join(__dirname, '../../logs/OneBotV11.log'), 'INFO')
- let wsClient = null
- let connectingPromise = null
- let reconnectTimer = null
- let wsApiClient = null
- let connectingApiPromise = null
- let apiReconnectTimer = null
- function stringifySafe(data) {
- try {
- return JSON.stringify(data)
- } catch (_) {
- return '[unserializable]'
- }
- }
- function getOneBotConfig() {
- const onebot = config.onebotv11 || {}
- return {
- enabled: onebot.enabled === true,
- transport: onebot.transport || 'reverse_ws',
- reverseWsUrl: onebot.reverseWsUrl || '',
- reverseWsHost: onebot.reverseWsHost || '127.0.0.1',
- reverseWsPort: Number(onebot.reverseWsPort || 15700),
- /** AstrBot / aiocqhttp 反向 WS 常见路径为 /ws;根路径 / 常返回 405 */
- reverseWsPath: onebot.reverseWsPath || '',
- reverseWsToken: onebot.reverseWsToken || '',
- callbackToken: onebot.callbackToken || '',
- ticketSenderNickname: onebot.ticketSenderNickname || '工单系统',
- selfId: onebot.selfId || '',
- botName: onebot.botName || '小妍助理',
- botUuid: onebot.botUuid || 'onebot-v11-xiaoyan-assistant'
- }
- }
- function getReverseWsUrl(onebot) {
- const defaultPath = '/ws'
- const explicitPath = onebot.reverseWsPath
- ? (onebot.reverseWsPath.startsWith('/') ? onebot.reverseWsPath : `/${onebot.reverseWsPath}`)
- : null
- if (onebot.reverseWsUrl) {
- try {
- const u = new URL(onebot.reverseWsUrl)
- if (explicitPath) {
- u.pathname = explicitPath
- return u.href
- }
- if (u.pathname && u.pathname !== '/') return u.href
- u.pathname = defaultPath
- return u.href
- } catch (_) {
- return onebot.reverseWsUrl
- }
- }
- const pathPart = explicitPath || defaultPath
- return `ws://${onebot.reverseWsHost}:${onebot.reverseWsPort}${pathPart}`
- }
- function scheduleReconnect() {
- if (reconnectTimer) return
- logger.info('OneBot v11 ws 3秒后尝试重连')
- reconnectTimer = setTimeout(() => {
- reconnectTimer = null
- connectReverseWs().catch(err => {
- logger.error(`OneBot v11 重连失败:${err.message}`)
- })
- }, 3000)
- }
- function scheduleApiReconnect() {
- if (apiReconnectTimer) return
- logger.info('OneBot v11 API ws 3秒后尝试重连')
- apiReconnectTimer = setTimeout(() => {
- apiReconnectTimer = null
- connectApiReverseWs().catch(err => {
- logger.error(`OneBot v11 API ws 重连失败:${err.message}`)
- })
- }, 3000)
- }
- async function connectReverseWs() {
- if (wsClient && wsClient.readyState === WebSocket.OPEN) return wsClient
- if (connectingPromise) return connectingPromise
- const onebot = getOneBotConfig()
- if (!onebot.enabled) throw new Error('OneBot v11 未启用')
- if (onebot.transport !== 'reverse_ws') throw new Error('仅支持 reverse_ws 传输模式')
- const wsUrl = getReverseWsUrl(onebot)
- logger.info(`OneBot v11 准备连接 reverse ws:${wsUrl}`)
- if (!onebot.selfId) {
- throw new Error('OneBot v11 未配置 selfId:握手头 X-Self-ID 必填')
- }
- const selfIdStr = String(onebot.selfId).trim()
- if (!selfIdStr) {
- throw new Error('OneBot v11 selfId 不能为空')
- }
- const headers = {
- 'User-Agent': 'runforge/1.0 (OneBot-v11-reverse-ws-client)',
- 'X-Client-Role': 'Universal',
- 'X-Self-ID': selfIdStr
- }
- if (onebot.reverseWsToken) headers['Authorization'] = `Bearer ${onebot.reverseWsToken}`
- connectingPromise = new Promise((resolve, reject) => {
- const client = new WebSocket(wsUrl, { headers })
- let opened = false
- const connTimeout = setTimeout(() => {
- if (!opened) {
- try { client.terminate() } catch (_) { }
- reject(new Error(`连接 OneBot v11 超时:${wsUrl}`))
- }
- }, 5000)
- client.on('unexpected-response', (_req, res) => {
- const statusCode = res && res.statusCode
- const statusMessage = res && res.statusMessage
- const rh = res && typeof res.getHeaders === 'function' ? res.getHeaders() : {}
- const rawPreview = res && Array.isArray(res.rawHeaders) ? res.rawHeaders.slice(0, 24) : []
- logger.error(`OneBot v11 ws unexpected-response: ${statusCode} ${statusMessage}`)
- logger.error(`OneBot v11 ws response headers: ${stringifySafe({ rh, rawPreview })}`)
- })
- client.on('open', () => {
- clearTimeout(connTimeout)
- opened = true
- wsClient = client
- logger.info(`OneBot v11 reverse ws 已连接:${wsUrl},selfId=${onebot.selfId || '未配置'}`)
- const lifecycleConnect = buildLifecycleConnectEvent(onebot)
- client.send(JSON.stringify(lifecycleConnect), (err) => {
- if (err) {
- logger.error(`OneBot 生命周期事件发送失败:${err.message}`)
- return
- }
- logger.info('OneBot 生命周期事件已发送:meta_event.lifecycle.connect')
- })
- resolve(client)
- })
- client.on('error', (err) => {
- clearTimeout(connTimeout)
- if (!opened) reject(err)
- logger.error(`OneBot v11 ws 异常:${err.message}`)
- })
- client.on('close', () => {
- wsClient = null
- connectingPromise = null
- logger.error('OneBot v11 ws 连接已关闭')
- if (onebot.enabled) scheduleReconnect()
- })
- client.on('message', (raw) => {
- const text = typeof raw === 'string' ? raw : raw.toString('utf8')
- logger.info(`OneBot v11 收到消息:${text.slice(0, 1000)}`)
- handleWsIncomingFrame(text, client).catch((err) => {
- logger.error(`OneBot v11 处理收到消息失败:${err.message}`)
- })
- })
- })
- try {
- return await connectingPromise
- } finally {
- connectingPromise = null
- }
- }
- async function connectApiReverseWs() {
- if (wsApiClient && wsApiClient.readyState === WebSocket.OPEN) return wsApiClient
- if (connectingApiPromise) return connectingApiPromise
- const onebot = getOneBotConfig()
- if (!onebot.enabled) throw new Error('OneBot v11 未启用')
- if (onebot.transport !== 'reverse_ws') throw new Error('仅支持 reverse_ws 传输模式')
- const wsUrl = getReverseWsUrl(onebot)
- if (!onebot.selfId) throw new Error('API ws 缺少 selfId')
- const selfIdStr = String(onebot.selfId).trim()
- if (!selfIdStr) throw new Error('API ws selfId 不能为空')
- const headers = {
- 'User-Agent': 'runforge/1.0 (OneBot-v11-reverse-ws-api-client)',
- 'X-Client-Role': 'Api',
- 'X-Self-ID': selfIdStr
- }
- if (onebot.reverseWsToken) headers['Authorization'] = `Bearer ${onebot.reverseWsToken}`
- connectingApiPromise = new Promise((resolve, reject) => {
- const client = new WebSocket(wsUrl, { headers })
- let opened = false
- const connTimeout = setTimeout(() => {
- if (!opened) {
- try { client.terminate() } catch (_) { }
- reject(new Error(`连接 OneBot v11 API ws 超时:${wsUrl}`))
- }
- }, 5000)
- client.on('open', () => {
- clearTimeout(connTimeout)
- opened = true
- wsApiClient = client
- logger.info(`OneBot v11 API ws 已连接:${wsUrl}`)
- resolve(client)
- })
- client.on('error', (err) => {
- clearTimeout(connTimeout)
- if (!opened) reject(err)
- logger.error(`OneBot v11 API ws 异常:${err.message}`)
- })
- client.on('close', () => {
- wsApiClient = null
- connectingApiPromise = null
- logger.error('OneBot v11 API ws 连接已关闭')
- if (onebot.enabled) scheduleApiReconnect()
- })
- client.on('message', (raw) => {
- const text = typeof raw === 'string' ? raw : raw.toString('utf8')
- logger.info(`OneBot v11 API ws 收到消息:${text.slice(0, 1000)}`)
- handleWsIncomingFrame(text, client).catch((err) => {
- logger.error(`OneBot v11 API ws 消息处理失败:${err.message}`)
- })
- })
- })
- try {
- return await connectingApiPromise
- } finally {
- connectingApiPromise = null
- }
- }
- function buildSyntheticPrivateMessageEvent(text, onebot, orderId) {
- const now = Math.floor(Date.now() / 1000)
- const selfId = String(onebot.selfId).trim()
- const senderIdStr = String(orderId || '').trim()
- if (!senderIdStr || !/^\d+$/.test(senderIdStr) || senderIdStr === selfId) {
- throw new Error('orderId 无效,无法作为 OneBot user_id')
- }
- // AstrBot aiocqhttp 消息链路要求 message 为消息段数组(字符串会走失败分支)
- const messageArr = [{ type: 'text', data: { text } }]
- const messageId = Math.floor(Math.random() * 2000000000) + 1
- return {
- time: now,
- self_id: selfId,
- post_type: 'message',
- message_type: 'private',
- sub_type: 'friend',
- message_id: messageId,
- user_id: senderIdStr,
- message: messageArr,
- raw_message: text,
- font: 0,
- sender: {
- user_id: senderIdStr,
- nickname: onebot.ticketSenderNickname || '工单系统',
- sex: 'unknown',
- age: 0
- }
- }
- }
- function buildLifecycleConnectEvent(onebot) {
- return {
- time: Math.floor(Date.now() / 1000),
- self_id: String(onebot.selfId).trim(),
- post_type: 'meta_event',
- meta_event_type: 'lifecycle',
- sub_type: 'connect'
- }
- }
- async function sendImplementationEvent(eventObj) {
- const client = await connectReverseWs()
- const line = stringifySafe(eventObj)
- logger.info(`OneBot v11 上报事件 post_type=${eventObj.post_type} message_type=${eventObj.message_type} len=${line.length}`)
- return new Promise((resolve, reject) => {
- client.send(line, (err) => {
- if (err) {
- logger.error(`OneBot 事件发送失败:${err.message}`)
- return reject(err)
- }
- logger.info('OneBot v11 事件已写入 WebSocket')
- resolve(true)
- })
- })
- }
- function decodeMessageText(message) {
- if (!message) return ''
- if (typeof message === 'string') return message
- if (Array.isArray(message)) {
- return message.map(seg => {
- if (typeof seg === 'string') return seg
- if (seg && seg.type === 'text' && seg.data && typeof seg.data.text === 'string') return seg.data.text
- return ''
- }).join('')
- }
- return String(message)
- }
- async function persistAiReplyAsServerReply(orderId, plainText) {
- const onebot = getOneBotConfig()
- const parsedOrderId = Number(orderId)
- if (!Number.isInteger(parsedOrderId) || parsedOrderId <= 0) return
- const rows = await db.query('SELECT msg, state, email FROM work_order WHERE id = ?', [parsedOrderId])
- if (!rows || rows.length !== 1 || rows[0].state === 2) return
- const now = Date.now()
- const msg = rows[0].msg || []
- msg.push({
- time: now,
- content: plainText,
- files: [],
- uuid: 'e4fe0277-0b1a-41a1-b25f-8b6e4cec3281',
- type: 'ai'
- })
- await db.query('UPDATE work_order SET msg = ?, update_time = ?, state = 3 WHERE id = ?', [msg, now, parsedOrderId])
- if (rows[0].email) {
- await EmailTemplate.orderNewReply(rows[0].email, { id: parsedOrderId, content: plainText, files: [] })
- }
- }
- async function handleWsIncomingFrame(text, client) {
- let j
- try {
- j = JSON.parse(text)
- } catch (_) {
- return
- }
- if (j == null || typeof j !== 'object') return
- if (j.action === 'send_private_msg' && j.params) {
- const plainText = decodeMessageText(j.params.message)
- logger.info(`OneBot v11 收到中文消息(解码):${plainText}`)
- await persistAiReplyAsServerReply(j.params.user_id, plainText)
- }
- /**
- * 反向 WS 上「应用端 → 实现端」的 API 请求(aiocqhttp 使用 echo: { seq })。
- * 若不回包,AstrBot 侧 ResultStore 会超时,事件处理卡住,表现为服务端「收不到」工单事件。
- */
- if (j.action !== undefined && j.echo !== undefined && typeof j.echo === 'object' && j.echo !== null && j.echo.seq !== undefined) {
- const resp = {
- status: 'ok',
- retcode: 0,
- data: null,
- echo: j.echo
- }
- try {
- client.send(JSON.stringify(resp))
- logger.info(`OneBot v11 已应答应用端 API:action=${j.action} seq=${JSON.stringify(j.echo.seq)}`)
- } catch (err) {
- logger.error(`OneBot v11 应答 API 失败:${err.message}`)
- }
- return
- }
- if ('post_type' in j) return
- if (j.echo !== undefined && j.status !== undefined) {
- if (j.retcode !== 0 && j.retcode !== undefined) {
- const wording = j.wording || j.message || stringifySafe(j.data)
- logger.error(`OneBot API 返回失败 echo=${j.echo} retcode=${j.retcode} ${wording}`)
- } else {
- logger.info(`OneBot API 成功 echo=${j.echo}`)
- }
- }
- }
- function buildOrderText(payload) {
- const {
- orderId,
- title,
- role,
- content,
- files = [],
- senderUuid
- } = payload
- const fileText = files.length > 0 ? `\n附件:\n${files.join('\n')}` : ''
- return [
- `[工单#${orderId}] ${title ? title : '工单消息更新'}`,
- `发送方:${role === '用户' ? '用户' : '系统'}`,
- `用户uuid:${senderUuid || '-'}`,
- '消息内容:',
- content
- ].join('\n') + fileText
- }
- async function sendOrderMessage(payload) {
- const onebot = getOneBotConfig()
- if (!onebot.enabled) return false
- if (onebot.transport !== 'reverse_ws') {
- logger.error('OneBot v11 传输模式错误,仅支持 reverse_ws')
- return false
- }
- const text = buildOrderText(payload)
- const eventObj = buildSyntheticPrivateMessageEvent(text, onebot, payload.orderId)
- await sendImplementationEvent(eventObj)
- logger.info(`OneBot 工单事件已上报:orderId=${payload.orderId}, userId=${eventObj.user_id}, messageId=${eventObj.message_id}`)
- return true
- }
- async function initOneBotWs() {
- const onebot = getOneBotConfig()
- if (!onebot.enabled) {
- logger.info('OneBot v11 未启用,跳过 ws 初始化')
- return false
- }
- if (onebot.transport !== 'reverse_ws') {
- logger.error('OneBot v11 初始化失败:传输模式不是 reverse_ws')
- return false
- }
- await connectReverseWs()
- await connectApiReverseWs()
- return true
- }
- module.exports = {
- sendOrderMessage,
- getOneBotConfig,
- initOneBotWs
- }
|