niumasoftware/db/database.py
2026-04-07 23:21:58 +08:00

323 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sqlite3
import os
import shutil
# 数据库放到 Windows 常见的 APPDATA 目录(避免写在项目目录)
APP_NAME = "CleanDesktopOrganizer"
_SCHEMA_VERSION_KEY = "schema_version"
LATEST_SCHEMA_VERSION = 1
_PROJECT_DB_PATH = os.path.join(
os.path.dirname(os.path.dirname(__file__)), "data.db"
)
_BASE_DIR = os.environ.get("APPDATA") or os.environ.get("LOCALAPPDATA") or ""
DB_DIR = os.path.join(_BASE_DIR, APP_NAME) if _BASE_DIR else None
DB_PATH = os.path.join(DB_DIR, "data.db") if DB_DIR else _PROJECT_DB_PATH
def _ensure_db_dir():
if DB_DIR:
os.makedirs(DB_DIR, exist_ok=True)
def _migrate_old_db_if_needed():
"""
首次升级:把旧版项目目录下的 data.db 自动拷贝到 APPDATA
避免你之前分组/设置丢失。
"""
try:
if os.path.isfile(_PROJECT_DB_PATH) and not os.path.isfile(DB_PATH):
_ensure_db_dir()
shutil.copy2(_PROJECT_DB_PATH, DB_PATH)
except Exception:
# 迁移失败不影响程序运行(会在新目录重新创建库)
pass
def get_conn():
_ensure_db_dir()
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
try:
conn.execute("PRAGMA foreign_keys=ON")
except Exception:
pass
return conn
def _get_schema_version(conn: sqlite3.Connection) -> int:
try:
row = conn.execute(
"SELECT value FROM settings WHERE key=?",
(_SCHEMA_VERSION_KEY,),
).fetchone()
if not row:
return 0
return int(str(row[0]).strip() or "0")
except Exception:
return 0
def _set_schema_version(conn: sqlite3.Connection, v: int):
conn.execute(
"INSERT INTO settings (key, value) VALUES (?,?) "
"ON CONFLICT(key) DO UPDATE SET value=excluded.value",
(_SCHEMA_VERSION_KEY, str(int(v))),
)
def _table_has_column(conn: sqlite3.Connection, table: str, col: str) -> bool:
try:
rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
return any((r[1] if isinstance(r, (tuple, list)) else r["name"]) == col for r in rows)
except Exception:
return False
def _apply_migrations(conn: sqlite3.Connection):
"""
版本化迁移schema migration
- 允许从任意旧版本升级到最新
- 每一步尽量幂等,并在事务中执行
"""
def m1():
# 基础表
conn.execute(
"""
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS groups (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
position INTEGER DEFAULT 0,
folder_path TEXT DEFAULT ''
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
group_id INTEGER NOT NULL,
name TEXT NOT NULL,
path TEXT NOT NULL,
icon_path TEXT,
position INTEGER DEFAULT 0,
FOREIGN KEY (group_id) REFERENCES groups(id) ON DELETE CASCADE
)
"""
)
# 兼容更老的库:补字段
if not _table_has_column(conn, "groups", "folder_path"):
conn.execute("ALTER TABLE groups ADD COLUMN folder_path TEXT DEFAULT ''")
migrations = {
1: m1,
}
cur = _get_schema_version(conn)
# 未设置版本但已有表:视为 0走迁移补齐并写入版本号
while cur < LATEST_SCHEMA_VERSION:
nxt = cur + 1
fn = migrations.get(nxt)
if fn is None:
raise RuntimeError(f"missing migration for version {nxt}")
with conn:
fn()
_set_schema_version(conn, nxt)
cur = nxt
def init_db():
_migrate_old_db_if_needed()
conn = get_conn()
# 先确保 settings 表存在,才能读写 schema_version
with conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
)
"""
)
_apply_migrations(conn)
# 默认分组(数据迁移/初始化)
with conn:
row = conn.execute("SELECT COUNT(*) FROM groups").fetchone()
cnt = int(row[0] if row else 0)
if cnt == 0:
conn.execute("INSERT INTO groups (name, position) VALUES ('常用程序', 0)")
conn.close()
# 非结构类迁移(数据清洗/一次性转换)
_migrate_item_shortcut_paths_to_targets()
def _migrate_item_shortcut_paths_to_targets():
"""将旧数据中仍保存为 .lnk 的路径改存为快捷方式目标(若可解析)。仅执行一次。"""
conn = get_conn()
row = conn.execute(
"SELECT value FROM settings WHERE key='items_lnk_targets_migrated'"
).fetchone()
conn.close()
if row and row["value"] == "1":
return
from shortcut_target import path_for_storage
conn = get_conn()
rows = conn.execute("SELECT id, path FROM items").fetchall()
for r in rows:
iid, p = r["id"], (r["path"] or "")
pl = p.lower()
if not pl.endswith(".lnk"):
continue
try:
if not os.path.isfile(p):
continue
except OSError:
continue
new_p = path_for_storage(p)
if new_p != p:
conn.execute("UPDATE items SET path=? WHERE id=?", (new_p, iid))
conn.execute(
"INSERT INTO settings (key, value) VALUES (?,?) "
"ON CONFLICT(key) DO UPDATE SET value=excluded.value",
("items_lnk_targets_migrated", "1"),
)
conn.commit()
conn.close()
# ── Settings ─────────────────────────────────────────────
def get_setting(key: str, default: str = "") -> str:
conn = get_conn()
row = conn.execute("SELECT value FROM settings WHERE key=?", (key,)).fetchone()
conn.close()
return row["value"] if row else default
def set_setting(key: str, value: str):
conn = get_conn()
conn.execute(
"INSERT INTO settings (key, value) VALUES (?,?) "
"ON CONFLICT(key) DO UPDATE SET value=excluded.value",
(key, value)
)
conn.commit()
conn.close()
# ── Groups ──────────────────────────────────────────────
def get_groups():
conn = get_conn()
rows = conn.execute("SELECT * FROM groups ORDER BY position").fetchall()
conn.close()
return [dict(r) for r in rows]
def add_group(name, folder_path=""):
conn = get_conn()
conn.execute("INSERT INTO groups (name, folder_path) VALUES (?,?)", (name, folder_path))
conn.commit()
gid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
conn.close()
return gid
def rename_group(gid, name):
conn = get_conn()
conn.execute("UPDATE groups SET name=? WHERE id=?", (name, gid))
conn.commit()
conn.close()
def delete_group(gid):
conn = get_conn()
conn.execute("DELETE FROM groups WHERE id=?", (gid,))
conn.commit()
conn.close()
def reorder_groups(id_list):
conn = get_conn()
for pos, gid in enumerate(id_list):
conn.execute("UPDATE groups SET position=? WHERE id=?", (pos, gid))
conn.commit()
conn.close()
# ── Items ────────────────────────────────────────────────
def get_items(group_id):
conn = get_conn()
rows = conn.execute(
"SELECT * FROM items WHERE group_id=? ORDER BY position", (group_id,)
).fetchall()
conn.close()
return [dict(r) for r in rows]
def add_item(group_id, name, path, icon_path=None):
from shortcut_target import path_for_storage
path = path_for_storage(path)
conn = get_conn()
conn.execute(
"INSERT INTO items (group_id, name, path, icon_path) VALUES (?,?,?,?)",
(group_id, name, path, icon_path),
)
conn.commit()
iid = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
conn.close()
return iid
def delete_item(item_id):
conn = get_conn()
conn.execute("DELETE FROM items WHERE id=?", (item_id,))
conn.commit()
conn.close()
def move_item(item_id, new_group_id, new_position):
conn = get_conn()
conn.execute(
"UPDATE items SET group_id=?, position=? WHERE id=?",
(new_group_id, new_position, item_id),
)
conn.commit()
conn.close()
def reorder_items(group_id, id_list):
conn = get_conn()
for pos, iid in enumerate(id_list):
conn.execute("UPDATE items SET position=? WHERE id=?", (pos, iid))
conn.commit()
conn.close()
def reset_groups_and_items(recreate_default_group: bool = True):
"""
清空所有分组与程序items
- 为避免外键级联受 SQLite 外键开关影响,这里显式先删 items 再删 groups。
- 可选地重新创建默认分组「常用程序」,保证界面至少有一个组可操作。
"""
conn = get_conn()
conn.execute("DELETE FROM items")
conn.execute("DELETE FROM groups")
if recreate_default_group:
conn.execute("INSERT INTO groups (name, position) VALUES ('常用程序', 0)")
conn.commit()
conn.close()