Commit 259a90f

benny-dou <60535774+benny-dou@users.noreply.github.com>
2026-05-06 06:12:37
refactor(danmu): refactor danmu function
1 parent 84ee13d
Changed files (2)
src/danmu/entrypoint.py
@@ -36,10 +36,17 @@ HELP = f"""📖**查询直播合订本**
 - `{PREFIX.DANMU} 2025-01-01 你好`: 查询2025-01-01日包含“你好”的弹幕
 - `{PREFIX.DANMU} 2025 @张三 你好`: 查询2025年用户【张三】包含“你好”的弹幕
 
-⚠️注意:
-- 日期为开播日期, 不是弹幕发送日期 (过了凌晨也算前一天)
+🕒**日期说明:**
+- 发言记录的时间点是开播时长
+- 弹幕记录的时间点是真实时间,时区为 **{TZ}**
+- 指定弹幕查询日期时,采用的是30小时制(凌晨0—6点属于前一天)
+- 例如 `{PREFIX.DANMU} 2025-01-01` 查询的是2025-01-01日6点至次日6点的弹幕
+
+👤**用户名说明:**
 - 如果用户名中有空格, 请用**引号**包住用户名 (单双引号、中英引号皆可)。
 - 例如: 想指定用户为John Doe请使用 `@"John Doe"`
+- 用户名也支持指定YouTube的 **ChannelID**, 例如 `{PREFIX.DANMU} @UC...`
+- 查询ChannelID方法:在YouTube用户页面的简介处点击“更多” -> 点击最下方的“分享频道” -> 点击“复制频道ID”
 
 `{PREFIX.FAYAN}` 用法类似, 但查询的是**【{DANMU.STREAMER}】**直播语录。
 ( `@用户名` 对于 `{PREFIX.FAYAN}` 命令无效)
@@ -81,11 +88,13 @@ async def query_danmu(client: Client, message: Message, **kwargs):
         resp = await query_turso(match_time, user, keyword, caption, super_chats, qtype, **kwargs)
         count = resp.get("count", 0)
         paths = resp.get("paths", [])
+        texts = "\n\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip()
+        [Path(path).unlink(missing_ok=True) for path in paths]
         user = resp.get("user", user)
     else:
         query_dates = await get_query_dates(match_time, qtype)
         resp = await query_r2(query_dates, user, keyword, caption, super_chats, qtype, **kwargs)
-        paths = resp.get("paths", [])
+        texts = resp.get("texts", "")
         count = resp.get("count", 0)
     if count == 0:
         await modify_progress(text=caption + f"\n⚠️未匹配任何{qtype}", force_update=True, **kwargs)
@@ -102,17 +111,16 @@ async def query_danmu(client: Client, message: Message, **kwargs):
     if profit_usd > 0 and super_chats:
         profit += f"\n💵**总计**: {profit_usd:.2f} USD"
 
-    texts = "\n\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip()
     # try send as message directly
     tips = DANMU_TIPS if qtype == "弹幕" else ""
-    final = f"{header}{tips}{profit}{blockquote(username_history)}\n{texts}"
+    final = f"{header}{tips}{profit}{username_history.strip()}\n\n{texts}"
     if (await count_entities(client, final)) <= 100 and len(await smart_split(final)) == 1:
         await modify_progress(message=status_msg, text=blockquote(final), force_update=True, **kwargs)
         return
 
     caption += f"\n#️⃣{qtype}数: {count}"
     caption += f"\n\n🎉**SuperChat**:{profit}" if profit else ""
-    html = convert_html("\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip())
+    html = convert_html(texts)
     if telegraph_url := await publish_telegraph(title=f"【{qtype}】{user}{match_time} {keyword}", html=html, author=user, ttl="1d"):
         caption += f"\n⚡️[即时预览]({telegraph_url})"
     caption += blockquote(username_history)
@@ -125,7 +133,6 @@ async def query_danmu(client: Client, message: Message, **kwargs):
         save_path.write_text(html)
         await send2tg(client, message, texts=caption, media=[{"document": save_path.as_posix()}], **kwargs)
         await delete_message(status_msg)
-    [Path(path).unlink(missing_ok=True) for path in paths]
 
 
 def parse_queries(texts: str, qtype: str) -> tuple[str, str, str, str]:
src/danmu/r2.py
@@ -8,7 +8,6 @@ from io import BytesIO
 from pathlib import Path
 from zoneinfo import ZoneInfo
 
-import anyio
 import pandas as pd
 from glom import glom
 from loguru import logger
@@ -25,14 +24,14 @@ async def query_r2(dates: list[str], user: str, keyword: str, caption: str, supe
 
     日期从新到旧, 数据从旧到新
     Returns:
-        {"paths": list[str], "count": int}
+        {"texts": str, "count": int}
     """
     if not dates:
         return {}
 
     total_count = 0
     queried_dates = []
-    paths = []
+    texts = ""
     for date in sorted(dates, reverse=True):  # 日期从新到旧
         df = await query_r2_for_date(date, qtype)
         queried_dates.append(date.upper())
@@ -43,14 +42,10 @@ async def query_r2(dates: list[str], user: str, keyword: str, caption: str, supe
         if count == 0:
             continue
         total_count += count
-        texts = parsed.get("texts", "")
+        texts += parsed.get("texts", "")
         await modify_progress(text=caption + f"\n🔍查询时间: {'、'.join(queried_dates)}\n⏳匹配{qtype}数: {total_count}", force_update=True, **kwargs)
         del parsed
-        save_path = f"{DOWNLOAD_DIR}/{user}-{date}-{keyword}-{qtype}.txt"
-        async with await anyio.open_file(save_path, "w") as f:
-            await f.write(texts.strip())
-        paths.append(save_path)
-    return {"paths": paths, "count": total_count}
+    return {"texts": texts.strip(), "count": total_count}
 
 
 async def parse_dataframe(df: pd.DataFrame, user: str, keyword: str, super_chats: defaultdict, qtype: str) -> dict:
@@ -61,8 +56,7 @@ async def parse_dataframe(df: pd.DataFrame, user: str, keyword: str, super_chats
         df = df[df["content"].str.contains(keyword)]
     if user and qtype == "弹幕":
         uids = await get_uids_by_name(name=user)
-        names = await get_names_by_uids(uids)
-        df = df[df["name"].str.lower().isin(names)]
+        df = df[df["uid"].isin(uids)]
     df["livedate"] = df["ts"].apply(ts_to_liveday, args=(qtype,))
     df = df.sort_values(by=["livedate", "ts"], ascending=[False, True])
     processed_day = set()
@@ -110,15 +104,17 @@ async def query_r2_for_date(date: str, qtype: str) -> pd.DataFrame:
         df = await get_r2_dataframe(r2_key, path)
 
     # filter specific date
+    # use 30-hour system for danmu
+    offset = timedelta(hours=6) if qtype == "弹幕" else timedelta(hours=0)
     if len(date) == 7:  # YYYY-MM
-        start = datetime.strptime(date, "%Y-%m").replace(day=1, tzinfo=ZoneInfo(TZ))
-        end = datetime.strptime(date, "%Y-%m").replace(day=31, hour=23, minute=59, second=59, microsecond=999999, tzinfo=ZoneInfo(TZ))
+        start = datetime.strptime(date, "%Y-%m").replace(day=1, tzinfo=ZoneInfo(TZ)) + offset
+        end = datetime.strptime(date, "%Y-%m").replace(day=31, hour=23, minute=59, second=59, microsecond=999999, tzinfo=ZoneInfo(TZ)) + offset
         start_ts = int(start.timestamp())
         end_ts = int(end.timestamp())
         df = df[(df["ts"] >= start_ts) & (df["ts"] <= end_ts)]
     elif len(date) == 10:  # YYYY-MM-DD
-        start = datetime.strptime(date, "%Y-%m-%d").replace(tzinfo=ZoneInfo(TZ))
-        end = datetime.strptime(date, "%Y-%m-%d").replace(hour=23, minute=59, second=59, microsecond=999999, tzinfo=ZoneInfo(TZ))
+        start = datetime.strptime(date, "%Y-%m-%d").replace(tzinfo=ZoneInfo(TZ)) + offset
+        end = datetime.strptime(date, "%Y-%m-%d").replace(hour=23, minute=59, second=59, microsecond=999999, tzinfo=ZoneInfo(TZ)) + offset
         start_ts = int(start.timestamp())
         end_ts = int(end.timestamp())
         df = df[(df["ts"] >= start_ts) & (df["ts"] <= end_ts)]
@@ -212,15 +208,16 @@ async def live_date_info(ts: int, qtype: str) -> tuple[str, str, str]:
 @cache.memoize(ttl=60)
 async def get_uids_by_name(name: str, queried_names: set[str] | None = None) -> set[str]:
     """Get uids by name."""
+    if name == "无名氏":
+        return set()
+    if re.match(r"^UC[\w-]{22}$", name):
+        return {name}
     logger.debug(f"Querying name: {name}, queried_names: {queried_names}")
     if queried_names is None:
         queried_names = set()
     queried_names.add(name.lower())
     userinfo = await r2_userinfo()
     uids = userinfo[userinfo["name"].str.lower() == name.lower()]["uid"].to_list()
-    if len(uids) <= 1:
-        logger.success(f"Found uid of {name}: {glom(uids, '0', default='')}")
-        return uids
     logger.info(f"Found uids of {name}: {uids}")
     # 递归查询
     matched_uids = uids
@@ -236,15 +233,6 @@ async def get_uids_by_name(name: str, queried_names: set[str] | None = None) ->
     return set(matched_uids)
 
 
-async def get_names_by_uids(uids: set[str] | list[str]) -> set[str]:
-    """Get names by uids."""
-    names = []
-    userinfo = await r2_userinfo()
-    for uid in uids:
-        names.extend(userinfo[userinfo["uid"].str.lower() == uid.lower()]["name"].to_list())
-    return {x.lower() for x in names}
-
-
 async def get_username_history(name: str) -> str:
     """Get username history by name."""
     texts = ""
@@ -263,7 +251,7 @@ async def get_username_history(name: str) -> str:
         if re.match(r"^UC[\w-]{22}$", str(uid)):
             texts += f"\n{number_to_emoji(idx + 1)}**UID: [{uid}](https://www.youtube.com/channel/{uid})**\n"
         else:
-            texts += f"\n{number_to_emoji(idx + 1)}**UID: {uid}\n"
+            texts += f"\n{number_to_emoji(idx + 1)}**UID: {uid}**\n"
         for _, row in group.iterrows():
             texts += f"**{row['name']}**: {format_time(row['first'])}➡️{format_time(row['last'])}\n"
     return texts