main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import asyncio
  4import hashlib
  5import json
  6from collections import defaultdict
  7from datetime import datetime, timedelta
  8from zoneinfo import ZoneInfo
  9
 10from glom import flatten, glom
 11from loguru import logger
 12
 13from config import DANMU, PROXY, TZ, cutter
 14from danmu.utils import TURSO_KWARGS, convert_parquet, get_bearer_token, get_live_info, merge_parquet
 15from database.r2 import get_cf_r2, list_cf_r2, set_cf_r2
 16from database.turso import insert_statement, turso_create_table, turso_exec, turso_parse_resp
 17from networking import hx_req
 18from utils import nowdt, strings_list
 19
 20# date为yyyy-mm-dd标准字符串. liveDate是服务器端原始日期, 可能有后缀 ("2025-04-14_01", "2025-04-14_02")
 21LIVEINFO_COLUMNS = "date TEXT, title TEXT, url TEXT, 发言已完成 INTEGER DEFAULT 0, 弹幕已完成 INTEGER DEFAULT 0, liveDate TEXT PRIMARY KEY"
 22USERINFO_COLUMNS = "uid TEXT, fullname TEXT, superchat TEXT, firstdate TEXT, lastdate TEXT, note TEXT, hash TEXT PRIMARY KEY"
 23
 24# livechats相关
 25COLUMNS = {
 26    "发言": "time TEXT, content TEXT, segmented TEXT",
 27    "弹幕": "time TEXT, fullname TEXT, content TEXT, superchat TEXT, uid TEXT, segmented TEXT",
 28}
 29INDEX_NAMES = {
 30    "发言": ["time"],
 31    "弹幕": ["time", "fullname", "uid"],
 32}
 33
 34
 35async def sync_livechats() -> None:
 36    if not DANMU.SYNC_ENABLE:
 37        return
 38    for qtype in ["弹幕", "发言"]:
 39        if "R2" in DANMU.SYNC_ENGNIE.upper():
 40            logger.info(f"Syncing streaming {qtype} to R2")
 41            await sync_server_to_r2(qtype)
 42
 43        if "turso" in DANMU.SYNC_ENGNIE.lower():
 44            logger.info(f"Syncing streaming {qtype} to Turso")
 45            await sync_server_to_turso(qtype)
 46
 47
 48async def sync_server_to_turso(qtype: str) -> None:
 49    concurrency = 1  # DANMU.NUM_PER_QUERY 设置很大的话只用单并发即可
 50    api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
 51
 52    # 首先, 建立 `发言` / `弹幕` 以及 `liveinfo` table, 和相关index
 53    await turso_create_table("liveinfo", LIVEINFO_COLUMNS, idx_cols=["date"], idx_prefix="idx_liveinfo_", silent=True, **TURSO_KWARGS)
 54    await turso_create_table("userinfo", USERINFO_COLUMNS, idx_cols=["uid", "fullname"], idx_prefix="idx_userinfo_", silent=True, **TURSO_KWARGS)
 55    await turso_create_table(qtype, COLUMNS[qtype], idx_cols=INDEX_NAMES[qtype], idx_prefix=f"idx_{qtype}_", fts_on_col="rowid", silent=True, **TURSO_KWARGS)
 56
 57    # 获取turso存储的liveinfo
 58    resp = await turso_exec([{"type": "execute", "stmt": {"sql": "SELECT * FROM liveinfo;"}}], silent=True, **TURSO_KWARGS)
 59    db_liveinfo = {x["liveDate"]: x for x in turso_parse_resp(resp)}
 60
 61    # 开始同步
 62    monitor_years = strings_list(DANMU.SYNC_DANMU_YEARS) if qtype == "弹幕" else strings_list(DANMU.SYNC_FAYAN_YEARS)
 63    monitor_years.append(str(nowdt(TZ).year))
 64    monitor_years = sorted(set(monitor_years))
 65    for year in monitor_years:
 66        headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": year}
 67        params = {"liveDate": year} if qtype == "弹幕" else {"srtCount": 1, "liveDate": year}
 68        api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
 69        liveinfo_list: list[dict] = await hx_req(api, headers=headers, proxy=PROXY.DANMU, params=params, silent=True)  # type: ignore
 70        if glom(liveinfo_list, "hx_error", default=""):  # API server is down
 71            return
 72        for liveinfo in sorted(liveinfo_list, key=lambda x: x["liveDate"]):
 73            live_date = liveinfo["liveDate"]
 74            if glom(db_liveinfo, f"{live_date}.{qtype}已完成", default=0) == 1:
 75                continue
 76            results = []
 77            payload = {"page": 1, "limit": DANMU.NUM_PER_QUERY, "liveDate": live_date}
 78            resp = await hx_req(api_url, "POST", headers=headers, data=payload, proxy=PROXY.DANMU, check_kv={"code": 0}, silent=True)
 79            if resp.get("count", 0) == 0:
 80                continue
 81            logger.trace(f"Query {qtype} date: {live_date} - {resp['count']} results")
 82            # append the first page
 83            results.extend(resp["data"])
 84            quotient, remainder = divmod(resp["count"], DANMU.NUM_PER_QUERY)
 85            n_pages = quotient + (1 if remainder else 0)
 86            tasks = [
 87                hx_req(
 88                    api_url,
 89                    "POST",
 90                    headers=headers,
 91                    data={"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": live_date},
 92                    proxy=PROXY.DANMU,
 93                    check_kv={"code": 0},
 94                    silent=True,
 95                )
 96                for page in range(2, n_pages + 1)
 97            ]
 98            chunks = [tasks[i : i + concurrency] for i in range(0, len(tasks), concurrency)]
 99            for chunk in chunks:
100                async with asyncio.Semaphore(concurrency):
101                    res = await asyncio.gather(*chunk, return_exceptions=True)
102                    data = flatten(glom(res, "*.data"))
103                    results.extend(data)
104            # save all results to turso
105            await save_livechats_to_turso(liveinfo, results, qtype)
106
107
async def save_livechats_to_turso(live_info: dict, data: list[dict], qtype: str) -> None:
    """Normalize the raw server records of one live date and store them in Turso.

    The records are normalized and de-duplicated, rows already present in
    Turso are dropped, the remainder is inserted in batches, and finally the
    ``liveinfo`` row of the live date is flagged as completed for ``qtype``
    (unless the stream may still be running or rows are still missing).
    """
    if not data:
        return
    # NOTE: liveDate on the server may carry a suffix ("2025-04-14_01", "2025-04-14_02", ...).
    date = live_info["liveDate"][:10]

    # Normalize the records.
    normed_data: list[dict] = []
    seen: set[str] = set()
    if qtype == "发言":
        for raw in data:
            if not (raw.get("startTime") and raw.get("content")):
                continue
            clock = raw["startTime"].split(",")[0]  # e.g. 3:53:23
            if len(clock.split(":")[0]) == 1:
                clock = f"0{clock}"  # zero-pad a single-digit hour
            dedup_key = f"{date}{clock}{raw['content']}"
            if dedup_key in seen:
                continue
            normed_data.append(
                {
                    "time": f"{date} {clock}",
                    "content": raw["content"],
                    "segmented": " ".join(cutter.cutword(raw["content"])),
                }
            )
            seen.add(dedup_key)
    else:
        for raw in sorted(data, key=lambda r: r.get("timestamp") or 0):
            # Keys, in order: time, fullname, uid, [superchat], [content, segmented]
            if not (raw.get("timestamp") and raw.get("authorName")):
                continue
            sent_at = datetime.fromtimestamp(raw["timestamp"] / 1000000, tz=ZoneInfo(TZ))
            fullname = raw["authorName"].removeprefix("@")  # User Name
            item = {
                "time": sent_at.strftime("%Y-%m-%d %H:%M:%S"),
                "fullname": fullname,
                "uid": raw.get("authorId") or fullname,  # fall back to the name when there is no id
            }
            if raw.get("scAmount"):
                item["superchat"] = raw["scAmount"]
            if raw.get("message"):
                item["content"] = raw["message"]
                item["segmented"] = " ".join(cutter.cutword(raw["message"]))
            dedup_key = f"{item['time']}{item['fullname']}{item.get('content', '')}"
            if dedup_key in seen:
                continue
            normed_data.append(item)
            seen.add(dedup_key)
        await save_userinfo_to_turso(normed_data)

    # Drop the records that are already stored in Turso.
    pending = await filter_records_in_turso(normed_data, date, qtype)

    # Insert into Turso, 4096 statements per request.
    batch_size = 4096
    statements = [insert_statement(qtype, record) for record in sorted(pending, key=lambda r: r["time"])]
    for start in range(0, len(statements), batch_size):
        resp = await turso_exec(statements[start : start + batch_size], silent=True, retry=2, **TURSO_KWARGS)
        num_success = sum(kind == "ok" for kind in glom(resp, "results.*.type", default=[])) - 1
        logger.success(f"Synced {num_success} {qtype} of {date} to Turso")

    # 弹幕 is recorded in real time, so wait until the evening stream is over
    # before flagging the date as completed.
    now = nowdt(TZ)
    yesterday = now - timedelta(days=1)
    stream_hours = now.hour <= 4 or now.hour >= 19
    recent_date = date in (f"{yesterday:%Y-%m-%d}", f"{now:%Y-%m-%d}")
    if qtype == "弹幕" and stream_hours and recent_date:
        return

    # Before updating liveinfo, check once more that everything has been synced.
    if await filter_records_in_turso(normed_data, date, qtype):
        logger.warning(f"Not all {qtype} data of {date} has been synced.")
        return
    records = {
        "date": date,
        "title": live_info.get("title", ""),
        "url": live_info.get("url", ""),
        f"{qtype}已完成": 1,
        "liveDate": live_info["liveDate"],
    }
    resp = await turso_exec([insert_statement("liveinfo", records, update_on_conflict="liveDate")], silent=True, **TURSO_KWARGS)
    if glom(resp, "results.0.type", default="") == "ok":
        logger.success(f"Updated liveinfo of {date}")
182
183
async def filter_records_in_turso(data: list[dict], date: str, qtype: str) -> list[dict]:
    """Return the records of ``data`` that are not stored in Turso yet.

    Messages sent after midnight are still filed under the previous live date
    on the server, so the stored rows are read from ``date`` 00:00:00 up to
    08:00:00 of the following day (the stream has ended by then).

    Args:
        data: Normalized records; each needs a "time" key, and 弹幕 records
            also a "fullname" key.
        date: Live date as "YYYY-MM-DD".
        qtype: "发言" or "弹幕"; also the name of the table that is queried.

    Returns:
        The records whose identifier ("time" for 发言, "time" plus "fullname"
        for 弹幕) is not stored yet, in their original order.

    Raises:
        ValueError: If ``data`` is non-empty and ``date`` is not a valid
            "YYYY-MM-DD" date.
    """
    if not data:
        return []  # nothing to filter, so skip the database round trip

    # 发言 is identified by time alone, 弹幕 by time and fullname.
    by_time_only = qtype == "发言"
    identifier = "time" if by_time_only else "time,fullname"
    # Plain calendar arithmetic on the naive date. Calling astimezone() on a
    # naive datetime would interpret it in the host's local zone and could
    # move the result to a different day.
    tomorrow = datetime.strptime(date, "%Y-%m-%d") + timedelta(days=1)
    # String literals use single quotes; double quotes denote identifiers in SQL.
    sql = f"""SELECT {identifier} FROM "{qtype}" WHERE time >= '{date} 00:00:00' AND time <= '{tomorrow:%Y-%m-%d} 08:00:00';"""
    resp = await turso_exec(
        [{"type": "execute", "stmt": {"sql": sql}}],
        silent=True,
        retry=2,
        **TURSO_KWARGS,
    )
    rows = glom(resp, "results.0.response.result.rows.*.*.value", default=[])
    saved = {"-".join(row) for row in rows}  # a set keeps the membership tests cheap
    log = f"Found {len(saved)} messages in Turso of date {date}"
    log += f" Rows read: {glom(resp, 'results.0.response.result.rows_read', default=1)}"
    if by_time_only:
        filtered = [x for x in data if x["time"] not in saved]
    else:
        filtered = [x for x in data if f"{x['time']}-{x['fullname']}" not in saved]
    logger.info(f"{log}. Remains: {len(filtered)}")
    return filtered
206
207
async def save_userinfo_to_turso(data: list[dict]) -> None:
    """Create or refresh the ``userinfo`` row of every author found in ``data``.

    Records are grouped per author (uid + fullname). For each author the
    first/last seen time and the superchat history are merged with the row
    already stored in Turso; only rows that differ from the stored one are
    written, in a single ``INSERT OR REPLACE`` statement.

    Example item of ``data``::

        {
            'content': 'Hello',
            'fullname': 'Alive',
            'segmented': 'Hello',
            'time': '2021-04-03 21:11:25',
            'uid': 'UCVN-fPvyxRJnyMkFjAa9oTQ'
        }
    """
    if not data:
        return  # nothing to store, so skip the database round trips

    def sha224(text: str) -> str:
        """Hex SHA-224 digest of ``text``; used as the primary key of ``userinfo``."""
        return hashlib.sha224(text.encode()).hexdigest()

    def escape(text: str) -> str:
        """Double single quotes so ``text`` is safe inside a single-quoted SQL literal."""
        return text.replace("'", "''")

    def update_superchat(superchats: list[dict], db_superchat: str | None) -> str:
        """Merge new superchat entries into the JSON list stored in the database.

        superchats: records of this batch that carry a "superchat" value
        db_superchat: stored JSON list of {"time": ..., "superchat": ...}
            entries, or an empty string / None when nothing is stored
        Returns the merged JSON list, or "" when the list is empty.
        """
        db_sc = json.loads(db_superchat or "[]")
        for x in superchats:
            sc = {"time": x["time"], "superchat": x["superchat"]}
            if sc not in db_sc:
                db_sc.append(sc)
        # An empty list is stored as an empty string.
        return json.dumps(db_sc).removeprefix("[]")

    grouped = defaultdict(list)  # group by uid-fullname
    for x in data:
        grouped[x["uid"] + x["fullname"]].append(x)
    hashes = {key: sha224(key) for key in grouped}

    in_list = ", ".join(f"'{h}'" for h in hashes.values())
    db = await turso_exec([{"type": "execute", "stmt": {"sql": f"SELECT * FROM userinfo WHERE hash IN ({in_list});"}}], silent=True, **TURSO_KWARGS)
    db = {x["hash"]: x for x in turso_parse_resp(db)}

    values = []
    for key, records in grouped.items():
        uid = records[0]["uid"]
        fullname = records[0]["fullname"]
        hash_ = hashes[key]
        times = sorted(x["time"] for x in records)
        superchats = [x for x in records if x.get("superchat")]
        db_record = db.get(hash_, {})
        superchat = update_superchat(superchats, db_record.get("superchat"))
        # `or` also covers columns that are NULL in the database.
        firstdate = min(times[0], db_record.get("firstdate") or "9999-12-31")
        lastdate = max(times[-1], db_record.get("lastdate") or "0000-01-01")
        note = db_record.get("note") or ""
        record = {"uid": uid, "fullname": fullname, "superchat": superchat, "firstdate": firstdate, "lastdate": lastdate, "note": note, "hash": hash_}
        if record != db_record:
            # The superchat JSON may contain single quotes too, so it is escaped like the other text fields.
            values.append(f"('{escape(uid)}', '{escape(fullname)}', '{escape(superchat)}', '{firstdate}', '{lastdate}', '{escape(note)}', '{hash_}')")

    if not values:
        return
    sql = "INSERT OR REPLACE INTO userinfo (uid, fullname, superchat, firstdate, lastdate, note, hash) VALUES " + ",".join(values) + ";"
    logger.info(f"Retrieve userinfo of {len(grouped)} users")
    resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, **TURSO_KWARGS)
    num_success = glom(resp, "results.0.response.result.affected_row_count", default=0)
    logger.success(f"Updated userinfo of {num_success} records")
268
269
async def sync_server_to_r2(qtype: str) -> None:
    """Mirror new live dates of ``qtype`` from the API server into R2 as parquet files.

    Key layout under ``DANMU.R2_PREFIX/<qtype>/``:
      * ``YYYY-MM-DD`` - one file per day (server dates with a suffix such as
        "2025-04-14_01" are merged into their day),
      * ``YYYY-MM``    - all stored days of a month merged,
      * ``YYYY``       - all stored months of a year merged.
    When at least one day was written, the live info of all monitored years is
    stored under ``DANMU.R2_PREFIX/liveinfo``.

    A day is only written when every page of every server date belonging to it
    was downloaded. A failed day is skipped, so it is not listed in R2 and is
    retried on the next run, instead of being stored truncated or as an empty
    marker.

    Args:
        qtype: "弹幕" (queried from ``/liveChat/queryList``) or
            "发言" (queried from ``/srtInfo/queryList``).
    """
    concurrency = 1  # one request at a time is enough when DANMU.NUM_PER_QUERY is large
    prefix = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/"
    api_url = DANMU.BASE_URL + ("/liveChat/queryList" if qtype == "弹幕" else "/srtInfo/queryList")

    async def fetch_date(date: str) -> list | None:
        """Download every page of one server-side live date; None when any request failed."""
        logger.trace(f"Query {qtype} date: {date}")
        headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": date[:4]}

        def request(page: int):
            return hx_req(
                api_url,
                "POST",
                headers=headers,
                data={"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": date},
                proxy=PROXY.DANMU,
                check_kv={"code": 0},
                silent=True,
            )

        first = await request(1)
        # Failed requests are reported through an "hx_error" key (the same check is
        # used for the live-date listing below); they must not be mistaken for "no data".
        if glom(first, "hx_error", default=""):
            return None
        if first.get("count", 0) == 0:
            return []
        logger.trace(f"Found {qtype} date: {date} - {first['count']} results")
        rows = list(first["data"])
        quotient, remainder = divmod(first["count"], DANMU.NUM_PER_QUERY)
        pages = range(2, quotient + (1 if remainder else 0) + 1)
        for start in range(0, len(pages), concurrency):
            batch = await asyncio.gather(*(request(page) for page in pages[start : start + concurrency]), return_exceptions=True)
            for page_resp in batch:
                # A failed page is an exception object or a response without a "data" list.
                page_rows = page_resp.get("data") if isinstance(page_resp, dict) else None
                if not isinstance(page_rows, list):
                    return None
                rows.extend(page_rows)
        return rows

    async def batch_sync(new_dates: dict[str, list[str]]) -> set[str]:
        """Write one parquet file per day and return the day keys that were written."""
        written = set()
        for r2_key, server_dates in sorted(new_dates.items()):
            results = []
            failed = False
            for date in sorted(server_dates):
                rows = await fetch_date(date)
                if rows is None:
                    failed = True
                    break
                results.extend(rows)
            if failed:
                logger.warning(f"Failed to fetch all pages of {qtype} {r2_key}, will retry on the next sync")
                continue
            # If there are no results, create an empty key.
            metadata = {"empty": True} if not results else {}
            # NOTE(review): `date` is the last server-side date of this day and may carry
            # a suffix ("2025-04-14_02"); confirm that convert_parquet expects this.
            await set_cf_r2(prefix + r2_key, convert_parquet(results, qtype, date), metadata, mime_type="application/x-parquet", silent=True)
            written.add(r2_key)
        return written

    now = nowdt(TZ)
    if qtype == "弹幕" and (now.hour <= 4 or now.hour >= 20):
        # Skip 弹幕 during the evening stream: it is recorded in real time and
        # should only be synced after the stream has ended.
        return

    monitor_years = strings_list(DANMU.SYNC_DANMU_YEARS) if qtype == "弹幕" else strings_list(DANMU.SYNC_FAYAN_YEARS)
    monitor_years.append(str(now.year))
    monitor_years = sorted(set(monitor_years))
    r2 = await list_cf_r2(prefix=prefix)
    r2_dates = {x.removeprefix(prefix) for x in glom(r2, "Contents.*.Key", default=[])}
    has_update = False
    for year in monitor_years:
        headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": year}
        params = {"liveDate": year} if qtype == "弹幕" else {"srtCount": 1, "liveDate": year}
        api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
        resp = await hx_req(api, headers=headers, proxy=PROXY.DANMU, params=params, silent=True)
        if glom(resp, "hx_error", default=""):  # API server is down
            logger.warning(f"Failed to list {qtype} live dates of {year}, stop syncing to R2")
            break
        new_dates_map = defaultdict(list)
        # Some live dates on the server are "2025-04-14_01", "2025-04-14_02", ...
        # Normalize them to YYYY-MM-DD.
        for x in resp:
            live_date: str = x["liveDate"]  # type: ignore
            if live_date[:10] in r2_dates:
                continue
            if qtype == "弹幕" and live_date[:10] == f"{now:%Y-%m-%d}":  # ignore today's live
                continue
            new_dates_map[live_date[:10]].append(live_date)
        if not new_dates_map:
            continue
        # Sync per date.
        logger.debug(f"Sync {qtype}-{year}: {new_dates_map}")
        synced = await batch_sync(new_dates_map)
        if not synced:
            continue
        has_update = True
        r2_dates.update(synced)  # the days just written now exist in R2

        # Sync whole months.
        year_months = {x[:7] for x in synced}
        for year_month in sorted(year_months):
            month_data = []
            for d in range(1, 32):
                date = f"{year_month}-{d:02d}"
                if date not in r2_dates:
                    continue
                logger.trace(f"Get R2 date: {date}")
                if blob := await get_cf_r2(prefix + date, rformat="bytes", silent=True):
                    month_data.append(blob)
            await set_cf_r2(prefix + year_month, merge_parquet(month_data), mime_type="application/x-parquet", silent=True)
            del month_data
        r2_dates.update(year_months)  # the months just written now exist in R2

        # Sync the whole year.
        year_data = []
        for mon in range(1, 13):
            date = f"{year}-{mon:02d}"
            if date not in r2_dates:
                continue
            logger.trace(f"Get R2 date: {date}")
            if blob := await get_cf_r2(prefix + date, rformat="bytes", silent=True):
                year_data.append(blob)
        await set_cf_r2(prefix + year, merge_parquet(year_data), mime_type="application/x-parquet", silent=True)
        del year_data

    # Sync liveinfo.
    if has_update:
        liveinfo = {}
        for year in monitor_years:
            liveinfo.update(await get_live_info(year))
        await set_cf_r2(DANMU.R2_PREFIX.rstrip("/") + "/liveinfo", liveinfo, silent=True)