server.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. import logging
  2. from pathlib import Path
  3. from typing import Optional
  4. from enum import Enum
  5. import git
  6. from mcp.types import TextContent
  7. from mcp.server.fastmcp import FastMCP
  8. # --- Pydantic Models are REMOVED ---
  9. # --- GitTools Enum (remains the same) ---
  10. class GitTools(str, Enum):
  11. STATUS = "git_status"
  12. DIFF_UNSTAGED = "git_diff_unstaged"
  13. DIFF_STAGED = "git_diff_staged"
  14. DIFF = "git_diff"
  15. COMMIT = "git_commit"
  16. ADD = "git_add"
  17. RESET = "git_reset"
  18. LOG = "git_log"
  19. CREATE_BRANCH = "git_create_branch"
  20. CHECKOUT = "git_checkout"
  21. SHOW = "git_show"
  22. INIT = "git_init"
  23. # --- Low-level Git Functions (remain the same, using gitpython.Repo) ---
  24. # Note: Type hint updated to gitpython.Repo
  25. def _get_repo(repo_path: str | Path) -> git.Repo:
  26. """Helper to get Repo object and handle errors."""
  27. try:
  28. return git.Repo(str(repo_path))
  29. except git.InvalidGitRepositoryError:
  30. raise ValueError(f"'{repo_path}' is not a valid Git repository.")
  31. except git.NoSuchPathError:
  32. raise ValueError(f"Repository path '{repo_path}' does not exist.")
  33. except Exception as e:
  34. raise ValueError(f"Error accessing repository at '{repo_path}': {e}")
  35. def git_status(repo: git.Repo) -> str:
  36. return repo.git.status()
  37. def git_diff_unstaged(repo: git.Repo) -> str:
  38. return repo.git.diff()
  39. def git_diff_staged(repo: git.Repo) -> str:
  40. return repo.git.diff("--cached")
  41. def git_diff(repo: git.Repo, target: str) -> str:
  42. return repo.git.diff(target)
  43. def git_commit(repo: git.Repo, message: str) -> str:
  44. try:
  45. # Check if there's anything staged to commit *before* attempting
  46. # This prevents errors if the index matches HEAD but there are unstaged changes
  47. if not repo.index.diff("HEAD"):
  48. return "No changes added to commit (working tree clean or changes not staged)."
  49. commit = repo.index.commit(message)
  50. return f"Changes committed successfully with hash {commit.hexsha}"
  51. except Exception as e:
  52. # Catch potential errors during commit check or commit itself
  53. return f"Error committing changes: {str(e)}"
  54. def git_add(repo: git.Repo, files: list[str]) -> str:
  55. try:
  56. repo.index.add(files)
  57. return f"Files staged successfully: {', '.join(files)}"
  58. except FileNotFoundError as e:
  59. return f"Error staging files: File not found - {e.filename}"
  60. except Exception as e:
  61. return f"Error staging files: {str(e)}"
  62. def git_reset(repo: git.Repo) -> str:
  63. try:
  64. # Resetting the index to HEAD (unstaging all)
  65. repo.index.reset()
  66. return "All staged changes reset (unstaged)"
  67. except Exception as e:
  68. return f"Error resetting staged changes: {str(e)}"
  69. def git_log(repo: git.Repo, max_count: int = 10) -> list[str]:
  70. commits = list(repo.iter_commits(max_count=max_count))
  71. log = []
  72. for commit in commits:
  73. log.append(
  74. f"Commit: {commit.hexsha}\n"
  75. f"Author: {commit.author}\n"
  76. f"Date: {commit.authored_datetime}\n"
  77. f"Message: {commit.message.strip()}\n" # Use strip() for cleaner output
  78. )
  79. return log
  80. def git_create_branch(repo: git.Repo, branch_name: str, base_branch: Optional[str] = None) -> str:
  81. try:
  82. if base_branch:
  83. try:
  84. base = repo.refs[base_branch]
  85. except IndexError:
  86. try:
  87. base = repo.commit(base_branch)
  88. except git.BadName:
  89. return f"Error: Base reference '{base_branch}' not found (neither branch nor commit)."
  90. except Exception as e:
  91. return f"Error resolving base reference '{base_branch}': {str(e)}"
  92. else:
  93. base = repo.head.commit
  94. if branch_name in repo.heads:
  95. return f"Error: Branch '{branch_name}' already exists."
  96. new_branch = repo.create_head(branch_name, base)
  97. base_ref_name = getattr(base, 'name', base.hexsha) # Get branch name if possible, else hash
  98. return f"Created branch '{new_branch.name}' based on '{base_ref_name}'"
  99. except git.GitCommandError as e:
  100. # Catch specific git errors if possible
  101. return f"Error creating branch '{branch_name}': {e.stderr or e.stdout}"
  102. except Exception as e:
  103. return f"Error creating branch '{branch_name}': {str(e)}"
  104. def git_checkout(repo: git.Repo, branch_name: str) -> str:
  105. try:
  106. repo.git.checkout(branch_name)
  107. return f"Switched to branch '{branch_name}'"
  108. except git.GitCommandError as e:
  109. # Provide more specific feedback
  110. if "did not match any file(s) known to git" in (e.stderr or ""):
  111. return f"Error: Branch or pathspec '{branch_name}' not found."
  112. elif "Please commit your changes or stash them before you switch branches" in (e.stderr or ""):
  113. return f"Error: Cannot checkout branch '{branch_name}'. You have unstaged changes. Please commit or stash them first."
  114. else:
  115. return f"Error checking out branch '{branch_name}': {e.stderr or e.stdout}"
  116. except Exception as e:
  117. return f"An unexpected error occurred during checkout: {str(e)}"
  118. def git_init(repo_path: str) -> str:
  119. try:
  120. # Check if it already exists and is a repo
  121. target_path = Path(repo_path)
  122. if target_path.exists() and target_path.joinpath(".git").is_dir():
  123. # Check if it's actually a valid repo
  124. try:
  125. existing_repo = git.Repo(repo_path)
  126. return f"Repository already exists at {existing_repo.git_dir}"
  127. except git.InvalidGitRepositoryError:
  128. # Path exists, .git dir exists, but it's invalid. Allow re-init? Or error?
  129. # Let's error for safety. User can delete .git if they want re-init.
  130. return f"Error: An invalid Git repository structure already exists at '{repo_path}'. Remove '.git' folder to reinitialize."
  131. except Exception as e:
  132. return f"Error checking existing repository at '{repo_path}': {e}"
  133. # Initialize (mkdir=True handles non-existent parent dirs)
  134. repo = git.Repo.init(path=repo_path, mkdir=True)
  135. return f"Initialized empty Git repository in {repo.git_dir}"
  136. except Exception as e:
  137. return f"Error initializing repository at '{repo_path}': {str(e)}"
  138. def git_show(repo: git.Repo, revision: str) -> str:
  139. try:
  140. commit = repo.commit(revision)
  141. except git.BadName:
  142. return f"Error: Revision '{revision}' not found."
  143. except Exception as e:
  144. return f"Error finding revision '{revision}': {str(e)}"
  145. output = [
  146. f"Commit: {commit.hexsha}\n"
  147. f"Author: {commit.author}\n"
  148. f"Date: {commit.authored_datetime}\n"
  149. f"Message:\n{commit.message.strip()}\n"
  150. ]
  151. try:
  152. # Show diff against first parent, or initial commit diff
  153. diffs = commit.diff(commit.parents[0] if commit.parents else git.NULL_TREE, create_patch=True)
  154. if diffs:
  155. output.append("\nChanges:\n")
  156. for d in diffs:
  157. # Use a safer way to decode, ignoring errors
  158. diff_text = d.diff.decode('utf-8', errors='ignore') if d.diff else ""
  159. a_path = d.a_path or (d.a_blob.path if d.a_blob else 'unknown')
  160. b_path = d.b_path or (d.b_blob.path if d.b_blob else 'unknown')
  161. output.append(f"--- a/{a_path}\n+++ b/{b_path}\n")
  162. output.append(diff_text)
  163. else:
  164. # Check if it's the initial commit
  165. if not commit.parents:
  166. output.append("\nChanges: (Initial commit)\n")
  167. # Show the initial tree content or a message indicating it's the first commit
  168. # For brevity, just stating it's initial might be enough.
  169. # Or iterate through tree: for item in commit.tree.traverse(): output.append(f"+ {item.path}\n")
  170. else:
  171. output.append("\nNo changes in this commit compared to its parent.")
  172. except Exception as e:
  173. output.append(f"\nWarning: Could not generate diff for commit {commit.hexsha}: {str(e)}")
  174. return "".join(output)
  175. # --- MCPFast Server Setup ---
  176. logger = logging.getLogger(__name__)
  177. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  178. # Initialize MCPFast
  179. mcp = FastMCP("git")
# --- Tool definitions using @mcp.tool() ---
  181. @mcp.tool()
  182. async def list_repos_tool(uuid: str) -> list[TextContent]:
  183. """
  184. 列出指定用户在 GitNexus 平台上的所有仓库名称(即 repo 文件夹名)。
  185. Args:
  186. uuid: 用户的唯一标识,用于定位其仓库根路径。
  187. """
  188. user_path = Path('/www/gitnexus_repos') / uuid
  189. logger.info(f"Executing tool: list_repos_tool for user {uuid}")
  190. if not user_path.exists() or not user_path.is_dir():
  191. return [TextContent(type="text", text=f"用户还没有可用的Git仓库")]
  192. try:
  193. repo_names = [
  194. f.name for f in user_path.iterdir()
  195. if f.is_dir() and (f / ".git").exists()
  196. ]
  197. if not repo_names:
  198. return [TextContent(type="text", text="当前没有可用的仓库。")]
  199. result = "您当前拥有以下仓库:\n" + "\n".join(f"- {name}" for name in repo_names)
  200. return [TextContent(type="text", text=result)]
  201. except Exception as e:
  202. logger.exception("Error while listing repositories")
  203. return [TextContent(type="text", text=f"获取仓库列表时出错:{e}")]
  204. @mcp.tool()
  205. async def status_tool(repo_path: str) -> list[TextContent]:
  206. """Gets the status of a Git repository (shows staged, unstaged, and untracked files).
  207. Args:
  208. repo_path: The file system path to the Git repository.
  209. """
  210. logger.info(f"Executing tool: {GitTools.STATUS} for repo {repo_path}")
  211. try:
  212. repo = _get_repo(repo_path)
  213. status = git_status(repo)
  214. result = f"Repository status for '{repo_path}':\n{status}"
  215. except ValueError as e:
  216. result = str(e)
  217. return [TextContent(type="text", text=result)]
  218. @mcp.tool()
  219. async def diff_tool(repo_path: str, target: str) -> list[TextContent]:
  220. """Shows differences between the current HEAD and a specified target (e.g., a branch, tag, or commit hash).
  221. Args:
  222. repo_path: The file system path to the Git repository.
  223. target: The branch, tag, commit hash, or other refspec to compare HEAD against.
  224. """
  225. logger.info(f"Executing tool: {GitTools.DIFF} for repo {repo_path} against {target}")
  226. try:
  227. repo = _get_repo(repo_path)
  228. diff = git_diff(repo, target)
  229. result = f"Diff between HEAD and '{target}' in '{repo_path}':\n{diff or 'No differences found.'}"
  230. except ValueError as e:
  231. result = str(e)
  232. except git.GitCommandError as e:
  233. result = f"Error running git diff against '{target}': {e.stderr or e.stdout}"
  234. return [TextContent(type="text", text=result)]
  235. @mcp.tool()
  236. async def log_tool(repo_path: str, max_count: int = 10) -> list[TextContent]:
  237. """Shows the commit history log for the current branch.
  238. Args:
  239. repo_path: The file system path to the Git repository.
  240. max_count: The maximum number of commits to display (default: 10).
  241. """
  242. logger.info(f"Executing tool: {GitTools.LOG} for repo {repo_path}, max_count={max_count}")
  243. try:
  244. repo = _get_repo(repo_path)
  245. logs = git_log(repo, max_count)
  246. if not logs:
  247. result = f"No commit history found for '{repo_path}' (possibly an empty repository)."
  248. else:
  249. result = f"Commit history for '{repo_path}' (last {len(logs)} commits):\n\n" + "\n\n".join(logs) # Add double newline for readability
  250. except ValueError as e:
  251. result = str(e)
  252. return [TextContent(type="text", text=result)]
  253. @mcp.tool()
  254. async def checkout_tool(repo_path: str, branch_name: str) -> list[TextContent]:
  255. """Switches the working directory to a different branch.
  256. Args:
  257. repo_path: The file system path to the Git repository.
  258. branch_name: The name of the branch to switch to.
  259. """
  260. logger.info(f"Executing tool: {GitTools.CHECKOUT} for repo {repo_path}, branch: {branch_name}")
  261. try:
  262. repo = _get_repo(repo_path)
  263. result = git_checkout(repo, branch_name)
  264. except ValueError as e:
  265. result = str(e)
  266. return [TextContent(type="text", text=result)]
  267. @mcp.tool()
  268. async def show_tool(repo_path: str, revision: str) -> list[TextContent]:
  269. """Shows details (metadata and content changes) for a specific commit or object.
  270. Args:
  271. repo_path: The file system path to the Git repository.
  272. revision: The commit hash, tag, or branch name to show details for (e.g., 'HEAD', 'main', 'v1.0', 'abcdef123').
  273. """
  274. logger.info(f"Executing tool: {GitTools.SHOW} for repo {repo_path}, revision: {revision}")
  275. try:
  276. repo = _get_repo(repo_path)
  277. result = git_show(repo, revision)
  278. except ValueError as e:
  279. result = str(e)
  280. return [TextContent(type="text", text=result)]
  281. if __name__ == "__main__":
  282. logger.info("Starting Git Tool Server using MCPFast stdio transport...")
  283. # Run using stdio transport. Ensure the environment calling this script
  284. # is set up to communicate via stdin/stdout as expected by MCPFast stdio.
  285. mcp.run(transport='stdio')
  286. logger.info("MCPFast stdio transport finished.")