niumasoftware/ui/win_handle_scan.py

342 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import ctypes
import os
from ctypes import wintypes
from dataclasses import dataclass
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
ntdll = ctypes.WinDLL("ntdll", use_last_error=True)
advapi32 = ctypes.WinDLL("advapi32", use_last_error=True)
PROCESS_DUP_HANDLE = 0x0040
PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
DUPLICATE_SAME_ACCESS = 0x00000002
DUPLICATE_CLOSE_SOURCE = 0x00000001
GENERIC_READ = 0x80000000
FILE_SHARE_READ = 0x00000001
FILE_SHARE_WRITE = 0x00000002
FILE_SHARE_DELETE = 0x00000004
OPEN_EXISTING = 3
FILE_ATTRIBUTE_NORMAL = 0x00000080
SystemExtendedHandleInformation = 64
class SYSTEM_HANDLE_TABLE_ENTRY_INFO_EX(ctypes.Structure):
_fields_ = [
("Object", ctypes.c_void_p),
("UniqueProcessId", ctypes.c_void_p),
("HandleValue", ctypes.c_void_p),
("GrantedAccess", wintypes.ULONG),
("CreatorBackTraceIndex", wintypes.USHORT),
("ObjectTypeIndex", wintypes.USHORT),
("HandleAttributes", wintypes.ULONG),
("Reserved", wintypes.ULONG),
]
class SYSTEM_HANDLE_INFORMATION_EX(ctypes.Structure):
_fields_ = [
("NumberOfHandles", ctypes.c_void_p),
("Reserved", ctypes.c_void_p),
("Handles", SYSTEM_HANDLE_TABLE_ENTRY_INFO_EX * 1),
]
NtQuerySystemInformation = ntdll.NtQuerySystemInformation
NtQuerySystemInformation.argtypes = [
wintypes.ULONG,
wintypes.LPVOID,
wintypes.ULONG,
wintypes.PULONG,
]
NtQuerySystemInformation.restype = wintypes.LONG
kernel32.OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD]
kernel32.OpenProcess.restype = wintypes.HANDLE
kernel32.CloseHandle.argtypes = [wintypes.HANDLE]
kernel32.CloseHandle.restype = wintypes.BOOL
kernel32.DuplicateHandle.argtypes = [
wintypes.HANDLE,
wintypes.HANDLE,
wintypes.HANDLE,
ctypes.POINTER(wintypes.HANDLE),
wintypes.DWORD,
wintypes.BOOL,
wintypes.DWORD,
]
kernel32.DuplicateHandle.restype = wintypes.BOOL
kernel32.GetCurrentProcess.argtypes = []
kernel32.GetCurrentProcess.restype = wintypes.HANDLE
kernel32.GetFinalPathNameByHandleW.argtypes = [
wintypes.HANDLE,
wintypes.LPWSTR,
wintypes.DWORD,
wintypes.DWORD,
]
kernel32.GetFinalPathNameByHandleW.restype = wintypes.DWORD
kernel32.GetCurrentProcessId.argtypes = []
kernel32.GetCurrentProcessId.restype = wintypes.DWORD
kernel32.CreateFileW.argtypes = [
wintypes.LPCWSTR,
wintypes.DWORD,
wintypes.DWORD,
wintypes.LPVOID,
wintypes.DWORD,
wintypes.DWORD,
wintypes.HANDLE,
]
kernel32.CreateFileW.restype = wintypes.HANDLE
def _normalize_path(p: str) -> str:
p = os.path.abspath(p)
p = os.path.normcase(p)
return p
def _normalize_final_path(p: str) -> str:
# GetFinalPathNameByHandleW 常见返回:\\?\C:\path\file 或 \\?\UNC\server\share\path
if p.startswith("\\\\?\\UNC\\"):
p = "\\\\" + p[len("\\\\?\\UNC\\") :]
elif p.startswith("\\\\?\\"):
p = p[len("\\\\?\\") :]
return _normalize_path(p)
def _try_enable_debug_privilege() -> None:
# best-effort没有也能工作只是会漏掉部分系统进程
TOKEN_ADJUST_PRIVILEGES = 0x0020
TOKEN_QUERY = 0x0008
SE_PRIVILEGE_ENABLED = 0x0002
class LUID(ctypes.Structure):
_fields_ = [("LowPart", wintypes.DWORD), ("HighPart", wintypes.LONG)]
class LUID_AND_ATTRIBUTES(ctypes.Structure):
_fields_ = [("Luid", LUID), ("Attributes", wintypes.DWORD)]
class TOKEN_PRIVILEGES(ctypes.Structure):
_fields_ = [("PrivilegeCount", wintypes.DWORD), ("Privileges", LUID_AND_ATTRIBUTES * 1)]
OpenProcessToken = advapi32.OpenProcessToken
OpenProcessToken.argtypes = [wintypes.HANDLE, wintypes.DWORD, ctypes.POINTER(wintypes.HANDLE)]
OpenProcessToken.restype = wintypes.BOOL
LookupPrivilegeValueW = advapi32.LookupPrivilegeValueW
LookupPrivilegeValueW.argtypes = [wintypes.LPCWSTR, wintypes.LPCWSTR, ctypes.POINTER(LUID)]
LookupPrivilegeValueW.restype = wintypes.BOOL
AdjustTokenPrivileges = advapi32.AdjustTokenPrivileges
AdjustTokenPrivileges.argtypes = [
wintypes.HANDLE,
wintypes.BOOL,
ctypes.POINTER(TOKEN_PRIVILEGES),
wintypes.DWORD,
wintypes.PVOID,
wintypes.PVOID,
]
AdjustTokenPrivileges.restype = wintypes.BOOL
token = wintypes.HANDLE()
if not OpenProcessToken(kernel32.GetCurrentProcess(), TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY, ctypes.byref(token)):
return
try:
luid = LUID()
if not LookupPrivilegeValueW(None, "SeDebugPrivilege", ctypes.byref(luid)):
return
tp = TOKEN_PRIVILEGES()
tp.PrivilegeCount = 1
tp.Privileges[0].Luid = luid
tp.Privileges[0].Attributes = SE_PRIVILEGE_ENABLED
AdjustTokenPrivileges(token, False, ctypes.byref(tp), 0, None, None)
finally:
kernel32.CloseHandle(token)
@dataclass(frozen=True)
class FileLock:
pid: int
handle: int
path: str
def _query_system_handles(max_size: int = 1 << 28) -> tuple[ctypes.Array, int] | tuple[None, int]:
"""返回 (buffer, handle_count);失败返回 (None, 0)。"""
size = 1 << 20
while True:
buf = ctypes.create_string_buffer(size)
ret_len = wintypes.ULONG(0)
status = NtQuerySystemInformation(
SystemExtendedHandleInformation, buf, size, ctypes.byref(ret_len)
)
if status == 0: # STATUS_SUCCESS
info = ctypes.cast(buf, ctypes.POINTER(SYSTEM_HANDLE_INFORMATION_EX)).contents
n = int(info.NumberOfHandles)
return buf, n
# STATUS_INFO_LENGTH_MISMATCH = 0xC0000004 (signed: -1073741820)
if status in (-1073741820, 0xC0000004):
size = max(size * 2, int(ret_len.value) + 0x10000)
if size > max_size:
return None, 0
continue
return None, 0
def _get_file_object_type_index(buf: ctypes.Array, handle_count: int, sample_path: str) -> int | None:
"""
通过“本进程打开一个文件句柄”,再在系统句柄表中找到它对应的 ObjectTypeIndex。
这样可以只扫描 File 类型句柄,速度会快很多。
"""
try:
h = kernel32.CreateFileW(
sample_path,
GENERIC_READ,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
None,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
None,
)
if not h or h == wintypes.HANDLE(-1).value:
return None
except Exception:
return None
try:
cur_pid = int(kernel32.GetCurrentProcessId())
hv = int(ctypes.cast(h, ctypes.c_size_t).value)
n = int(handle_count)
if n <= 0:
return None
array_type = SYSTEM_HANDLE_TABLE_ENTRY_INFO_EX * n
info = ctypes.cast(buf, ctypes.POINTER(SYSTEM_HANDLE_INFORMATION_EX)).contents
handles = ctypes.cast(ctypes.addressof(info.Handles), ctypes.POINTER(array_type)).contents
for ent in handles:
pid = int(ctypes.cast(ent.UniqueProcessId, ctypes.c_size_t).value)
if pid != cur_pid:
continue
val = int(ctypes.cast(ent.HandleValue, ctypes.c_size_t).value)
if val == hv:
return int(ent.ObjectTypeIndex)
return None
finally:
try:
kernel32.CloseHandle(h)
except Exception:
pass
def find_file_locks(target_path: str, *, max_handles: int = 200000) -> list[FileLock]:
"""
返回当前系统中占用 target_path 的 (pid, handle) 列表。
- 基于 SystemExtendedHandleInformation + DuplicateHandle + GetFinalPathNameByHandleW
"""
_try_enable_debug_privilege()
target_norm = _normalize_path(target_path)
buf, n = _query_system_handles()
if buf is None or n <= 0:
return []
if n <= 0:
return []
if n > max_handles:
n = max_handles
# 重新按实际数量解释数组
array_type = SYSTEM_HANDLE_TABLE_ENTRY_INFO_EX * n
info = ctypes.cast(buf, ctypes.POINTER(SYSTEM_HANDLE_INFORMATION_EX)).contents
handles = ctypes.cast(ctypes.addressof(info.Handles), ctypes.POINTER(array_type)).contents
cur_proc = kernel32.GetCurrentProcess()
locks: list[FileLock] = []
file_type_index = _get_file_object_type_index(buf, int(info.NumberOfHandles), target_path)
# 进程句柄缓存:避免每个 handle 都 OpenProcess/CloseHandle
proc_cache: dict[int, wintypes.HANDLE] = {}
for h in handles:
pid = int(ctypes.cast(h.UniqueProcessId, ctypes.c_size_t).value)
hv = int(ctypes.cast(h.HandleValue, ctypes.c_size_t).value)
if pid <= 0 or hv <= 0:
continue
if file_type_index is not None and int(h.ObjectTypeIndex) != int(file_type_index):
continue
ph = proc_cache.get(pid)
if not ph:
ph = kernel32.OpenProcess(
PROCESS_DUP_HANDLE | PROCESS_QUERY_LIMITED_INFORMATION, False, pid
)
if not ph:
continue
proc_cache[pid] = ph
try:
dup = wintypes.HANDLE()
if not kernel32.DuplicateHandle(ph, wintypes.HANDLE(hv), cur_proc, ctypes.byref(dup), 0, False, DUPLICATE_SAME_ACCESS):
continue
try:
# 获取路径
bufw = ctypes.create_unicode_buffer(4096)
got = kernel32.GetFinalPathNameByHandleW(dup, bufw, len(bufw), 0)
if got == 0 or got >= len(bufw):
continue
final_norm = _normalize_final_path(bufw.value)
if final_norm == target_norm:
locks.append(FileLock(pid=pid, handle=hv, path=final_norm))
finally:
kernel32.CloseHandle(dup)
finally:
pass
for ph in proc_cache.values():
try:
kernel32.CloseHandle(ph)
except Exception:
pass
return locks
def close_remote_handle(pid: int, handle_value: int) -> bool:
"""
关闭目标进程中指定 handle不结束进程
需要足够权限(通常管理员 + SeDebugPrivilege 才更稳)。
"""
_try_enable_debug_privilege()
ph = kernel32.OpenProcess(PROCESS_DUP_HANDLE, False, int(pid))
if not ph:
return False
try:
dup = wintypes.HANDLE()
ok = kernel32.DuplicateHandle(
ph,
wintypes.HANDLE(int(handle_value)),
kernel32.GetCurrentProcess(),
ctypes.byref(dup),
0,
False,
DUPLICATE_CLOSE_SOURCE,
)
if ok and dup:
kernel32.CloseHandle(dup)
return bool(ok)
finally:
kernel32.CloseHandle(ph)