#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
import hashlib
import json
from collections import defaultdict
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

from glom import flatten, glom
from loguru import logger

from config import DANMU, PROXY, TZ, cutter
from danmu.utils import TURSO_KWARGS, convert_parquet, get_bearer_token, get_live_info, merge_parquet
from database.r2 import get_cf_r2, list_cf_r2, set_cf_r2
from database.turso import insert_statement, turso_create_table, turso_exec, turso_parse_resp
from networking import hx_req
from utils import nowdt, strings_list

# date is a standard yyyy-mm-dd string; liveDate is the raw server-side date, which may carry a suffix ("2025-04-14_01", "2025-04-14_02")
LIVEINFO_COLUMNS = "date TEXT, title TEXT, url TEXT, 发言已完成 INTEGER DEFAULT 0, 弹幕已完成 INTEGER DEFAULT 0, liveDate TEXT PRIMARY KEY"
USERINFO_COLUMNS = "uid TEXT, fullname TEXT, superchat TEXT, firstdate TEXT, lastdate TEXT, note TEXT, hash TEXT PRIMARY KEY"
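# Example userinfo row (hypothetical values): uid="UCVN-fPvyxRJnyMkFjAa9oTQ", fullname="Alice",
# superchat='[{"time": "2025-04-14 21:00:01", "superchat": "USD 10.0"}]', firstdate="2021-04-03 21:11:25",
# lastdate="2025-04-14 21:00:01", hash=sha224(uid + fullname) -- see save_userinfo_to_turso below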

# livechats tables
COLUMNS = {
    "发言": "time TEXT, content TEXT, segmented TEXT",
    "弹幕": "time TEXT, fullname TEXT, content TEXT, superchat TEXT, uid TEXT, segmented TEXT",
}
INDEX_NAMES = {
    "发言": ["time"],
    "弹幕": ["time", "fullname", "uid"],
}
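# Example 弹幕 row matching COLUMNS["弹幕"] (hypothetical values):
#   ("2025-04-14 21:00:01", "Alice", "Hello", "USD 10.0", "UCVN-fPvyxRJnyMkFjAa9oTQ", "Hello")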


async def sync_livechats() -> None:
    if not DANMU.SYNC_ENABLE:
        return
    for qtype in ["弹幕", "发言"]:
        if "R2" in DANMU.SYNC_ENGNIE.upper():
            logger.info(f"Syncing streaming {qtype} to R2")
            await sync_server_to_r2(qtype)

        if "turso" in DANMU.SYNC_ENGNIE.lower():
            logger.info(f"Syncing streaming {qtype} to Turso")
            await sync_server_to_turso(qtype)


async def sync_server_to_turso(qtype: str) -> None:
    concurrency = 1  # with DANMU.NUM_PER_QUERY set large, a single concurrent request is enough
    api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"

    # First, create the `发言` / `弹幕` and `liveinfo` tables, plus the related indexes
    await turso_create_table("liveinfo", LIVEINFO_COLUMNS, idx_cols=["date"], idx_prefix="idx_liveinfo_", silent=True, **TURSO_KWARGS)
    await turso_create_table("userinfo", USERINFO_COLUMNS, idx_cols=["uid", "fullname"], idx_prefix="idx_userinfo_", silent=True, **TURSO_KWARGS)
    await turso_create_table(qtype, COLUMNS[qtype], idx_cols=INDEX_NAMES[qtype], idx_prefix=f"idx_{qtype}_", fts_on_col="rowid", silent=True, **TURSO_KWARGS)

    # Fetch the liveinfo rows already stored in Turso
    resp = await turso_exec([{"type": "execute", "stmt": {"sql": "SELECT * FROM liveinfo;"}}], silent=True, **TURSO_KWARGS)
    db_liveinfo = {x["liveDate"]: x for x in turso_parse_resp(resp)}

    # Start syncing
    monitor_years = strings_list(DANMU.SYNC_DANMU_YEARS) if qtype == "弹幕" else strings_list(DANMU.SYNC_FAYAN_YEARS)
    monitor_years.append(str(nowdt(TZ).year))
    monitor_years = sorted(set(monitor_years))
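    # e.g. (hypothetical) SYNC_DANMU_YEARS yielding ["2023", "2024"] plus a current year of 2025 -> ["2023", "2024", "2025"]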
    for year in monitor_years:
        headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": year}
        params = {"liveDate": year} if qtype == "弹幕" else {"srtCount": 1, "liveDate": year}
        api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
        liveinfo_list: list[dict] = await hx_req(api, headers=headers, proxy=PROXY.DANMU, params=params, silent=True)  # type: ignore
        if glom(liveinfo_list, "hx_error", default=""):  # API server is down
            return
        for liveinfo in sorted(liveinfo_list, key=lambda x: x["liveDate"]):
            live_date = liveinfo["liveDate"]
            if glom(db_liveinfo, f"{live_date}.{qtype}已完成", default=0) == 1:
                continue
            results = []
            payload = {"page": 1, "limit": DANMU.NUM_PER_QUERY, "liveDate": live_date}
            resp = await hx_req(api_url, "POST", headers=headers, data=payload, proxy=PROXY.DANMU, check_kv={"code": 0}, silent=True)
            if resp.get("count", 0) == 0:
                continue
            logger.trace(f"Query {qtype} date: {live_date} - {resp['count']} results")
            # append the first page
            results.extend(resp["data"])
            quotient, remainder = divmod(resp["count"], DANMU.NUM_PER_QUERY)
            n_pages = quotient + (1 if remainder else 0)
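            # e.g. count=2500 with NUM_PER_QUERY=1000 -> divmod gives (2, 500) -> n_pages=3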
            tasks = [
                hx_req(
                    api_url,
                    "POST",
                    headers=headers,
                    data={"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": live_date},
                    proxy=PROXY.DANMU,
                    check_kv={"code": 0},
                    silent=True,
                )
                for page in range(2, n_pages + 1)
            ]
            chunks = [tasks[i : i + concurrency] for i in range(0, len(tasks), concurrency)]
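            # e.g. 7 remaining page requests with concurrency=2 would form chunks of sizes [2, 2, 2, 1]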
            for chunk in chunks:
                # chunking already bounds in-flight requests; the fresh Semaphore created per chunk was a no-op, so gather directly
                res = await asyncio.gather(*chunk, return_exceptions=True)
                res = [x for x in res if isinstance(x, dict) and "data" in x]  # drop raised exceptions and error responses
                data = flatten(glom(res, "*.data"))
                results.extend(data)
            # save all results to turso
            await save_livechats_to_turso(liveinfo, results, qtype)


async def save_livechats_to_turso(live_info: dict, data: list[dict], qtype: str) -> None:
    """Save livechats from server to turso."""
    if not data:
        return
    # warning: some live dates on server are "2025-04-14_01", "2025-04-14_02", ...
    date = live_info["liveDate"][:10]

    # Normalize the data format
    normed_data = []
    added = set()
    if qtype == "发言":
        for x in data:
            if not all([x.get("startTime"), x.get("content")]):
                continue
            time = x["startTime"].split(",")[0]  # e.g. "3:53:23"
            if len(time.split(":")[0]) == 1:
                time = f"0{time}"
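            # e.g. "3:53:23" -> "03:53:23", so times also sort correctly as strings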
            if f"{date}{time}{x['content']}" in added:
                continue
            normed_data.append({"time": f"{date} {time}", "content": x["content"], "segmented": " ".join(cutter.cutword(x["content"]))})
            added.add(f"{date}{time}{x['content']}")
    else:
        for x in sorted(data, key=lambda x: x.get("timestamp") or 0):
            # time, fullname, content, uid, superchat, currency, segmented
            item = {}
            if not all([x.get("timestamp"), x.get("authorName")]):
                continue
            dt = datetime.fromtimestamp(x["timestamp"] / 1000000, tz=ZoneInfo(TZ))
            item["time"] = dt.strftime("%Y-%m-%d %H:%M:%S")
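            # timestamps arrive in microseconds: e.g. 1_712_149_885_000_000 us / 1_000_000 -> 1_712_149_885 s epoch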
            item["fullname"] = x["authorName"].removeprefix("@")  # User Name
            item["uid"] = x["authorId"] if x.get("authorId") else item["fullname"]
            if x.get("scAmount"):
                item["superchat"] = x["scAmount"]

            if x.get("message"):
                item["content"] = x["message"]
                item["segmented"] = " ".join(cutter.cutword(x["message"]))
            if f"{item['time']}{item['fullname']}{item.get('content', '')}" in added:
                continue
            normed_data.append(item)
            added.add(f"{item['time']}{item['fullname']}{item.get('content', '')}")
    if qtype == "弹幕":  # only 弹幕 records carry the uid/fullname fields userinfo needs
        await save_userinfo_to_turso(normed_data)
    # Filter out records already saved in Turso
    data = await filter_records_in_turso(normed_data, date, qtype)

    # Insert into Turso
    concurrency = 4096  # insert 4096 rows per batch
    statements = [insert_statement(qtype, record) for record in sorted(data, key=lambda x: x["time"])]
    chunks = [statements[i : i + concurrency] for i in range(0, len(statements), concurrency)]
    for chunk in chunks:
        resp = await turso_exec(chunk, silent=True, retry=2, **TURSO_KWARGS)
        num_success = sum([1 for x in glom(resp, "results.*.type", default=[]) if x == "ok"]) - 1
        logger.success(f"Synced {num_success} {qtype} of {date} to Turso")
    now = nowdt(TZ)
    yesterday = now - timedelta(days=1)
    # 弹幕 are recorded in real time; wait until the evening stream ends before marking the day complete
    if qtype == "弹幕" and (now.hour <= 4 or now.hour >= 19) and date in [f"{yesterday:%Y-%m-%d}", f"{now:%Y-%m-%d}"]:
        return

    # Update liveinfo
    # Before updating, double-check that all data has been synced
    if await filter_records_in_turso(normed_data, date, qtype):
        logger.warning(f"Not all {qtype} data of {date} has been synced.")
        return
    records = {
        "date": live_info["liveDate"][:10],
        "title": live_info.get("title", ""),
        "url": live_info.get("url", ""),
        f"{qtype}已完成": 1,
        "liveDate": live_info["liveDate"],
    }
    resp = await turso_exec([insert_statement("liveinfo", records, update_on_conflict="liveDate")], silent=True, **TURSO_KWARGS)
    if glom(resp, "results.0.type", default="") == "ok":
        logger.success(f"Updated liveinfo of {date}")


async def filter_records_in_turso(data: list[dict], date: str, qtype: str) -> list[dict]:
    """Fetch the full-day records of `date` from Turso and filter out those already saved.

    Messages posted after midnight are still recorded under the previous day in the
    source database, so we fetch up to 08:00 the next morning (the stream has ended)
    to filter out already-saved records.
    """
    # For 发言, filtering by time is enough; for 弹幕, we need both time and fullname
    identifier = "time" if qtype == "发言" else "time,fullname"
    tomorrow = datetime.strptime(date, "%Y-%m-%d").replace(tzinfo=ZoneInfo(TZ)) + timedelta(days=1)  # replace(), not astimezone(): the parsed datetime is naive and only the date string matters
    resp = await turso_exec(
        [{"type": "execute", "stmt": {"sql": f'SELECT {identifier} FROM "{qtype}" WHERE time >= "{date} 00:00:00" AND time <= "{tomorrow:%Y-%m-%d} 08:00:00";'}}],
        silent=True,
        retry=2,
        **TURSO_KWARGS,
    )
    saved = glom(resp, "results.0.response.result.rows.*.*.value", default=[])
    saved = {"-".join(x) for x in saved}  # convert to a set for fast lookup
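    # e.g. a 弹幕 row ["2025-04-14 21:00:01", "Alice"] yields the key "2025-04-14 21:00:01-Alice" (hypothetical values)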
    log = f"Found {len(saved)} messages in Turso of date {date}"
    log += f"; rows read: {glom(resp, 'results.0.response.result.rows_read', default=1)}"
    filtered = [x for x in data if x["time"] not in saved] if qtype == "发言" else [x for x in data if f"{x['time']}-{x['fullname']}" not in saved]
    logger.info(f"{log}. Remains: {len(filtered)}")
    return filtered


async def save_userinfo_to_turso(data: list[dict]) -> None:
    """Save userinfo to turso.

    item in data:
        {
            'content': 'Hello',
            'fullname': 'Alive',
            'segmented': 'Hello',
            'time': '2021-04-03 21:11:25',
            'uid': 'UCVN-fPvyxRJnyMkFjAa9oTQ'
        }
    """
    sha224 = lambda s: hashlib.sha224(s.encode()).hexdigest()
    escape = lambda s: s.replace("'", "''")

    def update_superchat(superchats: list[dict], db_superchat: str | None) -> str:
        """Merge new superchats into the stored db record.

        superchats: records carrying a superchat, e.g. [{"time": "2025-04-14 21:00:01", "superchat": "USD 10.0"}, ...]
        db_superchat: the stored JSON list, e.g. '[{"time": "2025-04-14 21:00:01", "superchat": "USD 10.0"}]', or None
        """
        db_superchat = db_superchat or "[]"
        db_sc = json.loads(db_superchat)
        for x in superchats:
            sc = {"time": x["time"], "superchat": x["superchat"]}
            if sc not in db_sc:
                db_sc.append(sc)
        return json.dumps(db_sc).removeprefix("[]")  # an empty list serializes to "[]" -> "", keeping the column falsy
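    # Minimal usage sketch (hypothetical values):
    #   update_superchat([{"time": "2025-04-14 21:00:01", "superchat": "USD 10.0"}], None)
    #   -> '[{"time": "2025-04-14 21:00:01", "superchat": "USD 10.0"}]'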

    if not data:
        return  # guard: an empty hashkeys set would produce invalid "IN ()" SQL below
    hashkeys = tuple({sha224(f"{x['uid']}{x['fullname']}") for x in data})
    in_clause = ", ".join(f"'{x}'" for x in hashkeys)  # built separately to avoid nested same-quote f-strings (a syntax error before Python 3.12)
    db = await turso_exec([{"type": "execute", "stmt": {"sql": f"SELECT * FROM userinfo WHERE hash IN ({in_clause});"}}], silent=True, **TURSO_KWARGS)
    db = {x["hash"]: x for x in turso_parse_resp(db)}
    grouped = defaultdict(list)  # group by uid-fullname
    for x in data:
        grouped[x["uid"] + x["fullname"]].append(x)

    sql = "INSERT OR REPLACE INTO userinfo (uid, fullname, superchat, firstdate, lastdate, note, hash) VALUES "
    for records in grouped.values():
        uid = records[0]["uid"]
        fullname = records[0]["fullname"]
        hash_ = sha224(f"{uid}{fullname}")
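        # hash_ is a 56-character hex digest, e.g. sha224("UCVN-fPvyxRJnyMkFjAa9oTQAlice") for a hypothetical uid+fullname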
        times = sorted([x["time"] for x in records])
        superchats = [x for x in records if x.get("superchat")]
        db_record = db.get(hash_, {})
        superchat = update_superchat(superchats, db_record.get("superchat"))
        db_firstdate = db_record.get("firstdate", "9999-12-31")
        db_lastdate = db_record.get("lastdate", "0000-01-01")
        firstdate = min(times[0], db_firstdate)
        lastdate = max(times[-1], db_lastdate)
        note = db_record.get("note", "")
        record = {"uid": uid, "fullname": fullname, "superchat": superchat, "firstdate": firstdate, "lastdate": lastdate, "note": note, "hash": hash_}
        if record != db_record:
            sql += f"('{escape(uid)}', '{escape(fullname)}', '{superchat}', '{firstdate}', '{lastdate}', '{escape(note)}', '{hash_}'),"

    logger.info(f"Retrieve userinfo of {len(grouped)} users")
    if not sql.endswith(","):
        return  # no record changed; skip the write (the bare INSERT prefix is not valid SQL)
    sql = sql[:-1] + ";"
    resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, **TURSO_KWARGS)
    num_success = glom(resp, "results.0.response.result.affected_row_count", default=0)
    logger.success(f"Updated userinfo of {num_success} records")


async def sync_server_to_r2(qtype: str) -> None:
    concurrency = 1  # with DANMU.NUM_PER_QUERY set large, a single concurrent request is enough
    prefix = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/"
    api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"

    async def batch_sync(new_dates: dict[str, list[str]]):
        for r2_key, server_dates in sorted(new_dates.items()):
            results = []
            for date in sorted(server_dates):
                logger.trace(f"Query {qtype} date: {date}")
                headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": date[:4]}
                payload = {"page": 1, "limit": DANMU.NUM_PER_QUERY, "liveDate": date}
                resp = await hx_req(api_url, "POST", headers=headers, data=payload, proxy=PROXY.DANMU, check_kv={"code": 0}, silent=True)
                if resp.get("count", 0) == 0:
                    continue
                logger.trace(f"Found {qtype} date: {date} - {resp['count']} results")
                # append the first page
                results.extend(resp["data"])
                quotient, remainder = divmod(resp["count"], DANMU.NUM_PER_QUERY)
                n_pages = quotient + (1 if remainder else 0)
                tasks = [
                    hx_req(
                        api_url,
                        "POST",
                        headers=headers,
                        data={"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": date},
                        proxy=PROXY.DANMU,
                        check_kv={"code": 0},
                        silent=True,
                    )
                    for page in range(2, n_pages + 1)
                ]
                chunks = [tasks[i : i + concurrency] for i in range(0, len(tasks), concurrency)]
                for chunk in chunks:
                    # chunking already bounds in-flight requests; gather directly (the per-chunk Semaphore was a no-op)
                    res = await asyncio.gather(*chunk, return_exceptions=True)
                    res = [x for x in res if isinstance(x, dict) and "data" in x]  # drop raised exceptions and error responses
                    data = flatten(glom(res, "*.data"))
                    results.extend(data)
            # if no results, create an empty key
            metadata = {"empty": True} if not results else {}
            await set_cf_r2(prefix + r2_key, convert_parquet(results, qtype, date), metadata, mime_type="application/x-parquet", silent=True)

    now = nowdt(TZ)
    if qtype == "弹幕" and (now.hour <= 4 or now.hour >= 20):
        # Skip evening live 弹幕 (recorded in real time; sync only after the stream ends)
        return

    monitor_years = strings_list(DANMU.SYNC_DANMU_YEARS) if qtype == "弹幕" else strings_list(DANMU.SYNC_FAYAN_YEARS)
    monitor_years.append(str(now.year))
    monitor_years = sorted(set(monitor_years))
    r2 = await list_cf_r2(prefix=prefix)
    r2_dates = {x.removeprefix(prefix) for x in glom(r2, "Contents.*.Key", default=[])}
    has_update = False
    for year in monitor_years:
        headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": year}
        params = {"liveDate": year} if qtype == "弹幕" else {"srtCount": 1, "liveDate": year}
        api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
        resp = await hx_req(api, headers=headers, proxy=PROXY.DANMU, params=params, silent=True)
        if glom(resp, "hx_error", default=""):  # API server is down (same guard as sync_server_to_turso)
            return
        new_dates_map = defaultdict(list)
        # some live dates on server are "2025-04-14_01", "2025-04-14_02", ...
        # norm dates to YYYY-MM-DD
        for x in resp:
            live_date: str = x["liveDate"]  # type: ignore
            if live_date[:10] in r2_dates:
                continue
            if qtype == "弹幕" and live_date[:10] == f"{now:%Y-%m-%d}":  # ignore today's live
                continue
            new_dates_map[live_date[:10]].append(live_date)
        if not new_dates_map:
            continue
        has_update = True
        # sync per date
        logger.debug(f"Sync {qtype}-{year}: {new_dates_map}")
        await batch_sync(new_dates_map)
        r2_dates.update(new_dates_map.keys())  # update new dates to R2 dates

        # sync whole month
        year_months = {x[:7] for x in new_dates_map}
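        # e.g. new dates {"2025-04-14", "2025-04-15"} -> year_months {"2025-04"}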
        for year_month in sorted(year_months):
            month_data = []
            for d in range(1, 32):
                date = f"{year_month}-{d:02d}"
                if date not in r2_dates:
                    continue
                logger.trace(f"Get R2 date: {date}")
                if r2 := await get_cf_r2(prefix + date, rformat="bytes", silent=True):
                    month_data.append(r2)
            await set_cf_r2(prefix + year_month, merge_parquet(month_data), mime_type="application/x-parquet", silent=True)
            del month_data
        r2_dates.update(year_months)  # update new dates to R2 dates

        # sync whole year
        year_data = []
        for mon in range(1, 13):
            date = f"{year}-{mon:02d}"
            if date not in r2_dates:
                continue
            logger.trace(f"Get R2 date: {date}")
            if r2 := await get_cf_r2(prefix + date, rformat="bytes", silent=True):
                year_data.append(r2)
        await set_cf_r2(prefix + year, merge_parquet(year_data), mime_type="application/x-parquet", silent=True)
        del year_data

    # sync liveinfo
    if has_update:
        liveinfo = {}
        for year in monitor_years:
            liveinfo.update(await get_live_info(year))
        await set_cf_r2(DANMU.R2_PREFIX.rstrip("/") + "/liveinfo", liveinfo, silent=True)
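

# Minimal entry-point sketch (assumption: in the original project this module is driven
# by an external scheduler; this standalone runner is illustrative, not part of the file):
if __name__ == "__main__":
    asyncio.run(sync_livechats())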