1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from datetime import datetime
4from decimal import Decimal
5from io import BytesIO
6from pathlib import Path
7from zoneinfo import ZoneInfo
8
9import pandas as pd
10from glom import glom
11
12from config import DANMU, DB, DOWNLOAD_DIR, PROXY, TOKEN, TZ, cache
13from networking import hx_req
14from price.coinmarketcap import get_cmc_fiat
15
# Connection settings for the danmu Turso database; service-specific DANMU
# credentials take precedence, falling back to the shared DB credentials.
TURSO_KWARGS: dict = {
    "db_name": DANMU.TURSO_DATABASE,
    "username": DANMU.TURSO_USERNAME or DB.TURSO_USERNAME,
    "api_token": DANMU.TURSO_API_TOKEN or DB.TURSO_API_TOKEN,
    "group_token": DANMU.TURSO_GROUP_TOKEN or DB.TURSO_GROUP_TOKEN,
}
22
23
@cache.memoize(ttl=28800)
async def to_usd(ccy: str) -> Decimal:
    """Convert 1 unit of fiat currency *ccy* to USD.

    Args:
        ccy: ISO 4217 currency code, e.g. "TWD".

    Returns:
        USD value of one unit of ccy; Decimal 0 for unknown currencies or
        when the conversion API returns no price.
    """
    if ccy == "USD":
        return Decimal(1)
    fiats = await get_cmc_fiat()  # {"ccy": (name, id)}
    if ccy not in fiats:
        return Decimal()

    ccy_id = fiats[ccy][1]
    usd_id = fiats["USD"][1]
    url = "https://pro-api.coinmarketcap.com/v2/tools/price-conversion"
    headers = {"Accepts": "application/json", "X-CMC_PRO_API_KEY": TOKEN.CMC_API_KEY}
    params = {"amount": 1, "id": ccy_id, "convert_id": usd_id}
    response = await hx_req(url, params=params, headers=headers, proxy=PROXY.CRYPTO, check_keys=["data.quote"], check_kv={"status.error_code": 0})
    rate = glom(response, f"data.quote.{usd_id}.price", default=Decimal())
    if rate is None:  # API may return an explicit null price
        return Decimal()
    # Round-trip through str(): Decimal(float) would embed binary float error
    # (e.g. Decimal(0.031) != Decimal("0.031")).
    return Decimal(str(rate))
41
42
43def date_str(date: str) -> str:
44 """格式化日期."""
45 if len(date) == 4:
46 date_name = f"{date}年"
47 elif len(date) == 7:
48 date_name = f"{date[:4]}年{date[5:7]}月"
49 elif len(date) == 10:
50 date_name = f"{date[:4]}年{date[5:7]}月{date[8:10]}日"
51 else:
52 date_name = date
53 return date_name
54
55
def convert_parquet(data: list[dict], qtype: str, date: str) -> bytes:
    """Convert chat records to a brotli-compressed parquet blob.

    Args:
        data: raw records from the danmu API (shapes below).
        qtype: "发言" for transcript lines; anything else is treated as 弹幕.
        date: live date string — only the first 10 chars ("YYYY-MM-DD") are used.

    Returns:
        Parquet bytes, or b"" when there is nothing to convert.

    发言 (transcript) record example:
        {'content': '你好',
         'endTime': '00:04:28,399',
         'id': 135366,
         'liveDate': '2022-11-14',
         'serial': 1,
         'startTime': '00:04:27,566'}

    弹幕 (danmu) record example:
        {'authorId': 'fakeid',
         'authorImage': 'https://yt4.ggpht.com/ytc/AKedOLSUwTp6ptiNCy1eovPrO61XfV74a8S21hpP3MU-D4ybbS47m9xmjKJ-JjbghP1m',
         'authorName': 'User',
         'count': None,
         'emotesCount': None,
         'id': 1092980,
         'liveDate': '2021-12-29',
         'message': '你好',
         'scAmount': "TWD 75.0",
         'scInfo': None,
         'timeInSeconds': 18690.529,
         'timeText': '5:11:30',
         'timestamp': 1640799880050853}
    """
    if not data:
        return b""
    data_list = []
    if qtype == "发言":
        tz = ZoneInfo(TZ)  # hoisted: same zone for every record
        for x in data:
            if not all([x.get("startTime"), x.get("content")]):
                continue
            ts = x["startTime"].split(",")[0]  # "3:53:23,123" -> "3:53:23"
            if len(ts.split(":")[0]) == 1:
                ts = f"0{ts}"  # zero-pad the hour -> "03:53:23"
            # The API gives wall-clock times in TZ, so attach that zone with
            # .replace(); .astimezone() on a naive datetime would wrongly
            # interpret it in the server's local timezone first.
            dt = datetime.strptime(f"{date[:10]} {ts}", "%Y-%m-%d %H:%M:%S").replace(tzinfo=tz)
            data_list.append({"ts": round(dt.timestamp()), "content": x["content"]})
        schema = {"ts": int, "content": str}
    else:  # 弹幕
        for x in data:
            if not all([x.get("timestamp"), x.get("authorName"), x.get("authorId")]):
                continue
            data_list.append(
                {
                    "uid": x["authorId"],
                    "ts": round(x["timestamp"] / 1000000),  # microseconds -> seconds
                    "name": x["authorName"].lstrip("@"),
                    "content": x.get("message") or "",
                    "superchat": x.get("scAmount") or "",
                }
            )
        schema = {"uid": str, "ts": int, "name": str, "content": str, "superchat": str}
    if not data_list:
        # Every record was filtered out; astype() on a column-less empty
        # DataFrame would raise KeyError.
        return b""
    df = pd.DataFrame(data_list).astype(schema)
    buffer = BytesIO()
    df.drop_duplicates().to_parquet(buffer, index=False, compression="brotli")
    parquet_bytes = buffer.getvalue()
    buffer.close()
    return parquet_bytes
117
118
119def merge_parquet(parquets: list[bytes]) -> bytes:
120 """Merge parquet bytes."""
121 if not parquets:
122 return b""
123 dataframes = [pd.read_parquet(BytesIO(parquet)) for parquet in parquets]
124 df = pd.concat(dataframes, ignore_index=True).sort_values(by=["ts"]).drop_duplicates()
125 buffer = BytesIO()
126 df.drop_duplicates().to_parquet(buffer, index=False, compression="brotli")
127 parquet_bytes = buffer.getvalue()
128 buffer.close()
129 return parquet_bytes
130
131
def merge_txt_files(paths: list[str], dates: list[str], user: str, keyword: str, qtype: str, header_tips: str) -> list[str]:
    """Merge multiple txt files into one (or, above 10 MB, into pairs).

    Args:
        paths: txt files to merge.
        dates: date strings ("YYYY", "YYYY-MM" or "YYYY-MM-DD") the paths cover.
        user: user name, used in the output file name.
        keyword: search keyword, used in the output file name (may be "").
        qtype: query type label, used in the output file name.
        header_tips: text prepended to the merged content.

    Returns:
        Paths of the merged file(s) written under DOWNLOAD_DIR; [] when
        paths is empty.
    """
    if not paths:
        return []
    dates = sorted(dates)
    paths = sorted(paths)
    # Keep only dates that actually correspond to at least one path.
    dates = [name for name in dates if any(name in path for path in paths)]
    # Derive a collective date label for the file name.
    if not dates:
        # Guard: on an empty list every all(...) below is vacuously True,
        # so dates[0] would raise IndexError.
        date_name = ""
    elif len(dates) == 1:
        date_name = dates[0]
    elif all(len(x) == 10 for x in dates):  # all days (不存在此情况)
        date_name = dates[0][:7]
    elif all(len(x) == 7 for x in dates):  # all months -> label by year
        date_name = dates[0][:4]
    elif all(len(x) == 4 for x in dates):  # all years
        date_name = dates[0] if len(set(dates)) == 1 else ""
    else:
        date_name = ""
    keyword = f"“{keyword}”" if keyword else ""

    if file_bytes(paths) < 10 * 1024 * 1024:  # under 10 MB: one merged file
        texts = header_tips + "\n\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip()
        save_path = f"{DOWNLOAD_DIR}/【{qtype}】{user}-{date_str(date_name)}-{keyword}.txt".replace("--", "-").replace("】-", "】").replace("-.txt", ".txt")
        Path(save_path).write_text(texts)
        return [save_path]

    # 10 MB or more: merge in chunks of two to keep each output manageable.
    num_merge = 2
    date_chunks = [dates[i : i + num_merge] for i in range(0, len(dates), num_merge)]
    path_chunks = [paths[i : i + num_merge] for i in range(0, len(paths), num_merge)]
    final_paths = []
    for date_chunk, path_chunk in zip(date_chunks, path_chunks, strict=True):
        texts = header_tips + "\n\n".join([Path(p).read_text() for p in sorted(path_chunk, reverse=True)]).strip()
        date_name = "&".join(date_chunk)
        save_path = f"{DOWNLOAD_DIR}/【{qtype}】{user}-{date_name}-{keyword}.txt".replace("--", "-").replace("】-", "】").replace("-.txt", ".txt")
        Path(save_path).write_text(texts)
        final_paths.append(save_path)
    return final_paths
168
169
@cache.memoize(ttl=86400 * 7)
async def get_bearer_token() -> str:
    """Log in to the danmu API and return its bearer token.

    Returns the token from the response "msg" field; "" when login fails
    (hx_req is called with silent=True and check_kv={"code": 200}).
    """
    login_url = f"{DANMU.BASE_URL}/auth/login"
    credentials = {"userName": DANMU.AUTH_USER, "password": DANMU.AUTH_PASS}
    response = await hx_req(
        login_url,
        "POST",
        data=credentials,
        proxy=PROXY.DANMU,
        check_kv={"code": 200},
        silent=True,
    )
    return response.get("msg", "")
183
184
@cache.memoize(ttl=600)
async def get_live_info(year: str | int) -> dict:
    """Query live-stream info for *year*, grouped by day.

    Returns {"YYYY-MM-DD": {"titles": [...], "urls": [...]}}; {} when the
    API server is down.
    """
    token = await get_bearer_token()
    api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
    headers = {"Authorization": f"Bearer {token}", "X-schema": str(year)}
    resp: list[dict] = await hx_req(api, headers=headers, proxy=PROXY.DANMU, params={"liveDate": year}, silent=True)  # type: ignore
    if glom(resp, "hx_error", default=""):  # API server is down
        return {}
    info: dict = {}
    for record in resp:
        day = record["liveDate"][:10]
        entry = info.setdefault(day, {"titles": [], "urls": []})
        entry["titles"].append(record["title"])
        entry["urls"].append(record["url"])
    return info
199
200
async def live_date(date: str) -> str:
    """Convert YYYY-MM-DD live date to markdown with title."""
    day = date[:10]
    info = await get_live_info(date[:4])
    # Day keys contain no dots, so plain .get() chains match glom's
    # path-with-default lookup exactly.
    day_info = info.get(day, {})
    titles = day_info.get("titles", [])
    urls = day_info.get("urls", [])
    links = [f"[{title}]({url})" for title, url in zip(titles, urls, strict=True)]
    return (date + "\n" + "\n".join(links)).rstrip()
210
211
212def file_bytes(paths: list[str] | str) -> int:
213 """Get file size in bytes."""
214 if isinstance(paths, str):
215 paths = [paths]
216 return sum([Path(path).stat().st_size for path in paths])