Commit fc2e157
src/danmu/sync.py
@@ -11,7 +11,7 @@ from loguru import logger
from config import DANMU, DB, TZ, cutter
from danmu.utils import merge_json, simplify_json
from database.r2 import get_cf_r2, list_cf_r2, set_cf_r2
-from database.turso import insert_statement, turso_create_table, turso_exec
+from database.turso import insert_statement, turso_create_table, turso_exec, turso_parse_resp
from networking import hx_req
from utils import nowdt
@@ -58,12 +58,7 @@ async def sync_server_to_turso(qtype: str) -> None:
# 获取turso存储的liveinfo
resp = await turso_exec([{"type": "execute", "stmt": {"sql": "SELECT * FROM liveinfo;"}}], silent=True, **TURSO_KWARGS)
- cols = glom(resp, "results.0.response.result.cols", default=[])
- rows = glom(resp, "results.0.response.result.rows", default=[])
- db_liveinfo: dict[str, dict] = {}
- for row in rows:
- row_info = {col["name"]: x["value"] for x, col in zip(row, cols, strict=True)}
- db_liveinfo[row_info["liveDate"]] = row_info
+ db_liveinfo = {x["liveDate"]: x for x in turso_parse_resp(resp)}
# 开始同步
monitor_years = [x.strip() for x in DANMU.SYNC_DANMU_YEARS.split(",") if x.strip()] if qtype == "弹幕" else [x.strip() for x in DANMU.SYNC_FAYAN_YEARS.split(",") if x.strip()]
@@ -73,9 +68,11 @@ async def sync_server_to_turso(qtype: str) -> None:
params = {"liveDate": year} if qtype == "弹幕" else {"srtCount": 1, "liveDate": year}
api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
liveinfo_list: list[dict] = await hx_req(api, proxy=DANMU.PROXY, params=params, silent=True) # type: ignore
+ if glom(liveinfo_list, "hx_error", default=""): # API server is down
+ return
for liveinfo in sorted(liveinfo_list, key=lambda x: x["liveDate"]):
live_date = liveinfo["liveDate"]
- if glom(db_liveinfo, f"{live_date}.{qtype}已完成", default="0") == "1":
+ if glom(db_liveinfo, f"{live_date}.{qtype}已完成", default=0) == 1:
continue
results = []
payload = {"page": 1, "limit": DANMU.NUM_PER_QUERY, "liveDate": live_date}
@@ -149,7 +146,7 @@ async def save_livechats_to_turso(live_info: dict, data: list[dict], qtype: str)
data = await filter_records_in_turso(normed_data, date, qtype)
# 插入到turso
- concurrency = 1024 # 每次插入1024条
+ concurrency = 4096 # 每次插入4096条
statements = [insert_statement(qtype, records) for records in sorted(data, key=lambda x: x["time"])]
chunks = [statements[i : i + concurrency] for i in range(0, len(statements), concurrency)]
for chunk in chunks:
src/danmu/utils.py
@@ -168,6 +168,8 @@ async def get_live_info(year: str | int) -> dict:
params = {"liveDate": year}
api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
resp: list[dict] = await hx_req(api, proxy=DANMU.PROXY, params=params, silent=True) # type: ignore
+ if glom(resp, "hx_error", default=""): # API server is down
+ return {}
dates = {x["liveDate"][:10] for x in resp}
info = {date: {"titles": [], "urls": []} for date in dates}
for x in resp:
src/history/turso.py
@@ -310,6 +310,7 @@ async def save_chatinfo_to_turso(client: Client, minfo: dict) -> dict[str, str]:
idx_prefix=f"idx_{cid}_",
fts_on_col="mid",
fts_name=cid,
+ silent=True,
**TURSO_KWARGS,
)
if cached != records: