main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3"""This file contains the code for extracting information from Bilibili videos.
4
5But not for downloading Bilibili videos.
6For downloading Bilibili videos, please see `src/preview/ytdlp.py`.
7"""
8
9import re
10from datetime import datetime
11from pathlib import Path
12from zoneinfo import ZoneInfo
13
14from bilibili_api import ApiException, Credential, comment, opus, video
15from glom import Coalesce, flatten, glom
16from loguru import logger
17from pyrogram.client import Client
18from pyrogram.types import Message
19
20from config import READING_SPEED, TZ, cache
21from cookies import bilibili_cookie_dict
22from database.r2 import get_cf_r2
23from messages.database import copy_messages_from_db, save_messages
24from messages.progress import modify_progress
25from messages.sender import send2tg
26from messages.utils import blockquote, summay_media
27from networking import download_file, download_media, hx_req
28from others.emoji import emojify
29from utils import av2bv, count_subtitles, https_url, number_to_emoji, readable_count, seconds_to_hms, ts_to_dt
30
31
async def preview_bilibili(
    client: Client,
    message: Message,
    url: str,
    db_key: str = "",
    post_id: str = "",
    platform: str = "bilibili-opus",
    **kwargs,
):
    """Preview bilibili info in the message.

    A cached copy is forwarded when one exists for ``db_key``; otherwise the
    post is parsed, sent to Telegram and the sent messages are saved under
    ``db_key``.

    Args:
        client (Client): The Pyrogram client.
        message (Message): The trigger message object.
        url (str): bilibili link.
        db_key (str, optional): The cache key.
        post_id (str, optional): bilibili post ID
        platform (str, optional): Kind of content to parse. Only
            ``"bilibili-opus"`` is handled here.
        **kwargs: Forwarded to the messaging/progress helpers. ``show_progress``
            and ``progress`` are read here to decide whether a progress
            message has to be created.

    Raises:
        RuntimeError: If ``platform`` is not supported.
    """
    if kv := await get_cf_r2(db_key):
        logger.debug(f"Bilibili preview cache hit for key={url}")
        if await copy_messages_from_db(client, message, key=url, kv=kv, **kwargs):
            return
        logger.warning("❌从缓存中转发失败, 尝试重新解析...")

    # Create the progress message only once; later helpers reuse kwargs["progress"].
    if kwargs.get("show_progress") and "progress" not in kwargs:
        res = await send2tg(client, message, texts=f"🔗正在解析B站链接\n{url}", **kwargs)
        kwargs["progress"] = res[0]

    if platform != "bilibili-opus":
        msg = f"Unsupported platform: {platform}"
        raise RuntimeError(msg)
    post_info = await parse_bilibili_opus(post_id, **kwargs)

    if error_msg := post_info.get("error_msg"):
        await modify_progress(text=f"❌B站解析失败: {error_msg}", force_update=True, **kwargs)
        # Stop here: there is nothing to send, and continuing would delete the
        # error notice and store an empty result under db_key.
        return

    parts = []
    if author := post_info.get("author"):
        parts.append(f"\n🅱️{author}")
    if dt := post_info.get("dt"):
        parts.append(f"\n🕒{dt}")
    if title := post_info.get("title"):
        parts.append(f"\n📝[{title}]({url})")
    if texts := post_info.get("texts"):
        parts.append(f"\n{texts}")
    caption = "".join(parts).strip()

    media = post_info.get("media", [])
    sent_messages = await send2tg(client, message, texts=emojify(caption), media=media, **kwargs)
    await modify_progress(del_status=True, **kwargs)
    await save_messages(messages=sent_messages, key=db_key)
81
82
@cache.memoize(ttl=30)
async def parse_bilibili_opus(post_id: str, **kwargs) -> dict:
    """Fetch a Bilibili opus post and collect its title, author, time, text and media.

    Args:
        post_id (str): Opus ID; it is converted with ``int`` before the API call.
        **kwargs: Forwarded to ``modify_progress`` and ``download_media``.

    Returns:
        dict: ``{"error_msg": ...}`` when the API call or the parsing fails.
        Otherwise a dict that always has ``texts`` and, when the matching
        module is present, ``title``, ``author``, ``dt`` and ``media``.
    """
    try:
        resp = await opus.Opus(int(post_id)).get_info()
    except Exception:
        logger.warning("Bilibili Opus API failed")
        return {"error_msg": "Bilibili Opus API failed"}

    def first_module(candidates, module_type):
        # Return the first module whose "module_type" matches, or None.
        for candidate in candidates:
            if candidate.get("module_type") == module_type:
                return candidate
        return None

    info = {}
    media = []
    texts = ""
    try:
        modules = glom(resp, "item.modules", default=[])

        # Banner pictures come first so they lead the media list.
        top = first_module(modules, "MODULE_TYPE_TOP")
        if top:
            for pic_url in glom(top, "module_top.display.album.pics.*.url", default=[]):
                media.append({"photo": download_file(pic_url)})

        heading = first_module(modules, "MODULE_TYPE_TITLE")
        if heading:
            info["title"] = glom(heading, "module_title.text", default="")

        writer = first_module(modules, "MODULE_TYPE_AUTHOR")
        if writer:
            author_name = glom(writer, "module_author.name", default="B站用户")
            author_uid = glom(writer, "module_author.mid", default="")
            if author_uid:
                info["author"] = f"**[{author_name}](https://space.bilibili.com/{author_uid})**"
            else:
                info["author"] = f"**{author_name}**"
            timestamp = glom(writer, "module_author.pub_ts", default=0)
            info["dt"] = f"{ts_to_dt(timestamp):%Y-%m-%d %H:%M:%S}" if timestamp else ""

        body = first_module(modules, "MODULE_TYPE_CONTENT")
        if body:
            for paragraph in glom(body, "module_content.paragraphs", default=[]):
                for pic_url in glom(paragraph, "pic.pics.*.url", default=[]):
                    media.append({"photo": download_file(pic_url)})
                for node in glom(paragraph, "text.nodes", default=[]):
                    # Plain words win over rich text when a node carries both.
                    fragment = glom(node, "word.words", default="") or glom(node, "rich.text", default="")
                    if fragment:
                        texts += fragment
                # One line break per paragraph.
                texts += "\n"

        if media:
            await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
            info["media"] = await download_media(media, **kwargs)
        info["texts"] = texts.strip()
    except Exception as e:
        logger.warning(f"Bilibili Opus parse failed: {e}")
        return {"error_msg": str(e)}
    return info
129
130
@cache.memoize(ttl=120)
async def get_bilibili_vinfo(url_or_vid: int | str) -> dict:
    """Get Bilibili video info.

    On success the returned dict is the payload of ``Video.get_info()``
    extended (and partly overwritten) with the normalised keys listed below.

    Args:
        url_or_vid (int | str): A Bilibili video URL or a video ID.

    Returns:
        {
            "downloadable": (bool),
            "error_msg": (str),
            "title": (str),
            "description": (str),
            "author": (str),
            "channel": (str) channel url,
            "pubdate": (str) "%Y-%m-%d %H:%M:%S" in the configured TZ,
            "duration": (int) in seconds,
            "upload_date": (str) "%Y-%m-%d %H:%M:%S" in the configured TZ,
            "view_count": (int),
            "like_count": (int),
            "favorite_count": (int),
            "coin_count": (int),
            "comment_count": (int),
            "statistics": (str) e.g. "👁100K👍100K🪙100K⭐️100K💬100K" (zero counters are omitted),
            "emoji": (str) "🅱️"
        }

        On failure ``downloadable`` is ``False`` and ``error_msg`` is set.
    """
    if not url_or_vid:
        return {"downloadable": False, "error_msg": "❌未提供VideoID"}
    # Kept separate from `info`, which is rebound to the API payload below, so
    # that the failure flags are always present in the error path.
    failure = {"downloadable": False, "error_msg": "❌无法获取此视频信息"}
    info = {}
    try:
        logger.info(f"Fetch Bilibili video info for {url_or_vid}")
        vid = bilibili_url2vid(url_or_vid)
        v = video.Video(bvid=av2bv(vid))
        info = await v.get_info()
        info["title"] = info.get("title", "Title")
        desc = glom(info, Coalesce("desc", "desc_v2.0.raw_text", default=""))
        # A lone "-" is treated as "no description".
        info["description"] = "" if desc == "-" else desc
        info["author"] = glom(info, "owner.name", default="B站UP主")
        info["channel"] = f"https://space.bilibili.com/{glom(info, 'owner.mid', default='')}"
        tz = ZoneInfo(TZ)
        info["pubdate"] = datetime.fromtimestamp(info["pubdate"], tz=tz).strftime("%Y-%m-%d %H:%M:%S")
        info["upload_date"] = datetime.fromtimestamp(info["ctime"], tz=tz).strftime("%Y-%m-%d %H:%M:%S")
        info["duration"] = int(info.get("duration", 0))
        # statistics
        info |= {
            "view_count": int(glom(info, "stat.view", default=0)),
            "like_count": int(glom(info, "stat.like", default=0)),
            "favorite_count": int(glom(info, "stat.favorite", default=0)),
            "coin_count": int(glom(info, "stat.coin", default=0)),
            "comment_count": int(glom(info, "stat.reply", default=0)),
        }
        stat_icons = (
            ("👁", "view_count"),
            ("👍", "like_count"),
            ("🪙", "coin_count"),
            ("⭐️", "favorite_count"),
            ("💬", "comment_count"),
        )
        info["statistics"] = "".join(f"{icon}{readable_count(info[key])}" for icon, key in stat_icons if info[key])

        info |= {"downloadable": True, "error_msg": ""}

    except ApiException as e:
        logger.error(f"Failed to get video info: {e}")
        return {"downloadable": False, "error_msg": "❌" + str(e.msg)}
    except Exception as e:
        logger.error(f"Failed to get video info: {e}")
        # Keep whatever was fetched, but always expose the failure flags.
        return info | failure
    return info | {"emoji": "🅱️"}
204
205
async def get_bilibili_subtitle(url_or_vid: int | str) -> dict:
    """(Deprecated) Get Bilibili subtitle.

    This function is deprecated: the subtitle API only returns a subtitle URL,
    so the file has to be downloaded and parsed here.
    Please use `bilibili_subtitle_and_summary` instead, it can get subtitles directly with AI summary.

    Only a subtitle track whose ``lan_doc`` contains "中文" is used.

    Args:
        url_or_vid (int | str): A Bilibili video URL or a video ID.

    Returns:
        dict: {
            "subtitles": "[minute:second] texts",
            "num_chars": len(texts),
            "reading_minutes": num_chars / READING_SPEED,
        }
        or ``{"error": ...}`` when no subtitle could be obtained.
    """
    try:
        # url to vid
        info = await get_bilibili_vinfo(url_or_vid)
        cid = info["cid"]
        cookie = await bilibili_cookie_dict()
        credential = Credential(sessdata=cookie["SESSDATA"])
        # Normalise av IDs the same way get_bilibili_vinfo does.
        v = video.Video(bvid=av2bv(bilibili_url2vid(url_or_vid)), credential=credential)
        res = await v.get_subtitle(cid=cid)
        tracks = res.get("subtitles") or []
        subtitle_url = next((x.get("subtitle_url", "") for x in tracks if "中文" in x.get("lan_doc", "")), "")
        if not subtitle_url:
            # No usable track: report it explicitly instead of requesting an empty URL.
            logger.warning(f"No Chinese subtitle found for {url_or_vid}")
            return {"error": "下载B站内嵌字幕失败"}
        data = await hx_req(https_url(subtitle_url), check_keys=["body"])
        items = data["body"]
        sentences = [f"[{seconds_to_hms(subtitle['from'])}] {subtitle['content']}" for subtitle in items]
        num_chars = sum(len(subtitle["content"]) for subtitle in items)
        return {
            "subtitles": "\n".join(sentences),
            "num_chars": num_chars,
            "reading_minutes": num_chars / READING_SPEED,
        }
    except Exception as e:
        logger.error(e)
        return {"error": "下载B站内嵌字幕失败"}
245
246
async def get_bilibili_comments(url_or_vid: int | str) -> list[str]:
    """Get Bilibili comments.

    Fetches the replies of a video, orders them by like count (descending) and
    renders them as a block quote.

    Args:
        url_or_vid (int | str): A Bilibili video URL or a video ID.

    Returns:
        list[str]: The rendered block quote split into lines (line endings
        kept), or an empty list when there are no comments or anything fails.
    """
    try:
        # url to vid
        cookie = await bilibili_cookie_dict()
        credential = Credential(sessdata=cookie["SESSDATA"])
        info = await get_bilibili_vinfo(url_or_vid)
        response = await comment.get_comments_lazy(oid=info["aid"], type_=comment.CommentResourceType.VIDEO, order=comment.OrderType.LIKE, credential=credential)
        # `or []` also covers a present-but-null "replies" value.
        data = sorted(response.get("replies") or [], key=lambda x: x.get("like", 0), reverse=True)
    except Exception as e:
        logger.error(f"Failed to get Bilibili comments: {e}")
        return []

    comments_list = []
    try:
        for x in data:
            cmt = glom(x, "content.message", default="")
            if not cmt:
                continue
            name = glom(x, "member.uname", default="匿名")
            if uid := glom(x, "member.mid", default=""):
                name = f"[{name}](https://space.bilibili.com/{uid})"
            location = glom(x, "reply_control.location", default="").removeprefix("IP属地:")  # noqa: RUF001
            location = f"({location})" if location else ""
            # Emit the header before the first rendered comment, even when the
            # top-ranked reply had no text and was skipped.
            if not comments_list:
                comments_list.append("💬**点击展开评论**:")
            comments_list.append(f"\n💬**{name}**{location}: {emojify(cmt)}")
    except Exception as e:
        logger.error(f"Failed to get Bilibili comments: {e}")
        return []
    if not comments_list:
        return []
    comments = blockquote("".join(comments_list))
    return comments.splitlines(keepends=True)
280
281
async def bilibili_subtitle_and_summary(url_or_vid: int | str) -> dict:
    """Get Bilibili subtitles and AI summary.

    Subtitles are taken from the AI-conclusion response when it contains them;
    otherwise `get_bilibili_subtitle` is used as a fallback (its result dict,
    possibly an error dict, is then the base of the return value).

    Args:
        url_or_vid (int | str): A Bilibili video URL or a video ID.

    Returns:
        dict: {
            "summary": "AI summary texts" (only when a summary exists),
            "subtitles": "[minute:second] texts",
            "num_chars": len(texts),
            "reading_minutes": num_chars / READING_SPEED,
            "full": "summary first, followed by subtitles" (only when both exist),
        }
        or ``{"error": ...}`` when the request or the parsing fails.
    """
    try:
        # url to vid
        info = await get_bilibili_vinfo(url_or_vid)
        cid = info["cid"]
        cookie = await bilibili_cookie_dict()
        credential = Credential(sessdata=cookie["SESSDATA"])
        # Normalise av IDs the same way get_bilibili_vinfo does.
        v = video.Video(bvid=av2bv(bilibili_url2vid(url_or_vid)), credential=credential)
        res = await v.get_ai_conclusion(cid=cid, up_mid=glom(info, "owner.mid", default=None))

        # First, get subtitles
        if not glom(res, "model_result.subtitle.0.part_subtitle.0", default=None):
            final = await get_bilibili_subtitle(url_or_vid)  # use `get_bilibili_subtitle`
            subtitles = final.get("subtitles", "")
        else:
            parts = flatten(glom(res, "model_result.subtitle.*.part_subtitle.*", default=None))
            subtitles = "".join(f"\n[{seconds_to_hms(item['start_timestamp'])}] {item['content']}" for item in parts if item.get("content", ""))
            num_chars = count_subtitles(subtitles)
            final = {"subtitles": subtitles.strip(), "num_chars": num_chars, "reading_minutes": num_chars / READING_SPEED}

        # Then get AI summary
        summary = ""
        if glom(res, "code", default=-1) == 0 and glom(res, "model_result.result_type", default=None) != 0:  # has summary
            summary += glom(res, "model_result.summary", default="")
            outlines = glom(res, "model_result.outline", default=[])
            for idx, outline in enumerate(outlines):
                summary += f"\n\n{number_to_emoji(idx + 1)} {outline.get('title', '')}"
                for item in glom(outline, "part_outline", default=[]):
                    summary += f"\n[{seconds_to_hms(item['timestamp'])}] {item['content']}"
        # Strip once so that "summary" and "full" carry the same text.
        summary = summary.strip()
        if summary:
            final["summary"] = summary
        if summary and subtitles:
            final["full"] = f"AI总结(B站版):\n{summary}\n\n\n外挂字幕(B站版):\n{subtitles.strip()}"  # noqa: RUF001
    except Exception as e:
        logger.error(e)
        return {"error": "下载B站AI总结失败"}
    return final
330
331
def make_bvid_clickable(texts: str) -> str:
    """Make bvid in texts clickable.

    "BV1234567890" -> [BV1234567890](https://www.bilibili.com/video/BV1234567890)

    Both bare bvids and bilibili video links (with or without scheme and
    ``www.``/``m.`` prefix) are replaced by a markdown link whose label is the
    bvid.

    bvid format: https://github.com/SocialSisterYi/bilibili-API-collect/blob/18c1efb/docs/misc/bvid_desc.md
    Args:
        texts (str): The texts to process.

    Returns:
        str: The texts with every bvid turned into a markdown url; "" for empty input.
    """
    if not texts:
        return ""

    def markdown_url(match):
        # Group 1 is set only when a link matched; otherwise the whole match
        # is the bare bvid. Deciding on the bvid group (not on the scheme)
        # keeps scheme-less links such as "www.bilibili.com/video/BV..." intact.
        bvid = match.group(1) or match.group(0)
        return f"[{bvid}](https://www.bilibili.com/video/{bvid})"

    # match bilibili links or bvid only
    pattern = r"(?:https?://)?(?:m\.|www\.)?bilibili\.com/video/(BV1[a-zA-Z0-9]{9})\b|\bBV1[a-zA-Z0-9]{9}\b"
    return re.sub(pattern, markdown_url, texts)
358
359
360def bilibili_url2vid(url: str | int) -> str:
361 if matched := re.search(r"(https?://)?(:?m\.|www\.)?bilibili\.com/video/([^,,.。\s]+)", str(url)): # noqa: RUF001
362 base_url = matched.group(0).split("?")[0]
363 return Path(base_url).stem
364
365 # already vid
366 return av2bv(url)