Commit dd8d39a

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-05-27 19:33:43
feat(danmu): add `/danmu` command to query danmu records
1 parent b6009e9
src/others/danmu.py
@@ -0,0 +1,141 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import re
+from collections import defaultdict
+from datetime import UTC, datetime
+from decimal import Decimal
+from io import BytesIO
+from zoneinfo import ZoneInfo
+
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import Message
+
+from config import DANMU, PREFIX, TZ
+from messages.parser import parse_msg
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from messages.utils import blockquote, equal_prefix, smart_split, startswith_prefix
+from networking import hx_req
+from utils import number
+
+# Help text for the danmu query command. `query_danmu` sends it back when the
+# incoming message is just the command prefix. It is an f-string, so the
+# configured `PREFIX.DANMU` is substituted once at import time.
+HELP = f"""📖**查询弹幕记录**
+使用说明:
+1. `{PREFIX.DANMU}` + 日期 (YYYY-MM-DD或YYYY)
+2. `{PREFIX.DANMU}` + @用户名 (区分大小写, 且要去除空格)
+3. `{PREFIX.DANMU}` + #关键词 (区分大小写)
+4. 以上可组合使用, 但日期必须放前面, 关键词必须放后面
+
+示例:
+- `{PREFIX.DANMU} 2025-01-01`: 查询2025-01-01日的弹幕
+- `{PREFIX.DANMU} 2025`: 查询2025年的弹幕
+- `{PREFIX.DANMU} @张三`: 查询用户"张三"的弹幕
+- `{PREFIX.DANMU} #你好`: 查询包含"你好"关键词的弹幕
+- `{PREFIX.DANMU} 2025-01-01 @张三`: 查询2025-01-01日用户"张三"的弹幕
+- `{PREFIX.DANMU} 2025 @张三`: 查询2025年张三的全部弹幕
+- `{PREFIX.DANMU} 2025-01-01 #你好`: 查询2025-01-01日包含"你好"的弹幕
+- `{PREFIX.DANMU} 2025 @张三 #你好`: 查询2025年用户"张三"包含"你好"的弹幕
+
+注意:
+- 默认返回部分弹幕记录, 获取完整记录需添加YYYY格式的年份, 例如 `{PREFIX.DANMU} 2025 @张三`
+- 日期为开播日期, 不是弹幕发送日期 (过了凌晨也算前一天)
+- 如果用户名中有空格, 请去除空格。例如: 想指定用户为John Doe请使用 `@JohnDoe`
+"""
+
+
+async def query_danmu(client: Client, message: Message, *, full_history: bool = False, show_name: bool = True, **kwargs):
+    """Handle the danmu query command and reply with the matching records.
+
+    The command text is parsed as ``<prefix> [date] [@user] [#keyword]`` in that
+    order; at least one of the three filters must be present. The filters are
+    POSTed to ``DANMU.BASR_URL`` and the records are sent back either as an
+    edited status message or, for a full-history result that does not fit into
+    one message, as a text document.
+
+    Args:
+        client: Pyrogram client used to send the reply.
+        message: Incoming message that may contain the command.
+        full_history: Fetch every result page instead of only the first one.
+            Forced to True when the date filter is a bare four-digit year.
+        show_name: Prefix each record with its author name. Forced to False
+            when the query is restricted to a single user.
+        **kwargs: Forwarded to ``send2tg`` and ``modify_progress``.
+    """
+    info = parse_msg(message)
+    texts = info["text"]
+    if not startswith_prefix(texts, prefix=[PREFIX.DANMU]):
+        return
+    if equal_prefix(message.text, prefix=[PREFIX.DANMU]):
+        await send2tg(client, message, texts=HELP, **kwargs)
+        return
+    if not DANMU.BASR_URL:
+        await send2tg(client, message, texts="⚠️请联系管理员配置API地址", **kwargs)
+        return
+
+    # The prefix is read from an environment variable, so escape it before
+    # embedding it in a regular expression.
+    prefix = re.escape(PREFIX.DANMU)
+    match_time = ""
+    user = ""
+    keyword = ""
+    # `(?![\d-])` rejects malformed dates such as `2025-1-1`; without it they
+    # would be truncated to the year and trigger a whole-year full query.
+    if matched := re.match(prefix + r"\s+(\d{4}-\d{2}-\d{2})(?![\d-])\s*", texts):  # 2025-01-01
+        match_time = matched.group(1)
+    elif matched := re.match(prefix + r"\s+(\d{4})(?![\d-])\s*", texts):  # 2025
+        full_history = True
+        match_time = matched.group(1)
+
+    # Drop the prefix (and the date, when one was recognised).
+    texts = texts[matched.end() :] if matched else re.sub(rf"^{prefix}\s+", "", texts)
+    texts = texts.lstrip()
+
+    # @张三 #你好
+    # @张三
+    if matched := re.match(r"@(\w+)\s*", texts):
+        user = matched.group(1)
+        # Slice at the match end so that `@user#keyword` (no space) still
+        # exposes the keyword to the next step.
+        texts = texts[matched.end() :]
+
+    # #你好
+    if matched := re.match(r"#(.*)", texts):
+        keyword = matched.group(1)
+
+    if not any((match_time, user, keyword)):  # nothing recognised
+        await send2tg(client, message, texts=f"查询格式有误, 请发送{PREFIX.DANMU}命令查看帮助", **kwargs)
+        return
+
+    payload = {"page": 1, "limit": DANMU.NUM_PER_QUERY, "liveDate": match_time, "message": keyword, "authorName": user}
+    caption = "📖**弹幕记录**:"
+    if match_time:
+        caption += f"\n🕒日期: {match_time}"
+    if user:
+        caption += f"\n👤用户: {user}"
+        show_name = False  # every record has the same author, no need to repeat it
+    if keyword:
+        caption += f"\n🔤关键词: {keyword}"
+    logger.debug(f"Query: {payload}")
+    status_msg = (await send2tg(client, message, texts=caption, **kwargs))[0]
+    kwargs["progress"] = status_msg
+    resp = await hx_req(DANMU.BASR_URL, "POST", data=payload, check_kv={"code": 0}, check_keys=["count", "data"], silent=True)
+    count = resp["count"]
+    if count == 0:
+        await modify_progress(message=status_msg, text=caption + "\n⚠️未匹配到任何弹幕, 请重新查询", force_update=True, **kwargs)
+        return
+    all_danmu = list(resp["data"])
+    if full_history:
+        while len(all_danmu) < count:
+            payload["page"] += 1
+            logger.debug(f"Query: {payload}")
+            resp = await hx_req(DANMU.BASR_URL, "POST", data=payload, check_kv={"code": 0}, check_keys=["count", "data"], silent=True)
+            page_items = (resp or {}).get("data")
+            if not page_items:
+                # An empty page means the reported count can never be reached;
+                # stop instead of requesting pages forever.
+                logger.warning(f"Danmu query stopped at page {payload['page']}: got {len(all_danmu)} of {count} records")
+                break
+            all_danmu.extend(page_items)
+
+    header = f"📖**弹幕记录 ({user})**:" if user else "📖**弹幕记录**:"
+    super_chats = defaultdict(Decimal)  # {"currency": amount}
+    sc_pattern = re.compile(r"^([A-Z]{2,}) (\d+(?:\.\d+)?)$")
+    tz = ZoneInfo(TZ)
+    lines = []
+    for danmu in sorted(all_danmu, key=lambda x: x["timestamp"], reverse=True):  # new to old
+        # The timestamp is divided by 1e6 before conversion, i.e. it is treated as microseconds.
+        dt = datetime.fromtimestamp(danmu["timestamp"] / 1000000, tz=UTC).astimezone(tz)
+        sc_amount = ""
+        if amt := danmu.get("scAmount"):
+            sc_amount = f" ({amt.replace('USD ', '💵')})"
+            # Only amounts shaped like "USD 12.5" are added to the per-currency totals.
+            if matched := sc_pattern.fullmatch(amt):
+                super_chats[matched.group(1)] += Decimal(matched.group(2))
+        author = f"|{danmu['authorName']}" if show_name else ""
+        lines.append(f"{dt:%m-%d %H:%M:%S}{author}{sc_amount}: {danmu['message']}")
+    msg = "\n".join(lines).strip()
+    profit = "".join(f"\n**{currency}**: {number(amount)}" for currency, amount in sorted(super_chats.items()))
+    final = f"{header}{profit}\n{blockquote(msg)}"
+    parts = await smart_split(final)
+    if not full_history or len(parts) == 1:
+        # Partial queries only ever show the first chunk.
+        await modify_progress(message=status_msg, text=blockquote(parts[0]), force_update=True, **kwargs)
+    else:
+        # Too long for one message: ship the complete history as a text file.
+        caption += f"\n#️⃣弹幕数: {count}"
+        caption += profit
+        with BytesIO(msg.encode("utf-8")) as f:
+            await client.send_document(info["cid"], f, file_name=f"{user}弹幕记录.txt", caption=caption)
src/config.py
@@ -60,6 +60,7 @@ class ENABLE:  # see fine-grained permission in `src/permission.py`
     USERS = os.getenv("ENABLE_USERS", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
     SEND_AS_REPLY = os.getenv("ENABLE_SEND_AS_REPLY", "1").lower() in ["1", "y", "yes", "t", "true", "on"]  # Send as a reply to the original message
     CACHE_PRICE_SYMBOLS = os.getenv("ENABLE_CACHE_PRICE_SYMBOLS", "0").lower() in ["1", "y", "yes", "t", "true", "on"]
+    QUERY_DANMU = os.getenv("ENABLE_QUERY_DANMU", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
 
 
 class PREFIX:
@@ -80,6 +81,7 @@ class PREFIX:
     SEARCH_YOUTUBE = os.getenv("PREFIX_SEARCH_YOUTUBE", "/ytb").lower()
     SEARCH_GOOGLE = os.getenv("PREFIX_SEARCH_GOOGLE", "/google").lower()
     GENIMG = os.getenv("PREFIX_GENIMG", "/gen").lower()
+    DANMU = os.getenv("PREFIX_DANMU", "/danmu").lower()
 
 
 class API:
@@ -96,6 +98,11 @@ class API:
     OKX = os.getenv("OKX_API", "https://www.okx.com")
 
 
+class DANMU:
+    # Settings for the danmu query command, read from environment variables
+    # when this module is imported.
+    # NOTE(review): "BASR" looks like a typo for "BASE" - confirm before renaming,
+    # since both the attribute and the env var name are used with this spelling.
+    BASR_URL = os.getenv("DANMU_BASR_URL", "")  # Custom API, No docs
+    NUM_PER_QUERY = int(os.getenv("DANMU_NUM_PER_QUERY", "100"))  # Number of items per query
+
+
 class PROVIDER:  # default API provider
     DOUYIN = os.getenv("DOUYIN_PROVIDER", "free-tikhub").lower()  # free or tikhub
     DOUYIN_COMMENTS = os.getenv("DOUYIN_COMMENTS_PROVIDER", "free-tikhub").lower()  # free or tikhub or a false value (0, false, none, null, etc.)
src/handler.py
@@ -16,6 +16,7 @@ from messages.parser import parse_msg
 from messages.sender import send2tg
 from messages.utils import equal_prefix, startswith_prefix
 from networking import match_social_media_link
+from others.danmu import query_danmu
 from others.download_external import download_url_in_message
 from others.extract_audio import extract_audio_file
 from others.raw_img_file import convert_raw_img_file
@@ -45,6 +46,7 @@ async def handle_utilities(
     ai: bool = True,
     asr: bool = True,
     audio: bool = True,
+    danmu: bool = True,
     google: bool = True,
     ocr: bool = True,
     price: bool = True,
@@ -69,6 +71,7 @@ async def handle_utilities(
         ai (bool, optional): Enable GPT. Defaults to True.
         asr (bool, optional): Enable ASR. Defaults to True.
         audio (bool, optional): Enable Video -> Audio. Defaults to True.
+        danmu (bool, optional): Enable Query Danmu database. Defaults to True.
         google (bool, optional): Enable Google Search. Defaults to True.
         ytb (bool, optional): Enable YouTube Search. Defaults to True.
         subtitle (bool, optional): Enable YouTube subtitle. Defaults to True.
@@ -103,6 +106,8 @@ async def handle_utilities(
         await get_asset_price(client, message, **kwargs)  # /price
     if summary:
         await ai_summary(client, message, **kwargs)  # /summary
+    if danmu:
+        await query_danmu(client, message, **kwargs)  # /danmu
     if raw_img:
         await convert_raw_img_file(client, message, **kwargs)
 
@@ -168,6 +173,7 @@ async def handle_social_media(
             PREFIX.COMBINATION,
             PREFIX.CONVERT,
             PREFIX.CRYPTO,
+            PREFIX.DANMU,
             PREFIX.GENIMG,
             PREFIX.GPT,
             PREFIX.OCR,
@@ -322,8 +328,8 @@ def get_social_media_help(chat_id: int | str, ctype: str, prefixes: list[str] |
         msg += "\n🆕和所有yt-dlp支持的链接\n"
     if permission["ai"]:
         msg += f"\n🤖**AI对话**: `{PREFIX.GPT} /gpt /gemini /ds /qwen /doubao /grok`"
-        msg += f"\n📖**AI总结**: `{PREFIX.AI_SUMMARY}` 总结历史聊天记录"
         msg += f"\n🌠**AI生图**: `{PREFIX.GENIMG}` + 提示词"
+        msg += f"\n📖**AI总结**: 发送 `{PREFIX.AI_SUMMARY}` 查看详细教程"
     if permission["asr"]:
         msg += f"\n🗣**语音转文字**: `{PREFIX.ASR}` + 语音消息"
     if permission["audio"]:
@@ -340,6 +346,8 @@ def get_social_media_help(chat_id: int | str, ctype: str, prefixes: list[str] |
         msg += f"\n🔍**搜索YouTube**: `{PREFIX.SEARCH_YOUTUBE}` + 关键词"
     if permission["google"]:
         msg += f"\n🔍**搜索Google**: `{PREFIX.SEARCH_GOOGLE}` + 关键词"
+    if permission["danmu"]:
+        msg += f"\n📖**查询弹幕记录**: 发送 `{PREFIX.DANMU}` 查看详细教程"
 
     msg += "\n\n单独发送每个命令前缀本身可查看该命令详细使用说明"
     return msg
src/networking.py
@@ -39,6 +39,7 @@ async def hx_req(
     headers: dict | None = None,
     cookies: dict | None = None,
     params: dict | None = None,
+    data: dict | None = None,
     post_json: dict | None = None,
     post_content: RequestContent | None = None,
     files: RequestFiles | None = None,
@@ -62,8 +63,9 @@ async def hx_req(
         headers (dict, optional): The headers to use for the request.
         cookies (dict, optional): The cookies to use for the request.
         params (dict, optional): The parameters to use for the request.
-        post_json (dict, optional): The JSON data to use for the request.
-        post_content (dict, optional): The form data to use for the request.
+        data (dict, optional): The data to POST or PUT.
+        post_json (dict, optional): The JSON data to POST.
+        post_content (dict, optional): The form data to POST.
         proxy (str, optional): The proxy to use for the request.
         follow_redirects (bool, optional): Whether to follow redirects.
         check_keys (list[str], optional): The keys to check in the response.
@@ -98,15 +100,15 @@ async def hx_req(
             if method == "GET":
                 response = await client.get(url, cookies=cookies, headers=headers, params=params)
             elif method == "POST":
-                response = await client.post(url, cookies=cookies, headers=headers, json=post_json, files=files, content=post_content, params=params)
+                response = await client.post(url, cookies=cookies, headers=headers, data=data, json=post_json, files=files, content=post_content, params=params)
             else:
-                response = await client.put(url, cookies=cookies, headers=headers, files=files, params=params)
+                response = await client.put(url, cookies=cookies, headers=headers, data=data, files=files, params=params)
             response.raise_for_status()
             if rformat == "content":
                 return {"content": response.content}
-            data = response.text
-            check_data(data, check_keys=check_keys, check_kv=check_kv)
-            res = json.loads(data) if rformat == "json" else {rformat: data}
+            resp_data = response.text
+            check_data(resp_data, check_keys=check_keys, check_kv=check_kv)
+            res = json.loads(resp_data) if rformat == "json" else {rformat: resp_data}
             if not silent:
                 logger.trace(res)
             return res
@@ -119,7 +121,7 @@ async def hx_req(
         elif "response" in locals():
             error += f"\n{response}"
         logger.error(error)
-        return await hx_req(url, method, headers=headers, cookies=cookies, params=params, post_json=post_json, proxy=proxy, follow_redirects=follow_redirects, check_keys=check_keys, check_kv=check_kv, timeout=timeout, retry=retry + 1, max_retry=max_retry, silent=silent, rformat=rformat, last_error=error)  # fmt: off
+        return await hx_req(url, method, headers=headers, cookies=cookies, params=params, data=data, post_json=post_json, proxy=proxy, follow_redirects=follow_redirects, check_keys=check_keys, check_kv=check_kv, timeout=timeout, retry=retry + 1, max_retry=max_retry, silent=silent, rformat=rformat, last_error=error)  # fmt: off
 
 
 async def download_file(
src/permission.py
@@ -97,6 +97,7 @@ def check_service(cid: int | str, ctype: str) -> dict:
         "ai": True,
         "asr": True,
         "audio": True,
+        "danmu": True,
         "subtitle": True,
         "wget": True,
         "ocr": True,
@@ -160,6 +161,8 @@ def check_service(cid: int | str, ctype: str) -> dict:
         permission["price"] = False
     if not ENABLE.RAW_IMG_CONVERT:
         permission["raw_img"] = False
+    if not ENABLE.QUERY_DANMU:
+        permission["danmu"] = False
 
     """
     Set specific service