"""FastAPI routes for cloning Git repositories and reading their log and status."""
import json
import os

# NOTE: the relative order of the imports below is kept exactly as it was.
# The star import comes before the `git` and `pydantic` imports so that
# `Repo` and `BaseModel` always refer to the library classes, even if
# `models.gitModels` happens to export names that collide with them.
from fastapi import APIRouter, BackgroundTasks
from models.gitModels import *
from git import Repo
from pydantic import BaseModel

# Directory that holds one sub-directory per request uuid. The default is the
# value that used to be hard-coded; GIT_REPO_BASE_DIR overrides it so the
# service can run on another machine without editing the source.
REPO_BASE_DIR = os.environ.get("GIT_REPO_BASE_DIR", "C:/Users/32965/repo")

# `git log` fields are separated by the ASCII unit separator (emitted by git
# through the `%x1f` placeholder) instead of being wrapped in hand-written
# JSON, so quotes or backslashes in a commit subject cannot break parsing.
_LOG_FIELD_SEP = "\x1f"
_LOG_FIELDS = ("commit", "author", "summary", "date")
_LOG_PRETTY = "--pretty=tformat:%h%x1f%an%x1f%s%x1f%cd"


class RequestBody(BaseModel):
    """Payload accepted by every endpoint of this router."""

    # Used as the name of the directory (under the base directory) that
    # contains the clone.
    uuid: str
    # URL of the repository; its last path segment becomes the clone's
    # directory name.
    repo_url: str


def generate_repo_path(uuid, repo_url):
    """Return ``(local_path, repo_name)`` for a repository URL.

    ``repo_name`` is the last path segment of ``repo_url`` with one trailing
    ``.git`` suffix removed. Only the suffix is stripped, so a name such as
    ``user.github.io`` is kept intact, and a trailing ``/`` on the URL no
    longer produces an empty name. ``local_path`` is
    ``<REPO_BASE_DIR>/<uuid>/<repo_name>``.

    NOTE(review): ``uuid`` and ``repo_url`` arrive from the HTTP request and
    are joined into a filesystem path without validation; values containing
    ``..`` could point outside the base directory. Confirm whether callers
    validate them upstream.
    """
    repo_name = repo_url.rstrip("/").split("/")[-1]
    if repo_name.endswith(".git"):
        repo_name = repo_name[: -len(".git")]
    base_path = os.path.join(REPO_BASE_DIR, uuid)
    return os.path.join(base_path, repo_name), repo_name


def get_repo(uuid, repo_url):
    """Open the local clone for ``uuid`` / ``repo_url``.

    Returns a ``Repo`` for the computed path, or ``0`` when that path does not
    exist. The falsy ``0`` is kept for compatibility with callers that test
    the result with ``if not repo``.
    """
    path, _ = generate_repo_path(uuid, repo_url)
    if not os.path.exists(path):
        return 0
    return Repo(path)


gitrouter = APIRouter()


def _repo_missing_response(request, local_path):
    """Build the response returned when the requested clone is not on disk."""
    return {
        "status": "400",
        "msg": "仓库不存在",
        "uuid": request.uuid,
        "repo_url": request.repo_url,
        "local_path": local_path,
    }


def _count_commits(repo, rev_range):
    """Count the commits selected by a git revision range."""
    return sum(1 for _ in repo.iter_commits(rev_range))


@gitrouter.post("/clone")
async def clone(request: RequestBody, background_tasks: BackgroundTasks):
    """Schedule a background clone of ``request.repo_url``.

    Returns a ``"400"`` payload when the target path already exists; otherwise
    queues ``Repo.clone_from`` as a background task and returns a ``"200"``
    payload describing where the clone will be placed.
    """
    local_path, repo_name = generate_repo_path(request.uuid, request.repo_url)
    if os.path.exists(local_path):
        return {
            "status": "400",
            "msg": "仓库已存在",
            "uuid": request.uuid,
            "repo_url": request.repo_url,
            "path": local_path,
        }

    background_tasks.add_task(Repo.clone_from, request.repo_url, local_path)
    return {
        "status": "200",
        "msg": "成功创建克隆任务",
        "uuid": request.uuid,
        "repoName": repo_name,
        "local_path": local_path,
    }


@gitrouter.post("/log")
async def log(request: RequestBody):
    """Return up to 50 commits of the local clone.

    Each entry of ``git_log`` is a dict with the keys ``commit`` (abbreviated
    hash), ``author``, ``summary`` and ``date`` (committer date formatted as
    ``%Y-%m-%d %H:%M``). A repository without any commit yields an empty list.
    Returns a ``"400"`` payload when the clone does not exist.
    """
    local_path, _ = generate_repo_path(request.uuid, request.repo_url)
    repo = get_repo(request.uuid, request.repo_url)
    if not repo:
        return _repo_missing_response(request, local_path)

    entries = []
    # `git log` exits with an error when HEAD does not point at a commit yet.
    if repo.head.is_valid():
        raw = repo.git.log(_LOG_PRETTY, max_count=50, date="format:%Y-%m-%d %H:%M")
        entries = [
            dict(zip(_LOG_FIELDS, line.split(_LOG_FIELD_SEP)))
            for line in raw.split("\n")
            if line
        ]

    return {
        "status": "200",
        "msg": "成功获取日志",
        "uuid": request.uuid,
        "repo_url": request.repo_url,
        "local_path": local_path,
        "git_log": entries,
    }


@gitrouter.post("/status")
async def status(request: RequestBody):
    """Return a summary of the working tree, index and branch state.

    The result has the keys ``ahead``, ``behind``, ``conflicted``, ``created``,
    ``current``, ``deleted``, ``detached``, ``files``, ``ignored``,
    ``modified``, ``not_added``, ``staged`` and ``tracking``.

    ``current`` is ``None`` while HEAD is detached and ``tracking`` is ``None``
    when the branch has no upstream; in both cases ``ahead`` and ``behind`` are
    ``0``. Returns a ``"400"`` payload when the clone does not exist.
    """
    local_path, _ = generate_repo_path(request.uuid, request.repo_url)
    repo = get_repo(request.uuid, request.repo_url)
    if not repo:
        return _repo_missing_response(request, local_path)

    # Collect every field manually.
    head = repo.head
    detached = head.is_detached
    has_commits = head.is_valid()

    # `repo.active_branch` raises when HEAD is detached, so only read it when
    # HEAD actually points at a branch.
    active_branch = None if detached else repo.active_branch
    tracking_branch = None
    if active_branch is not None:
        tracking_branch = active_branch.tracking_branch()

    ahead = behind = 0
    if tracking_branch is not None and has_commits and tracking_branch.is_valid():
        # `A..B` selects the commits reachable from B but not from A, so the
        # local branch is "ahead" by upstream..local and "behind" by
        # local..upstream.
        ahead = _count_commits(repo, f"{tracking_branch}..{active_branch}")
        behind = _count_commits(repo, f"{active_branch}..{tracking_branch}")

    conflicted = list(repo.index.unmerged_blobs())

    # Queried once; both "created" and "not_added" report this list.
    # NOTE(review): "created" might be intended to mean newly staged files
    # rather than untracked ones - confirm with API consumers.
    untracked = repo.untracked_files

    all_files = []
    if has_commits:
        all_files = [
            item.path
            for item in head.commit.tree.traverse()
            if item.type == "blob"
        ]

    # Differences between the index and the working tree.
    diffs = repo.index.diff(None)
    deleted = [d.a_path for d in diffs if d.change_type == "D"]
    # NOTE(review): this lists every changed path, deletions included -
    # confirm whether "modified" should exclude entries reported in "deleted".
    modified = [d.a_path for d in diffs]

    raw_ignored = repo.git.ls_files("--others", "--ignored", "--exclude-standard")
    # Dropping empty strings keeps an empty result from turning into [""].
    ignored = [path for path in raw_ignored.split("\n") if path]

    # Index entries are keyed by (path, stage).
    # NOTE(review): this lists every path present in the index, not only
    # paths with staged changes - confirm the intended meaning of "staged".
    staged = [key[0] for key in repo.index.entries]

    return {
        "ahead": ahead,
        "behind": behind,
        "conflicted": conflicted,
        "created": untracked,
        "current": active_branch.name if active_branch is not None else None,
        "deleted": deleted,
        "detached": detached,
        "files": all_files,
        "ignored": ignored,
        "modified": modified,
        "not_added": untracked,
        "staged": staged,
        "tracking": tracking_branch.name if tracking_branch is not None else None,
    }