import logging from pathlib import Path from typing import Optional from enum import Enum import git from mcp.types import TextContent from mcp.server.fastmcp import FastMCP # --- Pydantic Models are REMOVED --- # --- GitTools Enum (remains the same) --- class GitTools(str, Enum): STATUS = "git_status" DIFF_UNSTAGED = "git_diff_unstaged" DIFF_STAGED = "git_diff_staged" DIFF = "git_diff" COMMIT = "git_commit" ADD = "git_add" RESET = "git_reset" LOG = "git_log" CREATE_BRANCH = "git_create_branch" CHECKOUT = "git_checkout" SHOW = "git_show" INIT = "git_init" # --- Low-level Git Functions (remain the same, using gitpython.Repo) --- # Note: Type hint updated to gitpython.Repo def _get_repo(repo_path: str | Path) -> git.Repo: """Helper to get Repo object and handle errors.""" try: return git.Repo(str(repo_path)) except git.InvalidGitRepositoryError: raise ValueError(f"'{repo_path}' is not a valid Git repository.") except git.NoSuchPathError: raise ValueError(f"Repository path '{repo_path}' does not exist.") except Exception as e: raise ValueError(f"Error accessing repository at '{repo_path}': {e}") def git_status(repo: git.Repo) -> str: return repo.git.status() def git_diff_unstaged(repo: git.Repo) -> str: return repo.git.diff() def git_diff_staged(repo: git.Repo) -> str: return repo.git.diff("--cached") def git_diff(repo: git.Repo, target: str) -> str: return repo.git.diff(target) def git_commit(repo: git.Repo, message: str) -> str: try: # Check if there's anything staged to commit *before* attempting # This prevents errors if the index matches HEAD but there are unstaged changes if not repo.index.diff("HEAD"): return "No changes added to commit (working tree clean or changes not staged)." commit = repo.index.commit(message) return f"Changes committed successfully with hash {commit.hexsha}" except Exception as e: # Catch potential errors during commit check or commit itself return f"Error committing changes: {str(e)}" def git_add(repo: git.Repo, files: list[str]) -> str: try: repo.index.add(files) return f"Files staged successfully: {', '.join(files)}" except FileNotFoundError as e: return f"Error staging files: File not found - {e.filename}" except Exception as e: return f"Error staging files: {str(e)}" def git_reset(repo: git.Repo) -> str: try: # Resetting the index to HEAD (unstaging all) repo.index.reset() return "All staged changes reset (unstaged)" except Exception as e: return f"Error resetting staged changes: {str(e)}" def git_log(repo: git.Repo, max_count: int = 10) -> list[str]: commits = list(repo.iter_commits(max_count=max_count)) log = [] for commit in commits: log.append( f"Commit: {commit.hexsha}\n" f"Author: {commit.author}\n" f"Date: {commit.authored_datetime}\n" f"Message: {commit.message.strip()}\n" # Use strip() for cleaner output ) return log def git_create_branch(repo: git.Repo, branch_name: str, base_branch: Optional[str] = None) -> str: try: if base_branch: try: base = repo.refs[base_branch] except IndexError: try: base = repo.commit(base_branch) except git.BadName: return f"Error: Base reference '{base_branch}' not found (neither branch nor commit)." except Exception as e: return f"Error resolving base reference '{base_branch}': {str(e)}" else: base = repo.head.commit if branch_name in repo.heads: return f"Error: Branch '{branch_name}' already exists." new_branch = repo.create_head(branch_name, base) base_ref_name = getattr(base, 'name', base.hexsha) # Get branch name if possible, else hash return f"Created branch '{new_branch.name}' based on '{base_ref_name}'" except git.GitCommandError as e: # Catch specific git errors if possible return f"Error creating branch '{branch_name}': {e.stderr or e.stdout}" except Exception as e: return f"Error creating branch '{branch_name}': {str(e)}" def git_checkout(repo: git.Repo, branch_name: str) -> str: try: repo.git.checkout(branch_name) return f"Switched to branch '{branch_name}'" except git.GitCommandError as e: # Provide more specific feedback if "did not match any file(s) known to git" in (e.stderr or ""): return f"Error: Branch or pathspec '{branch_name}' not found." elif "Please commit your changes or stash them before you switch branches" in (e.stderr or ""): return f"Error: Cannot checkout branch '{branch_name}'. You have unstaged changes. Please commit or stash them first." else: return f"Error checking out branch '{branch_name}': {e.stderr or e.stdout}" except Exception as e: return f"An unexpected error occurred during checkout: {str(e)}" def git_init(repo_path: str) -> str: try: # Check if it already exists and is a repo target_path = Path(repo_path) if target_path.exists() and target_path.joinpath(".git").is_dir(): # Check if it's actually a valid repo try: existing_repo = git.Repo(repo_path) return f"Repository already exists at {existing_repo.git_dir}" except git.InvalidGitRepositoryError: # Path exists, .git dir exists, but it's invalid. Allow re-init? Or error? # Let's error for safety. User can delete .git if they want re-init. return f"Error: An invalid Git repository structure already exists at '{repo_path}'. Remove '.git' folder to reinitialize." except Exception as e: return f"Error checking existing repository at '{repo_path}': {e}" # Initialize (mkdir=True handles non-existent parent dirs) repo = git.Repo.init(path=repo_path, mkdir=True) return f"Initialized empty Git repository in {repo.git_dir}" except Exception as e: return f"Error initializing repository at '{repo_path}': {str(e)}" def git_show(repo: git.Repo, revision: str) -> str: try: commit = repo.commit(revision) except git.BadName: return f"Error: Revision '{revision}' not found." except Exception as e: return f"Error finding revision '{revision}': {str(e)}" output = [ f"Commit: {commit.hexsha}\n" f"Author: {commit.author}\n" f"Date: {commit.authored_datetime}\n" f"Message:\n{commit.message.strip()}\n" ] try: # Show diff against first parent, or initial commit diff diffs = commit.diff(commit.parents[0] if commit.parents else git.NULL_TREE, create_patch=True) if diffs: output.append("\nChanges:\n") for d in diffs: # Use a safer way to decode, ignoring errors diff_text = d.diff.decode('utf-8', errors='ignore') if d.diff else "" a_path = d.a_path or (d.a_blob.path if d.a_blob else 'unknown') b_path = d.b_path or (d.b_blob.path if d.b_blob else 'unknown') output.append(f"--- a/{a_path}\n+++ b/{b_path}\n") output.append(diff_text) else: # Check if it's the initial commit if not commit.parents: output.append("\nChanges: (Initial commit)\n") # Show the initial tree content or a message indicating it's the first commit # For brevity, just stating it's initial might be enough. # Or iterate through tree: for item in commit.tree.traverse(): output.append(f"+ {item.path}\n") else: output.append("\nNo changes in this commit compared to its parent.") except Exception as e: output.append(f"\nWarning: Could not generate diff for commit {commit.hexsha}: {str(e)}") return "".join(output) # --- MCPFast Server Setup --- logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') # Initialize MCPFast mcp = FastMCP("git") # --- Tool Definitions using @mcp_.tool() --- # @mcp.tool() # async def init_tool(repo_path: str) -> list[TextContent]: # """Initializes a Git repository at the specified path. # Args: # repo_path: The file system path where the Git repository should be initialized. Parent directories will be created if they don't exist. # """ # logger.info(f"Executing tool: {GitTools.INIT} for path {repo_path}") # result = git_init(repo_path) # return [TextContent(type="text", text=result)] @mcp.tool() async def status_tool(repo_path: str) -> list[TextContent]: """Gets the status of a Git repository (shows staged, unstaged, and untracked files). Args: repo_path: The file system path to the Git repository. """ logger.info(f"Executing tool: {GitTools.STATUS} for repo {repo_path}") try: repo = _get_repo(repo_path) status = git_status(repo) result = f"Repository status for '{repo_path}':\n{status}" except ValueError as e: result = str(e) return [TextContent(type="text", text=result)] # @mcp.tool() # async def diff_unstaged_tool(repo_path: str) -> list[TextContent]: # """Shows changes in the working directory that are not staged for commit. # Args: # repo_path: The file system path to the Git repository. # """ # logger.info(f"Executing tool: {GitTools.DIFF_UNSTAGED} for repo {repo_path}") # try: # repo = _get_repo(repo_path) # diff = git_diff_unstaged(repo) # result = f"Unstaged changes (working directory vs index) in '{repo_path}':\n{diff or 'No unstaged changes.'}" # except ValueError as e: # result = str(e) # return [TextContent(type="text", text=result)] # @mcp.tool() # async def diff_staged_tool(repo_path: str) -> list[TextContent]: # """Shows changes that are staged for the next commit (compared to HEAD). # Args: # repo_path: The file system path to the Git repository. # """ # logger.info(f"Executing tool: {GitTools.DIFF_STAGED} for repo {repo_path}") # try: # repo = _get_repo(repo_path) # diff = git_diff_staged(repo) # result = f"Staged changes (index vs HEAD) in '{repo_path}':\n{diff or 'No staged changes.'}" # except ValueError as e: # result = str(e) # return [TextContent(type="text", text=result)] @mcp.tool() async def diff_tool(repo_path: str, target: str) -> list[TextContent]: """Shows differences between the current HEAD and a specified target (e.g., a branch, tag, or commit hash). Args: repo_path: The file system path to the Git repository. target: The branch, tag, commit hash, or other refspec to compare HEAD against. """ logger.info(f"Executing tool: {GitTools.DIFF} for repo {repo_path} against {target}") try: repo = _get_repo(repo_path) diff = git_diff(repo, target) result = f"Diff between HEAD and '{target}' in '{repo_path}':\n{diff or 'No differences found.'}" except ValueError as e: result = str(e) except git.GitCommandError as e: result = f"Error running git diff against '{target}': {e.stderr or e.stdout}" return [TextContent(type="text", text=result)] # @mcp.tool() # async def add_tool(repo_path: str, files: list[str]) -> list[TextContent]: # """Adds specified file contents to the staging area (index) for the next commit. # Args: # repo_path: The file system path to the Git repository. # files: A list of file paths (relative to the repository root) to stage. Use '.' to stage all changes. # """ # logger.info(f"Executing tool: {GitTools.ADD} for repo {repo_path}, files: {files}") # if not files: # return [TextContent(type="text", text="Error: No files specified to add.")] # try: # repo = _get_repo(repo_path) # result = git_add(repo, files) # except ValueError as e: # Catches errors from _get_repo # result = str(e) # except Exception as e: # Catch other potential errors during add # result = f"An unexpected error occurred during add: {str(e)}" # return [TextContent(type="text", text=result)] # @mcp.tool() # async def commit_tool(repo_path: str, message: str) -> list[TextContent]: # """Records changes staged in the index to the repository history. # Args: # repo_path: The file system path to the Git repository. # message: The commit message describing the changes. # """ # logger.info(f"Executing tool: {GitTools.COMMIT} for repo {repo_path}") # try: # repo = _get_repo(repo_path) # # The check is now inside git_commit for cleaner tool function # result = git_commit(repo, message) # except ValueError as e: # Catches errors from _get_repo # result = str(e) # except Exception as e: # Catch other potential errors # result = f"An unexpected error occurred during commit: {str(e)}" # return [TextContent(type="text", text=result)] # @mcp.tool() # async def reset_tool(repo_path: str) -> list[TextContent]: # """Resets the staging area (index) to match the current HEAD commit, effectively unstaging all changes. Does not modify the working directory. # Args: # repo_path: The file system path to the Git repository. # """ # logger.info(f"Executing tool: {GitTools.RESET} for repo {repo_path}") # try: # repo = _get_repo(repo_path) # result = git_reset(repo) # except ValueError as e: # result = str(e) # return [TextContent(type="text", text=result)] @mcp.tool() async def log_tool(repo_path: str, max_count: int = 10) -> list[TextContent]: """Shows the commit history log for the current branch. Args: repo_path: The file system path to the Git repository. max_count: The maximum number of commits to display (default: 10). """ logger.info(f"Executing tool: {GitTools.LOG} for repo {repo_path}, max_count={max_count}") try: repo = _get_repo(repo_path) logs = git_log(repo, max_count) if not logs: result = f"No commit history found for '{repo_path}' (possibly an empty repository)." else: result = f"Commit history for '{repo_path}' (last {len(logs)} commits):\n\n" + "\n\n".join(logs) # Add double newline for readability except ValueError as e: result = str(e) return [TextContent(type="text", text=result)] # @mcp.tool() # async def create_branch_tool(repo_path: str, branch_name: str, base_branch: Optional[str] = None) -> list[TextContent]: # """Creates a new branch. # Args: # repo_path: The file system path to the Git repository. # branch_name: The name for the new branch. # base_branch: Optional. The existing branch or commit hash to base the new branch on. If omitted, defaults to the current HEAD. # """ # logger.info(f"Executing tool: {GitTools.CREATE_BRANCH} for repo {repo_path}, branch: {branch_name}, base: {base_branch}") # try: # repo = _get_repo(repo_path) # result = git_create_branch(repo, branch_name, base_branch) # except ValueError as e: # result = str(e) # return [TextContent(type="text", text=result)] @mcp.tool() async def checkout_tool(repo_path: str, branch_name: str) -> list[TextContent]: """Switches the working directory to a different branch. Args: repo_path: The file system path to the Git repository. branch_name: The name of the branch to switch to. """ logger.info(f"Executing tool: {GitTools.CHECKOUT} for repo {repo_path}, branch: {branch_name}") try: repo = _get_repo(repo_path) result = git_checkout(repo, branch_name) except ValueError as e: result = str(e) return [TextContent(type="text", text=result)] @mcp.tool() async def show_tool(repo_path: str, revision: str) -> list[TextContent]: """Shows details (metadata and content changes) for a specific commit or object. Args: repo_path: The file system path to the Git repository. revision: The commit hash, tag, or branch name to show details for (e.g., 'HEAD', 'main', 'v1.0', 'abcdef123'). """ logger.info(f"Executing tool: {GitTools.SHOW} for repo {repo_path}, revision: {revision}") try: repo = _get_repo(repo_path) result = git_show(repo, revision) except ValueError as e: result = str(e) return [TextContent(type="text", text=result)] if __name__ == "__main__": logger.info("Starting Git Tool Server using MCPFast stdio transport...") # Run using stdio transport. Ensure the environment calling this script # is set up to communicate via stdin/stdout as expected by MCPFast stdio. mcp.run(transport='stdio') logger.info("MCPFast stdio transport finished.")