# main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from collections import defaultdict
4from datetime import UTC, datetime, timedelta
5from decimal import Decimal
6from io import BytesIO
7from pathlib import Path
8from zoneinfo import ZoneInfo
9
10import anyio
11import pandas as pd
12from glom import glom
13from loguru import logger
14
15from config import DANMU, DOWNLOAD_DIR, TZ, cache
16from database.r2 import get_cf_r2
17from messages.progress import modify_progress
18from others.emoji import CURRENCY
19from utils import nowdt, number
20
21
async def query_r2(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, qtype: str, **kwargs) -> dict:
    """从R2获取记录.

    日期从新到旧, 数据从旧到新
    Returns:
        {"paths": list[str], "count": int}
    """
    if not dates:
        return {}
    key_prefix = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/"
    matched_total = 0
    seen_dates: list[str] = []
    saved_paths: list[str] = []
    # 日期从新到旧
    for day in sorted(dates, reverse=True):
        frame = await query_r2_for_date(day, key_prefix + day, qtype)
        seen_dates.append(day.upper())
        if frame.empty:
            continue
        result = await parse_dataframe(frame, user, keyword, super_chats, qtype)
        matched = result.get("count", 0)
        if not matched:
            continue
        matched_total += matched
        body = result.get("texts", "")
        await modify_progress(text=caption + f"\n🔍查询时间: {'、'.join(seen_dates)}\n⏳匹配{qtype}数: {matched_total}", force_update=True, **kwargs)
        del result  # the parsed payload can be large; drop it before file I/O
        target = f"{DOWNLOAD_DIR}/{user}-{day}-{keyword}-{qtype}.txt"
        async with await anyio.open_file(target, "w") as fp:
            await fp.write(body.strip())
        saved_paths.append(target)
    return {"paths": saved_paths, "count": matched_total}
54
55
async def parse_dataframe(df: pd.DataFrame, user: str, keyword: str, super_chats: defaultdict, qtype: str) -> dict:
    """解析从R2获取的记录.

    Args:
        df: 单个日期的记录, 需包含 ts/content 列 (弹幕另含 name/superchat 列).
        user: 过滤用户名 (仅 qtype == "弹幕" 时生效); 指定后输出隐藏用户名.
        keyword: 内容关键字过滤.
        super_chats: 按货币累计SC金额 (原地累加, 属调用方共享状态).
        qtype: "发言" 或 "弹幕".

    Returns:
        {"texts": str, "count": int}
    """
    if keyword:
        # NOTE(review): str.contains 默认按正则匹配, 含 "(" 等特殊字符的
        # keyword 会抛 re.error — 如只需字面匹配应加 regex=False, 待确认.
        df = df[df["content"].str.contains(keyword)]
    if user and qtype == "弹幕":
        df = df[df["name"] == user]
    # 上面的布尔过滤返回的是切片; 在切片上新增列会触发 pandas 的
    # SettingWithCopyWarning (链式赋值), 先显式复制一份再写.
    df = df.copy()
    df["livedate"] = df["ts"].apply(ts_to_liveday, args=(qtype,))
    df = df.sort_values(by=["livedate", "ts"], ascending=[False, True])
    processed_day: set[str] = set()
    parts: list[str] = []  # 收集片段最后一次 join, 避免循环内字符串拼接的 O(n^2)
    count = 0
    for _, row in df.iterrows():
        day, title = await live_date_info(row["ts"], qtype)
        # 每个开播日期只输出一次日期与标题头
        day_str = f"\n开播日期: {day}\n{title}\n" if day not in processed_day else ""
        processed_day.add(day)
        if qtype == "发言":
            parts.append(f"\n{day_str}{ts_time(row['ts'])}: {row['content'].strip()}")
        else:
            sc_amount = ""
            if super_chat := row["superchat"]:
                currency, amount = super_chat.split(" ")
                super_chats[currency] += Decimal(amount)
                sc_amount = f" ({CURRENCY[currency]}{currency} {number(amount)})" if currency in CURRENCY else ""
            username = "" if user else f"|{row['name']}"  # 当指定过滤user时, 隐藏用户名
            parts.append(f"\n{day_str}{ts_time(row['ts'])}{username}{sc_amount}: {row['content'].strip()}")
        count += 1
    return {"texts": "".join(parts).rstrip(), "count": count}
83
84
async def query_r2_for_date(date: str, r2_key: str, qtype: str) -> pd.DataFrame:
    """首先尝试从本地磁盘获取, 如果不存在, 则从R2获取."""
    local = Path(f"{DOWNLOAD_DIR}/{qtype}/{date}.parquet")

    def _read_cached() -> pd.DataFrame:
        # 本地缓存命中, 无需网络请求.
        logger.trace(f"Load {qtype} {date} from local file: {local.name}")
        return pd.read_parquet(local)

    async def _fetch_remote() -> pd.DataFrame:
        # 从R2拉取parquet并刷新本地缓存.
        logger.debug(f"Query {qtype} from R2: {date}")
        blob: bytes = await get_cf_r2(r2_key, rformat="bytes", silent=True)  # ty:ignore[invalid-assignment]
        frame = pd.read_parquet(BytesIO(blob)).drop_duplicates()
        local.parent.mkdir(parents=True, exist_ok=True)
        frame.to_parquet(local, index=False, compression="brotli")
        return frame

    # always return local file if it is less than 1 hour old
    if local.is_file() and datetime.now(UTC).timestamp() - local.stat().st_mtime < 3600:
        return _read_cached()

    now_local = nowdt(TZ)
    # get from r2 for specific day / this year / this month
    if len(date) == 10 or date in (now_local.strftime("%Y"), now_local.strftime("%Y-%m")):
        return await _fetch_remote()

    # return local file if it exists
    if local.is_file():
        return _read_cached()

    # get from r2 for other dates
    logger.debug(f"Save {qtype} {date} to {local.name}")
    return await _fetch_remote()
116
117
def ts_to_liveday(ts: int, qtype: str) -> str:
    """将时间戳转换为直播日期.

    弹幕时间戳是真实时间, 而发言时间是相对开播时间
    """
    moment = datetime.fromtimestamp(ts, tz=ZoneInfo(TZ))
    # 发言时间直接取其日期; 弹幕若落在凌晨0-6点, 过了凌晨也算前一天
    if qtype != "发言" and moment.hour < 6:
        moment -= timedelta(days=1)
    return moment.strftime("%Y-%m-%d")
130
131
def ts_time(ts: int) -> str:
    """将时间戳转换为时间 (本地时区的 HH:MM:SS)."""
    return datetime.fromtimestamp(ts, tz=ZoneInfo(TZ)).strftime("%H:%M:%S")
136
137
@cache.memoize(ttl=600)
async def get_liveinfo() -> list[dict]:
    """获取直播信息.

    结果缓存600秒, 避免频繁访问R2.
    """
    liveinfo_key = DANMU.R2_PREFIX.rstrip("/") + "/liveinfo"
    return await get_cf_r2(liveinfo_key, silent=True)  # ty:ignore[invalid-return-type]
142
143
async def live_date_info(ts: int, qtype: str) -> tuple[str, str]:
    """将时间戳转换为直播日期信息.

    Returns:
        tuple[str, str]: (直播日期, 直播链接)
        Eg: ("2023-12-12", "[直播标题](https://...)")
    """
    day = ts_to_liveday(ts, qtype)
    info = await get_liveinfo()
    titles = glom(info, f"{day}.titles", default=[])
    urls = glom(info, f"{day}.urls", default=[])
    # titles与urls应当一一对应, 长度不一致时 strict=True 会直接报错
    links = [f"[{name}]({link})" for name, link in zip(titles, urls, strict=True)]
    return day, "\n".join(links)