niumasoftware/ui/updater.py

392 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
软件自动更新模块
- 检查版本GET https://api.yunzer.cn/api/softwareupgrade/check?code=niumasortware
- 下载新版 exe替换当前程序后重启
"""
import os
import sys
import subprocess
import tempfile
import json
import ctypes
import base64
from PyQt6.QtCore import QThread, pyqtSignal, QObject
from PyQt6.QtWidgets import QProgressDialog, QMessageBox, QApplication
import urllib.request
UPDATE_CHECK_URL = "https://api.yunzer.cn/api/softwareupgrade/check?code=niumasortware"
APP_NAME = "CleanDesktopOrganizer"
# schtasks 的任务名在不同环境下可能需要/不需要前导 "\",这里两种都尝试
UPDATE_TASK_NAMES = [r"\CleanDesktopOrganizer\Update", r"CleanDesktopOrganizer\Update"]
def _current_version() -> str:
try:
import importlib
m = importlib.import_module("__main__")
return getattr(m, "__VERSION__", "0.0.0")
except Exception:
return "0.0.0"
def _is_newer(latest: str, current: str) -> bool:
try:
def _parse(v):
return tuple(int(x) for x in v.strip().lstrip("v").split("."))
return _parse(latest) > _parse(current)
except Exception:
return latest != current
class _CheckWorker(QThread):
result = pyqtSignal(dict)
error = pyqtSignal(str)
def run(self):
try:
import json
with urllib.request.urlopen(UPDATE_CHECK_URL, timeout=10) as resp:
body = json.loads(resp.read().decode())
if body.get("code") == 200:
self.result.emit(body["data"])
else:
self.error.emit(body.get("msg", "接口返回异常"))
except Exception as e:
self.error.emit(str(e))
class _DownloadWorker(QThread):
progress = pyqtSignal(int)
finished = pyqtSignal(str)
error = pyqtSignal(str)
def __init__(self, url: str, dest: str):
super().__init__()
self._url = url
self._dest = dest
def run(self):
try:
def _reporthook(count, block_size, total_size):
if total_size > 0:
pct = min(100, int(count * block_size * 100 / total_size))
self.progress.emit(pct)
urllib.request.urlretrieve(self._url, self._dest, _reporthook)
self.progress.emit(100)
self.finished.emit(self._dest)
except Exception as e:
self.error.emit(str(e))
def _programdata_dir() -> str:
base = os.environ.get("PROGRAMDATA") or r"C:\ProgramData"
return os.path.join(base, APP_NAME)
def _ensure_dir(p: str):
try:
os.makedirs(p, exist_ok=True)
except Exception:
pass
def _update_work_dir() -> str:
"""
全局安装场景下Program Files 不可写;更新文件与请求文件统一放 ProgramData。
该目录应由安装器创建并赋予 Users Modify 权限。
"""
d = os.path.join(_programdata_dir(), "updates")
_ensure_dir(d)
if os.path.isdir(d):
return d
return tempfile.gettempdir()
def _request_file_path() -> str:
d = _programdata_dir()
_ensure_dir(d)
return os.path.join(d, "update_request.json")
def _task_exists(name: str) -> bool:
try:
r = subprocess.run(
["schtasks", "/Query", "/TN", name],
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
creationflags=subprocess.CREATE_NO_WINDOW,
)
return r.returncode == 0
except Exception:
return False
def _run_update_task() -> bool:
for name in UPDATE_TASK_NAMES:
if not _task_exists(name):
continue
try:
r = subprocess.run(
["schtasks", "/Run", "/TN", name],
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
creationflags=subprocess.CREATE_NO_WINDOW,
)
if r.returncode == 0:
return True
except Exception:
continue
return False
def _run_update_helper_elevated() -> bool:
"""
当计划任务不存在时的兜底:以管理员权限运行 update_helper.exe。
这会触发 UAC无法完全静默但能保证更新能完成。
"""
if sys.platform != "win32":
return False
try:
helper = os.path.join(os.path.dirname(sys.executable), "update_helper.exe")
if not os.path.isfile(helper):
return False
# ShellExecuteW 返回值 > 32 表示成功启动
r = ctypes.windll.shell32.ShellExecuteW(None, "runas", helper, None, None, 0)
return int(r) > 32
except Exception:
return False
def _run_elevated_copy_and_restart(src: str, dst: str, pid: int) -> bool:
"""
不依赖 update_helper.exe 的兜底方案:
直接用管理员权限启动 PowerShell等待原进程退出后覆盖 dst 并重启。
"""
if sys.platform != "win32":
return False
try:
ps = rf"""
$pid_target = {int(pid)}
$src = '{str(src).replace("'", "''")}'
$dst = '{str(dst).replace("'", "''")}'
$waited = 0
while ((Get-Process -Id $pid_target -ErrorAction SilentlyContinue) -and $waited -lt 60) {{
Start-Sleep -Milliseconds 500
$waited += 0.5
}}
$ok = $false
for ($i = 0; $i -lt 20; $i++) {{
try {{
Copy-Item -Path $src -Destination $dst -Force
$ok = $true
break
}} catch {{
Start-Sleep -Milliseconds 800
}}
}}
if ($ok) {{
Remove-Item $src -Force -ErrorAction SilentlyContinue
Start-Process $dst
}}
"""
# PowerShell -EncodedCommand 需要 UTF-16LE + base64
enc = base64.b64encode(ps.encode("utf-16le")).decode("ascii")
r = ctypes.windll.shell32.ShellExecuteW(
None,
"runas",
"powershell",
f"-NoProfile -NonInteractive -ExecutionPolicy Bypass -EncodedCommand {enc}",
None,
0,
)
return int(r) > 32
except Exception:
return False
def _replace_and_restart(new_exe: str):
"""
onedir 模式:只替换 exe 本身dll 等文件不变。
用 PowerShell 后台脚本等待原进程退出后替换并重启,完全无窗口。
"""
if not getattr(sys, "frozen", False):
subprocess.Popen([new_exe])
QApplication.quit()
return
current_exe = sys.executable
pid = os.getpid()
ps_script = f"""
$pid_target = {pid}
$src = '{new_exe.replace(chr(92), chr(92)*2)}'
$dst = '{current_exe.replace(chr(92), chr(92)*2)}'
# 等待原进程退出,最多 30 秒
$waited = 0
while ((Get-Process -Id $pid_target -ErrorAction SilentlyContinue) -and $waited -lt 30) {{
Start-Sleep -Milliseconds 500
$waited += 0.5
}}
# 重试覆盖,最多 10 次
$ok = $false
for ($i = 0; $i -lt 10; $i++) {{
try {{
Copy-Item -Path $src -Destination $dst -Force
$ok = $true
break
}} catch {{
Start-Sleep -Milliseconds 800
}}
}}
if ($ok) {{
Remove-Item $src -Force -ErrorAction SilentlyContinue
Start-Process $dst
}}
"""
tmp = tempfile.NamedTemporaryFile(
delete=False, suffix=".ps1", mode="w", encoding="utf-8"
)
tmp.write(ps_script)
tmp.close()
subprocess.Popen(
[
"powershell", "-WindowStyle", "Hidden",
"-NonInteractive", "-ExecutionPolicy", "Bypass",
"-File", tmp.name,
],
creationflags=subprocess.DETACHED_PROCESS | subprocess.CREATE_NO_WINDOW,
)
QApplication.quit()
class Updater(QObject):
def __init__(self, parent=None):
super().__init__(parent)
self._parent_widget = parent
def check(self, silent_if_latest: bool = False):
self._silent = silent_if_latest
self._worker = _CheckWorker()
self._worker.result.connect(self._on_check_result)
self._worker.error.connect(self._on_check_error)
self._worker.start()
def _on_check_result(self, data: dict):
current = _current_version()
latest = data.get("latestVersion", "")
url = data.get("downloadUrl", "")
if not _is_newer(latest, current):
if not self._silent:
QMessageBox.information(self._parent_widget, "检查更新", f"当前已是最新版本 v{current}")
return
notes = data.get("releaseNotes", "") or ""
msg = f"发现新版本 v{latest}(当前 v{current}"
if notes:
msg += f"\n\n更新内容:\n{notes}"
msg += "\n\n是否立即下载更新?"
ret = QMessageBox.question(
self._parent_widget, "发现新版本", msg,
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
QMessageBox.StandardButton.Yes,
)
if ret != QMessageBox.StandardButton.Yes:
return
self._url = url
self._download(url)
def _on_check_error(self, err: str):
if not self._silent:
QMessageBox.warning(self._parent_widget, "检查更新失败", f"无法连接更新服务器:\n{err}")
def _download(self, url: str):
# 全局安装:不要下载到安装目录(通常在 Program Files不可写
dest_dir = _update_work_dir() if getattr(sys, "frozen", False) else tempfile.gettempdir()
dest = os.path.join(dest_dir, "_update_new.exe")
self._progress = QProgressDialog("正在下载更新...", "取消", 0, 100, self._parent_widget)
self._progress.setWindowTitle("下载更新")
self._progress.setMinimumDuration(0)
self._progress.setValue(0)
self._dl_worker = _DownloadWorker(url, dest)
self._dl_worker.progress.connect(self._progress.setValue)
self._dl_worker.finished.connect(self._on_download_done)
self._dl_worker.error.connect(self._on_download_error)
self._progress.canceled.connect(self._dl_worker.terminate)
self._dl_worker.start()
def _on_download_done(self, path: str):
self._progress.close()
# frozen 且全局安装:交给计划任务(最高权限)覆盖安装目录 exe
if getattr(sys, "frozen", False):
try:
req = {
"src": path,
"dst": sys.executable,
"pid": os.getpid(),
"restart": True,
}
with open(_request_file_path(), "w", encoding="utf-8") as f:
json.dump(req, f, ensure_ascii=False)
except Exception as e:
QMessageBox.critical(self._parent_widget, "更新失败", f"无法准备更新任务:\n{e}")
return
if _run_update_task():
QApplication.quit()
return
# 兜底策略 1计划任务不存在时提示并可通过 UAC 直接运行 update_helper.exe 完成一次更新
if _run_update_helper_elevated():
QApplication.quit()
return
# 兜底策略 1.5:如果安装目录里没有 update_helper.exe则直接用管理员 PowerShell 完成覆盖
if _run_elevated_copy_and_restart(path, sys.executable, os.getpid()):
QApplication.quit()
return
# 兜底策略 2如果安装目录可写非常少见尝试旧的自替换方式
try:
can_write = os.access(sys.executable, os.W_OK)
except Exception:
can_write = False
if can_write:
_replace_and_restart(path)
return
QMessageBox.critical(
self._parent_widget,
"更新失败",
"已下载更新,但未找到/无法运行更新计划任务(需要管理员权限)。\n\n"
"请尝试:\n"
"1) 以管理员身份重新安装一次安装包(用于创建更新计划任务)\n"
"2) 或在“任务计划程序”检查是否存在任务CleanDesktopOrganizer\\Update\n"
"3) 也可尝试用管理员权限运行 update_helper.exe 完成一次更新",
)
return
_replace_and_restart(path)
def _on_download_error(self, err: str):
self._progress.close()
QMessageBox.critical(self._parent_widget, "下载失败", f"下载更新失败:\n{err}")