Commit 23c1ccb
Changed files (3)
src
preview
src/preview/twitter.py
@@ -1,34 +1,26 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
-import copy
import json
import re
-from datetime import UTC, datetime
from pathlib import Path
-from zoneinfo import ZoneInfo
from glom import Coalesce, glom
from loguru import logger
from pyrogram.client import Client
from pyrogram.types import LinkPreviewOptions, Message
-from bridge.social import send_to_social_media_bridge
-from config import AI, API, PROVIDER, PROXY, TELEGRAM_UA, TOKEN, TZ, cache
+from config import AI, API, CAPTION_LENGTH, PROXY, TELEGRAM_UA, TEXT_LENGTH, TZ
from database.r2 import get_cf_r2
from messages.database import copy_messages_from_db, save_messages
from messages.progress import modify_progress
from messages.sender import send2tg
-from messages.utils import blockquote, smart_split, summay_media
-from networking import download_file, download_media, flatten_rediercts, hx_req
-from preview.utils import add_summary_url
+from messages.utils import sender_markdown_to_html, smart_split, summay_media
+from networking import download_file, download_media, hx_req
+from preview.utils import add_summary_url, trim
from publish import publish_telegraph
from summarize.summarize import summarize
-from utils import nowstr, readable_count, remove_consecutive_newlines, remove_none_values, split_parts, true
-
-
-class APIError(Exception):
- pass
+from utils import nowdt, readable_count, remove_consecutive_newlines, true, ts_to_dt
async def preview_twitter(
@@ -36,13 +28,10 @@ async def preview_twitter(
message: Message,
url: str = "",
db_key: str = "",
- platform: str = "x",
- twitter_provider: str = PROVIDER.TWITTER,
+ handle: str = "",
+ post_id: int = 0,
*,
twitter_comments: bool = True,
- show_author: bool = True,
- show_pubdate: bool = True,
- show_device: bool = False,
show_statistics: bool = True,
summary_twitter: bool = False,
summary_twitter_model: str = AI.AI_SUMMARY_MODEL_ALIAS,
@@ -68,181 +57,97 @@ async def preview_twitter(
res = await send2tg(client, message, texts=f"🔗正在解析推特链接\n{url}", **kwargs)
kwargs["progress"] = res[0]
- succ = False
- master_info = {}
- this_info = {}
- quote_info = {}
- if "tikhub" in twitter_provider: # try tikhub first
- try:
- this_info = await get_tweet_info_via_tikhub(url=url, **kwargs)
- if not this_info:
- error = "❌[Tikhub]推特解析失败"
- await modify_progress(text=error, **kwargs)
- raise APIError(error) # noqa: TRY301
- quote_info = await get_tweet_info_via_tikhub(quote_info=this_info["quote_info"], **kwargs) if this_info["has_quote"] else {}
- params = copy.deepcopy(kwargs)
- params.pop("post_id", None)
- master_info = await get_tweet_info_via_tikhub(post_id=this_info["master_thread_id"], **params) if this_info["has_master"] else {}
- succ = True
- except Exception as e:
- logger.warning(f"Twitter API [tikhub] failed: {e}")
- if not succ and "fxtwitter" in twitter_provider: # try fxtwitter
- try:
- this_info = await get_tweet_info_via_fxtwitter(url=url)
- if not this_info:
- error = "❌[FxTwitter]推特解析失败"
- await modify_progress(text=error, **kwargs)
- raise APIError(error) # noqa: TRY301
- master_info = await get_tweet_info_via_fxtwitter(handle=this_info["replying_to_user"], post_id=this_info["replying_post_id"]) if this_info["has_master"] else {}
- quote_info = await get_tweet_info_via_fxtwitter(quote_info=this_info["quote_info"]) if this_info["has_quote"] else {}
- succ = True
- except Exception as e:
- logger.warning(f"Twitter API [fxtwitter] failed: {e}")
-
- if not succ and "vxtwitter" in twitter_provider: # try vxtwitter
- try:
- this_info = await get_tweet_info_via_vxtwitter(url=url)
- if not this_info:
- error = "❌[VxTwitter]推特解析失败"
- await modify_progress(text=error, **kwargs)
- raise APIError(error) # noqa: TRY301
- master_info = await get_tweet_info_via_vxtwitter(handle=this_info["replying_to_user"], post_id=this_info["replying_post_id"]) if this_info["has_master"] else {}
- quote_info = await get_tweet_info_via_vxtwitter(quote_info=this_info["quote_info"]) if this_info["has_quote"] else {}
- succ = True
- except Exception as e:
- logger.warning(f"Twitter API [vxtwitter] failed: {e}")
-
- if not succ:
- if "bridge" in twitter_provider:
- await modify_progress(text="❌推特解析失败, 尝试第三方Bot...", **kwargs)
- kwargs |= {"target_mid": message.id}
- await send_to_social_media_bridge(client, message, url, platform, **kwargs)
+ api_url = f"{API.FXTWITTER}/2/thread/{post_id}?lang=zh-cn"
+ logger.info(f"Twitter preview: {api_url}")
+ headers = {"user-agent": TELEGRAM_UA}
+ resp = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER, check_kv={"status.id": post_id})
+ resp = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER, check_kv={"status.id": post_id})
+ if resp.get("hx_error"):
+ if status := kwargs.get("progress"):
+ link_preview = LinkPreviewOptions(is_disabled=False, show_above_text=True, url=f"https://fixupx.com/{handle}/status/{post_id}")
+ await status.edit_text(f"❌推特解析失败\n{url}", link_preview_options=link_preview)
return
-
+ resp = trim(resp)
+ thread: list[dict] = resp.get("thread", [])
+ caption = ""
media = []
- media_ids = set() # deduplicate media
- master_media = []
- for x in master_info.get("media", []):
- if x["id"] in media_ids:
- continue
- media_ids.add(x["id"])
- x[x["type"]] = download_file(x["url"], proxy=PROXY.TWITTER, **kwargs)
- master_media.append(x)
-
- this_media = []
- for x in this_info.get("media", []):
- if x["id"] in media_ids:
- continue
- media_ids.add(x["id"])
- x[x["type"]] = download_file(x["url"], proxy=PROXY.TWITTER, **kwargs)
- this_media.append(x)
-
- quote_media = []
- for x in quote_info.get("media", []):
- if x["id"] in media_ids:
- continue
- media_ids.add(x["id"])
- x[x["type"]] = download_file(x["url"], proxy=PROXY.TWITTER, **kwargs)
- quote_media.append(x)
- # 生成图片数量说明
- n_media_this = len(this_media)
- n_media_master = len(master_media) if this_info["has_master"] else 0
- n_media_quote = len(quote_media) if this_info["has_quote"] else 0
- part_strs = split_parts(n_media_master, n_media_this, n_media_quote)
-
- msg = ""
- master_handle = master_info.get("handle", "")
- # 被回复主推
- if master_info:
- if true(show_author) and master_info.get("author"):
- msg += f'\n🕊<a href="https://x.com/{master_info["handle"]}/status/{master_info["post_id"]}"><b>{master_info["author"]}</b></a>'
- if true(show_pubdate) and master_info.get("time"):
- msg += f"\n🕒{master_info['time']}"
- if part_strs["first"]:
- msg += f" {part_strs['first']}"
- if true(show_device) and master_info.get("device"):
- msg += f"📱{master_info['device']}"
- if true(show_statistics) and master_info.get("statistics"):
- msg += f"\n{master_info['statistics']}"
- if texts := master_info.get("texts"):
- msg += f"\n{texts}"
- if true(twitter_comments) and (comments := master_info.get("comments")):
- comments_str = "💬**点此展开评论区**:"
- for cmt in comments:
- if str(cmt["post_id"]) == str(this_info["post_id"]):
- continue
- comments_str += f"\n💬**{cmt['author']}**: {cmt['text']}"
- msg += blockquote(comments_str)
- media.extend(master_media)
-
- # 本条推文
- media.extend(this_media)
- if master_info:
- msg += "\n⤴️"
- if true(show_author) and this_info.get("author"):
- msg += f'\n🕊<a href="{url}"><b>{this_info["author"]}</b></a>'
- msg = msg.replace("\n⤴️\n🕊", "\n⤴️")
- if true(show_pubdate) and this_info.get("time"):
- msg += f"\n🕒{this_info['time']}"
- if part_strs["middle"] and (this_info["has_master"] or this_info["has_quote"]): # 当有supp_info时, 附加图片数量说明
- msg += f" {part_strs['middle']}"
- if true(show_device) and this_info.get("device"):
- msg += f"📱{this_info['device']}"
- if true(show_statistics) and this_info.get("statistics"):
- msg += f"\n{this_info['statistics']}"
-
- if texts := this_info.get("texts"):
- msg += f"\n{texts}"
-
- if true(twitter_comments) and (comments := this_info.get("comments")):
- comments_str = "💬**点此展开评论区**:"
- for cmt in comments:
- cleaned = cmt["text"].strip().removeprefix(f"@{master_handle}").strip() # 有时回推的comment前会附带被回推的handle, 这里去掉
- msg += f"\n💬**{cmt['author']}**: {cleaned}"
- msg += blockquote(comments_str)
-
- # 引用推文
- if quote_info:
- # 有时候引用推文时会在正文末尾附带引推链接, 这里去掉
- quote_x_url = f"https://x.com/{quote_info.get('handle', '')}/status/{quote_info.get('post_id', '')}"
- msg = remove_twitter_suffix(msg, post_id=quote_info["post_id"], same_id_only=True)
- msg += "\n//"
- if true(show_author) and quote_info.get("author"):
- msg += f'\n🕊<a href="{quote_x_url}"><b>{quote_info["author"]}</b></a>'
- msg = msg.replace("\n//\n", "\n//")
- if true(show_pubdate) and quote_info.get("time"):
- msg += f"\n🕒{quote_info['time']}"
- if part_strs["last"]:
- msg += f" {part_strs['last']}"
- if true(show_device) and quote_info.get("device"):
- msg += f"📱{quote_info['device']}"
- if true(show_statistics) and quote_info.get("statistics"):
- msg += f"\n{quote_info['statistics']}"
-
- if texts := quote_info.get("texts"):
- msg += f"\n{texts}"
- media.extend(quote_media)
+ media_cursor = 1
+ media_indicator = ""
+ article_url = None
+ article_html = ""
+ sender_tag = sender_markdown_to_html(kwargs.pop("send_from_user", ""))
+ for idx, post in enumerate(sorted(thread, key=lambda x: x.get("created_timestamp", 0))):
+ author = glom(post, "author.name", default="Anonymous")
+ tweet_url = glom(post, "url", default=url)
+ emoji = "🕊" if idx == 0 else "⤴️"
+ author_tag = sender_tag if idx == 0 else ""
+ author_tag += f'<a href="{tweet_url}"><b>{emoji}{author}</b></a>'
+ if post.get("article"):
+ post |= await parse_article(post["article"], author, tweet_url) # noqa: PLW2901
+ article_url = post.get("article_url")
+ article_html = post.get("html", "")
+ post_media = glom(post, "media.all", default=[])
+ media.extend(parse_media(post_media))
+ if post_media:
+ media_indicator = f"🏞P{media_cursor}-{media_cursor + len(post_media) - 1}" if len(post_media) > 1 else f"🏞P{media_cursor}"
+ media_cursor += len(post_media)
+ if (len(thread) == 1 and not post.get("quote")) or len(post_media) == 0:
+ media_indicator = ""
+ dt = ts_to_dt(post.get("created_timestamp")) or nowdt(TZ)
+ date_str = f"🕒{dt.strftime('%Y-%m-%d %H:%M:%S')} {media_indicator}".strip()
+ text = glom(post, Coalesce("html_no_media", "translation.text", "text"), default="")
+ stats = get_statistics(post, show_statistics=show_statistics) if idx == len(thread) - 1 else ""
+ caption += f"\n{author_tag}\n{date_str}\n{stats}\n".replace("\n\n", "\n") + clean_handle(text)
+ if quote := post.get("quote"):
+ quote_author = glom(quote, "author.name", default="Anonymous")
+ quote_url = glom(quote, "url", default=url)
+ quote_text = glom(quote, Coalesce("translation.text", "text"), default="")
+ quote_media = glom(quote, "media.all", default=[])
+ if article := quote.get("article"):
+ title = article.get("title", "Twitter Article")
+ preview_text = article.get("preview_text", "")
+ quote_text = f'<h1><a href="{quote_url}">{title}</a></h1>\n{preview_text}'
+ media.extend(parse_media(quote_media))
+ if quote_media:
+ media_indicator = f"🏞P{media_cursor}-{media_cursor + len(quote_media) - 1}" if len(quote_media) > 1 else f"🏞P{media_cursor}"
+ media_cursor += len(quote_media)
+ if len(quote_media) == 0:
+ media_indicator = ""
+ quote_dt = ts_to_dt(quote.get("created_timestamp")) or nowdt(TZ)
+ quote_date_str = f"🕒{quote_dt.strftime('%Y-%m-%d %H:%M:%S')} {media_indicator}".strip()
+ quote_stats = get_statistics(quote, show_statistics=show_statistics)
+ caption += f'\n<a href="{quote_url}"><b>↪️{quote_author}</b></a>\n{quote_date_str}\n{quote_stats}\n'.replace("\n\n", "\n") + clean_handle(quote_text)
await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
media = await download_media(media, **kwargs)
sent_messages = []
- if master_info.get("is_article") or this_info.get("is_article") or quote_info.get("is_article"):
- msg = msg.replace("<blockquote>", f"\n{'—' * 10}\n").replace("</blockquote>", f"\n{'—' * 10}\n")
- msg = msg.replace("<pre>", "</blockquote><pre>").replace("</pre>", "</pre><blockquote expandable>")
- article_url = master_info.get("article_url") or this_info.get("article_url") or quote_info.get("article_url") or url
- cur_msg = None
+ caption = caption.strip()
+ if article_url:
+ head, _ = caption.split("</h1>", maxsplit=1)
+ head += "</h1>"
+ caption = caption.strip().replace("<blockquote>", f"\n{'—' * 10}\n").replace("</blockquote>", f"\n{'—' * 10}\n")
+ caption = caption.replace("<pre>", "</blockquote><pre>").replace("</pre>", "</pre><blockquote expandable>")
link_preview = LinkPreviewOptions(is_disabled=False, show_above_text=True, url=article_url)
- for m in await smart_split(msg):
- if not isinstance(cur_msg, Message):
- cur_msg = await message.reply_text(text=f"<blockquote expandable>{m}</blockquote>", quote=True, link_preview_options=link_preview)
+ for idx, m in enumerate(await smart_split(caption)):
+ if idx == 0: # first msg
+ text = f"{head}\n<blockquote expandable>{m.removeprefix(head)}</blockquote>" if m.startswith(head) else f"<blockquote expandable>{m}</blockquote>"
+ cur_msg = await message.reply_text(text=text, quote=True, link_preview_options=link_preview)
else:
cur_msg = await cur_msg.reply_text(f"<blockquote expandable>{m}</blockquote>", quote=True)
if isinstance(cur_msg, Message):
sent_messages.append(cur_msg)
await asyncio.sleep(1)
sent_messages.extend(await send2tg(client, cur_msg or message, media=media, keep_file=True, **kwargs))
- else:
- sent_messages = await send2tg(client, message, texts=msg.strip(), media=media, keep_file=True, **kwargs)
+ else: # Normal tweet
+ comments_list = await get_comments(post_id, twitter_comments=twitter_comments)
+ caption_with_comments = caption
+ max_length = CAPTION_LENGTH if media else TEXT_LENGTH
+ for cmt in comments_list:
+ if len(await smart_split(f"{caption_with_comments}\n<blockquote expandable>{cmt}</blockquote>", max_length)) == 1:
+ caption_with_comments += f"\n{cmt}"
+ comments = caption_with_comments.removeprefix(caption).strip()
+ texts = f"{caption}\n<blockquote expandable>{comments}</blockquote>" if comments else caption
+ sent_messages = await send2tg(client, message, texts=texts, media=media, keep_file=True, **kwargs)
await modify_progress(del_status=True, **kwargs)
# Summary twitter
# find the first message that has a caption
@@ -254,308 +159,57 @@ async def preview_twitter(
index = idx
break
if summary_twitter and caption_msg:
- edited_msg = await summarize_twitter(caption_msg, this_info, master_info, quote_info, media, summary_twitter_model)
+ edited_msg = await summarize_twitter(caption_msg, resp, article_html, media, summary_twitter_model)
sent_messages[index] = edited_msg
await save_messages(messages=sent_messages, key=db_key)
# Clean up
[Path(glom(x, Coalesce("photo", "video", "audio"))).unlink(missing_ok=True) for x in media]
-@cache.memoize(ttl=30)
-async def get_tweet_info_via_tikhub(url: str = "", post_id: str = "", quote_info: dict | None = None, **kwargs) -> dict:
- """Get a single tweet info.
-
- url: https://x.com/{handle}/status/{post_id}
- """
- if not post_id:
- post_id = url.rsplit("/", maxsplit=1)[-1]
- api_url = f"{API.TIKHUB_TWITTER}{post_id}"
- logger.info(f"Twitter preview via TikHub: {api_url}")
- data = {}
-
- if quote_info: # quote_info is directly parsed from the this_info
- data = copy.deepcopy(quote_info)
- post_id = quote_info.get("tweet_id", "")
- data["id"] = post_id
- await modify_progress(text="✅正在解析引用推文...", **kwargs)
- else:
- headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
- resp = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER, check_keys=["data.author.screen_name"], check_kv={"data.id": post_id})
- if resp.get("hx_error") or glom(resp, "data.author.screen_name") is None:
- logger.error("Failed to get tweet info via TikHub")
- return {}
- data: dict = resp["data"]
- await modify_progress(text=f"✅推文{post_id}解析成功, 正在处理...", **kwargs)
- data = remove_none_values(data)
- handle = glom(data, "author.screen_name", default="") or ""
- post_id = glom(data, "id", default=post_id) or post_id
- info = {"handle": handle, "post_id": post_id}
-
- # API old style
- media_info = glom(data, "media", default={}) or {}
- # the master thread media may be repeated in the reply tweet
- # so we do not download the media file here but record media "id" for de-duplication
- media = [{"type": "photo", "url": x.get("media_url_https", ""), "id": x.get("id", "0")} for x in media_info.get("photo", [])]
- for x in media_info.get("video", []):
- if variants := [x for x in x.get("variants", []) if "mp4" in x.get("content_type", "")]:
- mp4_url = sorted(variants, key=lambda x: x.get("bitrate", 0), reverse=True)[0]["url"]
- media.append({"type": "video", "url": mp4_url, "id": x.get("id", "0")})
- # API new style
- if not media:
- entities = glom(data, "entities.media", default=[])
- for entity in entities:
- if entity.get("type", "") == "video" and glom(entity, "video_info.variants", default=[]):
- variants = glom(entity, "video_info.variants", default=[])
- variants = [x for x in variants if "mp4" in x.get("content_type", "")]
- mp4_url = sorted(variants, key=lambda x: x.get("bitrate", 0), reverse=True)[0]["url"]
- media.append({"type": "video", "url": mp4_url, "id": entity.get("id_str", "0")})
- elif entity.get("type", "") == "photo":
- media.append({"type": "photo", "url": entity.get("media_url_https", ""), "id": entity.get("id_str", "0")})
-
- info["media"] = media
- info["author"] = glom(data, "author.name", default="") or ""
- if date_string := glom(data, "created_at", default=""):
- dt = datetime.strptime(date_string, "%a %b %d %H:%M:%S %z %Y").astimezone(ZoneInfo(TZ))
- info["time"] = f"{dt:%Y-%m-%d %H:%M:%S}"
- texts = await remove_tco_suffix(glom(data, "text", default="") or "", post_id=post_id)
- texts = await flatten_rediercts(texts)
- info["texts"] = texts
-
- conversation_id = glom(data, "conversation_id", default="0") or "0"
- if int(conversation_id) != int(post_id):
- info["has_master"] = True
- info["master_thread_id"] = conversation_id
- else:
- info["has_master"] = False
-
- # parse comments
- threads = glom(data, "thread", default=[]) or []
- threads = [x for x in threads if int(x.get("conversation_id", "0")) == int(conversation_id) and int(x.get("id", "0")) != int(post_id)]
- threads = sorted(threads, key=lambda x: x.get("id", {}))
- comments = []
- for node in threads:
- comment_handle = glom(node, "author.screen_name", default="")
- if comment_post_id := node.get("id", ""):
- comment_author = f"[{comment_handle}](https://x.com/{comment_handle}/status/{comment_post_id})"
- else:
- comment_author = f"[{comment_handle}](https://x.com/{comment_handle})"
- comment_text = node.get("text", "").removeprefix(f"@{handle}")
- comment_text = re.sub(r"https?://t\.co/\w+$", "", comment_text) # remove t.co link suffix
- comment_text = await remove_tco_suffix(comment_text, post_id=node.get("id", ""))
- comment_text = await flatten_rediercts(comment_text)
- comment_text = comment_text.strip()
- if comment_handle and comment_text:
- comments.append({"author": comment_author, "text": comment_text, "post_id": comment_post_id})
-
+def get_statistics(post: dict, *, show_statistics: bool = True) -> str:
+ if not true(show_statistics):
+ return ""
statistics = ""
- if view := glom(data, "views", default=0):
+ if view := glom(post, "views", default=0):
statistics += f"👁{readable_count(view)}"
- if like := glom(data, "likes", default=0):
+ if like := glom(post, "likes", default=0):
statistics += f"❤️{readable_count(like)}"
- if comment := glom(data, "replies", default=0):
+ if comment := glom(post, "replies", default=0):
statistics += f"💬{readable_count(comment)}"
- if share := glom(data, "retweets", default=0):
+ if share := glom(post, "reposts", default=0):
statistics += f"🔁{readable_count(share)}"
- info["statistics"] = statistics
- info["comments"] = comments
- info["quote_info"] = glom(data, "quoted", default={}) or {}
- info["has_quote"] = bool(info["quote_info"])
- return info
+ if bookmark := glom(post, "bookmarks", default=0):
+ statistics += f"🔖{readable_count(bookmark)}"
+ return statistics
-@cache.memoize(ttl=30)
-async def get_tweet_info_via_fxtwitter(url: str = "", handle: str = "", post_id: str = "", quote_info: dict | None = None) -> dict:
- """Get a single tweet info.
def clean_handle(s: str) -> str:
    """Strip the leading run of ``@handle`` mentions from a text.

    Only mentions sitting at the very start of ``s`` are removed, together
    with the whitespace before, between and right after them. Mentions that
    appear later in the text are left untouched.
    """
    leading_mentions = re.match(r"^(\s*@[a-zA-Z0-9_]+)+\s*", s)
    if leading_mentions is None:
        return s
    # Keep everything after the matched prefix.
    return s[leading_mentions.end():]
- url: https://x.com/{handle}/status/{post_id}
- """
- data = {}
- if quote_info:
- data = copy.deepcopy(quote_info)
- handle = glom(data, "author.name", default="")
- post_id = data.get("id", "")
- else:
- if not handle or not post_id:
- handle = url.split("/")[-3]
- post_id = url.rsplit("/", maxsplit=1)[-1]
- api_url = f"{API.FXTWITTER}/{handle}/status/{post_id}/zh"
- logger.info(f"Twitter preview via fxtwitter: {api_url}")
- headers = {"user-agent": TELEGRAM_UA}
- resp = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER)
- if resp.get("hx_error"):
- logger.error("Failed to get tweet info via fxtwitter")
- return {}
- data: dict = resp["tweet"]
- if data.get("article"):
- data |= parse_article(data["article"])
- author = glom(data, "author.screen_name", default="Anonymous")
- url = f"https://x.com/{author}/status/{post_id}"
- data["article_url"] = await publish_telegraph(title=data["title"], author=author, url=url, html=data["html"])
- data["text"] = data["text"].replace(f"<h1>{data['title']}</h1>", f'<h1><a href="{data["article_url"]}">{data["title"]}</a></h1>')
-
- info = {"handle": glom(data, "author.screen_name", default=handle), "post_id": data.get("id", post_id)}
- media = glom(data, "media.all", default=[])
- for x in media:
- if x.get("type", "") == "video": # this is a m3u8 url, choose mp4 instead
- m3u8_url = x.get("url", "")
- mp4_url = ""
- if variants := [x for x in x.get("variants", []) if "mp4" in x.get("content_type", "")]:
- mp4_url = sorted(variants, key=lambda x: x.get("bitrate", 0), reverse=True)[0]["url"]
- x["url"] = mp4_url or m3u8_url
- if x.get("type", "") == "gif":
- x["type"] = "video"
- x["id"] = x["url"] # record media "id" for de-duplication
-
- statistics = ""
- if view := glom(data, "views", default=0):
- statistics += f"👁{readable_count(view)}"
- if like := glom(data, "likes", default=0):
- statistics += f"❤️{readable_count(like)}"
- if comment := glom(data, "replies", default=0):
- statistics += f"💬{readable_count(comment)}"
- if share := glom(data, "retweets", default=0):
- statistics += f"🔁{readable_count(share)}"
- info["statistics"] = statistics
- info["media"] = media
- info["author"] = glom(data, "author.name", default="")
- if ts := data.get("created_timestamp", ""):
- dt = datetime.fromtimestamp(round(float(ts)), tz=UTC).astimezone(ZoneInfo(TZ))
- info["time"] = f"{dt:%Y-%m-%d %H:%M:%S}"
- info["texts"] = glom(data, Coalesce("translation.text", "text"), default="")
- info["html"] = data.get("html", "")
- info["is_article"] = data.get("is_article", False)
- info["article_url"] = data.get("article_url")
- info["device"] = data.get("source", "").removeprefix("Twitter for").removeprefix("Twitter").removesuffix("App").strip().removesuffix("Web")
- info["replying_to_user"] = data.get("replying_to", "")
- info["replying_post_id"] = data.get("replying_to_status", "")
- info["quote_info"] = data.get("quote", {})
- info["has_master"] = bool(data.get("replying_to"))
- info["has_quote"] = bool(info["quote_info"])
- return info
-
-
-@cache.memoize(ttl=30)
-async def get_tweet_info_via_vxtwitter(url: str = "", handle: str = "", post_id: str = "", quote_info: dict | None = None) -> dict:
- """Get a single tweet info.
-
- url: https://x.com/{handle}/status/{post_id}
- """
- data = {}
- if quote_info:
- data = copy.deepcopy(quote_info)
- handle = data.get("user_screen_name", "")
- post_id = data.get("tweetID", "")
- else:
- if not handle or not post_id:
- handle = url.split("/")[-3]
- post_id = url.rsplit("/", maxsplit=1)[-1]
- api_url = f"{API.VXTWITTER}/Twitter/status/{post_id}"
- logger.info(f"Twitter preview via vxtwitter: {api_url}")
- headers = {"user-agent": TELEGRAM_UA}
- data = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER, check_kv={"tweetID": post_id})
- if data.get("hx_error"):
- logger.error("Failed to get tweet info via vxtwitter")
- return {}
- if data.get("retweet"):
- data = data["retweet"]
- info = {"handle": glom(data, "screen_name", default=handle), "post_id": data.get("tweetID", post_id)}
- media = data.get("media_extended", [])
- for x in media:
- x["id"] = x.get("url", "") # record media "id" for de-duplication
- if x.get("type", "") == "image": # change `image` -> `photo`
- x["type"] = "photo"
- if x.get("type", "") == "gif":
- x["type"] = "video"
- statistics = ""
- if view := glom(data, "views", default=0):
- statistics += f"👁{readable_count(view)}"
- if like := glom(data, "likes", default=0):
- statistics += f"❤️{readable_count(like)}"
- if comment := glom(data, "replies", default=0):
- statistics += f"💬{readable_count(comment)}"
- if share := glom(data, "retweets", default=0):
- statistics += f"🔁{readable_count(share)}"
- info["statistics"] = statistics
- info["media"] = media
- info["author"] = data.get("user_name", f"@{info['handle']}")
- if ts := data.get("date_epoch", 0):
- dt = datetime.fromtimestamp(round(float(ts)), tz=UTC).astimezone(ZoneInfo(TZ))
- info["time"] = f"{dt:%Y-%m-%d %H:%M:%S}"
- info["texts"] = data.get("text", "")
- info["device"] = data.get("source", "").removeprefix("Twitter for").removeprefix("Twitter").removesuffix("App").strip().removesuffix("Web")
- info["replying_to_user"] = data.get("replyingTo", "")
- info["replying_post_id"] = data.get("replyingToID", "")
- info["quote_info"] = data.get("qrt", {})
- info["has_master"] = bool(data.get("replyingTo"))
- info["has_quote"] = bool(data.get("qrt"))
- return info
-
-
-def remove_twitter_suffix(text: str, post_id: str = "", *, same_id_only: bool = True) -> str:
- """Remove twitter link suffix.
-
- Some tweet ends with a twitter link to the tweet itself.
-
- Args:
- text (str): The tweet text.
- post_id (str): The text belongs to this post_id .
- force (bool): Force remove the suffix.
- same_id_only (bool): Only remove the suffix when the post_id is the same.
- """
- text = str(text).strip()
-
- match_url = ""
- match_post_id = ""
- if matched := re.search(r"https?://(:?twitter|x|fxtwitter|fixupx)\.com\/(\w+)\/status/(\d+)$", text):
- match_url = matched.group(0)
- match_post_id = matched.group(3)
-
- if same_id_only and post_id and str(post_id) == str(match_post_id):
- return text.removesuffix(match_url).strip()
-
- return text
-
-
-async def remove_tco_suffix(text: str, post_id: str = "") -> str:
- """Parse t.co link suffix.
-
- Some tweet ends with t.co link in TikHub parsed info (this is a bug of TikHub). The t.co link may be a redirect link to the tweet itself.
- Here we extract the t.co link and check if it is the same as the post_id, if so, remove the t.co link.
-
- Args:
- text (str): The text to be parsed.
- post_id (str): The text belongs to this post_id .
- """
- text = str(text).strip()
- # not end with t.co link, do nothing
- if not (matched := re.search(r"https?://t\.co/\w+$", text)):
- return text
-
- # t.co at the end of the text
- t_co_url: str = matched.group(0)
-
- # parse t.co redirect
- raw_url = await flatten_rediercts(t_co_url)
-
- # check if the redirect url is a twitter link the same with post_id
- match_post_id = ""
- if matched := re.search(r"https?://(:?twitter|x|fxtwitter|fixupx)\.com\/(\w+)\/status/(\d+)", raw_url):
- match_post_id = matched.group(3)
-
- if str(post_id) == str(match_post_id):
- return text.removesuffix(t_co_url).strip()
-
- return text
-
-
-def parse_article(article: dict) -> dict:
def parse_media(media_list: list[dict]) -> list[dict]:
    """Convert media entries into download descriptors.

    Photos become ``{"url": ..., "photo": ...}`` and gifs/videos become
    ``{"url": ..., "video": ...}``, where the second value is whatever
    ``download_file`` returns for that URL. Entries of any other type are
    ignored.

    For gifs/videos the URL of the h264 format with the highest ``bitrate``
    is preferred; the entry's own ``url`` is the fallback when no such
    format provides a URL.
    """
    parsed: list[dict] = []
    for item in media_list:
        kind = item.get("type")
        if kind == "photo":
            parsed.append({"url": item.get("url"), "photo": download_file(item.get("url", ""), proxy=PROXY.TWITTER)})
            continue
        if kind not in ("gif", "video"):
            continue
        h264_formats = [fmt for fmt in item.get("formats", []) if fmt.get("codec") == "h264"]
        # max() returns the first of equally ranked formats, like taking the head of a stable descending sort.
        best = max(h264_formats, key=lambda fmt: fmt.get("bitrate", 0), default=None)
        mp4_url = best.get("url", "") if best is not None else ""
        mp4_url = mp4_url or item.get("url", "")
        parsed.append({"url": mp4_url, "video": download_file(mp4_url, proxy=PROXY.TWITTER)})
    return parsed
+
+
+async def parse_article(article: dict, author: str, tweet_url: str) -> dict:
def inline_style(text: str, styles: list[dict]) -> str:
"""处理内联样式 (加粗、斜体等字符级格式).
使用前后缀注入法, 避免直接修改字符串导致 Index 偏移
"""
- if not text.strip():
+ if not isinstance(text, str) or not text.strip():
return ""
styles = styles or []
text_len = len(text)
@@ -587,11 +241,10 @@ def parse_article(article: dict) -> dict:
html = ""
if cover_url := glom(article, "cover_media.media_info.original_img_url", default=""):
html += f'\n<img src="{cover_url}" alt="Cover" />'
-
media_list = []
for media in article.get("media_entities", []):
if variants := [x for x in glom(media, "media_info.variants", default=[]) if x.get("content_type") == "video/mp4"]: # video
- variants = sorted(variants, key=lambda x: x.get("bitrate", 0), reverse=True)
+ variants = sorted(variants, key=lambda x: x.get("bit_rate", 0), reverse=True)
if video_url := glom(variants, "0.url", default=""):
media_list.append({"url": video_url, "type": "video", "media_id": media.get("media_id")})
elif img_url := glom(media, "media_info.original_img_url", default=""):
@@ -667,47 +320,73 @@ def parse_article(article: dict) -> dict:
# 移除所有img和video标签
clean_html = img_pattern.sub("", html)
clean_html = video_pattern.sub("", clean_html)
+ title = article.get("title", "Twitter Article")
+ if article_url := await publish_telegraph(title=title, author=author, url=tweet_url, html=html):
+ clean_html = f'<h1><a href="{article_url}">{title}</a></h1>\n{clean_html.strip()}'
+ html = f'<h1><a href="{article_url}">{title}</a></h1>\n{html.strip()}'
+
return {
"is_article": True,
- "text": remove_consecutive_newlines(clean_html).strip(),
+ "html_no_media": remove_consecutive_newlines(clean_html).strip(),
"image_urls": img_pattern.findall(html),
"video_urls": video_pattern.findall(html),
"html": html,
+ "article_url": article_url,
"media": {"all": media},
"title": article.get("title", "Twitter Article"),
}
-async def summarize_twitter(message: Message, this_info: dict, master_info: dict, quote_info: dict, media_list: list[dict], model: str) -> Message:
async def get_comments(post_id: int, *, twitter_comments: bool = True) -> list[str]:
    """Fetch the replies of a tweet as HTML comment lines.

    Args:
        post_id: ID of the tweet whose conversation is requested.
        twitter_comments: Replies are fetched only when ``true()`` accepts
            this flag.

    Returns:
        A header line followed by one HTML line per reply, ordered by
        ascending ``created_timestamp``. The list is empty when comments are
        disabled, when the response carries ``hx_error``, or when no reply
        has any text left after its leading @mentions are removed.
    """
    if not true(twitter_comments):
        return []
    api_url = f"{API.FXTWITTER}/2/conversation/{post_id}?lang=zh-cn"
    logger.info(f"Get Twitter comments: {api_url}")
    headers = {"user-agent": TELEGRAM_UA}
    resp = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER, check_kv={"status.id": post_id})
    if resp.get("hx_error"):
        return []
    resp = trim(resp)
    comments = []
    replies = resp.get("replies", [])
    # A reply without "created_timestamp" sorts first instead of raising KeyError.
    for reply in sorted(replies, key=lambda item: item.get("created_timestamp", 0)):
        author = glom(reply, "author.name", default="Anonymous")
        tweet_url = glom(reply, "url", default="https://x.com")
        text = glom(reply, Coalesce("text", "raw_text.text"), default="")
        if not text:
            continue
        body = clean_handle(text)
        if not body:
            # The reply consisted only of @mentions; skip it rather than emit an empty comment.
            continue
        comments.append(f'<a href="{tweet_url}"><b>💬{author}:</b></a> {body}')
    if comments:
        comments.insert(0, "<b>💬点此展开评论区:</b>")
    return comments
+
+
+async def summarize_twitter(message: Message, tweet: dict, article: str, media_list: list[dict], model: str) -> Message:
"""Generate source for AI summary."""
-
- def trim(obj: dict) -> dict:
- if isinstance(obj, dict):
- return {k: trim(v) for k, v in obj.items() if v not in ["", None]}
- if isinstance(obj, list):
- return [trim(item) for item in obj if item not in ["", None]] # ty:ignore[invalid-return-type]
- return obj
-
- def cleanup(info: dict) -> dict:
- cleaned = {}
- keep_keys = {"author_name": "author", "created_at": "time", "content": ["markdown", "texts"], "post_id": "post_id", "handle": "handle"}
- for k, v in keep_keys.items():
- target = trim(info.copy())
- spec = v if isinstance(v, str) else Coalesce(*v)
- cleaned[k] = glom(target, spec, default=None)
- if cleaned.get("post_id") and cleaned.get("handle"):
- cleaned["url"] = f"https://x.com/{cleaned['handle']}/status/{cleaned['post_id']}"
- cleaned.pop("handle")
- return trim(cleaned)
-
- def get_key(cleaned: dict, key: str, *, default: str = "") -> str:
- return glom(cleaned, Coalesce(key, f"quote_tweet.{key}", f"replying_to_tweet.{key}"), default=default)
-
- article = {"platform": "Twitter / X"} | cleanup(this_info)
- if cleanup(quote_info):
- article |= {"quote_tweet": cleanup(quote_info)}
- if cleanup(master_info):
- article |= {"replying_to_tweet": cleanup(master_info)}
+ thread = tweet.get("thread", [])
+ posts = []
+ for post in sorted(thread, key=lambda x: x.get("created_timestamp", 0)):
+ author = glom(post, "author.name", default="Anonymous")
+ dt = ts_to_dt(post.get("created_timestamp")) or nowdt(TZ)
+ date_str = f"{dt.strftime('%Y-%m-%d %H:%M:%S')}"
+ text = article or glom(post, Coalesce("translation.text", "text"), default="")
+ post_info = {"author": author, "date": date_str, "text": clean_handle(text)}
+ if quote := post.get("quote"):
+ quote_author = glom(quote, "author.name", default="Anonymous")
+ quote_text = glom(quote, Coalesce("translation.text", "text"), default="")
+ if article := quote.get("article"):
+ title = article.get("title", "Twitter Article")
+ preview_text = article.get("preview_text", "")
+ quote_text = f"<h1>{title}</h1>\n{preview_text}"
+ quote_dt = ts_to_dt(quote.get("created_timestamp")) or nowdt(TZ)
+ quote_date_str = f"{quote_dt.strftime('%Y-%m-%d %H:%M:%S')}"
+ post_info["quote_tweet"] = {"author": quote_author, "date": quote_date_str, "text": clean_handle(quote_text)}
+ posts.append(post_info)
+
+ summary_info: dict = {"platform": "Twitter / X"}
+ if len(posts) > 1:
+ summary_info["thread"] = posts
+ elif len(posts) == 1:
+ summary_info |= posts[0]
sources = []
min_text_length = 1000 # skip short tweets
@@ -719,19 +398,17 @@ async def summarize_twitter(message: Message, this_info: dict, master_info: dict
min_text_length = None
min_video_duration = 120 # skip short videos less than 3 minutes
sources.append({"type": "video", "path": media["video"]})
- if this_info.get("markdown") or quote_info.get("markdown") or master_info.get("markdown"):
+ if article:
min_text_length = None # This is twitter article
min_video_duration = None
- sources.append({"type": "text", "text": json.dumps(article, ensure_ascii=False)})
- author_name = get_key(article, "author_name", default="Anonymous")
- pid = get_key(article, "post_id", default="")
+ sources.append({"type": "text", "text": json.dumps(summary_info, ensure_ascii=False)})
summary = await summarize(
sources=sources,
model=model,
- title=f"🕊{author_name} - {pid}",
- author=author_name,
- url=get_key(article, "url", default="https://x.com"),
- date=get_key(article, "time", default=nowstr(TZ)),
+ title=f"🕊{author}",
+ author=glom(tweet, "status.author.name", default="Anonymous"),
+ url=glom(tweet, "status.url", default="https://x.com"),
+ date=ts_to_dt(glom(tweet, "status.created_timestamp", default=None)) or nowdt(TZ),
min_text_length=min_text_length,
min_video_duration=min_video_duration,
max_video_duration=3600, # skip long videos more than 1 hour
src/preview/utils.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
+from typing import Any
from glom import glom
from loguru import logger
@@ -53,3 +54,11 @@ async def add_summary_url(url: str, message: Message) -> Message:
link_preview = LinkPreviewOptions(is_disabled=False, show_above_text=True, url=url)
await message.reply_text(f'<a href="{url}"><b>🤖AI导读</b></a>', quote=True, parse_mode=ParseMode.HTML, link_preview_options=link_preview)
return message
+
+
def trim(obj: Any) -> Any:
    """Recursively drop empty values from nested dicts and lists.

    A value counts as empty when it equals ``""``, ``None`` or ``{}``.
    Other falsy values (``0``, ``False``, ``[]``) are kept.

    Children are trimmed *before* the emptiness check, so a nested dict
    that only becomes empty once its own members are removed is dropped
    as well. This makes the function idempotent:
    ``trim(trim(x)) == trim(x)``.

    Args:
        obj: Any value. Only ``dict`` and ``list`` instances are traversed.

    Returns:
        A new dict/list with empty entries removed, or ``obj`` itself when
        it is neither a dict nor a list. The input is never mutated.
    """
    # Membership on a tuple falls back to ``==``, so the unhashable ``{}`` is fine here.
    empties = ("", None, {})
    if isinstance(obj, dict):
        # Trim first, filter second: catches dicts emptied by the recursion.
        trimmed_items = ((key, trim(value)) for key, value in obj.items())
        return {key: value for key, value in trimmed_items if value not in empties}
    if isinstance(obj, list):
        return [item for item in map(trim, obj) if item not in empties]
    return obj
src/config.py
@@ -75,13 +75,11 @@ class PREFIX:
class API:
FXTWITTER = os.getenv("FXTWITTER_API", "https://api.fxtwitter.com")
- VXTWITTER = os.getenv("VXTWITTER_API", "https://api.vxtwitter.com")
DDINSTAGRAM = os.getenv("DDINSTAGRAM_API", "https://www.ddinstagram.com")
TIKHUB = os.getenv("TIKHUB", "https://api.tikhub.io")
TIKHUB_FREE = os.getenv("TIKHUB_FREE", "https://api.douyin.wtf")
TIKHUB_INSTAGRAM = os.getenv("TIKHUB_INSTAGRAM_API", "https://api.tikhub.io/api/v1/instagram/v1/fetch_post_by_url?post_url=")
TIKHUB_INSTAGRAM_STORY = os.getenv("TIKHUB_INSTAGRAM_STORY_API", "https://api.tikhub.io/api/v1/instagram/v3/get_user_stories?username=")
- TIKHUB_TWITTER = os.getenv("TIKHUB_TWITTER_API", "https://api.tikhub.io/api/v1/twitter/web/fetch_post_comments?tweet_id=")
TIKHUB_WEIBO_VIDEO = os.getenv("TIKHUB_WEIBO_VIDEO_API", "https://api.tikhub.io/api/v1/weibo/web/fetch_short_video_data?share_text=")
TIKHUB_WECHAT = os.getenv("TIKHUB_WECHAT", "https://api.tikhub.io/api/v1/wechat_mp/web/fetch_mp_article_detail_json?url=")
BINANCE_SPOT = os.getenv("BINANCE_SPOT_API", "https://data-api.binance.vision")
@@ -112,7 +110,6 @@ class DANMU:
class PROVIDER: # default API provider
DOUYIN = os.getenv("DOUYIN_PROVIDER", "direct-free-tikhub-bridge").lower()
DOUYIN_COMMENTS = os.getenv("DOUYIN_COMMENTS_PROVIDER", "free-tikhub").lower() # a false value (0, false, none, null) to disable it
- TWITTER = os.getenv("TWITTER_PROVIDER", "tikhub-vxtwitter-fxtwitter-bridge").lower()
INSTAGRAM = os.getenv("INSTAGRAM_PROVIDER", "tikhub-ddinstagram-bridge").lower()
WEIBO = os.getenv("WEIBO_PROVIDER", "direct-bridge").lower()
XHS = os.getenv("XHS_PROVIDER", "direct-bridge").lower()