import os
import json
import time
import asyncio
from base_config import ai_key, path
from fastapi import APIRouter, BackgroundTasks
from pathlib import Path
from pydantic import BaseModel
from git import Repo
from http import HTTPStatus
from dashscope import Application
from models.aiModels import Scan_Tasks, Commit_Summary_Tasks, File_Summary_Tasks
from models.gitModels import Repos

airouter = APIRouter()

# Number of file paths sent to the file-filter application per request.
_FILTER_BATCH_SIZE = 500


class RequestCommit(BaseModel):
    # Request body of POST /summaryCommit.
    task_id: str
    uuid: str
    repo_url: str


class RequestScan(BaseModel):
    # Request body of POST /scan.
    task_id: str
    uuid: str
    repo_url: str


class RequestFile(BaseModel):
    # Request body of POST /summaryFile.
    task_id: str
    uuid: str
    repo_url: str
    file_path: str


def generate_repo_path(uuid, repo_url):
    """Return ``(local_repo_path, repo_name)`` for a repository URL.

    The name is the last ``/``-separated URL segment with every ``.git``
    occurrence removed; the path is ``<base path>/<uuid>/<repo_name>``.
    """
    # NOTE(review): ``replace`` strips ".git" anywhere in the name, not only
    # as a suffix. Left untouched because other modules may derive the clone
    # location the same way -- confirm before changing.
    repo_name = repo_url.split("/")[-1].replace(".git", "")
    base_path = os.path.join(path, uuid)
    return os.path.join(base_path, repo_name), repo_name


async def _run_blocking(func, *args):
    """Run a blocking callable in the default executor and await its result.

    ``Application.call`` and file reads are synchronous; executing them
    directly inside a coroutine would stall the event loop.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


def _call_application(app_id, prompt, fallback, *,
                      print_parsed=True, print_raw_on_error=False):
    """Call a DashScope application and decode its text output as JSON.

    Args:
        app_id: Identifier of the application to invoke.
        prompt: Prompt text sent to the application.
        fallback: Value returned when the request fails or the output is
            not valid JSON.
        print_parsed: Print the decoded JSON on success.
        print_raw_on_error: Print the raw output text when decoding fails.

    Returns:
        The decoded JSON value, or ``fallback``.
    """
    # The API key comes from base_config. It could be passed inline as
    # api_key="sk-xxx", but hard-coding keys in production code is
    # discouraged because it increases the risk of leaking them.
    response = Application.call(api_key=ai_key, app_id=app_id, prompt=prompt)
    if response.status_code != HTTPStatus.OK:
        print(f"请求失败: {response.message}")
        return fallback
    try:
        parsed = json.loads(response.output.text)
    except json.JSONDecodeError:
        print("返回内容不是有效的 JSON 格式!")
        if print_raw_on_error:
            print(response.output.text)
        return fallback
    if print_parsed:
        print(parsed)
    return parsed


def _is_inside(base_dir, candidate):
    """Return True when ``candidate`` resolves to a location under ``base_dir``."""
    base = Path(base_dir).resolve()
    target = Path(candidate).resolve()
    try:
        target.relative_to(base)
    except ValueError:
        return False
    return True


async def file_summary(content):
    """Summarize file content; returns ``{"summary": []}`` on failure."""
    return await _run_blocking(
        _call_application,
        'ef50d70cd4074a899a09875e6a6e36ea',
        content,
        {"summary": []},
    )


async def commit_summary(content):
    """Summarize a commit diff; returns ``{"null": []}`` on failure."""
    return await _run_blocking(
        _call_application,
        '88426cc2301b44bea5d28d41d187ebf2',
        content,
        {"null": []},
    )


def filter_code_files(prompt):
    """Ask the filter application which listed files are code files.

    Blocking. Returns the decoded JSON, or ``{"files": []}`` on failure.
    """
    return _call_application(
        'b0725a23eafd4422bfa7d5eff278af7c',
        prompt,
        {"files": []},
    )


def analysis_results(local_path, path):
    """Analyze one file and return the result tagged with its path.

    The file is sent with each line prefixed by its 1-based line number and
    a tab. Blocking. On request or decode failure the result is
    ``{"summary": None}``; in every case a ``"path"`` key holding the joined
    file path is added.

    Raises:
        OSError / UnicodeDecodeError: if the file cannot be read as UTF-8.
    """
    file_path = os.path.join(local_path, path)
    with open(file_path, 'r', encoding="utf8") as f:
        prompt = "".join(
            f"{line_num}\t{line}" for line_num, line in enumerate(f, start=1)
        )
    json_data = _call_application(
        'b6edb4f5ff1c49f9855af27b14a0e8b4',
        prompt,
        {"summary": None},
        print_parsed=False,
        print_raw_on_error=True,
    )
    json_data["path"] = file_path
    return json_data


async def get_filtered_files(folder_path):
    """List files under ``folder_path`` as paths relative to it.

    Files and directories whose names start with ``.`` are skipped, and
    hidden directories are not descended into.

    Raises:
        ValueError: if ``folder_path`` is not a directory.
    """
    base_path = Path(folder_path).resolve()
    if not base_path.is_dir():
        raise ValueError("无效的目录路径")

    file_list = []
    for root, dirs, files in os.walk(base_path):
        # Prune in place so os.walk does not enter hidden directories.
        dirs[:] = [d for d in dirs if not d.startswith('.')]
        for file in files:
            if file.startswith('.'):
                continue
            rel_path = (Path(root) / file).relative_to(base_path)
            file_list.append(str(rel_path))
    return file_list


async def process_batch1(batch_files):
    """Filter one batch of file paths; returns ``[]`` if the batch fails."""
    try:
        js = await _run_blocking(filter_code_files, str(batch_files))
        return js["files"]
    except Exception as e:
        print(f"处理批次时出错: {e}")
        return []


async def get_code_files(path):
    """Return the code files found under ``path``.

    The file listing is split into batches of ``_FILTER_BATCH_SIZE`` paths,
    each batch is filtered concurrently, and the results of ALL batches are
    merged into one flat list.
    """
    files = await get_filtered_files(path)
    print(files)
    print(f"找到 {len(files)} 个文件")

    # Stepping by the batch size avoids producing an empty trailing chunk
    # (and an empty request) when the count is a multiple of the batch size.
    chunks = [
        files[start:start + _FILTER_BATCH_SIZE]
        for start in range(0, len(files), _FILTER_BATCH_SIZE)
    ]
    batch_results = await asyncio.gather(
        *(process_batch1(chunk) for chunk in chunks),
        return_exceptions=True,
    )

    file_list = []
    for batch in batch_results:
        if isinstance(batch, Exception):
            print(f"处理出错: {batch}")
        elif isinstance(batch, (list, tuple)):
            file_list.extend(batch)
    return file_list


async def process_batch2(local_path, path):
    """Analyze one file; returns ``{"summary": None}`` if analysis fails."""
    try:
        return await _run_blocking(analysis_results, local_path, path)
    except Exception as e:
        print(f"处理批次时出错: {e}")
        return {"summary": None}


async def analysis(local_path, task_id):
    """Scan a repository and store the outcome on the scan task.

    Writes ``state=2`` with the collected results when everything ran, or
    ``state=3`` when file discovery or any per-file analysis raised.
    """
    results = []
    failed = False
    try:
        file_list = await get_code_files(local_path)
        print(file_list)
        # Analyses run in the default thread pool, which bounds how many
        # requests are in flight at once.
        batch_results = await asyncio.gather(
            *(process_batch2(local_path, file) for file in file_list),
            return_exceptions=True,
        )
    except Exception as exc:
        # Without this the task would stay in state=1 forever.
        print(f"处理出错: {exc}")
        failed = True
        batch_results = []

    for result in batch_results:
        if isinstance(result, Exception):
            print(f"处理出错: {result}")
            failed = True
        else:
            results.append(result)

    # Single final write so a failure state is not overwritten by success.
    await Scan_Tasks.filter(id=task_id).update(
        state=3 if failed else 2,
        result={"results": results},
        scan_end_time=int(time.time()),
    )
    print("扫描完成")


async def commit_task(content, task_id):
    """Summarize a commit diff and store the result on the commit task.

    If summarization raises, the error is printed and the same fallback
    payload ``commit_summary`` uses for failed requests is stored, so the
    task still receives an end time.
    """
    try:
        commit_result = await commit_summary(content)
    except Exception as exc:
        print(f"提交出错: {exc}")
        commit_result = {"null": []}
    else:
        print("提交成功")
    await Commit_Summary_Tasks.filter(id=task_id).update(
        result=commit_result,
        end_time=int(time.time()),
    )


def _read_text(file_path):
    """Return the UTF-8 text content of ``file_path``."""
    with open(file_path, 'r', encoding="utf8") as f:
        return f.read()


async def file_task(file_path, task_id):
    """Summarize one file and store the result on the file task.

    If reading or summarizing raises, the error is printed and the same
    fallback payload ``file_summary`` uses for failed requests is stored.
    """
    try:
        content = await _run_blocking(_read_text, file_path)
        file_result = await file_summary(content)
    except Exception as exc:
        print(f"提交出错: {exc}")
        file_result = {"summary": []}
    else:
        print("提交成功")
    await File_Summary_Tasks.filter(id=task_id).update(
        result=file_result,
        end_time=int(time.time()),
    )


@airouter.post("/scan")
async def scan(request: RequestScan, background_tasks: BackgroundTasks):
    """Mark the scan task as started and queue the repository scan."""
    local_path, repo_name = generate_repo_path(request.uuid, request.repo_url)
    print(f"开始扫描仓库: {repo_name}")
    await Scan_Tasks.filter(id=request.task_id).update(
        state=1, scan_start_time=int(time.time())
    )
    background_tasks.add_task(analysis, local_path, request.task_id)
    return {"code": 200, "msg": "添加扫描任务成功"}


@airouter.post("/summaryCommit")
async def summaryCommit(request: RequestCommit, background_tasks: BackgroundTasks):
    """Queue a summary of the diff between the task's commit and its parent."""
    local_path, repo_name = generate_repo_path(request.uuid, request.repo_url)
    repo_commit = await Commit_Summary_Tasks.get(id=request.task_id)
    repo_commit_hash = repo_commit.repo_hash
    print(f"开始提交仓库: {repo_name}")
    await Commit_Summary_Tasks.filter(id=request.task_id).update(
        start_time=int(time.time())
    )
    # NOTE(review): "<hash>^" does not exist for a root commit, so this diff
    # presumably fails there -- confirm whether that case must be handled.
    commit_content = Repo(local_path).git.diff(
        f"{repo_commit_hash}^", repo_commit_hash
    )
    background_tasks.add_task(commit_task, commit_content, request.task_id)
    return {"code": 200, "msg": "添加提交任务成功"}


@airouter.post("/summaryFile")
async def summaryFile(request: RequestFile, background_tasks: BackgroundTasks):
    """Queue a summary of one file inside the requested repository."""
    local_path, _ = generate_repo_path(request.uuid, request.repo_url)
    # file_path is client-supplied: refuse anything that resolves outside
    # the repository directory so arbitrary server files cannot be read.
    if not _is_inside(local_path, request.file_path):
        return {"code": 400, "msg": "文件路径不合法"}
    await File_Summary_Tasks.filter(id=request.task_id).update(
        start_time=int(time.time())
    )
    background_tasks.add_task(file_task, request.file_path, request.task_id)
    return {"code": 200, "msg": "添加提交任务成功"}