#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime
from decimal import Decimal
from io import BytesIO
from pathlib import Path
from zoneinfo import ZoneInfo

import pandas as pd
from glom import glom

from config import DANMU, DB, DOWNLOAD_DIR, PROXY, TOKEN, TZ, cache
from networking import hx_req
from price.coinmarketcap import get_cmc_fiat

TURSO_KWARGS: dict = {
    "db_name": DANMU.TURSO_DATABASE,
    "username": DANMU.TURSO_USERNAME or DB.TURSO_USERNAME,
    "api_token": DANMU.TURSO_API_TOKEN or DB.TURSO_API_TOKEN,
    "group_token": DANMU.TURSO_GROUP_TOKEN or DB.TURSO_GROUP_TOKEN,
}


@cache.memoize(ttl=28800)
async def to_usd(ccy: str) -> Decimal:
    """Convert 1 unit of ccy to USD."""
    if ccy == "USD":
        return Decimal(1)
    fiats = await get_cmc_fiat()  # {"ccy": (name, id)}
    if ccy not in fiats:
        return Decimal()  # unknown currency -> rate of 0

    ccy_id = fiats[ccy][1]
    usd_id = fiats["USD"][1]
    url = "https://pro-api.coinmarketcap.com/v2/tools/price-conversion"
    headers = {"Accepts": "application/json", "X-CMC_PRO_API_KEY": TOKEN.CMC_API_KEY}
    params = {"amount": 1, "id": ccy_id, "convert_id": usd_id}
    response = await hx_req(url, params=params, headers=headers, proxy=PROXY.CRYPTO, check_keys=["data.quote"], check_kv={"status.error_code": 0})
    rate = glom(response, f"data.quote.{usd_id}.price", default=Decimal())
    return Decimal(str(rate))  # str() first: Decimal(float) would embed binary rounding error


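# to_usd usage sketch (hypothetical rate; needs TOKEN.CMC_API_KEY and network access):
#     rate = await to_usd("TWD")  # e.g. Decimal("0.0329"); unknown codes return Decimal("0")
#     usd = Decimal("75.0") * rate

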
def date_str(date: str) -> str:
    """Format a date string by stripping dashes: YYYY / YYYYMM / YYYYMMDD."""
    if len(date) == 4:
        date_name = date
    elif len(date) == 7:
        date_name = f"{date[:4]}{date[5:7]}"
    elif len(date) == 10:
        date_name = f"{date[:4]}{date[5:7]}{date[8:10]}"
    else:
        date_name = date
    return date_name


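# date_str examples (derived from the branches above):
#     date_str("2022")       -> "2022"
#     date_str("2022-11")    -> "202211"
#     date_str("2022-11-14") -> "20221114"
#     date_str("2022&2023")  -> "2022&2023"  (other lengths pass through unchanged)

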
def convert_parquet(data: list[dict], qtype: str, date: str) -> bytes:
    """Convert records to parquet bytes.

    发言 (transcript line):
        {'content': '你好',
         'endTime': '00:04:28,399',
         'id': 135366,
         'liveDate': '2022-11-14',
         'serial': 1,
         'startTime': '00:04:27,566'}

    弹幕 (live-chat message):
        {'authorId': 'fakeid',
         'authorImage': 'https://yt4.ggpht.com/ytc/AKedOLSUwTp6ptiNCy1eovPrO61XfV74a8S21hpP3MU-D4ybbS47m9xmjKJ-JjbghP1m',
         'authorName': 'User',
         'count': None,
         'emotesCount': None,
         'id': 1092980,
         'liveDate': '2021-12-29',
         'message': '你好',
         'scAmount': 'TWD 75.0',
         'scInfo': None,
         'timeInSeconds': 18690.529,
         'timeText': '5:11:30',
         'timestamp': 1640799880050853}
    """
    if not data:
        return b""
    data_list = []
    if qtype == "发言":  # transcript lines
        for x in data:
            if not all([x.get("startTime"), x.get("content")]):
                continue
            item = {}
            ts = x["startTime"]  # 3:53:23,123
            ts = ts.split(",")[0]  # 3:53:23
            if len(ts.split(":")[0]) == 1:
                ts = f"0{ts}"  # 03:53:23
            # Interpret the wall-clock time in TZ directly; astimezone() on a naive
            # datetime would assume system-local time and ignore TZ entirely.
            timestamp = datetime.strptime(f"{date[:10]} {ts}", "%Y-%m-%d %H:%M:%S").replace(tzinfo=ZoneInfo(TZ)).timestamp()
            item["ts"] = round(timestamp)
            item["content"] = x["content"]
            data_list.append(item)
        df = pd.DataFrame(data_list).astype({"ts": int, "content": str})

    else:  # 弹幕 (live chat)
        for x in data:
            if not all([x.get("timestamp"), x.get("authorName"), x.get("authorId")]):
                continue
            item = {}
            item["uid"] = x["authorId"]
            item["ts"] = round(x["timestamp"] / 1000000)  # microseconds -> seconds
            item["name"] = x["authorName"].lstrip("@")
            item["content"] = x.get("message") or ""
            item["superchat"] = x.get("scAmount") or ""
            data_list.append(item)
        df = pd.DataFrame(data_list).astype({"uid": str, "ts": int, "name": str, "content": str, "superchat": str})
    buffer = BytesIO()
    df.drop_duplicates().to_parquet(buffer, index=False, compression="brotli")
    parquet_bytes = buffer.getvalue()
    buffer.close()
    return parquet_bytes


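# convert_parquet usage sketch (sample record from the docstring; a brotli-capable
# parquet engine such as pyarrow is assumed):
#     blob = convert_parquet([{"startTime": "00:04:27,566", "content": "你好"}], "发言", "2022-11-14")
#     pd.read_parquet(BytesIO(blob))  # columns: ts (int), content (str)

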
def merge_parquet(parquets: list[bytes]) -> bytes:
    """Merge parquet bytes."""
    if not parquets:
        return b""
    dataframes = [pd.read_parquet(BytesIO(parquet)) for parquet in parquets]
    df = pd.concat(dataframes, ignore_index=True).sort_values(by=["ts"]).drop_duplicates()
    buffer = BytesIO()
    df.to_parquet(buffer, index=False, compression="brotli")  # already de-duplicated above
    parquet_bytes = buffer.getvalue()
    buffer.close()
    return parquet_bytes


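# merge_parquet usage sketch: combine per-day blobs from convert_parquet into a
# single file sorted by ts:
#     merged = merge_parquet([blob_day1, blob_day2])

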
def merge_txt_files(paths: list[str], dates: list[str], user: str, keyword: str, qtype: str, header_tips: str) -> list[str]:
    """Merge multiple txt files into one."""
    if not paths:
        return []
    dates = sorted(dates)
    paths = sorted(paths)
    dates = [name for name in dates if any(name in path for path in paths)]
    if len(dates) == 1:
        date_name = dates[0]
    elif all(len(x) == 10 for x in dates):  # all days (this case does not occur)
        date_name = dates[0][:7]
    elif all(len(x) == 7 for x in dates):  # all months
        date_name = dates[0][:4]
    elif all(len(x) == 4 for x in dates):  # all years
        date_name = dates[0] if len(set(dates)) == 1 else ""
    else:
        date_name = ""
    keyword = keyword or ""

    if file_bytes(paths) < 10 * 1024 * 1024:  # 10 MB
        texts = header_tips + "\n\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip()
        save_path = f"{DOWNLOAD_DIR}/【{qtype}】{user}-{date_str(date_name)}-{keyword}.txt".replace("--", "-").replace("】-", "").replace("-.txt", ".txt")
        Path(save_path).write_text(texts)
        return [save_path]

    num_merge = 2
    date_chunks = [dates[i : i + num_merge] for i in range(0, len(dates), num_merge)]
    path_chunks = [paths[i : i + num_merge] for i in range(0, len(paths), num_merge)]
    final_paths = []
    for date_chunk, path_chunk in zip(date_chunks, path_chunks, strict=True):
        texts = header_tips + "\n\n".join([Path(p).read_text() for p in sorted(path_chunk, reverse=True)]).strip()
        date_name = "&".join(date_chunk)
        save_path = f"{DOWNLOAD_DIR}/【{qtype}】{user}-{date_name}-{keyword}.txt".replace("--", "-").replace("】-", "").replace("-.txt", ".txt")
        Path(save_path).write_text(texts)
        final_paths.append(save_path)
    return final_paths


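# merge_txt_files behavior sketch: under 10 MB total everything lands in one file;
# above that, outputs are written in chunks of two (num_merge) with "&"-joined dates.
# Hypothetical chunked output name, assuming empty keyword:
#     merge_txt_files(paths, ["2021", "2022"], "user", "", "发言", tips)
#     -> [f"{DOWNLOAD_DIR}/【发言】user-2021&2022.txt"]

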
@cache.memoize(ttl=86400 * 7)
async def get_bearer_token() -> str:
    """Fetch the bearer token (cached for 7 days)."""
    api_url = DANMU.BASE_URL + "/auth/login"
    resp = await hx_req(
        api_url,
        "POST",
        data={"userName": DANMU.AUTH_USER, "password": DANMU.AUTH_PASS},
        proxy=PROXY.DANMU,
        check_kv={"code": 200},
        silent=True,
    )
    return resp.get("msg", "")  # the API returns the token in the "msg" field


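# get_bearer_token usage sketch (credentials come from DANMU.AUTH_USER / AUTH_PASS):
#     headers = {"Authorization": f"Bearer {await get_bearer_token()}"}

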
@cache.memoize(ttl=600)
async def get_live_info(year: str | int) -> dict:
    """Fetch live-stream titles and URLs for a year, keyed by YYYY-MM-DD date."""
    params = {"liveDate": year}
    api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
    headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": str(year)}
    resp: list[dict] = await hx_req(api, headers=headers, proxy=PROXY.DANMU, params=params, silent=True)  # type: ignore
    if glom(resp, "hx_error", default=""):  # API server is down
        return {}
    dates = {x["liveDate"][:10] for x in resp}
    info = {date: {"titles": [], "urls": []} for date in dates}
    for x in resp:
        info[x["liveDate"][:10]]["titles"].append(x["title"])
        info[x["liveDate"][:10]]["urls"].append(x["url"])
    return info


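# get_live_info return-shape sketch (hypothetical data):
#     await get_live_info(2022)
#     -> {"2022-11-14": {"titles": ["stream title"], "urls": ["https://youtu.be/..."]}}

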
async def live_date(date: str) -> str:
    """Convert a YYYY-MM-DD live date to markdown lines with titles."""
    live_info = await get_live_info(date[:4])
    day = date[:10]
    titles = glom(live_info, f"{day}.titles", default=[])
    urls = glom(live_info, f"{day}.urls", default=[])
    markdown = [f"[{title}]({url})" for title, url in zip(titles, urls, strict=True)]
    texts = date + "\n" + "\n".join(markdown)
    return texts.rstrip()


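# live_date output sketch (hypothetical title/url):
#     await live_date("2022-11-14")
#     -> '2022-11-14\n[stream title](https://youtu.be/...)'

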
def file_bytes(paths: list[str] | str) -> int:
    """Get total file size in bytes."""
    if isinstance(paths, str):
        paths = [paths]
    return sum(Path(path).stat().st_size for path in paths)
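

if __name__ == "__main__":
    # Offline smoke test for the pure helpers (a sketch; the async API helpers
    # above need real credentials/network and are not exercised here; a
    # brotli-capable parquet engine is assumed).
    sample = [{"startTime": "00:04:27,566", "content": "你好"}]
    blob = convert_parquet(sample, "发言", "2022-11-14")
    merged = merge_parquet([blob, blob])  # duplicate rows collapse to one
    print(pd.read_parquet(BytesIO(merged)))
    print(date_str("2022-11-14"))  # -> 20221114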