import os import time import shutil import subprocess from pathlib import Path from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler import concurrent.futures import threading # ====== 配置 ====== SOURCE_FOLDER = Path(r"E:\ELDEN RING\Game\cv_saved") # 监控的源目录 DEST_FOLDER = Path(r"E:") # 目标目录(必须是具体路径) FOLDER_PREFIX = "25" # 需要监控的文件夹前缀 MAX_WORKERS = 8 # 最大线程数 STABLE_TIME = 30 # 文件夹稳定检查时间(秒) # ================= def is_folder_stable(folder_path): """检查文件夹是否稳定(连续两次修改时间相同)""" try: initial_mtime = os.path.getmtime(folder_path) time.sleep(STABLE_TIME) current_mtime = os.path.getmtime(folder_path) return initial_mtime == current_mtime except Exception as e: print(f" [错误] 检查稳定性失败: {e}") return False def process_folder(folder_path): """使用 robocopy 移动文件夹""" start_time = time.time() folder_name = folder_path.name print(f"\n[处理] 开始移动文件夹: '{folder_name}'") print(f" 源路径: {folder_path}") print(f" 目标路径: {DEST_FOLDER / folder_name}") try: # 检查目标目录权限 if not os.access(DEST_FOLDER, os.W_OK): raise PermissionError(f"无写入权限: {DEST_FOLDER}") # 确保目标目录存在 DEST_FOLDER.mkdir(parents=True, exist_ok=True) # 构造 robocopy 命令 cmd = [ "robocopy", str(folder_path), # 源目录 str(DEST_FOLDER / folder_name), # 目标目录 "/MIR", # 镜像目录树 "/MT:8", # 多线程(8线程) "/R:2", # 失败重试2次 "/W:5", # 重试等待5秒 "/NFL", "/NDL", "/NJH", "/NJS" # 减少日志输出 ] print(f" 执行命令: {' '.join(cmd)}") result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8') # 处理 robocopy 返回码 if result.returncode >= 8: raise RuntimeError(f"Robocopy 失败 (代码 {result.returncode}): {result.stderr}") # 删除源目录(robocopy /MIR 已处理,此处作为二次确认) if (DEST_FOLDER / folder_name).exists(): print(f" 验证: 目标文件夹已存在,正在清理源文件夹...") shutil.rmtree(folder_path, ignore_errors=True) print(f" [完成] '{folder_name}' 移动成功") else: raise FileNotFoundError("目标文件夹未创建,终止删除源文件夹") except Exception as e: print(f" [错误] 处理失败: {e}") finally: duration = time.time() - start_time print(f" 耗时: {duration:.2f} 秒") class FolderMonitor(FileSystemEventHandler): def __init__(self, executor): super().__init__() self.executor = executor self.active_tasks = set() # 跟踪正在处理的任务 def on_created(self, event): """处理新创建的文件夹""" self._handle_folder(event.src_path, "创建") def on_moved(self, event): """处理移动/重命名的文件夹""" self._handle_folder(event.dest_path, "移动") def _handle_folder(self, path, action_type): """统一处理文件夹事件""" folder_path = Path(path) if not folder_path.is_dir(): return if folder_path.parent != SOURCE_FOLDER or not folder_path.name.startswith(FOLDER_PREFIX): return print(f"\n[发现] 通过 {action_type} 事件检测到文件夹: '{folder_path.name}'") # 避免重复处理 if folder_path.name in self.active_tasks: print(f" 警告: 文件夹 '{folder_path.name}' 已在处理中") return self.active_tasks.add(folder_path.name) self.executor.submit(self._process_with_stability_check, folder_path) def _process_with_stability_check(self, folder_path): """带稳定性检查的处理流程(无限等待直到稳定)""" folder_name = folder_path.name try: while True: if is_folder_stable(folder_path): print(f" [稳定] 文件夹 '{folder_name}' 已稳定,开始移动") process_folder(folder_path) break else: print(f" [监控] 文件夹 '{folder_name}' 仍在修改中,继续等待...") except Exception as e: print(f" [错误] 处理 '{folder_name}' 时发生异常: {e}") finally: self.active_tasks.discard(folder_name) def main(): print("\n===== 文件夹监控转移程序 =====") print(f"监控目录: {SOURCE_FOLDER}") print(f"目标目录: {DEST_FOLDER}") print(f"文件夹前缀: '{FOLDER_PREFIX}'") print(f"稳定性检查间隔: {STABLE_TIME}秒") print(f"最大线程数: {MAX_WORKERS}\n") # 验证目录 if not SOURCE_FOLDER.is_dir(): print(f"[错误] 源目录不存在: {SOURCE_FOLDER}") return try: DEST_FOLDER.mkdir(parents=True, exist_ok=True) except Exception as e: print(f"[错误] 无法创建目标目录: {e}") return # 启动监控 with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: event_handler = FolderMonitor(executor) observer = Observer() observer.schedule(event_handler, str(SOURCE_FOLDER), recursive=False) observer.start() print("[系统] 监控已启动 (按 Ctrl+C 停止)...") try: while True: time.sleep(1) except KeyboardInterrupt: print("\n[系统] 正在停止监控...") finally: observer.stop() observer.join() print("[系统] 监控已停止") if __name__ == "__main__": main()