Commit ead116c

benny-dou <60535774+benny-dou@users.noreply.github.com>
2026-04-29 05:07:51
chore(danmu): update danmu function
1 parent 739b699
src/danmu/entrypoint.py
@@ -3,19 +3,21 @@
 import re
 from collections import defaultdict
 from decimal import Decimal
+from io import BytesIO
 from pathlib import Path
 
 from pyrogram.client import Client
-from pyrogram.types import Message
+from pyrogram.errors.exceptions.bad_request_400 import MediaCaptionTooLong
+from pyrogram.types import InputMediaDocument, Message
 
-from config import DANMU, PREFIX, TZ
-from danmu.r2 import query_r2
+from config import DANMU, DOWNLOAD_DIR, PREFIX, TZ
+from danmu.r2 import get_username_history, query_r2
 from danmu.turso import query_turso
-from danmu.utils import file_bytes, merge_txt_files, to_usd
+from danmu.utils import count_entities, to_usd
 from messages.parser import parse_msg
 from messages.progress import modify_progress
 from messages.sender import send2tg
-from messages.utils import blockquote, equal_prefix, smart_split, startswith_prefix
+from messages.utils import blockquote, delete_message, equal_prefix, smart_split, startswith_prefix
 from others.emoji import CURRENCY
 from publish import publish_telegraph
 from utils import convert_html, nowdt, number, strings_list
@@ -43,8 +45,7 @@ HELP = f"""📖**查询直播合订本**
 ( `@用户名` 对于 `{PREFIX.FAYAN}` 命令无效)
 """
 
-DANMU_TIPS = f"时间点为{TZ}时区"
-FAYAN_TIPS = "时间点并非真实时间, 而是相对开播时长"
+DANMU_TIPS = f"时间点为{TZ}时区\n"
 
 
 async def query_danmu(client: Client, message: Message, **kwargs):
@@ -72,7 +73,7 @@ async def query_danmu(client: Client, message: Message, **kwargs):
     caption += f"\n👤用户: {user}"
     caption += f"\n🔤关键词: {keyword}"
 
-    status_msg = (await send2tg(client, message, texts=caption, **kwargs))[0] if kwargs.get("show_progress") else None
+    status_msg = await message.reply_text(caption, quote=True)
     kwargs["progress"] = status_msg
 
     super_chats = defaultdict(Decimal)  # {"currency": amount}
@@ -90,6 +91,7 @@ async def query_danmu(client: Client, message: Message, **kwargs):
         await modify_progress(text=caption + f"\n⚠️未匹配任何{qtype}", force_update=True, **kwargs)
         return
     header = f"📖**{qtype}记录 ({user})**:" if user else f"📖**{qtype}记录**:"
+    username_history = await get_username_history(user) if qtype == "弹幕" and user else ""
     profit = ""
     profit_usd = 0
     for currency, amount in sorted(super_chats.items()):
@@ -100,26 +102,30 @@ async def query_danmu(client: Client, message: Message, **kwargs):
     if profit_usd > 0 and super_chats:
         profit += f"\n💵**总计**: {profit_usd:.2f} USD"
 
-    tips = f"{DANMU_TIPS}\n" if qtype == "弹幕" else f"{FAYAN_TIPS}\n"
-    if file_bytes(paths) < 20480:  #  20 KB, short length, try send as message directly
-        danmu = tips + "\n\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip()
-        final = f"{header}{profit}\n{blockquote(danmu)}"
-        if len(await smart_split(final)) == 1:
-            await modify_progress(message=status_msg, text=final, force_update=True, **kwargs)
-            return
+    texts = "\n\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip()
+    # try send as message directly
+    tips = DANMU_TIPS if qtype == "弹幕" else ""
+    final = f"{header}{tips}{profit}{blockquote(username_history)}\n{texts}"
+    if (await count_entities(client, final)) <= 100 and len(await smart_split(final)) == 1:
+        await modify_progress(message=status_msg, text=blockquote(final), force_update=True, **kwargs)
+        return
+
     caption += f"\n#️⃣{qtype}数: {count}"
     caption += f"\n\n🎉**SuperChat**:{profit}" if profit else ""
-    merged_paths = paths if DANMU.QUERY_METHOD.lower() == "turso" else merge_txt_files(paths, query_dates, user, keyword, qtype, tips)
-    media = [{"document": path} for path in sorted(merged_paths)]
-    # less than 200K, add instant view
-    if file_bytes(merged_paths) < 204800:
-        texts = "\n".join([Path(path).read_text() for path in sorted(merged_paths, reverse=True)]).strip()
-        if telegraph_url := await publish_telegraph(title=f"【{qtype}】{user}{match_time} {keyword}", html=convert_html(texts), author=user, ttl="1d"):
-            caption += f"\n⚡️[即时预览]({telegraph_url})"
-    await send2tg(client, message, texts=caption, media=media, **kwargs)
+    html = convert_html("\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip())
+    if telegraph_url := await publish_telegraph(title=f"【{qtype}】{user}{match_time} {keyword}", html=html, author=user, ttl="1d"):
+        caption += f"\n⚡️[即时预览]({telegraph_url})"
+    caption += blockquote(username_history)
+    html = f'<!DOCTYPE html><html><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>{qtype}查询结果</title><link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/water.css@2/out/water.css"></head><body><article>{html}</article></body></html>'
+    try:
+        with BytesIO(html.encode("utf-8")) as f:
+            await status_msg.edit_media(file_name=f"{qtype}查询结果.html", media=InputMediaDocument(f, file_name=f"{qtype}查询结果.html", caption=caption))
+    except MediaCaptionTooLong:
+        save_path = Path(DOWNLOAD_DIR).joinpath(f"{qtype}查询结果.html")
+        save_path.write_text(html)
+        await send2tg(client, message, texts=caption, media=[{"document": save_path.as_posix()}], **kwargs)
+        await delete_message(status_msg)
     [Path(path).unlink(missing_ok=True) for path in paths]
-    [Path(path).unlink(missing_ok=True) for path in merged_paths]
-    await modify_progress(message=status_msg, del_status=True, **kwargs)
 
 
 def parse_queries(texts: str, qtype: str) -> tuple[str, str, str, str]:
src/danmu/r2.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
+import re
 from collections import defaultdict
 from datetime import UTC, datetime, timedelta
 from decimal import Decimal
@@ -13,10 +14,10 @@ from glom import glom
 from loguru import logger
 
 from config import DANMU, DOWNLOAD_DIR, TZ, cache
-from database.r2 import get_cf_r2
+from database.r2 import get_cf_r2, set_cf_r2
 from messages.progress import modify_progress
 from others.emoji import CURRENCY
-from utils import nowdt, number
+from utils import nowdt, number, number_to_emoji
 
 
 async def query_r2(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, qtype: str, **kwargs) -> dict:
@@ -28,13 +29,12 @@ async def query_r2(dates: list[str], user: str, keyword: str, caption: str, supe
     """
     if not dates:
         return {}
-    prefix = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/"
+
     total_count = 0
     queried_dates = []
     paths = []
     for date in sorted(dates, reverse=True):  # 日期从新到旧
-        r2_key = prefix + date
-        df = await query_r2_for_date(date, r2_key, qtype)
+        df = await query_r2_for_date(date, qtype)
         queried_dates.append(date.upper())
         if len(df) == 0:
             continue
@@ -60,16 +60,18 @@ async def parse_dataframe(df: pd.DataFrame, user: str, keyword: str, super_chats
     if keyword:
         df = df[df["content"].str.contains(keyword)]
     if user and qtype == "弹幕":
-        df = df[df["name"] == user]
+        uids = await get_uids_by_name(name=user)
+        names = await get_names_by_uids(uids)
+        df = df[df["name"].str.lower().isin(names)]
     df["livedate"] = df["ts"].apply(ts_to_liveday, args=(qtype,))
     df = df.sort_values(by=["livedate", "ts"], ascending=[False, True])
     processed_day = set()
     for _, row in df.iterrows():
-        day, title = await live_date_info(row["ts"], qtype)
-        day_str = f"\n开播日期: {day}\n{title}\n" if day not in processed_day else ""
+        day, title, url = await live_date_info(row["ts"], qtype)
+        day_str = f"\n{title}\n" if day not in processed_day else ""
         processed_day.add(day)
         if qtype == "发言":
-            texts += f"\n{day_str}{ts_time(row['ts'])}: {row['content'].strip()}"
+            texts += f"\n{day_str}{ts_with_url(row['ts'], url)}: {row['content'].strip()}"
         else:
             sc_amount = ""
             if super_chat := row["superchat"]:
@@ -82,39 +84,70 @@ async def parse_dataframe(df: pd.DataFrame, user: str, keyword: str, super_chats
     return {"texts": texts.rstrip(), "count": count}
 
 
-async def query_r2_for_date(date: str, r2_key: str, qtype: str) -> pd.DataFrame:
+async def query_r2_for_date(date: str, qtype: str) -> pd.DataFrame:
     """首先尝试从本地磁盘获取, 如果不存在, 则从R2获取."""
-    path = Path(f"{DOWNLOAD_DIR}/{qtype}/{date}.parquet")
+    r2_key = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/{date[:4]}"
+    path = Path(f"{DOWNLOAD_DIR}/{qtype}/{date[:4]}.parquet")
     now = datetime.now(UTC).timestamp()
-    # always return local file if it is less than 1 hour old
+    # always use local file if it is less than 1 hour old
     if path.is_file() and now - path.stat().st_mtime < 3600:
-        logger.trace(f"Load {qtype} {date} from local file: {path.name}")
+        logger.trace(f"Load {qtype} from local file: {path.name}")
+        df = pd.read_parquet(path)
+
+    # get from r2 for this year
+    elif date[:4] == nowdt(TZ).strftime("%Y"):
+        logger.debug(f"Query {qtype} from R2: {r2_key}")
+        df = await get_r2_dataframe(r2_key, path)
+
+    # use local file if it exists
+    elif path.is_file():
+        logger.trace(f"Load {qtype} from local file: {path.name}")
+        df = pd.read_parquet(path)
+
+    # get from r2 for other dates
+    else:
+        logger.debug(f"Save {qtype} to {path.name}")
+        df = await get_r2_dataframe(r2_key, path)
+
+    # filter specific date
+    if len(date) == 7:  # YYYY-MM
+        start = datetime.strptime(date, "%Y-%m").replace(day=1, tzinfo=ZoneInfo(TZ))
+        end = datetime.strptime(date, "%Y-%m").replace(day=31, hour=23, minute=59, second=59, microsecond=999999, tzinfo=ZoneInfo(TZ))
+        start_ts = int(start.timestamp())
+        end_ts = int(end.timestamp())
+        df = df[(df["ts"] >= start_ts) & (df["ts"] <= end_ts)]
+    elif len(date) == 10:  # YYYY-MM-DD
+        start = datetime.strptime(date, "%Y-%m-%d").replace(tzinfo=ZoneInfo(TZ))
+        end = datetime.strptime(date, "%Y-%m-%d").replace(hour=23, minute=59, second=59, microsecond=999999, tzinfo=ZoneInfo(TZ))
+        start_ts = int(start.timestamp())
+        end_ts = int(end.timestamp())
+        df = df[(df["ts"] >= start_ts) & (df["ts"] <= end_ts)]
+    return df.reset_index(drop=True)
+
+
+@cache.memoize(ttl=120)
+async def get_r2_dataframe(r2_key: str, path: Path | None = None) -> pd.DataFrame:
+    # always use local file if it is less than 1 hour old
+    if isinstance(path, Path) and path.is_file() and datetime.now(UTC).timestamp() - path.stat().st_mtime < 3600:
+        logger.trace(f"Load {r2_key} from local file: {path.name}")
         return pd.read_parquet(path)
 
-    async def r2_to_dataframe(r2_key: str) -> pd.DataFrame:
-        logger.debug(f"Query {qtype} from R2: {date}")
-        parquet = await get_cf_r2(r2_key, rformat="bytes", silent=True)
-        if isinstance(parquet, bytes):
-            df = pd.read_parquet(BytesIO(parquet)).drop_duplicates()
+    parquet = await get_cf_r2(r2_key, rformat="bytes", silent=True)
+    if isinstance(parquet, bytes):
+        df = pd.read_parquet(BytesIO(parquet)).drop_duplicates()
+        if path is not None:
             path.parent.mkdir(parents=True, exist_ok=True)
             df.to_parquet(path, index=False, compression="brotli")
-            return df
-        return pd.DataFrame()
-
-    this_year = nowdt(TZ).strftime("%Y")
-    this_month = nowdt(TZ).strftime("%Y-%m")
-    # get from r2 for specific day / this year / this month
-    if len(date) == 10 or date in [this_year, this_month]:
-        return await r2_to_dataframe(r2_key)
-
-    # return local file if it exists
-    if path.is_file():
-        logger.trace(f"Load {qtype} {date} from local file: {path.name}")
-        return pd.read_parquet(path)
+        return df
+    return pd.DataFrame()
 
-    # get from r2 for other dates
-    logger.debug(f"Save {qtype} {date} to {path.name}")
-    return await r2_to_dataframe(r2_key)
+
+async def save_dataframe_to_r2(r2_key: str, df: pd.DataFrame):
+    buffer = BytesIO()
+    df.to_parquet(buffer, index=False, compression="brotli")
+    parquet_bytes = buffer.getvalue()
+    buffer.close()
+    await set_cf_r2(r2_key, parquet_bytes, mime_type="application/x-parquet", silent=True)
 
 
 def ts_to_liveday(ts: int, qtype: str) -> str:
@@ -132,27 +165,105 @@ def ts_to_liveday(ts: int, qtype: str) -> str:
 
 
 def ts_time(ts: int) -> str:
-    """将时间戳转换为时间."""
+    """将时间戳转换为时间 (格式: HH:MM:SS)."""
     dt = datetime.fromtimestamp(ts, tz=ZoneInfo(TZ))
     return dt.strftime("%H:%M:%S")
 
 
-@cache.memoize(ttl=600)
-async def get_liveinfo() -> list[dict]:
+def ts_with_url(ts: int, url: str) -> str:
+    """将时间戳添加到直播链接."""
+    dt = datetime.fromtimestamp(ts, tz=ZoneInfo(TZ))
+    start = dt.replace(hour=0, minute=0, second=0, microsecond=0)
+    seconds = int((dt - start).total_seconds())
+    return f"[{ts_time(ts)}]({url}&t={seconds})"
+
+
+@cache.memoize(ttl=300)
+async def r2_liveinfo() -> list[dict]:
     """获取直播信息."""
     return await get_cf_r2(DANMU.R2_PREFIX.rstrip("/") + "/liveinfo", silent=True)  # ty:ignore[invalid-return-type]
 
 
-async def live_date_info(ts: int, qtype: str) -> tuple[str, str]:
+@cache.memoize(ttl=300)
+async def r2_userinfo() -> pd.DataFrame:
+    """获取用户信息."""
+    return await get_r2_dataframe(DANMU.R2_PREFIX.rstrip("/") + "/userinfo")
+
+
+async def live_date_info(ts: int, qtype: str) -> tuple[str, str, str]:
     """将时间戳转换为直播日期信息.
 
     Returns:
         tuple[str, str]: (直播日期, 直播链接)
         Eg: ("2023-12-12", "[直播标题](https://...)")
     """
-    day = ts_to_liveday(ts, qtype)
-    live_info = await get_liveinfo()
-    titles = glom(live_info, f"{day}.titles", default=[])
-    urls = glom(live_info, f"{day}.urls", default=[])
-    markdown = [f"[{title}]({url})" for title, url in zip(titles, urls, strict=True)]
-    return day, "\n".join(markdown)
+    day = ts_to_liveday(ts, qtype)  # YYYY-MM-DD
+
+    def beautify(title: str, url: str) -> str:
+        title = re.sub(r"[((]202\d{5}[))]", "", title).strip()  # delete date and bracket
+        return f"**[【{day}】{title}]({url})**"
+
+    live_info = await r2_liveinfo()
+    title = glom(live_info, f"{day}.title", default="")
+    url = glom(live_info, f"{day}.url", default="")
+    return day, beautify(title, url), url
+
+
+@cache.memoize(ttl=60)
+async def get_uids_by_name(name: str, queried_names: set[str] | None = None) -> set[str]:
+    """Get uids by name."""
+    logger.debug(f"Querying name: {name}, queried_names: {queried_names}")
+    if queried_names is None:
+        queried_names = set()
+    queried_names.add(name.lower())
+    userinfo = await r2_userinfo()
+    uids = userinfo[userinfo["name"].str.lower() == name.lower()]["uid"].to_list()
+    if len(uids) <= 1:
+        logger.success(f"Found uid of {name}: {glom(uids, '0', default='')}")
+        return uids
+    logger.info(f"Found uids of {name}: {uids}")
+    # 递归查询
+    matched_uids = uids
+    for uid in map(str, uids):
+        logger.debug(f"Querying names of {uid}")
+        names = userinfo[userinfo["uid"].str.lower() == uid.lower()]["name"].to_list()
+        logger.info(f"Found names of {uid}: {names}")
+        for n in names:
+            if n.lower() in queried_names:
+                continue
+            logger.debug(f"Querying uid for name: {n}")
+            matched_uids.extend(await get_uids_by_name(n, queried_names))
+    return set(matched_uids)
+
+
+async def get_names_by_uids(uids: set[str] | list[str]) -> set[str]:
+    """Get names by uids."""
+    names = []
+    userinfo = await r2_userinfo()
+    for uid in uids:
+        names.extend(userinfo[userinfo["uid"].str.lower() == uid.lower()]["name"].to_list())
+    return {x.lower() for x in names}
+
+
+async def get_username_history(name: str) -> str:
+    """Get username history by name."""
+    texts = ""
+    history = pd.DataFrame()
+    uids = await get_uids_by_name(name)
+    userinfo = await r2_userinfo()
+    for uid in uids:
+        df = userinfo[userinfo["uid"].str.lower() == uid.lower()]
+        history = pd.concat([history, df], axis=0)
+    history = history.sort_values(by="first")
+
+    format_time = lambda ts: datetime.fromtimestamp(ts, tz=ZoneInfo(TZ)).strftime("%Y-%m-%d")
+
+    # 遍历每个组
+    for idx, (uid, group) in enumerate(history.groupby("uid", sort=False)):
+        if re.match(r"^UC[\w-]{22}$", str(uid)):
+            texts += f"\n{number_to_emoji(idx + 1)}**UID: [{uid}](https://www.youtube.com/channel/{uid})**\n"
+        else:
+            texts += f"\n{number_to_emoji(idx + 1)}**UID: {uid}\n"
+        for _, row in group.iterrows():
+            texts += f"**{row['name']}**: {format_time(row['first'])}➡️{format_time(row['last'])}\n"
+    return texts
src/danmu/server.py
@@ -1,140 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-from collections import defaultdict
-from datetime import UTC, datetime
-from decimal import Decimal
-from zoneinfo import ZoneInfo
-
-import anyio
-from loguru import logger
-
-from config import DANMU, DOWNLOAD_DIR, PROXY, TZ
-from danmu.utils import get_bearer_token, live_date
-from messages.progress import modify_progress
-from networking import hx_req
-from others.emoji import CURRENCY
-from utils import number
-
-
-async def query_server(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, qtype: str, **kwargs) -> dict:
-    """从远程数据库获取记录.
-
-    Returns:
-        {"paths": list[str], "count": int}
-    """
-    if not dates:
-        return {}
-    paths = []
-    total_count = 0
-    queried_dates = []
-    for date in sorted(dates, reverse=True):
-        api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
-        headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": date[:4]}
-        payload = {"limit": DANMU.NUM_PER_QUERY, "liveDate": date}
-        if user and qtype == "弹幕":
-            payload["authorName"] = user
-        if keyword:
-            payload |= {"message": keyword} if qtype == "弹幕" else {"content": keyword}
-        payload["page"] = 1
-        logger.debug(f"Query {qtype}: {payload}")
-        resp = await hx_req(api_url, "POST", headers=headers, data=payload, proxy=PROXY.DANMU, check_kv={"code": 0}, silent=True)
-        queried_dates.append(date)
-        parsed = await parse_from_server(resp.get("data", []), user, keyword, super_chats, qtype)
-        count = parsed.get("count", 0)
-        if count == 0:
-            continue
-        total_count += count
-        texts = parsed.get("texts", "")
-        await modify_progress(text=caption + f"\n🔍查询时间: {'、'.join(queried_dates)}\n⏳匹配{qtype}数: {total_count}", force_update=True, **kwargs)
-        while len(resp.get("data", [])) == payload["limit"] and parsed.get("count", 0):
-            payload["page"] += 1
-            logger.debug(f"Query {qtype}: {payload}")
-            resp = await hx_req(api_url, "POST", headers=headers, data=payload, proxy=PROXY.DANMU, check_kv={"code": 0}, silent=True)
-            parsed = await parse_from_server(resp.get("data", []), user, keyword, super_chats, qtype)
-            total_count += parsed.get("count", 0)
-            texts += parsed.get("texts", "")
-            await modify_progress(text=caption + f"\n⏳已匹配 {total_count} 条{qtype}", **kwargs)
-        del parsed
-        save_path = f"{DOWNLOAD_DIR}/{user}-{date}-{keyword}-{qtype}.txt"
-        async with await anyio.open_file(save_path, "w") as f:
-            await f.write(texts.strip())
-        paths.append(save_path)
-    return {"paths": paths, "count": total_count}
-
-
-async def parse_from_server(data: list[dict], user: str, keyword: str, super_chats: defaultdict, qtype: str) -> dict:
-    """解析从远程数据库获取的记录.
-
-    日期从新到旧, 数据从旧到新
-    """
-    texts = ""
-    count = 0
-    text_key = "message" if qtype == "弹幕" else "content"
-    if keyword:
-        data = [x for x in data if keyword in x.get(text_key, "")]
-    # deduplicate
-    added = set()
-    deduplicated = []
-    for x in data:
-        if f"{x.get('message')}{x.get('content')}{x.get('liveDate')}{x.get('startTime')}{x.get('authorName')}" not in added:
-            deduplicated.append(x)
-            added.add(f"{x.get('message')}{x.get('content')}{x.get('liveDate')}{x.get('startTime')}{x.get('authorName')}")
-    data = deduplicated
-
-    logged_date = []
-    if qtype == "发言":
-        data = [x for x in data if x.get("liveDate") and x.get("startTime")]
-        # group data by dates, and normalize startTime
-        grouped_data = defaultdict(list)
-        for x in data:
-            start_time = x["startTime"].split(",")[0]  # 3:53:23 or 03:53:23
-            if len(start_time.split(":")[0]) == 1:
-                x["startTime"] = f"0{start_time}"  # 03:53:23
-            else:
-                x["startTime"] = start_time
-            grouped_data[x["liveDate"][:10]].append(x)
-
-        for norm_date, date_data in sorted(grouped_data.items(), reverse=True):  # 日期从新到旧
-            for x in sorted(date_data, key=lambda x: x["startTime"]):  # 数据从旧到新
-                # only show the day once
-                if norm_date not in logged_date:
-                    stream_date = await live_date(norm_date)
-                    day = f"\n开播日期: {stream_date}\n"
-                    logged_date.append(norm_date)
-                else:
-                    day = ""
-                texts += f"\n{day}{x['startTime']}: {x['content'].strip()}"
-                count += 1
-    else:
-        hide_name = bool(user)  # 当指定过滤user时, 隐藏用户名
-        if user:
-            data = [x for x in data if x.get("authorName", "") == user]
-        if keyword:
-            data = [x for x in data if keyword in x.get("message", "")]
-        data = [x for x in data if x.get("timestamp")]  # ensure timestamp
-        # group data by dates, and normalize startTime
-        grouped_data = defaultdict(list)
-        for x in data:
-            grouped_data[x["liveDate"][:10]].append(x)
-
-        for norm_date, date_data in sorted(grouped_data.items(), reverse=True):  # 日期从新到旧
-            for x in sorted(date_data, key=lambda x: x["timestamp"]):  # 数据从旧到新
-                dt = datetime.fromtimestamp(x["timestamp"] / 1000000, tz=UTC).astimezone(ZoneInfo(TZ))
-                sc_amount = ""
-                if super_chat := x.get("scAmount"):
-                    currency, amount = super_chat.split(" ")
-                    super_chats[currency] += Decimal(amount)
-                    sc_amount = f" ({CURRENCY[currency]}{currency} {number(amount)})" if currency in CURRENCY else ""
-                msg = x.get("message", "")
-                username = "" if hide_name else "|" + x.get("authorName", "")
-                # only show the day once
-                if norm_date not in logged_date:
-                    stream_date = await live_date(norm_date)
-                    day = f"\n开播日期: {stream_date}\n"
-                    logged_date.append(norm_date)
-                else:
-                    day = ""
-                texts += f"\n{day}{dt:%H:%M:%S}{username}{sc_amount}: {msg}"
-                count += 1
-
-    return {"texts": texts.rstrip(), "count": count}
src/danmu/sync.py
@@ -7,12 +7,14 @@ from collections import defaultdict
 from datetime import datetime, timedelta
 from zoneinfo import ZoneInfo
 
+import pandas as pd
 from glom import flatten, glom
 from loguru import logger
 
 from config import DANMU, PROXY, TZ, cutter
-from danmu.utils import TURSO_KWARGS, convert_parquet, get_bearer_token, get_live_info, merge_parquet
-from database.r2 import get_cf_r2, list_cf_r2, set_cf_r2
+from danmu.r2 import get_r2_dataframe, query_r2_for_date, r2_liveinfo, save_dataframe_to_r2
+from danmu.utils import TURSO_KWARGS, convert_to_dataframe, get_bearer_token
+from database.r2 import get_cf_r2, set_cf_r2
 from database.turso import insert_statement, turso_create_table, turso_exec, turso_parse_resp
 from networking import hx_req
 from utils import nowdt, strings_list
@@ -272,107 +274,112 @@ async def sync_server_to_r2(qtype: str) -> None:
     prefix = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/"
     api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
 
-    async def batch_sync(new_dates: dict[str, list[str]]):
-        for r2_key, server_dates in sorted(new_dates.items()):
-            results = []
-            for date in sorted(server_dates):
-                logger.trace(f"Query {qtype} date: {date}")
-                headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": date[:4]}
-                payload = {"page": 1, "limit": DANMU.NUM_PER_QUERY, "liveDate": date}
-                resp = await hx_req(api_url, "POST", headers=headers, data=payload, proxy=PROXY.DANMU, check_kv={"code": 0}, silent=True)
-                if resp.get("count", 0) == 0:
-                    continue
-                logger.trace(f"Found {qtype} date: {date} - {resp['count']} results")
-                # append the first page
-                results.extend(resp["data"])
-                quotient, remainder = divmod(resp["count"], DANMU.NUM_PER_QUERY)
-                n_pages = quotient + (1 if remainder else 0)
-                tasks = [
-                    hx_req(
-                        api_url,
-                        "POST",
-                        headers=headers,
-                        data={"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": date},
-                        proxy=PROXY.DANMU,
-                        check_kv={"code": 0},
-                        silent=True,
-                    )
-                    for page in range(2, n_pages + 1)
-                ]
-                chunks = [tasks[i : i + concurrency] for i in range(0, len(tasks), concurrency)]
-                for chunk in chunks:
-                    async with asyncio.Semaphore(concurrency):
-                        res = await asyncio.gather(*chunk, return_exceptions=True)
-                        data = flatten(glom(res, "*.data"))
-                        results.extend(data)
-            # if no results, create an empty key
-            metadata = {"empty": True} if not results else {}
-            await set_cf_r2(prefix + r2_key, convert_parquet(results, qtype, date), metadata, mime_type="application/x-parquet", silent=True)
+    async def batch_fetch(new_dates: set[str]) -> pd.DataFrame:
+        results = []
+        for date in sorted(new_dates):
+            logger.trace(f"Query {qtype} date: {date}")
+            headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": date[:4]}
+            payload = {"page": 1, "limit": DANMU.NUM_PER_QUERY, "liveDate": date}
+            resp = await hx_req(api_url, "POST", headers=headers, data=payload, proxy=PROXY.DANMU, check_kv={"code": 0}, silent=True)
+            if resp.get("count", 0) == 0:
+                continue
+            logger.debug(f"Found {qtype} date: {date} - {resp['count']} results")
+            # append the first page
+            results.extend(resp["data"])
+            quotient, remainder = divmod(resp["count"], DANMU.NUM_PER_QUERY)
+            n_pages = quotient + (1 if remainder else 0)
+            tasks = [
+                hx_req(
+                    api_url,
+                    "POST",
+                    headers=headers,
+                    data={"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": date},
+                    proxy=PROXY.DANMU,
+                    check_kv={"code": 0},
+                    silent=True,
+                )
+                for page in range(2, n_pages + 1)
+            ]
+            chunks = [tasks[i : i + concurrency] for i in range(0, len(tasks), concurrency)]
+            for chunk in chunks:
+                async with asyncio.Semaphore(concurrency):
+                    res = await asyncio.gather(*chunk, return_exceptions=True)
+                    data = flatten(glom(res, "*.data"))
+                    results.extend(data)
+        return convert_to_dataframe(results, qtype)
 
     now = nowdt(TZ)
-    if qtype == "弹幕" and (now.hour <= 4 or now.hour >= 20):
-        # 忽略晚间的直播弹幕 (实时录制, 应该等结束后再Sync)
-        return
-
     monitor_years = strings_list(DANMU.SYNC_DANMU_YEARS) if qtype == "弹幕" else strings_list(DANMU.SYNC_FAYAN_YEARS)
     monitor_years.append(str(now.year))
     monitor_years = sorted(set(monitor_years))
-    r2 = await list_cf_r2(prefix=prefix)
-    r2_dates = {x.removeprefix(prefix) for x in glom(r2, "Contents.*.Key", default=[])}
-    has_update = False
+    finished_key = DANMU.R2_PREFIX.rstrip("/") + "/finished"
+    liveinfo_key = DANMU.R2_PREFIX.rstrip("/") + "/liveinfo"
+    userinfo_key = DANMU.R2_PREFIX.rstrip("/") + "/userinfo"
+    finished_info = await get_cf_r2(finished_key, silent=True)
+    live_info = await r2_liveinfo()
+    user_info = await get_r2_dataframe(userinfo_key) if qtype == "弹幕" else pd.DataFrame()
+    r2_dates = set(finished_info.get(qtype, []))
     for year in monitor_years:
         headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": year}
         params = {"liveDate": year} if qtype == "弹幕" else {"srtCount": 1, "liveDate": year}
         api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
-        resp = await hx_req(api, headers=headers, proxy=PROXY.DANMU, params=params, silent=True)
-        new_dates_map = defaultdict(list)
-        # some live dates on server are "2025-04-14_01", "2025-04-14_02", ...
-        # norm dates to YYYY-MM-DD
-        for x in resp:
+        server_live_info = await hx_req(api, headers=headers, proxy=PROXY.DANMU, params=params, silent=True)
+        new_dates = set()
+        for x in server_live_info:
             live_date: str = x["liveDate"]  # type: ignore
-            if live_date[:10] in r2_dates:
+            if live_date in r2_dates:
                 continue
-            if qtype == "弹幕" and live_date[:10] == f"{now:%Y-%m-%d}":  # ignore today's live
-                continue
-            new_dates_map[live_date[:10]].append(live_date)
-        if not new_dates_map:
+            new_dates.add(live_date)
+        if not new_dates:
             continue
-        has_update = True
-        # sync per date
-        logger.debug(f"Sync {qtype}-{year}: {new_dates_map}")
-        await batch_sync(new_dates_map)
-        r2_dates.update(new_dates_map.keys())  # update new dates to R2 dates
-
-        # sync whole month
-        year_months = {x[:7] for x in new_dates_map}
-        for year_month in sorted(year_months):
-            month_data = []
-            for d in range(1, 32):
-                date = f"{year_month}-{d:02d}"
-                if date not in r2_dates:
+        r2 = await query_r2_for_date(year, qtype)
+        logger.debug(f"Sync {qtype}-{year}: {new_dates}")
+        df = await batch_fetch(new_dates)
+        if len(df) > 0:
+            # save parquet
+            df = pd.concat([r2, df], ignore_index=True).sort_values(by=["ts"]).drop_duplicates()
+            if len(df) != len(r2):
+                logger.info(f"Saving {len(df) - len(r2)} records to {qtype}/{year}")
+                await save_dataframe_to_r2(prefix + year, df)
+            r2_dates |= new_dates
+            if 4 < now.hour < 20:  # only mark as finished during day time
+                finished_info[qtype] = sorted(r2_dates)
+                await set_cf_r2(finished_key, finished_info, silent=True)
+
+            # save liveinfo
+            params_2 = {"liveDate": year} if qtype != "弹幕" else {"srtCount": 1, "liveDate": year}
+            server_live_info_2 = await hx_req(api, headers=headers, proxy=PROXY.DANMU, params=params_2, silent=True)
+            info_1: dict = {x["liveDate"]: x for x in server_live_info}  # ty:ignore[invalid-argument-type]
+            info_2: dict = {x["liveDate"]: x for x in server_live_info_2}  # ty:ignore[invalid-argument-type]
+            keys = set(info_1) | set(info_2)
+            for key in keys:
+                live_info[key] = trim_empty(info_1.get(key, {}) | info_2.get(key, {}))
+
+            # handle dates like "2022-04-07_t", rename to "2022-04-07"
+            for key in keys:
+                if len(key) == 10:
                     continue
-                logger.trace(f"Get R2 date: {date}")
-                if r2 := await get_cf_r2(prefix + date, rformat="bytes", silent=True):
-                    month_data.append(r2)
-            await set_cf_r2(prefix + year_month, merge_parquet(month_data), mime_type="application/x-parquet", silent=True)
-            del month_data
-        r2_dates.update(year_months)  # update new dates to R2 dates
-
-        # sync whole year
-        year_data = []
-        for mon in range(1, 13):
-            date = f"{year}-{mon:02d}"
-            if date not in r2_dates:
+                if key[:10] not in live_info:
+                    live_info[key[:10]] = live_info[key]
+                del live_info[key]
+            await set_cf_r2(liveinfo_key, live_info, silent=True)
+
+            # save userinfo
+            if qtype != "弹幕":
                 continue
-            logger.trace(f"Get R2 date: {date}")
-            if r2 := await get_cf_r2(prefix + date, rformat="bytes", silent=True):
-                year_data.append(r2)
-        await set_cf_r2(prefix + year, merge_parquet(year_data), mime_type="application/x-parquet", silent=True)
-        del year_data
-
-    # sync liveinfo
-    if has_update:
-        liveinfo = {}
-        for year in monitor_years:
-            liveinfo.update(await get_live_info(year))
-        await set_cf_r2(DANMU.R2_PREFIX.rstrip("/") + "/liveinfo", liveinfo, silent=True)
+            # 对 ts 列进行聚合操作:求最小值(min)命名为 first,求最大值(max)命名为 last
+            userinfo_this_year = df[["uid", "ts", "name"]].groupby(["uid", "name"], as_index=False).agg(first=("ts", "min"), last=("ts", "max"))
+            user_info = pd.concat([userinfo_this_year, user_info]).groupby(["uid", "name"], as_index=False).agg(first=("first", "min"), last=("last", "max"))
+            user_info = user_info.sort_values(by=["last"], ascending=False).reset_index(drop=True)
+            await save_dataframe_to_r2(userinfo_key, user_info)
+
+
+def trim_empty(obj: dict) -> dict:
+    res = {}
+    for k, v in obj.items():
+        if v is None:
+            continue
+        if isinstance(v, str) and v.strip() == "":
+            continue
+        res[k] = v
+    return res
src/danmu/utils.py
@@ -2,14 +2,14 @@
 # -*- coding: utf-8 -*-
 from datetime import datetime
 from decimal import Decimal
-from io import BytesIO
-from pathlib import Path
 from zoneinfo import ZoneInfo
 
 import pandas as pd
 from glom import glom
+from pyrogram.client import Client
+from pyrogram.parser.markdown import Markdown
 
-from config import DANMU, DB, DOWNLOAD_DIR, PROXY, TOKEN, TZ, cache
+from config import DANMU, DB, PROXY, TOKEN, TZ, cache
 from networking import hx_req
 from price.coinmarketcap import get_cmc_fiat
 
@@ -53,8 +53,8 @@ def date_str(date: str) -> str:
     return date_name
 
 
-def convert_parquet(data: list[dict], qtype: str, date: str) -> bytes:
-    """Convert to parquet file.
+def convert_to_dataframe(data: list[dict], qtype: str) -> pd.DataFrame:
+    """Convert to dataframe.
 
     发言: {'content': '你好',
            'endTime': '00:04:28,399',
@@ -80,91 +80,36 @@ def convert_parquet(data: list[dict], qtype: str, date: str) -> bytes:
         'timestamp': 1640799880050853}
     """
     if not data:
-        return b""
+        return pd.DataFrame()
     data_list = []
     if qtype == "发言":
         for x in data:
-            if not all([x.get("startTime"), x.get("content")]):
+            if not all([x.get("startTime"), x.get("content"), x.get("liveDate")]):
                 continue
             item = {}
             ts = x["startTime"]  # 3:53:23,123
             ts = ts.split(",")[0]  # 3:53:23
             if len(ts.split(":")[0]) == 1:
                 ts = f"0{ts}"  # 03:53:23
-            timestamp = datetime.strptime(f"{date[:10]} {ts}", "%Y-%m-%d %H:%M:%S").astimezone(ZoneInfo(TZ)).timestamp()
+            timestamp = datetime.strptime(f"{x['liveDate'][:10]} {ts}", "%Y-%m-%d %H:%M:%S").astimezone(ZoneInfo(TZ)).timestamp()
             item["ts"] = round(timestamp)
             item["content"] = x["content"]
             data_list.append(item)
-        df = pd.DataFrame(data_list).astype({"ts": int, "content": str})
-
-    else:  # 弹幕
-        for x in data:
-            item = {}
-            if not all([x.get("timestamp"), x.get("authorName"), x.get("authorId")]):
-                continue
-            item["uid"] = x["authorId"]
-            item["ts"] = round(x["timestamp"] / 1000000)
-            item["name"] = x["authorName"].lstrip("@")
-            item["content"] = x.get("message") or ""
-            item["superchat"] = x.get("scAmount") or ""
-            data_list.append(item)
-        df = pd.DataFrame(data_list).astype({"uid": str, "ts": int, "name": str, "content": str, "superchat": str})
-    buffer = BytesIO()
-    df.drop_duplicates().to_parquet(buffer, index=False, compression="brotli")
-    parquet_bytes = buffer.getvalue()
-    buffer.close()
-    return parquet_bytes
-
-
-def merge_parquet(parquets: list[bytes]) -> bytes:
-    """Merge parquet bytes."""
-    if not parquets:
-        return b""
-    dataframes = [pd.read_parquet(BytesIO(parquet)) for parquet in parquets]
-    df = pd.concat(dataframes, ignore_index=True).sort_values(by=["ts"]).drop_duplicates()
-    buffer = BytesIO()
-    df.drop_duplicates().to_parquet(buffer, index=False, compression="brotli")
-    parquet_bytes = buffer.getvalue()
-    buffer.close()
-    return parquet_bytes
-
-
-def merge_txt_files(paths: list[str], dates: list[str], user: str, keyword: str, qtype: str, header_tips: str) -> list[str]:
-    """Merge multiple txt files into one."""
-    if not paths:
-        return []
-    dates = sorted(dates)
-    paths = sorted(paths)
-    dates = [name for name in dates if any(name in path for path in paths)]
-    if len(dates) == 1:
-        date_name = dates[0]
-    elif all(len(x) == 10 for x in dates):  # all days  (不存在此情况)
-        date_name = dates[0][:7]
-    elif all(len(x) == 7 for x in dates):  # all months
-        date_name = dates[0][:4]
-    elif all(len(x) == 4 for x in dates):  # all years
-        date_name = dates[0] if len(set(dates)) == 1 else ""
-    else:
-        date_name = ""
-    keyword = f"“{keyword}”" if keyword else ""
-
-    if file_bytes(paths) < 10 * 1024 * 1024:  # 10 MB
-        texts = header_tips + "\n\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip()
-        save_path = f"{DOWNLOAD_DIR}/【{qtype}】{user}-{date_str(date_name)}-{keyword}.txt".replace("--", "-").replace("】-", "】").replace("-.txt", ".txt")
-        Path(save_path).write_text(texts)
-        return [save_path]
-
-    num_merge = 2
-    date_chunks = [dates[i : i + num_merge] for i in range(0, len(dates), num_merge)]
-    path_chunks = [paths[i : i + num_merge] for i in range(0, len(paths), num_merge)]
-    final_paths = []
-    for date_chunk, path_chunk in zip(date_chunks, path_chunks, strict=True):
-        texts = header_tips + "\n\n".join([Path(p).read_text() for p in sorted(path_chunk, reverse=True)]).strip()
-        date_name = "&".join(date_chunk)
-        save_path = f"{DOWNLOAD_DIR}/【{qtype}】{user}-{date_name}-{keyword}.txt".replace("--", "-").replace("】-", "】").replace("-.txt", ".txt")
-        Path(save_path).write_text(texts)
-        final_paths.append(save_path)
-    return final_paths
+        return pd.DataFrame(data_list).astype({"ts": int, "content": str}).drop_duplicates()
+
+    # 弹幕
+    for x in data:
+        item = {}
+        if not all([x.get("timestamp"), x.get("authorName"), x.get("authorId")]):
+            continue
+        item["uid"] = x["authorId"]
+        item["ts"] = round(x["timestamp"] / 1000000)
+        item["name"] = x["authorName"].lstrip("@")
+        item["content"] = x.get("message") or ""
+        item["superchat"] = x.get("scAmount") or ""
+        data_list.append(item)
+    df = pd.DataFrame(data_list).astype({"uid": str, "ts": int, "name": str, "content": str, "superchat": str})
+    return df.drop_duplicates()
 
 
 @cache.memoize(ttl=86400 * 7)
@@ -209,8 +154,7 @@ async def live_date(date: str) -> str:
     return texts.rstrip()
 
 
-def file_bytes(paths: list[str] | str) -> int:
-    """Get file size in bytes."""
-    if isinstance(paths, str):
-        paths = [paths]
-    return sum([Path(path).stat().st_size for path in paths])
+async def count_entities(client: Client, markdown: str) -> int:
+    """Count the number of entities in markdown."""
+    parsed = await Markdown(client).parse(markdown, strict=True)
+    return len(glom(parsed, "entities", default=[]))