123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420 |
- import logging
- from pathlib import Path
- from typing import Optional
- from enum import Enum
- import git
- from mcp.types import TextContent
- from mcp.server.fastmcp import FastMCP
- # --- Pydantic Models are REMOVED ---
- # --- GitTools Enum (remains the same) ---
class GitTools(str, Enum):
    """Names of the Git tools exposed by this server.

    Mixing in ``str`` lets each member compare equal to its plain string
    value (``GitTools.STATUS == "git_status"``).
    """

    STATUS = "git_status"
    DIFF_UNSTAGED = "git_diff_unstaged"
    DIFF_STAGED = "git_diff_staged"
    DIFF = "git_diff"
    COMMIT = "git_commit"
    ADD = "git_add"
    RESET = "git_reset"
    LOG = "git_log"
    CREATE_BRANCH = "git_create_branch"
    CHECKOUT = "git_checkout"
    SHOW = "git_show"
    INIT = "git_init"

    def __str__(self) -> str:
        """Return the tool name itself rather than ``GitTools.MEMBER``."""
        # From Python 3.11 on, format()/f-strings of a mixed-in Enum follow
        # __str__, which by default yields "GitTools.INIT". Pinning __str__
        # to the value keeps interpolated output (e.g. log messages) equal
        # to the tool name on every Python version.
        return self.value
- # --- Low-level Git Functions (remain the same, using gitpython.Repo) ---
- # Note: Type hint updated to gitpython.Repo
def _get_repo(repo_path: str | Path) -> git.Repo:
    """Open the Git repository located at ``repo_path``.

    Args:
        repo_path: File system path of the repository.

    Returns:
        The ``git.Repo`` object for that path.

    Raises:
        ValueError: If the path does not exist, is not a Git repository, or
            cannot be opened for any other reason. The underlying exception
            is attached as ``__cause__`` so the root cause stays visible.
    """
    try:
        return git.Repo(str(repo_path))
    except git.InvalidGitRepositoryError as exc:
        raise ValueError(f"'{repo_path}' is not a valid Git repository.") from exc
    except git.NoSuchPathError as exc:
        raise ValueError(f"Repository path '{repo_path}' does not exist.") from exc
    except Exception as exc:
        # Any other failure is normalised to ValueError, the only exception
        # type the tool wrappers expect from this helper.
        raise ValueError(f"Error accessing repository at '{repo_path}': {exc}") from exc
def git_status(repo: git.Repo) -> str:
    """Return the text produced by ``git status`` for ``repo``."""
    status_text = repo.git.status()
    return status_text
def git_diff_unstaged(repo: git.Repo) -> str:
    """Return the text produced by a plain ``git diff`` for ``repo``."""
    unstaged_patch = repo.git.diff()
    return unstaged_patch
def git_diff_staged(repo: git.Repo) -> str:
    """Return the text produced by ``git diff --cached`` for ``repo``."""
    cached_flag = "--cached"
    staged_patch = repo.git.diff(cached_flag)
    return staged_patch
def git_diff(repo: git.Repo, target: str) -> str:
    """Return the output of ``git diff TARGET`` for ``repo``.

    Args:
        repo: Repository to run the diff in.
        target: Branch, tag, commit hash or other revision to diff against.

    Returns:
        The diff text (empty when there are no differences).

    Raises:
        ValueError: If ``target`` starts with ``-``.
    """
    # The target is passed straight to the git command line; a value that
    # starts with '-' would be parsed by git as an option instead of a
    # revision, so it is rejected up front.
    if target.startswith("-"):
        raise ValueError(f"Invalid diff target '{target}': must not start with '-'.")
    return repo.git.diff(target)
def git_commit(repo: git.Repo, message: str) -> str:
    """Commit the staged changes of ``repo`` with the given message.

    Args:
        repo: Repository whose index should be committed.
        message: Commit message.

    Returns:
        A human-readable result: the new commit hash on success, a notice
        when nothing is staged, or an error description. Errors are reported
        in the returned text rather than raised.
    """
    try:
        if repo.head.is_valid():
            # Compare the index with HEAD so that a commit is only attempted
            # when something is actually staged.
            has_staged_changes = bool(repo.index.diff("HEAD"))
        else:
            # Repository without any commit yet: there is no HEAD to diff
            # against, so every entry in the index counts as staged. This
            # makes the very first commit possible.
            has_staged_changes = bool(repo.index.entries)

        if not has_staged_changes:
            return "No changes added to commit (working tree clean or changes not staged)."

        commit = repo.index.commit(message)
        return f"Changes committed successfully with hash {commit.hexsha}"
    except Exception as e:
        # Covers failures of both the staged-changes check and the commit.
        return f"Error committing changes: {str(e)}"
def git_add(repo: git.Repo, files: list[str]) -> str:
    """Stage the given paths in ``repo``.

    Args:
        repo: Repository whose index should be updated.
        files: Paths to stage, relative to the repository root. ``"."``
            stages everything.

    Returns:
        A human-readable success or error message. Errors are reported in
        the returned text rather than raised.
    """
    try:
        # Run ``git add`` itself: unlike adding through the index object,
        # which needs every path to exist on disk, it also stages deletions
        # and honours .gitignore. The '--' separator makes git treat every
        # following argument as a path, even one starting with '-'.
        repo.git.add("--", *files)
        return f"Files staged successfully: {', '.join(files)}"
    except git.GitCommandError as e:
        return f"Error staging files: {e.stderr or e.stdout}"
    except Exception as e:
        return f"Error staging files: {str(e)}"
def git_reset(repo: git.Repo) -> str:
    """Reset the index of ``repo`` and describe the outcome.

    Returns:
        A confirmation message, or an error description if the reset
        raised. Nothing is propagated to the caller.
    """
    try:
        # A bare index reset is used here to unstage everything.
        repo.index.reset()
    except Exception as err:
        return f"Error resetting staged changes: {str(err)}"
    return "All staged changes reset (unstaged)"
def git_log(repo: git.Repo, max_count: int = 10) -> list[str]:
    """Format up to ``max_count`` commits of ``repo`` as text entries.

    Args:
        repo: Repository to read the history from.
        max_count: Upper bound on the number of commits requested.

    Returns:
        One string per commit containing hash, author, date and the
        whitespace-stripped message.
    """
    history = list(repo.iter_commits(max_count=max_count))
    return [
        f"Commit: {entry.hexsha}\n"
        f"Author: {entry.author}\n"
        f"Date: {entry.authored_datetime}\n"
        f"Message: {entry.message.strip()}\n"
        for entry in history
    ]
def git_create_branch(repo: git.Repo, branch_name: str, base_branch: Optional[str] = None) -> str:
    """Create a new branch in ``repo``.

    Args:
        repo: Repository to create the branch in.
        branch_name: Name of the branch to create.
        base_branch: Optional existing reference or commit to start from.
            When omitted, the commit HEAD points to is used.

    Returns:
        A human-readable success or error message. Errors are reported in
        the returned text rather than raised.
    """
    try:
        if base_branch:
            try:
                # First try to resolve the base as a named reference ...
                base = repo.refs[base_branch]
            except IndexError:
                # ... and fall back to resolving it as a commit.
                try:
                    base = repo.commit(base_branch)
                except git.BadName:
                    return f"Error: Base reference '{base_branch}' not found (neither branch nor commit)."
                except Exception as e:
                    return f"Error resolving base reference '{base_branch}': {str(e)}"
        else:
            base = repo.head.commit

        if branch_name in repo.heads:
            return f"Error: Branch '{branch_name}' already exists."

        new_branch = repo.create_head(branch_name, base)
        # Pick the label lazily: getattr(base, 'name', base.hexsha) would
        # evaluate base.hexsha even when 'name' exists, which raises for a
        # reference and turned a successful creation into an error message.
        base_ref_name = base.name if hasattr(base, "name") else base.hexsha
        return f"Created branch '{new_branch.name}' based on '{base_ref_name}'"
    except git.GitCommandError as e:
        return f"Error creating branch '{branch_name}': {e.stderr or e.stdout}"
    except Exception as e:
        return f"Error creating branch '{branch_name}': {str(e)}"
def git_checkout(repo: git.Repo, branch_name: str) -> str:
    """Check out ``branch_name`` in ``repo``.

    Args:
        repo: Repository to operate on.
        branch_name: Name of the branch to switch to.

    Returns:
        A human-readable success or error message. Errors are reported in
        the returned text rather than raised.
    """
    # The name is passed straight to the git command line; a value starting
    # with '-' would be parsed by git as an option, so it is rejected.
    if branch_name.startswith("-"):
        return f"Error: Invalid branch name '{branch_name}': must not start with '-'."
    try:
        repo.git.checkout(branch_name)
        return f"Switched to branch '{branch_name}'"
    except git.GitCommandError as e:
        stderr = e.stderr or ""
        # Translate the two most common git failures into clearer messages.
        if "did not match any file(s) known to git" in stderr:
            return f"Error: Branch or pathspec '{branch_name}' not found."
        if "Please commit your changes or stash them before you switch branches" in stderr:
            return f"Error: Cannot checkout branch '{branch_name}'. You have unstaged changes. Please commit or stash them first."
        return f"Error checking out branch '{branch_name}': {e.stderr or e.stdout}"
    except Exception as e:
        return f"An unexpected error occurred during checkout: {str(e)}"
def git_init(repo_path: str) -> str:
    """Initialise a Git repository at ``repo_path`` and describe the outcome.

    An existing ``.git`` directory is never overwritten: a usable repository
    is reported as already existing, an unusable one as an error.

    Returns:
        A human-readable status or error message; no exception escapes.
    """
    try:
        location = Path(repo_path)
        git_dir_present = location.exists() and (location / ".git").is_dir()

        if git_dir_present:
            # A .git directory is there; find out whether it is a real repo.
            try:
                existing_repo = git.Repo(repo_path)
                return f"Repository already exists at {existing_repo.git_dir}"
            except git.InvalidGitRepositoryError:
                # Refuse to re-initialise over a broken structure; removing
                # '.git' is left to the user.
                return f"Error: An invalid Git repository structure already exists at '{repo_path}'. Remove '.git' folder to reinitialize."
            except Exception as err:
                return f"Error checking existing repository at '{repo_path}': {err}"

        # mkdir=True is passed so that missing directories get created.
        created = git.Repo.init(path=repo_path, mkdir=True)
        return f"Initialized empty Git repository in {created.git_dir}"
    except Exception as err:
        return f"Error initializing repository at '{repo_path}': {str(err)}"
def git_show(repo: git.Repo, revision: str) -> str:
    """Describe a commit: metadata followed by its changes.

    Args:
        repo: Repository to look the revision up in.
        revision: Anything that resolves to a commit (hash, tag, branch).

    Returns:
        Commit header plus the patch relative to the first parent (or the
        full content for a root commit). Lookup failures are reported in the
        returned text; a diff failure is appended as a warning.
    """
    try:
        commit = repo.commit(revision)
    except git.BadName:
        return f"Error: Revision '{revision}' not found."
    except Exception as e:
        return f"Error finding revision '{revision}': {str(e)}"

    output = [
        f"Commit: {commit.hexsha}\n"
        f"Author: {commit.author}\n"
        f"Date: {commit.authored_datetime}\n"
        f"Message:\n{commit.message.strip()}\n"
    ]
    try:
        if commit.parents:
            # Diff FROM the first parent TO this commit. Calling
            # commit.diff(parent) would yield the reverse patch, showing
            # the commit's additions as deletions.
            diffs = commit.parents[0].diff(commit, create_patch=True)
        else:
            # Root commit: compare against the empty tree.
            diffs = commit.diff(git.NULL_TREE, create_patch=True)

        if diffs:
            output.append("\nChanges:\n")
            for d in diffs:
                raw = d.diff
                # Patch text normally arrives as bytes; decode leniently so
                # undecodable content cannot abort the whole output.
                if isinstance(raw, bytes):
                    diff_text = raw.decode("utf-8", errors="ignore")
                else:
                    diff_text = raw or ""
                a_path = d.a_path or (d.a_blob.path if d.a_blob else 'unknown')
                b_path = d.b_path or (d.b_blob.path if d.b_blob else 'unknown')
                output.append(f"--- a/{a_path}\n+++ b/{b_path}\n")
                output.append(diff_text)
        elif not commit.parents:
            output.append("\nChanges: (Initial commit)\n")
        else:
            output.append("\nNo changes in this commit compared to its parent.")
    except Exception as e:
        output.append(f"\nWarning: Could not generate diff for commit {commit.hexsha}: {str(e)}")
    return "".join(output)
# --- FastMCP server setup ---
# Root logging: INFO level, each record prefixed with timestamp, logger name
# and level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Server instance on which all tools below are registered.
mcp = FastMCP("git")
# --- Tool definitions registered with @mcp.tool() ---
@mcp.tool()
async def init_tool(repo_path: str) -> list[TextContent]:
    """Initializes a Git repository at the specified path.

    Args:
        repo_path: The file system path where the Git repository should be initialized. Parent directories will be created if they don't exist.
    """
    logger.info(f"Executing tool: {GitTools.INIT} for path {repo_path}")
    # git_init reports failures in its return text, so nothing is caught here.
    reply = TextContent(type="text", text=git_init(repo_path))
    return [reply]
@mcp.tool()
async def status_tool(repo_path: str) -> list[TextContent]:
    """Gets the status of a Git repository (shows staged, unstaged, and untracked files).

    Args:
        repo_path: The file system path to the Git repository.
    """
    logger.info(f"Executing tool: {GitTools.STATUS} for repo {repo_path}")
    try:
        repo = _get_repo(repo_path)
        status = git_status(repo)
        result = f"Repository status for '{repo_path}':\n{status}"
    except ValueError as e:
        # Raised by _get_repo when the repository cannot be opened.
        result = str(e)
    except git.GitCommandError as e:
        # A failing git command is reported as text, as diff_tool does,
        # instead of escaping from the tool.
        result = f"Error running git status: {e.stderr or e.stdout}"
    return [TextContent(type="text", text=result)]
@mcp.tool()
async def diff_unstaged_tool(repo_path: str) -> list[TextContent]:
    """Shows changes in the working directory that are not staged for commit.

    Args:
        repo_path: The file system path to the Git repository.
    """
    logger.info(f"Executing tool: {GitTools.DIFF_UNSTAGED} for repo {repo_path}")
    try:
        repo = _get_repo(repo_path)
        diff = git_diff_unstaged(repo)
        result = f"Unstaged changes (working directory vs index) in '{repo_path}':\n{diff or 'No unstaged changes.'}"
    except ValueError as e:
        # Raised by _get_repo when the repository cannot be opened.
        result = str(e)
    except git.GitCommandError as e:
        # A failing git command is reported as text, as diff_tool does,
        # instead of escaping from the tool.
        result = f"Error running git diff: {e.stderr or e.stdout}"
    return [TextContent(type="text", text=result)]
@mcp.tool()
async def diff_staged_tool(repo_path: str) -> list[TextContent]:
    """Shows changes that are staged for the next commit (compared to HEAD).

    Args:
        repo_path: The file system path to the Git repository.
    """
    logger.info(f"Executing tool: {GitTools.DIFF_STAGED} for repo {repo_path}")
    try:
        repo = _get_repo(repo_path)
        diff = git_diff_staged(repo)
        result = f"Staged changes (index vs HEAD) in '{repo_path}':\n{diff or 'No staged changes.'}"
    except ValueError as e:
        # Raised by _get_repo when the repository cannot be opened.
        result = str(e)
    except git.GitCommandError as e:
        # A failing git command is reported as text, as diff_tool does,
        # instead of escaping from the tool.
        result = f"Error running git diff --cached: {e.stderr or e.stdout}"
    return [TextContent(type="text", text=result)]
@mcp.tool()
async def diff_tool(repo_path: str, target: str) -> list[TextContent]:
    """Shows differences between the working tree and a specified target (e.g., a branch, tag, or commit hash).

    Args:
        repo_path: The file system path to the Git repository.
        target: The branch, tag, commit hash, or other revision to compare the working tree against.
    """
    logger.info(f"Executing tool: {GitTools.DIFF} for repo {repo_path} against {target}")
    try:
        repo = _get_repo(repo_path)
        diff = git_diff(repo, target)
        # git_diff runs "git diff TARGET", which compares the working tree
        # (not HEAD) with the target; the wording below reflects that.
        result = f"Diff between working tree and '{target}' in '{repo_path}':\n{diff or 'No differences found.'}"
    except ValueError as e:
        result = str(e)
    except git.GitCommandError as e:
        result = f"Error running git diff against '{target}': {e.stderr or e.stdout}"
    return [TextContent(type="text", text=result)]
@mcp.tool()
async def add_tool(repo_path: str, files: list[str]) -> list[TextContent]:
    """Adds specified file contents to the staging area (index) for the next commit.

    Args:
        repo_path: The file system path to the Git repository.
        files: A list of file paths (relative to the repository root) to stage. Use '.' to stage all changes.
    """
    logger.info(f"Executing tool: {GitTools.ADD} for repo {repo_path}, files: {files}")

    # Nothing to stage: answer immediately without touching the repository.
    if not files:
        return [TextContent(type="text", text="Error: No files specified to add.")]

    try:
        outcome = git_add(_get_repo(repo_path), files)
    except ValueError as err:
        # Repository could not be opened.
        outcome = str(err)
    except Exception as err:
        # Anything else that went wrong while staging.
        outcome = f"An unexpected error occurred during add: {str(err)}"
    return [TextContent(type="text", text=outcome)]
@mcp.tool()
async def commit_tool(repo_path: str, message: str) -> list[TextContent]:
    """Records changes staged in the index to the repository history.

    Args:
        repo_path: The file system path to the Git repository.
        message: The commit message describing the changes.
    """
    logger.info(f"Executing tool: {GitTools.COMMIT} for repo {repo_path}")
    try:
        # Whether anything is staged is decided inside git_commit.
        outcome = git_commit(_get_repo(repo_path), message)
    except ValueError as err:
        # Repository could not be opened.
        outcome = str(err)
    except Exception as err:
        # Any other failure is reported as text.
        outcome = f"An unexpected error occurred during commit: {str(err)}"
    return [TextContent(type="text", text=outcome)]
@mcp.tool()
async def reset_tool(repo_path: str) -> list[TextContent]:
    """Resets the staging area (index) to match the current HEAD commit, effectively unstaging all changes. Does not modify the working directory.

    Args:
        repo_path: The file system path to the Git repository.
    """
    logger.info(f"Executing tool: {GitTools.RESET} for repo {repo_path}")
    try:
        outcome = git_reset(_get_repo(repo_path))
    except ValueError as err:
        # Repository could not be opened; pass the explanation on as text.
        outcome = str(err)
    return [TextContent(type="text", text=outcome)]
@mcp.tool()
async def log_tool(repo_path: str, max_count: int = 10) -> list[TextContent]:
    """Shows the commit history log for the current branch.

    Args:
        repo_path: The file system path to the Git repository.
        max_count: The maximum number of commits to display (default: 10).
    """
    logger.info(f"Executing tool: {GitTools.LOG} for repo {repo_path}, max_count={max_count}")
    try:
        repo = _get_repo(repo_path)
        logs = git_log(repo, max_count)
        if not logs:
            result = f"No commit history found for '{repo_path}' (possibly an empty repository)."
        else:
            # Entries are separated by a blank line for readability.
            result = f"Commit history for '{repo_path}' (last {len(logs)} commits):\n\n" + "\n\n".join(logs)
    except ValueError as e:
        result = str(e)
    except git.GitCommandError as e:
        # A failing git command is reported as text, as diff_tool does,
        # instead of escaping from the tool.
        result = f"Error running git log: {e.stderr or e.stdout}"
    return [TextContent(type="text", text=result)]
@mcp.tool()
async def create_branch_tool(repo_path: str, branch_name: str, base_branch: Optional[str] = None) -> list[TextContent]:
    """Creates a new branch.

    Args:
        repo_path: The file system path to the Git repository.
        branch_name: The name for the new branch.
        base_branch: Optional. The existing branch or commit hash to base the new branch on. If omitted, defaults to the current HEAD.
    """
    logger.info(f"Executing tool: {GitTools.CREATE_BRANCH} for repo {repo_path}, branch: {branch_name}, base: {base_branch}")
    try:
        outcome = git_create_branch(_get_repo(repo_path), branch_name, base_branch)
    except ValueError as err:
        # Repository could not be opened; pass the explanation on as text.
        outcome = str(err)
    return [TextContent(type="text", text=outcome)]
@mcp.tool()
async def checkout_tool(repo_path: str, branch_name: str) -> list[TextContent]:
    """Switches the working directory to a different branch.

    Args:
        repo_path: The file system path to the Git repository.
        branch_name: The name of the branch to switch to.
    """
    logger.info(f"Executing tool: {GitTools.CHECKOUT} for repo {repo_path}, branch: {branch_name}")
    try:
        outcome = git_checkout(_get_repo(repo_path), branch_name)
    except ValueError as err:
        # Repository could not be opened; pass the explanation on as text.
        outcome = str(err)
    return [TextContent(type="text", text=outcome)]
@mcp.tool()
async def show_tool(repo_path: str, revision: str) -> list[TextContent]:
    """Shows details (metadata and content changes) for a specific commit or object.

    Args:
        repo_path: The file system path to the Git repository.
        revision: The commit hash, tag, or branch name to show details for (e.g., 'HEAD', 'main', 'v1.0', 'abcdef123').
    """
    logger.info(f"Executing tool: {GitTools.SHOW} for repo {repo_path}, revision: {revision}")
    try:
        outcome = git_show(_get_repo(repo_path), revision)
    except ValueError as err:
        # Repository could not be opened; pass the explanation on as text.
        outcome = str(err)
    return [TextContent(type="text", text=outcome)]
if __name__ == "__main__":
    # Script entry point: serve over the stdio transport, so the launching
    # process has to talk to this server through stdin/stdout. The second
    # log line is emitted once mcp.run() returns.
    logger.info("Starting Git Tool Server using MCPFast stdio transport...")
    mcp.run(transport="stdio")
    logger.info("MCPFast stdio transport finished.")
|