"""SQLite persistence layer: schema migrations plus settings/groups/items CRUD."""

import os
import shutil
import sqlite3
from contextlib import closing, suppress

# The database lives under the usual Windows APPDATA directory so that it is
# not written into the project directory.
APP_NAME = "CleanDesktopOrganizer"
_SCHEMA_VERSION_KEY = "schema_version"
LATEST_SCHEMA_VERSION = 1

# Settings key marking the one-off ".lnk path -> shortcut target" conversion.
_LNK_MIGRATED_KEY = "items_lnk_targets_migrated"
# Name of the group that is created whenever the groups table is empty.
_DEFAULT_GROUP_NAME = "常用程序"
# Insert-or-update statement shared by every write to the settings table.
_UPSERT_SETTING_SQL = (
    "INSERT INTO settings (key, value) VALUES (?,?) "
    "ON CONFLICT(key) DO UPDATE SET value=excluded.value"
)

_PROJECT_DB_PATH = os.path.join(
    os.path.dirname(os.path.dirname(__file__)), "data.db"
)
_BASE_DIR = os.environ.get("APPDATA") or os.environ.get("LOCALAPPDATA") or ""
DB_DIR = os.path.join(_BASE_DIR, APP_NAME) if _BASE_DIR else None
# Without APPDATA/LOCALAPPDATA the legacy project-directory database is used.
DB_PATH = os.path.join(DB_DIR, "data.db") if DB_DIR else _PROJECT_DB_PATH


def _ensure_db_dir() -> None:
    """Create the database directory if one is configured."""
    if DB_DIR:
        os.makedirs(DB_DIR, exist_ok=True)


def _migrate_old_db_if_needed() -> None:
    """Copy a legacy project-directory ``data.db`` to the new location.

    Runs only when the legacy file exists and the new one does not, so that
    groups and settings from an older install are kept.  The copy goes to a
    staging file first and is renamed into place, so an interrupted copy can
    never leave a truncated database at ``DB_PATH``.  Failure is best-effort:
    the program continues and a fresh database is created instead.
    """
    if os.path.isfile(DB_PATH) or not os.path.isfile(_PROJECT_DB_PATH):
        return
    staging_path = DB_PATH + ".migrating"
    try:
        _ensure_db_dir()
        shutil.copy2(_PROJECT_DB_PATH, staging_path)
        os.replace(staging_path, DB_PATH)
    except OSError:
        # Migration failure must not stop the application; just clean up.
        with suppress(OSError):
            os.remove(staging_path)


def get_conn() -> sqlite3.Connection:
    """Open a connection to the database file.

    Rows are returned as ``sqlite3.Row`` and foreign-key enforcement is
    requested.  The caller is responsible for closing the connection.
    """
    _ensure_db_dir()
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        conn.execute("PRAGMA foreign_keys=ON")
    except sqlite3.Error:
        # Best-effort: keep working even if the pragma is rejected.
        pass
    return conn


def _get_schema_version(conn: sqlite3.Connection) -> int:
    """Return the stored schema version, or 0 if it is missing or unreadable."""
    try:
        row = conn.execute(
            "SELECT value FROM settings WHERE key=?",
            (_SCHEMA_VERSION_KEY,),
        ).fetchone()
        if not row:
            return 0
        return int(str(row[0]).strip() or "0")
    except (sqlite3.Error, ValueError):
        # No settings table yet, or a non-numeric value: treat as version 0.
        return 0


def _set_schema_version(conn: sqlite3.Connection, v: int) -> None:
    """Store schema version *v* in the settings table (no commit)."""
    conn.execute(_UPSERT_SETTING_SQL, (_SCHEMA_VERSION_KEY, str(int(v))))


def _table_has_column(conn: sqlite3.Connection, table: str, col: str) -> bool:
    """Return True if *table* has a column named *col*.

    *table* is interpolated into the statement, so it must be a trusted
    identifier (callers in this module pass literals only).
    """
    try:
        rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
    except sqlite3.Error:
        return False
    # Index 1 of a table_info row is the column name; positional access works
    # for both sqlite3.Row and plain tuples.
    return any(r[1] == col for r in rows)


def _migration_v1(conn: sqlite3.Connection) -> None:
    """Schema version 1: create the base tables and back-fill old columns."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS settings (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL
        )
        """
    )
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS groups (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            position INTEGER DEFAULT 0,
            folder_path TEXT DEFAULT ''
        )
        """
    )
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS items (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            group_id INTEGER NOT NULL,
            name TEXT NOT NULL,
            path TEXT NOT NULL,
            icon_path TEXT,
            position INTEGER DEFAULT 0,
            FOREIGN KEY (group_id) REFERENCES groups(id) ON DELETE CASCADE
        )
        """
    )
    # Compatibility with older databases whose groups table lacks the column.
    if not _table_has_column(conn, "groups", "folder_path"):
        conn.execute("ALTER TABLE groups ADD COLUMN folder_path TEXT DEFAULT ''")


# Maps a target schema version to the step that upgrades from version - 1.
_MIGRATIONS = {
    1: _migration_v1,
}


def _apply_migrations(conn: sqlite3.Connection) -> None:
    """Upgrade the schema step by step up to ``LATEST_SCHEMA_VERSION``.

    A database with tables but no recorded version counts as version 0, so
    the steps run (they are written to be idempotent) and the version is
    recorded.  Each step and its version bump share one transaction.

    Raises:
        RuntimeError: if no migration is registered for a required version.
    """
    current = _get_schema_version(conn)
    while current < LATEST_SCHEMA_VERSION:
        target = current + 1
        step = _MIGRATIONS.get(target)
        if step is None:
            raise RuntimeError(f"missing migration for version {target}")
        with conn:
            # The sqlite3 module opens implicit transactions only for DML, so
            # begin explicitly to make the DDL part of the same transaction.
            if not conn.in_transaction:
                conn.execute("BEGIN")
            step(conn)
            _set_schema_version(conn, target)
        current = target


def init_db() -> None:
    """Prepare the database: legacy copy, schema, default group, data fixes."""
    _migrate_old_db_if_needed()
    with closing(get_conn()) as conn:
        # The settings table must exist before schema_version can be read.
        with conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS settings (
                    key TEXT PRIMARY KEY,
                    value TEXT NOT NULL
                )
                """
            )
        _apply_migrations(conn)
        # Seed the default group when there are no groups at all.
        with conn:
            group_count = conn.execute("SELECT COUNT(*) FROM groups").fetchone()[0]
            if group_count == 0:
                conn.execute(
                    "INSERT INTO groups (name, position) VALUES (?, 0)",
                    (_DEFAULT_GROUP_NAME,),
                )
    # Non-structural migrations (data clean-up / one-off conversions).
    _migrate_item_shortcut_paths_to_targets()


def _migrate_item_shortcut_paths_to_targets() -> None:
    """Rewrite stored ``.lnk`` item paths via ``path_for_storage``; runs once.

    Only rows whose path ends in ``.lnk`` and still exists as a file are
    converted.  A settings flag records completion so later calls return
    immediately.
    """
    with closing(get_conn()) as conn:
        flag = conn.execute(
            "SELECT value FROM settings WHERE key=?", (_LNK_MIGRATED_KEY,)
        ).fetchone()
        if flag and flag["value"] == "1":
            return

        # Imported lazily, and only when the migration actually has to run.
        from shortcut_target import path_for_storage

        rows = conn.execute("SELECT id, path FROM items").fetchall()
        with conn:
            for row in rows:
                item_id, path = row["id"], (row["path"] or "")
                if not path.lower().endswith(".lnk"):
                    continue
                try:
                    if not os.path.isfile(path):
                        continue
                except (OSError, ValueError):
                    continue
                new_path = path_for_storage(path)
                if new_path != path:
                    conn.execute(
                        "UPDATE items SET path=? WHERE id=?", (new_path, item_id)
                    )
            conn.execute(_UPSERT_SETTING_SQL, (_LNK_MIGRATED_KEY, "1"))


# ── Settings ─────────────────────────────────────────────

def get_setting(key: str, default: str = "") -> str:
    """Return the stored value for *key*, or *default* if it is not set."""
    with closing(get_conn()) as conn:
        row = conn.execute(
            "SELECT value FROM settings WHERE key=?", (key,)
        ).fetchone()
    return row["value"] if row else default


def set_setting(key: str, value: str) -> None:
    """Insert or overwrite the value stored for *key*."""
    with closing(get_conn()) as conn, conn:
        conn.execute(_UPSERT_SETTING_SQL, (key, value))


# ── Groups ──────────────────────────────────────────────

def get_groups() -> list:
    """Return all groups as dicts, ordered by position (ties broken by id)."""
    with closing(get_conn()) as conn:
        rows = conn.execute("SELECT * FROM groups ORDER BY position, id").fetchall()
    return [dict(r) for r in rows]


def add_group(name, folder_path=""):
    """Insert a group and return its new id."""
    with closing(get_conn()) as conn, conn:
        cur = conn.execute(
            "INSERT INTO groups (name, folder_path) VALUES (?,?)",
            (name, folder_path),
        )
        return cur.lastrowid


def rename_group(gid, name):
    """Set the name of group *gid*."""
    with closing(get_conn()) as conn, conn:
        conn.execute("UPDATE groups SET name=? WHERE id=?", (name, gid))


def delete_group(gid):
    """Delete group *gid*; its items follow via ON DELETE CASCADE."""
    with closing(get_conn()) as conn, conn:
        conn.execute("DELETE FROM groups WHERE id=?", (gid,))


def reorder_groups(id_list):
    """Assign positions 0..n-1 to the groups in the order given by *id_list*."""
    with closing(get_conn()) as conn, conn:
        conn.executemany(
            "UPDATE groups SET position=? WHERE id=?", enumerate(id_list)
        )


# ── Items ────────────────────────────────────────────────

def get_items(group_id) -> list:
    """Return the items of *group_id* as dicts, ordered by position then id."""
    with closing(get_conn()) as conn:
        rows = conn.execute(
            "SELECT * FROM items WHERE group_id=? ORDER BY position, id",
            (group_id,),
        ).fetchall()
    return [dict(r) for r in rows]


def add_item(group_id, name, path, icon_path=None):
    """Insert an item and return its new id.

    *path* is passed through ``shortcut_target.path_for_storage`` before it
    is stored.
    """
    from shortcut_target import path_for_storage

    path = path_for_storage(path)
    with closing(get_conn()) as conn, conn:
        cur = conn.execute(
            "INSERT INTO items (group_id, name, path, icon_path) VALUES (?,?,?,?)",
            (group_id, name, path, icon_path),
        )
        return cur.lastrowid


def delete_item(item_id):
    """Delete the item with id *item_id*."""
    with closing(get_conn()) as conn, conn:
        conn.execute("DELETE FROM items WHERE id=?", (item_id,))


def move_item(item_id, new_group_id, new_position):
    """Move an item to *new_group_id* and set its position."""
    with closing(get_conn()) as conn, conn:
        conn.execute(
            "UPDATE items SET group_id=?, position=? WHERE id=?",
            (new_group_id, new_position, item_id),
        )


def reorder_items(group_id, id_list):
    """Assign positions 0..n-1 to the items in the order given by *id_list*.

    NOTE(review): *group_id* is accepted for interface compatibility but is
    not used to filter the update; ids are matched regardless of group.
    """
    with closing(get_conn()) as conn, conn:
        conn.executemany(
            "UPDATE items SET position=? WHERE id=?", enumerate(id_list)
        )


def reset_groups_and_items(recreate_default_group: bool = True):
    """Delete every group and item.

    Items are deleted explicitly before groups so the result does not depend
    on whether SQLite foreign-key cascading is enabled.  Optionally the
    default group is recreated so the UI always has one group to work with.
    """
    with closing(get_conn()) as conn, conn:
        conn.execute("DELETE FROM items")
        conn.execute("DELETE FROM groups")
        if recreate_default_group:
            conn.execute(
                "INSERT INTO groups (name, position) VALUES (?, 0)",
                (_DEFAULT_GROUP_NAME,),
            )