"""FastAPI routes and helpers for inspecting locally cloned git repositories."""

import hashlib
import json
import os
import re

from fastapi import APIRouter, BackgroundTasks
from git import GitCommandError, Repo
from pydantic import BaseModel

from base_config import avatar_url, path
from models.gitModels import Users


class RequestBody(BaseModel):
    """Request payload identifying one repository of one user."""

    # Used as the per-user directory name under the configured base path.
    uuid: str
    # Remote URL; its last path segment is used as the local directory name.
    repo_url: str


class CommitHash(BaseModel):
    """Request payload identifying a single commit of a repository."""

    uuid: str
    repo_url: str
    commit_hash: str


def generate_repo_path(uuid: str, repo_url: str):
    """Return ``(local_path, repo_name)`` for a user's repository.

    ``repo_name`` is the last ``/``-separated segment of ``repo_url`` with
    ``.git`` removed; ``local_path`` is ``<base path>/<uuid>/<repo_name>``.
    """
    # NOTE(review): ``replace`` removes ".git" anywhere in the segment, not
    # only as a suffix (e.g. "user.github.io" becomes "userhub.io"). Left
    # as-is because directories already on disk may rely on this naming.
    # NOTE(review): ``uuid`` is joined into the path without validation;
    # confirm callers cannot pass values such as "..".
    repo_name = repo_url.split("/")[-1].replace(".git", "")
    base_path = os.path.join(path, uuid)
    return os.path.join(base_path, repo_name), repo_name


def get_repo(uuid: str, repo_url: str):
    """Open the local repository for ``uuid`` / ``repo_url``.

    Returns:
        A ``git.Repo`` for the local path, or ``0`` when the path does not
        exist (kept as ``0`` so existing falsy/equality checks keep working).
    """
    repo_path, _ = generate_repo_path(uuid, repo_url)
    if not os.path.exists(repo_path):
        return 0
    return Repo(repo_path)


# git prints singular forms for a count of one ("1 file changed",
# "1 insertion(+)", "1 deletion(-)"), so the trailing "s" must be optional.
_GIT_STATS_PATTERN = re.compile(
    r",?\s*(\d+)\s*files? changed"
    r"|,?\s*(\d+)\s*insertions?\(\+\)"
    r"|,?\s*(\d+)\s+deletions?\(\-\)"
)


def git_stats_to_json(text: str) -> dict:
    """Parse a git diff summary line into a dict of integer counters.

    Example input: ``" 3 files changed, 10 insertions(+), 2 deletions(-)"``.

    Returns:
        A dict containing any of ``files_changed``, ``insertions`` and
        ``deletions``; a key is present only when its part occurs in ``text``.
    """
    result = {}
    for files_changed, insertions, deletions in _GIT_STATS_PATTERN.findall(text):
        if files_changed:
            result["files_changed"] = int(files_changed)
        if insertions:
            result["insertions"] = int(insertions)
        if deletions:
            result["deletions"] = int(deletions)
    return result


testapi = APIRouter()

# ASCII unit separator between header fields. Unlike "|", it does not occur
# in ordinary author names or commit subjects, and it lets header lines be
# told apart from numstat lines unambiguously.
_FIELD_SEP = "\x1f"

# hash, author name, committer email, subject, committer date.
# NOTE(review): the author *name* (%an) is paired with the *committer* email
# and date (%ce, %cd); confirm whether %ae / %ad were intended.
_GIT_LOG_FORMAT = "--pretty=format:%h%x1f%an%x1f%ce%x1f%s%x1f%cd"


def _avatar_for(email: str) -> str:
    """Build the avatar URL for ``email`` from the configured ``avatar_url``."""
    # Assumes ``avatar_url`` is a Gravatar-compatible endpoint (the
    # ``d=identicon`` parameter is Gravatar's); such services hash the
    # trimmed, lower-cased address.
    normalized = email.strip().lower()
    digest = hashlib.md5(normalized.encode("utf-8")).hexdigest()
    return f"{avatar_url}{digest}?d=identicon"


def _numstat_count(field: str) -> int:
    """Convert one numstat counter to int; "-" (binary file) or junk is 0."""
    if field == "-":
        return 0
    try:
        return int(field)
    except ValueError:
        return 0


def _parse_git_log(log_output: str) -> list:
    """Parse ``git log --numstat`` output produced with ``_GIT_LOG_FORMAT``.

    Each commit is one header line (fields joined by ``_FIELD_SEP``) followed
    by zero or more numstat lines of the form ``"<added>\\t<deleted>\\t<file>"``.

    Returns:
        A list of commit dicts in the order git printed them.
    """
    commits = []
    current = None
    for line in log_output.split("\n"):
        if not line.strip():
            continue  # Skip blank separator lines.

        if _FIELD_SEP in line:
            # Header line: flush the previous commit and start a new one.
            if current is not None:
                commits.append(current)
            fields = line.split(_FIELD_SEP)
            if len(fields) != 5:
                current = None  # Malformed header: ignore it and its stats.
                continue
            commit_hash, author, email, summary, date = fields
            current = {
                "commit": commit_hash,
                "author": author,
                "email": email,
                "summary": summary,
                "date": date,
                "avatar": _avatar_for(email),
                "change": {"insertions": 0, "deletions": 0, "files": 0},
            }
            continue

        if current is None:
            continue  # Stats without a valid header cannot be attributed.

        parts = line.split("\t", 2)
        if len(parts) != 3:
            continue  # Not a numstat line.
        change = current["change"]
        change["insertions"] += _numstat_count(parts[0])
        change["deletions"] += _numstat_count(parts[1])
        change["files"] += 1

    # Flush the last commit.
    if current is not None:
        commits.append(current)
    return commits


@testapi.post("/log")
async def log(request: RequestBody):
    """Return the commit history of a locally cloned repository.

    Responds with status ``"404"`` when the local repository path does not
    exist, ``"500"`` when ``git log`` fails, and otherwise ``"200"`` with a
    ``git_log`` list (newest first, as printed by git) whose entries carry
    the commit hash, author, email, summary, date, avatar URL and
    per-commit insertion / deletion / file counters.
    """
    local_path, _ = generate_repo_path(request.uuid, request.repo_url)
    print(local_path)
    repo = get_repo(request.uuid, request.repo_url)
    if not repo:
        return {
            "status": "404",
            "msg": "仓库不存在",
            "uuid": request.uuid,
            "repo_url": request.repo_url,
            "local_path": local_path,
        }

    # One `git log --numstat` call yields both the metadata and the stats.
    try:
        log_output = repo.git.log(
            _GIT_LOG_FORMAT,
            "--numstat",
            "--no-renames",
            date="format:%Y-%m-%d %H:%M",
        )
    except GitCommandError as e:
        return {"status": "500", "msg": f"获取日志失败: {str(e)}"}

    commits = _parse_git_log(log_output)
    print(commits)
    return {
        "status": "200",
        "msg": "成功获取日志",
        "uuid": request.uuid,
        "repo_url": request.repo_url,
        "local_path": local_path,
        "git_log": commits,
    }