main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import copy
  4import re
  5from datetime import UTC, datetime
  6from zoneinfo import ZoneInfo
  7
  8from glom import glom
  9from loguru import logger
 10from pyrogram.client import Client
 11from pyrogram.types import Message
 12
 13from bridge.social import send_to_social_media_bridge
 14from config import API, DB, PROVIDER, PROXY, TELEGRAM_UA, TOKEN, TZ, cache
 15from database.database import get_db
 16from messages.database import copy_messages_from_db, save_messages
 17from messages.progress import modify_progress
 18from messages.sender import send2tg
 19from messages.utils import blockquote, remove_img_tag, summay_media
 20from networking import download_file, download_media, flatten_rediercts, hx_req
 21from utils import convert_html, readable_count, remove_consecutive_newlines, remove_none_values, split_parts, true
 22
 23
 24class APIError(Exception):
 25    pass
 26
 27
 28async def preview_twitter(
 29    client: Client,
 30    message: Message,
 31    url: str = "",
 32    db_key: str = "",
 33    platform: str = "x",
 34    twitter_provider: str = PROVIDER.TWITTER,
 35    *,
 36    twitter_comments: bool = True,
 37    show_author: bool = True,
 38    show_pubdate: bool = True,
 39    show_device: bool = False,
 40    show_statistics: bool = True,
 41    **kwargs,
 42):
 43    """Preview twitter link in the message.
 44
 45    Args:
 46        client (Client): The Pyrogram client.
 47        message (Message): The trigger message object.
 48        url (str, optional): The twitter link.
 49        db_key (str, optional): The cache key.
 50        platform (str): The social media platform.
 51        twitter_provider (str): The extractor to use: fxtwitter or tikhub.
 52        twitter_comments (bool, optional): Add twitter comments. Defaults to True
 53    """
 54    if kwargs.get("show_progress") and "progress" not in kwargs:
 55        res = await send2tg(client, message, texts=f"🔗正在解析推特链接\n{url}", **kwargs)
 56        kwargs["progress"] = res[0]
 57
 58    if kv := await get_db(db_key):
 59        logger.debug(f"Twitter preview {DB.ENGINE} cache hit for key={db_key}")
 60        if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
 61            return
 62        await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)
 63    succ = False
 64    master_info = {}
 65    this_info = {}
 66    quote_info = {}
 67    if "tikhub" in twitter_provider:  # try tikhub first
 68        try:
 69            this_info = await get_tweet_info_via_tikhub(url=url, **kwargs)
 70            if not this_info:
 71                error = "❌[Tikhub]推特解析失败"
 72                await modify_progress(text=error, **kwargs)
 73                raise APIError(error)  # noqa: TRY301
 74            quote_info = await get_tweet_info_via_tikhub(quote_info=this_info["quote_info"], **kwargs) if this_info["has_quote"] else {}
 75            params = copy.deepcopy(kwargs)
 76            params.pop("post_id", None)
 77            master_info = await get_tweet_info_via_tikhub(post_id=this_info["master_thread_id"], **params) if this_info["has_master"] else {}
 78            succ = True
 79        except Exception as e:
 80            logger.warning(f"Twitter API [tikhub] failed: {e}")
 81    if not succ and "fxtwitter" in twitter_provider:  # try fxtwitter
 82        try:
 83            this_info = await get_tweet_info_via_fxtwitter(url=url)
 84            if not this_info:
 85                error = "❌[FxTwitter]推特解析失败"
 86                await modify_progress(text=error, **kwargs)
 87                raise APIError(error)  # noqa: TRY301
 88            master_info = await get_tweet_info_via_fxtwitter(handle=this_info["replying_to_user"], post_id=this_info["replying_post_id"]) if this_info["has_master"] else {}
 89            quote_info = await get_tweet_info_via_fxtwitter(quote_info=this_info["quote_info"]) if this_info["has_quote"] else {}
 90            succ = True
 91        except Exception as e:
 92            logger.warning(f"Twitter API [fxtwitter] failed: {e}")
 93
 94    if not succ and "vxtwitter" in twitter_provider:  # try vxtwitter
 95        try:
 96            this_info = await get_tweet_info_via_vxtwitter(url=url)
 97            if not this_info:
 98                error = "❌[VxTwitter]推特解析失败"
 99                await modify_progress(text=error, **kwargs)
100                raise APIError(error)  # noqa: TRY301
101            master_info = await get_tweet_info_via_vxtwitter(handle=this_info["replying_to_user"], post_id=this_info["replying_post_id"]) if this_info["has_master"] else {}
102            quote_info = await get_tweet_info_via_vxtwitter(quote_info=this_info["quote_info"]) if this_info["has_quote"] else {}
103            succ = True
104        except Exception as e:
105            logger.warning(f"Twitter API [vxtwitter] failed: {e}")
106
107    if not succ:
108        if "bridge" in twitter_provider:
109            await modify_progress(text="❌推特解析失败, 尝试第三方Bot...", **kwargs)
110            kwargs |= {"target_mid": message.id}
111            await send_to_social_media_bridge(client, message, url, platform, **kwargs)
112        return
113
114    media = []
115    media_ids = set()  # deduplicate media
116    master_media = []
117    for x in master_info.get("media", []):
118        if x["id"] in media_ids:
119            continue
120        media_ids.add(x["id"])
121        x[x["type"]] = download_file(x["url"], proxy=PROXY.TWITTER, **kwargs)
122        master_media.append(x)
123
124    this_media = []
125    for x in this_info.get("media", []):
126        if x["id"] in media_ids:
127            continue
128        media_ids.add(x["id"])
129        x[x["type"]] = download_file(x["url"], proxy=PROXY.TWITTER, **kwargs)
130        this_media.append(x)
131
132    quote_media = []
133    for x in quote_info.get("media", []):
134        if x["id"] in media_ids:
135            continue
136        media_ids.add(x["id"])
137        x[x["type"]] = download_file(x["url"], proxy=PROXY.TWITTER, **kwargs)
138        quote_media.append(x)
139    # 生成图片数量说明
140    n_media_this = len(this_media)
141    n_media_master = len(master_media) if this_info["has_master"] else 0
142    n_media_quote = len(quote_media) if this_info["has_quote"] else 0
143    part_strs = split_parts(n_media_master, n_media_this, n_media_quote)
144
145    msg = ""
146    master_handle = master_info.get("handle", "")
147    # 被回复主推
148    if master_info:
149        if true(show_author) and master_info.get("author"):
150            msg += f"\n🕊**[{master_info['author']}](https://x.com/{master_info['handle']}/status/{master_info['post_id']})**"
151        if true(show_pubdate) and master_info.get("time"):
152            msg += f"\n🕒{master_info['time']}"
153        if part_strs["first"]:
154            msg += f" {part_strs['first']}"
155        if true(show_device) and master_info.get("device"):
156            msg += f"📱{master_info['device']}"
157        if true(show_statistics) and master_info.get("statistics"):
158            msg += f"\n{master_info['statistics']}"
159        if texts := master_info.get("texts"):
160            msg += f"\n{texts}"
161        if true(twitter_comments) and (comments := master_info.get("comments")):
162            msg += f"\n{blockquote('💬**点此展开评论区**:')}"
163            for cmt in comments:
164                if str(cmt["post_id"]) == str(this_info["post_id"]):
165                    continue
166                full_cmt = f"💬**{cmt['author']}**: {cmt['text']}"
167                msg += f"\n{blockquote(full_cmt)}"
168        media.extend(master_media)
169
170    # 本条推文
171    media.extend(this_media)
172    if master_info:
173        msg += "\n⤴️"
174    if true(show_author) and this_info.get("author"):
175        msg += f"\n🕊**[{this_info['author']}]({url})**"
176        msg = msg.replace("\n⤴️\n🕊", "\n⤴️")
177    if true(show_pubdate) and this_info.get("time"):
178        msg += f"\n🕒{this_info['time']}"
179    if part_strs["middle"] and (this_info["has_master"] or this_info["has_quote"]):  # 当有supp_info时, 附加图片数量说明
180        msg += f" {part_strs['middle']}"
181    if true(show_device) and this_info.get("device"):
182        msg += f"📱{this_info['device']}"
183    if true(show_statistics) and this_info.get("statistics"):
184        msg += f"\n{this_info['statistics']}"
185
186    if texts := this_info.get("texts"):
187        msg += f"\n{texts}"
188
189    if true(twitter_comments) and (comments := this_info.get("comments")):
190        msg += f"\n{blockquote('💬**点此展开评论区**:')}"
191        for cmt in comments:
192            cmt_texts = cmt["text"].strip().removeprefix(f"@{master_handle}").strip()  # 有时回推的comment前会附带被回推的handle, 这里去掉
193            full_cmt = f"💬**{cmt['author']}**: {cmt_texts}"
194            msg += f"\n{blockquote(full_cmt)}"
195
196    # 引用推文
197    if quote_info:
198        # 有时候引用推文时会在正文末尾附带引推链接, 这里去掉
199        quote_x_url = f"https://x.com/{quote_info.get('handle', '')}/status/{quote_info.get('post_id', '')}"
200        msg = remove_twitter_suffix(msg, post_id=quote_info["post_id"], same_id_only=True)
201        msg += "\n//"
202        if true(show_author) and quote_info.get("author"):
203            msg += f"\n🕊**[{quote_info['author']}]({quote_x_url})**"
204            msg = msg.replace("\n//\n", "\n//")
205        if true(show_pubdate) and quote_info.get("time"):
206            msg += f"\n🕒{quote_info['time']}"
207        if part_strs["last"]:
208            msg += f" {part_strs['last']}"
209        if true(show_device) and quote_info.get("device"):
210            msg += f"📱{quote_info['device']}"
211        if true(show_statistics) and quote_info.get("statistics"):
212            msg += f"\n{quote_info['statistics']}"
213
214        if texts := quote_info.get("texts"):
215            msg += f"\n{texts}"
216        media.extend(quote_media)
217
218    await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
219    media = await download_media(media, **kwargs)
220    sent_messages = await send2tg(client, message, texts=msg.strip(), media=media, **kwargs)
221    await modify_progress(del_status=True, **kwargs)
222    await save_messages(messages=sent_messages, key=db_key)
223
224
@cache.memoize(ttl=30)
async def get_tweet_info_via_tikhub(url: str = "", post_id: str = "", quote_info: dict | None = None, **kwargs) -> dict:
    """Get a single tweet info via the TikHub API.

    url: https://x.com/{handle}/status/{post_id}

    Args:
        url (str): The tweet link; its last path segment is used when ``post_id`` is empty.
        post_id (str): The tweet id to fetch.
        quote_info (dict | None): Raw quoted-tweet payload taken from a previous result.
            When given, no request is made and this payload is parsed instead.
        **kwargs: Forwarded to ``modify_progress``.

    Returns:
        dict: ``handle``, ``post_id``, ``media``, ``author``, ``texts``, ``has_master``,
        ``statistics``, ``comments``, ``quote_info`` and ``has_quote``; plus ``time``
        when the payload has ``created_at`` and ``master_thread_id`` when
        ``has_master`` is true. An empty dict when the request failed.
    """

    def as_int(value) -> int:
        """Convert an id to int; missing or malformed ids become 0 instead of raising."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return 0

    if not post_id:
        post_id = url.rsplit("/", maxsplit=1)[-1]
    data = {}

    if quote_info:  # quote_info is directly parsed from the this_info
        data = copy.deepcopy(quote_info)
        post_id = quote_info.get("tweet_id", "")
        data["id"] = post_id
        await modify_progress(text="✅正在解析引用推文...", **kwargs)
    else:
        api_url = f"{API.TIKHUB_TWITTER}{post_id}"
        logger.info(f"Twitter preview via TikHub: {api_url}")
        headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
        resp = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER, check_keys=["data.author.screen_name"], check_kv={"data.id": post_id})
        # default=None: a response without the author path is a failed lookup, not an exception
        if resp.get("hx_error") or glom(resp, "data.author.screen_name", default=None) is None:
            logger.error("Failed to get tweet info via TikHub")
            return {}
        data = resp["data"]
        await modify_progress(text=f"✅推文{post_id}解析成功, 正在处理...", **kwargs)
    data = remove_none_values(data)
    handle = glom(data, "author.screen_name", default="") or ""
    post_id = glom(data, "id", default=post_id) or post_id
    info = {"handle": handle, "post_id": post_id}

    # API old style
    media_info = glom(data, "media", default={}) or {}
    if not isinstance(media_info, dict):
        # anything but a mapping cannot be read with the old-style keys below
        media_info = {}
    # the master thread media may be repeated in the reply tweet
    # so we do not download the media file here but record media "id" for de-duplication
    media = [{"type": "photo", "url": photo.get("media_url_https", ""), "id": photo.get("id", "0")} for photo in media_info.get("photo", [])]
    for video in media_info.get("video", []):
        mp4_variants = [v for v in video.get("variants", []) if "mp4" in v.get("content_type", "")]
        if mp4_variants:
            best = max(mp4_variants, key=lambda v: v.get("bitrate", 0))  # highest bitrate mp4
            media.append({"type": "video", "url": best["url"], "id": video.get("id", "0")})
    # API new style
    if not media:
        for entity in glom(data, "entities.media", default=[]) or []:
            entity_type = entity.get("type", "")
            if entity_type == "video":
                variants = glom(entity, "video_info.variants", default=[]) or []
                mp4_variants = [v for v in variants if "mp4" in v.get("content_type", "")]
                # a video that offers no mp4 variant is skipped instead of raising IndexError
                if mp4_variants:
                    best = max(mp4_variants, key=lambda v: v.get("bitrate", 0))
                    media.append({"type": "video", "url": best["url"], "id": entity.get("id_str", "0")})
            elif entity_type == "photo":
                media.append({"type": "photo", "url": entity.get("media_url_https", ""), "id": entity.get("id_str", "0")})

    info["media"] = media
    info["author"] = glom(data, "author.name", default="") or ""
    if date_string := glom(data, "created_at", default=""):
        dt = datetime.strptime(date_string, "%a %b %d %H:%M:%S %z %Y").astimezone(ZoneInfo(TZ))
        info["time"] = f"{dt:%Y-%m-%d %H:%M:%S}"
    texts = await remove_tco_suffix(glom(data, "text", default="") or "", post_id=post_id)
    texts = await flatten_rediercts(texts)
    info["texts"] = texts

    conversation_id = glom(data, "conversation_id", default="0") or "0"
    conversation_key = as_int(conversation_id)
    post_key = as_int(post_id)
    # A missing conversation id (0) means no master thread is known; it must not be
    # reported as a master tweet with id "0".
    info["has_master"] = bool(conversation_key) and conversation_key != post_key
    if info["has_master"]:
        info["master_thread_id"] = conversation_id

    # parse comments: same conversation, excluding this tweet, oldest id first
    threads = glom(data, "thread", default=[]) or []
    threads = [t for t in threads if as_int(t.get("conversation_id", "0")) == conversation_key and as_int(t.get("id", "0")) != post_key]
    threads.sort(key=lambda t: as_int(t.get("id", "0")))  # numeric, not lexicographic
    comments = []
    for node in threads:
        comment_handle = glom(node, "author.screen_name", default="")
        if comment_post_id := node.get("id", ""):
            comment_author = f"[{comment_handle}](https://x.com/{comment_handle}/status/{comment_post_id})"
        else:
            comment_author = f"[{comment_handle}](https://x.com/{comment_handle})"
        comment_text = node.get("text", "") or ""
        if handle:  # with an empty handle the prefix would be a bare "@" and eat a real mention
            comment_text = comment_text.removeprefix(f"@{handle}")
        comment_text = re.sub(r"https?://t\.co/\w+$", "", comment_text)  # remove t.co link suffix
        comment_text = await remove_tco_suffix(comment_text, post_id=node.get("id", ""))
        comment_text = await flatten_rediercts(comment_text)
        comment_text = comment_text.strip()
        if comment_handle and comment_text:
            comments.append({"author": comment_author, "text": comment_text, "post_id": comment_post_id})

    statistics = ""
    if view := glom(data, "views", default=0):
        statistics += f"👁{readable_count(view)}"
    if like := glom(data, "likes", default=0):
        statistics += f"❤️{readable_count(like)}"
    if comment := glom(data, "replies", default=0):
        statistics += f"💬{readable_count(comment)}"
    if share := glom(data, "retweets", default=0):
        statistics += f"🔁{readable_count(share)}"
    info["statistics"] = statistics
    info["comments"] = comments
    info["quote_info"] = glom(data, "quoted", default={}) or {}
    info["has_quote"] = bool(info["quote_info"])
    return info
325
326
@cache.memoize(ttl=30)
async def get_tweet_info_via_fxtwitter(url: str = "", handle: str = "", post_id: str = "", quote_info: dict | None = None) -> dict:
    """Get a single tweet info via the FxTwitter API.

    url: https://x.com/{handle}/status/{post_id}

    Args:
        url (str): The tweet link; used to derive ``handle`` and ``post_id`` when
            either of them is missing.
        handle (str): The author's handle.
        post_id (str): The tweet id.
        quote_info (dict | None): Raw quoted-tweet payload taken from a previous
            result. When given, no request is made and this payload is parsed instead.

    Returns:
        dict: ``handle``, ``post_id``, ``statistics``, ``media``, ``author``, ``texts``,
        ``device``, ``replying_to_user``, ``replying_post_id``, ``quote_info``,
        ``has_master`` and ``has_quote``; plus ``time`` when the payload has
        ``created_timestamp``. An empty dict when the tweet could not be fetched.
    """
    data = {}
    if quote_info:
        data = copy.deepcopy(quote_info)
        # the handle ends up in URLs, so fall back to the screen name rather than the display name
        handle = glom(data, "author.screen_name", default="")
        post_id = data.get("id", "")
    else:
        if not handle or not post_id:
            parts = url.split("/")
            if len(parts) < 3:
                # nothing to derive the handle / id from: report a failed lookup instead of raising
                logger.error("Failed to get tweet info via fxtwitter")
                return {}
            handle = parts[-3]
            post_id = parts[-1]
        api_url = f"{API.FXTWITTER}/{handle}/status/{post_id}"
        logger.info(f"Twitter preview via fxtwitter: {api_url}")
        headers = {"user-agent": TELEGRAM_UA}
        resp = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER)
        if resp.get("hx_error") or str(glom(resp, "tweet.id", default="")) != str(post_id):
            logger.error("Failed to get tweet info via fxtwitter")
            return {}
        data = resp["tweet"]

    if data.get("article"):
        # parse_article() returns "text" and "media" keys, which replace the tweet's own here
        data |= parse_article(data["article"])

    info = {"handle": glom(data, "author.screen_name", default=handle), "post_id": data.get("id", post_id)}
    media = glom(data, "media.all", default=[]) or []
    for item in media:
        if item.get("type", "") == "video" and "mp4" not in item.get("format", ""):  # this is a m3u8 url, choose mp4 instead
            m3u8_url = item.get("url", "")
            mp4_variants = [v for v in item.get("variants", []) if "mp4" in v.get("content_type", "")]
            mp4_url = max(mp4_variants, key=lambda v: v.get("bitrate", 0))["url"] if mp4_variants else ""
            item["url"] = mp4_url or m3u8_url
        if item.get("type", "") == "gif":
            item["type"] = "video"
        item["id"] = item["url"]  # record media "id" for de-duplication

    statistics = ""
    if view := glom(data, "views", default=0):
        statistics += f"👁{readable_count(view)}"
    if like := glom(data, "likes", default=0):
        statistics += f"❤️{readable_count(like)}"
    if comment := glom(data, "replies", default=0):
        statistics += f"💬{readable_count(comment)}"
    if share := glom(data, "retweets", default=0):
        statistics += f"🔁{readable_count(share)}"
    info["statistics"] = statistics
    info["media"] = media
    info["author"] = glom(data, "author.name", default="")
    if ts := data.get("created_timestamp", ""):
        dt = datetime.fromtimestamp(round(float(ts)), tz=UTC).astimezone(ZoneInfo(TZ))
        info["time"] = f"{dt:%Y-%m-%d %H:%M:%S}"
    info["texts"] = data.get("text") or ""
    # `or ""`: an explicit null "source" must not crash the string clean-up below
    source = data.get("source") or ""
    # "Twitter for iPhone" -> "iPhone"; "Twitter Web App" -> ""
    info["device"] = source.removeprefix("Twitter for").removeprefix("Twitter").removesuffix("App").strip().removesuffix("Web")
    info["replying_to_user"] = data.get("replying_to", "")
    info["replying_post_id"] = data.get("replying_to_status", "")
    info["quote_info"] = data.get("quote") or {}
    info["has_master"] = bool(data.get("replying_to"))
    info["has_quote"] = bool(info["quote_info"])
    return info
390
391
@cache.memoize(ttl=30)
async def get_tweet_info_via_vxtwitter(url: str = "", handle: str = "", post_id: str = "", quote_info: dict | None = None) -> dict:
    """Get a single tweet info via the VxTwitter API.

    url: https://x.com/{handle}/status/{post_id}

    Args:
        url (str): The tweet link; used to derive ``handle`` and ``post_id`` when
            either of them is missing.
        handle (str): The author's handle (fallback only; the request uses ``post_id``).
        post_id (str): The tweet id.
        quote_info (dict | None): Raw quoted-tweet payload taken from a previous
            result. When given, no request is made and this payload is parsed instead.

    Returns:
        dict: ``handle``, ``post_id``, ``statistics``, ``media``, ``author``, ``texts``,
        ``device``, ``replying_to_user``, ``replying_post_id``, ``quote_info``,
        ``has_master`` and ``has_quote``; plus ``time`` when the payload has
        ``date_epoch``. An empty dict when the tweet could not be fetched.
    """
    data = {}
    if quote_info:
        data = copy.deepcopy(quote_info)
        handle = data.get("user_screen_name", "")
        post_id = data.get("tweetID", "")
    else:
        if not handle or not post_id:
            parts = url.split("/")
            if len(parts) < 3:
                # nothing to derive the handle / id from: report a failed lookup instead of raising
                logger.error("Failed to get tweet info via vxtwitter")
                return {}
            handle = parts[-3]
            post_id = parts[-1]
        api_url = f"{API.VXTWITTER}/Twitter/status/{post_id}"
        logger.info(f"Twitter preview via vxtwitter: {api_url}")
        headers = {"user-agent": TELEGRAM_UA}
        data = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER, check_kv={"tweetID": post_id})
        if data.get("hx_error"):
            logger.error("Failed to get tweet info via vxtwitter")
            return {}
        if data.get("retweet"):
            # a retweet is previewed as the tweet it wraps
            data = data["retweet"]
    # The quote branch above and the author line below read `user_*` keys, so the
    # handle is looked up under `user_screen_name` first; `screen_name` is kept as a
    # fallback. This matters after unwrapping a retweet, where the handle parsed from
    # the url belongs to the retweeter.
    resolved_handle = data.get("user_screen_name") or data.get("screen_name") or handle
    info = {"handle": resolved_handle, "post_id": data.get("tweetID", post_id)}
    media = data.get("media_extended") or []
    for item in media:
        item["id"] = item.get("url", "")  # record media "id" for de-duplication
        if item.get("type", "") == "image":  # change `image` -> `photo`
            item["type"] = "photo"
        if item.get("type", "") == "gif":
            item["type"] = "video"
    statistics = ""
    if view := glom(data, "views", default=0):
        statistics += f"👁{readable_count(view)}"
    if like := glom(data, "likes", default=0):
        statistics += f"❤️{readable_count(like)}"
    if comment := glom(data, "replies", default=0):
        statistics += f"💬{readable_count(comment)}"
    if share := glom(data, "retweets", default=0):
        statistics += f"🔁{readable_count(share)}"
    info["statistics"] = statistics
    info["media"] = media
    info["author"] = data.get("user_name", f"@{info['handle']}")
    if ts := data.get("date_epoch", 0):
        dt = datetime.fromtimestamp(round(float(ts)), tz=UTC).astimezone(ZoneInfo(TZ))
        info["time"] = f"{dt:%Y-%m-%d %H:%M:%S}"
    info["texts"] = data.get("text") or ""
    # `or ""`: an explicit null "source" must not crash the string clean-up below
    source = data.get("source") or ""
    # "Twitter for iPhone" -> "iPhone"; "Twitter Web App" -> ""
    info["device"] = source.removeprefix("Twitter for").removeprefix("Twitter").removesuffix("App").strip().removesuffix("Web")
    info["replying_to_user"] = data.get("replyingTo", "")
    info["replying_post_id"] = data.get("replyingToID", "")
    info["quote_info"] = data.get("qrt") or {}
    info["has_master"] = bool(data.get("replyingTo"))
    info["has_quote"] = bool(data.get("qrt"))
    return info
447
448
def remove_twitter_suffix(text: str, post_id: str = "", *, same_id_only: bool = True) -> str:
    """Remove a trailing tweet link from the text.

    Some tweets end with a twitter/x/fxtwitter/fixupx status link (for example a
    link to the tweet being quoted). The link is only considered when it is the
    very last thing in the stripped text.

    Args:
        text (str): The tweet text.
        post_id (str): The tweet id the trailing link is compared against.
        same_id_only (bool): When True, remove the link only if it points to
            ``post_id``. When False, remove any trailing status link.

    Returns:
        str: The stripped text, without the trailing link when it was removed.
    """
    text = str(text).strip()

    matched = re.search(r"https?://(?:twitter|x|fxtwitter|fixupx)\.com/\w+/status/(\d+)$", text)
    if not matched:
        return text

    # An empty post_id never equals a matched id, so nothing is removed in that case.
    if same_id_only and str(post_id) != matched.group(1):
        return text

    return text.removesuffix(matched.group(0)).strip()
472
473
async def remove_tco_suffix(text: str, post_id: str = "") -> str:
    """Remove a trailing t.co link that redirects back to the tweet itself.

    Some tweets end with a t.co link in TikHub parsed info (this is a bug of TikHub).
    The t.co link may be a redirect link to the tweet itself. The link is resolved,
    and removed only when it resolves to a status link whose id equals ``post_id``.

    Args:
        text (str): The text to be parsed.
        post_id (str): The tweet id the text belongs to.

    Returns:
        str: The stripped text, without the trailing t.co link when it was removed.
    """
    text = str(text).strip()
    # not end with t.co link, do nothing
    matched = re.search(r"https?://t\.co/\w+$", text)
    if not matched:
        return text

    # t.co at the end of the text
    t_co_url: str = matched.group(0)

    # parse t.co redirect
    raw_url = (await flatten_rediercts(t_co_url)) or ""

    # check if the redirect url is a twitter link the same with post_id
    target = re.search(r"https?://(?:twitter|x|fxtwitter|fixupx)\.com/\w+/status/(\d+)", raw_url)

    # Both sides must be present: an empty post_id compared with "no tweet link found"
    # must not count as a match, otherwise unrelated links would be stripped.
    if target and post_id and str(post_id) == target.group(1):
        return text.removesuffix(t_co_url).strip()

    return text
504
505
506def parse_article(article: dict) -> dict:
507    def inline_style(text: str, styles: list[dict]) -> str:
508        """处理内联样式 (加粗、斜体等字符级格式).
509
510        使用前后缀注入法, 避免直接修改字符串导致 Index 偏移
511        """
512        if not text.strip():
513            return ""
514        styles = styles or []
515        text_len = len(text)
516        prefixes = {i: [] for i in range(text_len + 1)}
517        suffixes = {i: [] for i in range(text_len + 1)}
518        for style in styles:
519            style_ = style["style"].lower()
520            start = style["offset"]
521            end = start + style["length"]
522            tag = ""
523            if style_ == "bold":
524                tag = "**"
525            elif style_ == "italic":
526                tag = "*"
527            if tag:
528                prefixes[start].append(tag)
529                suffixes[end].insert(0, tag)  # 使用 insert(0) 确保闭合标签以正确的嵌套顺序反向闭合
530
531        formatted_text = ""
532        for i in range(text_len + 1):
533            formatted_text += "".join(suffixes[i])  # 先闭合
534            formatted_text += "".join(prefixes[i])  # 再开启
535            if i < text_len:
536                formatted_text += text[i]
537        return formatted_text
538
539    def parse_atomic(entities: list[dict]) -> str:
540        """Parse atomic block."""
541        if not entities:
542            return ""
543        texts = ""
544        for x in entities:
545            if entity := entity_dict.get(str(x["key"])):
546                e_type = entity.get("type", "").upper()
547                if e_type == "MEDIA":
548                    media_id = glom(entity, "data.mediaItems.0.mediaId", default="")
549                    if img_url := media_dict.get(str(media_id)):
550                        texts += f"![IMG-{media_id}]({img_url})"
551                elif e_type == "DIVIDER":
552                    texts += "\n"
553                elif e_type == "TWEET":
554                    if tweet_id := glom(entity, "data.tweetId", default=""):
555                        texts += f"[QuoteTweet](https://x.com/i/status/{tweet_id})"
556                elif e_type == "MARKDOWN":
557                    texts += glom(entity, "data.markdown", default="")
558        return texts
559
560    markdown = ""
561    if title := article.get("title"):
562        markdown += f"\n\n# {title}"
563    if cover_url := glom(article, "cover_media.media_info.original_img_url", default=""):
564        markdown += f"\n\n![cover]({cover_url})"
565
566    media_dict: dict = {}  # {media_id: media_url}  # currently, articles in X only support images
567    for media in article.get("media_entities", []):
568        media_dict[str(media.get("media_id"))] = glom(media, "media_info.original_img_url", default="")
569
570    entity_map = glom(article, "content.entityMap", default={})
571    entity_dict = {str(x["key"]): x["value"] for x in entity_map} if isinstance(entity_map, list) else {str(k): v for k, v in entity_map.items()}
572
573    # blocks
574    for block in glom(article, "content.blocks", default=[]):
575        text = inline_style(block.get("text"), block.get("inlineStyleRanges"))
576        entities = block.get("entityRanges", [])
577        match block.get("type"):
578            case "header-one" | "header-two" | "header-three" | "header-four":
579                markdown += f"\n\n**{text}**"
580            case "blockquote":
581                markdown += f"\n\n> {text}"
582            case "ordered-list-item" | "unordered-list-item":
583                markdown += f"\n\n{text}"
584            case "atomic":
585                markdown += f"\n\n{parse_atomic(entities)}"
586            case _:
587                markdown += f"\n\n{text}" if text else ""
588
589    markdown_no_img, image_urls = remove_img_tag(markdown)
590    return {
591        "markdown": remove_consecutive_newlines(markdown).strip(),
592        "text": remove_consecutive_newlines(markdown_no_img).strip(),
593        "image_urls": image_urls,
594        "html": convert_html(markdown),
595        "media": {"all": [{"url": url, "type": "photo"} for url in image_urls]},
596    }