main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import asyncio
  4import hashlib
  5import json
  6from collections import defaultdict
  7from datetime import datetime, timedelta
  8from zoneinfo import ZoneInfo
  9
 10import pandas as pd
 11from glom import flatten, glom
 12from loguru import logger
 13
 14from config import DANMU, PROXY, TZ, cutter
 15from danmu.r2 import get_r2_dataframe, query_r2_for_date, r2_liveinfo, save_dataframe_to_r2
 16from danmu.utils import TURSO_KWARGS, convert_to_dataframe, get_bearer_token
 17from database.r2 import get_cf_r2, set_cf_r2
 18from database.turso import insert_statement, turso_create_table, turso_exec, turso_parse_resp
 19from networking import hx_req
 20from utils import nowdt, strings_list
 21
 22# date为yyyy-mm-dd标准字符串. liveDate是服务器端原始日期, 可能有后缀 ("2025-04-14_01", "2025-04-14_02")
 23LIVEINFO_COLUMNS = "date TEXT, title TEXT, url TEXT, 发言已完成 INTEGER DEFAULT 0, 弹幕已完成 INTEGER DEFAULT 0, liveDate TEXT PRIMARY KEY"
 24USERINFO_COLUMNS = "uid TEXT, fullname TEXT, superchat TEXT, firstdate TEXT, lastdate TEXT, note TEXT, hash TEXT PRIMARY KEY"
 25
 26# livechats相关
 27COLUMNS = {
 28    "发言": "time TEXT, content TEXT, segmented TEXT",
 29    "弹幕": "time TEXT, fullname TEXT, content TEXT, superchat TEXT, uid TEXT, segmented TEXT",
 30}
 31INDEX_NAMES = {
 32    "发言": ["time"],
 33    "弹幕": ["time", "fullname", "uid"],
 34}
 35
 36
 37async def sync_livechats() -> None:
 38    if not DANMU.SYNC_ENABLE:
 39        return
 40    for qtype in ["弹幕", "发言"]:
 41        if "R2" in DANMU.SYNC_ENGNIE.upper():
 42            logger.info(f"Syncing streaming {qtype} to R2")
 43            await sync_server_to_r2(qtype)
 44
 45        if "turso" in DANMU.SYNC_ENGNIE.lower():
 46            logger.info(f"Syncing streaming {qtype} to Turso")
 47            await sync_server_to_turso(qtype)
 48
 49
 50async def sync_server_to_turso(qtype: str) -> None:
 51    concurrency = 1  # DANMU.NUM_PER_QUERY 设置很大的话只用单并发即可
 52    api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
 53
 54    # 首先, 建立 `发言` / `弹幕` 以及 `liveinfo` table, 和相关index
 55    await turso_create_table("liveinfo", LIVEINFO_COLUMNS, idx_cols=["date"], idx_prefix="idx_liveinfo_", silent=True, **TURSO_KWARGS)
 56    await turso_create_table("userinfo", USERINFO_COLUMNS, idx_cols=["uid", "fullname"], idx_prefix="idx_userinfo_", silent=True, **TURSO_KWARGS)
 57    await turso_create_table(qtype, COLUMNS[qtype], idx_cols=INDEX_NAMES[qtype], idx_prefix=f"idx_{qtype}_", fts_on_col="rowid", silent=True, **TURSO_KWARGS)
 58
 59    # 获取turso存储的liveinfo
 60    resp = await turso_exec([{"type": "execute", "stmt": {"sql": "SELECT * FROM liveinfo;"}}], silent=True, **TURSO_KWARGS)
 61    db_liveinfo = {x["liveDate"]: x for x in turso_parse_resp(resp)}
 62
 63    # 开始同步
 64    monitor_years = strings_list(DANMU.SYNC_DANMU_YEARS) if qtype == "弹幕" else strings_list(DANMU.SYNC_FAYAN_YEARS)
 65    monitor_years.append(str(nowdt(TZ).year))
 66    monitor_years = sorted(set(monitor_years))
 67    for year in monitor_years:
 68        headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": year}
 69        params = {"liveDate": year} if qtype == "弹幕" else {"srtCount": 1, "liveDate": year}
 70        api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
 71        liveinfo_list: list[dict] = await hx_req(api, headers=headers, proxy=PROXY.DANMU, params=params, silent=True)  # type: ignore
 72        if glom(liveinfo_list, "hx_error", default=""):  # API server is down
 73            return
 74        for liveinfo in sorted(liveinfo_list, key=lambda x: x["liveDate"]):
 75            live_date = liveinfo["liveDate"]
 76            if glom(db_liveinfo, f"{live_date}.{qtype}已完成", default=0) == 1:
 77                continue
 78            results = []
 79            payload = {"page": 1, "limit": DANMU.NUM_PER_QUERY, "liveDate": live_date}
 80            resp = await hx_req(api_url, "POST", headers=headers, data=payload, proxy=PROXY.DANMU, check_kv={"code": 0}, silent=True)
 81            if resp.get("count", 0) == 0:
 82                continue
 83            logger.trace(f"Query {qtype} date: {live_date} - {resp['count']} results")
 84            # append the first page
 85            results.extend(resp["data"])
 86            quotient, remainder = divmod(resp["count"], DANMU.NUM_PER_QUERY)
 87            n_pages = quotient + (1 if remainder else 0)
 88            tasks = [
 89                hx_req(
 90                    api_url,
 91                    "POST",
 92                    headers=headers,
 93                    data={"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": live_date},
 94                    proxy=PROXY.DANMU,
 95                    check_kv={"code": 0},
 96                    silent=True,
 97                )
 98                for page in range(2, n_pages + 1)
 99            ]
100            chunks = [tasks[i : i + concurrency] for i in range(0, len(tasks), concurrency)]
101            for chunk in chunks:
102                async with asyncio.Semaphore(concurrency):
103                    res = await asyncio.gather(*chunk, return_exceptions=True)
104                    data = flatten(glom(res, "*.data"))
105                    results.extend(data)
106            # save all results to turso
107            await save_livechats_to_turso(liveinfo, results, qtype)
108
109
def _normalize_speech(rows: list[dict], date: str) -> list[dict]:
    """Turn raw "发言" rows into de-duplicated ``time/content/segmented`` records."""
    seen: set[str] = set()
    records: list[dict] = []
    for row in rows:
        start, content = row.get("startTime"), row.get("content")
        if not (start and content):
            continue
        clock = start.split(",")[0]  # e.g. "3:53:23"
        if len(clock.split(":")[0]) == 1:
            clock = f"0{clock}"  # zero-pad a single-digit hour
        fingerprint = f"{date}{clock}{content}"
        if fingerprint in seen:
            continue
        records.append({"time": f"{date} {clock}", "content": content, "segmented": " ".join(cutter.cutword(content))})
        seen.add(fingerprint)
    return records


def _normalize_danmu(rows: list[dict]) -> list[dict]:
    """Turn raw "弹幕" rows into de-duplicated records ordered by timestamp."""
    seen: set[str] = set()
    records: list[dict] = []
    for row in sorted(rows, key=lambda r: r.get("timestamp") or 0):
        if not (row.get("timestamp") and row.get("authorName")):
            continue
        # NOTE(review): the divisor suggests microsecond timestamps - confirm with the API.
        moment = datetime.fromtimestamp(row["timestamp"] / 1000000, tz=ZoneInfo(TZ))
        fullname = row["authorName"].removeprefix("@")  # User Name
        record = {
            "time": moment.strftime("%Y-%m-%d %H:%M:%S"),
            "fullname": fullname,
            "uid": row.get("authorId") or fullname,  # fall back to the name when no id is given
        }
        if row.get("scAmount"):
            record["superchat"] = row["scAmount"]

        message = row.get("message")
        if message:
            record["content"] = message
            record["segmented"] = " ".join(cutter.cutword(message))
        fingerprint = f"{record['time']}{fullname}{record.get('content', '')}"
        if fingerprint in seen:
            continue
        records.append(record)
        seen.add(fingerprint)
    return records


async def save_livechats_to_turso(live_info: dict, data: list[dict], qtype: str) -> None:
    """Normalise server records, insert the new ones into Turso and flag the date complete.

    Args:
        live_info: Server description of the live session; ``liveDate`` is
            required, ``title`` and ``url`` are optional.
        data: Raw records fetched from the server for that session.
        qtype: "弹幕" (live chat) or "发言" (subtitles).

    For "弹幕" the per-user aggregates are refreshed as well. The session is
    flagged complete in ``liveinfo`` only when a second look-up confirms that
    nothing is left to insert and, for "弹幕", when the session is not a
    today/yesterday one inspected during the evening/night hours.
    """
    if not data:
        return
    # warning: some live dates on server are "2025-04-14_01", "2025-04-14_02", ...
    date = live_info["liveDate"][:10]

    # Normalise the raw rows.
    if qtype == "发言":
        normed_data = _normalize_speech(data, date)
    else:
        normed_data = _normalize_danmu(data)
        await save_userinfo_to_turso(normed_data)

    # Drop the records that Turso already holds.
    pending = await filter_records_in_turso(normed_data, date, qtype)

    # Insert into Turso, 4096 statements per request.
    batch_size = 4096
    statements = [insert_statement(qtype, record) for record in sorted(pending, key=lambda r: r["time"])]
    for offset in range(0, len(statements), batch_size):
        resp = await turso_exec(statements[offset : offset + batch_size], silent=True, retry=2, **TURSO_KWARGS)
        # NOTE(review): one "ok" entry is discounted - presumably a non-insert result; confirm in turso_exec.
        num_success = sum(kind == "ok" for kind in glom(resp, "results.*.type", default=[])) - 1
        logger.success(f"Synced {num_success} {qtype} of {date} to Turso")

    # Live chat is recorded in real time: wait until the evening stream is over
    # before flagging the date as complete.
    now = nowdt(TZ)
    yesterday = now - timedelta(days=1)
    in_live_hours = now.hour <= 4 or now.hour >= 19
    if qtype == "弹幕" and in_live_hours and date in (f"{yesterday:%Y-%m-%d}", f"{now:%Y-%m-%d}"):
        return

    # Before updating liveinfo, verify once more that everything has been stored.
    if await filter_records_in_turso(normed_data, date, qtype):
        logger.warning(f"Not all {qtype} data of {date} has been synced.")
        return
    liveinfo_row = {
        "date": live_info["liveDate"][:10],
        "title": live_info.get("title", ""),
        "url": live_info.get("url", ""),
        f"{qtype}已完成": 1,
        "liveDate": live_info["liveDate"],
    }
    resp = await turso_exec([insert_statement("liveinfo", liveinfo_row, update_on_conflict="liveDate")], silent=True, **TURSO_KWARGS)
    if glom(resp, "results.0.type", default="") == "ok":
        logger.success(f"Updated liveinfo of {date}")
184
185
async def filter_records_in_turso(data: list[dict], date: str, qtype: str) -> list[dict]:
    """Return the records of ``data`` that are not yet stored in Turso.

    Rows of table ``qtype`` whose ``time`` lies between ``date`` 00:00:00 and
    08:00:00 of the following day are looked up. The window reaches past
    midnight because the source database still files messages sent after
    midnight under the previous day, and by 08:00 the stream has ended.

    Args:
        data: Normalised records; each needs ``time`` and, for "弹幕",
            ``fullname``.
        date: Live date as "yyyy-mm-dd".
        qtype: "发言" (matched by ``time``) or "弹幕" (matched by ``time``
            and ``fullname``).

    Returns:
        The subset of ``data`` with no matching row in Turso, in input order.
    """
    # For "发言" the time alone identifies a row; "弹幕" needs time and fullname.
    identifier = "time" if qtype == "发言" else "time,fullname"
    # Only the calendar date of the upper bound is used, so naive date arithmetic
    # is sufficient. Calling `astimezone` on the naive midnight would interpret it
    # in the *system* timezone and could move it back onto the same day.
    tomorrow = datetime.strptime(date, "%Y-%m-%d") + timedelta(days=1)
    # String values use single quotes (standard SQL); double quotes are reserved
    # for the table identifier.
    sql = (
        f'SELECT {identifier} FROM "{qtype}" '
        f"WHERE time >= '{date} 00:00:00' AND time <= '{tomorrow:%Y-%m-%d} 08:00:00';"
    )
    resp = await turso_exec(
        [{"type": "execute", "stmt": {"sql": sql}}],
        silent=True,
        retry=2,
        **TURSO_KWARGS,
    )
    rows = glom(resp, "results.0.response.result.rows.*.*.value", default=[])
    saved = {"-".join(row) for row in rows}  # a set makes the membership tests cheap
    rows_read = glom(resp, "results.0.response.result.rows_read", default=1)
    if qtype == "发言":
        filtered = [x for x in data if x["time"] not in saved]
    else:
        filtered = [x for x in data if f"{x['time']}-{x['fullname']}" not in saved]
    logger.info(f"Found {len(saved)} messages in Turso of date {date} Rows read: {rows_read}. Remains: {len(filtered)}")
    return filtered
208
209
async def save_userinfo_to_turso(data: list[dict]) -> None:
    """Upsert per-user aggregates derived from live-chat records into ``userinfo``.

    Records are grouped by ``uid`` + ``fullname``. For each group the earliest
    and latest ``time`` and the list of superchats are merged with the row
    already stored under the same hash (SHA-224 of ``uid`` + ``fullname``);
    rows that would not change are not rewritten.

    Args:
        data: Normalised live-chat records, e.g.::

            {
                'content': 'Hello',
                'fullname': 'Alive',
                'segmented': 'Hello',
                'time': '2021-04-03 21:11:25',
                'uid': 'UCVN-fPvyxRJnyMkFjAa9oTQ'
            }

            A record may also carry a ``superchat`` value.
    """
    if not data:
        # Nothing to aggregate; also avoids querying with an empty `IN ()` list.
        return

    def sha224(text: str) -> str:
        """Hex SHA-224 digest of ``text``."""
        return hashlib.sha224(text.encode()).hexdigest()

    def escape(text: str) -> str:
        """Double single quotes so ``text`` can sit inside a SQL string literal."""
        return text.replace("'", "''")

    def update_superchat(superchats: list[dict], db_superchat: str | None) -> str:
        """Merge new superchats into the JSON list stored in the database.

        superchats: records that carry a superchat; ``time`` and ``superchat`` are used.
        db_superchat: stored JSON text, a list of ``{"time": ..., "superchat": ...}``
            objects, or empty/None when nothing is stored.

        Returns the merged list as JSON text, or "" when the list is empty.
        """
        merged = json.loads(db_superchat or "[]")
        for x in superchats:
            sc = {"time": x["time"], "superchat": x["superchat"]}
            if sc not in merged:
                merged.append(sc)
        return json.dumps(merged) if merged else ""

    hashkeys = {sha224(f"{x['uid']}{x['fullname']}") for x in data}
    hash_list = ", ".join(f"'{h}'" for h in hashkeys)  # hex digests, safe to inline
    db = await turso_exec([{"type": "execute", "stmt": {"sql": f"SELECT * FROM userinfo WHERE hash IN ({hash_list});"}}], silent=True, **TURSO_KWARGS)
    db = {x["hash"]: x for x in turso_parse_resp(db)}
    grouped = defaultdict(list)  # group by uid-fullname
    for x in data:
        grouped[x["uid"] + x["fullname"]].append(x)

    values = []
    for records in grouped.values():
        uid = records[0]["uid"]
        fullname = records[0]["fullname"]
        hash_ = sha224(f"{uid}{fullname}")
        times = sorted(x["time"] for x in records)
        superchats = [x for x in records if x.get("superchat")]
        db_record = db.get(hash_, {})
        superchat = update_superchat(superchats, db_record.get("superchat"))
        # `or` also covers NULL / empty values coming back from the database,
        # which would otherwise break min/max or win the comparison.
        firstdate = min(times[0], db_record.get("firstdate") or "9999-12-31")
        lastdate = max(times[-1], db_record.get("lastdate") or "0000-01-01")
        note = db_record.get("note") or ""
        record = {"uid": uid, "fullname": fullname, "superchat": superchat, "firstdate": firstdate, "lastdate": lastdate, "note": note, "hash": hash_}
        if record != db_record:
            # The superchat JSON is escaped too: it contains server-provided text.
            values.append(f"('{escape(uid)}', '{escape(fullname)}', '{escape(superchat)}', '{firstdate}', '{lastdate}', '{escape(note)}', '{hash_}')")

    if not values:
        return
    sql = "INSERT OR REPLACE INTO userinfo (uid, fullname, superchat, firstdate, lastdate, note, hash) VALUES " + ",".join(values) + ";"
    logger.info(f"Retrieve userinfo of {len(grouped)} users")
    resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, **TURSO_KWARGS)
    num_success = glom(resp, "results.0.response.result.affected_row_count", default=0)
    logger.success(f"Updated userinfo of {num_success} records")
270
271
async def sync_server_to_r2(qtype: str) -> None:
    """Copy records of kind ``qtype`` from the API server into R2.

    For every monitored year (the configured years plus the current year):
      1. List the server's live sessions and keep the live dates not yet
         recorded as finished in R2.
      2. Download those dates, merge them into the year's stored dataframe
         and save it when rows were added.
      3. Record the fully downloaded dates as finished (persisted only
         during day time).
      4. Refresh the stored live-session info and, for "弹幕", the per-user
         first/last activity table.

    The sync stops early when the session listing reports ``hx_error``. A
    date whose pages could not all be fetched is not recorded as finished,
    so it is retried on the next sync.

    Args:
        qtype: "弹幕" (live chat) or "发言" (subtitles).
    """
    # A single in-flight request is enough when DANMU.NUM_PER_QUERY is large.
    concurrency = 1
    page_size = DANMU.NUM_PER_QUERY
    prefix = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/"
    api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"

    async def fetch_page(headers: dict, date: str, page: int):
        """Request one page of ``qtype`` records for ``date``."""
        return await hx_req(
            api_url,
            "POST",
            headers=headers,
            data={"page": page, "limit": page_size, "liveDate": date},
            proxy=PROXY.DANMU,
            check_kv={"code": 0},
            silent=True,
        )

    async def batch_fetch(new_dates: set[str]) -> tuple[pd.DataFrame, set[str]]:
        """Download all given dates; return their dataframe and the dates that failed."""
        results = []
        incomplete: set[str] = set()
        for date in sorted(new_dates):
            logger.trace(f"Query {qtype} date: {date}")
            headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": date[:4]}
            first = await fetch_page(headers, date, 1)
            total = first.get("count", 0)
            if total == 0:
                continue
            logger.debug(f"Found {qtype} date: {date} - {total} results")
            date_rows = list(first["data"])  # the first page

            quotient, remainder = divmod(total, page_size)
            n_pages = quotient + (1 if remainder else 0)
            remaining_pages = range(2, n_pages + 1)
            failed_pages = []
            # Slicing the page range into groups of `concurrency` is what bounds the
            # number of simultaneous requests.
            for start in range(0, len(remaining_pages), concurrency):
                group = remaining_pages[start : start + concurrency]
                responses = await asyncio.gather(*(fetch_page(headers, date, page) for page in group), return_exceptions=True)
                for page, page_resp in zip(group, responses):
                    rows = glom(page_resp, "data", default=None)
                    if isinstance(rows, list):
                        date_rows.extend(rows)
                    else:
                        # Raised exception or a response without a usable "data" list.
                        failed_pages.append(page)

            if failed_pages:
                logger.warning(f"Failed to fetch {qtype} pages {failed_pages} of {date}, skipping until next sync")
                incomplete.add(date)
                continue
            results.extend(date_rows)
        return convert_to_dataframe(results, qtype), incomplete

    now = nowdt(TZ)
    monitor_years = strings_list(DANMU.SYNC_DANMU_YEARS) if qtype == "弹幕" else strings_list(DANMU.SYNC_FAYAN_YEARS)
    monitor_years.append(str(now.year))
    monitor_years = sorted(set(monitor_years))
    finished_key = DANMU.R2_PREFIX.rstrip("/") + "/finished"
    liveinfo_key = DANMU.R2_PREFIX.rstrip("/") + "/liveinfo"
    userinfo_key = DANMU.R2_PREFIX.rstrip("/") + "/userinfo"
    finished_info = await get_cf_r2(finished_key, silent=True)
    live_info = await r2_liveinfo()
    user_info = await get_r2_dataframe(userinfo_key) if qtype == "弹幕" else pd.DataFrame()
    r2_dates = set(finished_info.get(qtype, []))
    for year in monitor_years:
        headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": year}
        params = {"liveDate": year} if qtype == "弹幕" else {"srtCount": 1, "liveDate": year}
        api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
        server_live_info = await hx_req(api, headers=headers, proxy=PROXY.DANMU, params=params, silent=True)
        if glom(server_live_info, "hx_error", default=""):  # API server is down
            return
        new_dates = {x["liveDate"] for x in server_live_info if x["liveDate"] not in r2_dates}  # type: ignore
        if not new_dates:
            continue
        r2 = await query_r2_for_date(year, qtype)
        logger.debug(f"Sync {qtype}-{year}: {new_dates}")
        df, incomplete_dates = await batch_fetch(new_dates)
        if len(df) > 0:
            # save parquet
            df = pd.concat([r2, df], ignore_index=True).sort_values(by=["ts"]).drop_duplicates()
            if len(df) != len(r2):
                logger.info(f"Saving {len(df) - len(r2)} records to {qtype}/{year}")
                await save_dataframe_to_r2(prefix + year, df)
            # Dates with failed pages stay unfinished so they are fetched again.
            r2_dates |= new_dates - incomplete_dates
            if 4 < now.hour < 20:  # only mark as finished during day time
                finished_info[qtype] = sorted(r2_dates)
                await set_cf_r2(finished_key, finished_info, silent=True)

            # save liveinfo: merge this listing with the listing of the other query
            # variant (with/without `srtCount`).
            params_2 = {"liveDate": year} if qtype != "弹幕" else {"srtCount": 1, "liveDate": year}
            server_live_info_2 = await hx_req(api, headers=headers, proxy=PROXY.DANMU, params=params_2, silent=True)
            if glom(server_live_info_2, "hx_error", default=""):
                # Merging only one listing could overwrite richer stored entries.
                logger.warning(f"Failed to fetch liveinfo of {year}, skipping liveinfo update")
            else:
                info_1: dict = {x["liveDate"]: x for x in server_live_info}  # ty:ignore[invalid-argument-type]
                info_2: dict = {x["liveDate"]: x for x in server_live_info_2}  # ty:ignore[invalid-argument-type]
                keys = set(info_1) | set(info_2)
                for key in keys:
                    live_info[key] = trim_empty(info_1.get(key, {}) | info_2.get(key, {}))

                # handle dates like "2022-04-07_t", rename to "2022-04-07";
                # sorted so the same suffixed entry wins on every run.
                for key in sorted(keys):
                    if len(key) == 10:
                        continue
                    if key[:10] not in live_info:
                        live_info[key[:10]] = live_info[key]
                    del live_info[key]
                await set_cf_r2(liveinfo_key, live_info, silent=True)

            # save userinfo
            if qtype != "弹幕":
                continue
            # Aggregate the `ts` column per user: its minimum becomes `first`, its maximum `last`.
            userinfo_this_year = df[["uid", "ts", "name"]].groupby(["uid", "name"], as_index=False).agg(first=("ts", "min"), last=("ts", "max"))
            user_info = pd.concat([userinfo_this_year, user_info]).groupby(["uid", "name"], as_index=False).agg(first=("first", "min"), last=("last", "max"))
            user_info = user_info.sort_values(by=["last"], ascending=False).reset_index(drop=True)
            await save_dataframe_to_r2(userinfo_key, user_info)
375
376
def trim_empty(obj: dict) -> dict:
    """Return a copy of ``obj`` without ``None`` values and blank strings.

    A string counts as blank when it is empty or whitespace-only. All other
    values (including ``0``, ``False`` and empty containers) are kept, and
    the key order of ``obj`` is preserved.
    """

    def is_blank(value) -> bool:
        return value is None or (isinstance(value, str) and not value.strip())

    return {key: value for key, value in obj.items() if not is_blank(value)}
385    return res