niumasoftware/ui/updater.py
2026-04-07 23:21:58 +08:00

275 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Automatic software update module.
- Check version: GET https://api.yunzer.cn/api/softwareupgrade/check?code=niumasortware
- Download the new exe, replace the current program with it, then restart.
"""
import os
import sys
import subprocess
import tempfile
import json
from PyQt6.QtCore import QThread, pyqtSignal, QObject
from PyQt6.QtWidgets import QProgressDialog, QMessageBox, QApplication
import urllib.request
# Endpoint requested by _CheckWorker; its response body is parsed as JSON.
UPDATE_CHECK_URL = "https://api.yunzer.cn/api/softwareupgrade/check?code=niumasortware"
# Folder name joined onto the ProgramData directory by _programdata_dir().
APP_NAME = "CleanDesktopOrganizer"
# Task name passed to `schtasks /Run /TN` by _run_update_task().
UPDATE_TASK_NAME = r"CleanDesktopOrganizer\Update"
def _current_version() -> str:
try:
import importlib
m = importlib.import_module("__main__")
return getattr(m, "__VERSION__", "0.0.0")
except Exception:
return "0.0.0"
def _is_newer(latest: str, current: str) -> bool:
try:
def _parse(v):
return tuple(int(x) for x in v.strip().lstrip("v").split("."))
return _parse(latest) > _parse(current)
except Exception:
return latest != current
class _CheckWorker(QThread):
    """Background thread that asks the update endpoint for version info.

    Signals:
        result(dict): the reply's ``data`` entry, emitted when the reply's
            ``code`` field equals 200.
        error(str): the reply's ``msg`` (or a default text) for any other
            code, or the string form of any exception raised while
            fetching or decoding the reply.
    """

    result = pyqtSignal(dict)
    error = pyqtSignal(str)

    def run(self):
        """Fetch UPDATE_CHECK_URL, decode the JSON body and emit one signal."""
        try:
            with urllib.request.urlopen(UPDATE_CHECK_URL, timeout=10) as response:
                payload = json.loads(response.read().decode())
            if payload.get("code") != 200:
                self.error.emit(payload.get("msg", "接口返回异常"))
                return
            self.result.emit(payload["data"])
        except Exception as exc:
            self.error.emit(str(exc))
class _DownloadWorker(QThread):
    """Background thread that downloads one URL to a local file.

    Signals:
        progress(int): percentage capped at 100, emitted per received block
            when the total size is known, and once more with 100 after the
            transfer completes.
        finished(str): the destination path, emitted on success.
        error(str): string form of the exception, emitted on failure.
    """

    # NOTE(review): this class-level ``finished`` looks like it shadows
    # QThread's own ``finished`` signal - confirm that is intended.
    progress = pyqtSignal(int)
    finished = pyqtSignal(str)
    error = pyqtSignal(str)

    def __init__(self, url: str, dest: str):
        """Remember the source *url* and the destination file path *dest*."""
        super().__init__()
        self._url, self._dest = url, dest

    def run(self):
        """Download the file, reporting progress and the final outcome."""

        def _on_block(blocks_done, bytes_per_block, expected_bytes):
            # Without a known total size no percentage can be computed.
            if expected_bytes <= 0:
                return
            percent = int(blocks_done * bytes_per_block * 100 / expected_bytes)
            self.progress.emit(min(100, percent))

        try:
            urllib.request.urlretrieve(self._url, self._dest, _on_block)
            self.progress.emit(100)
            self.finished.emit(self._dest)
        except Exception as exc:
            self.error.emit(str(exc))
def _programdata_dir() -> str:
    """Return the application's folder path under ProgramData.

    The base comes from the ``PROGRAMDATA`` environment variable; when it is
    unset or empty, ``C:\\ProgramData`` is used. The folder is not created
    here.
    """
    root = os.environ.get("PROGRAMDATA")
    if not root:
        root = r"C:\ProgramData"
    return os.path.join(root, APP_NAME)
def _ensure_dir(p: str):
try:
os.makedirs(p, exist_ok=True)
except Exception:
pass
def _update_work_dir() -> str:
    """Return the directory in which downloaded updates are stored.

    In a machine-wide install Program Files is not writable, so update files
    and the request file are kept together under ProgramData. That directory
    is expected to be created by the installer with Modify permission for
    Users.

    Falls back to the system temp directory when the ``updates`` folder does
    not exist after the creation attempt.
    """
    updates_dir = os.path.join(_programdata_dir(), "updates")
    _ensure_dir(updates_dir)
    return updates_dir if os.path.isdir(updates_dir) else tempfile.gettempdir()
def _request_file_path() -> str:
    """Return the path of ``update_request.json`` in the ProgramData folder.

    The folder is created on a best-effort basis first; the file itself is
    not touched.
    """
    app_data_dir = _programdata_dir()
    _ensure_dir(app_data_dir)
    request_path = os.path.join(app_data_dir, "update_request.json")
    return request_path
def _run_update_task() -> bool:
try:
r = subprocess.run(
["schtasks", "/Run", "/TN", UPDATE_TASK_NAME],
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
creationflags=subprocess.CREATE_NO_WINDOW,
)
return r.returncode == 0
except Exception:
return False
def _replace_and_restart(new_exe: str):
    """Swap the running executable for *new_exe* and relaunch it.

    When the program is not a frozen build (``sys.frozen`` unset), *new_exe*
    is simply started and the Qt application is asked to quit.

    Otherwise (onedir mode: only the exe itself is replaced, DLLs and other
    files stay untouched) a PowerShell script is written to a temp file and
    started hidden and detached. The script waits up to 30 seconds for this
    process to exit, tries up to 10 times to copy *new_exe* over
    ``sys.executable``, and on success deletes *new_exe* and starts the
    replaced executable. The script removes itself when done.

    Args:
        new_exe: Path of the downloaded replacement executable.
    """
    if not getattr(sys, "frozen", False):
        subprocess.Popen([new_exe])
        QApplication.quit()
        return

    def _ps_literal(path: str) -> str:
        # In a PowerShell single-quoted string the backslash is literal, so
        # paths are inserted as-is (doubling backslashes would corrupt UNC
        # prefixes). Only quote characters need escaping, by doubling them;
        # PowerShell also accepts the typographic variants as single quotes.
        for quote in "'\u2018\u2019\u201a\u201b":
            path = path.replace(quote, quote * 2)
        return "'" + path + "'"

    pid = os.getpid()
    quoted_src = _ps_literal(new_exe)
    quoted_dst = _ps_literal(sys.executable)

    # Notes on the script below:
    # * -ErrorAction Stop turns a failed copy into a terminating error;
    #   without it the catch block never runs, $ok is set even though the
    #   copy failed, and the retry loop is useless.
    # * -LiteralPath prevents "[" / "]" in a path being read as wildcards.
    # * The last line deletes the temp script itself.
    ps_script = f"""
    $pid_target = {pid}
    $src = {quoted_src}
    $dst = {quoted_dst}
    # 等待原进程退出,最多 30 秒
    $waited = 0
    while ((Get-Process -Id $pid_target -ErrorAction SilentlyContinue) -and $waited -lt 30) {{
        Start-Sleep -Milliseconds 500
        $waited += 0.5
    }}
    # 重试覆盖,最多 10 次
    $ok = $false
    for ($i = 0; $i -lt 10; $i++) {{
        try {{
            Copy-Item -LiteralPath $src -Destination $dst -Force -ErrorAction Stop
            $ok = $true
            break
        }} catch {{
            Start-Sleep -Milliseconds 800
        }}
    }}
    if ($ok) {{
        Remove-Item -LiteralPath $src -Force -ErrorAction SilentlyContinue
        Start-Process -FilePath $dst
    }}
    Remove-Item -LiteralPath $PSCommandPath -Force -ErrorAction SilentlyContinue
    """

    # "utf-8-sig" writes a BOM: Windows PowerShell reads BOM-less script files
    # with the ANSI code page, which would garble non-ASCII paths.
    with tempfile.NamedTemporaryFile(
        delete=False, suffix=".ps1", mode="w", encoding="utf-8-sig"
    ) as script_file:
        script_file.write(ps_script)

    subprocess.Popen(
        [
            "powershell", "-WindowStyle", "Hidden",
            "-NonInteractive", "-ExecutionPolicy", "Bypass",
            "-File", script_file.name,
        ],
        creationflags=subprocess.DETACHED_PROCESS | subprocess.CREATE_NO_WINDOW,
    )
    QApplication.quit()
class Updater(QObject):
    """Drive the update flow: check, confirm, download, install.

    All dialogs are parented to the widget passed as *parent*. Network work
    runs in ``_CheckWorker`` / ``_DownloadWorker`` threads; their signals are
    handled by the ``_on_*`` slots of this object.
    """

    def __init__(self, parent=None):
        """Create the updater; *parent* is also used as the dialog parent."""
        super().__init__(parent)
        self._parent_widget = parent
        # Initialised here so the slots never hit a missing attribute.
        self._silent = False
        self._worker = None

    def check(self, silent_if_latest: bool = False):
        """Start an asynchronous update check.

        Args:
            silent_if_latest: When True, no dialog is shown if the program is
                already up to date or if the check fails.
        """
        self._silent = silent_if_latest
        # Rebinding self._worker while the previous thread is still running
        # would drop the only reference to a live QThread; let the check that
        # is already in flight report its result instead.
        if self._worker is not None and self._worker.isRunning():
            return
        self._worker = _CheckWorker()
        self._worker.result.connect(self._on_check_result)
        self._worker.error.connect(self._on_check_error)
        self._worker.start()

    def _on_check_result(self, data: dict):
        """Handle the server reply: report, or offer to download the update.

        Reads ``latestVersion``, ``downloadUrl`` and ``releaseNotes`` from
        *data*.
        """
        current = _current_version()
        latest = data.get("latestVersion", "")
        url = data.get("downloadUrl", "")
        if not _is_newer(latest, current):
            if not self._silent:
                QMessageBox.information(self._parent_widget, "检查更新", f"当前已是最新版本 v{current}")
            return
        if not url:
            # Without a download address the download could only fail later
            # with an unhelpful urllib error, so stop here.
            if not self._silent:
                QMessageBox.warning(self._parent_widget, "检查更新失败", "更新服务器未返回下载地址")
            return
        notes = data.get("releaseNotes", "") or ""
        msg = f"发现新版本 v{latest}(当前 v{current})"
        if notes:
            msg += f"\n\n更新内容:\n{notes}"
        msg += "\n\n是否立即下载更新?"
        ret = QMessageBox.question(
            self._parent_widget, "发现新版本", msg,
            QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
            QMessageBox.StandardButton.Yes,
        )
        if ret != QMessageBox.StandardButton.Yes:
            return
        self._url = url
        self._download(url)

    def _on_check_error(self, err: str):
        """Report a failed update check unless running silently."""
        if not self._silent:
            QMessageBox.warning(self._parent_widget, "检查更新失败", f"无法连接更新服务器:\n{err}")

    def _download(self, url: str):
        """Download *url* in the background behind a progress dialog."""
        # Machine-wide install: do not download into the install directory
        # (usually under Program Files, which is not writable).
        dest_dir = _update_work_dir() if getattr(sys, "frozen", False) else tempfile.gettempdir()
        dest = os.path.join(dest_dir, "_update_new.exe")
        self._progress = QProgressDialog("正在下载更新...", "取消", 0, 100, self._parent_widget)
        self._progress.setWindowTitle("下载更新")
        self._progress.setMinimumDuration(0)
        self._progress.setValue(0)
        self._dl_worker = _DownloadWorker(url, dest)
        self._dl_worker.progress.connect(self._progress.setValue)
        self._dl_worker.finished.connect(self._on_download_done)
        self._dl_worker.error.connect(self._on_download_error)
        # NOTE(review): cancelling stops the thread via terminate(); a
        # partially written file may be left at `dest` - confirm acceptable.
        self._progress.canceled.connect(self._dl_worker.terminate)
        self._dl_worker.start()

    def _on_download_done(self, path: str):
        """Install the downloaded executable at *path* and restart."""
        self._progress.close()
        # Frozen, machine-wide install: hand the swap over to the scheduled
        # task (which, per the original note, runs with highest privileges)
        # so it can overwrite the exe in the install directory.
        if getattr(sys, "frozen", False):
            try:
                req = {
                    "src": path,
                    "dst": sys.executable,
                    "pid": os.getpid(),
                    "restart": True,
                }
                with open(_request_file_path(), "w", encoding="utf-8") as f:
                    json.dump(req, f, ensure_ascii=False)
            except Exception as e:
                QMessageBox.critical(self._parent_widget, "更新失败", f"无法准备更新任务:\n{e}")
                return
            if _run_update_task():
                QApplication.quit()
                return
            # Fallback: the scheduled task is missing or failed, so try the
            # older self-replace approach below (only effective when the
            # install directory is writable).
        _replace_and_restart(path)

    def _on_download_error(self, err: str):
        """Close the progress dialog and report the download failure."""
        self._progress.close()
        QMessageBox.critical(self._parent_widget, "下载失败", f"下载更新失败:\n{err}")