client.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. import json
  2. import asyncio
  3. import re
  4. from typing import Optional
  5. from contextlib import AsyncExitStack
  6. from http import HTTPStatus
  7. from mcp import ClientSession, StdioServerParameters
  8. from mcp.client.stdio import stdio_client
  9. from dashscope import Application
  10. from base_config import mcp_key, mcp_app_id
  11. # from dotenv import load_dotenv
  12. # load_dotenv()
  13. class MCPClient:
  14. def __init__(self,session_id):
  15. self.session: Optional[ClientSession] = None
  16. self.exit_stack = AsyncExitStack()
  17. self.session_id = session_id
  18. self.api_key = mcp_key
  19. self.app_id = mcp_app_id
  20. self.model = "qwen-max"
  21. self.SCRIPT_PATH = "mcp_/server.py"
  22. self.SYSTEM_PROMPT = """
  23. 您是 GitNexus 网站中的智能助手“小吉”,具备调用指定工具执行 Git 操作的能力。以下是您的行为规范:
  24. 可用工具简介:
  25. {tools_description}
  26. 响应规则:
  27. 响应规则如下:
  28. 1. 当任务需要调用工具完成时,必须返回**严格符合 JSON 格式**的响应:
  29. {{
  30. "tool": "工具名称",
  31. "arguments": {{参数键值对}}
  32. }}
  33. 2. 如果任务不需要工具支持,请直接用自然语言回答,无需 JSON。
  34. 3. 所有 Git 仓库统一位于路径 `/www/gitnexus_repos/{uuid}/仓库名称`,但请注意:
  35. **在回复中请勿直接透露仓库的物理路径。**
  36. 4. 工具列表:
  37. {available_tools}"""
  38. async def connect_to_server(self):
  39. server_params = StdioServerParameters(
  40. command="python",
  41. args=[self.SCRIPT_PATH]
  42. )
  43. stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
  44. self.stdio, self.write = stdio_transport
  45. self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
  46. await self.session.initialize()
  47. response = await self.session.list_tools()
  48. tools = response.tools
  49. print("\nConnected to server with tools:", [tool.name for tool in tools])
  50. async def _call_bailian_api(self, prompt: str) -> str:
  51. try:
  52. tool_response = await self.session.list_tools()
  53. tools_desc = "\n".join([f"- {t.name}: {t.description}" for t in tool_response.tools])
  54. full_prompt = self.SYSTEM_PROMPT.format(
  55. tools_description=tools_desc,
  56. available_tools=[t.name for t in tool_response.tools],
  57. uuid=self.session_id
  58. )
  59. response = await asyncio.to_thread(
  60. Application.call,
  61. api_key=self.api_key,
  62. app_id=self.app_id,
  63. prompt=full_prompt + "\n用户提问:" + prompt,
  64. session_id=self.session_id,
  65. stream=False,
  66. incremental_output=False,
  67. enable_mcp=True,
  68. tool_choice="auto"
  69. )
  70. if response.status_code == HTTPStatus.OK:
  71. return response.output.text
  72. return f"API Error: {response.message}"
  73. except Exception as e:
  74. print(e)
  75. return f"调用异常: {str(e)}"
  76. async def process_query(self, query: str) -> str:
  77. bailian_response = await self._call_bailian_api(query)
  78. # print("respone:"+bailian_response)
  79. json_str = re.findall( r'\{.*\}', bailian_response, re.S)
  80. # print(json_str)
  81. if json_str:
  82. tool_call=json.loads(json_str[0])
  83. if "tool" in tool_call:
  84. result = await self.session.call_tool(
  85. tool_call["tool"],
  86. tool_call["arguments"]
  87. )
  88. final_answer = await self._call_bailian_api("tool_response:"+result.content[0].text)
  89. return final_answer
  90. else:
  91. return bailian_response
  92. async def chat_loop(self):
  93. print("\nMCP Client Started!")
  94. try:
  95. query = input("\nQuery: ").strip()
  96. response = await self.process_query(query)
  97. print("\n" + response)
  98. except Exception as e:
  99. print(f"\nError: {str(e)}")
  100. async def cleanup(self):
  101. await self.exit_stack.aclose()