123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- import os, json, time, asyncio
- from base_config import ai_key, path
- from fastapi import APIRouter, BackgroundTasks
- from pathlib import Path
- from pydantic import BaseModel
- from git import Repo
- from http import HTTPStatus
- from dashscope import Application
- from models.aiModels import Scan_Tasks, Commit_Summary_Tasks, File_Summary_Tasks
- from models.gitModels import Repos
# Router that collects the AI-backed endpoints declared in this module
# (/scan, /summaryCommit, /summaryFile).
airouter = APIRouter()
class RequestCommit(BaseModel):
    """Request body accepted by the ``/summaryCommit`` endpoint."""

    # Id of the Commit_Summary_Tasks row that the endpoint reads and updates.
    task_id: str
    # Directory name under the configured base path that holds the checkout.
    uuid: str
    # Repository URL; its last path segment supplies the repository name.
    repo_url: str
class RequestScan(BaseModel):
    """Request body accepted by the ``/scan`` endpoint."""

    # Id of the Scan_Tasks row that the endpoint and its background job update.
    task_id: str
    # Directory name under the configured base path that holds the checkout.
    uuid: str
    # Repository URL; its last path segment supplies the repository name.
    repo_url: str
class RequestFile(BaseModel):
    """Request body accepted by the ``/summaryFile`` endpoint."""

    # Id of the File_Summary_Tasks row that the endpoint updates.
    task_id: str
    # Directory name under the configured base path that holds the checkout.
    uuid: str
    # Repository URL; its last path segment supplies the repository name.
    repo_url: str
    # Path of the file whose content is to be summarised.
    file_path: str
def generate_repo_path(uuid, repo_url):
    """Return the local checkout path and the name of a repository.

    The repository name is the last path segment of ``repo_url`` with a
    trailing ``.git`` suffix removed. The checkout path is
    ``<path>/<uuid>/<repo_name>``, where ``path`` is the base directory
    imported from ``base_config``.

    Args:
        uuid: Directory name under the base path that holds the checkout.
        repo_url: URL of the repository.

    Returns:
        A ``(local_path, repo_name)`` tuple.
    """
    # Ignore trailing slashes so ".../repo/" still yields "repo".
    repo_name = repo_url.rstrip("/").split("/")[-1]
    # Strip only a trailing ".git"; str.replace would also mangle names that
    # merely contain it (e.g. "my.github.io").
    if repo_name.endswith(".git"):
        repo_name = repo_name[:-len(".git")]
    return os.path.join(path, uuid, repo_name), repo_name
async def file_summary(content):
    """Send ``content`` to a DashScope application and decode its JSON reply.

    Args:
        content: Text passed to the application as its prompt.

    Returns:
        The value decoded from the reply text, or ``{"summary": []}`` when
        the request does not return HTTP 200 or the reply is not valid JSON.
    """
    # Application.call is invoked synchronously; running it in the default
    # executor keeps the event loop free while the request is in flight.
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        None,
        lambda: Application.call(
            # If no environment variable is configured, a Bailian API key can
            # be passed here instead (api_key="sk-xxx"). Hard-coding the key
            # in production code is discouraged because it may leak.
            api_key=ai_key,
            app_id='ef50d70cd4074a899a09875e6a6e36ea',
            prompt=content,
        ),
    )
    if response.status_code != HTTPStatus.OK:
        print(f"请求失败: {response.message}")
        return {"summary": []}
    try:
        json_data = json.loads(response.output.text)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers a reply that carries no text at all (None).
        print("返回内容不是有效的 JSON 格式!")
        return {"summary": []}
    print(json_data)
    return json_data
async def commit_summary(content):
    """Send ``content`` to a DashScope application and decode its JSON reply.

    Args:
        content: Text passed to the application as its prompt (the caller in
            this module passes a commit diff).

    Returns:
        The value decoded from the reply text, or ``{"null": []}`` when the
        request does not return HTTP 200 or the reply is not valid JSON.
    """
    # Application.call is invoked synchronously; running it in the default
    # executor keeps the event loop free while the request is in flight.
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        None,
        lambda: Application.call(
            # If no environment variable is configured, a Bailian API key can
            # be passed here instead (api_key="sk-xxx"). Hard-coding the key
            # in production code is discouraged because it may leak.
            api_key=ai_key,
            app_id='88426cc2301b44bea5d28d41d187ebf2',
            prompt=content,
        ),
    )
    if response.status_code != HTTPStatus.OK:
        print(f"请求失败: {response.message}")
        return {"null": []}
    try:
        json_data = json.loads(response.output.text)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers a reply that carries no text at all (None).
        print("返回内容不是有效的 JSON 格式!")
        return {"null": []}
    print(json_data)
    return json_data
def filter_code_files(prompt):
    """Send ``prompt`` to a DashScope application and decode its JSON reply.

    Args:
        prompt: Text passed to the application (the caller in this module
            passes the string form of a list of file paths).

    Returns:
        The value decoded from the reply text, or ``{"files": []}`` when the
        request does not return HTTP 200 or the reply is not valid JSON.
    """
    reply = Application.call(
        # If no environment variable is configured, a Bailian API key can be
        # passed here instead (api_key="sk-xxx"). Hard-coding the key in
        # production code is discouraged because it may leak.
        api_key=ai_key,
        app_id='b0725a23eafd4422bfa7d5eff278af7c',
        prompt=prompt)

    # Guard clause: anything other than HTTP 200 yields the empty fallback.
    if reply.status_code != HTTPStatus.OK:
        print(f"请求失败: {reply.message}")
        return {"files": []}

    try:
        parsed = json.loads(reply.output.text)
    except json.JSONDecodeError:
        print("返回内容不是有效的 JSON 格式!")
        return {"files": []}
    print(parsed)
    return parsed
def analysis_results(local_path, path):
    """Send one line-numbered file to a DashScope application.

    The file is read as UTF-8 and every line is prefixed with its 1-based
    line number and a tab before being sent as the prompt.

    Args:
        local_path: Directory that ``path`` is relative to.
        path: Path of the file inside ``local_path``.

    Returns:
        The JSON object decoded from the reply with ``"path"`` set to
        ``path``. When the request does not return HTTP 200, or the reply is
        not a JSON object, ``{"summary": None, "path": path}`` is returned.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    file_path = os.path.join(local_path, path)
    with open(file_path, 'r', encoding="utf8") as f:
        # Join once instead of growing the prompt with repeated "+=".
        prompt = "".join(
            f"{line_num}\t{line}" for line_num, line in enumerate(f, start=1)
        )
    response = Application.call(
        # If no environment variable is configured, a Bailian API key can be
        # passed here instead (api_key="sk-xxx"). Hard-coding the key in
        # production code is discouraged because it may leak.
        api_key=ai_key,
        app_id='b6edb4f5ff1c49f9855af27b14a0e8b4',
        prompt=prompt)
    json_data = None
    if response.status_code == HTTPStatus.OK:
        try:
            json_data = json.loads(response.output.text)
        except (json.JSONDecodeError, TypeError):
            # TypeError covers a reply that carries no text at all (None).
            json_data = None
        if not isinstance(json_data, dict):
            # Also rejects valid JSON that is not an object: the "path" key
            # assigned below can only be attached to a dict.
            print("返回内容不是有效的 JSON 格式!")
            print(response.output.text)
            json_data = None
    else:
        print(f"请求失败: {response.message}")
    if json_data is None:
        json_data = {"summary": None}
    json_data["path"] = path
    return json_data
async def get_filtered_files(folder_path):
    """List every non-hidden file below ``folder_path``.

    Files whose names start with ``.`` are skipped, and directories whose
    names start with ``.`` are not descended into.

    Args:
        folder_path: Directory to walk.

    Returns:
        A list of file paths, as strings, relative to the resolved
        ``folder_path``.

    Raises:
        ValueError: If ``folder_path`` is not a directory.
    """
    root_dir = Path(folder_path).resolve()
    if not root_dir.is_dir():
        raise ValueError("无效的目录路径")

    collected = []
    for current, subdirs, names in os.walk(root_dir):
        # Prune in place so os.walk never enters hidden directories.
        subdirs[:] = [d for d in subdirs if not d.startswith('.')]
        current_dir = Path(current)
        collected.extend(
            str((current_dir / name).relative_to(root_dir))
            for name in names
            if not name.startswith('.')
        )
    return collected
async def process_batch1(batch_files):
    """Run ``filter_code_files`` on one batch of file paths in a worker thread.

    Args:
        batch_files: List of file paths; its string form is sent as the
            prompt.

    Returns:
        The ``"files"`` entry of the reply, or ``[]`` if anything fails.
    """
    try:
        # filter_code_files is a plain (non-async) function. Running it in the
        # default executor lets several batches overlap under asyncio.gather
        # and keeps the event loop responsive.
        loop = asyncio.get_running_loop()
        js = await loop.run_in_executor(None, filter_code_files, str(batch_files))
        return js["files"]
    except Exception as e:
        # Best effort: a failed batch contributes no files instead of
        # aborting the caller.
        print(f"处理批次时出错: {e}")
        return []
async def get_code_files(path):
    """Return the files under ``path`` that the filter step reports.

    The non-hidden files below ``path`` are split into batches of 500, each
    batch is passed to ``process_batch1``, and the lists returned for all
    batches are concatenated.

    Args:
        path: Directory to scan.

    Returns:
        A flat list with the entries returned for every batch.

    Raises:
        ValueError: If ``path`` is not a directory (raised by
            ``get_filtered_files``).
    """
    batch_size = 500
    files = await get_filtered_files(path)
    print(files)
    print(f"找到 {len(files)} 个文件")
    # Step through the list by batch_size so no empty trailing batch (and
    # therefore no pointless request) is produced when the file count is zero
    # or an exact multiple of the batch size.
    chunks = [
        files[start:start + batch_size]
        for start in range(0, len(files), batch_size)
    ]
    outcomes = await asyncio.gather(
        *(process_batch1(chunk) for chunk in chunks),
        return_exceptions=True,
    )
    file_list = []
    # Merge every batch, not just the first one; otherwise everything past
    # the first 500 files would be dropped.
    for outcome in outcomes:
        if isinstance(outcome, list):
            file_list.extend(outcome)
        else:
            # An exception captured by gather, or a reply whose "files" entry
            # is not a list.
            print(f"处理出错: {outcome}")
    return file_list
async def process_batch2(local_path, path):
    """Run ``analysis_results`` for one file in a worker thread.

    Args:
        local_path: Directory that ``path`` is relative to.
        path: Path of the file inside ``local_path``.

    Returns:
        The dict returned by ``analysis_results``, or
        ``{"summary": None, "path": path}`` if it raises.
    """
    try:
        # analysis_results is a plain (non-async) function. Running it in the
        # default executor lets several files overlap under asyncio.gather
        # and keeps the event loop responsive.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, analysis_results, local_path, path)
    except Exception as e:
        print(f"处理批次时出错: {e}")
        # Keep the same shape as a successful result so consumers can still
        # tell which file the empty summary belongs to.
        return {"summary": None, "path": path}
async def analysis(local_path, task_id):
    """Analyse every selected file of a checkout and store the results.

    The files returned by ``get_code_files`` are each passed to
    ``process_batch2``; the collected results are written to the
    ``Scan_Tasks`` row with id ``task_id`` together with the end timestamp
    in milliseconds.

    Args:
        local_path: Directory of the checkout to scan.
        task_id: Id of the ``Scan_Tasks`` row to update.
    """
    results = []
    failed = False
    try:
        file_list = await get_code_files(local_path)
        print(file_list)
        batch_results = await asyncio.gather(
            *(process_batch2(local_path, file) for file in file_list),
            return_exceptions=True,
        )
        for result in batch_results:
            if isinstance(result, Exception):
                print(f"处理出错: {result}")
                failed = True
            else:
                results.append(result)
    except Exception as e:
        # e.g. the checkout directory does not exist. Record the failure
        # instead of leaving the task in its "started" state forever.
        print(f"处理出错: {e}")
        failed = True
    # Write the final state exactly once so a failure is not overwritten by a
    # later success write.
    # NOTE(review): state 3 is presumably "failed" and 2 "finished" - inferred
    # from the branches that set them; confirm against the Scan_Tasks model.
    await Scan_Tasks.filter(id=task_id).update(
        state=3 if failed else 2,
        result={"results": results},
        scan_end_time=int(time.time() * 1000),
    )
    print("扫描完成")
async def commit_task(content, task_id):
    """Summarise a commit diff and store the result on its task row.

    Args:
        content: Text passed to ``commit_summary``.
        task_id: Id of the ``Commit_Summary_Tasks`` row to update.
    """
    # Await the coroutine directly: gather(..., return_exceptions=True)
    # returns a list, so checking that list with isinstance(..., Exception)
    # could never detect a failure and the exception object itself would be
    # written as the result.
    try:
        commit_result = await commit_summary(content)
    except Exception as e:
        print(f"提交出错: {e}")
        # Same fallback shape that commit_summary uses for a failed request.
        commit_result = {"null": []}
    else:
        print("提交成功")
    await Commit_Summary_Tasks.filter(id=task_id).update(
        result=commit_result,
        end_time=int(time.time() * 1000),
    )
async def file_task(file_path, task_id):
    """Summarise one file and store the result on its task row.

    Args:
        file_path: Path of the UTF-8 text file to read.
        task_id: Id of the ``File_Summary_Tasks`` row to update.
    """
    # Await the coroutine directly: gather(..., return_exceptions=True)
    # returns a list, so checking that list with isinstance(..., Exception)
    # could never detect a failure. Reading the file sits inside the same
    # try so an unreadable file still ends the task with a result and an end
    # time.
    try:
        with open(file_path, 'r', encoding="utf8") as f:
            content = f.read()
        file_result = await file_summary(content)
    except Exception as e:
        print(f"提交出错: {e}")
        # Same fallback shape that file_summary uses for a failed request.
        file_result = {"summary": []}
    else:
        print("提交成功")
    await File_Summary_Tasks.filter(id=task_id).update(
        result=file_result,
        end_time=int(time.time() * 1000),
    )
@airouter.post("/scan")
async def scan(request: RequestScan, background_tasks: BackgroundTasks):
    """Mark a scan task as started and queue the repository analysis.

    Sets ``state=1`` and the start timestamp (milliseconds) on the
    ``Scan_Tasks`` row, then schedules ``analysis`` as a background task.

    Returns:
        A dict with ``code`` 200 and a confirmation message.
    """
    checkout_dir, name = generate_repo_path(request.uuid, request.repo_url)
    print(f"开始扫描仓库: {name}")

    task_rows = Scan_Tasks.filter(id=request.task_id)
    await task_rows.update(state=1, scan_start_time=int(time.time() * 1000))

    # The analysis itself runs after the response has been sent.
    background_tasks.add_task(analysis, checkout_dir, request.task_id)
    reply = {"code": 200, "msg": "添加扫描任务成功"}
    return reply
@airouter.post("/summaryCommit")
async def summaryCommit(request: RequestCommit, background_tasks: BackgroundTasks):
    """Queue a summary of the diff introduced by a task's commit.

    Reads the commit hash from the ``Commit_Summary_Tasks`` row, records the
    start timestamp (milliseconds), computes the diff between the commit and
    its first parent, and schedules ``commit_task`` with that diff.

    Returns:
        A dict with ``code`` 200 and a confirmation message.
    """
    local_path, repo_name = generate_repo_path(request.uuid, request.repo_url)
    repo_commit = await Commit_Summary_Tasks.get(id=request.task_id)
    repo_commit_hash = repo_commit.repo_hash
    print(f"开始提交仓库: {repo_name}")
    await Commit_Summary_Tasks.filter(id=request.task_id).update(start_time=int(time.time() * 1000))
    repo = Repo(local_path)
    try:
        if repo.commit(repo_commit_hash).parents:
            base = f"{repo_commit_hash}^"
        else:
            # A root commit has no parent, so "<hash>^" cannot be resolved.
            # Diff against git's well-known empty tree instead (this id is
            # the SHA-1 object-format value).
            base = "4b825dc642cb6eb9a060e54bf8d69288fbee4904"
        commit_content = repo.git.diff(base, repo_commit_hash)
    finally:
        # Release the handles GitPython keeps open for the repository.
        repo.close()
    background_tasks.add_task(commit_task, commit_content, request.task_id)
    return {"code": 200, "msg": "添加提交任务成功"}
@airouter.post("/summaryFile")
async def summaryFile(request: RequestFile, background_tasks: BackgroundTasks):
    """Queue a summary of one file inside the request's repository checkout.

    ``file_path`` comes from the client, so it is only accepted when it
    resolves to a location inside the checkout derived from ``uuid`` and
    ``repo_url``. It may be given as-is (absolute, or relative to the working
    directory) or relative to the checkout root.

    Returns:
        A dict with ``code`` 200 and a confirmation message, or ``code`` 400
        when the path falls outside the checkout.
    """
    repo_root = Path(generate_repo_path(request.uuid, request.repo_url)[0]).resolve()
    target = None
    for candidate in (Path(request.file_path), repo_root / request.file_path):
        resolved = candidate.resolve()
        try:
            resolved.relative_to(repo_root)
        except ValueError:
            # Outside the checkout (e.g. "../../etc/passwd"); try the next
            # interpretation of the path.
            continue
        target = resolved
        break
    if target is None:
        return {"code": 400, "msg": "文件路径不合法"}
    await File_Summary_Tasks.filter(id=request.task_id).update(start_time=int(time.time() * 1000))
    background_tasks.add_task(file_task, str(target), request.task_id)
    return {"code": 200, "msg": "添加提交任务成功"}
|