main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3"""This file contains the code for extracting information from Bilibili videos.
  4
  5But not for downloading Bilibili videos.
  6For downloading Bilibili videos, please see `src/preview/ytdlp.py`.
  7"""
  8
  9import re
 10from datetime import datetime
 11from pathlib import Path
 12from zoneinfo import ZoneInfo
 13
 14from bilibili_api import ApiException, Credential, comment, opus, video
 15from glom import Coalesce, flatten, glom
 16from loguru import logger
 17from pyrogram.client import Client
 18from pyrogram.types import Message
 19
 20from config import DB, READING_SPEED, TZ, cache
 21from cookies import bilibili_cookie_dict
 22from database.database import get_db
 23from messages.database import copy_messages_from_db, save_messages
 24from messages.progress import modify_progress
 25from messages.sender import send2tg
 26from messages.utils import blockquote, summay_media
 27from networking import download_file, download_media, hx_req
 28from others.emoji import emojify
 29from utils import av2bv, count_subtitles, https_url, number_to_emoji, readable_count, seconds_to_hms, ts_to_dt
 30
 31
 32async def preview_bilibili(
 33    client: Client,
 34    message: Message,
 35    url: str,
 36    db_key: str = "",
 37    post_id: str = "",
 38    platform: str = "bilibili-opus",
 39    **kwargs,
 40):
 41    """Preview bilibili info in the message.
 42
 43    Args:
 44        client (Client): The Pyrogram client.
 45        message (Message): The trigger message object.
 46        url (str, optional): bilibili link.
 47        db_key (str, optional): The cache key.
 48        post_id (str, optional): bilibili post ID
 49    """
 50    if kwargs.get("show_progress") and "progress" not in kwargs:
 51        res = await send2tg(client, message, texts=f"🔗正在解析B站链接\n{url}", **kwargs)
 52        kwargs["progress"] = res[0]
 53    if kv := await get_db(db_key):
 54        logger.debug(f"Bilibili preview {DB.ENGINE} cache hit for key={url}")
 55        if await copy_messages_from_db(client, message, key=url, kv=kv, **kwargs):
 56            return
 57        await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)
 58    if platform == "bilibili-opus":
 59        post_info = await parse_bilibili_opus(post_id, **kwargs)
 60    else:
 61        msg = f"Unsupported platform: {platform}"
 62        raise RuntimeError(msg)
 63    if error_msg := post_info.get("error_msg"):
 64        await modify_progress(text=f"❌B站解析失败: {error_msg}", force_update=True, **kwargs)
 65    msg = ""
 66    if author := post_info.get("author"):
 67        msg += f"\n🅱️{author}"
 68
 69    if dt := post_info.get("dt"):
 70        msg += f"\n🕒{dt}"
 71    if title := post_info.get("title"):
 72        msg += f"\n📝[{title}]({url})"
 73
 74    if texts := post_info.get("texts"):
 75        msg += f"\n{texts}"
 76
 77    media = post_info.get("media", [])
 78    sent_messages = await send2tg(client, message, texts=emojify(msg.strip()), media=media, **kwargs)
 79    await modify_progress(del_status=True, **kwargs)
 80    await save_messages(messages=sent_messages, key=db_key)
 81
 82
 83@cache.memoize(ttl=30)
 84async def parse_bilibili_opus(post_id: str, **kwargs) -> dict:
 85    try:
 86        op = opus.Opus(int(post_id))
 87        resp = await op.get_info()
 88    except Exception:
 89        logger.warning("Bilibili Opus API failed")
 90        return {"error_msg": "Bilibili Opus API failed"}
 91    info = {}
 92    media = []
 93    texts = ""
 94    try:
 95        modules = glom(resp, "item.modules", default=[])
 96        if banner_module := next((module for module in modules if module.get("module_type") == "MODULE_TYPE_TOP"), None):
 97            img_urls = glom(banner_module, "module_top.display.album.pics.*.url", default=[])
 98            media.extend([{"photo": download_file(img_url)} for img_url in img_urls])
 99
100        if title_module := next((module for module in modules if module.get("module_type") == "MODULE_TYPE_TITLE"), None):
101            info["title"] = glom(title_module, "module_title.text", default="")
102
103        if author_module := next((module for module in modules if module.get("module_type") == "MODULE_TYPE_AUTHOR"), None):
104            author_name = glom(author_module, "module_author.name", default="B站用户")
105            author_uid = glom(author_module, "module_author.mid", default="")
106            info["author"] = f"**[{author_name}](https://space.bilibili.com/{author_uid})**" if author_uid else f"**{author_name}**"
107            timestamp = glom(author_module, "module_author.pub_ts", default=0)
108            info["dt"] = f"{ts_to_dt(timestamp):%Y-%m-%d %H:%M:%S}" if timestamp else ""
109
110        if content_module := next((module for module in modules if module.get("module_type") == "MODULE_TYPE_CONTENT"), None):
111            for paragraph in glom(content_module, "module_content.paragraphs", default=[]):
112                img_urls = glom(paragraph, "pic.pics.*.url", default=[])
113                media.extend([{"photo": download_file(img_url)} for img_url in img_urls])
114                for piece in glom(paragraph, "text.nodes", default=[]):
115                    if words := glom(piece, "word.words", default=""):
116                        texts += words
117                    elif rich_text := glom(piece, "rich.text", default=""):
118                        texts += rich_text
119                texts += "\n"
120
121        if media:
122            await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
123        info["media"] = await download_media(media, **kwargs)
124        info["texts"] = texts.strip()
125    except Exception as e:
126        logger.warning(f"Bilibili Opus parse failed: {e}")
127        return {"error_msg": str(e)}
128    return info
129
130
@cache.memoize(ttl=120)
async def get_bilibili_vinfo(url_or_vid: int | str) -> dict:
    """Get Bilibili video info.

    The dict returned by ``video.Video.get_info()`` is copied and extended
    with the normalized keys listed below.

    Args:
        url_or_vid (int | str): A Bilibili video URL or a video ID.

    Returns:
        {
            "downloadable": (bool),
            "error_msg": (str),
            "title": (str),
            "description": (str),
            "author": (str),
            "channel": (str) channel url,
            "pubdate": (str) "%Y-%m-%d %H:%M:%S" in the configured TZ,
            "duration": (int) in seconds,
            "upload_date": (str) "%Y-%m-%d %H:%M:%S" in the configured TZ,
            "view_count": (int),
            "like_count": (int),
            "favorite_count": (int),
            "coin_count": (int),
            "comment_count": (int),
            "statistics": (str) "👁100K👍100K🪙100K⭐️100K💬100K" (zero counts are omitted),
            "emoji": (str) "🅱️"
        }

        On failure "downloadable" is False and "error_msg" is set; the other
        keys are not guaranteed to be present.
    """
    if not url_or_vid:
        return {"downloadable": False, "error_msg": "❌未提供VideoID"}
    # Kept separate from `info` so that a failure after the API call still
    # reports downloadable=False together with an error message.
    failure = {"downloadable": False, "error_msg": "❌无法获取此视频信息"}
    info: dict = {}
    try:
        logger.info(f"Fetch Bilibili video info for {url_or_vid}")
        vid = bilibili_url2vid(url_or_vid)
        v = video.Video(bvid=av2bv(vid))
        info = dict(await v.get_info())
        info["title"] = info.get("title", "Title")
        info["description"] = glom(info, Coalesce("desc", "desc_v2.0.raw_text", default=""))
        info["author"] = glom(info, "owner.name", default="B站UP主")
        info["channel"] = f"https://space.bilibili.com/{glom(info, 'owner.mid', default='')}"
        info["pubdate"] = datetime.fromtimestamp(info["pubdate"], tz=ZoneInfo(TZ)).strftime("%Y-%m-%d %H:%M:%S")
        info["upload_date"] = datetime.fromtimestamp(info["ctime"], tz=ZoneInfo(TZ)).strftime("%Y-%m-%d %H:%M:%S")
        info["duration"] = int(info.get("duration", 0))
        # statistics
        info |= {
            "view_count": int(glom(info, "stat.view", default=0)),
            "like_count": int(glom(info, "stat.like", default=0)),
            "favorite_count": int(glom(info, "stat.favorite", default=0)),
            "coin_count": int(glom(info, "stat.coin", default=0)),
            "comment_count": int(glom(info, "stat.reply", default=0)),
        }
        badges = (
            ("👁", "view_count"),
            ("👍", "like_count"),
            ("🪙", "coin_count"),
            ("⭐️", "favorite_count"),
            ("💬", "comment_count"),
        )
        # Zero counts are left out of the summary line.
        info["statistics"] = "".join(f"{icon}{readable_count(info[key])}" for icon, key in badges if info[key])

        info |= {"downloadable": True, "error_msg": ""}

    except ApiException as e:
        logger.error(f"Failed to get video info: {e}")
        return {"downloadable": False, "error_msg": str(e.msg)}
    except Exception as e:
        logger.error(f"Failed to get video info: {e}")
        # Keep whatever was collected, but always flag the result as failed.
        return info | failure
    return info | {"emoji": "🅱️"}
201
202
async def get_bilibili_subtitle(url_or_vid: int | str) -> dict:
    """(Deprecated) Get the embedded Chinese subtitle of a Bilibili video.

    This function is deprecated: the API only returns the subtitle URL, so the
    subtitle has to be downloaded from that URL and parsed here.
    Please use `bilibili_subtitle_and_summary` instead, it can get subtitles directly with AI summary.

    Args:
        url_or_vid (int | str): A Bilibili video URL or a video ID.

    Returns:
        dict: {
            "subtitles": "[minute:second] texts",
            "num_chars": len(texts),
            "reading_minutes": num_chars / READING_SPEED,
            }
        or ``{"error": ...}`` when no Chinese subtitle could be downloaded.
    """
    try:
        # url to vid
        info = await get_bilibili_vinfo(url_or_vid)
        cid = info["cid"]
        cookie = await bilibili_cookie_dict()
        credential = Credential(sessdata=cookie["SESSDATA"])
        # Normalize through av2bv, the same way get_bilibili_vinfo builds its Video.
        v = video.Video(bvid=av2bv(bilibili_url2vid(url_or_vid)), credential=credential)
        res = await v.get_subtitle(cid=cid)
        tracks = res.get("subtitles") or []
        subtitle_url = next((x.get("subtitle_url", "") for x in tracks if "中文" in x.get("lan_doc", "")), "")
        if subtitle_url:
            data = await hx_req(https_url(subtitle_url), check_keys=["body"])
            sentences = []
            num_chars = 0
            for subtitle in data["body"]:
                sentences.append(f"[{seconds_to_hms(subtitle['from'])}] {subtitle['content']}")
                num_chars += len(subtitle["content"])
            return {
                "subtitles": "\n".join(sentences),
                "num_chars": num_chars,
                "reading_minutes": num_chars / READING_SPEED,
            }
        # No Chinese track: skip the request instead of fetching an empty URL.
        logger.warning(f"No Chinese subtitle track found for Bilibili video {url_or_vid}")
    except Exception as e:
        logger.error(e)
    return {"error": "下载B站内嵌字幕失败"}
242
243
async def get_bilibili_comments(url_or_vid: int | str) -> list[str]:
    """Get Bilibili comments as blockquote-formatted strings.

    Replies are requested in LIKE order and sorted again locally by their
    "like" count, highest first. A header entry is placed before the first
    comment that has text.

    Args:
        url_or_vid (int | str): A Bilibili video URL or a video ID.

    Returns:
        list[str]: The header followed by one entry per comment, or an empty
        list when there are no comments or anything fails.
    """
    try:
        # url to vid
        cookie = await bilibili_cookie_dict()
        credential = Credential(sessdata=cookie["SESSDATA"])
        info = await get_bilibili_vinfo(url_or_vid)
        response = await comment.get_comments_lazy(oid=info["aid"], type_=comment.CommentResourceType.VIDEO, order=comment.OrderType.LIKE, credential=credential)
        # A missing or null "replies" value is treated as "no comments".
        replies = response.get("replies") or []
        replies = sorted(replies, key=lambda x: x.get("like", 0), reverse=True)
    except Exception as e:
        logger.error(f"Failed to get Bilibili comments: {e}")
        return []

    comments = []
    try:
        for reply in replies:
            cmt = glom(reply, "content.message", default="")
            if not cmt:
                continue
            name = glom(reply, "member.uname", default="匿名")
            if uid := glom(reply, "member.mid", default=""):
                name = f"[{name}](https://space.bilibili.com/{uid})"
            location = glom(reply, "reply_control.location", default="").removeprefix("IP属地:")  # noqa: RUF001
            location = f"({location})" if location else ""
            if not comments:
                # Emit the header before the first comment that is actually
                # shown, even when the top-liked reply has no text.
                comments.append(f"\n{blockquote('💬**点此展开评论区**:')}")
            cmt = f"💬**{name}**{location}: {emojify(cmt)}"
            comments.append(f"\n{blockquote(cmt)}")
    except Exception as e:
        logger.error(f"Failed to get Bilibili comments: {e}")
        return []
    return comments
274
275
async def bilibili_subtitle_and_summary(url_or_vid: int | str) -> dict:
    """Get Bilibili subtitles and AI summary.

    Subtitles are taken from the AI-conclusion response when it contains them,
    otherwise from `get_bilibili_subtitle`.

    Args:
        url_or_vid (int | str): A Bilibili video URL or a video ID.

    Returns:
        dict: {
            "summary": "AI summary texts" (only when a summary exists),
            "subtitles": "[minute:second] texts",
            "num_chars": len(texts),
            "reading_minutes": 2,
            "full": "summary first, followed by subtitles" (only when both exist),
            }
        or ``{"error": ...}`` when the request fails. When the fallback
        subtitle download fails, its "error" key is carried in the result.
    """
    try:
        # url to vid
        info = await get_bilibili_vinfo(url_or_vid)
        cid = info["cid"]
        cookie = await bilibili_cookie_dict()
        credential = Credential(sessdata=cookie["SESSDATA"])
        # Normalize through av2bv, the same way get_bilibili_vinfo builds its Video.
        v = video.Video(bvid=av2bv(bilibili_url2vid(url_or_vid)), credential=credential)
        res = await v.get_ai_conclusion(cid=cid, up_mid=glom(info, "owner.mid", default=None))
        # First, get subtitles
        if not glom(res, "model_result.subtitle.0.part_subtitle.0", default=None):
            final = await get_bilibili_subtitle(url_or_vid)  # use `get_bilibili_subtitle`
            subtitles = final.get("subtitles", "")
        else:
            subtitles = ""
            for item in flatten(glom(res, "model_result.subtitle.*.part_subtitle.*", default=None)):
                if item.get("content", ""):
                    subtitles += f"\n[{seconds_to_hms(item['start_timestamp'])}] {item['content']}"
            num_chars = count_subtitles(subtitles)
            final = {"subtitles": subtitles.strip(), "num_chars": num_chars, "reading_minutes": num_chars / READING_SPEED}

        # Then get AI summary
        summary = ""
        if glom(res, "code", default=-1) == 0 and glom(res, "model_result.result_type", default=None) != 0:  # has summary
            summary += glom(res, "model_result.summary", default="")
            outlines = glom(res, "model_result.outline", default=[])
            for idx, outline in enumerate(outlines):
                summary += f"\n\n{number_to_emoji(idx + 1)} {outline.get('title', '')}"
                for item in glom(outline, "part_outline", default=[]):
                    summary += f"\n[{seconds_to_hms(item['timestamp'])}] {item['content']}"
        # Strip once so "summary" and "full" carry the same text.
        summary = summary.strip()
        if summary:
            final["summary"] = summary
        if summary and subtitles:
            final["full"] = f"AI总结(B站版):\n{summary}\n\n\n外挂字幕(B站版):\n{subtitles.strip()}"  # noqa: RUF001
    except Exception as e:
        logger.error(e)
        return {"error": "下载B站AI总结失败"}
    return final
324
325
def make_bvid_clickable(texts: str) -> str:
    """Make bvid in texts clickable.

    "BV1234567890" -> [BV1234567890](https://www.bilibili.com/video/BV1234567890)

    A bvid that is already part of a bilibili video link (with or without the
    scheme and the ``m.`` / ``www.`` host prefix) is replaced together with
    that link prefix, so the result is always a single markdown link.

    bvid format: https://github.com/SocialSisterYi/bilibili-API-collect/blob/18c1efb/docs/misc/bvid_desc.md
    Args:
        texts (str): The texts to process.

    Returns:
        str: The texts with every bvid turned into a markdown url, or "" for
        empty input.
    """
    if not texts:
        return ""

    def markdown_url(match: re.Match) -> str:
        # Always link by the captured bvid, never by the whole matched text:
        # the match may include a link prefix such as "www.bilibili.com/video/".
        bvid = match.group("bvid")
        return f"[{bvid}](https://www.bilibili.com/video/{bvid})"

    # optional bilibili link prefix, followed by the bvid itself
    pattern = r"(?:(?:https?://)?(?:m\.|www\.)?bilibili\.com/video/)?\b(?P<bvid>BV1[a-zA-Z0-9]{9})\b"
    return re.sub(pattern, markdown_url, texts)
352
353
354def bilibili_url2vid(url: str | int) -> str:
355    if matched := re.search(r"(https?://)?(:?m\.|www\.)?bilibili\.com/video/([^,,.。\s]+)", str(url)):  # noqa: RUF001
356        base_url = matched.group(0).split("?")[0]
357        return Path(base_url).stem
358
359    # already vid
360    return av2bv(url)