main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import asyncio
  4import json
  5import threading
  6import time
  7from pathlib import Path
  8from typing import Literal
  9from urllib.parse import quote_plus, unquote_plus
 10
 11from glom import Coalesce, glom
 12from loguru import logger
 13from pyrogram.types import Message
 14from yt_dlp import YoutubeDL
 15from yt_dlp.utils import DownloadError, ExtractorError, YoutubeDLError
 16
 17from config import DOWNLOAD_DIR, PROXY, YTDLP_DOWNLOAD_MAX_FILE_BYTES
 18from messages.progress import modify_progress
 19from utils import readable_size, readable_time, true
 20from ytdlp.utils import ProxyError, find_thumbnail, get_ytdlp_opts, platform_emoji, uploader_url
 21
 22
 23async def ytdlp_download(
 24    url: str,
 25    platform: Literal["youtube", "bilibili", "ytdlp"] | None = None,
 26    proxy: str | None = None,
 27    *,
 28    use_aria2: bool = False,
 29    **kwargs,
 30) -> dict:
 31    """Download video from url.
 32
 33    Returns:
 34        dict: downloaded info.
 35    {
 36        "video_path": Path("video_path"),
 37        "audio_path": Path("audio_path"),
 38        "thumb": str(thumbnail_path),
 39        "author": "author",
 40        "author_url": "author_url",
 41        "title": "title",
 42        "duration": 123,
 43        "extractor": "youtube",
 44        "id": "id",
 45        "json_path": "json_path",
 46        "summary": "summary",
 47    }
 48    """
 49    placeholder = {"video_path": Path("/non-exist"), "audio_path": Path("/non-exist"), "thumb": None}
 50    ytdlp_opts = await get_ytdlp_opts(url=url, platform=platform, proxy=proxy, video=true(kwargs.get("ytdlp_download_video")), use_aria2=use_aria2)
 51    if kwargs.get("show_progress"):
 52        loop = asyncio.get_running_loop()
 53        hook = create_hook(kwargs.get("progress"), loop, detail_progress=true(kwargs.get("detail_progress")))
 54        ytdlp_opts["progress_hooks"] = [hook]
 55    logger.info(f"Downloading via proxy: {ytdlp_opts['proxy']} of {url}")
 56    # download json first
 57    json_path = f"{DOWNLOAD_DIR}/{quote_plus(url)}.json"
 58    info = download_video_info(url, ytdlp_opts, json_path)
 59    if ytdlp_error := info.get("ytdlp_error"):
 60        if PROXY.YTDLP_FALLBACK and proxy != PROXY.YTDLP_FALLBACK:
 61            await modify_progress(del_status=True, **kwargs)
 62            raise ProxyError(ytdlp_error)
 63        await modify_progress(text=ytdlp_error, force_update=True, **kwargs)
 64        return placeholder
 65    await modify_progress(text=f"⏬正在下载:\n{info['summary']}", force_update=True, **kwargs)
 66    ytdlp_error = await download_video_async(json_path, ytdlp_opts)
 67    if ytdlp_error:
 68        await modify_progress(text=ytdlp_error, force_update=True, **kwargs)
 69        return placeholder
 70    msg = f"✅下载成功:\n{info['summary']}"
 71    logger.success(f"{msg!r}")
 72    info["thumb"] = find_thumbnail(info["video_path"], info["audio_path"])
 73    # correct audio format == .mp4
 74    if info["audio_path"].suffix == ".mp4":
 75        new_path = info["audio_path"].with_suffix(".m4a")
 76        info["audio_path"].rename(new_path)
 77        info["audio_path"] = new_path
 78
 79    # delete video_only file (no audio channel), this file is no longer needed
 80    format_id = info.get("format_id", "")  # 299+140
 81    for fmt_id in [x.strip() for x in format_id.split("+") if x.strip()]:  # ['299', '140']
 82        video_ext = info["video_path"].suffix  # .mp4
 83        Path(DOWNLOAD_DIR).joinpath(f"{info['id']}.f{fmt_id}{video_ext}").unlink(missing_ok=True)
 84
 85    await modify_progress(text=msg.strip(), force_update=True, **kwargs)
 86    return info
 87
 88
 89def download_video_info(url: str, ytdlp_opts: dict, json_path: str | Path) -> dict:
 90    try:
 91        with YoutubeDL(ytdlp_opts) as ydl:
 92            info: dict = ydl.extract_info(url, download=False)
 93            with Path(json_path).open("w") as f:
 94                json.dump(ydl.sanitize_info(info), f, ensure_ascii=False, indent=2)
 95            # add custom fields
 96            info["extractor"] = info.get("extractor", "").lower()
 97            info["author"] = glom(info, Coalesce("uploader", "series", "extractor"))
 98            info["author_url"] = uploader_url(info, info["extractor"])
 99            info["title"] = info.get("title", "")
100            info["duration"] = round(float(info.get("duration", "0")))
101            info["id"] = info.get("id", "")
102            info["json_path"] = Path(json_path).as_posix()
103            video_info = {}
104            audio_info = {}
105            if requested_formats := info.get("requested_formats", []):
106                # both video and audio are requested
107                video_info = next((x for x in requested_formats if x["video_ext"].lower() != "none"), {})
108                audio_info = next((x for x in requested_formats if x["audio_ext"].lower() != "none"), {})
109                video_ext = video_info.get("ext", "")
110                audio_ext = audio_info.get("ext", "")
111                audio_format_id = audio_info.get("format_id", "")
112                info["video_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{video_ext}"
113                info["audio_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.f{audio_format_id}.{audio_ext}"
114            elif info.get("video_ext", "").lower() != "none":  # only video
115                video_ext = info.get("ext", "")
116                info["video_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{video_ext}"
117                info["audio_path"] = Path("/non-exist")
118            elif info.get("audio_ext", "").lower() != "none":  # only audio
119                audio_ext = info.get("ext", "")
120                info["audio_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{audio_ext}"
121                info["video_path"] = Path("/non-exist")
122            else:
123                info["video_path"] = Path("/non-exist")
124                info["audio_path"] = Path("/non-exist")
125            summary = ""
126            if info["author"]:
127                summary += f"\n{platform_emoji(info['extractor'])}{info['author']}"
128            if info["title"]:
129                summary += f"\n📝{info['title']}"
130            if video_info:
131                info["video_size"] = video_info.get("filesize") or video_info.get("filesize_approx") or 0
132                summary += f"\n🎬视频: {video_info['format']}  ({readable_size(info['video_size'])})".removesuffix("  (0.0 B)")
133            if audio_info:
134                info["audio_size"] = audio_info.get("filesize") or audio_info.get("filesize_approx") or 0
135                summary += f"\n🎧音频: {audio_info['format']}  ({readable_size(info['audio_size'])})".removesuffix("  (0.0 B)")
136            if info["duration"]:
137                summary += f"\n🕒时长: {readable_time(info['duration'])}"
138            info["summary"] = summary.strip()
139            media_size = int(info.get("video_size", 0)) + int(info.get("audio_size", 0))
140            if media_size > YTDLP_DOWNLOAD_MAX_FILE_BYTES:
141                info["ytdlp_error"] = f"{summary.strip()}\n**⚠️视频文件过大: {readable_size(media_size)}**\n**⚠️机器硬盘限制: {readable_size(YTDLP_DOWNLOAD_MAX_FILE_BYTES)}**"
142
143    except Exception as e:
144        logger.error(f"Failed to download video info: {e}")
145        info = {"ytdlp_error": str(e)}
146    logger.trace(info)
147    return info
148
149
150def retry(func, max_retries=5):
151    def wrapper(*args, **kwargs):
152        retries = 0
153        msg = ""
154        while retries < max_retries:
155            try:
156                return func(*args, **kwargs)
157            except ExtractorError as e:
158                msg = f"ExtractorError: {str(e.orig_msg).removeprefix('ERROR: ')}"
159            except DownloadError as e:
160                msg = f"DownloadError: {str(e.msg).removeprefix('ERROR: ')}"
161                if any(x in msg.lower() for x in ["sign in", "请登录", "地区", "国家", "country", "删除", "deleted"]):
162                    retries += 1
163                    break
164            except YoutubeDLError as e:
165                msg = f"YoutubeDLError: {str(e.msg).removeprefix('ERROR: ')}"
166            except Exception as e:
167                msg = f"{type(e).__name__}: {e} (Retrying {retries}/{max_retries})"
168            retries += 1
169            time.sleep(1)
170        logger.error(f"Failed after {retries} retries: {msg}")
171        if msg:
172            args[2]["ytdlp_error"] = msg.replace("<", "[").replace(">", "]")
173            return args[2]
174        return {}
175
176    return wrapper
177
178
179@retry
180def download_video(json_path: str, ytdlp_opts: dict, result: dict) -> dict:
181    with YoutubeDL(ytdlp_opts) as ydl:
182        error_code = ydl.download_with_info_file(json_path)  # 0: success, 1: error
183    if error_code != 0 and not result.get("ytdlp_error"):
184        url = unquote_plus(Path(json_path).stem)
185        result["ytdlp_error"] = f"❌下载失败\n{url}"
186    return result
187
188
189async def download_video_async(json_path: str, ytdlp_opts: dict) -> str:
190    """Wrapper to run the download function in a thread.
191
192    Generated by GPT-4o.
193    """
194    # Shared dictionary to hold the results
195    result = {}
196    download_thread = threading.Thread(target=download_video, args=(json_path, ytdlp_opts, result))
197    download_thread.start()
198    await asyncio.to_thread(download_thread.join)
199    return result.get("ytdlp_error", "")
200
201
202def create_hook(message: Message | None, loop, *, detail_progress: bool):
203    """Hook to show downloading progress."""
204
205    def hook(d):
206        msg = ""
207        title = d.get("info_dict", {}).get("title", "")
208        ftype = "视频" if d.get("info_dict", {}).get("video_ext", "").lower() != "none" else "音频"
209        emoji = "🎬" if ftype == "视频" else "🎧"
210        status = d.get("status", "")
211        if status == "downloading":
212            downloaded_bytes = float(d.get("downloaded_bytes")) if d.get("downloaded_bytes") else 0
213            total_bytes = float(d.get("total_bytes")) if d.get("total_bytes") else 0
214            total_bytes_estimate = float(d.get("total_bytes_estimate")) if d.get("total_bytes_estimate") else 0
215            total = max(total_bytes, total_bytes_estimate)
216            eta = float(d.get("eta")) if d.get("eta") else 0  # seconds
217            speed = float(d.get("speed")) if d.get("speed") else 0  # bytes/second
218            finished = downloaded_bytes / total if total > 0 else 0
219            msg += f"{ftype}下载: {readable_size(downloaded_bytes)} / {readable_size(total)} ({finished:.2%})\n"
220            msg += f"⚡️当前网速: {readable_size(speed)}/s\n"
221            msg += f"🕒剩余时长: {readable_time(eta)}\n"
222            msg += f"{emoji}{title}"
223        elif status == "finished":
224            msg = f"{ftype}下载完成\n{emoji}{title}"
225        elif status == "error":
226            msg = f"{ftype}下载失败\n{emoji}{title}"
227        asyncio.run_coroutine_threadsafe(modify_progress(message, msg.strip(), detail_progress=detail_progress), loop)
228
229    return hook