Commit 53d2b51
Changed files (1)
src
preview
src/preview/twitter.py
@@ -224,6 +224,7 @@ async def get_tweet_info_via_tikhub(url: str = "", post_id: str = "", quote_info
api_url = f"{API.TIKHUB_TWITTER}{post_id}"
logger.info(f"Twitter preview via TikHub: {api_url}")
data = {}
+
if quote_info: # quote_info is directly parsed from this_info
data = copy.deepcopy(quote_info)
post_id = quote_info.get("tweet_id", "")
@@ -231,18 +232,19 @@ async def get_tweet_info_via_tikhub(url: str = "", post_id: str = "", quote_info
await modify_progress(text="✅正在解析引用推文...", **kwargs)
else:
headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
- resp = await hx_req(api_url, headers=headers, check_keys=["data.author.screen_name"], check_kv={"data.id": post_id})
+ resp = await hx_req(api_url, headers=headers, proxy=PROXY.TWITTER, check_keys=["data.author.screen_name"], check_kv={"data.id": post_id})
if resp.get("hx_error"):
logger.error("Failed to get tweet info via TikHub")
return {}
data: dict = resp["data"]
await modify_progress(text=f"✅推文{post_id}解析成功, 正在处理...", **kwargs)
-
data = remove_none_values(data)
- handle = data.get("author", {}).get("screen_name", "")
- post_id = data.get("id", post_id)
+ handle = glom(data, "author.screen_name", default="") or ""
+ post_id = glom(data, "id", default=post_id) or post_id
info = {"handle": handle, "post_id": post_id}
- media_info = data.get("media", {})
+
+ # API old style
+ media_info = glom(data, "media", default={}) or {}
# the master thread media may be repeated in the reply tweet
# so we do not download the media file here but record media "id" for de-duplication
media = [{"type": "photo", "url": x.get("media_url_https", ""), "id": x.get("id", "0")} for x in media_info.get("photo", [])]
@@ -250,16 +252,28 @@ async def get_tweet_info_via_tikhub(url: str = "", post_id: str = "", quote_info
if variants := [x for x in x.get("variants", []) if "mp4" in x.get("content_type", "")]:
mp4_url = sorted(variants, key=lambda x: x.get("bitrate", 0), reverse=True)[0]["url"]
media.append({"type": "video", "url": mp4_url, "id": x.get("id", "0")})
+ # API new style
+ if not media:
+ entities = glom(data, "entities.media", default=[])
+ for entity in entities:
+ if entity.get("type", "") == "video" and glom(entity, "video_info.variants", default=[]):
+ variants = glom(entity, "video_info.variants", default=[])
+ variants = [x for x in variants if "mp4" in x.get("content_type", "")]
+ mp4_url = sorted(variants, key=lambda x: x.get("bitrate", 0), reverse=True)[0]["url"]
+ media.append({"type": "video", "url": mp4_url, "id": entity.get("id_str", "0")})
+ elif entity.get("type", "") == "photo":
+ media.append({"type": "photo", "url": entity.get("media_url_https", ""), "id": entity.get("id_str", "0")})
+
info["media"] = media
- info["author"] = data.get("author", {}).get("name", "")
- if date_string := data.get("created_at", ""):
+ info["author"] = glom(data, "author.name", default="") or ""
+ if date_string := glom(data, "created_at", default=""):
dt = datetime.strptime(date_string, "%a %b %d %H:%M:%S %z %Y").astimezone(ZoneInfo(TZ))
info["time"] = f"{dt:%Y-%m-%d %H:%M:%S}"
- texts = await remove_tco_suffix(data.get("text", ""), post_id=post_id)
+ texts = await remove_tco_suffix(glom(data, "text", default="") or "", post_id=post_id)
texts = await flatten_rediercts(texts)
info["texts"] = texts
- conversation_id = data.get("conversation_id", "0")
+ conversation_id = glom(data, "conversation_id", default="0") or "0"
if int(conversation_id) != int(post_id):
info["has_master"] = True
info["master_thread_id"] = conversation_id
@@ -267,7 +281,8 @@ async def get_tweet_info_via_tikhub(url: str = "", post_id: str = "", quote_info
info["has_master"] = False
# parse comments
- threads = [x for x in data.get("thread", []) if int(x.get("conversation_id", "0")) == int(conversation_id) and int(x.get("id", "0")) != int(post_id)]
+ threads = glom(data, "thread", default=[]) or []
+ threads = [x for x in threads if int(x.get("conversation_id", "0")) == int(conversation_id) and int(x.get("id", "0")) != int(post_id)]
threads = sorted(threads, key=lambda x: x.get("id", {}))
comments = []
for node in threads:
@@ -285,7 +300,7 @@ async def get_tweet_info_via_tikhub(url: str = "", post_id: str = "", quote_info
comments.append({"author": comment_author, "text": comment_text, "post_id": comment_post_id})
info["comments"] = comments
- info["quote_info"] = data.get("quoted", {})
+ info["quote_info"] = glom(data, "quoted", default={}) or {}
info["has_quote"] = bool(info["quote_info"])
return info