Commit df5d640
src/danmu/sync.py
@@ -1,6 +1,8 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
+import hashlib
+import json
from collections import defaultdict
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
@@ -17,6 +19,7 @@ from utils import nowdt, strings_list
# date为yyyy-mm-dd标准字符串. liveDate是服务器端原始日期, 可能有后缀 ("2025-04-14_01", "2025-04-14_02")
LIVEINFO_COLUMNS = "date TEXT, title TEXT, url TEXT, 发言已完成 INTEGER DEFAULT 0, 弹幕已完成 INTEGER DEFAULT 0, liveDate TEXT PRIMARY KEY"
+USERINFO_COLUMNS = "uid TEXT, fullname TEXT, superchat TEXT, firstdate TEXT, lastdate TEXT, note TEXT, hash TEXT PRIMARY KEY"
# livechats相关
COLUMNS = {
@@ -25,7 +28,7 @@ COLUMNS = {
}
INDEX_NAMES = {
"发言": ["time"],
- "弹幕": ["time", "fullname", "uid", "superchat"],
+ "弹幕": ["time", "fullname", "uid"],
}
@@ -48,6 +51,7 @@ async def sync_server_to_turso(qtype: str) -> None:
# 首先, 建立 `发言` / `弹幕` 以及 `liveinfo` table, 和相关index
await turso_create_table("liveinfo", LIVEINFO_COLUMNS, idx_cols=["date"], idx_prefix="idx_liveinfo_", silent=True, **TURSO_KWARGS)
+ await turso_create_table("userinfo", USERINFO_COLUMNS, idx_cols=["uid", "fullname"], idx_prefix="idx_userinfo_", silent=True, **TURSO_KWARGS)
await turso_create_table(qtype, COLUMNS[qtype], idx_cols=INDEX_NAMES[qtype], idx_prefix=f"idx_{qtype}_", fts_on_col="rowid", silent=True, **TURSO_KWARGS)
# 获取turso存储的liveinfo
@@ -77,7 +81,7 @@ async def sync_server_to_turso(qtype: str) -> None:
logger.trace(f"Query {qtype} date: {live_date} - {resp['count']} results")
# append the first page
results.extend(resp["data"])
- quotient, remainder = divmod(resp["count"], DANMU.NUM_PER_QUERY) # type: ignore
+ quotient, remainder = divmod(resp["count"], DANMU.NUM_PER_QUERY)
n_pages = quotient + (1 if remainder else 0)
tasks = [
hx_req(
@@ -130,9 +134,8 @@ async def save_livechats_to_turso(live_info: dict, data: list[dict], qtype: str)
continue
dt = datetime.fromtimestamp(x["timestamp"] / 1000000, tz=ZoneInfo(TZ))
item["time"] = dt.strftime("%Y-%m-%d %H:%M:%S")
- item["fullname"] = x["authorName"] # User Name
- if x.get("authorId"):
- item["uid"] = x["authorId"]
+ item["fullname"] = x["authorName"].removeprefix("@") # User Name
+ item["uid"] = x["authorId"] if x.get("authorId") else item["fullname"]
if x.get("scAmount"):
item["superchat"] = x["scAmount"]
@@ -143,7 +146,7 @@ async def save_livechats_to_turso(live_info: dict, data: list[dict], qtype: str)
continue
normed_data.append(item)
added.add(f"{item['time']}{item['fullname']}{item.get('content', '')}")
-
+ await save_userinfo_to_turso(normed_data)
# 过滤掉获取已保存在turso的记录
data = await filter_records_in_turso(normed_data, date, qtype)
@@ -202,6 +205,68 @@ async def filter_records_in_turso(data: list[dict], date: str, qtype: str) -> li
return filtered
async def save_userinfo_to_turso(data: list[dict]) -> None:
    """Upsert per-user aggregate info (superchats, first/last seen dates) into turso.

    item in data:
        {
            'content': 'Hello',
            'fullname': 'Alive',
            'segmented': 'Hello',
            'time': '2021-04-03 21:11:25',
            'uid': 'UCVN-fPvyxRJnyMkFjAa9oTQ'
        }
    """

    def sha224(s: str) -> str:
        # Stable digest of uid+fullname; used as the userinfo PRIMARY KEY.
        return hashlib.sha224(s.encode()).hexdigest()

    def escape(s: str) -> str:
        # Escape single quotes for embedding in a SQL string literal.
        return s.replace("'", "''")

    def update_superchat(superchats: list[dict], db_superchat: str | None) -> str:
        """Merge new superchat entries into the JSON list already stored in db.

        superchats: records carrying a "superchat" key, e.g. [{"time": "...", "superchat": "USD 10.0"}, ...]
        db_superchat: existing column text, a JSON list like
            '[{"time": "...", "superchat": "USD 10.0"}]' (or None / "")
        Returns the merged JSON text; an empty list is stored as "" (not "[]")
        so the column stays falsy when the user never sent a superchat.
        """
        db_sc = json.loads(db_superchat or "[]")
        for x in superchats:
            sc = {"time": x["time"], "superchat": x["superchat"]}
            if sc not in db_sc:  # de-duplicate on (time, amount)
                db_sc.append(sc)
        return json.dumps(db_sc).removeprefix("[]")

    hashkeys = tuple({sha224(f"{x['uid']}{x['fullname']}") for x in data})
    if not hashkeys:
        # Empty batch: the original would have built an invalid "IN ()" clause.
        return
    in_clause = ", ".join(f"'{x}'" for x in hashkeys)  # hex digests, no escaping needed
    db = await turso_exec([{"type": "execute", "stmt": {"sql": f"SELECT * FROM userinfo WHERE hash IN ({in_clause});"}}], silent=True, **TURSO_KWARGS)
    db = {x["hash"]: x for x in turso_parse_resp(db)}
    grouped = defaultdict(list)  # group by uid-fullname
    for x in data:
        grouped[x["uid"] + x["fullname"]].append(x)

    values = []
    for records in grouped.values():
        uid = records[0]["uid"]
        fullname = records[0]["fullname"]
        hash_ = sha224(f"{uid}{fullname}")
        times = sorted(x["time"] for x in records)
        superchats = [x for x in records if x.get("superchat")]
        db_record = db.get(hash_, {})
        superchat = update_superchat(superchats, db_record.get("superchat"))
        # Sentinel defaults make min/max adopt this batch's dates on first sight.
        firstdate = min(times[0], db_record.get("firstdate", "9999-12-31"))
        lastdate = max(times[-1], db_record.get("lastdate", "0000-01-01"))
        note = db_record.get("note", "")
        record = {"uid": uid, "fullname": fullname, "superchat": superchat, "firstdate": firstdate, "lastdate": lastdate, "note": note, "hash": hash_}
        if record != db_record:  # only write rows that actually changed
            values.append(f"('{escape(uid)}', '{escape(fullname)}', '{escape(superchat)}', '{firstdate}', '{lastdate}', '{escape(note)}', '{hash_}')")

    logger.info(f"Retrieve userinfo of {len(grouped)} users")
    if not values:
        # BUGFIX: the original executed a malformed "INSERT ... VALUES " statement
        # (no value tuples) whenever nothing changed; skip the round-trip instead.
        logger.success("Updated userinfo of 0 records")
        return
    sql = "INSERT OR REPLACE INTO userinfo (uid, fullname, superchat, firstdate, lastdate, note, hash) VALUES " + ", ".join(values) + ";"
    resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, **TURSO_KWARGS)
    num_success = glom(resp, "results.0.response.result.affected_row_count", default=0)
    logger.success(f"Updated userinfo of {num_success} records")
+
+
async def sync_server_to_r2(qtype: str) -> None:
concurrency = 1 # DANMU.NUM_PER_QUERY 设置很大的话只用单并发即可
prefix = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/"
@@ -220,7 +285,7 @@ async def sync_server_to_r2(qtype: str) -> None:
logger.trace(f"Query {qtype} date: {date} - {resp['count']} results")
# append the first page
results.extend(resp["data"])
- quotient, remainder = divmod(resp["count"], DANMU.NUM_PER_QUERY) # type: ignore
+ quotient, remainder = divmod(resp["count"], DANMU.NUM_PER_QUERY)
n_pages = quotient + (1 if remainder else 0)
tasks = [
hx_req(
src/danmu/turso.py
@@ -5,6 +5,7 @@ from datetime import datetime, timedelta
from decimal import Decimal
import anyio
+from glom import glom
from loguru import logger
from config import DOWNLOAD_DIR, TZ, cache, cutter
@@ -160,3 +161,29 @@ async def live_date(date: str) -> str:
texts = d + "\n" + "\n".join(markdown)
return texts.rstrip()
return date
+
+
async def get_uids_by_name(name: str, queried_names: set[str] | None = None) -> list[str]:
    """Get every uid associated with *name*, following name<->uid links recursively.

    A user may post under several fullnames and a fullname may map to several
    uids, so this walks the bipartite name/uid graph. ``queried_names`` tracks
    names already expanded to terminate the recursion.
    """
    logger.debug(f"Querying name: {name}, queried_names: {queried_names}")
    if queried_names is None:
        queried_names = set()
    # BUGFIX: mark the current name visited BEFORE recursing. The original only
    # added names after the recursive call returned, so two mutually-linked
    # names (A -> uid -> B, B -> uid -> A) re-queried each other forever.
    queried_names.add(name)
    # Escape single quotes: fullnames are user-controlled text (matches the
    # escaping convention used when these rows are written).
    safe_name = name.replace("'", "''")
    resp = await turso_exec([{"type": "execute", "stmt": {"sql": f"SELECT uid FROM userinfo WHERE fullname = '{safe_name}';"}}], silent=True, **TURSO_KWARGS)
    uids = glom(turso_parse_resp(resp), "*.uid")
    if len(uids) <= 1:
        logger.success(f"Found uid: {uids} for name: {name}")
        return uids
    logger.info(f"Found uids: {uids} for name: {name}")
    # 递归查询: each uid may be known under further names.
    matched_uids = list(uids)
    for uid in uids:
        logger.debug(f"Querying names for uid: {uid}")
        resp = await turso_exec([{"type": "execute", "stmt": {"sql": f"SELECT fullname FROM userinfo WHERE uid = '{uid}';"}}], silent=True, **TURSO_KWARGS)
        names = glom(turso_parse_resp(resp), "*.fullname")
        logger.info(f"Found names: {names} for uid: {uid}")
        for n in names:
            if n in queried_names:
                continue
            logger.debug(f"Querying uid for name: {n}")
            # The recursive call marks n as visited at its own entry.
            matched_uids.extend(await get_uids_by_name(n, queried_names))
    # De-duplicate while preserving discovery order.
    return list(dict.fromkeys(matched_uids))
src/danmu/utils.py
@@ -108,7 +108,7 @@ def simplify_json(data: list[dict], qtype: str) -> dict:
if not all([x.get("timestamp"), x.get("authorName")]):
continue
item["s"] = round(x["timestamp"] / 1000000)
- item["u"] = x["authorName"].replace(" ", "") # UserName
+ item["u"] = x["authorName"].replace(" ", "").removeprefix("@") # UserName
if x.get("scAmount"):
item["p"] = x["scAmount"]
if x.get("message"):