"""MCP stdio client that lets a DashScope application drive local MCP tools."""

import asyncio
import json
import re
from contextlib import AsyncExitStack
from http import HTTPStatus
from typing import Optional

from dashscope import Application
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from base_config import mcp_key, mcp_app_id

# Optional: load environment variables from a .env file.
# from dotenv import load_dotenv
# load_dotenv()


class MCPClient:
    """Bridge between a DashScope application and an MCP server run over stdio.

    The client spawns the server script as a subprocess, sends the user's
    question (prefixed with a system prompt listing the server's tools) to
    the DashScope application, and, when the reply contains a JSON tool
    call, executes that tool through the MCP session and asks the
    application to phrase the final answer from the tool output.
    """

    def __init__(self, session_id):
        """Store configuration; no connection is opened here.

        Args:
            session_id: Identifier passed to DashScope as ``session_id`` and
                substituted for ``{uuid}`` in the system prompt.
        """
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.session_id = session_id
        self.api_key = mcp_key
        self.app_id = mcp_app_id
        self.model = "qwen-max"
        self.SCRIPT_PATH = "mcp_/server.py"
        # Template for str.format(): doubled braces are literal braces, the
        # single-brace names are filled in by _call_bailian_api().  The text
        # is kept verbatim as a single line built from adjacent literals.
        self.SYSTEM_PROMPT = (
            " 您是一个可执行git操作的AI助手,可用工具: {tools_description}"
            " 响应规则:"
            " 1. 需要工具时必须返回严格JSON格式:"
            ' {{ "tool": "工具名", "arguments": {{参数键值对}} }}'
            " 2. 不需要工具时直接回复自然语言"
            " 3. 仓库都在/www/gitnexus_repos/{uuid}"
            " 4. 工具列表: {available_tools}"
        )

    async def connect_to_server(self):
        """Start the server script over stdio and initialise the MCP session.

        Both the transport and the session are registered on
        ``self.exit_stack`` so that ``cleanup()`` closes them.
        """
        server_params = StdioServerParameters(
            command="python",
            args=[self.SCRIPT_PATH],
        )

        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )

        await self.session.initialize()

        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])

    async def _call_bailian_api(self, prompt: str) -> str:
        """Send ``prompt`` to the DashScope application and return its text.

        The system prompt is rebuilt on every call from the server's current
        tool list.  Failures are not raised: a non-OK status yields
        ``"API Error: ..."`` and any exception yields ``"调用异常: ..."``.

        Args:
            prompt: Text appended after the system prompt.

        Returns:
            The application's reply text, or an error description string.
        """
        try:
            tool_response = await self.session.list_tools()
            tools_desc = "\n".join(
                [f"- {t.name}: {t.description}" for t in tool_response.tools]
            )

            full_prompt = self.SYSTEM_PROMPT.format(
                tools_description=tools_desc,
                available_tools=[t.name for t in tool_response.tools],
                uuid=self.session_id,
            )

            # Application.call is invoked via asyncio.to_thread so it runs
            # in a worker thread instead of being called inline here.
            response = await asyncio.to_thread(
                Application.call,
                api_key=self.api_key,
                app_id=self.app_id,
                prompt=full_prompt + "\n用户提问:" + prompt,
                session_id=self.session_id,
                stream=False,
                incremental_output=False,
                enable_mcp=True,
                tool_choice="auto",
            )

            if response.status_code == HTTPStatus.OK:
                return response.output.text
            return f"API Error: {response.message}"
        except Exception as e:
            print(e)
            return f"调用异常: {str(e)}"

    @staticmethod
    def _extract_tool_call(text) -> Optional[dict]:
        """Return the first JSON object in ``text`` that has a string "tool" key.

        Every ``{`` is tried as the start of a JSON value, so surrounding
        prose, stray braces, or several objects in one reply do not break
        parsing.  Returns ``None`` when no such object is found.
        """
        if not isinstance(text, str):
            return None

        decoder = json.JSONDecoder()
        start = text.find("{")
        while start != -1:
            try:
                candidate, _ = decoder.raw_decode(text[start:])
            except json.JSONDecodeError:
                candidate = None
            if isinstance(candidate, dict) and isinstance(candidate.get("tool"), str):
                return candidate
            start = text.find("{", start + 1)
        return None

    async def process_query(self, query: str) -> str:
        """Answer ``query``, running one MCP tool if the model asks for it.

        Args:
            query: The user's question.

        Returns:
            The model's reply when it contains no usable tool call;
            otherwise the model's follow-up reply after it has been given
            the tool output.
        """
        bailian_response = await self._call_bailian_api(query)

        tool_call = self._extract_tool_call(bailian_response)
        if tool_call is None:
            # No tool requested (or the braces were not a tool call): the
            # model's reply is the answer.
            return bailian_response

        arguments = tool_call.get("arguments")
        if not isinstance(arguments, dict):
            # A call without (valid) arguments is treated as taking none.
            arguments = {}

        result = await self.session.call_tool(tool_call["tool"], arguments)

        # Use the first textual content item; tolerate empty or non-text
        # tool output instead of raising IndexError/AttributeError.
        tool_text = next(
            (
                item.text
                for item in (result.content or [])
                if getattr(item, "text", None) is not None
            ),
            "",
        )

        return await self._call_bailian_api("tool_response:" + tool_text)

    async def chat_loop(self):
        """Read one query from stdin, process it, and print the answer.

        Despite the name, a single query is handled per call.  Any exception
        is reported on stdout rather than propagated.
        """
        print("\nMCP Client Started!")

        try:
            query = input("\nQuery: ").strip()
            response = await self.process_query(query)
            print("\n" + response)

        except Exception as e:
            print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Close everything registered on the exit stack."""
        await self.exit_stack.aclose()