""" 软件自动更新模块 - 检查版本:GET https://api.yunzer.cn/api/softwareupgrade/check?code=niumasortware - 下载新版 exe,替换当前程序后重启 """ import os import sys import subprocess import tempfile import json import ctypes import base64 from PyQt6.QtCore import QThread, pyqtSignal, QObject from PyQt6.QtWidgets import QProgressDialog, QMessageBox, QApplication import urllib.request UPDATE_CHECK_URL = "https://api.yunzer.cn/api/softwareupgrade/check?code=niumasortware" APP_NAME = "CleanDesktopOrganizer" # schtasks 的任务名在不同环境下可能需要/不需要前导 "\",这里两种都尝试 UPDATE_TASK_NAMES = [r"\CleanDesktopOrganizer\Update", r"CleanDesktopOrganizer\Update"] def _current_version() -> str: try: import importlib m = importlib.import_module("__main__") return getattr(m, "__VERSION__", "0.0.0") except Exception: return "0.0.0" def _is_newer(latest: str, current: str) -> bool: try: def _parse(v): return tuple(int(x) for x in v.strip().lstrip("v").split(".")) return _parse(latest) > _parse(current) except Exception: return latest != current class _CheckWorker(QThread): result = pyqtSignal(dict) error = pyqtSignal(str) def run(self): try: import json with urllib.request.urlopen(UPDATE_CHECK_URL, timeout=10) as resp: body = json.loads(resp.read().decode()) if body.get("code") == 200: self.result.emit(body["data"]) else: self.error.emit(body.get("msg", "接口返回异常")) except Exception as e: self.error.emit(str(e)) class _DownloadWorker(QThread): progress = pyqtSignal(int) finished = pyqtSignal(str) error = pyqtSignal(str) def __init__(self, url: str, dest: str): super().__init__() self._url = url self._dest = dest def run(self): try: def _reporthook(count, block_size, total_size): if total_size > 0: pct = min(100, int(count * block_size * 100 / total_size)) self.progress.emit(pct) urllib.request.urlretrieve(self._url, self._dest, _reporthook) self.progress.emit(100) self.finished.emit(self._dest) except Exception as e: self.error.emit(str(e)) def _programdata_dir() -> str: base = os.environ.get("PROGRAMDATA") or r"C:\ProgramData" return os.path.join(base, APP_NAME) def _ensure_dir(p: str): try: os.makedirs(p, exist_ok=True) except Exception: pass def _update_work_dir() -> str: """ 全局安装场景下,Program Files 不可写;更新文件与请求文件统一放 ProgramData。 该目录应由安装器创建并赋予 Users Modify 权限。 """ d = os.path.join(_programdata_dir(), "updates") _ensure_dir(d) if os.path.isdir(d): return d return tempfile.gettempdir() def _request_file_path() -> str: d = _programdata_dir() _ensure_dir(d) return os.path.join(d, "update_request.json") def _task_exists(name: str) -> bool: try: r = subprocess.run( ["schtasks", "/Query", "/TN", name], capture_output=True, text=True, encoding="utf-8", errors="replace", creationflags=subprocess.CREATE_NO_WINDOW, ) return r.returncode == 0 except Exception: return False def _run_update_task() -> bool: for name in UPDATE_TASK_NAMES: if not _task_exists(name): continue try: r = subprocess.run( ["schtasks", "/Run", "/TN", name], capture_output=True, text=True, encoding="utf-8", errors="replace", creationflags=subprocess.CREATE_NO_WINDOW, ) if r.returncode == 0: return True except Exception: continue return False def _run_update_helper_elevated() -> bool: """ 当计划任务不存在时的兜底:以管理员权限运行 update_helper.exe。 这会触发 UAC(无法完全静默),但能保证更新能完成。 """ if sys.platform != "win32": return False try: helper = os.path.join(os.path.dirname(sys.executable), "update_helper.exe") if not os.path.isfile(helper): return False # ShellExecuteW 返回值 > 32 表示成功启动 r = ctypes.windll.shell32.ShellExecuteW(None, "runas", helper, None, None, 0) return int(r) > 32 except Exception: return False def _run_elevated_copy_and_restart(src: str, dst: str, pid: int) -> bool: """ 不依赖 update_helper.exe 的兜底方案: 直接用管理员权限启动 PowerShell,等待原进程退出后覆盖 dst 并重启。 """ if sys.platform != "win32": return False try: ps = rf""" $pid_target = {int(pid)} $src = '{str(src).replace("'", "''")}' $dst = '{str(dst).replace("'", "''")}' $waited = 0 while ((Get-Process -Id $pid_target -ErrorAction SilentlyContinue) -and $waited -lt 60) {{ Start-Sleep -Milliseconds 500 $waited += 0.5 }} $ok = $false for ($i = 0; $i -lt 20; $i++) {{ try {{ Copy-Item -Path $src -Destination $dst -Force $ok = $true break }} catch {{ Start-Sleep -Milliseconds 800 }} }} if ($ok) {{ Remove-Item $src -Force -ErrorAction SilentlyContinue Start-Process $dst }} """ # PowerShell -EncodedCommand 需要 UTF-16LE + base64 enc = base64.b64encode(ps.encode("utf-16le")).decode("ascii") r = ctypes.windll.shell32.ShellExecuteW( None, "runas", "powershell", f"-NoProfile -NonInteractive -ExecutionPolicy Bypass -EncodedCommand {enc}", None, 0, ) return int(r) > 32 except Exception: return False def _replace_and_restart(new_exe: str): """ onedir 模式:只替换 exe 本身,dll 等文件不变。 用 PowerShell 后台脚本等待原进程退出后替换并重启,完全无窗口。 """ if not getattr(sys, "frozen", False): subprocess.Popen([new_exe]) QApplication.quit() return current_exe = sys.executable pid = os.getpid() ps_script = f""" $pid_target = {pid} $src = '{new_exe.replace(chr(92), chr(92)*2)}' $dst = '{current_exe.replace(chr(92), chr(92)*2)}' # 等待原进程退出,最多 30 秒 $waited = 0 while ((Get-Process -Id $pid_target -ErrorAction SilentlyContinue) -and $waited -lt 30) {{ Start-Sleep -Milliseconds 500 $waited += 0.5 }} # 重试覆盖,最多 10 次 $ok = $false for ($i = 0; $i -lt 10; $i++) {{ try {{ Copy-Item -Path $src -Destination $dst -Force $ok = $true break }} catch {{ Start-Sleep -Milliseconds 800 }} }} if ($ok) {{ Remove-Item $src -Force -ErrorAction SilentlyContinue Start-Process $dst }} """ tmp = tempfile.NamedTemporaryFile( delete=False, suffix=".ps1", mode="w", encoding="utf-8" ) tmp.write(ps_script) tmp.close() subprocess.Popen( [ "powershell", "-WindowStyle", "Hidden", "-NonInteractive", "-ExecutionPolicy", "Bypass", "-File", tmp.name, ], creationflags=subprocess.DETACHED_PROCESS | subprocess.CREATE_NO_WINDOW, ) QApplication.quit() class Updater(QObject): def __init__(self, parent=None): super().__init__(parent) self._parent_widget = parent def check(self, silent_if_latest: bool = False): self._silent = silent_if_latest self._worker = _CheckWorker() self._worker.result.connect(self._on_check_result) self._worker.error.connect(self._on_check_error) self._worker.start() def _on_check_result(self, data: dict): current = _current_version() latest = data.get("latestVersion", "") url = data.get("downloadUrl", "") if not _is_newer(latest, current): if not self._silent: QMessageBox.information(self._parent_widget, "检查更新", f"当前已是最新版本 v{current}") return notes = data.get("releaseNotes", "") or "" msg = f"发现新版本 v{latest}(当前 v{current})" if notes: msg += f"\n\n更新内容:\n{notes}" msg += "\n\n是否立即下载更新?" ret = QMessageBox.question( self._parent_widget, "发现新版本", msg, QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.Yes, ) if ret != QMessageBox.StandardButton.Yes: return self._url = url self._download(url) def _on_check_error(self, err: str): if not self._silent: QMessageBox.warning(self._parent_widget, "检查更新失败", f"无法连接更新服务器:\n{err}") def _download(self, url: str): # 全局安装:不要下载到安装目录(通常在 Program Files,不可写) dest_dir = _update_work_dir() if getattr(sys, "frozen", False) else tempfile.gettempdir() dest = os.path.join(dest_dir, "_update_new.exe") self._progress = QProgressDialog("正在下载更新...", "取消", 0, 100, self._parent_widget) self._progress.setWindowTitle("下载更新") self._progress.setMinimumDuration(0) self._progress.setValue(0) self._dl_worker = _DownloadWorker(url, dest) self._dl_worker.progress.connect(self._progress.setValue) self._dl_worker.finished.connect(self._on_download_done) self._dl_worker.error.connect(self._on_download_error) self._progress.canceled.connect(self._dl_worker.terminate) self._dl_worker.start() def _on_download_done(self, path: str): self._progress.close() # frozen 且全局安装:交给计划任务(最高权限)覆盖安装目录 exe if getattr(sys, "frozen", False): try: req = { "src": path, "dst": sys.executable, "pid": os.getpid(), "restart": True, } with open(_request_file_path(), "w", encoding="utf-8") as f: json.dump(req, f, ensure_ascii=False) except Exception as e: QMessageBox.critical(self._parent_widget, "更新失败", f"无法准备更新任务:\n{e}") return if _run_update_task(): QApplication.quit() return # 兜底策略 1:计划任务不存在时,提示并可通过 UAC 直接运行 update_helper.exe 完成一次更新 if _run_update_helper_elevated(): QApplication.quit() return # 兜底策略 1.5:如果安装目录里没有 update_helper.exe,则直接用管理员 PowerShell 完成覆盖 if _run_elevated_copy_and_restart(path, sys.executable, os.getpid()): QApplication.quit() return # 兜底策略 2:如果安装目录可写(非常少见),尝试旧的自替换方式 try: can_write = os.access(sys.executable, os.W_OK) except Exception: can_write = False if can_write: _replace_and_restart(path) return QMessageBox.critical( self._parent_widget, "更新失败", "已下载更新,但未找到/无法运行更新计划任务(需要管理员权限)。\n\n" "请尝试:\n" "1) 以管理员身份重新安装一次安装包(用于创建更新计划任务)\n" "2) 或在“任务计划程序”检查是否存在任务:CleanDesktopOrganizer\\Update\n" "3) 也可尝试用管理员权限运行 update_helper.exe 完成一次更新", ) return _replace_and_restart(path) def _on_download_error(self, err: str): self._progress.close() QMessageBox.critical(self._parent_widget, "下载失败", f"下载更新失败:\n{err}")