Commit bb5a987

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-05-28 18:27:55
feat(danmu): prefer CF-R2 database
1 parent 4ef9709
Changed files (5)
src/messages/preprocess.py
@@ -146,7 +146,13 @@ async def warp_media_group(media: list[dict], caption: str = "", *, caption_abov
     if len(media) > 10:
         logger.warning(f"Too many media files, number of media: {len(media)}")
         media = media[:10]
-    # add caption to the first item
+    # add caption to the first item, except for all media are documents
+    # If all media are documents, caption will be added to the last item
+    if all(x.get("document") for x in media):
+        group = [InputMediaDocument(x["document"]) for x in media[:-1]]
+        group.append(InputMediaDocument(media[-1]["document"], caption=caption))
+        return group
+
     if media[0].get("photo"):
         group.append(InputMediaPhoto(media[0]["photo"], caption=caption, show_caption_above_media=caption_above))
     elif media[0].get("video"):
src/messages/utils.py
@@ -68,6 +68,8 @@ def summay_media(media: list[dict]) -> str:
             msg += f"\n🎬P{idx + 1}{filesize(value)}"
         elif value := info.get("audio"):
             msg += f"\n🎧P{idx + 1}{filesize(value)}"
+        elif value := info.get("document"):
+            msg += f"\n💾P{idx + 1}{filesize(value)}"
     return msg.strip()
 
 
src/others/danmu.py
@@ -3,18 +3,19 @@
 import asyncio
 import re
 from collections import defaultdict
-from datetime import UTC, datetime
+from datetime import UTC, datetime, timedelta
 from decimal import Decimal
-from io import BytesIO
+from pathlib import Path
 from zoneinfo import ZoneInfo
 
+import anyio
 from glom import flatten, glom
 from loguru import logger
 from pyrogram.client import Client
 from pyrogram.types import Message
 
-from config import DANMU, PREFIX, PROXY, TEXT_LENGTH, TOKEN, TZ, cache
-from database import create_cf_d1_table, list_cf_r2, query_cf_d1, set_cf_r2
+from config import DANMU, DOWNLOAD_DIR, PREFIX, PROXY, TEXT_LENGTH, TOKEN, TZ, cache
+from database import create_cf_d1_table, get_cf_r2, list_cf_r2, query_cf_d1, set_cf_r2
 from messages.parser import parse_msg
 from messages.progress import modify_progress
 from messages.sender import send2tg
@@ -26,7 +27,7 @@ from utils import nowdt, number
 
 HELP = f"""📖**查询弹幕记录**
 使用说明:
-1. `{PREFIX.DANMU}` + 日期 (YYYY-MM-DD或YYYY)
+1. `{PREFIX.DANMU}` + 日期 (YYYY-MM-DD)
 2. `{PREFIX.DANMU}` + @用户名 (区分大小写, 且要去除空格)
 3. `{PREFIX.DANMU}` + #关键词 (区分大小写)
 4. 以上可组合使用, 但日期必须放前面, 关键词必须放后面
@@ -34,6 +35,7 @@ HELP = f"""📖**查询弹幕记录**
 
 示例:
 - `{PREFIX.DANMU} 2025-01-01`: 查询2025-01-01日的弹幕
+- `{PREFIX.DANMU} 2025-01`: 查询2025年1月份的弹幕
 - `{PREFIX.DANMU} 2025`: 查询2025年的弹幕
 - `{PREFIX.DANMU} @张三`: 查询用户"张三"的弹幕
 - `{PREFIX.DANMU} #你好`: 查询包含"你好"关键词的弹幕
@@ -48,9 +50,8 @@ HELP = f"""📖**查询弹幕记录**
 """
 
 
-async def query_danmu(client: Client, message: Message, *, show_name: bool = True, **kwargs):
+async def query_danmu(client: Client, message: Message, **kwargs):
     info = parse_msg(message)
-    texts = info["text"]
     if not startswith_prefix(info["text"], prefix=[PREFIX.DANMU]):
         return
     if equal_prefix(message.text, prefix=[PREFIX.DANMU]):
@@ -59,12 +60,84 @@ async def query_danmu(client: Client, message: Message, *, show_name: bool = Tru
     if not DANMU.BASE_URL:
         await send2tg(client, message, texts="⚠️请联系管理员配置API地址", **kwargs)
         return
+    match_time, user, keyword = parse_queries(info["text"])
+    if not any((match_time, user, keyword)):  # NO Match
+        await send2tg(client, message, texts=f"查询格式有误, 请发送{PREFIX.DANMU}命令查看帮助", **kwargs)
+        return
+    if not match_time:
+        match_time = str(nowdt(TZ).year)  # this year
+
+    # payload = {"limit": DANMU.NUM_PER_QUERY, "liveDate": match_time, "message": "", "authorName": ""}
+    caption = f"📖**弹幕记录**:\n🕒日期: {match_time}"
+    if user:
+        caption += f"\n👤用户: {user}"
+
+    if keyword:
+        caption += f"\n🔤关键词: {keyword}"
+
+    status_msg = (await send2tg(client, message, texts=caption, **kwargs))[0]
+    kwargs["progress"] = status_msg
+
+    resp = {}
+    paths = []
+    count = 0
+    super_chats = defaultdict(Decimal)  # {"currency": amount}
+    engine_dates = await query_engine_with_dates(match_time, user, keyword)
+    for engine, dates in engine_dates.items():
+        if engine == "r2":
+            resp = await query_danmu_from_r2(dates, user, keyword, caption, super_chats, **kwargs)
+            paths.extend(resp.get("paths", []))
+        else:
+            resp = await query_danmu_from_server(dates, user, keyword, caption, super_chats, **kwargs)
+            paths.append(resp["path"]) if resp.get("path") else None
+        count += resp.get("count", 0)
+    if count == 0:
+        await modify_progress(text=caption + "\n⚠️未匹配任何弹幕", force_update=True, **kwargs)
+        return
+    header = f"📖**弹幕记录 ({user})**:" if user else "📖**弹幕记录**:"
+
+    profit = ""
+    profit_usd = 0
+    if info["ctype"] not in ["GROUP", "SUPERGROUP"]:  # 在群组中不展示打赏榜
+        for currency, amount in sorted(super_chats.items()):
+            profit += f"\n{CURRENCY[currency]}**{currency}**: {number(amount)}" if currency in CURRENCY else ""
+            profit_usd += amount * (await to_usd(currency)) if currency in CURRENCY else Decimal()
+        # if only "USD" ccy, do not include total USD
+        super_chats.pop("USD", None)  # remove "USD"
+        if profit_usd > 0 and super_chats:
+            profit += f"\n💵**总计**: {profit_usd:.2f} USD"
+
+    total_bytes = sum([Path(path).stat().st_size for path in paths])
+    if total_bytes < 10000:  # short length, try to send as a message directly
+        danmu = "".join([Path(path).read_text() for path in paths]).strip()
+        final = f"{header}{profit}\n{blockquote(danmu)}"
+        if len(final) < TEXT_LENGTH - 10:
+            await modify_progress(message=status_msg, text=final, force_update=True, **kwargs)
+            return
+
+    caption += f"\n#️⃣弹幕数: {count}"
+    caption += f"\n\n🎉**SuperChat**:{profit}" if profit else ""
+    media = [{"document": path} for path in sorted(paths)]
+    await send2tg(client, message, texts=caption, media=media, **kwargs)
+    [Path(path).unlink(missing_ok=True) for path in paths]
+    await modify_progress(message=status_msg, del_status=True, **kwargs)
+
+
+def parse_queries(texts: str) -> tuple[str, str, str]:
+    """Parse match_time, user, and keyword from a user's query text.
+
+    Returns:
+        match_time, user, keyword
+    """
     match_time = ""
     user = ""
     keyword = ""
     # 2025-01-01
     if matched := re.match(rf"^{PREFIX.DANMU}" + r"\s+(\d{4}-\d{2}-\d{2})(\s+)?", texts):  # noqa: SIM114
         match_time = matched.group(1)
+    # 2025-01
+    elif matched := re.match(rf"^{PREFIX.DANMU}" + r"\s+(\d{4}-\d{2})(\s+)?", texts):  # noqa: SIM114
+        match_time = matched.group(1)
     # 2025
     elif matched := re.match(rf"^{PREFIX.DANMU}" + r"\s+(\d{4})(\s+)?", texts):
         match_time = matched.group(1)
@@ -82,100 +155,223 @@ async def query_danmu(client: Client, message: Message, *, show_name: bool = Tru
     # #你好
     if matched := re.match(r"^#(.*)", texts):
         keyword = matched.group(1)
+    return match_time, user, keyword
 
-    if not any((match_time, user, keyword)):  # NO Match
-        await send2tg(client, message, texts=f"查询格式有误, 请发送{PREFIX.DANMU}命令查看帮助", **kwargs)
-        return
-    if not match_time:
-        match_time = nowdt(TZ).year  # this year
 
-    page = 1
-    payload = {"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": match_time, "message": "", "authorName": ""}
-    caption = f"📖**弹幕记录**:\n🕒日期: {match_time}"
+async def query_engine_with_dates(match_time: str, user: str = "", keyword: str = "") -> dict[str, list[str]]:
+    """获取查询引擎和对应查询日期.
+
+    日期:
+        YYYY-MM-DD: 不是今天或者昨天, 则从R2获取
+        YYYY-MM: 不是本月, 则从R2获取
+        YYYY: 如果获取全年的全部聊天记录 (不指定user和keyword), 文件可能会非常大, 小内存VPS经常爆内存
+              为避免此情况, 当查询为全年时, 改为按月份从R2获取,
+              然后合并每2个月的记录为一个文件, 发送6个文件.
+              如果指定了user或keyword, 则从server获取
+              如果YYYY为本年, 从server获取本月, 其余月份从R2获取
+              其余情况一律从R2获取
+
+    Returns:
+        dict: {"server": [date-1, date-2], "r2": [date-3, date-4]}
+    """
+    now = nowdt(TZ)
+    today = now.strftime("%Y-%m-%d")
+    year = now.year
+    month = f"{now.month:02d}"
+    yesterday = (now - timedelta(days=1)).strftime("%Y-%m-%d")
+    # YYYY-MM-DD
+    if len(match_time) == 10:
+        if match_time not in [today, yesterday]:
+            return {"r2": [match_time]}
+        return {"server": [match_time]}
+
+    # YYYY-MM
+    if len(match_time) == 7:
+        if match_time != f"{year}-{month}":
+            return {"r2": [match_time]}
+        # 本月
+        live_dates = await get_live_dates_from_server(year)
+        server_dates = flatten([v for k, v in live_dates.items() if k.startswith(f"{year}-{month}")])
+        return {"server": sorted(server_dates)}  # type: ignore
+
+    # YYYY
+    if user or keyword:
+        return {"server": [match_time]}
+    if match_time == str(year):  # 今年
+        # 从server获取本月, 其余月份从R2获取
+        live_dates = await get_live_dates_from_server(year)
+        server_dates = flatten([v for k, v in live_dates.items() if k.startswith(f"{year}-{month}")])
+        r2_months = [f"{year}-{mon:02d}" for mon in range(1, now.month)]
+        return {"server": sorted(server_dates), "r2": r2_months}  # type: ignore
+    r2_months = [f"{match_time}-{mon:02d}" for mon in range(1, 13)]
+    return {"r2": r2_months}
+
+
+async def query_danmu_from_server(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, **kwargs) -> dict:
+    """从远程数据库获取弹幕记录.
+
+    一般情况下, dates列表中只有一个日期.
+    只有在获取本月所有记录时, dates列表才会有多个日期. (详见`query_engine_with_dates`函数的docstrings)
+
+    当只有单日期时, 直接获取弹幕记录.
+    当dates为本月的列表时, 不要分别查询每个日期, 而是直接查询最近所有弹幕, 截断到本月最早的日期.
+
+    Returns:
+        {"path": str, "count": int}
+    """
+    if not dates:
+        return {}
+    dates = sorted(dates)
+    cut_date = dates[0][:10]  # YYYY-MM-DD
+    if any(x[:7] != cut_date[:7] for x in dates):
+        logger.warning(f"查询日期列表有误, 应该全部为本月日期。当前查询: {dates}")
+        return {}
+    query_date = cut_date if len(dates) == 1 else cut_date[:4]  # year
+    payload = {"limit": DANMU.NUM_PER_QUERY, "liveDate": query_date}
     if user:
         payload["authorName"] = user
-        caption += f"\n👤用户: {user}"
-        show_name = False
     if keyword:
         payload["message"] = keyword
-        caption += f"\n🔤关键词: {keyword}"
+    payload["page"] = 1
     logger.debug(f"Query: {payload}")
-    status_msg = (await send2tg(client, message, texts=caption, **kwargs))[0]
-    kwargs["progress"] = status_msg
     resp = await hx_req(DANMU.BASE_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
-    count = resp["count"]
-    if count == 0:
-        await modify_progress(message=status_msg, text=caption + "\n⚠️未找到匹配弹幕", force_update=True, **kwargs)
-        return
-    header = f"📖**弹幕记录 ({user})**:" if user else "📖**弹幕记录**:"
-    super_chats = defaultdict(Decimal)  # {"currency": amount}
-
-    parsed = parse_danmu(resp["data"], super_chats, show_name=show_name)
+    parsed = parse_danmu(resp.get("data", ""), user, keyword, super_chats, cut_date)
+    processed = parsed["count"]
+    if processed == 0:
+        return {}
     danmu = parsed["danmu"]
-    processed = parsed["num_messages"]
-    await modify_progress(message=status_msg, text=caption + f"\n⏳已获取 {processed} 条弹幕", force_update=True, **kwargs)
-    while processed < count:
-        page += 1
-        payload["page"] = page
+    await modify_progress(text=caption + f"\n⏳已获取 {processed} 条弹幕", force_update=True, **kwargs)
+    while len(resp.get("data", [])) == payload["limit"] and parsed["count"] and not parsed["force_stop"]:
+        payload["page"] += 1
         logger.debug(f"Query: {payload}")
         resp = await hx_req(DANMU.BASE_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
-        parsed = parse_danmu(resp["data"], super_chats, show_name=show_name)
+        parsed = parse_danmu(resp.get("data", ""), user, keyword, super_chats, cut_date)
         danmu += parsed["danmu"]
-        processed += parsed["num_messages"]
-        await modify_progress(message=status_msg, text=caption + f"\n⏳已获取 {processed} 条弹幕", **kwargs)
+        processed += parsed["count"]
+        await modify_progress(text=caption + f"\n⏳已获取 {processed} 条弹幕", **kwargs)
 
-    profit = ""
-    profit_usd = 0
-    for currency, amount in sorted(super_chats.items()):
-        profit += f"\n{CURRENCY.get(currency, '❔')}**{currency}**: {number(amount)}"
-        profit_usd += amount * (await to_usd(currency))
-    # if only "USD" ccy, do not include total USD
-    super_chats.pop("USD", None)  # remove "USD"
-    if profit_usd > 0 and super_chats:
-        profit += f"\n💵**总计**: {profit_usd:.2f} USD"
-
-    final = f"{header}{profit}\n{blockquote(danmu)}"
-    if len(final) < TEXT_LENGTH - 10:
-        await modify_progress(message=status_msg, text=final, force_update=True, **kwargs)
-    else:
-        caption += f"\n#️⃣弹幕数: {count}"
-        caption += f"\n\n🎉**SuperChat**:{profit}" if profit else ""
-        with BytesIO(danmu.encode("utf-8")) as f:
-            await client.send_document(info["cid"], f, file_name=f"{user}{match_time}{keyword}弹幕记录.txt", caption=caption)
-        await modify_progress(message=status_msg, del_status=True, **kwargs)
-
-
-def parse_danmu(data: list[dict], super_chats: defaultdict, *, show_name: bool = True) -> dict:
+    # long texts, save txt
+    date_name = f"{cut_date[:4]}年{cut_date[5:7]}月" if len(dates) > 1 else cut_date
+    save_path = f"{DOWNLOAD_DIR}/{user}{date_name}{keyword}弹幕记录.txt"
+    async with await anyio.open_file(save_path, "w") as f:
+        await f.write(danmu)
+    return {"path": save_path, "count": processed}
+
+
+async def query_danmu_from_r2(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, **kwargs) -> dict:
+    """从R2获取弹幕记录.
+
+    一般情况下, dates列表中只有一个日期.
+    只有在获取全年所有月份时, dates列表才会有多个日期. (详见`query_engine_with_dates`函数的docstrings)
+
+    当只有单日期时, 直接获取弹幕记录.
+    当dates为全年的月份列表时, 合并每两个相邻月份的弹幕记录为一个文件, 最后发送6个文件.
+
+    Returns:
+        {"paths": list[str], "count": int}
+    """
+    if not dates:
+        return {}
+    num_merge = 2
+    chunks = [dates[i : i + num_merge] for i in range(0, len(dates), num_merge)]
+    count = 0
+    paths = []
+    for chunk in chunks:
+        danmu = ""
+        for date in chunk:
+            logger.debug(f"Get Danmu from R2: {date}")
+            data: list[dict] = await get_cf_r2(DANMU.R2_PREFIX + date, silent=True)  # type: ignore
+            if not data:
+                continue
+            if user:
+                data = [x for x in data if x.get("authorName", "") == user]
+            if keyword:
+                data = [x for x in data if keyword in x.get("message", "")]
+            if not data:
+                continue
+            parsed = parse_danmu(data, user, keyword, super_chats)
+            danmu += parsed["danmu"]
+            count += parsed["count"]
+            await modify_progress(text=caption + f"\n⏳已获取 {count} 条弹幕", force_update=True, **kwargs)
+        if not danmu:
+            continue
+        year = chunk[0][:4]
+        months = [f"{x[5:7]}月" for x in chunk]
+        month_name = "&".join(months)
+        date_name = f"{year}年{month_name}"
+        save_path = f"{DOWNLOAD_DIR}/{user}{date_name}{keyword}弹幕记录.txt"
+        async with await anyio.open_file(save_path, "w") as f:
+            await f.write(danmu)
+        paths.append(save_path)
+    return {"paths": paths, "count": count}
+
+
+def parse_danmu(data: list[dict], user: str, keyword: str, super_chats: defaultdict, cut_date: str = "") -> dict:
     msg = ""
+    hide_name = bool(user)  # 当指定过滤user时, 隐藏用户名
+    if user:
+        data = [x for x in data if x.get("authorName", "") == user]
+    if keyword:
+        data = [x for x in data if keyword in x.get("message", "")]
+    data = [x for x in data if x.get("timestamp")]  # ensure timestamp
+    count = 0
+    force_stop = False
     for danmu in sorted(data, key=lambda x: x["timestamp"], reverse=True):  # new to old
         dt = datetime.fromtimestamp(danmu["timestamp"] / 1000000, tz=UTC).astimezone(ZoneInfo(TZ))
+        if cut_date and f"{dt:%Y-%m-%d}" < cut_date:
+            force_stop = True
+            break
         sc_amount = ""
         if super_chat := danmu.get("scAmount"):
             currency, amount = super_chat.split(" ")
             super_chats[currency] += Decimal(amount)
-            sc_amount = f" ({CURRENCY.get(currency, '❔')}{super_chat})"
-        msg += f"\n{dt:%m-%d %H:%M:%S}|{danmu['authorName']}{sc_amount}: {danmu['message']}" if show_name else f"\n{dt:%m-%d %H:%M:%S}{sc_amount}: {danmu['message']}"
-    return {"danmu": msg.strip(), "num_messages": len(data)}
+            sc_amount = f" ({CURRENCY[currency]}{super_chat})" if currency in CURRENCY else ""
+        texts = danmu.get("message", "")
+        username = danmu.get("authorName", "")
+        msg += f"\n{dt:%m-%d %H:%M:%S}{sc_amount}: {texts}" if hide_name else f"\n{dt:%m-%d %H:%M:%S}|{username}{sc_amount}: {texts}"
+        count += 1
+    return {"danmu": msg.rstrip(), "count": count, "force_stop": force_stop}
+
+
+@cache.memoize(ttl=600)
+async def get_live_dates_from_server(year: str | int) -> dict[str, list[str]]:
+    """从远程数据库获取直播日期.
+
+    但远程日期可能并非标准YYYY-MM-DD, 这里做标准化后返回.
+    例如可能因为断流问题导致同一日开播多次, 2025-04-14日对应: "2025-04-14_01", "2025-04-14_02", ...
+
+    Returns:
+        {"2025-04-14": ["2025-04-14_01", "2025-04-14_02", ...]}
+    """
+    api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector?liveDate={year}"
+    resp: list[dict] = await hx_req(api, proxy=DANMU.PROXY, silent=True)  # type: ignore
+    dates = defaultdict(list)
+    for x in resp:
+        if not x.get("liveDate"):
+            continue
+        live_date = x["liveDate"]
+        dates[live_date[:10]].append(live_date)
+    return dates
 
 
 @cache.memoize(ttl=3600)
 async def sync_danmu_to_r2() -> None:
-    async def batch_sync(dates_map: dict[str, list[str]]):
-        for norm_date, server_dates in dates_map.items():
+    async def batch_sync(new_dates: dict[str, list[str]]):
+        for r2_key, server_dates in new_dates.items():
             results = []
-            for date in server_dates:
-                saved_ids = [f"{x['timestamp']}{x['authorName']}" for x in results]
+            for date in sorted(server_dates):
+                logger.trace(f"Query server date: {date}")
                 payload = {"page": 1, "limit": DANMU.NUM_PER_QUERY, "liveDate": date, "message": "", "authorName": ""}
                 resp = await hx_req(DANMU.BASE_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
                 count = resp["count"]
                 keep_keys = ["timestamp", "authorName", "message", "scAmount"]
-                results.extend([{key: danmu[key] for key in keep_keys if danmu.get(key) and f"{danmu['timestamp']}{danmu['authorName']}" not in saved_ids} for danmu in resp["data"]])
+                results.extend([{key: danmu[key] for key in keep_keys if danmu.get(key)} for danmu in resp["data"]])
                 while len(results) < count:
                     payload["page"] += 1
                     logger.debug(f"Query: {payload}")
                     resp = await hx_req(DANMU.BASE_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
-                    results.extend([{key: danmu[key] for key in keep_keys if danmu.get(key) and f"{danmu['timestamp']}{danmu['authorName']}" not in saved_ids} for danmu in resp["data"]])
-            await set_cf_r2(DANMU.R2_PREFIX + norm_date, results, compress=True)
+                    results.extend([{key: danmu[key] for key in keep_keys if danmu.get(key)} for danmu in resp["data"]])
+            await set_cf_r2(DANMU.R2_PREFIX + r2_key, results, compress=True, silent=True)
 
     if not DANMU.R2_SYNC_ENABLE:
         return
@@ -187,24 +383,30 @@ async def sync_danmu_to_r2() -> None:
         years = [str(now.year)]
     for year in years:
         r2 = await list_cf_r2(prefix=DANMU.R2_PREFIX + f"{year}-")
-        saved_dates = [x.removeprefix(DANMU.R2_PREFIX) for x in glom(r2, "Contents.*.Key", default=[])]
+        r2_dates = [x.removeprefix(DANMU.R2_PREFIX) for x in glom(r2, "Contents.*.Key", default=[])]
         api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector?liveDate={year}"
         resp = await hx_req(api, proxy=DANMU.PROXY, silent=True)
-        dates_map = defaultdict(list)
+        all_server_dates = []
+        new_dates_map = defaultdict(list)
         # some live dates on server are "2025-04-14_01", "2025-04-14_02", ...
         # norm dates to YYYY-MM-DD
         for x in resp:
             live_date: str = x["liveDate"]  # type: ignore
-            if live_date[:10] in saved_dates or live_date[:10] == f"{now:%Y-%m-%d}":  # ignore today's live
+            all_server_dates.append(live_date)
+            if live_date[:10] in r2_dates or live_date[:10] == f"{now:%Y-%m-%d}":  # ignore today's live
                 continue
-            dates_map[live_date[:10]].append(live_date)
+            new_dates_map[live_date[:10]].append(live_date)
+
         # sync per date
-        logger.debug(f"Sync year: {year}, Num dates: {len(dates_map)}")
-        await batch_sync(dates_map)
+        logger.debug(f"Sync year: {year}, Num dates: {len(new_dates_map)}")
+        await batch_sync(new_dates_map)
 
-        # sync the whole year
-        if len(dates_map) > 0:  # has new dates
-            await batch_sync({year: [year]})
+        # sync per month
+        months = {x[:7] for x in new_dates_map}  # YYYY-MM
+        for month in sorted(months):
+            days = [x for x in all_server_dates if x[:7] == month]
+            logger.debug(f"Sync month: {month}, Num dates: {len(days)}")
+            await batch_sync({month: days})
 
 
 @cache.memoize(ttl=28800)
src/database.py
@@ -116,7 +116,7 @@ async def list_cf_r2(prefix: str = "", continuation_token: str | None = None) ->
         return results
 
 
-async def get_cf_r2(key: str) -> dict:
+async def get_cf_r2(key: str, *, silent: bool = False) -> dict:
     """Get from Cloudflare R2."""
     if not DB.CF_R2_ENABLED:
         logger.warning("SKIP GET CF-R2: Cloudflare R2 disabled")
@@ -135,10 +135,11 @@ async def get_cf_r2(key: str) -> dict:
             if obj.get("Body"):
                 data = await obj["Body"].read()
                 data = json.loads(data)
-                logger.success(f"GET CF-R2 for {key}: {data}")
+                if not silent:
+                    logger.success(f"GET CF-R2 for {key}: {data}")
                 return data
         except ClientError as e:
-            if e.response["Error"]["Code"] != "404":
+            if e.response["Error"]["Code"] != "NoSuchKey":
                 logger.warning(f"GET CF-R2 failed for {key}: {e}")
         except Exception as e:
             logger.warning(f"GET CF-R2 failed for {key}: {e}")
@@ -203,8 +204,10 @@ async def set_cf_r2(
     ttl: int | None = None,
     *,
     compress: bool = False,
+    quality: int = 4,
     mime_type: str = "application/json",
     skip_in_memory: bool = True,
+    silent: bool = False,
 ) -> bool:
     """Set to Cloudflare R2 via boto3.
 
@@ -213,7 +216,7 @@ async def set_cf_r2(
     If `skip_in_memory` is True, it will skip setting to CF-R2 if the key is already in memory cache.
     """
     if skip_in_memory and cache.get(key):
-        logger.trace(f"SKIP SET CF-R2: key is already in  memory cache for {key}: {cache.get(key)}")
+        logger.trace(f"SKIP SET CF-R2: key is already in memory cache for {key}: {cache.get(key)}")
         return True
     if not DB.CF_R2_ENABLED:
         logger.warning("SKIP SET CF-R2: Cloudflare R2 disabled")
@@ -234,7 +237,7 @@ async def set_cf_r2(
         payload |= {"Body": upload}
 
         if compress:
-            payload["Body"] = brotli.compress(upload, quality=11)
+            payload["Body"] = brotli.compress(upload, quality=min(quality, 11))
             payload["ContentEncoding"] = "br"
 
     metadata = metadata or {}
@@ -251,7 +254,8 @@ async def set_cf_r2(
     ) as s3:  # type: ignore
         try:
             await s3.put_object(**payload)
-            logger.success(f"Successfully SET CF-R2 for {key}: {data=}, {metadata=}")
+            if not silent:
+                logger.success(f"Successfully SET CF-R2 for {key}: {data=}, {metadata=}")
         except Exception as e:
             logger.warning(f"SET CF-R2 failed for {key}: {e}")
             return False
src/main.py
@@ -26,6 +26,7 @@ from handler import handle_social_media, handle_utilities
 from llm.summary import daily_summary
 from llm.utils import clean_gemini_files
 from messages.parser import parse_msg
+from others.danmu import sync_danmu_to_r2
 from others.podcast import summary_pods
 from permission import check_permission
 from price.entrypoint import match_symbol_category
@@ -127,6 +128,7 @@ async def scheduling(client: Client):
                 logger.info(f"Sending daily message to {chat_id}: {msg}")
                 await client.send_message(to_int(chat_id), msg)
     await summary_pods(client)
+    await sync_danmu_to_r2()
     await clean_gemini_files()