main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import json
4import re
5from datetime import datetime
6from pathlib import Path
7from typing import Literal
8from zoneinfo import ZoneInfo
9
10from glom import Coalesce, glom
11from glom import Path as GlomPath
12from loguru import logger
13from pyrogram.client import Client
14from pyrogram.types import Message
15
16from bridge.social import send_to_social_media_bridge
17from config import API, DB, DOWNLOAD_DIR, PROVIDER, PROXY, TOKEN, TZ
18from database.database import get_db
19from messages.database import copy_messages_from_db, save_messages
20from messages.progress import modify_progress
21from messages.sender import send2tg
22from messages.utils import blockquote, summay_media
23from networking import download_file, download_first_success_urls, download_media, hx_req
24from others.emoji import emojify
25from utils import rand_number, readable_count, true
26
27
async def preview_douyin(
    client: Client,
    message: Message,
    url: str = "",
    db_key: str = "",
    platform: str = "douyin",
    douyin_provider: str = PROVIDER.DOUYIN,
    douyin_comments_provider: str = PROVIDER.DOUYIN_COMMENTS,
    *,
    show_author: bool = True,
    show_pubdate: bool = True,
    show_statistics: bool = True,
    show_description: bool = True,
    **kwargs,
):
    """Resolve a douyin or tiktok link and reply with its media and caption.

    The cache entry for ``db_key`` is tried first. On a miss, or when the
    cached messages cannot be copied, the extractors named in
    ``douyin_provider`` are tried in the fixed order direct -> free -> tikhub;
    "bridge" hands the link over to the social media bridge as a last resort.

    Args:
        client (Client): The Pyrogram client.
        message (Message): The trigger message object.
        url (str, optional): The douyin or tiktok link.
        db_key (str, optional): The cache key.
        platform (str, optional): The platform name. Defaults to "douyin".
        douyin_provider (str, optional): The douyin extractor: "direct", "free", "tikhub", "bridge", or combined strings.
        douyin_comments_provider (str, optional): The douyin comments extractor: "free", "tikhub" or "free-tikhub".
        show_author (bool, optional): Add the author line (linked to ``url``) to the caption.
        show_pubdate (bool, optional): Add the publish time, rendered in the configured ``TZ``.
        show_statistics (bool, optional): Add the statistics line to the caption.
        show_description (bool, optional): Add the post description to the caption.
        **kwargs: Passed through to the sending / progress helpers. ``show_progress``
            and ``progress`` are read here; ``progress`` and ``target_mid`` may be set here.
    """
    wants_progress = kwargs.get("show_progress")
    if wants_progress and "progress" not in kwargs:
        status_messages = await send2tg(client, message, texts=f"🔗正在解析抖音链接\n{url}", **kwargs)
        kwargs["progress"] = status_messages[0]

    cached = await get_db(db_key)
    if cached:
        logger.debug(f"{platform} preview {DB.ENGINE} cache hit for key={db_key}")
        forwarded = await copy_messages_from_db(client, message, key=db_key, kv=cached, **kwargs)
        if forwarded:
            return
        # Cache forwarding failed: report it and fall through to a fresh parse.
        await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)

    if platform == "douyin":
        proxy = PROXY.DOUYIN
    else:
        proxy = PROXY.TIKTOK
    logger.info(f"{platform} link preview for {url}")

    succ, data = False, {}
    if "direct" in douyin_provider:  # try direct
        succ, data = await parse_via_direct(url, platform, proxy)
    # "free" and "tikhub" share one parser; each is only tried while nothing succeeded yet.
    for api_provider in ("free", "tikhub"):
        if not succ and api_provider in douyin_provider:
            succ, data = await parse_via_tikhub(url, platform, proxy, provider=api_provider)

    if not succ:
        if "bridge" in douyin_provider:  # try bridge
            logger.error("❌抖音解析失败, 尝试第三方Bot...")
            kwargs["target_mid"] = message.id
            await send_to_social_media_bridge(client, message, url, platform, **kwargs)
            return
        await modify_progress(text="❌抖音解析失败", force_update=True, **kwargs)
        return

    caption_parts = []
    if true(show_author) and data.get("author"):
        caption_parts.append(f"\n🎶**[{data['author']}]({url})**")
    if true(show_pubdate) and data.get("create_time"):
        dt = datetime.fromtimestamp(data["create_time"]).astimezone(ZoneInfo(TZ))
        caption_parts.append(f"\n🕒{dt:%Y-%m-%d %H:%M:%S}")
    if true(show_statistics) and data.get("statistics"):
        caption_parts.append(f"\n{data['statistics']}")
    if true(show_description) and data.get("desc"):
        caption_parts.append(f"\n{data['desc']}")
    texts = "".join(caption_parts)

    comments = await get_comments(data["aweme_id"], platform, douyin_comments_provider)
    caption = emojify(texts) + comments
    sent_messages = await send2tg(client, message, texts=caption, media=data.get("media", []), **kwargs)
    await modify_progress(del_status=True, **kwargs)
    await save_messages(messages=sent_messages, key=db_key)
96
97
async def parse_via_direct(url: str = "", platform: str = "douyin", proxy: str | None = None, **kwargs) -> tuple[bool, dict]:
    """Extract post info from the JSON embedded in the share page itself.

    Returns:
        tuple[bool, dict]: ``(True, info)`` on success, ``(False, {})`` otherwise.

    Info:
        {"aweme_id": str, "media": list[dict], "author": str, "create_time": int, "desc": str, "statistics": str}
    """
    # !TODO: the video_url returned by tiktok can't be directly downloaded for now
    if platform == "tiktok":
        return False, {}

    is_douyin = platform == "douyin"
    try:
        logger.trace(f"{platform} API [direct] for: {url}")
        video_id = Path(url).stem
        if is_douyin:
            api_url = f"https://www.iesdouyin.com/share/video/{video_id}"
            pattern = r"window\._ROUTER_DATA\s*=\s*(.*?)</script>"
        else:
            api_url = url
            pattern = r'"__UNIVERSAL_DATA_FOR_REHYDRATION__"\s*type="application/json">(.*?)</script>'

        resp = await hx_req(api_url, mobile=True, rformat="content", proxy=proxy, max_retry=0, timeout=3)
        page = resp["content"].decode()
        matched = re.search(pattern, page, flags=re.DOTALL)
        if matched is None:
            logger.warning(f"{platform} API [direct] matched nothing")
            return False, {}

        payload = json.loads(matched.group(1).strip())
        item_spec = Coalesce(
            "loaderData.video_(id)/page.videoInfoRes.item_list.0",  # douyin video
            "loaderData.note_(id)/page.videoInfoRes.item_list.0",  # douyin image post
            GlomPath("__DEFAULT_SCOPE__", "webapp.reflow.video.detail", "itemInfo", "itemStruct"),  # tiktok video
        )
        info = glom(payload, item_spec, default={})

        if int(info.get("aweme_type", 4)) != 4:  # image post
            media = []
            for image in info.get("images", []):
                candidates = prefer_jpg_urls(image.get("url_list"))
                media.append({"photo": download_first_success_urls(candidates, proxy=proxy)})
        else:  # video post
            if is_douyin:
                # "playwm" is swapped for "play" in the address before downloading.
                video_url = glom(info, "video.play_addr.url_list.0", default="").replace("playwm", "play")
            elif platform == "tiktok":
                video_url = glom(info, "video.playAddr", default="")
            else:
                video_url = ""
            if not video_url:
                return False, {}
            target = Path(DOWNLOAD_DIR).joinpath(f"{rand_number()}.mp4")
            media = [{"video": download_file(video_url, path=target, proxy=proxy, stream=True)}]

        await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
        media = await download_media(media, **kwargs)
        if not media:
            logger.warning(f"{platform} API [direct] media download failed")
            return False, {}

        # Counters that are missing or zero are left out of the statistics line.
        stat_icons = (
            ("❤️", "digg_count"),
            ("💬", "comment_count"),
            ("⭐️", "collect_count"),
            ("↗️", "share_count"),
        )
        stat_parts = []
        for icon, field in stat_icons:
            count = glom(info, f"statistics.{field}", default=0)
            if count:
                stat_parts.append(f"{icon}{readable_count(count)}")

        result = {
            "aweme_id": info.get("aweme_id", video_id),
            "media": media,
            "author": glom(info, "author.nickname", default=""),
            "create_time": info.get("create_time"),
            "desc": info.get("desc"),
            "statistics": "".join(stat_parts),
        }
        return True, result
    except Exception:
        logger.warning(f"{platform} API [direct] failed")
        return False, {}
162
163
async def parse_via_tikhub(url: str = "", platform: str = "douyin", proxy: str | None = None, provider: Literal["free", "tikhub"] = "free", **kwargs) -> tuple[bool, dict]:
    """Get douyin/tiktok post info from the tikhub hybrid API (free or paid endpoint).

    Args:
        url (str, optional): The post link; it is percent-encoded before being
            placed in the ``url`` query parameter.
        platform (str, optional): Only used in log messages here.
        proxy (str | None, optional): Passed to the media downloads (not to the API request).
        provider ("free" | "tikhub", optional): Which endpoint to call. "tikhub"
            adds the bearer token header and allows 2 retries; "free" allows none.
        **kwargs: Passed through to ``modify_progress`` and ``download_media``.

    Returns:
        tuple[bool, dict]: True for success, else False. Info as the second item.

    Info:
        {"aweme_id": str, "media": list[dict], "author": str, "create_time": int, "desc": str, "statistics": str}
    """
    from urllib.parse import quote  # function-scope import: only needed for the query encoding below

    try:
        logger.trace(f"{platform} API [{provider}] for: {url}")
        # The link goes into a query parameter, so it must be percent-encoded:
        # a raw "&", "#" or "?" inside it would otherwise split or truncate the value.
        encoded_url = quote(url, safe="")
        if provider == "free":
            api_url = f"{API.TIKHUB_FREE}/api/hybrid/video_data?url={encoded_url}"
        else:
            api_url = f"{API.TIKHUB}/api/v1/hybrid/video_data?url={encoded_url}"
        headers = {"accept": "application/json"}
        if provider == "tikhub":
            headers |= {"authorization": f"Bearer {TOKEN.TIKHUB}"}
        retry = 0 if provider == "free" else 2
        resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200}, max_retry=retry, timeout=5)
        info = resp["data"]

        if info.get("media_type", 4) != 4:  # image post
            # may have livephotos: those entries are downloaded as videos
            media = []
            for image in info.get("images", []):
                if image.get("live_photo_type"):
                    target = Path(DOWNLOAD_DIR).joinpath(f"{rand_number()}.mp4")
                    media.append({"video": download_first_success_urls(_tikhub_video_urls(image), path=target, proxy=proxy, stream=True)})
                else:
                    media.append({"photo": download_first_success_urls(prefer_jpg_urls(image.get("url_list")), proxy=proxy)})
        else:  # video post
            target = Path(DOWNLOAD_DIR).joinpath(f"{rand_number()}.mp4")
            media = [{"video": download_first_success_urls(_tikhub_video_urls(info), path=target, proxy=proxy, stream=True)}]

        await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
        media = await download_media(media, **kwargs)
        if not media:
            logger.warning(f"{platform} API [{provider}] media download failed")
            return False, {}

        # Counters that are missing or zero are left out of the statistics line.
        statistics = ""
        if like := glom(info, "statistics.digg_count", default=0):
            statistics += f"❤️{readable_count(like)}"
        if comment := glom(info, "statistics.comment_count", default=0):
            statistics += f"💬{readable_count(comment)}"
        if favorite := glom(info, "statistics.collect_count", default=0):
            statistics += f"⭐️{readable_count(favorite)}"
        if share := glom(info, "statistics.share_count", default=0):
            statistics += f"↗️{readable_count(share)}"

        return True, {
            "aweme_id": info.get("aweme_id", Path(url).stem),
            "media": media,
            "author": glom(info, "author.nickname", default=""),
            "create_time": info.get("create_time"),
            "desc": info.get("desc"),
            "statistics": statistics,
        }
    except Exception:
        # Best-effort extractor: any failure is reported as "not successful" to the caller.
        logger.warning(f"{platform} API [{provider}] failed")

    return False, {}


def _tikhub_video_urls(node: dict) -> list[str]:
    """Collect candidate video urls of ``node`` in the order h264, 265, default, low bitrate."""
    urls = []
    for key in ("play_addr_h264", "play_addr_265", "play_addr", "play_addr_lowbr"):
        urls.extend(glom(node, f"video.{key}.url_list", default=[]))
    return urls
226
227
228def prefer_jpg_urls(url_list: list[str] | None = None) -> list[str]:
229 """Filter url_list to prefer jpg format."""
230 if not url_list:
231 return []
232 urls = []
233 for url in url_list:
234 if ".jpg" in url or ".jpeg" in url:
235 urls.insert(0, url)
236 else:
237 urls.append(url)
238 return urls
239
240
async def get_comments(aweme_id: str = "", platform: str = "douyin", douyin_comments_provider: str = PROVIDER.DOUYIN_COMMENTS) -> str:
    """Fetch the comments of a douyin or tiktok post and render them as blockquotes.

    The free endpoint is tried first when configured; the tikhub endpoint is
    used only if the free one was not configured or failed.

    Args:
        aweme_id (str, optional): post id.
        platform (str, optional): douyin or tiktok. Defaults to "douyin".
        douyin_comments_provider (str, optional): The douyin comments extractor: "free" or "tikhub".

    Returns:
        str: comments string; empty when disabled, when fetching or parsing
        fails, or when no usable comment is found.
    """
    if not true(douyin_comments_provider):
        return ""

    api_urls = {
        "douyin_tikhub": f"{API.TIKHUB}/api/v1/douyin/app/v3/fetch_video_comments?aweme_id={aweme_id}",
        "douyin_free": f"{API.TIKHUB_FREE}/api/douyin/web/fetch_video_comments?aweme_id={aweme_id}",
        "tiktok_tikhub": f"{API.TIKHUB}/api/v1/tiktok/app/v3/fetch_video_comments?aweme_id={aweme_id}",
        "tiktok_free": f"{API.TIKHUB_FREE}/api/tiktok/web/fetch_post_comment?aweme_id={aweme_id}",
    }
    use_free = "free" in douyin_comments_provider
    use_tikhub = "tikhub" in douyin_comments_provider

    raw_comments = []
    free_ok = False
    if use_free:  # try free first
        endpoint = api_urls.get(f"{platform}_free")
        free_headers = {"accept": "application/json"}
        try:
            resp = await hx_req(endpoint, headers=free_headers, check_keys=["data"], check_kv={"code": 200}, max_retry=0, timeout=3)
            raw_comments = resp["data"].get("comments", [])
            free_ok = True
        except Exception:
            logger.warning(f"{platform} comments API [free] failed")

    if use_tikhub and not free_ok:  # try tikhub
        endpoint = api_urls.get(f"{platform}_tikhub")
        tikhub_headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
        try:
            resp = await hx_req(endpoint, headers=tikhub_headers, check_keys=["data"], check_kv={"code": 200}, max_retry=0, timeout=3)
            raw_comments = resp["data"].get("comments", [])
        except Exception:
            # Last provider in the chain: give up on comments entirely.
            logger.warning(f"{platform} comments API [tikhub] failed")
            return ""

    # Each entry is (display name, region suffix, comment text).
    parsed = []
    try:
        for node in raw_comments:
            display_name = glom(node, "user.nickname", default="")
            ip_label = node.get("ip_label")
            region = f"({ip_label})" if ip_label else ""
            text = node.get("text", "")
            sec_uid = glom(node, "user.sec_uid", default="")
            if sec_uid:
                display_name = f"[{display_name}](https://www.{platform}.com/user/{sec_uid})"
            if not (display_name and text):
                continue
            parsed.append((display_name, region, emojify(text.strip())))
    except Exception as e:
        # A single malformed node discards the whole comment section.
        logger.error(e)
        return ""

    if not parsed:
        return ""
    quoted = [blockquote('💬**点此展开评论区**:')]
    for display_name, region, text in parsed:
        cmt_str = f"💬**{display_name}**{region}: {text}"
        quoted.append(blockquote(cmt_str))
    return "".join(f"\n{line}" for line in quoted)