Commit 3589b85

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-06-08 16:44:17
refactor(history): refactor Turso history sync
1 parent 345a2a8
Changed files (3)
src/danmu/entrypoint.py
@@ -186,7 +186,7 @@ async def get_engine_with_dates(match_time: str, user: str = "", keyword: str =
         dict: {"server": [date-1, date-2], "r2": [date-3, date-4]}
     """
     now = nowdt(TZ)
-    allowed_years = [x.strip() for x in DANMU.R2_SYNC_DANMU_YEARS.split(",") if x.strip()] if qtype == "弹幕" else [x.strip() for x in DANMU.R2_SYNC_FAYAN_YEARS.split(",") if x.strip()]
+    allowed_years = [x.strip() for x in DANMU.SYNC_DANMU_YEARS.split(",") if x.strip()] if qtype == "弹幕" else [x.strip() for x in DANMU.SYNC_FAYAN_YEARS.split(",") if x.strip()]
     allowed_years.append(str(now.year))
     allowed_years = sorted(set(allowed_years))
     if qtype == "发言":
src/history/turso.py
@@ -11,23 +11,27 @@ from pyrogram.client import Client
 from pyrogram.errors import PeerIdInvalid
 from pyrogram.types import Chat, Message
 
-from config import DOWNLOAD_DIR, HISTORY, TZ, cache, cutter
-from database.turso import turso_create_table, turso_exec, turso_list_tables
+from config import DB, DOWNLOAD_DIR, HISTORY, TZ, cache, cutter
+from database.turso import insert_statement, turso_create_table, turso_exec, turso_list_tables
 from messages.parser import parse_msg
 from permission import check_save_history
 from utils import i_am_bot, nowdt
 
 # ruff: noqa: S608
 
-DB_COLUMNS = "mid INTEGER PRIMARY KEY, mtype TEXT, time TEXT NOT NULL, user TEXT, content TEXT, filename TEXT, urls TEXT, reply INTEGER, mime TEXT, uid INTEGER, segmented TEXT"
-INDEX_NAMES = ["time", "uid"]
-SQL_TYPES = {"str": "text", "int": "integer", "float": "float", "nonetype": "null"}
+DB_COLUMNS = "mid INTEGER PRIMARY KEY, mtype TEXT, time TEXT NOT NULL, fullname TEXT, content TEXT, filename TEXT, urls TEXT, reply INTEGER, mime TEXT, user TEXT, uid INTEGER, segmented TEXT"
+INDEX_NAMES = ["time", "user", "uid"]
+TURSO_KWARGS: dict = {
+    "db_name": HISTORY.TURSO_DATABASE,
+    "username": HISTORY.TURSO_USERNAME or DB.TURSO_USERNAME,
+    "api_token": HISTORY.TURSO_API_TOKEN or DB.TURSO_API_TOKEN,
+    "group_token": HISTORY.TURSO_GROUP_TOKEN or DB.TURSO_GROUP_TOKEN,
+}
 
 
 async def sync_history_to_turso(client: Client, message: Message) -> None:
     if not HISTORY.TURSO_ENABLE:
         return
-
     info = parse_msg(message, silent=True)
     if not check_save_history(info["ctype"], info["cid"]):
         return
@@ -36,21 +40,17 @@ async def sync_history_to_turso(client: Client, message: Message) -> None:
         "mid": info["mid"],
         "mtype": info["mtype"],
         "time": info["time"],
-        "user": info["full_name"],
+        "fullname": info["full_name"],
         "content": message.content,  # text or edited text
         "filename": info["file_name"],
         "urls": "\n\n".join(info["entity_urls"]),
         "reply": message.reply_to_message_id,
         "mime": info["mime_type"],
+        "user": info["full_name"].replace(" ", ""),
         "uid": info["uid"],
         "segmented": " ".join(cutter.cutword(message.content)),
     }
-    keys = ", ".join(records)
-    values = ", ".join(["?" for _ in range(len(records))])
-    updates = ", ".join([f"{k} = EXCLUDED.{k}" for k in records if k != "mid"])
-    args = [{"type": SQL_TYPES[type(x).__name__.lower()], "value": str(x) if isinstance(x, (int, float)) else x} for x in records.values()]
-    sql = f'INSERT INTO "{table_name}" ({keys}) VALUES ({values}) ON CONFLICT (mid) DO UPDATE SET {updates};'
-    await turso_exec([{"type": "execute", "stmt": {"sql": sql, "args": args}}], db_name=HISTORY.TURSO_DATABASE, silent=True, retry=2)
+    await turso_exec([insert_statement(table_name, records, update_on_conflict="mid")], silent=True, retry=2, **TURSO_KWARGS)
 
 
 async def backup_chat_history_to_turso(client: Client, chat_id: str | int, hours: float = HISTORY.BACKUP_CHATS_HOURS) -> None:
@@ -67,7 +67,7 @@ async def backup_chat_history_to_turso(client: Client, chat_id: str | int, hours
     begin_time = begin_dt.strftime("%Y-%m-%d %H:%M:%S")
     end_time = now.strftime("%Y-%m-%d %H:%M:%S")
     sql = f'SELECT mid FROM "{table_name}" WHERE time >= "{begin_time}" AND time <= "{end_time}";'
-    resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], db_name=HISTORY.TURSO_DATABASE, silent=True)
+    resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, **TURSO_KWARGS)
     saved_mids = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
     saved_mids = {int(x) for x in saved_mids}
     logger.info(f"Found {len(saved_mids)} messages in Turso. Rows read: {glom(resp, 'results.0.response.result.rows_read', default=1)}")
@@ -85,31 +85,27 @@ async def backup_chat_history_to_turso(client: Client, chat_id: str | int, hours
             "mid": info["mid"],
             "mtype": info["mtype"],
             "time": info["time"],
-            "user": info["full_name"],
+            "fullname": info["full_name"],
             "content": message.content,
             "filename": info["file_name"],
             "urls": "\n\n".join(info["entity_urls"]),
             "reply": message.reply_to_message_id,
             "mime": info["mime_type"],
+            "user": info["full_name"].replace(" ", ""),
             "uid": info["uid"],
             "segmented": " ".join(cutter.cutword(info["text"])),
         }
         logger.trace(f"Syncing {table_name}: {info['mid']}")
-        keys = ", ".join(records)
-        values = ", ".join(["?" for _ in range(len(records))])
-        updates = ", ".join([f"{k} = EXCLUDED.{k}" for k in records if k != "mid"])
-        args = [{"type": SQL_TYPES[type(x).__name__.lower()], "value": str(x) if isinstance(x, (int, float)) else x} for x in records.values()]
-        sql = f'INSERT INTO "{table_name}" ({keys}) VALUES ({values}) ON CONFLICT (mid) DO UPDATE SET {updates};'
-        statements.append({"type": "execute", "stmt": {"sql": sql, "args": args}})
+        statements.append(insert_statement(table_name, records, update_on_conflict="mid"))
         if len(statements) == concurrency:
-            resp = await turso_exec(statements, db_name=HISTORY.TURSO_DATABASE, silent=True, retry=2)
+            resp = await turso_exec(statements, silent=True, retry=2, **TURSO_KWARGS)
             num_success = sum([1 for x in glom(resp, "results.*.type", default=[]) if x == "ok"]) - 1
             if sync_ids := glom(resp, "results.**.last_insert_rowid", default=[0]):
                 logger.success(f"Synced {num_success} messages to Turso, {min(sync_ids)} -> {max(sync_ids)}. {info['time']}")
             statements = []
 
     if statements:
-        resp = await turso_exec(statements, db_name=HISTORY.TURSO_DATABASE, silent=True, retry=2)
+        resp = await turso_exec(statements, silent=True, retry=2, **TURSO_KWARGS)
         num_success = sum([1 for x in glom(resp, "results.*.type", default=[]) if x == "ok"]) - 1
         if sync_ids := glom(resp, "results.**.last_insert_rowid", default=[0]):
             logger.success(f"Synced {num_success} messages to Turso, {min(sync_ids)} -> {max(sync_ids)}. {info['time']}")
@@ -151,11 +147,7 @@ async def upload_exported_history_to_turso(client: Client, path: str | Path | No
     }
     table_name = await get_table_name(client, data["id"])
     # find all message_ids
-    resp = await turso_exec(
-        [{"type": "execute", "stmt": {"sql": f'SELECT mid FROM "{table_name}";'}}],
-        db_name=HISTORY.TURSO_DATABASE,
-        silent=True,
-    )
+    resp = await turso_exec([{"type": "execute", "stmt": {"sql": f'SELECT mid FROM "{table_name}";'}}], silent=True, **TURSO_KWARGS)
     saved_ids = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
     saved_ids = {int(x) for x in saved_ids}
     logger.info(f"Found {len(saved_ids)} messages in Turso. Rows read: {glom(resp, 'results.0.response.result.rows_read', default=1)}")
@@ -186,30 +178,26 @@ async def upload_exported_history_to_turso(client: Client, path: str | Path | No
             "mid": info["id"],
             "mtype": mtypes[info.get("media_type", "text")],
             "time": dt.strftime("%Y-%m-%d %H:%M:%S"),
-            "user": user,
+            "fullname": user,
             "content": parse_text(info.get("text", [])),
             "filename": info.get("file_name", ""),
             "urls": parse_urls(info.get("text_entities", [])),
             "reply": info.get("reply_to_message_id"),
             "mime": info.get("mime_type", ""),
+            "user": user.replace(" ", ""),
             "uid": uid,
             "segmented": " ".join(cutter.cutword(content)),
         }
         # logger.debug(f"Syncing message {table_name}: {info['id']}")
-        keys = ", ".join(records)
-        values = ", ".join(["?" for _ in range(len(records))])
-        updates = ", ".join([f"{k} = EXCLUDED.{k}" for k in records if k != "mid"])
-        args = [{"type": SQL_TYPES[type(x).__name__.lower()], "value": str(x) if isinstance(x, (int, float)) else x} for x in records.values()]
-        sql = f'INSERT INTO "{table_name}" ({keys}) VALUES ({values}) ON CONFLICT (mid) DO UPDATE SET {updates};'
-        statements.append({"type": "execute", "stmt": {"sql": sql, "args": args}})
+        statements.append(insert_statement(table_name, records, update_on_conflict="mid"))
         if len(statements) == concurrency:
-            resp = await turso_exec(statements, db_name=HISTORY.TURSO_DATABASE, silent=True, retry=2)
+            resp = await turso_exec(statements, silent=True, retry=2, **TURSO_KWARGS)
             num_success = sum([1 for x in glom(resp, "results.*.type", default=[]) if x == "ok"]) - 1
             if sync_ids := glom(resp, "results.**.last_insert_rowid", default=[0]):
                 logger.success(f"Synced {num_success} messages to Turso, {min(sync_ids)} -> {max(sync_ids)}. {dt.strftime('%Y-%m-%d %H:%M:%S')}")
             statements = []
     if statements:
-        resp = await turso_exec(statements, db_name=HISTORY.TURSO_DATABASE, silent=True, retry=2)
+        resp = await turso_exec(statements, silent=True, retry=2, **TURSO_KWARGS)
         num_success = sum([1 for x in glom(resp, "results.*.type", default=[]) if x == "ok"]) - 1
         if sync_ids := glom(resp, "results.**.last_insert_rowid", default=[0]):
             logger.success(f"Synced {num_success} messages to Turso, {min(sync_ids)} -> {max(sync_ids)}")
@@ -245,72 +233,71 @@ async def get_table_name(client: Client, chat_id: str | int) -> str:
     default_name = f"{slim_cid}-{chat_title}".replace(" ", "")
 
     # find the table name based on chat id
-    table_names = await turso_list_tables(HISTORY.TURSO_DATABASE, silent=True)
+    table_names = await turso_list_tables(**TURSO_KWARGS, silent=True)
     table_name = next((x for x in table_names if x.startswith(slim_cid + "-")), default_name)
     cache.set(f"tablename-{chat_id}", table_name, ttl=0)
 
     # create table and index
-    if table_name in table_names:
-        return table_name
-    await turso_create_table(table_name, DB_COLUMNS, HISTORY.TURSO_DATABASE)
-    await create_table_index(slim_cid, table_name)
+    await create_table_and_index(slim_cid, table_name)
     return table_name
 
 
-async def create_table_index(slim_cid: str, table_name: str) -> None:
+async def create_table_and_index(slim_cid: str, table_name: str) -> None:
+    # create table
+    await turso_create_table(table_name, DB_COLUMNS, **TURSO_KWARGS)
+
     # get all index names
-    resp = await turso_exec(
-        [{"type": "execute", "stmt": {"sql": "SELECT name FROM sqlite_master WHERE type='index';"}}],
-        db_name=HISTORY.TURSO_DATABASE,
-        silent=True,
-    )
+    resp = await turso_exec([{"type": "execute", "stmt": {"sql": "SELECT name FROM sqlite_master WHERE type='index';"}}], silent=True, **TURSO_KWARGS)
     indexs = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
 
-    # create index if not exists
-    idx_names = [x for x in INDEX_NAMES if f"idx_{slim_cid}_{x}" not in indexs]
-    if not idx_names:
-        return
-
-    # 创建标准索引
-    for idx_name in idx_names:
+    # create standard index
+    for idx_name in [x for x in INDEX_NAMES if f"idx_{slim_cid}_{x}" not in indexs]:
         logger.debug(f"Creating index on {table_name} for {idx_name}")
-        resp = await turso_exec(
-            [{"type": "execute", "stmt": {"sql": f'CREATE INDEX IF NOT EXISTS "idx_{slim_cid}_{idx_name}" ON "{table_name}"({idx_name})'}}],
-            db_name=HISTORY.TURSO_DATABASE,
-            silent=True,
-        )
+        resp = await turso_exec([{"type": "execute", "stmt": {"sql": f'CREATE INDEX IF NOT EXISTS "idx_{slim_cid}_{idx_name}" ON "{table_name}"({idx_name})'}}], silent=True, **TURSO_KWARGS)
+
+    # 列出所有虚拟表
+    resp = await turso_exec([{"type": "execute", "stmt": {"sql": 'SELECT name FROM pragma_table_list WHERE type="virtual";'}}], silent=True, **TURSO_KWARGS)
+    virtual_tables = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
 
     statements = []
+    primary_key = "mid"
     """创建 FTS5 虚拟表
     -- content=table_name 指明关联的原表
     -- content_rowid=mid 指明原表的行 ID 列是 mid
     -- segmented 是我们要索引的列
     -- tokenize='unicode61' 使用 unicode61 分词器
     """
-    sql = f"""CREATE VIRTUAL TABLE IF NOT EXISTS fts_{slim_cid} USING fts5(segmented, content="{table_name}", content_rowid=mid, tokenize="unicode61");"""
-    statements.append({"type": "execute", "stmt": {"sql": sql}})
-
-    """将现有数据从原表复制到 FTS 表
-    注意, 我们在这里插入的是 rowid (它会对应到 content_rowid=mid 指定的列) 和 segmented
-    从原表中选择 mid 和 segmented 列。mid 列的值会被插入到 FTS 表中对应原表 rowid (或 content_rowid) 的位置。
-    """
-    sql = f"INSERT INTO fts_{slim_cid} (rowid, segmented) SELECT mid, segmented FROM '{table_name}' WHERE mid NOT IN (SELECT rowid FROM fts_{slim_cid});"
-    statements.append({"type": "execute", "stmt": {"sql": sql}})
-
+    if f"fts_{slim_cid}" not in virtual_tables:
+        sql = f"""CREATE VIRTUAL TABLE IF NOT EXISTS fts_{slim_cid} USING fts5(segmented, content="{table_name}", content_rowid={primary_key}, tokenize="unicode61");"""
+        statements.append({"type": "execute", "stmt": {"sql": sql}})
+
+        """将现有数据从原表复制到 FTS 表
+        注意, 我们在这里插入的是 rowid (它会对应到 content_rowid=mid 指定的列) 和 segmented
+        从原表中选择 mid 和 segmented 列。mid 列的值会被插入到 FTS 表中对应原表 rowid (或 content_rowid) 的位置。
+        """
+        sql = f"INSERT INTO fts_{slim_cid} (rowid, segmented) SELECT {primary_key}, segmented FROM '{table_name}' WHERE {primary_key} NOT IN (SELECT rowid FROM fts_{slim_cid});"
+        statements.append({"type": "execute", "stmt": {"sql": sql}})
+
+    # 列出所有触发器
+    resp = await turso_exec([{"type": "execute", "stmt": {"sql": 'SELECT name FROM sqlite_master WHERE type="trigger";'}}], silent=True, **TURSO_KWARGS)
+    triggers = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
     """维护 FTS 表
     为了让 FTS 表与原表保持同步, 需要在原表上创建触发器。
     在原表插入、删除、更新时, 同步更新 FTS 表
     """
-    # 创建触发器, 在原表删除数据时, 同步从 FTS 表删除
-    sql = f"CREATE TRIGGER IF NOT EXISTS trigger_{slim_cid}_ai AFTER INSERT ON '{table_name}' BEGIN INSERT INTO fts_{slim_cid} (rowid, segmented) VALUES (NEW.mid, NEW.segmented); END;"
-    statements.append({"type": "execute", "stmt": {"sql": sql}})
+    # 创建触发器, 在原表插入数据时, 同步从 FTS 表插入
+    if f"trigger_{slim_cid}_ai" not in triggers:
+        sql = f"CREATE TRIGGER IF NOT EXISTS trigger_{slim_cid}_ai AFTER INSERT ON '{table_name}' BEGIN INSERT INTO fts_{slim_cid} (rowid, segmented) VALUES (NEW.{primary_key}, NEW.segmented); END;"
+        statements.append({"type": "execute", "stmt": {"sql": sql}})
 
     # 创建触发器, 在原表删除数据时, 同步从 FTS 表删除
-    sql = f"CREATE TRIGGER IF NOT EXISTS trigger_{slim_cid}_ad AFTER DELETE ON '{table_name}' BEGIN DELETE FROM fts_{slim_cid} WHERE rowid = OLD.mid; END;"
-    statements.append({"type": "execute", "stmt": {"sql": sql}})
+    if f"trigger_{slim_cid}_ad" not in triggers:
+        sql = f"CREATE TRIGGER IF NOT EXISTS trigger_{slim_cid}_ad AFTER DELETE ON '{table_name}' BEGIN DELETE FROM fts_{slim_cid} WHERE rowid = OLD.{primary_key}; END;"
+        statements.append({"type": "execute", "stmt": {"sql": sql}})
 
     # 创建触发器, 在原表更新数据时, 同步更新 FTS 表
     # FTS5 的更新通常是先删除旧的, 再插入新的
-    sql = f"CREATE TRIGGER IF NOT EXISTS trigger_{slim_cid}_au AFTER UPDATE ON '{table_name}' BEGIN DELETE FROM fts_{slim_cid} WHERE rowid = OLD.mid AND OLD.segmented <> NEW.segmented; INSERT INTO fts_{slim_cid} (rowid, segmented) SELECT NEW.mid, NEW.segmented WHERE OLD.segmented <> NEW.segmented; END;"
-    statements.append({"type": "execute", "stmt": {"sql": sql}})
-    await turso_exec(statements, db_name=HISTORY.TURSO_DATABASE, silent=True)
+    if f"trigger_{slim_cid}_au" not in triggers:
+        sql = f"CREATE TRIGGER IF NOT EXISTS trigger_{slim_cid}_au AFTER UPDATE ON '{table_name}' BEGIN DELETE FROM fts_{slim_cid} WHERE rowid = OLD.{primary_key} AND OLD.segmented <> NEW.segmented; INSERT INTO fts_{slim_cid} (rowid, segmented) SELECT NEW.{primary_key}, NEW.segmented WHERE OLD.segmented <> NEW.segmented; END;"
+        statements.append({"type": "execute", "stmt": {"sql": sql}})
+    await turso_exec(statements, silent=True, **TURSO_KWARGS)
src/config.py
@@ -109,10 +109,14 @@ class DANMU:
     NUM_PER_QUERY = int(os.getenv("DANMU_NUM_PER_QUERY", "100"))  # Number of items per query
     D1_DATABASE = os.getenv("DANMU_D1_DATABASE", "bennybot-danmu")
     TURSO_DATABASE = os.getenv("DANMU_TURSO_DATABASE", "bennybot-danmu")
+    TURSO_USERNAME = os.getenv("DANMU_TURSO_USERNAME", "")  # https://turso.tech
+    TURSO_API_TOKEN = os.getenv("DANMU_TURSO_API_TOKEN", "")
+    TURSO_GROUP_TOKEN = os.getenv("DANMU_TURSO_GROUP_TOKEN", "")
     R2_PREFIX = os.getenv("DANMU_R2_PREFIX", "Streaming/")
-    R2_SYNC_ENABLE = os.getenv("DANMU_R2_SYNC_ENABLE", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
-    R2_SYNC_DANMU_YEARS = os.getenv("R2_SYNC_DANMU_YEARS", "")  # comma separated years to sync to R2. e.g. "2025,2024,2023"
-    R2_SYNC_FAYAN_YEARS = os.getenv("R2_SYNC_FAYAN_YEARS", "")  # comma separated years that has live stream
+    SYNC_ENABLE = os.getenv("DANMU_SYNC_ENABLE", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
+    SYNC_ENGINE = os.getenv("DANMU_SYNC_ENGINE", "turso,R2")  # sync livechats to Turso & R2
+    SYNC_DANMU_YEARS = os.getenv("SYNC_DANMU_YEARS", "")  # comma separated years to sync. e.g. "2025,2024,2023" — NOTE(review): env var lacks the DANMU_ prefix used by sibling settings; confirm this is intentional (pre-existing name kept for compatibility)
+    SYNC_FAYAN_YEARS = os.getenv("SYNC_FAYAN_YEARS", "")  # comma separated years that has live stream
 
 
 class PROVIDER:  # default API provider
@@ -208,6 +212,9 @@ class HISTORY:
     ENGINE = os.getenv("HISTORY_ENGINE", "turso")  # turso or D1
     TURSO_ENABLE = os.getenv("HISTORY_TURSO_ENABLE", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
     TURSO_DATABASE = os.getenv("HISTORY_TURSO_DATABASE", "bennybot-history")
+    TURSO_USERNAME = os.getenv("HISTORY_TURSO_USERNAME", "")  # https://turso.tech
+    TURSO_API_TOKEN = os.getenv("HISTORY_TURSO_API_TOKEN", "")
+    TURSO_GROUP_TOKEN = os.getenv("HISTORY_TURSO_GROUP_TOKEN", "")
     PERIODICALLY_BACKUP_CHATS = os.getenv("HISTORY_PERIODICALLY_BACKUP_CHATS", "")  # comma separated chat ids to include  (without `-100` prefix)
     BACKUP_CHATS_HOURS = float(os.getenv("HISTORY_BACKUP_CHATS_HOURS", "24"))  # hours to backup chats
     D1_ENABLE = os.getenv("HISTORY_D1_ENABLE", "1").lower() in ["1", "y", "yes", "t", "true", "on"]