main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3from datetime import datetime
  4from zoneinfo import ZoneInfo
  5
  6import yaml
  7from bs4 import BeautifulSoup
  8from glom import Coalesce, glom
  9from loguru import logger
 10from pyrogram.client import Client
 11from pyrogram.types import Message
 12
 13from bridge.social import send_to_social_media_bridge
 14from config import DB, PROVIDER, PROXY, TZ
 15from database.database import get_db
 16from messages.database import copy_messages_from_db, save_messages
 17from messages.progress import modify_progress
 18from messages.sender import send2tg
 19from messages.utils import summay_media
 20from networking import download_file, download_first_success_urls, download_media, hx_req
 21from others.emoji import emojify
 22from utils import true
 23
 24
 25async def preview_xhs(
 26    client: Client,
 27    message: Message,
 28    url: str = "",
 29    db_key: str = "",
 30    xsec: str = "",
 31    *,
 32    is_xhs_link: bool = False,
 33    xhs_provider: str = PROVIDER.XHS,
 34    show_author: bool = True,
 35    show_title: bool = True,
 36    show_pubdate: bool = True,
 37    show_ip: bool = True,
 38    show_statistics: bool = True,
 39    show_description: bool = True,
 40    **kwargs,
 41):
 42    """Preview xiaohongshu link in the message.
 43
 44    Args:
 45        client (Client): The Pyrogram client.
 46        message (Message): The trigger message object.
 47        url (str, optional): xiaohongshu link
 48        db_key (str, optional): The cache key.
 49        xsec (str, optional): The xsec token.
 50        is_xhs_link (bool, optional): Whether the link is a share link from APP.
 51        xhs_provider (str, optional): The xiaohongshu provider.
 52    """
 53    if kwargs.get("show_progress") and "progress" not in kwargs:
 54        res = await send2tg(client, message, texts=f"🔗正在解析小红书链接\n{url}", **kwargs)
 55        kwargs["progress"] = res[0]
 56    if kv := await get_db(db_key):
 57        logger.debug(f"Xiaohongshu preview {DB.ENGINE} cache hit for key={db_key}")
 58        if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
 59            return
 60        await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)
 61
 62    if not is_xhs_link and "xsec_token" not in url:
 63        msg = "链接格式错误: 缺少 xsec_token 参数, 请发送完整链接"
 64        msg += "\n或者使用手机APP分享的链接 (xhslink.com域名)"
 65        await modify_progress(text=msg, **kwargs)
 66        return
 67
 68    logger.info(f"Xiaohongshu link preview for {url}")
 69    xhs_info = await get_xhs_info(url)
 70    note = xhs_info.get("note", {})
 71    if not note:
 72        if "bridge" in xhs_provider:
 73            await modify_progress(text="❌小红书解析失败, 尝试第三方Bot...", **kwargs)
 74            full_url = f"https://{db_key}?xsec_token={xsec}" if xsec else url
 75            kwargs |= {"target_mid": message.id}
 76            await send_to_social_media_bridge(client, message, full_url, **kwargs)
 77        else:
 78            await modify_progress(text="❌小红书解析失败, 请稍候再尝试", force_update=True, **kwargs)
 79        return
 80    await modify_progress(text="✅解析成功, 正在处理...", **kwargs)
 81    media: list[dict] = []
 82    if note.get("type") == "video":
 83        video_urls = []  # Extract all urls, but prefer H264
 84        for vcodec in ["h264", "h265", "av1", "h266"]:
 85            format_list = note.get("video", {}).get("media", {}).get("stream").get(vcodec, [])
 86            for x in format_list:
 87                if x.get("masterUrl"):
 88                    video_urls.append(x["masterUrl"])
 89                if x.get("backupUrls"):
 90                    video_urls.extend(x.get("backupUrls", []))
 91        media.append({"video": download_first_success_urls(video_urls, suffix=".mp4", proxy=PROXY.XHS, stream=True, **kwargs)})
 92    else:
 93        for img_info in note.get("imageList", []):
 94            img_url = img_info.get("urlDefault") or img_info.get("url") or ""
 95            if img_info.get("livePhoto"):
 96                video_urls = []
 97                for vcodec in ["h264", "h265", "av1", "h266"]:
 98                    format_list = img_info.get("stream", {}).get(vcodec, [])
 99                    for x in format_list:
100                        if x.get("masterUrl"):
101                            video_urls.append(x["masterUrl"])
102                        if x.get("backupUrls"):
103                            video_urls.extend(x.get("backupUrls", []))
104                media.append({"livephoto": download_first_success_urls(video_urls, suffix=".mp4", proxy=PROXY.XHS, stream=True, **kwargs)})
105            else:
106                media.append({"photo": download_file(img_url, suffix=".jpg", proxy=PROXY.XHS, stream=True, **kwargs)})
107
108    texts = ""
109    if true(show_author) and (author := glom(note, Coalesce("user.nickname", "user.nickName"), default="")):
110        texts += f"🍠[{author}]({url})\n"
111    if true(show_pubdate) and note.get("time"):
112        dt = datetime.fromtimestamp(float(note["time"]) / 1000).astimezone(ZoneInfo(TZ))
113        texts += f"🕒{dt:%Y-%m-%d %H:%M:%S}"
114        if true(show_ip) and note.get("ipLocation"):
115            texts += f"📍{note['ipLocation']}\n"
116        else:
117            texts += "\n"
118    if true(show_statistics) and xhs_info.get("statistics"):
119        texts += f"{xhs_info['statistics']}\n"
120    if true(show_title) and note.get("title"):
121        texts += f"📝**{note['title']}**\n"
122    desc = note.get("desc", "").replace("[话题]#", "")
123    if true(show_description):
124        texts += desc
125    comments = get_xhs_comments(xhs_info.get("soup"))  # Not implemented yet
126    await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
127    media = await download_media(media, **kwargs)
128    sent_messages = await send2tg(client, message, texts=emojify(texts), media=media, comments=comments, **kwargs)
129    await modify_progress(del_status=True, **kwargs)
130    await save_messages(messages=sent_messages, key=db_key)
131
132
133async def get_xhs_info(url: str, retry: int = 0, *, use_mobile: bool = False) -> dict:
134    """Get xiaohongshu post info.
135
136    XHS banned VPS IP, so we need to use residential proxy.
137    XHS has two different return formats base on User-Agent.
138    Some posts can only be accessed with mobile User-Agent. (I don't know why)
139    But images got from mobile has XHS watermark.
140    So we prefer to use desktop User-Agent.
141    """
142    headers = {"referer": "https://www.xiaohongshu.com/"}
143    if retry > 3:
144        logger.error(f"XHS parsing response failed after 3 retries: {url}")
145        return {}
146    data = {}
147    try:
148        resp = await hx_req(url, headers=headers, cookies=None, mobile=use_mobile, proxy=PROXY.XHS, rformat="text")
149        if not resp.get("text"):
150            logger.warning(f"XHS webpage not found: {url}, Retrying: {retry + 1}")
151            return await get_xhs_info(url, use_mobile=not use_mobile, retry=retry + 1)
152        soup = BeautifulSoup(resp["text"], "html.parser")
153        data["soup"] = soup
154        script_info = next((str(x.text).removeprefix("window.__INITIAL_STATE__=") for x in soup.find_all("script") if str(x.text).startswith("window.__INITIAL_STATE__=")), "{}")
155        info = yaml.safe_load(script_info)
156        if not info:
157            logger.warning(f"XHS failed: {url}, Retrying: {retry + 1}")
158            return await get_xhs_info(url, use_mobile=not use_mobile, retry=retry + 1)
159    except Exception as e:
160        logger.warning(f"XHS failed: {e}, Retrying: {retry + 1}")
161        return await get_xhs_info(url, use_mobile=not use_mobile, retry=retry + 1)
162
163    # XHS has two different return formats
164    note = {}
165    if notes := glom(info, "note.noteDetailMap.*.note", default=[]):
166        note = notes[0]
167    if glom(info, "noteData.data.noteData", default={}):
168        note = glom(info, "noteData.data.noteData", default={})
169    if not note:
170        logger.warning(f"Parsed info has no post, Retrying: {retry + 1}")
171        return await get_xhs_info(url, use_mobile=not use_mobile, retry=retry + 1)
172    statistics = ""
173    if like := glom(note, "interactInfo.likedCount", default=0):
174        statistics += f"❤️{like} "
175    if comment := glom(note, "interactInfo.commentCount", default=0):
176        statistics += f"💬{comment} "
177    if favorite := glom(note, "interactInfo.collectedCount", default=0):
178        statistics += f"⭐️{favorite} "
179    return data | {"note": note, "statistics": statistics.strip()}
180
181
182def get_xhs_comments(soup: BeautifulSoup | None) -> list[str]:
183    """Not implemented yet."""
184    if not soup:
185        return []
186    return []