1028new
This commit is contained in:
commit
d74b66b95e
|
|
@ -0,0 +1,325 @@
|
|||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
import tkinter as tk
|
||||
from tkinter import messagebox
|
||||
from datetime import datetime
|
||||
from send2trash import send2trash # 需提前安装:pip install send2trash
|
||||
|
||||
# =================================================================
|
||||
# 请在这里修改为您的实际路径!
|
||||
# =================================================================
|
||||
# F 盘:视频文件和此Python脚本所在的目录(自动获取,无需手动改)
|
||||
F_DISK_VIDEO_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
# E 盘:与视频同名的文件夹所在的根目录(当前设为C盘根目录,需根据实际改)
|
||||
E_DISK_FOLDERS_ROOT_DIR = "E:\\"
|
||||
|
||||
# 日志文件配置(保存在F盘,与脚本同级)
|
||||
PASS_LOG_FILE = "passed_files.txt" # 通过视频日志
|
||||
FAIL_LOG_FILE = "rejected_files.txt" # 不通过视频日志
|
||||
# 新增视频检测间隔(秒)
|
||||
DETECTION_INTERVAL = 5 # 每5秒检测一次新视频
|
||||
# =================================================================
|
||||
|
||||
class VideoProcessorGUI:
|
||||
def __init__(self, root):
|
||||
"""初始化GUI窗口和变量"""
|
||||
self.root = root
|
||||
self.root.title("视频批量标记工具 (支持实时新增视频)")
|
||||
self.root.geometry("550x220")
|
||||
self.root.resizable(False, False)
|
||||
|
||||
# 检查E盘(当前是C盘)文件夹根目录是否存在
|
||||
if not os.path.isdir(E_DISK_FOLDERS_ROOT_DIR):
|
||||
messagebox.showerror("错误", f"目标盘文件夹根目录不存在!\n请检查路径: {E_DISK_FOLDERS_ROOT_DIR}")
|
||||
self.root.destroy()
|
||||
return
|
||||
|
||||
# 初始化视频文件列表(存储完整路径)
|
||||
self.video_files = self._get_video_files()
|
||||
self.current_index = 0
|
||||
self.last_checked_time = datetime.now() # 记录最后一次检测时间
|
||||
|
||||
# 创建界面组件
|
||||
self._create_widgets()
|
||||
# 开始处理第一个视频
|
||||
self._process_current_video()
|
||||
# 启动新视频检测定时器
|
||||
self._start_detection_timer()
|
||||
|
||||
def _force_focus_timer(self):
|
||||
"""定时强制获取焦点"""
|
||||
self.root.focus_force() # 强制获取焦点
|
||||
self.root.after(500, self._force_focus_timer)
|
||||
|
||||
def _get_video_files(self):
|
||||
"""获取F盘当前目录下的所有视频文件(仅返回完整路径)"""
|
||||
video_extensions = ('.mp4', '.mkv', '.avi', '.mov', '.flv', '.wmv', '.webm', '.ts')
|
||||
video_files = [
|
||||
os.path.join(F_DISK_VIDEO_DIR, f)
|
||||
for f in os.listdir(F_DISK_VIDEO_DIR)
|
||||
if os.path.isfile(os.path.join(F_DISK_VIDEO_DIR, f)) and f.lower().endswith(video_extensions)
|
||||
]
|
||||
return video_files # 不排序,确保新增文件按系统顺序添加
|
||||
|
||||
def _create_widgets(self):
|
||||
"""创建弹窗中的按钮、文字等组件"""
|
||||
# 新视频提示标签(红色提示新增文件)
|
||||
self.new_files_label = tk.Label(self.root, text="", fg="red", pady=5)
|
||||
self.new_files_label.pack()
|
||||
|
||||
# 状态提示(如“正在处理第1/5个”)
|
||||
self.status_label = tk.Label(self.root, text="", pady=5)
|
||||
self.status_label.pack()
|
||||
|
||||
# 当前视频文件名(蓝色加粗)
|
||||
self.filename_label = tk.Label(self.root, text="", font=("Arial", 12), fg="blue")
|
||||
self.filename_label.pack()
|
||||
|
||||
# 按钮容器(让按钮横向排列)
|
||||
button_frame = tk.Frame(self.root)
|
||||
button_frame.pack(side=tk.BOTTOM, pady=20)
|
||||
|
||||
self.root.bind('<Key>', self._handle_key_press)
|
||||
self.root.focus_force() # 强制获取焦点
|
||||
self.root.bind('<FocusOut>', lambda e: self.root.focus_force())
|
||||
|
||||
# 上一条/重播/通过/不通过/下一条按钮
|
||||
self.prev_btn = tk.Button(button_frame, text="上一条", width=10, command=self._prev_video)
|
||||
self.replay_btn = tk.Button(button_frame, text="重播", width=10, command=self._replay_video)
|
||||
self.pass_btn = tk.Button(button_frame, text="通过", width=10, command=self._mark_as_pass, bg="green", fg="white")
|
||||
self.fail_btn = tk.Button(button_frame, text="不通过", width=10, command=self._mark_as_fail, bg="red", fg="white")
|
||||
self.next_btn = tk.Button(button_frame, text="下一条", width=10, command=self._next_video)
|
||||
|
||||
# 按钮布局(横向均匀分布)
|
||||
self.prev_btn.pack(side=tk.LEFT, padx=5)
|
||||
self.replay_btn.pack(side=tk.LEFT, padx=5)
|
||||
self.pass_btn.pack(side=tk.LEFT, padx=5)
|
||||
self.fail_btn.pack(side=tk.LEFT, padx=5)
|
||||
self.next_btn.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
def _start_detection_timer(self):
|
||||
"""启动定时器,定期检测新视频文件"""
|
||||
self._check_new_videos() # 立即执行一次检测
|
||||
# 每隔DETECTION_INTERVAL秒重复检测
|
||||
self.root.after(DETECTION_INTERVAL * 1000, self._start_detection_timer)
|
||||
|
||||
def _check_new_videos(self):
|
||||
"""检测并添加新增的视频文件,若之前无视频则自动启动播放"""
|
||||
if not os.path.isdir(F_DISK_VIDEO_DIR):
|
||||
return # 目录不存在则跳过检测
|
||||
|
||||
# 获取当前目录下的所有视频(最新状态)
|
||||
current_all_videos = self._get_video_files()
|
||||
# 找出不在当前处理列表中的新视频(通过路径对比)
|
||||
existing_paths = set(self.video_files)
|
||||
new_videos = [path for path in current_all_videos if path not in existing_paths]
|
||||
|
||||
if new_videos:
|
||||
# 记录新增前的列表长度(判断是否之前无视频)
|
||||
was_empty = len(self.video_files) == 0
|
||||
|
||||
# 将新视频添加到处理列表末尾
|
||||
self.video_files.extend(new_videos)
|
||||
# 提取新视频的文件名(用于提示)
|
||||
new_filenames = [os.path.basename(path) for path in new_videos]
|
||||
# 显示新增提示(3秒后自动清除提示)
|
||||
self.new_files_label.config(text=f"检测到 {len(new_videos)} 个新视频,已添加到列表:{', '.join(new_filenames[:2])}{'...' if len(new_filenames)>2 else ''}")
|
||||
self.root.after(3000, lambda: self.new_files_label.config(text="")) # 3秒后清除提示
|
||||
print(f"新增视频:{new_filenames}")
|
||||
|
||||
# 核心修复:若之前无视频,新增后自动从第一个新视频开始播放
|
||||
if was_empty:
|
||||
self.current_index = 0 # 确保从第一个新增视频开始
|
||||
self._process_current_video() # 触发播放
|
||||
|
||||
# 更新最后检测时间
|
||||
self.last_checked_time = datetime.now()
|
||||
|
||||
def _play_video(self, video_path):
|
||||
"""用系统默认播放器打开视频"""
|
||||
try:
|
||||
if sys.platform == 'win32': # 针对Windows系统
|
||||
os.startfile(video_path)
|
||||
else: # 针对Mac/Linux系统(备用)
|
||||
opener = 'open' if sys.platform == 'darwin' else 'xdg-open'
|
||||
subprocess.run([opener, video_path], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
except Exception as e:
|
||||
# 播放失败时弹窗提示
|
||||
error_msg = (
|
||||
f"无法播放视频!\n\n"
|
||||
f"系统错误信息: {e}\n\n"
|
||||
f"**请检查:**\n"
|
||||
f"1. 您是否已安装视频播放器?\n"
|
||||
f"2. 您是否已将其设置为默认播放器?"
|
||||
)
|
||||
messagebox.showerror("播放错误", error_msg)
|
||||
|
||||
def _log_file(self, filename, log_type):
|
||||
"""通用日志记录方法:根据log_type记录到对应的TXT文件"""
|
||||
try:
|
||||
# 选择日志文件(通过→passed_files.txt,不通过→rejected_files.txt)
|
||||
log_file = PASS_LOG_FILE if log_type == "pass" else FAIL_LOG_FILE
|
||||
# 日志文件路径:和脚本、视频在同一目录(F盘)
|
||||
log_file_path = os.path.join(F_DISK_VIDEO_DIR, log_file)
|
||||
# 追加写入(不覆盖历史记录),包含时间戳
|
||||
with open(log_file_path, 'a', encoding='utf-8') as f:
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
f.write(f"[{timestamp}] {filename}\n")
|
||||
return True # 记录成功
|
||||
except Exception as e:
|
||||
print(f"写入日志失败: {e}") # 终端打印错误(不影响用户操作)
|
||||
return False # 记录失败
|
||||
|
||||
def _handle_key_press(self, event):
|
||||
"""处理键盘按键"""
|
||||
if event.char == '2': # 按下1键
|
||||
self._mark_as_pass()
|
||||
elif event.char == '3': # 按下2键
|
||||
self._mark_as_fail()
|
||||
|
||||
def _process_current_video(self):
|
||||
"""更新弹窗状态,并自动播放当前视频"""
|
||||
if not self.video_files: # 没有视频可处理时
|
||||
self.status_label.config(text="所有视频处理完毕!(将继续检测新视频...)", fg="green")
|
||||
self.filename_label.config(text="")
|
||||
return
|
||||
|
||||
# 获取当前视频路径和文件名
|
||||
current_video_path = self.video_files[self.current_index]
|
||||
current_video_name = os.path.basename(current_video_path)
|
||||
|
||||
# 更新弹窗文字提示(覆盖之前的“处理完毕”提示)
|
||||
self.status_label.config(text=f"正在处理: 第 {self.current_index + 1}/{len(self.video_files)} 个")
|
||||
self.filename_label.config(text=current_video_name)
|
||||
|
||||
# 自动播放当前视频
|
||||
self._play_video(current_video_path)
|
||||
|
||||
def _get_associated_folder(self, video_path):
|
||||
"""获取E盘(当前是C盘)上与视频同名的文件夹路径(仅查询,不操作)"""
|
||||
video_basename = os.path.basename(video_path)
|
||||
folder_name = os.path.splitext(video_basename)[0] # 提取视频名(去后缀)
|
||||
folder_path = os.path.join(E_DISK_FOLDERS_ROOT_DIR, folder_name)
|
||||
|
||||
if os.path.isdir(folder_path):
|
||||
return folder_path
|
||||
return None
|
||||
|
||||
def _mark_as_pass(self):
|
||||
"""处理“通过”:删除视频+记录到passed_files.txt"""
|
||||
if not self.video_files:
|
||||
return
|
||||
os.system('taskkill /f /im "vlc.exe"')
|
||||
# 获取当前视频信息(路径、带后缀名、去后缀名)
|
||||
current_video_path = self.video_files[self.current_index]
|
||||
video_basename = os.path.basename(current_video_path)
|
||||
video_filename = os.path.splitext(video_basename)[0] # 去后缀的文件名(如“视频1”)
|
||||
|
||||
|
||||
# 1. 记录“通过”日志
|
||||
log_success = self._log_file(video_filename, "pass")
|
||||
|
||||
# 2. 提示用户操作结果(包含日志记录情况)
|
||||
if log_success:
|
||||
messagebox.showinfo("操作结果",
|
||||
f"已标记为通过: {video_filename}\n"
|
||||
f"文件名已记录到 {PASS_LOG_FILE}")
|
||||
else:
|
||||
messagebox.showerror("操作结果",
|
||||
f"已标记为通过: {video_filename}\n"
|
||||
f"警告:文件名未成功记录到 {PASS_LOG_FILE}")
|
||||
|
||||
# 3. 删除F盘的视频(移至回收站,安全删除)
|
||||
try:
|
||||
send2trash(current_video_path)
|
||||
print(f"通过视频已删除: {current_video_path}")
|
||||
except Exception as e:
|
||||
messagebox.showerror("操作失败",
|
||||
f"无法删除视频: {e}\n"
|
||||
f"请确保视频没有被播放器占用(关闭播放器后重试)")
|
||||
|
||||
# 4. 处理下一个视频
|
||||
self._move_to_next()
|
||||
|
||||
def _mark_as_fail(self):
|
||||
"""处理“不通过”:删除视频+记录到rejected_files.txt"""
|
||||
if not self.video_files:
|
||||
return
|
||||
os.system('taskkill /f /im "vlc.exe"')
|
||||
# 获取当前视频信息
|
||||
current_video_path = self.video_files[self.current_index]
|
||||
video_basename = os.path.basename(current_video_path)
|
||||
video_filename = os.path.splitext(video_basename)[0]
|
||||
|
||||
|
||||
# 1. 记录“不通过”日志
|
||||
log_success = self._log_file(video_filename, "fail")
|
||||
|
||||
# 2. 提示用户操作结果
|
||||
if log_success:
|
||||
messagebox.showinfo("操作结果",
|
||||
f"已标记为不通过: {video_filename}\n"
|
||||
f"文件名已记录到 {FAIL_LOG_FILE}")
|
||||
else:
|
||||
messagebox.showerror("操作结果",
|
||||
f"已标记为不通过: {video_filename}\n"
|
||||
f"警告:文件名未成功记录到 {FAIL_LOG_FILE}")
|
||||
|
||||
# 3. 删除F盘的视频(移至回收站)
|
||||
try:
|
||||
send2trash(current_video_path)
|
||||
print(f"不通过视频已删除: {current_video_path}")
|
||||
except Exception as e:
|
||||
messagebox.showerror("操作失败",
|
||||
f"无法删除视频: {e}\n"
|
||||
f"请确保视频没有被播放器占用(关闭播放器后重试)")
|
||||
|
||||
# 4. 处理下一个视频
|
||||
self._move_to_next()
|
||||
|
||||
def _move_to_next(self):
|
||||
"""移除已处理的视频,切换到下一个"""
|
||||
if not self.video_files:
|
||||
return
|
||||
|
||||
self.video_files.pop(self.current_index)
|
||||
# 避免删除最后一个视频后索引越界
|
||||
if self.current_index >= len(self.video_files) and self.video_files:
|
||||
self.current_index = len(self.video_files) - 1
|
||||
# 更新弹窗状态,播放下一个视频
|
||||
self._process_current_video()
|
||||
|
||||
def _replay_video(self):
|
||||
"""重播当前视频"""
|
||||
if self.video_files:
|
||||
self._play_video(self.video_files[self.current_index])
|
||||
|
||||
def _next_video(self):
|
||||
"""切换到下一个视频(不处理当前视频)"""
|
||||
if not self.video_files:
|
||||
return
|
||||
|
||||
if self.current_index < len(self.video_files) - 1:
|
||||
self.current_index += 1
|
||||
self._process_current_video()
|
||||
else:
|
||||
messagebox.showinfo("提示", "已经是最后一个视频了。")
|
||||
|
||||
def _prev_video(self):
|
||||
"""切换到上一个视频(不处理当前视频)"""
|
||||
if not self.video_files:
|
||||
return
|
||||
|
||||
if self.current_index > 0:
|
||||
self.current_index -= 1
|
||||
self._process_current_video()
|
||||
else:
|
||||
messagebox.showinfo("提示", "已经是第一个视频了。")
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 启动弹窗程序
|
||||
root = tk.Tk()
|
||||
app = VideoProcessorGUI(root)
|
||||
root.mainloop()
|
||||
|
|
@ -0,0 +1,177 @@
|
|||
import time
|
||||
import threading
|
||||
from pynput import keyboard
|
||||
from pynput.keyboard import Key, Controller
|
||||
|
||||
|
||||
class AutoKeyPresser:
|
||||
# =============== 全局配置 ===============
|
||||
CONTROL_KEY = 'p' # 控制定时器启动/暂停的快捷键
|
||||
TARGET_KEY = Key.f10 # 要自动按下的目标键(默认为F10)
|
||||
INTERVAL = 60 + 52 # 定时器间隔时间(1分钟52秒 = 112秒)
|
||||
|
||||
# =============== 结束配置 ===============
|
||||
|
||||
def __init__(self):
|
||||
self.timer = None
|
||||
self.is_active = False
|
||||
self.listener = None
|
||||
self.keyboard_controller = Controller()
|
||||
self.ignore_next_key = False # 标志位,用于忽略程序自动按下的目标键
|
||||
self.timer_lock = threading.Lock() # 线程锁确保状态同步
|
||||
self.last_activity_time = 0 # 记录最后一次活动时间
|
||||
|
||||
def simulate_key_press(self):
|
||||
"""模拟按下配置的目标键"""
|
||||
try:
|
||||
self.ignore_next_key = True # 设置标志,忽略接下来的目标键事件
|
||||
# 使用正确的键模拟
|
||||
self.keyboard_controller.press(self.TARGET_KEY)
|
||||
time.sleep(0.05) # 短暂延迟模拟真实按键
|
||||
self.keyboard_controller.release(self.TARGET_KEY)
|
||||
current_time = time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
key_name = self._get_key_name(self.TARGET_KEY)
|
||||
print(f"[{current_time}] 已自动按下 {key_name} 键")
|
||||
except Exception as e:
|
||||
print(f"模拟按下 {self._get_key_name(self.TARGET_KEY)} 时发生错误: {e}")
|
||||
|
||||
def _get_key_name(self, key):
|
||||
"""获取按键的可读名称"""
|
||||
if hasattr(key, 'name'):
|
||||
return key.name.upper()
|
||||
elif hasattr(key, 'char'):
|
||||
return key.char.upper()
|
||||
return str(key)
|
||||
|
||||
def start_timer(self):
|
||||
"""启动定时器(总是重新开始112秒计时)"""
|
||||
with self.timer_lock:
|
||||
if self.is_active:
|
||||
return
|
||||
|
||||
self.is_active = True
|
||||
self.last_activity_time = time.time() # 重置开始时间
|
||||
current_time = time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
key_name = self._get_key_name(self.TARGET_KEY)
|
||||
print(f"[{current_time}] 定时器已启动,将在{self.INTERVAL}秒后按下 {key_name} 键")
|
||||
print(f"按 {self.CONTROL_KEY.upper()} 键可以暂停定时器")
|
||||
|
||||
def timer_loop():
|
||||
# 记录本次循环的开始时间
|
||||
cycle_start_time = time.time()
|
||||
|
||||
while self.is_active:
|
||||
# 计算已经过去的时间
|
||||
elapsed_time = time.time() - cycle_start_time
|
||||
remaining_time = self.INTERVAL - elapsed_time
|
||||
|
||||
# 如果已经达到或超过间隔时间,执行按键操作
|
||||
if remaining_time <= 0:
|
||||
if self.is_active: # 再次检查状态
|
||||
self.simulate_key_press()
|
||||
# 重置循环开始时间,开始新的周期
|
||||
cycle_start_time = time.time()
|
||||
continue
|
||||
|
||||
# 分段等待,便于及时响应暂停命令
|
||||
wait_interval = min(0.5, remaining_time) # 每次最多等待0.5秒
|
||||
if wait_interval > 0:
|
||||
time.sleep(wait_interval)
|
||||
|
||||
self.timer = threading.Thread(target=timer_loop)
|
||||
self.timer.daemon = True
|
||||
self.timer.start()
|
||||
|
||||
def pause_timer(self):
|
||||
"""暂停定时器"""
|
||||
with self.timer_lock:
|
||||
if not self.is_active:
|
||||
return
|
||||
|
||||
self.is_active = False
|
||||
current_time = time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
# 计算已过去的时间(用于显示信息)
|
||||
if self.last_activity_time > 0:
|
||||
elapsed = time.time() - self.last_activity_time
|
||||
remaining = max(0, self.INTERVAL - elapsed)
|
||||
print(f"[{current_time}] 定时器已暂停(已等待{elapsed:.1f}秒,剩余{remaining:.1f}秒)")
|
||||
else:
|
||||
print(f"[{current_time}] 定时器已暂停")
|
||||
|
||||
def on_press(self, key):
|
||||
"""键盘按键事件处理"""
|
||||
try:
|
||||
# 检测控制键按下,用于控制定时器暂停/恢复
|
||||
if hasattr(key, 'char') and (key.char.lower() == self.CONTROL_KEY.lower()):
|
||||
if not self.is_active:
|
||||
# 按下控制键,启动定时器(重新开始计时)
|
||||
self.start_timer()
|
||||
else:
|
||||
# 定时器运行中按下控制键,暂停定时器
|
||||
self.pause_timer()
|
||||
|
||||
# 处理Esc键事件 - 不再退出程序,只显示信息
|
||||
elif key == Key.esc:
|
||||
current_time = time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
status = '运行中' if self.is_active else '已暂停'
|
||||
print(f"[{current_time}] 检测到按下Esc键(定时器状态:{status})")
|
||||
print("提示:Esc键已禁用退出功能,程序继续运行")
|
||||
|
||||
# 处理目标键事件(仅用于显示,不影响定时器状态)
|
||||
elif key == self.TARGET_KEY:
|
||||
if self.ignore_next_key:
|
||||
# 如果是程序自动按下的键,忽略它
|
||||
self.ignore_next_key = False
|
||||
else:
|
||||
# 用户手动按下的目标键,只显示信息不影响定时器
|
||||
current_time = time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
status = '运行中' if self.is_active else '已暂停'
|
||||
key_name = self._get_key_name(self.TARGET_KEY)
|
||||
print(f"[{current_time}] 检测到手动按下 {key_name} 键(定时器状态:{status})")
|
||||
|
||||
except AttributeError:
|
||||
# 处理特殊功能键(没有char属性的键)
|
||||
if key == Key.esc:
|
||||
current_time = time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
status = '运行中' if self.is_active else '已暂停'
|
||||
print(f"[{current_time}] 检测到按下Esc键(定时器状态:{status})")
|
||||
print("提示:Esc键已禁用退出功能,程序继续运行")
|
||||
elif key == self.TARGET_KEY:
|
||||
if self.ignore_next_key:
|
||||
self.ignore_next_key = False
|
||||
else:
|
||||
current_time = time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
status = '运行中' if self.is_active else '已暂停'
|
||||
key_name = self._get_key_name(self.TARGET_KEY)
|
||||
print(f"[{current_time}] 检测到手动按下 {key_name} 键(定时器状态:{status})")
|
||||
|
||||
def on_release(self, key):
|
||||
"""键盘释放事件处理 - 移除Esc键退出功能"""
|
||||
pass
|
||||
|
||||
def start_listening(self):
|
||||
"""开始监听键盘事件"""
|
||||
key_name = self._get_key_name(self.TARGET_KEY)
|
||||
print(f"{key_name}自动按下程序已启动")
|
||||
print(f"按 {self.CONTROL_KEY.upper()} 键开始定时器,再次按 {self.CONTROL_KEY.upper()} 键暂停定时器")
|
||||
print(f"定时器间隔时间:{self.INTERVAL}秒")
|
||||
print(f"目标按键:{key_name}")
|
||||
print("提示:定时器暂停后重新启动将重新开始计时")
|
||||
print("提示:Esc键退出功能已禁用,如需退出程序请使用Ctrl+C")
|
||||
|
||||
# 同时监听按下和释放事件
|
||||
with keyboard.Listener(on_press=self.on_press, on_release=self.on_release) as self.listener:
|
||||
try:
|
||||
self.listener.join()
|
||||
except Exception as e:
|
||||
print(f"监听器错误: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
auto_presser = AutoKeyPresser()
|
||||
auto_presser.start_listening()
|
||||
except KeyboardInterrupt:
|
||||
print("\n程序被用户中断(Ctrl+C)")
|
||||
except Exception as e:
|
||||
print(f"程序运行错误: {e}")
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
import keyboard
|
||||
import time
|
||||
import sys
|
||||
|
||||
# 可配置参数
|
||||
START_STOP_KEY = '-' # 启动/停止按键
|
||||
HOLD_KEY = 'w' # 程序运行时持续按下的键
|
||||
EXIT_KEY = 'esc' # 注意:虽然你要求ESC不退出,但我们会忽略它
|
||||
|
||||
# 状态变量
|
||||
is_running = False
|
||||
should_exit = False
|
||||
|
||||
def on_p_pressed(e):
|
||||
global is_running
|
||||
if e.event_type == keyboard.KEY_DOWN:
|
||||
is_running = not is_running
|
||||
if not is_running: # 当停止时立即释放按键
|
||||
keyboard.release(HOLD_KEY)
|
||||
print(f"程序 {'已启动' if is_running else '已停止'}")
|
||||
|
||||
def main():
|
||||
print(f"程序已启动,按 {START_STOP_KEY} 键开始/停止,按 Ctrl+C 退出")
|
||||
print(f"程序运行时将持续按下 {HOLD_KEY} 键")
|
||||
|
||||
keyboard.hook_key(START_STOP_KEY, on_p_pressed)
|
||||
|
||||
try:
|
||||
while not should_exit:
|
||||
if is_running:
|
||||
keyboard.press(HOLD_KEY)
|
||||
# 保持按键状态,但允许其他事件处理
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
time.sleep(0.1)
|
||||
except KeyboardInterrupt:
|
||||
print("\n程序通过 Ctrl+C 退出")
|
||||
finally:
|
||||
if is_running:
|
||||
keyboard.release(HOLD_KEY)
|
||||
keyboard.unhook_all()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -0,0 +1,389 @@
|
|||
import os
|
||||
import tkinter as tk
|
||||
from tkinter import messagebox
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
# =============== 全局命令配置 ===============
|
||||
COMMANDS_TEMPLATE = [
|
||||
"conda activate aiden-p3d",
|
||||
"d:",
|
||||
"cd aiden-p3d",
|
||||
]
|
||||
|
||||
# 扫描路径配置(默认F盘根目录)
|
||||
SCAN_PATH = "E:\\" # 修改为您需要的默认扫描路径
|
||||
|
||||
# 新增文件夹检测间隔(秒)
|
||||
DETECTION_INTERVAL = 5 # 每5秒检测一次新文件夹
|
||||
|
||||
# 游戏配置映射
|
||||
GAME_COMMANDS = {
|
||||
"赛博朋克2077": {
|
||||
"command": "python load_point_cloud_saibo_new.py \"{}\"",
|
||||
"description": "赛博朋克2077 - python load_point_cloud_saibo_new.py"
|
||||
},
|
||||
"巫师3": {
|
||||
"command": "python load_point_cloud_wushi_new.py \"{}\"",
|
||||
"description": "巫师3 - python load_point_cloud_wushi_new.py"
|
||||
},
|
||||
"孤岛危机": {
|
||||
"command": "python load_point_cloud_crysis_new.py \"{}\"",
|
||||
"description": "孤岛危机 - python load_point_cloud_crysis_new.py"
|
||||
},
|
||||
"地平线零之曙光": {
|
||||
"command": "python load_point_horizon_new.py \"{}\"",
|
||||
"description": "地平线零之曙光 - python load_point_horizon_new.py"
|
||||
},
|
||||
"原子之心": {
|
||||
"command": "python debug_atomicHeart.py -d \"{}\"",
|
||||
"description": "原子之心 - python debug_atomicHeart.py -d"
|
||||
},
|
||||
"巴士模拟18": {
|
||||
"command": "python debug_bus18.py -d \"{}\"",
|
||||
"description": "巴士模拟18 - python debug_bus18.py -d"
|
||||
},
|
||||
"超自然车旅": {
|
||||
"command": "python debug_pacificdrive.py -d \"{}\"",
|
||||
"description": "超自然车旅 - python debug_pacificdrive.py -d"
|
||||
},
|
||||
"印蒂卡": {
|
||||
"command": "python debug_indika.py -d \"{}\"",
|
||||
"description": "印蒂卡 - python debug_indika.py -d"
|
||||
},
|
||||
"黑神话:悟空": {
|
||||
"command": "python restruct_b1.py --data-dir \"{}\"",
|
||||
"description": "黑神话:悟空 - python restruct_b1.py --data-dir"
|
||||
},
|
||||
"新游戏选项": {
|
||||
"command": "conda activate worldgen && E: && cd datacheck && run.bat \"{}\"",
|
||||
"description": "新游戏选项 - E: && cd datacheck && run.bat"
|
||||
}
|
||||
}
|
||||
|
||||
# MeshLab路径
|
||||
MESHLAB_PATH = r"C:\Program Files\VCG\MeshLab\meshlab.exe"
|
||||
|
||||
# 输出目录
|
||||
OUTPUT_DIR = r"E:\datacheck\outputs"
|
||||
|
||||
# =============== 结束配置 ===============
|
||||
|
||||
class FolderPlayerApp:
|
||||
def __init__(self, root):
|
||||
self.root = root
|
||||
self.root.title("CV Saved Folders Player")
|
||||
self.root.geometry("700x250") # 增加窗口宽度以适应新按钮
|
||||
self.root.resizable(False, False)
|
||||
|
||||
self.selected_game = tk.StringVar(value="赛博朋克2077")
|
||||
self.scan_folders()
|
||||
|
||||
if not self.folders:
|
||||
messagebox.showerror("错误", f"在路径 {SCAN_PATH} 中没有找到以'cv_saved_'或'25'为前缀的文件夹")
|
||||
sys.exit(1)
|
||||
|
||||
self.current_index = 0
|
||||
self.process = None
|
||||
self.last_scan_time = time.time()
|
||||
|
||||
self.create_ui()
|
||||
self.update_display()
|
||||
self.start_detection_timer()
|
||||
self._force_focus_timer()
|
||||
|
||||
def _force_focus_timer(self):
|
||||
self.root.focus_force()
|
||||
self.root.after(500, self._force_focus_timer)
|
||||
|
||||
def start_detection_timer(self):
|
||||
self.check_new_folders()
|
||||
self.root.after(DETECTION_INTERVAL * 1000, self.start_detection_timer)
|
||||
|
||||
def check_new_folders(self):
|
||||
current_dir = SCAN_PATH
|
||||
try:
|
||||
all_items = os.listdir(current_dir)
|
||||
except Exception as e:
|
||||
print(f"检测新文件夹时出错: {e}")
|
||||
return
|
||||
|
||||
processed_folders = set()
|
||||
for filename in ["passed_files.txt", "rejected_files.txt"]:
|
||||
if os.path.exists(filename):
|
||||
with open(filename, 'r', encoding='utf-8') as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line and not line.startswith('#'):
|
||||
processed_folders.add(line)
|
||||
|
||||
new_folders = [
|
||||
os.path.join(current_dir, item)
|
||||
for item in all_items
|
||||
if ((item.startswith("cv_saved_") or item.startswith("25")) and
|
||||
os.path.isdir(os.path.join(current_dir, item)) and
|
||||
item not in processed_folders and
|
||||
os.path.join(current_dir, item) not in self.folders)
|
||||
]
|
||||
|
||||
if new_folders:
|
||||
new_folders.sort()
|
||||
was_empty = len(self.folders) == 0
|
||||
self.folders.extend(new_folders)
|
||||
|
||||
if was_empty and self.folders:
|
||||
self.current_index = 0
|
||||
self.update_display()
|
||||
|
||||
messagebox.showinfo("提示", f"检测到 {len(new_folders)} 个新文件夹,已添加到列表")
|
||||
|
||||
def scan_folders(self):
|
||||
current_dir = SCAN_PATH
|
||||
try:
|
||||
all_items = os.listdir(current_dir)
|
||||
except FileNotFoundError:
|
||||
messagebox.showerror("错误", f"指定的扫描路径不存在: {SCAN_PATH}")
|
||||
sys.exit(1)
|
||||
except PermissionError:
|
||||
messagebox.showerror("错误", f"没有权限访问指定路径: {SCAN_PATH}")
|
||||
sys.exit(1)
|
||||
|
||||
processed_folders = set()
|
||||
for filename in ["passed_files.txt", "rejected_files.txt"]:
|
||||
if os.path.exists(filename):
|
||||
with open(filename, 'r', encoding='utf-8') as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line and not line.startswith('#'):
|
||||
processed_folders.add(line)
|
||||
|
||||
self.folders = [
|
||||
os.path.join(current_dir, item)
|
||||
for item in all_items
|
||||
if ((item.startswith("cv_saved_") or item.startswith("25")) and
|
||||
os.path.isdir(os.path.join(current_dir, item)) and
|
||||
item not in processed_folders)
|
||||
]
|
||||
self.folders.sort()
|
||||
|
||||
def create_ui(self):
|
||||
game_frame = tk.Frame(self.root)
|
||||
game_frame.pack(fill=tk.X, padx=10, pady=5)
|
||||
|
||||
tk.Label(game_frame, text="选择游戏:", font=("Arial", 10)).pack(side=tk.LEFT)
|
||||
game_dropdown = tk.OptionMenu(game_frame, self.selected_game, *GAME_COMMANDS.keys())
|
||||
game_dropdown.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
self.status_label = tk.Label(self.root, text="", pady=5)
|
||||
self.status_label.pack()
|
||||
|
||||
self.folder_label = tk.Label(self.root, text="", font=("Arial", 12), fg="blue")
|
||||
self.folder_label.pack()
|
||||
|
||||
button_frame = tk.Frame(self.root)
|
||||
button_frame.pack(side=tk.BOTTOM, pady=20)
|
||||
|
||||
self.prev_button = tk.Button(
|
||||
button_frame,
|
||||
text="上一条(<-)",
|
||||
width=10,
|
||||
command=self.prev_folder,
|
||||
state=tk.DISABLED if self.current_index == 0 else tk.NORMAL
|
||||
)
|
||||
self.prev_button.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
self.play_button = tk.Button(
|
||||
button_frame,
|
||||
text="播放(空格)",
|
||||
width=10,
|
||||
command=self.play_current
|
||||
)
|
||||
self.play_button.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
self.pass_button = tk.Button(
|
||||
button_frame,
|
||||
text="通过(p)",
|
||||
width=10,
|
||||
command=self.mark_as_passed,
|
||||
bg="green",
|
||||
fg="white"
|
||||
)
|
||||
self.pass_button.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
self.reject_button = tk.Button(
|
||||
button_frame,
|
||||
text="不通过(r)",
|
||||
width=10,
|
||||
command=self.mark_as_rejected,
|
||||
bg="red",
|
||||
fg="white"
|
||||
)
|
||||
self.reject_button.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
self.next_button = tk.Button(
|
||||
button_frame,
|
||||
text="下一条(->)",
|
||||
width=10,
|
||||
command=self.next_folder,
|
||||
state=tk.DISABLED if self.current_index == len(self.folders) - 1 else tk.NORMAL
|
||||
)
|
||||
self.next_button.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
# 新增打开pts.ply文件按钮
|
||||
self.open_ply_button = tk.Button(
|
||||
button_frame,
|
||||
text="打开PLY(o)",
|
||||
width=10,
|
||||
command=self.open_ply_file,
|
||||
bg="orange",
|
||||
fg="white"
|
||||
)
|
||||
self.open_ply_button.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
self.root.bind('<Left>', lambda event: self.prev_folder())
|
||||
self.root.bind('<Right>', lambda event: self.next_folder())
|
||||
self.root.bind('<space>', lambda event: self.play_current())
|
||||
self.root.bind('<p>', lambda event: self.mark_as_passed())
|
||||
self.root.bind('<r>', lambda event: self.mark_as_rejected())
|
||||
self.root.bind('<o>', lambda event: self.open_ply_file()) # 绑定o键
|
||||
|
||||
def update_display(self):
|
||||
if not self.folders:
|
||||
self.status_label.config(text="所有文件夹已处理完毕")
|
||||
self.folder_label.config(text="")
|
||||
return
|
||||
|
||||
current_path = self.folders[self.current_index]
|
||||
folder_name = os.path.basename(current_path)
|
||||
|
||||
self.status_label.config(text=f"正在处理: 第 {self.current_index + 1}/{len(self.folders)} 个")
|
||||
self.folder_label.config(text=folder_name)
|
||||
|
||||
self.prev_button.config(state=tk.NORMAL if self.current_index > 0 else tk.DISABLED)
|
||||
self.next_button.config(state=tk.NORMAL if self.current_index < len(self.folders) - 1 else tk.DISABLED)
|
||||
|
||||
def prev_folder(self):
|
||||
if self.current_index > 0:
|
||||
self.current_index -= 1
|
||||
self.update_display()
|
||||
|
||||
def next_folder(self):
|
||||
if self.current_index < len(self.folders) - 1:
|
||||
self.current_index += 1
|
||||
self.update_display()
|
||||
|
||||
def play_current(self):
|
||||
if not self.folders:
|
||||
return
|
||||
|
||||
current_path = self.folders[self.current_index]
|
||||
selected_game_name = self.selected_game.get()
|
||||
game_config = GAME_COMMANDS.get(selected_game_name)
|
||||
|
||||
if not game_config:
|
||||
messagebox.showerror("错误", f"未找到游戏 {selected_game_name} 的配置")
|
||||
return
|
||||
|
||||
try:
|
||||
self.close_command_prompt()
|
||||
time.sleep(0.5)
|
||||
|
||||
if selected_game_name == "新游戏选项":
|
||||
# 执行新游戏选项命令(不使用COMMANDS_TEMPLATE)
|
||||
game_command = game_config["command"]
|
||||
formatted_command = game_command.format(current_path)
|
||||
|
||||
# 启动命令(不等待)
|
||||
self.process = subprocess.Popen(f'start cmd /k "{formatted_command}"', shell=True)
|
||||
else:
|
||||
# 其他游戏使用原有的COMMANDS_TEMPLATE
|
||||
game_command = game_config["command"]
|
||||
formatted_command = game_command.format(current_path)
|
||||
commands = COMMANDS_TEMPLATE.copy()
|
||||
commands.append(formatted_command)
|
||||
full_command = " && ".join(commands)
|
||||
self.process = subprocess.Popen(f'start cmd /k "{full_command}"', shell=True)
|
||||
|
||||
except Exception as e:
|
||||
messagebox.showerror("错误", f"执行命令时出错:\n{str(e)}")
|
||||
|
||||
def open_ply_file(self):
|
||||
"""打开output目录中的pts.ply文件"""
|
||||
ply_file = os.path.join(OUTPUT_DIR, "pts.ply")
|
||||
|
||||
if os.path.exists(ply_file):
|
||||
try:
|
||||
subprocess.Popen([MESHLAB_PATH, ply_file])
|
||||
messagebox.showinfo("成功", f"已用MeshLab打开文件: {ply_file}")
|
||||
except Exception as e:
|
||||
messagebox.showerror("错误", f"打开文件时出错:\n{str(e)}")
|
||||
else:
|
||||
messagebox.showwarning("警告", f"未找到文件: {ply_file}")
|
||||
|
||||
def close_command_prompt(self):
|
||||
try:
|
||||
if self.process:
|
||||
self.process.terminate()
|
||||
|
||||
subprocess.Popen('taskkill /f /im cmd.exe', shell=True,
|
||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||
creationflags=subprocess.CREATE_NO_WINDOW)
|
||||
except Exception as e:
|
||||
print(f"关闭命令行时出错: {e}")
|
||||
|
||||
def mark_as_passed(self):
|
||||
self._mark_file("passed_files.txt")
|
||||
|
||||
def mark_as_rejected(self):
|
||||
self._mark_file("rejected_files.txt")
|
||||
|
||||
def _mark_file(self, filename):
|
||||
if not self.folders:
|
||||
return
|
||||
|
||||
current_path = self.folders[self.current_index]
|
||||
folder_name = os.path.basename(current_path)
|
||||
|
||||
try:
|
||||
if not os.path.exists(filename):
|
||||
with open(filename, 'w', encoding='utf-8') as f:
|
||||
pass
|
||||
|
||||
with open(filename, 'a', encoding='utf-8') as f:
|
||||
if os.path.getsize(filename) == 0:
|
||||
f.write(f"{folder_name}\n")
|
||||
else:
|
||||
needs_newline = True
|
||||
with open(filename, 'rb+') as f_check:
|
||||
f_check.seek(-1, os.SEEK_END)
|
||||
last_char = f_check.read(1)
|
||||
needs_newline = (last_char != b'\n')
|
||||
|
||||
if needs_newline:
|
||||
f.write('\n')
|
||||
f.write(f"{folder_name}\n")
|
||||
|
||||
status = "通过" if filename == "passed_files.txt" else "不通过"
|
||||
messagebox.showinfo("成功", f"已标记为{status}: {folder_name}")
|
||||
|
||||
self.folders.pop(self.current_index)
|
||||
|
||||
if self.current_index >= len(self.folders):
|
||||
if len(self.folders) > 0:
|
||||
self.current_index = len(self.folders) - 1
|
||||
else:
|
||||
messagebox.showinfo("完成", "所有文件夹已处理完毕!")
|
||||
self.update_display()
|
||||
return
|
||||
|
||||
self.update_display()
|
||||
|
||||
except Exception as e:
|
||||
messagebox.showerror("错误", f"写入文件时出错:\n{str(e)}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
root = tk.Tk()
|
||||
app = FolderPlayerApp(root)
|
||||
root.mainloop()
|
||||
|
|
@ -0,0 +1,84 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
fix_commas_and_pad_multi.py
|
||||
批量修复多个文件夹中含逗号的数字文件名并补齐到6位,例如:
|
||||
"01,000.png" -> "001000.png"
|
||||
支持扫描指定文件夹中所有以"25"开头的子文件夹
|
||||
"""
|
||||
|
||||
import os
|
||||
import argparse
|
||||
|
||||
VALID_EXTS = {'.png', '.exr'}
|
||||
|
||||
def safe_rename(src, dst, dry_run=False):
|
||||
if os.path.exists(dst):
|
||||
print(f"[跳过] 目标已存在,未重命名:{os.path.basename(src)} -> {os.path.basename(dst)}")
|
||||
return False
|
||||
print(f"[重命名] {os.path.basename(src)} -> {os.path.basename(dst)}")
|
||||
if not dry_run:
|
||||
os.rename(src, dst)
|
||||
return True
|
||||
|
||||
def fix_folder(root_folder, dry_run=False):
|
||||
changed = 0
|
||||
for root, dirs, files in os.walk(root_folder):
|
||||
for fname in files:
|
||||
name, ext = os.path.splitext(fname)
|
||||
if ext.lower() not in VALID_EXTS:
|
||||
continue
|
||||
|
||||
# 若文件名中含逗号,则移除逗号
|
||||
if ',' in name:
|
||||
candidate = name.replace(',', '')
|
||||
else:
|
||||
candidate = None
|
||||
|
||||
if candidate is None or not candidate.isdigit():
|
||||
continue
|
||||
|
||||
padded = candidate.zfill(6)
|
||||
if padded == name:
|
||||
continue
|
||||
|
||||
src_path = os.path.join(root, fname)
|
||||
dst_path = os.path.join(root, padded + ext)
|
||||
if safe_rename(src_path, dst_path, dry_run=dry_run):
|
||||
changed += 1
|
||||
|
||||
print(f"[完成] {root_folder} 共重命名 {changed} 个文件。\n")
|
||||
|
||||
def find_25_folders(root_folder):
|
||||
"""查找所有以'25'开头的子文件夹"""
|
||||
folders = []
|
||||
for item in os.listdir(root_folder):
|
||||
item_path = os.path.join(root_folder, item)
|
||||
if os.path.isdir(item_path) and item.startswith('25'):
|
||||
folders.append(item_path)
|
||||
return folders
|
||||
|
||||
def main():
|
||||
p = argparse.ArgumentParser(
|
||||
description="修复含逗号的数字文件名并补齐到6位,例如 01,000.png -> 001000.png(扫描指定文件夹中所有以25开头的子文件夹)"
|
||||
)
|
||||
p.add_argument("root_folder", help="包含多个以25开头的子文件夹的根目录")
|
||||
p.add_argument("--dry-run", action="store_true", help="仅模拟显示将要做的重命名,不执行")
|
||||
args = p.parse_args()
|
||||
|
||||
if not os.path.isdir(args.root_folder):
|
||||
print(f"❌ 路径无效:{args.root_folder}")
|
||||
return
|
||||
|
||||
target_folders = find_25_folders(args.root_folder)
|
||||
if not target_folders:
|
||||
print(f"❌ 在 {args.root_folder} 中未找到以'25'开头的子文件夹")
|
||||
return
|
||||
|
||||
print(f"\n=== 在 {args.root_folder} 中找到 {len(target_folders)} 个以'25'开头的子文件夹 ===")
|
||||
|
||||
for folder in target_folders:
|
||||
print(f"\n=== 处理文件夹:{folder} ===")
|
||||
fix_folder(folder, dry_run=args.dry_run)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -0,0 +1,61 @@
|
|||
import os
|
||||
import subprocess
|
||||
from PyQt5.QtWidgets import (QApplication, QMainWindow, QListWidget,
|
||||
QVBoxLayout, QLineEdit, QMessageBox, QWidget)
|
||||
from PyQt5.QtCore import Qt
|
||||
|
||||
class ScriptRunner(QMainWindow):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.setWindowTitle("Python脚本启动器")
|
||||
self.setGeometry(100, 100, 600, 400)
|
||||
self.script_dir = r"D:\test\集合"
|
||||
self.init_ui()
|
||||
self.load_scripts()
|
||||
|
||||
def init_ui(self):
|
||||
central = QWidget()
|
||||
layout = QVBoxLayout(central)
|
||||
|
||||
self.search_box = QLineEdit()
|
||||
self.search_box.setPlaceholderText("搜索脚本...")
|
||||
self.search_box.textChanged.connect(self.filter_scripts)
|
||||
layout.addWidget(self.search_box)
|
||||
|
||||
self.script_list = QListWidget()
|
||||
self.script_list.itemDoubleClicked.connect(self.run_script)
|
||||
layout.addWidget(self.script_list)
|
||||
|
||||
self.setCentralWidget(central)
|
||||
|
||||
def load_scripts(self):
|
||||
self.script_list.clear()
|
||||
if os.path.exists(self.script_dir):
|
||||
for f in os.listdir(self.script_dir):
|
||||
if f.endswith('.py'):
|
||||
self.script_list.addItem(f)
|
||||
else:
|
||||
QMessageBox.warning(self, "错误", f"目录不存在: {self.script_dir}")
|
||||
|
||||
def filter_scripts(self):
|
||||
keyword = self.search_box.text().lower()
|
||||
for i in range(self.script_list.count()):
|
||||
item = self.script_list.item(i)
|
||||
item.setHidden(keyword not in item.text().lower())
|
||||
|
||||
def run_script(self, item):
|
||||
script_name = item.text()
|
||||
script_path = os.path.join(self.script_dir, script_name)
|
||||
|
||||
try:
|
||||
# 使用subprocess在新窗口中运行
|
||||
subprocess.Popen(f'start cmd /k python "{script_path}"', shell=True)
|
||||
self.statusBar().showMessage(f"正在运行: {script_name}")
|
||||
except Exception as e:
|
||||
QMessageBox.critical(self, "错误", f"运行失败: {str(e)}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = QApplication([])
|
||||
window = ScriptRunner()
|
||||
window.show()
|
||||
app.exec_()
|
||||
|
|
@ -0,0 +1,167 @@
|
|||
import os
|
||||
import time
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from watchdog.observers import Observer
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
import concurrent.futures
|
||||
import threading
|
||||
|
||||
# ====== 配置 ======
|
||||
SOURCE_FOLDER = Path(r"E:\ELDEN RING\Game\cv_saved") # 监控的源目录
|
||||
DEST_FOLDER = Path(r"E:") # 目标目录(必须是具体路径)
|
||||
FOLDER_PREFIX = "25" # 需要监控的文件夹前缀
|
||||
MAX_WORKERS = 8 # 最大线程数
|
||||
STABLE_TIME = 30 # 文件夹稳定检查时间(秒)
|
||||
# =================
|
||||
|
||||
def is_folder_stable(folder_path):
|
||||
"""检查文件夹是否稳定(连续两次修改时间相同)"""
|
||||
try:
|
||||
initial_mtime = os.path.getmtime(folder_path)
|
||||
time.sleep(STABLE_TIME)
|
||||
current_mtime = os.path.getmtime(folder_path)
|
||||
return initial_mtime == current_mtime
|
||||
except Exception as e:
|
||||
print(f" [错误] 检查稳定性失败: {e}")
|
||||
return False
|
||||
|
||||
def process_folder(folder_path):
|
||||
"""使用 robocopy 移动文件夹"""
|
||||
start_time = time.time()
|
||||
folder_name = folder_path.name
|
||||
|
||||
print(f"\n[处理] 开始移动文件夹: '{folder_name}'")
|
||||
print(f" 源路径: {folder_path}")
|
||||
print(f" 目标路径: {DEST_FOLDER / folder_name}")
|
||||
|
||||
try:
|
||||
# 检查目标目录权限
|
||||
if not os.access(DEST_FOLDER, os.W_OK):
|
||||
raise PermissionError(f"无写入权限: {DEST_FOLDER}")
|
||||
|
||||
# 确保目标目录存在
|
||||
DEST_FOLDER.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 构造 robocopy 命令
|
||||
cmd = [
|
||||
"robocopy",
|
||||
str(folder_path), # 源目录
|
||||
str(DEST_FOLDER / folder_name), # 目标目录
|
||||
"/MIR", # 镜像目录树
|
||||
"/MT:8", # 多线程(8线程)
|
||||
"/R:2", # 失败重试2次
|
||||
"/W:5", # 重试等待5秒
|
||||
"/NFL", "/NDL", "/NJH", "/NJS" # 减少日志输出
|
||||
]
|
||||
|
||||
print(f" 执行命令: {' '.join(cmd)}")
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')
|
||||
|
||||
# 处理 robocopy 返回码
|
||||
if result.returncode >= 8:
|
||||
raise RuntimeError(f"Robocopy 失败 (代码 {result.returncode}): {result.stderr}")
|
||||
|
||||
# 删除源目录(robocopy /MIR 已处理,此处作为二次确认)
|
||||
if (DEST_FOLDER / folder_name).exists():
|
||||
print(f" 验证: 目标文件夹已存在,正在清理源文件夹...")
|
||||
shutil.rmtree(folder_path, ignore_errors=True)
|
||||
print(f" [完成] '{folder_name}' 移动成功")
|
||||
else:
|
||||
raise FileNotFoundError("目标文件夹未创建,终止删除源文件夹")
|
||||
|
||||
except Exception as e:
|
||||
print(f" [错误] 处理失败: {e}")
|
||||
finally:
|
||||
duration = time.time() - start_time
|
||||
print(f" 耗时: {duration:.2f} 秒")
|
||||
|
||||
class FolderMonitor(FileSystemEventHandler):
|
||||
def __init__(self, executor):
|
||||
super().__init__()
|
||||
self.executor = executor
|
||||
self.active_tasks = set() # 跟踪正在处理的任务
|
||||
|
||||
def on_created(self, event):
|
||||
"""处理新创建的文件夹"""
|
||||
self._handle_folder(event.src_path, "创建")
|
||||
|
||||
def on_moved(self, event):
|
||||
"""处理移动/重命名的文件夹"""
|
||||
self._handle_folder(event.dest_path, "移动")
|
||||
|
||||
def _handle_folder(self, path, action_type):
|
||||
"""统一处理文件夹事件"""
|
||||
folder_path = Path(path)
|
||||
if not folder_path.is_dir():
|
||||
return
|
||||
|
||||
if folder_path.parent != SOURCE_FOLDER or not folder_path.name.startswith(FOLDER_PREFIX):
|
||||
return
|
||||
|
||||
print(f"\n[发现] 通过 {action_type} 事件检测到文件夹: '{folder_path.name}'")
|
||||
|
||||
# 避免重复处理
|
||||
if folder_path.name in self.active_tasks:
|
||||
print(f" 警告: 文件夹 '{folder_path.name}' 已在处理中")
|
||||
return
|
||||
|
||||
self.active_tasks.add(folder_path.name)
|
||||
self.executor.submit(self._process_with_stability_check, folder_path)
|
||||
|
||||
def _process_with_stability_check(self, folder_path):
|
||||
"""带稳定性检查的处理流程(无限等待直到稳定)"""
|
||||
folder_name = folder_path.name
|
||||
|
||||
try:
|
||||
while True:
|
||||
if is_folder_stable(folder_path):
|
||||
print(f" [稳定] 文件夹 '{folder_name}' 已稳定,开始移动")
|
||||
process_folder(folder_path)
|
||||
break
|
||||
else:
|
||||
print(f" [监控] 文件夹 '{folder_name}' 仍在修改中,继续等待...")
|
||||
except Exception as e:
|
||||
print(f" [错误] 处理 '{folder_name}' 时发生异常: {e}")
|
||||
finally:
|
||||
self.active_tasks.discard(folder_name)
|
||||
|
||||
def main():
|
||||
print("\n===== 文件夹监控转移程序 =====")
|
||||
print(f"监控目录: {SOURCE_FOLDER}")
|
||||
print(f"目标目录: {DEST_FOLDER}")
|
||||
print(f"文件夹前缀: '{FOLDER_PREFIX}'")
|
||||
print(f"稳定性检查间隔: {STABLE_TIME}秒")
|
||||
print(f"最大线程数: {MAX_WORKERS}\n")
|
||||
|
||||
# 验证目录
|
||||
if not SOURCE_FOLDER.is_dir():
|
||||
print(f"[错误] 源目录不存在: {SOURCE_FOLDER}")
|
||||
return
|
||||
try:
|
||||
DEST_FOLDER.mkdir(parents=True, exist_ok=True)
|
||||
except Exception as e:
|
||||
print(f"[错误] 无法创建目标目录: {e}")
|
||||
return
|
||||
|
||||
# 启动监控
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
|
||||
event_handler = FolderMonitor(executor)
|
||||
observer = Observer()
|
||||
observer.schedule(event_handler, str(SOURCE_FOLDER), recursive=False)
|
||||
observer.start()
|
||||
print("[系统] 监控已启动 (按 Ctrl+C 停止)...")
|
||||
|
||||
try:
|
||||
while True:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
print("\n[系统] 正在停止监控...")
|
||||
finally:
|
||||
observer.stop()
|
||||
observer.join()
|
||||
print("[系统] 监控已停止")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -0,0 +1,146 @@
|
|||
import os
|
||||
import shutil
|
||||
import argparse
|
||||
|
||||
# =============== 全局路径配置 ===============
|
||||
REJECT_ROOT = r"D:\videos" # 包含rejected_files.txt的根路径
|
||||
DELETE_ROOT = "E:" # 要删除文件夹的根路径
|
||||
|
||||
|
||||
# =============== 结束配置 ===============
|
||||
|
||||
def find_rejected_files(root_path):
|
||||
"""在指定根路径下递归查找所有名为rejected_files.txt的文件"""
|
||||
rejected_files = []
|
||||
for root, dirs, files in os.walk(root_path):
|
||||
if 'rejected_files.txt' in files:
|
||||
rejected_files.append(os.path.join(root, 'rejected_files.txt'))
|
||||
return rejected_files
|
||||
|
||||
|
||||
def read_folder_names(file_paths):
|
||||
"""读取多个TXT文件中的文件夹名称,提取'] '后的字符(严格去空格,确保名称完全匹配)"""
|
||||
folder_names = []
|
||||
unique_names = set() # 用于去重
|
||||
|
||||
for file_path in file_paths:
|
||||
if not os.path.exists(file_path):
|
||||
print(f"警告:文件 {file_path} 不存在,已跳过")
|
||||
continue
|
||||
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
for line_num, line in enumerate(f, 1):
|
||||
raw_line = line.strip()
|
||||
if not raw_line or raw_line.startswith('#'):
|
||||
continue
|
||||
|
||||
if "] " in raw_line:
|
||||
folder_name = raw_line.split("] ", 1)[1].strip()
|
||||
else:
|
||||
folder_name = raw_line.strip()
|
||||
|
||||
if folder_name and folder_name not in unique_names:
|
||||
unique_names.add(folder_name)
|
||||
folder_names.append(folder_name)
|
||||
print(f"从 {os.path.basename(file_path)} 提取有效名称(第{line_num}行):'{folder_name}'")
|
||||
elif not folder_name:
|
||||
print(f"警告:{os.path.basename(file_path)} 第{line_num}行提取后为空,已忽略(原始内容:{raw_line})")
|
||||
|
||||
return folder_names
|
||||
|
||||
|
||||
def find_matching_folders_recursively(root_path, target_names):
|
||||
"""在指定根路径下递归查找所有名称与目标列表完全一致的文件夹"""
|
||||
matching_folders = []
|
||||
for root, dirs, files in os.walk(root_path):
|
||||
for dir_name in dirs:
|
||||
if dir_name in target_names:
|
||||
folder_path = os.path.join(root, dir_name)
|
||||
matching_folders.append(folder_path)
|
||||
return matching_folders
|
||||
|
||||
|
||||
def delete_matching_folders(root_path, names_to_delete):
|
||||
"""递归删除指定根路径下所有名称与不通过列表完全一致的文件夹"""
|
||||
if not os.path.isdir(root_path):
|
||||
print(f"错误:源路径 {root_path} 不是有效目录,无法删除")
|
||||
return 0
|
||||
|
||||
folders_to_delete = find_matching_folders_recursively(root_path, names_to_delete)
|
||||
|
||||
if not folders_to_delete:
|
||||
print("没有找到匹配的文件夹需要删除")
|
||||
return 0
|
||||
|
||||
print("\n找到以下匹配的文件夹:")
|
||||
for i, folder in enumerate(folders_to_delete, 1):
|
||||
print(f"{i}. {folder}")
|
||||
|
||||
# 询问用户确认删除
|
||||
while True:
|
||||
user_input = input("\n确定要删除以上所有文件夹吗?(y/n): ").strip().lower()
|
||||
if user_input == 'y':
|
||||
break
|
||||
elif user_input == 'n':
|
||||
print("取消删除操作")
|
||||
return 0
|
||||
else:
|
||||
print("请输入 'y' 或 'n'")
|
||||
|
||||
deleted = 0
|
||||
for folder_path in folders_to_delete:
|
||||
try:
|
||||
shutil.rmtree(folder_path)
|
||||
deleted += 1
|
||||
print(f"删除成功:{folder_path}")
|
||||
except Exception as e:
|
||||
print(f"删除失败 {folder_path}:{str(e)}")
|
||||
|
||||
print(f"\n共成功删除 {deleted} 个文件夹(在 {root_path} 下递归查找)")
|
||||
return deleted
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='根据rejected_files.txt删除匹配的文件夹')
|
||||
parser.add_argument('--reject_root', default=REJECT_ROOT,
|
||||
help=f'包含rejected_files.txt的根路径(默认:{REJECT_ROOT})')
|
||||
parser.add_argument('--delete_root', default=DELETE_ROOT,
|
||||
help=f'要删除文件夹的根路径(默认:{DELETE_ROOT})')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
print("=" * 60)
|
||||
print("操作说明:")
|
||||
print(f"1. 将在 {args.reject_root} 下递归查找所有 rejected_files.txt 文件")
|
||||
print(f"2. 收集这些文件中的所有条目作为要删除的文件夹名称")
|
||||
print(f"3. 在 {args.delete_root} 下递归查找并删除匹配的文件夹")
|
||||
print("=" * 60)
|
||||
|
||||
# 查找所有rejected_files.txt文件
|
||||
rejected_files = find_rejected_files(args.reject_root)
|
||||
if not rejected_files:
|
||||
print(f"\n错误:在 {args.reject_root} 下未找到任何 rejected_files.txt 文件")
|
||||
return
|
||||
|
||||
print(f"\n找到 {len(rejected_files)} 个 rejected_files.txt 文件:")
|
||||
for file in rejected_files:
|
||||
print(f" - {file}")
|
||||
|
||||
# 读取所有不通过的文件夹名称
|
||||
print("\n----- 读取不通过列表 -----")
|
||||
fail_folders = read_folder_names(rejected_files)
|
||||
if not fail_folders:
|
||||
print("\n错误:未从rejected_files.txt文件中提取到任何有效文件夹名称")
|
||||
return
|
||||
|
||||
print(f"\n共提取到 {len(fail_folders)} 个需要删除的文件夹名称")
|
||||
|
||||
# 在删除根路径下递归查找并删除匹配的文件夹
|
||||
print(f"\n----- 在 {args.delete_root} 中执行删除操作 -----")
|
||||
total_deleted = delete_matching_folders(args.delete_root, fail_folders)
|
||||
|
||||
print(f"\n总共成功删除 {total_deleted} 个文件夹")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -0,0 +1,82 @@
|
|||
import pyautogui
|
||||
import time
|
||||
import keyboard
|
||||
import random
|
||||
|
||||
# ===========================
|
||||
# 🎮 配置区域(可调参数)
|
||||
# ===========================
|
||||
|
||||
pyautogui.FAILSAFE = True # 启用安全机制:鼠标快速移到左上角可紧急停止!
|
||||
|
||||
SCREEN_WIDTH, SCREEN_HEIGHT = pyautogui.size()
|
||||
SCREEN_HEIGHT_MIDDLE = SCREEN_HEIGHT // 2 # 鼠标保持在屏幕竖直中央(Y轴不变)
|
||||
|
||||
MOVE_STEP = 50 * 2 # 每次移动的像素距离(越小越慢越自然,比如 1~3)
|
||||
MOVE_DURATION = 0.5 * 2 * 2 # 每次移动持续时间(单位:秒,越大越慢越平滑,推荐 0.2~1.0)
|
||||
MOVE_DELAY = 0 # 每次移动后的停顿时间(可调整,推荐 0.02~0.1)
|
||||
USE_RANDOM_OFFSET = False # 是否加入随机偏移(你不需要,保持 False 即可)
|
||||
CONTROL_KEY = 'F1' # 控制鼠标移动的快捷键(可修改为F1-F12或其他键)
|
||||
|
||||
# ===========================
|
||||
# 🧠 状态控制
|
||||
# ===========================
|
||||
|
||||
is_moving_left = False # 控制是否正在向左移动鼠标
|
||||
|
||||
# ===========================
|
||||
# 🖱️ 鼠标向左移动函数
|
||||
# ===========================
|
||||
|
||||
def auto_move_left():
|
||||
global is_moving_left
|
||||
print(f"🖱️ 【鼠标自动左移】已启动:鼠标将缓慢向左移动...")
|
||||
|
||||
while is_moving_left:
|
||||
current_x, current_y = pyautogui.position() # 获取当前鼠标 x, y 坐标
|
||||
target_x = max(current_x - MOVE_STEP, 0) # 往左移动,但不能小于 0(屏幕最左边)
|
||||
target_y = current_y # Y 坐标保持不变,保持在屏幕中间或当前位置
|
||||
|
||||
# 移动鼠标到目标位置(向左),使用平滑动画
|
||||
pyautogui.moveTo(target_x, target_y, duration=MOVE_DURATION)
|
||||
|
||||
# 可选的停顿,模拟自然观察
|
||||
time.sleep(MOVE_DELAY)
|
||||
|
||||
print(f"🛑 【鼠标自动左移】已停止。")
|
||||
|
||||
# ===========================
|
||||
# ⌨️ 按键监听:启动 / 停止 左移控制
|
||||
# ===========================
|
||||
|
||||
def toggle_left_movement():
|
||||
global is_moving_left
|
||||
if is_moving_left:
|
||||
is_moving_left = False
|
||||
print(f"⏹️ {CONTROL_KEY} 被按下:鼠标左移已停止。")
|
||||
else:
|
||||
is_moving_left = True
|
||||
print(f"▶️ {CONTROL_KEY} 被按下:鼠标开始缓慢向左移动!")
|
||||
# 在新线程中运行,避免阻塞主线程和键盘监听
|
||||
import threading
|
||||
threading.Thread(target=auto_move_left, daemon=True).start()
|
||||
|
||||
# 注册热键监听
|
||||
keyboard.add_hotkey(CONTROL_KEY, toggle_left_movement)
|
||||
|
||||
# ===========================
|
||||
# 🚀 启动脚本 & 保持运行
|
||||
# ===========================
|
||||
|
||||
print("🎮 脚本已启动!")
|
||||
print(f"🔘 按下 {CONTROL_KEY} 键:启动 / 停止 鼠标缓慢向左移动")
|
||||
print("💡 鼠标会从当前位置开始,向左缓缓移动,不会重置到屏幕边缘")
|
||||
print("⚠️ 安全提示:你仍然可以用鼠标快速移动到屏幕左上角来紧急停止(FAILSAFE 机制)")
|
||||
print("ℹ️ 当前设置:缓慢、平滑、自然向左移动,无随机抖动")
|
||||
|
||||
try:
|
||||
keyboard.wait() # 保持程序运行,监听按键
|
||||
except KeyboardInterrupt:
|
||||
print("\n🛑 脚本已通过 Ctrl+C 停止。")
|
||||
finally:
|
||||
is_moving_left = False # 确保退出时停止移动
|
||||
Loading…
Reference in New Issue