main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import json
  4from datetime import datetime
  5from pathlib import Path
  6from zoneinfo import ZoneInfo
  7
  8import yaml
  9from bs4 import BeautifulSoup
 10from glom import Coalesce, glom
 11from loguru import logger
 12from pyrogram.client import Client
 13from pyrogram.types import Message
 14
 15from ai.utils import trim_none
 16from bridge.social import send_to_social_media_bridge
 17from config import AI, PROVIDER, PROXY, TZ
 18from database.r2 import get_cf_r2
 19from messages.database import copy_messages_from_db, save_messages
 20from messages.progress import modify_progress
 21from messages.sender import send2tg
 22from messages.utils import summay_media
 23from networking import download_file, download_first_success_urls, download_media, hx_req
 24from others.emoji import emojify
 25from preview.utils import add_summary_url
 26from summarize.summarize import summarize
 27from utils import nowstr, true
 28
 29
 30async def preview_xhs(
 31    client: Client,
 32    message: Message,
 33    url: str = "",
 34    db_key: str = "",
 35    xsec: str = "",
 36    *,
 37    is_xhs_link: bool = False,
 38    xhs_provider: str = PROVIDER.XHS,
 39    summary_xhs: bool = False,
 40    summary_xhs_model: str = AI.AI_SUMMARY_MODEL_ALIAS,
 41    show_author: bool = True,
 42    show_title: bool = True,
 43    show_pubdate: bool = True,
 44    show_ip: bool = True,
 45    show_statistics: bool = True,
 46    show_description: bool = True,
 47    **kwargs,
 48):
 49    """Preview xiaohongshu link in the message.
 50
 51    Args:
 52        client (Client): The Pyrogram client.
 53        message (Message): The trigger message object.
 54        url (str, optional): xiaohongshu link
 55        db_key (str, optional): The cache key.
 56        xsec (str, optional): The xsec token.
 57        is_xhs_link (bool, optional): Whether the link is a share link from APP.
 58        xhs_provider (str, optional): The xiaohongshu provider.
 59    """
 60    if kv := await get_cf_r2(db_key):
 61        logger.debug(f"Xiaohongshu preview cache hit for key={db_key}")
 62        if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
 63            return
 64        logger.warning("❌从缓存中转发失败, 尝试重新解析...")
 65    if kwargs.get("show_progress") and "progress" not in kwargs:
 66        res = await send2tg(client, message, texts=f"🔗正在解析小红书链接\n{url}", **kwargs)
 67        kwargs["progress"] = res[0]
 68
 69    if not is_xhs_link and "xsec_token" not in url:
 70        msg = "链接格式错误: 缺少 xsec_token 参数, 请发送完整链接"
 71        msg += "\n或者使用手机APP分享的链接 (xhslink.com域名)"
 72        await modify_progress(text=msg, **kwargs)
 73        return
 74
 75    logger.info(f"Xiaohongshu link preview for {url}")
 76    xhs_info = await get_xhs_info(url)
 77    note = xhs_info.get("note", {})
 78    if not note:
 79        if "bridge" in xhs_provider:
 80            await modify_progress(text="❌小红书解析失败, 尝试第三方Bot...", **kwargs)
 81            full_url = f"https://{db_key}?xsec_token={xsec}" if xsec else url
 82            kwargs |= {"target_mid": message.id}
 83            await send_to_social_media_bridge(client, message, full_url, **kwargs)
 84        else:
 85            await modify_progress(text="❌小红书解析失败, 请稍候再尝试", force_update=True, **kwargs)
 86        return
 87    await modify_progress(text="✅解析成功, 正在处理...", **kwargs)
 88    note["url"] = url
 89    media: list[dict] = []
 90    if note.get("type") == "video":
 91        video_urls = []  # Extract all urls, but prefer H264
 92        for vcodec in ["h264", "h265", "av1", "h266"]:
 93            format_list = note.get("video", {}).get("media", {}).get("stream").get(vcodec, [])
 94            for x in format_list:
 95                if x.get("masterUrl"):
 96                    video_urls.append(x["masterUrl"])
 97                if x.get("backupUrls"):
 98                    video_urls.extend(x.get("backupUrls", []))
 99        media.append({"video": download_first_success_urls(video_urls, suffix=".mp4", proxy=PROXY.XHS, stream=True, **kwargs)})
100    else:
101        for img_info in note.get("imageList", []):
102            img_url = img_info.get("urlDefault") or img_info.get("url") or ""
103            if img_info.get("livePhoto"):
104                video_urls = []
105                for vcodec in ["h264", "h265", "av1", "h266"]:
106                    format_list = img_info.get("stream", {}).get(vcodec, [])
107                    for x in format_list:
108                        if x.get("masterUrl"):
109                            video_urls.append(x["masterUrl"])
110                        if x.get("backupUrls"):
111                            video_urls.extend(x.get("backupUrls", []))
112                media.append({"livephoto": download_first_success_urls(video_urls, suffix=".mp4", proxy=PROXY.XHS, stream=True, **kwargs)})
113            else:
114                media.append({"photo": download_file(img_url, suffix=".jpg", proxy=PROXY.XHS, stream=True, **kwargs)})
115
116    texts = ""
117    if true(show_author) and (author := glom(note, Coalesce("user.nickname", "user.nickName"), default="")):
118        texts += f"🍠**[{author}]({url})**\n"
119    if true(show_pubdate) and note.get("time"):
120        dt = datetime.fromtimestamp(float(note["time"]) / 1000).astimezone(ZoneInfo(TZ))
121        texts += f"🕒{dt:%Y-%m-%d %H:%M:%S}\n"
122    if true(show_statistics) and xhs_info.get("statistics"):
123        texts += f"{xhs_info['statistics']}"
124        if true(show_ip) and note.get("ipLocation"):
125            texts += f"📍{note['ipLocation']}\n"
126        else:
127            texts += "\n"
128    if true(show_title) and note.get("title"):
129        texts += f"📝**{note['title']}**\n"
130    desc = note.get("desc", "").replace("[话题]#", "")
131    if true(show_description):
132        texts += desc
133    comments = get_xhs_comments(xhs_info.get("soup"))  # Not implemented yet
134    await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
135    media = await download_media(media, **kwargs)
136    sent_messages = await send2tg(client, message, texts=emojify(texts), media=media, comments=comments, keep_file=True, **kwargs)
137    await modify_progress(del_status=True, **kwargs)
138    # Summary xhs
139    # find the first message that has a caption
140    caption_msg = None
141    index = -1
142    for idx, m in enumerate(sent_messages):
143        if isinstance(m, Message) and (m.caption or m.text):
144            caption_msg = m
145            index = idx
146            break
147    if summary_xhs and caption_msg:
148        edited_msg = await summarize_xhs(caption_msg, note, media, summary_xhs_model)
149        sent_messages[index] = edited_msg
150    await save_messages(messages=sent_messages, key=db_key)
151    # Clean up
152    [Path(glom(x, Coalesce("photo", "video", "audio"))).unlink(missing_ok=True) for x in media]
153
154
async def get_xhs_info(url: str, retry: int = 0, *, use_mobile: bool = False) -> dict:
    """Fetch a xiaohongshu page and extract the post data embedded in it.

    Notes kept from the original author:
    XHS banned VPS IP, so a residential proxy is needed.
    XHS has two different return formats based on User-Agent.
    Some posts can only be accessed with a mobile User-Agent.
    Images fetched with the mobile User-Agent carry the XHS watermark,
    so the desktop User-Agent is preferred.

    Every failed attempt retries with the opposite ``use_mobile`` value and
    ``retry + 1``; once ``retry`` exceeds 3 an empty dict is returned.

    Returns:
        dict: ``{}`` on failure, otherwise a dict with keys ``soup`` (the parsed
        page), ``note`` (the post data) and ``statistics`` (a formatted
        like/comment/favorite string, possibly empty).
    """
    if retry > 3:
        logger.error(f"XHS parsing response failed after 3 retries: {url}")
        return {}

    next_attempt = retry + 1

    async def _retry_with_other_agent() -> dict:
        # Flip the User-Agent flavour for the next attempt.
        return await get_xhs_info(url, retry=next_attempt, use_mobile=not use_mobile)

    result: dict = {}
    state_prefix = "window.__INITIAL_STATE__="
    try:
        resp = await hx_req(
            url,
            headers={"referer": "https://www.xiaohongshu.com/"},
            cookies=None,
            mobile=use_mobile,
            proxy=PROXY.XHS,
            rformat="text",
        )
        if not resp.get("text"):
            logger.warning(f"XHS webpage not found: {url}, Retrying: {next_attempt}")
            return await _retry_with_other_agent()
        soup = BeautifulSoup(resp["text"], "html.parser")
        result["soup"] = soup
        # Use the first <script> that carries the initial-state payload, or "{}" if none does.
        script_info = "{}"
        for script in soup.find_all("script"):
            content = str(script.text)
            if content.startswith(state_prefix):
                script_info = content.removeprefix(state_prefix)
                break
        info = yaml.safe_load(script_info)
        if not info:
            logger.warning(f"XHS failed: {url}, Retrying: {next_attempt}")
            return await _retry_with_other_agent()
    except Exception as e:
        logger.warning(f"XHS failed: {e}, Retrying: {next_attempt}")
        return await _retry_with_other_agent()

    # XHS has two different return formats; the second one wins when both are present.
    note = {}
    if notes := glom(info, "note.noteDetailMap.*.note", default=[]):
        note = notes[0]
    if alt_note := glom(info, "noteData.data.noteData", default={}):
        note = alt_note
    if not note:
        logger.warning(f"Parsed info has no post, Retrying: {next_attempt}")
        return await get_xhs_info(url, retry=next_attempt, use_mobile=not use_mobile)

    # Only non-zero / non-empty counters are rendered.
    counters = (
        ("❤️", "interactInfo.likedCount"),
        ("💬", "interactInfo.commentCount"),
        ("⭐️", "interactInfo.collectedCount"),
    )
    statistics = ""
    for icon, path in counters:
        count = glom(note, path, default=0)
        if count:
            statistics += f"{icon}{count} "
    return {**result, "note": note, "statistics": statistics.strip()}
202
203
def get_xhs_comments(soup: BeautifulSoup | None) -> list[str]:
    """Return the comments of a parsed xiaohongshu page.

    Not implemented yet: the result is an empty list whether or not a page
    is supplied.
    """
    # Placeholder: ``soup`` is accepted but not inspected until parsing is implemented.
    return []
209
210
async def summarize_xhs(message: Message, note: dict, media_list: list[dict], model: str) -> Message:
    """Summarize a xiaohongshu post with AI and attach the summary link to a message.

    Builds the summary sources from the note metadata (as a JSON text source)
    plus the photo/video entries in ``media_list``, calls ``summarize`` and, when
    a ``telegraph_url`` is returned, passes it to ``add_summary_url``.

    Args:
        message (Message): The already-sent message that should receive the summary URL.
        note (dict): The parsed note; must contain ``url``.
        media_list (list[dict]): Media entries; ``photo`` and ``video`` values are used as paths.
        model (str): The model passed to ``summarize``.

    Returns:
        Message: The result of ``add_summary_url`` if truthy, otherwise the
        original ``message`` (also when no ``telegraph_url`` was produced).
    """
    data = {
        "platform": "小红书",
        "title": note.get("title"),
        "author_name": glom(note, Coalesce("user.nickname", "user.nickName"), default=None),
        "url": note["url"],
        "location": note.get("ipLocation"),
    }
    if desc := note.get("desc", "").replace("[话题]#", ""):
        data["description"] = desc
    data = trim_none(data)
    sources = []
    min_text_length = 1000  # skip short posts
    min_video_duration = None
    for media in media_list:
        if media.get("photo"):
            sources.append({"type": "image", "path": media["photo"]})
        if media.get("video"):
            min_text_length = None  # always summarize video
            min_video_duration = 120
            sources.append({"type": "video", "path": media["video"]})
    sources.append({"type": "text", "text": json.dumps(data, ensure_ascii=False)})
    # The author is stored under "author_name"; reading "author" always fell
    # back to "Anonymous".
    author_name = data.get("author_name", "Anonymous")
    pid = note.get("noteId", "小红书")
    summary = await summarize(
        sources=sources,
        model=model,
        title=f"🍠{author_name} - {pid}",
        author=author_name,
        url=data["url"],
        # NOTE(review): "created_at" is never set in `data` above, so this always
        # falls back to the current time — confirm whether note["time"] should be used.
        date=data.get("created_at") or nowstr(TZ),
        min_text_length=min_text_length,
        min_video_duration=min_video_duration,
        max_video_duration=3600,  # skip long videos more than 1 hour
    )
    telegraph_url = summary.get("telegraph_url")
    if not telegraph_url:
        return message
    return await add_summary_url(telegraph_url, message) or message
250    return await add_summary_url(telegraph_url, message) or message