Commit 6a78701

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-05-17 08:17:20
feat(bilibili): use official API to get AI summary
1 parent 31b3b7a
Changed files (4)
src/preview/utils.py
@@ -7,7 +7,7 @@ from pathlib import Path
 from zoneinfo import ZoneInfo
 
 from bilibili_api import Credential, comment, video
-from glom import glom
+from glom import flatten, glom
 from loguru import logger
 from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM
 
@@ -15,7 +15,7 @@ from config import PROXY, READING_SPEED, TOKEN, TZ, cache
 from cookies import bilibili_cookie_dict
 from networking import hx_req
 from others.emoji import emojify
-from utils import av2bv, https_url
+from utils import av2bv, count_subtitles, https_url, number_to_emoji, seconds_to_hms
 
 
 def make_bvid_clickable(texts: str) -> str:
@@ -69,7 +69,11 @@ async def get_bilibili_video_info(url_or_vid: int | str) -> dict:
 
 
 async def get_bilibili_subtitle(url_or_vid: int | str) -> dict:
-    """Get Bilibili subtitle.
+    """(Deprecated) Get Bilibili subtitle.
+
+    This function is deprecated: it only gets the subtitle URL first,
+    and the subtitle file then has to be downloaded and parsed.
+    Please use `bilibili_subtitle_and_summary` instead; it gets subtitles directly, along with the AI summary.
 
     Returns:
         dict: {
@@ -93,9 +97,7 @@ async def get_bilibili_subtitle(url_or_vid: int | str) -> dict:
             sentences = []
             num_chars = 0
             for subtitle in items:
-                minutes = int(float(subtitle["from"]) // 60)
-                seconds = int(float(subtitle["from"]) % 60)
-                sentences.append(f"[{minutes}:{seconds:02d}] {subtitle['content']}")
+                sentences.append(f"[{seconds_to_hms(subtitle['from'])}] {subtitle['content']}")
                 num_chars += len(subtitle["content"])
             return {
                 "subtitles": "\n".join(sentences),
@@ -138,6 +140,56 @@ async def get_bilibili_comments(url_or_vid: int | str) -> list[str]:
     return comments
 
 
+async def bilibili_subtitle_and_summary(url_or_vid: int | str) -> dict:
+    """Get Bilibili subtitles and the AI summary via `Video.get_ai_conclusion`.
+
+    Subtitles come from the API response when it carries them, otherwise from
+    `get_bilibili_subtitle`. Any exception is logged and yields {"error": ...}.
+
+    Returns:
+        dict: {
+            "summary": "AI summary texts" (only when a summary exists),
+            "subtitles": "[timestamp] texts",
+            "num_chars": count_subtitles(subtitles),
+            "reading_minutes": num_chars / READING_SPEED,
+            "full": "summary first, then subtitles" (only when both exist),
+            }
+    """
+    try:
+        info = await get_bilibili_video_info(url_or_vid)
+        cookie = await bilibili_cookie_dict()
+        v = video.Video(bvid=bilibili_url2vid(url_or_vid), credential=Credential(sessdata=cookie["SESSDATA"]))
+        res = await v.get_ai_conclusion(cid=info["cid"], up_mid=glom(info, "owner.mid", default=None))
+        # First, get subtitles
+        if not glom(res, "model_result.subtitle.0.part_subtitle.0", default=None):
+            final = await get_bilibili_subtitle(url_or_vid)  # API has none: fall back to the download path
+            subtitles = (final.get("subtitles") or "").strip()
+        else:
+            parts = flatten(glom(res, "model_result.subtitle.*.part_subtitle.*", default=None))
+            # Skip entries with empty content; join once instead of repeated `+=`.
+            subtitles = "\n".join(f"[{seconds_to_hms(p['start_timestamp'])}] {p['content']}" for p in parts if p.get("content", "")).strip()
+            num_chars = count_subtitles(subtitles)
+            final = {"subtitles": subtitles, "num_chars": num_chars, "reading_minutes": num_chars / READING_SPEED}
+
+        # Then get AI summary
+        chunks = []
+        if glom(res, "code", default=-1) == 0 and glom(res, "model_result.result_type", default=None) != 0:  # has summary
+            # `or` guards: glom's `default` only covers missing keys, not explicit nulls.
+            chunks.append(glom(res, "model_result.summary", default="") or "")
+            for idx, outline in enumerate(glom(res, "model_result.outline", default=[]) or []):
+                chunks.append(f"\n\n{number_to_emoji(idx + 1)} {outline.get('title', '')}")
+                chunks.extend(f"\n[{seconds_to_hms(item['timestamp'])}] {item['content']}" for item in glom(outline, "part_outline", default=[]) or [])
+        summary = "".join(chunks).strip()  # strip once so "summary" and "full" carry the same text
+        if summary:
+            final["summary"] = summary
+        if summary and subtitles:
+            final["full"] = f"以下为B站视频AI总结:\n{summary}\n\n\n以下为B站视频字幕:\n{subtitles}"
+    except Exception as e:
+        logger.error(e)
+        return {"error": "下载B站AI总结失败"}
+    return final
+
+
 @cache.memoize(ttl=120)
 async def fetch_youtube_video_info(video_id: str) -> dict:
     """Fetch YouTube video info."""
src/preview/ytdlp.py
@@ -11,7 +11,7 @@ from pathlib import Path
 from urllib.parse import quote_plus, unquote_plus, urlparse
 
 from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
-from glom import glom
+from glom import Coalesce, glom
 from loguru import logger
 from pyrogram.client import Client
 from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM
@@ -258,7 +258,7 @@ async def preview_ytdlp(
         await save_messages(messages=sent_messages, key=url, metadata=metadata)
     if any(x in info["extractor"] for x in ["youtube", "bilibili"]) and append_transcription and (video_path.is_file() or audio_path.is_file()):
         res = await fetch_subtitle(url=url, provider="free")
-        subtitles = res.get("subtitles", "")
+        subtitles = glom(res, Coalesce("full", "subtitles"), default="")
         if not subtitles:
             ytdlp_transcription_engine = "gemini" if "youtube" in info["extractor"] else ytdlp_transcription_engine  # use gemini to bypass censorship
             res = await asr_file(audio_path, ytdlp_transcription_engine, duration, client=client, message=message, silent=True)
src/subtitles/base.py
@@ -11,7 +11,7 @@ from config import API, PREFIX, PROXY, READING_SPEED, TOKEN, cache
 from messages.parser import parse_msg
 from messages.utils import startswith_prefix
 from networking import hx_req, match_social_media_link
-from preview.utils import get_bilibili_subtitle
+from preview.utils import bilibili_subtitle_and_summary
 
 
 async def match_url(client: Client, message: Message) -> str:
@@ -62,7 +62,7 @@ async def fetch_subtitle(url: str, provider: str) -> dict:
     subtitles = []
     matched = await match_social_media_link(url)
     if matched["platform"] == "bilibili":
-        return await get_bilibili_subtitle(url)
+        return await bilibili_subtitle_and_summary(url)
     video_id = matched["vid"]
     try:
         if "free" in provider:
src/subtitles/subtitle.py
@@ -3,6 +3,7 @@
 import contextlib
 import io
 
+from glom import Coalesce, glom
 from loguru import logger
 from pyrogram.client import Client
 from pyrogram.types import Message
@@ -87,7 +88,7 @@ async def get_subtitle(client: Client, message: Message, youtube_subtitle_provid
             await preview_ytdlp(client=client, message=message, **kwargs)
             await modify_progress(del_status=True, **kwargs)
             return
-    subtitles = res.get("subtitles", "")
+    subtitles = glom(res, Coalesce("full", "subtitles", "summary"), default="")
     if not subtitles:
         return
     logger.success(subtitles)