167 lines
6.2 KiB
Python
167 lines
6.2 KiB
Python
import os
|
||
import time
|
||
import shutil
|
||
import subprocess
|
||
from pathlib import Path
|
||
from watchdog.observers import Observer
|
||
from watchdog.events import FileSystemEventHandler
|
||
import concurrent.futures
|
||
import threading
|
||
|
||
# ====== 配置 ======
|
||
SOURCE_FOLDER = Path(r"E:\ELDEN RING\Game\cv_saved") # 监控的源目录
|
||
DEST_FOLDER = Path(r"E:") # 目标目录(必须是具体路径)
|
||
FOLDER_PREFIX = "25" # 需要监控的文件夹前缀
|
||
MAX_WORKERS = 8 # 最大线程数
|
||
STABLE_TIME = 30 # 文件夹稳定检查时间(秒)
|
||
# =================
|
||
|
||
def is_folder_stable(folder_path):
|
||
"""检查文件夹是否稳定(连续两次修改时间相同)"""
|
||
try:
|
||
initial_mtime = os.path.getmtime(folder_path)
|
||
time.sleep(STABLE_TIME)
|
||
current_mtime = os.path.getmtime(folder_path)
|
||
return initial_mtime == current_mtime
|
||
except Exception as e:
|
||
print(f" [错误] 检查稳定性失败: {e}")
|
||
return False
|
||
|
||
def process_folder(folder_path):
|
||
"""使用 robocopy 移动文件夹"""
|
||
start_time = time.time()
|
||
folder_name = folder_path.name
|
||
|
||
print(f"\n[处理] 开始移动文件夹: '{folder_name}'")
|
||
print(f" 源路径: {folder_path}")
|
||
print(f" 目标路径: {DEST_FOLDER / folder_name}")
|
||
|
||
try:
|
||
# 检查目标目录权限
|
||
if not os.access(DEST_FOLDER, os.W_OK):
|
||
raise PermissionError(f"无写入权限: {DEST_FOLDER}")
|
||
|
||
# 确保目标目录存在
|
||
DEST_FOLDER.mkdir(parents=True, exist_ok=True)
|
||
|
||
# 构造 robocopy 命令
|
||
cmd = [
|
||
"robocopy",
|
||
str(folder_path), # 源目录
|
||
str(DEST_FOLDER / folder_name), # 目标目录
|
||
"/MIR", # 镜像目录树
|
||
"/MT:8", # 多线程(8线程)
|
||
"/R:2", # 失败重试2次
|
||
"/W:5", # 重试等待5秒
|
||
"/NFL", "/NDL", "/NJH", "/NJS" # 减少日志输出
|
||
]
|
||
|
||
print(f" 执行命令: {' '.join(cmd)}")
|
||
result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')
|
||
|
||
# 处理 robocopy 返回码
|
||
if result.returncode >= 8:
|
||
raise RuntimeError(f"Robocopy 失败 (代码 {result.returncode}): {result.stderr}")
|
||
|
||
# 删除源目录(robocopy /MIR 已处理,此处作为二次确认)
|
||
if (DEST_FOLDER / folder_name).exists():
|
||
print(f" 验证: 目标文件夹已存在,正在清理源文件夹...")
|
||
shutil.rmtree(folder_path, ignore_errors=True)
|
||
print(f" [完成] '{folder_name}' 移动成功")
|
||
else:
|
||
raise FileNotFoundError("目标文件夹未创建,终止删除源文件夹")
|
||
|
||
except Exception as e:
|
||
print(f" [错误] 处理失败: {e}")
|
||
finally:
|
||
duration = time.time() - start_time
|
||
print(f" 耗时: {duration:.2f} 秒")
|
||
|
||
class FolderMonitor(FileSystemEventHandler):
|
||
def __init__(self, executor):
|
||
super().__init__()
|
||
self.executor = executor
|
||
self.active_tasks = set() # 跟踪正在处理的任务
|
||
|
||
def on_created(self, event):
|
||
"""处理新创建的文件夹"""
|
||
self._handle_folder(event.src_path, "创建")
|
||
|
||
def on_moved(self, event):
|
||
"""处理移动/重命名的文件夹"""
|
||
self._handle_folder(event.dest_path, "移动")
|
||
|
||
def _handle_folder(self, path, action_type):
|
||
"""统一处理文件夹事件"""
|
||
folder_path = Path(path)
|
||
if not folder_path.is_dir():
|
||
return
|
||
|
||
if folder_path.parent != SOURCE_FOLDER or not folder_path.name.startswith(FOLDER_PREFIX):
|
||
return
|
||
|
||
print(f"\n[发现] 通过 {action_type} 事件检测到文件夹: '{folder_path.name}'")
|
||
|
||
# 避免重复处理
|
||
if folder_path.name in self.active_tasks:
|
||
print(f" 警告: 文件夹 '{folder_path.name}' 已在处理中")
|
||
return
|
||
|
||
self.active_tasks.add(folder_path.name)
|
||
self.executor.submit(self._process_with_stability_check, folder_path)
|
||
|
||
def _process_with_stability_check(self, folder_path):
|
||
"""带稳定性检查的处理流程(无限等待直到稳定)"""
|
||
folder_name = folder_path.name
|
||
|
||
try:
|
||
while True:
|
||
if is_folder_stable(folder_path):
|
||
print(f" [稳定] 文件夹 '{folder_name}' 已稳定,开始移动")
|
||
process_folder(folder_path)
|
||
break
|
||
else:
|
||
print(f" [监控] 文件夹 '{folder_name}' 仍在修改中,继续等待...")
|
||
except Exception as e:
|
||
print(f" [错误] 处理 '{folder_name}' 时发生异常: {e}")
|
||
finally:
|
||
self.active_tasks.discard(folder_name)
|
||
|
||
def main():
|
||
print("\n===== 文件夹监控转移程序 =====")
|
||
print(f"监控目录: {SOURCE_FOLDER}")
|
||
print(f"目标目录: {DEST_FOLDER}")
|
||
print(f"文件夹前缀: '{FOLDER_PREFIX}'")
|
||
print(f"稳定性检查间隔: {STABLE_TIME}秒")
|
||
print(f"最大线程数: {MAX_WORKERS}\n")
|
||
|
||
# 验证目录
|
||
if not SOURCE_FOLDER.is_dir():
|
||
print(f"[错误] 源目录不存在: {SOURCE_FOLDER}")
|
||
return
|
||
try:
|
||
DEST_FOLDER.mkdir(parents=True, exist_ok=True)
|
||
except Exception as e:
|
||
print(f"[错误] 无法创建目标目录: {e}")
|
||
return
|
||
|
||
# 启动监控
|
||
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
|
||
event_handler = FolderMonitor(executor)
|
||
observer = Observer()
|
||
observer.schedule(event_handler, str(SOURCE_FOLDER), recursive=False)
|
||
observer.start()
|
||
print("[系统] 监控已启动 (按 Ctrl+C 停止)...")
|
||
|
||
try:
|
||
while True:
|
||
time.sleep(1)
|
||
except KeyboardInterrupt:
|
||
print("\n[系统] 正在停止监控...")
|
||
finally:
|
||
observer.stop()
|
||
observer.join()
|
||
print("[系统] 监控已停止")
|
||
|
||
if __name__ == "__main__":
|
||
main() |