main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3from datetime import datetime
  4from decimal import Decimal
  5from zoneinfo import ZoneInfo
  6
  7import pandas as pd
  8from glom import glom
  9from pyrogram.client import Client
 10from pyrogram.parser.markdown import Markdown
 11
 12from config import DANMU, DB, PROXY, TOKEN, TZ, cache
 13from networking import hx_req
 14from price.coinmarketcap import get_cmc_fiat
 15
 16TURSO_KWARGS: dict = {
 17    "db_name": DANMU.TURSO_DATABASE,
 18    "username": DANMU.TURSO_USERNAME or DB.TURSO_USERNAME,
 19    "api_token": DANMU.TURSO_API_TOKEN or DB.TURSO_API_TOKEN,
 20    "group_token": DANMU.TURSO_GROUP_TOKEN or DB.TURSO_GROUP_TOKEN,
 21}
 22
 23
 24@cache.memoize(ttl=28800)
 25async def to_usd(ccy: str) -> Decimal:
 26    """Convert 1 unit ccy to USD."""
 27    if ccy == "USD":
 28        return Decimal(1)
 29    fiats = await get_cmc_fiat()  # {"ccy": (name, id)}
 30    if ccy not in fiats:
 31        return Decimal()
 32
 33    ccy_id = fiats[ccy][1]
 34    usd_id = fiats["USD"][1]
 35    url = "https://pro-api.coinmarketcap.com/v2/tools/price-conversion"
 36    headers = {"Accepts": "application/json", "X-CMC_PRO_API_KEY": TOKEN.CMC_API_KEY}
 37    params = {"amount": 1, "id": ccy_id, "convert_id": usd_id}
 38    response = await hx_req(url, params=params, headers=headers, proxy=PROXY.CRYPTO, check_keys=["data.quote"], check_kv={"status.error_code": 0})
 39    rate = glom(response, f"data.quote.{usd_id}.price", default=Decimal())
 40    return Decimal(rate)
 41
 42
 43def date_str(date: str) -> str:
 44    """格式化日期."""
 45    if len(date) == 4:
 46        date_name = f"{date}"
 47    elif len(date) == 7:
 48        date_name = f"{date[:4]}{date[5:7]}"
 49    elif len(date) == 10:
 50        date_name = f"{date[:4]}{date[5:7]}{date[8:10]}"
 51    else:
 52        date_name = date
 53    return date_name
 54
 55
 56def convert_to_dataframe(data: list[dict], qtype: str) -> pd.DataFrame:
 57    """Convert to dataframe.
 58
 59    发言: {'content': '你好',
 60           'endTime': '00:04:28,399',
 61           'id': 135366,
 62           'liveDate': '2022-11-14',
 63           'serial': 1,
 64           'startTime': '00:04:27,566'
 65         }
 66
 67    弹幕:
 68        {'authorId': 'fakeid',
 69        'authorImage': 'https://yt4.ggpht.com/ytc/AKedOLSUwTp6ptiNCy1eovPrO61XfV74a8S21hpP3MU-D4ybbS47m9xmjKJ-JjbghP1m',
 70        'authorName': 'User',
 71        'count': None,
 72        'emotesCount': None,
 73        'id': 1092980,
 74        'liveDate': '2021-12-29',
 75        'message': '你好',
 76        'scAmount': "TWD 75.0",
 77        'scInfo': None,
 78        'timeInSeconds': 18690.529,
 79        'timeText': '5:11:30',
 80        'timestamp': 1640799880050853}
 81    """
 82    if not data:
 83        return pd.DataFrame()
 84    data_list = []
 85    if qtype == "发言":
 86        for x in data:
 87            if not all([x.get("startTime"), x.get("content"), x.get("liveDate")]):
 88                continue
 89            item = {}
 90            ts = x["startTime"]  # 3:53:23,123
 91            ts = ts.split(",")[0]  # 3:53:23
 92            if len(ts.split(":")[0]) == 1:
 93                ts = f"0{ts}"  # 03:53:23
 94            timestamp = datetime.strptime(f"{x['liveDate'][:10]} {ts}", "%Y-%m-%d %H:%M:%S").astimezone(ZoneInfo(TZ)).timestamp()
 95            item["ts"] = round(timestamp)
 96            item["content"] = x["content"]
 97            data_list.append(item)
 98        return pd.DataFrame(data_list).astype({"ts": int, "content": str}).drop_duplicates()
 99
100    # 弹幕
101    for x in data:
102        item = {}
103        if not all([x.get("timestamp"), x.get("authorName"), x.get("authorId")]):
104            continue
105        item["uid"] = x["authorId"]
106        item["ts"] = round(x["timestamp"] / 1000000)
107        item["name"] = x["authorName"].lstrip("@")
108        item["content"] = x.get("message") or ""
109        item["superchat"] = x.get("scAmount") or ""
110        data_list.append(item)
111    df = pd.DataFrame(data_list).astype({"uid": str, "ts": int, "name": str, "content": str, "superchat": str})
112    return df.drop_duplicates()
113
114
@cache.memoize(ttl=86400 * 7)
async def get_bearer_token() -> str:
    """Log in to the danmu API and return the bearer token.

    POSTs the configured credentials to ``/auth/login`` and returns the
    ``msg`` field of the response, or an empty string when it is absent.
    The result is memoized through ``cache.memoize`` (ttl=86400 * 7).
    """
    login_url = DANMU.BASE_URL + "/auth/login"
    credentials = {"userName": DANMU.AUTH_USER, "password": DANMU.AUTH_PASS}
    payload = await hx_req(
        login_url,
        "POST",
        data=credentials,
        proxy=PROXY.DANMU,
        check_kv={"code": 200},
        silent=True,
    )
    token = payload.get("msg", "")
    return token
128
129
@cache.memoize(ttl=600)
async def get_live_info(year: str | int) -> dict:
    """Fetch the live streams of ``year`` and group them by date.

    Args:
        year: Sent both as the ``liveDate`` query parameter and as the
            ``X-schema`` header.

    Returns:
        A mapping keyed by the first ten characters of each entry's
        ``liveDate``; every value is ``{"titles": [...], "urls": [...]}``
        with the two lists in response order. An empty dict when the
        response carries a truthy ``hx_error``.
    """
    token = await get_bearer_token()
    resp: list[dict] = await hx_req(  # type: ignore
        f"{DANMU.BASE_URL}/liveInfo/queryListBySelector",
        headers={"Authorization": f"Bearer {token}", "X-schema": str(year)},
        proxy=PROXY.DANMU,
        params={"liveDate": year},
        silent=True,
    )
    if glom(resp, "hx_error", default=""):  # API server is down
        return {}

    grouped: dict = {}
    for entry in resp:
        day = entry["liveDate"][:10]
        bucket = grouped.setdefault(day, {"titles": [], "urls": []})
        bucket["titles"].append(entry["title"])
        bucket["urls"].append(entry["url"])
    return grouped
144
145
async def live_date(date: str) -> str:
    """Render a live date followed by markdown links to that day's streams.

    The first four characters of ``date`` select the year passed to
    ``get_live_info`` and the first ten select the day inside the result.

    Returns:
        ``date`` on the first line, then one ``[title](url)`` line per
        stream found for that day, with trailing whitespace stripped.

    Raises:
        ValueError: If the day's title and url lists differ in length
            (``zip(..., strict=True)``).
    """
    day = date[:10]
    live_info = await get_live_info(date[:4])
    titles = glom(live_info, f"{day}.titles", default=[])
    urls = glom(live_info, f"{day}.urls", default=[])

    lines = [date]
    lines.extend(f"[{title}]({url})" for title, url in zip(titles, urls, strict=True))
    return "\n".join(lines).rstrip()
155
156
async def count_entities(client: Client, markdown: str) -> int:
    """Return how many entities the markdown text parses into.

    The text is parsed with pyrogram's ``Markdown`` parser (``strict=True``);
    the count is the length of the ``entities`` value of the parse result,
    or 0 when that value cannot be read.
    """
    parser = Markdown(client)
    parsed = await parser.parse(markdown, strict=True)
    entities = glom(parsed, "entities", default=[])
    return len(entities)