import sys import json import base64 import hashlib import shutil import uuid import random import string import re import os import webbrowser import tempfile import ctypes import socket import psutil import subprocess import sqlite3 import requests from contextlib import contextmanager from datetime import datetime from pathlib import Path from typing import Optional __VERSION__ = "0.0.7" from PySide6.QtWidgets import ( QApplication, QMainWindow, QMessageBox, QTextEdit, QInputDialog, QPushButton, QVBoxLayout, QHBoxLayout, QWidget, QLineEdit, QLabel, QGroupBox, QDialog, QScrollArea, QSplashScreen, QStyle, ) from PySide6.QtCore import QThread, Signal, Qt, QTimer, QMetaObject, Q_ARG, Slot from PySide6.QtUiTools import QUiLoader from PySide6.QtGui import QFont, QPixmap, QColor, QPainter, QPalette, QIcon, QAction def get_resource_path(relative_path): """获取资源文件的绝对路径,支持打包后的应用(onefile 内嵌或 exe 旁便携文件)。""" rel = Path(relative_path) if getattr(sys, "frozen", False): candidates = [] if hasattr(sys, "_MEIPASS"): candidates.append(Path(sys._MEIPASS) / rel) # onedir / 便携:layout 与 exe 同目录 candidates.append(Path(sys.executable).resolve().parent / rel) for p in candidates: try: if p.is_file(): return str(p.resolve()) except OSError: continue # 用于报错信息:优先 MEIPASS primary = ( Path(sys._MEIPASS) / rel if hasattr(sys, "_MEIPASS") else Path(sys.executable).resolve().parent / rel ) return str(primary.resolve()) return str((Path(__file__).parent / rel).resolve()) def get_app_icon() -> QIcon: """窗口标题栏 / 任务栏图标(使用根目录 logo.ico;打包后需将 logo.ico 一并打入资源)。""" p = Path(get_resource_path("logo.ico")) if p.is_file(): return QIcon(str(p)) return QIcon() def is_windows_pe_executable(path: Path) -> bool: """粗略校验是否为 Windows PE(避免把 HTML/JSON 当成 exe 替换)。""" try: if path.stat().st_size < 64 * 1024: return False with open(path, "rb") as f: return f.read(2) == b"MZ" except OSError: return False def get_default_cursor_path(): if sys.platform == "win32": paths = [ Path(os.environ.get("LOCALAPPDATA", "")) / "Programs" / "cursor" / "Cursor.exe", Path(os.environ.get("PROGRAMFILES", "")) / "Cursor" / "Cursor.exe", Path(os.environ.get("PROGRAMFILES(X86)", "")) / "Cursor" / "Cursor.exe", ] for path in paths: if path.exists(): return str(path) elif sys.platform == "darwin": return "/Applications/Cursor.app" else: return "/usr/bin/cursor" return "" def is_cursor_running(): """检测Cursor是否正在运行""" cursor_exe_names = ['cursor.exe', 'cursor'] for proc in psutil.process_iter(["name"]): try: name = proc.info["name"] if name and name.lower() in cursor_exe_names: return True except (psutil.NoSuchProcess, psutil.AccessDenied): continue return False def kill_cursor(): """强制关闭Cursor进程""" killed = False cursor_exe_names = ['cursor.exe', 'cursor'] for proc in psutil.process_iter(["name", "pid"]): try: name = proc.info["name"] if name and name.lower() in cursor_exe_names: proc.kill() killed = True except (psutil.NoSuchProcess, psutil.AccessDenied): continue return killed def generate_random_email(): length = random.randint(6, 8) username = "".join(random.choices(string.ascii_lowercase + string.digits, k=length)) return f"{username}@cursor.com" def generate_machine_id(): return str(uuid.uuid4()) def _run_command_text(args): """执行系统命令并返回文本输出,失败时返回空字符串。""" try: creationflags = getattr(subprocess, "CREATE_NO_WINDOW", 0) if sys.platform == "win32" else 0 return subprocess.check_output( args, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, text=True, encoding="utf-8", errors="ignore", creationflags=creationflags, ).strip() except Exception: return "" def _get_wmic_value(alias: str, field: str) -> str: """通过 WMIC 获取硬件字段值,兼容空值/表头输出。""" output = _run_command_text(["wmic", alias, "get", field, "/value"]) for line in output.splitlines(): line = line.strip() prefix = f"{field}=" if line.startswith(prefix): value = line[len(prefix):].strip() if value and value.lower() not in ("none", "null", "to be filled by o.e.m."): return value output = _run_command_text(["wmic", alias, "get", field]) lines = [line.strip() for line in output.splitlines() if line.strip()] for line in lines[1:]: if line and line.lower() not in ("none", "null", "to be filled by o.e.m."): return line return "" def _get_windows_cim_value(class_name: str, field: str) -> str: """WMIC 不可用时,通过 PowerShell CIM 获取硬件字段值。""" command = ( f"(Get-CimInstance {class_name} | " f"Select-Object -First 1 -ExpandProperty {field})" ) value = _run_command_text([ "powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", command, ]).strip() if value and value.lower() not in ("none", "null", "to be filled by o.e.m."): return value return "" def _get_cpu_id() -> str: """获取 CPU 标识。""" if sys.platform == "win32": return ( _get_wmic_value("cpu", "ProcessorId") or _get_windows_cim_value("Win32_Processor", "ProcessorId") or os.environ.get("PROCESSOR_IDENTIFIER", "").strip() ) return os.environ.get("PROCESSOR_IDENTIFIER", "").strip() or os.uname().machine def _get_baseboard_id() -> str: """获取主板标识。""" if sys.platform == "win32": serial = ( _get_wmic_value("baseboard", "SerialNumber") or _get_windows_cim_value("Win32_BaseBoard", "SerialNumber") ) product = ( _get_wmic_value("baseboard", "Product") or _get_windows_cim_value("Win32_BaseBoard", "Product") ) manufacturer = ( _get_wmic_value("baseboard", "Manufacturer") or _get_windows_cim_value("Win32_BaseBoard", "Manufacturer") ) return "|".join(part for part in (manufacturer, product, serial) if part) return socket.gethostname() def get_renewal_device_id(): """生成用于一键续杯的本机设备号:CPU + MAC + 主板。""" try: cpu_id = _get_cpu_id() mac = f"{uuid.getnode():012x}" baseboard_id = _get_baseboard_id() raw = f"cpu={cpu_id}|mac={mac}|baseboard={baseboard_id}" digest = hashlib.sha256(raw.encode("utf-8")).digest() return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=") except Exception: return str(uuid.uuid4()) def get_cursor_config_path(): home = Path.home() if sys.platform == "win32": base = home / "AppData" / "Roaming" / "Cursor" / "User" / "globalStorage" elif sys.platform == "darwin": base = ( home / "Library" / "Application Support" / "Cursor" / "User" / "globalStorage" ) else: base = home / ".config" / "Cursor" / "User" / "globalStorage" return base def update_vsdb_token(config_dir, new_token, new_email, log_callback): """更新 Cursor 数据库中的 token / email / auth 缓存字段。""" db_path = config_dir / "state.vscdb" if not db_path.exists(): log_callback("⚠️ 未找到 Cursor库 文件,跳过") return False try: log_callback("📖 连接 Cursor 数据库...") # 备份数据库 db_backup = config_dir / "state.vscdb.backup" if db_backup.exists(): db_backup.unlink() shutil.copy2(db_path, db_backup) # log_callback(f"📁 数据库已备份到: {db_backup}") # 使用 timeout=5 避免瞬间锁导致失败 conn = sqlite3.connect(db_path, timeout=5.0) cursor = conn.cursor() # 强制合并并清除 WAL 缓存(切换为 DELETE 模式,这样会自动合并并删除 -wal 和 -shm 文件) try: cursor.execute("PRAGMA journal_mode=delete") except Exception as wal_err: log_callback(f"⚠️ 无法合并 WAL 缓存 (可能是 Cursor 进程未完全关闭): {wal_err}") # 当前 Cursor 版本主要从 ItemTable 的 cursorAuth/* 读取登录态。 # 不要只写 accessToken/refreshToken;补齐已登录库里常见的认证缓存字段, # 避免客户端启动时因为缓存状态不完整回落到 Log in。 onboarding_date = datetime.utcnow().isoformat(timespec="milliseconds") + "Z" updates = [ ("cursorAuth/accessToken", new_token), ("cursorAuth/refreshToken", new_token), ("cursorAuth/cachedEmail", new_email), ("cursorAuth/cachedSignUpType", "Auth_0"), ("cursorAuth/onboardingDate", onboarding_date), ("cursorAuth/stripeMembershipType", "pro"), ] for key, value in updates: cursor.execute( "SELECT value FROM ItemTable WHERE key = ?", (key,) ) result = cursor.fetchone() if result: cursor.execute( "UPDATE ItemTable SET value = ? WHERE key = ?", (value, key) ) # log_callback(f"✓ 更新了 {key}") else: cursor.execute( "INSERT INTO ItemTable (key, value) VALUES (?, ?)", (key, value) ) # log_callback(f"✓ 插入了 {key}") conn.commit() # 恢复 WAL 模式以便 Cursor 正常工作 try: cursor.execute("PRAGMA journal_mode=wal") conn.commit() except Exception as wal_err: log_callback(f"⚠️ 无法还原为 WAL 模式: {wal_err}") conn.close() log_callback(f"✅ Cursor库 更新成功,已同步 {len(updates)} 个认证字段") return True except Exception as e: log_callback(f"❌ Cursor库 更新失败: {str(e)}") return False @contextmanager def request_with_proxy_fallback(url, **kwargs): """优先走系统代理;代理不可用时自动回退直连。以 contextmanager 统一管理连接生命周期。""" try: with requests.get(url, **kwargs) as response: yield response return except requests.exceptions.ProxyError: with requests.Session() as session: session.trust_env = False with session.get(url, **kwargs) as response: yield response def check_for_updates(): """检查软件更新,返回 (data, error) 元组""" try: url = "https://api.yunzer.cn/api/softwareupgrade/check?code=cursortokenlogin" with request_with_proxy_fallback(url, timeout=10) as response: response.raise_for_status() data = response.json() if data.get("code") == 200: return (data.get("data", {}), None) else: return (None, f"服务器返回错误: {data.get('msg', '未知错误')}") except requests.exceptions.Timeout: return (None, "连接超时(超过10秒),请检查网络连接") except requests.exceptions.ConnectionError as e: return (None, f"网络连接失败: {str(e)}") except requests.exceptions.RequestException as e: return (None, f"请求失败: {str(e)}") except Exception as e: return (None, f"检查更新失败: {str(e)}") def compare_versions(current, latest): """比较版本号""" try: current_parts = list(map(int, current.split('.'))) latest_parts = list(map(int, latest.split('.'))) for i in range(max(len(current_parts), len(latest_parts))): current_part = current_parts[i] if i < len(current_parts) else 0 latest_part = latest_parts[i] if i < len(latest_parts) else 0 if latest_part > current_part: return 1 # 需要更新 elif latest_part < current_part: return -1 # 当前版本更新 return 0 # 版本相同 except Exception: return 0 def normalize_download_url(raw): if raw is None: return "" s = str(raw).strip() if not s or s.lower() in ("null", "none", "undefined"): return "" return s def write_windows_updater_batch(old_exe: Path, new_exe: Path) -> Path: """生成批处理:通过 %1 %2 传入路径,避免把长路径写入 bat 产生编码/转义问题。""" bat = old_exe.parent / "_cursortokenlogin_update.bat" lines = [ "@echo off", "setlocal EnableExtensions", 'set "OLD=%~1"', 'set "NEW=%~2"', 'if not defined OLD goto :eof', 'if not defined NEW goto :eof', 'if not exist "%NEW%" exit /b 1', "ping 127.0.0.1 -n 5 >nul", ":wait_del", 'del /F /Q "%OLD%" 2>nul', 'if exist "%OLD%" (ping 127.0.0.1 -n 2 >nul & goto wait_del)', 'move /Y "%NEW%" "%OLD%"', "if errorlevel 1 exit /b 1", # 不自动启动 exe:解压/runtime 尚未就绪时易触发 Python DLL 加载失败,由用户稍后手动打开 'del /F /Q "%~f0"', ] bat.write_text("\r\n".join(lines), encoding="utf-8") return bat def launch_detach_no_window(args, cwd=None): if sys.platform == "win32": flags = getattr(subprocess, "CREATE_NO_WINDOW", 0) subprocess.Popen( args, cwd=cwd, creationflags=flags, close_fds=True, ) else: subprocess.Popen(args, cwd=cwd, close_fds=True, start_new_session=True) class DownloadUpdateThread(QThread): log_signal = Signal(str) progress_signal = Signal(int, int) finished_signal = Signal(bool, str) def __init__(self, download_url: str): super().__init__() self.download_url = download_url def run(self): new_path = None try: self.log_signal.emit("📡 正在请求安装包...") with request_with_proxy_fallback( self.download_url, stream=True, timeout=60, allow_redirects=True ) as r: r.raise_for_status() total = int(r.headers.get("Content-Length") or 0) chunk_size = 256 * 1024 downloaded = 0 last_pct = -1 if getattr(sys, "frozen", False): exe_path = Path(sys.executable).resolve() suffix = exe_path.suffix or ".exe" new_path = exe_path.parent / f"{exe_path.stem}_update_pending{suffix}" else: fd, tmp = tempfile.mkstemp(suffix=".exe") os.close(fd) new_path = Path(tmp) # self.log_signal.emit(f"💾 下载保存到: {new_path}") if total > 0: self.progress_signal.emit(0, total) next_unknown_log = 512 * 1024 with open(new_path, "wb") as f: for chunk in r.iter_content(chunk_size): if not chunk: continue f.write(chunk) downloaded += len(chunk) if total > 0: pct = min(100, downloaded * 100 // total) if pct >= last_pct + 5 or pct == 100: last_pct = pct if pct == 100 else (pct // 5) * 5 self.progress_signal.emit(downloaded, total) elif downloaded >= next_unknown_log: self.progress_signal.emit(downloaded, 0) next_unknown_log += 512 * 1024 self.log_signal.emit("✅ 下载完成") if not is_windows_pe_executable(new_path): try: new_path.unlink() except OSError: pass raise ValueError( "下载的文件不是有效的 Windows 安装包(缺少 PE 头或过小)," "可能为网页/错误信息,已取消替换。请确认服务端提供的是本软件完整打包的 .exe。" ) if not getattr(sys, "frozen", False): self.log_signal.emit( "ℹ️ 当前为脚本/开发模式,不会自动替换已安装程序;请手动用下载文件替换。" ) self.finished_signal.emit(True, "dev_mode") return if sys.platform != "win32": self.log_signal.emit( "ℹ️ 当前为非 Windows 打包环境,请手动将新文件替换为应用程序。" ) self.finished_signal.emit(True, "no_auto_replace") return exe_path = Path(sys.executable).resolve() self.log_signal.emit("🔧 正在准备替换:退出后将删除旧程序并启动新版本...") bat = write_windows_updater_batch(exe_path, new_path) launch_detach_no_window( ["cmd", "/c", str(bat), str(exe_path), str(new_path.resolve())], cwd=str(exe_path.parent), ) self.finished_signal.emit(True, "restarting") except Exception as e: if new_path and new_path.exists(): try: new_path.unlink() except OSError: pass self.log_signal.emit(f"❌ 下载或更新失败: {e}") self.finished_signal.emit(False, str(e)) class ImageClickLabel(QLabel): """可点击的图片标签,点击放大显示""" clicked = Signal(str) def __init__(self, image_path, parent=None): super().__init__(parent) self.image_path = image_path self.original_pixmap = QPixmap(image_path) self.setScaledContents(True) self.setCursor(Qt.PointingHandCursor) self.setStyleSheet("QLabel { border: 1px solid #ccc; border-radius: 5px; }") self.update_display() def update_display(self): """更新图片显示(等比缩放)""" if not self.original_pixmap.isNull(): scaled_pixmap = self.original_pixmap.scaled( 350, 450, Qt.KeepAspectRatio, Qt.SmoothTransformation ) self.setPixmap(scaled_pixmap) def mousePressEvent(self, event): if event.button() == Qt.LeftButton: self.clicked.emit(self.image_path) super().mousePressEvent(event) class DonateDialog(QDialog): """捐赠对话框""" def __init__(self, parent=None): super().__init__(parent) self.setWindowTitle("❤️ 捐赠支持") self.setFixedSize(800, 600) self.setup_ui() def setup_ui(self): layout = QVBoxLayout(self) # 标题 title = QLabel("您的捐赠将用于维护和改进本软件。\n\n感谢各位,为爱发电,软主需要大家的支持与关注!\n\n有问题请联系QQ:1066960883") title_font = QFont() title_font.setPointSize(16) title_font.setBold(True) title.setFont(title_font) title.setAlignment(Qt.AlignCenter) layout.addWidget(title) layout.addSpacing(20) # 图片区域 images_layout = QHBoxLayout() # 微信 wx_layout = QVBoxLayout() wx_label = QLabel("微信支付") wx_label.setAlignment(Qt.AlignCenter) wx_layout.addWidget(wx_label) wx_path = get_resource_path(os.path.join("assets", "images", "donate", "wx.jpg")) self.wx_image = ImageClickLabel(wx_path) self.wx_image.clicked.connect(self.show_full_image) wx_layout.addWidget(self.wx_image) images_layout.addLayout(wx_layout) images_layout.addSpacing(20) # 支付宝 zfb_layout = QVBoxLayout() zfb_label = QLabel("支付宝") zfb_label.setAlignment(Qt.AlignCenter) zfb_layout.addWidget(zfb_label) zfb_path = get_resource_path(os.path.join("assets", "images", "donate", "zfb.jpg")) self.zfb_image = ImageClickLabel(zfb_path) self.zfb_image.clicked.connect(self.show_full_image) zfb_layout.addWidget(self.zfb_image) images_layout.addLayout(zfb_layout) layout.addLayout(images_layout) layout.addSpacing(20) # 关闭按钮 btn_layout = QHBoxLayout() btn_layout.addStretch() close_btn = QPushButton("关闭") close_btn.clicked.connect(self.accept) close_btn.setMinimumWidth(100) btn_layout.addWidget(close_btn) layout.addLayout(btn_layout) def show_full_image(self, image_path): """显示全屏图片""" dialog = QDialog(self) dialog.setWindowTitle("图片预览") dialog.setMinimumSize(600, 700) layout = QVBoxLayout(dialog) scroll = QScrollArea() scroll.setWidgetResizable(True) label = QLabel() pixmap = QPixmap(image_path) label.setPixmap(pixmap) label.setAlignment(Qt.AlignCenter) scroll.setWidget(label) layout.addWidget(scroll) close_btn = QPushButton("关闭") close_btn.clicked.connect(dialog.accept) close_btn.setMinimumWidth(100) btn_layout = QHBoxLayout() btn_layout.addStretch() btn_layout.addWidget(close_btn) layout.addLayout(btn_layout) dialog.exec() class ChangeTokenThread(QThread): log_signal = Signal(str) finished_signal = Signal(bool, str) def __init__(self, new_token, new_email): super().__init__() self.new_token = new_token self.new_email = new_email def run(self): try: config_dir = get_cursor_config_path() if not config_dir.exists(): self.finished_signal.emit(False, "未找到Cursor配置目录") return # 创建备份(不显示日志) backup_dir = config_dir / "backup" backup_dir.mkdir(exist_ok=True) timestamp = random.randint(100000, 999999) backup_subdir = backup_dir / f"backup_{timestamp}" backup_subdir.mkdir(exist_ok=True) # 查找 storage.json storage_file = config_dir / "storage.json" if not storage_file.exists(): self.finished_signal.emit(False, "未找到 storage.json 文件") return # 检查并修复只读属性 if not os.access(storage_file, os.W_OK): self.log_signal.emit("🔓 检测到文件只读,正在解除只读属性...") import stat storage_file.chmod(storage_file.stat().st_mode | stat.S_IWRITE) # 读取原文件 self.log_signal.emit("📖 读取配置文件...") with open(storage_file, "r", encoding="utf-8") as f: content = f.read() data = json.loads(content) if content.strip() else {} # 显示原邮箱。access token 本身通常不包含 email claim, # 因此优先保留 Cursor 原缓存邮箱,避免写入随机邮箱导致登录缓存状态不一致。 old_email = ( data.get("cursorAuth", {}).get("cachedEmail") or data.get("cursorAccount", {}).get("email") or "" ) if old_email: self.log_signal.emit(f"📧 原邮箱: {old_email}") self.new_email = str(old_email).strip() # 备份原文件(不显示日志) shutil.copy2(storage_file, backup_subdir / "storage.json.bak") # 修改 cursorAuth self.log_signal.emit("🔑 替换 cursorAuth...") if "cursorAuth" not in data: data["cursorAuth"] = {} data["cursorAuth"]["accessToken"] = self.new_token data["cursorAuth"]["refreshToken"] = self.new_token data["cursorAuth"]["cachedEmail"] = self.new_email data["cursorAuth"]["cachedSignUpType"] = data["cursorAuth"].get("cachedSignUpType", "Auth_0") data["cursorAuth"]["onboardingDate"] = datetime.utcnow().isoformat(timespec="milliseconds") + "Z" data["cursorAuth"]["plan"] = "pro" data["cursorAuth"]["stripeMembershipType"] = "pro" data["cursorAuth"]["membershipType"] = "pro" # 修改 cursorAccount self.log_signal.emit("🔑 替换 cursorAccount...") if "cursorAccount" not in data: data["cursorAccount"] = {} data["cursorAccount"]["token"] = self.new_token data["cursorAccount"]["email"] = self.new_email data["cursorAccount"]["plan"] = "pro" # 刷新机器ID self.log_signal.emit("🔧 刷新机器ID...") new_machine_id = generate_machine_id() data["telemetryMacMachineId"] = new_machine_id data["telemetryDevDeviceId"] = new_machine_id data["workspaceIdentifier"] = new_machine_id # membershipType pro data["membershipType"] = "pro" self.log_signal.emit(f"📧 新邮箱: {self.new_email}") # 保存文件 self.log_signal.emit("💾 保存配置文件...") with open(storage_file, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) # 更新 Cursor 数据库 self.log_signal.emit("📦 更新 Cursor 数据库...") db_ok = update_vsdb_token(config_dir, self.new_token, self.new_email, self.log_signal.emit) if not db_ok: self.log_signal.emit("❌ 更新 Cursor 数据库失败,换号未完成!") self.finished_signal.emit(False, "更新 Cursor 数据库失败,可能是 Cursor 进程未完全关闭,请在任务管理器中结束所有 Cursor 进程,或尝试以管理员身份运行本软件。") return self.log_signal.emit("✅ 换号完成!") self.finished_signal.emit(True, str(backup_subdir)) except Exception as e: self.log_signal.emit(f"❌ 错误: {str(e)}") self.finished_signal.emit(False, str(e)) class OneClickRenewalThread(QThread): log_signal = Signal(str) finished_signal = Signal(bool, str, str) # (success, message/backup_path, new_email) def run(self): try: self.log_signal.emit("🔄 正在从服务端获取账号...") url = "https://api.yunzer.cn/api/getcard?type=local&module=cursor&data_type=tk" response_data = None try: response = requests.get(url, timeout=15) response_data = response.text.strip() except requests.exceptions.ProxyError: with requests.Session() as session: session.trust_env = False response = session.get(url, timeout=15) response_data = response.text.strip() if not response_data or not response_data.startswith("ey"): try: err_json = json.loads(response_data) msg = err_json.get("msg") or err_json.get("message") or response_data except Exception: msg = response_data self.finished_signal.emit(False, f"获取账号 Token 失败: {msg[:200]}", "") return self.log_signal.emit("✅ 成功提取 Token!正在执行本地更换...") new_token = response_data new_email = generate_random_email() config_dir = get_cursor_config_path() if not config_dir.exists(): self.finished_signal.emit(False, "未找到Cursor配置目录", "") return backup_dir = config_dir / "backup" backup_dir.mkdir(exist_ok=True) timestamp = random.randint(100000, 999999) backup_subdir = backup_dir / f"backup_{timestamp}" backup_subdir.mkdir(exist_ok=True) storage_file = config_dir / "storage.json" if not storage_file.exists(): self.finished_signal.emit(False, "未找到 storage.json 文件", "") return if not os.access(storage_file, os.W_OK): self.log_signal.emit("🔓 检测到文件只读,正在解除只读属性...") import stat storage_file.chmod(storage_file.stat().st_mode | stat.S_IWRITE) self.log_signal.emit("📖 读取配置文件...") with open(storage_file, "r", encoding="utf-8") as f: content = f.read() data = json.loads(content) if content.strip() else {} old_email = ( data.get("cursorAuth", {}).get("cachedEmail") or data.get("cursorAccount", {}).get("email") or "" ) if old_email: self.log_signal.emit(f"📧 原邮箱: {old_email}") new_email = str(old_email).strip() shutil.copy2(storage_file, backup_subdir / "storage.json.bak") self.log_signal.emit("🔑 替换 cursorAuth...") if "cursorAuth" not in data: data["cursorAuth"] = {} data["cursorAuth"]["accessToken"] = new_token data["cursorAuth"]["refreshToken"] = new_token data["cursorAuth"]["cachedEmail"] = new_email data["cursorAuth"]["cachedSignUpType"] = data["cursorAuth"].get("cachedSignUpType", "Auth_0") data["cursorAuth"]["onboardingDate"] = datetime.utcnow().isoformat(timespec="milliseconds") + "Z" data["cursorAuth"]["plan"] = "pro" data["cursorAuth"]["stripeMembershipType"] = "pro" data["cursorAuth"]["membershipType"] = "pro" self.log_signal.emit("🔑 替换 cursorAccount...") if "cursorAccount" not in data: data["cursorAccount"] = {} data["cursorAccount"]["token"] = new_token data["cursorAccount"]["email"] = new_email data["cursorAccount"]["plan"] = "pro" self.log_signal.emit("🔧 刷新机器ID...") new_machine_id = generate_machine_id() data["telemetryMacMachineId"] = new_machine_id data["telemetryDevDeviceId"] = new_machine_id data["workspaceIdentifier"] = new_machine_id data["membershipType"] = "pro" self.log_signal.emit(f"📧 新邮箱: {new_email}") self.log_signal.emit("💾 保存配置文件...") with open(storage_file, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) self.log_signal.emit("📦 更新 Cursor 数据库...") db_ok = update_vsdb_token(config_dir, new_token, new_email, self.log_signal.emit) if not db_ok: self.log_signal.emit("❌ 更新 Cursor 数据库失败,换号未完成!") self.finished_signal.emit(False, "更新 Cursor 数据库失败,可能是 Cursor 进程未完全关闭,请在任务管理器中结束所有 Cursor 进程,或尝试以管理员身份运行本软件。", "") return self.log_signal.emit("✅ 一键续杯换号完成!") self.finished_signal.emit(True, str(backup_subdir), new_email) except Exception as e: self.log_signal.emit(f"❌ 续杯换号失败: {str(e)}") self.finished_signal.emit(False, str(e), "") class SilentDetectThread(QThread): log_signal = Signal(str) finished_signal = Signal(bool, str, int) # (success, message/token, next_id) def __init__(self, target_id): super().__init__() self.target_id = target_id def run(self): try: self.log_signal.emit(f"🔄 正在从服务端获取 ID={self.target_id} 的 Token...") url = f"https://api.yunzer.cn/api/cursor/token/peek?id={self.target_id}&data_type=tk" response_data = None try: response = requests.get(url, timeout=15) response.raise_for_status() response_data = response.json() except Exception: # 备用连接(直连,不走系统代理) with requests.Session() as session: session.trust_env = False response = session.get(url, timeout=15) response.raise_for_status() response_data = response.json() if not response_data or response_data.get("code") != 200: msg = response_data.get("msg") or "获取失败" self.finished_signal.emit(False, f"获取 Token 失败: {msg}", 0) return data_obj = response_data.get("data") or {} new_token = data_obj.get("token") self.new_token = new_token next_id = data_obj.get("next_id") or (int(self.target_id) + 1) if not new_token: self.finished_signal.emit(False, "服务端返回的 Token 为空!", 0) return self.log_signal.emit("✅ 成功提取 Token!正在执行本地更换...") new_email = generate_random_email() config_dir = get_cursor_config_path() if not config_dir.exists(): self.finished_signal.emit(False, "未找到Cursor配置目录", 0) return # 读取原 storage.json 并更新 storage_file = config_dir / "storage.json" if not storage_file.exists(): self.finished_signal.emit(False, "未找到 storage.json 文件", 0) return if not os.access(storage_file, os.W_OK): import stat storage_file.chmod(storage_file.stat().st_mode | stat.S_IWRITE) with open(storage_file, "r", encoding="utf-8") as f: content = f.read() data = json.loads(content) if content.strip() else {} old_email = ( data.get("cursorAuth", {}).get("cachedEmail") or data.get("cursorAccount", {}).get("email") or "" ) if old_email: new_email = str(old_email).strip() # 修改 storage.json 的数据 if "cursorAuth" not in data: data["cursorAuth"] = {} data["cursorAuth"]["accessToken"] = new_token data["cursorAuth"]["refreshToken"] = new_token data["cursorAuth"]["cachedEmail"] = new_email data["cursorAuth"]["cachedSignUpType"] = data["cursorAuth"].get("cachedSignUpType", "Auth_0") data["cursorAuth"]["onboardingDate"] = datetime.utcnow().isoformat(timespec="milliseconds") + "Z" data["cursorAuth"]["plan"] = "pro" data["cursorAuth"]["stripeMembershipType"] = "pro" data["cursorAuth"]["membershipType"] = "pro" if "cursorAccount" not in data: data["cursorAccount"] = {} data["cursorAccount"]["token"] = new_token data["cursorAccount"]["email"] = new_email data["cursorAccount"]["plan"] = "pro" # 刷新机器 ID new_machine_id = generate_machine_id() data["telemetryMacMachineId"] = new_machine_id data["telemetryDevDeviceId"] = new_machine_id data["workspaceIdentifier"] = new_machine_id data["membershipType"] = "pro" with open(storage_file, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) # 更新 Cursor 数据库 state.vscdb self.log_signal.emit("📦 更新 Cursor 数据库...") db_ok = update_vsdb_token(config_dir, new_token, new_email, self.log_signal.emit) if not db_ok: self.finished_signal.emit(False, "更新 Cursor 数据库失败,可能是 Cursor 进程未完全关闭,请在任务管理器中结束所有 Cursor 进程,或尝试以管理员身份运行本软件。", 0) return self.log_signal.emit(f"✅ 无感换号完成!下一条 ID 推荐为: {next_id}") self.finished_signal.emit(True, "检测换号成功!", next_id) except Exception as e: self.finished_signal.emit(False, f"发生异常: {str(e)}", 0) def get_device_info() -> str: """获取 CPU/RAM/磁盘等设备信息""" try: cpu_name = _get_cpu_id() virtual_mem = psutil.virtual_memory() total_ram_gb = round(virtual_mem.total / (1024 ** 3), 2) try: disk_path = "C:\\" if sys.platform == "win32" else "/" disk_usage = psutil.disk_usage(disk_path) total_disk_gb = round(disk_usage.total / (1024 ** 3), 2) free_disk_gb = round(disk_usage.free / (1024 ** 3), 2) disk_str = f"Disk: {total_disk_gb} GB (Free: {free_disk_gb} GB)" except Exception: disk_str = "Disk: 获取失败" cpu_count = psutil.cpu_count(logical=False) cpu_logical = psutil.cpu_count(logical=True) info = f"CPU: {cpu_name} ({cpu_count}核/{cpu_logical}线程) | RAM: {total_ram_gb} GB | {disk_str}" return info except Exception as e: return f"获取设备信息失败: {str(e)}" def read_current_cursor_email_helper() -> str: """从 Cursor 的 storage.json 读取当前 email。""" try: config_dir = get_cursor_config_path() storage_file = config_dir / "storage.json" if not storage_file.exists(): return "" with open(storage_file, "r", encoding="utf-8") as f: content = f.read() data = json.loads(content) if content.strip() else {} email = ( data.get("cursorAuth", {}).get("cachedEmail") or data.get("cursorAccount", {}).get("email") or "" ) return str(email).strip() except Exception: return "" def report_equipment_data(bind_account=None): """上报设备数据,返回 (data, error) 元组""" try: url = "https://api.yunzer.cn/api/cursor/equipment/report" machine_code = get_renewal_device_id() device_info = get_device_info() system = "Windows" if sys.platform == "win32" else ("macOS" if sys.platform == "darwin" else "Linux") version = __VERSION__ if not bind_account: bind_account = read_current_cursor_email_helper() payload = { "machineCode": machine_code, "deviceInfo": device_info, "system": system, "version": version, "bindAccount": bind_account, "remark": "登录器上报" } headers = { "Content-Type": "application/json" } response_data = None # 优先走代理 fallback try: with requests.post(url, json=payload, headers=headers, timeout=10) as response: response.raise_for_status() response_data = response.json() except requests.exceptions.ProxyError: with requests.Session() as session: session.trust_env = False with session.post(url, json=payload, headers=headers, timeout=10) as response: response.raise_for_status() response_data = response.json() if response_data and response_data.get("code") == 200: return (response_data.get("data", {}), None) elif response_data: return (None, response_data.get("msg", "未知错误")) else: return (None, "服务器未返回有效数据") except requests.exceptions.Timeout: return (None, "连接超时,请检查网络") except requests.exceptions.ConnectionError as e: return (None, f"网络连接失败: {str(e)}") except requests.exceptions.RequestException as e: return (None, f"请求失败: {str(e)}") except Exception as e: return (None, f"数据上报失败: {str(e)}") class ReportEquipmentThread(QThread): """上报设备信息线程""" finished_signal = Signal(bool, dict, str) def __init__(self, bind_account=None): super().__init__() self.bind_account = bind_account def run(self): data, error = report_equipment_data(self.bind_account) if error: self.finished_signal.emit(False, {}, error) else: self.finished_signal.emit(True, data, "") def activate_by_code(activation_code, bind_account=None): """ 使用激活码激活设备,返回 (data, error) 元组 """ try: url = "https://api.yunzer.cn/api/cursor/equipment/activateByCode" machine_code = get_renewal_device_id() device_info = get_device_info() system = "Windows" if sys.platform == "win32" else ("macOS" if sys.platform == "darwin" else "Linux") version = __VERSION__ if not bind_account: bind_account = read_current_cursor_email_helper() payload = { "activationCode": activation_code, "machineCode": machine_code, "deviceInfo": device_info, "system": system, "version": version, "bindAccount": bind_account, "remark": "登录器激活" } headers = { "Content-Type": "application/json" } response_data = None # 优先走代理 fallback try: with requests.post(url, json=payload, headers=headers, timeout=10) as response: response_data = response.json() except requests.exceptions.ProxyError: with requests.Session() as session: session.trust_env = False with session.post(url, json=payload, headers=headers, timeout=10) as response: response_data = response.json() if response_data and response_data.get("code") == 200: return (response_data.get("data", {}), None) elif response_data: return (None, response_data.get("msg", "未知错误")) else: return (None, "服务器未返回有效数据") except requests.exceptions.Timeout: return (None, "连接超时,请检查网络") except requests.exceptions.ConnectionError as e: return (None, f"网络连接失败: {str(e)}") except requests.exceptions.RequestException as e: return (None, f"请求失败: {str(e)}") except Exception as e: return (None, f"激活失败: {str(e)}") class ActivateCodeThread(QThread): """激活码激活线程""" finished_signal = Signal(bool, dict, str) def __init__(self, activation_code, bind_account=None): super().__init__() self.activation_code = activation_code self.bind_account = bind_account def run(self): data, error = activate_by_code(self.activation_code, self.bind_account) if error: self.finished_signal.emit(False, {}, error) else: self.finished_signal.emit(True, data, "") class CheckUpdateThread(QThread): """检查更新线程""" update_available = Signal(dict) no_update = Signal() error = Signal(str) def run(self): try: update_data, error = check_for_updates() if error: self.error.emit(error) return if update_data: latest_version = update_data.get("latestVersion", "") if compare_versions(__VERSION__, latest_version) == 1: self.update_available.emit(update_data) else: self.no_update.emit() else: self.no_update.emit() except Exception as e: self.error.emit(f"检查更新异常: {str(e)}") def _create_startup_splash(app: QApplication) -> QSplashScreen: """启动画面背景与文字颜色跟随当前 Qt/系统主题(QPalette)。""" pal = app.palette() bg = pal.color(QPalette.ColorRole.Window) title_c = pal.color(QPalette.ColorRole.WindowText) sub_c = pal.color(QPalette.ColorGroup.Disabled, QPalette.ColorRole.WindowText) if not sub_c.isValid(): sub_c = pal.color(QPalette.ColorRole.Mid) w, h = 440, 300 pix = QPixmap(w, h) pix.fill(bg) painter = QPainter(pix) painter.setPen(title_c) title_font = QFont() title_font.setPointSize(14) title_font.setBold(True) painter.setFont(title_font) painter.drawText(0, 100, w, 40, Qt.AlignHCenter, "CursorTokenLogin") painter.setPen(sub_c) sub_font = QFont() sub_font.setPointSize(9) painter.setFont(sub_font) painter.drawText(0, 140, w, 30, Qt.AlignHCenter, "正在准备运行环境…") painter.end() splash = QSplashScreen(pix, Qt.WindowStaysOnTopHint) splash.setWindowFlag(Qt.FramelessWindowHint, True) splash.setPalette(pal) return splash class MainWindow(QMainWindow): def __init__(self, splash: Optional[QSplashScreen] = None): super().__init__() self._splash = splash self._splash_phase = 0 def splash_pulse(phase: str): if not self._splash: return self._splash_phase += 1 dots = "." * (self._splash_phase % 4) pal = QApplication.palette() msg_c = pal.color(QPalette.ColorRole.PlaceholderText) if not msg_c.isValid(): msg_c = pal.color(QPalette.ColorRole.WindowText) self._splash.showMessage( f"{phase}{dots}", Qt.AlignBottom | Qt.AlignHCenter, msg_c, ) QApplication.processEvents() self._splash_pulse = splash_pulse self._splash_pulse("正在加载") # 获取UI文件路径 ui_path = get_resource_path(os.path.join("layout", "main1.ui")) # 详细的调试信息 debug_info = f"UI文件路径: {ui_path}\n" debug_info += f"路径存在: {Path(ui_path).exists()}\n" # 检查基础路径 if hasattr(sys, '_MEIPASS'): debug_info += f"打包路径(MEIPASS): {sys._MEIPASS}\n" debug_info += f"脚本路径: {os.path.dirname(os.path.abspath(__file__))}\n" debug_info += f"当前工作目录: {os.getcwd()}\n" print(debug_info) if not Path(ui_path).exists(): if self._splash: self._splash.close() # 用正斜杠展示路径,避免在部分环境下反斜杠被误解析 error_msg = f"找不到UI文件: {Path(ui_path).as_posix()}\n\n详细信息:\n{debug_info}" QMessageBox.critical(None, "错误", error_msg) sys.exit(1) self._splash_pulse("正在加载界面") # 加载UI文件 - 不设置parent,让loader返回完整的窗口 try: loader = QUiLoader() self.ui = loader.load(ui_path) if self.ui is None: if self._splash: self._splash.close() QMessageBox.critical(None, "错误", "UI文件加载失败") sys.exit(1) self._splash_pulse("正在装配窗口") # 将UI的所有属性和方法复制到当前窗口 # 先保存当前窗口的状态栏 status_bar = self.statusBar() # 使用UI的central widget self.setCentralWidget(self.ui.centralwidget) # 复制窗口属性 self.setWindowTitle(self.ui.windowTitle()) self.resize(self.ui.size()) # 清理原来的ui对象,避免混淆 del self.ui.centralwidget except Exception as e: if self._splash: self._splash.close() QMessageBox.critical(None, "错误", f"加载UI文件时出错: {str(e)}") import traceback traceback.print_exc() sys.exit(1) self._splash_pulse("正在初始化") self.cursor_path = get_default_cursor_path() self.backup_path = "" self.member_status = 0 self._update_download_thread = None self._emergency_dialog = None self._usage_guide_dialog = None self._token_extract_dialog = None self._token_display = None self._current_extracted_token = "" self.log_file_path = self._prepare_log_file_path() # 重新查找组件 - 从self查找 self.txtToken = self.findChild(QTextEdit, "txtToken") self.txtLog = self.findChild(QTextEdit, "txtLog") self.txtCursorPath = self.findChild(QLineEdit, "txtCursorPath") self.txtDeviceId = self.findChild(QLineEdit, "txtDeviceId") self.txtActivationCode = self.findChild(QLineEdit, "txtActivationCode") self.btnChange = self.findChild(QPushButton, "btnChange") self.btnOpenCursor = self.findChild(QPushButton, "btnOpenCursor") self.btnOnlineShop = self.findChild(QPushButton, "btnOnlineShop") self.btnBrowseCursor = self.findChild(QPushButton, "btnBrowseCursor") self.btnAutoCursorPath = self.findChild(QPushButton, "btnAutoCursorPath") self.btnAiRelay = self.findChild(QPushButton, "btnAiRelay") self.btnClearLog = self.findChild(QPushButton, "btnClearLog") self.btnDonate = self.findChild(QPushButton, "btnDonate") self.btnCheckUpdate = self.findChild(QPushButton, "btnCheckUpdate") self.btnEmergencyRepair = self.findChild(QPushButton, "btnEmergencyRepair") self.btnUsageGuide = self.findChild(QPushButton, "btnUsageGuide") self.btnAbout = self.findChild(QPushButton, "btnAbout") self.btnCopyDeviceId = self.findChild(QPushButton, "btnCopyDeviceId") self.btnActivateRenewal = self.findChild(QPushButton, "btnActivateRenewal") self.btnBuyActivationCode = self.findChild(QPushButton, "btnBuyActivationCode") self.btnRefreshMemberStatus = self.findChild(QPushButton, "btnRefreshMemberStatus") self.btnOneClickRenewal = self.findChild(QPushButton, "btnOneClickRenewal") self.lblMemberStatus = self.findChild(QLabel, "lblMemberStatus") self.lblMemberLevel = self.findChild(QLabel, "lblMemberLevel") self.lblAccountType = self.findChild(QLabel, "lblAccountType") self.lblActivatedAt = self.findChild(QLabel, "lblActivatedAt") self.lblExpiredAt = self.findChild(QLabel, "lblExpiredAt") self.actionExit = self.findChild(QAction, "actionExit") self.actionEmergencyRepair = self.findChild(QAction, "actionEmergencyRepair") self.actionUsageGuide = self.findChild(QAction, "actionUsageGuide") self.actionDonate = self.findChild(QAction, "actionDonate") self.actionAbout = self.findChild(QAction, "actionAbout") # 无感检测相关组件 self.txtSilentToken = self.findChild(QLineEdit, "txtSilentToken") self.btnSilentChange = self.findChild(QPushButton, "btnSilentChange") self.btnSilentUnavailable = self.findChild(QPushButton, "btnSilentUnavailable") self.btnSilentCopyToken = self.findChild(QPushButton, "btnSilentCopyToken") self.lblSilentToken = self.findChild(QLabel, "lblSilentToken") self.lblCurrentDetectId = self.findChild(QLabel, "lblCurrentDetectId") self.lblCurrentDetectToken = self.findChild(QTextEdit, "lblCurrentDetectToken") self.groupHelpSilentDetect = self.findChild(QGroupBox, "groupHelpSilentDetect") self.lblHelpSilentDetectDesc = self.findChild(QLabel, "lblHelpSilentDetectDesc") # 调整无感检测界面的提示词,使其适配以 ID 检测换号的新功能 if self.lblSilentToken: self.lblSilentToken.setText("请输入要检测的号池 ID(如 11):") if self.txtSilentToken: self.txtSilentToken.setPlaceholderText("在此输入数字 ID,例如:11") self.txtSilentToken.setMaximumWidth(150) if self.btnSilentChange: self.btnSilentChange.setText("无感换号") if self.btnSilentUnavailable: self.btnSilentUnavailable.setText("不可用") if self.btnSilentCopyToken: self.btnSilentCopyToken.setText("复制 Token") if self.lblCurrentDetectId: self.lblCurrentDetectId.setText("当前检测 ID:-") if self.lblCurrentDetectToken: self.lblCurrentDetectToken.setText("当前 Token:-") if self.groupHelpSilentDetect: self.groupHelpSilentDetect.setTitle("帮助-无感检测(ID 换号版)") if self.lblHelpSilentDetectDesc: self.lblHelpSilentDetectDesc.setText("无感检测:输入服务器号池的记录 ID,点击检测将自动从服务器拉取对应 Token,为您执行本地换号,并自动打开 Cursor,方便快速测试。") # 初始化当前检测状态变量 self.current_detect_id = "" self.current_detect_token = "" self._load_cached_logs_to_ui() # 调试信息 print(f"txtToken: {self.txtToken}") print(f"txtLog: {self.txtLog}") print(f"txtCursorPath: {self.txtCursorPath}") print(f"btnChange: {self.btnChange}") print(f"btnBrowseCursor: {self.btnBrowseCursor}") print(f"btnClearLog: {self.btnClearLog}") print(f"btnDonate: {self.btnDonate}") print(f"btnCheckUpdate: {self.btnCheckUpdate}") print(f"btnUsageGuide: {self.btnUsageGuide}") # 设置默认Cursor路径 if self.txtCursorPath: self.txtCursorPath.setText(get_default_cursor_path()) if self.txtDeviceId: self.txtDeviceId.setText(get_renewal_device_id()) self._reset_member_status_display() # 为在线商城按钮设置图标(使用 Qt 内置图标,避免依赖外部资源) if self.btnOnlineShop or self.btnBuyActivationCode: shop_icon = QIcon.fromTheme("shopping-cart") if shop_icon.isNull(): shop_icon = QIcon.fromTheme("emblem-sales") if shop_icon.isNull(): shop_icon = self.style().standardIcon(QStyle.SP_DialogOpenButton) if self.btnOnlineShop: self.btnOnlineShop.setIcon(shop_icon) if self.btnBuyActivationCode: self.btnBuyActivationCode.setIcon(shop_icon) # 信号连接 if self.btnChange: self.btnChange.clicked.connect(self.on_change_clicked) if self.btnOpenCursor: self.btnOpenCursor.clicked.connect(self.on_open_cursor_clicked) if self.btnOnlineShop: self.btnOnlineShop.clicked.connect(self.on_open_online_shop_clicked) if self.btnAiRelay: self.btnAiRelay.clicked.connect(self.on_open_ai_relay_clicked) if self.btnBrowseCursor: self.btnBrowseCursor.clicked.connect(self.on_browse_cursor) if self.btnAutoCursorPath: self.btnAutoCursorPath.clicked.connect(self.on_auto_config_cursor) if self.btnClearLog: self.btnClearLog.clicked.connect(self.on_clear_log) if self.btnDonate: self.btnDonate.clicked.connect(self.on_donate_clicked) if self.btnCheckUpdate: self.btnCheckUpdate.clicked.connect(self.on_check_update_clicked) if self.btnEmergencyRepair: self.btnEmergencyRepair.clicked.connect(self.on_emergency_repair_clicked) if self.btnUsageGuide: self.btnUsageGuide.clicked.connect(self.on_usage_guide_clicked) if self.btnAbout: self.btnAbout.clicked.connect(self.on_about_clicked) if self.btnCopyDeviceId: self.btnCopyDeviceId.clicked.connect(self.on_copy_device_id_clicked) if self.btnActivateRenewal: self.btnActivateRenewal.clicked.connect(self.on_activate_renewal_clicked) if self.btnBuyActivationCode: self.btnBuyActivationCode.clicked.connect(self.on_open_online_shop_clicked) if self.btnRefreshMemberStatus: self.btnRefreshMemberStatus.clicked.connect(lambda: self.on_refresh_member_status_clicked(show_popup=True)) if self.btnOneClickRenewal: self.btnOneClickRenewal.clicked.connect(self.on_one_click_renewal_clicked) if self.btnSilentChange: self.btnSilentChange.clicked.connect(self.on_silent_detect_clicked) if self.btnSilentUnavailable: self.btnSilentUnavailable.clicked.connect(self.on_silent_unavailable_clicked) if self.btnSilentCopyToken: self.btnSilentCopyToken.clicked.connect(self.on_silent_copy_token_clicked) if self.actionExit: self.actionExit.triggered.connect(self.close) if self.actionEmergencyRepair: self.actionEmergencyRepair.triggered.connect(self.on_emergency_repair_clicked) if self.actionUsageGuide: self.actionUsageGuide.triggered.connect(self.on_usage_guide_clicked) if self.actionDonate: self.actionDonate.triggered.connect(self.on_donate_clicked) if self.actionAbout: self.actionAbout.triggered.connect(self.on_about_clicked) # 设置状态栏:左侧显示QQ群,右侧显示版本号 qq_icon_label = QLabel() qq_icon = QIcon.fromTheme("im-qq") if qq_icon.isNull(): qq_icon = self.style().standardIcon(QStyle.SP_MessageBoxInformation) qq_icon_label.setPixmap(qq_icon.pixmap(16, 16)) self.statusBar().addWidget(qq_icon_label) self.statusBar().addWidget(QLabel("QQ群:720797421")) self.statusBar().addPermanentWidget(QLabel(f"Version: {__VERSION__}")) self.log("🚀 程序启动成功") self.log("📋 请先粘贴Token,然后点击换号") if self._splash: self._splash.finish(self) # 启动检查更新 self.check_update_thread = CheckUpdateThread() self.check_update_thread.update_available.connect(self.on_update_available) self.check_update_thread.no_update.connect(self.on_no_update) self.check_update_thread.error.connect(self.on_update_error) self.check_update_thread.start() # 启动设备上报并同步状态 self.on_refresh_member_status_clicked(show_popup=False) def _restore_check_update_btn(self): if self.btnCheckUpdate: self.btnCheckUpdate.setEnabled(True) self.btnCheckUpdate.setText("🔄 检查更新") def _on_update_download_progress(self, downloaded: int, total: int): if total > 0: pct = min(100, downloaded * 100 // total) self.log(f"⬇️ 下载进度: {pct}% ({downloaded // 1024} KB / {total // 1024} KB)") else: self.log(f"⬇️ 已下载: {downloaded // 1024} KB(服务器未提供总大小)") def _on_update_download_finished(self, success: bool, detail: str): self._restore_check_update_btn() if success and detail == "restarting": QMessageBox.information( self, "更新完成", "新版本已替换完成。\n\n" "本程序将自动退出。退出后请手动运行新版本" ) QTimer.singleShot(200, QApplication.instance().quit) elif not success: QMessageBox.warning(self, "更新失败", detail or "下载或安装失败,请稍后重试。") def _begin_update_workflow_after_confirm(self, update_data: dict): """用户确认升级后:日志展示流程 → 校验 downloadUrl → 下载(含进度)→ 替换程序。""" # self.log("──────── 软件更新流程 ────────") # self.log("① 校验接口返回的 downloadUrl 是否有效…") url = normalize_download_url(update_data.get("downloadUrl")) if not url: err = "当前软件没有更新包,请联系管理员。" self.log(f"❌ {err}") QMessageBox.warning(self, "无法更新", err) self._restore_check_update_btn() return self.log(f"开始下载安装包…") # self.log(f" 地址: {url}") if self.btnCheckUpdate: self.btnCheckUpdate.setEnabled(False) self.btnCheckUpdate.setText("更新中…") self._update_download_thread = DownloadUpdateThread(url) self._update_download_thread.log_signal.connect(self.log) self._update_download_thread.progress_signal.connect(self._on_update_download_progress) self._update_download_thread.finished_signal.connect(self._on_update_download_finished) self._update_download_thread.start() def on_update_available(self, update_data): """有新版本可用""" latest_version = update_data.get("latestVersion", "") download_url = normalize_download_url(update_data.get("downloadUrl")) force_update = update_data.get("forceUpdate", False) release_notes = update_data.get("releaseNotes", "") self.log(f"🔔 发现新版本 v{latest_version}!") message = f"发现新版本 v{latest_version}\n\n" if release_notes: message += f"更新内容:\n{release_notes}\n\n" if force_update: message += "⚠️ 这是强制更新,请立即升级!" else: message += "是否现在升级?" msg_box = QMessageBox(self) msg_box.setWindowTitle("发现新版本") msg_box.setText(message) msg_box.setIcon(QMessageBox.Information) # Windows 上点标题栏 × 会触发「默认按钮」;默认设为「取消」/ 无默认, # 避免 × 被当成「立即升级」或「确定」。 btn_update = None btn_cancel = None if download_url: btn_update = msg_box.addButton("立即升级", QMessageBox.ActionRole) btn_cancel = msg_box.addButton("取消", QMessageBox.RejectRole) msg_box.setDefaultButton(btn_cancel) else: msg_box.setStandardButtons(QMessageBox.Ok) msg_box.setDefaultButton(QMessageBox.NoButton) ret = msg_box.exec() if download_url: if ret == QMessageBox.Rejected or msg_box.clickedButton() != btn_update: self._restore_check_update_btn() return else: # 无下载地址时仅「确定」会走校验流程;× 为 Rejected,不等于 Ok if ret != QMessageBox.Ok: self._restore_check_update_btn() return self._begin_update_workflow_after_confirm(update_data) def on_no_update(self): """没有新版本(启动时自动检查)""" self.log(f"✅ 当前已是最新版本 v{__VERSION__}") def on_update_error(self, error_msg): """检查更新失败(启动时自动检查)""" print(f"检查更新失败: {error_msg}") self.log(f"❌ 检查更新失败: {error_msg}") # 恢复按钮状态 if self.btnCheckUpdate: self.btnCheckUpdate.setEnabled(True) self.btnCheckUpdate.setText("🔄 检查更新") def on_check_update_clicked(self): """手动检查更新按钮点击""" self.log("🔄 正在检查更新...") if self.btnCheckUpdate: self.btnCheckUpdate.setEnabled(False) self.btnCheckUpdate.setText("检查中...") self.check_update_thread = CheckUpdateThread() self.check_update_thread.update_available.connect(self.on_update_available) self.check_update_thread.no_update.connect(self.on_no_update_clicked) self.check_update_thread.error.connect(self.on_update_error_clicked) self.check_update_thread.start() def on_no_update_clicked(self): """手动检查时没有新版本""" if self.btnCheckUpdate: self.btnCheckUpdate.setEnabled(True) self.btnCheckUpdate.setText("🔄 检查更新") self.log(f"✅ 当前已是最新版本 v{__VERSION__}") def on_update_error_clicked(self, error_msg): """手动检查更新失败""" if self.btnCheckUpdate: self.btnCheckUpdate.setEnabled(True) self.btnCheckUpdate.setText("🔄 检查更新") self.log(f"❌ 检查更新失败: {error_msg}") def log(self, message): ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") line = f"[{ts}] {message}" if self.txtLog: self.txtLog.append(line) self._scroll_log_to_bottom() if self.log_file_path: try: with open(self.log_file_path, "a", encoding="utf-8") as f: f.write(line + "\n") except OSError: pass def _scroll_log_to_bottom(self): """让日志文本框始终滚动到最底部,跟踪最新日志。""" if not self.txtLog: return scroll_bar = self.txtLog.verticalScrollBar() if scroll_bar: scroll_bar.setValue(scroll_bar.maximum()) def _prepare_log_file_path(self) -> Optional[Path]: """准备当前日志文件路径;优先沿用当天最新日志,失败时返回 None。""" try: self.log_dir = Path.home() / ".cursortokenlogin" / "logs" self.log_dir.mkdir(parents=True, exist_ok=True) latest = self._get_latest_log_file_path() today = datetime.now().strftime("%Y-%m-%d") return latest or (self.log_dir / f"{today}-0001.log") except OSError: self.log_dir = None return None def on_clear_log(self): if self.txtLog: self.txtLog.clear() next_log = self._get_next_log_file_path() if next_log: self.log_file_path = next_log try: # 立即创建新的空日志文件,确保重启后读取的是清空后的新会话文件。 self.log_file_path.touch(exist_ok=True) except OSError: pass def _load_cached_logs_to_ui(self): """启动时把最近日志文件内容回显到界面。""" if not self.txtLog or not self.log_file_path or not self.log_file_path.exists(): return try: with open(self.log_file_path, "r", encoding="utf-8") as f: content = f.read().strip() if content: self.txtLog.setPlainText(content) self._scroll_log_to_bottom() except OSError: pass def _get_latest_log_file_path(self) -> Optional[Path]: if not getattr(self, "log_dir", None): return None today = datetime.now().strftime("%Y-%m-%d") prefix = f"{today}-" try: files = sorted(self.log_dir.glob(f"{prefix}*.log")) except OSError: return None if not files: return None numbered = [] for p in files: stem = p.stem if not stem.startswith(prefix): continue seq = stem[len(prefix):] if seq.isdigit(): numbered.append((int(seq), p)) if not numbered: return None numbered.sort(key=lambda x: x[0]) return numbered[-1][1] def _get_next_log_file_path(self) -> Optional[Path]: if not getattr(self, "log_dir", None): return None today = datetime.now().strftime("%Y-%m-%d") prefix = f"{today}-" latest = self._get_latest_log_file_path() if latest and latest.stem.startswith(prefix): seq = latest.stem[len(prefix):] next_no = int(seq) + 1 if seq.isdigit() else 1 else: next_no = 1 return self.log_dir / f"{today}-{next_no:04d}.log" def on_browse_cursor(self): """浏览Cursor路径""" from PySide6.QtWidgets import QFileDialog if sys.platform == "win32": file_path, _ = QFileDialog.getOpenFileName( self, "选择Cursor.exe", "", "可执行文件 (*.exe)" ) elif sys.platform == "darwin": file_path = QFileDialog.getExistingDirectory(self, "选择Cursor.app") else: file_path, _ = QFileDialog.getOpenFileName(self, "选择Cursor") if file_path: if self.txtCursorPath: self.txtCursorPath.setText(file_path) self.cursor_path = file_path def on_auto_config_cursor(self): """自动查找并填充 Cursor 安装路径。""" cursor_path = get_default_cursor_path() if cursor_path and Path(cursor_path).exists(): if self.txtCursorPath: self.txtCursorPath.setText(cursor_path) self.cursor_path = cursor_path self.log(f"✅ 已自动查找 Cursor 路径: {cursor_path}") return self.log("❌ 自动查找失败:未找到 Cursor 安装目录,请手动浏览选择。") QMessageBox.warning(self, "提示", "未自动找到 Cursor 安装目录,请点击“浏览”手动选择。") def _launch_cursor(self, cursor_path: str) -> bool: """按当前平台启动 Cursor。""" try: if sys.platform == "win32": os.startfile(cursor_path) elif sys.platform == "darwin": subprocess.Popen(["open", cursor_path]) else: subprocess.Popen([cursor_path]) self.log("✅ Cursor已启动") return True except Exception as e: self.log(f"❌ 打开Cursor失败: {str(e)}") QMessageBox.warning(self, "警告", f"打开Cursor失败: {str(e)}") return False def _resolve_cursor_path_or_prompt(self) -> str: """ 返回可用的 Cursor 路径;当未配置/无效时,提示用户手动配置或自动查找。 """ cursor_path = self.txtCursorPath.text().strip() if self.txtCursorPath else "" if cursor_path and Path(cursor_path).exists(): return cursor_path msg_box = QMessageBox(self) msg_box.setWindowTitle("Cursor路径未配置") msg_box.setText("未检测到有效的 Cursor 路径,请先配置。") msg_box.setIcon(QMessageBox.Warning) btn_manual = msg_box.addButton("手动配置", QMessageBox.ActionRole) btn_auto = msg_box.addButton("自动查找", QMessageBox.ActionRole) btn_cancel = msg_box.addButton("取消", QMessageBox.RejectRole) msg_box.setDefaultButton(btn_auto) msg_box.exec() clicked = msg_box.clickedButton() if clicked == btn_manual: self.on_browse_cursor() elif clicked == btn_auto: self.on_auto_config_cursor() else: return "" cursor_path = self.txtCursorPath.text().strip() if self.txtCursorPath else "" if cursor_path and Path(cursor_path).exists(): return cursor_path return "" def on_open_cursor_clicked(self): """点击“打开Cursor”按钮:优先使用已配置路径,否则引导配置。""" cursor_path = self._resolve_cursor_path_or_prompt() if not cursor_path: return self.log("🚀 正在打开Cursor...") self._launch_cursor(cursor_path) def on_open_online_shop_clicked(self): """打开在线商城页面。""" shop_url = "https://shop.yunzer.cn/" try: opened = webbrowser.open(shop_url) if opened: self.log(f"🌐 已打开在线商城: {shop_url}") else: self.log(f"⚠️ 未能自动打开浏览器,请手动访问: {shop_url}") QMessageBox.warning(self, "提示", f"无法自动打开浏览器,请手动访问:\n{shop_url}") except Exception as e: self.log(f"❌ 打开在线商城失败: {str(e)}") QMessageBox.warning(self, "警告", f"打开在线商城失败: {str(e)}") def on_open_ai_relay_clicked(self): """打开 AI 中转站页面。""" relay_url = "https://api.yunzer.com.cn/" try: opened = webbrowser.open(relay_url) if opened: self.log(f"🌐 已打开AI中转站: {relay_url}") else: self.log(f"⚠️ 未能自动打开浏览器,请手动访问: {relay_url}") QMessageBox.warning(self, "提示", f"无法自动打开浏览器,请手动访问:\n{relay_url}") except Exception as e: self.log(f"❌ 打开AI中转站失败: {str(e)}") QMessageBox.warning(self, "警告", f"打开AI中转站失败: {str(e)}") def on_copy_device_id_clicked(self): """复制一键续杯设备号。""" device_id = self.txtDeviceId.text().strip() if self.txtDeviceId else "" if not device_id: QMessageBox.warning(self, "提示", "设备号为空,无法复制。") return QApplication.clipboard().setText(device_id) self.log("✅ 设备号已复制到剪贴板") QMessageBox.information(self, "成功", "设备号已复制到剪贴板。") def on_activate_renewal_clicked(self): """提交一键续杯激活码。""" activation_code = self.txtActivationCode.text().strip() if self.txtActivationCode else "" device_id = self.txtDeviceId.text().strip() if self.txtDeviceId else "" if not activation_code: QMessageBox.warning(self, "提示", "请输入激活码。") return self.log(f"🔑 正在激活,设备号:{device_id},激活码:{activation_code}") if self.btnActivateRenewal: self.btnActivateRenewal.setEnabled(False) self.btnActivateRenewal.setText("🔄 激活中...") self._activate_thread = ActivateCodeThread(activation_code) self._activate_thread.finished_signal.connect(self.on_activate_finished) self._activate_thread.start() def on_activate_finished(self, success, data, error_msg): if self.btnActivateRenewal: self.btnActivateRenewal.setEnabled(True) self.btnActivateRenewal.setText("🔑 提交激活码") if success: self.log("✅ 激活成功!已成功同步有效期及状态") status = data.get("status", 1) self.member_status = status status_text = "已激活" if status == 1 else "已过期" if self.lblMemberStatus: self.lblMemberStatus.setText(f"当前状态:{status_text}") if self.lblMemberLevel: self.lblMemberLevel.setText(status_text) act_time = data.get("activatedAt") or data.get("activationAt") or "-" exp_time = data.get("expireTime") or data.get("expiredAt") or "-" def format_time_str(t_str): if not t_str or t_str == "-": return "-" t_str = str(t_str).replace("T", " ") if "+" in t_str: t_str = t_str.split("+")[0] if "Z" in t_str: t_str = t_str.replace("Z", "") if "." in t_str: t_str = t_str.split(".")[0] return t_str.strip() if self.lblActivatedAt: self.lblActivatedAt.setText(format_time_str(act_time)) if self.lblExpiredAt: self.lblExpiredAt.setText(format_time_str(exp_time)) duration = data.get("durationDays", 0) self.log(f"🎉 激活成功!增加天数:{duration} 天,到期时间:{format_time_str(exp_time)}") QMessageBox.information( self, "激活成功", f"激活码校验通过!\n绑定机器码成功,已续期 {duration} 天。\n到期时间:{format_time_str(exp_time)}" ) else: self.log(f"❌ 激活失败: {error_msg}") QMessageBox.critical(self, "激活失败", f"无法激活设备:{error_msg}") def _reset_member_status_display(self): """重置会员状态展示。""" if self.lblMemberStatus: self.lblMemberStatus.setText("当前状态:未刷新") if self.lblMemberLevel: self.lblMemberLevel.setText("-") if self.lblAccountType: self.lblAccountType.setText("-") if self.lblActivatedAt: self.lblActivatedAt.setText("-") if self.lblExpiredAt: self.lblExpiredAt.setText("-") def start_equipment_report(self, bind_account=None, show_popup=False): """开始上报设备信息""" self._report_thread = ReportEquipmentThread(bind_account) self._report_thread.finished_signal.connect( lambda success, data, error_msg: self.on_equipment_report_finished(success, data, error_msg, show_popup) ) self._report_thread.start() def on_equipment_report_finished(self, success, data, error_msg, show_popup=False): if success: # self.log("✅ 设备数据已成功上报到平台") status = data.get("status", 0) self.member_status = status status_text = "未激活" if status == 1: status_text = "已激活" elif status == 2: status_text = "已过期" elif status == 3: status_text = "已禁用" if self.lblMemberStatus: self.lblMemberStatus.setText(f"当前状态:{status_text}") if self.lblMemberLevel: self.lblMemberLevel.setText(status_text) act_time = data.get("activationTime") or data.get("activation_time") or "-" exp_time = data.get("expireTime") or data.get("expire_time") or "-" def format_time_str(t_str): if not t_str or t_str == "-": return "-" t_str = str(t_str).replace("T", " ") if "+" in t_str: t_str = t_str.split("+")[0] if "Z" in t_str: t_str = t_str.replace("Z", "") if "." in t_str: t_str = t_str.split(".")[0] return t_str.strip() if self.lblActivatedAt: self.lblActivatedAt.setText(format_time_str(act_time)) if self.lblExpiredAt: self.lblExpiredAt.setText(format_time_str(exp_time)) owner_name = data.get("ownerUserName") or data.get("owner_user_name") if owner_name: if self.lblAccountType: self.lblAccountType.setText(f"绑定用户:{owner_name}") else: if self.lblAccountType: self.lblAccountType.setText("普通设备") if show_popup: QMessageBox.information(self, "刷新成功", f"会员状态已同步!当前状态:{status_text}") else: self.log(f"⚠️ 设备数据同步失败: {error_msg}") if show_popup: QMessageBox.warning(self, "刷新失败", f"无法同步设备状态:{error_msg}") def on_refresh_member_status_clicked(self, show_popup=False): """刷新会员状态展示。""" device_id = self.txtDeviceId.text().strip() if self.txtDeviceId else "" if not device_id: if show_popup: QMessageBox.warning(self, "提示", "设备号为空,无法刷新会员状态。") return if self.lblMemberStatus: self.lblMemberStatus.setText("当前状态:正在同步平台...") self.log("🔄 正在从平台同步设备与会员状态...") self.start_equipment_report(show_popup=show_popup) def on_one_click_renewal_clicked(self): """执行一键续杯。""" # 1. 检测是否激活 is_active = False if hasattr(self, 'member_status') and self.member_status == 1: is_active = True elif self.lblMemberStatus and "已激活" in self.lblMemberStatus.text(): is_active = True if not is_active: QMessageBox.warning(self, "提示", "本设备未激活,请输入激活码激活。") self.log("⚠️ 续杯失败:本设备未激活,请输入激活码激活。") return # 2. 检测 Cursor 是否正在运行 if is_cursor_running(): msg_box = QMessageBox(self) msg_box.setWindowTitle("Cursor正在运行") msg_box.setText("Cursor正在运行中!\n请先保存代码并手动关闭Cursor。") msg_box.setIcon(QMessageBox.Warning) btn_close = msg_box.addButton("💀 强制关闭", QMessageBox.ActionRole) btn_cancel = msg_box.addButton("取消", QMessageBox.RejectRole) msg_box.setDefaultButton(btn_cancel) msg_box.exec_() if msg_box.clickedButton() == btn_close: self.log("💀 正在强制关闭Cursor...") if kill_cursor(): self.log("✅ Cursor已关闭") else: self.log("⚠️ 未找到运行中的Cursor进程") QMessageBox.warning(self, "警告", "未找到运行中的Cursor进程") return else: return # 3. 检查 Cursor 路径 cursor_path = self.txtCursorPath.text().strip() if self.txtCursorPath else "" if not cursor_path or not Path(cursor_path).exists(): QMessageBox.warning(self, "警告", "请设置正确的Cursor路径!") return # 4. 再次确认续杯 confirm = QMessageBox.question( self, "确认一键续杯", "请确认当前额度已经用完后再续杯。\n\n" "如发现大量未使用完就续杯情况,一律封号处理,请悉知!\n\n" "确定要继续执行一键续杯吗?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No, ) if confirm != QMessageBox.Yes: self.log("ℹ️ 已取消一键续杯。") return # 5. 禁用按钮并启动续杯换号后台线程 if self.btnOneClickRenewal: self.btnOneClickRenewal.setEnabled(False) self.btnOneClickRenewal.setText("🔄 续杯中...") self._renewal_thread = OneClickRenewalThread() self._renewal_thread.log_signal.connect(self.log) self._renewal_thread.finished_signal.connect(self.on_one_click_renewal_finished) self._renewal_thread.start() @Slot(bool, str, str) def on_one_click_renewal_finished(self, success, message, new_email): if self.btnOneClickRenewal: self.btnOneClickRenewal.setEnabled(True) self.btnOneClickRenewal.setText("一键续杯") if success: self.backup_path = message self.log("✅ 一键续杯完成") # 上报新换号绑定的设备信息(使用生成的新 email) self.start_equipment_report(bind_account=new_email) self._show_change_success_countdown_dialog(4) self.log("🚀 正在打开Cursor...") cursor_path = self.txtCursorPath.text().strip() if self.txtCursorPath else "" self._launch_cursor(cursor_path) else: QMessageBox.critical(self, "失败", message) def _extract_session_token(self, raw_value: str) -> str: """从输入文本中提取 SessionToken,兼容纯 token / Cookie 串。""" s = (raw_value or "").strip().strip('"').strip("'") if not s: return "" m = re.search(r"SessionToken\s*=\s*([^;\s]+)", s, flags=re.IGNORECASE) if m: return m.group(1).strip().strip('"').strip("'") if ";" in s: first = s.split(";", 1)[0].strip() if "=" in first: k, v = first.split("=", 1) if k.strip().lower() in ("workoscursorsessiontoken", "sessiontoken"): return v.strip().strip('"').strip("'") return s.replace("SessionToken=", "").strip() def _show_change_success_countdown_dialog(self, seconds: int = 4): """换号成功后显示倒计时提示框,倒计时结束自动关闭。""" dialog = QDialog(self) dialog.setWindowTitle("提示") dialog.setModal(True) dialog.setFixedSize(420, 160) layout = QVBoxLayout(dialog) tip_label = QLabel("换号成功咯!请及时去确认收货哦!万分感谢!", dialog) tip_label.setWordWrap(True) tip_label.setAlignment(Qt.AlignCenter) layout.addStretch() layout.addWidget(tip_label) countdown_label = QLabel("", dialog) countdown_label.setAlignment(Qt.AlignCenter) layout.addWidget(countdown_label) layout.addStretch() remain = {"value": max(1, int(seconds))} def refresh_text(): countdown_label.setText(f"{remain['value']} 秒后自动关闭") refresh_text() timer = QTimer(dialog) def on_timeout(): remain["value"] -= 1 if remain["value"] <= 0: timer.stop() dialog.accept() return refresh_text() timer.timeout.connect(on_timeout) timer.start(1000) dialog.exec() def on_change_clicked(self): raw_token = self.txtToken.toPlainText().strip() if self.txtToken else "" token = self._extract_session_token(raw_token) cursor_path = self.txtCursorPath.text().strip() if self.txtCursorPath else "" if not token: QMessageBox.warning(self, "警告", "请先粘贴Token!") return if len(token) < 20: QMessageBox.warning(self, "警告", "Token格式可能不正确!") return if not cursor_path or not Path(cursor_path).exists(): QMessageBox.warning(self, "警告", "请设置正确的Cursor路径!") return # 检测Cursor是否正在运行 if is_cursor_running(): msg_box = QMessageBox(self) msg_box.setWindowTitle("Cursor正在运行") msg_box.setText("Cursor正在运行中!\n请先保存代码并手动关闭Cursor。") msg_box.setIcon(QMessageBox.Warning) btn_close = msg_box.addButton("💀 强制关闭", QMessageBox.ActionRole) btn_cancel = msg_box.addButton("取消", QMessageBox.RejectRole) msg_box.setDefaultButton(btn_cancel) msg_box.exec_() if msg_box.clickedButton() == btn_close: self.log("💀 正在强制关闭Cursor...") if kill_cursor(): self.log("✅ Cursor已关闭") else: self.log("⚠️ 未找到运行中的Cursor进程") QMessageBox.warning(self, "警告", "未找到运行中的Cursor进程") return else: return # 自动生成新邮箱 new_email = generate_random_email() self.log(f"📧 生成新邮箱: {new_email}") # 禁用按钮 if self.btnChange: self.btnChange.setEnabled(False) self.btnChange.setText("🔄 处理中...") # 启动后台线程 self.thread = ChangeTokenThread(token, new_email) self.thread.log_signal.connect(self.log) self.thread.finished_signal.connect(self.on_change_finished) self.thread.start() def on_donate_clicked(self): """打开捐赠对话框""" dialog = DonateDialog(self) dialog.exec() def on_about_clicked(self): """显示关于软件信息。""" QMessageBox.about( self, "关于软件", f"牛马Cursor登录器\n\n" f"当前版本:{__VERSION__}\n\n" "用于 Cursor 账号登录、配置管理和常用工具操作。\n\n" "QQ群:720797421" ) def on_usage_guide_clicked(self): """打开使用说明图片(非模态,可与其他窗口并行)。""" if self._usage_guide_dialog and self._usage_guide_dialog.isVisible(): self._usage_guide_dialog.raise_() self._usage_guide_dialog.activateWindow() return image_path = get_resource_path(os.path.join("assets", "images", "donate", "info.png")) if not Path(image_path).is_file(): QMessageBox.warning(self, "提示", f"未找到使用说明图片:{image_path}") return dialog = QDialog(self) dialog.setWindowTitle("使用说明") dialog.setMinimumSize(900, 700) dialog.setModal(False) dialog.setWindowModality(Qt.NonModal) layout = QVBoxLayout(dialog) tip = QLabel("可使用鼠标滚轮查看长图内容,可通过按钮缩放图片。") tip.setAlignment(Qt.AlignCenter) layout.addWidget(tip) zoom_row = QHBoxLayout() zoom_row.addStretch() btn_zoom_out = QPushButton("缩小 -", dialog) btn_zoom_reset = QPushButton("100%", dialog) btn_zoom_in = QPushButton("放大 +", dialog) zoom_label = QLabel("100%", dialog) zoom_row.addWidget(btn_zoom_out) zoom_row.addWidget(btn_zoom_reset) zoom_row.addWidget(btn_zoom_in) zoom_row.addWidget(zoom_label) layout.addLayout(zoom_row) scroll = QScrollArea(dialog) scroll.setWidgetResizable(True) label = QLabel(scroll) original_pixmap = QPixmap(image_path) zoom_state = {"scale": 1.0} def apply_zoom(): if original_pixmap.isNull(): return target_w = max(1, int(original_pixmap.width() * zoom_state["scale"])) target_h = max(1, int(original_pixmap.height() * zoom_state["scale"])) scaled = original_pixmap.scaled( target_w, target_h, Qt.KeepAspectRatio, Qt.SmoothTransformation, ) label.setPixmap(scaled) label.resize(scaled.size()) zoom_label.setText(f"{int(zoom_state['scale'] * 100)}%") def zoom_in(): zoom_state["scale"] = min(3.0, zoom_state["scale"] * 1.2) apply_zoom() def zoom_out(): zoom_state["scale"] = max(0.2, zoom_state["scale"] / 1.2) apply_zoom() def zoom_reset(): zoom_state["scale"] = 1.0 apply_zoom() btn_zoom_in.clicked.connect(zoom_in) btn_zoom_out.clicked.connect(zoom_out) btn_zoom_reset.clicked.connect(zoom_reset) apply_zoom() label.setAlignment(Qt.AlignTop | Qt.AlignHCenter) scroll.setWidget(label) layout.addWidget(scroll, 1) btn_close = QPushButton("关闭", dialog) btn_close.clicked.connect(dialog.accept) btn_row = QHBoxLayout() btn_row.addStretch() btn_row.addWidget(btn_close) layout.addLayout(btn_row) self._usage_guide_dialog = dialog dialog.show() def on_emergency_repair_clicked(self): """应急检修:弹出工具面板。""" if self._emergency_dialog and self._emergency_dialog.isVisible(): self._emergency_dialog.raise_() self._emergency_dialog.activateWindow() return dialog = QDialog(self) dialog.setWindowTitle("应急检修") dialog.setMinimumSize(400, 300) dialog.setModal(False) dialog.setWindowModality(Qt.NonModal) layout = QVBoxLayout(dialog) layout.addWidget(QLabel("请选择要执行的操作:")) actions_widget = QWidget(dialog) actions_layout = QVBoxLayout(actions_widget) actions_layout.setContentsMargins(0, 0, 0, 0) actions_layout.setSpacing(10) actions_layout.setAlignment(Qt.AlignTop | Qt.AlignLeft) btn_download = QPushButton("下载DB Browser") btn_clear_cache = QPushButton("清除Cursor缓存") btn_extract_token = QPushButton("Token提取") for btn in ( btn_download, btn_clear_cache, btn_extract_token, ): btn.setMinimumWidth(180) btn.setMaximumWidth(260) btn.setSizePolicy(btn.sizePolicy().horizontalPolicy(), btn.sizePolicy().verticalPolicy()) actions_layout.addWidget(btn, 0, Qt.AlignLeft) layout.addWidget(actions_widget) layout.addStretch() btn_download.clicked.connect(self.download_db_tool) btn_clear_cache.clicked.connect(self.clear_cursor_cache) btn_extract_token.clicked.connect(self.on_token_extract_clicked) self._emergency_dialog = dialog dialog.show() def on_token_extract_clicked(self): """密码通过后打开 Token 提取窗口。""" pwd, ok = QInputDialog.getText( self, "Token提取", "请输入密码:", QLineEdit.Password, ) if not ok: return if pwd != "920103": self.log("❌ Token提取密码错误") QMessageBox.warning(self, "提示", "密码错误,无法使用 Token 提取。") return if self._token_extract_dialog and self._token_extract_dialog.isVisible(): self._token_extract_dialog.raise_() self._token_extract_dialog.activateWindow() return dialog = QDialog(self) dialog.setWindowTitle("Token提取") dialog.setMinimumSize(700, 420) dialog.setModal(False) dialog.setWindowModality(Qt.NonModal) layout = QVBoxLayout(dialog) layout.addWidget(QLabel("Token:")) token_display = QTextEdit(dialog) token_display.setReadOnly(True) token_display.setPlaceholderText("点击“读取Token”后将在此显示。") layout.addWidget(token_display, 1) btn_row = QHBoxLayout() btn_read = QPushButton("读取Token", dialog) btn_save = QPushButton("另存桌面", dialog) btn_close = QPushButton("关闭", dialog) btn_row.addWidget(btn_read) btn_row.addWidget(btn_save) btn_row.addStretch() btn_row.addWidget(btn_close) layout.addLayout(btn_row) self._token_extract_dialog = dialog self._token_display = token_display btn_read.clicked.connect(self.read_cursor_token_for_dialog) btn_save.clicked.connect(self.save_extracted_token_to_desktop) btn_close.clicked.connect(dialog.close) dialog.show() def _read_current_cursor_token(self) -> str: """从 Cursor 的 storage.json 读取当前 token。""" config_dir = get_cursor_config_path() storage_file = config_dir / "storage.json" if not storage_file.exists(): return "" try: with open(storage_file, "r", encoding="utf-8") as f: content = f.read() data = json.loads(content) if content.strip() else {} except Exception: return "" token = ( data.get("cursorAuth", {}).get("accessToken") or data.get("cursorAuth", {}).get("refreshToken") or data.get("cursorAccount", {}).get("token") or "" ) return str(token).strip() def read_cursor_token_for_dialog(self): token = self._read_current_cursor_token() if not token: self._current_extracted_token = "" if self._token_display: self._token_display.setPlainText("") self.log("⚠️ 未读取到 Token") QMessageBox.warning(self, "提示", "未读取到 Token,请确认 Cursor 配置是否存在。") return self._current_extracted_token = token if self._token_display: self._token_display.setPlainText(token) self.log("✅ 已读取当前 Token") def save_extracted_token_to_desktop(self): token = self._current_extracted_token if not token: token = self._read_current_cursor_token() if token: self._current_extracted_token = token if self._token_display: self._token_display.setPlainText(token) if not token: QMessageBox.warning(self, "提示", "没有可保存的 Token,请先点击“读取Token”。") return desktop = Path.home() / "Desktop" save_path = desktop / "token.txt" try: with open(save_path, "w", encoding="utf-8") as f: f.write(token) self.log(f"✅ Token 已保存到桌面: {save_path.name}") QMessageBox.information(self, "成功", "Token 已覆盖保存到桌面 token.txt") except OSError as e: self.log(f"❌ Token 保存失败: {e}") QMessageBox.warning(self, "失败", f"保存失败:{e}") def download_db_tool(self): """下载 DB Browser 到桌面。""" import urllib.request import threading url = "http://7colud.yunzer.cn/software/db%20browser%20for%20sqlite.zip" desktop = Path.home() / "Desktop" save_path = desktop / "db browser for sqlite.zip" def download(): try: self.log("🔧 正在下载检修工具...") urllib.request.urlretrieve(url, str(save_path)) self.log(f"✅ 下载完成,已保存到桌面:{save_path.name}") except Exception as e: self.log(f"❌ 下载失败: {e}") threading.Thread(target=download, daemon=True).start() def clear_cursor_cache(self): """清除 %APPDATA%\\Cursor 目录。""" if is_cursor_running(): msg_box = QMessageBox(self) msg_box.setWindowTitle("Cursor正在运行") msg_box.setText("检测到 Cursor 正在运行。\n请先关闭 Cursor 后再清除缓存。") msg_box.setIcon(QMessageBox.Warning) btn_force_close = msg_box.addButton("💀 强制关闭", QMessageBox.ActionRole) btn_cancel = msg_box.addButton("取消", QMessageBox.RejectRole) msg_box.setDefaultButton(btn_cancel) msg_box.exec() if msg_box.clickedButton() == btn_force_close: self.log("💀 正在强制关闭Cursor...") if kill_cursor(): self.log("✅ Cursor已关闭") else: self.log("⚠️ 未找到运行中的Cursor进程") QMessageBox.warning(self, "警告", "未找到运行中的Cursor进程") return else: self.log("ℹ️ 已取消清除缓存。") return cursor_dir = Path(os.environ.get("APPDATA", "")) / "Cursor" if not cursor_dir.exists(): self.log("ℹ️ 未找到 Cursor 缓存目录,无需清理。") QMessageBox.information(self, "提示", "未找到 Cursor 缓存目录,无需清理。") return try: shutil.rmtree(cursor_dir) self.log("✅ Cursor 缓存清除成功") QMessageBox.information(self, "完成", "Cursor 缓存已清除成功。") return except PermissionError: self.log("⚠️ 权限不足,尝试请求管理员权限删除缓存...") except Exception as e: self.log(f"❌ 删除缓存失败: {e}") QMessageBox.warning(self, "失败", f"删除缓存失败:{e}") return if sys.platform != "win32": QMessageBox.warning( self, "失败", "权限不足,无法删除缓存目录。请手动使用管理员权限删除。", ) return # 提权删除:弹 UAC 运行 PowerShell 删除 APPDATA 下的 Cursor 目录。 ps_cmd = ( "Remove-Item -LiteralPath \"$env:APPDATA\\Cursor\" -Recurse -Force " "-ErrorAction Stop" ) rc = ctypes.windll.shell32.ShellExecuteW( None, "runas", "powershell.exe", f"-NoProfile -ExecutionPolicy Bypass -Command \"{ps_cmd}\"", None, 1, ) if rc <= 32: self.log("❌ 提权删除未启动,请手动以管理员身份操作。") QMessageBox.warning( self, "权限不足", "无法自动提权。\n\n" "请手动执行:\n" "1. 关闭 Cursor\n" "2. 右键 PowerShell 选择“以管理员身份运行”\n" "3. 执行命令:Remove-Item -LiteralPath \"$env:APPDATA\\Cursor\" -Recurse -Force", ) return self.log("ℹ️ 已发起管理员删除请求,请在 UAC 窗口中确认。") QMessageBox.information( self, "已请求管理员权限", "已发起管理员权限删除请求。\n请在弹出的 UAC 窗口中确认后完成清理。", ) @Slot(str) def _show_usage_result(self, msg): QMessageBox.information(self, "额度查询结果", msg) def closeEvent(self, event): super().closeEvent(event) @Slot(bool, str) def on_change_finished(self, success, message): if self.btnChange: self.btnChange.setEnabled(True) self.btnChange.setText("🚀 开始换号") if success: self.backup_path = message self.log("✅ 换号完成") # 上报新换号绑定的设备信息(使用生成的新 email) self.start_equipment_report(bind_account=self.thread.new_email) self._show_change_success_countdown_dialog(4) self.log("🚀 正在打开Cursor...") cursor_path = self.txtCursorPath.text().strip() if self.txtCursorPath else "" self._launch_cursor(cursor_path) else: QMessageBox.critical(self, "失败", message) @Slot(bool, str, int) def on_silent_detect_finished(self, success, message, next_id): if self.btnSilentChange: self.btnSilentChange.setEnabled(True) self.btnSilentChange.setText("无感换号") if success: # 记录当前成功检测到的 ID 和 Token,并更新上方显示 self.current_detect_id = getattr(self.detect_thread, "target_id", "") self.current_detect_token = getattr(self.detect_thread, "new_token", "") if self.lblCurrentDetectId: self.lblCurrentDetectId.setText(f"当前检测 ID:{self.current_detect_id}") if self.lblCurrentDetectToken: self.lblCurrentDetectToken.setText(f"当前 Token:{self.current_detect_token}") self.log(f"🚀 换号成功 (ID={self.current_detect_id})") self.log(f"🔑 提取的 Token: {self.current_detect_token}") self.log("🚀 正在为您启动 Cursor...") self.on_open_cursor_clicked() QMessageBox.information(self, "成功", "无感检测换号成功!\n已自动为您启动 Cursor。") else: QMessageBox.critical(self, "错误", message) def on_silent_detect_clicked(self): """执行无感检测换号。""" id_str = self.txtSilentToken.text().strip() if self.txtSilentToken else "" if not id_str: QMessageBox.warning(self, "警告", "请先输入要检测的 ID!") return if not id_str.isdigit(): QMessageBox.warning(self, "警告", "ID 必须为纯数字!") return if is_cursor_running(): msg_box = QMessageBox(self) msg_box.setWindowTitle("Cursor正在运行") msg_box.setText("检测到 Cursor 正在运行!\n由于更新数据库需要独占锁,请先关闭 Cursor。") msg_box.setIcon(QMessageBox.Warning) btn_close = msg_box.addButton("💀 强制关闭并继续", QMessageBox.ActionRole) btn_cancel = msg_box.addButton("取消", QMessageBox.RejectRole) msg_box.setDefaultButton(btn_cancel) msg_box.exec_() if msg_box.clickedButton() == btn_close: self.log("💀 正在强制关闭Cursor...") if kill_cursor(): self.log("✅ Cursor已关闭") else: self.log("⚠️ 未找到运行中的Cursor进程") QMessageBox.warning(self, "警告", "未找到运行中的Cursor进程") return else: return if self.btnSilentChange: self.btnSilentChange.setEnabled(False) self.btnSilentChange.setText("🔍 检测换号中...") self.detect_thread = SilentDetectThread(id_str) self.detect_thread.log_signal.connect(self.log) self.detect_thread.finished_signal.connect(self.on_silent_detect_finished) self.detect_thread.start() @Slot() def on_silent_unavailable_clicked(self): """用户反馈当前 Token 不可用占位逻辑。""" if not getattr(self, "current_detect_id", None): QMessageBox.warning(self, "警告", "请先成功换号/检测一个 ID!") return reply = QMessageBox.question( self, "反馈 Token 不可用", f"确定要反馈当前 ID: {self.current_detect_id} 对应的 Token 为不可用吗?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No ) if reply == QMessageBox.Yes: self.log(f"⚠️ 用户已点击不可用按钮反馈 ID: {self.current_detect_id}。") QMessageBox.information(self, "提示", "已收到反馈,不可用接口待后端实现。") @Slot() def on_silent_copy_token_clicked(self): """复制当前检测到的 Token 到剪贴板。""" if not getattr(self, "current_detect_token", None): QMessageBox.warning(self, "警告", "当前没有可复制的 Token,请先成功换号/检测一个 ID!") return clipboard = QApplication.clipboard() clipboard.setText(self.current_detect_token) self.log("📋 已复制当前检测到的 Token 到剪贴板。") QMessageBox.information(self, "成功", "Token 已成功复制到剪贴板!") def main(): app = QApplication(sys.argv) app_icon = get_app_icon() if not app_icon.isNull(): app.setWindowIcon(app_icon) splash = _create_startup_splash(app) if not app_icon.isNull(): splash.setWindowIcon(app_icon) splash.show() app.processEvents() window = MainWindow(splash=splash) if not app_icon.isNull(): window.setWindowIcon(app_icon) window.show() sys.exit(app.exec()) if __name__ == "__main__": main()