Commit 33e811d

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-05-29 17:24:34
feat(danmu): add support for streamer live chat
1 parent 9ff57ab
Changed files (4)
src/others/danmu.py
@@ -12,6 +12,7 @@ import anyio
 from glom import flatten, glom
 from loguru import logger
 from pyrogram.client import Client
+from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM, BLOCKQUOTE_EXPANDABLE_END_DELIM
 from pyrogram.types import Message
 
 from config import DANMU, DOWNLOAD_DIR, PREFIX, PROXY, TEXT_LENGTH, TOKEN, TZ, cache
@@ -26,49 +27,60 @@ from price.coinmarketcap import get_cmc_fiat
 from utils import nowdt, number
 
 HELP = f"""📖**查询弹幕记录**
-使用说明:
-1. `{PREFIX.DANMU}` + 日期 (YYYY-MM-DD)
-2. `{PREFIX.DANMU}` + @用户名 (区分大小写, 且要去除空格)
-3. `{PREFIX.DANMU}` + #关键词 (区分大小写)
-4. 以上可组合使用, 但日期必须放前面, 关键词必须放后面
-5. 不指定日期时, 默认查询本年弹幕记录
-
-示例:
+`{PREFIX.DANMU}` 使用说明:
+1.`{PREFIX.DANMU} + 日期`
+2.`{PREFIX.DANMU} + @用户名` (区分大小写, 且要去除空格)
+3.`{PREFIX.DANMU} + #关键词` (区分大小写)
+4.以上可组合使用, 但日期必须放前面, 关键词必须放后面
+5.不指定日期时, 默认查询本年弹幕记录
+{BLOCKQUOTE_EXPANDABLE_DELIM}示例:
 - `{PREFIX.DANMU} 2025-01-01`: 查询2025-01-01日的弹幕
 - `{PREFIX.DANMU} 2025-01`: 查询2025年1月份的弹幕
 - `{PREFIX.DANMU} 2025`: 查询2025年的弹幕
-- `{PREFIX.DANMU} @张三`: 查询用户"张三"的弹幕
-- `{PREFIX.DANMU} #你好`: 查询包含"你好"关键词的弹幕
-- `{PREFIX.DANMU} 2025-01-01 @张三`: 查询2025-01-01日用户"张三"的弹幕
+- `{PREFIX.DANMU} @张三`: 查询用户【张三】的弹幕
+- `{PREFIX.DANMU} #你好`: 查询包含“你好”关键词的弹幕
+- `{PREFIX.DANMU} 2025-01-01 @张三`: 查询2025-01-01日用户【张三】的弹幕
 - `{PREFIX.DANMU} 2025 @张三`: 查询2025年张三的全部弹幕
-- `{PREFIX.DANMU} 2025-01-01 #你好`: 查询2025-01-01日包含"你好"的弹幕
-- `{PREFIX.DANMU} 2025 @张三 #你好`: 查询2025年用户"张三"包含"你好"的弹幕
+- `{PREFIX.DANMU} 2025-01-01 #你好`: 查询2025-01-01日包含“你好”的弹幕
+- `{PREFIX.DANMU} 2025 @张三 #你好`: 查询2025年用户【张三】包含“你好”的弹幕
 
 注意:
 - 日期为开播日期, 不是弹幕发送日期 (过了凌晨也算前一天)
 - 如果用户名中有空格, 请去除空格。例如: 想指定用户为John Doe请使用 `@JohnDoe`
+{BLOCKQUOTE_EXPANDABLE_END_DELIM}
+`{PREFIX.ZIMU}` 和 `{PREFIX.DANMU}` 类似, 但查询的是{DANMU.STREAMER}直播发言。
+额外需注意的是:
+`{PREFIX.ZIMU}`会忽略指定的 @用户名
+`{PREFIX.ZIMU} + #关键词` 也可以省略**#**号
+直接使用`{PREFIX.ZIMU} + 关键词` 即可
 """
 
+DANMU_TIPS = f"时间点为{TZ}时区"
+SRT_TIPS = "时间点并非真实时间, 而是相对开播时长"
+
 
 async def query_danmu(client: Client, message: Message, **kwargs):
     info = parse_msg(message)
-    if not startswith_prefix(info["text"], prefix=[PREFIX.DANMU]):
+    if not startswith_prefix(info["text"], prefix=[PREFIX.DANMU, PREFIX.ZIMU]):
         return
-    if equal_prefix(message.text, prefix=[PREFIX.DANMU]):
+    if equal_prefix(message.text, prefix=[PREFIX.DANMU, PREFIX.ZIMU]):
         await send2tg(client, message, texts=HELP, **kwargs)
         return
     if not DANMU.BASE_URL:
         await send2tg(client, message, texts="⚠️请联系管理员配置API地址", **kwargs)
         return
-    match_time, user, keyword = parse_queries(info["text"])
+
+    qtype = "弹幕" if startswith_prefix(info["text"], prefix=[PREFIX.DANMU]) else "发言"
+    match_time, user, keyword = parse_queries(info["text"], qtype)
     if not any((match_time, user, keyword)):  # NO Match
-        await send2tg(client, message, texts=f"查询格式有误, 请发送{PREFIX.DANMU}命令查看帮助", **kwargs)
+        await send2tg(client, message, texts=f"查询格式有误, 请发送{PREFIX.DANMU}或{PREFIX.ZIMU}命令查看帮助", **kwargs)
         return
     if not match_time:
         match_time = str(nowdt(TZ).year)  # this year
 
-    # payload = {"limit": DANMU.NUM_PER_QUERY, "liveDate": match_time, "message": "", "authorName": ""}
-    caption = f"📖**弹幕记录**:\n🕒日期: {match_time}"
+    user = user if startswith_prefix(info["text"], prefix=[PREFIX.DANMU]) else DANMU.STREAMER
+
+    caption = f"📖**{qtype}记录**:\n🕒日期: {match_time}"
     if user:
         caption += f"\n👤用户: {user}"
 
@@ -83,19 +95,20 @@ async def query_danmu(client: Client, message: Message, **kwargs):
     count = 0
     super_chats = defaultdict(Decimal)  # {"currency": amount}
     engine_dates = await query_engine_with_dates(match_time, user, keyword)
+    if qtype == "发言":
+        engine_dates = {"server": [match_time]}  # always use server
     for engine, dates in engine_dates.items():
         if engine == "r2":
             resp = await query_danmu_from_r2(dates, user, keyword, caption, super_chats, **kwargs)
             paths.extend(resp.get("paths", []))
         else:
-            resp = await query_danmu_from_server(dates, user, keyword, caption, super_chats, **kwargs)
+            resp = await query_danmu_from_server(dates, user, keyword, caption, super_chats, qtype, **kwargs)
             paths.append(resp["path"]) if resp.get("path") else None
         count += resp.get("count", 0)
     if count == 0:
-        await modify_progress(text=caption + "\n⚠️未匹配任何弹幕", force_update=True, **kwargs)
+        await modify_progress(text=caption + f"\n⚠️未匹配任何{qtype}", force_update=True, **kwargs)
         return
-    header = f"📖**弹幕记录 ({user})**:" if user else "📖**弹幕记录**:"
-
+    header = f"📖**{qtype}记录 ({user})**:" if user else f"📖**{qtype}记录**:"
     profit = ""
     profit_usd = 0
     if info["ctype"] not in ["GROUP", "SUPERGROUP"]:  # 在群组中不展示打赏榜
@@ -108,14 +121,14 @@ async def query_danmu(client: Client, message: Message, **kwargs):
             profit += f"\n💵**总计**: {profit_usd:.2f} USD"
 
     total_bytes = sum([Path(path).stat().st_size for path in paths])
-    if total_bytes < 10000:  #  short length, try send as message directly
-        danmu = "".join([Path(path).read_text() for path in paths]).strip()
+    if total_bytes < 1048576:  #  short length, try send as message directly
+        danmu = "\n".join([Path(path).read_text() for path in paths]).strip()
         final = f"{header}{profit}\n{blockquote(danmu)}"
         if len(final) < TEXT_LENGTH - 10:
             await modify_progress(message=status_msg, text=final, force_update=True, **kwargs)
             return
 
-    caption += f"\n#️⃣弹幕数: {count}"
+    caption += f"\n#️⃣{qtype}数: {count}"
     caption += f"\n\n🎉**SuperChat**:{profit}" if profit else ""
     media = [{"document": path} for path in sorted(paths)]
     await send2tg(client, message, texts=caption, media=media, **kwargs)
@@ -123,7 +136,7 @@ async def query_danmu(client: Client, message: Message, **kwargs):
     await modify_progress(message=status_msg, del_status=True, **kwargs)
 
 
-def parse_queries(texts: str) -> tuple[str, str, str]:
+def parse_queries(texts: str, qtype: str) -> tuple[str, str, str]:
     """Parse from users' query.
 
     Returns:
@@ -132,6 +145,7 @@ def parse_queries(texts: str) -> tuple[str, str, str]:
     match_time = ""
     user = ""
     keyword = ""
+    texts = texts.replace(PREFIX.ZIMU, PREFIX.DANMU)  # unify prefix
     # 2025-01-01
     if matched := re.match(rf"^{PREFIX.DANMU}" + r"\s+(\d{4}-\d{2}-\d{2})(\s+)?", texts):  # noqa: SIM114
         match_time = matched.group(1)
@@ -155,6 +169,16 @@ def parse_queries(texts: str) -> tuple[str, str, str]:
     # #你好
     if matched := re.match(r"^#(.*)", texts):
         keyword = matched.group(1)
+    if qtype == "弹幕":
+        return match_time, user, keyword
+
+    # ! wait for changing server limit
+    if match_time and not texts:  # 暂时不支持获取单日全部发言
+        return "", "", ""
+    # 对于发言, 如果未匹配时间和keyword, 则用户输入是忽略了keyword前的 #
+    # 即使用 `PREFIX.ZIMU keyword`` 查询
+    if not keyword:
+        keyword = texts
     return match_time, user, keyword
 
 
@@ -189,70 +213,62 @@ async def query_engine_with_dates(match_time: str, user: str = "", keyword: str
     if len(match_time) == 7:
         if match_time != f"{year}-{month}":
             return {"r2": [match_time]}
-        # 本月
-        live_dates = await get_live_dates_from_server(year)
-        server_dates = flatten([v for k, v in live_dates.items() if k.startswith(f"{year}-{month}")])
-        return {"server": sorted(server_dates)}  # type: ignore
+        return {"server": [f"{year}-{month}"]}  # 本月
 
     # YYYY
     if user or keyword:
         return {"server": [match_time]}
     if match_time == str(year):  # 今年
         # 从server获取本月, 其余月份从R2获取
-        live_dates = await get_live_dates_from_server(year)
-        server_dates = flatten([v for k, v in live_dates.items() if k.startswith(f"{year}-{month}")])
         r2_months = [f"{year}-{mon:02d}" for mon in range(1, now.month)]
-        return {"server": sorted(server_dates), "r2": r2_months}  # type: ignore
+        return {"server": [f"{year}-{month}"], "r2": r2_months}  # type: ignore
     r2_months = [f"{match_time}-{mon:02d}" for mon in range(1, 13)]
     return {"r2": r2_months}
 
 
-async def query_danmu_from_server(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, **kwargs) -> dict:
+async def query_danmu_from_server(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, qtype: str, **kwargs) -> dict:
     """从远程数据库获取弹幕记录.
 
-    一般情况下, dates列表中只有一个日期.
-    只有在获取本月所有记录时, dates列表才会有多个日期. (详见`query_engine_with_dates`函数的docstrings)
-
-    当只有单日期时, 直接获取弹幕记录.
-    当dates为本月的列表时, 不要分别查询每个日期, 而是直接查询最近所有弹幕, 截断到本月最早的日期.
+    dates列表中只有一个日期.
 
     Returns:
         {"path": str, "count": int}
     """
     if not dates:
         return {}
-    dates = sorted(dates)
-    cut_date = dates[0][:10]  # YYYY-MM-DD
-    if any(x[:7] != cut_date[:7] for x in dates):
-        logger.warning(f"查询日期列表有误, 应该全部为本月日期。当前查询: {dates}")
-        return {}
-    query_date = cut_date if len(dates) == 1 else cut_date[:4]  # year
-    payload = {"limit": DANMU.NUM_PER_QUERY, "liveDate": query_date}
-    if user:
+    date = dates[0]
+    api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
+    payload = {"limit": DANMU.NUM_PER_QUERY, "liveDate": date}
+    if qtype == "发言":
+        payload["limit"] = min(100, DANMU.NUM_PER_QUERY)  # ! wait for changing server limit
+    if user and qtype == "弹幕":
         payload["authorName"] = user
     if keyword:
-        payload["message"] = keyword
+        payload |= {"message": keyword} if qtype == "弹幕" else {"content": keyword}
     payload["page"] = 1
     logger.debug(f"Query: {payload}")
-    resp = await hx_req(DANMU.BASE_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
-    parsed = parse_danmu(resp.get("data", ""), user, keyword, super_chats, cut_date)
+    resp = await hx_req(api_url, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
+    parsed = parse_danmu(resp.get("data", ""), user, keyword, super_chats) if qtype == "弹幕" else parse_srt(resp.get("data", ""), keyword)
     processed = parsed["count"]
     if processed == 0:
         return {}
     danmu = parsed["danmu"]
-    await modify_progress(text=caption + f"\n⏳已获取 {processed} 条弹幕", force_update=True, **kwargs)
-    while len(resp.get("data", [])) == payload["limit"] and parsed["count"] and not parsed["force_stop"]:
+    await modify_progress(text=caption + f"\n⏳已获取 {processed} 条{qtype}", force_update=True, **kwargs)
+    while len(resp.get("data", [])) == payload["limit"] and parsed["count"]:
         payload["page"] += 1
         logger.debug(f"Query: {payload}")
-        resp = await hx_req(DANMU.BASE_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
-        parsed = parse_danmu(resp.get("data", ""), user, keyword, super_chats, cut_date)
-        danmu += parsed["danmu"]
+        resp = await hx_req(api_url, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
+        parsed = parse_danmu(resp.get("data", ""), user, keyword, super_chats) if qtype == "弹幕" else parse_srt(resp.get("data", ""), keyword)
         processed += parsed["count"]
-        await modify_progress(text=caption + f"\n⏳已获取 {processed} 条弹幕", **kwargs)
-
-    # long texts, save txt
-    date_name = f"{cut_date[:4]}年{cut_date[5:7]}月" if len(dates) > 1 else cut_date
-    save_path = f"{DOWNLOAD_DIR}/{user}{date_name}{keyword}弹幕记录.txt"
+        danmu += parsed["danmu"]
+        await modify_progress(text=caption + f"\n⏳已获取 {processed} 条{qtype}", **kwargs)
+
+    del parsed
+    date_name = f"{date[:4]}年{date[5:7]}月" if len(date) == 7 else date
+    user = f"{user}-" if user else ""
+    keyword = f"-“{keyword}”" if keyword else ""
+    save_path = f"{DOWNLOAD_DIR}/{user}{date_name}{keyword}-{qtype}.txt"
+    danmu = f"{DANMU_TIPS}\n{danmu.strip()}" if qtype == "弹幕" else f"{SRT_TIPS}\n{danmu.strip()}"
     async with await anyio.open_file(save_path, "w") as f:
         await f.write(danmu)
     return {"path": save_path, "count": processed}
@@ -295,18 +311,22 @@ async def query_danmu_from_r2(dates: list[str], user: str, keyword: str, caption
             await modify_progress(text=caption + f"\n⏳已获取 {count} 条弹幕", force_update=True, **kwargs)
         if not danmu:
             continue
+        del parsed
         year = chunk[0][:4]
         months = [f"{x[5:7]}月" for x in chunk]
         month_name = "&".join(months)
         date_name = f"{year}年{month_name}"
-        save_path = f"{DOWNLOAD_DIR}/{user}{date_name}{keyword}弹幕记录.txt"
+        user = f"{user}-" if user else ""
+        keyword = f"-“{keyword}”" if keyword else ""
+        save_path = f"{DOWNLOAD_DIR}/{user}{date_name}{keyword}-弹幕.txt"
         async with await anyio.open_file(save_path, "w") as f:
-            await f.write(danmu)
+            await f.write(f"{DANMU_TIPS}\n{danmu.strip()}")
         paths.append(save_path)
     return {"paths": paths, "count": count}
 
 
-def parse_danmu(data: list[dict], user: str, keyword: str, super_chats: defaultdict, cut_date: str = "") -> dict:
+def parse_danmu(data: list[dict], user: str, keyword: str, super_chats: defaultdict) -> dict:
+    """解析弹幕记录."""
     msg = ""
     hide_name = bool(user)  # 当指定过滤user时, 隐藏用户名
     if user:
@@ -315,12 +335,8 @@ def parse_danmu(data: list[dict], user: str, keyword: str, super_chats: defaultd
         data = [x for x in data if keyword in x.get("message", "")]
     data = [x for x in data if x.get("timestamp")]  # ensure timestamp
     count = 0
-    force_stop = False
     for danmu in sorted(data, key=lambda x: x["timestamp"], reverse=True):  # new to old
         dt = datetime.fromtimestamp(danmu["timestamp"] / 1000000, tz=UTC).astimezone(ZoneInfo(TZ))
-        if cut_date and f"{dt:%Y-%m-%d}" < cut_date:
-            force_stop = True
-            break
         sc_amount = ""
         if super_chat := danmu.get("scAmount"):
             currency, amount = super_chat.split(" ")
@@ -330,7 +346,20 @@ def parse_danmu(data: list[dict], user: str, keyword: str, super_chats: defaultd
         username = danmu.get("authorName", "")
         msg += f"\n{dt:%m-%d %H:%M:%S}{sc_amount}: {texts}" if hide_name else f"\n{dt:%m-%d %H:%M:%S}|{username}{sc_amount}: {texts}"
         count += 1
-    return {"danmu": msg.rstrip(), "count": count, "force_stop": force_stop}
+    return {"danmu": msg.rstrip(), "count": count}
+
+
+def parse_srt(data: list[dict], keyword: str) -> dict:
+    """解析发言记录."""
+    msg = ""
+    if keyword:
+        data = [x for x in data if keyword in x.get("content", "")]
+    data = [x for x in data if x.get("liveDate") and x.get("startTime")]
+    count = 0
+    for x in sorted(data, key=lambda x: (x["liveDate"], x["startTime"]), reverse=True):  # new to old
+        msg += f"\n{x['liveDate'][:10]} {x['startTime'][:8]}: {x['content'].strip()}"
+        count += 1
+    return {"danmu": msg.rstrip(), "count": count}
 
 
 @cache.memoize(ttl=600)
src/config.py
@@ -82,6 +82,7 @@ class PREFIX:
     SEARCH_GOOGLE = os.getenv("PREFIX_SEARCH_GOOGLE", "/google").lower()
     GENIMG = os.getenv("PREFIX_GENIMG", "/gen").lower()
     DANMU = os.getenv("PREFIX_DANMU", "/danmu").lower()
+    ZIMU = os.getenv("PREFIX_ZIMU", "/zimu").lower()
 
 
 class API:
@@ -100,6 +101,8 @@ class API:
 
 class DANMU:
     BASE_URL = os.getenv("DANMU_BASE_URL", "")  # Custom API, No docs
+    STREAMER = os.getenv("DANMU_STREAMER", "Streamer")  # streamer name
+    STREAM_YEARS = os.getenv("DANMU_STREAM_YEARS", "")  # comma separated years that has live stream
     PROXY = os.getenv("DANMU_PROXY", None)  # socks5://127.0.0.1:7890
     NUM_PER_QUERY = int(os.getenv("DANMU_NUM_PER_QUERY", "100"))  # Number of items per query
     D1_DATABASE = os.getenv("DANMU_D1_DATABASE", "bennybot-danmu")
src/handler.py
@@ -184,6 +184,7 @@ async def handle_social_media(
             PREFIX.SUBTITLE,
             PREFIX.VOICE,
             PREFIX.WGET,
+            PREFIX.ZIMU,
         ]
     )
     info = parse_msg(message)
@@ -347,7 +348,7 @@ def get_social_media_help(chat_id: int | str, ctype: str, prefixes: list[str] |
     if permission["google"]:
         msg += f"\n🔍**搜索Google**: `{PREFIX.SEARCH_GOOGLE}` + 关键词"
     if permission["danmu"]:
-        msg += f"\n📖**查询弹幕记录**: 发送 `{PREFIX.DANMU}` 查看详细教程"
+        msg += f"\n📖**查询弹幕记录**: 发送 `{PREFIX.DANMU}`, `{PREFIX.ZIMU}` 查看详细教程"
 
     msg += "\n\n单独发送每个命令前缀本身可查看该命令详细使用说明"
     return msg
src/networking.py
@@ -469,7 +469,7 @@ if __name__ == "__main__":
     # asyncio.run(match_social_media_link("https://b23.tv/3MSgT4q/", flatten_first=True))
     # print(asyncio.run(match_social_media_link("https://mp.weixin.qq.com/s/bd_giuPEyPBu9LTOtC2VHw", flatten_first=True)))
     # print(asyncio.run(match_social_media_link("https://reddit.com/comments/1kaazzn", flatten_first=True)))
-    print(asyncio.run(match_social_media_link("https://www.reddit.com/r/China_irl/s/bA50WleCBM")))
+    # print(asyncio.run(match_social_media_link("https://www.reddit.com/r/China_irl/s/bA50WleCBM")))
     # asyncio.run(match_social_media_link("https://www.facebook.com/share/r/19QGGp39T3/", flatten_first=True))
     # asyncio.run(match_social_media_link("https://www.douyin.com/video/7398813386827468041"))
     # asyncio.run(match_social_media_link("https://www.iesdouyin.com/share/note/7454527270925946138/"))
@@ -483,8 +483,8 @@ if __name__ == "__main__":
     # res = asyncio.run(hx_req("https://httpbin.org/delay/10"))
     # asyncio.run(hx_req("https://httpbin.org/get", check_kv={"url": "https://httpbin.org/get", "headers.Pragma": "no-cache1"}, max_retry=1))
     # resp = asyncio.run(hx_req("https://httpbin.org/get", check_kv={"headers": {"Accept-Language": "en-US,en;q=0.8"}}))
-    # resp = asyncio.run(hx_req("https://httpbin.org/headers", headers={"referer": "https://www.xiaohongshu.com/"}))
-    # print(resp)
+    resp = asyncio.run(hx_req("https://httpbin.org/headers", headers={"referer": "https://www.xiaohongshu.com/"}))
+    print(resp)
 
     # asyncio.run(download_file("https://httpbin.org/image/jpeg", suffix=".jpg"))
     # asyncio.run(match_social_media_link("https://www.instagram.com/p/C7P3jN8vmEN"))