main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3"""This file contains the code for extracting information from Bilibili videos.
  4
  5But not for downloading Bilibili videos.
  6For downloading Bilibili videos, please see `src/preview/ytdlp.py`.
  7"""
  8
  9import re
 10from datetime import datetime
 11from pathlib import Path
 12from zoneinfo import ZoneInfo
 13
 14from bilibili_api import ApiException, Credential, comment, opus, video
 15from glom import Coalesce, flatten, glom
 16from loguru import logger
 17from pyrogram.client import Client
 18from pyrogram.types import Message
 19
 20from config import READING_SPEED, TZ, cache
 21from cookies import bilibili_cookie_dict
 22from database.r2 import get_cf_r2
 23from messages.database import copy_messages_from_db, save_messages
 24from messages.progress import modify_progress
 25from messages.sender import send2tg
 26from messages.utils import blockquote, summay_media
 27from networking import download_file, download_media, hx_req
 28from others.emoji import emojify
 29from utils import av2bv, count_subtitles, https_url, number_to_emoji, readable_count, seconds_to_hms, ts_to_dt
 30
 31
 32async def preview_bilibili(
 33    client: Client,
 34    message: Message,
 35    url: str,
 36    db_key: str = "",
 37    post_id: str = "",
 38    platform: str = "bilibili-opus",
 39    **kwargs,
 40):
 41    """Preview bilibili info in the message.
 42
 43    Args:
 44        client (Client): The Pyrogram client.
 45        message (Message): The trigger message object.
 46        url (str, optional): bilibili link.
 47        db_key (str, optional): The cache key.
 48        post_id (str, optional): bilibili post ID
 49    """
 50    if kv := await get_cf_r2(db_key):
 51        logger.debug(f"Bilibili preview cache hit for key={url}")
 52        if await copy_messages_from_db(client, message, key=url, kv=kv, **kwargs):
 53            return
 54        logger.warning("❌从缓存中转发失败, 尝试重新解析...")
 55    if kwargs.get("show_progress") and "progress" not in kwargs:
 56        res = await send2tg(client, message, texts=f"🔗正在解析B站链接\n{url}", **kwargs)
 57        kwargs["progress"] = res[0]
 58    if platform == "bilibili-opus":
 59        post_info = await parse_bilibili_opus(post_id, **kwargs)
 60    else:
 61        msg = f"Unsupported platform: {platform}"
 62        raise RuntimeError(msg)
 63    if error_msg := post_info.get("error_msg"):
 64        await modify_progress(text=f"❌B站解析失败: {error_msg}", force_update=True, **kwargs)
 65    msg = ""
 66    if author := post_info.get("author"):
 67        msg += f"\n🅱️{author}"
 68
 69    if dt := post_info.get("dt"):
 70        msg += f"\n🕒{dt}"
 71    if title := post_info.get("title"):
 72        msg += f"\n📝[{title}]({url})"
 73
 74    if texts := post_info.get("texts"):
 75        msg += f"\n{texts}"
 76
 77    media = post_info.get("media", [])
 78    sent_messages = await send2tg(client, message, texts=emojify(msg.strip()), media=media, **kwargs)
 79    await modify_progress(del_status=True, **kwargs)
 80    await save_messages(messages=sent_messages, key=db_key)
 81
 82
 83@cache.memoize(ttl=30)
 84async def parse_bilibili_opus(post_id: str, **kwargs) -> dict:
 85    try:
 86        op = opus.Opus(int(post_id))
 87        resp = await op.get_info()
 88    except Exception:
 89        logger.warning("Bilibili Opus API failed")
 90        return {"error_msg": "Bilibili Opus API failed"}
 91    info = {}
 92    media = []
 93    texts = ""
 94    try:
 95        modules = glom(resp, "item.modules", default=[])
 96        if banner_module := next((module for module in modules if module.get("module_type") == "MODULE_TYPE_TOP"), None):
 97            img_urls = glom(banner_module, "module_top.display.album.pics.*.url", default=[])
 98            media.extend([{"photo": download_file(img_url)} for img_url in img_urls])
 99
100        if title_module := next((module for module in modules if module.get("module_type") == "MODULE_TYPE_TITLE"), None):
101            info["title"] = glom(title_module, "module_title.text", default="")
102
103        if author_module := next((module for module in modules if module.get("module_type") == "MODULE_TYPE_AUTHOR"), None):
104            author_name = glom(author_module, "module_author.name", default="B站用户")
105            author_uid = glom(author_module, "module_author.mid", default="")
106            info["author"] = f"**[{author_name}](https://space.bilibili.com/{author_uid})**" if author_uid else f"**{author_name}**"
107            timestamp = glom(author_module, "module_author.pub_ts", default=0)
108            info["dt"] = f"{ts_to_dt(timestamp):%Y-%m-%d %H:%M:%S}" if timestamp else ""
109
110        if content_module := next((module for module in modules if module.get("module_type") == "MODULE_TYPE_CONTENT"), None):
111            for paragraph in glom(content_module, "module_content.paragraphs", default=[]):
112                img_urls = glom(paragraph, "pic.pics.*.url", default=[])
113                media.extend([{"photo": download_file(img_url)} for img_url in img_urls])
114                for piece in glom(paragraph, "text.nodes", default=[]):
115                    if words := glom(piece, "word.words", default=""):
116                        texts += words
117                    elif rich_text := glom(piece, "rich.text", default=""):
118                        texts += rich_text
119                texts += "\n"
120
121        if media:
122            await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
123        info["media"] = await download_media(media, **kwargs)
124        info["texts"] = texts.strip()
125    except Exception as e:
126        logger.warning(f"Bilibili Opus parse failed: {e}")
127        return {"error_msg": str(e)}
128    return info
129
130
@cache.memoize(ttl=120)
async def get_bilibili_vinfo(url_or_vid: int | str) -> dict:
    """Get Bilibili video info.

    On success the dict returned by `video.Video.get_info()` is kept as-is and
    extended with the normalised fields below. On failure only
    "downloadable" and "error_msg" are returned.

    Args:
        url_or_vid (int | str): Video URL or video ID.

    Returns:
        {
            "downloadable": (bool),
            "error_msg": (str),
            "title": (str),
            "description": (str),
            "author": (str),
            "channel": (str) channel url,
            "pubdate": (str) "%Y-%m-%d %H:%M:%S" in timezone `TZ`,
            "duration": (int) in seconds,
            "upload_date": (str) "%Y-%m-%d %H:%M:%S" in timezone `TZ`,
            "view_count": (int),
            "like_count": (int),
            "favorite_count": (int),
            "coin_count": (int),
            "comment_count": (int),
            "statistics": (str) non-zero counters joined without separator,
                in the order view, like, coin, favorite, comment,
            "emoji": (str) "🅱️"
        }

    """
    if not url_or_vid:
        return {"downloadable": False, "error_msg": "❌未提供VideoID"}
    try:
        logger.info(f"Fetch Bilibili video info for {url_or_vid}")
        vid = bilibili_url2vid(url_or_vid)
        v = video.Video(bvid=av2bv(vid))
        info = await v.get_info()
        info["title"] = info.get("title", "Title")
        desc = glom(info, Coalesce("desc", "desc_v2.0.raw_text", default=""))
        if desc == "-":
            desc = ""
        info["description"] = desc
        info["author"] = glom(info, "owner.name", default="B站UP主")
        info["channel"] = f"https://space.bilibili.com/{glom(info, 'owner.mid', default='')}"
        tz = ZoneInfo(TZ)
        info["pubdate"] = datetime.fromtimestamp(info["pubdate"], tz=tz).strftime("%Y-%m-%d %H:%M:%S")
        info["upload_date"] = datetime.fromtimestamp(info["ctime"], tz=tz).strftime("%Y-%m-%d %H:%M:%S")
        info["duration"] = int(info.get("duration", 0))
        # statistics
        info |= {
            "view_count": int(glom(info, "stat.view", default=0)),
            "like_count": int(glom(info, "stat.like", default=0)),
            "favorite_count": int(glom(info, "stat.favorite", default=0)),
            "coin_count": int(glom(info, "stat.coin", default=0)),
            "comment_count": int(glom(info, "stat.reply", default=0)),
        }
        # (icon, key) pairs in display order; zero counters are skipped.
        counters = (
            ("👁", "view_count"),
            ("👍", "like_count"),
            ("🪙", "coin_count"),
            ("⭐️", "favorite_count"),
            ("💬", "comment_count"),
        )
        info["statistics"] = "".join(f"{icon}{readable_count(info[key])}" for icon, key in counters if info[key])

        info |= {"downloadable": True, "error_msg": ""}

    except ApiException as e:
        logger.error(f"Failed to get video info: {e}")
        return {"downloadable": False, "error_msg": str(e.msg)}
    except Exception as e:
        logger.error(f"Failed to get video info: {e}")
        # Return a fresh failure dict: `info` may already hold a partially
        # processed API payload that lacks "downloadable" / "error_msg".
        return {"downloadable": False, "error_msg": "❌无法获取此视频信息"}
    return info | {"emoji": "🅱️"}
204
205
async def get_bilibili_subtitle(url_or_vid: int | str) -> dict:
    """(Deprecated) Get Bilibili subtitle.

    This function is deprecated: the API only returns the subtitle url, so the
    subtitle has to be downloaded from that url and parsed here.
    Please use `bilibili_subtitle_and_summary` instead, it can get subtitles directly with AI summary.

    Only a subtitle track whose `lan_doc` contains "中文" is used.

    Args:
        url_or_vid (int | str): Video URL or video ID.

    Returns:
        dict: {
            "subtitles": "[minute:second] texts",
            "num_chars": len(texts),
            "reading_minutes": num_chars / READING_SPEED,
            }
        or `{"error": str}` when no subtitle could be obtained.
    """
    try:
        # url to vid
        info = await get_bilibili_vinfo(url_or_vid)
        cid = info["cid"]
        cookie = await bilibili_cookie_dict()
        credential = Credential(sessdata=cookie["SESSDATA"])
        # Normalise av ids to bvid, the same way `get_bilibili_vinfo` does.
        bvid = av2bv(bilibili_url2vid(url_or_vid))
        v = video.Video(bvid=bvid, credential=credential)
        res = await v.get_subtitle(cid=cid)
        tracks = res.get("subtitles") or []
        subtitle_url = next((x.get("subtitle_url", "") for x in tracks if "中文" in x.get("lan_doc", "")), "")
        if subtitle_url:
            data = await hx_req(https_url(subtitle_url), check_keys=["body"])
            sentences = []
            num_chars = 0
            for subtitle in data["body"]:
                sentences.append(f"[{seconds_to_hms(subtitle['from'])}] {subtitle['content']}")
                num_chars += len(subtitle["content"])
            return {
                "subtitles": "\n".join(sentences),
                "num_chars": num_chars,
                "reading_minutes": num_chars / READING_SPEED,
            }
        # No usable track: skip the request instead of fetching an empty url.
        logger.warning(f"No Chinese subtitle found for {url_or_vid}")
    except Exception as e:
        logger.error(e)
    return {"error": "下载B站内嵌字幕失败"}
245
246
async def get_bilibili_comments(url_or_vid: int | str) -> list[str]:
    """Get Bilibili comments.

    Fetches the replies of the video, sorts them by like count (descending)
    and renders them as one blockquote.

    Args:
        url_or_vid (int | str): Video URL or video ID.

    Returns:
        list[str]: Lines of the rendered blockquote (line endings kept), or an
            empty list when there are no comments or anything fails.
    """
    comments_list = []
    try:
        # url to vid
        cookie = await bilibili_cookie_dict()
        credential = Credential(sessdata=cookie["SESSDATA"])
        info = await get_bilibili_vinfo(url_or_vid)
        response = await comment.get_comments_lazy(oid=info["aid"], type_=comment.CommentResourceType.VIDEO, order=comment.OrderType.LIKE, credential=credential)
        data = response.get("replies", [])
        data = sorted(data, key=lambda x: x.get("like", 0), reverse=True)
    except Exception as e:
        logger.error(f"Failed to get Bilibili comments: {e}")
        return []
    try:
        for x in data:
            name = glom(x, "member.uname", default="匿名")
            if uid := glom(x, "member.mid", default=""):
                name = f"[{name}](https://space.bilibili.com/{uid})"
            location = glom(x, "reply_control.location", default="")
            # Accept the label with either an ASCII or a full-width colon.
            for prefix in ("IP属地:", "IP属地:"):  # noqa: RUF001
                location = location.removeprefix(prefix)
            location = f"({location})" if location else ""
            if cmt := glom(x, "content.message", default=""):
                # Add the header before the first rendered comment, even when
                # earlier replies were skipped for having no message.
                if not comments_list:
                    comments_list.append("💬**点击展开评论**:")
                cmt = f"💬**{name}**{location}: {emojify(cmt)}"
                comments_list.append(f"\n{cmt}")
    except Exception as e:
        logger.error(f"Failed to get Bilibili comments: {e}")
        return []
    if not comments_list:
        return []
    comments = blockquote("".join(comments_list))
    return comments.splitlines(keepends=True)
280
281
async def bilibili_subtitle_and_summary(url_or_vid: int | str) -> dict:
    """Get Bilibili subtitles and AI summary.

    Subtitles are taken from the AI conclusion response when it contains
    them; otherwise `get_bilibili_subtitle` is used and its result dict
    (which may be an `{"error": ...}` dict) becomes the base of the result.

    Args:
        url_or_vid (int | str): Video URL or video ID.

    Returns:
        dict: {
            "summary": "AI summary texts" (only when a summary exists),
            "subtitles": "[minute:second] texts",
            "num_chars": len(texts),
            "reading_minutes": 2,
            "full": "summary first, followed by subtitles" (only when both exist),
            }
        or `{"error": str}` when the request fails.
    """
    try:
        # url to vid
        info = await get_bilibili_vinfo(url_or_vid)
        cid = info["cid"]
        cookie = await bilibili_cookie_dict()
        credential = Credential(sessdata=cookie["SESSDATA"])
        # Normalise av ids to bvid, the same way `get_bilibili_vinfo` does.
        bvid = av2bv(bilibili_url2vid(url_or_vid))
        v = video.Video(bvid=bvid, credential=credential)
        res = await v.get_ai_conclusion(cid=cid, up_mid=glom(info, "owner.mid", default=None))
        # First, get subtitles
        if not glom(res, "model_result.subtitle.0.part_subtitle.0", default=None):
            final = await get_bilibili_subtitle(url_or_vid)  # use `get_bilibili_subtitle`
            subtitles = final.get("subtitles", "")
        else:
            subtitles = ""
            for item in flatten(glom(res, "model_result.subtitle.*.part_subtitle.*", default=None)):
                if item.get("content", ""):
                    subtitles += f"\n[{seconds_to_hms(item['start_timestamp'])}] {item['content']}"
            num_chars = count_subtitles(subtitles)
            final = {"subtitles": subtitles.strip(), "num_chars": num_chars, "reading_minutes": num_chars / READING_SPEED}

        # Then get AI summary
        summary = ""
        if glom(res, "code", default=-1) == 0 and glom(res, "model_result.result_type", default=None) != 0:  # has summary
            summary += glom(res, "model_result.summary", default="")
            outlines = glom(res, "model_result.outline", default=[])
            for idx, outline in enumerate(outlines):
                summary += f"\n\n{number_to_emoji(idx + 1)} {outline.get('title', '')}"
                for item in glom(outline, "part_outline", default=[]):
                    summary += f"\n[{seconds_to_hms(item['timestamp'])}] {item['content']}"
        if summary:
            final["summary"] = summary.strip()
        if summary and subtitles:
            final["full"] = f"AI总结(B站版):\n{summary}\n\n\n外挂字幕(B站版):\n{subtitles.strip()}"  # noqa: RUF001
    except Exception as e:
        logger.error(e)
        return {"error": "下载B站AI总结失败"}
    return final
330
331
def make_bvid_clickable(texts: str) -> str:
    """Make bvid in texts clickable.

    "BV1234567890" -> [BV1234567890](https://www.bilibili.com/video/BV1234567890)

    Both bare bvids and bilibili video links (with or without scheme and
    `www.` / `m.` host prefix) are replaced by a markdown link labelled with
    the bvid. Anything after the bvid, such as a query string, is left as-is.

    bvid format: https://github.com/SocialSisterYi/bilibili-API-collect/blob/18c1efb/docs/misc/bvid_desc.md
    Args:
        texts (str): The texts to process.

    Returns:
        str: bvid with markdown url. Empty string for empty input.
    """
    if not texts:
        return ""

    def markdown_url(match: re.Match) -> str:
        # Exactly one of the two named groups is set, depending on the branch
        # that matched. Checking the scheme group instead would mis-handle
        # links written without "http(s)://".
        bvid = match.group("url_bvid") or match.group("bare_bvid")
        return f"[{bvid}](https://www.bilibili.com/video/{bvid})"

    # match bilibili links or bvid only
    pattern = (
        r"(?:https?://)?(?:m\.|www\.)?bilibili\.com/video/(?P<url_bvid>BV1[a-zA-Z0-9]{9})\b"
        r"|\b(?P<bare_bvid>BV1[a-zA-Z0-9]{9})\b"
    )
    return re.sub(pattern, markdown_url, texts)
358
359
360def bilibili_url2vid(url: str | int) -> str:
361    if matched := re.search(r"(https?://)?(:?m\.|www\.)?bilibili\.com/video/([^,,.。\s]+)", str(url)):  # noqa: RUF001
362        base_url = matched.group(0).split("?")[0]
363        return Path(base_url).stem
364
365    # already vid
366    return av2bv(url)