main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import contextlib
4from pathlib import Path
5from urllib.parse import quote_plus
6
7from loguru import logger
8from pyrogram.client import Client
9from pyrogram.types import Message
10
11from config import API, CAPTION_LENGTH, DB, DOWNLOAD_DIR, PROXY, TEXT_LENGTH, TOKEN
12from database.database import get_db
13from messages.database import copy_messages_from_db, save_messages
14from messages.progress import modify_progress
15from messages.sender import send2tg
16from messages.utils import blockquote, count_without_entities, summay_media
17from networking import download_file, download_media, hx_req
18from publish import publish_telegraph
19from utils import nowstr, rand_string
20
21
async def _wechat_long_caption(post_info: dict, url: str) -> str:
    """Build the caption for a long post: the header plus a Telegraph link if publishing succeeds."""
    caption = f"{post_info['header']}"
    telegraph_url = await publish_telegraph(title=post_info["title"], html=post_info["html"], author=post_info["author"], url=url)
    if telegraph_url:
        caption += f"\n⚡️[即时预览]({telegraph_url})"
    return caption


async def preview_wechat(client: Client, message: Message, url: str = "", db_key: str = "", **kwargs):
    """Preview wechat link in the message.

    The cached copy stored under ``db_key`` is forwarded when available; otherwise
    the post is fetched via ``get_wechat_info`` and sent in one of four layouts,
    depending on whether it has media and whether its text fits the
    ``TEXT_LENGTH`` / ``CAPTION_LENGTH`` limits. Sent messages are saved under
    ``db_key`` afterwards.

    Args:
        client (Client): The Pyrogram client.
        message (Message): The trigger message object.
        url (str, optional): wechat link
        db_key (str, optional): The cache key.
        **kwargs: Forwarded to ``send2tg``, ``modify_progress`` and
            ``copy_messages_from_db``. ``show_progress`` and ``progress`` are read
            here; ``progress`` and ``reply_msg_id`` may be set here.
    """
    if kwargs.get("show_progress") and "progress" not in kwargs:
        res = await send2tg(client, message, texts=f"🔗正在解析微信链接\n{url}", **kwargs)
        if res:  # guard: nothing to track if the status message was not sent
            kwargs["progress"] = res[0]
    if kv := await get_db(db_key):
        logger.debug(f"WeChat preview {DB.ENGINE} cache hit for key={db_key}")
        if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
            return
        await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)
    logger.info(f"WeChat link preview for {url}")

    post_info = await get_wechat_info(url)
    if error := post_info.get("error"):
        await modify_progress(text=f"❌微信链接解析失败{url}\n{error}", force_update=True, **kwargs)
        return

    sent_messages = []
    length = await count_without_entities(post_info["header"] + post_info["markdown"])
    if not post_info.get("media"):  # no images
        if length < TEXT_LENGTH - 8:  # short text without images: send inline
            texts = f"{post_info['header']}\n{blockquote(post_info['markdown'])}"
            sent_messages.extend(await send2tg(client, message, texts=texts, **kwargs))
        else:  # long text without images: header + Telegraph link, full text attached
            texts = await _wechat_long_caption(post_info, url)
            # get_wechat_info returns the text dump under "path"; there is no "html_path" key.
            sent_messages.extend(await send2tg(client, message, texts=texts, media=[{"document": post_info["path"]}], **kwargs))
    elif length < CAPTION_LENGTH - 8:  # short text with images: text goes in the caption
        texts = f"{post_info['header']}\n{blockquote(post_info['markdown'])}"
        sent_messages.extend(await send2tg(client, message, texts=texts, media=post_info["media"], **kwargs))
    else:  # long text with images: attach the full text first, then send the images
        texts = await _wechat_long_caption(post_info, url)
        sent_messages.extend(await send2tg(client, message, texts=texts, media=[{"document": post_info["path"]}], **kwargs))
        kwargs["reply_msg_id"] = -1  # do not send as reply
        sent_messages.extend(await send2tg(client, message, texts=texts, media=post_info["media"], **kwargs))
    await modify_progress(del_status=True, **kwargs)
    await save_messages(messages=sent_messages, key=db_key)
70
71
async def get_wechat_info(url: str, **kwargs) -> dict:
    """Get WeChat post info.

    Requests the post from the TikHub API, builds HTML / Markdown / plain-text
    renderings of its content, downloads its images and writes a plain-text dump
    of the post into ``DOWNLOAD_DIR``.

    Args:
        url (str): The WeChat article link.
        **kwargs: Forwarded to ``download_file``, ``modify_progress`` and
            ``download_media``.

    Returns:
        dict: ``{"error": <message>}`` when the request or the parsing fails.
        Otherwise a dict with the keys ``html``, ``path`` (POSIX path of the text
        dump), ``markdown``, ``media``, ``title``, ``author`` and ``header``.
    """
    api_url = API.TIKHUB_WECHAT + quote_plus(url)
    logger.info(f"Preview WeChat TikHub for {api_url}")
    headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
    resp = await hx_req(api_url, headers=headers, check_keys=["data.content.raw_content", "data.title"], check_kv={"code": 200})
    if resp.get("hx_error"):
        return {"error": resp["hx_error"]}

    try:
        data = resp["data"]
        title = data["title"]
        author = data.get("author") or "author"  # also covers an empty / null author
        # Fall back to the current time unless the API supplies a usable timestamp.
        dt = nowstr()
        raw_dt = data.get("datetime")  # 2025-04-28T06:12:35.833830
        if isinstance(raw_dt, str) and raw_dt:
            dt = raw_dt[:19].replace("T", " ")  # 2025-04-28 06:12:35
        header = f"🟢[{author}]({url})\n🕒{dt}\n**📝{title}**"

        media = []
        html_parts = []
        markdown_parts = []
        text_parts = []
        for tag in data["content"]["raw_content"]:
            if text := tag.get("text", ""):
                # "section" tags are rendered as headings, everything else as a paragraph.
                is_section = tag.get("type", "") == "section"
                html_parts.append(f"<br><h3>{text}</h3>" if is_section else f"<br><p>{text}</p>")
                markdown_parts.append(f"\n\n\n**{text}**" if is_section else f"\n\n{text}")
                text_parts.append(f"\n\n\n{text}" if is_section else f"\n\n{text}")
            for img in tag.get("images") or []:
                src = img.get("src", "")
                if not src:  # nothing to download or embed
                    continue
                ext = img.get("type") or "png"
                # The download_file result is stored as-is; download_media below receives the list.
                media.append({"photo": download_file(src, path=f"{DOWNLOAD_DIR}/{rand_string()}.{ext}", proxy=PROXY.WECHAT, **kwargs)})
                html_parts.append(f"<br><img src='{PROXY.IMG}{src}' alt='微信图片'/>")
        htmls = "".join(html_parts)
        markdowns = "".join(markdown_parts)
        texts = "".join(text_parts)

        await modify_progress(text=f"✅解析成功...\n⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
        media = await download_media(media, **kwargs)

        out_dir = Path(DOWNLOAD_DIR)
        out_dir.mkdir(parents=True, exist_ok=True)
        # Path separators (and NUL) in the title would otherwise break the file path.
        safe_title = str(title).translate(str.maketrans({"/": "_", "\\": "_", "\0": "_"})).strip() or rand_string()
        txt_path = out_dir / f"{safe_title}.txt"
        # Explicit UTF-8: the dump contains emoji and CJK text, which a non-UTF-8 locale default cannot encode.
        txt_path.write_text(f"📝{title}\n👤{author}\n🕒{dt}\n🔗{url}\n\n" + texts.strip(), encoding="utf-8")
    except Exception as e:
        # Boundary handler: callers expect an {"error": ...} dict rather than an exception.
        logger.error(e)
        return {"error": str(e)}
    return {"html": htmls, "path": txt_path.as_posix(), "markdown": markdowns, "media": media, "title": title, "author": author, "header": header}