import os
import json
import time
import asyncio
from http import HTTPStatus
from pathlib import Path

from fastapi import APIRouter, BackgroundTasks
from pydantic import BaseModel
from git import Repo
from dashscope import Application

from base_config import ai_key, path
from models.aiModels import Scan_Tasks, Commit_Summary_Tasks, File_Summary_Tasks
from models.gitModels import Repos

airouter = APIRouter()


class RequestBody(BaseModel):
    uuid: str
    repo_url: str


class RequestFile(BaseModel):
    uuid: str
    repo_url: str
    file_path: str


def generate_repo_path(uuid, repo_url):
    """Build the local checkout path for a repository and return (path, repo_name)."""
    repo_name = repo_url.split("/")[-1].replace(".git", "")
    base_path = os.path.join(path, uuid)
    return os.path.join(base_path, repo_name), repo_name


async def file_summary(content):
    """Summarize a single file's content via the DashScope application."""
    response = Application.call(
        # If no environment variable is configured, you can replace the line below with
        # api_key="sk-xxx". Hardcoding API keys in production is discouraged to reduce leak risk.
        api_key=ai_key,
        app_id='5e3df30ad8bf44b68209e1ef089ff15c',
        prompt=content)
    if response.status_code == HTTPStatus.OK:
        try:
            json_data = json.loads(response.output.text)
            print(json_data)
        except json.JSONDecodeError:
            print("Response is not valid JSON!")
            json_data = {"summary": []}
    else:
        print(f"Request failed: {response.message}")
        json_data = {"summary": []}
    return json_data


async def commit_summary(content):
    """Summarize a commit diff via the DashScope application."""
    response = Application.call(
        # See the API-key note in file_summary().
        api_key=ai_key,
        app_id='31a822df773248b19c3c9779f41a5507',
        prompt=content)
    if response.status_code == HTTPStatus.OK:
        try:
            json_data = json.loads(response.output.text)
            print(json_data)
        except json.JSONDecodeError:
            print("Response is not valid JSON!")
            json_data = {"null": []}
    else:
        print(f"Request failed: {response.message}")
        json_data = {"null": []}
    return json_data


def filter_code_files(prompt):
    """Ask the DashScope application which of the given paths are code files."""
    response = Application.call(
        # See the API-key note in file_summary().
        api_key=ai_key,
        app_id='c1a6dbb6d2314e469bfcbe44c2fe0a5f',
        prompt=prompt)
    if response.status_code == HTTPStatus.OK:
        try:
            json_data = json.loads(response.output.text)
            print(json_data)
        except json.JSONDecodeError:
            print("Response is not valid JSON!")
            json_data = {"files": []}
    else:
        print(f"Request failed: {response.message}")
        json_data = {"files": []}
    return json_data


def analysis_results(local_path, rel_path):
    """Send one file (prefixed with line numbers) to the scan application and return its findings."""
    prompt = ""
    file_path = os.path.join(local_path, rel_path)
    with open(file_path, 'r', encoding="utf8") as f:
        for line_num, line in enumerate(f, start=1):
            prompt += f"{line_num}\t{line}"
    response = Application.call(
        # See the API-key note in file_summary().
        api_key=ai_key,
        app_id='2f288f146e2d492abb3fe22695e70635',  # replace with the actual application ID
        prompt=prompt)
    if response.status_code == HTTPStatus.OK:
        try:
            json_data = json.loads(response.output.text)
        except json.JSONDecodeError:
            print("Response is not valid JSON!")
            print(response.output.text)
            json_data = {"summary": None}
    else:
        print(f"Request failed: {response.message}")
        json_data = {"summary": None}
    json_data["path"] = file_path
    return json_data


async def get_filtered_files(folder_path):
    """Walk the repository and collect relative paths, skipping hidden files and directories."""
    base_path = Path(folder_path).resolve()
    if not base_path.is_dir():
        raise ValueError("Invalid directory path")
    file_list = []
    for root, dirs, files in os.walk(base_path):
        dirs[:] = [d for d in dirs if not d.startswith('.')]
        files = [f for f in files if not f.startswith('.')]
        for file in files:
            abs_path = Path(root) / file
            rel_path = abs_path.relative_to(base_path)
            file_list.append(str(rel_path))
    return file_list


async def process_batch1(batch_files):
    """Filter one batch of file paths down to code files."""
    try:
        js = filter_code_files(str(batch_files))
        return js["files"]
    except Exception as e:
        print(f"Error while processing batch: {e}")
        return []


async def get_code_files(local_path):
    """Return the list of code files in the repository, filtered batch by batch."""
    file_list = []
    files = await get_filtered_files(local_path)
    print(files)
    print(f"Found {len(files)} files")
    # Split the file list into chunks of 500
    chunks = [files[i:i + 500] for i in range(0, len(files), 500)]
    # Run all batches concurrently
    tasks = [process_batch1(chunk) for chunk in chunks]
    batch_results = await asyncio.gather(*tasks, return_exceptions=True)
    # Collect the results of every batch (not just the first one)
    for batch_result in batch_results:
        if isinstance(batch_result, Exception):
            print(f"Error during processing: {batch_result}")
        else:
            file_list.extend(batch_result)
    return file_list


async def process_batch2(local_path, rel_path):
    """Scan a single file and return its analysis result."""
    try:
        return analysis_results(local_path, rel_path)
    except Exception as e:
        print(f"Error while processing batch: {e}")
        return {"summary": None}


async def analysis(local_path, repo_id):
    file_list = await get_code_files(local_path)
    print(file_list)
    results = []
    tasks = [process_batch2(local_path, file) for file in file_list]
    batch_results = await asyncio.gather(*tasks, return_exceptions=True)
    for result in batch_results:
        if isinstance(result, Exception):
            print(f"Error during processing: {result}")
            # Record the failure (state 3) together with the results collected so far
            await write_to_db({"results": results}, repo_id, 3)
        else:
            results.append(result)
    await write_to_db({"results": results}, repo_id, 2)
    print("Scan finished")


async def write_to_db(results_dict, repo_id, state):
    await Scan_Tasks.filter(repo_id=repo_id).update(
        state=state, result=results_dict, scan_end_time=int(time.time()))


async def commit_task(content, repo_id):
    # gather() returns a list; unpack the single element so the exception check below works
    (commit_result,) = await asyncio.gather(commit_summary(content), return_exceptions=True)
    if isinstance(commit_result, Exception):
        print(f"Commit summary failed: {commit_result}")
        commit_result = {"null": []}
    else:
        print("Commit summary succeeded")
    await Commit_Summary_Tasks.filter(repo_id=repo_id).update(
        result=commit_result, end_time=int(time.time()))


async def file_task(file_path, repo_id):
    with open(file_path, 'r', encoding="utf8") as f:
        content = f.read()
    # gather() returns a list; unpack the single element so the exception check below works
    (file_result,) = await asyncio.gather(file_summary(content), return_exceptions=True)
    if isinstance(file_result, Exception):
        print(f"File summary failed: {file_result}")
        file_result = {"summary": []}
    else:
        print("File summary succeeded")
    await File_Summary_Tasks.filter(repo_id=repo_id).update(
        result=file_result, end_time=int(time.time()))


@airouter.post("/scan")
async def scan(request: RequestBody, background_tasks: BackgroundTasks):
    local_path, repo_name = generate_repo_path(request.uuid, request.repo_url)
    repo = await Repos.get(name=repo_name)
    repo_id = repo.id
    print(f"Starting scan of repository: {repo_name}")
    await Scan_Tasks.filter(repo_id=repo_id).update(state=1, scan_start_time=int(time.time()))
    background_tasks.add_task(analysis, local_path, repo_id)
    return {"code": 200, "msg": "Scan task added successfully"}


@airouter.post("/summaryCommit")
async def summaryCommit(request: RequestBody, background_tasks: BackgroundTasks):
    local_path, repo_name = generate_repo_path(request.uuid, request.repo_url)
    repo = await Repos.get(name=repo_name)
    repo_id = repo.id
    await Commit_Summary_Tasks.filter(repo_id=repo_id).update(start_time=int(time.time()))
    # Latest commit with its patch: "<short hash> <subject>" followed by the diff
    commit_content = Repo(local_path).git.log('-1', '-p', '--pretty=format:%h %s')
    background_tasks.add_task(commit_task, commit_content, repo_id)
    return {"code": 200, "msg": "Commit summary task added successfully"}


@airouter.post("/summaryFile")
async def summaryFile(request: RequestFile, background_tasks: BackgroundTasks):
    local_path, repo_name = generate_repo_path(request.uuid, request.repo_url)
    repo = await Repos.get(name=repo_name)
    repo_id = repo.id
    await File_Summary_Tasks.filter(repo_id=repo_id).update(start_time=int(time.time()))
    background_tasks.add_task(file_task, request.file_path, repo_id)
    return {"code": 200, "msg": "File summary task added successfully"}
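

# --- Usage sketch (assumptions: this module is importable as `ai`, and the top-level
# --- FastAPI app is created elsewhere, e.g. in main.py; names below are illustrative) ---
#
#   from fastapi import FastAPI
#   from ai import airouter
#
#   app = FastAPI()
#   app.include_router(airouter)
#
# Example request against the scan endpoint (hypothetical uuid / repo_url values):
#
#   POST /scan
#   {"uuid": "user-1234", "repo_url": "https://example.com/org/demo.git"}
#
# The endpoint returns immediately with {"code": 200, "msg": ...}; the scan itself runs in a
# FastAPI background task, and its result is written to Scan_Tasks when it finishes.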