207 lines
6.3 KiB
Python
207 lines
6.3 KiB
Python
"""
|
||
软件自动更新模块
|
||
- 检查版本:GET https://api.yunzer.cn/api/softwareupgrade/check?code=niumasortware
|
||
- 下载新版 exe,替换当前程序后重启
|
||
"""
|
||
import os
|
||
import sys
|
||
import subprocess
|
||
import tempfile
|
||
from PyQt6.QtCore import QThread, pyqtSignal, QObject
|
||
from PyQt6.QtWidgets import QProgressDialog, QMessageBox, QApplication
|
||
import urllib.request
|
||
|
||
|
||
# Endpoint fetched by the update check; it is requested with a plain GET and
# is expected to answer with a JSON body.
UPDATE_CHECK_URL = "https://api.yunzer.cn/api/softwareupgrade/check?code=niumasortware"
|
||
|
||
|
||
def _current_version() -> str:
    """Return the ``__VERSION__`` attribute of the ``__main__`` module.

    Falls back to ``"0.0.0"`` when the attribute is absent or when looking
    it up raises any exception.
    """
    fallback = "0.0.0"
    try:
        from importlib import import_module

        entry_module = import_module("__main__")
        version = getattr(entry_module, "__VERSION__", fallback)
    except Exception:
        version = fallback
    return version
|
||
|
||
|
||
def _is_newer(latest: str, current: str) -> bool:
    """Return True when *latest* is a strictly newer version than *current*.

    Versions are dot-separated integers with an optional leading ``v``/``V``
    (for example ``"v1.2.3"``).  Missing trailing components count as zero,
    so ``"1.0"`` and ``"1.0.0"`` compare equal.  An empty or ``None``
    *latest* is never considered newer.  When either side is not purely
    numeric (for example ``"1.0-beta"``), the function falls back to
    comparing the normalized strings for inequality.
    """
    if not latest:
        # No version information at all: there is nothing to upgrade to.
        return False

    def _normalize(version) -> str:
        return str(version or "").strip().lstrip("vV")

    def _parse(text):
        return [int(part) for part in text.split(".")]

    latest_text = _normalize(latest)
    current_text = _normalize(current)
    try:
        latest_parts = _parse(latest_text)
        current_parts = _parse(current_text)
    except ValueError:
        return latest_text != current_text

    # Pad to a common length; otherwise (1, 0, 0) > (1, 0) would report an
    # update for what is really the same version.
    width = max(len(latest_parts), len(current_parts))
    latest_parts += [0] * (width - len(latest_parts))
    current_parts += [0] * (width - len(current_parts))
    return latest_parts > current_parts
|
||
|
||
|
||
class _CheckWorker(QThread):
    """Background thread that queries the update-check endpoint once.

    Emits ``result`` with the response's ``data`` entry when the JSON body's
    ``code`` field equals 200; otherwise emits ``error`` with the body's
    ``msg`` (or a generic message).  Any exception raised while fetching or
    decoding the response is reported through ``error`` as its string form.
    """

    result = pyqtSignal(dict)
    error = pyqtSignal(str)

    def run(self):
        """Fetch and decode the endpoint response, then emit one signal."""
        try:
            from json import loads

            with urllib.request.urlopen(UPDATE_CHECK_URL, timeout=10) as response:
                raw = response.read()
            payload = loads(raw.decode())

            if payload.get("code") != 200:
                self.error.emit(payload.get("msg", "接口返回异常"))
                return
            self.result.emit(payload["data"])
        except Exception as exc:
            self.error.emit(str(exc))
|
||
|
||
|
||
class _DownloadWorker(QThread):
    """Background thread that downloads one URL to a local file.

    Signals:
        progress(int): download percentage (0-100), emitted only when the
            value changes.
        finished(str): destination path, emitted after a successful download.
        error(str): string form of the exception that aborted the download.
    """

    progress = pyqtSignal(int)
    # NOTE(review): this reuses the name of QThread's own no-argument
    # ``finished`` signal; kept because callers connect to it by this name.
    finished = pyqtSignal(str)
    error = pyqtSignal(str)

    def __init__(self, url: str, dest: str):
        """Remember the source *url* and the destination path *dest*."""
        super().__init__()
        self._url = url
        self._dest = dest

    def run(self):
        """Download ``self._url`` to ``self._dest``, reporting progress."""
        last_pct = -1

        def _reporthook(count, block_size, total_size):
            nonlocal last_pct
            if total_size <= 0:
                # Size unknown (no Content-Length): no percentage to report.
                return
            pct = min(100, count * block_size * 100 // total_size)
            if pct != last_pct:
                # The hook fires once per block; only forward real changes so
                # the receiving thread is not flooded with identical values.
                last_pct = pct
                self.progress.emit(pct)

        try:
            urllib.request.urlretrieve(self._url, self._dest, _reporthook)
        except Exception as e:
            # Do not leave a truncated file behind at the destination.
            try:
                os.remove(self._dest)
            except OSError:
                pass
            self.error.emit(str(e))
            return

        if last_pct != 100:
            self.progress.emit(100)
        self.finished.emit(self._dest)
|
||
|
||
|
||
def _ps_single_quoted(text: str) -> str:
    """Return *text* as a PowerShell single-quoted string literal.

    Inside single quotes PowerShell takes every character literally
    (backslashes included); only the quote itself is escaped, by doubling it.
    """
    return "'" + text.replace("'", "''") + "'"


def _replace_and_restart(new_exe: str):
    """Replace the running executable with *new_exe* and relaunch it.

    When the program is not frozen, *new_exe* is simply started and the Qt
    application is asked to quit.

    When frozen (onedir layout: only the exe itself is replaced, DLLs and
    other files stay as they are), a hidden background PowerShell script is
    started which waits up to 30 seconds for this process to exit, copies
    *new_exe* over ``sys.executable`` (up to 10 attempts), deletes *new_exe*,
    starts the replaced executable, and finally removes the script itself.
    The Qt application is then asked to quit.

    Args:
        new_exe: Path of the freshly downloaded executable.
    """
    if not getattr(sys, "frozen", False):
        subprocess.Popen([new_exe])
        QApplication.quit()
        return

    current_exe = sys.executable
    pid = os.getpid()

    # Paths are embedded as single-quoted literals (quotes doubled, nothing
    # else escaped) and used with -LiteralPath so characters such as [ ] are
    # not treated as wildcards.  -ErrorAction Stop turns a failed copy into a
    # terminating error; without it the catch block never runs and $ok would
    # be set even though nothing was copied.  The last line removes the
    # temporary script once it has done its work.
    ps_script = f"""
$pid_target = {pid}
$src = {_ps_single_quoted(new_exe)}
$dst = {_ps_single_quoted(current_exe)}

# 等待原进程退出,最多 30 秒
$waited = 0
while ((Get-Process -Id $pid_target -ErrorAction SilentlyContinue) -and $waited -lt 30) {{
    Start-Sleep -Milliseconds 500
    $waited += 0.5
}}

# 重试覆盖,最多 10 次
$ok = $false
for ($i = 0; $i -lt 10; $i++) {{
    try {{
        Copy-Item -LiteralPath $src -Destination $dst -Force -ErrorAction Stop
        $ok = $true
        break
    }} catch {{
        Start-Sleep -Milliseconds 800
    }}
}}

if ($ok) {{
    Remove-Item -LiteralPath $src -Force -ErrorAction SilentlyContinue
    Start-Process -FilePath $dst
}}
Remove-Item -LiteralPath $PSCommandPath -Force -ErrorAction SilentlyContinue
"""

    # Windows PowerShell reads a BOM-less script using the system ANSI code
    # page, which corrupts non-ASCII paths; "utf-8-sig" writes the BOM.
    with tempfile.NamedTemporaryFile(
        delete=False, suffix=".ps1", mode="w", encoding="utf-8-sig"
    ) as tmp:
        tmp.write(ps_script)
        script_path = tmp.name

    subprocess.Popen(
        [
            "powershell", "-WindowStyle", "Hidden",
            "-NonInteractive", "-ExecutionPolicy", "Bypass",
            "-File", script_path,
        ],
        creationflags=subprocess.DETACHED_PROCESS | subprocess.CREATE_NO_WINDOW,
    )
    QApplication.quit()
|
||
|
||
|
||
class Updater(QObject):
    """Drive the update flow: check, prompt, download, replace and restart.

    The optional *parent* is also used as the parent widget of every dialog
    this object shows.
    """

    def __init__(self, parent=None):
        super().__init__(parent)
        self._parent_widget = parent
        # Created up front so every method can rely on their existence.
        self._silent = False
        self._worker = None
        self._dl_worker = None
        self._progress = None
        self._url = ""

    def check(self, silent_if_latest: bool = False):
        """Start an asynchronous update check.

        Args:
            silent_if_latest: When True, no dialog is shown if the program is
                already up to date or if the check fails.
        """
        if self._worker is not None and self._worker.isRunning():
            # Overwriting self._worker would drop the only reference to a
            # thread that is still running.  Reuse the in-flight check, but
            # let it report if any caller asked for a visible result.
            self._silent = self._silent and silent_if_latest
            return

        self._silent = silent_if_latest
        self._worker = _CheckWorker()
        self._worker.result.connect(self._on_check_result)
        self._worker.error.connect(self._on_check_error)
        self._worker.start()

    def _on_check_result(self, data: dict):
        """Compare versions and, if a newer one exists, offer to download it."""
        current = _current_version()
        # ``or ""`` also covers keys that are present but null.
        latest = str(data.get("latestVersion") or "")
        url = data.get("downloadUrl") or ""

        if not latest or not _is_newer(latest, current):
            if not self._silent:
                QMessageBox.information(self._parent_widget, "检查更新", f"当前已是最新版本 v{current}")
            return

        if not url:
            # A newer version without a download address cannot be installed;
            # report it like any other failed check instead of prompting.
            if not self._silent:
                QMessageBox.warning(self._parent_widget, "检查更新失败", "服务器未返回下载地址")
            return

        notes = data.get("releaseNotes", "") or ""
        msg = f"发现新版本 v{latest}(当前 v{current})"
        if notes:
            msg += f"\n\n更新内容:\n{notes}"
        msg += "\n\n是否立即下载更新?"

        ret = QMessageBox.question(
            self._parent_widget, "发现新版本", msg,
            QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
            QMessageBox.StandardButton.Yes,
        )
        if ret != QMessageBox.StandardButton.Yes:
            return

        self._url = url
        self._download(url)

    def _on_check_error(self, err: str):
        """Report a failed check unless the check was requested silently."""
        if not self._silent:
            QMessageBox.warning(self._parent_widget, "检查更新失败", f"无法连接更新服务器:\n{err}")

    def _download(self, url: str):
        """Download *url* in the background while showing a progress dialog."""
        if self._dl_worker is not None and self._dl_worker.isRunning():
            # A download is already in progress; do not start a second one
            # writing to the same destination file.
            return

        # Download next to the original exe to avoid a cross-drive move failing.
        if getattr(sys, "frozen", False):
            dest_dir = os.path.dirname(sys.executable)
        else:
            dest_dir = tempfile.gettempdir()
        dest = os.path.join(dest_dir, "_update_new.exe")

        self._progress = QProgressDialog("正在下载更新...", "取消", 0, 100, self._parent_widget)
        self._progress.setWindowTitle("下载更新")
        self._progress.setMinimumDuration(0)
        self._progress.setValue(0)

        self._dl_worker = _DownloadWorker(url, dest)
        self._dl_worker.progress.connect(self._progress.setValue)
        self._dl_worker.finished.connect(self._on_download_done)
        self._dl_worker.error.connect(self._on_download_error)
        # NOTE(review): terminate() stops the thread abruptly; any partially
        # written file is left at ``dest``.
        self._progress.canceled.connect(self._dl_worker.terminate)
        self._dl_worker.start()

    def _on_download_done(self, path: str):
        """Close the progress dialog and hand the file over for replacement."""
        self._progress.close()
        # NOTE(review): the downloaded file is executed without any integrity
        # check (hash or signature) - consider verifying it first.
        _replace_and_restart(path)

    def _on_download_error(self, err: str):
        """Close the progress dialog and show the download error."""
        self._progress.close()
        QMessageBox.critical(self._parent_widget, "下载失败", f"下载更新失败:\n{err}")
|