Commit fcd8c23

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-05-28 13:36:45
feat(danmu): sync danmu to CF-R2
1 parent ac22419
Changed files (2)
src/others/danmu.py
@@ -14,7 +14,7 @@ from pyrogram.client import Client
 from pyrogram.types import Message
 
 from config import DANMU, PREFIX, TEXT_LENGTH, TZ, cache
-from database import create_cf_d1_table, query_cf_d1
+from database import create_cf_d1_table, list_cf_r2, query_cf_d1, set_cf_r2
 from messages.parser import parse_msg
 from messages.progress import modify_progress
 from messages.sender import send2tg
@@ -28,6 +28,7 @@ HELP = f"""📖**查询弹幕记录**
 2. `{PREFIX.DANMU}` + @用户名 (区分大小写, 且要去除空格)
 3. `{PREFIX.DANMU}` + #关键词 (区分大小写)
 4. 以上可组合使用, 但日期必须放前面, 关键词必须放后面
+5. 不指定日期时, 默认查询本年弹幕记录
 
 示例:
 - `{PREFIX.DANMU} 2025-01-01`: 查询2025-01-01日的弹幕
@@ -40,13 +41,12 @@ HELP = f"""📖**查询弹幕记录**
 - `{PREFIX.DANMU} 2025 @张三 #你好`: 查询2025年用户"张三"包含"你好"的弹幕
 
 注意:
-- 默认返回部分弹幕记录, 获取完整记录需添加YYYY格式的年份, 例如 `{PREFIX.DANMU} 2025 @张三`
 - 日期为开播日期, 不是弹幕发送日期 (过了凌晨也算前一天)
 - 如果用户名中有空格, 请去除空格。例如: 想指定用户为John Doe请使用 `@JohnDoe`
 """
 
 
-async def query_danmu(client: Client, message: Message, *, full_history: bool = False, show_name: bool = True, **kwargs):
+async def query_danmu(client: Client, message: Message, *, show_name: bool = True, **kwargs):
     info = parse_msg(message)
     texts = info["text"]
     if not startswith_prefix(info["text"], prefix=[PREFIX.DANMU]):
@@ -61,11 +61,10 @@ async def query_danmu(client: Client, message: Message, *, full_history: bool =
     user = ""
     keyword = ""
     # 2025-01-01
-    if matched := re.match(rf"^{PREFIX.DANMU}" + r"\s+(\d{4}-\d{2}-\d{2})(\s+)?", texts):
+    if matched := re.match(rf"^{PREFIX.DANMU}" + r"\s+(\d{4}-\d{2}-\d{2})(\s+)?", texts):  # noqa: SIM114
         match_time = matched.group(1)
     # 2025
     elif matched := re.match(rf"^{PREFIX.DANMU}" + r"\s+(\d{4})(\s+)?", texts):
-        full_history = True
         match_time = matched.group(1)
 
     # remove prefix + date
@@ -85,13 +84,12 @@ async def query_danmu(client: Client, message: Message, *, full_history: bool =
     if not any((match_time, user, keyword)):  # NO Match
         await send2tg(client, message, texts=f"查询格式有误, 请发送{PREFIX.DANMU}命令查看帮助", **kwargs)
         return
+    if not match_time:
+        match_time = nowdt(TZ).year  # this year
 
     page = 1
-    payload = {"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": "", "message": "", "authorName": ""}
-    caption = "📖**弹幕记录**:"
-    if match_time:
-        payload["liveDate"] = match_time
-        caption += f"\n🕒日期: {match_time}"
+    payload = {"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": match_time, "message": "", "authorName": ""}
+    caption = f"📖**弹幕记录**:\n🕒日期: {match_time}"
     if user:
         payload["authorName"] = user
         caption += f"\n👤用户: {user}"
@@ -102,7 +100,7 @@ async def query_danmu(client: Client, message: Message, *, full_history: bool =
     logger.debug(f"Query: {payload}")
     status_msg = (await send2tg(client, message, texts=caption, **kwargs))[0]
     kwargs["progress"] = status_msg
-    resp = await hx_req(DANMU.BASR_URL, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, check_keys=["count", "data"], silent=True)
+    resp = await hx_req(DANMU.BASR_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
     count = resp["count"]
     if count == 0:
         await modify_progress(message=status_msg, text=caption + "\n⚠️未找到匹配弹幕", force_update=True, **kwargs)
@@ -114,28 +112,28 @@ async def query_danmu(client: Client, message: Message, *, full_history: bool =
     danmu = parsed["danmu"]
     processed = parsed["num_messages"]
     await modify_progress(message=status_msg, text=caption + f"\n⏳已获取 {processed} 条弹幕", force_update=True, **kwargs)
-    if full_history:
-        while processed < count:
-            page += 1
-            payload["page"] = page
-            logger.debug(f"Query: {payload}")
-            resp = await hx_req(DANMU.BASR_URL, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, check_keys=["count", "data"], silent=True)
-            parsed = parse_danmu(resp["data"], super_chats, show_name=show_name)
-            danmu += parsed["danmu"]
-            processed += parsed["num_messages"]
-            await modify_progress(message=status_msg, text=caption + f"\n⏳已获取 {processed} 条弹幕", **kwargs)
+    while processed < count:
+        page += 1
+        payload["page"] = page
+        logger.debug(f"Query: {payload}")
+        resp = await hx_req(DANMU.BASR_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
+        parsed = parse_danmu(resp["data"], super_chats, show_name=show_name)
+        danmu += parsed["danmu"]
+        processed += parsed["num_messages"]
+        await modify_progress(message=status_msg, text=caption + f"\n⏳已获取 {processed} 条弹幕", **kwargs)
 
     profit = ""
     for currency, amount in sorted(super_chats.items()):
         profit += f"\n**{currency}**: {number(amount)}"
     final = f"{header}{profit}\n{blockquote(danmu)}"
-    if not full_history or len(final) < TEXT_LENGTH:
-        await modify_progress(message=status_msg, text=blockquote(final), force_update=True, **kwargs)
+    if len(final) < TEXT_LENGTH - 10:
+        await modify_progress(message=status_msg, text=final, force_update=True, **kwargs)
     else:
         caption += f"\n#️⃣弹幕数: {count}"
         caption += profit
         with BytesIO(danmu.encode("utf-8")) as f:
-            await client.send_document(info["cid"], f, file_name=f"{user}弹幕记录.txt", caption=caption)
+            await client.send_document(info["cid"], f, file_name=f"{user}{match_time}{keyword}弹幕记录.txt", caption=caption)
+        await modify_progress(message=status_msg, del_status=True, **kwargs)
 
 
 def parse_danmu(data: list[dict], super_chats: defaultdict, *, show_name: bool = True) -> dict:
@@ -153,6 +151,55 @@ def parse_danmu(data: list[dict], super_chats: defaultdict, *, show_name: bool =
     return {"danmu": msg.strip(), "num_messages": len(data)}
 
 
+@cache.memoize(ttl=3600)
+async def sync_danmu_to_r2() -> None:
+    async def batch_sync(dates_map: dict[str, list[str]]):
+        for norm_date, server_dates in dates_map.items():
+            results = []
+            for date in server_dates:
+                saved_ids = [f"{x['timestamp']}{x['authorName']}" for x in results]
+                payload = {"page": 1, "limit": DANMU.NUM_PER_QUERY, "liveDate": date, "message": "", "authorName": ""}
+                resp = await hx_req(DANMU.BASR_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
+                count = resp["count"]
+                keep_keys = ["timestamp", "authorName", "message", "scAmount"]
+                results.extend([{key: danmu[key] for key in keep_keys if danmu.get(key) and f"{danmu['timestamp']}{danmu['authorName']}" not in saved_ids} for danmu in resp["data"]])
+                while len(results) < count:
+                    payload["page"] += 1
+                    logger.debug(f"Query: {payload}")
+                    resp = await hx_req(DANMU.BASR_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
+                    results.extend([{key: danmu[key] for key in keep_keys if danmu.get(key) and f"{danmu['timestamp']}{danmu['authorName']}" not in saved_ids} for danmu in resp["data"]])
+            await set_cf_r2(DANMU.R2_PREFIX + norm_date, results, compress=True)
+
+    if not DANMU.R2_SYNC_ENABLE:
+        return
+    now = nowdt(TZ)
+    if now.hour <= 4 or now.hour >= 20:
+        return
+    years = [x.strip() for x in DANMU.R2_SYNC_YEARS.split(",") if x.strip()]
+    if not years:
+        years = [str(now.year)]
+    for year in years:
+        r2 = await list_cf_r2(prefix=DANMU.R2_PREFIX + f"{year}-")
+        saved_dates = [x.removeprefix(DANMU.R2_PREFIX) for x in glom(r2, "Contents.*.Key", default=[])]
+        api = f"{DANMU.BASR_URL}/liveInfo/queryListBySelector?liveDate={year}"
+        resp = await hx_req(api, proxy=DANMU.PROXY, silent=True)
+        dates_map = defaultdict(list)
+        # some live dates on server are "2025-04-14_01", "2025-04-14_02", ...
+        # norm dates to YYYY-MM-DD
+        for x in resp:
+            live_date: str = x["liveDate"]  # type: ignore
+            if live_date[:10] in saved_dates or live_date[:10] == f"{now:%Y-%m-%d}":  # ignore today's live
+                continue
+            dates_map[live_date[:10]].append(live_date)
+        # sync per date
+        logger.debug(f"Sync year: {year}, Num dates: {len(dates_map)}")
+        await batch_sync(dates_map)
+
+        # sync the whole year
+        if len(dates_map) > 0:  # has new dates
+            await batch_sync({year: [year]})
+
+
 @cache.memoize(ttl=3600)
 async def sync_danmu_to_d1() -> None:
     """Deprecated, D1 only allow 5M reads, 100K writes."""
@@ -195,11 +242,11 @@ async def sync_danmu_to_d1() -> None:
     saved_items = [f"{x['time']}{x['uid']}" for x in flatten(glom(resp, "result.*.results", default=[]))]
     page = 1
     payload = {"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": f"{now:%Y-%m-%d}", "message": "", "authorName": ""}
-    resp = await hx_req(DANMU.BASR_URL, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, check_keys=["count", "data"], silent=True)
+    resp = await hx_req(DANMU.BASR_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
     count = resp["count"]
     processed = await batch_sync(resp["data"], saved_items)
     while processed < count:
         page += 1
         payload["page"] = page
-        resp = await hx_req(DANMU.BASR_URL, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, check_keys=["count", "data"], silent=True)
+        resp = await hx_req(DANMU.BASR_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
         processed += await batch_sync(resp["data"], saved_items)
src/config.py
@@ -103,6 +103,9 @@ class DANMU:
     PROXY = os.getenv("DANMU_PROXY", None)  # socks5://127.0.0.1:7890
     NUM_PER_QUERY = int(os.getenv("DANMU_NUM_PER_QUERY", "100"))  # Number of items per query
     D1_DATABASE = os.getenv("DANMU_D1_DATABASE", "bennybot-danmu")
+    R2_PREFIX = os.getenv("DANMU_R2_PREFIX", "Danmu/")
+    R2_SYNC_ENABLE = os.getenv("DANMU_R2_SYNC_ENABLE", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
+    R2_SYNC_YEARS = os.getenv("DANMU_R2_SYNC_YEARS", "")  # comma separated years to sync to R2. e.g. "2025,2024,2023"
 
 
 class PROVIDER:  # default API provider