123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- import json
- import asyncio
- from typing import Optional
- from contextlib import AsyncExitStack
- from http import HTTPStatus
- from mcp import ClientSession, StdioServerParameters
- from mcp.client.stdio import stdio_client
- from dashscope import Application, Generation
- # from dotenv import load_dotenv
- # load_dotenv()
- class MCPClient:
- def __init__(self,session_id):
- self.session: Optional[ClientSession] = None
- self.exit_stack = AsyncExitStack()
- self.session_id = session_id
- self.api_key = "sk-0164613e1a2143fc808fc4cc2451bef0"
- self.app_id = "b424f3fa1d4544d68579671d70596f29"
- self.model = "qwen-max"
- self.SCRIPT_PATH = "mcp_/server.py"
- self.SYSTEM_PROMPT = """
- 您是一个可执行git操作的AI助手,可用工具:
- {tools_description}
- 响应规则:
- 1. 需要工具时必须返回严格JSON格式:
- {{
- "tool": "工具名",
- "arguments": {{参数键值对}}
- }}
- 2. 不需要工具时直接回复自然语言
- 3. 仓库都在/www/gitnexus_repos/{uuid}
- 4. 工具列表:
- {available_tools}"""
- async def connect_to_server(self):
- server_params = StdioServerParameters(
- command="python",
- args=[self.SCRIPT_PATH]
- )
- stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
- self.stdio, self.write = stdio_transport
- self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
- await self.session.initialize()
- response = await self.session.list_tools()
- tools = response.tools
- print("\nConnected to server with tools:", [tool.name for tool in tools])
- async def _call_bailian_api(self, prompt: str) -> str:
- try:
- tool_response = await self.session.list_tools()
- tools_desc = "\n".join([f"- {t.name}: {t.description}" for t in tool_response.tools])
- full_prompt = self.SYSTEM_PROMPT.format(
- tools_description=tools_desc,
- available_tools=[t.name for t in tool_response.tools],
- uuid=self.session_id
- )
- response = await asyncio.to_thread(
- Application.call,
- api_key=self.api_key,
- app_id=self.app_id,
- prompt=full_prompt + "\n用户提问:" + prompt,
- session_id=self.session_id,
- stream=False,
- incremental_output=False,
- enable_mcp=True,
- tool_choice="auto"
- )
- if response.status_code == HTTPStatus.OK:
- return response.output.text
- return f"API Error: {response.message}"
- except Exception as e:
- print(e)
- return f"调用异常: {str(e)}"
- async def process_query(self, query: str) -> str:
- bailian_response = await self._call_bailian_api(query)
- try:
- tool_call = json.loads(bailian_response)
- print(tool_call)
- if "tool" in tool_call:
- result = await self.session.call_tool(
- tool_call["tool"],
- tool_call["arguments"]
- )
- bailian_response = await self._call_bailian_api("tool_response:"+result.content[0].text)
- return bailian_response
- except json.JSONDecodeError:
- return bailian_response
- async def chat_loop(self):
- print("\nMCP Client Started!")
- try:
- query = input("\nQuery: ").strip()
- response = await self.process_query(query)
- print("\n" + response)
- except Exception as e:
- print(f"\nError: {str(e)}")
- async def cleanup(self):
- await self.exit_stack.aclose()
- async def main():
- client = MCPClient()
- try:
- await client.connect_to_server()
- print("Connected to server!")
- await client.chat_loop()
- finally:
- await client.cleanup()
- if __name__ == "__main__":
- asyncio.run(main())
|