import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from http import HTTPStatus
from pathlib import Path

from dashscope import Application
from fastapi import APIRouter, BackgroundTasks
from pydantic import BaseModel

from base_config import ai_key, path
from models.gitModels import Users

airouter = APIRouter()

class RequestBody(BaseModel):
    uuid: str
    repo_url: str

def generate_repo_path(uuid, repo_url):
    """Build the local checkout path for a repo under the user's uuid directory."""
    repo_name = repo_url.split("/")[-1].replace(".git", "")
    base_path = os.path.join(path, uuid)
    return os.path.join(base_path, repo_name), repo_name
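
# A sketch of the expected mapping (values here are hypothetical; the real
# base directory comes from base_config.path):
#   generate_repo_path("u-123", "https://github.com/org/demo.git")
#   -> ("<base_config.path>/u-123/demo", "demo")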

def filter_code_files(prompt):
    """Ask the Bailian filtering app which of the listed files are code worth analyzing."""
    response = Application.call(
        # If the environment variable is not configured, you can replace the
        # line below with your Bailian API key: api_key="sk-xxx". Hard-coding
        # the key in production code is discouraged, to reduce the risk of
        # leaking it.
        api_key=ai_key,
        app_id='c1a6dbb6d2314e469bfcbe44c2fe0a5f',
        prompt=prompt)
    if response.status_code == HTTPStatus.OK:
        try:
            json_data = json.loads(response.output.text)
            print(json_data)
        except json.JSONDecodeError:
            print("Response is not valid JSON!")
            json_data = {"files": []}
    else:
        print(f"Request failed: {response.message}")
        json_data = {"files": []}
    return json_data
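
# Note: downstream code (process_batch1) assumes the Bailian app replies with
# JSON shaped like {"files": [...]}; any other reply falls back to
# {"files": []} above.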

def analysis_results(local_path, rel_path):
    """Send one file, with line numbers prepended, to the Bailian analysis app."""
    prompt = ""
    file_path = os.path.join(local_path, rel_path)
    with open(file_path, 'r', encoding="utf8") as f:
        for line_num, line in enumerate(f, start=1):
            prompt += f"{line_num}\t{line}"
    response = Application.call(
        # If the environment variable is not configured, you can replace the
        # line below with your Bailian API key: api_key="sk-xxx". Hard-coding
        # the key in production code is discouraged, to reduce the risk of
        # leaking it.
        api_key=ai_key,
        app_id='2f288f146e2d492abb3fe22695e70635',  # replace with your actual app ID
        prompt=prompt)
    if response.status_code == HTTPStatus.OK:
        try:
            json_data = json.loads(response.output.text)
        except json.JSONDecodeError:
            print("Response is not valid JSON!")
            print(response.output.text)
            json_data = {"summary": None}
    else:
        print(f"Request failed: {response.message}")
        json_data = {"summary": None}
    json_data["path"] = file_path
    print(json_data)
    return json_data
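
# Note: callers expect JSON shaped like {"summary": ...}; on any failure this
# still returns {"summary": None}, with the file's path attached either way so
# results can be traced back to their source file.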

def get_filtered_files(folder_path):
    """Walk folder_path and return relative paths of all non-hidden files."""
    base_path = Path(folder_path).resolve()
    if not base_path.is_dir():
        raise ValueError("Invalid directory path")
    file_list = []
    for root, dirs, files in os.walk(base_path):
        # Prune hidden directories in place so os.walk skips them entirely.
        dirs[:] = [d for d in dirs if not d.startswith('.')]
        files = [f for f in files if not f.startswith('.')]
        for file in files:
            abs_path = Path(root) / file
            rel_path = abs_path.relative_to(base_path)
            file_list.append(str(rel_path))
    return file_list
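
# A minimal usage sketch (directory and file names are hypothetical):
#   get_filtered_files("/tmp/demo-repo")
#   -> ["README.md", "src/main.py", ...]
# Dot-directories (e.g. .git) and dotfiles are pruned during the walk.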

def process_batch1(batch_files):
    """Worker: run one batch of file paths through the filtering app."""
    try:
        js = filter_code_files(str(batch_files))
        return js.get("files", [])
    except Exception as e:
        print(f"Error while processing batch: {e}")
        return []

def get_code_files(folder_path):
    """Return the AI-filtered list of code files under folder_path."""
    file_list = []
    files = get_filtered_files(folder_path)
    print(files)
    print(f"Found {len(files)} files")
    # Split the file list into chunks of 500 files each.
    chunks = [files[i:i + 500] for i in range(0, len(files), 500)]
    if not chunks:
        return file_list
    with ThreadPoolExecutor(max_workers=min(5, len(chunks))) as executor:
        # Submit all batch tasks.
        futures = [executor.submit(process_batch1, chunk) for chunk in chunks]
        # Collect results as batches complete.
        for future in as_completed(futures):
            try:
                file_list.extend(future.result())
            except Exception as e:
                print(f"Error while fetching result: {e}")
    print(f"Final merged file count: {len(file_list)}")
    return file_list
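
# With, say, 1200 files this yields chunks of 500/500/200, each sent to the
# filtering app by its own worker thread; batch results are merged in
# completion order, so the final list order is not deterministic.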

def process_batch2(local_path, rel_path):
    """Worker: analyze a single file."""
    try:
        return analysis_results(local_path, rel_path)
    except Exception as e:
        print(f"Error while processing batch: {e}")
        return {"summary": None}

def analysis(local_path):
    """Background task: filter the repo's files, then analyze each one."""
    file_list = get_code_files(local_path)
    print(file_list)
    results = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(process_batch2, local_path, file) for file in file_list]
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as e:
                print(f"Error while fetching result: {e}")
    return results

@airouter.post("/scan")
async def ai(request: RequestBody, background_tasks: BackgroundTasks):
    local_path, _ = generate_repo_path(request.uuid, request.repo_url)
    background_tasks.add_task(analysis, local_path)
    return {"code": 200, "msg": "Scan task queued successfully"}