main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from datetime import datetime
4from decimal import Decimal
5from zoneinfo import ZoneInfo
6
7import pandas as pd
8from glom import glom
9from pyrogram.client import Client
10from pyrogram.parser.markdown import Markdown
11
12from config import DANMU, DB, PROXY, TOKEN, TZ, cache
13from networking import hx_req
14from price.coinmarketcap import get_cmc_fiat
15
# Keyword arguments for Turso, assembled from config.
# Each DANMU credential falls back to the matching DB value when it is falsy.
TURSO_KWARGS: dict = {
    "db_name": DANMU.TURSO_DATABASE,
    "username": DANMU.TURSO_USERNAME or DB.TURSO_USERNAME,
    "api_token": DANMU.TURSO_API_TOKEN or DB.TURSO_API_TOKEN,
    "group_token": DANMU.TURSO_GROUP_TOKEN or DB.TURSO_GROUP_TOKEN,
}
22
23
@cache.memoize(ttl=28800)
async def to_usd(ccy: str) -> Decimal:
    """Return the USD value of one unit of ``ccy``.

    Args:
        ccy: Currency code, used as a key in the mapping returned by ``get_cmc_fiat()``.

    Returns:
        ``Decimal(1)`` for ``"USD"``; ``Decimal(0)`` when ``ccy`` is not in the fiat
        mapping or the response holds no price; otherwise the quoted price.
    """
    if ccy == "USD":
        return Decimal(1)
    fiats = await get_cmc_fiat()  # {"ccy": (name, id)}
    if ccy not in fiats:
        return Decimal()

    ccy_id = fiats[ccy][1]
    usd_id = fiats["USD"][1]
    url = "https://pro-api.coinmarketcap.com/v2/tools/price-conversion"
    headers = {"Accepts": "application/json", "X-CMC_PRO_API_KEY": TOKEN.CMC_API_KEY}
    params = {"amount": 1, "id": ccy_id, "convert_id": usd_id}
    response = await hx_req(
        url,
        params=params,
        headers=headers,
        proxy=PROXY.CRYPTO,
        check_keys=["data.quote"],
        check_kv={"status.error_code": 0},
    )
    rate = glom(response, f"data.quote.{usd_id}.price", default=None)
    if rate is None:
        # glom's default only covers a missing path; an explicit null price
        # would otherwise reach Decimal(None) and raise TypeError.
        return Decimal()
    # Convert via str() so a float price keeps its short decimal form instead
    # of the exact binary expansion (Decimal(0.1) != Decimal("0.1")).
    return Decimal(str(rate))
41
42
def date_str(date: str) -> str:
    """Format a date string with Chinese year/month/day suffixes.

    The input length selects the layout: 4 characters give a year, 7 give
    year and month, 10 give year, month and day. Any other length is
    returned unchanged.
    """
    size = len(date)
    if size == 4:
        return f"{date}年"
    year, month = date[:4], date[5:7]
    if size == 7:
        return f"{year}年{month}月"
    if size == 10:
        return f"{year}年{month}月{date[8:10]}日"
    return date
54
55
56def convert_to_dataframe(data: list[dict], qtype: str) -> pd.DataFrame:
57 """Convert to dataframe.
58
59 发言: {'content': '你好',
60 'endTime': '00:04:28,399',
61 'id': 135366,
62 'liveDate': '2022-11-14',
63 'serial': 1,
64 'startTime': '00:04:27,566'
65 }
66
67 弹幕:
68 {'authorId': 'fakeid',
69 'authorImage': 'https://yt4.ggpht.com/ytc/AKedOLSUwTp6ptiNCy1eovPrO61XfV74a8S21hpP3MU-D4ybbS47m9xmjKJ-JjbghP1m',
70 'authorName': 'User',
71 'count': None,
72 'emotesCount': None,
73 'id': 1092980,
74 'liveDate': '2021-12-29',
75 'message': '你好',
76 'scAmount': "TWD 75.0",
77 'scInfo': None,
78 'timeInSeconds': 18690.529,
79 'timeText': '5:11:30',
80 'timestamp': 1640799880050853}
81 """
82 if not data:
83 return pd.DataFrame()
84 data_list = []
85 if qtype == "发言":
86 for x in data:
87 if not all([x.get("startTime"), x.get("content"), x.get("liveDate")]):
88 continue
89 item = {}
90 ts = x["startTime"] # 3:53:23,123
91 ts = ts.split(",")[0] # 3:53:23
92 if len(ts.split(":")[0]) == 1:
93 ts = f"0{ts}" # 03:53:23
94 timestamp = datetime.strptime(f"{x['liveDate'][:10]} {ts}", "%Y-%m-%d %H:%M:%S").astimezone(ZoneInfo(TZ)).timestamp()
95 item["ts"] = round(timestamp)
96 item["content"] = x["content"]
97 data_list.append(item)
98 return pd.DataFrame(data_list).astype({"ts": int, "content": str}).drop_duplicates()
99
100 # 弹幕
101 for x in data:
102 item = {}
103 if not all([x.get("timestamp"), x.get("authorName"), x.get("authorId")]):
104 continue
105 item["uid"] = x["authorId"]
106 item["ts"] = round(x["timestamp"] / 1000000)
107 item["name"] = x["authorName"].lstrip("@")
108 item["content"] = x.get("message") or ""
109 item["superchat"] = x.get("scAmount") or ""
110 data_list.append(item)
111 df = pd.DataFrame(data_list).astype({"uid": str, "ts": int, "name": str, "content": str, "superchat": str})
112 return df.drop_duplicates()
113
114
@cache.memoize(ttl=86400 * 7)
async def get_bearer_token() -> str:
    """Log in to the danmu API and return the bearer token.

    Posts ``DANMU.AUTH_USER`` / ``DANMU.AUTH_PASS`` to ``/auth/login`` and
    returns the ``msg`` field of the response, or an empty string when that
    field is absent.
    """
    login_url = DANMU.BASE_URL + "/auth/login"
    credentials = {"userName": DANMU.AUTH_USER, "password": DANMU.AUTH_PASS}
    reply = await hx_req(login_url, "POST", data=credentials, proxy=PROXY.DANMU, check_kv={"code": 200}, silent=True)
    return reply.get("msg", "")
128
129
@cache.memoize(ttl=600)
async def get_live_info(year: str | int) -> dict:
    """Fetch the live-stream records for ``year`` and group them by date.

    Returns:
        A mapping keyed by the first ten characters of each record's
        ``liveDate``, each value being ``{"titles": [...], "urls": [...]}``.
        An empty dict is returned when the response carries ``hx_error``.
    """
    endpoint = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
    token = await get_bearer_token()
    headers = {"Authorization": f"Bearer {token}", "X-schema": str(year)}
    records: list[dict] = await hx_req(endpoint, headers=headers, proxy=PROXY.DANMU, params={"liveDate": year}, silent=True)  # type: ignore
    if glom(records, "hx_error", default=""):  # API server is down
        return {}

    grouped: dict = {}
    for record in records:
        bucket = grouped.setdefault(record["liveDate"][:10], {"titles": [], "urls": []})
        bucket["titles"].append(record["title"])
        bucket["urls"].append(record["url"])
    return grouped
144
145
async def live_date(date: str) -> str:
    """Render a live date as text followed by markdown links to that day's streams.

    The first four characters of ``date`` select the year passed to
    ``get_live_info`` and the first ten select the day. Each title/url pair
    becomes a ``[title](url)`` line under the date; trailing whitespace is
    stripped from the result.
    """
    day = date[:10]
    info = await get_live_info(date[:4])
    titles = glom(info, f"{day}.titles", default=[])
    urls = glom(info, f"{day}.urls", default=[])
    lines = [date]
    # strict=True raises ValueError if the two lists differ in length
    lines.extend(f"[{title}]({url})" for title, url in zip(titles, urls, strict=True))
    return "\n".join(lines).rstrip()
155
156
async def count_entities(client: Client, markdown: str) -> int:
    """Return how many entities the Markdown parser finds in ``markdown``.

    Parses the text with ``strict=True`` and counts the items under the
    result's ``entities`` key; a missing key counts as zero.
    """
    parser = Markdown(client)
    result = await parser.parse(markdown, strict=True)
    entities = glom(result, "entities", default=[])
    return len(entities)