main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import asyncio
4import json
5import re
6from pathlib import Path
7
8from glom import Coalesce, glom
9from loguru import logger
10from pyrogram.client import Client
11from pyrogram.types import LinkPreviewOptions, Message
12
13from config import AI, API, CAPTION_LENGTH, PROXY, TELEGRAM_UA, TEXT_LENGTH, TZ
14from database.r2 import get_cf_r2
15from messages.database import copy_messages_from_db, save_messages
16from messages.progress import modify_progress
17from messages.sender import send2tg
18from messages.utils import sender_markdown_to_html, smart_split, summay_media
19from networking import download_file, download_media, hx_req
20from preview.utils import add_summary_url, trim
21from publish import publish_telegraph
22from summarize.summarize import summarize
23from utils import nowdt, readable_count, remove_consecutive_newlines, true, ts_to_dt
24
25
async def preview_twitter(
    client: Client,
    message: Message,
    url: str = "",
    db_key: str = "",
    handle: str = "",
    post_id: int = 0,
    *,
    twitter_comments: bool = True,
    show_statistics: bool = True,
    summary_twitter: bool = False,
    summary_twitter_model: str = AI.AI_SUMMARY_MODEL_ALIAS,
    **kwargs,
):
    """Preview a twitter link found in the trigger message.

    The cached copy stored under ``db_key`` is forwarded when available.
    Otherwise the thread is fetched from the FxTwitter API, rendered to an
    HTML caption, its media downloaded, and the result sent as a reply. The
    sent messages are then saved under ``db_key``.

    Args:
        client (Client): The Pyrogram client.
        message (Message): The trigger message object.
        url (str, optional): The twitter link. Shown in progress messages and
            used as the fallback tweet URL.
        db_key (str, optional): The cache key.
        handle (str, optional): Author handle, used to build the fixupx.com
            link-preview URL.
        post_id (int, optional): Status id, used to build the API URLs.
        twitter_comments (bool, optional): Add twitter comments. Defaults to True.
        show_statistics (bool, optional): Forwarded to ``get_statistics``.
        summary_twitter (bool, optional): When truthy, run ``summarize_twitter``
            on the first sent message that has a caption or text.
        summary_twitter_model (str, optional): Model alias passed to
            ``summarize_twitter``.
        **kwargs: Forwarded to the helper calls. ``show_progress``,
            ``progress`` and ``send_from_user`` are read here.
    """
    if kv := await get_cf_r2(db_key):
        logger.debug(f"Twitter preview cache hit for key={db_key}")
        if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
            return
        logger.warning("❌从缓存中转发失败, 尝试重新解析...")
    link_preview = LinkPreviewOptions(is_disabled=False, show_above_text=True, url=f"https://fixupx.com/{handle}/status/{post_id}")
    if kwargs.get("show_progress") and "progress" not in kwargs:
        status = await message.reply_text(f"🔗正在解析推特链接\n{url}", link_preview_options=link_preview)
        kwargs["progress"] = status

    api_url = f"{API.FXTWITTER}/2/thread/{post_id}?lang=zh-cn"
    logger.info(f"Twitter preview: {api_url}")
    headers = {"user-agent": TELEGRAM_UA}
    # Fetch the thread exactly once; a second identical request only adds latency.
    resp = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER, check_kv={"status.id": post_id})
    if resp.get("hx_error"):
        if status := kwargs.get("progress"):
            await status.edit_text(f"❌推特解析失败\n{url}", link_preview_options=link_preview)
        return
    resp = trim(resp)
    thread: list[dict] = resp.get("thread") or []
    caption = ""
    media = []
    media_cursor = 1  # 1-based index of the next media item across the whole thread
    media_indicator = ""
    article_url = None
    article_html = ""
    sender_tag = sender_markdown_to_html(kwargs.pop("send_from_user", ""))
    for idx, post in enumerate(sorted(thread, key=lambda x: x.get("created_timestamp", 0))):
        author = glom(post, "author.name", default="Anonymous")
        tweet_url = glom(post, "url", default=url)
        emoji = "🕊" if idx == 0 else "⤴️"
        author_tag = sender_tag if idx == 0 else ""
        author_tag += f'<a href="{tweet_url}"><b>{emoji}{author}</b></a>'
        if post.get("article"):
            # Merge the rendered article (html_no_media, media, article_url, ...) into the post.
            post |= await parse_article(post["article"], author, tweet_url)  # noqa: PLW2901
            article_url = post.get("article_url")
            article_html = post.get("html", "")
        post_media = glom(post, "media.all", default=[])
        media.extend(parse_media(post_media))
        if post_media:
            media_indicator = f"🏞P{media_cursor}-{media_cursor + len(post_media) - 1}" if len(post_media) > 1 else f"🏞P{media_cursor}"
            media_cursor += len(post_media)
        # No indicator for a lone tweet without a quote, or for a post without media.
        if (len(thread) == 1 and not post.get("quote")) or len(post_media) == 0:
            media_indicator = ""
        dt = ts_to_dt(post.get("created_timestamp")) or nowdt(TZ)
        date_str = f"🕒{dt.strftime('%Y-%m-%d %H:%M:%S')} {media_indicator}".strip()
        text = glom(post, Coalesce("html_no_media", "translation.text", "text"), default="")
        # Statistics are only shown for the post at the last index of the sorted thread.
        stats = get_statistics(post, show_statistics=show_statistics) if idx == len(thread) - 1 else ""
        caption += f"\n{author_tag}\n{date_str}\n{stats}\n".replace("\n\n", "\n") + clean_handle(text)
        if quote := post.get("quote"):
            quote_author = glom(quote, "author.name", default="Anonymous")
            quote_url = glom(quote, "url", default=url)
            quote_text = glom(quote, Coalesce("translation.text", "text"), default="")
            quote_media = glom(quote, "media.all", default=[])
            if article := quote.get("article"):
                # A quoted article is shown as a linked title plus its preview text.
                title = article.get("title", "Twitter Article")
                preview_text = article.get("preview_text", "")
                quote_text = f'<h1><a href="{quote_url}">{title}</a></h1>\n{preview_text}'
            media.extend(parse_media(quote_media))
            if quote_media:
                media_indicator = f"🏞P{media_cursor}-{media_cursor + len(quote_media) - 1}" if len(quote_media) > 1 else f"🏞P{media_cursor}"
                media_cursor += len(quote_media)
            if len(quote_media) == 0:
                media_indicator = ""
            quote_dt = ts_to_dt(quote.get("created_timestamp")) or nowdt(TZ)
            quote_date_str = f"🕒{quote_dt.strftime('%Y-%m-%d %H:%M:%S')} {media_indicator}".strip()
            quote_stats = get_statistics(quote, show_statistics=show_statistics)
            caption += f'\n<a href="{quote_url}"><b>↪️{quote_author}</b></a>\n{quote_date_str}\n{quote_stats}\n'.replace("\n\n", "\n") + clean_handle(quote_text)

    await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
    media = await download_media(media, **kwargs)
    sent_messages = []
    caption = caption.strip()
    if article_url:
        # Everything up to the first </h1> stays outside the expandable blockquote.
        # partition() avoids an unpacking error if the marker is ever absent.
        head, closing, _ = caption.partition("</h1>")
        head = f"{head}{closing}" if closing else ""
        caption = caption.strip().replace("<blockquote>", f"\n{'—' * 10}\n").replace("</blockquote>", f"\n{'—' * 10}\n")
        # Close the surrounding blockquote around <pre> sections and reopen it afterwards.
        caption = caption.replace("<pre>", "</blockquote><pre>").replace("</pre>", "</pre><blockquote expandable>")
        link_preview = LinkPreviewOptions(is_disabled=False, show_above_text=True, url=article_url)
        cur_msg = None  # stays None when smart_split yields nothing
        for idx, m in enumerate(await smart_split(caption)):
            if idx == 0:  # first msg
                text = f"{head}\n<blockquote expandable>{m.removeprefix(head)}</blockquote>" if head and m.startswith(head) else f"<blockquote expandable>{m}</blockquote>"
                cur_msg = await message.reply_text(text=text, quote=True, link_preview_options=link_preview)
            else:
                cur_msg = await cur_msg.reply_text(f"<blockquote expandable>{m}</blockquote>", quote=True)
            if isinstance(cur_msg, Message):
                sent_messages.append(cur_msg)
            await asyncio.sleep(1)
        sent_messages.extend(await send2tg(client, cur_msg or message, media=media, keep_file=True, **kwargs))
    else:  # Normal tweet
        comments_list = await get_comments(post_id, twitter_comments=twitter_comments)
        caption_with_comments = caption
        max_length = CAPTION_LENGTH if media else TEXT_LENGTH
        for cmt in comments_list:
            # Only append a comment while the result still fits in a single message.
            if len(await smart_split(f"{caption_with_comments}\n<blockquote expandable>{cmt}</blockquote>", max_length)) == 1:
                caption_with_comments += f"\n{cmt}"
        comments = caption_with_comments.removeprefix(caption).strip()
        texts = f"{caption}\n<blockquote expandable>{comments}</blockquote>" if comments else caption
        sent_messages = await send2tg(client, message, texts=texts, media=media, keep_file=True, **kwargs)
    await modify_progress(del_status=True, **kwargs)
    # Summary twitter
    # find the first message that has a caption
    caption_msg = None
    index = -1
    for idx, m in enumerate(sent_messages):
        if isinstance(m, Message) and (m.caption or m.text):
            caption_msg = m
            index = idx
            break
    if summary_twitter and caption_msg:
        edited_msg = await summarize_twitter(caption_msg, resp, article_html, media, summary_twitter_model)
        sent_messages[index] = edited_msg
    await save_messages(messages=sent_messages, key=db_key)
    # Clean up the local media files; items without a local file key are skipped.
    for item in media:
        local_path = glom(item, Coalesce("photo", "video", "audio"), default=None)
        if local_path:
            Path(local_path).unlink(missing_ok=True)
167
168
def get_statistics(post: dict, *, show_statistics: bool = True) -> str:
    """Build the one-line engagement summary for a post.

    Returns an empty string when ``true(show_statistics)`` is falsy.
    Otherwise every non-zero counter found in *post* is rendered as an emoji
    followed by ``readable_count(value)``, in the fixed order views, likes,
    replies, reposts, bookmarks.
    """
    if not true(show_statistics):
        return ""
    counters = (
        ("views", "👁"),
        ("likes", "❤️"),
        ("replies", "💬"),
        ("reposts", "🔁"),
        ("bookmarks", "🔖"),
    )
    parts = []
    for field, icon in counters:
        count = glom(post, field, default=0)
        if count:  # zero / missing counters are left out
            parts.append(f"{icon}{readable_count(count)}")
    return "".join(parts)
184
185
def clean_handle(s: str) -> str:
    """Strip the run of leading ``@handle`` mentions from *s*.

    Only mentions at the very start of the string (optionally separated and
    followed by whitespace) are removed; mentions further inside the text are
    left untouched.
    """
    leading_handles = re.match(r"^(\s*@[a-zA-Z0-9_]+)+\s*", s)
    if leading_handles is None:
        return s
    return s[leading_handles.end():]
189
190
def parse_media(media_list: list[dict]) -> list[dict]:
    """Turn API media entries into download descriptors.

    Photos become ``{"url", "photo"}`` and gifs/videos become
    ``{"url", "video"}``, where the second value is whatever
    ``download_file(url, proxy=PROXY.TWITTER)`` returns. For videos the h264
    format with the highest bitrate is preferred, falling back to the entry's
    own ``url``. Entries of any other type are ignored.
    """
    parsed = []
    for item in media_list:
        kind = item.get("type")
        if kind == "photo":
            parsed.append({"url": item.get("url"), "photo": download_file(item.get("url", ""), proxy=PROXY.TWITTER)})
            continue
        if kind not in ("gif", "video"):
            continue
        h264_formats = [fmt for fmt in item.get("formats", []) if fmt.get("codec") == "h264"]
        # max() keeps the first entry among equal bitrates, like a stable descending sort.
        best = max(h264_formats, key=lambda fmt: fmt.get("bitrate", 0), default=None)
        video_url = best.get("url", "") if best is not None else ""
        video_url = video_url or item.get("url", "")
        parsed.append({"url": video_url, "video": download_file(video_url, proxy=PROXY.TWITTER)})
    return parsed
204
205
206async def parse_article(article: dict, author: str, tweet_url: str) -> dict:
207 def inline_style(text: str, styles: list[dict]) -> str:
208 """处理内联样式 (加粗、斜体等字符级格式).
209
210 使用前后缀注入法, 避免直接修改字符串导致 Index 偏移
211 """
212 if not isinstance(text, str) or not text.strip():
213 return ""
214 styles = styles or []
215 text_len = len(text)
216 prefixes = {i: [] for i in range(text_len + 1)}
217 suffixes = {i: [] for i in range(text_len + 1)}
218 for style in styles:
219 style_ = style["style"].lower()
220 start = style["offset"]
221 end = start + style["length"]
222 tag_start = ""
223 if style_ == "bold":
224 tag_start = "<b>"
225 tag_end = "</b>"
226 elif style_ == "italic":
227 tag_start = "<i>"
228 tag_end = "</i>"
229 if tag_start:
230 prefixes[start].append(tag_start)
231 suffixes[end].insert(0, tag_end) # 使用 insert(0) 确保闭合标签以正确的嵌套顺序反向闭合
232
233 formatted_text = ""
234 for i in range(text_len + 1):
235 formatted_text += "".join(suffixes[i]) # 先闭合
236 formatted_text += "".join(prefixes[i]) # 再开启
237 if i < text_len:
238 formatted_text += text[i]
239 return formatted_text
240
241 html = ""
242 if cover_url := glom(article, "cover_media.media_info.original_img_url", default=""):
243 html += f'\n<img src="{cover_url}" alt="Cover" />'
244 media_list = []
245 for media in article.get("media_entities", []):
246 if variants := [x for x in glom(media, "media_info.variants", default=[]) if x.get("content_type") == "video/mp4"]: # video
247 variants = sorted(variants, key=lambda x: x.get("bit_rate", 0), reverse=True)
248 if video_url := glom(variants, "0.url", default=""):
249 media_list.append({"url": video_url, "type": "video", "media_id": media.get("media_id")})
250 elif img_url := glom(media, "media_info.original_img_url", default=""):
251 media_list.append({"url": img_url, "type": "photo", "media_id": media.get("media_id")})
252
253 entity_map = glom(article, "content.entityMap", default={})
254 entity_dict = {str(x["key"]): x["value"] for x in entity_map} if isinstance(entity_map, list) else {str(k): v for k, v in entity_map.items()}
255
256 def parse_atomic(entities: list[dict]) -> str:
257 """Parse atomic block."""
258 if not entities:
259 return ""
260 texts = ""
261 for x in entities:
262 if entity := entity_dict.get(str(x["key"])):
263 e_type = entity.get("type", "").upper()
264 if e_type == "MEDIA":
265 media_id = glom(entity, "data.mediaItems.0.mediaId", default="")
266 if img_url := next((x["url"] for x in media_list if x["type"] == "photo" and x["media_id"] == media_id), None):
267 texts += f'\n<img src="{img_url}" alt="IMG-{media_id}" />'
268 elif video_url := next((x["url"] for x in media_list if x["type"] == "video" and x["media_id"] == media_id), None):
269 texts += f'\n<video src="{video_url}" />'
270 elif e_type == "DIVIDER":
271 texts += "\n"
272 elif e_type == "TWEET":
273 if tweet_id := glom(entity, "data.tweetId", default=""):
274 texts += f'\n<a href="https://x.com/i/status/{tweet_id}">QuoteTweet</a>'
275 elif e_type == "MARKDOWN":
276 markdown = glom(entity, "data.markdown", default="").strip("`")
277 lang, raw = markdown.split("\n", maxsplit=1)
278 if lang:
279 texts += f'\n<pre language="{lang}">{raw}</pre>'
280 else:
281 texts += f"\n<pre>{markdown}</pre>"
282 return texts.strip()
283
284 # blocks
285 for block in glom(article, "content.blocks", default=[]):
286 text = inline_style(block.get("text"), block.get("inlineStyleRanges"))
287 entities = block.get("entityRanges", [])
288
289 block_type = block.get("type")
290 match block_type:
291 case "header-one":
292 html += f"\n<h1>{text}</h1>"
293 case "header-two":
294 html += f"\n<h2>{text}</h2>"
295 case "header-three":
296 html += f"\n<h3>{text}</h3>"
297 case "header-four":
298 html += f"\n<h4>{text}</h4>"
299 case "blockquote":
300 html += f"\n<blockquote>{text}</blockquote>"
301 case "ordered-list-item" | "unordered-list-item":
302 html += f"\n・{text}"
303 case "atomic":
304 html += f"\n{parse_atomic(entities)}"
305 case _:
306 html += f"\n<p>{text}</p>" if text else ""
307
308 # form ordered media list
309 media = []
310 # 匹配img标签的正则表达式(支持单引号和双引号)
311 img_pattern = re.compile(r'<img\s+[^>]*?src\s*=\s*["\'](.*?)["\'][^>]*?>', re.IGNORECASE)
312 # 匹配video标签的正则表达式(支持单引号和双引号)
313 video_pattern = re.compile(r'<video\s+[^>]*?src\s*=\s*["\'](.*?)["\'][^>]*?>', re.IGNORECASE)
314 for line in html.splitlines():
315 if match_img := img_pattern.search(line):
316 media.append({"url": match_img.group(1), "type": "photo"})
317 if match_vid := video_pattern.search(line):
318 media.append({"url": match_vid.group(1), "type": "video"})
319
320 # 移除所有img和video标签
321 clean_html = img_pattern.sub("", html)
322 clean_html = video_pattern.sub("", clean_html)
323 title = article.get("title", "Twitter Article")
324 if article_url := await publish_telegraph(title=title, author=author, url=tweet_url, html=html):
325 clean_html = f'<h1><a href="{article_url}">{title}</a></h1>\n{clean_html.strip()}'
326 html = f'<h1><a href="{article_url}">{title}</a></h1>\n{html.strip()}'
327
328 return {
329 "is_article": True,
330 "html_no_media": remove_consecutive_newlines(clean_html).strip(),
331 "image_urls": img_pattern.findall(html),
332 "video_urls": video_pattern.findall(html),
333 "html": html,
334 "article_url": article_url,
335 "media": {"all": media},
336 "title": article.get("title", "Twitter Article"),
337 }
338
339
async def get_comments(post_id: int, *, twitter_comments: bool = True) -> list[str]:
    """Fetch the replies of a tweet as HTML comment lines.

    Args:
        post_id: Status id used to build the conversation API URL.
        twitter_comments: When ``true(twitter_comments)`` is falsy no request
            is made and an empty list is returned.

    Returns:
        One HTML line per reply that has text, ordered by
        ``created_timestamp`` and preceded by a header line. The list is empty
        when the request fails or no reply has text.
    """
    if not true(twitter_comments):
        return []
    api_url = f"{API.FXTWITTER}/2/conversation/{post_id}?lang=zh-cn"
    logger.info(f"Get Twitter comments: {api_url}")
    headers = {"user-agent": TELEGRAM_UA}
    resp = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER, check_kv={"status.id": post_id})
    if resp.get("hx_error"):
        return []
    resp = trim(resp)
    comments = []
    replies = resp.get("replies") or []
    # A missing or null timestamp sorts first instead of raising, matching the thread sorting elsewhere.
    for reply in sorted(replies, key=lambda x: x.get("created_timestamp") or 0):
        author = glom(reply, "author.name", default="Anonymous")
        tweet_url = glom(reply, "url", default="https://x.com")
        if text := glom(reply, Coalesce("text", "raw_text.text"), default=""):
            comments.append(f'<a href="{tweet_url}"><b>💬{author}:</b></a> {clean_handle(text)}')
    if comments:
        comments.insert(0, "<b>💬点击展开评论:</b>")
    return comments
361
362
async def summarize_twitter(message: Message, tweet: dict, article: str, media_list: list[dict], model: str) -> Message:
    """Summarize a tweet/thread with AI and attach the summary link to *message*.

    Args:
        message: The already-sent message that carries the caption/text.
        tweet: The trimmed API response (``thread`` and ``status`` are read).
        article: Article HTML; when non-empty it replaces the tweet text of
            each post and disables the length thresholds.
        media_list: Downloaded media items; ``photo`` / ``video`` values are
            passed to the summarizer as paths.
        model: Model name passed to ``summarize``.

    Returns:
        The result of ``add_summary_url`` when a ``telegraph_url`` was produced
        and that call returned something truthy, otherwise *message*.
    """
    thread = tweet.get("thread") or []
    # Default for the summary title so an empty thread cannot leave `author` unbound.
    author = glom(tweet, "status.author.name", default="Anonymous")
    posts = []
    for post in sorted(thread, key=lambda x: x.get("created_timestamp", 0)):
        author = glom(post, "author.name", default="Anonymous")
        dt = ts_to_dt(post.get("created_timestamp")) or nowdt(TZ)
        date_str = f"{dt.strftime('%Y-%m-%d %H:%M:%S')}"
        text = article or glom(post, Coalesce("translation.text", "text"), default="")
        post_info = {"author": author, "date": date_str, "text": clean_handle(text)}
        if quote := post.get("quote"):
            quote_author = glom(quote, "author.name", default="Anonymous")
            quote_text = glom(quote, Coalesce("translation.text", "text"), default="")
            # Use a separate name: rebinding `article` here would clobber the
            # article HTML parameter for later posts and the check below.
            if quote_article := quote.get("article"):
                title = quote_article.get("title", "Twitter Article")
                preview_text = quote_article.get("preview_text", "")
                quote_text = f"<h1>{title}</h1>\n{preview_text}"
            quote_dt = ts_to_dt(quote.get("created_timestamp")) or nowdt(TZ)
            quote_date_str = f"{quote_dt.strftime('%Y-%m-%d %H:%M:%S')}"
            post_info["quote_tweet"] = {"author": quote_author, "date": quote_date_str, "text": clean_handle(quote_text)}
        posts.append(post_info)

    summary_info: dict = {"platform": "Twitter / X"}
    if len(posts) > 1:
        summary_info["thread"] = posts
    elif len(posts) == 1:
        summary_info |= posts[0]

    sources = []
    min_text_length = 1000  # skip short tweets
    min_video_duration = None
    for media in media_list:
        if media.get("photo"):
            sources.append({"type": "image", "path": media["photo"]})
        if media.get("video"):
            min_text_length = None
            min_video_duration = 120  # skip short videos (presumably seconds, i.e. under 2 minutes - confirm)
            sources.append({"type": "video", "path": media["video"]})
    if article:
        min_text_length = None  # This is twitter article
        min_video_duration = None
    sources.append({"type": "text", "text": json.dumps(summary_info, ensure_ascii=False)})
    summary = await summarize(
        sources=sources,
        model=model,
        title=f"🕊{author}",
        author=glom(tweet, "status.author.name", default="Anonymous"),
        url=glom(tweet, "status.url", default="https://x.com"),
        date=ts_to_dt(glom(tweet, "status.created_timestamp", default=None)) or nowdt(TZ),
        min_text_length=min_text_length,
        min_video_duration=min_video_duration,
        max_video_duration=3600,  # skip long videos more than 1 hour
    )
    telegraph_url = summary.get("telegraph_url")
    if not telegraph_url:
        return message
    return await add_summary_url(telegraph_url, message) or message