Commit 26574d1

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-06-02 03:46:13
refactor(danmu): re-organize danmu files
1 parent 6ef4460
src/danmu/entrypoint.py
@@ -0,0 +1,226 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import re
+from collections import defaultdict
+from datetime import timedelta
+from decimal import Decimal
+from pathlib import Path
+
+from glom import flatten
+from pyrogram.client import Client
+from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM, BLOCKQUOTE_EXPANDABLE_END_DELIM
+from pyrogram.types import Message
+
+from config import DANMU, PREFIX, TZ
+from danmu.r2 import query_r2
+from danmu.server import query_server
+from danmu.utils import file_bytes, merge_txt_files, to_usd
+from llm.utils import convert_html
+from messages.parser import parse_msg
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from messages.utils import blockquote, equal_prefix, smart_split, startswith_prefix
+from others.emoji import CURRENCY
+from publish import publish_telegraph
+from utils import nowdt, number
+
# Help text shown when the command is sent with no arguments.
# The Chinese body below is user-facing runtime content — do not edit casually.
HELP = f"""📖**查询直播合订本**
`{PREFIX.DANMU}` 使用说明:
1.`{PREFIX.DANMU} + 日期`
2.`{PREFIX.DANMU} + @用户名` (区分大小写, 且要去除空格)
3.`{PREFIX.DANMU} + 关键词` (区分大小写)
4.以上可组合使用, 但日期必须放前面, 关键词必须放后面
示例:
{BLOCKQUOTE_EXPANDABLE_DELIM}- `{PREFIX.DANMU} 2025`: 查询2025年的弹幕
- `{PREFIX.DANMU} @张三`: 查询用户【张三】的弹幕
- `{PREFIX.DANMU} 你好`: 查询包含“你好”关键词的弹幕
- `{PREFIX.DANMU} 2025-01 @张三`: 查询2025年1月份用户【张三】的弹幕
- `{PREFIX.DANMU} 2025-01-01 你好`: 查询2025-01-01日包含“你好”的弹幕
- `{PREFIX.DANMU} 2025 @张三 你好`: 查询2025年用户【张三】包含“你好”的弹幕

注意:
- 日期为开播日期, 不是弹幕发送日期 (过了凌晨也算前一天)
- 如果用户名中有空格, 请去除空格。例如: 想指定用户为John Doe请使用 `@JohnDoe`
{BLOCKQUOTE_EXPANDABLE_END_DELIM}
`{PREFIX.FAYAN}` 用法类似, 但查询的是**【{DANMU.STREAMER}】**直播语录。
额外需注意的是 `{PREFIX.FAYAN}` 命令会忽略指定的 `@用户名`
"""

# Header tips prepended to exported result text (see query_danmu / merge_txt_files).
DANMU_TIPS = f"时间点为{TZ}时区"  # danmu timestamps are wall-clock in the configured TZ
FAYAN_TIPS = "时间点并非真实时间, 而是相对开播时长"  # transcript timestamps are offsets from stream start
+
+
async def query_danmu(client: Client, message: Message, **kwargs):
    """Handle a danmu/fayan query command: parse it, fetch matching records
    from R2 and/or the remote server, then reply with text, files, and an
    optional Telegraph instant view.

    Args:
        client: Pyrogram client used for all outgoing messages.
        message: the incoming command message.
        **kwargs: forwarded to send2tg / modify_progress; a "progress" entry
            (the status message) is injected below.
    """
    info = parse_msg(message)
    if not startswith_prefix(info["text"], prefix=[PREFIX.DANMU, PREFIX.FAYAN]):
        return
    # NOTE(review): this check reads message.text while the one above reads the
    # parsed info["text"] — confirm intentional (message.text may be None for captions)
    if equal_prefix(message.text, prefix=[PREFIX.DANMU, PREFIX.FAYAN]):
        await send2tg(client, message, texts=HELP, **kwargs)
        return
    if not DANMU.BASE_URL:
        await send2tg(client, message, texts="⚠️请联系管理员配置API地址", **kwargs)
        return

    # "弹幕" = viewer chat, "发言" = streamer transcript
    qtype = "弹幕" if startswith_prefix(info["text"], prefix=[PREFIX.DANMU]) else "发言"
    match_time, user, keyword, error = parse_queries(info["text"], qtype)
    if error:
        await send2tg(client, message, texts=error, **kwargs)
        return

    # transcript queries always target the streamer; any @user filter is ignored
    user = user if qtype == "弹幕" else DANMU.STREAMER

    caption = f"📖**{qtype}记录**:"
    if match_time:
        caption += f"\n🕒日期: {match_time}"
    caption += f"\n👤用户: {user}"
    caption += f"\n🔤关键词: {keyword}"

    # the first sent message becomes the live progress/status message
    status_msg = (await send2tg(client, message, texts=caption, **kwargs))[0]
    kwargs["progress"] = status_msg

    resp = {}
    paths = []
    count = 0
    super_chats = defaultdict(Decimal)  # {"currency": amount}, mutated by the query helpers
    engine_dates = await get_engine_with_dates(match_time, user, keyword, qtype)
    # sorted(reverse=True) puts "server" before "r2" so fresh data is fetched first
    for engine, dates in sorted(engine_dates.items(), reverse=True):
        if engine == "r2":
            resp = await query_r2(dates, user, keyword, caption, super_chats, qtype, **kwargs)
        else:
            resp = await query_server(dates, user, keyword, caption, super_chats, qtype, **kwargs)
        paths.extend(resp.get("paths", []))
        count += resp.get("count", 0)
    if count == 0:
        await modify_progress(text=caption + f"\n⚠️未匹配任何{qtype}", force_update=True, **kwargs)
        return

    header = f"📖**{qtype}记录 ({user})**:" if user else f"📖**{qtype}记录**:"
    profit = ""
    profit_usd = 0
    if info["ctype"] not in ["GROUP", "SUPERGROUP"]:  # do not show the SuperChat board in groups
        for currency, amount in sorted(super_chats.items()):
            profit += f"\n{CURRENCY[currency]}**{currency}**: {number(amount)}" if currency in CURRENCY else ""
            profit_usd += amount * (await to_usd(currency)) if currency in CURRENCY else Decimal()
        # if only "USD" ccy, do not include total USD
        super_chats.pop("USD", None)  # remove "USD"
        if profit_usd > 0 and super_chats:
            profit += f"\n💵**总计**: {profit_usd:.2f} USD"

    tips = f"{DANMU_TIPS}\n" if qtype == "弹幕" else f"{FAYAN_TIPS}\n"
    if file_bytes(paths) < 20480:  #  20 KB, short length, try send as message directly
        danmu = tips + "\n\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip()
        final = f"{header}{profit}\n{blockquote(danmu)}"
        if len(await smart_split(final)) == 1:  # fits in one Telegram message
            await modify_progress(message=status_msg, text=final, force_update=True, **kwargs)
            return
    caption += f"\n#️⃣{qtype}数: {count}"
    caption += f"\n\n🎉**SuperChat**:{profit}" if profit else ""
    dates: list[str] = flatten(engine_dates.values())  # type: ignore
    merged_paths = merge_txt_files(paths, dates, user, keyword, qtype, tips)
    media = [{"document": path} for path in sorted(merged_paths)]
    # less than 200K, add instant view
    if file_bytes(merged_paths) < 204800:
        texts = "\n".join([Path(path).read_text() for path in sorted(merged_paths, reverse=True)]).strip()
        if telegraph_url := await publish_telegraph(title=f"【{qtype}】{user}{match_time} {keyword}", html=convert_html(texts), author=user):
            caption += f"\n⚡️[即时预览]({telegraph_url})"
    await send2tg(client, message, texts=caption, media=media, **kwargs)
    # clean up both the per-date and the merged temp files
    [Path(path).unlink(missing_ok=True) for path in paths]
    [Path(path).unlink(missing_ok=True) for path in merged_paths]
    await modify_progress(message=status_msg, del_status=True, **kwargs)
+
+
def parse_queries(texts: str, qtype: str) -> tuple[str, str, str, str]:
    """Parse date / @user / keyword out of a user's query text.

    Args:
        texts: raw message text, starting with the DANMU or FAYAN prefix.
        qtype: "弹幕" or "发言"; a "发言" query discards any @user filter.

    Returns:
        (match_time, user, keyword, error) — when nothing could be parsed the
        first three are empty and `error` holds a user-facing message.
    """
    match_time = ""
    user = ""
    keyword = ""
    texts = texts.replace(PREFIX.FAYAN, PREFIX.DANMU)  # unify prefix
    # fix: the configured prefix may contain regex metacharacters (e.g. "."),
    # so it must be escaped before being interpolated into a pattern
    prefix = re.escape(PREFIX.DANMU)
    # 2025-01-01
    if matched := re.match(rf"^{prefix}" + r"\s+(\d{4}-\d{2}-\d{2})(\s+)?", texts):  # noqa: SIM114
        match_time = matched.group(1)
    # 2025-01
    elif matched := re.match(rf"^{prefix}" + r"\s+(\d{4}-\d{2})(\s+)?", texts):  # noqa: SIM114
        match_time = matched.group(1)
    # 2025
    elif matched := re.match(rf"^{prefix}" + r"\s+(\d{4})(\s+)?", texts):
        match_time = matched.group(1)

    # remove prefix + date (match_time is only digits/hyphens, safe to interpolate)
    texts = re.sub(rf"^{prefix}\s+{match_time}", "", texts).lstrip()

    # @张三 你好
    # @张三
    if matched := re.match(r"^@(\w+)(\s+)?", texts):
        user = matched.group(1)

    # remove user (user matched \w+, so it contains no regex metacharacters)
    keyword = re.sub(rf"^@{user}", "", texts).lstrip()

    if qtype == "发言":
        user = ""

    if not any((match_time, user, keyword)):
        return "", "", "", f"查询格式有误, 请发送{PREFIX.DANMU}或{PREFIX.FAYAN}命令查看帮助"
    return match_time, user, keyword, ""
+
+
async def get_engine_with_dates(match_time: str, user: str = "", keyword: str = "", qtype: str = "弹幕") -> dict[str, list[str]]:
    """Pick the query engine(s) and the dates each one should serve.

    For danmu (弹幕) the decision depends on date granularity:
        no date    -> server
        YYYY-MM-DD -> R2, unless it is today/yesterday (then server)
        YYYY-MM    -> R2, unless it is the current month (then server)
        YYYY       -> with a user/keyword filter: server;
                      current year: this month from server, earlier months from R2;
                      otherwise twelve monthly R2 queries — a whole unfiltered
                      year in one file can OOM small-memory VPSes, so it is
                      split per month and later merged two months per file.

    For transcripts (发言): R2 when a date is given, otherwise server.

    Returns:
        dict: {"server": [date-1, date-2], "r2": [date-3, date-4]}
    """
    now = nowdt(TZ)
    raw_years = DANMU.R2_SYNC_DANMU_YEARS if qtype == "弹幕" else DANMU.R2_SYNC_FAYAN_YEARS
    year_set = {y.strip() for y in raw_years.split(",") if y.strip()}
    year_set.add(str(now.year))
    allowed_years = sorted(year_set)

    if qtype == "发言":
        return {"r2": [match_time]} if match_time else {"server": allowed_years}

    # everything below handles danmu queries
    if not match_time:
        return {"server": allowed_years}

    cur_year = now.year
    cur_month = f"{now.month:02d}"

    # YYYY-MM-DD
    if len(match_time) == 10:
        recent = {now.strftime("%Y-%m-%d"), (now - timedelta(days=1)).strftime("%Y-%m-%d")}
        return {"server": [match_time]} if match_time in recent else {"r2": [match_time]}

    # YYYY-MM
    if len(match_time) == 7:
        this_month = f"{cur_year}-{cur_month}"
        return {"server": [this_month]} if match_time == this_month else {"r2": [match_time]}

    # YYYY
    if user or keyword:
        return {"server": [match_time]}
    if match_time == str(cur_year):  # current year
        # this month from server, all earlier months from R2
        past_months = [f"{cur_year}-{m:02d}" for m in range(1, now.month)]
        return {"server": [f"{cur_year}-{cur_month}"], "r2": past_months}  # type: ignore
    # a past year: fetch all twelve months from R2
    return {"r2": [f"{match_time}-{m:02d}" for m in range(1, 13)]}
src/danmu/r2.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from collections import defaultdict
+from datetime import UTC, datetime
+from decimal import Decimal
+from zoneinfo import ZoneInfo
+
+import anyio
+from loguru import logger
+
+from config import DANMU, DOWNLOAD_DIR, TZ
+from danmu.utils import live_date
+from database import get_cf_r2
+from messages.progress import modify_progress
+from others.emoji import CURRENCY
+from utils import number
+
+
async def query_r2(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, qtype: str, **kwargs) -> dict:
    """Fetch records from R2, one txt file per date that has matches.

    Dates are processed newest-first; records inside a file run oldest-first.

    Args:
        dates: R2 keys to read (day, month, or year granularity).
        user: author filter (danmu only).
        keyword: substring filter on the message text.
        caption: progress-message header updated while matching.
        super_chats: shared {currency: amount} accumulator, mutated by the parser.
        qtype: "弹幕" or "发言".
        **kwargs: forwarded to modify_progress.

    Returns:
        {"paths": list[str], "count": int} — empty dict when `dates` is empty.
    """
    if not dates:
        return {}
    prefix = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/"
    total_count = 0
    paths = []
    for date in sorted(dates, reverse=True):  # newest date first
        logger.debug(f"Get {qtype} from R2: {date}")
        data: dict[str, list[dict]] = await get_cf_r2(prefix + date, silent=True)
        if not data:
            continue
        parsed = await parse_from_r2(data, user, keyword, super_chats, qtype)
        count = parsed.get("count", 0)
        if count == 0:
            continue
        total_count += count
        texts = parsed.get("texts", "")
        await modify_progress(text=caption + f"\n⏳已匹配 {total_count} 条{qtype}", force_update=True, **kwargs)
        del parsed  # release the parsed dict before writing potentially large text
        save_path = f"{DOWNLOAD_DIR}/{user}-{date}-{keyword}-{qtype}.txt"
        async with await anyio.open_file(save_path, "w") as f:
            await f.write(texts.strip())
        paths.append(save_path)
    return {"paths": paths, "count": total_count}
+
+
async def parse_from_r2(data: dict[str, list[dict]], user: str, keyword: str, super_chats: defaultdict, qtype: str) -> dict:
    """Render simplified R2 records into display text.

    Dates are emitted newest-first; records within a date oldest-first.
    Field meaning is defined by `simplify_json`:

    发言: {"t": "00:04:27", "m": "你好"}
    弹幕: {"u": "User", "s": 1640799880, "m": "你好", "p": "USD 100"}

    Mutates `super_chats` with any SuperChat amounts found.

    Returns:
        {"texts": str, "count": int}
    """
    # ruff: noqa: PLW2901
    lines: list[str] = []
    count = 0
    hide_name = bool(user)  # hide usernames when a user filter is active
    sort_key = "s" if qtype == "弹幕" else "t"
    for date, items in sorted(data.items(), reverse=True):  # newest date first
        if keyword:
            items = [it for it in items if keyword in it.get("m", "")]
        if user and qtype == "弹幕":
            items = [it for it in items if it.get("u", "") == user]
        for idx, it in enumerate(sorted(items, key=lambda it: it[sort_key])):  # oldest record first
            # emit the stream-date header only for the first record of each date
            day = f"\n开播日期: {await live_date(date)}\n" if idx == 0 else ""
            if qtype == "发言":
                lines.append(f"\n{day}{it['t']}: {it['m'].strip()}")
            else:
                sc_amount = ""
                if super_chat := it.get("p"):  # e.g. "USD 100"
                    currency, amount = super_chat.split(" ")
                    super_chats[currency] += Decimal(amount)
                    if currency in CURRENCY:
                        sc_amount = f" ({CURRENCY[currency]}{currency} {number(amount)})"
                username = "" if hide_name else "|" + it.get("u", "")
                dt = datetime.fromtimestamp(it["s"], tz=UTC).astimezone(ZoneInfo(TZ))
                lines.append(f"\n{day}{dt:%H:%M:%S}{username}{sc_amount}: {it.get('m', '')}")
            count += 1

    return {"texts": "".join(lines).rstrip(), "count": count}
src/danmu/server.py
@@ -0,0 +1,128 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from collections import defaultdict
+from datetime import UTC, datetime
+from decimal import Decimal
+from zoneinfo import ZoneInfo
+
+import anyio
+from loguru import logger
+
+from config import DANMU, DOWNLOAD_DIR, TZ
+from danmu.utils import live_date
+from messages.progress import modify_progress
+from networking import hx_req
+from others.emoji import CURRENCY
+from utils import number
+
+
async def query_server(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, qtype: str, **kwargs) -> dict:
    """Fetch records from the remote database, paginating per date.

    Args:
        dates: live dates (or whole years) to query, processed newest-first.
        user: author filter (danmu only).
        keyword: substring filter on the message/content.
        caption: progress-message header updated while matching.
        super_chats: shared {currency: amount} accumulator, mutated by the parser.
        qtype: "弹幕" or "发言".
        **kwargs: forwarded to modify_progress.

    Returns:
        {"paths": list[str], "count": int} — empty dict when `dates` is empty.
    """
    if not dates:
        return {}
    paths = []
    total_count = 0
    for date in sorted(dates, reverse=True):
        # danmu and transcripts live on different endpoints
        api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
        payload = {"limit": DANMU.NUM_PER_QUERY, "liveDate": date}
        if user and qtype == "弹幕":
            payload["authorName"] = user
        if keyword:
            # the two endpoints use different field names for the text filter
            payload |= {"message": keyword} if qtype == "弹幕" else {"content": keyword}
        payload["page"] = 1
        logger.debug(f"Query {qtype}: {payload}")
        resp = await hx_req(api_url, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
        parsed = await parse_from_server(resp.get("data", []), user, keyword, super_chats, qtype)
        count = parsed.get("count", 0)
        if count == 0:
            # NOTE(review): a matchless first page skips pagination entirely — later
            # pages could in theory match; confirm server-side filtering makes this safe
            continue
        total_count += count
        texts = parsed.get("texts", "")
        await modify_progress(text=caption + f"\n⏳已匹配 {total_count} 条{qtype}", force_update=True, **kwargs)
        # keep paginating while the previous page was full and produced matches
        while len(resp.get("data", [])) == payload["limit"] and parsed.get("count", 0):
            payload["page"] += 1
            logger.debug(f"Query {qtype}: {payload}")
            resp = await hx_req(api_url, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
            parsed = await parse_from_server(resp.get("data", []), user, keyword, super_chats, qtype)
            total_count += parsed.get("count", 0)
            texts += parsed.get("texts", "")
            await modify_progress(text=caption + f"\n⏳已匹配 {total_count} 条{qtype}", **kwargs)
        del parsed  # release before writing potentially large text
        save_path = f"{DOWNLOAD_DIR}/{user}-{date}-{keyword}-{qtype}.txt"
        async with await anyio.open_file(save_path, "w") as f:
            await f.write(texts.strip())
        paths.append(save_path)
    return {"paths": paths, "count": total_count}
+
+
async def parse_from_server(data: list[dict], user: str, keyword: str, super_chats: defaultdict, qtype: str) -> dict:
    """Render raw server records into display text.

    Output is grouped by live date (newest-first); records within a date are
    oldest-first. Mutates `super_chats` with any SuperChat amounts found.

    Returns:
        {"texts": str, "count": int}
    """
    texts = ""
    count = 0
    text_key = "message" if qtype == "弹幕" else "content"
    if keyword:
        data = [x for x in data if keyword in x.get(text_key, "")]
    logged_date = []  # dates whose "开播日期" header has already been emitted
    if qtype == "发言":
        data = [x for x in data if x.get("liveDate") and x.get("startTime")]
        # group data by dates, and normalize startTime
        grouped_data = defaultdict(list)
        for x in data:
            start_time = x["startTime"].split(",")[0]  # 3:53:23 or 03:53:23
            if len(start_time.split(":")[0]) == 1:
                x["startTime"] = f"0{start_time}"  # zero-pad the hour: 03:53:23
            else:
                x["startTime"] = start_time
            grouped_data[x["liveDate"][:10]].append(x)

        for norm_date, date_data in sorted(grouped_data.items(), reverse=True):  # newest date first
            for x in sorted(date_data, key=lambda x: x["startTime"]):  # oldest record first
                # only show the day once
                if norm_date not in logged_date:
                    stream_date = await live_date(norm_date)
                    day = f"\n开播日期: {stream_date}\n"
                    logged_date.append(norm_date)
                else:
                    day = ""
                texts += f"\n{day}{x['startTime']}: {x['content'].strip()}"
                count += 1
    else:
        hide_name = bool(user)  # hide usernames when a user filter is active
        if user:
            data = [x for x in data if x.get("authorName", "") == user]
        if keyword:
            # NOTE(review): keyword was already filtered above on the same key — redundant but harmless
            data = [x for x in data if keyword in x.get("message", "")]
        data = [x for x in data if x.get("timestamp")]  # ensure timestamp
        # group data by dates, and normalize startTime
        grouped_data = defaultdict(list)
        for x in data:
            grouped_data[x["liveDate"][:10]].append(x)

        for norm_date, date_data in sorted(grouped_data.items(), reverse=True):  # newest date first
            for x in sorted(date_data, key=lambda x: x["timestamp"]):  # oldest record first
                # timestamp looks like microseconds since epoch (divided by 1e6) — confirm
                dt = datetime.fromtimestamp(x["timestamp"] / 1000000, tz=UTC).astimezone(ZoneInfo(TZ))
                sc_amount = ""
                if super_chat := x.get("scAmount"):  # e.g. "USD 100"
                    currency, amount = super_chat.split(" ")
                    super_chats[currency] += Decimal(amount)
                    sc_amount = f" ({CURRENCY[currency]}{currency} {number(amount)})" if currency in CURRENCY else ""
                msg = x.get("message", "")
                username = "" if hide_name else "|" + x.get("authorName", "")
                # only show the day once
                if norm_date not in logged_date:
                    stream_date = await live_date(norm_date)
                    day = f"\n开播日期: {stream_date}\n"
                    logged_date.append(norm_date)
                else:
                    day = ""
                texts += f"\n{day}{dt:%H:%M:%S}{username}{sc_amount}: {msg}"
                count += 1

    return {"texts": texts.rstrip(), "count": count}
src/danmu/sync.py
@@ -0,0 +1,185 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+import re
+from collections import defaultdict
+from datetime import UTC, datetime
+from zoneinfo import ZoneInfo
+
+from glom import flatten, glom
+from loguru import logger
+
+from config import DANMU, TZ, cache
+from danmu.utils import merge_json, simplify_json
+from database import create_cf_d1_table, get_cf_r2, list_cf_r2, query_cf_d1, set_cf_r2
+from networking import hx_req
+from utils import nowdt
+
+
@cache.memoize(ttl=3600)
async def sync_server_to_r2(qtype: str) -> None:
    """Mirror new server records into R2.

    Writes per-day keys first, then rebuilds the affected month keys, the year
    key (transcripts only — danmu years are too large), and finally an "all"
    key for transcripts. Memoized for an hour to rate-limit sync runs.

    Args:
        qtype: "弹幕" (viewer chat) or "发言" (streamer transcript).
    """
    if qtype not in ["弹幕", "发言"]:
        logger.warning(f"Unknown query type: {qtype}")
        return

    concurrency = 1  # with a large DANMU.NUM_PER_QUERY a single in-flight request suffices
    prefix = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/"
    api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"

    async def batch_sync(new_dates: dict[str, list[str]]):
        # new_dates: {normalized YYYY-MM-DD: [raw server liveDate values, e.g. "2025-04-14_01"]}
        for r2_key, server_dates in sorted(new_dates.items()):
            results = []
            for date in sorted(server_dates):
                logger.trace(f"Query {qtype} date: {date}")
                payload = {"page": 1, "limit": DANMU.NUM_PER_QUERY, "liveDate": date}
                resp = await hx_req(api_url, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
                if resp.get("count", 0) == 0:
                    continue
                logger.trace(f"Query {qtype} date: {date} - {resp['count']} results")
                # append the first page
                results.extend(resp["data"])
                quotient, remainder = divmod(resp["count"], DANMU.NUM_PER_QUERY)  # type: ignore
                n_pages = quotient + (1 if remainder else 0)
                tasks = [
                    hx_req(
                        api_url,
                        "POST",
                        data={"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": date},
                        proxy=DANMU.PROXY,
                        check_kv={"code": 0},
                        silent=True,
                    )
                    for page in range(2, n_pages + 1)
                ]
                chunks = [tasks[i : i + concurrency] for i in range(0, len(tasks), concurrency)]
                for chunk in chunks:
                    # NOTE(review): a fresh Semaphore acquired once per chunk limits nothing —
                    # the chunking itself bounds concurrency; the semaphore could be removed
                    async with asyncio.Semaphore(concurrency):
                        res = await asyncio.gather(*chunk, return_exceptions=True)
                        data = flatten(glom(res, "*.data"))
                        results.extend(data)
            # if no results, create an empty key
            metadata = {"empty": True} if not results else {}
            await set_cf_r2(prefix + r2_key, simplify_json(results, qtype), metadata, compress=True, silent=True)

    if not DANMU.R2_SYNC_ENABLE:
        return
    now = nowdt(TZ)
    if qtype == "弹幕" and (now.hour <= 4 or now.hour >= 20):
        # skip evening danmu: the stream is recorded live, so sync after it ends
        return

    monitor_years = [x.strip() for x in DANMU.R2_SYNC_DANMU_YEARS.split(",") if x.strip()] if qtype == "弹幕" else [x.strip() for x in DANMU.R2_SYNC_FAYAN_YEARS.split(",") if x.strip()]
    monitor_years.append(str(now.year))
    monitor_years = sorted(set(monitor_years))
    r2 = await list_cf_r2(prefix=prefix)
    r2_dates = [x.removeprefix(prefix) for x in glom(r2, "Contents.*.Key", default=[])]
    has_update = False
    for year in monitor_years:
        params = {"liveDate": year} if qtype == "弹幕" else {"srtCount": 1, "liveDate": year}
        api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
        resp = await hx_req(api, proxy=DANMU.PROXY, params=params, silent=True)
        new_dates_map = defaultdict(list)
        # some live dates on server are "2025-04-14_01", "2025-04-14_02", ...
        # norm dates to YYYY-MM-DD
        for x in resp:
            live_date: str = x["liveDate"]  # type: ignore
            if live_date[:10] in r2_dates:
                continue
            if qtype == "弹幕" and live_date[:10] == f"{now:%Y-%m-%d}":  # ignore today's live
                continue
            new_dates_map[live_date[:10]].append(live_date)
        if not new_dates_map:
            continue
        has_update = True
        # sync per date
        logger.debug(f"Sync {qtype}-{year}: {new_dates_map}")
        await batch_sync(new_dates_map)
        r2_dates.extend(new_dates_map.keys())  # update new dates to R2 dates

        # sync whole month: rebuild every month that received new days
        year_months = {x[:7] for x in new_dates_map}
        for year_month in sorted(year_months):
            month_data = []
            for d in range(1, 32):
                date = f"{year_month}-{d:02d}"
                if date not in r2_dates:
                    continue
                logger.trace(f"Get R2 date: {date}")
                if r2 := await get_cf_r2(prefix + date, silent=True):
                    month_data.append(r2)
            await set_cf_r2(prefix + year_month, merge_json(month_data), compress=True, silent=True)
            del month_data  # keep peak memory low on small VPSes
        r2_dates.extend(list(year_months))  # update new dates to R2 dates
        # sync whole year
        if qtype == "发言":  # only sync 发言 for whole year. 弹幕 is too large
            year_data = []
            for mon in range(1, 13):
                date = f"{year}-{mon:02d}"
                if date not in r2_dates:
                    continue
                logger.trace(f"Get R2 date: {date}")
                if r2 := await get_cf_r2(prefix + date, silent=True):
                    year_data.append(r2)
            await set_cf_r2(prefix + year, merge_json(year_data), compress=True, silent=True)
            del year_data

    # sync all: rebuild the combined "all" key from the per-year keys
    if has_update and qtype == "发言":
        all_data = []
        for year in monitor_years:
            logger.trace(f"Get R2 date: {year}")
            if r2 := await get_cf_r2(prefix + year, silent=True):
                all_data.append(r2)
        await set_cf_r2(prefix + "all", merge_json(all_data), compress=True, silent=True)
+
+
@cache.memoize(ttl=3600)
async def sync_danmu_to_d1() -> None:
    """Deprecated, D1 only allow 5M reads, 100K writes.

    Kept for reference: pages today's danmu from the server and inserts
    unseen rows into a per-year D1 table, deduplicating on time+authorId.
    """
    # ruff: noqa: S608
    concurrency = 200  # insert batch size per gather
    now = nowdt(TZ)

    async def batch_sync(danmu_list: list[dict], saved_items: list[str]) -> int:
        # saved_items holds "<YYYY-mm-dd HH:MM:SS><authorId>" keys of rows already in D1
        sc_pattern = re.compile(r"^([A-Z]{3,}) (\d+(?:\.\d+)?)$")  # e.g. "USD 100.5"
        db_columns = "id INTEGER PRIMARY KEY, time TEXT, uid INTEGER, user TEXT, text TEXT, sc_amt REAL NULL, sc_ccy TEXT NULL"
        tasks = []
        for danmu in danmu_list:
            # timestamp looks like microseconds since epoch (divided by 1e6) — confirm
            dt = datetime.fromtimestamp(danmu["timestamp"] / 1000000, tz=UTC).astimezone(ZoneInfo(TZ))
            table_name = dt.year  # one table per year
            if f"{dt:%Y-%m-%d %H:%M:%S}{danmu['authorId']}" in saved_items:
                continue
            await create_cf_d1_table(table_name, db_columns, DANMU.D1_DATABASE)
            columns = ["time", "uid", "user", "text"]
            params = [f"{dt:%Y-%m-%d %H:%M:%S}", danmu["authorId"], danmu["authorName"], danmu["message"]]
            if (amt := danmu.get("scAmount")) and (matched := sc_pattern.fullmatch(amt)):
                sc_ccy = matched.group(1)
                sc_amt = matched.group(2)
                columns.extend(["sc_amt", "sc_ccy"])
                params.extend([float(sc_amt), sc_ccy])
            # values are bound via "?" placeholders; only column names are interpolated
            sql = f'INSERT INTO "{table_name}" ({", ".join(columns)}) VALUES ({", ".join(["?" for _ in range(len(columns))])});'
            tasks.append(query_cf_d1(sql, db_name=DANMU.D1_DATABASE, params=params))
        chunks = [tasks[i : i + concurrency] for i in range(0, len(tasks), concurrency)]
        processed = 0
        for chunk in chunks:
            # NOTE(review): a fresh Semaphore acquired once per chunk limits nothing —
            # the chunking itself bounds concurrency
            async with asyncio.Semaphore(concurrency):
                results = await asyncio.gather(*chunk, return_exceptions=True)
                processed += len(results)
                if not any(glom(results, "*.success")):
                    logger.error(f"Sync danmu to d1 failed: {results}")
                    break
        return processed

    # load existing keys for today's year so inserts can be deduplicated
    sql = f'SELECT time, uid FROM "{now.year}";'
    resp = await query_cf_d1(sql, db_name=DANMU.D1_DATABASE)
    saved_items = [f"{x['time']}{x['uid']}" for x in flatten(glom(resp, "result.*.results", default=[]))]
    page = 1
    payload = {"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": f"{now:%Y-%m-%d}", "message": "", "authorName": ""}
    resp = await hx_req(DANMU.BASE_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
    count = resp["count"]
    processed = await batch_sync(resp["data"], saved_items)
    # page through the remainder; processed counts skipped duplicates too
    while processed < count:
        page += 1
        payload["page"] = page
        resp = await hx_req(DANMU.BASE_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
        processed += await batch_sync(resp["data"], saved_items)
src/danmu/utils.py
@@ -0,0 +1,194 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from collections import defaultdict
+from decimal import Decimal
+from pathlib import Path
+
+from glom import glom
+
+from config import DANMU, DOWNLOAD_DIR, PROXY, TOKEN, cache
+from networking import hx_req
+from price.coinmarketcap import get_cmc_fiat
+
+
@cache.memoize(ttl=28800)
async def to_usd(ccy: str) -> Decimal:
    """Convert 1 unit ccy to USD via CoinMarketCap's price-conversion API.

    Args:
        ccy: ISO fiat currency code, e.g. "JPY".

    Returns:
        The USD rate, or Decimal(0) for unknown currencies / missing quotes
        so callers can sum results safely. Cached for 8 hours.
    """
    if ccy == "USD":
        return Decimal(1)
    fiats = await get_cmc_fiat()  # {"ccy": (name, id)}
    if ccy not in fiats:
        return Decimal()

    ccy_id = fiats[ccy][1]
    usd_id = fiats["USD"][1]
    url = "https://pro-api.coinmarketcap.com/v2/tools/price-conversion"
    headers = {"Accepts": "application/json", "X-CMC_PRO_API_KEY": TOKEN.CMC_API_KEY}
    params = {"amount": 1, "id": ccy_id, "convert_id": usd_id}
    response = await hx_req(url, params=params, headers=headers, proxy=PROXY.CRYPTO, check_keys=["data.quote"], check_kv={"status.error_code": 0})
    # default Decimal(0) when the quote field is absent
    rate = glom(response, f"data.quote.{usd_id}.price", default=Decimal())
    return Decimal(rate)
+
+
+def date_str(date: str) -> str:
+    """格式化日期."""
+    if len(date) == 4:
+        date_name = f"{date}年"
+    elif len(date) == 7:
+        date_name = f"{date[:4]}年{date[5:7]}月"
+    elif len(date) == 10:
+        date_name = f"{date[:4]}年{date[5:7]}月{date[8:10]}日"
+    else:
+        date_name = date
+    return date_name
+
+
def simplify_json(data: list[dict], qtype: str) -> dict:
    """Strip raw server records down to minimal fields, grouped by live date.

    Raw 发言 (transcript) record:
        {'content': '你好', 'endTime': '00:04:28,399', 'id': 135366,
         'liveDate': '2022-11-14', 'serial': 1, 'startTime': '00:04:27,566'}
    Simplified: {"t": "00:04:27", "m": "你好"}

    Raw 弹幕 (chat) record:
        {'authorId': ..., 'authorName': 'User', 'liveDate': '2021-12-29',
         'message': '你好', 'scAmount': None, 'timestamp': 1640799880050853, ...}
    Simplified: {"u": "User", "s": 1640799880, "m": "你好", "p": "USD 100"}

    Args:
        data: raw records from the server.
        qtype: "发言" or anything else (treated as 弹幕).

    Returns:
        {"liveDate": list[simplified dict]} — empty dict for empty input.
        Records missing their required fields are skipped.
    """
    if not data:
        return {}
    res = defaultdict(list)
    if qtype == "发言":
        for x in data:
            # require both a start time and content; skip partial rows.
            # (fix: the original re-checked both fields again below this guard,
            # including a dead `else: continue` branch — the checks collapse to this one)
            if not all([x.get("startTime"), x.get("content")]):
                continue
            ts = x["startTime"].split(",")[0]  # "3:53:23,123" -> "3:53:23"
            if len(ts.split(":")[0]) == 1:
                ts = f"0{ts}"  # zero-pad the hour -> "03:53:23"
            res[x["liveDate"][:10]].append({"t": ts, "m": x["content"]})
        return res

    for x in data:
        if not all([x.get("timestamp"), x.get("authorName")]):
            continue
        # timestamp is in microseconds; keep whole seconds only
        item = {"s": round(x["timestamp"] / 1000000), "u": x["authorName"]}
        if x.get("scAmount"):
            item["p"] = x["scAmount"]
        if x.get("message"):
            item["m"] = x["message"]
        res[x["liveDate"][:10]].append(item)
    return res
+
+
def merge_json(data: list[dict]) -> dict:
    """Merge several simplified JSON chunks into one date-keyed dict.

    data: list[发言 | 弹幕]
    发言: {"2021-12-29": [{"t": "00:04:27", "m": "你好"}]}
    弹幕: {"2021-12-29": [{"s": 1640799880, "u": "User Name", "p": "USD 100", "m": "你好"}]}

    Keys are normalized to YYYY-MM-DD (suffixes like "_01" are dropped).

    Returns:
        {"liveDate": list[dict]} — empty dict for empty input.
    """
    if not data:
        return {}
    merged = defaultdict(list)
    for chunk in data:
        for date_key, entries in chunk.items():
            merged[date_key[:10]].extend(entries)
    return merged
+
+
def merge_txt_files(paths: list[str], dates: list[str], user: str, keyword: str, qtype: str, header_tips: str) -> list[str]:
    """Merge multiple txt files into one (or several chunked files when large).

    Args:
        paths: per-date result files written by query_r2 / query_server.
        dates: the queried dates — only used to derive the output filename.
        user, keyword, qtype: echoed into the output filename.
        header_tips: prepended once to each merged file.

    Returns:
        Paths of the merged file(s); empty list when `paths` is empty.
    """
    if not paths:
        return []
    dates = sorted(dates)
    paths = sorted(paths)
    # fix: date_name was unbound (NameError below) when dates mixed granularities;
    # default to "" so the filename simply omits the date part in that case
    date_name = ""
    if all(len(x) == 10 for x in dates):  # all days (not expected in practice)
        date_name = dates[0][:7]
    elif all(len(x) == 7 for x in dates):  # all months
        date_name = dates[0][:4]
    elif all(len(x) == 4 for x in dates):  # all years
        date_name = dates[0] if len(set(dates)) == 1 else ""

    keyword = f"“{keyword}”" if keyword else ""

    if file_bytes(paths) < 10 * 1024 * 1024:  # 10 MB: small enough for a single file
        texts = header_tips + "\n\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip()
        save_path = f"{DOWNLOAD_DIR}/【{qtype}】{user}-{date_str(date_name)}-{keyword}.txt".replace("--", "-").replace("】-", "】").replace("-.txt", ".txt")
        Path(save_path).write_text(texts)
        return [save_path]

    # too large for one document: merge every 2 dates into one file
    num_merge = 2
    date_chunks = [dates[i : i + num_merge] for i in range(0, len(dates), num_merge)]
    path_chunks = [paths[i : i + num_merge] for i in range(0, len(paths), num_merge)]
    final_paths = []
    for date_chunk, path_chunk in zip(date_chunks, path_chunks, strict=True):
        texts = header_tips + "\n\n".join([Path(p).read_text() for p in sorted(path_chunk, reverse=True)]).strip()
        date_name = "&".join(date_chunk)
        save_path = f"{DOWNLOAD_DIR}/【{qtype}】{user}-{date_name}-{keyword}.txt".replace("--", "-").replace("】-", "】").replace("-.txt", ".txt")
        Path(save_path).write_text(texts)
        final_paths.append(save_path)
    return final_paths
+
+
@cache.memoize(ttl=600)
async def get_live_info(year: str | int) -> dict:
    """Fetch all live streams of `year` and index their titles/urls by day.

    Returns:
        {"YYYY-MM-DD": {"titles": [...], "urls": [...]}} — cached 10 minutes.
    """
    params = {"liveDate": year}
    api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
    resp: list[dict] = await hx_req(api, proxy=DANMU.PROXY, params=params, silent=True)  # type: ignore
    # server dates may carry suffixes like "2025-04-14_01"; normalize to the day
    dates = {x["liveDate"][:10] for x in resp}
    info = {date: {"titles": [], "urls": []} for date in dates}
    for x in resp:
        info[x["liveDate"][:10]]["titles"].append(x["title"])
        info[x["liveDate"][:10]]["urls"].append(x["url"])
    return info
+
+
async def live_date(date: str) -> str:
    """Convert YYYY-MM-DD live date to markdown with title.

    Emits the date followed by one "[title](url)" line per stream on that day
    (empty when the day has no known streams).
    """
    day = date[:10]
    info = await get_live_info(date[:4])
    titles = glom(info, f"{day}.titles", default=[])
    urls = glom(info, f"{day}.urls", default=[])
    links = "\n".join(f"[{title}]({url})" for title, url in zip(titles, urls, strict=True))
    return f"{date}\n{links}".rstrip()
+
+
+def file_bytes(paths: list[str] | str) -> int:
+    """Get file size in bytes."""
+    if isinstance(paths, str):
+        paths = [paths]
+    return sum([Path(path).stat().st_size for path in paths])
src/others/danmu.py
@@ -1,778 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-import asyncio
-import re
-from collections import defaultdict
-from datetime import UTC, datetime, timedelta
-from decimal import Decimal
-from pathlib import Path
-from zoneinfo import ZoneInfo
-
-import anyio
-from glom import flatten, glom
-from loguru import logger
-from pyrogram.client import Client
-from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM, BLOCKQUOTE_EXPANDABLE_END_DELIM
-from pyrogram.types import Message
-
-from config import DANMU, DOWNLOAD_DIR, PREFIX, PROXY, TOKEN, TZ, cache
-from database import create_cf_d1_table, get_cf_r2, list_cf_r2, query_cf_d1, set_cf_r2
-from llm.utils import convert_html
-from messages.parser import parse_msg
-from messages.progress import modify_progress
-from messages.sender import send2tg
-from messages.utils import blockquote, equal_prefix, smart_split, startswith_prefix
-from networking import hx_req
-from others.emoji import CURRENCY
-from price.coinmarketcap import get_cmc_fiat
-from publish import publish_telegraph
-from utils import nowdt, number
-
-HELP = f"""📖**查询直播合订本**
-`{PREFIX.DANMU}` 使用说明:
-1.`{PREFIX.DANMU} + 日期`
-2.`{PREFIX.DANMU} + @用户名` (区分大小写, 且要去除空格)
-3.`{PREFIX.DANMU} + #关键词` (区分大小写)
-4.以上可组合使用, 但日期必须放前面, 关键词必须放后面
-示例:
-{BLOCKQUOTE_EXPANDABLE_DELIM}- `{PREFIX.DANMU} 2025`: 查询2025年的弹幕
-- `{PREFIX.DANMU} @张三`: 查询用户【张三】的弹幕
-- `{PREFIX.DANMU} #你好`: 查询包含“你好”关键词的弹幕
-- `{PREFIX.DANMU} 2025-01 @张三`: 查询2025年1月份用户【张三】的弹幕
-- `{PREFIX.DANMU} 2025-01-01 #你好`: 查询2025-01-01日包含“你好”的弹幕
-- `{PREFIX.DANMU} 2025 @张三 #你好`: 查询2025年用户【张三】包含“你好”的弹幕
-
-注意:
-- 日期为开播日期, 不是弹幕发送日期 (过了凌晨也算前一天)
-- 如果用户名中有空格, 请去除空格。例如: 想指定用户为John Doe请使用 `@JohnDoe`
-{BLOCKQUOTE_EXPANDABLE_END_DELIM}
-`{PREFIX.FAYAN}` 用法类似, 但查询的是**【{DANMU.STREAMER}】**直播语录。
-额外需注意的是:
-`{PREFIX.FAYAN}` 命令会忽略指定的 `@用户名`
-`{PREFIX.FAYAN} + #关键词` 可以省略 `#` 号
-直接使用`{PREFIX.FAYAN} + 关键词` 即可
-"""
-
-DANMU_TIPS = f"时间点为{TZ}时区"
-FAYAN_TIPS = "时间点并非真实时间, 而是相对开播时长"
-
-
-async def query_danmu(client: Client, message: Message, **kwargs):
-    info = parse_msg(message)
-    if not startswith_prefix(info["text"], prefix=[PREFIX.DANMU, PREFIX.FAYAN]):
-        return
-    if equal_prefix(message.text, prefix=[PREFIX.DANMU, PREFIX.FAYAN]):
-        await send2tg(client, message, texts=HELP, **kwargs)
-        return
-    if not DANMU.BASE_URL:
-        await send2tg(client, message, texts="⚠️请联系管理员配置API地址", **kwargs)
-        return
-
-    qtype = "弹幕" if startswith_prefix(info["text"], prefix=[PREFIX.DANMU]) else "发言"
-    match_time, user, keyword, error = parse_queries(info["text"], qtype)
-    if error:
-        await send2tg(client, message, texts=error, **kwargs)
-        return
-
-    user = user if qtype == "弹幕" else DANMU.STREAMER
-
-    caption = f"📖**{qtype}记录**:"
-    if match_time:
-        caption += f"\n🕒日期: {match_time}"
-    caption += f"\n👤用户: {user}"
-    caption += f"\n🔤关键词: {keyword}"
-
-    status_msg = (await send2tg(client, message, texts=caption, **kwargs))[0]
-    kwargs["progress"] = status_msg
-
-    resp = {}
-    paths = []
-    count = 0
-    super_chats = defaultdict(Decimal)  # {"currency": amount}
-    engine_dates = await get_engine_with_dates(match_time, user, keyword, qtype)
-    for engine, dates in sorted(engine_dates.items(), reverse=True):
-        if engine == "r2":
-            resp = await query_r2(dates, user, keyword, caption, super_chats, qtype, **kwargs)
-        else:
-            resp = await query_server(dates, user, keyword, caption, super_chats, qtype, **kwargs)
-        paths.extend(resp.get("paths", []))
-        count += resp.get("count", 0)
-    if count == 0:
-        await modify_progress(text=caption + f"\n⚠️未匹配任何{qtype}", force_update=True, **kwargs)
-        return
-
-    header = f"📖**{qtype}记录 ({user})**:" if user else f"📖**{qtype}记录**:"
-    profit = ""
-    profit_usd = 0
-    if info["ctype"] not in ["GROUP", "SUPERGROUP"]:  # 在群组中不展示打赏榜
-        for currency, amount in sorted(super_chats.items()):
-            profit += f"\n{CURRENCY[currency]}**{currency}**: {number(amount)}" if currency in CURRENCY else ""
-            profit_usd += amount * (await to_usd(currency)) if currency in CURRENCY else Decimal()
-        # if only "USD" ccy, do not include total USD
-        super_chats.pop("USD", None)  # remove "USD"
-        if profit_usd > 0 and super_chats:
-            profit += f"\n💵**总计**: {profit_usd:.2f} USD"
-
-    tips = f"{DANMU_TIPS}\n" if qtype == "弹幕" else f"{FAYAN_TIPS}\n"
-    if file_bytes(paths) < 20480:  #  20 KB, short length, try send as message directly
-        danmu = tips + "\n\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip()
-        final = f"{header}{profit}\n{blockquote(danmu)}"
-        if len(await smart_split(final)) == 1:
-            await modify_progress(message=status_msg, text=final, force_update=True, **kwargs)
-            return
-    caption += f"\n#️⃣{qtype}数: {count}"
-    caption += f"\n\n🎉**SuperChat**:{profit}" if profit else ""
-    dates: list[str] = flatten(engine_dates.values())  # type: ignore
-    merged_paths = merge_txt_files(paths, dates, user, keyword, qtype, tips)
-    media = [{"document": path} for path in sorted(merged_paths)]
-    # less than 200K, add instant view
-    if file_bytes(merged_paths) < 204800:
-        texts = "\n".join([Path(path).read_text() for path in sorted(merged_paths, reverse=True)]).strip()
-        if telegraph_url := await publish_telegraph(title=f"【{qtype}】{user}{match_time} {keyword}", html=convert_html(texts), author=user):
-            caption += f"\n⚡️[即时预览]({telegraph_url})"
-    await send2tg(client, message, texts=caption, media=media, **kwargs)
-    [Path(path).unlink(missing_ok=True) for path in paths]
-    [Path(path).unlink(missing_ok=True) for path in merged_paths]
-    await modify_progress(message=status_msg, del_status=True, **kwargs)
-
-
-def parse_queries(texts: str, qtype: str) -> tuple[str, str, str, str]:
-    """Parse from users' query.
-
-    Returns:
-        match_time, user, keyword, error
-    """
-    match_time = ""
-    user = ""
-    keyword = ""
-    texts = texts.replace(PREFIX.FAYAN, PREFIX.DANMU)  # unify prefix
-    # 2025-01-01
-    if matched := re.match(rf"^{PREFIX.DANMU}" + r"\s+(\d{4}-\d{2}-\d{2})(\s+)?", texts):  # noqa: SIM114
-        match_time = matched.group(1)
-    # 2025-01
-    elif matched := re.match(rf"^{PREFIX.DANMU}" + r"\s+(\d{4}-\d{2})(\s+)?", texts):  # noqa: SIM114
-        match_time = matched.group(1)
-    # 2025
-    elif matched := re.match(rf"^{PREFIX.DANMU}" + r"\s+(\d{4})(\s+)?", texts):
-        match_time = matched.group(1)
-
-    # remove prefix + date
-    texts = re.sub(rf"^{PREFIX.DANMU}\s+{match_time}", "", texts).lstrip()
-
-    # @张三 #你好
-    # @张三
-    if matched := re.match(r"^@(\w+)(\s+)?", texts):
-        user = matched.group(1)
-    # remove user
-    texts = re.sub(rf"^@{user}\s+", "", texts).lstrip()
-
-    # #你好
-    if matched := re.match(r"^#(.*)", texts):
-        keyword = matched.group(1)
-
-    if qtype == "弹幕":
-        if not any((match_time, user, keyword)):
-            return "", "", "", f"查询格式有误, 请发送{PREFIX.DANMU}或{PREFIX.FAYAN}命令查看帮助"
-        return match_time, user, keyword, ""
-
-    # 对于使用`PREFIX.FAYAN keyword`的发言查询
-    # 如果未匹配keyword, 则用户输入是忽略了keyword前的 #
-    if not keyword:
-        keyword = texts
-    return match_time, "", keyword, ""
-
-
-async def get_engine_with_dates(match_time: str, user: str = "", keyword: str = "", qtype: str = "弹幕") -> dict[str, list[str]]:
-    """获取查询引擎和对应查询日期.
-
-    对于弹幕记录根据日期进行判断:
-        不指定日期: 从server获取
-        YYYY-MM-DD: 不是今天或者昨天, 则从R2获取
-        YYYY-MM: 不是本月, 则从R2获取
-        YYYY: 如果获取全年的全部弹幕记录 (不指定user和keyword), 文件可能会非常大, 小内存VPS经常爆内存
-              如果指定了user和keyword, 则从server获取
-              如果YYYY为本年, 从server获取本月, 其余月份从R2获取
-              其余情况一律从R2获取
-              为避免此情况, 当查询为全年时, 改为按月份从R2获取
-              然后合并每2个月的记录为一个文件, 发送6个文件.
-
-    对于发言记录, 如果指定了日期, 从R2查询, 否则从server获取
-    Returns:
-        dict: {"server": [date-1, date-2], "r2": [date-3, date-4]}
-    """
-    now = nowdt(TZ)
-    allowed_years = [x.strip() for x in DANMU.R2_SYNC_DANMU_YEARS.split(",") if x.strip()] if qtype == "弹幕" else [x.strip() for x in DANMU.R2_SYNC_FAYAN_YEARS.split(",") if x.strip()]
-    allowed_years.append(str(now.year))
-    allowed_years = sorted(set(allowed_years))
-    if qtype == "发言":
-        if match_time:
-            return {"r2": [match_time]}
-        return {"server": allowed_years}
-
-    # 以下为匹配弹幕查询
-    if not match_time:
-        return {"server": allowed_years}
-    today = now.strftime("%Y-%m-%d")
-    year = now.year
-    month = f"{now.month:02d}"
-    yesterday = (now - timedelta(days=1)).strftime("%Y-%m-%d")
-    # YYYY-MM-DD
-    if len(match_time) == 10:
-        if match_time not in [today, yesterday]:
-            return {"r2": [match_time]}
-        return {"server": [match_time]}
-
-    # YYYY-MM
-    if len(match_time) == 7:
-        if match_time != f"{year}-{month}":
-            return {"r2": [match_time]}
-        return {"server": [f"{year}-{month}"]}  # 本月
-
-    # YYYY
-    if user or keyword:
-        return {"server": [match_time]}
-    if match_time == str(year):  # 今年
-        # 从server获取本月, 其余月份从R2获取
-        r2_months = [f"{year}-{mon:02d}" for mon in range(1, now.month)]
-        return {"server": [f"{year}-{month}"], "r2": r2_months}  # type: ignore
-    # 往年
-    r2_months = [f"{match_time}-{mon:02d}" for mon in range(1, 13)]
-    return {"r2": r2_months}
-
-
-async def query_server(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, qtype: str, **kwargs) -> dict:
-    """从远程数据库获取记录.
-
-    Returns:
-        {"paths": list[str], "count": int}
-    """
-    if not dates:
-        return {}
-    paths = []
-    total_count = 0
-    for date in sorted(dates, reverse=True):
-        api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
-        payload = {"limit": DANMU.NUM_PER_QUERY, "liveDate": date}
-        if user and qtype == "弹幕":
-            payload["authorName"] = user
-        if keyword:
-            payload |= {"message": keyword} if qtype == "弹幕" else {"content": keyword}
-        payload["page"] = 1
-        logger.debug(f"Query {qtype}: {payload}")
-        resp = await hx_req(api_url, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
-        parsed = await parse_from_server(resp.get("data", []), user, keyword, super_chats, qtype)
-        count = parsed.get("count", 0)
-        if count == 0:
-            continue
-        total_count += count
-        texts = parsed.get("texts", "")
-        await modify_progress(text=caption + f"\n⏳已匹配 {total_count} 条{qtype}", force_update=True, **kwargs)
-        while len(resp.get("data", [])) == payload["limit"] and parsed.get("count", 0):
-            payload["page"] += 1
-            logger.debug(f"Query {qtype}: {payload}")
-            resp = await hx_req(api_url, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
-            parsed = await parse_from_server(resp.get("data", []), user, keyword, super_chats, qtype)
-            total_count += parsed.get("count", 0)
-            texts += parsed.get("texts", "")
-            await modify_progress(text=caption + f"\n⏳已匹配 {total_count} 条{qtype}", **kwargs)
-        del parsed
-        save_path = f"{DOWNLOAD_DIR}/{user}-{date}-{keyword}-{qtype}.txt"
-        async with await anyio.open_file(save_path, "w") as f:
-            await f.write(texts.strip())
-        paths.append(save_path)
-    return {"paths": paths, "count": total_count}
-
-
-async def parse_from_server(data: list[dict], user: str, keyword: str, super_chats: defaultdict, qtype: str) -> dict:
-    """解析从远程数据库获取的记录.
-
-    日期从新到旧, 数据从旧到新
-    """
-    texts = ""
-    count = 0
-    text_key = "message" if qtype == "弹幕" else "content"
-    if keyword:
-        data = [x for x in data if keyword in x.get(text_key, "")]
-    logged_date = []
-    if qtype == "发言":
-        data = [x for x in data if x.get("liveDate") and x.get("startTime")]
-        # group data by dates, and normalize startTime
-        grouped_data = defaultdict(list)
-        for x in data:
-            start_time = x["startTime"].split(",")[0]  # 3:53:23 or 03:53:23
-            if len(start_time.split(":")[0]) == 1:
-                x["startTime"] = f"0{start_time}"  # 03:53:23
-            else:
-                x["startTime"] = start_time
-            grouped_data[x["liveDate"][:10]].append(x)
-
-        for norm_date, date_data in sorted(grouped_data.items(), reverse=True):  # 日期从新到旧
-            for x in sorted(date_data, key=lambda x: x["startTime"]):  # 数据从旧到新
-                # only show the day once
-                if norm_date not in logged_date:
-                    stream_date = await live_date(norm_date)
-                    day = f"\n开播日期: {stream_date}\n"
-                    logged_date.append(norm_date)
-                else:
-                    day = ""
-                texts += f"\n{day}{x['startTime']}: {x['content'].strip()}"
-                count += 1
-    else:
-        hide_name = bool(user)  # 当指定过滤user时, 隐藏用户名
-        if user:
-            data = [x for x in data if x.get("authorName", "") == user]
-        if keyword:
-            data = [x for x in data if keyword in x.get("message", "")]
-        data = [x for x in data if x.get("timestamp")]  # ensure timestamp
-        # group data by dates, and normalize startTime
-        grouped_data = defaultdict(list)
-        for x in data:
-            grouped_data[x["liveDate"][:10]].append(x)
-
-        for norm_date, date_data in sorted(grouped_data.items(), reverse=True):  # 日期从新到旧
-            for x in sorted(date_data, key=lambda x: x["timestamp"]):  # 数据从旧到新
-                dt = datetime.fromtimestamp(x["timestamp"] / 1000000, tz=UTC).astimezone(ZoneInfo(TZ))
-                sc_amount = ""
-                if super_chat := x.get("scAmount"):
-                    currency, amount = super_chat.split(" ")
-                    super_chats[currency] += Decimal(amount)
-                    sc_amount = f" ({CURRENCY[currency]}{currency} {number(amount)})" if currency in CURRENCY else ""
-                msg = x.get("message", "")
-                username = "" if hide_name else "|" + x.get("authorName", "")
-                # only show the day once
-                if norm_date not in logged_date:
-                    stream_date = await live_date(norm_date)
-                    day = f"\n开播日期: {stream_date}\n"
-                    logged_date.append(norm_date)
-                else:
-                    day = ""
-                texts += f"\n{day}{dt:%H:%M:%S}{username}{sc_amount}: {msg}"
-                count += 1
-
-    return {"texts": texts.rstrip(), "count": count}
-
-
-async def query_r2(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, qtype: str, **kwargs) -> dict:
-    """从R2获取记录.
-
-    日期从新到旧, 数据从旧到新
-    Returns:
-        {"paths": list[str], "count": int}
-    """
-    if not dates:
-        return {}
-    prefix = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/"
-    total_count = 0
-    paths = []
-    for date in sorted(dates, reverse=True):  # 日期从新到旧
-        logger.debug(f"Get {qtype} from R2: {date}")
-        data: dict[str, list[dict]] = await get_cf_r2(prefix + date, silent=True)
-        if not data:
-            continue
-        parsed = await parse_from_r2(data, user, keyword, super_chats, qtype)
-        count = parsed.get("count", 0)
-        if count == 0:
-            continue
-        total_count += count
-        texts = parsed.get("texts", "")
-        await modify_progress(text=caption + f"\n⏳已匹配 {total_count} 条{qtype}", force_update=True, **kwargs)
-        del parsed
-        save_path = f"{DOWNLOAD_DIR}/{user}-{date}-{keyword}-{qtype}.txt"
-        async with await anyio.open_file(save_path, "w") as f:
-            await f.write(texts.strip())
-        paths.append(save_path)
-    return {"paths": paths, "count": total_count}
-
-
-async def parse_from_r2(data: dict[str, list[dict]], user: str, keyword: str, super_chats: defaultdict, qtype: str) -> dict:
-    """解析从R2获取的记录.
-
-    日期从新到旧, 数据从旧到新
-    字段含义详见 `simplify_json` 函数
-
-    发言: {"t": "00:04:27", "m": "你好"}
-    弹幕: {"u": "User", "s": 1640799880, "m": "你好", "p": "USD 100"}
-
-
-    """
-    # ruff: noqa: PLW2901
-    texts = ""
-    count = 0
-    for date, items in sorted(data.items(), reverse=True):  # 日期从新到旧
-        if keyword:
-            items = [x for x in items if keyword in x.get("m", "")]
-        if user and qtype == "弹幕":
-            items = [x for x in items if x.get("u", "") == user]
-        sort_key = "s" if qtype == "弹幕" else "t"
-        items = sorted(items, key=lambda x: x[sort_key])  # 数据从旧到新
-        for idx, x in enumerate(items):
-            # only show the day once
-            day = f"\n开播日期: {await live_date(date)}\n" if idx == 0 else ""
-            if qtype == "发言":
-                texts += f"\n{day}{x['t']}: {x['m'].strip()}"
-            else:
-                hide_name = bool(user)  # 当指定过滤user时, 隐藏用户名
-                sc_amount = ""
-                if super_chat := x.get("p"):
-                    currency, amount = super_chat.split(" ")
-                    super_chats[currency] += Decimal(amount)
-                    sc_amount = f" ({CURRENCY[currency]}{currency} {number(amount)})" if currency in CURRENCY else ""
-                msg = x.get("m", "")
-                username = "" if hide_name else "|" + x.get("u", "")
-                dt = datetime.fromtimestamp(x["s"], tz=UTC).astimezone(ZoneInfo(TZ))
-                texts += f"\n{day}{dt:%H:%M:%S}{username}{sc_amount}: {msg}"
-            count += 1
-
-    return {"texts": texts.rstrip(), "count": count}
-
-
-@cache.memoize(ttl=3600)
-async def sync_server_to_r2(qtype: str) -> None:
-    if qtype not in ["弹幕", "发言"]:
-        logger.warning(f"Unknown query type: {qtype}")
-        return
-
-    concurrency = 1  # DANMU.NUM_PER_QUERY 设置很大的话只用单并发即可
-    prefix = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/"
-    api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
-
-    async def batch_sync(new_dates: dict[str, list[str]]):
-        for r2_key, server_dates in sorted(new_dates.items()):
-            results = []
-            for date in sorted(server_dates):
-                logger.trace(f"Query {qtype} date: {date}")
-                payload = {"page": 1, "limit": DANMU.NUM_PER_QUERY, "liveDate": date}
-                resp = await hx_req(api_url, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
-                if resp.get("count", 0) == 0:
-                    continue
-                logger.trace(f"Query {qtype} date: {date} - {resp['count']} results")
-                # append the first page
-                results.extend(resp["data"])
-                quotient, remainder = divmod(resp["count"], DANMU.NUM_PER_QUERY)  # type: ignore
-                n_pages = quotient + (1 if remainder else 0)
-                tasks = [
-                    hx_req(
-                        api_url,
-                        "POST",
-                        data={"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": date},
-                        proxy=DANMU.PROXY,
-                        check_kv={"code": 0},
-                        silent=True,
-                    )
-                    for page in range(2, n_pages + 1)
-                ]
-                chunks = [tasks[i : i + concurrency] for i in range(0, len(tasks), concurrency)]
-                for chunk in chunks:
-                    async with asyncio.Semaphore(concurrency):
-                        res = await asyncio.gather(*chunk, return_exceptions=True)
-                        data = flatten(glom(res, "*.data"))
-                        results.extend(data)
-            # if no results, create an empty key
-            metadata = {"empty": True} if not results else {}
-            await set_cf_r2(prefix + r2_key, simplify_json(results, qtype), metadata, compress=True, silent=True)
-
-    if not DANMU.R2_SYNC_ENABLE:
-        return
-    now = nowdt(TZ)
-    if qtype == "弹幕" and (now.hour <= 4 or now.hour >= 20):
-        # 忽略晚间的直播弹幕 (实时录制, 应该等结束后再Sync)
-        return
-
-    monitor_years = [x.strip() for x in DANMU.R2_SYNC_DANMU_YEARS.split(",") if x.strip()] if qtype == "弹幕" else [x.strip() for x in DANMU.R2_SYNC_FAYAN_YEARS.split(",") if x.strip()]
-    monitor_years.append(str(now.year))
-    monitor_years = sorted(set(monitor_years))
-    r2 = await list_cf_r2(prefix=prefix)
-    r2_dates = [x.removeprefix(prefix) for x in glom(r2, "Contents.*.Key", default=[])]
-    has_update = False
-    for year in monitor_years:
-        params = {"liveDate": year} if qtype == "弹幕" else {"srtCount": 1, "liveDate": year}
-        api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
-        resp = await hx_req(api, proxy=DANMU.PROXY, params=params, silent=True)
-        new_dates_map = defaultdict(list)
-        # some live dates on server are "2025-04-14_01", "2025-04-14_02", ...
-        # norm dates to YYYY-MM-DD
-        for x in resp:
-            live_date: str = x["liveDate"]  # type: ignore
-            if live_date[:10] in r2_dates:
-                continue
-            if qtype == "弹幕" and live_date[:10] == f"{now:%Y-%m-%d}":  # ignore today's live
-                continue
-            new_dates_map[live_date[:10]].append(live_date)
-        if not new_dates_map:
-            continue
-        has_update = True
-        # sync per date
-        logger.debug(f"Sync {qtype}-{year}: {new_dates_map}")
-        await batch_sync(new_dates_map)
-        r2_dates.extend(new_dates_map.keys())  # update new dates to R2 dates
-
-        # sync whole month
-        year_months = {x[:7] for x in new_dates_map}
-        for year_month in sorted(year_months):
-            month_data = []
-            for d in range(1, 32):
-                date = f"{year_month}-{d:02d}"
-                if date not in r2_dates:
-                    continue
-                logger.trace(f"Get R2 date: {date}")
-                if r2 := await get_cf_r2(prefix + date, silent=True):
-                    month_data.append(r2)
-            await set_cf_r2(prefix + year_month, merge_json(month_data), compress=True, silent=True)
-            del month_data
-        r2_dates.extend(list(year_months))  # update new dates to R2 dates
-        # sync whole year
-        if qtype == "发言":  # only sync 发言 for whole year. 弹幕 is too large
-            year_data = []
-            for mon in range(1, 13):
-                date = f"{year}-{mon:02d}"
-                if date not in r2_dates:
-                    continue
-                logger.trace(f"Get R2 date: {date}")
-                if r2 := await get_cf_r2(prefix + date, silent=True):
-                    year_data.append(r2)
-            await set_cf_r2(prefix + year, merge_json(year_data), compress=True, silent=True)
-            del year_data
-
-    # sync all
-    if has_update and qtype == "发言":
-        all_data = []
-        for year in monitor_years:
-            logger.trace(f"Get R2 date: {year}")
-            if r2 := await get_cf_r2(prefix + year, silent=True):
-                all_data.append(r2)
-        await set_cf_r2(prefix + "all", merge_json(all_data), compress=True, silent=True)
-
-
-@cache.memoize(ttl=28800)
-async def to_usd(ccy: str) -> Decimal:
-    """Convert 1 unit ccy to USD."""
-    if ccy == "USD":
-        return Decimal(1)
-    fiats = await get_cmc_fiat()  # {"ccy": (name, id)}
-    if ccy not in fiats:
-        return Decimal()
-
-    ccy_id = fiats[ccy][1]
-    usd_id = fiats["USD"][1]
-    url = "https://pro-api.coinmarketcap.com/v2/tools/price-conversion"
-    headers = {"Accepts": "application/json", "X-CMC_PRO_API_KEY": TOKEN.CMC_API_KEY}
-    params = {"amount": 1, "id": ccy_id, "convert_id": usd_id}
-    response = await hx_req(url, params=params, headers=headers, proxy=PROXY.CRYPTO, check_keys=["data.quote"], check_kv={"status.error_code": 0})
-    rate = glom(response, f"data.quote.{usd_id}.price", default=Decimal())
-    return Decimal(rate)
-
-
-def date_str(date: str) -> str:
-    """格式化日期."""
-    if len(date) == 4:
-        date_name = f"{date}年"
-    elif len(date) == 7:
-        date_name = f"{date[:4]}年{date[5:7]}月"
-    elif len(date) == 10:
-        date_name = f"{date[:4]}年{date[5:7]}月{date[8:10]}日"
-    else:
-        date_name = date
-    return date_name
-
-
-def simplify_json(data: list[dict], qtype: str) -> dict:
-    """简化json数据, 去除无用字段.
-
-    发言: {'content': '你好',
-           'endTime': '00:04:28,399',
-           'id': 135366,
-           'liveDate': '2022-11-14',
-           'serial': 1,
-           'startTime': '00:04:27,566'
-         }
-
-    简化后发言: {"t": "00:04:27", "m": "你好"}
-
-    弹幕:
-        {'authorId': 'fakeid',
-        'authorImage': 'https://yt4.ggpht.com/ytc/AKedOLSUwTp6ptiNCy1eovPrO61XfV74a8S21hpP3MU-D4ybbS47m9xmjKJ-JjbghP1m',
-        'authorName': 'User',
-        'count': None,
-        'emotesCount': None,
-        'id': 1092980,
-        'liveDate': '2021-12-29',
-        'message': '你好',
-        'scAmount': None,
-        'scInfo': None,
-        'timeInSeconds': 18690.529,
-        'timeText': '5:11:30',
-        'timestamp': 1640799880050853}
-
-    简化后弹幕: {"u": "User", "s": 1640799880, "m": "你好", "p": "USD 100"}
-
-    Returns:
-        {"liveDate": list[简化后dict]}
-    """
-    if not data:
-        return {}
-    res = defaultdict(list)
-    if qtype == "发言":
-        for x in data:
-            if not all([x.get("startTime"), x.get("content")]):
-                continue
-            item = {}
-            if ts := x.get("startTime"):  # 3:53:23,123
-                ts = x["startTime"].split(",")[0]  # 3:53:23
-                if len(ts.split(":")[0]) == 1:
-                    ts = f"0{ts}"
-                item["t"] = ts
-            else:
-                continue
-            if x.get("content"):
-                item["m"] = x["content"]
-            res[x["liveDate"][:10]].append(item)
-        return res
-
-    for x in data:
-        item = {}
-        if not all([x.get("timestamp"), x.get("authorName")]):
-            continue
-        item["s"] = round(x["timestamp"] / 1000000)
-        item["u"] = x["authorName"]
-        if x.get("scAmount"):
-            item["p"] = x["scAmount"]
-        if x.get("message"):
-            item["m"] = x["message"]
-        res[x["liveDate"][:10]].append(item)
-    return res
-
-
-def merge_json(data: list[dict]) -> dict:
-    """合并简化后的json数据.
-
-    data: list[发言 | 弹幕]
-    发言: {"2021-12-29": [{"t": "00:04:27", "m": "你好"}]}
-    弹幕: {"2021-12-29": [{"s": 1640799880, "u": "User Name", "p": "USD 100", "m": "你好"}]}
-
-    Returns:
-        {"liveDate": list[dict]}
-    """
-    if not data:
-        return {}
-    res = defaultdict(list)
-    for chunk in data:
-        for live_date, items in chunk.items():
-            res[live_date[:10]].extend(items)
-    return res
-
-
-def merge_txt_files(paths: list[str], dates: list[str], user: str, keyword: str, qtype: str, header_tips: str) -> list[str]:
-    """Merge multiple txt files into one."""
-    if not paths:
-        return []
-    dates = sorted(dates)
-    paths = sorted(paths)
-    if all(len(x) == 10 for x in dates):  # all days  (不存在此情况)
-        date_name = dates[0][:7]
-    elif all(len(x) == 7 for x in dates):  # all months
-        date_name = dates[0][:4]
-    elif all(len(x) == 4 for x in dates):  # all years
-        date_name = dates[0] if len(set(dates)) == 1 else ""
-
-    keyword = f"“{keyword}”" if keyword else ""
-
-    if file_bytes(paths) < 10 * 1024 * 1024:  # 10 MB
-        texts = header_tips + "\n\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip()
-        save_path = f"{DOWNLOAD_DIR}/【{qtype}】{user}-{date_str(date_name)}-{keyword}.txt".replace("--", "-").replace("】-", "】").replace("-.txt", ".txt")
-        Path(save_path).write_text(texts)
-        return [save_path]
-
-    num_merge = 2
-    date_chunks = [dates[i : i + num_merge] for i in range(0, len(dates), num_merge)]
-    path_chunks = [paths[i : i + num_merge] for i in range(0, len(paths), num_merge)]
-    final_paths = []
-    for date_chunk, path_chunk in zip(date_chunks, path_chunks, strict=True):
-        texts = header_tips + "\n\n".join([Path(p).read_text() for p in sorted(path_chunk, reverse=True)]).strip()
-        date_name = "&".join(date_chunk)
-        save_path = f"{DOWNLOAD_DIR}/【{qtype}】{user}-{date_name}-{keyword}.txt".replace("--", "-").replace("】-", "】").replace("-.txt", ".txt")
-        Path(save_path).write_text(texts)
-        final_paths.append(save_path)
-    return final_paths
-
-
-@cache.memoize(ttl=600)
-async def get_live_info(year: str | int) -> dict:
-    params = {"liveDate": year}
-    api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
-    resp: list[dict] = await hx_req(api, proxy=DANMU.PROXY, params=params, silent=True)  # type: ignore
-    dates = {x["liveDate"][:10] for x in resp}
-    info = {date: {"titles": [], "urls": []} for date in dates}
-    for x in resp:
-        info[x["liveDate"][:10]]["titles"].append(x["title"])
-        info[x["liveDate"][:10]]["urls"].append(x["url"])
-    return info
-
-
-async def live_date(date: str) -> str:
-    """Convert YYYY-MM-DD live date to markdown with title."""
-    live_info = await get_live_info(date[:4])
-    day = date[:10]
-    titles = glom(live_info, f"{day}.titles", default=[])
-    urls = glom(live_info, f"{day}.urls", default=[])
-    markdown = [f"[{title}]({url})" for title, url in zip(titles, urls, strict=True)]
-    texts = date + "\n" + "\n".join(markdown)
-    return texts.rstrip()
-
-
-def file_bytes(paths: list[str] | str) -> int:
-    """Get file size in bytes."""
-    if isinstance(paths, str):
-        paths = [paths]
-    return sum([Path(path).stat().st_size for path in paths])
-
-
-@cache.memoize(ttl=3600)
-async def sync_danmu_to_d1() -> None:
-    """Deprecated, D1 only allow 5M reads, 100K writes."""
-    # ruff: noqa: S608
-    concurrency = 200
-    now = nowdt(TZ)
-
-    async def batch_sync(danmu_list: list[dict], saved_items: list[str]) -> int:
-        sc_pattern = re.compile(r"^([A-Z]{3,}) (\d+(?:\.\d+)?)$")
-        db_columns = "id INTEGER PRIMARY KEY, time TEXT, uid INTEGER, user TEXT, text TEXT, sc_amt REAL NULL, sc_ccy TEXT NULL"
-        tasks = []
-        for danmu in danmu_list:
-            dt = datetime.fromtimestamp(danmu["timestamp"] / 1000000, tz=UTC).astimezone(ZoneInfo(TZ))
-            table_name = dt.year
-            if f"{dt:%Y-%m-%d %H:%M:%S}{danmu['authorId']}" in saved_items:
-                continue
-            await create_cf_d1_table(table_name, db_columns, DANMU.D1_DATABASE)
-            columns = ["time", "uid", "user", "text"]
-            params = [f"{dt:%Y-%m-%d %H:%M:%S}", danmu["authorId"], danmu["authorName"], danmu["message"]]
-            if (amt := danmu.get("scAmount")) and (matched := sc_pattern.fullmatch(amt)):
-                sc_ccy = matched.group(1)
-                sc_amt = matched.group(2)
-                columns.extend(["sc_amt", "sc_ccy"])
-                params.extend([float(sc_amt), sc_ccy])
-            sql = f'INSERT INTO "{table_name}" ({", ".join(columns)}) VALUES ({", ".join(["?" for _ in range(len(columns))])});'
-            tasks.append(query_cf_d1(sql, db_name=DANMU.D1_DATABASE, params=params))
-        chunks = [tasks[i : i + concurrency] for i in range(0, len(tasks), concurrency)]
-        processed = 0
-        for chunk in chunks:
-            async with asyncio.Semaphore(concurrency):
-                results = await asyncio.gather(*chunk, return_exceptions=True)
-                processed += len(results)
-                if not any(glom(results, "*.success")):
-                    logger.error(f"Sync danmu to d1 failed: {results}")
-                    break
-        return processed
-
-    sql = f'SELECT time, uid FROM "{now.year}";'
-    resp = await query_cf_d1(sql, db_name=DANMU.D1_DATABASE)
-    saved_items = [f"{x['time']}{x['uid']}" for x in flatten(glom(resp, "result.*.results", default=[]))]
-    page = 1
-    payload = {"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": f"{now:%Y-%m-%d}", "message": "", "authorName": ""}
-    resp = await hx_req(DANMU.BASE_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
-    count = resp["count"]
-    processed = await batch_sync(resp["data"], saved_items)
-    while processed < count:
-        page += 1
-        payload["page"] = page
-        resp = await hx_req(DANMU.BASE_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
-        processed += await batch_sync(resp["data"], saved_items)
src/handler.py
@@ -9,6 +9,7 @@ from pyrogram.types import Message
 from asr.voice_recognition import voice_to_text
 from bridge.ocr import send_to_ocr_bridge
 from config import ENABLE, PREFIX, PROXY
+from danmu.entrypoint import query_danmu
 from database import del_db
 from llm.gpt import gpt_response
 from llm.summary import ai_summary
@@ -16,7 +17,6 @@ from messages.parser import parse_msg
 from messages.sender import send2tg
 from messages.utils import equal_prefix, startswith_prefix
 from networking import match_social_media_link
-from others.danmu import query_danmu
 from others.download_external import download_url_in_message
 from others.extract_audio import extract_audio_file
 from others.raw_img_file import convert_raw_img_file
src/main.py
@@ -22,11 +22,11 @@ from bridge.chartimg import forward_chartimg_results
 from bridge.ocr import forward_ocr_results
 from bridge.social import forward_social_media_results
 from config import DAILY_MESSAGES, DEVICE_NAME, ENABLE, PROXY, TOKEN, TZ, cache
+from danmu.sync import sync_server_to_r2
 from handler import handle_social_media, handle_utilities
 from llm.summary import daily_summary
 from llm.utils import clean_gemini_files
 from messages.parser import parse_msg
-from others.danmu import sync_server_to_r2
 from others.podcast import summary_pods
 from permission import check_permission
 from price.entrypoint import match_symbol_category