main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import json
  4import re
  5from datetime import datetime
  6from pathlib import Path
  7from typing import Literal
  8from zoneinfo import ZoneInfo
  9
 10from glom import Coalesce, glom
 11from glom import Path as GlomPath
 12from loguru import logger
 13from pyrogram.client import Client
 14from pyrogram.types import Message
 15
 16from ai.utils import trim_none
 17from bridge.social import send_to_social_media_bridge
 18from config import AI, API, DOWNLOAD_DIR, PROVIDER, PROXY, TOKEN, TZ
 19from database.r2 import get_cf_r2
 20from messages.database import copy_messages_from_db, save_messages
 21from messages.progress import modify_progress
 22from messages.sender import send2tg
 23from messages.utils import blockquote, summay_media
 24from networking import download_file, download_first_success_urls, download_media, hx_req
 25from others.emoji import emojify
 26from preview.utils import add_summary_url
 27from summarize.summarize import summarize
 28from utils import nowstr, rand_number, readable_count, true
 29
 30
 31async def preview_douyin(
 32    client: Client,
 33    message: Message,
 34    url: str = "",
 35    db_key: str = "",
 36    platform: str = "douyin",
 37    douyin_provider: str = PROVIDER.DOUYIN,
 38    douyin_comments_provider: str = PROVIDER.DOUYIN_COMMENTS,
 39    *,
 40    summary_douyin: bool = False,
 41    summary_douyin_model: str = AI.AI_SUMMARY_MODEL_ALIAS,
 42    show_author: bool = True,
 43    show_pubdate: bool = True,
 44    show_statistics: bool = True,
 45    show_description: bool = True,
 46    **kwargs,
 47):
 48    """Preview douyin or tiktok link in the message.
 49
 50    Args:
 51        client (Client): The Pyrogram client.
 52        message (Message): The trigger message object.
 53        url (str, optional): The douyin or tiktok link.
 54        db_key (str, optional): The cache key.
 55        platform(str, optional): The platform name. Defaults to "douyin".
 56        douyin_provider (str, optional): The douyin extractor: "direct", "free", "tikhub", "bridge", or combined strings.
 57        douyin_comments_provider (str, optional): The douyin comments extractor: "free", "tikhub" or "free-tikhub".
 58    """
 59    if kv := await get_cf_r2(db_key):
 60        logger.debug(f"{platform} preview cache hit for key={db_key}")
 61        if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
 62            return
 63        logger.warning("❌从缓存中转发失败, 尝试重新解析...")
 64    if kwargs.get("show_progress") and "progress" not in kwargs:
 65        res = await send2tg(client, message, texts=f"🔗正在解析抖音链接\n{url}", **kwargs)
 66        kwargs["progress"] = res[0]
 67
 68    proxy = PROXY.DOUYIN if platform == "douyin" else PROXY.TIKTOK
 69    logger.info(f"{platform} link preview for {url}")
 70    succ = False
 71    data = {}
 72    if "direct" in douyin_provider:  # try direct
 73        succ, data = await parse_via_direct(url, platform, proxy)
 74    if not succ and "free" in douyin_provider:  # try free api
 75        succ, data = await parse_via_tikhub(url, platform, proxy, provider="free")
 76    if not succ and "tikhub" in douyin_provider:  # try tikhub
 77        succ, data = await parse_via_tikhub(url, platform, proxy, provider="tikhub")
 78    if not succ and "bridge" in douyin_provider:  # try bridge
 79        logger.error("❌抖音解析失败, 尝试第三方Bot...")
 80        kwargs |= {"target_mid": message.id}
 81        await send_to_social_media_bridge(client, message, url, platform, **kwargs)
 82        return
 83    if not succ:
 84        await modify_progress(text="❌抖音解析失败", force_update=True, **kwargs)
 85        return
 86    texts = ""
 87    if true(show_author) and data.get("author"):
 88        texts += f"\n🎶**[{data['author']}]({url})**"
 89    if true(show_pubdate) and data.get("create_time"):
 90        dt = datetime.fromtimestamp(data["create_time"]).astimezone(ZoneInfo(TZ))
 91        texts += f"\n🕒{dt:%Y-%m-%d %H:%M:%S}"
 92    if true(show_statistics) and data.get("statistics"):
 93        texts += f"\n{data['statistics']}"
 94    if true(show_description) and data.get("desc"):
 95        texts += f"\n{data['desc']}"
 96
 97    comments = await get_comments(data["aweme_id"], platform, douyin_comments_provider)
 98    sent_messages = await send2tg(client, message, texts=emojify(texts) + comments, media=data.get("media", []), keep_file=True, **kwargs)
 99    await modify_progress(del_status=True, **kwargs)
100    # Summary douyin
101    # find the first message that has a caption
102    caption_msg = None
103    index = -1
104    for idx, m in enumerate(sent_messages):
105        if isinstance(m, Message) and (m.caption or m.text):
106            caption_msg = m
107            index = idx
108            break
109    if summary_douyin and caption_msg:
110        edited_msg = await summarize_douyin(caption_msg, data, data.get("media", []), summary_douyin_model, url)
111        sent_messages[index] = edited_msg
112    await save_messages(messages=sent_messages, key=db_key)
113    # Clean up
114    [Path(glom(x, Coalesce("photo", "video", "audio"))).unlink(missing_ok=True) for x in data.get("media", [])]
115
116
117async def parse_via_direct(url: str = "", platform: str = "douyin", proxy: str | None = None, **kwargs) -> tuple[bool, dict]:
118    """Get douyin info from direct response.
119
120    Returns:
121        tuple[bool, dict]: True for success, else False. Info as the second item.
122
123    Info:
124        {"aweme_id": str, "media": list[dict], "author": str, "create_time": int, "desc": str}
125    """
126    # !TODO: the video_url returned by tiktok can't be directly downloaded for now
127    if platform == "tiktok":
128        return False, {}
129    try:
130        logger.trace(f"{platform} API [direct] for: {url}")
131        video_id = Path(url).stem
132        api_url = f"https://www.iesdouyin.com/share/video/{video_id}" if platform == "douyin" else url
133        resp = await hx_req(api_url, mobile=True, rformat="content", proxy=proxy, max_retry=0, timeout=3)
134        pattern = r"window\._ROUTER_DATA\s*=\s*(.*?)</script>" if platform == "douyin" else r'"__UNIVERSAL_DATA_FOR_REHYDRATION__"\s*type="application/json">(.*?)</script>'
135        if matched := re.search(pattern, resp["content"].decode(), flags=re.DOTALL):
136            data = json.loads(matched.group(1).strip())
137            info = glom(
138                data,
139                Coalesce(
140                    "loaderData.video_(id)/page.videoInfoRes.item_list.0",  # douyin video
141                    "loaderData.note_(id)/page.videoInfoRes.item_list.0",  # douyin image post
142                    GlomPath("__DEFAULT_SCOPE__", "webapp.reflow.video.detail", "itemInfo", "itemStruct"),  # tiktok video
143                ),
144                default={},
145            )
146            if int(info.get("aweme_type", 4)) != 4:  # image post
147                media = [{"photo": download_first_success_urls(prefer_jpg_urls(x.get("url_list")), proxy=proxy)} for x in info.get("images", [])]
148            elif platform == "douyin" and (video_url := glom(info, "video.play_addr.url_list.0", default="").replace("playwm", "play")):  # noqa: SIM114
149                media = [{"video": download_file(video_url, path=Path(DOWNLOAD_DIR).joinpath(f"{rand_number()}.mp4"), proxy=proxy, stream=True)}]
150            elif platform == "tiktok" and (video_url := glom(info, "video.playAddr", default="")):
151                media = [{"video": download_file(video_url, path=Path(DOWNLOAD_DIR).joinpath(f"{rand_number()}.mp4"), proxy=proxy, stream=True)}]
152            else:
153                return False, {}
154            await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
155            media = await download_media(media, **kwargs)
156            if not media:
157                logger.warning(f"{platform} API [direct] media download failed")
158                return False, {}
159            statistics = ""
160            if like := glom(info, "statistics.digg_count", default=0):
161                statistics += f"❤️{readable_count(like)}"
162            if comment := glom(info, "statistics.comment_count", default=0):
163                statistics += f"💬{readable_count(comment)}"
164            if favorite := glom(info, "statistics.collect_count", default=0):
165                statistics += f"⭐️{readable_count(favorite)}"
166            if share := glom(info, "statistics.share_count", default=0):
167                statistics += f"↗️{readable_count(share)}"
168
169            return True, {
170                "aweme_id": info.get("aweme_id", video_id),
171                "media": media,
172                "author": glom(info, "author.nickname", default=""),
173                "create_time": info.get("create_time"),
174                "desc": info.get("desc"),
175                "statistics": statistics,
176            }
177        logger.warning(f"{platform} API [direct] matched nothing")
178    except Exception:
179        logger.warning(f"{platform} API [direct] failed")
180    return False, {}
181
182
183async def parse_via_tikhub(url: str = "", platform: str = "douyin", proxy: str | None = None, provider: Literal["free", "tikhub"] = "free", **kwargs) -> tuple[bool, dict]:
184    """Get douyin info from tikhub API.
185
186    Returns:
187        tuple[bool, dict]: True for success, else False. Info as the second item.
188
189    Info:
190        {"aweme_id": str, "media": list[dict], "author": str, "create_time": int, "desc": str}
191    """
192    try:
193        logger.trace(f"{platform} API [{provider}] for: {url}")
194        api_url = f"{API.TIKHUB_FREE}/api/hybrid/video_data?url={url}" if provider == "free" else f"{API.TIKHUB}/api/v1/hybrid/video_data?url={url}"
195        headers = {"accept": "application/json"}
196        if provider == "tikhub":
197            headers |= {"authorization": f"Bearer {TOKEN.TIKHUB}"}
198        retry = 0 if provider == "free" else 2
199        resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200}, max_retry=retry, timeout=5)
200        info = resp["data"]
201
202        if info.get("media_type", 4) != 4:  # image post
203            # may have livephotos
204            media = []
205            for x in info.get("images", []):
206                if x.get("live_photo_type"):
207                    video_urls = []
208                    for key in ["play_addr_h264", "play_addr_265", "play_addr", "play_addr_lowbr"]:
209                        video_urls.extend(glom(x, f"video.{key}.url_list", default=[]))
210                    media.append({"video": download_first_success_urls(video_urls, path=Path(DOWNLOAD_DIR).joinpath(f"{rand_number()}.mp4"), proxy=proxy, stream=True)})
211                else:
212                    media.append({"photo": download_first_success_urls(prefer_jpg_urls(x.get("url_list")), proxy=proxy)})
213        else:  # video post
214            video_urls = []
215            for key in ["play_addr_h264", "play_addr_265", "play_addr", "play_addr_lowbr"]:
216                video_urls.extend(glom(info, f"video.{key}.url_list", default=[]))
217            media = [{"video": download_first_success_urls(video_urls, path=Path(DOWNLOAD_DIR).joinpath(f"{rand_number()}.mp4"), proxy=proxy, stream=True)}]
218        await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
219        media = await download_media(media, **kwargs)
220        if not media:
221            logger.warning(f"{platform} API [{provider}] media download failed")
222            return False, {}
223        statistics = ""
224        if like := glom(info, "statistics.digg_count", default=0):
225            statistics += f"❤️{readable_count(like)}"
226        if comment := glom(info, "statistics.comment_count", default=0):
227            statistics += f"💬{readable_count(comment)}"
228        if favorite := glom(info, "statistics.collect_count", default=0):
229            statistics += f"⭐️{readable_count(favorite)}"
230        if share := glom(info, "statistics.share_count", default=0):
231            statistics += f"↗️{readable_count(share)}"
232
233        return True, {
234            "aweme_id": info.get("aweme_id", Path(url).stem),
235            "media": media,
236            "author": glom(info, "author.nickname", default=""),
237            "create_time": info.get("create_time"),
238            "desc": info.get("desc"),
239            "statistics": statistics,
240        }
241    except Exception:
242        logger.warning(f"{platform} API [{provider}] failed")
243
244    return False, {}
245
246
247def prefer_jpg_urls(url_list: list[str] | None = None) -> list[str]:
248    """Filter url_list to prefer jpg format."""
249    if not url_list:
250        return []
251    urls = []
252    for url in url_list:
253        if ".jpg" in url or ".jpeg" in url:
254            urls.insert(0, url)
255        else:
256            urls.append(url)
257    return urls
258
259
260async def get_comments(aweme_id: str = "", platform: str = "douyin", douyin_comments_provider: str = PROVIDER.DOUYIN_COMMENTS) -> str:
261    """Fetch douyin or tiktok comments.
262
263    Args:
264        aweme_id (str, optional): post id.
265        platform (str, optional): douyin or tiktok. Defaults to "douyin".
266        douyin_comments_provider (str, optional): The douyin comments extractor: "free" or "tikhub".
267
268    Returns:
269        str: comments string.
270    """
271    if not true(douyin_comments_provider):
272        return ""
273
274    api_urls = {
275        "douyin_tikhub": f"{API.TIKHUB}/api/v1/douyin/app/v3/fetch_video_comments?aweme_id={aweme_id}",
276        "douyin_free": f"{API.TIKHUB_FREE}/api/douyin/web/fetch_video_comments?aweme_id={aweme_id}",
277        "tiktok_tikhub": f"{API.TIKHUB}/api/v1/tiktok/app/v3/fetch_video_comments?aweme_id={aweme_id}",
278        "tiktok_free": f"{API.TIKHUB_FREE}/api/tiktok/web/fetch_post_comment?aweme_id={aweme_id}",
279    }
280    succ = False
281    data = []
282    if "free" in douyin_comments_provider:  # try free first
283        api_url = api_urls.get(f"{platform}_free")
284        headers = {"accept": "application/json"}
285        try:
286            resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200}, max_retry=0, timeout=3)
287            data = resp["data"].get("comments", [])
288            succ = True
289        except Exception:
290            logger.warning(f"{platform} comments API [free] failed")
291    if not succ and "tikhub" in douyin_comments_provider:  # try tikhub
292        api_url = api_urls.get(f"{platform}_tikhub")
293        headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
294        try:
295            resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200}, max_retry=0, timeout=3)
296            data = resp["data"].get("comments", [])
297        except Exception:
298            logger.warning(f"{platform} comments API [tikhub] failed")
299            return ""
300    comments = []
301    try:
302        for node in data:
303            name = glom(node, "user.nickname", default="")
304            region = f"({node['ip_label']})" if node.get("ip_label") else ""
305            text = node.get("text", "")
306            if uid := glom(node, "user.sec_uid", default=""):
307                name = f"[{name}](https://www.{platform}.com/user/{uid})"
308            if name and text:
309                comments.append({"name": name, "text": emojify(text.strip()), "region": region})
310    except Exception as e:
311        logger.error(e)
312        return ""
313
314    comments_str = ""
315    for idx, cmt in enumerate(comments):
316        if idx == 0:
317            comments_str += "💬**点击展开评论**:"
318        comments_str += f"\n💬**{cmt['name']}**{cmt['region']}: {cmt['text']}"
319    return blockquote(comments_str)
320
321
322async def summarize_douyin(message: Message, douyin: dict, media_list: list[dict], model: str, url: str) -> Message:
323    """Generate source for AI summary."""
324    data = {
325        "platform": "Tiktok" if "tiktok.com" in url else "抖音",
326        "author_name": douyin.get("author"),
327        "url": url,
328        "description": douyin.get("desc"),
329    }
330    if douyin.get("create_time"):
331        dt = datetime.fromtimestamp(douyin["create_time"]).astimezone(ZoneInfo(TZ))
332        data["created_at"] = f"{dt:%Y-%m-%d %H:%M:%S}"
333    data = trim_none(data)
334
335    sources = []
336    min_text_length = 1000  # skip short tweets
337    min_video_duration = None
338    for media in media_list:
339        if media.get("photo"):
340            sources.append({"type": "image", "path": media["photo"]})
341        if media.get("video"):
342            min_text_length = None  # always summarize video
343            min_video_duration = 120
344            sources.append({"type": "video", "path": media["video"]})
345    sources.append({"type": "text", "text": json.dumps(data, ensure_ascii=False)})
346    author_name = douyin.get("author", "Anonymous")
347    pid = douyin["aweme_id"]
348    summary = await summarize(
349        sources=sources,
350        model=model,
351        title=f"🎶{author_name} - {pid}",
352        author=author_name,
353        url=url,
354        date=data.get("created_at") or nowstr(TZ),
355        min_text_length=min_text_length,
356        min_video_duration=min_video_duration,
357        max_video_duration=3600,  # skip long videos more than 1 hour
358    )
359    telegraph_url = summary.get("telegraph_url")
360    if not telegraph_url:
361        return message
362    return await add_summary_url(telegraph_url, message) or message