Commit b0b2ca7

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-07-03 04:01:00
feat(danmu): support query from Turso
1 parent fc2e157
src/danmu/entrypoint.py
@@ -14,6 +14,7 @@ from pyrogram.types import Message
 from config import DANMU, PREFIX, TZ
 from danmu.r2 import query_r2
 from danmu.server import query_server
+from danmu.turso import query_turso
 from danmu.utils import file_bytes, merge_txt_files, to_usd
 from llm.utils import convert_html
 from messages.parser import parse_msg
@@ -43,7 +44,7 @@ HELP = f"""📖**查询直播合订本**
 - 如果用户名中有空格, 请去除空格。例如: 想指定用户为John Doe请使用 `@JohnDoe`
 {BLOCKQUOTE_EXPANDABLE_END_DELIM}
 `{PREFIX.FAYAN}` 用法类似, 但查询的是**【{DANMU.STREAMER}】**直播语录。
-额外需注意的是 `{PREFIX.FAYAN}` 命令会忽略指定的 `@用户名`
+额外需注意的是 `{PREFIX.FAYAN}` 命令会忽略指定的 `@用户名`且必须指定关键词
 """
 
 DANMU_TIPS = f"时间点为{TZ}时区"
@@ -78,22 +79,26 @@ async def query_danmu(client: Client, message: Message, **kwargs):
     status_msg = (await send2tg(client, message, texts=caption, **kwargs))[0]
     kwargs["progress"] = status_msg
 
-    resp = {}
-    paths = []
-    count = 0
     super_chats = defaultdict(Decimal)  # {"currency": amount}
-    engine_dates = await get_engine_with_dates(match_time, user, keyword, qtype)
-    for engine, dates in sorted(engine_dates.items(), reverse=True):
-        if engine == "r2":
-            resp = await query_r2(dates, user, keyword, caption, super_chats, qtype, **kwargs)
-        else:
-            resp = await query_server(dates, user, keyword, caption, super_chats, qtype, **kwargs)
-        paths.extend(resp.get("paths", []))
-        count += resp.get("count", 0)
+    if DANMU.QUERY_METHOD.lower() == "turso":
+        resp = await query_turso(match_time, user, keyword, caption, super_chats, qtype, **kwargs)
+        count = resp.get("count", 0)
+        paths = resp.get("paths", [])
+    else:
+        resp = {}
+        paths = []
+        count = 0
+        engine_dates = await get_engine_with_dates(match_time, user, keyword, qtype)
+        for engine, dates in sorted(engine_dates.items(), reverse=True):
+            if engine == "r2":
+                resp = await query_r2(dates, user, keyword, caption, super_chats, qtype, **kwargs)
+            else:
+                resp = await query_server(dates, user, keyword, caption, super_chats, qtype, **kwargs)
+            paths.extend(resp.get("paths", []))
+            count += resp.get("count", 0)
     if count == 0:
         await modify_progress(text=caption + f"\n⚠️未匹配任何{qtype}", force_update=True, **kwargs)
         return
-
     header = f"📖**{qtype}记录 ({user})**:" if user else f"📖**{qtype}记录**:"
     profit = ""
     profit_usd = 0
@@ -114,8 +119,11 @@ async def query_danmu(client: Client, message: Message, **kwargs):
             return
     caption += f"\n#️⃣{qtype}数: {count}"
     caption += f"\n\n🎉**SuperChat**:{profit}" if profit else ""
-    dates: list[str] = flatten(engine_dates.values())  # type: ignore
-    merged_paths = merge_txt_files(paths, dates, user, keyword, qtype, tips)
+    if DANMU.QUERY_METHOD.lower() == "turso":
+        merged_paths = paths
+    else:
+        dates: list[str] = flatten(engine_dates.values())  # type: ignore
+        merged_paths = merge_txt_files(paths, dates, user, keyword, qtype, tips)
     media = [{"document": path} for path in sorted(merged_paths)]
     # less than 200K, add instant view
     if file_bytes(merged_paths) < 204800:
@@ -161,6 +169,8 @@ def parse_queries(texts: str, qtype: str) -> tuple[str, str, str, str]:
 
     if qtype == "发言":
         user = ""
+        if not keyword:
+            return "", "", "", "查询发言时必须指定关键词"
 
     if not any((match_time, user, keyword)):
         return "", "", "", f"查询格式有误, 请发送{PREFIX.DANMU}或{PREFIX.FAYAN}命令查看帮助"
@@ -181,7 +191,7 @@ async def get_engine_with_dates(match_time: str, user: str = "", keyword: str =
               为避免此情况, 当查询为全年时, 改为按月份从R2获取
               然后合并每2个月的记录为一个文件, 发送6个文件.
 
-    对于发言记录, 如果指定了日期, 从R2查询, 否则从server获取
+    对于发言记录一律从R2查询
     Returns:
         dict: {"server": [date-1, date-2], "r2": [date-3, date-4]}
     """
@@ -192,7 +202,7 @@ async def get_engine_with_dates(match_time: str, user: str = "", keyword: str =
     if qtype == "发言":
         if match_time:
             return {"r2": [match_time]}
-        return {"server": allowed_years}
+        return {"r2": allowed_years}
 
     # 以下为匹配弹幕查询
     if not match_time:
src/danmu/sync.py
@@ -8,8 +8,8 @@ from zoneinfo import ZoneInfo
 from glom import flatten, glom
 from loguru import logger
 
-from config import DANMU, DB, TZ, cutter
-from danmu.utils import merge_json, simplify_json
+from config import DANMU, TZ, cutter
+from danmu.utils import TURSO_KWARGS, merge_json, simplify_json
 from database.r2 import get_cf_r2, list_cf_r2, set_cf_r2
 from database.turso import insert_statement, turso_create_table, turso_exec, turso_parse_resp
 from networking import hx_req
@@ -27,12 +27,6 @@ INDEX_NAMES = {
     "发言": ["time"],
     "弹幕": ["time", "user", "uid", "superchat"],
 }
-TURSO_KWARGS: dict = {
-    "db_name": DANMU.TURSO_DATABASE,
-    "username": DANMU.TURSO_USERNAME or DB.TURSO_USERNAME,
-    "api_token": DANMU.TURSO_API_TOKEN or DB.TURSO_API_TOKEN,
-    "group_token": DANMU.TURSO_GROUP_TOKEN or DB.TURSO_GROUP_TOKEN,
-}
 
 
 async def sync_livechats() -> None:
@@ -188,16 +182,16 @@ async def filter_records_in_turso(data: list[dict], date: str, qtype: str) -> li
     resp = await turso_exec(
         [{"type": "execute", "stmt": {"sql": f'SELECT {identifier} FROM "{qtype}" WHERE time >= "{date} 00:00:00" AND time <= "{tomorrow:%Y-%m-%d} 08:00:00";'}}],
         silent=True,
+        retry=2,
         **TURSO_KWARGS,
     )
     saved = glom(resp, "results.0.response.result.rows.*.*.value", default=[])
     saved = {"-".join(x) for x in saved}  # convert to set to speedup.
-    log = f"Found {len(saved)} messages in Turso."
+    log = f"Found {len(saved)} messages in Turso of date {date}"
     log += f" Rows read: {glom(resp, 'results.0.response.result.rows_read', default=1)}"
-    logger.info(log)
-    if qtype == "发言":
-        return [x for x in data if x["time"] not in saved]
-    return [x for x in data if f"{x['time']}-{x['user']}" not in saved]
+    filtered = [x for x in data if x["time"] not in saved] if qtype == "发言" else [x for x in data if f"{x['time']}-{x['user']}" not in saved]
+    logger.info(f"{log}. Remains: {len(filtered)}")
+    return filtered
 
 
 async def sync_server_to_r2(qtype: str) -> None:
src/danmu/turso.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from collections import defaultdict
+from decimal import Decimal
+
+import anyio
+from loguru import logger
+
+from config import DOWNLOAD_DIR, cache, cutter
+from danmu.utils import TURSO_KWARGS
+from database.turso import turso_exec, turso_parse_resp
+from messages.progress import modify_progress
+from others.emoji import CURRENCY
+from utils import number
+
+
+def _sql_quote(value: str) -> str:
+    """Escape ``value`` for use inside a single-quoted SQL string literal."""
+    return value.replace("'", "''")
+
+
+def _time_range(match_time: str) -> tuple[str, str] | None:
+    """Expand ``YYYY`` / ``YYYY-MM`` / ``YYYY-MM-DD`` into inclusive (begin, end) timestamps.
+
+    Returns None when ``match_time`` has any other length.
+    """
+    suffixes = {
+        4: ("-01-01 00:00:00", "-12-31 23:59:59"),  # 2025
+        # "-31" is only an upper bound for the textual comparison, so shorter months are fine.
+        7: ("-01 00:00:00", "-31 23:59:59"),  # 2025-01
+        10: (" 00:00:00", " 23:59:59"),  # 2025-01-01
+    }.get(len(match_time))
+    if suffixes is None:
+        return None
+    return match_time + suffixes[0], match_time + suffixes[1]
+
+
+def _fts_match_expr(keyword: str) -> str:
+    """Build the (already SQL-escaped) right-hand side of ``FTS.segmented MATCH '...'``.
+
+    The keyword is matched both verbatim and in its word-segmented form when the two differ.
+    """
+    segmented = " ".join(cutter.cutword(keyword))
+    if segmented == keyword:
+        expr = keyword
+    else:
+        # The inner phrases must use double quotes; an embedded double quote is escaped by doubling it.
+        # NOTE(review): assumes the fts_* tables follow SQLite FTS5 string syntax - confirm.
+        quoted_keyword = keyword.replace('"', '""')
+        quoted_segmented = segmented.replace('"', '""')
+        expr = f'"{quoted_keyword}" OR "{quoted_segmented}"'
+    return _sql_quote(expr)
+
+
+def _build_turso_sql(time_range: tuple[str, str] | None, user: str, keyword: str, qtype: str) -> str:
+    """Compose the SELECT statement for a 发言 / 弹幕 query.
+
+    Returns an empty string when no valid statement can be built: a 发言 query
+    without a keyword, or a query without any filter at all.
+    """
+    is_speech = qtype == "发言"
+    if is_speech and not keyword:  # 发言 queries go through the FTS table and need a keyword
+        return ""
+    col = "T." if keyword else ""  # columns are only aliased when joining the FTS table
+    conditions = []
+    if keyword:
+        conditions.append(f"FTS.segmented MATCH '{_fts_match_expr(keyword)}'")
+    if time_range:
+        begin, end = (_sql_quote(x) for x in time_range)
+        conditions.append(f"{col}time >= '{begin}' AND {col}time <= '{end}'")
+    if user and not is_speech:  # 发言 rows are never filtered by user
+        conditions.append(f"{col}user = '{_sql_quote(user)}'")
+    if not conditions:
+        return ""
+    where = " AND ".join(conditions)
+    if is_speech:
+        return f"SELECT T.time, T.content FROM 发言 AS T JOIN fts_发言 AS FTS ON T.rowid = FTS.rowid WHERE {where}"
+    if keyword:
+        return f"SELECT T.time,T.fullname,T.content,T.superchat,T.user,T.uid FROM 弹幕 AS T JOIN fts_弹幕 AS FTS ON T.rowid = FTS.rowid WHERE {where}"
+    return f"SELECT time,fullname,content,superchat,user,uid FROM 弹幕 WHERE {where}"
+
+
+async def query_turso(match_time: str, user: str, keyword: str, caption: str, super_chats: defaultdict, qtype: str, **kwargs) -> dict:
+    """Query 发言 / 弹幕 records from Turso and save the rendered text to a file.
+
+    Args:
+        match_time: "", "YYYY", "YYYY-MM" or "YYYY-MM-DD"; restricts the time column.
+        user: exact user to match (only applied to 弹幕 queries).
+        keyword: text to search through the FTS table; mandatory for 发言 queries.
+        caption: progress text; the match count is appended to it.
+        super_chats: {"currency": amount} accumulator, updated in place by the parser.
+        qtype: "发言" or "弹幕".
+        **kwargs: forwarded to ``modify_progress``.
+
+    Returns:
+        {"paths": list[str], "count": int}. When the query cannot be built
+        (unsupported ``match_time`` form, 发言 without keyword, or no filter at
+        all) nothing is executed and ``{"paths": [], "count": 0}`` is returned.
+    """
+    time_range = None
+    if match_time:
+        time_range = _time_range(match_time)
+        if time_range is None:
+            logger.warning(f"Unsupported match_time: {match_time!r}")
+            return {"paths": [], "count": 0}
+
+    sql = _build_turso_sql(time_range, user, keyword, qtype)
+    if not sql:
+        logger.warning(f"Nothing to query from Turso: qtype={qtype!r}, user={user!r}, keyword={keyword!r}")
+        return {"paths": [], "count": 0}
+
+    logger.info(sql)
+    resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, retry=2, **TURSO_KWARGS)
+    parsed = await parse_from_turso(turso_parse_resp(resp), user, keyword, super_chats, qtype)
+    count = parsed.get("count", 0)
+    await modify_progress(text=caption + f"\n⏳已匹配 {count} 条{qtype}", force_update=True, **kwargs)
+    texts = parsed.get("texts", "")
+    save_path = f"{DOWNLOAD_DIR}/{user}-{match_time}-{keyword}-{qtype}.txt".replace("--", "-")
+    # Explicit UTF-8: the content is CJK text and must not depend on the platform default encoding.
+    async with await anyio.open_file(save_path, "w", encoding="utf-8") as f:
+        await f.write(texts.strip())
+    return {"paths": [save_path], "count": count}
+
+
+async def parse_from_turso(data: list[dict], user: str, keyword: str, super_chats: defaultdict, qtype: str) -> dict:
+    """Render rows fetched from Turso as a plain-text transcript.
+
+    Rows are grouped by the date part of ``time`` (first 10 characters).
+    Dates are emitted newest first; rows inside one date are emitted oldest
+    first. Each date group starts with a header produced by ``live_date``.
+
+    Rows are re-filtered here: ``keyword`` must be a substring of ``content``
+    and, for 弹幕 queries, ``user`` must equal the row's ``user``.
+
+    Args:
+        data: row dicts; the code reads ``time``, ``content`` and, for 弹幕,
+            ``fullname``, ``superchat`` and ``user``.
+        user: when set, author names are hidden in the output.
+        keyword: substring filter applied to ``content``.
+        super_chats: {"currency": amount} accumulator, updated in place.
+        qtype: "发言" or "弹幕".
+
+    Returns:
+        {"texts": str, "count": int}
+    """
+    by_date = defaultdict(list)  # {date: list[dict]}
+    for row in data:
+        by_date[row["time"][:10]].append(row)
+
+    only_user = bool(user) and qtype == "弹幕"
+    chunks = []  # one rendered line per kept row
+    for date in sorted(by_date, reverse=True):  # newest date first
+        rows = by_date[date]
+        if keyword:
+            rows = [r for r in rows if keyword in r.get("content", "")]
+        if only_user:
+            rows = [r for r in rows if r.get("user", "") == user]
+        rows = sorted(rows, key=lambda r: r["time"])  # oldest row first
+        for position, row in enumerate(rows):
+            # The date header is printed once, in front of the first row of the day.
+            header = ""
+            if position == 0:
+                header = f"\n开播日期: {await live_date(date)}\n"
+            clock = row["time"][11:]
+            if qtype == "发言":
+                chunks.append(f"\n{header}{clock}: {row['content'].strip()}")
+                continue
+            paid = ""
+            super_chat = row.get("superchat")
+            if super_chat:
+                currency, amount = super_chat.split(" ")
+                super_chats[currency] += Decimal(amount)
+                if currency in CURRENCY:
+                    paid = f" ({CURRENCY[currency]}{currency} {number(amount)})"
+            content = row.get("content", "")
+            # When a user filter is given the author name is redundant and hidden.
+            author = "" if user else "|" + row.get("fullname", "")
+            chunks.append(f"\n{header}{clock}{author}{paid}: {content}")
+    return {"texts": "".join(chunks).rstrip(), "count": len(chunks)}
+
+
+@cache.memoize(ttl=60)
+async def get_liveinfo() -> dict:
+    """Load every row of the ``liveinfo`` table from Turso.
+
+    The result is memoized through ``cache.memoize(ttl=60)``.
+
+    Returns:
+        {liveDate: row} for each parsed row.
+    """
+    statement = {"type": "execute", "stmt": {"sql": "SELECT * FROM liveinfo;"}}
+    response = await turso_exec([statement], silent=True, **TURSO_KWARGS)
+    logger.warning("Get liveinfo from Turso")
+    by_live_date = {}
+    for row in turso_parse_resp(response):
+        by_live_date[row["liveDate"]] = row
+    return by_live_date
+
+
+async def live_date(date: str) -> str:
+    """Build the header text for one date.
+
+    The first line is ``date``; it is followed by one markdown link
+    ``[title](url)`` per ``liveinfo`` entry whose key starts with ``date``.
+    """
+    sessions = await get_liveinfo()
+    links = [f"[{info.get('title', '')}]({info.get('url', '')})" for key, info in sessions.items() if key[:10] == date]
+    return "\n".join([date, *links]).rstrip()
src/danmu/utils.py
@@ -6,10 +6,17 @@ from pathlib import Path
 
 from glom import glom
 
-from config import DANMU, DOWNLOAD_DIR, PROXY, TOKEN, cache
+from config import DANMU, DB, DOWNLOAD_DIR, PROXY, TOKEN, cache
 from networking import hx_req
 from price.coinmarketcap import get_cmc_fiat
 
+# Keyword arguments unpacked (``**TURSO_KWARGS``) into the Turso helper calls.
+# Each credential uses the DANMU-specific setting when it is set and falls back
+# to the shared DB setting otherwise; the database name has no fallback.
+TURSO_KWARGS: dict = {
+    "db_name": DANMU.TURSO_DATABASE,
+    "username": DANMU.TURSO_USERNAME or DB.TURSO_USERNAME,
+    "api_token": DANMU.TURSO_API_TOKEN or DB.TURSO_API_TOKEN,
+    "group_token": DANMU.TURSO_GROUP_TOKEN or DB.TURSO_GROUP_TOKEN,
+}
+
 
 @cache.memoize(ttl=28800)
 async def to_usd(ccy: str) -> Decimal:
@@ -135,7 +142,10 @@ def merge_txt_files(paths: list[str], dates: list[str], user: str, keyword: str,
         return []
     dates = sorted(dates)
     paths = sorted(paths)
-    if all(len(x) == 10 for x in dates):  # all days  (不存在此情况)
+    dates = [name for name in dates if any(name in path for path in paths)]
+    if len(dates) == 1:
+        date_name = dates[0]
+    elif all(len(x) == 10 for x in dates):  # all days  (不存在此情况)
         date_name = dates[0][:7]
     elif all(len(x) == 7 for x in dates):  # all months
         date_name = dates[0][:4]
src/messages/progress.py
@@ -83,7 +83,6 @@ async def telegram_uploading(current: int, total: int, *args):
     if not Path(path).is_file():
         logger.error(f"File not found: {path}")
         return
-    _type = "视频" if Path(path).suffix in [".mp4", ".mkv", ".mov", ".webm", ".avi", ".flv", ".wmv", ".m4v"] else "音频"
-    emoji = "🎬" if _type == "视频" else "🎧"
-    msg = f"⏫{_type}{msg}\n{emoji}{Path(path).name}"
+    ext = Path(path).suffix.lstrip(".")
+    msg = f"⏫{ext.upper()} {msg}\n💾{Path(path).name}"
     await modify_progress(message=message, text=msg, detail_progress=detail_progress)
src/config.py
@@ -106,7 +106,8 @@ class DANMU:
     BASE_URL = os.getenv("DANMU_BASE_URL", "")  # Custom API, No docs
     STREAMER = os.getenv("DANMU_STREAMER", "Streamer")  # streamer name
     PROXY = os.getenv("DANMU_PROXY", None)  # socks5://127.0.0.1:7890
-    NUM_PER_QUERY = int(os.getenv("DANMU_NUM_PER_QUERY", "100"))  # Number of items per query
+    QUERY_METHOD = os.getenv("DANMU_QUERY_METHOD", "turso")  # Turso or R2+API server
+    NUM_PER_QUERY = int(os.getenv("DANMU_NUM_PER_QUERY", "100"))  # Number of items per query to API server
     D1_DATABASE = os.getenv("DANMU_D1_DATABASE", "bennybot-danmu")
     TURSO_DATABASE = os.getenv("DANMU_TURSO_DATABASE", "bennybot-danmu")
     TURSO_USERNAME = os.getenv("DANMU_TURSO_USERNAME", "")  # https://turso.tech