#!/usr/bin/env python
# -*- coding: utf-8 -*-
from collections import defaultdict
from datetime import UTC, datetime
from decimal import Decimal
from zoneinfo import ZoneInfo

import anyio
from loguru import logger

from config import DANMU, DOWNLOAD_DIR, PROXY, TZ
from danmu.utils import get_bearer_token, live_date
from messages.progress import modify_progress
from networking import hx_req
from others.emoji import CURRENCY
from utils import number


async def query_server(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, qtype: str, **kwargs) -> dict:
    """Fetch matching records from the remote database.

    Returns:
        {"paths": list[str], "count": int}
24 """
    if not dates:
        return {}
    paths = []
    total_count = 0
    queried_dates = []
    for date in sorted(dates, reverse=True):
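        # "弹幕" (live chat) is served by /liveChat/queryList; any other qtype
        # (i.e. "发言") falls through to /srtInfo/queryList.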
        api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
        headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": date[:4]}
        payload = {"limit": DANMU.NUM_PER_QUERY, "liveDate": date}
        if user and qtype == "弹幕":
            payload["authorName"] = user
        if keyword:
            payload |= {"message": keyword} if qtype == "弹幕" else {"content": keyword}
        payload["page"] = 1
        logger.debug(f"Query {qtype}: {payload}")
        resp = await hx_req(api_url, "POST", headers=headers, data=payload, proxy=PROXY.DANMU, check_kv={"code": 0}, silent=True)
        queried_dates.append(date)
        parsed = await parse_from_server(resp.get("data", []), user, keyword, super_chats, qtype)
        count = parsed.get("count", 0)
        if count == 0:
            continue
        total_count += count
        texts = parsed.get("texts", "")
        await modify_progress(text=caption + f"\n🔍查询时间: {'、'.join(queried_dates)}\n⏳匹配{qtype}数: {total_count}", force_update=True, **kwargs)
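        # Paginate: keep fetching while the server returns a full page and the
        # previous page still yielded matches after local filtering, so a fully
        # filtered-out page ends pagination for this date early.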
        while len(resp.get("data", [])) == payload["limit"] and parsed.get("count", 0):
            payload["page"] += 1
            logger.debug(f"Query {qtype}: {payload}")
            resp = await hx_req(api_url, "POST", headers=headers, data=payload, proxy=PROXY.DANMU, check_kv={"code": 0}, silent=True)
            parsed = await parse_from_server(resp.get("data", []), user, keyword, super_chats, qtype)
            total_count += parsed.get("count", 0)
            texts += parsed.get("texts", "")
            await modify_progress(text=caption + f"\n⏳已匹配 {total_count} 条{qtype}", **kwargs)
        del parsed
        save_path = f"{DOWNLOAD_DIR}/{user}-{date}-{keyword}-{qtype}.txt"
        async with await anyio.open_file(save_path, "w") as f:
            await f.write(texts.strip())
        paths.append(save_path)
    return {"paths": paths, "count": total_count}


async def parse_from_server(data: list[dict], user: str, keyword: str, super_chats: defaultdict, qtype: str) -> dict:
    """Parse records fetched from the remote database.

    Dates run newest to oldest; rows within a date run oldest to newest.
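
    Rows are assumed to carry the fields read below: "message" or "content",
    "liveDate", plus "startTime" (发言) or a microsecond "timestamp" (弹幕),
    and optionally "authorName" and "scAmount" (e.g. "JPY 1000").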
69 """
    texts = ""
    count = 0
    text_key = "message" if qtype == "弹幕" else "content"
    if keyword:
        data = [x for x in data if keyword in x.get(text_key, "")]
    # deduplicate on a composite key of text, date, start time, and author
    added = set()
    deduplicated = []
    for x in data:
        key = f"{x.get('message')}{x.get('content')}{x.get('liveDate')}{x.get('startTime')}{x.get('authorName')}"
        if key not in added:
            deduplicated.append(x)
            added.add(key)
    data = deduplicated

    logged_date = []
    if qtype == "发言":
        data = [x for x in data if x.get("liveDate") and x.get("startTime")]
        # group data by date, and normalize startTime
        grouped_data = defaultdict(list)
        for x in data:
            start_time = x["startTime"].split(",")[0]  # "3:53:23" or "03:53:23"
            if len(start_time.split(":")[0]) == 1:
                x["startTime"] = f"0{start_time}"  # zero-pad the hour -> "03:53:23"
            else:
                x["startTime"] = start_time
            grouped_data[x["liveDate"][:10]].append(x)

        for norm_date, date_data in sorted(grouped_data.items(), reverse=True):  # dates newest to oldest
            for x in sorted(date_data, key=lambda x: x["startTime"]):  # rows oldest to newest
                # only show the day once
                if norm_date not in logged_date:
                    stream_date = await live_date(norm_date)
                    day = f"\n开播日期: {stream_date}\n"
                    logged_date.append(norm_date)
                else:
                    day = ""
                texts += f"\n{day}{x['startTime']}: {x['content'].strip()}"
                count += 1
    else:
        hide_name = bool(user)  # hide usernames when filtering by a specific user
        if user:
            data = [x for x in data if x.get("authorName", "") == user]
        data = [x for x in data if x.get("timestamp") and x.get("liveDate")]  # ensure timestamp and liveDate
        # group data by date
        grouped_data = defaultdict(list)
        for x in data:
            grouped_data[x["liveDate"][:10]].append(x)

        for norm_date, date_data in sorted(grouped_data.items(), reverse=True):  # dates newest to oldest
            for x in sorted(date_data, key=lambda x: x["timestamp"]):  # rows oldest to newest
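                # "timestamp" appears to be microseconds since the Unix epoch;
                # render it in the configured display timezone.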
                dt = datetime.fromtimestamp(x["timestamp"] / 1000000, tz=UTC).astimezone(ZoneInfo(TZ))
                sc_amount = ""
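                # "scAmount" is assumed to look like "<currency> <amount>",
                # e.g. "JPY 1000"; accumulate per-currency totals for the caller.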
                if super_chat := x.get("scAmount"):
                    currency, amount = super_chat.split(" ")
                    super_chats[currency] += Decimal(amount)
                    sc_amount = f" ({CURRENCY[currency]}{currency} {number(amount)})" if currency in CURRENCY else ""
                msg = x.get("message", "")
                username = "" if hide_name else "|" + x.get("authorName", "")
                # only show the day once
                if norm_date not in logged_date:
                    stream_date = await live_date(norm_date)
                    day = f"\n开播日期: {stream_date}\n"
                    logged_date.append(norm_date)
                else:
                    day = ""
                texts += f"\n{day}{dt:%H:%M:%S}{username}{sc_amount}: {msg}"
                count += 1

    return {"texts": texts.rstrip(), "count": count}