Commit d3a986a

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-08-26 16:57:51
chore(social): add statistics display
1 parent c2b66c8
src/preview/douyin.py
@@ -23,7 +23,7 @@ from messages.sender import send2tg
 from messages.utils import summay_media
 from networking import download_file, download_first_success_urls, download_media, hx_req
 from others.emoji import emojify
-from utils import true
+from utils import readable_count, true
 
 
 async def preview_douyin(
@@ -80,6 +80,8 @@ async def preview_douyin(
     if ts := data.get("create_time"):
         dt = datetime.fromtimestamp(ts).astimezone(ZoneInfo(TZ))
         texts += f"\n🕒{dt:%Y-%m-%d %H:%M:%S}"
+    if statistics := data.get("statistics"):
+        texts += f"\n{statistics}"
     if decs := data.get("desc"):
         texts += f"\n{decs}"
 
@@ -135,12 +137,23 @@ async def parse_via_direct(url: str = "", platform: str = "douyin", proxy: str |
             if not media:
                 logger.warning(f"{platform} API [direct] media download failed")
                 return False, {}
+            statistics = ""
+            if like := glom(info, "statistics.digg_count", default=0):
+                statistics += f"❤️{readable_count(like)}"
+            if comment := glom(info, "statistics.comment_count", default=0):
+                statistics += f"💬{readable_count(comment)}"
+            if favorite := glom(info, "statistics.collect_count", default=0):
+                statistics += f"⭐️{readable_count(favorite)}"
+            if share := glom(info, "statistics.share_count", default=0):
+                statistics += f"↗️{readable_count(share)}"
+
             return True, {
                 "aweme_id": info.get("aweme_id", video_id),
                 "media": media,
                 "author": glom(info, "author.nickname", default=""),
                 "create_time": info.get("create_time"),
                 "desc": info.get("desc"),
+                "statistics": statistics,
             }
         logger.warning(f"{platform} API [direct] matched nothing")
     except Exception:
@@ -188,12 +201,23 @@ async def parse_via_tikhub(url: str = "", platform: str = "douyin", proxy: str |
         if not media:
             logger.warning(f"{platform} API [{provider}] media download failed")
             return False, {}
+        statistics = ""
+        if like := glom(info, "statistics.digg_count", default=0):
+            statistics += f"❤️{readable_count(like)}"
+        if comment := glom(info, "statistics.comment_count", default=0):
+            statistics += f"💬{readable_count(comment)}"
+        if favorite := glom(info, "statistics.collect_count", default=0):
+            statistics += f"⭐️{readable_count(favorite)}"
+        if share := glom(info, "statistics.share_count", default=0):
+            statistics += f"↗️{readable_count(share)}"
+
         return True, {
             "aweme_id": info.get("aweme_id", Path(url).stem),
             "media": media,
             "author": glom(info, "author.nickname", default=""),
             "create_time": info.get("create_time"),
             "desc": info.get("desc"),
+            "statistics": statistics,
         }
     except Exception:
         logger.warning(f"{platform} API [{provider}] failed")
src/preview/instagram.py
@@ -20,7 +20,7 @@ from messages.sender import send2tg
 from messages.utils import summay_media
 from multimedia import is_valid_video_or_audio, validate_img
 from networking import download_file, download_media, hx_req
-from utils import true
+from utils import readable_count, true
 
 
 async def preview_instagram(
@@ -67,7 +67,6 @@ async def preview_instagram(
         return
 
     data = resp["data"]
-
     # parse media
     media = []
     if data.get("video_url"):  # reel
@@ -80,6 +79,12 @@ async def preview_instagram(
     elif data.get("display_url"):
         media.append({"photo": download_file(data.get("display_url"), proxy=PROXY.INSTAGRAM, **kwargs)})
 
+    statistics = ""
+    if like := glom(data, "edge_media_preview_like.count", default=0):
+        statistics += f"❤️{readable_count(like)}"
+    if comment := glom(data, "edge_media_to_parent_comment.count", default=0):
+        statistics += f"💬{readable_count(comment)}"
+
     texts = ""
     if fullname := glom(data, "owner.full_name", default=""):
         texts += f"🏞**[{fullname}]({url})**\n"
@@ -89,6 +94,8 @@ async def preview_instagram(
             dt = datetime.fromtimestamp(float(ts)).astimezone(ZoneInfo(TZ))
             create_time = f"{dt:%Y-%m-%d %H:%M:%S}"
             texts += f"🕒{create_time}\n"
+        if statistics:
+            texts += f"{statistics}\n"
         if description := glom(metadata_node, "node.text", default=""):
             texts += f"{description}\n"
     # parse comments
src/preview/twitter.py
@@ -19,7 +19,7 @@ from messages.progress import modify_progress
 from messages.sender import send2tg
 from messages.utils import summay_media
 from networking import download_file, download_media, flatten_rediercts, hx_req
-from utils import remove_none_values, split_parts, true
+from utils import readable_count, remove_none_values, split_parts, true
 
 
 async def preview_twitter(
@@ -152,6 +152,8 @@ async def preview_twitter(
             msg += f"\n🕒{time_str}"
         if device := master_info.get("device"):
             msg += f"📱{device}"
+        if statistics := master_info.get("statistics"):
+            msg += f"\n{statistics}"
         if part_strs["first"]:
             msg += f"\n🏞{part_strs['first']}属于主推"
         if texts := master_info.get("texts"):
@@ -176,9 +178,10 @@ async def preview_twitter(
 
     if time_str := this_info.get("time"):
         msg += f"\n🕒{time_str}"
-
     if device := this_info.get("device"):
         msg += f"📱{device}"
+    if statistics := this_info.get("statistics"):
+        msg += f"\n{statistics}"
     if part_strs["middle"] and (this_info["has_master"] or this_info["has_quote"]):  # 当有supp_info时, 附加图片数量说明
         msg += f"\n🏞{part_strs['middle']}属于{this_tweet_type}"
 
@@ -200,13 +203,12 @@ async def preview_twitter(
         msg += "\n🔁**本推文还引用下述推文:**"
         if author := quote_info.get("author"):
             msg += f"\n🕊[{author}]({quote_x_url})"
-
         if time_str := quote_info.get("time"):
             msg += f"\n🕒{time_str}"
-
         if device := quote_info.get("device"):
             msg += f"📱{device}"
-
+        if statistics := quote_info.get("statistics"):
+            msg += f"\n{statistics}"
         if part_strs["last"]:
             msg += f"\n🏞{part_strs['last']}属于引推"
 
@@ -307,6 +309,16 @@ async def get_tweet_info_via_tikhub(url: str = "", post_id: str = "", quote_info
         if comment_handle and comment_text:
             comments.append({"author": comment_author, "text": comment_text, "post_id": comment_post_id})
 
+    statistics = ""
+    if view := glom(data, "views", default=0):
+        statistics += f"👁{readable_count(view)}"
+    if like := glom(data, "likes", default=0):
+        statistics += f"❤️{readable_count(like)}"
+    if comment := glom(data, "replies", default=0):
+        statistics += f"💬{readable_count(comment)}"
+    if share := glom(data, "retweets", default=0):
+        statistics += f"🔁{readable_count(share)}"
+    info["statistics"] = statistics
     info["comments"] = comments
     info["quote_info"] = glom(data, "quoted", default={}) or {}
     info["has_quote"] = bool(info["quote_info"])
@@ -347,7 +359,16 @@ async def get_tweet_info_via_fxtwitter(url: str = "", handle: str = "", post_id:
                 mp4_url = sorted(variants, key=lambda x: x.get("bitrate", 0), reverse=True)[0]["url"]
             x["url"] = mp4_url or m3u8_url
         x["id"] = x["url"]  # record media "id" for de-duplication
-
+    statistics = ""
+    if view := glom(data, "views", default=0):
+        statistics += f"👁{readable_count(view)}"
+    if like := glom(data, "likes", default=0):
+        statistics += f"❤️{readable_count(like)}"
+    if comment := glom(data, "replies", default=0):
+        statistics += f"💬{readable_count(comment)}"
+    if share := glom(data, "retweets", default=0):
+        statistics += f"🔁{readable_count(share)}"
+    info["statistics"] = statistics
     info["media"] = media
     info["author"] = glom(data, "author.name", default="")
     if ts := data.get("created_timestamp", ""):
@@ -393,6 +414,16 @@ async def get_tweet_info_via_vxtwitter(url: str = "", handle: str = "", post_id:
         x["id"] = x.get("url", "")  # record media "id" for de-duplication
         if x.get("type", "") == "image":  # change `image` -> `photo`
             x["type"] = "photo"
+    statistics = ""
+    if view := glom(data, "views", default=0):
+        statistics += f"👁{readable_count(view)}"
+    if like := glom(data, "likes", default=0):
+        statistics += f"❤️{readable_count(like)}"
+    if comment := glom(data, "replies", default=0):
+        statistics += f"💬{readable_count(comment)}"
+    if share := glom(data, "retweets", default=0):
+        statistics += f"🔁{readable_count(share)}"
+    info["statistics"] = statistics
     info["media"] = media
     info["author"] = data.get("user_name", f"@{info['handle']}")
     if ts := data.get("date_epoch", 0):
src/preview/weibo.py
@@ -24,7 +24,7 @@ from messages.sender import send2tg
 from messages.utils import summay_media
 from networking import download_file, download_first_success_urls, download_media, hx_req
 from others.emoji import emojify
-from utils import rand_string, soup_to_text, split_parts, true
+from utils import rand_string, readable_count, soup_to_text, split_parts, true
 
 
 async def preview_weibo(
@@ -93,6 +93,10 @@ async def preview_weibo(
 
     if device := this_info.get("device"):
         msg += f"\n📱{device}"
+
+    if statistics := this_info.get("statistics"):
+        msg += f"\n{statistics}"
+
     if part_strs["first"] and quote_info:  # 当有quote_info时, 附加图片数量说明:
         msg += f"\n🏞{part_strs['first']}属于本帖"
 
@@ -114,6 +118,9 @@ async def preview_weibo(
         if device := quote_info.get("device"):
             msg += f"\n📱{device}"
 
+        if statistics := quote_info.get("statistics"):
+            msg += f"\n{statistics}"
+
         if part_strs["last"]:
             msg += f"\n🏞{part_strs['last']}属于转帖"
 
@@ -156,7 +163,7 @@ async def parse_weibo_info(post_id: str, data: dict | None = None, **kwargs) ->
                 logger.error(f"Weibo API response cannot be parsed: {matched.group(1)}")
                 info["error_msg"] = "Weibo API response cannot be parsed"
                 return info
-            data: dict = json_data[0].get("status", {})
+            data: dict = glom(json_data, "0.status", default={}) or {}
             data["text"] = soup_to_text(soup=BeautifulSoup(data.get("text", ""), "html.parser"))
             await modify_progress(text="✅解析成功, 正在处理...", **kwargs)
         except Exception as e:
@@ -185,6 +192,15 @@ async def parse_weibo_info(post_id: str, data: dict | None = None, **kwargs) ->
         if video_urls := [videos.get(quality) for quality in ["mp4_720p_mp4", "mp4_hd_mp4", "mp4_ld_mp4"] if videos.get(quality)]:
             # This may already have been downloaded by the loop above (for loop in data['pics'])
             media.append({"video": download_first_success_urls(video_urls, skip_exist=True, suffix=".mp4", headers={"user-agent": TELEGRAM_UA}, proxy=PROXY.WEIBO, stream=True, **kwargs)})
+
+    statistics = ""
+    if like := glom(data, "attitudes_count", default=0):
+        statistics += f"👍{readable_count(like)}"
+    if comment := glom(data, "comments_count", default=0):
+        statistics += f"💬{readable_count(comment)}"
+    if share := glom(data, "reposts_count", default=0):
+        statistics += f"↗️{readable_count(share)}"
+
     info["post_id"] = glom(data, "id", default=post_id)
     info["author"] = glom(data, "user.screen_name", default="")
     info["author_url"] = f"https://m.weibo.cn/detail/{post_id}"  # for weibo post, use post url as author url
@@ -196,6 +212,7 @@ async def parse_weibo_info(post_id: str, data: dict | None = None, **kwargs) ->
     info["device"] = data.get("source", "")
     info["texts"] = soup_to_text(BeautifulSoup(data.get("text", ""), "html.parser"))
     info["reply_data"] = data.get("retweeted_status", {})
+    info["statistics"] = statistics
     await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
     media = await download_media(media, **kwargs)
     # de-duplicate media
src/preview/xiaohongshu.py
@@ -5,6 +5,7 @@ from zoneinfo import ZoneInfo
 
 import yaml
 from bs4 import BeautifulSoup
+from glom import glom
 from loguru import logger
 from pyrogram.client import Client
 from pyrogram.types import Message
@@ -110,6 +111,8 @@ async def preview_xhs(
             texts += f"📍{region}\n"
         else:
             texts += "\n"
+    if statistics := xhs_info.get("statistics"):
+        texts += f"{statistics}\n"
     if title := note.get("title", ""):
         texts += f"📝**{title}**\n"
     desc = note.get("desc", "").replace("[话题]#", "")
@@ -153,15 +156,22 @@ async def get_xhs_info(url: str, retry: int = 0, *, use_mobile: bool = False) ->
         return await get_xhs_info(url, use_mobile=not use_mobile, retry=retry + 1)
 
     # XHS has two different return formats
-    if notes := list(info.get("note", {}).get("noteDetailMap", {}).values()):
-        data["note"] = notes[0].get("note", {})
-        if data["note"]:
-            return data
-    if note := info.get("noteData", {}).get("data", {}).get("noteData", {}):
-        data["note"] = note
-        return data
-    logger.error(f"Parsed info has no post, Retrying: {retry + 1}")
-    return await get_xhs_info(url, use_mobile=not use_mobile, retry=retry + 1)
+    note = {}
+    if notes := glom(info, "note.noteDetailMap.*.note", default=[]):
+        note = notes[0]
+    if glom(info, "noteData.data.noteData", default={}):
+        note = glom(info, "noteData.data.noteData", default={})
+    if not note:
+        logger.warning(f"Parsed info has no post, Retrying: {retry + 1}")
+        return await get_xhs_info(url, use_mobile=not use_mobile, retry=retry + 1)
+    statistics = ""
+    if like := glom(note, "interactInfo.likedCount", default=0):
+        statistics += f"❤️{like} "
+    if comment := glom(note, "interactInfo.commentCount", default=0):
+        statistics += f"💬{comment} "
+    if favorite := glom(note, "interactInfo.collectedCount", default=0):
+        statistics += f"⭐️{favorite} "
+    return data | {"note": note, "statistics": statistics.strip()}
 
 
 def get_xhs_comments(soup: BeautifulSoup | None) -> list[str]:
src/ytdlp/main.py
@@ -125,7 +125,7 @@ async def preview_ytdlp(
             subtitle_msg = None
             subtitle_target = ytdlp_subtitle_target or kwargs.get("target_chat") or message.chat.id
             if len(subtitles) > TEXT_LENGTH or true(subtitle_force_file):
-                caption = f"{captions['caption']}\n#️⃣字符数: {count_subtitles(subtitles)}\n⏳阅读时长: {readable_time(60 * count_subtitles(subtitles) / READING_SPEED)}"
+                caption = f"{captions['caption_without_comments']}\n#️⃣字符数: {count_subtitles(subtitles)}\n⏳阅读时长: {readable_time(60 * count_subtitles(subtitles) / READING_SPEED)}"
                 if true(to_telegraph):
                     html = "\n".join([f"<p>{s}</p>" for s in subtitles.split("\n")])
                     if telegraph_url := await publish_telegraph(title=info["title"], html=html, author=info["author"], url=url):
@@ -217,9 +217,10 @@ async def generate_captions(info: dict, url: str, platform: str, vid: str, bvid:
     for comment in comment_list:
         if await count_without_entities(f"{captions}{comment}") < CAPTION_LENGTH - 15:  # leave some margin for other info
             comments += comment
-    results["comments"] = comments.strip()
-    captions = f"{captions}{comments.strip()}"
-    results["caption"] = captions
+    comments = comments.strip()
+    results["comments"] = comments
+    results["caption_without_comments"] = captions.strip()
+    results["caption"] = f"{captions}{comments}".strip()
     return results
 
 
src/utils.py
@@ -287,7 +287,9 @@ def readable_count(num: int | str) -> str:
     count = to_int(num)
     if not isinstance(count, int):
         return str(num)
-    if count > 10000:
+    if count >= 100000:
+        return f"{count // 10000}万"
+    if count >= 10000:
         m, n = divmod(count, 10000)
         return f"{m}万" if n < 1000 else f"{m}.{n // 1000}万"
     return str(count)