import asyncio
import hashlib
import json
import os
import re
import shutil
import time

from fastapi import APIRouter, BackgroundTasks
from git import Repo, GitCommandError
from git.exc import InvalidGitRepositoryError
from pydantic import BaseModel

from base_config import path, avatar_url
from models.gitModels import Repos


class RequestBody(BaseModel):
    """Request payload identifying one user's clone of a repository."""

    uuid: str
    repo_url: str
    repo_id: int


class CommitHash(BaseModel):
    """Request payload identifying a single commit inside a user's clone."""

    uuid: str
    repo_url: str
    commit_hash: str
    repo_id: int


# Matches the three parts of a ``git diff --shortstat`` style summary.
# git prints singular forms for a count of one ("1 file changed",
# "1 insertion(+)", "1 deletion(-)"), so the trailing "s" is optional.
_SHORTSTAT_PATTERN = re.compile(
    r"(\d+)\s*files? changed"
    r"|(\d+)\s*insertions?\(\+\)"
    r"|(\d+)\s*deletions?\(-\)"
)

# A ``git log --numstat`` line: "<added>\t<deleted>\t<path>", where the two
# counters are "-" for binary files.
_NUMSTAT_LINE = re.compile(r"^(\d+|-)\t(\d+|-)\t")


def generate_repo_path(uuid, repo_url):
    """Return ``(local_path, repo_name)`` for a user's clone of ``repo_url``.

    The repository name is the last path segment of the URL with a trailing
    ``.git`` removed; the local path is ``<path>/<uuid>/<repo_name>``.

    NOTE: only a trailing ``.git`` suffix is stripped. An earlier version
    removed ".git" anywhere in the name, so clones of names such as
    "foo.github.io" made with that version live under a different directory.

    NOTE(review): ``uuid`` and ``repo_url`` come straight from the request
    body; values such as ".." are not rejected here - confirm validation
    happens upstream.
    """
    # Ignore trailing slashes so "https://host/user/repo/" still yields "repo"
    # instead of an empty name (which would resolve to the user's base dir).
    repo_name = repo_url.rstrip("/").split("/")[-1]
    if repo_name.endswith(".git"):
        repo_name = repo_name[: -len(".git")]
    base_path = os.path.join(path, uuid)
    return os.path.join(base_path, repo_name), repo_name


def get_repo(uuid, repo_url):
    """Open the user's local clone of ``repo_url``.

    Returns a ``Repo`` instance, or the falsy value ``0`` when the directory
    does not exist or is not a usable git repository (for example while a
    clone is still in progress).
    """
    repo_path, _ = generate_repo_path(uuid, repo_url)
    if not os.path.exists(repo_path):
        return 0
    try:
        return Repo(repo_path)
    except InvalidGitRepositoryError:
        return 0


def git_stats_to_json(text):
    """Parse a git short-stat summary into a dict.

    Recognised keys are ``files_changed``, ``insertions`` and ``deletions``;
    a key is only present when the corresponding part appears in ``text``.
    """
    result = {}
    for files_changed, insertions, deletions in _SHORTSTAT_PATTERN.findall(text):
        if files_changed:
            result["files_changed"] = int(files_changed)
        if insertions:
            result["insertions"] = int(insertions)
        if deletions:
            result["deletions"] = int(deletions)
    return result


gitrouter = APIRouter()


async def clone_task(repo_url, local_path, uuid, repo_id):
    """Clone ``repo_url`` into ``local_path`` and record the outcome.

    On success the matching ``Repos`` row is updated with ``state=1``; on any
    failure the partially created directory is removed and the row is updated
    with ``state=2``.
    """
    current_time = int(time.time() * 1000)
    print(f"开始克隆仓库: {repo_url}")
    loop = asyncio.get_running_loop()
    try:
        # The clone is blocking, so run it in the default executor.
        await loop.run_in_executor(None, Repo.clone_from, repo_url, local_path)
        await Repos.filter(id=repo_id, create_user=uuid).update(
            path=local_path, state=1, update_time=current_time
        )
        print(f"克隆仓库成功: {repo_url}")
    except Exception as exc:
        print(f"克隆仓库失败: {repo_url}")
        print(f"失败原因: {exc!r}")
        # Clean up first so a failing database update cannot leave a partial
        # clone behind; the directory may not exist if the clone failed early.
        shutil.rmtree(local_path, ignore_errors=True)
        await Repos.filter(id=repo_id, create_user=uuid).update(
            path=local_path, state=2, update_time=current_time
        )


@gitrouter.post("/clone")
async def clone(request: RequestBody, background_tasks: BackgroundTasks):
    """Schedule a background clone unless the target directory already exists."""
    local_path, repo_name = generate_repo_path(request.uuid, request.repo_url)
    if os.path.exists(local_path):
        return {
            "status": "400",
            "msg": "仓库已存在",
            "uuid": request.uuid,
            "repo_url": request.repo_url,
            "path": local_path,
        }

    background_tasks.add_task(
        clone_task, request.repo_url, local_path, request.uuid, request.repo_id
    )
    return {
        "status": "200",
        "msg": "成功创建克隆任务",
        "uuid": request.uuid,
        "repo_name": repo_name,
        "local_path": local_path,
    }


def _avatar_for(email):
    """Build the avatar URL for ``email`` from its MD5 digest.

    The address is trimmed and lower-cased before hashing, which is the
    normalisation Gravatar-style services expect.
    """
    digest = hashlib.md5(email.strip().lower().encode("utf-8")).hexdigest()
    return f"{avatar_url}{digest}?d=identicon"


def _parse_git_log(log_output):
    """Turn ``git log --numstat`` output into a list of commit dicts.

    Each commit starts with a header line "hash|author|email|summary|date"
    and is followed by zero or more numstat lines that are summed into the
    commit's ``change`` counters. Commits are returned in the order git
    printed them.
    """
    commits = []
    current = None
    for line in log_output.split("\n"):
        if not line.strip():
            continue  # skip blank separator lines

        numstat = _NUMSTAT_LINE.match(line)
        if numstat:
            if current is None:
                continue  # numstat without a valid header: ignore
            added, removed = numstat.groups()
            change = current["change"]
            # "-" marks a binary file, which contributes no line counts.
            change["insertions"] += 0 if added == "-" else int(added)
            change["deletions"] += 0 if removed == "-" else int(removed)
            change["files"] += 1
            continue

        # Anything else is a commit header; flush the previous commit first.
        if current is not None:
            commits.append(current)
        try:
            # The summary may itself contain "|", so peel the three leading
            # fields from the left and the date from the right.
            commit_hash, author, email, rest = line.split("|", 3)
            summary, date = rest.rsplit("|", 1)
        except ValueError:
            current = None  # malformed header line: drop it
            continue
        current = {
            "commit": commit_hash,
            "author": author,
            "email": email,
            "summary": summary,
            "date": date,
            "avatar": _avatar_for(email),
            "change": {"insertions": 0, "deletions": 0, "files": 0},
        }

    # Flush the last commit.
    if current is not None:
        commits.append(current)
    return commits


@gitrouter.post("/log")
async def log(request: RequestBody):
    """Return the commit history of the user's clone with per-commit stats."""
    local_path, _ = generate_repo_path(request.uuid, request.repo_url)
    repo = get_repo(request.uuid, request.repo_url)
    if not repo:
        return {
            "status": "404",
            "msg": "仓库不存在",
            "uuid": request.uuid,
            "repo_url": request.repo_url,
            "local_path": local_path,
        }

    # Fetch everything in one `git log --numstat` call.
    git_log_format = '--pretty=format:%h|%an|%ce|%s|%cd'
    try:
        log_output = repo.git.log(
            git_log_format,
            '--numstat',
            '--no-renames',
            date='format:%Y-%m-%d %H:%M',
        )
    except GitCommandError as e:
        return {"status": "500", "msg": f"获取日志失败: {str(e)}"}

    # git log already lists the newest commit first, so no re-sorting is done.
    return {
        "status": "200",
        "msg": "成功获取日志",
        "uuid": request.uuid,
        "repo_url": request.repo_url,
        "local_path": local_path,
        "git_log": _parse_git_log(log_output),
    }


@gitrouter.post("/status")
async def status(request: RequestBody):
    """Return a working-tree status summary for the user's clone.

    On success the response is the bare status dict (keys ``ahead``,
    ``behind``, ``conflicted``, ``created``, ``current``, ``deleted``,
    ``detached``, ``files``, ``ignored``, ``modified``, ``not_added``,
    ``staged``, ``tracking``). When the repository is missing, a
    ``{"status": "404", ...}`` dict is returned instead.
    """
    repo = get_repo(request.uuid, request.repo_url)
    if not repo:
        return {
            "status": "404",
            "msg": "仓库不存在",
            "uuid": request.uuid,
            "repo_url": request.repo_url,
        }

    detached = repo.head.is_detached
    ahead = 0
    behind = 0
    tracking = None
    if detached:
        # There is no active branch on a detached HEAD; report it the way
        # `git rev-parse --abbrev-ref HEAD` does.
        current = "HEAD"
    else:
        active_branch = repo.active_branch
        current = active_branch.name
        tracking_branch = active_branch.tracking_branch()
        if tracking_branch is not None:
            tracking = tracking_branch.name
            try:
                # "A..B" lists commits reachable from B but not from A, so
                # upstream..local counts what we are ahead by, and
                # local..upstream what we are behind by.
                ahead = sum(
                    1 for _ in repo.iter_commits(f"{tracking_branch}..{active_branch}")
                )
                behind = sum(
                    1 for _ in repo.iter_commits(f"{active_branch}..{tracking_branch}")
                )
            except GitCommandError:
                # Upstream is configured but its ref is not available locally.
                ahead, behind = 0, 0

    conflicted = list(repo.index.unmerged_blobs())
    # Query untracked files once; the same list backs "created" and "not_added".
    untracked_files = repo.untracked_files

    # A repository without commits has no HEAD tree to walk.
    if repo.head.is_valid():
        tree = repo.head.commit.tree
        all_files = [item.path for item in tree.traverse() if item.type == 'blob']
    else:
        all_files = []

    diffs = repo.index.diff(None)
    deleted = [d.a_path for d in diffs if d.change_type == 'D']
    modified_files = [d.a_path for d in diffs]

    ignored_output = repo.git.execute(
        ["git", "ls-files", "--others", "--ignored", "--exclude-standard"]
    )
    # Splitting empty output would yield [""], so drop empty entries.
    ignored_files = [name for name in ignored_output.split("\n") if name]

    # NOTE(review): this lists every path in the index (keys are
    # (path, stage) tuples), not only entries that differ from HEAD -
    # confirm this is what consumers of "staged" expect.
    staged = [entry_path for entry_path, _stage in repo.index.entries]

    return {
        "ahead": ahead,
        "behind": behind,
        "conflicted": conflicted,
        "created": untracked_files,
        "current": current,
        "deleted": deleted,
        "detached": detached,
        "files": all_files,
        "ignored": ignored_files,
        "modified": modified_files,
        "not_added": untracked_files,
        "staged": staged,
        "tracking": tracking,
    }


@gitrouter.post("/change")
async def change(request: CommitHash):
    """Return the per-file patches a commit introduced relative to its first parent.

    The response carries a ``changes`` list with one entry per file
    (``path``, ``change_type``, ``diff``). A root commit has no parent to
    compare against and yields an empty list.
    """
    repo = get_repo(request.uuid, request.repo_url)
    if not repo:
        return {
            "status": "404",
            "msg": "仓库不存在",
            "uuid": request.uuid,
            "repo_url": request.repo_url,
        }

    commit = repo.commit(request.commit_hash)
    if not commit.parents:
        print("首次提交,无父提交对比")
        return {
            "status": "200",
            "msg": "首次提交,无父提交对比",
            "uuid": request.uuid,
            "repo_url": request.repo_url,
            "commit": request.commit_hash,
            "changes": [],
        }

    parent = commit.parents[0]
    # Diff parent -> commit; diffing the commit against itself is always empty.
    diffs = parent.diff(commit, create_patch=True, no_renames=True)

    changes = []
    for diff in diffs:
        raw_patch = diff.diff
        if isinstance(raw_patch, bytes):
            # Patches can contain non-UTF-8 bytes; never fail on decoding.
            patch_text = raw_patch.decode('utf-8', errors='replace')
        else:
            patch_text = raw_patch or ""
        print(f"文件 {diff.a_path} ({diff.change_type}):")
        print(patch_text)
        changes.append(
            {
                "path": diff.b_path or diff.a_path,
                "change_type": diff.change_type,
                "diff": patch_text,
            }
        )

    return {
        "status": "200",
        "msg": "成功获取变更",
        "uuid": request.uuid,
        "repo_url": request.repo_url,
        "commit": request.commit_hash,
        "changes": changes,
    }