Commit 4f0a59d

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-06-08 16:45:30
feat(danmu): support syncing livechats to Turso
1 parent 3589b85
Changed files (3)
src/danmu/sync.py
@@ -1,28 +1,267 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 import asyncio
-import re
 from collections import defaultdict
-from datetime import UTC, datetime
+from datetime import datetime, timedelta
 from zoneinfo import ZoneInfo
 
 from glom import flatten, glom
 from loguru import logger
 
-from config import DANMU, TZ, cache
+from config import DANMU, DB, TZ, cache, cutter
 from danmu.utils import merge_json, simplify_json
-from database.d1 import create_d1_table, query_d1
 from database.r2 import get_cf_r2, list_cf_r2, set_cf_r2
+from database.turso import insert_statement, turso_create_table, turso_exec
 from networking import hx_req
 from utils import nowdt
 
+# date is a standard yyyy-mm-dd string. liveDate is the server-side raw date and may carry a suffix ("2025-04-14_01", "2025-04-14_02")
+LIVEINFO_COLUMNS = "date TEXT, title TEXT, url TEXT, 发言已完成 INTEGER DEFAULT 0, 弹幕已完成 INTEGER DEFAULT 0, liveDate TEXT PRIMARY KEY"
 
+# livechats-related tables and indexes
+COLUMNS = {
+    "发言": "time TEXT, content TEXT, segmented TEXT",
+    "弹幕": "time TEXT, fullname TEXT, content TEXT, superchat TEXT,user TEXT, uid TEXT, segmented TEXT",
+}
+INDEX_NAMES = {
+    "发言": ["time"],
+    "弹幕": ["time", "user", "uid", "superchat"],
+}
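+# Danmu-specific Turso credentials fall back to the shared DB credentials when not configured.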
+TURSO_KWARGS: dict = {
+    "db_name": DANMU.TURSO_DATABASE,
+    "username": DANMU.TURSO_USERNAME or DB.TURSO_USERNAME,
+    "api_token": DANMU.TURSO_API_TOKEN or DB.TURSO_API_TOKEN,
+    "group_token": DANMU.TURSO_GROUP_TOKEN or DB.TURSO_GROUP_TOKEN,
+}
+
+
+# ruff: noqa: S608
 @cache.memoize(ttl=3600)
-async def sync_server_to_r2(qtype: str) -> None:
-    if qtype not in ["弹幕", "发言"]:
-        logger.warning(f"Unknown query type: {qtype}")
+async def sync_livechats() -> None:
+    if not DANMU.SYNC_ENABLE:
         return
+    for qtype in ["弹幕", "发言"]:
+        if "R2" in DANMU.SYNC_ENGNIE.upper():
+            logger.info(f"Syncing streaming {qtype} to R2")
+            await sync_server_to_r2(qtype)
+
+        if "turso" in DANMU.SYNC_ENGNIE.lower():
+            logger.info(f"Syncing streaming {qtype} to Turso")
+            await sync_server_to_turso(qtype)
 
+
+async def sync_server_to_turso(qtype: str) -> None:
+    concurrency = 1  # with DANMU.NUM_PER_QUERY set large, a single concurrent request is enough
+    api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
+
+    # First, create the `发言` / `弹幕` table, the `liveinfo` table, and the related indexes
+    await turso_create_table("liveinfo", LIVEINFO_COLUMNS, idx_cols=["date"], silent=True, **TURSO_KWARGS)
+    await create_table_and_index(qtype)
+
+    # Fetch the liveinfo rows already stored in Turso
+    resp = await turso_exec([{"type": "execute", "stmt": {"sql": "SELECT * FROM liveinfo;"}}], silent=True, **TURSO_KWARGS)
+    cols = glom(resp, "results.0.response.result.cols", default=[])
+    rows = glom(resp, "results.0.response.result.rows", default=[])
+    db_liveinfo: dict[str, dict] = {}
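+    # Map each returned row (a list of {"type", "value"} cells) to a dict keyed by column name, indexed by liveDate.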
+    for row in rows:
+        row_info = {col["name"]: x["value"] for x, col in zip(row, cols, strict=True)}
+        db_liveinfo[row_info["liveDate"]] = row_info
+
+    # Start syncing
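+    # SYNC_DANMU_YEARS / SYNC_FAYAN_YEARS are comma-separated year lists (e.g. "2023,2024"); the current year is always included.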
+    monitor_years = [x.strip() for x in DANMU.SYNC_DANMU_YEARS.split(",") if x.strip()] if qtype == "弹幕" else [x.strip() for x in DANMU.SYNC_FAYAN_YEARS.split(",") if x.strip()]
+    monitor_years.append(str(nowdt(TZ).year))
+    monitor_years = sorted(set(monitor_years))
+    for year in monitor_years:
+        params = {"liveDate": year} if qtype == "弹幕" else {"srtCount": 1, "liveDate": year}
+        api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
+        liveinfo_list: list[dict] = await hx_req(api, proxy=DANMU.PROXY, params=params, silent=True)  # type: ignore
+        for liveinfo in sorted(liveinfo_list, key=lambda x: x["liveDate"]):
+            live_date = liveinfo["liveDate"]
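+            # Skip live dates already marked as complete in the liveinfo table.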
+            if glom(db_liveinfo, f"{live_date}.{qtype}已完成", default="0") == "1":
+                continue
+            results = []
+            payload = {"page": 1, "limit": DANMU.NUM_PER_QUERY, "liveDate": live_date}
+            resp = await hx_req(api_url, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
+            if resp.get("count", 0) == 0:
+                continue
+            logger.trace(f"Query {qtype} date: {live_date} - {resp['count']} results")
+            # append the first page
+            results.extend(resp["data"])
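+            # Page 1 is already appended above; compute the total page count and fetch pages 2..n_pages below.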
+            quotient, remainder = divmod(resp["count"], DANMU.NUM_PER_QUERY)  # type: ignore
+            n_pages = quotient + (1 if remainder else 0)
+            tasks = [
+                hx_req(
+                    api_url,
+                    "POST",
+                    data={"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": live_date},
+                    proxy=DANMU.PROXY,
+                    check_kv={"code": 0},
+                    silent=True,
+                )
+                for page in range(2, n_pages + 1)
+            ]
+            chunks = [tasks[i : i + concurrency] for i in range(0, len(tasks), concurrency)]
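+            # Fetch the remaining pages chunk by chunk; the chunk size caps how many requests are in flight at once.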
+            for chunk in chunks:
+                async with asyncio.Semaphore(concurrency):
+                    res = await asyncio.gather(*chunk, return_exceptions=True)
+                    data = flatten(glom(res, "*.data"))
+                    results.extend(data)
+            # save all results to turso
+            await save_livechats_to_turso(liveinfo, results, qtype)
+
+
+async def save_livechats_to_turso(live_info: dict, data: list[dict], qtype: str) -> None:
+    """Save livechats from server to turso."""
+    if not data:
+        return
+    # warning: some live dates on server are "2025-04-14_01", "2025-04-14_02", ...
+    date = live_info["liveDate"][:10]
+
+    # Normalize the data format
+    normed_data = []
+    if qtype == "发言":
+        for x in data:
+            if not all([x.get("startTime"), x.get("content")]):
+                continue
+            time = x["startTime"].split(",")[0]  # 3:53:23
+            if len(time.split(":")[0]) == 1:
+                time = f"0{time}"
+            normed_data.append({"time": f"{date} {time}", "content": x["content"], "segmented": " ".join(cutter.cutword(x["content"]))})
+    else:
+        for x in sorted(data, key=lambda x: x.get("timestamp", 0)):
+            # time, user, fullname, content, uid, superchat, currency, segmented
+            item = {}
+            if not all([x.get("timestamp"), x.get("authorName")]):
+                continue
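+            # Server timestamps are in microseconds since the epoch.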
+            dt = datetime.fromtimestamp(x["timestamp"] / 1000000, tz=ZoneInfo(TZ))
+            item["time"] = dt.strftime("%Y-%m-%d %H:%M:%S")
+            item["fullname"] = x["authorName"]  # User Name
+            item["user"] = x["authorName"].replace(" ", "")  # UserName
+            if x.get("authorId"):
+                item["uid"] = x["authorId"]
+            if x.get("scAmount"):
+                item["superchat"] = x["scAmount"]
+
+            if x.get("message"):
+                item["content"] = x["message"]
+                item["segmented"] = " ".join(cutter.cutword(x["message"]))
+            normed_data.append(item)
+
+    # Filter out records that are already saved in Turso
+    data = await filter_records_in_turso(normed_data, date, qtype)
+
+    # Insert into Turso
+    concurrency = 1024  # send up to 1024 insert statements per request
+    statements = [insert_statement(qtype, records) for records in sorted(data, key=lambda x: x["time"])]
+    chunks = [statements[i : i + concurrency] for i in range(0, len(statements), concurrency)]
+    for chunk in chunks:
+        resp = await turso_exec(chunk, silent=True, retry=2, **TURSO_KWARGS)
+        num_success = sum([1 for x in glom(resp, "results.*.type", default=[]) if x == "ok"]) - 1
+        logger.success(f"Synced {num_success} {qtype} of {date} to Turso")
+    now = nowdt(TZ)
+    yesterday = now - timedelta(days=1)
+    # 弹幕 are recorded in real time; wait until the evening stream has finished before marking the date complete
+    if qtype == "弹幕" and (now.hour <= 4 or now.hour >= 19) and date in [f"{yesterday:%Y-%m-%d}", f"{now:%Y-%m-%d}"]:
+        return
+
+    # Update liveinfo
+    # Before updating, double-check that all data has been synced
+    if await filter_records_in_turso(normed_data, date, qtype):
+        logger.warning(f"Not all {qtype} data of {date} has been synced.")
+        return
+    records = {
+        "date": live_info["liveDate"][:10],
+        "title": live_info.get("title", ""),
+        "url": live_info.get("url", ""),
+        f"{qtype}已完成": 1,
+        "liveDate": live_info["liveDate"],
+    }
+    resp = await turso_exec([insert_statement("liveinfo", records, update_on_conflict="liveDate")], silent=True, **TURSO_KWARGS)
+    if glom(resp, "results.0.type", default="") == "ok":
+        logger.success(f"Updated liveinfo of {date}")
+
+
+async def filter_records_in_turso(data: list[dict], date: str, qtype: str) -> list[dict]:
+    """获取Turso数据库中date全天的记录, 并过滤掉已保存的记录.
+
+    由于过了凌晨的数据在原始数据库中还是记录为前一天
+    所以我们应该获取到第二天凌晨8点前的数据 (已下播), 以过滤掉已保存的记录
+    """
+    # 发言 can be filtered by time alone; 弹幕 need both time and user
+    identifier = "time" if qtype == "发言" else "time,user"
+    tomorrow = datetime.strptime(date, "%Y-%m-%d").replace(tzinfo=ZoneInfo(TZ)) + timedelta(days=1)  # strptime() is naive; attach TZ explicitly instead of converting from the system timezone
+    resp = await turso_exec(
+        [{"type": "execute", "stmt": {"sql": f'SELECT {identifier} FROM "{qtype}" WHERE time >= "{date} 00:00:00" AND time <= "{tomorrow:%Y-%m-%d} 08:00:00";'}}],
+        silent=True,
+        **TURSO_KWARGS,
+    )
+    saved = glom(resp, "results.0.response.result.rows.*.*.value", default=[])
+    saved = {"-".join(x) for x in saved}  # convert to set to speedup.
+    log = f"Found {len(saved)} messages in Turso."
+    log += f" Rows read: {glom(resp, 'results.0.response.result.rows_read', default=1)}"
+    logger.info(log)
+    if qtype == "发言":
+        return [x for x in data if x["time"] not in saved]
+    return [x for x in data if f"{x['time']}-{x['user']}" not in saved]
+
+
+async def create_table_and_index(table_name: str) -> None:
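+    # One-time setup guarded by a cache flag: table, indexes, FTS5 virtual table, and sync triggers.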
+    if cache.get(f"danmu-table-{table_name}"):
+        return
+    cache.set(f"danmu-table-{table_name}", table_name, ttl=0)
+
+    await turso_create_table(table_name, COLUMNS[table_name], idx_cols=INDEX_NAMES[table_name], silent=True, **TURSO_KWARGS)
+
+    # List all existing virtual tables
+    resp = await turso_exec([{"type": "execute", "stmt": {"sql": 'SELECT name FROM pragma_table_list WHERE type="virtual";'}}], silent=True, **TURSO_KWARGS)
+    virtual_tables = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
+
+    primary_key = "rowid"
+    """创建 FTS5 虚拟表
+    -- content=table_name 指明关联的原表
+    -- content_rowid=primary_key 指明原表的行 ID 列是 primary_key
+    -- segmented 是我们要索引的列
+    -- tokenize='unicode61' 使用 unicode61 分词器, 对多种语言支持更好
+    """
+    statements = []
+    if f"fts_{table_name}" not in virtual_tables:
+        logger.debug(f"Creating FTS5 virtual table for {table_name}")
+        sql = f"CREATE VIRTUAL TABLE IF NOT EXISTS 'fts_{table_name}' USING fts5(segmented, content='{table_name}', content_rowid={primary_key}, tokenize='unicode61');"
+        statements.append({"type": "execute", "stmt": {"sql": sql}})
+
+        """将现有数据从原表复制到 FTS 表
+        注意, 我们在这里插入的是 rowid (它会对应到 content_rowid=primary_key 指定的列) 和 content
+        从原表中选择 primary_key 和 segmented 列。primary_key 列的值会被插入到 FTS 表中对应原表 rowid (或 content_rowid) 的位置。
+        """
+        sql = f"INSERT INTO 'fts_{table_name}' (rowid, segmented) SELECT {primary_key}, segmented FROM '{table_name}' WHERE {primary_key} NOT IN (SELECT rowid FROM 'fts_{table_name}');"
+        statements.append({"type": "execute", "stmt": {"sql": sql}})
+
+    # List all existing triggers
+    resp = await turso_exec([{"type": "execute", "stmt": {"sql": 'SELECT name FROM sqlite_master WHERE type="trigger";'}}], silent=True, **TURSO_KWARGS)
+    triggers = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
+    """维护 FTS 表
+    为了让 FTS 表与原表保持同步, 需要在原表上创建触发器。
+    在原表插入、删除、更新时, 同步更新 FTS 表
+    """
+    # Trigger: after a row is inserted into the source table, insert the matching entry into the FTS table
+    if f"trigger_{table_name}_ai" not in triggers:
+        sql = f"CREATE TRIGGER IF NOT EXISTS 'trigger_{table_name}_ai' AFTER INSERT ON '{table_name}' BEGIN INSERT INTO 'fts_{table_name}' (rowid, segmented) VALUES (NEW.{primary_key}, NEW.segmented); END;"
+        statements.append({"type": "execute", "stmt": {"sql": sql}})
+
+    # Trigger: after a row is deleted from the source table, delete the matching entry from the FTS table
+    if f"trigger_{table_name}_ad" not in triggers:
+        sql = f"CREATE TRIGGER IF NOT EXISTS 'trigger_{table_name}_ad' AFTER DELETE ON '{table_name}' BEGIN DELETE FROM 'fts_{table_name}' WHERE rowid = OLD.{primary_key}; END;"
+        statements.append({"type": "execute", "stmt": {"sql": sql}})
+
+    # Trigger: after a row is updated in the source table, update the FTS table as well
+    # FTS5 updates are typically done by deleting the old entry and inserting the new one
+    if f"trigger_{table_name}_au" not in triggers:
+        sql = f"CREATE TRIGGER IF NOT EXISTS 'trigger_{table_name}_au' AFTER UPDATE ON '{table_name}' BEGIN DELETE FROM 'fts_{table_name}' WHERE rowid = OLD.{primary_key} AND OLD.segmented <> NEW.segmented; INSERT INTO 'fts_{table_name}' (rowid, segmented) SELECT NEW.{primary_key}, NEW.segmented WHERE OLD.segmented <> NEW.segmented; END;"
+        statements.append({"type": "execute", "stmt": {"sql": sql}})
+    await turso_exec(statements, silent=True, **TURSO_KWARGS)
+
+
+async def sync_server_to_r2(qtype: str) -> None:
     concurrency = 1  # DANMU.NUM_PER_QUERY 设置很大的话只用单并发即可
     prefix = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/"
     api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
@@ -62,14 +301,12 @@ async def sync_server_to_r2(qtype: str) -> None:
             metadata = {"empty": True} if not results else {}
             await set_cf_r2(prefix + r2_key, simplify_json(results, qtype), metadata, compress=True, silent=True)
 
-    if not DANMU.R2_SYNC_ENABLE:
-        return
     now = nowdt(TZ)
     if qtype == "弹幕" and (now.hour <= 4 or now.hour >= 20):
         # 忽略晚间的直播弹幕 (实时录制, 应该等结束后再Sync)
         return
 
-    monitor_years = [x.strip() for x in DANMU.R2_SYNC_DANMU_YEARS.split(",") if x.strip()] if qtype == "弹幕" else [x.strip() for x in DANMU.R2_SYNC_FAYAN_YEARS.split(",") if x.strip()]
+    monitor_years = [x.strip() for x in DANMU.SYNC_DANMU_YEARS.split(",") if x.strip()] if qtype == "弹幕" else [x.strip() for x in DANMU.SYNC_FAYAN_YEARS.split(",") if x.strip()]
     monitor_years.append(str(now.year))
     monitor_years = sorted(set(monitor_years))
     r2 = await list_cf_r2(prefix=prefix)
@@ -132,55 +369,3 @@ async def sync_server_to_r2(qtype: str) -> None:
             if r2 := await get_cf_r2(prefix + year, silent=True):
                 all_data.append(r2)
         await set_cf_r2(prefix + "all", merge_json(all_data), compress=True, silent=True)
-
-
-@cache.memoize(ttl=3600)
-async def sync_danmu_to_d1() -> None:
-    """Deprecated, D1 only allow 5M reads, 100K writes."""
-    # ruff: noqa: S608
-    concurrency = 200
-    now = nowdt(TZ)
-
-    async def batch_sync(danmu_list: list[dict], saved_items: list[str]) -> int:
-        sc_pattern = re.compile(r"^([A-Z]{3,}) (\d+(?:\.\d+)?)$")
-        db_columns = "id INTEGER PRIMARY KEY, time TEXT, uid INTEGER, user TEXT, text TEXT, sc_amt REAL NULL, sc_ccy TEXT NULL"
-        tasks = []
-        for danmu in danmu_list:
-            dt = datetime.fromtimestamp(danmu["timestamp"] / 1000000, tz=UTC).astimezone(ZoneInfo(TZ))
-            table_name = dt.year
-            if f"{dt:%Y-%m-%d %H:%M:%S}{danmu['authorId']}" in saved_items:
-                continue
-            await create_d1_table(table_name, db_columns, DANMU.D1_DATABASE)
-            columns = ["time", "uid", "user", "text"]
-            params = [f"{dt:%Y-%m-%d %H:%M:%S}", danmu["authorId"], danmu["authorName"], danmu["message"]]
-            if (amt := danmu.get("scAmount")) and (matched := sc_pattern.fullmatch(amt)):
-                sc_ccy = matched.group(1)
-                sc_amt = matched.group(2)
-                columns.extend(["sc_amt", "sc_ccy"])
-                params.extend([float(sc_amt), sc_ccy])
-            sql = f'INSERT INTO "{table_name}" ({", ".join(columns)}) VALUES ({", ".join(["?" for _ in range(len(columns))])});'
-            tasks.append(query_d1(sql, db_name=DANMU.D1_DATABASE, params=params))
-        chunks = [tasks[i : i + concurrency] for i in range(0, len(tasks), concurrency)]
-        processed = 0
-        for chunk in chunks:
-            async with asyncio.Semaphore(concurrency):
-                results = await asyncio.gather(*chunk, return_exceptions=True)
-                processed += len(results)
-                if not any(glom(results, "*.success")):
-                    logger.error(f"Sync danmu to d1 failed: {results}")
-                    break
-        return processed
-
-    sql = f'SELECT time, uid FROM "{now.year}";'
-    resp = await query_d1(sql, db_name=DANMU.D1_DATABASE)
-    saved_items = [f"{x['time']}{x['uid']}" for x in flatten(glom(resp, "result.*.results", default=[]))]
-    page = 1
-    payload = {"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": f"{now:%Y-%m-%d}", "message": "", "authorName": ""}
-    resp = await hx_req(DANMU.BASE_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
-    count = resp["count"]
-    processed = await batch_sync(resp["data"], saved_items)
-    while processed < count:
-        page += 1
-        payload["page"] = page
-        resp = await hx_req(DANMU.BASE_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
-        processed += await batch_sync(resp["data"], saved_items)
src/danmu/utils.py
@@ -101,7 +101,7 @@ def simplify_json(data: list[dict], qtype: str) -> dict:
         if not all([x.get("timestamp"), x.get("authorName")]):
             continue
         item["s"] = round(x["timestamp"] / 1000000)
-        item["u"] = x["authorName"]
+        item["u"] = x["authorName"].replace(" ", "")  # UserName
         if x.get("scAmount"):
             item["p"] = x["scAmount"]
         if x.get("message"):
@@ -115,7 +115,7 @@ def merge_json(data: list[dict]) -> dict:
 
     data: list[发言 | 弹幕]
     发言: {"2021-12-29": [{"t": "00:04:27", "m": "你好"}]}
-    弹幕: {"2021-12-29": [{"s": 1640799880, "u": "User Name", "p": "USD 100", "m": "你好"}]}
+    弹幕: {"2021-12-29": [{"s": 1640799880, "u": "UserName", "p": "USD 100", "m": "你好"}]}
 
     Returns:
         {"liveDate": list[dict]}
src/main.py
@@ -22,7 +22,7 @@ from bridge.chartimg import forward_chartimg_results
 from bridge.ocr import forward_ocr_results
 from bridge.social import forward_social_media_results
 from config import DAILY_MESSAGES, DEVICE_NAME, ENABLE, PROXY, TOKEN, TZ, cache
-from danmu.sync import sync_server_to_r2
+from danmu.sync import sync_livechats
 from handler import handle_social_media, handle_utilities
 from history.sync import backup_chat_history, sync_chat_history
 from llm.summary import daily_summary
@@ -123,8 +123,7 @@ async def scheduling(client: Client):
                 logger.info(f"Sending daily message to {chat_id}: {msg}")
                 await client.send_message(to_int(chat_id), msg)
     await summary_pods(client)
-    await sync_server_to_r2(qtype="发言")
-    await sync_server_to_r2(qtype="弹幕")
+    await sync_livechats()
     await clean_gemini_files()
     await backup_chat_history(client)