123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- import json
- import asyncio
- import re
- from typing import Optional
- from contextlib import AsyncExitStack
- from http import HTTPStatus
- from mcp import ClientSession, StdioServerParameters
- from mcp.client.stdio import stdio_client
- from dashscope import Application
- from base_config import mcp_key, mcp_app_id, path as REPO_BASE_PATH
- # from dotenv import load_dotenv
- # load_dotenv()
class MCPClient:
    """Bridge between an MCP stdio server and the DashScope Application API.

    The client spawns the MCP server script as a subprocess, asks the
    DashScope application for a reply, and, when that reply is a JSON tool
    call, executes the tool through the MCP session and feeds the tool output
    back to the application to obtain the final answer.
    """

    def __init__(self, session_id):
        """Store configuration; no connection is opened here.

        Args:
            session_id: Identifier passed to DashScope as ``session_id`` and
                substituted for ``{uuid}`` in the system prompt.
        """
        # Populated by connect_to_server(); None until then.
        self.session: Optional[ClientSession] = None
        # Owns the stdio transport and the session so cleanup() can close both.
        self.exit_stack = AsyncExitStack()
        self.session_id = session_id
        self.api_key = mcp_key
        self.app_id = mcp_app_id
        self.model = "qwen-max"
        self.SCRIPT_PATH = "mcp_/server.py"
        # Template filled by str.format() in _call_bailian_api(); doubled
        # braces are literal braces in the rendered prompt.
        self.SYSTEM_PROMPT = """
您是 GitNexus 网站中的智能助手“小吉”,具备调用指定工具执行 Git 操作的能力。以下是您的行为规范:
可用工具简介:
{tools_description}
响应规则:
响应规则如下:
1. 当任务需要调用工具完成时,必须返回**严格符合 JSON 格式**的响应:
{{
"tool": "工具名称",
"arguments": {{参数键值对}}
}}
2. 如果任务不需要工具支持,请直接用自然语言回答,无需 JSON。
3. 所有 Git 仓库统一位于路径 `/www/gitnexus_repos/{uuid}/仓库名称`,但请注意:
**在回复中请勿直接透露仓库的物理路径。**
4. 工具列表:
{available_tools}"""

    async def connect_to_server(self):
        """Start the MCP server script over stdio and initialize the session.

        Both the transport and the session are registered on
        ``self.exit_stack`` so that cleanup() releases them.
        """
        server_params = StdioServerParameters(
            command="python",
            args=[self.SCRIPT_PATH],
        )
        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )
        await self.session.initialize()
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])

    async def _call_bailian_api(self, prompt: str) -> str:
        """Send the system prompt plus ``prompt`` to the DashScope application.

        Args:
            prompt: Text appended to the rendered system prompt.

        Returns:
            The reply text on HTTP 200, otherwise an ``"API Error: ..."``
            string. Any exception is printed and reported as a
            ``"调用异常: ..."`` string instead of being raised.
        """
        try:
            tool_response = await self.session.list_tools()
            tools_desc = "\n".join(
                f"- {t.name}: {t.description}" for t in tool_response.tools
            )
            full_prompt = self.SYSTEM_PROMPT.format(
                tools_description=tools_desc,
                available_tools=[t.name for t in tool_response.tools],
                uuid=self.session_id,
            )
            # Application.call is invoked in a worker thread so the event
            # loop is not blocked while waiting for the reply.
            response = await asyncio.to_thread(
                Application.call,
                api_key=self.api_key,
                app_id=self.app_id,
                prompt=full_prompt + "\n用户提问:" + prompt,
                session_id=self.session_id,
                stream=False,
                incremental_output=False,
                enable_mcp=True,
                tool_choice="auto",
            )
            if response.status_code == HTTPStatus.OK:
                return response.output.text
            return f"API Error: {response.message}"
        except Exception as e:
            print(e)
            return f"调用异常: {str(e)}"

    @staticmethod
    def _extract_tool_call(text):
        """Return the first JSON object in ``text`` that has a "tool" key.

        Returns ``None`` when ``text`` is empty or holds no such object.
        Brace-delimited fragments that are not valid JSON are skipped rather
        than raising.
        """
        if not text:
            return None
        decoder = json.JSONDecoder()
        pos = text.find("{")
        while pos != -1:
            try:
                payload, end = decoder.raw_decode(text, pos)
            except json.JSONDecodeError:
                # Not JSON at this brace; try the next opening brace.
                pos = text.find("{", pos + 1)
                continue
            if isinstance(payload, dict) and "tool" in payload:
                return payload
            # Valid JSON but not a tool call: resume after it so objects
            # nested inside it are not mistaken for a top-level tool call.
            pos = text.find("{", end)
        return None

    async def process_query(self, query: str) -> str:
        """Answer ``query``, running one MCP tool call if the model asks for it.

        Args:
            query: The user's question.

        Returns:
            The model's direct reply when it contains no valid tool call;
            otherwise the model's follow-up reply after it has been given the
            tool output (prefixed with ``"tool_response:"``).
        """
        bailian_response = await self._call_bailian_api(query)
        tool_call = self._extract_tool_call(bailian_response)
        if tool_call is None:
            # Covers plain-text replies, malformed JSON and JSON objects
            # without a "tool" key.
            return bailian_response

        result = await self.session.call_tool(
            tool_call["tool"],
            tool_call.get("arguments") or {},
        )
        # Keep only the parts that expose text; tolerate empty results.
        text_parts = [
            part.text
            for part in (result.content or [])
            if getattr(part, "text", None) is not None
        ]
        return await self._call_bailian_api(
            "tool_response:" + "\n".join(text_parts)
        )

    async def chat_loop(self):
        """Read one query from stdin, process it and print the answer.

        NOTE(review): despite the name, this handles a single query per call;
        confirm whether callers are expected to invoke it repeatedly.
        """
        print("\nMCP Client Started!")
        try:
            query = input("\nQuery: ").strip()
            response = await self.process_query(query)
            print("\n" + response)
        except Exception as e:
            print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Close everything registered on the exit stack."""
        await self.exit_stack.aclose()
|