main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import json
4from datetime import datetime
5from pathlib import Path
6from zoneinfo import ZoneInfo
7
8import yaml
9from bs4 import BeautifulSoup
10from glom import Coalesce, glom
11from loguru import logger
12from pyrogram.client import Client
13from pyrogram.types import Message
14
15from ai.utils import trim_none
16from bridge.social import send_to_social_media_bridge
17from config import AI, PROVIDER, PROXY, TZ
18from database.r2 import get_cf_r2
19from messages.database import copy_messages_from_db, save_messages
20from messages.progress import modify_progress
21from messages.sender import send2tg
22from messages.utils import summay_media
23from networking import download_file, download_first_success_urls, download_media, hx_req
24from others.emoji import emojify
25from preview.utils import add_summary_url
26from summarize.summarize import summarize
27from utils import nowstr, true
28
29
async def preview_xhs(
    client: Client,
    message: Message,
    url: str = "",
    db_key: str = "",
    xsec: str = "",
    *,
    is_xhs_link: bool = False,
    xhs_provider: str = PROVIDER.XHS,
    summary_xhs: bool = False,
    summary_xhs_model: str = AI.AI_SUMMARY_MODEL_ALIAS,
    show_author: bool = True,
    show_title: bool = True,
    show_pubdate: bool = True,
    show_ip: bool = True,
    show_statistics: bool = True,
    show_description: bool = True,
    **kwargs,
):
    """Preview a xiaohongshu link found in the message.

    The flow is: try to replay a cached preview for ``db_key``; otherwise parse
    the page, download the media, send everything to Telegram, optionally
    attach an AI summary, store the sent messages under ``db_key`` and finally
    delete the downloaded files.

    Args:
        client (Client): The Pyrogram client.
        message (Message): The trigger message object.
        url (str, optional): xiaohongshu link.
        db_key (str, optional): The cache key.
        xsec (str, optional): The xsec token.
        is_xhs_link (bool, optional): Whether the link is a share link from APP.
        xhs_provider (str, optional): The xiaohongshu provider. When it contains
            ``"bridge"``, a failed parse is handed to the social media bridge.
        summary_xhs (bool, optional): Whether to generate an AI summary.
        summary_xhs_model (str, optional): Model passed to ``summarize_xhs``.
        show_author (bool, optional): Include the author line in the caption.
        show_title (bool, optional): Include the title line in the caption.
        show_pubdate (bool, optional): Include the publish time in the caption.
        show_ip (bool, optional): Include the IP location in the caption.
        show_statistics (bool, optional): Include like/comment/favorite counts.
        show_description (bool, optional): Include the post description.
        **kwargs: Forwarded to the messaging/download helpers. ``show_progress``
            and ``progress`` are read here to manage the progress message.
    """
    if kv := await get_cf_r2(db_key):
        logger.debug(f"Xiaohongshu preview cache hit for key={db_key}")
        if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
            return
        logger.warning("❌从缓存中转发失败, 尝试重新解析...")
    if kwargs.get("show_progress") and "progress" not in kwargs:
        res = await send2tg(client, message, texts=f"🔗正在解析小红书链接\n{url}", **kwargs)
        # Guard against an empty result instead of crashing on res[0].
        if res:
            kwargs["progress"] = res[0]

    if not is_xhs_link and "xsec_token" not in url:
        msg = "链接格式错误: 缺少 xsec_token 参数, 请发送完整链接"
        msg += "\n或者使用手机APP分享的链接 (xhslink.com域名)"
        await modify_progress(text=msg, **kwargs)
        return

    logger.info(f"Xiaohongshu link preview for {url}")
    xhs_info = await get_xhs_info(url)
    note = xhs_info.get("note", {})
    if not note:
        if "bridge" in xhs_provider:
            await modify_progress(text="❌小红书解析失败, 尝试第三方Bot...", **kwargs)
            full_url = f"https://{db_key}?xsec_token={xsec}" if xsec else url
            kwargs |= {"target_mid": message.id}
            await send_to_social_media_bridge(client, message, full_url, **kwargs)
        else:
            await modify_progress(text="❌小红书解析失败, 请稍候再尝试", force_update=True, **kwargs)
        return
    await modify_progress(text="✅解析成功, 正在处理...", **kwargs)
    note["url"] = url  # summarize_xhs reads note["url"]

    # Build the download plan. The download helpers are called without `await`
    # here; the resulting objects are handed to download_media() below.
    media: list[dict] = []
    if note.get("type") == "video":
        # glom with a default tolerates a missing or null "video"/"media"/"stream" level.
        video_urls = _xhs_stream_urls(glom(note, "video.media.stream", default=None))
        media.append({"video": download_first_success_urls(video_urls, suffix=".mp4", proxy=PROXY.XHS, stream=True, **kwargs)})
    else:
        for img_info in note.get("imageList") or []:
            if img_info.get("livePhoto"):
                video_urls = _xhs_stream_urls(img_info.get("stream"))
                media.append({"livephoto": download_first_success_urls(video_urls, suffix=".mp4", proxy=PROXY.XHS, stream=True, **kwargs)})
            else:
                img_url = img_info.get("urlDefault") or img_info.get("url") or ""
                media.append({"photo": download_file(img_url, suffix=".jpg", proxy=PROXY.XHS, stream=True, **kwargs)})

    # Compose the caption.
    texts = ""
    if true(show_author) and (author := glom(note, Coalesce("user.nickname", "user.nickName"), default="")):
        texts += f"🍠**[{author}]({url})**\n"
    if true(show_pubdate) and note.get("time"):
        # note["time"] is divided by 1000 before conversion (treated as milliseconds).
        dt = datetime.fromtimestamp(float(note["time"]) / 1000, tz=ZoneInfo(TZ))
        texts += f"🕒{dt:%Y-%m-%d %H:%M:%S}\n"
    if true(show_statistics) and xhs_info.get("statistics"):
        texts += f"{xhs_info['statistics']}"
    # The location shares a line with the statistics; either way the line is terminated.
    if true(show_ip) and note.get("ipLocation"):
        texts += f"📍{note['ipLocation']}\n"
    else:
        texts += "\n"
    if true(show_title) and note.get("title"):
        texts += f"📝**{note['title']}**\n"
    if true(show_description):
        # `or ""` also covers an explicit null description.
        texts += (note.get("desc") or "").replace("[话题]#", "")
    comments = get_xhs_comments(xhs_info.get("soup"))  # Not implemented yet

    await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
    media = await download_media(media, **kwargs)
    try:
        sent_messages = await send2tg(client, message, texts=emojify(texts), media=media, comments=comments, keep_file=True, **kwargs)
        await modify_progress(del_status=True, **kwargs)
        # Summary xhs: find the first sent message that carries a caption or text.
        caption_msg = None
        index = -1
        for idx, m in enumerate(sent_messages):
            if isinstance(m, Message) and (m.caption or m.text):
                caption_msg = m
                index = idx
                break
        if summary_xhs and caption_msg:
            # Store the edited message so the cached copy includes the summary link.
            sent_messages[index] = await summarize_xhs(caption_msg, note, media, summary_xhs_model)
        await save_messages(messages=sent_messages, key=db_key)
    finally:
        # Clean up downloaded files even when sending or summarizing fails.
        _remove_xhs_media_files(media)


def _xhs_stream_urls(stream: dict | None) -> list[str]:
    """Collect master and backup URLs from a stream mapping, H.264 entries first."""
    urls: list[str] = []
    if not isinstance(stream, dict):
        return urls
    for vcodec in ("h264", "h265", "av1", "h266"):
        for fmt in stream.get(vcodec) or []:
            if fmt.get("masterUrl"):
                urls.append(fmt["masterUrl"])
            urls.extend(fmt.get("backupUrls") or [])
    return urls


def _remove_xhs_media_files(media: list[dict]) -> None:
    """Delete the local file referenced by each media item, skipping items without a path."""
    for item in media or []:
        path = glom(item, Coalesce("photo", "video", "audio", "livephoto"), default=None)
        # An empty path would resolve to the current directory, so skip falsy values.
        if path and isinstance(path, (str, Path)):
            Path(path).unlink(missing_ok=True)
153
154
async def get_xhs_info(url: str, retry: int = 0, *, use_mobile: bool = False, max_retries: int = 3) -> dict:
    """Get xiaohongshu post info.

    XHS banned VPS IP, so we need to use residential proxy.
    XHS has two different return formats based on User-Agent.
    Some posts can only be accessed with mobile User-Agent. (I don't know why)
    But images got from mobile has XHS watermark.
    So we prefer to use desktop User-Agent.

    Every failed attempt flips between desktop and mobile User-Agent before
    trying again.

    Args:
        url (str): The xiaohongshu page URL.
        retry (int, optional): The attempt counter to start from.
        use_mobile (bool, optional): Whether the first attempt uses the mobile User-Agent.
        max_retries (int, optional): Highest value of the attempt counter that is still tried.

    Returns:
        dict: ``{"soup": ..., "note": ..., "statistics": ...}`` on success,
        or an empty dict once all attempts failed.
    """
    headers = {"referer": "https://www.xiaohongshu.com/"}
    state_prefix = "window.__INITIAL_STATE__="
    attempt = retry
    mobile = use_mobile
    # Iterative retry: the previous recursive calls lived inside the `try`, so an
    # error escaping a nested attempt was re-caught and retried by every parent.
    while attempt <= max_retries:
        soup = None
        info = None
        try:
            resp = await hx_req(url, headers=headers, cookies=None, mobile=mobile, proxy=PROXY.XHS, rformat="text")
            if resp.get("text"):
                soup = BeautifulSoup(resp["text"], "html.parser")
                script_info = next(
                    (str(x.text).removeprefix(state_prefix) for x in soup.find_all("script") if str(x.text).startswith(state_prefix)),
                    "{}",
                )
                # NOTE(review): parsed with YAML rather than JSON — presumably because the
                # embedded state is a JS object literal that is not strict JSON; confirm.
                info = yaml.safe_load(script_info)
                if not info:
                    logger.warning(f"XHS failed: {url}, Retrying: {attempt + 1}")
            else:
                logger.warning(f"XHS webpage not found: {url}, Retrying: {attempt + 1}")
        except Exception as e:
            # Best-effort scraping: any failure simply triggers the next attempt.
            logger.warning(f"XHS failed: {e}, Retrying: {attempt + 1}")
            info = None

        if info:
            # XHS has two different return formats; the "noteData" one wins when both exist.
            note = {}
            if notes := glom(info, "note.noteDetailMap.*.note", default=[]):
                note = notes[0]
            if alt_note := glom(info, "noteData.data.noteData", default={}):
                note = alt_note
            if note:
                counters = (("❤️", "likedCount"), ("💬", "commentCount"), ("⭐️", "collectedCount"))
                parts = []
                for icon, key in counters:
                    if count := glom(note, f"interactInfo.{key}", default=0):
                        parts.append(f"{icon}{count}")
                return {"soup": soup, "note": note, "statistics": " ".join(parts).strip()}
            logger.warning(f"Parsed info has no post, Retrying: {attempt + 1}")

        attempt += 1
        mobile = not mobile

    logger.error(f"XHS parsing response failed after {max_retries} retries: {url}")
    return {}
202
203
def get_xhs_comments(soup: BeautifulSoup | None) -> list[str]:
    """Placeholder for comment extraction; currently always yields an empty list."""
    comments: list[str] = []
    if soup:
        # Comment parsing is not implemented yet, so nothing is collected.
        return comments
    return comments
209
210
async def summarize_xhs(message: Message, note: dict, media_list: list[dict], model: str) -> Message:
    """Generate an AI summary for a xiaohongshu post and attach its link to the message.

    Args:
        message (Message): The sent message that should receive the summary link.
        note (dict): The parsed post. Must contain ``"url"``.
        media_list (list[dict]): Media items; ``"photo"`` and ``"video"`` entries
            are passed to the summarizer as image/video sources.
        model (str): The model passed to ``summarize``.

    Returns:
        Message: The result of ``add_summary_url`` when a summary link was
        produced and that call returned something truthy, otherwise the
        original ``message``.
    """
    data = {
        "platform": "小红书",
        "title": note.get("title"),
        "author_name": glom(note, Coalesce("user.nickname", "user.nickName"), default=None),
        "url": note["url"],
        "location": note.get("ipLocation"),
    }
    # `or ""` also covers an explicit null description.
    if desc := (note.get("desc") or "").replace("[话题]#", ""):
        data["description"] = desc
    data = trim_none(data)  # presumably drops the None-valued entries — verify against ai.utils
    sources = []
    min_text_length = 1000  # skip short posts
    min_video_duration = None
    for media in media_list:
        if media.get("photo"):
            sources.append({"type": "image", "path": media["photo"]})
        if media.get("video"):
            min_text_length = None  # always summarize video
            min_video_duration = 120
            sources.append({"type": "video", "path": media["video"]})
    sources.append({"type": "text", "text": json.dumps(data, ensure_ascii=False)})
    # The author is stored under "author_name"; the previous lookup of "author"
    # never matched, so every summary was attributed to "Anonymous".
    author_name = data.get("author_name") or "Anonymous"
    pid = note.get("noteId", "小红书")
    summary = await summarize(
        sources=sources,
        model=model,
        title=f"🍠{author_name} - {pid}",
        author=author_name,
        url=data["url"],
        # NOTE(review): "created_at" is never set on `data` in this function, so this
        # always falls back to the current time.
        date=data.get("created_at") or nowstr(TZ),
        min_text_length=min_text_length,
        min_video_duration=min_video_duration,
        max_video_duration=3600,  # skip long videos more than 1 hour
    )
    telegraph_url = summary.get("telegraph_url")
    if not telegraph_url:
        return message
    return await add_summary_url(telegraph_url, message) or message
250 return await add_summary_url(telegraph_url, message) or message