niumasoftware/ui/unlocker.py

383 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import os
from typing import List
import psutil
from PyQt6.QtCore import Qt, pyqtSignal, QThread, QTimer
from PyQt6.QtWidgets import (
QDialog,
QVBoxLayout,
QHBoxLayout,
QWidget,
QLabel,
QListWidget,
QListWidgetItem,
QPushButton,
)
import ui.dialog_style as dialog_style
import ui.theme as theme
class _ScanWorker(QThread):
done = pyqtSignal(str, list)
error = pyqtSignal(str, str)
def __init__(self, file_path: str):
super().__init__()
self._file_path = file_path
def run(self):
try:
target = os.path.abspath(self._file_path)
found: list[dict] = []
if os.name == "nt":
try:
from ui.win_handle_scan import find_file_locks
locks = find_file_locks(target)
# 聚合pid -> handles
mp: dict[int, list[int]] = {}
for lk in locks:
mp.setdefault(lk.pid, []).append(int(lk.handle))
for pid, handles in mp.items():
name = ""
try:
name = psutil.Process(pid).name()
except Exception:
name = f"PID {pid}"
found.append({"pid": pid, "name": name, "handles": handles})
except Exception:
found = []
# 兜底psutil可能漏报但至少不会空白
if not found:
target_norm = os.path.normcase(os.path.abspath(target))
for proc in psutil.process_iter(["pid", "name", "open_files"]):
try:
files = proc.info.get("open_files") or []
for f in files:
if os.path.normcase(f.path) == target_norm:
pid = int(proc.info.get("pid") or 0)
name = proc.info.get("name") or f"PID {pid}"
found.append({"pid": pid, "name": str(name), "handles": []})
break
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
self.done.emit(os.path.normcase(os.path.abspath(target)), found)
except Exception as e:
self.error.emit(self._file_path, str(e))
class _DropArea(QWidget):
file_dropped = pyqtSignal(str)
def __init__(self, parent: QWidget | None = None):
super().__init__(parent)
self.setAcceptDrops(True)
self.setMinimumHeight(120)
def dragEnterEvent(self, event):
if event.mimeData().hasUrls():
event.acceptProposedAction()
else:
event.ignore()
def dragMoveEvent(self, event):
if event.mimeData().hasUrls():
event.acceptProposedAction()
else:
event.ignore()
def dropEvent(self, event):
urls = event.mimeData().urls()
if not urls:
return
local = urls[0].toLocalFile()
if local:
self.file_dropped.emit(local)
def paintEvent(self, event):
from PyQt6.QtGui import QPainter, QPen, QBrush, QColor
super().paintEvent(event)
p = QPainter(self)
p.setRenderHint(QPainter.RenderHint.Antialiasing)
rect = self.rect().adjusted(6, 6, -6, -6)
t = theme.current()
border = t.get("search_border", "#888")
bg = t.get("search_bg", "rgba(0,0,0,20)")
p.setBrush(QBrush(QColor(bg)))
pen = QPen(QColor(border))
pen.setStyle(Qt.PenStyle.DashLine)
pen.setWidth(1)
p.setPen(pen)
p.drawRoundedRect(rect, 8, 8)
p.end()
class UnlockDialog(QDialog):
def __init__(self, parent: QWidget | None = None):
super().__init__(parent)
self.setWindowTitle("解除占用")
self.setMinimumSize(520, 420)
self._targets: List[psutil.Process] = []
self._file_path: str | None = None
self._scan_worker: _ScanWorker | None = None
self._loading_timer: QTimer | None = None
self._loading_step = 0
self._build_ui()
self._apply_theme()
def _build_ui(self):
layout = QVBoxLayout(self)
layout.setContentsMargins(18, 16, 18, 16)
layout.setSpacing(10)
intro = QLabel(
"将被占用的文件拖拽到下方虚线框内,我会尝试找出正在占用它的程序,"
"并在下方列出,方便你一键结束相关进程。"
)
intro.setWordWrap(True)
self._intro_lbl = intro
layout.addWidget(intro)
self._drop = _DropArea(self)
lbl = QLabel("拖拽文件到这里")
lbl.setAlignment(Qt.AlignmentFlag.AlignCenter)
self._drop_hint_lbl = lbl
layout_drop = QVBoxLayout(self._drop)
layout_drop.addStretch()
layout_drop.addWidget(lbl)
layout_drop.addStretch()
self._drop.file_dropped.connect(self._on_file_dropped)
layout.addWidget(self._drop)
self._path_lbl = QLabel("当前文件:未选择")
layout.addWidget(self._path_lbl)
self._list = QListWidget()
layout.addWidget(self._list, 1)
btn_row = QHBoxLayout()
btn_row.addStretch()
self._unlock_btn = QPushButton("解除占用(结束相关进程)")
self._unlock_btn.clicked.connect(self._on_unlock)
btn_close = QPushButton("关闭")
btn_close.clicked.connect(self.close)
btn_row.addWidget(self._unlock_btn)
btn_row.addWidget(btn_close)
layout.addLayout(btn_row)
self._close_btn = btn_close
def _apply_theme(self):
t = theme.current()
is_dark = theme.name() == "dark"
# 兼容 panel_bg 为 rgba 的情况
panel_bg = t.get("panel_bg", "rgba(30,30,30,240)")
if panel_bg.startswith("rgba"):
tmp = panel_bg.replace("rgba", "rgb")
head = tmp.rsplit(",", 1)[0]
panel_rgb = head + ")"
else:
panel_rgb = panel_bg
txt = t.get("search_color", "#eee")
sub = "#888" if is_dark else "#666"
border = t.get("panel_border", t.get("search_border", "#666"))
inp_bg = t.get("search_bg", "rgba(0,0,0,20)")
hover = t.get("header_hover", t.get("menu_selected", "rgba(128,128,128,40)"))
self.setStyleSheet(
f"""
QDialog, QWidget {{
background: {panel_rgb};
color: {txt};
}}
QListWidget {{
background: {inp_bg};
border: 1px solid {t.get('search_border', border)};
border-radius: 8px;
}}
QListWidget::item {{
padding: 6px 8px;
border-radius: 6px;
}}
QListWidget::item:selected {{
background: #4a9eff;
color: white;
}}
QListWidget::item:hover:!selected {{
background: {hover};
}}
QPushButton {{
background: {inp_bg};
color: {txt};
border: 1px solid {t.get('search_border', border)};
border-radius: 6px;
padding: 6px 14px;
font-size: 12px;
}}
QPushButton:hover {{
border-color: #4a9eff;
color: #4a9eff;
}}
"""
)
if hasattr(self, "_path_lbl"):
self._path_lbl.setStyleSheet(f"color:{sub}; font-size:11px; background:transparent;")
if hasattr(self, "_intro_lbl"):
self._intro_lbl.setStyleSheet("background:transparent;")
if hasattr(self, "_drop_hint_lbl"):
self._drop_hint_lbl.setStyleSheet(f"color:{sub}; background:transparent;")
def _on_file_dropped(self, path: str):
self._file_path = os.path.abspath(path)
self._path_lbl.setText(f"当前文件:{self._file_path}")
self._start_scan()
def _set_loading(self, loading: bool):
if loading:
self._unlock_btn.setEnabled(False)
self._list.clear()
self._list.addItem(QListWidgetItem("正在扫描占用进程…"))
if self._loading_timer is None:
self._loading_timer = QTimer(self)
self._loading_timer.setInterval(250)
self._loading_timer.timeout.connect(self._tick_loading)
self._loading_step = 0
self._loading_timer.start()
else:
self._unlock_btn.setEnabled(True)
if self._loading_timer is not None:
self._loading_timer.stop()
def _tick_loading(self):
# 简易“加载动画”:点点点
self._loading_step = (self._loading_step + 1) % 4
dots = "." * self._loading_step
txt = f"正在扫描占用进程{dots}"
if self._list.count() > 0:
it = self._list.item(0)
if it is not None:
it.setText(txt)
def _start_scan(self):
self._list.clear()
self._targets.clear()
if not self._file_path:
return
# 取消上一次扫描(如果仍在跑)
if self._scan_worker is not None and self._scan_worker.isRunning():
try:
self._scan_worker.requestInterruption()
except Exception:
pass
self._set_loading(True)
self._scan_worker = _ScanWorker(self._file_path)
self._scan_worker.done.connect(self._on_scan_done)
self._scan_worker.error.connect(self._on_scan_error)
self._scan_worker.start()
def _on_scan_error(self, path: str, err: str):
self._set_loading(False)
dialog_style.warning(self, "扫描失败", f"无法扫描占用进程:\n{err}")
def _on_scan_done(self, norm_target: str, found: list):
self._set_loading(False)
self._list.clear()
self._targets.clear()
if not found:
item = QListWidgetItem("未检测到占用该文件的进程。")
self._list.addItem(item)
return
self._lock_infos = found # type: ignore[attr-defined]
# 终止进程时使用
procs: list[psutil.Process] = []
for it in found:
try:
pid = int(it.get("pid") or 0)
if pid <= 0:
continue
procs.append(psutil.Process(pid))
except Exception:
continue
self._targets = procs
for it in found:
pid = int(it.get("pid") or 0)
name = str(it.get("name") or f"PID {pid}")
hcnt = len(it.get("handles") or [])
suffix = f" (句柄 {hcnt})" if hcnt > 0 else ""
txt = f"{name} (PID {pid}){suffix}"
self._list.addItem(QListWidgetItem(txt))
def _on_unlock(self):
if not self._targets:
dialog_style.information(self, "提示", "当前没有检测到需要解除的占用进程。")
return
# 优先:尝试关闭句柄(不杀进程)
closed_any = False
if os.name == "nt" and hasattr(self, "_lock_infos"):
try:
from ui.win_handle_scan import close_remote_handle
for it in getattr(self, "_lock_infos", []):
pid = int(it.get("pid") or 0)
for hv in it.get("handles") or []:
try:
if close_remote_handle(pid, int(hv)):
closed_any = True
except Exception:
continue
except Exception:
pass
if closed_any:
dialog_style.information(self, "已完成", "已尝试关闭相关文件句柄(不结束进程)。")
self._start_scan()
return
failed = []
for p in self._targets:
try:
p.terminate()
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
# 简单等待一会儿,再尝试强杀
for p in list(self._targets):
try:
p.wait(timeout=1.0)
except (psutil.TimeoutExpired, psutil.NoSuchProcess):
pass
for p in self._targets:
try:
if p.is_running():
try:
p.kill()
except (psutil.NoSuchProcess, psutil.AccessDenied):
failed.append(p)
except psutil.NoSuchProcess:
continue
if failed:
dialog_style.warning(
self,
"部分失败",
"部分进程无法结束,可能需要以管理员身份运行或手动关闭相关程序。",
)
else:
dialog_style.information(self, "已完成", "相关占用进程已尝试结束。")
self._start_scan()