import os
import json
from http import HTTPStatus
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

from dashscope import Application

# Repository roots to scan.
a = r"C:\www\gitnexus\9992cddb-b7d1-99ec-1bd2-35fdc177e623\electron"
b = r"C:\www\gitnexus\9992cddb-b7d1-99ec-1bd2-35fdc177e623\Gitnexus-python"
c = r"C:\www\gitnexus\9992cddb-b7d1-99ec-1bd2-35fdc177e623\GitNexus-backend"
d = r"C:\www\gitnexus\9992cddb-b7d1-99ec-1bd2-35fdc177e623\U-Clean-Reserve"
e = r"C:\www\gitnexus\9992cddb-b7d1-99ec-1bd2-35fdc177e623\nsrhwebNew"
def filter_code_files(prompt):
    response = Application.call(
        # If no environment variable is configured, you can replace the line below
        # with your Bailian API key, e.g. api_key="sk-xxx". Hardcoding the key in
        # production code is discouraged, to reduce the risk of leaking it.
        api_key="sk-0164613e1a2143fc808fc4cc2451bef0",
        app_id='c1a6dbb6d2314e469bfcbe44c2fe0a5f',  # replace with your actual application ID
        prompt=prompt)
    json_data = {"files": []}  # fallback when the call fails or the response is not JSON
    if response.status_code == HTTPStatus.OK:
        try:
            json_data = json.loads(response.output.text)
            print(json_data)
        except json.JSONDecodeError:
            print("The response is not valid JSON!")
            print(response.output.text)
    else:
        print(f"Request failed: {response.message}")
    return json_data
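
# A hedged alternative to the hardcoded key above: read it from the environment
# instead. This sketch assumes the key has been exported as DASHSCOPE_API_KEY
# (the environment variable the comment above alludes to); adjust the name if
# your setup differs, and pass the result as api_key=get_api_key().
def get_api_key():
    key = os.getenv("DASHSCOPE_API_KEY")
    if not key:
        raise RuntimeError("DASHSCOPE_API_KEY is not set")
    return key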
def get_filtered_files(folder_path):
    base_path = Path(folder_path).resolve()
    if not base_path.is_dir():
        raise ValueError("Invalid directory path")
    file_list = []
    for root, dirs, files in os.walk(base_path):
        # Skip hidden directories and files (names starting with a dot).
        dirs[:] = [d for d in dirs if not d.startswith('.')]
        files = [f for f in files if not f.startswith('.')]
        for file in files:
            abs_path = Path(root) / file
            rel_path = abs_path.relative_to(base_path)
            file_list.append(str(rel_path))
    return file_list
def process_batch(batch_files):
    """Send one batch of file paths to the model and return the filtered list."""
    try:
        js = filter_code_files(str(batch_files))
        return js.get("files", [])
    except Exception as e:
        print(f"Error while processing a batch: {e}")
        return []
def get_code_files(path):
    file_list = []
    files = get_filtered_files(path)
    print(f"Found {len(files)} files")
    # Split the file list into chunks of 500 paths each.
    chunks = [files[i:i + 500] for i in range(0, len(files), 500)]
    if not chunks:
        return {"files": []}
    with ThreadPoolExecutor(max_workers=min(5, len(chunks))) as executor:
        # Submit all batches.
        futures = [executor.submit(process_batch, chunk) for chunk in chunks]
        # Collect results as each batch completes.
        for future in as_completed(futures):
            try:
                file_list.extend(future.result())
            except Exception as e:
                print(f"Error while fetching a result: {e}")
    print(f"Total number of merged files: {len(file_list)}")
    return {"files": file_list}
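
# Optional usage sketch (not part of the original script): run the same filtering
# over every repository root defined above and persist the merged result. The
# output filename "filtered_files.json" is an arbitrary assumption; call
# export_all() explicitly if you want this behaviour.
def export_all(output_path="filtered_files.json"):
    results = {repo: get_code_files(repo) for repo in (a, b, c, d, e)}
    with open(output_path, "w", encoding="utf-8") as fh:
        json.dump(results, fh, ensure_ascii=False, indent=2)
    return results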
get_code_files(a)