Commit f03b9bb

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-07-29 17:14:33
feat(history): support D1 as query engine
1 parent 417b977
Changed files (3)
src/history/query.py
@@ -3,13 +3,16 @@
 import re
 from io import BytesIO
 
+from glom import glom
 from loguru import logger
 from pyrogram.client import Client
 from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM, BLOCKQUOTE_EXPANDABLE_END_DELIM
 from pyrogram.types import Message
 
-from config import PREFIX, TZ, cache
+from config import HISTORY, PREFIX, TZ, cache
+from database.d1 import query_d1
 from database.turso import turso_exec, turso_parse_resp
+from history.d1 import get_d1_chatinfo, save_chatinfo_to_d1
 from history.turso import get_turso_chatinfo, get_user, save_chatinfo_to_turso
 from history.utils import TURSO_KWARGS, check_save_history, filter_response, generate_query, get_chat, is_admin, list_chat_ids
 from llm.utils import convert_html
@@ -62,7 +65,7 @@ async def query_chat_history(client: Client, message: Message, **kwargs):
         return
 
     if info["text"].strip() == "/history -l":
-        await list_chat_ids(client, message)
+        await list_chat_ids(client, message, engine=HISTORY.QUERY_ENGINE)
         return
     qtype = "history" if startswith_prefix(info["text"].strip(), prefix="/history") else "hist"
 
@@ -72,10 +75,10 @@ async def query_chat_history(client: Client, message: Message, **kwargs):
         return
     if qtype == "hist":
         chat_id = slim_cid(info["cid"])
-    chatinfo = await get_turso_chatinfo(chat_id)
+    chatinfo = await get_turso_chatinfo(chat_id) if HISTORY.QUERY_ENGINE == "turso" else await get_d1_chatinfo(chat_id)
     if not chatinfo:  # this chat is never synced
         chat = await get_chat(client, int(chat_id))
-        chatinfo = await save_chatinfo_to_turso(client, parse_chat(chat))
+        chatinfo = await save_chatinfo_to_turso(client, parse_chat(chat)) if HISTORY.QUERY_ENGINE == "turso" else await save_chatinfo_to_d1(client, parse_chat(chat))
     if not chatinfo:
         await message.reply(f"⚠️`{chat_id}`不是有效的ChatID", quote=True)
         return
@@ -100,8 +103,7 @@ async def query_chat_history(client: Client, message: Message, **kwargs):
     caption += f"\n🔤关键词: {keyword}"
     status_msg = (await send2tg(client, message, texts=caption, **kwargs))[0]
     kwargs["progress"] = status_msg
-
-    results = await query_turso(client, chatinfo, match_time, user, keyword)
+    results = await query_history(client, chatinfo, match_time, user, keyword, engine=HISTORY.QUERY_ENGINE)
     texts = results.get("texts", "")
 
     count = results.get("count", 0)
@@ -180,7 +182,14 @@ def parse_queries(texts: str, qtype: str) -> tuple[str, str, str, str, str]:
     return chat_id, match_time, user, keyword, error
 
 
-async def query_turso(client: Client, cinfo: dict[str, str], match_time: str, user: str, keyword: str) -> dict:
+async def query_history(
+    client: Client,
+    cinfo: dict[str, str],
+    match_time: str,
+    user: str,
+    keyword: str,
+    engine: str = HISTORY.QUERY_ENGINE,
+) -> dict:
     """Query chat history from Turso or D1, depending on `engine`.
 
     由于LIKE查询会扫描整个表, 速度较慢, 而且会快速消耗读取数量配额, 因此我们使用FTS5搜索.
@@ -214,8 +223,12 @@ async def query_turso(client: Client, cinfo: dict[str, str], match_time: str, us
             sql += f" AND T.user = '{user}'"
     sql += " ORDER BY T.mid DESC"
     logger.info(sql)
-    resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, retry=2, **TURSO_KWARGS)
-    filterd = filter_response(turso_parse_resp(resp), keyword)
+    if engine == "turso":
+        resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, retry=2, **TURSO_KWARGS)
+        filterd = filter_response(turso_parse_resp(resp), keyword)
+    else:
+        resp = await query_d1(sql, db_name=HISTORY.D1_DATABASE, silent=True)
+        filterd = filter_response(glom(resp, "result.0.results", default=[]), keyword)
     full_texts = ""
     texts = ""  # long message will be trimmed
     count = 0
src/history/utils.py
@@ -5,12 +5,14 @@ import os
 import re
 import string
 
+from glom import glom
 from loguru import logger
 from pyrogram.client import Client
 from pyrogram.errors import PeerIdInvalid
 from pyrogram.types import Chat, Message, User
 
 from config import DB, HISTORY, TID, cache, cutter
+from database.d1 import query_d1
 from database.turso import turso_exec, turso_parse_resp
 from messages.sender import send2tg
 from others.emoji import CTYPE_EMOJI
@@ -111,7 +113,7 @@ async def get_chat(client: Client, chat_id: int | str) -> Chat:
     return chat
 
 
-async def list_chat_ids(client: Client, message: Message):
+async def list_chat_ids(client: Client, message: Message, engine: str = "turso"):
     """List chat ids from the `chatinfo` table of the selected engine (Turso or D1).
 
     One Turso database may be read by multiple Telegram accounts, we can use tags to filter by account
@@ -121,8 +123,13 @@ async def list_chat_ids(client: Client, message: Message):
         SKIP_LIST_IN_{chatid}  -> skip list in this chat_id
         ONLY_LIST_IN_{chatid}  -> only list in this chat_id
     """
-    resp = await turso_exec([{"type": "execute", "stmt": {"sql": "SELECT * FROM chatinfo;"}}], silent=True, retry=2, **TURSO_KWARGS)
-    chats = turso_parse_resp(resp)
+    if engine.lower() == "turso":
+        resp = await turso_exec([{"type": "execute", "stmt": {"sql": "SELECT * FROM chatinfo;"}}], silent=True, retry=2, **TURSO_KWARGS)
+        chats = turso_parse_resp(resp)
+    else:
+        resp = await query_d1("SELECT * FROM chatinfo;", db_name=HISTORY.D1_DATABASE, silent=True)
+        chats = glom(resp, "result.0.results", default=[])
+
     me = await myself(client)
     cid = slim_cid(message.chat.id)
     msg = ""
src/config.py
@@ -231,7 +231,8 @@ class DB:
 
 class HISTORY:
     ENABLE = os.getenv("HISTORY_ENABLE", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
-    ENGINE = os.getenv("HISTORY_ENGINE", "turso")  # turso or D1
+    ENGINE = os.getenv("HISTORY_ENGINE", "turso").lower()  # turso or d1 (This is for sync & backup)
+    QUERY_ENGINE = os.getenv("HISTORY_QUERY_ENGINE", "turso").lower()  # turso or d1
     TURSO_ENABLE = os.getenv("HISTORY_TURSO_ENABLE", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
     TURSO_DATABASE = os.getenv("HISTORY_TURSO_DATABASE", "bennybot-history")
     TURSO_USERNAME = os.getenv("HISTORY_TURSO_USERNAME", "")  # https://turso.tech