main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3"""This file contains the code for extracting information from Bilibili videos.
4
5But not for downloading Bilibili videos.
6For downloading Bilibili videos, please see `src/preview/ytdlp.py`.
7"""
8
9import re
10from datetime import datetime
11from pathlib import Path
12from zoneinfo import ZoneInfo
13
14from bilibili_api import ApiException, Credential, comment, opus, video
15from glom import Coalesce, flatten, glom
16from loguru import logger
17from pyrogram.client import Client
18from pyrogram.types import Message
19
20from config import DB, READING_SPEED, TZ, cache
21from cookies import bilibili_cookie_dict
22from database.database import get_db
23from messages.database import copy_messages_from_db, save_messages
24from messages.progress import modify_progress
25from messages.sender import send2tg
26from messages.utils import blockquote, summay_media
27from networking import download_file, download_media, hx_req
28from others.emoji import emojify
29from utils import av2bv, count_subtitles, https_url, number_to_emoji, readable_count, seconds_to_hms, ts_to_dt
30
31
async def preview_bilibili(
    client: Client,
    message: Message,
    url: str,
    db_key: str = "",
    post_id: str = "",
    platform: str = "bilibili-opus",
    **kwargs,
):
    """Preview a Bilibili post in the chat.

    The flow is: optionally show a progress message, try to re-send a cached
    copy looked up by ``db_key``, otherwise parse the post, send the result
    and store the sent messages under ``db_key``.

    Args:
        client (Client): The Pyrogram client.
        message (Message): The trigger message object.
        url (str): Bilibili link; shown in the progress text and used as the title link.
        db_key (str, optional): The cache key.
        post_id (str, optional): Bilibili post ID.
        platform (str, optional): Post type. Only ``"bilibili-opus"`` is handled.
        **kwargs: Forwarded to the messaging helpers. ``show_progress`` and
            ``progress`` are read here.

    Raises:
        RuntimeError: If ``platform`` is not supported.
    """
    if kwargs.get("show_progress") and "progress" not in kwargs:
        res = await send2tg(client, message, texts=f"🔗正在解析B站链接\n{url}", **kwargs)
        kwargs["progress"] = res[0]

    if kv := await get_db(db_key):
        logger.debug(f"Bilibili preview {DB.ENGINE} cache hit for key={url}")
        if await copy_messages_from_db(client, message, key=url, kv=kv, **kwargs):
            return
        await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)

    if platform == "bilibili-opus":
        post_info = await parse_bilibili_opus(post_id, **kwargs)
    else:
        msg = f"Unsupported platform: {platform}"
        raise RuntimeError(msg)

    if error_msg := post_info.get("error_msg"):
        await modify_progress(text=f"❌B站解析失败: {error_msg}", force_update=True, **kwargs)
        # Stop here: continuing would send an empty message, delete the error
        # status shown above and store the empty result in the cache.
        return

    # Assemble the caption; missing or empty fields are skipped.
    lines = []
    if author := post_info.get("author"):
        lines.append(f"🅱️{author}")
    if dt := post_info.get("dt"):
        lines.append(f"🕒{dt}")
    if title := post_info.get("title"):
        lines.append(f"📝[{title}]({url})")
    if texts := post_info.get("texts"):
        lines.append(f"{texts}")
    caption = "\n".join(lines).strip()

    media = post_info.get("media", [])
    sent_messages = await send2tg(client, message, texts=emojify(caption), media=media, **kwargs)
    await modify_progress(del_status=True, **kwargs)
    await save_messages(messages=sent_messages, key=db_key)
81
82
def _find_opus_module(modules: list, module_type: str) -> dict | None:
    """Return the first module whose ``module_type`` equals *module_type*, or None."""
    return next((module for module in modules if module.get("module_type") == module_type), None)


@cache.memoize(ttl=30)
async def parse_bilibili_opus(post_id: str, **kwargs) -> dict:
    """Fetch a Bilibili opus post and extract its title, author, text and images.

    Args:
        post_id (str): Opus ID; converted with ``int()`` before the API call.
        **kwargs: Forwarded to ``modify_progress`` and ``download_media``.

    Returns:
        dict: On success, a dict that always has ``texts`` and may have
        ``title``, ``author``, ``dt`` and ``media``. On failure,
        ``{"error_msg": <reason>}``.
    """
    try:
        op = opus.Opus(int(post_id))
        resp = await op.get_info()
    except Exception as e:
        # Keep the cause in the log; the caller only gets the generic message.
        logger.warning(f"Bilibili Opus API failed: {e}")
        return {"error_msg": "Bilibili Opus API failed"}

    info = {}
    media = []
    chunks = []  # text pieces, joined once at the end
    try:
        modules = glom(resp, "item.modules", default=[])

        if banner_module := _find_opus_module(modules, "MODULE_TYPE_TOP"):
            img_urls = glom(banner_module, "module_top.display.album.pics.*.url", default=[])
            media.extend([{"photo": download_file(img_url)} for img_url in img_urls])

        if title_module := _find_opus_module(modules, "MODULE_TYPE_TITLE"):
            info["title"] = glom(title_module, "module_title.text", default="")

        if author_module := _find_opus_module(modules, "MODULE_TYPE_AUTHOR"):
            author_name = glom(author_module, "module_author.name", default="B站用户")
            author_uid = glom(author_module, "module_author.mid", default="")
            info["author"] = f"**[{author_name}](https://space.bilibili.com/{author_uid})**" if author_uid else f"**{author_name}**"
            timestamp = glom(author_module, "module_author.pub_ts", default=0)
            info["dt"] = f"{ts_to_dt(timestamp):%Y-%m-%d %H:%M:%S}" if timestamp else ""

        if content_module := _find_opus_module(modules, "MODULE_TYPE_CONTENT"):
            for paragraph in glom(content_module, "module_content.paragraphs", default=[]):
                img_urls = glom(paragraph, "pic.pics.*.url", default=[])
                media.extend([{"photo": download_file(img_url)} for img_url in img_urls])
                for piece in glom(paragraph, "text.nodes", default=[]):
                    if words := glom(piece, "word.words", default=""):
                        chunks.append(words)
                    elif rich_text := glom(piece, "rich.text", default=""):
                        chunks.append(rich_text)
                # One line break per paragraph.
                chunks.append("\n")

        if media:
            await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
            info["media"] = await download_media(media, **kwargs)
        info["texts"] = "".join(chunks).strip()
    except Exception as e:
        logger.warning(f"Bilibili Opus parse failed: {e}")
        # An exception with an empty message must still give a truthy error_msg,
        # otherwise the caller cannot tell that parsing failed.
        return {"error_msg": str(e) or "Bilibili Opus parse failed"}
    return info
129
130
@cache.memoize(ttl=120)
async def get_bilibili_vinfo(url_or_vid: int | str) -> dict:
    """Get Bilibili video info.

    The raw API payload is returned with the normalized keys below added
    (or overwritten).

    Returns:
        {
            "downloadable": (bool),
            "error_msg": (str),
            "title": (str),
            "description": (str),
            "author": (str),
            "channel": (str) channel url,
            "pubdate": (str) "%Y-%m-%d %H:%M:%S" in the configured timezone,
            "duration": (int) in seconds,
            "upload_date": (str) "%Y-%m-%d %H:%M:%S" in the configured timezone,
            "view_count": (int),
            "like_count": (int),
            "favorite_count": (int),
            "coin_count": (int),
            "comment_count": (int),
            "statistics": (str) emoji + readable count for each non-zero
                counter, concatenated without separators,
            "emoji": (str) "🅱️" (only on success)
        }
        On failure ``downloadable`` is False and ``error_msg`` is non-empty.

    """
    if not url_or_vid:
        return {"downloadable": False, "error_msg": "❌未提供VideoID"}
    failure = {"downloadable": False, "error_msg": "❌无法获取此视频信息"}
    info: dict = {}
    try:
        logger.info(f"Fetch Bilibili video info for {url_or_vid}")
        vid = bilibili_url2vid(url_or_vid)
        v = video.Video(bvid=av2bv(vid))
        info = await v.get_info()
        info["title"] = info.get("title", "Title")
        info["description"] = glom(info, Coalesce("desc", "desc_v2.0.raw_text", default=""))
        info["author"] = glom(info, "owner.name", default="B站UP主")
        info["channel"] = f"https://space.bilibili.com/{glom(info, 'owner.mid', default='')}"
        info["pubdate"] = datetime.fromtimestamp(info["pubdate"], tz=ZoneInfo(TZ)).strftime("%Y-%m-%d %H:%M:%S")
        info["upload_date"] = datetime.fromtimestamp(info["ctime"], tz=ZoneInfo(TZ)).strftime("%Y-%m-%d %H:%M:%S")
        info["duration"] = int(info.get("duration", 0))
        # statistics
        info |= {
            "view_count": int(glom(info, "stat.view", default=0)),
            "like_count": int(glom(info, "stat.like", default=0)),
            "favorite_count": int(glom(info, "stat.favorite", default=0)),
            "coin_count": int(glom(info, "stat.coin", default=0)),
            "comment_count": int(glom(info, "stat.reply", default=0)),
        }
        # Display order of the counters; zero counters are left out.
        badges = (
            ("view_count", "👁"),
            ("like_count", "👍"),
            ("coin_count", "🪙"),
            ("favorite_count", "⭐️"),
            ("comment_count", "💬"),
        )
        info["statistics"] = "".join(f"{icon}{readable_count(info[key])}" for key, icon in badges if info[key])

        info |= {"downloadable": True, "error_msg": ""}

    except ApiException as e:
        logger.error(f"Failed to get video info: {e}")
        return {"downloadable": False, "error_msg": "❌" + str(e.msg)}
    except Exception as e:
        logger.error(f"Failed to get video info: {e}")
        # `info` may already hold the raw API payload. Overlay the failure
        # markers so the documented keys are always present.
        return info | failure
    return info | {"emoji": "🅱️"}
201
202
async def get_bilibili_subtitle(url_or_vid: int | str) -> dict:
    """(Deprecated) Get Bilibili subtitle.

    This function is deprecated: the API only returns the subtitle URL, so the
    subtitle file has to be downloaded and parsed here.
    Please use `bilibili_subtitle_and_summary` instead, it can get subtitles directly with AI summary.

    Returns:
        dict: {
            "subtitles": "[minute:second] texts",
            "num_chars": len(texts),
            "reading_minutes": 2,
        }
        or ``{"error": ...}`` when no Chinese subtitle is available or any
        step fails.
    """
    failure = {"error": "下载B站内嵌字幕失败"}
    try:
        # url to vid
        info = await get_bilibili_vinfo(url_or_vid)
        cid = info["cid"]
        cookie = await bilibili_cookie_dict()
        credential = Credential(sessdata=cookie["SESSDATA"])
        v = video.Video(bvid=bilibili_url2vid(url_or_vid), credential=credential)
        res = await v.get_subtitle(cid=cid)
        tracks = res.get("subtitles") or []
        # Pick the first track whose language label contains "中文".
        subtitle_url = next((x.get("subtitle_url", "") for x in tracks if "中文" in x.get("lan_doc", "")), "")
        if not subtitle_url:
            # Return a dict rather than falling through to an implicit None.
            logger.warning(f"No Chinese subtitle found for {url_or_vid}")
            return failure
        data = await hx_req(https_url(subtitle_url), check_keys=["body"])
        items = data["body"]
        sentences = [f"[{seconds_to_hms(subtitle['from'])}] {subtitle['content']}" for subtitle in items]
        num_chars = sum(len(subtitle["content"]) for subtitle in items)
        return {
            "subtitles": "\n".join(sentences),
            "num_chars": num_chars,
            "reading_minutes": num_chars / READING_SPEED,
        }
    except Exception as e:
        logger.error(e)
        return failure
242
243
async def get_bilibili_comments(url_or_vid: int | str) -> list[str]:
    """Get Bilibili comments.

    Fetches one page of replies for the video, orders them by like count
    (descending) and renders each non-empty reply as a blockquote line.

    Returns:
        list[str]: A header blockquote followed by one entry per reply that
        has text, or an empty list when nothing could be fetched or rendered.
    """
    comments = []
    try:
        # url to vid
        cookie = await bilibili_cookie_dict()
        credential = Credential(sessdata=cookie["SESSDATA"])
        info = await get_bilibili_vinfo(url_or_vid)
        response = await comment.get_comments_lazy(oid=info["aid"], type_=comment.CommentResourceType.VIDEO, order=comment.OrderType.LIKE, credential=credential)
        # `replies` may be present but null; treat that as "no comments".
        data = response.get("replies") or []
        data = sorted(data, key=lambda x: x.get("like", 0), reverse=True)
    except Exception as e:
        logger.error(f"Failed to get Bilibili comments: {e}")
        return []
    try:
        for x in data:
            name = glom(x, "member.uname", default="匿名")
            if uid := glom(x, "member.mid", default=""):
                name = f"[{name}](https://space.bilibili.com/{uid})"
            location = glom(x, "reply_control.location", default="").removeprefix("IP属地:")  # noqa: RUF001
            location = f"({location})" if location else ""
            if cmt := glom(x, "content.message", default=""):
                # Add the header before the first rendered comment, even when
                # the top-liked reply has no text and is skipped.
                if not comments:
                    comments.append(f"\n{blockquote('💬**点此展开评论区**:')}")
                cmt = f"💬**{name}**{location}: {emojify(cmt)}"
                comments.append(f"\n{blockquote(cmt)}")
    except Exception as e:
        logger.error(f"Failed to get Bilibili comments: {e}")
        return []
    return comments
274
275
async def bilibili_subtitle_and_summary(url_or_vid: int | str) -> dict:
    """Get Bilibili subtitles and AI summary.

    Subtitles come from the AI-conclusion response when it contains them,
    otherwise from `get_bilibili_subtitle`.

    Returns:
        dict: {
            "summary": "AI summary texts",
            "subtitles": "[minute:second] texts",
            "num_chars": len(texts),
            "reading_minutes": 2,
            "full": "summary first, followed by subtitles",
        }
        ``summary`` is present only when a summary exists, ``full`` only when
        both summary and subtitles exist. ``{"error": ...}`` is returned when
        an exception occurs.
    """
    try:
        # url to vid
        info = await get_bilibili_vinfo(url_or_vid)
        cid = info["cid"]
        cookie = await bilibili_cookie_dict()
        credential = Credential(sessdata=cookie["SESSDATA"])
        v = video.Video(bvid=bilibili_url2vid(url_or_vid), credential=credential)
        res = await v.get_ai_conclusion(cid=cid, up_mid=glom(info, "owner.mid", default=None))

        # First, get subtitles
        if not glom(res, "model_result.subtitle.0.part_subtitle.0", default=None):
            # use `get_bilibili_subtitle`; a falsy result must not discard the
            # AI summary, so substitute the same error shape it returns on failure.
            final = await get_bilibili_subtitle(url_or_vid) or {"error": "下载B站内嵌字幕失败"}
            subtitles = final.get("subtitles", "")
        else:
            parts = flatten(glom(res, "model_result.subtitle.*.part_subtitle.*", default=None))
            subtitles = "".join(f"\n[{seconds_to_hms(item['start_timestamp'])}] {item['content']}" for item in parts if item.get("content", ""))
            num_chars = count_subtitles(subtitles)
            final = {"subtitles": subtitles.strip(), "num_chars": num_chars, "reading_minutes": num_chars / READING_SPEED}

        # Then get AI summary
        summary = ""
        if glom(res, "code", default=-1) == 0 and glom(res, "model_result.result_type", default=None) != 0:  # has summary
            summary += glom(res, "model_result.summary", default="") or ""
            outlines = glom(res, "model_result.outline", default=[])
            for idx, outline in enumerate(outlines):
                summary += f"\n\n{number_to_emoji(idx + 1)} {outline.get('title', '')}"
                for item in glom(outline, "part_outline", default=[]):
                    summary += f"\n[{seconds_to_hms(item['timestamp'])}] {item['content']}"
        if summary:
            final["summary"] = summary.strip()
        if summary and subtitles:
            final["full"] = f"AI总结(B站版):\n{summary}\n\n\n外挂字幕(B站版):\n{subtitles.strip()}"  # noqa: RUF001
    except Exception as e:
        logger.error(e)
        return {"error": "下载B站AI总结失败"}
    return final
324
325
# Matches a bilibili video link (scheme and m./www. host prefix optional) or a
# bare bvid. Exactly one of the two capture groups holds the bvid.
_BVID_LINK_PATTERN = re.compile(
    r"(?:https?://)?(?:m\.|www\.)?bilibili\.com/video/(BV1[a-zA-Z0-9]{9})\b"
    r"|\b(BV1[a-zA-Z0-9]{9})\b"
)


def make_bvid_clickable(texts: str) -> str:
    """Make bvid in texts clickable.

    "BV1234567890" -> [BV1234567890](https://www.bilibili.com/video/BV1234567890)

    Video links are rewritten to the same markdown form, whether or not they
    carry a scheme or an ``m.``/``www.`` host prefix.

    bvid format: https://github.com/SocialSisterYi/bilibili-API-collect/blob/18c1efb/docs/misc/bvid_desc.md
    Args:
        texts (str): The texts to process.

    Returns:
        str: bvid with markdown url. Empty string when ``texts`` is falsy.
    """
    if not texts:
        return ""

    def markdown_url(match: re.Match) -> str:
        # Use the captured bvid, not the whole match: for a scheme-less link
        # the whole match still contains the host and path.
        bvid = match.group(1) or match.group(2)
        return f"[{bvid}](https://www.bilibili.com/video/{bvid})"

    return _BVID_LINK_PATTERN.sub(markdown_url, texts)
352
353
354def bilibili_url2vid(url: str | int) -> str:
355 if matched := re.search(r"(https?://)?(:?m\.|www\.)?bilibili\.com/video/([^,,.。\s]+)", str(url)): # noqa: RUF001
356 base_url = matched.group(0).split("?")[0]
357 return Path(base_url).stem
358
359 # already vid
360 return av2bv(url)