main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import asyncio
  4import json
  5import re
  6from pathlib import Path
  7
  8from glom import Coalesce, glom
  9from loguru import logger
 10from pyrogram.client import Client
 11from pyrogram.types import LinkPreviewOptions, Message
 12
 13from config import AI, API, CAPTION_LENGTH, PROXY, TELEGRAM_UA, TEXT_LENGTH, TZ
 14from database.r2 import get_cf_r2
 15from messages.database import copy_messages_from_db, save_messages
 16from messages.progress import modify_progress
 17from messages.sender import send2tg
 18from messages.utils import sender_markdown_to_html, smart_split, summay_media
 19from networking import download_file, download_media, hx_req
 20from preview.utils import add_summary_url, trim
 21from publish import publish_telegraph
 22from summarize.summarize import summarize
 23from utils import nowdt, readable_count, remove_consecutive_newlines, true, ts_to_dt
 24
 25
 26async def preview_twitter(
 27    client: Client,
 28    message: Message,
 29    url: str = "",
 30    db_key: str = "",
 31    handle: str = "",
 32    post_id: int = 0,
 33    *,
 34    twitter_comments: bool = True,
 35    show_statistics: bool = True,
 36    summary_twitter: bool = False,
 37    summary_twitter_model: str = AI.AI_SUMMARY_MODEL_ALIAS,
 38    **kwargs,
 39):
 40    """Preview twitter link in the message.
 41
 42    Args:
 43        client (Client): The Pyrogram client.
 44        message (Message): The trigger message object.
 45        url (str, optional): The twitter link.
 46        db_key (str, optional): The cache key.
 47        platform (str): The social media platform.
 48        twitter_provider (str): The extractor to use: fxtwitter or tikhub.
 49        twitter_comments (bool, optional): Add twitter comments. Defaults to True
 50    """
 51    if kv := await get_cf_r2(db_key):
 52        logger.debug(f"Twitter preview cache hit for key={db_key}")
 53        if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
 54            return
 55        logger.warning("❌从缓存中转发失败, 尝试重新解析...")
 56    link_preview = LinkPreviewOptions(is_disabled=False, show_above_text=True, url=f"https://fixupx.com/{handle}/status/{post_id}")
 57    if kwargs.get("show_progress") and "progress" not in kwargs:
 58        status = await message.reply_text(f"🔗正在解析推特链接\n{url}", link_preview_options=link_preview)
 59        kwargs["progress"] = status
 60
 61    api_url = f"{API.FXTWITTER}/2/thread/{post_id}?lang=zh-cn"
 62    logger.info(f"Twitter preview: {api_url}")
 63    headers = {"user-agent": TELEGRAM_UA}
 64    resp = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER, check_kv={"status.id": post_id})
 65    resp = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER, check_kv={"status.id": post_id})
 66    if resp.get("hx_error"):
 67        if status := kwargs.get("progress"):
 68            await status.edit_text(f"❌推特解析失败\n{url}", link_preview_options=link_preview)
 69        return
 70    resp = trim(resp)
 71    thread: list[dict] = resp.get("thread", [])
 72    caption = ""
 73    media = []
 74    media_cursor = 1
 75    media_indicator = ""
 76    article_url = None
 77    article_html = ""
 78    sender_tag = sender_markdown_to_html(kwargs.pop("send_from_user", ""))
 79    for idx, post in enumerate(sorted(thread, key=lambda x: x.get("created_timestamp", 0))):
 80        author = glom(post, "author.name", default="Anonymous")
 81        tweet_url = glom(post, "url", default=url)
 82        emoji = "🕊" if idx == 0 else "⤴️"
 83        author_tag = sender_tag if idx == 0 else ""
 84        author_tag += f'<a href="{tweet_url}"><b>{emoji}{author}</b></a>'
 85        if post.get("article"):
 86            post |= await parse_article(post["article"], author, tweet_url)  # noqa: PLW2901
 87            article_url = post.get("article_url")
 88            article_html = post.get("html", "")
 89        post_media = glom(post, "media.all", default=[])
 90        media.extend(parse_media(post_media))
 91        if post_media:
 92            media_indicator = f"🏞P{media_cursor}-{media_cursor + len(post_media) - 1}" if len(post_media) > 1 else f"🏞P{media_cursor}"
 93            media_cursor += len(post_media)
 94        if (len(thread) == 1 and not post.get("quote")) or len(post_media) == 0:
 95            media_indicator = ""
 96        dt = ts_to_dt(post.get("created_timestamp")) or nowdt(TZ)
 97        date_str = f"🕒{dt.strftime('%Y-%m-%d %H:%M:%S')} {media_indicator}".strip()
 98        text = glom(post, Coalesce("html_no_media", "translation.text", "text"), default="")
 99        stats = get_statistics(post, show_statistics=show_statistics) if idx == len(thread) - 1 else ""
100        caption += f"\n{author_tag}\n{date_str}\n{stats}\n".replace("\n\n", "\n") + clean_handle(text)
101        if quote := post.get("quote"):
102            quote_author = glom(quote, "author.name", default="Anonymous")
103            quote_url = glom(quote, "url", default=url)
104            quote_text = glom(quote, Coalesce("translation.text", "text"), default="")
105            quote_media = glom(quote, "media.all", default=[])
106            if article := quote.get("article"):
107                title = article.get("title", "Twitter Article")
108                preview_text = article.get("preview_text", "")
109                quote_text = f'<h1><a href="{quote_url}">{title}</a></h1>\n{preview_text}'
110            media.extend(parse_media(quote_media))
111            if quote_media:
112                media_indicator = f"🏞P{media_cursor}-{media_cursor + len(quote_media) - 1}" if len(quote_media) > 1 else f"🏞P{media_cursor}"
113                media_cursor += len(quote_media)
114            if len(quote_media) == 0:
115                media_indicator = ""
116            quote_dt = ts_to_dt(quote.get("created_timestamp")) or nowdt(TZ)
117            quote_date_str = f"🕒{quote_dt.strftime('%Y-%m-%d %H:%M:%S')} {media_indicator}".strip()
118            quote_stats = get_statistics(quote, show_statistics=show_statistics)
119            caption += f'\n<a href="{quote_url}"><b>↪️{quote_author}</b></a>\n{quote_date_str}\n{quote_stats}\n'.replace("\n\n", "\n") + clean_handle(quote_text)
120
121    await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
122    media = await download_media(media, **kwargs)
123    sent_messages = []
124    caption = caption.strip()
125    if article_url:
126        head, _ = caption.split("</h1>", maxsplit=1)
127        head += "</h1>"
128        caption = caption.strip().replace("<blockquote>", f"\n{'' * 10}\n").replace("</blockquote>", f"\n{'' * 10}\n")
129        caption = caption.replace("<pre>", "</blockquote><pre>").replace("</pre>", "</pre><blockquote expandable>")
130        link_preview = LinkPreviewOptions(is_disabled=False, show_above_text=True, url=article_url)
131        for idx, m in enumerate(await smart_split(caption)):
132            if idx == 0:  # first msg
133                text = f"{head}\n<blockquote expandable>{m.removeprefix(head)}</blockquote>" if m.startswith(head) else f"<blockquote expandable>{m}</blockquote>"
134                cur_msg = await message.reply_text(text=text, quote=True, link_preview_options=link_preview)
135            else:
136                cur_msg = await cur_msg.reply_text(f"<blockquote expandable>{m}</blockquote>", quote=True)
137            if isinstance(cur_msg, Message):
138                sent_messages.append(cur_msg)
139            await asyncio.sleep(1)
140        sent_messages.extend(await send2tg(client, cur_msg or message, media=media, keep_file=True, **kwargs))
141    else:  # Normal tweet
142        comments_list = await get_comments(post_id, twitter_comments=twitter_comments)
143        caption_with_comments = caption
144        max_length = CAPTION_LENGTH if media else TEXT_LENGTH
145        for cmt in comments_list:
146            if len(await smart_split(f"{caption_with_comments}\n<blockquote expandable>{cmt}</blockquote>", max_length)) == 1:
147                caption_with_comments += f"\n{cmt}"
148        comments = caption_with_comments.removeprefix(caption).strip()
149        texts = f"{caption}\n<blockquote expandable>{comments}</blockquote>" if comments else caption
150        sent_messages = await send2tg(client, message, texts=texts, media=media, keep_file=True, **kwargs)
151    await modify_progress(del_status=True, **kwargs)
152    # Summary twitter
153    # find the first message that has a caption
154    caption_msg = None
155    index = -1
156    for idx, m in enumerate(sent_messages):
157        if isinstance(m, Message) and (m.caption or m.text):
158            caption_msg = m
159            index = idx
160            break
161    if summary_twitter and caption_msg:
162        edited_msg = await summarize_twitter(caption_msg, resp, article_html, media, summary_twitter_model)
163        sent_messages[index] = edited_msg
164    await save_messages(messages=sent_messages, key=db_key)
165    # Clean up
166    [Path(glom(x, Coalesce("photo", "video", "audio"))).unlink(missing_ok=True) for x in media]
167
168
def get_statistics(post: dict, *, show_statistics: bool = True) -> str:
    """Build the one-line statistics string of a post.

    Args:
        post (dict): The post to read the counters from.
        show_statistics (bool, optional): Return an empty string when
            ``true(show_statistics)`` is falsy. Defaults to True.

    Returns:
        str: The non-zero counters, each prefixed with its emoji, concatenated
        in the order views, likes, replies, reposts, bookmarks.
    """
    if not true(show_statistics):
        return ""
    counters = (
        ("views", "👁"),
        ("likes", "❤️"),
        ("replies", "💬"),
        ("reposts", "🔁"),
        ("bookmarks", "🔖"),
    )
    parts = []
    for field, icon in counters:
        count = glom(post, field, default=0)
        if count:  # missing or zero counters are left out
            parts.append(f"{icon}{readable_count(count)}")
    return "".join(parts)
184
185
def clean_handle(s: str) -> str:
    """Strip the leading run of ``@handle`` mentions (and surrounding whitespace) from ``s``.

    Only mentions at the very start of the string are removed; mentions that
    appear later in the text are kept.
    """
    leading_mentions = r"^(\s*@[a-zA-Z0-9_]+)+\s*"
    stripped = re.sub(leading_mentions, "", s)
    return stripped
189
190
def parse_media(media_list: list[dict]) -> list[dict]:
    """Turn the media entries of a post into download entries.

    Photos become ``{"url", "photo"}`` entries; gifs and videos become
    ``{"url", "video"}`` entries that use the h264 format with the highest
    bitrate, falling back to the entry's own ``url``. Other types are skipped.
    """
    parsed = []
    for item in media_list:
        kind = item.get("type")
        if kind == "photo":
            parsed.append({"url": item.get("url"), "photo": download_file(item.get("url", ""), proxy=PROXY.TWITTER)})
            continue
        if kind not in ("gif", "video"):
            continue
        h264_formats = [fmt for fmt in item.get("formats", []) if fmt.get("codec") == "h264"]
        # max() returns the first format with the highest bitrate.
        best = max(h264_formats, key=lambda fmt: fmt.get("bitrate", 0), default=None)
        video_url = best.get("url", "") if best is not None else ""
        video_url = video_url or item.get("url", "")
        parsed.append({"url": video_url, "video": download_file(video_url, proxy=PROXY.TWITTER)})
    return parsed
204
205
206async def parse_article(article: dict, author: str, tweet_url: str) -> dict:
207    def inline_style(text: str, styles: list[dict]) -> str:
208        """处理内联样式 (加粗、斜体等字符级格式).
209
210        使用前后缀注入法, 避免直接修改字符串导致 Index 偏移
211        """
212        if not isinstance(text, str) or not text.strip():
213            return ""
214        styles = styles or []
215        text_len = len(text)
216        prefixes = {i: [] for i in range(text_len + 1)}
217        suffixes = {i: [] for i in range(text_len + 1)}
218        for style in styles:
219            style_ = style["style"].lower()
220            start = style["offset"]
221            end = start + style["length"]
222            tag_start = ""
223            if style_ == "bold":
224                tag_start = "<b>"
225                tag_end = "</b>"
226            elif style_ == "italic":
227                tag_start = "<i>"
228                tag_end = "</i>"
229            if tag_start:
230                prefixes[start].append(tag_start)
231                suffixes[end].insert(0, tag_end)  # 使用 insert(0) 确保闭合标签以正确的嵌套顺序反向闭合
232
233        formatted_text = ""
234        for i in range(text_len + 1):
235            formatted_text += "".join(suffixes[i])  # 先闭合
236            formatted_text += "".join(prefixes[i])  # 再开启
237            if i < text_len:
238                formatted_text += text[i]
239        return formatted_text
240
241    html = ""
242    if cover_url := glom(article, "cover_media.media_info.original_img_url", default=""):
243        html += f'\n<img src="{cover_url}" alt="Cover" />'
244    media_list = []
245    for media in article.get("media_entities", []):
246        if variants := [x for x in glom(media, "media_info.variants", default=[]) if x.get("content_type") == "video/mp4"]:  # video
247            variants = sorted(variants, key=lambda x: x.get("bit_rate", 0), reverse=True)
248            if video_url := glom(variants, "0.url", default=""):
249                media_list.append({"url": video_url, "type": "video", "media_id": media.get("media_id")})
250        elif img_url := glom(media, "media_info.original_img_url", default=""):
251            media_list.append({"url": img_url, "type": "photo", "media_id": media.get("media_id")})
252
253    entity_map = glom(article, "content.entityMap", default={})
254    entity_dict = {str(x["key"]): x["value"] for x in entity_map} if isinstance(entity_map, list) else {str(k): v for k, v in entity_map.items()}
255
256    def parse_atomic(entities: list[dict]) -> str:
257        """Parse atomic block."""
258        if not entities:
259            return ""
260        texts = ""
261        for x in entities:
262            if entity := entity_dict.get(str(x["key"])):
263                e_type = entity.get("type", "").upper()
264                if e_type == "MEDIA":
265                    media_id = glom(entity, "data.mediaItems.0.mediaId", default="")
266                    if img_url := next((x["url"] for x in media_list if x["type"] == "photo" and x["media_id"] == media_id), None):
267                        texts += f'\n<img src="{img_url}" alt="IMG-{media_id}" />'
268                    elif video_url := next((x["url"] for x in media_list if x["type"] == "video" and x["media_id"] == media_id), None):
269                        texts += f'\n<video src="{video_url}" />'
270                elif e_type == "DIVIDER":
271                    texts += "\n"
272                elif e_type == "TWEET":
273                    if tweet_id := glom(entity, "data.tweetId", default=""):
274                        texts += f'\n<a href="https://x.com/i/status/{tweet_id}">QuoteTweet</a>'
275                elif e_type == "MARKDOWN":
276                    markdown = glom(entity, "data.markdown", default="").strip("`")
277                    lang, raw = markdown.split("\n", maxsplit=1)
278                    if lang:
279                        texts += f'\n<pre language="{lang}">{raw}</pre>'
280                    else:
281                        texts += f"\n<pre>{markdown}</pre>"
282        return texts.strip()
283
284    # blocks
285    for block in glom(article, "content.blocks", default=[]):
286        text = inline_style(block.get("text"), block.get("inlineStyleRanges"))
287        entities = block.get("entityRanges", [])
288
289        block_type = block.get("type")
290        match block_type:
291            case "header-one":
292                html += f"\n<h1>{text}</h1>"
293            case "header-two":
294                html += f"\n<h2>{text}</h2>"
295            case "header-three":
296                html += f"\n<h3>{text}</h3>"
297            case "header-four":
298                html += f"\n<h4>{text}</h4>"
299            case "blockquote":
300                html += f"\n<blockquote>{text}</blockquote>"
301            case "ordered-list-item" | "unordered-list-item":
302                html += f"\n{text}"
303            case "atomic":
304                html += f"\n{parse_atomic(entities)}"
305            case _:
306                html += f"\n<p>{text}</p>" if text else ""
307
308    # form ordered media list
309    media = []
310    # 匹配img标签的正则表达式(支持单引号和双引号)
311    img_pattern = re.compile(r'<img\s+[^>]*?src\s*=\s*["\'](.*?)["\'][^>]*?>', re.IGNORECASE)
312    # 匹配video标签的正则表达式(支持单引号和双引号)
313    video_pattern = re.compile(r'<video\s+[^>]*?src\s*=\s*["\'](.*?)["\'][^>]*?>', re.IGNORECASE)
314    for line in html.splitlines():
315        if match_img := img_pattern.search(line):
316            media.append({"url": match_img.group(1), "type": "photo"})
317        if match_vid := video_pattern.search(line):
318            media.append({"url": match_vid.group(1), "type": "video"})
319
320    # 移除所有img和video标签
321    clean_html = img_pattern.sub("", html)
322    clean_html = video_pattern.sub("", clean_html)
323    title = article.get("title", "Twitter Article")
324    if article_url := await publish_telegraph(title=title, author=author, url=tweet_url, html=html):
325        clean_html = f'<h1><a href="{article_url}">{title}</a></h1>\n{clean_html.strip()}'
326        html = f'<h1><a href="{article_url}">{title}</a></h1>\n{html.strip()}'
327
328    return {
329        "is_article": True,
330        "html_no_media": remove_consecutive_newlines(clean_html).strip(),
331        "image_urls": img_pattern.findall(html),
332        "video_urls": video_pattern.findall(html),
333        "html": html,
334        "article_url": article_url,
335        "media": {"all": media},
336        "title": article.get("title", "Twitter Article"),
337    }
338
339
async def get_comments(post_id: int, *, twitter_comments: bool = True) -> list[str]:
    """Fetch the replies of a tweet and format them as HTML comment lines.

    Args:
        post_id (int): The tweet id used in the conversation API URL.
        twitter_comments (bool, optional): Return an empty list when
            ``true(twitter_comments)`` is falsy. Defaults to True.

    Returns:
        list[str]: A header line followed by one line per reply that has text,
        ordered by ``created_timestamp``. Empty when comments are disabled,
        the request fails or no reply has text.
    """
    if not true(twitter_comments):
        return []
    api_url = f"{API.FXTWITTER}/2/conversation/{post_id}?lang=zh-cn"
    logger.info(f"Get Twitter comments: {api_url}")
    headers = {"user-agent": TELEGRAM_UA}
    resp = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER, check_kv={"status.id": post_id})
    if resp.get("hx_error"):
        return []
    resp = trim(resp)
    comments = []
    replies = resp.get("replies", [])
    # .get() with a default keeps a reply without a timestamp from raising KeyError,
    # matching how the thread posts are sorted elsewhere in this file.
    for reply in sorted(replies, key=lambda x: x.get("created_timestamp", 0)):
        author = glom(reply, "author.name", default="Anonymous")
        tweet_url = glom(reply, "url", default="https://x.com")
        if text := glom(reply, Coalesce("text", "raw_text.text"), default=""):
            comments.append(f'<a href="{tweet_url}"><b>💬{author}:</b></a> {clean_handle(text)}')
    if comments:
        comments.insert(0, "<b>💬点击展开评论:</b>")
    return comments
361
362
async def summarize_twitter(message: Message, tweet: dict, article: str, media_list: list[dict], model: str) -> Message:
    """Build the summary sources for a tweet and attach the AI summary to ``message``.

    Args:
        message (Message): The sent message that should receive the summary link.
        tweet (dict): The API response; ``thread`` and ``status`` are read.
        article (str): HTML of the twitter article, empty for a normal tweet.
        media_list (list[dict]): Media entries; ``photo`` and ``video`` are read as paths.
        model (str): Model passed to ``summarize``.

    Returns:
        Message: The result of ``add_summary_url`` when a telegraph URL was
        produced and that call returned a value, otherwise ``message`` unchanged.
    """
    thread = tweet.get("thread", [])
    posts = []
    # Fallback for an empty thread; otherwise the author of the last post is used for the title.
    author = glom(tweet, "status.author.name", default="Anonymous")
    for post in sorted(thread, key=lambda x: x.get("created_timestamp", 0)):
        author = glom(post, "author.name", default="Anonymous")
        dt = ts_to_dt(post.get("created_timestamp")) or nowdt(TZ)
        date_str = f"{dt.strftime('%Y-%m-%d %H:%M:%S')}"
        text = article or glom(post, Coalesce("translation.text", "text"), default="")
        post_info = {"author": author, "date": date_str, "text": clean_handle(text)}
        if quote := post.get("quote"):
            quote_author = glom(quote, "author.name", default="Anonymous")
            quote_text = glom(quote, Coalesce("translation.text", "text"), default="")
            # Use a separate name: rebinding `article` here would replace the article HTML
            # parameter with the quote's dict for later posts and for the check below.
            if quote_article := quote.get("article"):
                title = quote_article.get("title", "Twitter Article")
                preview_text = quote_article.get("preview_text", "")
                quote_text = f"<h1>{title}</h1>\n{preview_text}"
            quote_dt = ts_to_dt(quote.get("created_timestamp")) or nowdt(TZ)
            quote_date_str = f"{quote_dt.strftime('%Y-%m-%d %H:%M:%S')}"
            post_info["quote_tweet"] = {"author": quote_author, "date": quote_date_str, "text": clean_handle(quote_text)}
        posts.append(post_info)

    summary_info: dict = {"platform": "Twitter / X"}
    if len(posts) > 1:
        summary_info["thread"] = posts
    elif len(posts) == 1:
        summary_info |= posts[0]

    sources = []
    min_text_length = 1000  # skip short tweets
    min_video_duration = None
    for media in media_list:
        if media.get("photo"):
            sources.append({"type": "image", "path": media["photo"]})
        if media.get("video"):
            # A video lifts the text-length limit and sets a minimum video duration instead.
            min_text_length = None
            # NOTE(review): the previous comment said "3 minutes"; 120 is 2 minutes if the unit is seconds - confirm.
            min_video_duration = 120  # skip short videos
            sources.append({"type": "video", "path": media["video"]})
    if article:
        min_text_length = None  # This is twitter article
        min_video_duration = None
    sources.append({"type": "text", "text": json.dumps(summary_info, ensure_ascii=False)})
    summary = await summarize(
        sources=sources,
        model=model,
        title=f"🕊{author}",
        author=glom(tweet, "status.author.name", default="Anonymous"),
        url=glom(tweet, "status.url", default="https://x.com"),
        date=ts_to_dt(glom(tweet, "status.created_timestamp", default=None)) or nowdt(TZ),
        min_text_length=min_text_length,
        min_video_duration=min_video_duration,
        max_video_duration=3600,  # skip long videos more than 1 hour
    )
    telegraph_url = summary.get("telegraph_url")
    if not telegraph_url:
        return message
    return await add_summary_url(telegraph_url, message) or message