python-script-library/转移文件non.py

167 lines
6.2 KiB
Python
Raw Permalink Normal View History

2025-10-28 16:02:55 +08:00
import os
import time
import shutil
import subprocess
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import concurrent.futures
import threading
# ====== 配置 ======
SOURCE_FOLDER = Path(r"E:\ELDEN RING\Game\cv_saved") # 监控的源目录
DEST_FOLDER = Path(r"E:") # 目标目录(必须是具体路径)
FOLDER_PREFIX = "25" # 需要监控的文件夹前缀
MAX_WORKERS = 8 # 最大线程数
STABLE_TIME = 30 # 文件夹稳定检查时间(秒)
# =================
def is_folder_stable(folder_path):
"""检查文件夹是否稳定(连续两次修改时间相同)"""
try:
initial_mtime = os.path.getmtime(folder_path)
time.sleep(STABLE_TIME)
current_mtime = os.path.getmtime(folder_path)
return initial_mtime == current_mtime
except Exception as e:
print(f" [错误] 检查稳定性失败: {e}")
return False
def process_folder(folder_path):
"""使用 robocopy 移动文件夹"""
start_time = time.time()
folder_name = folder_path.name
print(f"\n[处理] 开始移动文件夹: '{folder_name}'")
print(f" 源路径: {folder_path}")
print(f" 目标路径: {DEST_FOLDER / folder_name}")
try:
# 检查目标目录权限
if not os.access(DEST_FOLDER, os.W_OK):
raise PermissionError(f"无写入权限: {DEST_FOLDER}")
# 确保目标目录存在
DEST_FOLDER.mkdir(parents=True, exist_ok=True)
# 构造 robocopy 命令
cmd = [
"robocopy",
str(folder_path), # 源目录
str(DEST_FOLDER / folder_name), # 目标目录
"/MIR", # 镜像目录树
"/MT:8", # 多线程8线程
"/R:2", # 失败重试2次
"/W:5", # 重试等待5秒
"/NFL", "/NDL", "/NJH", "/NJS" # 减少日志输出
]
print(f" 执行命令: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')
# 处理 robocopy 返回码
if result.returncode >= 8:
raise RuntimeError(f"Robocopy 失败 (代码 {result.returncode}): {result.stderr}")
# 删除源目录robocopy /MIR 已处理,此处作为二次确认)
if (DEST_FOLDER / folder_name).exists():
print(f" 验证: 目标文件夹已存在,正在清理源文件夹...")
shutil.rmtree(folder_path, ignore_errors=True)
print(f" [完成] '{folder_name}' 移动成功")
else:
raise FileNotFoundError("目标文件夹未创建,终止删除源文件夹")
except Exception as e:
print(f" [错误] 处理失败: {e}")
finally:
duration = time.time() - start_time
print(f" 耗时: {duration:.2f}")
class FolderMonitor(FileSystemEventHandler):
def __init__(self, executor):
super().__init__()
self.executor = executor
self.active_tasks = set() # 跟踪正在处理的任务
def on_created(self, event):
"""处理新创建的文件夹"""
self._handle_folder(event.src_path, "创建")
def on_moved(self, event):
"""处理移动/重命名的文件夹"""
self._handle_folder(event.dest_path, "移动")
def _handle_folder(self, path, action_type):
"""统一处理文件夹事件"""
folder_path = Path(path)
if not folder_path.is_dir():
return
if folder_path.parent != SOURCE_FOLDER or not folder_path.name.startswith(FOLDER_PREFIX):
return
print(f"\n[发现] 通过 {action_type} 事件检测到文件夹: '{folder_path.name}'")
# 避免重复处理
if folder_path.name in self.active_tasks:
print(f" 警告: 文件夹 '{folder_path.name}' 已在处理中")
return
self.active_tasks.add(folder_path.name)
self.executor.submit(self._process_with_stability_check, folder_path)
def _process_with_stability_check(self, folder_path):
"""带稳定性检查的处理流程(无限等待直到稳定)"""
folder_name = folder_path.name
try:
while True:
if is_folder_stable(folder_path):
print(f" [稳定] 文件夹 '{folder_name}' 已稳定,开始移动")
process_folder(folder_path)
break
else:
print(f" [监控] 文件夹 '{folder_name}' 仍在修改中,继续等待...")
except Exception as e:
print(f" [错误] 处理 '{folder_name}' 时发生异常: {e}")
finally:
self.active_tasks.discard(folder_name)
def main():
print("\n===== 文件夹监控转移程序 =====")
print(f"监控目录: {SOURCE_FOLDER}")
print(f"目标目录: {DEST_FOLDER}")
print(f"文件夹前缀: '{FOLDER_PREFIX}'")
print(f"稳定性检查间隔: {STABLE_TIME}")
print(f"最大线程数: {MAX_WORKERS}\n")
# 验证目录
if not SOURCE_FOLDER.is_dir():
print(f"[错误] 源目录不存在: {SOURCE_FOLDER}")
return
try:
DEST_FOLDER.mkdir(parents=True, exist_ok=True)
except Exception as e:
print(f"[错误] 无法创建目标目录: {e}")
return
# 启动监控
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
event_handler = FolderMonitor(executor)
observer = Observer()
observer.schedule(event_handler, str(SOURCE_FOLDER), recursive=False)
observer.start()
print("[系统] 监控已启动 (按 Ctrl+C 停止)...")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n[系统] 正在停止监控...")
finally:
observer.stop()
observer.join()
print("[系统] 监控已停止")
if __name__ == "__main__":
main()