aiRouter.py 5.0 KB
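"""FastAPI router for AI-assisted repository scanning.

POST /scan queues a background task that walks a repository already present
under the configured base path, asks one DashScope application to pick out
the code files, and then asks a second application to summarize each of them.
"""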

import os, json
from base_config import ai_key, path
from fastapi import APIRouter, BackgroundTasks
from pathlib import Path
from pydantic import BaseModel
from models.gitModels import Users
from concurrent.futures import ThreadPoolExecutor
from http import HTTPStatus
from dashscope import Application

airouter = APIRouter()

class RequestBody(BaseModel):
    uuid: str
    repo_url: str

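# Illustrative request payload for /scan (both values are made up for the example):
#   {"uuid": "3f2b9c1e", "repo_url": "https://example.com/demo/project.git"}
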
def generate_repo_path(uuid, repo_url):
    """Build the local path of a user's repository under the configured base path; returns (repo_path, repo_name)."""
    repo_name = repo_url.split("/")[-1].replace(".git", "")
    base_path = os.path.join(path, uuid)
    return os.path.join(base_path, repo_name), repo_name

def filter_code_files(prompt):
    """Ask the DashScope filtering app which of the listed files are code files; returns the parsed JSON."""
    response = Application.call(
        # If no environment variable is configured, the line below can be replaced with a Bailian API key: api_key="sk-xxx".
        # Hard-coding the API key in production code is not recommended, to reduce the risk of leaking it.
        api_key=ai_key,
        app_id='c1a6dbb6d2314e469bfcbe44c2fe0a5f',
        prompt=prompt)
    if response.status_code == HTTPStatus.OK:
        try:
            json_data = json.loads(response.output.text)
            print(json_data)
        except json.JSONDecodeError:
            print("The returned content is not valid JSON!")
            json_data = {"files": []}
    else:
        print(f"Request failed: {response.message}")
        json_data = {"files": []}
    return json_data

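# Sketch of the response this code expects from the filtering app; the "files" key matches
# the fallbacks above, and the listed paths are purely illustrative:
#   {"files": ["src/main.py", "app/router.py"]}
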
def analysis_results(local_path, rel_path):
    """Send one file, prefixed with line numbers, to the DashScope analysis app and return its JSON result."""
    prompt = ""
    file_path = os.path.join(local_path, rel_path)
    with open(file_path, 'r', encoding="utf8") as f:
        for line_num, line in enumerate(f, start=1):
            prompt += f"{line_num}\t{line}"
    response = Application.call(
        # If no environment variable is configured, the line below can be replaced with a Bailian API key: api_key="sk-xxx".
        # Hard-coding the API key in production code is not recommended, to reduce the risk of leaking it.
        api_key=ai_key,
        app_id='2f288f146e2d492abb3fe22695e70635',  # replace with the actual application ID
        prompt=prompt)
    if response.status_code == HTTPStatus.OK:
        try:
            json_data = json.loads(response.output.text)
        except json.JSONDecodeError:
            print("The returned content is not valid JSON!")
            print(response.output.text)
            json_data = {"summary": None}
    else:
        print(f"Request failed: {response.message}")
        json_data = {"summary": None}
    json_data["path"] = file_path
    print(json_data)
    return json_data

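# Sketch of the per-file result this code expects: the "summary" key matches the fallbacks
# above and "path" is added locally; the summary text and path are purely illustrative:
#   {"summary": "Builds the DB session and loads config", "path": "<base>/<uuid>/<repo>/src/db.py"}
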
def get_filtered_files(folder_path):
    """Walk folder_path and return file paths relative to it, skipping dot-directories and dot-files."""
    base_path = Path(folder_path).resolve()
    if not base_path.is_dir():
        raise ValueError("Invalid directory path")
    file_list = []
    for root, dirs, files in os.walk(base_path):
        dirs[:] = [d for d in dirs if not d.startswith('.')]
        files = [f for f in files if not f.startswith('.')]
        for file in files:
            abs_path = Path(root) / file
            rel_path = abs_path.relative_to(base_path)
            file_list.append(str(rel_path))
    return file_list

def process_batch1(batch_files):
    """Worker-thread helper: filter one batch of file paths and return the model's list of code files."""
    try:
        js = filter_code_files(str(batch_files))
        return js.get("files", [])
    except Exception as e:
        print(f"Error while processing the batch: {e}")
        return []

def get_code_files(local_path):
    """List the repository's files and ask the filtering app, in batches of 500, which ones are code files."""
    file_list = []
    files = get_filtered_files(local_path)
    print(files)
    print(f"Found {len(files)} files")
    # Split the file list into chunks of 500 paths each
    chunks = [files[i:i + 500] for i in range(0, len(files), 500)]
    if not chunks:
        return file_list
    with ThreadPoolExecutor(max_workers=min(5, len(chunks))) as executor:
        # Submit every batch
        futures = [executor.submit(process_batch1, chunk) for chunk in chunks]
        # Merge each batch result as the futures are collected
        for future in futures:
            try:
                batch_result = future.result()
                file_list.extend(batch_result)
            except Exception as e:
                print(f"Error while fetching a result: {e}")
    print(f"Final merged file count: {len(file_list)}")
    return file_list

def process_batch2(local_path, rel_path):
    """Worker-thread helper: analyze a single file and return its result dictionary."""
    try:
        js = analysis_results(local_path, rel_path)
        return js
    except Exception as e:
        print(f"Error while analyzing the file: {e}")
        return {"summary": None}

def analysis(local_path):
    """Background task: find the code files in the repository, analyze each one, and collect the results."""
    file_list = get_code_files(local_path)
    print(file_list)
    results = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(process_batch2, local_path, file) for file in file_list]
        for future in futures:
            try:
                batch_result = future.result()
                results.append(batch_result)
            except Exception as e:
                print(f"Error while fetching a result: {e}")
    return results

@airouter.post("/scan")
async def ai(request: RequestBody, background_tasks: BackgroundTasks):
    local_path, _ = generate_repo_path(request.uuid, request.repo_url)
    background_tasks.add_task(analysis, local_path)
    return {"code": 200, "msg": "Scan task added successfully"}
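
# Example request, assuming the router is mounted without a prefix and the app listens on
# localhost:8000 (both deployment-specific assumptions; payload values are illustrative):
#   curl -X POST http://localhost:8000/scan \
#        -H "Content-Type: application/json" \
#        -d '{"uuid": "3f2b9c1e", "repo_url": "https://example.com/demo/project.git"}'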