#!/usr/bin/env python
# -*- coding: utf-8 -*-
import copy
import re
from datetime import UTC, datetime
from zoneinfo import ZoneInfo

from glom import glom
from loguru import logger
from pyrogram.client import Client
from pyrogram.types import Message

from bridge.social import send_to_social_media_bridge
from config import API, DB, PROVIDER, PROXY, TELEGRAM_UA, TOKEN, TZ, cache
from database.database import get_db
from messages.database import copy_messages_from_db, save_messages
from messages.progress import modify_progress
from messages.sender import send2tg
from messages.utils import blockquote, remove_img_tag, summay_media
from networking import download_file, download_media, flatten_rediercts, hx_req
from utils import convert_html, readable_count, remove_consecutive_newlines, remove_none_values, split_parts, true


class APIError(Exception):
    pass


async def preview_twitter(
    client: Client,
    message: Message,
    url: str = "",
    db_key: str = "",
    platform: str = "x",
    twitter_provider: str = PROVIDER.TWITTER,
    *,
    twitter_comments: bool = True,
    show_author: bool = True,
    show_pubdate: bool = True,
    show_device: bool = False,
    show_statistics: bool = True,
    **kwargs,
):
43 """Preview twitter link in the message.
44
45 Args:
46 client (Client): The Pyrogram client.
47 message (Message): The trigger message object.
48 url (str, optional): The twitter link.
49 db_key (str, optional): The cache key.
50 platform (str): The social media platform.
51 twitter_provider (str): The extractor to use: fxtwitter or tikhub.
52 twitter_comments (bool, optional): Add twitter comments. Defaults to True
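
    Example (sketch; assumes a live Client and a trigger Message from a handler, with made-up ids):
        await preview_twitter(client, message, url="https://x.com/user/status/123", db_key="x:123")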
53 """
54 if kwargs.get("show_progress") and "progress" not in kwargs:
55 res = await send2tg(client, message, texts=f"🔗正在解析推特链接\n{url}", **kwargs)
56 kwargs["progress"] = res[0]
57
58 if kv := await get_db(db_key):
59 logger.debug(f"Twitter preview {DB.ENGINE} cache hit for key={db_key}")
60 if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
61 return
62 await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)
    succ = False
    master_info = {}
    this_info = {}
    quote_info = {}
    if "tikhub" in twitter_provider:  # try tikhub first
        try:
            this_info = await get_tweet_info_via_tikhub(url=url, **kwargs)
            if not this_info:
                error = "❌[Tikhub]推特解析失败"
                await modify_progress(text=error, **kwargs)
                raise APIError(error)  # noqa: TRY301
            quote_info = await get_tweet_info_via_tikhub(quote_info=this_info["quote_info"], **kwargs) if this_info["has_quote"] else {}
            params = copy.deepcopy(kwargs)
            params.pop("post_id", None)
            master_info = await get_tweet_info_via_tikhub(post_id=this_info["master_thread_id"], **params) if this_info["has_master"] else {}
            succ = True
        except Exception as e:
            logger.warning(f"Twitter API [tikhub] failed: {e}")
    if not succ and "fxtwitter" in twitter_provider:  # try fxtwitter
        try:
            this_info = await get_tweet_info_via_fxtwitter(url=url)
            if not this_info:
                error = "❌[FxTwitter]推特解析失败"
                await modify_progress(text=error, **kwargs)
                raise APIError(error)  # noqa: TRY301
            master_info = await get_tweet_info_via_fxtwitter(handle=this_info["replying_to_user"], post_id=this_info["replying_post_id"]) if this_info["has_master"] else {}
            quote_info = await get_tweet_info_via_fxtwitter(quote_info=this_info["quote_info"]) if this_info["has_quote"] else {}
            succ = True
        except Exception as e:
            logger.warning(f"Twitter API [fxtwitter] failed: {e}")

    if not succ and "vxtwitter" in twitter_provider:  # try vxtwitter
        try:
            this_info = await get_tweet_info_via_vxtwitter(url=url)
            if not this_info:
                error = "❌[VxTwitter]推特解析失败"
                await modify_progress(text=error, **kwargs)
                raise APIError(error)  # noqa: TRY301
            master_info = await get_tweet_info_via_vxtwitter(handle=this_info["replying_to_user"], post_id=this_info["replying_post_id"]) if this_info["has_master"] else {}
            quote_info = await get_tweet_info_via_vxtwitter(quote_info=this_info["quote_info"]) if this_info["has_quote"] else {}
            succ = True
        except Exception as e:
            logger.warning(f"Twitter API [vxtwitter] failed: {e}")

    if not succ:
        if "bridge" in twitter_provider:
            await modify_progress(text="❌推特解析失败, 尝试第三方Bot...", **kwargs)
            kwargs |= {"target_mid": message.id}
            await send_to_social_media_bridge(client, message, url, platform, **kwargs)
        return

    media = []
    media_ids = set()  # deduplicate media
    master_media = []
    for x in master_info.get("media", []):
        if x["id"] in media_ids:
            continue
        media_ids.add(x["id"])
        x[x["type"]] = download_file(x["url"], proxy=PROXY.TWITTER, **kwargs)
        master_media.append(x)

    this_media = []
    for x in this_info.get("media", []):
        if x["id"] in media_ids:
            continue
        media_ids.add(x["id"])
        x[x["type"]] = download_file(x["url"], proxy=PROXY.TWITTER, **kwargs)
        this_media.append(x)

    quote_media = []
    for x in quote_info.get("media", []):
        if x["id"] in media_ids:
            continue
        media_ids.add(x["id"])
        x[x["type"]] = download_file(x["url"], proxy=PROXY.TWITTER, **kwargs)
        quote_media.append(x)
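    # Note: download_file is not awaited here; the pending download handle is stored on the
    # media item (keyed by its type) and presumably resolved later by download_media below.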
    # Build the media-count labels
    n_media_this = len(this_media)
    n_media_master = len(master_media) if this_info["has_master"] else 0
    n_media_quote = len(quote_media) if this_info["has_quote"] else 0
    part_strs = split_parts(n_media_master, n_media_this, n_media_quote)
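    # part_strs maps "first"/"middle"/"last" to the count labels for the master, current,
    # and quoted tweets' media (the exact label format is whatever split_parts produces)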

    msg = ""
    master_handle = master_info.get("handle", "")
    # The master tweet being replied to
    if master_info:
        if true(show_author) and master_info.get("author"):
            msg += f"\n🕊**[{master_info['author']}](https://x.com/{master_info['handle']}/status/{master_info['post_id']})**"
        if true(show_pubdate) and master_info.get("time"):
            msg += f"\n🕒{master_info['time']}"
        if part_strs["first"]:
            msg += f" {part_strs['first']}"
        if true(show_device) and master_info.get("device"):
            msg += f"📱{master_info['device']}"
        if true(show_statistics) and master_info.get("statistics"):
            msg += f"\n{master_info['statistics']}"
        if texts := master_info.get("texts"):
            msg += f"\n{texts}"
        if true(twitter_comments) and (comments := master_info.get("comments")):
            msg += f"\n{blockquote('💬**点此展开评论区**:')}"
            for cmt in comments:
                if str(cmt["post_id"]) == str(this_info["post_id"]):
                    continue
                full_cmt = f"💬**{cmt['author']}**: {cmt['text']}"
                msg += f"\n{blockquote(full_cmt)}"
        media.extend(master_media)

    # The current tweet
    media.extend(this_media)
    if master_info:
        msg += "\n⤴️"
    if true(show_author) and this_info.get("author"):
        msg += f"\n🕊**[{this_info['author']}]({url})**"
        msg = msg.replace("\n⤴️\n🕊", "\n⤴️")
    if true(show_pubdate) and this_info.get("time"):
        msg += f"\n🕒{this_info['time']}"
    if part_strs["middle"] and (this_info["has_master"] or this_info["has_quote"]):  # append the media-count label when there is supplementary info
        msg += f" {part_strs['middle']}"
    if true(show_device) and this_info.get("device"):
        msg += f"📱{this_info['device']}"
    if true(show_statistics) and this_info.get("statistics"):
        msg += f"\n{this_info['statistics']}"

    if texts := this_info.get("texts"):
        msg += f"\n{texts}"

    if true(twitter_comments) and (comments := this_info.get("comments")):
        msg += f"\n{blockquote('💬**点此展开评论区**:')}"
        for cmt in comments:
            cmt_texts = cmt["text"].strip().removeprefix(f"@{master_handle}").strip()  # a reply sometimes starts with the replied-to handle; drop it
            full_cmt = f"💬**{cmt['author']}**: {cmt_texts}"
            msg += f"\n{blockquote(full_cmt)}"

    # Quoted tweet
    if quote_info:
        # A quoting tweet sometimes appends the quoted tweet's link at the end of its text; drop it
        quote_x_url = f"https://x.com/{quote_info.get('handle', '')}/status/{quote_info.get('post_id', '')}"
        msg = remove_twitter_suffix(msg, post_id=quote_info["post_id"], same_id_only=True)
        msg += "\n//"
        if true(show_author) and quote_info.get("author"):
            msg += f"\n🕊**[{quote_info['author']}]({quote_x_url})**"
            msg = msg.replace("\n//\n", "\n//")
        if true(show_pubdate) and quote_info.get("time"):
            msg += f"\n🕒{quote_info['time']}"
        if part_strs["last"]:
            msg += f" {part_strs['last']}"
        if true(show_device) and quote_info.get("device"):
            msg += f"📱{quote_info['device']}"
        if true(show_statistics) and quote_info.get("statistics"):
            msg += f"\n{quote_info['statistics']}"

        if texts := quote_info.get("texts"):
            msg += f"\n{texts}"
        media.extend(quote_media)

    await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
    media = await download_media(media, **kwargs)
    sent_messages = await send2tg(client, message, texts=msg.strip(), media=media, **kwargs)
    await modify_progress(del_status=True, **kwargs)
    await save_messages(messages=sent_messages, key=db_key)


@cache.memoize(ttl=30)
async def get_tweet_info_via_tikhub(url: str = "", post_id: str = "", quote_info: dict | None = None, **kwargs) -> dict:
    """Get a single tweet info.

    url: https://x.com/{handle}/status/{post_id}
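
    Returns a dict shaped roughly like the following (keys taken from the parsing below; values illustrative):
        {"handle": "user", "post_id": "123", "media": [...], "author": "...", "time": "...",
         "texts": "...", "statistics": "👁...❤️...", "comments": [...], "has_master": False,
         "quote_info": {...}, "has_quote": False}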
230 """
231 if not post_id:
232 post_id = url.rsplit("/", maxsplit=1)[-1]
233 api_url = f"{API.TIKHUB_TWITTER}{post_id}"
234 logger.info(f"Twitter preview via TikHub: {api_url}")
235 data = {}
236
237 if quote_info: # quote_info is directly parsed from the this_info
238 data = copy.deepcopy(quote_info)
239 post_id = quote_info.get("tweet_id", "")
240 data["id"] = post_id
241 await modify_progress(text="✅正在解析引用推文...", **kwargs)
242 else:
243 headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
244 resp = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER, check_keys=["data.author.screen_name"], check_kv={"data.id": post_id})
245 if resp.get("hx_error") or glom(resp, "data.author.screen_name") is None:
246 logger.error("Failed to get tweet info via TikHub")
247 return {}
248 data: dict = resp["data"]
249 await modify_progress(text=f"✅推文{post_id}解析成功, 正在处理...", **kwargs)
250 data = remove_none_values(data)
251 handle = glom(data, "author.screen_name", default="") or ""
252 post_id = glom(data, "id", default=post_id) or post_id
253 info = {"handle": handle, "post_id": post_id}

    # API old style
    media_info = glom(data, "media", default={}) or {}
    # The master thread's media may be repeated in the reply tweet,
    # so we don't download the files here; we only record each media "id" for de-duplication.
    media = [{"type": "photo", "url": x.get("media_url_https", ""), "id": x.get("id", "0")} for x in media_info.get("photo", [])]
    for x in media_info.get("video", []):
        if variants := [v for v in x.get("variants", []) if "mp4" in v.get("content_type", "")]:
            mp4_url = sorted(variants, key=lambda v: v.get("bitrate", 0), reverse=True)[0]["url"]
            media.append({"type": "video", "url": mp4_url, "id": x.get("id", "0")})
    # API new style
    if not media:
        entities = glom(data, "entities.media", default=[])
        for entity in entities:
            if entity.get("type", "") == "video":
                variants = [v for v in glom(entity, "video_info.variants", default=[]) if "mp4" in v.get("content_type", "")]
                if variants:
                    mp4_url = sorted(variants, key=lambda v: v.get("bitrate", 0), reverse=True)[0]["url"]
                    media.append({"type": "video", "url": mp4_url, "id": entity.get("id_str", "0")})
            elif entity.get("type", "") == "photo":
                media.append({"type": "photo", "url": entity.get("media_url_https", ""), "id": entity.get("id_str", "0")})

    info["media"] = media
    info["author"] = glom(data, "author.name", default="") or ""
    if date_string := glom(data, "created_at", default=""):
        dt = datetime.strptime(date_string, "%a %b %d %H:%M:%S %z %Y").astimezone(ZoneInfo(TZ))
        info["time"] = f"{dt:%Y-%m-%d %H:%M:%S}"
    texts = await remove_tco_suffix(glom(data, "text", default="") or "", post_id=post_id)
    texts = await flatten_rediercts(texts)
    info["texts"] = texts

    conversation_id = glom(data, "conversation_id", default="0") or "0"
    if int(conversation_id) != int(post_id or 0):
        info["has_master"] = True
        info["master_thread_id"] = conversation_id
    else:
        info["has_master"] = False

    # parse comments
    threads = glom(data, "thread", default=[]) or []
    threads = [x for x in threads if int(x.get("conversation_id", "0")) == int(conversation_id) and int(x.get("id", "0")) != int(post_id or 0)]
    threads = sorted(threads, key=lambda x: int(x.get("id", "0")))  # tweet ids are numeric and roughly chronological
    comments = []
    for node in threads:
        comment_handle = glom(node, "author.screen_name", default="")
        if comment_post_id := node.get("id", ""):
            comment_author = f"[{comment_handle}](https://x.com/{comment_handle}/status/{comment_post_id})"
        else:
            comment_author = f"[{comment_handle}](https://x.com/{comment_handle})"
        comment_text = node.get("text", "").removeprefix(f"@{handle}")
        comment_text = re.sub(r"https?://t\.co/\w+$", "", comment_text)  # remove a trailing t.co link
        comment_text = await remove_tco_suffix(comment_text, post_id=node.get("id", ""))
        comment_text = await flatten_rediercts(comment_text)
        comment_text = comment_text.strip()
        if comment_handle and comment_text:
            comments.append({"author": comment_author, "text": comment_text, "post_id": comment_post_id})

    statistics = ""
    if view := glom(data, "views", default=0):
        statistics += f"👁{readable_count(view)}"
    if like := glom(data, "likes", default=0):
        statistics += f"❤️{readable_count(like)}"
    if comment := glom(data, "replies", default=0):
        statistics += f"💬{readable_count(comment)}"
    if share := glom(data, "retweets", default=0):
        statistics += f"🔁{readable_count(share)}"
    info["statistics"] = statistics
    info["comments"] = comments
    info["quote_info"] = glom(data, "quoted", default={}) or {}
    info["has_quote"] = bool(info["quote_info"])
    return info


@cache.memoize(ttl=30)
async def get_tweet_info_via_fxtwitter(url: str = "", handle: str = "", post_id: str = "", quote_info: dict | None = None) -> dict:
    """Get a single tweet info.

    url: https://x.com/{handle}/status/{post_id}
    """
    data = {}
    if quote_info:
        data = copy.deepcopy(quote_info)
        handle = glom(data, "author.screen_name", default="")
        post_id = data.get("id", "")
    else:
        if not handle or not post_id:
            handle = url.split("/")[-3]  # for "https://x.com/user/status/123", split("/")[-3] is "user"
            post_id = url.rsplit("/", maxsplit=1)[-1]
        api_url = f"{API.FXTWITTER}/{handle}/status/{post_id}"
        logger.info(f"Twitter preview via fxtwitter: {api_url}")
        headers = {"user-agent": TELEGRAM_UA}
        resp = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER)
        if resp.get("hx_error") or str(glom(resp, "tweet.id", default="")) != str(post_id):
            logger.error("Failed to get tweet info via fxtwitter")
            return {}
        data = resp["tweet"]

    if data.get("article"):
        data |= parse_article(data["article"])

    info = {"handle": glom(data, "author.screen_name", default=handle), "post_id": data.get("id", post_id)}
    media = glom(data, "media.all", default=[])
    for x in media:
        if x.get("type", "") == "video" and "mp4" not in x.get("format", ""):  # the url is an m3u8; prefer an mp4 variant
            m3u8_url = x.get("url", "")
            mp4_url = ""
            if variants := [v for v in x.get("variants", []) if "mp4" in v.get("content_type", "")]:
                mp4_url = sorted(variants, key=lambda v: v.get("bitrate", 0), reverse=True)[0]["url"]
            x["url"] = mp4_url or m3u8_url
        if x.get("type", "") == "gif":
            x["type"] = "video"
        x["id"] = x["url"]  # record media "id" for de-duplication

    statistics = ""
    if view := glom(data, "views", default=0):
        statistics += f"👁{readable_count(view)}"
    if like := glom(data, "likes", default=0):
        statistics += f"❤️{readable_count(like)}"
    if comment := glom(data, "replies", default=0):
        statistics += f"💬{readable_count(comment)}"
    if share := glom(data, "retweets", default=0):
        statistics += f"🔁{readable_count(share)}"
    info["statistics"] = statistics
    info["media"] = media
    info["author"] = glom(data, "author.name", default="")
    if ts := data.get("created_timestamp", ""):
        dt = datetime.fromtimestamp(round(float(ts)), tz=UTC).astimezone(ZoneInfo(TZ))
        info["time"] = f"{dt:%Y-%m-%d %H:%M:%S}"
    info["texts"] = data.get("text", "")
    info["device"] = data.get("source", "").removeprefix("Twitter for").removeprefix("Twitter").removesuffix("App").strip().removesuffix("Web")
    info["replying_to_user"] = data.get("replying_to", "")
    info["replying_post_id"] = data.get("replying_to_status", "")
    info["quote_info"] = data.get("quote", {})
    info["has_master"] = bool(data.get("replying_to"))
    info["has_quote"] = bool(info["quote_info"])
    return info


@cache.memoize(ttl=30)
async def get_tweet_info_via_vxtwitter(url: str = "", handle: str = "", post_id: str = "", quote_info: dict | None = None) -> dict:
    """Get a single tweet info.

    url: https://x.com/{handle}/status/{post_id}
    """
    data = {}
    if quote_info:
        data = copy.deepcopy(quote_info)
        handle = data.get("user_screen_name", "")
        post_id = data.get("tweetID", "")
    else:
        if not handle or not post_id:
            handle = url.split("/")[-3]
            post_id = url.rsplit("/", maxsplit=1)[-1]
        api_url = f"{API.VXTWITTER}/Twitter/status/{post_id}"
        logger.info(f"Twitter preview via vxtwitter: {api_url}")
        headers = {"user-agent": TELEGRAM_UA}
        data = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER, check_kv={"tweetID": post_id})
        if data.get("hx_error"):
            logger.error("Failed to get tweet info via vxtwitter")
            return {}
    if data.get("retweet"):
        data = data["retweet"]
    info = {"handle": glom(data, "screen_name", default=handle), "post_id": data.get("tweetID", post_id)}
    media = data.get("media_extended", [])
    for x in media:
        x["id"] = x.get("url", "")  # record media "id" for de-duplication
        if x.get("type", "") == "image":  # change `image` -> `photo`
            x["type"] = "photo"
        if x.get("type", "") == "gif":
            x["type"] = "video"
    statistics = ""
    if view := glom(data, "views", default=0):
        statistics += f"👁{readable_count(view)}"
    if like := glom(data, "likes", default=0):
        statistics += f"❤️{readable_count(like)}"
    if comment := glom(data, "replies", default=0):
        statistics += f"💬{readable_count(comment)}"
    if share := glom(data, "retweets", default=0):
        statistics += f"🔁{readable_count(share)}"
    info["statistics"] = statistics
    info["media"] = media
    info["author"] = data.get("user_name", f"@{info['handle']}")
    if ts := data.get("date_epoch", 0):
        dt = datetime.fromtimestamp(round(float(ts)), tz=UTC).astimezone(ZoneInfo(TZ))
        info["time"] = f"{dt:%Y-%m-%d %H:%M:%S}"
    info["texts"] = data.get("text", "")
    info["device"] = data.get("source", "").removeprefix("Twitter for").removeprefix("Twitter").removesuffix("App").strip().removesuffix("Web")
    info["replying_to_user"] = data.get("replyingTo", "")
    info["replying_post_id"] = data.get("replyingToID", "")
    info["quote_info"] = data.get("qrt", {})
    info["has_master"] = bool(data.get("replyingTo"))
    info["has_quote"] = bool(data.get("qrt"))
    return info


def remove_twitter_suffix(text: str, post_id: str = "", *, same_id_only: bool = True) -> str:
    """Remove a trailing twitter link.

    Some tweets end with a twitter link pointing to the tweet itself.

    Args:
        text (str): The tweet text.
        post_id (str): The post_id the text belongs to.
        same_id_only (bool): Only remove the suffix when the linked post_id matches.
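
    Example (illustrative ids):
        >>> remove_twitter_suffix("hello https://x.com/user/status/123", post_id="123")
        'hello'
        >>> remove_twitter_suffix("hello https://x.com/user/status/456", post_id="123")
        'hello https://x.com/user/status/456'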
459 """
460 text = str(text).strip()
461
462 match_url = ""
463 match_post_id = ""
464 if matched := re.search(r"https?://(:?twitter|x|fxtwitter|fixupx)\.com\/(\w+)\/status/(\d+)$", text):
465 match_url = matched.group(0)
466 match_post_id = matched.group(3)
467
468 if same_id_only and post_id and str(post_id) == str(match_post_id):
469 return text.removesuffix(match_url).strip()
470
471 return text
472
473
async def remove_tco_suffix(text: str, post_id: str = "") -> str:
    """Remove a trailing t.co link that redirects back to the tweet itself.

    In TikHub-parsed info, some tweets end with a t.co link (a TikHub bug). That t.co link may be a redirect
    to the tweet itself, so we extract it, resolve the redirect, and drop the link if it points to post_id.

    Args:
        text (str): The text to be parsed.
        post_id (str): The post_id the text belongs to.
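
    Example (sketch; the short link is hypothetical and assumed to redirect to post 123):
        >>> await remove_tco_suffix("hello https://t.co/abc123", post_id="123")  # doctest: +SKIP
        'hello'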
483 """
484 text = str(text).strip()
485 # not end with t.co link, do nothing
486 if not (matched := re.search(r"https?://t\.co/\w+$", text)):
487 return text
488
489 # t.co at the end of the text
490 t_co_url: str = matched.group(0)
491
492 # parse t.co redirect
493 raw_url = await flatten_rediercts(t_co_url)
494
495 # check if the redirect url is a twitter link the same with post_id
496 match_post_id = ""
497 if matched := re.search(r"https?://(:?twitter|x|fxtwitter|fixupx)\.com\/(\w+)\/status/(\d+)", raw_url):
498 match_post_id = matched.group(3)
499
500 if str(post_id) == str(match_post_id):
501 return text.removesuffix(t_co_url).strip()
502
503 return text
504
505
def parse_article(article: dict) -> dict:
    def inline_style(text: str, styles: list[dict]) -> str:
        """Apply inline styles (bold, italic and other character-level formats).

        Uses prefix/suffix injection so that inserting tags never shifts the indices of later style ranges.
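
        Worked example: BOLD over [0, 4) and ITALIC over [5, 11) of "bold italic"
        yields "**bold** *italic*":
            >>> inline_style("bold italic", [{"style": "BOLD", "offset": 0, "length": 4},
            ...                              {"style": "ITALIC", "offset": 5, "length": 6}])
            '**bold** *italic*'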
511 """
512 if not text.strip():
513 return ""
514 styles = styles or []
515 text_len = len(text)
516 prefixes = {i: [] for i in range(text_len + 1)}
517 suffixes = {i: [] for i in range(text_len + 1)}
518 for style in styles:
519 style_ = style["style"].lower()
520 start = style["offset"]
521 end = start + style["length"]
522 tag = ""
523 if style_ == "bold":
524 tag = "**"
525 elif style_ == "italic":
526 tag = "*"
527 if tag:
528 prefixes[start].append(tag)
529 suffixes[end].insert(0, tag) # 使用 insert(0) 确保闭合标签以正确的嵌套顺序反向闭合
530
531 formatted_text = ""
532 for i in range(text_len + 1):
533 formatted_text += "".join(suffixes[i]) # 先闭合
534 formatted_text += "".join(prefixes[i]) # 再开启
535 if i < text_len:
536 formatted_text += text[i]
537 return formatted_text

    def parse_atomic(entities: list[dict]) -> str:
        """Parse an atomic block."""
        if not entities:
            return ""
        texts = ""
        for x in entities:
            if entity := entity_dict.get(str(x["key"])):
                e_type = entity.get("type", "").upper()
                if e_type == "MEDIA":
                    media_id = glom(entity, "data.mediaItems.0.mediaId", default="")
                    if img_url := media_dict.get(str(media_id)):
                        texts += f"![]({img_url})"  # markdown image tag; extracted later by remove_img_tag
                elif e_type == "DIVIDER":
                    texts += "\n"
                elif e_type == "TWEET":
                    if tweet_id := glom(entity, "data.tweetId", default=""):
                        texts += f"[QuoteTweet](https://x.com/i/status/{tweet_id})"
                elif e_type == "MARKDOWN":
                    texts += glom(entity, "data.markdown", default="")
        return texts

    markdown = ""
    if title := article.get("title"):
        markdown += f"\n\n# {title}"
    if cover_url := glom(article, "cover_media.media_info.original_img_url", default=""):
        markdown += f"\n\n![]({cover_url})"

    media_dict: dict = {}  # {media_id: media_url}; currently, articles on X only support images
    for media in article.get("media_entities", []):
        media_dict[str(media.get("media_id"))] = glom(media, "media_info.original_img_url", default="")

    entity_map = glom(article, "content.entityMap", default={})
    entity_dict = {str(x["key"]): x["value"] for x in entity_map} if isinstance(entity_map, list) else {str(k): v for k, v in entity_map.items()}
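    # entityMap arrives either as a list of {"key": ..., "value": {...}} pairs or as a plain
    # {key: value} mapping depending on the payload; both are normalized into the str-keyed
    # entity_dict, e.g. [{"key": 0, "value": {"type": "MEDIA", ...}}] and {"0": {"type": "MEDIA", ...}}.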

    # blocks
    for block in glom(article, "content.blocks", default=[]):
        text = inline_style(block.get("text", ""), block.get("inlineStyleRanges", []))
        entities = block.get("entityRanges", [])
        match block.get("type"):
            case "header-one" | "header-two" | "header-three" | "header-four":
                markdown += f"\n\n**{text}**"
            case "blockquote":
                markdown += f"\n\n> {text}"
            case "ordered-list-item" | "unordered-list-item":
                markdown += f"\n\n• {text}"
            case "atomic":
                markdown += f"\n\n{parse_atomic(entities)}"
            case _:
                markdown += f"\n\n{text}" if text else ""

    markdown_no_img, image_urls = remove_img_tag(markdown)
    return {
        "markdown": remove_consecutive_newlines(markdown).strip(),
        "text": remove_consecutive_newlines(markdown_no_img).strip(),
        "image_urls": image_urls,
        "html": convert_html(markdown),
        "media": {"all": [{"url": url, "type": "photo"} for url in image_urls]},
    }