file.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. import os
  2. from pathlib import Path
  3. import json
  4. from http import HTTPStatus
  5. from dashscope import Application
  6. a=r"C:\www\gitnexus\9992cddb-b7d1-99ec-1bd2-35fdc177e623\electron"
  7. b=r"C:\www\gitnexus\9992cddb-b7d1-99ec-1bd2-35fdc177e623\Gitnexus-python"
  8. c=r"C:\www\gitnexus\9992cddb-b7d1-99ec-1bd2-35fdc177e623\GitNexus-backend"
  9. d=r"C:\www\gitnexus\9992cddb-b7d1-99ec-1bd2-35fdc177e623\U-Clean-Reserve"
  10. e=r"C:\www\gitnexus\9992cddb-b7d1-99ec-1bd2-35fdc177e623\nsrhwebNew"
  11. def filter_code_files(prompt):
  12. response = Application.call(
  13. # 若没有配置环境变量,可用百炼API Key将下行替换为:api_key="sk-xxx"。但不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。
  14. api_key="sk-0164613e1a2143fc808fc4cc2451bef0",
  15. app_id='c1a6dbb6d2314e469bfcbe44c2fe0a5f', # 替换为实际的应用 ID
  16. prompt=prompt)
  17. if response.status_code == HTTPStatus.OK:
  18. try:
  19. json_data = json.loads(response.output.text)
  20. print(json_data)
  21. except json.JSONDecodeError:
  22. print("返回内容不是有效的 JSON 格式!")
  23. print(response.output.text)
  24. json_data={"dirs":[]}
  25. else:
  26. print(f"请求失败: {response.message}")
  27. return json_data
  28. def get_filtered_files(folder_path):
  29. base_path = Path(folder_path).resolve()
  30. if not base_path.is_dir():
  31. raise ValueError("无效的目录路径")
  32. file_list = []
  33. for root, dirs, files in os.walk(base_path):
  34. dirs[:] = [d for d in dirs if not d.startswith('.')]
  35. files = [f for f in files if not f.startswith('.')]
  36. for file in files:
  37. abs_path = Path(root) / file
  38. rel_path = abs_path.relative_to(base_path)
  39. file_list.append(str(rel_path))
  40. return file_list
  41. import threading
  42. from concurrent.futures import ThreadPoolExecutor
  43. def process_batch(batch_files):
  44. """多线程处理单个文件批次的函数"""
  45. try:
  46. js = filter_code_files(str(batch_files))
  47. return js.get("files", [])
  48. except Exception as e:
  49. print(f"处理批次时出错: {e}")
  50. return []
  51. def get_code_files(path):
  52. file_list = []
  53. files = get_filtered_files(path)
  54. print(f"找到 {len(files)} 个文件")
  55. # 将文件列表分块(每500个一组)
  56. chunks = [files[i * 500: (i + 1) * 500]
  57. for i in range(0, len(files) // 500 + 1)]
  58. with ThreadPoolExecutor(max_workers=min(5, len(chunks))) as executor:
  59. # 提交所有批次任务
  60. futures = [executor.submit(process_batch, chunk) for chunk in chunks]
  61. # 实时获取已完成任务的结果
  62. for future in futures:
  63. try:
  64. batch_result = future.result()
  65. file_list.extend(batch_result)
  66. except Exception as e:
  67. print(f"获取结果时出错: {e}")
  68. print(f"最终合并文件数: {len(file_list)}")
  69. return {"files": file_list}
  70. get_code_files(a)