Commit 7ba1f2a
Changed files (3)
src
src/preview/douyin.py
@@ -1,10 +1,14 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+import json
+import re
from datetime import datetime
from pathlib import Path
+from typing import Literal
from zoneinfo import ZoneInfo
-from glom import glom
+from glom import Coalesce, glom
+from glom import Path as GlomPath
from loguru import logger
from pyrogram.client import Client
from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM
@@ -17,7 +21,7 @@ from messages.database import copy_messages_from_db, save_messages
from messages.progress import modify_progress
from messages.sender import send2tg
from messages.utils import summay_media
-from networking import download_first_success_urls, download_media, hx_req
+from networking import download_file, download_first_success_urls, download_media, hx_req
from others.emoji import emojify
from utils import true
@@ -40,7 +44,7 @@ async def preview_douyin(
url (str, optional): The douyin or tiktok link.
db_key (str, optional): The cache key.
platform(str, optional): The platform name. Defaults to "douyin".
- douyin_provider (str, optional): The douyin extractor: "free", "tikhub", "bridge", or combined strings.
+ douyin_provider (str, optional): The douyin extractor: "direct", "free", "tikhub", "bridge", or combined strings.
douyin_comments_provider (str, optional): The douyin comments extractor: "free", "tikhub" or "free-tikhub".
"""
if kwargs.get("show_progress") and "progress" not in kwargs:
@@ -56,44 +60,18 @@ async def preview_douyin(
logger.info(f"{platform} link preview for {url}")
succ = False
data = {}
- if "free" in douyin_provider: # try free first
- api_url = f"{API.TIKHUB_FREE}/api/hybrid/video_data?url={url}"
- headers = {"accept": "application/json"}
- try:
- resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200}, max_retry=0, timeout=3)
- data = resp["data"]
- succ = True
- except Exception:
- logger.warning(f"{platform} API [free] failed")
+ if "direct" in douyin_provider: # try direct
+ succ, data = await parse_via_direct(url, platform, proxy)
+ if not succ and "free" in douyin_provider: # try free api
+ succ, data = await parse_via_tikhub(url, platform, proxy, provider="free")
if not succ and "tikhub" in douyin_provider: # try tikhub
- api_url = f"{API.TIKHUB}/api/v1/hybrid/video_data?url={url}"
- headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
- try:
- resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200})
- data = resp["data"]
- succ = True
- except Exception:
- logger.warning(f"{platform} API [tikhub] failed")
+ succ, data = await parse_via_tikhub(url, platform, proxy, provider="tikhub")
if not succ and "bridge" in douyin_provider: # try bridge
logger.error("❌抖音解析失败, 尝试第三方Bot...")
await send_to_social_media_bridge(client, message, url, platform, **kwargs)
return
- aweme_id = glom(data, "aweme_id", default=Path(url).stem)
- if int(glom(data, "media_type", default=4)) == 2: # image post
- media = [{"photo": download_first_success_urls(glom(x, "url_list", default=[]), proxy=proxy, **kwargs)} for x in glom(data, "images", default=[])]
- else: # video post
- video_urls = []
- for key in ["play_addr_h264", "play_addr_265", "play_addr"]:
- video_urls.extend(glom(data, f"video.{key}.url_list", default=[]))
- media = [{"video": download_first_success_urls(video_urls, suffix=".mp4", proxy=proxy, stream=True, **kwargs)}]
- await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
- media = await download_media(media, **kwargs)
- if not media:
- logger.error("❌抖音媒体下载失败, 尝试第三方Bot...")
- await send_to_social_media_bridge(client, message, url, platform, **kwargs)
- return
texts = ""
- if author := glom(data, "author.nickname", default=""):
+ if author := data.get("author"):
texts += f"\n🎶**[{author}]({url})**"
if ts := data.get("create_time"):
dt = datetime.fromtimestamp(ts).astimezone(ZoneInfo(TZ))
@@ -102,15 +80,131 @@ async def preview_douyin(
texts += f"\n{decs}"
comments = []
- if comments_list := await get_comments(aweme_id, platform, douyin_comments_provider):
+ if comments_list := await get_comments(data["aweme_id"], platform, douyin_comments_provider):
comments.append(f"\n{BLOCKQUOTE_EXPANDABLE_DELIM}💬**点此展开评论区**:")
comments.extend(f"\n💬**{cmt['name']}**{cmt['region']}: {cmt['text']}" for cmt in comments_list)
- sent_messages = await send2tg(client, message, texts=emojify(texts), media=media, comments=comments, **kwargs)
+ sent_messages = await send2tg(client, message, texts=emojify(texts), media=data.get("media", []), comments=comments, **kwargs)
await modify_progress(del_status=True, **kwargs)
await save_messages(messages=sent_messages, key=db_key)
+async def parse_via_direct(url: str = "", platform: str = "douyin", proxy: str | None = None, **kwargs) -> tuple[bool, dict]:
+ """Get douyin info from direct response.
+
+ Returns:
+ tuple[bool, dict]: True for success, else False. Info as the second item.
+
+ Info:
+ {"aweme_id": str, "media": list[dict], "author": str, "create_time": int, "desc": str}
+ """
+ # !TODO: the video_url returned by tiktok can't be directly downloaded for now
+ if platform == "tiktok":
+ return False, {}
+ try:
+ video_id = Path(url).stem
+ api_url = f"https://www.iesdouyin.com/share/video/{video_id}" if platform == "douyin" else url
+ resp = await hx_req(api_url, mobile=True, rformat="content", proxy=proxy, max_retry=0, timeout=3)
+ pattern = r"window\._ROUTER_DATA\s*=\s*(.*?)</script>" if platform == "douyin" else r'"__UNIVERSAL_DATA_FOR_REHYDRATION__"\s*type="application/json">(.*?)</script>'
+ if matched := re.search(pattern, resp["content"].decode(), flags=re.DOTALL):
+ data = json.loads(matched.group(1).strip())
+ info = glom(
+ data,
+ Coalesce(
+ "loaderData.video_(id)/page.videoInfoRes.item_list.0", # douyin video
+ "loaderData.note_(id)/page.videoInfoRes.item_list.0", # douyin image post
+ GlomPath("__DEFAULT_SCOPE__", "webapp.reflow.video.detail", "itemInfo", "itemStruct"), # tiktok video
+ ),
+ default={},
+ )
+ if int(info.get("aweme_type", 4)) != 4: # image post
+ media = [{"photo": download_first_success_urls(prefer_jpg_urls(x.get("url_list")), proxy=proxy)} for x in info.get("images", [])]
+ elif platform == "douyin" and (video_url := glom(info, "video.play_addr.url_list.0", default="").replace("playwm", "play")): # noqa: SIM114
+ media = [{"video": download_file(video_url, suffix=".mp4", proxy=proxy, stream=True)}]
+ elif platform == "tiktok" and (video_url := glom(info, "video.playAddr", default="")):
+ media = [{"video": download_file(video_url, suffix=".mp4", proxy=proxy, stream=True)}]
+ else:
+ return False, {}
+ await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
+ media = await download_media(media, **kwargs)
+ if not media:
+ return False, {}
+ return True, {
+ "aweme_id": info.get("aweme_id", video_id),
+ "media": media,
+ "author": glom(info, "author.nickname", default=""),
+ "create_time": info.get("create_time"),
+ "desc": info.get("desc"),
+ }
+ except Exception:
+ logger.warning(f"{platform} API [direct] failed")
+ return False, {}
+
+
def _tikhub_video_urls(item: dict) -> list[str]:
    """Collect the candidate video URLs of ``item`` from each play address, in preference order."""
    urls: list[str] = []
    for key in ("play_addr_h264", "play_addr_265", "play_addr", "play_addr_lowbr"):
        urls.extend(glom(item, f"video.{key}.url_list", default=[]))
    return urls


async def parse_via_tikhub(url: str = "", platform: str = "douyin", proxy: str | None = None, provider: Literal["free", "tikhub"] = "free", **kwargs) -> tuple[bool, dict]:
    """Get douyin info from tikhub API.

    Args:
        url: The douyin or tiktok link, sent to the API as the ``url`` query value.
        platform: The platform name; only used in the failure log message.
        proxy: Optional proxy passed to the media download helpers.
        provider: "free" uses ``API.TIKHUB_FREE`` without retries; "tikhub" uses
            ``API.TIKHUB`` with the bearer token and two retries.
        **kwargs: Passed through to ``modify_progress`` and ``download_media``.

    Returns:
        tuple[bool, dict]: True for success, else False. Info as the second item.

    Info:
        {"aweme_id": str, "media": list[dict], "author": str, "create_time": int, "desc": str}
    """
    try:
        api_url = f"{API.TIKHUB_FREE}/api/hybrid/video_data?url={url}" if provider == "free" else f"{API.TIKHUB}/api/v1/hybrid/video_data?url={url}"
        headers = {"accept": "application/json"}
        if provider == "tikhub":
            headers |= {"authorization": f"Bearer {TOKEN.TIKHUB}"}
        retry = 0 if provider == "free" else 2
        resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200}, max_retry=retry, timeout=5)
        info = resp["data"]

        # Coerce to int, as the direct extractor does for aweme_type, so a string "4"
        # is not mistaken for an image post.
        if int(info.get("media_type", 4)) != 4:  # image post
            # may have livephotos
            media = []
            for image in info.get("images", []):
                if image.get("live_photo_type"):
                    media.append({"video": download_first_success_urls(_tikhub_video_urls(image), suffix=".mp4", proxy=proxy, stream=True)})
                else:
                    media.append({"photo": download_first_success_urls(prefer_jpg_urls(image.get("url_list")), proxy=proxy)})
        else:  # video post
            media = [{"video": download_first_success_urls(_tikhub_video_urls(info), suffix=".mp4", proxy=proxy, stream=True)}]
        await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
        media = await download_media(media, **kwargs)
        if not media:
            return False, {}
        return True, {
            "aweme_id": info.get("aweme_id", Path(url).stem),
            "media": media,
            "author": glom(info, "author.nickname", default=""),
            "create_time": info.get("create_time"),
            "desc": info.get("desc"),
        }
    except Exception as e:
        # Best-effort extractor: report the cause and let the caller try the next provider.
        logger.warning(f"{platform} API [{provider}] failed: {e}")

    return False, {}
+
+
+def prefer_jpg_urls(url_list: list[str] | None = None) -> list[str]:
+ """Filter url_list to prefer jpg format."""
+ if not url_list:
+ return []
+ urls = []
+ for url in url_list:
+ if ".jpg" in url or ".jpeg" in url:
+ urls.insert(0, url)
+ else:
+ urls.append(url)
+ return urls
+
+
async def get_comments(aweme_id: str = "", platform: str = "douyin", douyin_comments_provider: str = PROVIDER.DOUYIN_COMMENTS) -> list[dict]:
"""Fetch douyin or tiktok comments.
src/config.py
@@ -135,7 +135,7 @@ class DANMU:
class PROVIDER: # default API provider
- DOUYIN = os.getenv("DOUYIN_PROVIDER", "free-tikhub-bridge").lower() # free or tikhub
+ DOUYIN = os.getenv("DOUYIN_PROVIDER", "direct-free-tikhub-bridge").lower() # free or tikhub
DOUYIN_COMMENTS = os.getenv("DOUYIN_COMMENTS_PROVIDER", "free-tikhub").lower() # free or tikhub or a false value (0, false, none, null, etc.)
TWITTER = os.getenv("TWITTER_PROVIDER", "tikhub-vxtwitter-fxtwitter").lower()
TWITTER_COMMENTS = os.getenv("TWITTER_COMMENTS_PROVIDER", "tikhub").lower() # tikhub or a false value (0, false, none, null, etc.)
src/networking.py
@@ -6,7 +6,7 @@ import contextlib
import json
import re
from pathlib import Path
-from typing import Any
+from typing import Any, Literal
from urllib.parse import parse_qs, urlparse
import anyio
@@ -52,7 +52,7 @@ async def hx_req(
max_retry: int = 2,
silent: bool = False,
mobile: bool = False,
- rformat: str = "json", # "json", "text", "content"
+ rformat: Literal["json", "text", "content"] = "json",
last_error: str = "",
) -> dict[str, Any]:
"""Request the given URL with the given method and return the response as a dictionary.
@@ -227,11 +227,12 @@ async def download_media(media: list[dict], **kwargs) -> list[dict]:
tasks.append(task)
# run all tasks
results = await asyncio.gather(*tasks, return_exceptions=True)
-
final_media = []
for item, result in zip(media, results, strict=True):
if isinstance(result, Exception):
logger.error(f"Failed to download: {result}")
+ elif isinstance(result, str) and not Path(result).is_file():
+ logger.error(f"Downloaded file is not exists: {result}")
else:
if item.get("photo"): # async function
item["photo"] = result