main
 1#!/usr/bin/env python
 2# -*- coding: utf-8 -*-
 3import re
 4from datetime import UTC, datetime
 5from pathlib import Path
 6from zoneinfo import ZoneInfo
 7
 8from glom import glom
 9from loguru import logger
10from pyrogram.client import Client
11from pyrogram.types import Message
12
13from config import PROXY, TELEGRAM_UA, TOKEN, TZ
14from messages.progress import modify_progress
15from messages.sender import send2tg
16from messages.utils import summay_media
17from networking import download_file, download_media, hx_req
18from utils import number_to_emoji
19
20
21async def preview_v2ex(client: Client, message: Message, url: str = "", topic_id: str = "", **kwargs):
22    """Preview v2ex link in the message.
23
24    Args:
25        client (Client): The Pyrogram client.
26        message (Message): The trigger message object.
27        url (str, optional): v2ex link
28        db_key (str, optional): The cache key.
29    """
30    if kwargs.get("show_progress") and "progress" not in kwargs:
31        res = await send2tg(client, message, texts=f"🔗正在解析V2ex链接\n{url}", **kwargs)
32        kwargs["progress"] = res[0]
33    logger.info(f"v2ex link preview for {url}")
34    headers = {"Authorization": f"Bearer {TOKEN.V2EX}"}
35    topic_api = f"https://www.v2ex.com/api/v2/topics/{topic_id}"
36    resp = await hx_req(topic_api, proxy=PROXY.V2EX, headers=headers, check_kv={"success": True, "result.id": topic_id})
37    if error := resp.get("error"):
38        await modify_progress(text=f"❌v2ex链接解析失败{url}\n{error}", force_update=True, **kwargs)
39        return
40    author = glom(resp, "result.member.username", default="V2EX_User")
41    author_url = f"https://www.v2ex.com/member/{author}"
42    title = glom(resp, "result.title", default="Title")
43    ts = glom(resp, "result.created", default=0)
44    texts = f"💻[{author}]({author_url})\n"
45    texts += f"🕒{datetime.fromtimestamp(ts, tz=UTC).astimezone(ZoneInfo(TZ)).strftime('%Y-%m-%d %H:%M:%S')}\n"
46    texts += f"📝[{title}]({url})\n"
47    content, img_urls = extract_and_remove_images_regex(glom(resp, "result.content", default=""))
48    texts += content + "\n"
49    if supplements := glom(resp, "result.supplements", default=[]):
50        for idx, supp in enumerate(supplements):
51            texts += f"\n补充留言{number_to_emoji(idx + 1)}:\n{supp.get('content', '')}\n"
52
53    media = await download_imgs(img_urls)
54    if media:
55        await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
56    media = await download_media(media, **kwargs)
57    await send2tg(client, message, texts=texts, media=media, **kwargs)
58    await modify_progress(del_status=True, **kwargs)
59
60
61def extract_and_remove_images_regex(markdown_text: str) -> tuple[str, list[str]]:
62    """Extract images from markdown text and remove them from the text.
63
64    Returns:
65        tuple[str, list[str]]: The markdown text without images and the extracted image URLs.
66    """
67    image_pattern = r'!\[([^\]]*)\]\((.*?)\s*(".*?")?\)'
68
69    image_urls = re.findall(image_pattern, markdown_text)
70    urls = [url[1].strip() for url in image_urls]  # only need urls
71
72    text_without_images = re.sub(image_pattern, "", markdown_text)
73
74    return text_without_images, urls
75
76
77async def download_imgs(img_urls: list[str]) -> list[dict]:
78    """Download images from img_urls."""
79    media = []
80    for img_url in img_urls:
81        # handle imgur.com
82        if img_url.startswith("https://i.imgur.com/"):
83            referer_url = f"https://imgur.com/{Path(img_url).stem}"
84            headers = {"Referer": referer_url, "User-Agent": TELEGRAM_UA}
85            media.append({"photo": download_file(img_url, proxy=PROXY.WARP, headers=headers)})
86        else:
87            media.append({"photo": download_file(img_url, proxy=PROXY.V2EX)})
88    return media