Commit 72ecb54
Changed files (1)
src
preview
src/preview/twitter.py
@@ -17,9 +17,13 @@ from database.database import get_db
from messages.database import copy_messages_from_db, save_messages
from messages.progress import modify_progress
from messages.sender import send2tg
-from messages.utils import summay_media
+from messages.utils import remove_img_tag, summay_media
from networking import download_file, download_media, flatten_rediercts, hx_req
-from utils import readable_count, remove_none_values, split_parts, true
+from utils import convert_html, readable_count, remove_consecutive_newlines, remove_none_values, split_parts, true
+
+
+class APIError(Exception):
+    """Raised when an upstream preview API call fails.
+
+    NOTE(review): moved from the bottom of the module (see the removed
+    definition later in this diff) so it is defined before first use.
+    """
+
+    pass
async def preview_twitter(
@@ -231,7 +235,7 @@ async def get_tweet_info_via_tikhub(url: str = "", post_id: str = "", quote_info
url: https://x.com/{handle}/status/{post_id}
"""
if not post_id:
- post_id = url.split("/")[-1]
+ post_id = url.rsplit("/", maxsplit=1)[-1]
api_url = f"{API.TIKHUB_TWITTER}{post_id}"
logger.info(f"Twitter preview via TikHub: {api_url}")
data = {}
@@ -340,7 +344,7 @@ async def get_tweet_info_via_fxtwitter(url: str = "", handle: str = "", post_id:
else:
if not handle or not post_id:
handle = url.split("/")[-3]
- post_id = url.split("/")[-1]
+ post_id = url.rsplit("/", maxsplit=1)[-1]
api_url = f"{API.FXTWITTER}/{handle}/status/{post_id}"
logger.info(f"Twitter preview via fxtwitter: {api_url}")
headers = {"user-agent": TELEGRAM_UA}
@@ -350,6 +354,9 @@ async def get_tweet_info_via_fxtwitter(url: str = "", handle: str = "", post_id:
return {}
data: dict = resp["tweet"]
+ if data.get("article"):
+ data |= parse_article(data["article"])
+
info = {"handle": glom(data, "author.screen_name", default=handle), "post_id": data.get("id", post_id)}
media = glom(data, "media.all", default=[])
for x in media:
@@ -362,6 +369,7 @@ async def get_tweet_info_via_fxtwitter(url: str = "", handle: str = "", post_id:
if x.get("type", "") == "gif":
x["type"] = "video"
x["id"] = x["url"] # record media "id" for de-duplication
+
statistics = ""
if view := glom(data, "views", default=0):
statistics += f"👁{readable_count(view)}"
@@ -401,7 +409,7 @@ async def get_tweet_info_via_vxtwitter(url: str = "", handle: str = "", post_id:
else:
if not handle or not post_id:
handle = url.split("/")[-3]
- post_id = url.split("/")[-1]
+ post_id = url.rsplit("/", maxsplit=1)[-1]
api_url = f"{API.VXTWITTER}/Twitter/status/{post_id}"
logger.info(f"Twitter preview via vxtwitter: {api_url}")
headers = {"user-agent": TELEGRAM_UA}
@@ -501,5 +509,93 @@ async def remove_tco_suffix(text: str, post_id: str = "") -> str:
return text
-class APIError(Exception):
- pass
+def parse_article(article: dict) -> dict:
+ def inline_style(text: str, styles: list[dict]) -> str:
+ """处理内联样式 (加粗、斜体等字符级格式).
+
+ 使用前后缀注入法, 避免直接修改字符串导致 Index 偏移
+ """
+ if not text.strip():
+ return ""
+ styles = styles or []
+ text_len = len(text)
+ prefixes = {i: [] for i in range(text_len + 1)}
+ suffixes = {i: [] for i in range(text_len + 1)}
+ for style in styles:
+ style_ = style["style"].lower()
+ start = style["offset"]
+ end = start + style["length"]
+ tag = ""
+ if style_ == "bold":
+ tag = "**"
+ elif style_ == "italic":
+ tag = "*"
+ if tag:
+ prefixes[start].append(tag)
+ suffixes[end].insert(0, tag) # 使用 insert(0) 确保闭合标签以正确的嵌套顺序反向闭合
+
+ formatted_text = ""
+ for i in range(text_len + 1):
+ formatted_text += "".join(suffixes[i]) # 先闭合
+ formatted_text += "".join(prefixes[i]) # 再开启
+ if i < text_len:
+ formatted_text += text[i]
+ return formatted_text
+
+ def parse_atomic(entities: list[dict]) -> str:
+ """Parse atomic block."""
+ if not entities:
+ return ""
+ texts = ""
+ for x in entities:
+ if entity := entity_dict.get(str(x["key"])):
+ e_type = entity.get("type", "").upper()
+ if e_type == "MEDIA":
+ media_id = glom(entity, "data.mediaItems.0.mediaId", default="")
+ if media_dict.get(str(media_id)):
+ # texts += f""
+ texts += "[IMAGE]"
+ elif e_type == "DIVIDER":
+ texts += "\n"
+ elif e_type == "TWEET":
+ if tweet_id := glom(entity, "data.tweetId", default=""):
+ texts += f"[QuoteTweet](https://x.com/i/status/{tweet_id})"
+ return texts
+
+ markdown = ""
+ if title := article.get("title"):
+ markdown += f"\n\n# {title}"
+ if cover_url := glom(article, "cover_media.media_info.original_img_url", default=""):
+ markdown += f"\n\n"
+
+ media_dict: dict = {} # {media_id: media_url} # currently, articles in X only support images
+ for media in article.get("media_entities", []):
+ media_dict[str(media.get("media_id"))] = glom(media, "media_info.original_img_url", default="")
+
+ entity_map = glom(article, "content.entityMap", default={})
+ entity_dict = {str(x["key"]): x["value"] for x in entity_map} if isinstance(entity_map, list) else {str(k): v for k, v in entity_map.items()}
+
+ # blocks
+ for block in glom(article, "content.blocks", default=[]):
+ text = inline_style(block.get("text"), block.get("inlineStyleRanges"))
+ entities = block.get("entityRanges", [])
+ match block.get("type"):
+ case "header-one" | "header-two" | "header-three" | "header-four":
+ markdown += f"\n\n**{text}**"
+ case "blockquote":
+ markdown += f"\n\n> {text}"
+ case "ordered-list-item" | "unordered-list-item":
+ markdown += f"\n\n• {text}"
+ case "atomic":
+ markdown += f"\n\n{parse_atomic(entities)}"
+ case _:
+ markdown += f"\n\n{text}" if text else ""
+
+ markdown_no_img, image_urls = remove_img_tag(markdown)
+ return {
+ "markdown": remove_consecutive_newlines(markdown).strip(),
+ "text": remove_consecutive_newlines(markdown_no_img).strip(),
+ "image_urls": image_urls,
+ "html": convert_html(markdown),
+ "media": {"all": [{"url": url, "type": "photo"} for url in image_urls]},
+ }