main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import contextlib
4import json
5import re
6from pathlib import Path
7from urllib.parse import quote_plus
8
9from bs4 import BeautifulSoup
10from glom import Coalesce, glom
11from loguru import logger
12from pyrogram.client import Client
13from pyrogram.types import Message
14
15from config import AI, API, DOWNLOAD_DIR, PROXY, TOKEN, TZ
16from database.r2 import get_cf_r2
17from messages.database import copy_messages_from_db, save_messages
18from messages.progress import modify_progress
19from messages.sender import send2tg, send_blockquote_texts
20from messages.utils import remove_img_tag, summay_media
21from networking import download_file, download_media, hx_req
22from preview.utils import add_summary_url
23from summarize.summarize import summarize
24from utils import convert2md, nowstr, rand_string, remove_consecutive_newlines
25
26
def _cleanup_wechat_media(media: list) -> None:
    """Delete the local files referenced by downloaded media entries (best effort)."""
    for item in media:
        # default=None: an entry without any of the three keys is skipped instead of raising.
        path = glom(item, Coalesce("photo", "video", "audio", default=None))
        if not path:
            continue
        try:
            Path(path).unlink(missing_ok=True)
        except OSError as e:
            logger.warning(f"Failed to remove temporary file {path}: {e}")


async def preview_wechat(
    client: Client,
    message: Message,
    url: str = "",
    db_key: str = "",
    *,
    summary_wechat: bool = True,
    summary_wechat_model: str = AI.AI_SUMMARY_MODEL_ALIAS,
    **kwargs,
):
    """Preview wechat link in the message.

    Flow: try to re-send the cached entry stored under ``db_key``; otherwise parse
    the article, send its caption as blockquote texts, send the media as a reply
    to the last text message, optionally add an AI summary link to the first text
    message, and save all sent messages under ``db_key``. Downloaded media files
    are removed afterwards, even when one of the sending steps raises.

    Args:
        client (Client): The Pyrogram client.
        message (Message): The trigger message object.
        url (str, optional): wechat link
        db_key (str, optional): The cache key.
        summary_wechat (bool, optional): Whether to run ``summarize_wechat`` on the first text message.
        summary_wechat_model (str, optional): Model passed to ``summarize_wechat``.
        **kwargs: Forwarded to the sender / progress helpers. Keys read here:
            ``show_progress``, ``progress`` and ``send_from_user`` (popped and
            prepended to the caption).
    """
    if kv := await get_cf_r2(db_key):
        logger.debug(f"WeChat preview cache hit for key={db_key}")
        if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
            return
        logger.warning("❌从缓存中转发失败, 尝试重新解析...")
    if kwargs.get("show_progress") and "progress" not in kwargs:
        res = await send2tg(client, message, texts=f"🔗正在解析微信链接\n{url}", **kwargs)
        # Guard against an empty result so a failed status message does not abort the preview.
        if res:
            kwargs["progress"] = res[0]
    logger.info(f"WeChat link preview for {url}")

    # NOTE(review): kwargs are not forwarded here, so the parser's modify_progress
    # calls run without the "progress" entry — confirm this is intended.
    post_info = await get_wechat_info(url)
    sender = kwargs.pop("send_from_user", "")
    if error := post_info.get("error"):
        await modify_progress(text=f"❌微信链接解析失败{url}\n{error}", force_update=True, **kwargs)
        return

    try:
        # send texts first
        text_messages = await send_blockquote_texts(client, message, texts=sender + post_info["caption"], **kwargs)
        text_messages = [x for x in text_messages if isinstance(x, Message)]
        media_messages = []
        if media := post_info.get("media"):
            # Reply to the last text message; fall back to the trigger message when no text was sent.
            reply_to_msg: Message = glom(text_messages, "-1", default=message)
            media_messages = await send2tg(client, reply_to_msg, media=media, keep_file=True, **kwargs)
        await modify_progress(del_status=True, **kwargs)
        # Summary wechat (must run before cleanup: the summary reads the downloaded image files)
        if summary_wechat and text_messages:
            edited_msg = await summarize_wechat(text_messages[0], post_info, summary_wechat_model, url)
            text_messages[0] = edited_msg
        await save_messages(messages=text_messages + media_messages, key=db_key)
    finally:
        # Clean up the files kept on disk by keep_file=True, on success and on failure alike.
        _cleanup_wechat_media(post_info.get("media", []))
76
77
async def get_wechat_info(url: str, *, use_tikhub: bool = True, **kwargs) -> dict:
    """Get WeChat post info.

    Fetches the article page with a WeChat mobile User-Agent, extracts title,
    publish date and author, converts the cleaned HTML to markdown and downloads
    the images referenced in it.

    Args:
        url (str): The WeChat article link.
        use_tikhub (bool, optional): When direct parsing fails, fall back to ``get_via_tikhub``.
        **kwargs: Forwarded to ``download_file``, ``modify_progress``,
            ``download_media`` and ``get_via_tikhub``.

    Returns:
        dict: On success, a dict with the keys ``markdown``, ``media``, ``title``,
        ``author``, ``caption`` and ``date``. When direct parsing fails, the
        result of ``get_via_tikhub`` if ``use_tikhub`` is true, otherwise
        ``{"error": <message>}``.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.69(0x18004539) NetType/4G Language/zh_CN"
    }
    resp = await hx_req(url, headers=headers, mobile=True, proxy=PROXY.WECHAT, rformat="content")
    try:
        content = resp.get("content")
        if content is None:
            # Surface the request error instead of an opaque KeyError('content').
            raise ValueError(resp.get("hx_error") or "no content in response")
        html = content.decode("utf-8")
        soup = BeautifulSoup(html, "html.parser")
        title_tag = soup.find("meta", property="og:title")
        title = "微信公众号文章"
        if title_tag and title_tag.get("content"):
            title = str(title_tag["content"])

        date = ""
        # Non-greedy: stop at the first closing quote even if more quoted literals follow on the line.
        if match_date := re.search(r"createTime = '(.*?)'", html):
            date = match_date.group(1)
            if len(date) == 16:  # '2026-06-02 09:02'
                date += ":00"
        date = date or nowstr(TZ)

        author = ""
        if match_author := re.search(r"nick_name: '(.*?)'", html):
            author = match_author.group(1)
        if not author:
            author_tag = soup.find("meta", attrs={"name": "author"})
            if author_tag and author_tag.get("content"):
                author = str(author_tag["content"])
        author = author or "微信公众号"
        # clean soup: drop hidden elements before converting to markdown
        ban_attrs: list[dict] = [{"style": "display:none"}, {"style": "display: none;"}, {"aria-hidden": "true"}]
        for attr in ban_attrs:
            for tag in soup.find_all(attrs=attr):
                tag.decompose()
        markdown = convert2md(html=str(soup))
        caption = f"🟢[{author}]({url})\n🕒{date}\n📝**[{title}]({url})**\n\n"
        markdown_no_img, image_urls = remove_img_tag(markdown)
        caption += remove_consecutive_newlines(markdown_no_img)

        # download images (only absolute http/https links)
        image_urls = [src for src in image_urls if src.startswith("http")]
        media = []
        for img in image_urls:
            suffix = ".jpg" if "mmbiz_jpg" in img or "wx_fmt=jpeg" in img else ".png"
            path = Path(DOWNLOAD_DIR) / f"{rand_string(16)}{suffix}"
            # download_file is not awaited here; the pending downloads are handed to download_media below.
            media.append({"photo": download_file(img, path=path, proxy=PROXY.WECHAT, **kwargs)})
        await modify_progress(text=f"✅解析成功...\n⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
        media = await download_media(media, **kwargs)
    except Exception as e:
        logger.warning(f"⚠️直接解析微信文章失败: {e}")
        if use_tikhub:
            return await get_via_tikhub(url, **kwargs)
        return {"error": str(e)}
    return {"markdown": markdown, "media": media, "title": title, "author": author, "caption": caption, "date": date}
132
133
async def get_via_tikhub(url: str, **kwargs) -> dict:
    """Get WeChat post info via TikHub.

    Args:
        url (str): The WeChat article link.
        **kwargs: Forwarded to ``download_file``, ``modify_progress`` and ``download_media``.

    Returns:
        dict: On success, a dict with the keys ``markdown``, ``caption`` (same
        text as ``markdown``), ``media``, ``title``, ``author`` and ``date``.
        On failure, ``{"error": <message>}``.
    """
    api_url = API.TIKHUB_WECHAT + quote_plus(url)
    logger.info(f"Preview WeChat TikHub for {api_url}")
    headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
    resp = await hx_req(api_url, headers=headers, check_keys=["data.content.raw_content", "data.title"], check_kv={"code": 200})
    if resp.get("hx_error"):
        return {"error": resp["hx_error"]}

    try:
        data = resp["data"]
        title = data["title"]
        author = data.get("author", "author")
        # Fallback timestamp; use the configured TZ like the other callers in this module.
        dt = nowstr(TZ)
        # Single expression: dt is only replaced when the whole conversion succeeds,
        # so a missing / non-string "datetime" can never leave dt half-assigned.
        with contextlib.suppress(KeyError, TypeError, AttributeError):
            # 2025-04-28T06:12:35.833830 -> 2025-04-28 06:12:35
            dt = data["datetime"][:19].replace("T", " ") or dt
        media = []
        parts = [f"🟢[{author}]({url})\n🕒{dt}\n**📝{title}**"]
        for tag in data["content"]["raw_content"]:
            if text := tag.get("text", ""):
                # Section titles are rendered bold in their own paragraph.
                parts.append(f"\n\n**{text}**" if tag.get("type", "") == "section" else f"\n{text}")
            for img in tag.get("images") or []:
                src = img.get("src", "")
                if not src:
                    # Nothing to download for an image entry without a source.
                    continue
                ext = img.get("type") or "png"
                media.append({"photo": download_file(src, path=f"{DOWNLOAD_DIR}/{rand_string()}.{ext}", proxy=PROXY.WECHAT, **kwargs)})
        markdown = "".join(parts)
        await modify_progress(text=f"✅解析成功...\n⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
        media = await download_media(media, **kwargs)
    except Exception as e:
        logger.error(e)
        return {"error": str(e)}
    return {"markdown": markdown, "caption": markdown, "media": media, "title": title, "author": author, "date": dt}
167
168
async def summarize_wechat(message: Message, wechat: dict, model: str, url: str) -> Message:
    """Run the AI summary for a WeChat article and attach its link to a message.

    Args:
        message (Message): The sent message that should receive the summary link.
        wechat (dict): Parsed article info; reads ``title``, ``author``, ``date``,
            ``markdown`` and the optional ``media`` list.
        model (str): Model passed through to ``summarize``.
        url (str): The article link.

    Returns:
        Message: The result of ``add_summary_url`` when a Telegraph URL was
        produced and that call returned a truthy value; otherwise ``message``.
    """
    title = wechat["title"]
    author = wechat["author"]
    created_at = wechat["date"]
    article = {
        "platform": "微信公众号",
        "title": title,
        "author_name": author,
        "created_at": created_at,
        "url": url,
        "content": wechat["markdown"],
    }

    # Images first, then the article itself as one JSON text source.
    sources = []
    for item in wechat.get("media", []):
        sources.append({"type": "image", "path": item["photo"]})
    sources.append({"type": "text", "text": json.dumps(article, ensure_ascii=False)})

    summary = await summarize(
        sources=sources,
        model=model,
        title=title,
        author=author,
        url=url,
        date=created_at or nowstr(TZ),
    )
    if link := summary.get("telegraph_url"):
        return await add_summary_url(link, message) or message
    return message