client.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
import asyncio
import json
import os
import sys
import uuid
from contextlib import AsyncExitStack
from http import HTTPStatus
from typing import Optional

from dashscope import Application, Generation
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# from dotenv import load_dotenv
# load_dotenv()
  11. class MCPClient:
  12. def __init__(self,session_id):
  13. self.session: Optional[ClientSession] = None
  14. self.exit_stack = AsyncExitStack()
  15. self.session_id = session_id
  16. self.api_key = "sk-0164613e1a2143fc808fc4cc2451bef0"
  17. self.app_id = "b424f3fa1d4544d68579671d70596f29"
  18. self.model = "qwen-max"
  19. self.SCRIPT_PATH = "mcp_/server.py"
  20. self.SYSTEM_PROMPT = """
  21. 您是一个可执行git操作的AI助手,可用工具:
  22. {tools_description}
  23. 响应规则:
  24. 1. 需要工具时必须返回严格JSON格式:
  25. {{
  26. "tool": "工具名",
  27. "arguments": {{参数键值对}}
  28. }}
  29. 2. 不需要工具时直接回复自然语言
  30. 3. 仓库都在/www/gitnexus_repos/{uuid}
  31. 4. 工具列表:
  32. {available_tools}"""
  33. async def connect_to_server(self):
  34. server_params = StdioServerParameters(
  35. command="python",
  36. args=[self.SCRIPT_PATH]
  37. )
  38. stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
  39. self.stdio, self.write = stdio_transport
  40. self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
  41. await self.session.initialize()
  42. response = await self.session.list_tools()
  43. tools = response.tools
  44. print("\nConnected to server with tools:", [tool.name for tool in tools])
  45. async def _call_bailian_api(self, prompt: str) -> str:
  46. try:
  47. tool_response = await self.session.list_tools()
  48. tools_desc = "\n".join([f"- {t.name}: {t.description}" for t in tool_response.tools])
  49. full_prompt = self.SYSTEM_PROMPT.format(
  50. tools_description=tools_desc,
  51. available_tools=[t.name for t in tool_response.tools],
  52. uuid=self.session_id
  53. )
  54. response = await asyncio.to_thread(
  55. Application.call,
  56. api_key=self.api_key,
  57. app_id=self.app_id,
  58. prompt=full_prompt + "\n用户提问:" + prompt,
  59. session_id=self.session_id,
  60. stream=False,
  61. incremental_output=False,
  62. enable_mcp=True,
  63. tool_choice="auto"
  64. )
  65. if response.status_code == HTTPStatus.OK:
  66. return response.output.text
  67. return f"API Error: {response.message}"
  68. except Exception as e:
  69. print(e)
  70. return f"调用异常: {str(e)}"
  71. async def process_query(self, query: str) -> str:
  72. bailian_response = await self._call_bailian_api(query)
  73. try:
  74. tool_call = json.loads(bailian_response)
  75. print(tool_call)
  76. if "tool" in tool_call:
  77. result = await self.session.call_tool(
  78. tool_call["tool"],
  79. tool_call["arguments"]
  80. )
  81. bailian_response = await self._call_bailian_api("tool_response:"+result.content[0].text)
  82. return bailian_response
  83. except json.JSONDecodeError:
  84. return bailian_response
  85. async def chat_loop(self):
  86. print("\nMCP Client Started!")
  87. try:
  88. query = input("\nQuery: ").strip()
  89. response = await self.process_query(query)
  90. print("\n" + response)
  91. except Exception as e:
  92. print(f"\nError: {str(e)}")
  93. async def cleanup(self):
  94. await self.exit_stack.aclose()
  95. async def main():
  96. client = MCPClient()
  97. try:
  98. await client.connect_to_server()
  99. print("Connected to server!")
  100. await client.chat_loop()
  101. finally:
  102. await client.cleanup()
  103. if __name__ == "__main__":
  104. asyncio.run(main())