main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import contextlib
  4import json
  5import re
  6from pathlib import Path
  7from urllib.parse import quote_plus
  8
  9from bs4 import BeautifulSoup
 10from glom import Coalesce, glom
 11from loguru import logger
 12from pyrogram.client import Client
 13from pyrogram.types import Message
 14
 15from config import AI, API, DOWNLOAD_DIR, PROXY, TOKEN, TZ
 16from database.r2 import get_cf_r2
 17from messages.database import copy_messages_from_db, save_messages
 18from messages.progress import modify_progress
 19from messages.sender import send2tg, send_blockquote_texts
 20from messages.utils import remove_img_tag, summay_media
 21from networking import download_file, download_media, hx_req
 22from preview.utils import add_summary_url
 23from summarize.summarize import summarize
 24from utils import convert2md, nowstr, rand_string, remove_consecutive_newlines
 25
 26
 27async def preview_wechat(
 28    client: Client,
 29    message: Message,
 30    url: str = "",
 31    db_key: str = "",
 32    *,
 33    summary_wechat: bool = True,
 34    summary_wechat_model: str = AI.AI_SUMMARY_MODEL_ALIAS,
 35    **kwargs,
 36):
 37    """Preview wechat link in the message.
 38
 39    Args:
 40        client (Client): The Pyrogram client.
 41        message (Message): The trigger message object.
 42        url (str, optional): wechat link
 43        db_key (str, optional): The cache key.
 44    """
 45    if kv := await get_cf_r2(db_key):
 46        logger.debug(f"WeChat preview cache hit for key={db_key}")
 47        if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
 48            return
 49        logger.warning("❌从缓存中转发失败, 尝试重新解析...")
 50    if kwargs.get("show_progress") and "progress" not in kwargs:
 51        res = await send2tg(client, message, texts=f"🔗正在解析微信链接\n{url}", **kwargs)
 52        kwargs["progress"] = res[0]
 53    logger.info(f"WeChat link preview for {url}")
 54
 55    post_info = await get_wechat_info(url)
 56    sender = kwargs.pop("send_from_user", "")
 57    if error := post_info.get("error"):
 58        await modify_progress(text=f"❌微信链接解析失败{url}\n{error}", force_update=True, **kwargs)
 59        return
 60
 61    # send texts first
 62    text_messages = await send_blockquote_texts(client, message, texts=sender + post_info["caption"], **kwargs)
 63    text_messages = [x for x in text_messages if isinstance(x, Message)]
 64    media_messages = []
 65    if media := post_info.get("media"):
 66        reply_to_msg: Message = glom(text_messages, "-1", default=message)
 67        media_messages = await send2tg(client, reply_to_msg, media=media, keep_file=True, **kwargs)
 68    await modify_progress(del_status=True, **kwargs)
 69    # Summary wechat
 70    if summary_wechat and text_messages:
 71        edited_msg = await summarize_wechat(text_messages[0], post_info, summary_wechat_model, url)
 72        text_messages[0] = edited_msg
 73    await save_messages(messages=text_messages + media_messages, key=db_key)
 74    # Clean up
 75    [Path(glom(x, Coalesce("photo", "video", "audio"))).unlink(missing_ok=True) for x in post_info.get("media", [])]
 76
 77
 78async def get_wechat_info(url: str, *, use_tikhub: bool = True, **kwargs) -> dict:
 79    """Get WeChat post info."""
 80    headers = {
 81        "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.69(0x18004539) NetType/4G Language/zh_CN"
 82    }
 83    resp = await hx_req(url, headers=headers, mobile=True, proxy=PROXY.WECHAT, rformat="content")
 84    try:
 85        html = resp["content"].decode("utf-8")
 86        soup = BeautifulSoup(html, "html.parser")
 87        title_tag = soup.find("meta", property="og:title")
 88        title = "微信公众号文章"
 89        if title_tag and title_tag.get("content"):
 90            title = str(title_tag["content"])
 91
 92        date = ""
 93        if match_date := re.search(r"createTime = '(.*)'", html):
 94            date = match_date.group(1)
 95            if len(date) == 16:  # '2026-06-02 09:02'
 96                date += ":00"
 97        date = date or nowstr(TZ)
 98
 99        author = ""
100        if match_author := re.search(r"nick_name: '(.*)'", html):
101            author = match_author.group(1)
102        if not author:
103            author_tag = soup.find("meta", attrs={"name": "author"})
104            if author_tag and author_tag.get("content"):
105                author = str(author_tag["content"])
106        author = author or "微信公众号"
107        # clean soup
108        ban_attrs: list[dict] = [{"style": "display:none"}, {"style": "display: none;"}, {"aria-hidden": "true"}]
109        for attr in ban_attrs:
110            for tag in soup.find_all(attrs=attr):
111                tag.decompose()
112        markdown = convert2md(html=str(soup))
113        caption = f"🟢[{author}]({url})\n🕒{date}\n📝**[{title}]({url})**\n\n"
114        markdown_no_img, image_urls = remove_img_tag(markdown)
115        caption += remove_consecutive_newlines(markdown_no_img)
116
117        # download images
118        image_urls = [url for url in image_urls if url.startswith("http")]
119        media = []
120        for img in image_urls:
121            suffix = ".jpg" if "mmbiz_jpg" in img or "wx_fmt=jpeg" in img else ".png"
122            path = Path(DOWNLOAD_DIR) / f"{rand_string(16)}{suffix}"
123            media.append({"photo": download_file(img, path=path, proxy=PROXY.WECHAT, **kwargs)})
124        await modify_progress(text=f"✅解析成功...\n⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
125        media = await download_media(media, **kwargs)
126    except Exception as e:
127        logger.warning(f"⚠️直接解析微信文章失败: {e}")
128        if use_tikhub:
129            return await get_via_tikhub(url, **kwargs)
130        return {"error": str(e)}
131    return {"markdown": markdown, "media": media, "title": title, "author": author, "caption": caption, "date": date}
132
133
async def get_via_tikhub(url: str, **kwargs) -> dict:
    """Get WeChat post info via TikHub.

    Args:
        url: The article URL; it is URL-quoted and appended to ``API.TIKHUB_WECHAT``.
        **kwargs: Forwarded to ``download_file``, ``modify_progress`` and
            ``download_media``.

    Returns:
        On success, a dict with the keys ``markdown``, ``caption`` (same text as
        ``markdown``), ``media``, ``title``, ``author`` and ``date``.
        On failure, ``{"error": <message>}``.
    """
    api_url = API.TIKHUB_WECHAT + quote_plus(url)
    logger.info(f"Preview WeChat TikHub for {api_url}")
    headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
    resp = await hx_req(api_url, headers=headers, check_keys=["data.content.raw_content", "data.title"], check_kv={"code": 200})
    if resp.get("hx_error"):
        return {"error": resp["hx_error"]}

    try:
        data = resp["data"]
        title = data["title"]
        # Fall back to the same display name the direct parser uses, rather
        # than showing a literal placeholder to the user.
        author = data.get("author") or "微信公众号"
        # Only replace the fallback timestamp once a usable string is in hand;
        # assigning first and slicing later could leave ``dt`` as None.
        dt = nowstr(TZ)
        raw_dt = data.get("datetime")  # 2025-04-28T06:12:35.833830
        if isinstance(raw_dt, str) and raw_dt:
            dt = raw_dt[:19].replace("T", " ")  # 2025-04-28 06:12:35
        media = []
        parts = [f"🟢[{author}]({url})\n🕒{dt}\n**📝{title}**"]
        for tag in data["content"]["raw_content"]:
            if text := tag.get("text", ""):
                # Section headings are rendered bold and separated by a blank line.
                parts.append(f"\n\n**{text}**" if tag.get("type", "") == "section" else f"\n{text}")
            for img in tag.get("images") or []:
                src = img.get("src", "")
                if not src:
                    # Nothing to download for an image without a source URL.
                    continue
                ext = img.get("type") or "png"
                media.append({"photo": download_file(src, path=f"{DOWNLOAD_DIR}/{rand_string()}.{ext}", proxy=PROXY.WECHAT, **kwargs)})
        markdown = "".join(parts)
        await modify_progress(text=f"✅解析成功...\n⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
        media = await download_media(media, **kwargs)
    except Exception as e:
        logger.error(e)
        # Never return an empty error string: callers test the value for truthiness.
        return {"error": str(e) or repr(e)}
    return {"markdown": markdown, "caption": markdown, "media": media, "title": title, "author": author, "date": dt}
167
168
async def summarize_wechat(message: Message, wechat: dict, model: str, url: str) -> Message:
    """Build the summarizer input for a WeChat post and attach the summary link.

    Args:
        message: The message that should receive the summary URL.
        wechat: Parsed post info; ``title``, ``author``, ``date`` and
            ``markdown`` are read, plus the optional ``media`` list.
        model: Model alias handed to ``summarize``.
        url: The article URL.

    Returns:
        The result of ``add_summary_url`` when a telegraph URL was produced and
        that call returned a truthy value; otherwise the original ``message``.
    """
    title = wechat["title"]
    author = wechat["author"]
    created_at = wechat["date"]
    payload = {
        "platform": "微信公众号",
        "title": title,
        "author_name": author,
        "created_at": created_at,
        "url": url,
        "content": wechat["markdown"],
    }

    # Images come first, followed by the JSON-encoded article as the text source.
    sources = []
    for item in wechat.get("media", []):
        sources.append({"type": "image", "path": item["photo"]})
    sources.append({"type": "text", "text": json.dumps(payload, ensure_ascii=False)})

    summary = await summarize(
        sources=sources,
        model=model,
        title=title,
        author=author,
        url=url,
        date=created_at or nowstr(TZ),
    )

    link = summary.get("telegraph_url")
    if link:
        updated = await add_summary_url(link, message)
        if updated:
            return updated
    return message