main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import asyncio
4import hashlib
5import json
6from collections import defaultdict
7from datetime import datetime, timedelta
8from zoneinfo import ZoneInfo
9
10import pandas as pd
11from glom import flatten, glom
12from loguru import logger
13
14from config import DANMU, PROXY, TZ, cutter
15from danmu.r2 import get_r2_dataframe, query_r2_for_date, r2_liveinfo, save_dataframe_to_r2
16from danmu.utils import TURSO_KWARGS, convert_to_dataframe, get_bearer_token
17from database.r2 import get_cf_r2, set_cf_r2
18from database.turso import insert_statement, turso_create_table, turso_exec, turso_parse_resp
19from networking import hx_req
20from utils import nowdt, strings_list
21
# `date` is a normalized "yyyy-mm-dd" string. `liveDate` is the raw date reported by
# the server and may carry a suffix (e.g. "2025-04-14_01", "2025-04-14_02").
LIVEINFO_COLUMNS = (
    "date TEXT, title TEXT, url TEXT, "
    "发言已完成 INTEGER DEFAULT 0, 弹幕已完成 INTEGER DEFAULT 0, "
    "liveDate TEXT PRIMARY KEY"
)
USERINFO_COLUMNS = (
    "uid TEXT, fullname TEXT, superchat TEXT, "
    "firstdate TEXT, lastdate TEXT, note TEXT, "
    "hash TEXT PRIMARY KEY"
)

# Table layouts for the live-chat tables, keyed by record type
# ("发言" = streamer speech, "弹幕" = viewer chat).
COLUMNS = {
    "发言": "time TEXT, content TEXT, segmented TEXT",
    "弹幕": "time TEXT, fullname TEXT, content TEXT, superchat TEXT, uid TEXT, segmented TEXT",
}
# Columns that get an index in each live-chat table.
INDEX_NAMES = {
    "发言": ["time"],
    "弹幕": ["time", "fullname", "uid"],
}
35
36
async def sync_livechats() -> None:
    """Run the live-chat sync for every record type against each enabled engine.

    Returns immediately when ``DANMU.SYNC_ENABLE`` is falsy. Otherwise "弹幕" is
    processed before "发言"; for each type the R2 sync runs first (when the engine
    setting contains "R2", case-insensitively), then the Turso sync (when it
    contains "turso", case-insensitively).
    """
    if not DANMU.SYNC_ENABLE:
        return

    for qtype in ("弹幕", "发言"):
        use_r2 = "R2" in DANMU.SYNC_ENGNIE.upper()
        if use_r2:
            logger.info(f"Syncing streaming {qtype} to R2")
            await sync_server_to_r2(qtype)

        use_turso = "turso" in DANMU.SYNC_ENGNIE.lower()
        if use_turso:
            logger.info(f"Syncing streaming {qtype} to Turso")
            await sync_server_to_turso(qtype)
48
49
async def sync_server_to_turso(qtype: str) -> None:
    """Copy one record type ("弹幕" or "发言") from the API server into Turso.

    Steps:
      1. Make sure the ``liveinfo``, ``userinfo`` and ``qtype`` tables exist.
      2. Load ``liveinfo`` from Turso to learn which live dates are already
         flagged as complete for this ``qtype``.
      3. For every monitored year (configured years plus the current one), list
         the live dates known to the server, download all pages of each date
         that is not complete yet, and pass the rows to
         ``save_livechats_to_turso``.

    The whole sync stops as soon as the live-date listing reports ``hx_error``.
    A date whose pages could not all be downloaded is skipped for this run, so
    that a partial result can never be flagged as complete; it is picked up
    again by the next sync.
    """
    # With a large DANMU.NUM_PER_QUERY a single in-flight request is enough.
    concurrency = 1
    api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
    liveinfo_api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"

    def fetch_page(page: int, live_date: str, headers: dict):
        """Return the (un-awaited) request for one result page of ``live_date``."""
        return hx_req(
            api_url,
            "POST",
            headers=headers,
            data={"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": live_date},
            proxy=PROXY.DANMU,
            check_kv={"code": 0},
            silent=True,
        )

    # First, create the `发言` / `弹幕` table, the `liveinfo` / `userinfo` tables and their indexes.
    await turso_create_table("liveinfo", LIVEINFO_COLUMNS, idx_cols=["date"], idx_prefix="idx_liveinfo_", silent=True, **TURSO_KWARGS)
    await turso_create_table("userinfo", USERINFO_COLUMNS, idx_cols=["uid", "fullname"], idx_prefix="idx_userinfo_", silent=True, **TURSO_KWARGS)
    await turso_create_table(qtype, COLUMNS[qtype], idx_cols=INDEX_NAMES[qtype], idx_prefix=f"idx_{qtype}_", fts_on_col="rowid", silent=True, **TURSO_KWARGS)

    # Load the liveinfo rows already stored in Turso, keyed by the raw server date.
    resp = await turso_exec([{"type": "execute", "stmt": {"sql": "SELECT * FROM liveinfo;"}}], silent=True, **TURSO_KWARGS)
    db_liveinfo = {x["liveDate"]: x for x in turso_parse_resp(resp)}

    # Start syncing: configured years plus the current year, de-duplicated and ordered.
    monitor_years = strings_list(DANMU.SYNC_DANMU_YEARS) if qtype == "弹幕" else strings_list(DANMU.SYNC_FAYAN_YEARS)
    monitor_years.append(str(nowdt(TZ).year))
    for year in sorted(set(monitor_years)):
        headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": year}
        params = {"liveDate": year} if qtype == "弹幕" else {"srtCount": 1, "liveDate": year}
        liveinfo_list: list[dict] = await hx_req(liveinfo_api, headers=headers, proxy=PROXY.DANMU, params=params, silent=True)  # type: ignore
        if glom(liveinfo_list, "hx_error", default=""):  # API server is down
            return

        for liveinfo in sorted(liveinfo_list, key=lambda x: x["liveDate"]):
            live_date = liveinfo["liveDate"]
            if glom(db_liveinfo, f"{live_date}.{qtype}已完成", default=0) == 1:
                continue

            first = await fetch_page(1, live_date, headers)
            total = first.get("count", 0)
            if total == 0:
                continue
            logger.trace(f"Query {qtype} date: {live_date} - {total} results")

            # Page 1 is already in hand; fetch the remaining pages in small batches.
            results = list(first.get("data") or [])
            n_pages = -(-total // DANMU.NUM_PER_QUERY)  # ceiling division
            remaining = range(2, n_pages + 1)
            complete = True
            for start in range(0, len(remaining), concurrency):
                # Requests are created per batch so none is left un-awaited.
                batch = [fetch_page(page, live_date, headers) for page in remaining[start : start + concurrency]]
                for page_resp in await asyncio.gather(*batch, return_exceptions=True):
                    rows = None if isinstance(page_resp, BaseException) else glom(page_resp, "data", default=None)
                    if rows is None:
                        complete = False
                    else:
                        results.extend(rows)

            if not complete:
                # Saving a partial download could flag the date as complete with rows missing.
                logger.warning(f"Failed to fetch all {qtype} pages of {live_date}, skipped until the next sync.")
                continue

            # Save all results to Turso.
            await save_livechats_to_turso(liveinfo, results, qtype)
108
109
async def save_livechats_to_turso(live_info: dict, data: list[dict], qtype: str) -> None:
    """Normalize server records of one live date and store the new ones in Turso.

    ``live_info`` is the server's liveinfo entry for the date, ``data`` the raw
    rows and ``qtype`` the record type ("发言", anything else is handled as
    "弹幕"). Rows are normalized and de-duplicated, rows already present in Turso
    are dropped, the rest is inserted in batches, and finally the liveinfo row
    is flagged as complete for ``qtype`` - unless this is a recent "弹幕" date
    during streaming hours, or a re-check still finds unsaved rows.
    """
    if not data:
        return

    # Warning: some live dates on the server look like "2025-04-14_01", "2025-04-14_02", ...
    date = live_info["liveDate"][:10]

    # Normalize the record format.
    normed_data = []
    seen = set()
    if qtype == "发言":
        for raw in data:
            if not (raw.get("startTime") and raw.get("content")):
                continue
            clock = raw["startTime"].split(",")[0]  # e.g. 3:53:23
            hour_part = clock.split(":")[0]
            if len(hour_part) == 1:
                clock = f"0{clock}"  # zero-pad a single-digit hour
            dedup_key = f"{date}{clock}{raw['content']}"
            if dedup_key in seen:
                continue
            segmented = " ".join(cutter.cutword(raw["content"]))
            normed_data.append({"time": f"{date} {clock}", "content": raw["content"], "segmented": segmented})
            seen.add(dedup_key)
    else:
        for raw in sorted(data, key=lambda r: r.get("timestamp") or 0):
            # Possible fields: time, fullname, uid, superchat, content, segmented
            if not (raw.get("timestamp") and raw.get("authorName")):
                continue
            moment = datetime.fromtimestamp(raw["timestamp"] / 1000000, tz=ZoneInfo(TZ))
            item = {"time": moment.strftime("%Y-%m-%d %H:%M:%S")}
            item["fullname"] = raw["authorName"].removeprefix("@")  # User Name
            # Fall back to the display name when the server gives no author id.
            item["uid"] = raw.get("authorId") or item["fullname"]
            if raw.get("scAmount"):
                item["superchat"] = raw["scAmount"]
            if raw.get("message"):
                item["content"] = raw["message"]
                item["segmented"] = " ".join(cutter.cutword(raw["message"]))

            dedup_key = f"{item['time']}{item['fullname']}{item.get('content', '')}"
            if dedup_key in seen:
                continue
            normed_data.append(item)
            seen.add(dedup_key)
        await save_userinfo_to_turso(normed_data)

    # Drop the records that are already stored in Turso.
    pending = await filter_records_in_turso(normed_data, date, qtype)

    # Insert into Turso, 4096 statements per request.
    batch_size = 4096
    statements = [insert_statement(qtype, row) for row in sorted(pending, key=lambda r: r["time"])]
    for offset in range(0, len(statements), batch_size):
        resp = await turso_exec(statements[offset : offset + batch_size], silent=True, retry=2, **TURSO_KWARGS)
        ok_results = [kind for kind in glom(resp, "results.*.type", default=[]) if kind == "ok"]
        num_success = len(ok_results) - 1
        logger.success(f"Synced {num_success} {qtype} of {date} to Turso")

    now = nowdt(TZ)
    yesterday = now - timedelta(days=1)
    # 弹幕 is recorded in real time: wait until the evening stream is over before flagging it complete.
    in_stream_hours = now.hour <= 4 or now.hour >= 19
    is_recent = date in (f"{yesterday:%Y-%m-%d}", f"{now:%Y-%m-%d}")
    if qtype == "弹幕" and in_stream_hours and is_recent:
        return

    # Before updating liveinfo, check once more that every record has been synced.
    still_missing = await filter_records_in_turso(normed_data, date, qtype)
    if still_missing:
        logger.warning(f"Not all {qtype} data of {date} has been synced.")
        return

    # Update liveinfo.
    records = {
        "date": live_info["liveDate"][:10],
        "title": live_info.get("title", ""),
        "url": live_info.get("url", ""),
        f"{qtype}已完成": 1,
        "liveDate": live_info["liveDate"],
    }
    upsert = insert_statement("liveinfo", records, update_on_conflict="liveDate")
    resp = await turso_exec([upsert], silent=True, **TURSO_KWARGS)
    if glom(resp, "results.0.type", default="") == "ok":
        logger.success(f"Updated liveinfo of {date}")
184
185
async def filter_records_in_turso(data: list[dict], date: str, qtype: str) -> list[dict]:
    """Return the items of ``data`` that are not stored in Turso yet.

    Records written after midnight are still filed under the previous day on the
    server, so the lookup window runs from ``date`` 00:00:00 up to 08:00:00 of the
    following day (by then the stream has ended).

    Args:
        data: normalized records; each needs a "time" key, and a "fullname" key
            unless ``qtype`` is "发言".
        date: the live date as "yyyy-mm-dd".
        qtype: the table to look in. "发言" rows are matched on time only, any
            other type on time plus fullname.

    Raises:
        ValueError: if ``date`` is not a valid "yyyy-mm-dd" date.
    """
    # For 发言 the time is enough to identify a row; 弹幕 needs time and fullname.
    identifier = "time" if qtype == "发言" else "time,fullname"

    # Plain calendar arithmetic. Converting the naive midnight with astimezone()
    # would interpret it in the host's local zone and could shift it onto the
    # previous day, which would cut the window short by a full day.
    day = datetime.strptime(date, "%Y-%m-%d")
    next_day = day + timedelta(days=1)

    # The table name is an identifier (double quotes); the timestamps are string
    # literals and therefore use single quotes.
    sql = (
        f'SELECT {identifier} FROM "{qtype}" '
        f"WHERE time >= '{day:%Y-%m-%d} 00:00:00' AND time <= '{next_day:%Y-%m-%d} 08:00:00';"
    )
    resp = await turso_exec(
        [{"type": "execute", "stmt": {"sql": sql}}],
        silent=True,
        retry=2,
        **TURSO_KWARGS,
    )
    rows = glom(resp, "results.0.response.result.rows.*.*.value", default=[])
    saved = {"-".join(row) for row in rows}  # a set keeps the membership tests cheap
    log = f"Found {len(saved)} messages in Turso of date {date}"
    log += f" Rows read: {glom(resp, 'results.0.response.result.rows_read', default=1)}"
    if qtype == "发言":
        filtered = [x for x in data if x["time"] not in saved]
    else:
        filtered = [x for x in data if f"{x['time']}-{x['fullname']}" not in saved]
    logger.info(f"{log}. Remains: {len(filtered)}")
    return filtered
208
209
async def save_userinfo_to_turso(data: list[dict]) -> None:
    """Create or refresh the ``userinfo`` rows of the users that appear in ``data``.

    Every item needs "uid", "fullname" and "time"; "superchat" is optional, e.g.::

        {
            'content': 'Hello',
            'fullname': 'Alive',
            'segmented': 'Hello',
            'time': '2021-04-03 21:11:25',
            'uid': 'UCVN-fPvyxRJnyMkFjAa9oTQ'
        }

    Users are identified by the SHA-224 hash of ``uid + fullname`` (the primary
    key of the table). For each user the earliest and latest message time and the
    superchat history are merged with what Turso already holds; only users whose
    row would actually change are written. Nothing is sent to Turso when ``data``
    is empty or no row changed.
    """
    if not data:
        return

    def sha224(text: str) -> str:
        """Hex SHA-224 digest of ``text``."""
        return hashlib.sha224(text.encode()).hexdigest()

    def escape(text: str) -> str:
        """Double single quotes so ``text`` can sit inside a SQL string literal."""
        return text.replace("'", "''")

    def update_superchat(superchats: list[dict], db_superchat: str | None) -> str:
        """Merge new superchats into the stored history and return it as JSON.

        superchats: records that carry a "superchat" value (plus their "time").
        db_superchat: the stored history, a JSON list of
            ``{"time": ..., "superchat": ...}`` objects, or empty / None.

        Returns "" instead of "[]" when the merged history is empty.
        """
        history = json.loads(db_superchat or "[]")
        for x in superchats:
            entry = {"time": x["time"], "superchat": x["superchat"]}
            if entry not in history:
                history.append(entry)
        return json.dumps(history) if history else ""

    # Group the records by user (uid + fullname).
    grouped = defaultdict(list)
    for x in data:
        grouped[x["uid"] + x["fullname"]].append(x)

    # Fetch the rows Turso already has for these users. The hashes are hex digests,
    # so they are safe to inline.
    hash_list = ", ".join(f"'{sha224(key)}'" for key in grouped)
    resp = await turso_exec([{"type": "execute", "stmt": {"sql": f"SELECT * FROM userinfo WHERE hash IN ({hash_list});"}}], silent=True, **TURSO_KWARGS)
    db = {x["hash"]: x for x in turso_parse_resp(resp)}

    rows = []
    for records in grouped.values():
        uid = records[0]["uid"]
        fullname = records[0]["fullname"]
        hash_ = sha224(f"{uid}{fullname}")
        times = sorted(x["time"] for x in records)
        superchats = [x for x in records if x.get("superchat")]
        db_record = db.get(hash_, {})
        superchat = update_superchat(superchats, db_record.get("superchat"))
        # `or` also covers columns that come back as None, not only missing keys.
        firstdate = min(times[0], db_record.get("firstdate") or "9999-12-31")
        lastdate = max(times[-1], db_record.get("lastdate") or "0000-01-01")
        note = db_record.get("note") or ""
        record = {"uid": uid, "fullname": fullname, "superchat": superchat, "firstdate": firstdate, "lastdate": lastdate, "note": note, "hash": hash_}
        if record == db_record:
            continue  # nothing new for this user
        # Every free-text value is escaped, including the superchat JSON.
        rows.append(f"('{escape(uid)}', '{escape(fullname)}', '{escape(superchat)}', '{firstdate}', '{lastdate}', '{escape(note)}', '{hash_}')")

    logger.info(f"Retrieve userinfo of {len(grouped)} users")
    if not rows:
        # An INSERT without any VALUES tuple would be invalid SQL.
        return

    sql = "INSERT OR REPLACE INTO userinfo (uid, fullname, superchat, firstdate, lastdate, note, hash) VALUES " + ",".join(rows) + ";"
    resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, **TURSO_KWARGS)
    num_success = glom(resp, "results.0.response.result.affected_row_count", default=0)
    logger.success(f"Updated userinfo of {num_success} records")
270
271
async def sync_server_to_r2(qtype: str) -> None:
    """Copy one record type ("弹幕", anything else is handled as "发言") to R2.

    For every monitored year (configured years plus the current one):
      1. List the live dates known to the server and keep those not yet recorded
         as finished in R2.
      2. Download their rows, merge them into the year's dataframe and save it
         when rows were added.
      3. Record the dates as finished - only between 05:00 and 19:59 (hours in
         ``TZ``), and never for a date whose download failed, so it is retried.
      4. Merge the server's liveinfo (both query variants) into the stored
         liveinfo object.
      5. For "弹幕" only, update the per-user first/last timestamp table.

    The sync stops as soon as the live-date listing reports ``hx_error``.
    """
    # With a large DANMU.NUM_PER_QUERY a single in-flight request is enough.
    concurrency = 1
    prefix = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/"
    api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
    api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"

    async def batch_fetch(new_dates: set[str]) -> tuple[pd.DataFrame, set[str]]:
        """Download all rows of ``new_dates``.

        Returns the rows as a dataframe plus the dates whose download failed or
        was incomplete. Rows of a partly downloaded date are kept; duplicates are
        dropped when the caller merges them on a later run.
        """
        results = []
        failed = set()
        for date in sorted(new_dates):
            logger.trace(f"Query {qtype} date: {date}")
            headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": date[:4]}

            def fetch_page(page: int, date: str = date, headers: dict = headers):
                """Return the (un-awaited) request for one result page of ``date``."""
                return hx_req(
                    api_url,
                    "POST",
                    headers=headers,
                    data={"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": date},
                    proxy=PROXY.DANMU,
                    check_kv={"code": 0},
                    silent=True,
                )

            resp = await fetch_page(1)
            # NOTE(review): assumes a failed request is reported through an
            # "hx_error" key, as for the liveinfo listing - confirm against hx_req.
            if glom(resp, "hx_error", default=""):
                failed.add(date)
                continue
            if resp.get("count", 0) == 0:
                continue
            logger.debug(f"Found {qtype} date: {date} - {resp['count']} results")

            # Page 1 is already in hand; fetch the remaining pages in small batches.
            results.extend(resp["data"])
            n_pages = -(-resp["count"] // DANMU.NUM_PER_QUERY)  # ceiling division
            remaining = range(2, n_pages + 1)
            for start in range(0, len(remaining), concurrency):
                # Requests are created per batch so none is left un-awaited.
                batch = [fetch_page(page) for page in remaining[start : start + concurrency]]
                for page_resp in await asyncio.gather(*batch, return_exceptions=True):
                    rows = None if isinstance(page_resp, BaseException) else glom(page_resp, "data", default=None)
                    if rows is None:
                        failed.add(date)
                    else:
                        results.extend(rows)
        return convert_to_dataframe(results, qtype), failed

    now = nowdt(TZ)
    monitor_years = strings_list(DANMU.SYNC_DANMU_YEARS) if qtype == "弹幕" else strings_list(DANMU.SYNC_FAYAN_YEARS)
    monitor_years.append(str(now.year))
    monitor_years = sorted(set(monitor_years))
    finished_key = DANMU.R2_PREFIX.rstrip("/") + "/finished"
    liveinfo_key = DANMU.R2_PREFIX.rstrip("/") + "/liveinfo"
    userinfo_key = DANMU.R2_PREFIX.rstrip("/") + "/userinfo"
    finished_info = await get_cf_r2(finished_key, silent=True)
    live_info = await r2_liveinfo()
    user_info = await get_r2_dataframe(userinfo_key) if qtype == "弹幕" else pd.DataFrame()
    r2_dates = set(finished_info.get(qtype, []))

    for year in monitor_years:
        headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": year}
        params = {"liveDate": year} if qtype == "弹幕" else {"srtCount": 1, "liveDate": year}
        server_live_info = await hx_req(api, headers=headers, proxy=PROXY.DANMU, params=params, silent=True)
        if glom(server_live_info, "hx_error", default=""):  # API server is down
            return

        new_dates = {x["liveDate"] for x in server_live_info} - r2_dates  # type: ignore
        if not new_dates:
            continue

        r2 = await query_r2_for_date(year, qtype)
        logger.debug(f"Sync {qtype}-{year}: {new_dates}")
        df, failed_dates = await batch_fetch(new_dates)
        if len(df) > 0:
            # Merge with what R2 already holds and save the parquet only when rows were added.
            df = pd.concat([r2, df], ignore_index=True).sort_values(by=["ts"]).drop_duplicates()
            if len(df) != len(r2):
                logger.info(f"Saving {len(df) - len(r2)} records to {qtype}/{year}")
                await save_dataframe_to_r2(prefix + year, df)

        if failed_dates:
            logger.warning(f"Failed to fetch all {qtype} of {sorted(failed_dates)}, retry on the next sync.")
        r2_dates |= new_dates - failed_dates
        if 4 < now.hour < 20:  # only mark as finished during day time
            finished_info[qtype] = sorted(r2_dates)
            await set_cf_r2(finished_key, finished_info, silent=True)

        # Save liveinfo: query the other parameter variant too and merge both answers.
        params_2 = {"liveDate": year} if qtype != "弹幕" else {"srtCount": 1, "liveDate": year}
        server_live_info_2 = await hx_req(api, headers=headers, proxy=PROXY.DANMU, params=params_2, silent=True)
        if glom(server_live_info_2, "hx_error", default=""):
            server_live_info_2 = []  # keep going with the first listing only
        info_1: dict = {x["liveDate"]: x for x in server_live_info}  # ty:ignore[invalid-argument-type]
        info_2: dict = {x["liveDate"]: x for x in server_live_info_2}  # ty:ignore[invalid-argument-type]
        keys = sorted(set(info_1) | set(info_2))  # sorted: a deterministic winner among suffixed keys
        for key in keys:
            live_info[key] = trim_empty(info_1.get(key, {}) | info_2.get(key, {}))

        # Handle dates like "2022-04-07_t": move them to "2022-04-07" unless that key exists.
        for key in keys:
            if len(key) == 10:
                continue
            if key[:10] not in live_info:
                live_info[key[:10]] = live_info[key]
            del live_info[key]
        await set_cf_r2(liveinfo_key, live_info, silent=True)

        # Save userinfo (弹幕 only).
        if qtype != "弹幕":
            continue
        if len(df) == 0:
            continue  # nothing was fetched, so there are no columns to aggregate
        # Aggregate `ts` per user: the minimum becomes `first`, the maximum `last`.
        userinfo_this_year = df[["uid", "ts", "name"]].groupby(["uid", "name"], as_index=False).agg(first=("ts", "min"), last=("ts", "max"))
        user_info = pd.concat([userinfo_this_year, user_info]).groupby(["uid", "name"], as_index=False).agg(first=("first", "min"), last=("last", "max"))
        user_info = user_info.sort_values(by=["last"], ascending=False).reset_index(drop=True)
        await save_dataframe_to_r2(userinfo_key, user_info)
375
376
def trim_empty(obj: dict) -> dict:
    """Return a shallow copy of ``obj`` without its empty entries.

    An entry is dropped when its value is ``None`` or a string that is empty or
    whitespace-only. Other falsy values (``0``, ``[]``, ``False``) are kept, and
    the remaining keys keep their original order.
    """

    def is_empty(value: object) -> bool:
        if value is None:
            return True
        return isinstance(value, str) and value.strip() == ""

    return {key: value for key, value in obj.items() if not is_empty(value)}
385 return res