Commit c9dc907

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-06-16 12:30:52
feat(history): support full table backup in turso
1 parent 8fac22d
Changed files (5)
src/history/query.py
@@ -10,7 +10,7 @@ from pyrogram.types import Message
 
 from config import PREFIX, cutter
 from database.turso import turso_exec, turso_parse_resp
-from history.turso import get_chatinfo, save_chatinfo
+from history.turso import get_turso_chatinfo, save_chatinfo_to_turso
 from history.utils import TURSO_KWARGS, check_save_history, get_chat, get_uid_by_username, is_admin, list_chat_ids
 from llm.utils import convert_html
 from messages.parser import parse_chat, parse_msg
@@ -72,10 +72,10 @@ async def query_chat_history(client: Client, message: Message, **kwargs):
         return
     if qtype == "hist":
         chat_id = slim_cid(info["cid"])
-    chatinfo = await get_chatinfo(chat_id)
+    chatinfo = await get_turso_chatinfo(chat_id)
     if not chatinfo:  # this chat is never synced
         chat = await get_chat(client, int(chat_id))
-        chatinfo = await save_chatinfo(parse_chat(chat))
+        chatinfo = await save_chatinfo_to_turso(client, parse_chat(chat))
     if not chatinfo:
         await message.reply(f"⚠️`{chat_id}`不是有效的ChatID", quote=True)
         return
src/history/sync.py
@@ -5,8 +5,10 @@ from pyrogram.client import Client
 from pyrogram.types import Message
 
 from config import HISTORY, cache
+from database.turso import turso_exec, turso_parse_resp
 from history.d1 import backup_chat_history_to_d1, sync_history_to_d1
 from history.turso import backup_chat_history_to_turso, sync_history_to_turso
+from history.utils import TURSO_KWARGS
 
 
 async def sync_chat_history(client: Client, message: Message) -> None:
@@ -15,7 +17,7 @@ async def sync_chat_history(client: Client, message: Message) -> None:
     if HISTORY.ENGINE.upper() == "D1":  # Deprecated
         await sync_history_to_d1(client, message)
     if HISTORY.ENGINE.upper() == "TURSO":
-        await sync_history_to_turso(message)
+        await sync_history_to_turso(client, message)
 
 
 async def backup_chat_history(
@@ -29,8 +31,13 @@ async def backup_chat_history(
     if cache.get("backup_chat_history"):
         return
     cache.set("backup_chat_history", 1, ttl=12 * 3600)  # backup every 12 hours
-
-    chat_ids = [x.strip() for x in chats.split(",") if x.strip()]
+    # if `chats` is set to "full_table", backup all chats in `chatinfo` table
+    if chats == "full_table":
+        resp = await turso_exec([{"type": "execute", "stmt": {"sql": "SELECT * FROM 'chatinfo';"}}], silent=True, retry=2, **TURSO_KWARGS)
+        tables = turso_parse_resp(resp)
+        chat_ids = [x["chandle"] or int(x["cid"]) for x in tables]
+    else:
+        chat_ids = [x.strip() for x in chats.split(",") if x.strip()]
     if not chat_ids:
         return
     for cid in chat_ids:
src/history/turso.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 import json
+import os
 from datetime import datetime, timedelta
 from pathlib import Path
 from zoneinfo import ZoneInfo
@@ -14,18 +15,20 @@ from config import DOWNLOAD_DIR, HISTORY, TZ, cache, cutter
 from database.turso import insert_statement, turso_create_table, turso_exec, turso_parse_resp
 from history.utils import TURSO_KWARGS, check_save_history, fine_grained_check, get_chat
 from messages.parser import parse_chat, parse_msg
-from utils import i_am_bot, nowdt, slim_cid
+from utils import i_am_bot, nowdt, slim_cid, to_int, true
 
 # ruff: noqa: S608
 
 CHAT_COLUMNS = "cid INTEGER PRIMARY KEY, ctype TEXT, ctitle TEXT, chandle TEXT, tablename TEXT, tags TEXT"
-USER_COLUMNS = "uid INTEGER PRIMARY KEY, full_name TEXT, handle TEXT, tags TEXT"
+USER_COLUMNS = "ctitle TEXT, full_name TEXT, handle TEXT, tags TEXT, name TEXT, uid INTEGER, cid INTEGER, id INTEGER PRIMARY KEY"
+USER_INDEXES = ["uid", "cid"]
+
 MSG_COLUMNS = "mid INTEGER PRIMARY KEY, mtype TEXT, time TEXT NOT NULL, fullname TEXT, content TEXT, filename TEXT, urls TEXT, reply INTEGER, mime TEXT, user TEXT, uid INTEGER, gid INTEGER, segmented TEXT"  # fmt: off
 
-INDEX_NAMES = ["time", "user", "uid"]
+MSG_INDEXES = ["time", "user", "uid"]
 
 
-async def sync_history_to_turso(message: Message) -> None:
+async def sync_history_to_turso(client: Client, message: Message) -> None:
     """Sync received messages to Turso database.
 
     1. save the user info to table `userinfo`
@@ -38,8 +41,8 @@ async def sync_history_to_turso(message: Message) -> None:
     if not check_save_history(info["ctype"], info["cid"]) or not fine_grained_check(info) or message.service:
         return
 
-    await save_userinfo(info)
-    chatinfo = await save_chatinfo(info)
+    await save_userinfo_to_turso(client, info)
+    chatinfo = await save_chatinfo_to_turso(client, info)
     records = {
         "mid": info["mid"],
         "mtype": info["mtype"],
@@ -63,12 +66,14 @@ async def backup_chat_history_to_turso(client: Client, chat_id: str | int, hours
         return
     if await i_am_bot(client):
         return
-    chatinfo = await get_chatinfo(chat_id)
+    chatinfo = await get_turso_chatinfo(chat_id)
     if not chatinfo:  # this chat is never synced
-        chat = await get_chat(client, int(chat_id))
-        chatinfo = await save_chatinfo(parse_chat(chat))
+        chat = await get_chat(client, to_int(chat_id))
+        chatinfo = await save_chatinfo_to_turso(client, parse_chat(chat))
     if not chatinfo:  # chat is deleted
         return
+    if true(os.getenv(f"HISTORY_IGNORE_{chatinfo['cid']}")):
+        return
     table_name = chatinfo["tablename"]
     # find message ids in this time range
     now = nowdt(TZ)
@@ -82,13 +87,11 @@ async def backup_chat_history_to_turso(client: Client, chat_id: str | int, hours
     logger.info(f"Found {len(saved_mids)} messages in Turso. Rows read: {glom(resp, 'results.0.response.result.rows_read', default=1)}")
     concurrency = 1000
     statements = []
-    real_cid = int(chatinfo["cid"]) if chatinfo["ctype"] in ["BOT", "PRIVATE"] else int(f"-100{chatinfo['cid']}")
+    real_cid = chatinfo["chandle"] or (int(chatinfo["cid"]) if chatinfo["ctype"] in ["BOT", "PRIVATE"] else int(f"-100{chatinfo['cid']}"))
     async for message in client.get_chat_history(real_cid):  # type: ignore
-        if not isinstance(message, Message) or message.empty or message.service:
+        if not isinstance(message, Message) or message.empty or message.service or message.id in saved_mids:
             continue
         info = parse_msg(message, silent=True, use_cache=False)
-        if info["mid"] in saved_mids:
-            continue
         if not fine_grained_check(info):
             continue
         if info["time"] < begin_time:
@@ -167,10 +170,10 @@ async def upload_exported_history_to_turso(client: Client, path: str | Path | No
         "video_file": "video",
     }
     chat_id = data["id"]
-    chatinfo = await get_chatinfo(chat_id)
+    chatinfo = await get_turso_chatinfo(chat_id)
     if not chatinfo:  # this chat is never synced
         chat = await get_chat(client, int(chat_id))
-        chatinfo = await save_chatinfo(parse_chat(chat))
+        chatinfo = await save_chatinfo_to_turso(client, parse_chat(chat))
     table_name = chatinfo["tablename"]
     # find all message_ids
     resp = await turso_exec([{"type": "execute", "stmt": {"sql": f'SELECT mid FROM "{table_name}";'}}], silent=True, **TURSO_KWARGS)
@@ -231,7 +234,7 @@ async def upload_exported_history_to_turso(client: Client, path: str | Path | No
             logger.success(f"Synced {num_success} messages to Turso, {min(sync_ids)} -> {max(sync_ids)}")
 
 
-async def get_chatinfo(cid: str | int) -> dict:
+async def get_turso_chatinfo(cid: str | int) -> dict:
     """Get chat info from table `chatinfo`.
 
     Returns:
@@ -243,7 +246,7 @@ async def get_chatinfo(cid: str | int) -> dict:
         [
             {
                 "type": "execute",
-                "stmt": {"sql": f"SELECT * FROM chatinfo WHERE cid={slim_cid(cid)};"},
+                "stmt": {"sql": f"SELECT * FROM chatinfo WHERE cid='{slim_cid(cid)}' OR chandle='{cid}';"},
             }
         ],
         silent=True,
@@ -253,7 +256,7 @@ async def get_chatinfo(cid: str | int) -> dict:
     return glom(turso_parse_resp(resp), "0", default={})
 
 
-async def save_chatinfo(minfo: dict) -> dict[str, str]:
+async def save_chatinfo_to_turso(client: Client, minfo: dict) -> dict[str, str]:
     """Save chat info to table `chatinfo`.
 
     Args:
@@ -267,9 +270,16 @@ async def save_chatinfo(minfo: dict) -> dict[str, str]:
         return {}
     # Get chat info from turso and save it to cache
     if not (cached := cache.get(f"chatinfo-{cid}")):
-        cached = await get_chatinfo(cid)
+        cached = await get_turso_chatinfo(cid)
         cache.set(f"chatinfo-{cid}", cached, ttl=0)
+
     ctitle = minfo["ctitle"] or minfo["full_name"]
+    # if in private chats, we use the opponent's name as chat title
+    if minfo["ctype"] in ["BOT", "PRIVATE"]:
+        chat = await get_chat(client, minfo["cid"])
+        if chat.id != 0:
+            ctitle = parse_chat(chat)["ctitle"]
+
     records = {
         "cid": cid,
         "ctype": minfo["ctype"],
@@ -282,7 +292,7 @@ async def save_chatinfo(minfo: dict) -> dict[str, str]:
     await turso_create_table(
         records["tablename"],
         MSG_COLUMNS,
-        idx_cols=INDEX_NAMES,
+        idx_cols=MSG_INDEXES,
         idx_prefix=f"idx_{cid}_",
         fts_on_col="mid",
         fts_name=cid,
@@ -295,19 +305,19 @@ async def save_chatinfo(minfo: dict) -> dict[str, str]:
     return records
 
 
-async def get_userinfo(uid: str | int) -> dict:
+async def get_turso_userinfo(uid: int, cid: int) -> dict:
     """Get user info from table `userinfo`.
 
     Returns:
         uid, full_name, handle
     """
     # create table
-    await turso_create_table("userinfo", USER_COLUMNS, silent=True, **TURSO_KWARGS)
+    await turso_create_table("userinfo", USER_COLUMNS, idx_cols=USER_INDEXES, idx_prefix="idx_userinfo_", silent=True, **TURSO_KWARGS)
     resp = await turso_exec(
         [
             {
                 "type": "execute",
-                "stmt": {"sql": f"SELECT * FROM userinfo WHERE uid={uid};"},
+                "stmt": {"sql": f"SELECT * FROM userinfo WHERE uid={uid} AND cid={cid};"},
             }
         ],
         silent=True,
@@ -317,7 +327,7 @@ async def get_userinfo(uid: str | int) -> dict:
     return glom(turso_parse_resp(resp), "0", default={})
 
 
-async def save_userinfo(minfo: dict) -> dict[str, str]:
+async def save_userinfo_to_turso(client: Client, minfo: dict) -> dict[str, str]:
     """Save user info to table `userinfo`.
 
     Args:
@@ -326,21 +336,35 @@ async def save_userinfo(minfo: dict) -> dict[str, str]:
     Returns:
         uid, full_name, handle, tags
     """
-    if str(minfo["uid"]) == "1":  # default user (user is unknown)
+    uid = int(minfo["uid"])
+    cid = int(slim_cid(minfo["cid"]))
+    if uid == 1:  # default user (user is unknown)
         return {}
     # Get user info from turso and save it to cache
-    if not (cached := cache.get(f"userinfo-{minfo['uid']}")):
-        cached = await get_userinfo(minfo["uid"])
-        cache.set(f"userinfo-{minfo['uid']}", cached, ttl=0)
+    if not (cached := cache.get(f"userinfo-{uid}-{cid}")):
+        cached = await get_turso_userinfo(uid, cid)
+        cache.set(f"userinfo-{uid}-{cid}", cached, ttl=0)
+
+    ctitle = minfo["ctitle"] or minfo["full_name"]
+    # if in private chats, we use the opponent's name as chat title
+    if minfo["ctype"] in ["BOT", "PRIVATE"]:
+        chat = await get_chat(client, minfo["cid"])
+        if chat.id != 0:
+            ctitle = parse_chat(chat)["ctitle"]
 
+    primary_key = uid if uid == cid else abs(uid - cid)
     records = {
-        "uid": str(minfo["uid"]),  # all values are str from turso.
+        "ctitle": ctitle,
         "full_name": minfo["full_name"],
         "handle": minfo["handle"],
         "tags": cached.get("tags", ""),
+        "name": minfo["full_name"].replace(" ", ""),
+        "uid": str(uid),  #     # all caced values are str from turso,
+        "cid": str(cid),  # so we need to use str to compare with cached.
+        "id": primary_key,
     }
     if cached != records:
         logger.info(f"Save user info: {records}")
-        cache.set(f"userinfo-{minfo['uid']}", records, ttl=0)
-        await turso_exec([insert_statement("userinfo", records, update_on_conflict="uid")], retry=2, **TURSO_KWARGS)
+        cache.set(f"userinfo-{uid}-{cid}", records, ttl=0)
+        await turso_exec([insert_statement("userinfo", records, update_on_conflict="id")], retry=2, **TURSO_KWARGS)
     return records
src/history/utils.py
@@ -89,10 +89,10 @@ async def get_chat(client: Client, chat_id: int | str) -> Chat:
     if str(chat_id) == "0":
         return chat
     try:
-        chat = await client.get_chat(int(chat_id))
+        chat = await client.get_chat(to_int(chat_id))
     except PeerIdInvalid:
         with contextlib.suppress(Exception):
-            chat = await client.get_chat(int(f"-100{slim_cid(chat_id)}"))
+            chat = await client.get_chat(to_int(f"-100{slim_cid(chat_id)}"))
     except Exception:
         logger.warning(f"Failed to get chat info for {chat_id}")
     cache.set(f"chat-info-{slim_cid(chat_id)}", chat, ttl=3600)  # cache for 1 hour
src/config.py
@@ -223,7 +223,7 @@ class HISTORY:
     TURSO_USERNAME = os.getenv("HISTORY_TURSO_USERNAME", "")  # https://turso.tech
     TURSO_API_TOKEN = os.getenv("HISTORY_TURSO_API_TOKEN", "")
     TURSO_GROUP_TOKEN = os.getenv("HISTORY_TURSO_GROUP_TOKEN", "")
-    PERIODICALLY_BACKUP_CHATS = os.getenv("HISTORY_PERIODICALLY_BACKUP_CHATS", "")  # comma separated chat ids to include  (without `-100` prefix)
+    PERIODICALLY_BACKUP_CHATS = os.getenv("HISTORY_PERIODICALLY_BACKUP_CHATS", "")  # "full_table" or comma separated chat ids to include  (without `-100` prefix)
     BACKUP_CHATS_HOURS = float(os.getenv("HISTORY_BACKUP_CHATS_HOURS", "24"))  # hours to backup chats
     D1_ENABLE = os.getenv("HISTORY_D1_ENABLE", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
     D1_DATABASE = os.getenv("HISTORY_D1_DATABASE", "bennybot-history")