server.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. import logging
  2. from pathlib import Path
  3. from typing import Optional
  4. from enum import Enum
  5. import git
  6. from mcp.types import TextContent
  7. from mcp.server.fastmcp import FastMCP
  8. # --- Pydantic Models are REMOVED ---
  9. # --- GitTools Enum (remains the same) ---
  10. class GitTools(str, Enum):
  11. STATUS = "git_status"
  12. DIFF_UNSTAGED = "git_diff_unstaged"
  13. DIFF_STAGED = "git_diff_staged"
  14. DIFF = "git_diff"
  15. COMMIT = "git_commit"
  16. ADD = "git_add"
  17. RESET = "git_reset"
  18. LOG = "git_log"
  19. CREATE_BRANCH = "git_create_branch"
  20. CHECKOUT = "git_checkout"
  21. SHOW = "git_show"
  22. INIT = "git_init"
# --- Low-level Git functions (thin wrappers around GitPython's git.Repo) ---
# Note: the type hints refer to git.Repo, the repository class provided by the GitPython package.
  25. def _get_repo(repo_path: str | Path) -> git.Repo:
  26. """Helper to get Repo object and handle errors."""
  27. try:
  28. return git.Repo(str(repo_path))
  29. except git.InvalidGitRepositoryError:
  30. raise ValueError(f"'{repo_path}' is not a valid Git repository.")
  31. except git.NoSuchPathError:
  32. raise ValueError(f"Repository path '{repo_path}' does not exist.")
  33. except Exception as e:
  34. raise ValueError(f"Error accessing repository at '{repo_path}': {e}")
  35. def git_status(repo: git.Repo) -> str:
  36. return repo.git.status()
  37. def git_diff_unstaged(repo: git.Repo) -> str:
  38. return repo.git.diff()
  39. def git_diff_staged(repo: git.Repo) -> str:
  40. return repo.git.diff("--cached")
  41. def git_diff(repo: git.Repo, target: str) -> str:
  42. return repo.git.diff(target)
  43. def git_commit(repo: git.Repo, message: str) -> str:
  44. try:
  45. # Check if there's anything staged to commit *before* attempting
  46. # This prevents errors if the index matches HEAD but there are unstaged changes
  47. if not repo.index.diff("HEAD"):
  48. return "No changes added to commit (working tree clean or changes not staged)."
  49. commit = repo.index.commit(message)
  50. return f"Changes committed successfully with hash {commit.hexsha}"
  51. except Exception as e:
  52. # Catch potential errors during commit check or commit itself
  53. return f"Error committing changes: {str(e)}"
  54. def git_add(repo: git.Repo, files: list[str]) -> str:
  55. try:
  56. repo.index.add(files)
  57. return f"Files staged successfully: {', '.join(files)}"
  58. except FileNotFoundError as e:
  59. return f"Error staging files: File not found - {e.filename}"
  60. except Exception as e:
  61. return f"Error staging files: {str(e)}"
  62. def git_reset(repo: git.Repo) -> str:
  63. try:
  64. # Resetting the index to HEAD (unstaging all)
  65. repo.index.reset()
  66. return "All staged changes reset (unstaged)"
  67. except Exception as e:
  68. return f"Error resetting staged changes: {str(e)}"
  69. def git_log(repo: git.Repo, max_count: int = 10) -> list[str]:
  70. commits = list(repo.iter_commits(max_count=max_count))
  71. log = []
  72. for commit in commits:
  73. log.append(
  74. f"Commit: {commit.hexsha}\n"
  75. f"Author: {commit.author}\n"
  76. f"Date: {commit.authored_datetime}\n"
  77. f"Message: {commit.message.strip()}\n" # Use strip() for cleaner output
  78. )
  79. return log
  80. def git_create_branch(repo: git.Repo, branch_name: str, base_branch: Optional[str] = None) -> str:
  81. try:
  82. if base_branch:
  83. try:
  84. base = repo.refs[base_branch]
  85. except IndexError:
  86. try:
  87. base = repo.commit(base_branch)
  88. except git.BadName:
  89. return f"Error: Base reference '{base_branch}' not found (neither branch nor commit)."
  90. except Exception as e:
  91. return f"Error resolving base reference '{base_branch}': {str(e)}"
  92. else:
  93. base = repo.head.commit
  94. if branch_name in repo.heads:
  95. return f"Error: Branch '{branch_name}' already exists."
  96. new_branch = repo.create_head(branch_name, base)
  97. base_ref_name = getattr(base, 'name', base.hexsha) # Get branch name if possible, else hash
  98. return f"Created branch '{new_branch.name}' based on '{base_ref_name}'"
  99. except git.GitCommandError as e:
  100. # Catch specific git errors if possible
  101. return f"Error creating branch '{branch_name}': {e.stderr or e.stdout}"
  102. except Exception as e:
  103. return f"Error creating branch '{branch_name}': {str(e)}"
  104. def git_checkout(repo: git.Repo, branch_name: str) -> str:
  105. try:
  106. repo.git.checkout(branch_name)
  107. return f"Switched to branch '{branch_name}'"
  108. except git.GitCommandError as e:
  109. # Provide more specific feedback
  110. if "did not match any file(s) known to git" in (e.stderr or ""):
  111. return f"Error: Branch or pathspec '{branch_name}' not found."
  112. elif "Please commit your changes or stash them before you switch branches" in (e.stderr or ""):
  113. return f"Error: Cannot checkout branch '{branch_name}'. You have unstaged changes. Please commit or stash them first."
  114. else:
  115. return f"Error checking out branch '{branch_name}': {e.stderr or e.stdout}"
  116. except Exception as e:
  117. return f"An unexpected error occurred during checkout: {str(e)}"
  118. def git_init(repo_path: str) -> str:
  119. try:
  120. # Check if it already exists and is a repo
  121. target_path = Path(repo_path)
  122. if target_path.exists() and target_path.joinpath(".git").is_dir():
  123. # Check if it's actually a valid repo
  124. try:
  125. existing_repo = git.Repo(repo_path)
  126. return f"Repository already exists at {existing_repo.git_dir}"
  127. except git.InvalidGitRepositoryError:
  128. # Path exists, .git dir exists, but it's invalid. Allow re-init? Or error?
  129. # Let's error for safety. User can delete .git if they want re-init.
  130. return f"Error: An invalid Git repository structure already exists at '{repo_path}'. Remove '.git' folder to reinitialize."
  131. except Exception as e:
  132. return f"Error checking existing repository at '{repo_path}': {e}"
  133. # Initialize (mkdir=True handles non-existent parent dirs)
  134. repo = git.Repo.init(path=repo_path, mkdir=True)
  135. return f"Initialized empty Git repository in {repo.git_dir}"
  136. except Exception as e:
  137. return f"Error initializing repository at '{repo_path}': {str(e)}"
  138. def git_show(repo: git.Repo, revision: str) -> str:
  139. try:
  140. commit = repo.commit(revision)
  141. except git.BadName:
  142. return f"Error: Revision '{revision}' not found."
  143. except Exception as e:
  144. return f"Error finding revision '{revision}': {str(e)}"
  145. output = [
  146. f"Commit: {commit.hexsha}\n"
  147. f"Author: {commit.author}\n"
  148. f"Date: {commit.authored_datetime}\n"
  149. f"Message:\n{commit.message.strip()}\n"
  150. ]
  151. try:
  152. # Show diff against first parent, or initial commit diff
  153. diffs = commit.diff(commit.parents[0] if commit.parents else git.NULL_TREE, create_patch=True)
  154. if diffs:
  155. output.append("\nChanges:\n")
  156. for d in diffs:
  157. # Use a safer way to decode, ignoring errors
  158. diff_text = d.diff.decode('utf-8', errors='ignore') if d.diff else ""
  159. a_path = d.a_path or (d.a_blob.path if d.a_blob else 'unknown')
  160. b_path = d.b_path or (d.b_blob.path if d.b_blob else 'unknown')
  161. output.append(f"--- a/{a_path}\n+++ b/{b_path}\n")
  162. output.append(diff_text)
  163. else:
  164. # Check if it's the initial commit
  165. if not commit.parents:
  166. output.append("\nChanges: (Initial commit)\n")
  167. # Show the initial tree content or a message indicating it's the first commit
  168. # For brevity, just stating it's initial might be enough.
  169. # Or iterate through tree: for item in commit.tree.traverse(): output.append(f"+ {item.path}\n")
  170. else:
  171. output.append("\nNo changes in this commit compared to its parent.")
  172. except Exception as e:
  173. output.append(f"\nWarning: Could not generate diff for commit {commit.hexsha}: {str(e)}")
  174. return "".join(output)
  175. # --- MCPFast Server Setup ---
  176. logger = logging.getLogger(__name__)
  177. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  178. # Initialize MCPFast
  179. mcp = FastMCP("git")
  180. # --- Tool Definitions using @mcp_.tool() ---
  181. @mcp.tool()
  182. async def init_tool(repo_path: str) -> list[TextContent]:
  183. """Initializes a Git repository at the specified path.
  184. Args:
  185. repo_path: The file system path where the Git repository should be initialized. Parent directories will be created if they don't exist.
  186. """
  187. logger.info(f"Executing tool: {GitTools.INIT} for path {repo_path}")
  188. result = git_init(repo_path)
  189. return [TextContent(type="text", text=result)]
  190. @mcp.tool()
  191. async def status_tool(repo_path: str) -> list[TextContent]:
  192. """Gets the status of a Git repository (shows staged, unstaged, and untracked files).
  193. Args:
  194. repo_path: The file system path to the Git repository.
  195. """
  196. logger.info(f"Executing tool: {GitTools.STATUS} for repo {repo_path}")
  197. try:
  198. repo = _get_repo(repo_path)
  199. status = git_status(repo)
  200. result = f"Repository status for '{repo_path}':\n{status}"
  201. except ValueError as e:
  202. result = str(e)
  203. return [TextContent(type="text", text=result)]
  204. @mcp.tool()
  205. async def diff_unstaged_tool(repo_path: str) -> list[TextContent]:
  206. """Shows changes in the working directory that are not staged for commit.
  207. Args:
  208. repo_path: The file system path to the Git repository.
  209. """
  210. logger.info(f"Executing tool: {GitTools.DIFF_UNSTAGED} for repo {repo_path}")
  211. try:
  212. repo = _get_repo(repo_path)
  213. diff = git_diff_unstaged(repo)
  214. result = f"Unstaged changes (working directory vs index) in '{repo_path}':\n{diff or 'No unstaged changes.'}"
  215. except ValueError as e:
  216. result = str(e)
  217. return [TextContent(type="text", text=result)]
  218. @mcp.tool()
  219. async def diff_staged_tool(repo_path: str) -> list[TextContent]:
  220. """Shows changes that are staged for the next commit (compared to HEAD).
  221. Args:
  222. repo_path: The file system path to the Git repository.
  223. """
  224. logger.info(f"Executing tool: {GitTools.DIFF_STAGED} for repo {repo_path}")
  225. try:
  226. repo = _get_repo(repo_path)
  227. diff = git_diff_staged(repo)
  228. result = f"Staged changes (index vs HEAD) in '{repo_path}':\n{diff or 'No staged changes.'}"
  229. except ValueError as e:
  230. result = str(e)
  231. return [TextContent(type="text", text=result)]
  232. @mcp.tool()
  233. async def diff_tool(repo_path: str, target: str) -> list[TextContent]:
  234. """Shows differences between the current HEAD and a specified target (e.g., a branch, tag, or commit hash).
  235. Args:
  236. repo_path: The file system path to the Git repository.
  237. target: The branch, tag, commit hash, or other refspec to compare HEAD against.
  238. """
  239. logger.info(f"Executing tool: {GitTools.DIFF} for repo {repo_path} against {target}")
  240. try:
  241. repo = _get_repo(repo_path)
  242. diff = git_diff(repo, target)
  243. result = f"Diff between HEAD and '{target}' in '{repo_path}':\n{diff or 'No differences found.'}"
  244. except ValueError as e:
  245. result = str(e)
  246. except git.GitCommandError as e:
  247. result = f"Error running git diff against '{target}': {e.stderr or e.stdout}"
  248. return [TextContent(type="text", text=result)]
  249. @mcp.tool()
  250. async def add_tool(repo_path: str, files: list[str]) -> list[TextContent]:
  251. """Adds specified file contents to the staging area (index) for the next commit.
  252. Args:
  253. repo_path: The file system path to the Git repository.
  254. files: A list of file paths (relative to the repository root) to stage. Use '.' to stage all changes.
  255. """
  256. logger.info(f"Executing tool: {GitTools.ADD} for repo {repo_path}, files: {files}")
  257. if not files:
  258. return [TextContent(type="text", text="Error: No files specified to add.")]
  259. try:
  260. repo = _get_repo(repo_path)
  261. result = git_add(repo, files)
  262. except ValueError as e: # Catches errors from _get_repo
  263. result = str(e)
  264. except Exception as e: # Catch other potential errors during add
  265. result = f"An unexpected error occurred during add: {str(e)}"
  266. return [TextContent(type="text", text=result)]
  267. @mcp.tool()
  268. async def commit_tool(repo_path: str, message: str) -> list[TextContent]:
  269. """Records changes staged in the index to the repository history.
  270. Args:
  271. repo_path: The file system path to the Git repository.
  272. message: The commit message describing the changes.
  273. """
  274. logger.info(f"Executing tool: {GitTools.COMMIT} for repo {repo_path}")
  275. try:
  276. repo = _get_repo(repo_path)
  277. # The check is now inside git_commit for cleaner tool function
  278. result = git_commit(repo, message)
  279. except ValueError as e: # Catches errors from _get_repo
  280. result = str(e)
  281. except Exception as e: # Catch other potential errors
  282. result = f"An unexpected error occurred during commit: {str(e)}"
  283. return [TextContent(type="text", text=result)]
  284. @mcp.tool()
  285. async def reset_tool(repo_path: str) -> list[TextContent]:
  286. """Resets the staging area (index) to match the current HEAD commit, effectively unstaging all changes. Does not modify the working directory.
  287. Args:
  288. repo_path: The file system path to the Git repository.
  289. """
  290. logger.info(f"Executing tool: {GitTools.RESET} for repo {repo_path}")
  291. try:
  292. repo = _get_repo(repo_path)
  293. result = git_reset(repo)
  294. except ValueError as e:
  295. result = str(e)
  296. return [TextContent(type="text", text=result)]
  297. @mcp.tool()
  298. async def log_tool(repo_path: str, max_count: int = 10) -> list[TextContent]:
  299. """Shows the commit history log for the current branch.
  300. Args:
  301. repo_path: The file system path to the Git repository.
  302. max_count: The maximum number of commits to display (default: 10).
  303. """
  304. logger.info(f"Executing tool: {GitTools.LOG} for repo {repo_path}, max_count={max_count}")
  305. try:
  306. repo = _get_repo(repo_path)
  307. logs = git_log(repo, max_count)
  308. if not logs:
  309. result = f"No commit history found for '{repo_path}' (possibly an empty repository)."
  310. else:
  311. result = f"Commit history for '{repo_path}' (last {len(logs)} commits):\n\n" + "\n\n".join(logs) # Add double newline for readability
  312. except ValueError as e:
  313. result = str(e)
  314. return [TextContent(type="text", text=result)]
  315. @mcp.tool()
  316. async def create_branch_tool(repo_path: str, branch_name: str, base_branch: Optional[str] = None) -> list[TextContent]:
  317. """Creates a new branch.
  318. Args:
  319. repo_path: The file system path to the Git repository.
  320. branch_name: The name for the new branch.
  321. base_branch: Optional. The existing branch or commit hash to base the new branch on. If omitted, defaults to the current HEAD.
  322. """
  323. logger.info(f"Executing tool: {GitTools.CREATE_BRANCH} for repo {repo_path}, branch: {branch_name}, base: {base_branch}")
  324. try:
  325. repo = _get_repo(repo_path)
  326. result = git_create_branch(repo, branch_name, base_branch)
  327. except ValueError as e:
  328. result = str(e)
  329. return [TextContent(type="text", text=result)]
  330. @mcp.tool()
  331. async def checkout_tool(repo_path: str, branch_name: str) -> list[TextContent]:
  332. """Switches the working directory to a different branch.
  333. Args:
  334. repo_path: The file system path to the Git repository.
  335. branch_name: The name of the branch to switch to.
  336. """
  337. logger.info(f"Executing tool: {GitTools.CHECKOUT} for repo {repo_path}, branch: {branch_name}")
  338. try:
  339. repo = _get_repo(repo_path)
  340. result = git_checkout(repo, branch_name)
  341. except ValueError as e:
  342. result = str(e)
  343. return [TextContent(type="text", text=result)]
  344. @mcp.tool()
  345. async def show_tool(repo_path: str, revision: str) -> list[TextContent]:
  346. """Shows details (metadata and content changes) for a specific commit or object.
  347. Args:
  348. repo_path: The file system path to the Git repository.
  349. revision: The commit hash, tag, or branch name to show details for (e.g., 'HEAD', 'main', 'v1.0', 'abcdef123').
  350. """
  351. logger.info(f"Executing tool: {GitTools.SHOW} for repo {repo_path}, revision: {revision}")
  352. try:
  353. repo = _get_repo(repo_path)
  354. result = git_show(repo, revision)
  355. except ValueError as e:
  356. result = str(e)
  357. return [TextContent(type="text", text=result)]
  358. if __name__ == "__main__":
  359. logger.info("Starting Git Tool Server using MCPFast stdio transport...")
  360. # Run using stdio transport. Ensure the environment calling this script
  361. # is set up to communicate via stdin/stdout as expected by MCPFast stdio.
  362. mcp.run(transport='stdio')
  363. logger.info("MCPFast stdio transport finished.")