main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import re
4from datetime import UTC, datetime
5from pathlib import Path
6from zoneinfo import ZoneInfo
7
8from glom import glom
9from loguru import logger
10from pyrogram.client import Client
11from pyrogram.types import Message
12
13from config import PROXY, TELEGRAM_UA, TOKEN, TZ
14from messages.progress import modify_progress
15from messages.sender import send2tg
16from messages.utils import summay_media
17from networking import download_file, download_media, hx_req
18from utils import number_to_emoji
19
20
async def preview_v2ex(client: Client, message: Message, url: str = "", topic_id: str = "", **kwargs):
    """Fetch a V2EX topic through the v2 API and forward it to Telegram.

    Args:
        client (Client): The Pyrogram client.
        message (Message): The trigger message object.
        url (str, optional): The v2ex link; shown in status texts and used as the title link.
        topic_id (str, optional): Topic id interpolated into the API URL and
            passed to ``hx_req`` as the expected ``result.id``.
        **kwargs: Forwarded unchanged to ``send2tg``, ``modify_progress`` and
            ``download_media``. When ``show_progress`` is truthy and there is
            no ``progress`` entry yet, a status message is sent first and the
            first item of the ``send2tg`` result is stored under ``progress``.
    """
    wants_status = kwargs.get("show_progress") and "progress" not in kwargs
    if wants_status:
        sent = await send2tg(client, message, texts=f"🔗正在解析V2ex链接\n{url}", **kwargs)
        kwargs["progress"] = sent[0]
    logger.info(f"v2ex link preview for {url}")

    resp = await hx_req(
        f"https://www.v2ex.com/api/v2/topics/{topic_id}",
        proxy=PROXY.V2EX,
        headers={"Authorization": f"Bearer {TOKEN.V2EX}"},
        check_kv={"success": True, "result.id": topic_id},
    )
    # NOTE(review): presumably hx_req reports request/validation failures under
    # an "error" key -- confirm against networking.hx_req.
    error = resp.get("error")
    if error:
        await modify_progress(text=f"❌v2ex链接解析失败{url}\n{error}", force_update=True, **kwargs)
        return

    # Topic metadata, each with a placeholder for fields missing from the response.
    author = glom(resp, "result.member.username", default="V2EX_User")
    title = glom(resp, "result.title", default="Title")
    created = glom(resp, "result.created", default=0)
    local_time = datetime.fromtimestamp(created, tz=UTC).astimezone(ZoneInfo(TZ))

    # Images are pulled out of the main content only; supplements are appended verbatim.
    body, img_urls = extract_and_remove_images_regex(glom(resp, "result.content", default=""))
    sections = [
        f"💻[{author}](https://www.v2ex.com/member/{author})\n",
        f"🕒{local_time.strftime('%Y-%m-%d %H:%M:%S')}\n",
        f"📝[{title}]({url})\n",
        body + "\n",
    ]
    supplements = glom(resp, "result.supplements", default=[]) or []
    for number, supplement in enumerate(supplements, start=1):
        sections.append(f"\n补充留言{number_to_emoji(number)}:\n{supplement.get('content', '')}\n")
    texts = "".join(sections)

    media = await download_imgs(img_urls)
    if media:
        await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
        media = await download_media(media, **kwargs)
    await send2tg(client, message, texts=texts, media=media, **kwargs)
    await modify_progress(del_status=True, **kwargs)
59
60
def extract_and_remove_images_regex(markdown_text: str) -> tuple[str, list[str]]:
    """Extract inline markdown images from the text and strip them out.

    An image is ``![alt](url)``, optionally followed by a title in double
    quotes (``![alt](url "title")``) or, when separated from the URL by
    whitespace, in single quotes (``![alt](url 'title')``).

    Args:
        markdown_text: Markdown source. ``None`` or an empty string is
            accepted and yields ``("", [])``.

    Returns:
        tuple[str, list[str]]: The markdown text with every image removed,
        and the image URLs in order of appearance. Images whose URL is empty
        (``![]()``) are removed from the text but not reported.
    """
    if not markdown_text:
        return "", []

    image_pattern = re.compile(
        r"!\[[^\]]*\]"  # ![alt]
        r"\((.*?)\s*"  # (url -- lazy, so trailing spaces and the title stay out of it
        # Optional title. The single-quoted form must follow whitespace so an
        # apostrophe inside the URL itself is never mistaken for a title.
        r"""(?:".*?"|(?<=\s)'[^'\n]*')?"""
        r"\)"
    )

    # Only the URL is captured, so findall yields plain strings; blank targets are skipped.
    urls = [url for raw in image_pattern.findall(markdown_text) if (url := raw.strip())]
    text_without_images = image_pattern.sub("", markdown_text)

    return text_without_images, urls
75
76
async def download_imgs(img_urls: list[str]) -> list[dict]:
    """Build one ``{"photo": ...}`` entry per image URL.

    The value stored under ``"photo"`` is whatever ``download_file`` returns;
    it is not awaited here.

    Args:
        img_urls: Image URLs, in the order the entries should appear.

    Returns:
        list[dict]: One entry per URL, in input order; empty for an empty input.
    """
    imgur_prefix = "https://i.imgur.com/"

    def start_download(link: str):
        if not link.startswith(imgur_prefix):
            return download_file(link, proxy=PROXY.V2EX)
        # handle imgur.com: fetched through a different proxy, with a Referer
        # pointing at the image's imgur page and the Telegram user agent
        imgur_headers = {"Referer": f"https://imgur.com/{Path(link).stem}", "User-Agent": TELEGRAM_UA}
        return download_file(link, proxy=PROXY.WARP, headers=imgur_headers)

    return [{"photo": start_download(link)} for link in img_urls]