main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import contextlib
  4from pathlib import Path
  5from urllib.parse import quote_plus
  6
  7from loguru import logger
  8from pyrogram.client import Client
  9from pyrogram.types import Message
 10
 11from config import API, CAPTION_LENGTH, DB, DOWNLOAD_DIR, PROXY, TEXT_LENGTH, TOKEN
 12from database.database import get_db
 13from messages.database import copy_messages_from_db, save_messages
 14from messages.progress import modify_progress
 15from messages.sender import send2tg
 16from messages.utils import blockquote, count_without_entities, summay_media
 17from networking import download_file, download_media, hx_req
 18from publish import publish_telegraph
 19from utils import nowstr, rand_string
 20
 21
 22async def preview_wechat(client: Client, message: Message, url: str = "", db_key: str = "", **kwargs):
 23    """Preview wechat link in the message.
 24
 25    Args:
 26        client (Client): The Pyrogram client.
 27        message (Message): The trigger message object.
 28        url (str, optional): wechat link
 29        db_key (str, optional): The cache key.
 30    """
 31    if kwargs.get("show_progress") and "progress" not in kwargs:
 32        res = await send2tg(client, message, texts=f"🔗正在解析微信链接\n{url}", **kwargs)
 33        kwargs["progress"] = res[0]
 34    if kv := await get_db(db_key):
 35        logger.debug(f"WeChat preview {DB.ENGINE} cache hit for key={db_key}")
 36        if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
 37            return
 38        await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)
 39    logger.info(f"WeChat link preview for {url}")
 40
 41    post_info = await get_wechat_info(url)
 42    if error := post_info.get("error"):
 43        await modify_progress(text=f"❌微信链接解析失败{url}\n{error}", force_update=True, **kwargs)
 44        return
 45    sent_messages = []
 46    length = await count_without_entities(post_info["header"] + post_info["markdown"])
 47    if not post_info.get("media"):  # 无图片
 48        if length < TEXT_LENGTH - 8:  # 无图片短文
 49            texts = f"{post_info['header']}\n{blockquote(post_info['markdown'])}"
 50            sent_messages.extend(await send2tg(client, message, texts=texts, **kwargs))
 51        else:  # 无图片长文
 52            texts = f"{post_info['header']}"
 53            telegraph_url = await publish_telegraph(title=post_info["title"], html=post_info["html"], author=post_info["author"], url=url)
 54            if telegraph_url:
 55                texts += f"\n⚡️[即时预览]({telegraph_url})"
 56            sent_messages.extend(await send2tg(client, message, texts=texts, media=[{"document": post_info["html_path"]}], **kwargs))
 57    elif length < CAPTION_LENGTH - 8:  # 有图片短文
 58        texts = f"{post_info['header']}\n{blockquote(post_info['markdown'])}"
 59        sent_messages.extend(await send2tg(client, message, texts=texts, media=post_info["media"], **kwargs))
 60    else:  # 有图片长文
 61        texts = f"{post_info['header']}"
 62        telegraph_url = await publish_telegraph(title=post_info["title"], html=post_info["html"], author=post_info["author"], url=url)
 63        if telegraph_url:
 64            texts += f"\n⚡️[即时预览]({telegraph_url})"
 65        sent_messages.extend(await send2tg(client, message, texts=texts, media=[{"document": post_info["path"]}], **kwargs))
 66        kwargs["reply_msg_id"] = -1  # do not send as reply
 67        sent_messages.extend(await send2tg(client, message, texts=texts, media=post_info["media"], **kwargs))
 68    await modify_progress(del_status=True, **kwargs)
 69    await save_messages(messages=sent_messages, key=db_key)
 70
 71
 72async def get_wechat_info(url: str, **kwargs) -> dict:
 73    """Get WeChat post info."""
 74    api_url = API.TIKHUB_WECHAT + quote_plus(url)
 75    logger.info(f"Preview WeChat TikHub for {api_url}")
 76    headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
 77    resp = await hx_req(api_url, headers=headers, check_keys=["data.content.raw_content", "data.title"], check_kv={"code": 200})
 78    if resp.get("hx_error"):
 79        return {"error": resp["hx_error"]}
 80
 81    try:
 82        data = resp["data"]
 83        title = data["title"]
 84        author = data.get("author", "author")
 85        dt = nowstr()
 86        with contextlib.suppress(Exception):
 87            dt = data["datetime"]  # 2025-04-28T06:12:35.833830
 88            dt = dt[:19].replace("T", " ")  # 2025-04-28 06:12:35
 89        header = f"🟢[{author}]({url})\n🕒{dt}\n**📝{title}**"
 90        media = []
 91        htmls = ""
 92        texts = ""
 93        markdowns = ""
 94        for tag in data["content"]["raw_content"]:
 95            html = ""
 96            if text := tag.get("text", ""):
 97                html = f"<h3>{text}</h3>" if tag.get("type", "") == "section" else f"<p>{text}</p>"
 98                markdown = f"\n\n**{text}**" if tag.get("type", "") == "section" else f"\n{text}"
 99                text = f"\n\n{text}" if tag.get("type", "") == "section" else f"\n{text}"
100                htmls += f"<br>{html}"
101                markdowns += f"\n{markdown}"
102                texts += f"\n{text}"
103            if images := tag.get("images", []):
104                for img in images:
105                    src = img.get("src", "")
106                    ext = img.get("type", "png")
107                    media.append({"photo": download_file(src, path=f"{DOWNLOAD_DIR}/{rand_string()}.{ext}", proxy=PROXY.WECHAT, **kwargs)})
108                    htmls += f"<br><img src='{PROXY.IMG}{src}' alt='微信图片'/>"
109        await modify_progress(text=f"✅解析成功...\n⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
110        media = await download_media(media, **kwargs)
111        txt_path = Path(DOWNLOAD_DIR) / f"{title}.txt"
112        with txt_path.open("w") as f:
113            f.write(f"📝{title}\n👤{author}\n🕒{dt}\n🔗{url}\n\n" + texts.strip())
114    except Exception as e:
115        logger.error(e)
116        return {"error": str(e)}
117    return {"html": htmls, "path": txt_path.as_posix(), "markdown": markdowns, "media": media, "title": title, "author": author, "header": header}