main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import json
  4import re
  5from datetime import datetime
  6from pathlib import Path
  7from typing import Literal
  8from zoneinfo import ZoneInfo
  9
 10from glom import Coalesce, glom
 11from glom import Path as GlomPath
 12from loguru import logger
 13from pyrogram.client import Client
 14from pyrogram.types import Message
 15
 16from bridge.social import send_to_social_media_bridge
 17from config import API, DB, DOWNLOAD_DIR, PROVIDER, PROXY, TOKEN, TZ
 18from database.database import get_db
 19from messages.database import copy_messages_from_db, save_messages
 20from messages.progress import modify_progress
 21from messages.sender import send2tg
 22from messages.utils import blockquote, summay_media
 23from networking import download_file, download_first_success_urls, download_media, hx_req
 24from others.emoji import emojify
 25from utils import rand_number, readable_count, true
 26
 27
 28async def preview_douyin(
 29    client: Client,
 30    message: Message,
 31    url: str = "",
 32    db_key: str = "",
 33    platform: str = "douyin",
 34    douyin_provider: str = PROVIDER.DOUYIN,
 35    douyin_comments_provider: str = PROVIDER.DOUYIN_COMMENTS,
 36    *,
 37    show_author: bool = True,
 38    show_pubdate: bool = True,
 39    show_statistics: bool = True,
 40    show_description: bool = True,
 41    **kwargs,
 42):
 43    """Preview douyin or tiktok link in the message.
 44
 45    Args:
 46        client (Client): The Pyrogram client.
 47        message (Message): The trigger message object.
 48        url (str, optional): The douyin or tiktok link.
 49        db_key (str, optional): The cache key.
 50        platform(str, optional): The platform name. Defaults to "douyin".
 51        douyin_provider (str, optional): The douyin extractor: "direct", "free", "tikhub", "bridge", or combined strings.
 52        douyin_comments_provider (str, optional): The douyin comments extractor: "free", "tikhub" or "free-tikhub".
 53    """
 54    if kwargs.get("show_progress") and "progress" not in kwargs:
 55        res = await send2tg(client, message, texts=f"🔗正在解析抖音链接\n{url}", **kwargs)
 56        kwargs["progress"] = res[0]
 57    if kv := await get_db(db_key):
 58        logger.debug(f"{platform} preview {DB.ENGINE} cache hit for key={db_key}")
 59        if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
 60            return
 61        await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)
 62
 63    proxy = PROXY.DOUYIN if platform == "douyin" else PROXY.TIKTOK
 64    logger.info(f"{platform} link preview for {url}")
 65    succ = False
 66    data = {}
 67    if "direct" in douyin_provider:  # try direct
 68        succ, data = await parse_via_direct(url, platform, proxy)
 69    if not succ and "free" in douyin_provider:  # try free api
 70        succ, data = await parse_via_tikhub(url, platform, proxy, provider="free")
 71    if not succ and "tikhub" in douyin_provider:  # try tikhub
 72        succ, data = await parse_via_tikhub(url, platform, proxy, provider="tikhub")
 73    if not succ and "bridge" in douyin_provider:  # try bridge
 74        logger.error("❌抖音解析失败, 尝试第三方Bot...")
 75        kwargs |= {"target_mid": message.id}
 76        await send_to_social_media_bridge(client, message, url, platform, **kwargs)
 77        return
 78    if not succ:
 79        await modify_progress(text="❌抖音解析失败", force_update=True, **kwargs)
 80        return
 81    texts = ""
 82    if true(show_author) and data.get("author"):
 83        texts += f"\n🎶**[{data['author']}]({url})**"
 84    if true(show_pubdate) and data.get("create_time"):
 85        dt = datetime.fromtimestamp(data["create_time"]).astimezone(ZoneInfo(TZ))
 86        texts += f"\n🕒{dt:%Y-%m-%d %H:%M:%S}"
 87    if true(show_statistics) and data.get("statistics"):
 88        texts += f"\n{data['statistics']}"
 89    if true(show_description) and data.get("desc"):
 90        texts += f"\n{data['desc']}"
 91
 92    comments = await get_comments(data["aweme_id"], platform, douyin_comments_provider)
 93    sent_messages = await send2tg(client, message, texts=emojify(texts) + comments, media=data.get("media", []), **kwargs)
 94    await modify_progress(del_status=True, **kwargs)
 95    await save_messages(messages=sent_messages, key=db_key)
 96
 97
 98async def parse_via_direct(url: str = "", platform: str = "douyin", proxy: str | None = None, **kwargs) -> tuple[bool, dict]:
 99    """Get douyin info from direct response.
100
101    Returns:
102        tuple[bool, dict]: True for success, else False. Info as the second item.
103
104    Info:
105        {"aweme_id": str, "media": list[dict], "author": str, "create_time": int, "desc": str}
106    """
107    # !TODO: the video_url returned by tiktok can't be directly downloaded for now
108    if platform == "tiktok":
109        return False, {}
110    try:
111        logger.trace(f"{platform} API [direct] for: {url}")
112        video_id = Path(url).stem
113        api_url = f"https://www.iesdouyin.com/share/video/{video_id}" if platform == "douyin" else url
114        resp = await hx_req(api_url, mobile=True, rformat="content", proxy=proxy, max_retry=0, timeout=3)
115        pattern = r"window\._ROUTER_DATA\s*=\s*(.*?)</script>" if platform == "douyin" else r'"__UNIVERSAL_DATA_FOR_REHYDRATION__"\s*type="application/json">(.*?)</script>'
116        if matched := re.search(pattern, resp["content"].decode(), flags=re.DOTALL):
117            data = json.loads(matched.group(1).strip())
118            info = glom(
119                data,
120                Coalesce(
121                    "loaderData.video_(id)/page.videoInfoRes.item_list.0",  # douyin video
122                    "loaderData.note_(id)/page.videoInfoRes.item_list.0",  # douyin image post
123                    GlomPath("__DEFAULT_SCOPE__", "webapp.reflow.video.detail", "itemInfo", "itemStruct"),  # tiktok video
124                ),
125                default={},
126            )
127            if int(info.get("aweme_type", 4)) != 4:  # image post
128                media = [{"photo": download_first_success_urls(prefer_jpg_urls(x.get("url_list")), proxy=proxy)} for x in info.get("images", [])]
129            elif platform == "douyin" and (video_url := glom(info, "video.play_addr.url_list.0", default="").replace("playwm", "play")):  # noqa: SIM114
130                media = [{"video": download_file(video_url, path=Path(DOWNLOAD_DIR).joinpath(f"{rand_number()}.mp4"), proxy=proxy, stream=True)}]
131            elif platform == "tiktok" and (video_url := glom(info, "video.playAddr", default="")):
132                media = [{"video": download_file(video_url, path=Path(DOWNLOAD_DIR).joinpath(f"{rand_number()}.mp4"), proxy=proxy, stream=True)}]
133            else:
134                return False, {}
135            await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
136            media = await download_media(media, **kwargs)
137            if not media:
138                logger.warning(f"{platform} API [direct] media download failed")
139                return False, {}
140            statistics = ""
141            if like := glom(info, "statistics.digg_count", default=0):
142                statistics += f"❤️{readable_count(like)}"
143            if comment := glom(info, "statistics.comment_count", default=0):
144                statistics += f"💬{readable_count(comment)}"
145            if favorite := glom(info, "statistics.collect_count", default=0):
146                statistics += f"⭐️{readable_count(favorite)}"
147            if share := glom(info, "statistics.share_count", default=0):
148                statistics += f"↗️{readable_count(share)}"
149
150            return True, {
151                "aweme_id": info.get("aweme_id", video_id),
152                "media": media,
153                "author": glom(info, "author.nickname", default=""),
154                "create_time": info.get("create_time"),
155                "desc": info.get("desc"),
156                "statistics": statistics,
157            }
158        logger.warning(f"{platform} API [direct] matched nothing")
159    except Exception:
160        logger.warning(f"{platform} API [direct] failed")
161    return False, {}
162
163
def _tikhub_video_urls(node: dict) -> list[str]:
    """Gather the ``url_list`` entries of every known stream key under ``node["video"]``."""
    urls: list[str] = []
    for key in ("play_addr_h264", "play_addr_265", "play_addr", "play_addr_lowbr"):
        urls.extend(glom(node, f"video.{key}.url_list", default=[]))
    return urls


async def parse_via_tikhub(url: str = "", platform: str = "douyin", proxy: str | None = None, provider: Literal["free", "tikhub"] = "free", **kwargs) -> tuple[bool, dict]:
    """Get douyin/tiktok info from the hybrid ``video_data`` API and download its media.

    Args:
        url (str, optional): The post link handed to the API.
        platform (str, optional): Platform name, used for logging. Defaults to "douyin".
        proxy (str | None, optional): Proxy used for the media downloads.
        provider (Literal["free", "tikhub"], optional): "free" uses ``API.TIKHUB_FREE`` without
            credentials and without retries; "tikhub" uses ``API.TIKHUB`` with a bearer token
            and up to 2 retries.
        **kwargs: Forwarded to ``modify_progress`` and ``download_media``.

    Returns:
        tuple[bool, dict]: True for success, else False. Info as the second item.

    Info:
        {"aweme_id": str, "media": list[dict], "author": str, "create_time": int, "desc": str, "statistics": str}
    """
    from urllib.parse import quote  # function-scope import keeps this block self-contained

    try:
        logger.trace(f"{platform} API [{provider}] for: {url}")
        # The link is a query-parameter *value*: percent-encode it so that its own
        # "?", "&" and "#" are not parsed as part of the API request's query string.
        encoded_url = quote(url, safe="")
        if provider == "free":
            api_url = f"{API.TIKHUB_FREE}/api/hybrid/video_data?url={encoded_url}"
        else:
            api_url = f"{API.TIKHUB}/api/v1/hybrid/video_data?url={encoded_url}"
        headers = {"accept": "application/json"}
        if provider == "tikhub":
            headers |= {"authorization": f"Bearer {TOKEN.TIKHUB}"}
        retry = 0 if provider == "free" else 2
        resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200}, max_retry=retry, timeout=5)
        info = resp["data"]

        if info.get("media_type", 4) != 4:  # image post
            # may have livephotos
            media = []
            for image in info.get("images", []):
                if image.get("live_photo_type"):
                    target = Path(DOWNLOAD_DIR).joinpath(f"{rand_number()}.mp4")
                    media.append({"video": download_first_success_urls(_tikhub_video_urls(image), path=target, proxy=proxy, stream=True)})
                else:
                    media.append({"photo": download_first_success_urls(prefer_jpg_urls(image.get("url_list")), proxy=proxy)})
        else:  # video post
            target = Path(DOWNLOAD_DIR).joinpath(f"{rand_number()}.mp4")
            media = [{"video": download_first_success_urls(_tikhub_video_urls(info), path=target, proxy=proxy, stream=True)}]
        await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
        media = await download_media(media, **kwargs)
        if not media:
            logger.warning(f"{platform} API [{provider}] media download failed")
            return False, {}

        # Only non-zero counters are rendered, in this fixed order.
        statistics = ""
        if like := glom(info, "statistics.digg_count", default=0):
            statistics += f"❤️{readable_count(like)}"
        if comment := glom(info, "statistics.comment_count", default=0):
            statistics += f"💬{readable_count(comment)}"
        if favorite := glom(info, "statistics.collect_count", default=0):
            statistics += f"⭐️{readable_count(favorite)}"
        if share := glom(info, "statistics.share_count", default=0):
            statistics += f"↗️{readable_count(share)}"

        return True, {
            "aweme_id": info.get("aweme_id", Path(url).stem),
            "media": media,
            "author": glom(info, "author.nickname", default=""),
            "create_time": info.get("create_time"),
            "desc": info.get("desc"),
            "statistics": statistics,
        }
    except Exception:
        logger.warning(f"{platform} API [{provider}] failed")

    return False, {}
226
227
228def prefer_jpg_urls(url_list: list[str] | None = None) -> list[str]:
229    """Filter url_list to prefer jpg format."""
230    if not url_list:
231        return []
232    urls = []
233    for url in url_list:
234        if ".jpg" in url or ".jpeg" in url:
235            urls.insert(0, url)
236        else:
237            urls.append(url)
238    return urls
239
240
async def get_comments(aweme_id: str = "", platform: str = "douyin", douyin_comments_provider: str = PROVIDER.DOUYIN_COMMENTS) -> str:
    """Fetch douyin or tiktok comments and render them as blockquote lines.

    The "free" endpoint is tried first when enabled; "tikhub" is used as a
    fallback (or alone). A failing request is logged as a warning, and if no
    request succeeds the result is an empty string.

    Args:
        aweme_id (str, optional): post id.
        platform (str, optional): douyin or tiktok. Defaults to "douyin".
        douyin_comments_provider (str, optional): The douyin comments extractor: "free", "tikhub" or both combined.

    Returns:
        str: comments string, or "" when disabled, unavailable or empty.
    """
    if not true(douyin_comments_provider):
        return ""

    api_urls = {
        "douyin_tikhub": f"{API.TIKHUB}/api/v1/douyin/app/v3/fetch_video_comments?aweme_id={aweme_id}",
        "douyin_free": f"{API.TIKHUB_FREE}/api/douyin/web/fetch_video_comments?aweme_id={aweme_id}",
        "tiktok_tikhub": f"{API.TIKHUB}/api/v1/tiktok/app/v3/fetch_video_comments?aweme_id={aweme_id}",
        "tiktok_free": f"{API.TIKHUB_FREE}/api/tiktok/web/fetch_post_comment?aweme_id={aweme_id}",
    }

    nodes = []
    for provider in ("free", "tikhub"):  # try free first, then tikhub
        if provider not in douyin_comments_provider:
            continue
        api_url = api_urls.get(f"{platform}_{provider}")
        headers = {"accept": "application/json"}
        if provider == "tikhub":
            headers["authorization"] = f"Bearer {TOKEN.TIKHUB}"
        try:
            resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200}, max_retry=0, timeout=3)
            # `or []` covers a response that carries "comments": null.
            nodes = resp["data"].get("comments") or []
        except Exception:
            logger.warning(f"{platform} comments API [{provider}] failed")
            continue
        break  # first successful provider wins

    rendered = []
    try:
        for node in nodes:
            name = glom(node, "user.nickname", default="")
            text = node.get("text", "")
            # Check the raw nickname *before* wrapping it in a link; otherwise an
            # empty nickname would turn into a truthy "[](...)" and slip through.
            if not (name and text):
                continue
            if uid := glom(node, "user.sec_uid", default=""):
                name = f"[{name}](https://www.{platform}.com/user/{uid})"
            region = f"({node['ip_label']})" if node.get("ip_label") else ""
            rendered.append(f"💬**{name}**{region}: {emojify(text.strip())}")
    except Exception as e:
        logger.error(e)
        return ""

    if not rendered:
        return ""
    # The header line is only emitted when there is at least one comment.
    quoted = [blockquote("💬**点此展开评论区**:"), *(blockquote(line) for line in rendered)]
    return "".join(f"\n{line}" for line in quoted)