client.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. import json
  2. import asyncio
  3. import re
  4. from typing import Optional
  5. from contextlib import AsyncExitStack
  6. from http import HTTPStatus
  7. from mcp import ClientSession, StdioServerParameters
  8. from mcp.client.stdio import stdio_client
  9. from dashscope import Application
  10. from base_config import mcp_key, mcp_app_id
  11. # from dotenv import load_dotenv
  12. # load_dotenv()
  13. class MCPClient:
  14. def __init__(self,session_id):
  15. self.session: Optional[ClientSession] = None
  16. self.exit_stack = AsyncExitStack()
  17. self.session_id = session_id
  18. self.api_key = mcp_key
  19. self.app_id = mcp_app_id
  20. self.model = "qwen-max"
  21. self.SCRIPT_PATH = "mcp_/server.py"
  22. self.SYSTEM_PROMPT = """
  23. 您是一个可执行git操作的AI助手,可用工具:
  24. {tools_description}
  25. 响应规则:
  26. 1. 需要工具时必须返回严格JSON格式:
  27. {{
  28. "tool": "工具名",
  29. "arguments": {{参数键值对}}
  30. }}
  31. 2. 不需要工具时直接回复自然语言
  32. 3. 仓库都在/www/gitnexus_repos/{uuid}
  33. 4. 工具列表:
  34. {available_tools}"""
  35. async def connect_to_server(self):
  36. server_params = StdioServerParameters(
  37. command="python",
  38. args=[self.SCRIPT_PATH]
  39. )
  40. stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
  41. self.stdio, self.write = stdio_transport
  42. self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
  43. await self.session.initialize()
  44. response = await self.session.list_tools()
  45. tools = response.tools
  46. print("\nConnected to server with tools:", [tool.name for tool in tools])
  47. async def _call_bailian_api(self, prompt: str) -> str:
  48. try:
  49. tool_response = await self.session.list_tools()
  50. tools_desc = "\n".join([f"- {t.name}: {t.description}" for t in tool_response.tools])
  51. full_prompt = self.SYSTEM_PROMPT.format(
  52. tools_description=tools_desc,
  53. available_tools=[t.name for t in tool_response.tools],
  54. uuid=self.session_id
  55. )
  56. response = await asyncio.to_thread(
  57. Application.call,
  58. api_key=self.api_key,
  59. app_id=self.app_id,
  60. prompt=full_prompt + "\n用户提问:" + prompt,
  61. session_id=self.session_id,
  62. stream=False,
  63. incremental_output=False,
  64. enable_mcp=True,
  65. tool_choice="auto"
  66. )
  67. if response.status_code == HTTPStatus.OK:
  68. return response.output.text
  69. return f"API Error: {response.message}"
  70. except Exception as e:
  71. print(e)
  72. return f"调用异常: {str(e)}"
  73. async def process_query(self, query: str) -> str:
  74. bailian_response = await self._call_bailian_api(query)
  75. # print("respone:"+bailian_response)
  76. json_str = re.findall( r'\{.*\}', bailian_response, re.S)
  77. # print(json_str)
  78. if json_str:
  79. tool_call=json.loads(json_str[0])
  80. if "tool" in tool_call:
  81. result = await self.session.call_tool(
  82. tool_call["tool"],
  83. tool_call["arguments"]
  84. )
  85. final_answer = await self._call_bailian_api("tool_response:"+result.content[0].text)
  86. return final_answer
  87. else:
  88. return bailian_response
  89. async def chat_loop(self):
  90. print("\nMCP Client Started!")
  91. try:
  92. query = input("\nQuery: ").strip()
  93. response = await self.process_query(query)
  94. print("\n" + response)
  95. except Exception as e:
  96. print(f"\nError: {str(e)}")
  97. async def cleanup(self):
  98. await self.exit_stack.aclose()