Commit e38427c
Changed files (3)
src
src/preview/bilibili.py
@@ -0,0 +1,98 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from glom import glom
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import Message
+
+from config import DB, PROXY, TELEGRAM_UA, cache
+from cookies import cookie_cloud_bilibili
+from database import get_db
+from messages.database import copy_messages_from_db, save_messages
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from messages.utils import summay_media
+from networking import download_file, download_media, hx_req
+from others.emoji import emojify
+from utils import ts_to_dt
+
+
+async def preview_bilibili(
+ client: Client,
+ message: Message,
+ url: str,
+ db_key: str = "",
+ post_id: str = "",
+ platform: str = "bilibili-opus",
+ **kwargs,
+):
+ """Preview bilibili info in the message.
+
+ This scripit is NOT for bilibili videos. For videos, see `ytdlp.py`
+
+ Args:
+ client (Client): The Pyrogram client.
+ message (Message): The trigger message object.
+ url (str, optional): bilibili link.
+ db_key (str, optional): The cache key.
+ post_id (str, optional): bilibili post ID
+ """
+ if kwargs.get("show_progress") and "progress" not in kwargs:
+ res = await send2tg(client, message, texts=f"🔗正在解析B站链接\n{url}", **kwargs)
+ kwargs["progress"] = res[0]
+ if kv := await get_db(db_key):
+ logger.debug(f"Bilibili preview {DB.ENGINE} cache hit for key={url}")
+ if await copy_messages_from_db(client, message, key=url, kv=kv, **kwargs):
+ return
+ await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)
+ if platform == "bilibili-opus":
+ post_info = await parse_bilibili_opus(post_id, **kwargs)
+ if error_msg := post_info.get("error_msg"):
+ await modify_progress(text=f"❌B站解析失败: {error_msg}", force_update=True, **kwargs)
+ msg = ""
+ if author := post_info.get("author"):
+ msg += f"\n🅱️{author}"
+
+ if dt := post_info["dt"]:
+ msg += f"\n🕒{dt}"
+ if texts := post_info.get("texts"):
+ msg += f"\n📝[{texts}]({url})"
+
+ media = post_info.get("media", [])
+ sent_messages = await send2tg(client, message, texts=emojify(msg.strip()), media=media, **kwargs)
+ await modify_progress(del_status=True, **kwargs)
+ await save_messages(messages=sent_messages, key=db_key)
+
+
+@cache.memoize(ttl=30)
+async def parse_bilibili_opus(post_id: str, **kwargs) -> dict: # type: ignore
+ cookie = await cookie_cloud_bilibili()
+ if not cookie:
+ return {"error_msg": "Bilibili cookie not found"}
+ url = f"https://api.bilibili.com/x/polymer/web-dynamic/v1/detail?id={post_id}"
+ try:
+ response = await hx_req(url, headers={"cookie": cookie}, check_keys=["data.item.modules"], check_kv={"code": 0}, max_retry=0, timeout=3)
+ except Exception:
+ logger.warning("Bilibili Opus API failed")
+ return {"error_msg": "Bilibili Opus API failed"}
+ info = {}
+ try:
+ data = response["data"]["item"]["modules"]
+ author_name = glom(data, "module_author.name", default="")
+ author_uid = glom(data, "module_author.mid", default="")
+ info["author"] = f"**[{author_name}](https://space.bilibili.com/{author_uid})**" if author_uid else f"**{author_name}**"
+
+ timestamp = glom(data, "module_author.pub_ts", default=0)
+ info["dt"] = f"{ts_to_dt(timestamp):%Y-%m-%d %H:%M:%S}" if timestamp else ""
+
+ info["texts"] = glom(data, "module_dynamic.desc.text", default="")
+
+ images = glom(data, "module_dynamic.major.draw.items.*.src", default=[])
+ media = [{"photo": download_file(photo_url, headers={"user-agent": TELEGRAM_UA}, proxy=PROXY.WEIBO, **kwargs)} for photo_url in images]
+ await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
+ info["media"] = await download_media(media, **kwargs)
+ except Exception as e:
+ logger.warning(f"Bilibili Opus parse failed: {e}")
+ return {"error_msg": str(e)}
+ return info
src/handler.py
@@ -23,6 +23,7 @@ from others.extract_audio import extract_audio_file
from others.raw_img_file import convert_raw_img_file
from others.subtitle import get_subtitle
from permission import check_service
+from preview.bilibili import preview_bilibili
from preview.douyin import preview_douyin
from preview.instagram import preview_instagram
from preview.twitter import preview_twitter
@@ -192,6 +193,8 @@ async def handle_social_media(
await preview_weibo(client, message, **kwargs)
if xhs and matched["platform"] == "xiaohongshu":
await preview_xhs(client, message, **kwargs)
+ if matched["platform"].startswith("bilibili-"): # this is not bilibili video, for videos, use yt-dlp
+ await preview_bilibili(client, message, **kwargs)
try:
if ytdlp and any(matched["platform"] == x for x in ["bilibili", "youtube", "ytdlp"]):
await preview_ytdlp(client, message, **kwargs)
src/networking.py
@@ -331,6 +331,12 @@ async def match_social_media_link(text: str, *, flatten_first: bool = False) ->
url = f"https://www.bilibili.com/video/{bvid}?p={pid}".removesuffix("?p=1")
return {"url": url, "db_key": bare_url(url), "bvid": av2bv(bvid), "pid": pid, "platform": "bilibili"}
+ # https://m.bilibili.com/opus/1048442220384878593
+ if matched := re.search(r"(https?://)?(:?m\.|www\.)?bilibili\.com/opus/(\d+)", text):
+ post_id = matched.group(3)
+ url = f"https://www.bilibili.com/opus/{post_id}"
+ return {"url": url, "db_key": url, "post_id": post_id, "platform": "bilibili-opus"}
+
# https://www.youtube.com/watch?v=D6aE2E0RHTc
if matched := re.search(r"(https?://)?(:?m\.|www\.)?youtube\.com/watch([^,,.。\s]+)", text):
queries = parse_qs(urlparse(matched.group(0)).query)
@@ -423,12 +429,14 @@ if __name__ == "__main__":
import asyncio
check_data(json.dumps({"foo": "bar", "baz": {"qux": "quux"}, "lst": ["1", "2", "3"]}), check_keys=["baz.qux"], check_kv={"foo": "bar", "baz.qux": "quux", "lst": ["1", "2", "3"]})
- asyncio.run(match_social_media_link("https://www.facebook.com/share/r/19QGGp39T3/", flatten_first=True))
+ asyncio.run(match_social_media_link("https://b23.tv/3MSgT4q/", flatten_first=True))
+ # asyncio.run(match_social_media_link("https://www.facebook.com/share/r/19QGGp39T3/", flatten_first=True))
# asyncio.run(match_social_media_link("https://www.douyin.com/video/7398813386827468041"))
# asyncio.run(match_social_media_link("https://www.iesdouyin.com/share/note/7454527270925946138/"))
# asyncio.run(match_social_media_link("https://www.instagram.com/yifaer_chen/p/DEzv9x-vzOn/"))
# asyncio.run(flatten_rediercts("http://t.cn/A6ukIuVn"))
# asyncio.run(flatten_rediercts("shorturl.at/fuyrt"))
+ # asyncio.run(flatten_rediercts("https://b23.tv/3MSgT4q"))
# asyncio.run(flatten_rediercts("https://v.douyin.com/CeiJfJMQG/"))
# asyncio.run(flatten_rediercts("https://www.tiktok.com/t/ZT2mcMA7f/"))
# asyncio.run(flatten_rediercts("https://t.co/Wwo3x69CQz"))