增加解除占用功能

This commit is contained in:
扫地僧 2026-04-08 00:07:40 +08:00
parent 005fc3a77c
commit ff8aa0269d
9 changed files with 823 additions and 8 deletions

View File

@ -1,4 +1,4 @@
#define AppVersion "0.0.3" #define AppVersion "0.0.4"
; Inno Setup 安装脚本(全局安装) ; Inno Setup 安装脚本(全局安装)
; 产物约定(统一放在 dist\niumasoftware 下): ; 产物约定(统一放在 dist\niumasoftware 下):
; - dist\niumasoftware\niumasoftware.exe 主程序PyInstaller onedir ; - dist\niumasoftware\niumasoftware.exe 主程序PyInstaller onedir

22
main.py
View File

@ -3,6 +3,7 @@ import os
import time import time
from PyQt6.QtCore import Qt from PyQt6.QtCore import Qt
from PyQt6.QtWidgets import QApplication from PyQt6.QtWidgets import QApplication
from PyQt6.QtGui import QIcon
from db.database import init_db from db.database import init_db
from ui.dock import PanelWindow, _set_autostart from ui.dock import PanelWindow, _set_autostart
from ui.ball import FloatBall, BALL_SIZE from ui.ball import FloatBall, BALL_SIZE
@ -21,6 +22,18 @@ def get_resource_path(relative_path):
return os.path.join(os.path.abspath("."), relative_path) return os.path.join(os.path.abspath("."), relative_path)
# ========================================================== # ==========================================================
def _set_windows_app_user_model_id(appid: str):
"""让任务栏图标/分组按 AppID 识别(开发态避免显示 Python 图标)。"""
if sys.platform != "win32":
return
try:
import ctypes
ctypes.windll.shell32.SetCurrentProcessExplicitAppUserModelID(appid)
except Exception:
pass
def _wake_existing_or_exit() -> bool: def _wake_existing_or_exit() -> bool:
""" """
Windows 单实例 Windows 单实例
@ -75,6 +88,15 @@ def main():
app = QApplication(sys.argv) app = QApplication(sys.argv)
app.setQuitOnLastWindowClosed(False) app.setQuitOnLastWindowClosed(False)
_set_windows_app_user_model_id("niumasoftware")
# 任务栏/窗口图标:开发态用 ico打包后也能通过资源路径找到
try:
ico_path = get_resource_path("logo.ico")
if os.path.exists(ico_path):
app.setWindowIcon(QIcon(ico_path))
except Exception:
pass
init_db() init_db()
theme.load() theme.load()

View File

@ -1,2 +1,3 @@
PyQt6>=6.4.0 PyQt6>=6.4.0
pillow>=9.0.0 pillow>=9.0.0
psutil>=5.9.0

Binary file not shown.

View File

@ -318,10 +318,14 @@ class PanelWindow(QWidget):
self.setWindowTitle(app_title()) self.setWindowTitle(app_title())
# 默认不置顶;仅图钉开启时与悬浮球一并置顶(见 _apply_pin_window_layer # 默认不置顶;仅图钉开启时与悬浮球一并置顶(见 _apply_pin_window_layer
self.setWindowFlags(Qt.WindowType.FramelessWindowHint) self.setWindowFlags(Qt.WindowType.FramelessWindowHint)
# 任务栏/开始菜单图标:使用项目 logo.png # 任务栏/开始菜单图标:优先 ico其次 png
logo_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "logo.png") base_dir = os.path.dirname(os.path.dirname(__file__))
if os.path.exists(logo_path): logo_ico = os.path.join(base_dir, "logo.ico")
self.setWindowIcon(QIcon(logo_path)) logo_png = os.path.join(base_dir, "logo.png")
if os.path.exists(logo_ico):
self.setWindowIcon(QIcon(logo_ico))
elif os.path.exists(logo_png):
self.setWindowIcon(QIcon(logo_png))
self.setAttribute(Qt.WidgetAttribute.WA_TranslucentBackground) self.setAttribute(Qt.WidgetAttribute.WA_TranslucentBackground)
self.setAcceptDrops(True) self.setAcceptDrops(True)
self.setMinimumSize(MIN_W, MIN_H) self.setMinimumSize(MIN_W, MIN_H)
@ -512,6 +516,7 @@ class PanelWindow(QWidget):
("管理员运行 CMD", "fa5s.terminal", self._open_admin_cmd), ("管理员运行 CMD", "fa5s.terminal", self._open_admin_cmd),
("管理员运行 PowerShell", "fa5b.windows", self._open_admin_powershell), ("管理员运行 PowerShell", "fa5b.windows", self._open_admin_powershell),
("打开默认浏览器", "fa5s.globe", self._open_default_browser), ("打开默认浏览器", "fa5s.globe", self._open_default_browser),
("解除占用", "fa5s.unlock", self._open_unlocker),
] ]
def _build_ui(self): def _build_ui(self):
@ -958,6 +963,15 @@ class PanelWindow(QWidget):
import webbrowser import webbrowser
webbrowser.open("https://www.baidu.com") webbrowser.open("https://www.baidu.com")
def _open_unlocker(self):
try:
from ui.unlocker import UnlockDialog
except Exception as e:
dialog_style.warning(self, "无法打开解除占用工具", f"加载模块失败:{e}")
return
dlg = UnlockDialog(self)
dlg.exec()
def _open_wechat_multi(self): def _open_wechat_multi(self):
from ui.wechat_multi import WechatMultiDialog from ui.wechat_multi import WechatMultiDialog
dlg = WechatMultiDialog(self) dlg = WechatMultiDialog(self)

View File

@ -3,7 +3,7 @@ import os
from PyQt6.QtWidgets import ( from PyQt6.QtWidgets import (
QDialog, QHBoxLayout, QVBoxLayout, QListWidget, QListWidgetItem, QDialog, QHBoxLayout, QVBoxLayout, QListWidget, QListWidgetItem,
QStackedWidget, QWidget, QLabel, QSlider, QPushButton, QStackedWidget, QWidget, QLabel, QSlider, QPushButton,
QCheckBox, QGroupBox, QFormLayout, QSpinBox, QFrame QCheckBox, QGroupBox, QFormLayout, QSpinBox, QFrame, QLineEdit, QFileDialog
) )
from PyQt6.QtCore import Qt, QSize from PyQt6.QtCore import Qt, QSize
from PyQt6.QtWidgets import QMessageBox from PyQt6.QtWidgets import QMessageBox
@ -12,6 +12,7 @@ import qtawesome as qta
from db import database from db import database
import ui.theme as theme import ui.theme as theme
import ui.dialog_style as dialog_style import ui.dialog_style as dialog_style
from app_info import APP_NAME, __VERSION__
class SettingsWindow(QDialog): class SettingsWindow(QDialog):
@ -51,6 +52,7 @@ class SettingsWindow(QDialog):
("fa5s.paint-brush", "外观"), ("fa5s.paint-brush", "外观"),
("fa5s.window-maximize", "窗口"), ("fa5s.window-maximize", "窗口"),
("fa5s.rocket", "启动"), ("fa5s.rocket", "启动"),
("fa5s.folder-open", "缓存"),
("fa5s.heart", "捐赠"), ("fa5s.heart", "捐赠"),
("fa5s.eraser", "初始化"), ("fa5s.eraser", "初始化"),
] ]
@ -75,6 +77,7 @@ class SettingsWindow(QDialog):
self.stack.addWidget(self._page_appearance()) self.stack.addWidget(self._page_appearance())
self.stack.addWidget(self._page_window()) self.stack.addWidget(self._page_window())
self.stack.addWidget(self._page_startup()) self.stack.addWidget(self._page_startup())
self.stack.addWidget(self._page_cache())
self.stack.addWidget(self._page_donate()) self.stack.addWidget(self._page_donate())
self.stack.addWidget(self._page_initialization()) self.stack.addWidget(self._page_initialization())
# 此时 stack 已就绪,允许导航回调正常工作 # 此时 stack 已就绪,允许导航回调正常工作
@ -241,13 +244,65 @@ class SettingsWindow(QDialog):
layout.addWidget(self._divider()) layout.addWidget(self._divider())
layout.addWidget(self._section_title("关于")) layout.addWidget(self._section_title("关于"))
about = QLabel("桌面文件整理 v1.0\n整理你的桌面快捷方式,保持桌面干净。") about = QLabel(f"{APP_NAME} v{__VERSION__}\n整理你的桌面快捷方式,保持桌面干净。")
about.setStyleSheet("color:#888; font-size:12px; line-height:1.6;") about.setStyleSheet("color:#888; font-size:12px; line-height:1.6;")
layout.addWidget(about) layout.addWidget(about)
layout.addStretch() layout.addStretch()
return page return page
# ── 缓存页 ───────────────────────────────────────────
def _page_cache(self) -> QWidget:
page = QWidget()
layout = QVBoxLayout(page)
layout.setContentsMargins(24, 20, 24, 20)
layout.setSpacing(16)
layout.addWidget(self._section_title("缓存目录"))
desc = QLabel("下载的免安装软件/安装包默认保存到该目录。")
desc.setWordWrap(True)
desc.setStyleSheet("color:#888; font-size:12px; line-height:1.6;")
layout.addWidget(desc)
row = QHBoxLayout()
row.setSpacing(10)
self._cache_path_edit = QLineEdit()
self._cache_path_edit.setPlaceholderText("请选择缓存目录…")
self._cache_path_edit.setText(database.get_setting("cache_dir", ""))
browse = QPushButton("浏览…")
browse.clicked.connect(self._choose_cache_dir)
save = QPushButton("保存")
save.clicked.connect(self._save_cache_dir)
row.addWidget(self._cache_path_edit)
row.addWidget(browse)
row.addWidget(save)
layout.addLayout(row)
layout.addStretch()
return page
def _choose_cache_dir(self):
cur = ""
if hasattr(self, "_cache_path_edit"):
cur = self._cache_path_edit.text().strip()
start = cur if cur else os.path.expanduser("~")
p = QFileDialog.getExistingDirectory(self, "选择缓存目录", start)
if p:
self._cache_path_edit.setText(p)
def _save_cache_dir(self):
p = self._cache_path_edit.text().strip() if hasattr(self, "_cache_path_edit") else ""
if p and not os.path.isdir(p):
dialog_style.warning(self, "无效目录", "选择的缓存目录不存在,请重新选择。")
return
database.set_setting("cache_dir", p)
dialog_style.information(self, "已保存", "缓存目录已保存。")
# ── 捐赠页 ─────────────────────────────────────────── # ── 捐赠页 ───────────────────────────────────────────
def _page_donate(self) -> QWidget: def _page_donate(self) -> QWidget:
page = QWidget() page = QWidget()

382
ui/unlocker.py Normal file
View File

@ -0,0 +1,382 @@
from __future__ import annotations
import os
from typing import List
import psutil
from PyQt6.QtCore import Qt, pyqtSignal, QThread, QTimer
from PyQt6.QtWidgets import (
QDialog,
QVBoxLayout,
QHBoxLayout,
QWidget,
QLabel,
QListWidget,
QListWidgetItem,
QPushButton,
)
import ui.dialog_style as dialog_style
import ui.theme as theme
class _ScanWorker(QThread):
done = pyqtSignal(str, list)
error = pyqtSignal(str, str)
def __init__(self, file_path: str):
super().__init__()
self._file_path = file_path
def run(self):
try:
target = os.path.abspath(self._file_path)
found: list[dict] = []
if os.name == "nt":
try:
from ui.win_handle_scan import find_file_locks
locks = find_file_locks(target)
# 聚合pid -> handles
mp: dict[int, list[int]] = {}
for lk in locks:
mp.setdefault(lk.pid, []).append(int(lk.handle))
for pid, handles in mp.items():
name = ""
try:
name = psutil.Process(pid).name()
except Exception:
name = f"PID {pid}"
found.append({"pid": pid, "name": name, "handles": handles})
except Exception:
found = []
# 兜底psutil可能漏报但至少不会空白
if not found:
target_norm = os.path.normcase(os.path.abspath(target))
for proc in psutil.process_iter(["pid", "name", "open_files"]):
try:
files = proc.info.get("open_files") or []
for f in files:
if os.path.normcase(f.path) == target_norm:
pid = int(proc.info.get("pid") or 0)
name = proc.info.get("name") or f"PID {pid}"
found.append({"pid": pid, "name": str(name), "handles": []})
break
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
self.done.emit(os.path.normcase(os.path.abspath(target)), found)
except Exception as e:
self.error.emit(self._file_path, str(e))
class _DropArea(QWidget):
file_dropped = pyqtSignal(str)
def __init__(self, parent: QWidget | None = None):
super().__init__(parent)
self.setAcceptDrops(True)
self.setMinimumHeight(120)
def dragEnterEvent(self, event):
if event.mimeData().hasUrls():
event.acceptProposedAction()
else:
event.ignore()
def dragMoveEvent(self, event):
if event.mimeData().hasUrls():
event.acceptProposedAction()
else:
event.ignore()
def dropEvent(self, event):
urls = event.mimeData().urls()
if not urls:
return
local = urls[0].toLocalFile()
if local:
self.file_dropped.emit(local)
def paintEvent(self, event):
from PyQt6.QtGui import QPainter, QPen, QBrush, QColor
super().paintEvent(event)
p = QPainter(self)
p.setRenderHint(QPainter.RenderHint.Antialiasing)
rect = self.rect().adjusted(6, 6, -6, -6)
t = theme.current()
border = t.get("search_border", "#888")
bg = t.get("search_bg", "rgba(0,0,0,20)")
p.setBrush(QBrush(QColor(bg)))
pen = QPen(QColor(border))
pen.setStyle(Qt.PenStyle.DashLine)
pen.setWidth(1)
p.setPen(pen)
p.drawRoundedRect(rect, 8, 8)
p.end()
class UnlockDialog(QDialog):
def __init__(self, parent: QWidget | None = None):
super().__init__(parent)
self.setWindowTitle("解除占用")
self.setMinimumSize(520, 420)
self._targets: List[psutil.Process] = []
self._file_path: str | None = None
self._scan_worker: _ScanWorker | None = None
self._loading_timer: QTimer | None = None
self._loading_step = 0
self._build_ui()
self._apply_theme()
def _build_ui(self):
layout = QVBoxLayout(self)
layout.setContentsMargins(18, 16, 18, 16)
layout.setSpacing(10)
intro = QLabel(
"将被占用的文件拖拽到下方虚线框内,我会尝试找出正在占用它的程序,"
"并在下方列出,方便你一键结束相关进程。"
)
intro.setWordWrap(True)
self._intro_lbl = intro
layout.addWidget(intro)
self._drop = _DropArea(self)
lbl = QLabel("拖拽文件到这里")
lbl.setAlignment(Qt.AlignmentFlag.AlignCenter)
self._drop_hint_lbl = lbl
layout_drop = QVBoxLayout(self._drop)
layout_drop.addStretch()
layout_drop.addWidget(lbl)
layout_drop.addStretch()
self._drop.file_dropped.connect(self._on_file_dropped)
layout.addWidget(self._drop)
self._path_lbl = QLabel("当前文件:未选择")
layout.addWidget(self._path_lbl)
self._list = QListWidget()
layout.addWidget(self._list, 1)
btn_row = QHBoxLayout()
btn_row.addStretch()
self._unlock_btn = QPushButton("解除占用(结束相关进程)")
self._unlock_btn.clicked.connect(self._on_unlock)
btn_close = QPushButton("关闭")
btn_close.clicked.connect(self.close)
btn_row.addWidget(self._unlock_btn)
btn_row.addWidget(btn_close)
layout.addLayout(btn_row)
self._close_btn = btn_close
def _apply_theme(self):
t = theme.current()
is_dark = theme.name() == "dark"
# 兼容 panel_bg 为 rgba 的情况
panel_bg = t.get("panel_bg", "rgba(30,30,30,240)")
if panel_bg.startswith("rgba"):
tmp = panel_bg.replace("rgba", "rgb")
head = tmp.rsplit(",", 1)[0]
panel_rgb = head + ")"
else:
panel_rgb = panel_bg
txt = t.get("search_color", "#eee")
sub = "#888" if is_dark else "#666"
border = t.get("panel_border", t.get("search_border", "#666"))
inp_bg = t.get("search_bg", "rgba(0,0,0,20)")
hover = t.get("header_hover", t.get("menu_selected", "rgba(128,128,128,40)"))
self.setStyleSheet(
f"""
QDialog, QWidget {{
background: {panel_rgb};
color: {txt};
}}
QListWidget {{
background: {inp_bg};
border: 1px solid {t.get('search_border', border)};
border-radius: 8px;
}}
QListWidget::item {{
padding: 6px 8px;
border-radius: 6px;
}}
QListWidget::item:selected {{
background: #4a9eff;
color: white;
}}
QListWidget::item:hover:!selected {{
background: {hover};
}}
QPushButton {{
background: {inp_bg};
color: {txt};
border: 1px solid {t.get('search_border', border)};
border-radius: 6px;
padding: 6px 14px;
font-size: 12px;
}}
QPushButton:hover {{
border-color: #4a9eff;
color: #4a9eff;
}}
"""
)
if hasattr(self, "_path_lbl"):
self._path_lbl.setStyleSheet(f"color:{sub}; font-size:11px; background:transparent;")
if hasattr(self, "_intro_lbl"):
self._intro_lbl.setStyleSheet("background:transparent;")
if hasattr(self, "_drop_hint_lbl"):
self._drop_hint_lbl.setStyleSheet(f"color:{sub}; background:transparent;")
def _on_file_dropped(self, path: str):
self._file_path = os.path.abspath(path)
self._path_lbl.setText(f"当前文件:{self._file_path}")
self._start_scan()
def _set_loading(self, loading: bool):
if loading:
self._unlock_btn.setEnabled(False)
self._list.clear()
self._list.addItem(QListWidgetItem("正在扫描占用进程…"))
if self._loading_timer is None:
self._loading_timer = QTimer(self)
self._loading_timer.setInterval(250)
self._loading_timer.timeout.connect(self._tick_loading)
self._loading_step = 0
self._loading_timer.start()
else:
self._unlock_btn.setEnabled(True)
if self._loading_timer is not None:
self._loading_timer.stop()
def _tick_loading(self):
# 简易“加载动画”:点点点
self._loading_step = (self._loading_step + 1) % 4
dots = "." * self._loading_step
txt = f"正在扫描占用进程{dots}"
if self._list.count() > 0:
it = self._list.item(0)
if it is not None:
it.setText(txt)
def _start_scan(self):
self._list.clear()
self._targets.clear()
if not self._file_path:
return
# 取消上一次扫描(如果仍在跑)
if self._scan_worker is not None and self._scan_worker.isRunning():
try:
self._scan_worker.requestInterruption()
except Exception:
pass
self._set_loading(True)
self._scan_worker = _ScanWorker(self._file_path)
self._scan_worker.done.connect(self._on_scan_done)
self._scan_worker.error.connect(self._on_scan_error)
self._scan_worker.start()
def _on_scan_error(self, path: str, err: str):
self._set_loading(False)
dialog_style.warning(self, "扫描失败", f"无法扫描占用进程:\n{err}")
def _on_scan_done(self, norm_target: str, found: list):
self._set_loading(False)
self._list.clear()
self._targets.clear()
if not found:
item = QListWidgetItem("未检测到占用该文件的进程。")
self._list.addItem(item)
return
self._lock_infos = found # type: ignore[attr-defined]
# 终止进程时使用
procs: list[psutil.Process] = []
for it in found:
try:
pid = int(it.get("pid") or 0)
if pid <= 0:
continue
procs.append(psutil.Process(pid))
except Exception:
continue
self._targets = procs
for it in found:
pid = int(it.get("pid") or 0)
name = str(it.get("name") or f"PID {pid}")
hcnt = len(it.get("handles") or [])
suffix = f" (句柄 {hcnt})" if hcnt > 0 else ""
txt = f"{name} (PID {pid}){suffix}"
self._list.addItem(QListWidgetItem(txt))
def _on_unlock(self):
if not self._targets:
dialog_style.information(self, "提示", "当前没有检测到需要解除的占用进程。")
return
# 优先:尝试关闭句柄(不杀进程)
closed_any = False
if os.name == "nt" and hasattr(self, "_lock_infos"):
try:
from ui.win_handle_scan import close_remote_handle
for it in getattr(self, "_lock_infos", []):
pid = int(it.get("pid") or 0)
for hv in it.get("handles") or []:
try:
if close_remote_handle(pid, int(hv)):
closed_any = True
except Exception:
continue
except Exception:
pass
if closed_any:
dialog_style.information(self, "已完成", "已尝试关闭相关文件句柄(不结束进程)。")
self._start_scan()
return
failed = []
for p in self._targets:
try:
p.terminate()
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
# 简单等待一会儿,再尝试强杀
for p in list(self._targets):
try:
p.wait(timeout=1.0)
except (psutil.TimeoutExpired, psutil.NoSuchProcess):
pass
for p in self._targets:
try:
if p.is_running():
try:
p.kill()
except (psutil.NoSuchProcess, psutil.AccessDenied):
failed.append(p)
except psutil.NoSuchProcess:
continue
if failed:
dialog_style.warning(
self,
"部分失败",
"部分进程无法结束,可能需要以管理员身份运行或手动关闭相关程序。",
)
else:
dialog_style.information(self, "已完成", "相关占用进程已尝试结束。")
self._start_scan()

341
ui/win_handle_scan.py Normal file
View File

@ -0,0 +1,341 @@
from __future__ import annotations
import ctypes
import os
from ctypes import wintypes
from dataclasses import dataclass
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
ntdll = ctypes.WinDLL("ntdll", use_last_error=True)
advapi32 = ctypes.WinDLL("advapi32", use_last_error=True)
PROCESS_DUP_HANDLE = 0x0040
PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
DUPLICATE_SAME_ACCESS = 0x00000002
DUPLICATE_CLOSE_SOURCE = 0x00000001
GENERIC_READ = 0x80000000
FILE_SHARE_READ = 0x00000001
FILE_SHARE_WRITE = 0x00000002
FILE_SHARE_DELETE = 0x00000004
OPEN_EXISTING = 3
FILE_ATTRIBUTE_NORMAL = 0x00000080
SystemExtendedHandleInformation = 64
class SYSTEM_HANDLE_TABLE_ENTRY_INFO_EX(ctypes.Structure):
_fields_ = [
("Object", ctypes.c_void_p),
("UniqueProcessId", ctypes.c_void_p),
("HandleValue", ctypes.c_void_p),
("GrantedAccess", wintypes.ULONG),
("CreatorBackTraceIndex", wintypes.USHORT),
("ObjectTypeIndex", wintypes.USHORT),
("HandleAttributes", wintypes.ULONG),
("Reserved", wintypes.ULONG),
]
class SYSTEM_HANDLE_INFORMATION_EX(ctypes.Structure):
_fields_ = [
("NumberOfHandles", ctypes.c_void_p),
("Reserved", ctypes.c_void_p),
("Handles", SYSTEM_HANDLE_TABLE_ENTRY_INFO_EX * 1),
]
NtQuerySystemInformation = ntdll.NtQuerySystemInformation
NtQuerySystemInformation.argtypes = [
wintypes.ULONG,
wintypes.LPVOID,
wintypes.ULONG,
wintypes.PULONG,
]
NtQuerySystemInformation.restype = wintypes.LONG
kernel32.OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD]
kernel32.OpenProcess.restype = wintypes.HANDLE
kernel32.CloseHandle.argtypes = [wintypes.HANDLE]
kernel32.CloseHandle.restype = wintypes.BOOL
kernel32.DuplicateHandle.argtypes = [
wintypes.HANDLE,
wintypes.HANDLE,
wintypes.HANDLE,
ctypes.POINTER(wintypes.HANDLE),
wintypes.DWORD,
wintypes.BOOL,
wintypes.DWORD,
]
kernel32.DuplicateHandle.restype = wintypes.BOOL
kernel32.GetCurrentProcess.argtypes = []
kernel32.GetCurrentProcess.restype = wintypes.HANDLE
kernel32.GetFinalPathNameByHandleW.argtypes = [
wintypes.HANDLE,
wintypes.LPWSTR,
wintypes.DWORD,
wintypes.DWORD,
]
kernel32.GetFinalPathNameByHandleW.restype = wintypes.DWORD
kernel32.GetCurrentProcessId.argtypes = []
kernel32.GetCurrentProcessId.restype = wintypes.DWORD
kernel32.CreateFileW.argtypes = [
wintypes.LPCWSTR,
wintypes.DWORD,
wintypes.DWORD,
wintypes.LPVOID,
wintypes.DWORD,
wintypes.DWORD,
wintypes.HANDLE,
]
kernel32.CreateFileW.restype = wintypes.HANDLE
def _normalize_path(p: str) -> str:
p = os.path.abspath(p)
p = os.path.normcase(p)
return p
def _normalize_final_path(p: str) -> str:
# GetFinalPathNameByHandleW 常见返回:\\?\C:\path\file 或 \\?\UNC\server\share\path
if p.startswith("\\\\?\\UNC\\"):
p = "\\\\" + p[len("\\\\?\\UNC\\") :]
elif p.startswith("\\\\?\\"):
p = p[len("\\\\?\\") :]
return _normalize_path(p)
def _try_enable_debug_privilege() -> None:
# best-effort没有也能工作只是会漏掉部分系统进程
TOKEN_ADJUST_PRIVILEGES = 0x0020
TOKEN_QUERY = 0x0008
SE_PRIVILEGE_ENABLED = 0x0002
class LUID(ctypes.Structure):
_fields_ = [("LowPart", wintypes.DWORD), ("HighPart", wintypes.LONG)]
class LUID_AND_ATTRIBUTES(ctypes.Structure):
_fields_ = [("Luid", LUID), ("Attributes", wintypes.DWORD)]
class TOKEN_PRIVILEGES(ctypes.Structure):
_fields_ = [("PrivilegeCount", wintypes.DWORD), ("Privileges", LUID_AND_ATTRIBUTES * 1)]
OpenProcessToken = advapi32.OpenProcessToken
OpenProcessToken.argtypes = [wintypes.HANDLE, wintypes.DWORD, ctypes.POINTER(wintypes.HANDLE)]
OpenProcessToken.restype = wintypes.BOOL
LookupPrivilegeValueW = advapi32.LookupPrivilegeValueW
LookupPrivilegeValueW.argtypes = [wintypes.LPCWSTR, wintypes.LPCWSTR, ctypes.POINTER(LUID)]
LookupPrivilegeValueW.restype = wintypes.BOOL
AdjustTokenPrivileges = advapi32.AdjustTokenPrivileges
AdjustTokenPrivileges.argtypes = [
wintypes.HANDLE,
wintypes.BOOL,
ctypes.POINTER(TOKEN_PRIVILEGES),
wintypes.DWORD,
wintypes.PVOID,
wintypes.PVOID,
]
AdjustTokenPrivileges.restype = wintypes.BOOL
token = wintypes.HANDLE()
if not OpenProcessToken(kernel32.GetCurrentProcess(), TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY, ctypes.byref(token)):
return
try:
luid = LUID()
if not LookupPrivilegeValueW(None, "SeDebugPrivilege", ctypes.byref(luid)):
return
tp = TOKEN_PRIVILEGES()
tp.PrivilegeCount = 1
tp.Privileges[0].Luid = luid
tp.Privileges[0].Attributes = SE_PRIVILEGE_ENABLED
AdjustTokenPrivileges(token, False, ctypes.byref(tp), 0, None, None)
finally:
kernel32.CloseHandle(token)
@dataclass(frozen=True)
class FileLock:
pid: int
handle: int
path: str
def _query_system_handles(max_size: int = 1 << 28) -> tuple[ctypes.Array, int] | tuple[None, int]:
"""返回 (buffer, handle_count);失败返回 (None, 0)。"""
size = 1 << 20
while True:
buf = ctypes.create_string_buffer(size)
ret_len = wintypes.ULONG(0)
status = NtQuerySystemInformation(
SystemExtendedHandleInformation, buf, size, ctypes.byref(ret_len)
)
if status == 0: # STATUS_SUCCESS
info = ctypes.cast(buf, ctypes.POINTER(SYSTEM_HANDLE_INFORMATION_EX)).contents
n = int(info.NumberOfHandles)
return buf, n
# STATUS_INFO_LENGTH_MISMATCH = 0xC0000004 (signed: -1073741820)
if status in (-1073741820, 0xC0000004):
size = max(size * 2, int(ret_len.value) + 0x10000)
if size > max_size:
return None, 0
continue
return None, 0
def _get_file_object_type_index(buf: ctypes.Array, handle_count: int, sample_path: str) -> int | None:
"""
通过本进程打开一个文件句柄再在系统句柄表中找到它对应的 ObjectTypeIndex
这样可以只扫描 File 类型句柄速度会快很多
"""
try:
h = kernel32.CreateFileW(
sample_path,
GENERIC_READ,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
None,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
None,
)
if not h or h == wintypes.HANDLE(-1).value:
return None
except Exception:
return None
try:
cur_pid = int(kernel32.GetCurrentProcessId())
hv = int(ctypes.cast(h, ctypes.c_size_t).value)
n = int(handle_count)
if n <= 0:
return None
array_type = SYSTEM_HANDLE_TABLE_ENTRY_INFO_EX * n
info = ctypes.cast(buf, ctypes.POINTER(SYSTEM_HANDLE_INFORMATION_EX)).contents
handles = ctypes.cast(ctypes.addressof(info.Handles), ctypes.POINTER(array_type)).contents
for ent in handles:
pid = int(ctypes.cast(ent.UniqueProcessId, ctypes.c_size_t).value)
if pid != cur_pid:
continue
val = int(ctypes.cast(ent.HandleValue, ctypes.c_size_t).value)
if val == hv:
return int(ent.ObjectTypeIndex)
return None
finally:
try:
kernel32.CloseHandle(h)
except Exception:
pass
def find_file_locks(target_path: str, *, max_handles: int = 200000) -> list[FileLock]:
"""
返回当前系统中占用 target_path (pid, handle) 列表
- 基于 SystemExtendedHandleInformation + DuplicateHandle + GetFinalPathNameByHandleW
"""
_try_enable_debug_privilege()
target_norm = _normalize_path(target_path)
buf, n = _query_system_handles()
if buf is None or n <= 0:
return []
if n <= 0:
return []
if n > max_handles:
n = max_handles
# 重新按实际数量解释数组
array_type = SYSTEM_HANDLE_TABLE_ENTRY_INFO_EX * n
info = ctypes.cast(buf, ctypes.POINTER(SYSTEM_HANDLE_INFORMATION_EX)).contents
handles = ctypes.cast(ctypes.addressof(info.Handles), ctypes.POINTER(array_type)).contents
cur_proc = kernel32.GetCurrentProcess()
locks: list[FileLock] = []
file_type_index = _get_file_object_type_index(buf, int(info.NumberOfHandles), target_path)
# 进程句柄缓存:避免每个 handle 都 OpenProcess/CloseHandle
proc_cache: dict[int, wintypes.HANDLE] = {}
for h in handles:
pid = int(ctypes.cast(h.UniqueProcessId, ctypes.c_size_t).value)
hv = int(ctypes.cast(h.HandleValue, ctypes.c_size_t).value)
if pid <= 0 or hv <= 0:
continue
if file_type_index is not None and int(h.ObjectTypeIndex) != int(file_type_index):
continue
ph = proc_cache.get(pid)
if not ph:
ph = kernel32.OpenProcess(
PROCESS_DUP_HANDLE | PROCESS_QUERY_LIMITED_INFORMATION, False, pid
)
if not ph:
continue
proc_cache[pid] = ph
try:
dup = wintypes.HANDLE()
if not kernel32.DuplicateHandle(ph, wintypes.HANDLE(hv), cur_proc, ctypes.byref(dup), 0, False, DUPLICATE_SAME_ACCESS):
continue
try:
# 获取路径
bufw = ctypes.create_unicode_buffer(4096)
got = kernel32.GetFinalPathNameByHandleW(dup, bufw, len(bufw), 0)
if got == 0 or got >= len(bufw):
continue
final_norm = _normalize_final_path(bufw.value)
if final_norm == target_norm:
locks.append(FileLock(pid=pid, handle=hv, path=final_norm))
finally:
kernel32.CloseHandle(dup)
finally:
pass
for ph in proc_cache.values():
try:
kernel32.CloseHandle(ph)
except Exception:
pass
return locks
def close_remote_handle(pid: int, handle_value: int) -> bool:
"""
关闭目标进程中指定 handle不结束进程
需要足够权限通常管理员 + SeDebugPrivilege 才更稳
"""
_try_enable_debug_privilege()
ph = kernel32.OpenProcess(PROCESS_DUP_HANDLE, False, int(pid))
if not ph:
return False
try:
dup = wintypes.HANDLE()
ok = kernel32.DuplicateHandle(
ph,
wintypes.HANDLE(int(handle_value)),
kernel32.GetCurrentProcess(),
ctypes.byref(dup),
0,
False,
DUPLICATE_CLOSE_SOURCE,
)
if ok and dup:
kernel32.CloseHandle(dup)
return bool(ok)
finally:
kernel32.CloseHandle(ph)