python-script-library/转移文件non.py

167 lines
6.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
import shutil
import subprocess
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import concurrent.futures
import threading
# ====== 配置 ======
SOURCE_FOLDER = Path(r"E:\ELDEN RING\Game\cv_saved") # 监控的源目录
DEST_FOLDER = Path(r"E:") # 目标目录(必须是具体路径)
FOLDER_PREFIX = "25" # 需要监控的文件夹前缀
MAX_WORKERS = 8 # 最大线程数
STABLE_TIME = 30 # 文件夹稳定检查时间(秒)
# =================
def is_folder_stable(folder_path):
"""检查文件夹是否稳定(连续两次修改时间相同)"""
try:
initial_mtime = os.path.getmtime(folder_path)
time.sleep(STABLE_TIME)
current_mtime = os.path.getmtime(folder_path)
return initial_mtime == current_mtime
except Exception as e:
print(f" [错误] 检查稳定性失败: {e}")
return False
def process_folder(folder_path):
"""使用 robocopy 移动文件夹"""
start_time = time.time()
folder_name = folder_path.name
print(f"\n[处理] 开始移动文件夹: '{folder_name}'")
print(f" 源路径: {folder_path}")
print(f" 目标路径: {DEST_FOLDER / folder_name}")
try:
# 检查目标目录权限
if not os.access(DEST_FOLDER, os.W_OK):
raise PermissionError(f"无写入权限: {DEST_FOLDER}")
# 确保目标目录存在
DEST_FOLDER.mkdir(parents=True, exist_ok=True)
# 构造 robocopy 命令
cmd = [
"robocopy",
str(folder_path), # 源目录
str(DEST_FOLDER / folder_name), # 目标目录
"/MIR", # 镜像目录树
"/MT:8", # 多线程8线程
"/R:2", # 失败重试2次
"/W:5", # 重试等待5秒
"/NFL", "/NDL", "/NJH", "/NJS" # 减少日志输出
]
print(f" 执行命令: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')
# 处理 robocopy 返回码
if result.returncode >= 8:
raise RuntimeError(f"Robocopy 失败 (代码 {result.returncode}): {result.stderr}")
# 删除源目录robocopy /MIR 已处理,此处作为二次确认)
if (DEST_FOLDER / folder_name).exists():
print(f" 验证: 目标文件夹已存在,正在清理源文件夹...")
shutil.rmtree(folder_path, ignore_errors=True)
print(f" [完成] '{folder_name}' 移动成功")
else:
raise FileNotFoundError("目标文件夹未创建,终止删除源文件夹")
except Exception as e:
print(f" [错误] 处理失败: {e}")
finally:
duration = time.time() - start_time
print(f" 耗时: {duration:.2f}")
class FolderMonitor(FileSystemEventHandler):
def __init__(self, executor):
super().__init__()
self.executor = executor
self.active_tasks = set() # 跟踪正在处理的任务
def on_created(self, event):
"""处理新创建的文件夹"""
self._handle_folder(event.src_path, "创建")
def on_moved(self, event):
"""处理移动/重命名的文件夹"""
self._handle_folder(event.dest_path, "移动")
def _handle_folder(self, path, action_type):
"""统一处理文件夹事件"""
folder_path = Path(path)
if not folder_path.is_dir():
return
if folder_path.parent != SOURCE_FOLDER or not folder_path.name.startswith(FOLDER_PREFIX):
return
print(f"\n[发现] 通过 {action_type} 事件检测到文件夹: '{folder_path.name}'")
# 避免重复处理
if folder_path.name in self.active_tasks:
print(f" 警告: 文件夹 '{folder_path.name}' 已在处理中")
return
self.active_tasks.add(folder_path.name)
self.executor.submit(self._process_with_stability_check, folder_path)
def _process_with_stability_check(self, folder_path):
"""带稳定性检查的处理流程(无限等待直到稳定)"""
folder_name = folder_path.name
try:
while True:
if is_folder_stable(folder_path):
print(f" [稳定] 文件夹 '{folder_name}' 已稳定,开始移动")
process_folder(folder_path)
break
else:
print(f" [监控] 文件夹 '{folder_name}' 仍在修改中,继续等待...")
except Exception as e:
print(f" [错误] 处理 '{folder_name}' 时发生异常: {e}")
finally:
self.active_tasks.discard(folder_name)
def main():
print("\n===== 文件夹监控转移程序 =====")
print(f"监控目录: {SOURCE_FOLDER}")
print(f"目标目录: {DEST_FOLDER}")
print(f"文件夹前缀: '{FOLDER_PREFIX}'")
print(f"稳定性检查间隔: {STABLE_TIME}")
print(f"最大线程数: {MAX_WORKERS}\n")
# 验证目录
if not SOURCE_FOLDER.is_dir():
print(f"[错误] 源目录不存在: {SOURCE_FOLDER}")
return
try:
DEST_FOLDER.mkdir(parents=True, exist_ok=True)
except Exception as e:
print(f"[错误] 无法创建目标目录: {e}")
return
# 启动监控
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
event_handler = FolderMonitor(executor)
observer = Observer()
observer.schedule(event_handler, str(SOURCE_FOLDER), recursive=False)
observer.start()
print("[系统] 监控已启动 (按 Ctrl+C 停止)...")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n[系统] 正在停止监控...")
finally:
observer.stop()
observer.join()
print("[系统] 监控已停止")
if __name__ == "__main__":
main()