server.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. import logging
  2. from pathlib import Path
  3. from typing import Optional
  4. from enum import Enum
  5. from base_config import path as REPO_BASE_PATH
  6. import git
  7. from mcp.types import TextContent
  8. from mcp.server.fastmcp import FastMCP
  9. # --- Pydantic Models are REMOVED ---
  10. # --- GitTools Enum (remains the same) ---
  class GitTools(str, Enum):
      """Identifiers for the Git operations this module knows about.

      Members also subclass ``str``, so each one compares equal to its plain
      string value.
      """

      # Inspecting the working tree and index
      STATUS = "git_status"
      DIFF_UNSTAGED = "git_diff_unstaged"
      DIFF_STAGED = "git_diff_staged"
      DIFF = "git_diff"
      # Changing the index / history
      COMMIT = "git_commit"
      ADD = "git_add"
      RESET = "git_reset"
      # History and branches
      LOG = "git_log"
      CREATE_BRANCH = "git_create_branch"
      CHECKOUT = "git_checkout"
      SHOW = "git_show"
      # Repository creation
      INIT = "git_init"
  24. # --- Low-level Git Functions (remain the same, using gitpython.Repo) ---
  25. # Note: Type hint updated to gitpython.Repo
  def _get_repo(repo_path: str | Path) -> git.Repo:
      """Open the Git repository located at ``repo_path``.

      Args:
          repo_path: Filesystem path of the repository.

      Returns:
          The opened ``git.Repo`` object.

      Raises:
          ValueError: If the path does not exist, is not a Git repository, or
              cannot be opened for any other reason. The underlying exception
              is attached as ``__cause__`` so the root failure stays visible
              in tracebacks and logs.
      """
      try:
          return git.Repo(str(repo_path))
      except git.InvalidGitRepositoryError as exc:
          raise ValueError(f"'{repo_path}' is not a valid Git repository.") from exc
      except git.NoSuchPathError as exc:
          raise ValueError(f"Repository path '{repo_path}' does not exist.") from exc
      except Exception as exc:
          raise ValueError(f"Error accessing repository at '{repo_path}': {exc}") from exc
  def git_status(repo: git.Repo) -> str:
      """Return the text produced by ``git status`` for ``repo``."""
      status_text = repo.git.status()
      return status_text
  def git_diff_unstaged(repo: git.Repo) -> str:
      """Return the output of a plain ``git diff`` (no arguments) for ``repo``."""
      unstaged_patch = repo.git.diff()
      return unstaged_patch
  def git_diff_staged(repo: git.Repo) -> str:
      """Return the output of ``git diff --cached`` for ``repo``."""
      staged_patch = repo.git.diff("--cached")
      return staged_patch
  def git_diff(repo: git.Repo, target: str) -> str:
      """Return the output of ``git diff <target>`` for ``repo``.

      Args:
          repo: Repository to run the diff in.
          target: Branch, tag, or commit to compare against.

      Returns:
          The diff text produced by git (empty when there are no differences).

      Raises:
          ValueError: If ``target`` starts with ``-``. Such a value would be
              interpreted by git as a command-line option (for example
              ``--output=<file>``) rather than as a revision.
      """
      # The target is caller-supplied; never let it be parsed as an option.
      if target.startswith("-"):
          raise ValueError(
              f"Invalid diff target '{target}': targets must not start with '-'."
          )
      return repo.git.diff(target)
  def git_commit(repo: git.Repo, message: str) -> str:
      """Commit whatever is currently staged in ``repo``'s index.

      Args:
          repo: Repository to commit in.
          message: Commit message.

      Returns:
          A human-readable result: the new commit hash on success, a notice
          when nothing is staged, or an error description if the commit fails.
      """
      try:
          if repo.head.is_valid():
              # Normal case: staged changes are the difference between index and HEAD.
              has_staged_changes = bool(repo.index.diff("HEAD"))
          else:
              # Unborn HEAD (no commit yet): there is nothing to diff against,
              # so any entry in the index counts as staged. Without this branch
              # the very first commit of a repository could never be created.
              has_staged_changes = bool(repo.index.entries)

          if not has_staged_changes:
              return "No changes added to commit (working tree clean or changes not staged)."

          commit = repo.index.commit(message)
          return f"Changes committed successfully with hash {commit.hexsha}"
      except Exception as e:
          # Errors are reported as text rather than raised, matching the other helpers.
          return f"Error committing changes: {str(e)}"
  def git_add(repo: git.Repo, files: list[str]) -> str:
      """Stage the given files in ``repo``'s index.

      Args:
          repo: Repository whose index is updated.
          files: Paths of the files to stage.

      Returns:
          A human-readable result: a success message listing the files, a
          notice when ``files`` is empty, or an error description.
      """
      # Nothing to do; avoid reporting "staged successfully" with an empty list.
      if not files:
          return "No files specified to stage."
      try:
          repo.index.add(files)
          return f"Files staged successfully: {', '.join(files)}"
      except FileNotFoundError as e:
          return f"Error staging files: File not found - {e.filename}"
      except Exception as e:
          return f"Error staging files: {str(e)}"
  def git_reset(repo: git.Repo) -> str:
      """Reset ``repo``'s index, unstaging everything, and report the outcome as text."""
      try:
          repo.index.reset()
      except Exception as reset_err:
          return f"Error resetting staged changes: {str(reset_err)}"
      return "All staged changes reset (unstaged)"
  def git_log(repo: git.Repo, max_count: int = 10) -> list[str]:
      """Return formatted entries for the commits reachable from HEAD.

      Args:
          repo: Repository to read history from.
          max_count: Maximum number of commits to include (default 10).

      Returns:
          One formatted string per commit (hash, author, date, message), in the
          order yielded by ``repo.iter_commits``. Empty when the repository has
          no commits yet.
      """
      # With an unborn HEAD, iterating commits raises instead of yielding
      # nothing; returning an empty list lets callers report "no history".
      if not repo.head.is_valid():
          return []

      return [
          f"Commit: {commit.hexsha}\n"
          f"Author: {commit.author}\n"
          f"Date: {commit.authored_datetime}\n"
          f"Message: {commit.message.strip()}\n"  # strip() drops the trailing newline git stores
          for commit in repo.iter_commits(max_count=max_count)
      ]
  def git_create_branch(repo: git.Repo, branch_name: str, base_branch: Optional[str] = None) -> str:
      """Create a new branch in ``repo``.

      Args:
          repo: Repository to create the branch in.
          branch_name: Name of the branch to create.
          base_branch: Optional ref name or commit-ish to branch from. When
              omitted, the branch starts at the current HEAD commit.

      Returns:
          A human-readable result: a success message naming the new branch and
          its base, or an error description (unknown base, branch already
          exists, git failure).
      """
      try:
          if base_branch:
              try:
                  base = repo.refs[base_branch]
              except IndexError:
                  # Not a known ref name; fall back to resolving it as a commit-ish.
                  try:
                      base = repo.commit(base_branch)
                  except git.BadName:
                      return f"Error: Base reference '{base_branch}' not found (neither branch nor commit)."
                  except Exception as e:
                      return f"Error resolving base reference '{base_branch}': {str(e)}"
          else:
              base = repo.head.commit

          if branch_name in repo.heads:
              return f"Error: Branch '{branch_name}' already exists."

          new_branch = repo.create_head(branch_name, base)
          # Use the ref name when the base has one, otherwise the commit hash.
          # `hexsha` is only read when needed: passing it as a getattr() default
          # evaluated it eagerly and failed for ref bases after the branch had
          # already been created.
          base_ref_name = base.name if hasattr(base, "name") else base.hexsha
          return f"Created branch '{new_branch.name}' based on '{base_ref_name}'"
      except git.GitCommandError as e:
          return f"Error creating branch '{branch_name}': {e.stderr or e.stdout}"
      except Exception as e:
          return f"Error creating branch '{branch_name}': {str(e)}"
  def git_checkout(repo: git.Repo, branch_name: str) -> str:
      """Check out ``branch_name`` in ``repo`` and report the outcome as text.

      Args:
          repo: Repository to operate on.
          branch_name: Name of the branch to switch to.

      Returns:
          A success message, or an error description when the name is invalid,
          unknown, or the checkout is blocked by local changes.
      """
      # The name is caller-supplied; a leading '-' would be parsed by git as a
      # command-line option instead of a branch name.
      if branch_name.startswith("-"):
          return f"Error: Invalid branch name '{branch_name}': names must not start with '-'."
      try:
          repo.git.checkout(branch_name)
          return f"Switched to branch '{branch_name}'"
      except git.GitCommandError as e:
          # Map the most common git failures to clearer messages.
          stderr = e.stderr or ""
          if "did not match any file(s) known to git" in stderr:
              return f"Error: Branch or pathspec '{branch_name}' not found."
          if "Please commit your changes or stash them before you switch branches" in stderr:
              return f"Error: Cannot checkout branch '{branch_name}'. You have unstaged changes. Please commit or stash them first."
          return f"Error checking out branch '{branch_name}': {e.stderr or e.stdout}"
      except Exception as e:
          return f"An unexpected error occurred during checkout: {str(e)}"
  def git_init(repo_path: str) -> str:
      """Initialise a Git repository at ``repo_path`` and describe the result.

      If the path already holds a ``.git`` directory, nothing is initialised:
      the existing repository location is reported, or an error if that
      directory cannot be opened as a repository. All failures are returned as
      text rather than raised.
      """
      try:
          location = Path(repo_path)
          if location.exists() and (location / ".git").is_dir():
              # A .git directory is already present: report it instead of re-initialising.
              try:
                  found_git_dir = git.Repo(repo_path).git_dir
              except git.InvalidGitRepositoryError:
                  # Refuse to touch a broken .git; the user can delete it to start over.
                  return f"Error: An invalid Git repository structure already exists at '{repo_path}'. Remove '.git' folder to reinitialize."
              except Exception as probe_err:
                  return f"Error checking existing repository at '{repo_path}': {probe_err}"
              return f"Repository already exists at {found_git_dir}"

          # mkdir=True lets git create the target directory when it is missing.
          created = git.Repo.init(path=repo_path, mkdir=True)
          return f"Initialized empty Git repository in {created.git_dir}"
      except Exception as init_err:
          return f"Error initializing repository at '{repo_path}': {str(init_err)}"
  def git_show(repo: git.Repo, revision: str) -> str:
      """Describe a commit: its metadata followed by the patch it introduced.

      Args:
          repo: Repository to look the revision up in.
          revision: Commit hash, tag, branch name, or other revision expression.

      Returns:
          A text block with hash, author, date, and message, followed by the
          changes relative to the first parent (or relative to the empty tree
          for a root commit). Lookup failures are returned as error text; a
          failure while producing the diff is appended as a warning.
      """
      try:
          commit = repo.commit(revision)
      except git.BadName:
          return f"Error: Revision '{revision}' not found."
      except Exception as e:
          return f"Error finding revision '{revision}': {str(e)}"

      output = [
          f"Commit: {commit.hexsha}\n"
          f"Author: {commit.author}\n"
          f"Date: {commit.authored_datetime}\n"
          f"Message:\n{commit.message.strip()}\n"
      ]

      try:
          if commit.parents:
              # Diff FROM the first parent TO this commit, so lines the commit
              # added show up as additions. The reverse call inverts the patch.
              diffs = commit.parents[0].diff(commit, create_patch=True)
          else:
              # Root commit: compare against the empty tree.
              diffs = commit.diff(git.NULL_TREE, create_patch=True)

          if diffs:
              output.append("\nChanges:\n")
              for d in diffs:
                  # Patch payload is normally bytes; decode leniently and accept
                  # an already-decoded or missing payload.
                  raw = d.diff
                  if isinstance(raw, bytes):
                      diff_text = raw.decode('utf-8', errors='ignore')
                  else:
                      diff_text = raw or ""
                  a_path = d.a_path or (d.a_blob.path if d.a_blob else 'unknown')
                  b_path = d.b_path or (d.b_blob.path if d.b_blob else 'unknown')
                  output.append(f"--- a/{a_path}\n+++ b/{b_path}\n")
                  output.append(diff_text)
          elif not commit.parents:
              output.append("\nChanges: (Initial commit)\n")
          else:
              output.append("\nNo changes in this commit compared to its parent.")
      except Exception as e:
          # Metadata is still useful even when the diff cannot be produced.
          output.append(f"\nWarning: Could not generate diff for commit {commit.hexsha}: {str(e)}")

      return "".join(output)
  # --- FastMCP server setup ---
  # Root logging configuration: INFO level, timestamped records.
  logging.basicConfig(
      level=logging.INFO,
      format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  )
  logger = logging.getLogger(__name__)

  # The FastMCP server instance that the tool functions below register on.
  mcp = FastMCP("git")
  181. # --- Tool Definitions using @mcp.tool() ---
  @mcp.tool()
  async def list_repos_tool(uuid: str) -> list[TextContent]:
      """Lists the names of all repositories (repo folder names) a user has on the GitNexus platform.

      Args:
          uuid: The user's unique identifier, used to locate that user's repository root folder.
      """
      logger.info("Executing tool: list_repos_tool for user %s", uuid)

      # `uuid` becomes a folder name under REPO_BASE_PATH. Accept only a single
      # plain path component, so values such as "", "..", "a/b" or an absolute
      # path cannot make the listing escape the user's own folder.
      if not uuid or uuid in (".", "..") or Path(uuid).name != uuid:
          logger.warning("Rejected invalid user identifier: %r", uuid)
          return [TextContent(type="text", text="用户还没有可用的Git仓库")]

      user_path = Path(REPO_BASE_PATH) / uuid
      if not user_path.is_dir():
          return [TextContent(type="text", text="用户还没有可用的Git仓库")]

      try:
          # A repository is any direct subdirectory containing a `.git` entry;
          # sorted so the listing is stable between calls.
          repo_names = sorted(
              entry.name
              for entry in user_path.iterdir()
              if entry.is_dir() and (entry / ".git").exists()
          )
          if not repo_names:
              return [TextContent(type="text", text="当前没有可用的仓库。")]
          result = "您当前拥有以下仓库:\n" + "\n".join(f"- {name}" for name in repo_names)
          return [TextContent(type="text", text=result)]
      except Exception as e:
          logger.exception("Error while listing repositories")
          return [TextContent(type="text", text=f"获取仓库列表时出错:{e}")]
  @mcp.tool()
  async def status_tool(repo_path: str) -> list[TextContent]:
      """Gets the status of a Git repository (shows staged, unstaged, and untracked files).

      Args:
          repo_path: The file system path to the Git repository.
      """
      logger.info(f"Executing tool: {GitTools.STATUS} for repo {repo_path}")
      try:
          repo = _get_repo(repo_path)
          status = git_status(repo)
          result = f"Repository status for '{repo_path}':\n{status}"
      except ValueError as e:
          # Raised by _get_repo when the path is missing or not a repository.
          result = str(e)
      except git.GitCommandError as e:
          # Report git failures as text, the same way diff_tool does.
          result = f"Error running git status for '{repo_path}': {e.stderr or e.stdout}"
      return [TextContent(type="text", text=result)]
  @mcp.tool()
  async def diff_tool(repo_path: str, target: str) -> list[TextContent]:
      """Shows differences between the working tree and a specified target (e.g., a branch, tag, or commit hash).

      Args:
          repo_path: The file system path to the Git repository.
          target: The branch, tag, or commit hash to compare the working tree against.
      """
      logger.info(f"Executing tool: {GitTools.DIFF} for repo {repo_path} against {target}")
      try:
          repo = _get_repo(repo_path)
          diff = git_diff(repo, target)
          # `git diff <target>` compares the working tree (not HEAD) with the
          # target, so the heading says exactly that.
          result = f"Diff between the working tree and '{target}' in '{repo_path}':\n{diff or 'No differences found.'}"
      except ValueError as e:
          result = str(e)
      except git.GitCommandError as e:
          result = f"Error running git diff against '{target}': {e.stderr or e.stdout}"
      return [TextContent(type="text", text=result)]
  @mcp.tool()
  async def log_tool(repo_path: str, max_count: int = 10) -> list[TextContent]:
      """Shows the commit history log for the current branch.

      Args:
          repo_path: The file system path to the Git repository.
          max_count: The maximum number of commits to display (default: 10).
      """
      logger.info(f"Executing tool: {GitTools.LOG} for repo {repo_path}, max_count={max_count}")
      try:
          repo = _get_repo(repo_path)
          logs = git_log(repo, max_count)
          if not logs:
              result = f"No commit history found for '{repo_path}' (possibly an empty repository)."
          else:
              # Blank line between entries for readability.
              result = f"Commit history for '{repo_path}' (last {len(logs)} commits):\n\n" + "\n\n".join(logs)
      except ValueError as e:
          result = str(e)
      except git.GitCommandError as e:
          # Report git failures as text, the same way diff_tool does.
          result = f"Error reading commit history for '{repo_path}': {e.stderr or e.stdout}"
      return [TextContent(type="text", text=result)]
  @mcp.tool()
  async def checkout_tool(repo_path: str, branch_name: str) -> list[TextContent]:
      """Switches the working directory to a different branch.

      Args:
          repo_path: The file system path to the Git repository.
          branch_name: The name of the branch to switch to.
      """
      logger.info(f"Executing tool: {GitTools.CHECKOUT} for repo {repo_path}, branch: {branch_name}")
      try:
          outcome = git_checkout(_get_repo(repo_path), branch_name)
      except ValueError as open_err:
          # _get_repo could not open the repository; pass its message through.
          outcome = str(open_err)
      reply = TextContent(type="text", text=outcome)
      return [reply]
  @mcp.tool()
  async def show_tool(repo_path: str, revision: str) -> list[TextContent]:
      """Shows details (metadata and content changes) for a specific commit or object.

      Args:
          repo_path: The file system path to the Git repository.
          revision: The commit hash, tag, or branch name to show details for (e.g., 'HEAD', 'main', 'v1.0', 'abcdef123').
      """
      logger.info(f"Executing tool: {GitTools.SHOW} for repo {repo_path}, revision: {revision}")
      try:
          details = git_show(_get_repo(repo_path), revision)
      except ValueError as open_err:
          # _get_repo could not open the repository; pass its message through.
          details = str(open_err)
      reply = TextContent(type="text", text=details)
      return [reply]
  if __name__ == "__main__":
      # Serve over the stdio transport: the launching process is expected to
      # talk to this script through its stdin/stdout.
      logger.info("Starting Git Tool Server using MCPFast stdio transport...")
      mcp.run(transport="stdio")
      logger.info("MCPFast stdio transport finished.")