Commit e482d85
Changed files (5)
src/preview/douyin.py
@@ -4,6 +4,7 @@ from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo
+from glom import glom
from loguru import logger
from pyrogram.client import Client
from pyrogram.types import Message
@@ -79,19 +80,18 @@ async def preview_douyin(
await send_to_social_media_bridge(client, message, url, platform, **kwargs)
return
- aweme_id = data.get("aweme_id", Path(url).stem)
- if int(data.get("media_type", 4)) == 2: # image post
- media = [{"photo": download_first_success_urls(x.get("url_list", []), workers_proxy=True, **kwargs)} for x in data.get("images", [])]
+ aweme_id = glom(data, "aweme_id", default=Path(url).stem)
+ if int(glom(data, "media_type", default=4)) == 2: # image post
+ media = [{"photo": download_first_success_urls(glom(x, "url_list", default=[]), workers_proxy=True, **kwargs)} for x in glom(data, "images", default=[])]
else: # video post
- video = data.get("video", {})
video_urls = []
for key in ["play_addr_h264", "play_addr_265", "play_addr"]:
- video_urls.extend(video.get(key, {}).get("url_list", []))
+ video_urls.extend(glom(data, f"video.{key}.url_list", default=[]))
media = [{"video": download_first_success_urls(video_urls, suffix=".mp4", workers_proxy=True, **kwargs)}]
await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
media = await download_media(media, **kwargs)
texts = ""
- if author := data.get("author", {}).get("nickname", ""):
+ if author := glom(data, "author.nickname", default=""):
texts += f"\n🎶**[{author}]({url})**"
if ts := data.get("create_time"):
dt = datetime.fromtimestamp(ts).astimezone(ZoneInfo(TZ))
@@ -153,9 +153,11 @@ async def get_comments(aweme_id: str = "", platform: str = "douyin", douyin_comm
return []
try:
for node in data:
- name = node.get("user", {}).get("nickname", "")
- text = node.get("text", "")
+ name = glom(node, "user.nickname", default="")
region = f"({node['ip_label']})" if node.get("ip_label") else ""
+ text = node.get("text", "")
+ if uid := glom(node, "user.sec_uid", default=""):
+ name = f"[{name}](https://www.{platform}.com/user/{uid})"
if name and text:
comments.append({"name": name, "text": emojify(text.strip()), "region": region})
except Exception as e:
src/preview/instagram.py
@@ -5,6 +5,7 @@ from datetime import datetime
from zoneinfo import ZoneInfo
from bs4 import BeautifulSoup
+from glom import glom
from loguru import logger
from pyrogram.client import Client
from pyrogram.types import Message
@@ -67,38 +68,37 @@ async def preview_instagram(
media = []
if data.get("video_url"): # reel
media.append({"video": download_file(data.get("video_url", ""), **kwargs)})
- elif media_nodes := data.get("edge_sidecar_to_children", {}).get("edges", []):
+ elif media_nodes := glom(data, "edge_sidecar_to_children.edges", default=[]):
for node in media_nodes:
- ftype = "photo" if not node.get("node", {}).get("is_video") else "video"
- media_url = node.get("node", {}).get("display_url", "") if ftype == "photo" else node.get("node", {}).get("video_url", "")
+ ftype = "photo" if not glom(node, "node.is_video", default=False) else "video"
+ media_url = glom(node, "node.display_url", default="") if ftype == "photo" else glom(node, "node.video_url", default="")
media.append({ftype: download_file(media_url, **kwargs)})
elif data.get("display_url"):
media.append({"photo": download_file(data.get("display_url"), **kwargs)})
texts = ""
- if fullname := data.get("owner", {}).get("full_name"):
+ if fullname := glom(data, "owner.full_name", default=""):
texts += f"🏞**[{fullname}]({url})**\n"
- if metadata_nodes := data.get("edge_media_to_caption", {}).get("edges"):
- if ts := metadata_nodes[0].get("node", {}).get("created_at"):
+ if metadata_node := glom(data, "edge_media_to_caption.edges.0", default=None):
+ if ts := glom(metadata_node, "node.created_at", default=0):
dt = datetime.fromtimestamp(float(ts)).astimezone(ZoneInfo(TZ))
create_time = f"{dt:%Y-%m-%d %H:%M:%S}"
texts += f"🕒{create_time}\n"
- if description := metadata_nodes[0].get("node", {}).get("text", ""):
+ if description := glom(metadata_node, "node.text", default=""):
texts += f"{description}\n"
-
# parse comments
comments: list[str] = []
if true(instagram_comments_provider):
- comment_nodes = data.get("edge_media_to_parent_comment", {}).get("edges", [])
- comment_nodes = sorted(comment_nodes, key=lambda x: x.get("node", {}).get("created_at", 0))
- comment_list = [{"author": node.get("node", {}).get("owner", {}).get("username", "user"), "text": node.get("node", {}).get("text", "")} for node in comment_nodes]
+ comment_nodes = glom(data, "edge_media_to_parent_comment.edges", default=[])
+ comment_nodes = sorted(comment_nodes, key=lambda x: glom(x, "node.created_at", default=0))
+ comment_list = [{"author": glom(node, "node.owner.username", default="user"), "text": glom(node, "node.text", default="")} for node in comment_nodes]
comment_list = [x for x in comment_list if x["text"]]
for idx, cmt in enumerate(comment_list):
cmt_text = cmt["text"].replace("\n", "\n> ")
if idx == 0:
comments.append("\n**> 💬**点此展开评论区**:")
- comments.append(f"\n> 💬**{cmt['author']}**: {cmt_text}")
+ comments.append(f"\n> 💬**[{cmt['author']}](https://www.instagram.com/{cmt['author']})**: {cmt_text}")
await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
media = await download_media(media, **kwargs)
src/preview/twitter.py
@@ -5,6 +5,7 @@ import re
from datetime import UTC, datetime
from zoneinfo import ZoneInfo
+from glom import glom
from loguru import logger
from pyrogram.client import Client
from pyrogram.types import Message
@@ -270,14 +271,18 @@ async def get_tweet_info_via_tikhub(url: str = "", post_id: str = "", quote_info
threads = sorted(threads, key=lambda x: x.get("id", {}))
comments = []
for node in threads:
- comment_handle = node.get("author", {}).get("screen_name", "")
+ comment_handle = glom(node, "author.screen_name", default="")
+ if comment_post_id := node.get("id", ""):
+ comment_author = f"[{comment_handle}](https://x.com/{comment_handle}/status/{comment_post_id})"
+ else:
+ comment_author = f"[{comment_handle}](https://x.com/{comment_handle})"
comment_text = node.get("text", "").removeprefix(f"@{handle}")
comment_text = re.sub(r"https?://t\.co/\w+$", "", comment_text) # remove t.co link suffix
comment_text = await remove_tco_suffix(comment_text, post_id=node.get("id", ""))
comment_text = await flatten_rediercts(comment_text)
comment_text = comment_text.strip()
if comment_handle and comment_text:
- comments.append({"author": comment_handle, "text": comment_text, "post_id": node.get("id", "")})
+ comments.append({"author": comment_author, "text": comment_text})
info["comments"] = comments
info["quote_info"] = data.get("quoted", {})
src/preview/weibo.py
@@ -8,6 +8,7 @@ from urllib.parse import quote_plus
from zoneinfo import ZoneInfo
from bs4 import BeautifulSoup
+from glom import glom
from loguru import logger
from pyrogram.client import Client
from pyrogram.types import Message
@@ -164,9 +165,9 @@ async def parse_weibo_info(post_id: str, data: dict | None = None, **kwargs) ->
logger.trace(data)
media = []
for x in data.get("pics", []):
- pid = x.get("pid", rand_string())
- mtype = x.get("type", "photo")
- photo_url = x.get("large", {}).get("url", x.get("url"))
+ pid = glom(x, "pid", default=rand_string())
+ mtype = glom(x, "type", default="photo")
+ photo_url = glom(x, "large.url", default=x.get("url"))
video_url = x.get("videoSrc")
if mtype == "livephoto":
# media.append({"photo": download_file(photo_url, **kwargs)}) # main photo
@@ -180,8 +181,8 @@ async def parse_weibo_info(post_id: str, data: dict | None = None, **kwargs) ->
if video_urls := [videos.get(quality) for quality in ["mp4_720p_mp4", "mp4_hd_mp4", "mp4_ld_mp4"] if videos.get(quality)]:
# This maybe already downloaded by the above loop (for loop in data['pics'])
media.append({"video": download_first_success_urls(video_urls, skip_exist=True, suffix=".mp4", **kwargs)})
- info["post_id"] = data.get("id", post_id)
- info["author"] = data.get("user", {}).get("screen_name", "")
+ info["post_id"] = glom(data, "id", default=post_id)
+ info["author"] = glom(data, "user.screen_name", default="")
info["author_url"] = f"https://m.weibo.cn/detail/{post_id}" # for weibo post, use post url as author url
info["region"] = data.get("region_name", "").removeprefix("发布于").strip()
info["dt"] = ""
@@ -286,7 +287,11 @@ async def parse_weibo_comments(post_id: str) -> list[str]:
if not info.get("text"):
continue
cmt = ""
- if author := info.get("user", {}).get("screen_name"):
+ uid = glom(info, "user.id", default="")
+ author = glom(info, "user.screen_name", default="")
+ if author and uid:
+ cmt += f"💬**[{author}](https://weibo.com/u/{uid})**"
+ elif author:
cmt += f"💬**{author}**"
if region := info.get("source", "").removeprefix("来自"):
cmt += f"({region})"
src/preview/ytdlp.py
@@ -10,6 +10,7 @@ from pathlib import Path
from urllib.parse import quote_plus, unquote_plus, urlparse
from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
+from glom import glom
from loguru import logger
from pyrogram.client import Client
from pyrogram.types import Message
@@ -481,8 +482,10 @@ async def get_bilibili_comments(bvid: str | None, provider: str = PROVIDER.BILIB
comments = []
try:
for idx, x in enumerate(data):
- name = x.get("member", {}).get("uname", "匿名")
- if cmt := x.get("content", {}).get("message"):
+ name = glom(x, "member.uname", default="匿名")
+ if uid := glom(x, "member.mid", default=""):
+ name = f"[{name}](https://space.bilibili.com/{uid})"
+ if cmt := glom(x, "content.message", default=""):
cmt = cmt.replace("\n", "\n> ")
if idx == 0:
comments.append("\n**> 💬**点此展开评论区**:")
@@ -490,8 +493,10 @@ async def get_bilibili_comments(bvid: str | None, provider: str = PROVIDER.BILIB
# replies of comments, free api only got 3 comments, so we add replies here
if provider == "free" and (replies := x.get("replies")):
for r in replies:
- name = r.get("member", {}).get("uname", "匿名")
- if cmt := r.get("content", {}).get("message"):
+ name = glom(r, "member.uname", default="匿名")
+ if uid := glom(r, "member.mid", default=""):
+ name = f"[{name}](https://space.bilibili.com/{uid})"
+ if cmt := glom(r, "content.message", default=""):
cmt = cmt.replace("\n", "\n> ")
comments.append(f"\n> ↪️**{name}**: {emojify(cmt)}")
except Exception as e:
@@ -514,9 +519,11 @@ async def get_youtube_comments(vid: str | None, provider: str = PROVIDER.YOUTUBE
return []
data = resp["items"]
for idx, x in enumerate(data):
- name = x.get("snippet", {}).get("topLevelComment", {}).get("snippet", {}).get("authorDisplayName", "匿名")
+ name = glom(x, "snippet.topLevelComment.snippet.authorDisplayName", default="匿名")
name = name.removeprefix("@")
- if cmt := x.get("snippet", {}).get("topLevelComment", {}).get("snippet", {}).get("textDisplay"):
+ if author_url := glom(x, "snippet.topLevelComment.snippet.authorChannelUrl", default=""):
+ name = f"[{name}]({author_url})"
+ if cmt := glom(x, "snippet.topLevelComment.snippet.textDisplay", default=""):
cmt = cmt.replace("\n", "\n> ")
if idx == 0:
comments.append("\n**> 💬**点此展开评论区**:")