main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import json
4import re
5from datetime import UTC, datetime
6from pathlib import Path
7from zoneinfo import ZoneInfo
8
9from glom import Coalesce, glom
10from loguru import logger
11from pyrogram.client import Client
12from pyrogram.types import Message
13
14from ai.utils import trim_none
15from config import AI, PROXY, TELEGRAM_UA, TOKEN, TZ
16from database.kv import get_cf_kv, set_cf_kv
17from messages.progress import modify_progress
18from messages.sender import send2tg
19from messages.utils import summay_media
20from networking import download_file, download_media, hx_req
21from preview.utils import add_summary_url
22from summarize.summarize import summarize
23from utils import nowstr, number_to_emoji
24
25
async def preview_v2ex(
    client: Client,
    message: Message,
    url: str = "",
    topic_id: str = "",
    *,
    summary_v2ex: bool = True,
    summary_v2ex_model: str = AI.AI_SUMMARY_MODEL_ALIAS,
    **kwargs,
):
    """Preview a V2EX topic in Telegram and optionally attach an AI summary.

    Fetches the topic through the V2EX API v2, sends its author, creation
    time, title, body, supplements and inline images to Telegram, then (when
    enabled) calls ``summarize_v2ex`` on the first sent message that carries
    text. Downloaded media files are removed before returning.

    Args:
        client (Client): The Pyrogram client.
        message (Message): The trigger message object.
        url (str, optional): The V2EX link; used in progress/error texts and as
            the target of the title link.
        topic_id (str, optional): Topic id used to build the API request URL.
        summary_v2ex (bool, optional): Whether to run ``summarize_v2ex`` after
            the preview has been sent.
        summary_v2ex_model (str, optional): Model passed to ``summarize_v2ex``.
        **kwargs: Forwarded to ``send2tg``, ``modify_progress`` and
            ``download_media``. ``show_progress`` and ``progress`` are read here.
    """
    if kwargs.get("show_progress") and "progress" not in kwargs:
        res = await send2tg(client, message, texts=f"🔗正在解析V2ex链接\n{url}", **kwargs)
        kwargs["progress"] = res[0]
    logger.info(f"v2ex link preview for {url}")
    token = await refresh_v2ex_token()
    if not token:
        await modify_progress(text="❌V2EX Token已失效, 请手动创建", force_update=True, **kwargs)
        return
    headers = {"Authorization": f"Bearer {token}"}
    topic_api = f"https://www.v2ex.com/api/v2/topics/{topic_id}"
    resp = await hx_req(topic_api, proxy=PROXY.V2EX, headers=headers, check_kv={"success": True, "result.id": topic_id})
    if error := resp.get("error"):
        await modify_progress(text=f"❌v2ex链接解析失败{url}\n{error}", force_update=True, **kwargs)
        return

    author = glom(resp, "result.member.username", default="V2EX_User")
    author_url = f"https://www.v2ex.com/member/{author}"
    title = glom(resp, "result.title", default="Title")
    # glom's default only covers a missing path; ``or`` also covers an explicit null value.
    ts = glom(resp, "result.created", default=0) or 0
    created = datetime.fromtimestamp(ts, tz=UTC).astimezone(ZoneInfo(TZ)).strftime("%Y-%m-%d %H:%M:%S")
    content, img_urls = extract_and_remove_images_regex(glom(resp, "result.content", default="") or "")
    supplements = glom(resp, "result.supplements", default=[]) or []

    parts = [
        f"💻**[{author}]({author_url})**\n",
        f"🕒{created}\n",
        f"📝**[{title}]({url})**\n",
        content + "\n",
    ]
    parts.extend(
        f"\n补充留言{number_to_emoji(idx)}:\n{supp.get('content', '')}\n" for idx, supp in enumerate(supplements, start=1)
    )
    texts = "".join(parts)

    media = await download_imgs(img_urls)
    if media:
        await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
        media = await download_media(media, **kwargs)

    try:
        # keep_file=True: the files are still needed as image sources for the summary below.
        sent_messages = await send2tg(client, message, texts=texts, media=media, keep_file=True, **kwargs)
        await modify_progress(del_status=True, **kwargs)
        # The summary link is attached to the first sent message that has a caption or text.
        caption_msg = next(
            (m for m in sent_messages if isinstance(m, Message) and (m.caption or m.text)),
            None,
        )
        if summary_v2ex and caption_msg:
            await summarize_v2ex(caption_msg, resp.get("result", {}), media, summary_v2ex_model, url)
    finally:
        # Clean up even when sending or summarizing fails, so kept files do not pile up.
        for item in media:
            file_path = glom(item, Coalesce("photo", "video", "audio", default=None))
            if file_path:
                Path(file_path).unlink(missing_ok=True)
88
89
def extract_and_remove_images_regex(markdown_text: str) -> tuple[str, list[str]]:
    """Split markdown into image-free text and the image URLs it contained.

    Recognizes ``![alt](url)`` and ``![alt](url "title")``. Each match is
    replaced by an empty string; surrounding whitespace is left untouched.

    Returns:
        tuple[str, list[str]]: The text with image tags removed, and the
        whitespace-stripped image URLs in order of appearance.
    """
    img_re = re.compile(r'!\[([^\]]*)\]\((.*?)\s*(".*?")?\)')

    # Group 2 is the URL; the alt text (group 1) and optional title (group 3) are dropped.
    found = [hit.group(2).strip() for hit in img_re.finditer(markdown_text)]
    stripped = img_re.sub("", markdown_text)
    return stripped, found
104
105
async def download_imgs(img_urls: list[str]) -> list[dict]:
    """Build one ``{"photo": ...}`` entry per image URL using ``download_file``.

    ``https://i.imgur.com/`` links go through ``PROXY.WARP`` with a Referer
    pointing at the matching imgur page and the Telegram User-Agent; every
    other URL goes through ``PROXY.V2EX`` without extra headers.

    The value returned by ``download_file`` is stored as-is and is not awaited
    here.  NOTE(review): presumably the caller's ``download_media`` resolves
    these entries - confirm against ``networking``.
    """
    imgur_prefix = "https://i.imgur.com/"

    def fetch(link: str):
        if not link.startswith(imgur_prefix):
            return download_file(link, proxy=PROXY.V2EX)
        # The Referer is the imgur page named after the image file's stem.
        page = f"https://imgur.com/{Path(link).stem}"
        return download_file(link, proxy=PROXY.WARP, headers={"Referer": page, "User-Agent": TELEGRAM_UA})

    return [{"photo": fetch(link)} for link in img_urls]
118
119
async def refresh_v2ex_token() -> str:
    """Return a usable V2EX API token, renewing it into KV when it is about to expire.

    The configured ``TOKEN.V2EX`` is tried first, then the token stored under
    the ``v2ex_token`` KV key. When the chosen token reports less than three
    days left, a new 180-day token (15552000 seconds) is created and saved to
    KV; the still-valid current token is what gets returned.

    Returns:
        str: A valid token, or ``""`` when neither candidate is valid.
    """
    renew_threshold = 86400 * 3  # renew within 3 days of expiry

    async def check_token(token: str) -> tuple[bool, int]:
        """Return (is_valid, ``result.expiration``) for ``token``."""
        resp = await hx_req(
            "https://www.v2ex.com/api/v2/token",
            proxy=PROXY.V2EX,
            headers={"Authorization": f"Bearer {token}"},
            check_kv={"success": True},
            max_retry=0,
        )
        return bool(glom(resp, "result.token", default=None)), glom(resp, "result.expiration", default=0)

    async def create_token(token: str) -> str:
        """Create a new token authorised by ``token``; return ``""`` on failure."""
        resp = await hx_req(
            "https://www.v2ex.com/api/v2/tokens",
            method="POST",
            proxy=PROXY.V2EX,
            json_data={"scope": "everything", "expiration": 15552000},
            headers={"Authorization": f"Bearer {token}"},
            max_retry=0,
            check_kv={"success": True},
        )
        return glom(resp, "result.token", default="")

    async def validate_and_renew(token: str) -> bool:
        """Return True when ``token`` is valid, renewing it into KV if it is close to expiry."""
        if not token:
            # Nothing to validate; skip the network round trip.
            return False
        valid, ttl = await check_token(token)
        if not valid:
            return False
        # NOTE(review): this treats ``result.expiration`` as remaining seconds - confirm
        # against the V2EX API that it is not the token's total lifetime.
        if ttl < renew_threshold:
            logger.warning("V2EX Token即将失效, 正在重新创建...")
            new_token = await create_token(token)
            if new_token:
                await set_cf_kv("v2ex_token", {"token": new_token})
            else:
                # Do not overwrite the stored token with an empty value when creation failed.
                logger.error("V2EX Token重新创建失败, 保留现有Token")
        return True

    if await validate_and_renew(TOKEN.V2EX):
        return TOKEN.V2EX
    logger.warning("V2EX Token已失效, 从KV获取...")
    stored = await get_cf_kv("v2ex_token")
    token = (stored or {}).get("token", "")
    if await validate_and_renew(token):
        return token
    return ""
166
167
async def summarize_v2ex(message: Message, v2ex: dict, media_list: list[dict], model: str, url: str) -> Message:
    """Summarize a V2EX topic and attach the summary link to ``message``.

    Builds a JSON text source from the topic (title, author, url, content,
    creation time, supplements), adds every photo/video in ``media_list`` as
    an extra source, and passes everything to ``summarize``.

    Returns:
        Message: The result of ``add_summary_url`` when ``summarize`` yields a
        ``telegraph_url`` and that call returns something truthy; otherwise the
        original ``message``.
    """

    def fmt_ts(stamp: int) -> str | None:
        # A falsy timestamp yields None so trim_none can handle the field.
        if stamp:
            return datetime.fromtimestamp(stamp, tz=UTC).astimezone(ZoneInfo(TZ)).strftime("%Y-%m-%d %H:%M:%S")
        return None

    payload = {
        "platform": "V2EX",
        "title": glom(v2ex, "title", default=""),
        "author_name": glom(v2ex, "member.username", default="Anonymous"),
        "url": url,
        "content": glom(v2ex, Coalesce("content_rendered", "content"), default=None),
    }

    created = glom(v2ex, "created", default=0)
    if created:
        payload["created_at"] = fmt_ts(created)

    extra_notes = []
    for note in glom(v2ex, "supplements", default=[]):
        extra_notes.append(
            {
                "created_at": fmt_ts(note.get("created", 0)),
                "content": glom(note, Coalesce("content_rendered", "content"), default=None),
            }
        )
    if extra_notes:
        payload["supplements"] = extra_notes
    payload = trim_none(payload)

    # Thresholds handed to summarize(): text-only topics use a 1000 minimum text
    # length; once any video is present that limit is lifted and a minimum
    # video duration of 120 applies instead.
    text_floor = 1000
    video_floor = None
    sources = []
    for item in media_list:
        photo = item.get("photo")
        if photo:
            sources.append({"type": "image", "path": photo})
        video = item.get("video")
        if video:
            text_floor = None
            video_floor = 120
            sources.append({"type": "video", "path": video})
    # The text source always goes last, after all media sources.
    sources.append({"type": "text", "text": json.dumps(payload, ensure_ascii=False)})

    summary = await summarize(
        sources=sources,
        model=model,
        title=payload["title"] or url,
        author=payload["author_name"],
        url=url,
        date=payload.get("created_at") or nowstr(TZ),
        min_text_length=text_floor,
        min_video_duration=video_floor,
        max_video_duration=3600,  # videos longer than one hour are skipped
    )
    if telegraph_url := summary.get("telegraph_url"):
        updated = await add_summary_url(telegraph_url, message)
        return updated or message
    return message