main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import asyncio
  4import json
  5import threading
  6import time
  7from pathlib import Path
  8from typing import Literal
  9from urllib.parse import quote_plus, unquote_plus
 10
 11from glom import Coalesce, glom
 12from loguru import logger
 13from pyrogram.types import Message
 14from yt_dlp import YoutubeDL
 15from yt_dlp.utils import DownloadError, ExtractorError, YoutubeDLError
 16
 17from config import DOWNLOAD_DIR, PROXY, YTDLP_DOWNLOAD_MAX_FILE_BYTES
 18from messages.progress import modify_progress
 19from utils import readable_size, readable_time, true
 20from ytdlp.utils import ProxyError, find_thumbnail, get_ytdlp_opts, platform_emoji, uploader_url
 21
 22
 23async def ytdlp_download(
 24    url: str,
 25    platform: Literal["youtube", "bilibili", "ytdlp"] | None = None,
 26    proxy: str | None = None,
 27    **kwargs,
 28) -> dict:
 29    """Download video from url.
 30
 31    Returns:
 32        dict: downloaded info.
 33    {
 34        "video_path": Path("video_path"),
 35        "audio_path": Path("audio_path"),
 36        "thumb": str(thumbnail_path),
 37        "author": "author",
 38        "author_url": "author_url",
 39        "title": "title",
 40        "duration": 123,
 41        "extractor": "youtube",
 42        "id": "id",
 43        "json_path": "json_path",
 44        "summary": "summary",
 45    }
 46    """
 47    placeholder = {"video_path": Path("/non-exist"), "audio_path": Path("/non-exist"), "thumb": None}
 48    ytdlp_opts = await get_ytdlp_opts(url=url, platform=platform, proxy=proxy, video=true(kwargs.get("ytdlp_download_video")))
 49    if kwargs.get("show_progress"):
 50        loop = asyncio.get_running_loop()
 51        hook = create_hook(kwargs.get("progress"), loop, detail_progress=true(kwargs.get("detail_progress")))
 52        ytdlp_opts["progress_hooks"] = [hook]
 53    logger.info(f"Downloading via proxy: {ytdlp_opts['proxy']} of {url}")
 54    # download json first
 55    json_path = f"{DOWNLOAD_DIR}/{quote_plus(url)}.json"
 56    info = download_video_info(url, ytdlp_opts, json_path)
 57    if ytdlp_error := info.get("ytdlp_error"):
 58        if PROXY.YTDLP_FALLBACK and proxy != PROXY.YTDLP_FALLBACK:
 59            await modify_progress(del_status=True, **kwargs)
 60            raise ProxyError(ytdlp_error)
 61        await modify_progress(text=ytdlp_error, force_update=True, **kwargs)
 62        return placeholder
 63    await modify_progress(text=f"⏬正在下载:\n{info['summary']}", force_update=True, **kwargs)
 64    ytdlp_error = await download_video_async(json_path, ytdlp_opts)
 65    if ytdlp_error:
 66        await modify_progress(text=ytdlp_error, force_update=True, **kwargs)
 67        return placeholder
 68    msg = f"✅下载成功:\n{info['summary']}"
 69    logger.success(f"{msg!r}")
 70    info["thumb"] = find_thumbnail(info["video_path"], info["audio_path"])
 71    # correct audio format == .mp4
 72    if info["audio_path"].suffix == ".mp4":
 73        new_path = info["audio_path"].with_suffix(".m4a")
 74        info["audio_path"].rename(new_path)
 75        info["audio_path"] = new_path
 76
 77    # delete video_only file (no audio channel), this file is no longer needed
 78    format_id = info.get("format_id", "")  # 299+140
 79    for fmt_id in [x.strip() for x in format_id.split("+") if x.strip()]:  # ['299', '140']
 80        video_ext = info["video_path"].suffix  # .mp4
 81        Path(DOWNLOAD_DIR).joinpath(f"{info['id']}.f{fmt_id}{video_ext}").unlink(missing_ok=True)
 82    # summary
 83    await modify_progress(text=msg.strip(), force_update=True, **kwargs)
 84    return info
 85
 86
 87def download_video_info(url: str, ytdlp_opts: dict, json_path: str | Path) -> dict:
 88    try:
 89        with YoutubeDL(ytdlp_opts) as ydl:
 90            info: dict = ydl.extract_info(url, download=False)
 91            with Path(json_path).open("w") as f:
 92                json.dump(ydl.sanitize_info(info), f, ensure_ascii=False, indent=2)
 93            # add custom fields
 94            info["extractor"] = info.get("extractor", "").lower()
 95            info["author"] = glom(info, Coalesce("uploader", "series", "extractor"))
 96            info["author_url"] = uploader_url(info, info["extractor"])
 97            info["title"] = info.get("title", "")
 98            info["duration"] = round(float(info.get("duration", "0")))
 99            info["id"] = info.get("id", "")
100            info["json_path"] = Path(json_path).as_posix()
101            video_info = {}
102            audio_info = {}
103            if requested_formats := info.get("requested_formats", []):
104                # both video and audio are requested
105                video_info = next((x for x in requested_formats if x["video_ext"].lower() != "none"), {})
106                audio_info = next((x for x in requested_formats if x["audio_ext"].lower() != "none"), {})
107                video_ext = video_info.get("ext", "")
108                audio_ext = audio_info.get("ext", "")
109                audio_format_id = audio_info.get("format_id", "")
110                info["video_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{video_ext}"
111                info["audio_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.f{audio_format_id}.{audio_ext}"
112            elif info.get("video_ext", "").lower() != "none":  # only video
113                video_ext = info.get("ext", "")
114                info["video_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{video_ext}"
115                info["audio_path"] = Path("/non-exist")
116            elif info.get("audio_ext", "").lower() != "none":  # only audio
117                audio_ext = info.get("ext", "")
118                info["audio_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{audio_ext}"
119                info["video_path"] = Path("/non-exist")
120            else:
121                info["video_path"] = Path("/non-exist")
122                info["audio_path"] = Path("/non-exist")
123            summary = ""
124            if info["author"]:
125                summary += f"\n{platform_emoji(info['extractor'])}{info['author']}"
126            if info["title"]:
127                summary += f"\n📝{info['title']}"
128            if video_info:
129                info["video_size"] = video_info.get("filesize") or video_info.get("filesize_approx") or 0
130                summary += f"\n🎬视频: {video_info['format']}  ({readable_size(info['video_size'])})".removesuffix("  (0.0 B)")
131            if audio_info:
132                info["audio_size"] = audio_info.get("filesize") or audio_info.get("filesize_approx") or 0
133                summary += f"\n🎧音频: {audio_info['format']}  ({readable_size(info['audio_size'])})".removesuffix("  (0.0 B)")
134            if info["duration"]:
135                summary += f"\n🕒时长: {readable_time(info['duration'])}"
136            info["summary"] = summary.strip()
137            media_size = int(info.get("video_size", 0)) + int(info.get("audio_size", 0))
138            if media_size > YTDLP_DOWNLOAD_MAX_FILE_BYTES:
139                info["ytdlp_error"] = f"{summary.strip()}\n**⚠️视频文件过大: {readable_size(media_size)}**\n**⚠️机器硬盘限制: {readable_size(YTDLP_DOWNLOAD_MAX_FILE_BYTES)}**"
140
141    except Exception as e:
142        logger.error(f"Failed to download video info: {e}")
143        info = {"ytdlp_error": str(e)}
144    logger.trace(info)
145    return info
146
147
def retry(func, max_retries=5):
    """Decorate ``func`` so failures are retried and reported, not raised.

    ``func`` is called up to ``max_retries`` times with one second between
    attempts. A ``DownloadError`` whose message contains one of the markers
    below (sign-in required, region/country restriction, deleted) stops the
    loop immediately.

    Args:
        func: Callable to wrap. Its result dict is expected as the keyword
            argument ``result`` or as the third positional argument.
        max_retries: Maximum number of attempts.

    Returns:
        The wrapper. It returns ``func``'s value on success. After the last
        failure it stores the message under ``"ytdlp_error"`` in the result
        dict (a fresh dict if none was passed) and returns that dict. It
        returns ``{}`` when no attempt was made.
    """
    import functools  # local import keeps this block self-contained

    # Lower-cased fragments of DownloadError messages that are not retried.
    fatal_markers = ("sign in", "请登录", "地区", "国家", "country", "删除", "deleted")

    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        retries = 0
        msg = ""
        while retries < max_retries:
            try:
                return func(*args, **kwargs)
            except ExtractorError as e:
                msg = f"ExtractorError: {str(e.orig_msg).removeprefix('ERROR: ')}"
            except DownloadError as e:
                msg = f"DownloadError: {str(e.msg).removeprefix('ERROR: ')}"
                if any(marker in msg.lower() for marker in fatal_markers):
                    retries += 1
                    break
            except YoutubeDLError as e:
                msg = f"YoutubeDLError: {str(e.msg).removeprefix('ERROR: ')}"
            except Exception as e:
                msg = f"{type(e).__name__}: {e} (Retrying {retries}/{max_retries})"
            retries += 1
            # Waiting only makes sense when another attempt follows.
            if retries < max_retries:
                time.sleep(1)
        logger.error(f"Failed after {retries} retries: {msg}")
        if not msg:
            return {}
        # Locate the caller's result dict without assuming how it was passed.
        if "result" in kwargs:
            result = kwargs["result"]
        elif len(args) > 2:
            result = args[2]
        else:
            result = {}
        # Angle brackets are swapped for square brackets in the stored message.
        result["ytdlp_error"] = msg.replace("<", "[").replace(">", "]")
        return result

    return wrapper
175
176
@retry
def download_video(json_path: str, ytdlp_opts: dict, result: dict) -> dict:
    """Download the media described by the info file at ``json_path``.

    Args:
        json_path: Info file to download from; its stem is the URL-quoted
            source address.
        ytdlp_opts: Options passed to ``YoutubeDL``.
        result: Shared dict that receives ``"ytdlp_error"`` on failure.

    Returns:
        The same ``result`` dict.
    """
    with YoutubeDL(ytdlp_opts) as downloader:
        status = downloader.download_with_info_file(json_path)  # 0: success, 1: error
    # Nothing to record on success or when an error message is already present.
    if status == 0 or result.get("ytdlp_error"):
        return result
    source_url = unquote_plus(Path(json_path).stem)
    result["ytdlp_error"] = f"❌下载失败\n{source_url}"
    return result
185
186
async def download_video_async(json_path: str, ytdlp_opts: dict) -> str:
    """Run the blocking ``download_video`` in a worker thread and await it.

    Args:
        json_path: Info file handed to ``download_video``.
        ytdlp_opts: Options handed to ``download_video``.

    Returns:
        The error message of the download, or ``""`` when it succeeded.
    """
    # Shared dictionary the worker writes its outcome into
    result = {}
    try:
        # One worker thread is enough; there is no need to start a thread and
        # then occupy a second one just to join it.
        await asyncio.to_thread(download_video, json_path, ytdlp_opts, result)
    except Exception as e:
        # A crash of the worker must not look like a successful download.
        crash = f"{type(e).__name__}: {e}".replace("<", "[").replace(">", "]")
        logger.error(f"Download worker crashed: {crash}")
        return result.get("ytdlp_error") or crash
    return result.get("ytdlp_error", "")
198
199
def create_hook(message: Message | None, loop, *, detail_progress: bool):
    """Build a progress hook that forwards download status to ``modify_progress``.

    Args:
        message: Passed through as the first argument of ``modify_progress``.
        loop: Event loop on which the ``modify_progress`` coroutine is scheduled.
        detail_progress: Passed through to ``modify_progress``.

    Returns:
        A callable taking the progress dict ``d``.
    """

    def _number(value):
        # Missing or falsy values count as 0.
        return float(value) if value else 0

    def hook(d):
        info_dict = d.get("info_dict", {})
        title = info_dict.get("title", "")
        is_video = info_dict.get("video_ext", "").lower() != "none"
        ftype, emoji = ("视频", "🎬") if is_video else ("音频", "🎧")
        status = d.get("status", "")

        if status == "finished":
            msg = f"{ftype}下载完成\n{emoji}{title}"
        elif status == "error":
            msg = f"{ftype}下载失败\n{emoji}{title}"
        elif status == "downloading":
            downloaded_bytes = _number(d.get("downloaded_bytes"))
            # Use whichever of the exact and the estimated total is larger.
            total = max(_number(d.get("total_bytes")), _number(d.get("total_bytes_estimate")))
            eta = _number(d.get("eta"))  # seconds
            speed = _number(d.get("speed"))  # bytes/second
            finished = downloaded_bytes / total if total > 0 else 0
            msg = "\n".join(
                [
                    f"{ftype}下载: {readable_size(downloaded_bytes)} / {readable_size(total)} ({finished:.2%})",
                    f"⚡️当前网速: {readable_size(speed)}/s",
                    f"🕒剩余时长: {readable_time(eta)}",
                    f"{emoji}{title}",
                ]
            )
        else:
            msg = ""

        # The coroutine is handed to ``loop`` thread-safely instead of being awaited here.
        update = modify_progress(message, msg.strip(), detail_progress=detail_progress)
        asyncio.run_coroutine_threadsafe(update, loop)

    return hook
227    return hook