diff --git a/installer/niumasoftware.generated.iss b/installer/niumasoftware.generated.iss index 1ce4ffe..d0df1be 100644 --- a/installer/niumasoftware.generated.iss +++ b/installer/niumasoftware.generated.iss @@ -1,4 +1,4 @@ -#define AppVersion "0.0.3" +#define AppVersion "0.0.4" ; Inno Setup 安装脚本(全局安装) ; 产物约定(统一放在 dist\niumasoftware 下): ; - dist\niumasoftware\niumasoftware.exe (主程序,PyInstaller onedir) diff --git a/main.py b/main.py index 0169df0..4082c38 100644 --- a/main.py +++ b/main.py @@ -3,6 +3,7 @@ import os import time from PyQt6.QtCore import Qt from PyQt6.QtWidgets import QApplication +from PyQt6.QtGui import QIcon from db.database import init_db from ui.dock import PanelWindow, _set_autostart from ui.ball import FloatBall, BALL_SIZE @@ -21,6 +22,18 @@ def get_resource_path(relative_path): return os.path.join(os.path.abspath("."), relative_path) # ========================================================== +def _set_windows_app_user_model_id(appid: str): + """让任务栏图标/分组按 AppID 识别(开发态避免显示 Python 图标)。""" + if sys.platform != "win32": + return + try: + import ctypes + + ctypes.windll.shell32.SetCurrentProcessExplicitAppUserModelID(appid) + except Exception: + pass + + def _wake_existing_or_exit() -> bool: """ Windows 单实例: @@ -75,6 +88,15 @@ def main(): app = QApplication(sys.argv) app.setQuitOnLastWindowClosed(False) + _set_windows_app_user_model_id("niumasoftware") + + # 任务栏/窗口图标:开发态用 ico;打包后也能通过资源路径找到 + try: + ico_path = get_resource_path("logo.ico") + if os.path.exists(ico_path): + app.setWindowIcon(QIcon(ico_path)) + except Exception: + pass init_db() theme.load() diff --git a/requirements.txt b/requirements.txt index f76ea70..c2386db 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ PyQt6>=6.4.0 -pillow>=9.0.0 \ No newline at end of file +pillow>=9.0.0 +psutil>=5.9.0 \ No newline at end of file diff --git a/ui/__pycache__/dock.cpython-314.pyc b/ui/__pycache__/dock.cpython-314.pyc index 5640f6b..4b5584d 100644 Binary files a/ui/__pycache__/dock.cpython-314.pyc and b/ui/__pycache__/dock.cpython-314.pyc differ diff --git a/ui/__pycache__/settings_window.cpython-314.pyc b/ui/__pycache__/settings_window.cpython-314.pyc index e7889bb..6552fac 100644 Binary files a/ui/__pycache__/settings_window.cpython-314.pyc and b/ui/__pycache__/settings_window.cpython-314.pyc differ diff --git a/ui/dock.py b/ui/dock.py index 9efc5b0..a26ebb0 100644 --- a/ui/dock.py +++ b/ui/dock.py @@ -318,10 +318,14 @@ class PanelWindow(QWidget): self.setWindowTitle(app_title()) # 默认不置顶;仅图钉开启时与悬浮球一并置顶(见 _apply_pin_window_layer) self.setWindowFlags(Qt.WindowType.FramelessWindowHint) - # 任务栏/开始菜单图标:使用项目 logo.png - logo_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "logo.png") - if os.path.exists(logo_path): - self.setWindowIcon(QIcon(logo_path)) + # 任务栏/开始菜单图标:优先 ico,其次 png + base_dir = os.path.dirname(os.path.dirname(__file__)) + logo_ico = os.path.join(base_dir, "logo.ico") + logo_png = os.path.join(base_dir, "logo.png") + if os.path.exists(logo_ico): + self.setWindowIcon(QIcon(logo_ico)) + elif os.path.exists(logo_png): + self.setWindowIcon(QIcon(logo_png)) self.setAttribute(Qt.WidgetAttribute.WA_TranslucentBackground) self.setAcceptDrops(True) self.setMinimumSize(MIN_W, MIN_H) @@ -512,6 +516,7 @@ class PanelWindow(QWidget): ("管理员运行 CMD", "fa5s.terminal", self._open_admin_cmd), ("管理员运行 PowerShell", "fa5b.windows", self._open_admin_powershell), ("打开默认浏览器", "fa5s.globe", self._open_default_browser), + ("解除占用", "fa5s.unlock", self._open_unlocker), ] def _build_ui(self): @@ -958,6 +963,15 @@ class PanelWindow(QWidget): import webbrowser webbrowser.open("https://www.baidu.com") + def _open_unlocker(self): + try: + from ui.unlocker import UnlockDialog + except Exception as e: + dialog_style.warning(self, "无法打开解除占用工具", f"加载模块失败:{e}") + return + dlg = UnlockDialog(self) + dlg.exec() + def _open_wechat_multi(self): from ui.wechat_multi import WechatMultiDialog dlg = WechatMultiDialog(self) diff --git a/ui/settings_window.py b/ui/settings_window.py index cf7b16b..ade3156 100644 --- a/ui/settings_window.py +++ b/ui/settings_window.py @@ -3,7 +3,7 @@ import os from PyQt6.QtWidgets import ( QDialog, QHBoxLayout, QVBoxLayout, QListWidget, QListWidgetItem, QStackedWidget, QWidget, QLabel, QSlider, QPushButton, - QCheckBox, QGroupBox, QFormLayout, QSpinBox, QFrame + QCheckBox, QGroupBox, QFormLayout, QSpinBox, QFrame, QLineEdit, QFileDialog ) from PyQt6.QtCore import Qt, QSize from PyQt6.QtWidgets import QMessageBox @@ -12,6 +12,7 @@ import qtawesome as qta from db import database import ui.theme as theme import ui.dialog_style as dialog_style +from app_info import APP_NAME, __VERSION__ class SettingsWindow(QDialog): @@ -51,6 +52,7 @@ class SettingsWindow(QDialog): ("fa5s.paint-brush", "外观"), ("fa5s.window-maximize", "窗口"), ("fa5s.rocket", "启动"), + ("fa5s.folder-open", "缓存"), ("fa5s.heart", "捐赠"), ("fa5s.eraser", "初始化"), ] @@ -75,6 +77,7 @@ class SettingsWindow(QDialog): self.stack.addWidget(self._page_appearance()) self.stack.addWidget(self._page_window()) self.stack.addWidget(self._page_startup()) + self.stack.addWidget(self._page_cache()) self.stack.addWidget(self._page_donate()) self.stack.addWidget(self._page_initialization()) # 此时 stack 已就绪,允许导航回调正常工作 @@ -241,13 +244,65 @@ class SettingsWindow(QDialog): layout.addWidget(self._divider()) layout.addWidget(self._section_title("关于")) - about = QLabel("桌面文件整理 v1.0\n整理你的桌面快捷方式,保持桌面干净。") + about = QLabel(f"{APP_NAME} v{__VERSION__}\n整理你的桌面快捷方式,保持桌面干净。") about.setStyleSheet("color:#888; font-size:12px; line-height:1.6;") layout.addWidget(about) layout.addStretch() return page + # ── 缓存页 ─────────────────────────────────────────── + def _page_cache(self) -> QWidget: + page = QWidget() + layout = QVBoxLayout(page) + layout.setContentsMargins(24, 20, 24, 20) + layout.setSpacing(16) + + layout.addWidget(self._section_title("缓存目录")) + + desc = QLabel("下载的免安装软件/安装包默认保存到该目录。") + desc.setWordWrap(True) + desc.setStyleSheet("color:#888; font-size:12px; line-height:1.6;") + layout.addWidget(desc) + + row = QHBoxLayout() + row.setSpacing(10) + + self._cache_path_edit = QLineEdit() + self._cache_path_edit.setPlaceholderText("请选择缓存目录…") + self._cache_path_edit.setText(database.get_setting("cache_dir", "")) + + browse = QPushButton("浏览…") + browse.clicked.connect(self._choose_cache_dir) + + save = QPushButton("保存") + save.clicked.connect(self._save_cache_dir) + + row.addWidget(self._cache_path_edit) + row.addWidget(browse) + row.addWidget(save) + layout.addLayout(row) + + layout.addStretch() + return page + + def _choose_cache_dir(self): + cur = "" + if hasattr(self, "_cache_path_edit"): + cur = self._cache_path_edit.text().strip() + start = cur if cur else os.path.expanduser("~") + p = QFileDialog.getExistingDirectory(self, "选择缓存目录", start) + if p: + self._cache_path_edit.setText(p) + + def _save_cache_dir(self): + p = self._cache_path_edit.text().strip() if hasattr(self, "_cache_path_edit") else "" + if p and not os.path.isdir(p): + dialog_style.warning(self, "无效目录", "选择的缓存目录不存在,请重新选择。") + return + database.set_setting("cache_dir", p) + dialog_style.information(self, "已保存", "缓存目录已保存。") + # ── 捐赠页 ─────────────────────────────────────────── def _page_donate(self) -> QWidget: page = QWidget() diff --git a/ui/unlocker.py b/ui/unlocker.py new file mode 100644 index 0000000..94dcd83 --- /dev/null +++ b/ui/unlocker.py @@ -0,0 +1,382 @@ +from __future__ import annotations + +import os +from typing import List + +import psutil +from PyQt6.QtCore import Qt, pyqtSignal, QThread, QTimer +from PyQt6.QtWidgets import ( + QDialog, + QVBoxLayout, + QHBoxLayout, + QWidget, + QLabel, + QListWidget, + QListWidgetItem, + QPushButton, +) + +import ui.dialog_style as dialog_style +import ui.theme as theme + + +class _ScanWorker(QThread): + done = pyqtSignal(str, list) + error = pyqtSignal(str, str) + + def __init__(self, file_path: str): + super().__init__() + self._file_path = file_path + + def run(self): + try: + target = os.path.abspath(self._file_path) + + found: list[dict] = [] + if os.name == "nt": + try: + from ui.win_handle_scan import find_file_locks + + locks = find_file_locks(target) + # 聚合:pid -> handles + mp: dict[int, list[int]] = {} + for lk in locks: + mp.setdefault(lk.pid, []).append(int(lk.handle)) + for pid, handles in mp.items(): + name = "" + try: + name = psutil.Process(pid).name() + except Exception: + name = f"PID {pid}" + found.append({"pid": pid, "name": name, "handles": handles}) + except Exception: + found = [] + + # 兜底:psutil(可能漏报,但至少不会空白) + if not found: + target_norm = os.path.normcase(os.path.abspath(target)) + for proc in psutil.process_iter(["pid", "name", "open_files"]): + try: + files = proc.info.get("open_files") or [] + for f in files: + if os.path.normcase(f.path) == target_norm: + pid = int(proc.info.get("pid") or 0) + name = proc.info.get("name") or f"PID {pid}" + found.append({"pid": pid, "name": str(name), "handles": []}) + break + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + + self.done.emit(os.path.normcase(os.path.abspath(target)), found) + except Exception as e: + self.error.emit(self._file_path, str(e)) + + +class _DropArea(QWidget): + file_dropped = pyqtSignal(str) + + def __init__(self, parent: QWidget | None = None): + super().__init__(parent) + self.setAcceptDrops(True) + self.setMinimumHeight(120) + + def dragEnterEvent(self, event): + if event.mimeData().hasUrls(): + event.acceptProposedAction() + else: + event.ignore() + + def dragMoveEvent(self, event): + if event.mimeData().hasUrls(): + event.acceptProposedAction() + else: + event.ignore() + + def dropEvent(self, event): + urls = event.mimeData().urls() + if not urls: + return + local = urls[0].toLocalFile() + if local: + self.file_dropped.emit(local) + + def paintEvent(self, event): + from PyQt6.QtGui import QPainter, QPen, QBrush, QColor + + super().paintEvent(event) + p = QPainter(self) + p.setRenderHint(QPainter.RenderHint.Antialiasing) + rect = self.rect().adjusted(6, 6, -6, -6) + t = theme.current() + border = t.get("search_border", "#888") + bg = t.get("search_bg", "rgba(0,0,0,20)") + p.setBrush(QBrush(QColor(bg))) + pen = QPen(QColor(border)) + pen.setStyle(Qt.PenStyle.DashLine) + pen.setWidth(1) + p.setPen(pen) + p.drawRoundedRect(rect, 8, 8) + p.end() + + +class UnlockDialog(QDialog): + def __init__(self, parent: QWidget | None = None): + super().__init__(parent) + self.setWindowTitle("解除占用") + self.setMinimumSize(520, 420) + self._targets: List[psutil.Process] = [] + self._file_path: str | None = None + self._scan_worker: _ScanWorker | None = None + self._loading_timer: QTimer | None = None + self._loading_step = 0 + self._build_ui() + self._apply_theme() + + def _build_ui(self): + layout = QVBoxLayout(self) + layout.setContentsMargins(18, 16, 18, 16) + layout.setSpacing(10) + + intro = QLabel( + "将被占用的文件拖拽到下方虚线框内,我会尝试找出正在占用它的程序," + "并在下方列出,方便你一键结束相关进程。" + ) + intro.setWordWrap(True) + self._intro_lbl = intro + layout.addWidget(intro) + + self._drop = _DropArea(self) + lbl = QLabel("拖拽文件到这里") + lbl.setAlignment(Qt.AlignmentFlag.AlignCenter) + self._drop_hint_lbl = lbl + layout_drop = QVBoxLayout(self._drop) + layout_drop.addStretch() + layout_drop.addWidget(lbl) + layout_drop.addStretch() + self._drop.file_dropped.connect(self._on_file_dropped) + layout.addWidget(self._drop) + + self._path_lbl = QLabel("当前文件:未选择") + layout.addWidget(self._path_lbl) + + self._list = QListWidget() + layout.addWidget(self._list, 1) + + btn_row = QHBoxLayout() + btn_row.addStretch() + self._unlock_btn = QPushButton("解除占用(结束相关进程)") + self._unlock_btn.clicked.connect(self._on_unlock) + btn_close = QPushButton("关闭") + btn_close.clicked.connect(self.close) + btn_row.addWidget(self._unlock_btn) + btn_row.addWidget(btn_close) + layout.addLayout(btn_row) + self._close_btn = btn_close + + def _apply_theme(self): + t = theme.current() + is_dark = theme.name() == "dark" + + # 兼容 panel_bg 为 rgba 的情况 + panel_bg = t.get("panel_bg", "rgba(30,30,30,240)") + if panel_bg.startswith("rgba"): + tmp = panel_bg.replace("rgba", "rgb") + head = tmp.rsplit(",", 1)[0] + panel_rgb = head + ")" + else: + panel_rgb = panel_bg + + txt = t.get("search_color", "#eee") + sub = "#888" if is_dark else "#666" + border = t.get("panel_border", t.get("search_border", "#666")) + inp_bg = t.get("search_bg", "rgba(0,0,0,20)") + hover = t.get("header_hover", t.get("menu_selected", "rgba(128,128,128,40)")) + + self.setStyleSheet( + f""" + QDialog, QWidget {{ + background: {panel_rgb}; + color: {txt}; + }} + QListWidget {{ + background: {inp_bg}; + border: 1px solid {t.get('search_border', border)}; + border-radius: 8px; + }} + QListWidget::item {{ + padding: 6px 8px; + border-radius: 6px; + }} + QListWidget::item:selected {{ + background: #4a9eff; + color: white; + }} + QListWidget::item:hover:!selected {{ + background: {hover}; + }} + QPushButton {{ + background: {inp_bg}; + color: {txt}; + border: 1px solid {t.get('search_border', border)}; + border-radius: 6px; + padding: 6px 14px; + font-size: 12px; + }} + QPushButton:hover {{ + border-color: #4a9eff; + color: #4a9eff; + }} + """ + ) + + if hasattr(self, "_path_lbl"): + self._path_lbl.setStyleSheet(f"color:{sub}; font-size:11px; background:transparent;") + if hasattr(self, "_intro_lbl"): + self._intro_lbl.setStyleSheet("background:transparent;") + if hasattr(self, "_drop_hint_lbl"): + self._drop_hint_lbl.setStyleSheet(f"color:{sub}; background:transparent;") + + def _on_file_dropped(self, path: str): + self._file_path = os.path.abspath(path) + self._path_lbl.setText(f"当前文件:{self._file_path}") + self._start_scan() + + def _set_loading(self, loading: bool): + if loading: + self._unlock_btn.setEnabled(False) + self._list.clear() + self._list.addItem(QListWidgetItem("正在扫描占用进程…")) + if self._loading_timer is None: + self._loading_timer = QTimer(self) + self._loading_timer.setInterval(250) + self._loading_timer.timeout.connect(self._tick_loading) + self._loading_step = 0 + self._loading_timer.start() + else: + self._unlock_btn.setEnabled(True) + if self._loading_timer is not None: + self._loading_timer.stop() + + def _tick_loading(self): + # 简易“加载动画”:点点点 + self._loading_step = (self._loading_step + 1) % 4 + dots = "." * self._loading_step + txt = f"正在扫描占用进程{dots}" + if self._list.count() > 0: + it = self._list.item(0) + if it is not None: + it.setText(txt) + + def _start_scan(self): + self._list.clear() + self._targets.clear() + if not self._file_path: + return + # 取消上一次扫描(如果仍在跑) + if self._scan_worker is not None and self._scan_worker.isRunning(): + try: + self._scan_worker.requestInterruption() + except Exception: + pass + self._set_loading(True) + self._scan_worker = _ScanWorker(self._file_path) + self._scan_worker.done.connect(self._on_scan_done) + self._scan_worker.error.connect(self._on_scan_error) + self._scan_worker.start() + + def _on_scan_error(self, path: str, err: str): + self._set_loading(False) + dialog_style.warning(self, "扫描失败", f"无法扫描占用进程:\n{err}") + + def _on_scan_done(self, norm_target: str, found: list): + self._set_loading(False) + self._list.clear() + self._targets.clear() + + if not found: + item = QListWidgetItem("未检测到占用该文件的进程。") + self._list.addItem(item) + return + + self._lock_infos = found # type: ignore[attr-defined] + # 终止进程时使用 + procs: list[psutil.Process] = [] + for it in found: + try: + pid = int(it.get("pid") or 0) + if pid <= 0: + continue + procs.append(psutil.Process(pid)) + except Exception: + continue + self._targets = procs + + for it in found: + pid = int(it.get("pid") or 0) + name = str(it.get("name") or f"PID {pid}") + hcnt = len(it.get("handles") or []) + suffix = f" (句柄 {hcnt})" if hcnt > 0 else "" + txt = f"{name} (PID {pid}){suffix}" + self._list.addItem(QListWidgetItem(txt)) + + def _on_unlock(self): + if not self._targets: + dialog_style.information(self, "提示", "当前没有检测到需要解除的占用进程。") + return + + # 优先:尝试关闭句柄(不杀进程) + closed_any = False + if os.name == "nt" and hasattr(self, "_lock_infos"): + try: + from ui.win_handle_scan import close_remote_handle + + for it in getattr(self, "_lock_infos", []): + pid = int(it.get("pid") or 0) + for hv in it.get("handles") or []: + try: + if close_remote_handle(pid, int(hv)): + closed_any = True + except Exception: + continue + except Exception: + pass + + if closed_any: + dialog_style.information(self, "已完成", "已尝试关闭相关文件句柄(不结束进程)。") + self._start_scan() + return + + failed = [] + for p in self._targets: + try: + p.terminate() + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + + # 简单等待一会儿,再尝试强杀 + for p in list(self._targets): + try: + p.wait(timeout=1.0) + except (psutil.TimeoutExpired, psutil.NoSuchProcess): + pass + + for p in self._targets: + try: + if p.is_running(): + try: + p.kill() + except (psutil.NoSuchProcess, psutil.AccessDenied): + failed.append(p) + except psutil.NoSuchProcess: + continue + + if failed: + dialog_style.warning( + self, + "部分失败", + "部分进程无法结束,可能需要以管理员身份运行或手动关闭相关程序。", + ) + else: + dialog_style.information(self, "已完成", "相关占用进程已尝试结束。") + self._start_scan() + diff --git a/ui/win_handle_scan.py b/ui/win_handle_scan.py new file mode 100644 index 0000000..78303d6 --- /dev/null +++ b/ui/win_handle_scan.py @@ -0,0 +1,341 @@ +from __future__ import annotations + +import ctypes +import os +from ctypes import wintypes +from dataclasses import dataclass + + +kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) +ntdll = ctypes.WinDLL("ntdll", use_last_error=True) +advapi32 = ctypes.WinDLL("advapi32", use_last_error=True) + + +PROCESS_DUP_HANDLE = 0x0040 +PROCESS_QUERY_LIMITED_INFORMATION = 0x1000 +DUPLICATE_SAME_ACCESS = 0x00000002 +DUPLICATE_CLOSE_SOURCE = 0x00000001 + +GENERIC_READ = 0x80000000 +FILE_SHARE_READ = 0x00000001 +FILE_SHARE_WRITE = 0x00000002 +FILE_SHARE_DELETE = 0x00000004 +OPEN_EXISTING = 3 +FILE_ATTRIBUTE_NORMAL = 0x00000080 + + +SystemExtendedHandleInformation = 64 + + +class SYSTEM_HANDLE_TABLE_ENTRY_INFO_EX(ctypes.Structure): + _fields_ = [ + ("Object", ctypes.c_void_p), + ("UniqueProcessId", ctypes.c_void_p), + ("HandleValue", ctypes.c_void_p), + ("GrantedAccess", wintypes.ULONG), + ("CreatorBackTraceIndex", wintypes.USHORT), + ("ObjectTypeIndex", wintypes.USHORT), + ("HandleAttributes", wintypes.ULONG), + ("Reserved", wintypes.ULONG), + ] + + +class SYSTEM_HANDLE_INFORMATION_EX(ctypes.Structure): + _fields_ = [ + ("NumberOfHandles", ctypes.c_void_p), + ("Reserved", ctypes.c_void_p), + ("Handles", SYSTEM_HANDLE_TABLE_ENTRY_INFO_EX * 1), + ] + + +NtQuerySystemInformation = ntdll.NtQuerySystemInformation +NtQuerySystemInformation.argtypes = [ + wintypes.ULONG, + wintypes.LPVOID, + wintypes.ULONG, + wintypes.PULONG, +] +NtQuerySystemInformation.restype = wintypes.LONG + + +kernel32.OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD] +kernel32.OpenProcess.restype = wintypes.HANDLE + +kernel32.CloseHandle.argtypes = [wintypes.HANDLE] +kernel32.CloseHandle.restype = wintypes.BOOL + +kernel32.DuplicateHandle.argtypes = [ + wintypes.HANDLE, + wintypes.HANDLE, + wintypes.HANDLE, + ctypes.POINTER(wintypes.HANDLE), + wintypes.DWORD, + wintypes.BOOL, + wintypes.DWORD, +] +kernel32.DuplicateHandle.restype = wintypes.BOOL + +kernel32.GetCurrentProcess.argtypes = [] +kernel32.GetCurrentProcess.restype = wintypes.HANDLE + +kernel32.GetFinalPathNameByHandleW.argtypes = [ + wintypes.HANDLE, + wintypes.LPWSTR, + wintypes.DWORD, + wintypes.DWORD, +] +kernel32.GetFinalPathNameByHandleW.restype = wintypes.DWORD + +kernel32.GetCurrentProcessId.argtypes = [] +kernel32.GetCurrentProcessId.restype = wintypes.DWORD + +kernel32.CreateFileW.argtypes = [ + wintypes.LPCWSTR, + wintypes.DWORD, + wintypes.DWORD, + wintypes.LPVOID, + wintypes.DWORD, + wintypes.DWORD, + wintypes.HANDLE, +] +kernel32.CreateFileW.restype = wintypes.HANDLE + + +def _normalize_path(p: str) -> str: + p = os.path.abspath(p) + p = os.path.normcase(p) + return p + + +def _normalize_final_path(p: str) -> str: + # GetFinalPathNameByHandleW 常见返回:\\?\C:\path\file 或 \\?\UNC\server\share\path + if p.startswith("\\\\?\\UNC\\"): + p = "\\\\" + p[len("\\\\?\\UNC\\") :] + elif p.startswith("\\\\?\\"): + p = p[len("\\\\?\\") :] + return _normalize_path(p) + + +def _try_enable_debug_privilege() -> None: + # best-effort:没有也能工作,只是会漏掉部分系统进程 + TOKEN_ADJUST_PRIVILEGES = 0x0020 + TOKEN_QUERY = 0x0008 + SE_PRIVILEGE_ENABLED = 0x0002 + + class LUID(ctypes.Structure): + _fields_ = [("LowPart", wintypes.DWORD), ("HighPart", wintypes.LONG)] + + class LUID_AND_ATTRIBUTES(ctypes.Structure): + _fields_ = [("Luid", LUID), ("Attributes", wintypes.DWORD)] + + class TOKEN_PRIVILEGES(ctypes.Structure): + _fields_ = [("PrivilegeCount", wintypes.DWORD), ("Privileges", LUID_AND_ATTRIBUTES * 1)] + + OpenProcessToken = advapi32.OpenProcessToken + OpenProcessToken.argtypes = [wintypes.HANDLE, wintypes.DWORD, ctypes.POINTER(wintypes.HANDLE)] + OpenProcessToken.restype = wintypes.BOOL + + LookupPrivilegeValueW = advapi32.LookupPrivilegeValueW + LookupPrivilegeValueW.argtypes = [wintypes.LPCWSTR, wintypes.LPCWSTR, ctypes.POINTER(LUID)] + LookupPrivilegeValueW.restype = wintypes.BOOL + + AdjustTokenPrivileges = advapi32.AdjustTokenPrivileges + AdjustTokenPrivileges.argtypes = [ + wintypes.HANDLE, + wintypes.BOOL, + ctypes.POINTER(TOKEN_PRIVILEGES), + wintypes.DWORD, + wintypes.PVOID, + wintypes.PVOID, + ] + AdjustTokenPrivileges.restype = wintypes.BOOL + + token = wintypes.HANDLE() + if not OpenProcessToken(kernel32.GetCurrentProcess(), TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY, ctypes.byref(token)): + return + try: + luid = LUID() + if not LookupPrivilegeValueW(None, "SeDebugPrivilege", ctypes.byref(luid)): + return + tp = TOKEN_PRIVILEGES() + tp.PrivilegeCount = 1 + tp.Privileges[0].Luid = luid + tp.Privileges[0].Attributes = SE_PRIVILEGE_ENABLED + AdjustTokenPrivileges(token, False, ctypes.byref(tp), 0, None, None) + finally: + kernel32.CloseHandle(token) + + +@dataclass(frozen=True) +class FileLock: + pid: int + handle: int + path: str + + +def _query_system_handles(max_size: int = 1 << 28) -> tuple[ctypes.Array, int] | tuple[None, int]: + """返回 (buffer, handle_count);失败返回 (None, 0)。""" + size = 1 << 20 + while True: + buf = ctypes.create_string_buffer(size) + ret_len = wintypes.ULONG(0) + status = NtQuerySystemInformation( + SystemExtendedHandleInformation, buf, size, ctypes.byref(ret_len) + ) + if status == 0: # STATUS_SUCCESS + info = ctypes.cast(buf, ctypes.POINTER(SYSTEM_HANDLE_INFORMATION_EX)).contents + n = int(info.NumberOfHandles) + return buf, n + # STATUS_INFO_LENGTH_MISMATCH = 0xC0000004 (signed: -1073741820) + if status in (-1073741820, 0xC0000004): + size = max(size * 2, int(ret_len.value) + 0x10000) + if size > max_size: + return None, 0 + continue + return None, 0 + + +def _get_file_object_type_index(buf: ctypes.Array, handle_count: int, sample_path: str) -> int | None: + """ + 通过“本进程打开一个文件句柄”,再在系统句柄表中找到它对应的 ObjectTypeIndex。 + 这样可以只扫描 File 类型句柄,速度会快很多。 + """ + try: + h = kernel32.CreateFileW( + sample_path, + GENERIC_READ, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + None, + OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL, + None, + ) + if not h or h == wintypes.HANDLE(-1).value: + return None + except Exception: + return None + + try: + cur_pid = int(kernel32.GetCurrentProcessId()) + hv = int(ctypes.cast(h, ctypes.c_size_t).value) + + n = int(handle_count) + if n <= 0: + return None + array_type = SYSTEM_HANDLE_TABLE_ENTRY_INFO_EX * n + info = ctypes.cast(buf, ctypes.POINTER(SYSTEM_HANDLE_INFORMATION_EX)).contents + handles = ctypes.cast(ctypes.addressof(info.Handles), ctypes.POINTER(array_type)).contents + + for ent in handles: + pid = int(ctypes.cast(ent.UniqueProcessId, ctypes.c_size_t).value) + if pid != cur_pid: + continue + val = int(ctypes.cast(ent.HandleValue, ctypes.c_size_t).value) + if val == hv: + return int(ent.ObjectTypeIndex) + return None + finally: + try: + kernel32.CloseHandle(h) + except Exception: + pass + + +def find_file_locks(target_path: str, *, max_handles: int = 200000) -> list[FileLock]: + """ + 返回当前系统中占用 target_path 的 (pid, handle) 列表。 + - 基于 SystemExtendedHandleInformation + DuplicateHandle + GetFinalPathNameByHandleW + """ + _try_enable_debug_privilege() + target_norm = _normalize_path(target_path) + + buf, n = _query_system_handles() + if buf is None or n <= 0: + return [] + + if n <= 0: + return [] + if n > max_handles: + n = max_handles + + # 重新按实际数量解释数组 + array_type = SYSTEM_HANDLE_TABLE_ENTRY_INFO_EX * n + info = ctypes.cast(buf, ctypes.POINTER(SYSTEM_HANDLE_INFORMATION_EX)).contents + handles = ctypes.cast(ctypes.addressof(info.Handles), ctypes.POINTER(array_type)).contents + + cur_proc = kernel32.GetCurrentProcess() + locks: list[FileLock] = [] + + file_type_index = _get_file_object_type_index(buf, int(info.NumberOfHandles), target_path) + # 进程句柄缓存:避免每个 handle 都 OpenProcess/CloseHandle + proc_cache: dict[int, wintypes.HANDLE] = {} + + for h in handles: + pid = int(ctypes.cast(h.UniqueProcessId, ctypes.c_size_t).value) + hv = int(ctypes.cast(h.HandleValue, ctypes.c_size_t).value) + if pid <= 0 or hv <= 0: + continue + if file_type_index is not None and int(h.ObjectTypeIndex) != int(file_type_index): + continue + + ph = proc_cache.get(pid) + if not ph: + ph = kernel32.OpenProcess( + PROCESS_DUP_HANDLE | PROCESS_QUERY_LIMITED_INFORMATION, False, pid + ) + if not ph: + continue + proc_cache[pid] = ph + try: + dup = wintypes.HANDLE() + if not kernel32.DuplicateHandle(ph, wintypes.HANDLE(hv), cur_proc, ctypes.byref(dup), 0, False, DUPLICATE_SAME_ACCESS): + continue + try: + # 获取路径 + bufw = ctypes.create_unicode_buffer(4096) + got = kernel32.GetFinalPathNameByHandleW(dup, bufw, len(bufw), 0) + if got == 0 or got >= len(bufw): + continue + final_norm = _normalize_final_path(bufw.value) + if final_norm == target_norm: + locks.append(FileLock(pid=pid, handle=hv, path=final_norm)) + finally: + kernel32.CloseHandle(dup) + finally: + pass + + for ph in proc_cache.values(): + try: + kernel32.CloseHandle(ph) + except Exception: + pass + + return locks + + +def close_remote_handle(pid: int, handle_value: int) -> bool: + """ + 关闭目标进程中指定 handle(不结束进程)。 + 需要足够权限(通常管理员 + SeDebugPrivilege 才更稳)。 + """ + _try_enable_debug_privilege() + ph = kernel32.OpenProcess(PROCESS_DUP_HANDLE, False, int(pid)) + if not ph: + return False + try: + dup = wintypes.HANDLE() + ok = kernel32.DuplicateHandle( + ph, + wintypes.HANDLE(int(handle_value)), + kernel32.GetCurrentProcess(), + ctypes.byref(dup), + 0, + False, + DUPLICATE_CLOSE_SOURCE, + ) + if ok and dup: + kernel32.CloseHandle(dup) + return bool(ok) + finally: + kernel32.CloseHandle(ph) +