Commit 8286676
Changed files (2)
src
preview
src/preview/weibo.py
@@ -24,7 +24,7 @@ from messages.sender import send2tg
from messages.utils import summay_media
from networking import download_file, download_first_success_urls, download_media, hx_req
from others.emoji import emojify
-from utils import https_url, rand_string, soup_to_text, split_parts, true, ts_to_dt
+from utils import rand_string, soup_to_text, split_parts, true
async def preview_weibo(
@@ -49,9 +49,11 @@ async def preview_weibo(
weibo_comments_provider (str, optional): The weibo comments extractor: "free" or "false".
fallback (bool, optional): Fallback to other bots. Defaults to True.
"""
- if not post_id.startswith("weibovideo"):
- real_post_id = real_weibo_post_id(post_id)
- db_key = db_key.replace(post_id, real_post_id)
+ if post_id.startswith("weibovideo"):
+ post_id = await weibo_vid_to_postid(post_id)
+
+ real_post_id = real_weibo_post_id(post_id)
+ db_key = db_key.replace(post_id, real_post_id)
if kwargs.get("show_progress") and "progress" not in kwargs:
res = await send2tg(client, message, texts=f"šę£åØč§£ęå¾®åé¾ę„\n{url}", **kwargs)
kwargs["progress"] = res[0]
@@ -133,8 +135,6 @@ async def preview_weibo(
async def parse_weibo_info(post_id: str, data: dict | None = None, **kwargs) -> dict: # type: ignore
info = {}
if not data:
- if post_id.startswith("weibovideo"):
- return await parse_weibo_video(post_id, **kwargs)
weibo_url = f"https://m.weibo.cn/detail/{post_id}"
logger.info(f"Weibo link preview for {weibo_url}")
headers = {"referer": "https://m.weibo.cn"}
@@ -211,44 +211,15 @@ async def parse_weibo_info(post_id: str, data: dict | None = None, **kwargs) ->
return info
-async def parse_weibo_video(post_id: str, **kwargs) -> dict:
- info = {}
- try:
- url = f"https://video.weibo.com/show?fid={post_id.removeprefix('weibovideo')}"
- api_url = f"{API.TIKHUB_WEIBO_VIDEO}{quote_plus(url)}"
- headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
- resp = await hx_req(api_url, headers=headers, check_kv={"data.msg": "succ"}, check_keys=["data.data.Component_Play_Playinfo"])
- if resp.get("hx_error"):
- info["error_msg"] = resp["hx_error"]
- return info
- data = resp["data"]["data"]["Component_Play_Playinfo"]
- urls = [https_url(x) for x in data.get("urls", {}).values()]
- info["media"] = [{"video": await download_first_success_urls(urls, suffix=".mp4", proxy=PROXY.WEIBO, headers={"user-agent": TELEGRAM_UA}, **kwargs)}]
- info["dt"] = ""
- if dt := ts_to_dt(data.get("real_date")):
- info["dt"] = f"{dt:%Y-%m-%d %H:%M:%S}"
- elif data.get("date"):
- info["dt"] = data.get("date")
- info["author"] = data.get("author", "")
- info["author_url"] = f"https://m.weibo.cn/u/{data['author_id']}" if data.get("author_id") else url # for weibo video, use author profile as author url
- if region := data.get("ip_info_str"): # maybe empty
- info["region"] = region.removeprefix("发布于").strip()
- if mid := data.get("mid"):
- info["comments"] = await parse_weibo_comments(post_id=mid)
- url = f"https://m.weibo.cn/detail/{mid}"
- texts = ""
- if title := data.get("title"):
- texts += f"\nš[{title}]({url})"
- if desc := data.get("text"):
- soup = BeautifulSoup(desc, "html.parser")
- texts += f"\n{soup_to_text(soup)}"
- info["texts"] = texts.strip()
- await modify_progress(text="✅解析成功, 正在处理...", **kwargs)
- except Exception as e:
- msg = f"Weibo Video API failed: {e}"
- logger.error(msg)
- return {"error_msg": msg}
- return info
+@cache.memoize(ttl=120)
+async def weibo_vid_to_postid(post_id: str) -> str:
+ if not post_id.startswith("weibovideo"):
+ return ""
+ url = f"https://video.weibo.com/show?fid={post_id.removeprefix('weibovideo')}"
+ api_url = f"{API.TIKHUB_WEIBO_VIDEO}{quote_plus(url)}"
+ headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
+ resp = await hx_req(api_url, headers=headers, check_kv={"data.msg": "succ"}, check_keys=["data.data.Component_Play_Playinfo.mid"])
+ return str(glom(resp, "data.data.Component_Play_Playinfo.mid", default=""))
@cache.memoize(ttl=30)
@@ -331,6 +302,7 @@ def real_weibo_post_id(post_id: str) -> str:
Reference:
https://blog.csdn.net/steven30832/article/details/8292230
"""
+ post_id = str(post_id)
if post_id.isdigit():
return post_id
mapping = {c: i for i, c in enumerate("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")}
src/networking.py
@@ -311,7 +311,7 @@ async def match_social_media_link(text: str, *, flatten_first: bool = True) -> d
# https://video.weibo.com/show?fid=1034:5123779299311660
# https://h5.video.weibo.com/show/1034:5169532881535051
if matched := re.search(r"(https?://)?(h5\.)?video\.weibo\.(:?com|cn)/show(\?fid=|\/)(\d+):(\d+)", text):
- return {"post_id": f"weibovideo{matched.group(3)}:{matched.group(4)}", "url": https_url(matched.group(0)), "db_key": bare_url(matched.group(0)), "platform": "weibo"}
+ return {"post_id": f"weibovideo{matched.group(5)}:{matched.group(6)}", "url": https_url(matched.group(0)), "db_key": bare_url(matched.group(0)), "platform": "weibo"}
# https://weibo.com/tv/show/1034:5123779299311660?from=old_pc_videoshow
if matched := re.search(r"(https?://)?(www\.)?weibo\.(:?com|cn)/tv/show/(\d+):(\d+)", text):
return {"post_id": f"weibovideo{matched.group(4)}:{matched.group(5)}", "url": https_url(matched.group(0)), "db_key": bare_url(matched.group(0)), "platform": "weibo"}