#!/usr/bin/env python
# -*- coding: utf-8 -*-
from collections import defaultdict
from datetime import UTC, datetime, timedelta
from decimal import Decimal
from io import BytesIO
from pathlib import Path
from zoneinfo import ZoneInfo

import anyio
import pandas as pd
from glom import glom
from loguru import logger

from config import DANMU, DOWNLOAD_DIR, TZ, cache
from database.r2 import get_cf_r2
from messages.progress import modify_progress
from others.emoji import CURRENCY
from utils import nowdt, number


async def query_r2(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, qtype: str, **kwargs) -> dict:
    """Fetch records from R2.

    Dates are processed newest to oldest; rows within each file run oldest to newest.

    Returns:
        {"paths": list[str], "count": int}
    """
    if not dates:
        return {}
    prefix = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/"
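    # R2 object keys follow the layout <R2_PREFIX>/<qtype>/<date> (fetched by query_r2_for_date below)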
    total_count = 0
    queried_dates = []
    paths = []
    for date in sorted(dates, reverse=True):  # newest date first
        r2_key = prefix + date
        df = await query_r2_for_date(date, r2_key, qtype)
        queried_dates.append(date.upper())
        if len(df) == 0:
            continue
        parsed = await parse_dataframe(df, user, keyword, super_chats, qtype)
        count = parsed.get("count", 0)
        if count == 0:
            continue
        total_count += count
        texts = parsed.get("texts", "")
        await modify_progress(text=caption + f"\n🔍查询时间: {''.join(queried_dates)}\n⏳匹配{qtype}数: {total_count}", force_update=True, **kwargs)
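        # parsed is no longer referenced below; drop it early now that texts has been copied out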
        del parsed
        save_path = f"{DOWNLOAD_DIR}/{user}-{date}-{keyword}-{qtype}.txt"
        async with await anyio.open_file(save_path, "w") as f:
            await f.write(texts.strip())
        paths.append(save_path)
    return {"paths": paths, "count": total_count}


async def parse_dataframe(df: pd.DataFrame, user: str, keyword: str, super_chats: defaultdict, qtype: str) -> dict:
    """Parse the records fetched from R2."""
    texts = ""
    count = 0
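    # NB: str.contains treats the keyword as a regex pattern by default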
    if keyword:
        df = df[df["content"].str.contains(keyword)]
    if user and qtype == "弹幕":
        df = df[df["name"] == user]
    df = df.assign(livedate=df["ts"].apply(ts_to_liveday, args=(qtype,)))  # assign avoids SettingWithCopyWarning on the filtered frame
    df = df.sort_values(by=["livedate", "ts"], ascending=[False, True])
    processed_day = set()
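    # emit the "开播日期"/title header only once per live day; rows are already grouped by livedate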
    for _, row in df.iterrows():
        day, title = await live_date_info(row["ts"], qtype)
        day_str = f"\n开播日期: {day}\n{title}\n" if day not in processed_day else ""
        processed_day.add(day)
        if qtype == "发言":
            texts += f"\n{day_str}{ts_time(row['ts'])}: {row['content'].strip()}"
        else:
            sc_amount = ""
            if super_chat := row["superchat"]:
                currency, amount = super_chat.split(" ")
                super_chats[currency] += Decimal(amount)
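                # currencies missing from CURRENCY are still summed into super_chats, just not rendered inline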
                sc_amount = f" ({CURRENCY[currency]}{currency} {number(amount)})" if currency in CURRENCY else ""
            username = "" if user else f"|{row['name']}"  # hide the username when a user filter is already applied
            texts += f"\n{day_str}{ts_time(row['ts'])}{username}{sc_amount}: {row['content'].strip()}"
        count += 1
    return {"texts": texts.rstrip(), "count": count}


async def query_r2_for_date(date: str, r2_key: str, qtype: str) -> pd.DataFrame:
    """Try the local disk first; fall back to R2 if the file is missing or stale."""
    path = Path(f"{DOWNLOAD_DIR}/{qtype}/{date}.parquet")
    now = datetime.now(UTC).timestamp()
    # always return the local file if it is less than 1 hour old
    if path.is_file() and now - path.stat().st_mtime < 3600:
        logger.trace(f"Load {qtype} {date} from local file: {path.name}")
        return pd.read_parquet(path)

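    # helper: pull the parquet bytes from R2, dedupe, and mirror them to disk for later reads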
    async def r2_to_dataframe(r2_key: str) -> pd.DataFrame:
        logger.debug(f"Query {qtype} from R2: {date}")
        parquet: bytes = await get_cf_r2(r2_key, rformat="bytes", silent=True)  # ty:ignore[invalid-assignment]
        df = pd.read_parquet(BytesIO(parquet)).drop_duplicates()
        path.parent.mkdir(parents=True, exist_ok=True)
        df.to_parquet(path, index=False, compression="brotli")
        return df

    this_year = nowdt(TZ).strftime("%Y")
    this_month = nowdt(TZ).strftime("%Y-%m")
    # fetch from R2 for a specific day (a 10-char YYYY-MM-DD date) or the current year / current month
    if len(date) == 10 or date in [this_year, this_month]:
        return await r2_to_dataframe(r2_key)

    # return the local file if it exists
    if path.is_file():
        logger.trace(f"Load {qtype} {date} from local file: {path.name}")
        return pd.read_parquet(path)

    # get from R2 for other dates
    logger.debug(f"Save {qtype} {date} to {path.name}")
    return await r2_to_dataframe(r2_key)


def ts_to_liveday(ts: int, qtype: str) -> str:
    """Convert a timestamp to the live-stream date.

    弹幕 (danmaku) timestamps are real wall-clock times, while 发言 timestamps are relative to the stream start.
    """
    dt = datetime.fromtimestamp(ts, tz=ZoneInfo(TZ))
    if qtype == "发言":
        return dt.strftime("%Y-%m-%d")

    if 0 <= dt.hour < 6:  # past-midnight hours still belong to the previous day's stream, e.g. 02:00 on 12-13 maps to 12-12
        return (dt - timedelta(days=1)).strftime("%Y-%m-%d")
    return dt.strftime("%Y-%m-%d")


def ts_time(ts: int) -> str:
    """Convert a timestamp to a HH:MM:SS time string."""
    dt = datetime.fromtimestamp(ts, tz=ZoneInfo(TZ))
    return dt.strftime("%H:%M:%S")


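# live_date_info calls this once per matched row, so memoize the liveinfo blob for 10 minutes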
@cache.memoize(ttl=600)
async def get_liveinfo() -> dict:
    """Fetch the live-stream metadata."""
    return await get_cf_r2(DANMU.R2_PREFIX.rstrip("/") + "/liveinfo", silent=True)  # ty:ignore[invalid-return-type]


async def live_date_info(ts: int, qtype: str) -> tuple[str, str]:
    """Convert a timestamp to live-stream date info.

    Returns:
        tuple[str, str]: (live date, markdown links to the streams)
        E.g.: ("2023-12-12", "[stream title](https://...)")
    """
    day = ts_to_liveday(ts, qtype)
    live_info = await get_liveinfo()
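    # liveinfo is keyed by live date: {"2023-12-12": {"titles": [...], "urls": [...]}, ...}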
    titles = glom(live_info, f"{day}.titles", default=[])
    urls = glom(live_info, f"{day}.urls", default=[])
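    # strict=True: a day's titles and urls must pair up one-to-one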
    markdown = [f"[{title}]({url})" for title, url in zip(titles, urls, strict=True)]
    return day, "\n".join(markdown)