import os from pathlib import Path import json from http import HTTPStatus from dashscope import Application a=r"C:\www\gitnexus\9992cddb-b7d1-99ec-1bd2-35fdc177e623\electron" b=r"C:\www\gitnexus\9992cddb-b7d1-99ec-1bd2-35fdc177e623\Gitnexus-python" c=r"C:\www\gitnexus\9992cddb-b7d1-99ec-1bd2-35fdc177e623\GitNexus-backend" d=r"C:\www\gitnexus\9992cddb-b7d1-99ec-1bd2-35fdc177e623\U-Clean-Reserve" e=r"C:\www\gitnexus\9992cddb-b7d1-99ec-1bd2-35fdc177e623\nsrhwebNew" def filter_code_files(prompt): response = Application.call( # 若没有配置环境变量,可用百炼API Key将下行替换为:api_key="sk-xxx"。但不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。 api_key="sk-0164613e1a2143fc808fc4cc2451bef0", app_id='c1a6dbb6d2314e469bfcbe44c2fe0a5f', # 替换为实际的应用 ID prompt=prompt) if response.status_code == HTTPStatus.OK: try: json_data = json.loads(response.output.text) print(json_data) except json.JSONDecodeError: print("返回内容不是有效的 JSON 格式!") print(response.output.text) json_data={"dirs":[]} else: print(f"请求失败: {response.message}") return json_data def get_filtered_files(folder_path): base_path = Path(folder_path).resolve() if not base_path.is_dir(): raise ValueError("无效的目录路径") file_list = [] for root, dirs, files in os.walk(base_path): dirs[:] = [d for d in dirs if not d.startswith('.')] files = [f for f in files if not f.startswith('.')] for file in files: abs_path = Path(root) / file rel_path = abs_path.relative_to(base_path) file_list.append(str(rel_path)) return file_list import threading from concurrent.futures import ThreadPoolExecutor def process_batch(batch_files): """多线程处理单个文件批次的函数""" try: js = filter_code_files(str(batch_files)) return js.get("files", []) except Exception as e: print(f"处理批次时出错: {e}") return [] def get_code_files(path): file_list = [] files = get_filtered_files(path) print(f"找到 {len(files)} 个文件") # 将文件列表分块(每500个一组) chunks = [files[i * 500: (i + 1) * 500] for i in range(0, len(files) // 500 + 1)] with ThreadPoolExecutor(max_workers=min(5, len(chunks))) as executor: # 提交所有批次任务 futures = [executor.submit(process_batch, chunk) for chunk in chunks] # 实时获取已完成任务的结果 for future in futures: try: batch_result = future.result() file_list.extend(batch_result) except Exception as e: print(f"获取结果时出错: {e}") print(f"最终合并文件数: {len(file_list)}") return {"files": file_list} get_code_files(a)