main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import json
4import re
5from datetime import datetime
6from pathlib import Path
7from typing import Literal
8from zoneinfo import ZoneInfo
9
10from glom import Coalesce, glom
11from glom import Path as GlomPath
12from loguru import logger
13from pyrogram.client import Client
14from pyrogram.types import Message
15
16from ai.utils import trim_none
17from bridge.social import send_to_social_media_bridge
18from config import AI, API, DOWNLOAD_DIR, PROVIDER, PROXY, TOKEN, TZ
19from database.r2 import get_cf_r2
20from messages.database import copy_messages_from_db, save_messages
21from messages.progress import modify_progress
22from messages.sender import send2tg
23from messages.utils import blockquote, summay_media
24from networking import download_file, download_first_success_urls, download_media, hx_req
25from others.emoji import emojify
26from preview.utils import add_summary_url
27from summarize.summarize import summarize
28from utils import nowstr, rand_number, readable_count, true
29
30
async def preview_douyin(
    client: Client,
    message: Message,
    url: str = "",
    db_key: str = "",
    platform: str = "douyin",
    douyin_provider: str = PROVIDER.DOUYIN,
    douyin_comments_provider: str = PROVIDER.DOUYIN_COMMENTS,
    *,
    summary_douyin: bool = False,
    summary_douyin_model: str = AI.AI_SUMMARY_MODEL_ALIAS,
    show_author: bool = True,
    show_pubdate: bool = True,
    show_statistics: bool = True,
    show_description: bool = True,
    **kwargs,
):
    """Preview douyin or tiktok link in the message.

    A cached copy stored under ``db_key`` is forwarded when one exists. Otherwise
    the link is parsed with the extractors enabled in ``douyin_provider`` (tried in
    the order direct -> free -> tikhub -> bridge), the caption, comments and media
    are sent, the post is optionally summarized, the sent messages are saved under
    ``db_key`` and the downloaded media files are deleted.

    Args:
        client (Client): The Pyrogram client.
        message (Message): The trigger message object.
        url (str, optional): The douyin or tiktok link.
        db_key (str, optional): The cache key.
        platform(str, optional): The platform name. Defaults to "douyin".
        douyin_provider (str, optional): The douyin extractor: "direct", "free", "tikhub", "bridge", or combined strings.
        douyin_comments_provider (str, optional): The douyin comments extractor: "free", "tikhub" or "free-tikhub".
        summary_douyin (bool, optional): Whether to summarize the post after it is sent.
        summary_douyin_model (str, optional): The model alias handed to the summarizer.
        show_author (bool, optional): Add the author line to the caption.
        show_pubdate (bool, optional): Add the publish time to the caption.
        show_statistics (bool, optional): Add the statistics line to the caption.
        show_description (bool, optional): Add the post description to the caption.
        **kwargs: Forwarded to the messaging helpers. ``show_progress`` and ``progress`` are read here.
    """
    if kv := await get_cf_r2(db_key):
        logger.debug(f"{platform} preview cache hit for key={db_key}")
        if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
            return
        logger.warning("❌从缓存中转发失败, 尝试重新解析...")
    if kwargs.get("show_progress") and "progress" not in kwargs:
        res = await send2tg(client, message, texts=f"🔗正在解析抖音链接\n{url}", **kwargs)
        # An empty result only means there is no progress message to update;
        # it must not abort the whole preview with an IndexError.
        if res:
            kwargs["progress"] = res[0]

    proxy = PROXY.DOUYIN if platform == "douyin" else PROXY.TIKTOK
    logger.info(f"{platform} link preview for {url}")
    succ = False
    data = {}
    if "direct" in douyin_provider:  # try direct
        succ, data = await parse_via_direct(url, platform, proxy)
    if not succ and "free" in douyin_provider:  # try free api
        succ, data = await parse_via_tikhub(url, platform, proxy, provider="free")
    if not succ and "tikhub" in douyin_provider:  # try tikhub
        succ, data = await parse_via_tikhub(url, platform, proxy, provider="tikhub")
    if not succ and "bridge" in douyin_provider:  # try bridge
        logger.error("❌抖音解析失败, 尝试第三方Bot...")
        kwargs |= {"target_mid": message.id}
        await send_to_social_media_bridge(client, message, url, platform, **kwargs)
        return
    if not succ:
        await modify_progress(text="❌抖音解析失败", force_update=True, **kwargs)
        return

    media = data.get("media", [])
    try:
        texts = ""
        if true(show_author) and data.get("author"):
            texts += f"\n🎶**[{data['author']}]({url})**"
        if true(show_pubdate) and data.get("create_time"):
            dt = datetime.fromtimestamp(data["create_time"]).astimezone(ZoneInfo(TZ))
            texts += f"\n🕒{dt:%Y-%m-%d %H:%M:%S}"
        if true(show_statistics) and data.get("statistics"):
            texts += f"\n{data['statistics']}"
        if true(show_description) and data.get("desc"):
            texts += f"\n{data['desc']}"

        comments = await get_comments(data["aweme_id"], platform, douyin_comments_provider)
        # keep_file=True: the files are still needed by the summary step below.
        sent_messages = await send2tg(client, message, texts=emojify(texts) + comments, media=media, keep_file=True, **kwargs)
        await modify_progress(del_status=True, **kwargs)
        # Summary douyin
        # find the first message that has a caption
        caption_msg = None
        index = -1
        for idx, m in enumerate(sent_messages):
            if isinstance(m, Message) and (m.caption or m.text):
                caption_msg = m
                index = idx
                break
        if summary_douyin and caption_msg:
            edited_msg = await summarize_douyin(caption_msg, data, media, summary_douyin_model, url)
            sent_messages[index] = edited_msg
        await save_messages(messages=sent_messages, key=db_key)
    finally:
        # Clean up the downloaded files even when sending or summarizing failed.
        for item in media:
            # default=None: an entry without photo/video/audio is skipped instead of raising.
            local_path = glom(item, Coalesce("photo", "video", "audio"), default=None)
            if local_path:
                Path(local_path).unlink(missing_ok=True)
115
116
117async def parse_via_direct(url: str = "", platform: str = "douyin", proxy: str | None = None, **kwargs) -> tuple[bool, dict]:
118 """Get douyin info from direct response.
119
120 Returns:
121 tuple[bool, dict]: True for success, else False. Info as the second item.
122
123 Info:
124 {"aweme_id": str, "media": list[dict], "author": str, "create_time": int, "desc": str}
125 """
126 # !TODO: the video_url returned by tiktok can't be directly downloaded for now
127 if platform == "tiktok":
128 return False, {}
129 try:
130 logger.trace(f"{platform} API [direct] for: {url}")
131 video_id = Path(url).stem
132 api_url = f"https://www.iesdouyin.com/share/video/{video_id}" if platform == "douyin" else url
133 resp = await hx_req(api_url, mobile=True, rformat="content", proxy=proxy, max_retry=0, timeout=3)
134 pattern = r"window\._ROUTER_DATA\s*=\s*(.*?)</script>" if platform == "douyin" else r'"__UNIVERSAL_DATA_FOR_REHYDRATION__"\s*type="application/json">(.*?)</script>'
135 if matched := re.search(pattern, resp["content"].decode(), flags=re.DOTALL):
136 data = json.loads(matched.group(1).strip())
137 info = glom(
138 data,
139 Coalesce(
140 "loaderData.video_(id)/page.videoInfoRes.item_list.0", # douyin video
141 "loaderData.note_(id)/page.videoInfoRes.item_list.0", # douyin image post
142 GlomPath("__DEFAULT_SCOPE__", "webapp.reflow.video.detail", "itemInfo", "itemStruct"), # tiktok video
143 ),
144 default={},
145 )
146 if int(info.get("aweme_type", 4)) != 4: # image post
147 media = [{"photo": download_first_success_urls(prefer_jpg_urls(x.get("url_list")), proxy=proxy)} for x in info.get("images", [])]
148 elif platform == "douyin" and (video_url := glom(info, "video.play_addr.url_list.0", default="").replace("playwm", "play")): # noqa: SIM114
149 media = [{"video": download_file(video_url, path=Path(DOWNLOAD_DIR).joinpath(f"{rand_number()}.mp4"), proxy=proxy, stream=True)}]
150 elif platform == "tiktok" and (video_url := glom(info, "video.playAddr", default="")):
151 media = [{"video": download_file(video_url, path=Path(DOWNLOAD_DIR).joinpath(f"{rand_number()}.mp4"), proxy=proxy, stream=True)}]
152 else:
153 return False, {}
154 await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
155 media = await download_media(media, **kwargs)
156 if not media:
157 logger.warning(f"{platform} API [direct] media download failed")
158 return False, {}
159 statistics = ""
160 if like := glom(info, "statistics.digg_count", default=0):
161 statistics += f"❤️{readable_count(like)}"
162 if comment := glom(info, "statistics.comment_count", default=0):
163 statistics += f"💬{readable_count(comment)}"
164 if favorite := glom(info, "statistics.collect_count", default=0):
165 statistics += f"⭐️{readable_count(favorite)}"
166 if share := glom(info, "statistics.share_count", default=0):
167 statistics += f"↗️{readable_count(share)}"
168
169 return True, {
170 "aweme_id": info.get("aweme_id", video_id),
171 "media": media,
172 "author": glom(info, "author.nickname", default=""),
173 "create_time": info.get("create_time"),
174 "desc": info.get("desc"),
175 "statistics": statistics,
176 }
177 logger.warning(f"{platform} API [direct] matched nothing")
178 except Exception:
179 logger.warning(f"{platform} API [direct] failed")
180 return False, {}
181
182
def _tikhub_video_urls(node: dict) -> list[str]:
    """Collect the ``url_list`` entries of every known play address under ``node["video"]``, in the fixed key order below."""
    urls = []
    for key in ("play_addr_h264", "play_addr_265", "play_addr", "play_addr_lowbr"):
        urls.extend(glom(node, f"video.{key}.url_list", default=[]))
    return urls


async def parse_via_tikhub(url: str = "", platform: str = "douyin", proxy: str | None = None, provider: Literal["free", "tikhub"] = "free", **kwargs) -> tuple[bool, dict]:
    """Get douyin info from tikhub API.

    Queries the hybrid ``video_data`` endpoint of the selected provider, downloads
    the referenced media and builds the info dict.

    Args:
        url (str, optional): The post link handed to the API.
        platform (str, optional): "douyin" or "tiktok"; used for logging here.
        proxy (str | None, optional): Proxy used for the media downloads.
        provider ("free" | "tikhub", optional): Which endpoint to call. "tikhub" adds the bearer token and retries.
        **kwargs: Forwarded to ``modify_progress`` and ``download_media``.

    Returns:
        tuple[bool, dict]: True for success, else False. Info as the second item.

    Info:
        {"aweme_id": str, "media": list[dict], "author": str, "create_time": int, "desc": str, "statistics": str}
    """
    from urllib.parse import quote  # local import: only needed to build the query string

    try:
        logger.trace(f"{platform} API [{provider}] for: {url}")
        # Percent-encode the link: an unescaped "&", "?" or "#" inside it would
        # otherwise be parsed as part of the API request's own query string.
        encoded_url = quote(url, safe="")
        api_url = f"{API.TIKHUB_FREE}/api/hybrid/video_data?url={encoded_url}" if provider == "free" else f"{API.TIKHUB}/api/v1/hybrid/video_data?url={encoded_url}"
        headers = {"accept": "application/json"}
        if provider == "tikhub":
            headers |= {"authorization": f"Bearer {TOKEN.TIKHUB}"}
        retry = 0 if provider == "free" else 2
        resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200}, max_retry=retry, timeout=5)
        info = resp["data"]

        if info.get("media_type", 4) != 4:  # image post
            # may have livephotos
            media = []
            for x in info.get("images", []):
                if x.get("live_photo_type"):
                    media.append({"video": download_first_success_urls(_tikhub_video_urls(x), path=Path(DOWNLOAD_DIR).joinpath(f"{rand_number()}.mp4"), proxy=proxy, stream=True)})
                else:
                    media.append({"photo": download_first_success_urls(prefer_jpg_urls(x.get("url_list")), proxy=proxy)})
        else:  # video post
            media = [{"video": download_first_success_urls(_tikhub_video_urls(info), path=Path(DOWNLOAD_DIR).joinpath(f"{rand_number()}.mp4"), proxy=proxy, stream=True)}]
        await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
        media = await download_media(media, **kwargs)
        if not media:
            logger.warning(f"{platform} API [{provider}] media download failed")
            return False, {}
        # Counters that are missing or zero are left out of the line.
        statistics = ""
        if like := glom(info, "statistics.digg_count", default=0):
            statistics += f"❤️{readable_count(like)}"
        if comment := glom(info, "statistics.comment_count", default=0):
            statistics += f"💬{readable_count(comment)}"
        if favorite := glom(info, "statistics.collect_count", default=0):
            statistics += f"⭐️{readable_count(favorite)}"
        if share := glom(info, "statistics.share_count", default=0):
            statistics += f"↗️{readable_count(share)}"

        return True, {
            "aweme_id": info.get("aweme_id", Path(url).stem),
            "media": media,
            "author": glom(info, "author.nickname", default=""),
            "create_time": info.get("create_time"),
            "desc": info.get("desc"),
            "statistics": statistics,
        }
    except Exception as e:
        # Best effort: the caller falls back to the next provider, but keep the
        # cause in the log so failures can be diagnosed.
        logger.warning(f"{platform} API [{provider}] failed: {e!r}")

    return False, {}
245
246
247def prefer_jpg_urls(url_list: list[str] | None = None) -> list[str]:
248 """Filter url_list to prefer jpg format."""
249 if not url_list:
250 return []
251 urls = []
252 for url in url_list:
253 if ".jpg" in url or ".jpeg" in url:
254 urls.insert(0, url)
255 else:
256 urls.append(url)
257 return urls
258
259
async def get_comments(aweme_id: str = "", platform: str = "douyin", douyin_comments_provider: str = PROVIDER.DOUYIN_COMMENTS) -> str:
    """Fetch douyin or tiktok comments.

    The free endpoint is tried first when enabled; the tikhub endpoint is used only
    if the free one was not tried or failed. Every failure is logged and turned
    into an empty result, so this function never raises for API problems.

    Args:
        aweme_id (str, optional): post id.
        platform (str, optional): douyin or tiktok. Defaults to "douyin".
        douyin_comments_provider (str, optional): The douyin comments extractor: "free" or "tikhub".

    Returns:
        str: comments string, or "" when comments are disabled, unavailable or empty.
    """
    if not true(douyin_comments_provider):
        return ""

    api_urls = {
        "douyin_tikhub": f"{API.TIKHUB}/api/v1/douyin/app/v3/fetch_video_comments?aweme_id={aweme_id}",
        "douyin_free": f"{API.TIKHUB_FREE}/api/douyin/web/fetch_video_comments?aweme_id={aweme_id}",
        "tiktok_tikhub": f"{API.TIKHUB}/api/v1/tiktok/app/v3/fetch_video_comments?aweme_id={aweme_id}",
        "tiktok_free": f"{API.TIKHUB_FREE}/api/tiktok/web/fetch_post_comment?aweme_id={aweme_id}",
    }
    succ = False
    data = []
    if "free" in douyin_comments_provider:  # try free first
        api_url = api_urls.get(f"{platform}_free")
        headers = {"accept": "application/json"}
        try:
            resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200}, max_retry=0, timeout=3)
            # `or []`: a present-but-null "comments" field means "no comments", not an error.
            data = resp["data"].get("comments") or []
            succ = True
        except Exception:
            logger.warning(f"{platform} comments API [free] failed")
    if not succ and "tikhub" in douyin_comments_provider:  # try tikhub
        api_url = api_urls.get(f"{platform}_tikhub")
        headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
        try:
            resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200}, max_retry=0, timeout=3)
            data = resp["data"].get("comments") or []
        except Exception:
            logger.warning(f"{platform} comments API [tikhub] failed")
            return ""
    comments = []
    try:
        for node in data:
            name = glom(node, "user.nickname", default="")
            region = f"({node['ip_label']})" if node.get("ip_label") else ""
            text = node.get("text", "")
            # Decide on the raw nickname: wrapping an empty nickname in a link
            # would make it truthy and let a nameless comment through.
            if not (name and text):
                continue
            if uid := glom(node, "user.sec_uid", default=""):
                name = f"[{name}](https://www.{platform}.com/user/{uid})"
            comments.append({"name": name, "text": emojify(text.strip()), "region": region})
    except Exception as e:
        logger.error(e)
        return ""

    if not comments:
        # Nothing to show: do not hand an empty string to the blockquote wrapper.
        return ""
    lines = ["💬**点击展开评论**:"]
    lines.extend(f"💬**{cmt['name']}**{cmt['region']}: {cmt['text']}" for cmt in comments)
    return blockquote("\n".join(lines))
320
321
async def summarize_douyin(message: Message, douyin: dict, media_list: list[dict], model: str, url: str) -> Message:
    """Summarize a sent douyin/tiktok post and attach the summary link to its message.

    Builds the summarizer sources from the post metadata and its local media
    files, runs the summary and, when a telegraph URL comes back, adds it to
    ``message``.

    Args:
        message (Message): The already-sent message that carries the caption.
        douyin (dict): The parsed post info; "aweme_id" is required, "author", "desc" and "create_time" are optional.
        media_list (list[dict]): Media entries; the "photo" and "video" values are used as source paths.
        model (str): The model alias handed to the summarizer.
        url (str): The post link; also decides the platform label.

    Returns:
        Message: The edited message, or the original ``message`` when no summary URL was produced.
    """
    data = {
        "platform": "Tiktok" if "tiktok.com" in url else "抖音",
        "author_name": douyin.get("author"),
        "url": url,
        "description": douyin.get("desc"),
    }
    if douyin.get("create_time"):
        dt = datetime.fromtimestamp(douyin["create_time"]).astimezone(ZoneInfo(TZ))
        data["created_at"] = f"{dt:%Y-%m-%d %H:%M:%S}"
    data = trim_none(data)

    sources = []
    min_text_length = 1000  # skip posts whose text is too short to be worth summarizing
    min_video_duration = None
    for media in media_list:
        if media.get("photo"):
            sources.append({"type": "image", "path": media["photo"]})
        if media.get("video"):
            min_text_length = None  # always summarize video
            min_video_duration = 120
            sources.append({"type": "video", "path": media["video"]})
    sources.append({"type": "text", "text": json.dumps(data, ensure_ascii=False)})
    # `or`, not a .get() default: the parsers always set "author", possibly to "",
    # so a .get() default would never apply.
    author_name = douyin.get("author") or "Anonymous"
    pid = douyin["aweme_id"]
    summary = await summarize(
        sources=sources,
        model=model,
        title=f"🎶{author_name} - {pid}",
        author=author_name,
        url=url,
        date=data.get("created_at") or nowstr(TZ),
        min_text_length=min_text_length,
        min_video_duration=min_video_duration,
        max_video_duration=3600,  # skip long videos more than 1 hour
    )
    telegraph_url = summary.get("telegraph_url")
    if not telegraph_url:
        return message
    return await add_summary_url(telegraph_url, message) or message