main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from datetime import datetime
4from zoneinfo import ZoneInfo
5
6import yaml
7from bs4 import BeautifulSoup
8from glom import Coalesce, glom
9from loguru import logger
10from pyrogram.client import Client
11from pyrogram.types import Message
12
13from bridge.social import send_to_social_media_bridge
14from config import DB, PROVIDER, PROXY, TZ
15from database.database import get_db
16from messages.database import copy_messages_from_db, save_messages
17from messages.progress import modify_progress
18from messages.sender import send2tg
19from messages.utils import summay_media
20from networking import download_file, download_first_success_urls, download_media, hx_req
21from others.emoji import emojify
22from utils import true
23
24
async def preview_xhs(
    client: Client,
    message: Message,
    url: str = "",
    db_key: str = "",
    xsec: str = "",
    *,
    is_xhs_link: bool = False,
    xhs_provider: str = PROVIDER.XHS,
    show_author: bool = True,
    show_title: bool = True,
    show_pubdate: bool = True,
    show_ip: bool = True,
    show_statistics: bool = True,
    show_description: bool = True,
    **kwargs,
):
    """Preview xiaohongshu link in the message.

    Resolves the note behind ``url``, downloads its media (video, photos or
    live photos), builds a caption from the ``show_*`` flags and sends the
    result to Telegram. Sent messages are cached under ``db_key`` and replayed
    from cache on later calls.

    Args:
        client (Client): The Pyrogram client.
        message (Message): The trigger message object.
        url (str, optional): xiaohongshu link
        db_key (str, optional): The cache key.
        xsec (str, optional): The xsec token.
        is_xhs_link (bool, optional): Whether the link is a share link from APP.
        xhs_provider (str, optional): The xiaohongshu provider.
        show_author / show_title / show_pubdate / show_ip / show_statistics /
            show_description (bool, optional): Toggle the matching caption part.
    """
    if kwargs.get("show_progress") and "progress" not in kwargs:
        res = await send2tg(client, message, texts=f"🔗正在解析小红书链接\n{url}", **kwargs)
        kwargs["progress"] = res[0]
    # Fast path: replay previously sent messages from the cache.
    if kv := await get_db(db_key):
        logger.debug(f"Xiaohongshu preview {DB.ENGINE} cache hit for key={db_key}")
        if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
            return
        await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)

    # Web links must carry an xsec_token; APP share links (xhslink.com) don't.
    if not is_xhs_link and "xsec_token" not in url:
        msg = "链接格式错误: 缺少 xsec_token 参数, 请发送完整链接"
        msg += "\n或者使用手机APP分享的链接 (xhslink.com域名)"
        await modify_progress(text=msg, **kwargs)
        return

    logger.info(f"Xiaohongshu link preview for {url}")
    xhs_info = await get_xhs_info(url)
    note = xhs_info.get("note", {})
    if not note:
        if "bridge" in xhs_provider:
            # Our parser failed; hand the link over to a third-party bot bridge.
            await modify_progress(text="❌小红书解析失败, 尝试第三方Bot...", **kwargs)
            full_url = f"https://{db_key}?xsec_token={xsec}" if xsec else url
            kwargs |= {"target_mid": message.id}
            await send_to_social_media_bridge(client, message, full_url, **kwargs)
        else:
            await modify_progress(text="❌小红书解析失败, 请稍候再尝试", force_update=True, **kwargs)
        return

    await modify_progress(text="✅解析成功, 正在处理...", **kwargs)
    media: list[dict] = []
    if note.get("type") == "video":
        # Fix: ``.get("stream")`` previously had no default and raised
        # AttributeError when the key was missing.
        video_urls = _extract_video_urls(note.get("video", {}).get("media", {}).get("stream", {}))
        media.append({"video": download_first_success_urls(video_urls, suffix=".mp4", proxy=PROXY.XHS, stream=True, **kwargs)})
    else:
        for img_info in note.get("imageList", []):
            img_url = img_info.get("urlDefault") or img_info.get("url") or ""
            if img_info.get("livePhoto"):
                video_urls = _extract_video_urls(img_info.get("stream", {}))
                media.append({"livephoto": download_first_success_urls(video_urls, suffix=".mp4", proxy=PROXY.XHS, stream=True, **kwargs)})
            else:
                media.append({"photo": download_file(img_url, suffix=".jpg", proxy=PROXY.XHS, stream=True, **kwargs)})

    texts = _build_caption(
        note,
        xhs_info,
        url,
        show_author=show_author,
        show_title=show_title,
        show_pubdate=show_pubdate,
        show_ip=show_ip,
        show_statistics=show_statistics,
        show_description=show_description,
    )
    comments = get_xhs_comments(xhs_info.get("soup"))  # Not implemented yet
    await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
    media = await download_media(media, **kwargs)
    sent_messages = await send2tg(client, message, texts=emojify(texts), media=media, comments=comments, **kwargs)
    await modify_progress(del_status=True, **kwargs)
    await save_messages(messages=sent_messages, key=db_key)


def _extract_video_urls(stream: dict) -> list[str]:
    """Collect candidate video URLs from an XHS stream dict.

    Codecs are tried in preference order (H264 first), gathering each entry's
    masterUrl followed by its backup URLs.
    """
    urls: list[str] = []
    for vcodec in ("h264", "h265", "av1", "h266"):
        for fmt in stream.get(vcodec, []):
            if fmt.get("masterUrl"):
                urls.append(fmt["masterUrl"])
            urls.extend(fmt.get("backupUrls") or [])
    return urls


def _build_caption(
    note: dict,
    xhs_info: dict,
    url: str,
    *,
    show_author: bool,
    show_title: bool,
    show_pubdate: bool,
    show_ip: bool,
    show_statistics: bool,
    show_description: bool,
) -> str:
    """Assemble the Telegram caption for an XHS note from the show_* flags."""
    texts = ""
    if true(show_author) and (author := glom(note, Coalesce("user.nickname", "user.nickName"), default="")):
        texts += f"🍠[{author}]({url})\n"
    pub_written = False
    if true(show_pubdate) and note.get("time"):
        # note["time"] is epoch milliseconds; render in the configured TZ.
        dt = datetime.fromtimestamp(float(note["time"]) / 1000).astimezone(ZoneInfo(TZ))
        texts += f"🕒{dt:%Y-%m-%d %H:%M:%S}"
        pub_written = True
    if true(show_ip) and note.get("ipLocation"):
        texts += f"📍{note['ipLocation']}\n"
    elif pub_written:
        # Terminate the time line only when one was actually written; the old
        # code appended a stray blank line when both parts were hidden.
        texts += "\n"
    if true(show_statistics) and xhs_info.get("statistics"):
        texts += f"{xhs_info['statistics']}\n"
    if true(show_title) and note.get("title"):
        texts += f"📝**{note['title']}**\n"
    if true(show_description):
        texts += note.get("desc", "").replace("[话题]#", "")
    return texts
131
132
async def get_xhs_info(url: str, retry: int = 0, *, use_mobile: bool = False) -> dict:
    """Fetch and parse a xiaohongshu post page.

    XHS banned VPS IP, so we need to use residential proxy.
    The page embeds its data in ``window.__INITIAL_STATE__`` and the layout
    differs by User-Agent. Some posts only render with a mobile User-Agent,
    but mobile images carry an XHS watermark, so the desktop UA is preferred;
    each retry flips the UA. Gives up after 3 retries and returns ``{}``.
    """
    if retry > 3:
        logger.error(f"XHS parsing response failed after 3 retries: {url}")
        return {}
    headers = {"referer": "https://www.xiaohongshu.com/"}
    data: dict = {}
    try:
        resp = await hx_req(url, headers=headers, cookies=None, mobile=use_mobile, proxy=PROXY.XHS, rformat="text")
        page = resp.get("text")
        if not page:
            logger.warning(f"XHS webpage not found: {url}, Retrying: {retry + 1}")
            return await get_xhs_info(url, use_mobile=not use_mobile, retry=retry + 1)
        soup = BeautifulSoup(page, "html.parser")
        data["soup"] = soup
        # Locate the bootstrap script and strip the JS assignment prefix.
        prefix = "window.__INITIAL_STATE__="
        payload = "{}"
        for script in soup.find_all("script"):
            body = str(script.text)
            if body.startswith(prefix):
                payload = body.removeprefix(prefix)
                break
        # YAML is a superset of JSON and tolerates the JS-flavoured payload.
        info = yaml.safe_load(payload)
        if not info:
            logger.warning(f"XHS failed: {url}, Retrying: {retry + 1}")
            return await get_xhs_info(url, use_mobile=not use_mobile, retry=retry + 1)
    except Exception as e:
        logger.warning(f"XHS failed: {e}, Retrying: {retry + 1}")
        return await get_xhs_info(url, use_mobile=not use_mobile, retry=retry + 1)

    # XHS has two different return formats: the mobile-style noteData wins
    # when present, otherwise fall back to the desktop noteDetailMap.
    note = glom(info, "noteData.data.noteData", default={})
    if not note:
        if candidates := glom(info, "note.noteDetailMap.*.note", default=[]):
            note = candidates[0]
    if not note:
        logger.warning(f"Parsed info has no post, Retrying: {retry + 1}")
        return await get_xhs_info(url, use_mobile=not use_mobile, retry=retry + 1)

    # Render like/comment/favorite counters, skipping zero counts.
    counters = {"interactInfo.likedCount": "❤️", "interactInfo.commentCount": "💬", "interactInfo.collectedCount": "⭐️"}
    parts = [f"{icon}{count}" for path, icon in counters.items() if (count := glom(note, path, default=0))]
    return data | {"note": note, "statistics": " ".join(parts)}
180
181
def get_xhs_comments(soup: BeautifulSoup | None) -> list[str]:
    """Extract comments from a parsed XHS page.

    Not implemented yet — always returns an empty list, whether or not a
    parsed page is supplied.
    """
    if not soup:
        return []
    return []