main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import asyncio
4import json
5import threading
6import time
7from pathlib import Path
8from typing import Literal
9from urllib.parse import quote_plus, unquote_plus
10
11from glom import Coalesce, glom
12from loguru import logger
13from pyrogram.types import Message
14from yt_dlp import YoutubeDL
15from yt_dlp.utils import DownloadError, ExtractorError, YoutubeDLError
16
17from config import DOWNLOAD_DIR, PROXY, YTDLP_DOWNLOAD_MAX_FILE_BYTES
18from messages.progress import modify_progress
19from utils import readable_size, readable_time, true
20from ytdlp.utils import ProxyError, find_thumbnail, get_ytdlp_opts, platform_emoji, uploader_url
21
22
async def ytdlp_download(
    url: str,
    platform: Literal["youtube", "bilibili", "ytdlp"] | None = None,
    proxy: str | None = None,
    **kwargs,
) -> dict:
    """Download the media behind ``url`` with yt-dlp and report progress.

    The metadata is extracted first and written to a JSON file inside
    ``DOWNLOAD_DIR``; the actual download is then driven from that file.

    Args:
        url: URL handed to yt-dlp.
        platform: Platform hint forwarded to ``get_ytdlp_opts``.
        proxy: Proxy forwarded to ``get_ytdlp_opts``.
        **kwargs: Forwarded to ``modify_progress``. The keys read directly
            here are ``ytdlp_download_video``, ``show_progress``,
            ``progress`` and ``detail_progress``.

    Returns:
        dict: On success, the info dict built by ``download_video_info`` with
        ``thumb`` added, e.g.::

            {
                "video_path": Path("video_path"),
                "audio_path": Path("audio_path"),
                "thumb": str(thumbnail_path),
                "author": "author",
                "author_url": "author_url",
                "title": "title",
                "duration": 123,
                "extractor": "youtube",
                "id": "id",
                "json_path": "json_path",
                "summary": "summary",
            }

        On failure, a placeholder dict whose ``video_path`` and
        ``audio_path`` point to ``/non-exist`` and whose ``thumb`` is None.

    Raises:
        ProxyError: Metadata extraction failed while ``PROXY.YTDLP_FALLBACK``
            is set and differs from ``proxy``.
    """
    placeholder = {"video_path": Path("/non-exist"), "audio_path": Path("/non-exist"), "thumb": None}
    ytdlp_opts = await get_ytdlp_opts(url=url, platform=platform, proxy=proxy, video=true(kwargs.get("ytdlp_download_video")))
    if kwargs.get("show_progress"):
        loop = asyncio.get_running_loop()
        hook = create_hook(kwargs.get("progress"), loop, detail_progress=true(kwargs.get("detail_progress")))
        ytdlp_opts["progress_hooks"] = [hook]
    # .get(): a log line must not raise KeyError if the options carry no "proxy" entry.
    logger.info(f"Downloading via proxy: {ytdlp_opts.get('proxy')} of {url}")

    # Download the metadata JSON first. The extraction is blocking, so it runs
    # in a worker thread to keep the event loop responsive.
    json_path = f"{DOWNLOAD_DIR}/{quote_plus(url)}.json"
    info = await asyncio.to_thread(download_video_info, url, ytdlp_opts, json_path)
    if ytdlp_error := info.get("ytdlp_error"):
        if PROXY.YTDLP_FALLBACK and proxy != PROXY.YTDLP_FALLBACK:
            await modify_progress(del_status=True, **kwargs)
            raise ProxyError(ytdlp_error)
        await modify_progress(text=ytdlp_error, force_update=True, **kwargs)
        return placeholder

    await modify_progress(text=f"⏬正在下载:\n{info['summary']}", force_update=True, **kwargs)
    ytdlp_error = await download_video_async(json_path, ytdlp_opts)
    if ytdlp_error:
        await modify_progress(text=ytdlp_error, force_update=True, **kwargs)
        return placeholder

    msg = f"✅下载成功:\n{info['summary']}"
    logger.success(f"{msg!r}")
    info["thumb"] = find_thumbnail(info["video_path"], info["audio_path"])

    # An audio file saved with an .mp4 suffix is renamed to .m4a. The rename
    # is skipped when the file is not on disk instead of raising
    # FileNotFoundError after an otherwise successful download.
    audio_path = info["audio_path"]
    if audio_path.suffix == ".mp4" and audio_path.is_file():
        new_path = audio_path.with_suffix(".m4a")
        audio_path.rename(new_path)
        info["audio_path"] = new_path

    # Delete the per-format intermediate files (e.g. the video-only stream
    # without an audio channel); they are no longer needed.
    format_id = info.get("format_id") or ""  # e.g. "299+140"
    video_ext = info["video_path"].suffix  # e.g. ".mp4"
    for fmt_id in (part.strip() for part in format_id.split("+")):  # "299", "140"
        if fmt_id:
            Path(DOWNLOAD_DIR).joinpath(f"{info['id']}.f{fmt_id}{video_ext}").unlink(missing_ok=True)

    # Final summary
    await modify_progress(text=msg.strip(), force_update=True, **kwargs)
    return info
85
86
def download_video_info(url: str, ytdlp_opts: dict, json_path: str | Path) -> dict:
    """Extract metadata for ``url`` without downloading and save it as JSON.

    The sanitized info dict is written to ``json_path``. The returned dict is
    the yt-dlp info dict enriched with ``extractor`` (lower-cased), ``author``,
    ``author_url``, ``title``, ``duration`` (rounded to an int), ``id``,
    ``json_path``, ``video_path``, ``audio_path`` and ``summary``.
    ``video_size`` / ``audio_size`` are added when separate video/audio
    formats were requested.

    Args:
        url: URL handed to yt-dlp.
        ytdlp_opts: Options passed to ``YoutubeDL``.
        json_path: Destination of the info JSON file.

    Returns:
        dict: The enriched info dict. ``ytdlp_error`` is set when the expected
        media size exceeds ``YTDLP_DOWNLOAD_MAX_FILE_BYTES``. If anything
        raises, the result is just ``{"ytdlp_error": "<message>"}``.
    """
    try:
        with YoutubeDL(ytdlp_opts) as ydl:
            info: dict = ydl.extract_info(url, download=False)
            # ensure_ascii=False emits raw non-ASCII text, so pin UTF-8 rather
            # than depend on the platform's default encoding.
            with Path(json_path).open("w", encoding="utf-8") as f:
                json.dump(ydl.sanitize_info(info), f, ensure_ascii=False, indent=2)
        # add custom fields
        info["extractor"] = (info.get("extractor") or "").lower()
        info["author"] = glom(info, Coalesce("uploader", "series", "extractor"))
        info["author_url"] = uploader_url(info, info["extractor"])
        info["title"] = info.get("title", "")
        # Guard against a present-but-None duration, which float() rejects.
        info["duration"] = round(float(info.get("duration") or 0))
        info["id"] = info.get("id", "")
        info["json_path"] = Path(json_path).as_posix()
        video_info = {}
        audio_info = {}
        if requested_formats := info.get("requested_formats", []):
            # both video and audio are requested
            video_info = next((x for x in requested_formats if x["video_ext"].lower() != "none"), {})
            audio_info = next((x for x in requested_formats if x["audio_ext"].lower() != "none"), {})
            video_ext = video_info.get("ext", "")
            audio_ext = audio_info.get("ext", "")
            audio_format_id = audio_info.get("format_id", "")
            info["video_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{video_ext}"
            info["audio_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.f{audio_format_id}.{audio_ext}"
        elif info.get("video_ext", "").lower() != "none":  # only video
            video_ext = info.get("ext", "")
            info["video_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{video_ext}"
            info["audio_path"] = Path("/non-exist")
        elif info.get("audio_ext", "").lower() != "none":  # only audio
            audio_ext = info.get("ext", "")
            info["audio_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{audio_ext}"
            info["video_path"] = Path("/non-exist")
        else:
            info["video_path"] = Path("/non-exist")
            info["audio_path"] = Path("/non-exist")

        # Build the human-readable summary shown in progress messages.
        summary = ""
        if info["author"]:
            summary += f"\n{platform_emoji(info['extractor'])}{info['author']}"
        if info["title"]:
            summary += f"\n📝{info['title']}"
        if video_info:
            info["video_size"] = video_info.get("filesize") or video_info.get("filesize_approx") or 0
            # An unknown size renders as " (0.0 B)"; drop that suffix.
            summary += f"\n🎬视频: {video_info['format']} ({readable_size(info['video_size'])})".removesuffix(" (0.0 B)")
        if audio_info:
            info["audio_size"] = audio_info.get("filesize") or audio_info.get("filesize_approx") or 0
            summary += f"\n🎧音频: {audio_info['format']} ({readable_size(info['audio_size'])})".removesuffix(" (0.0 B)")
        if info["duration"]:
            summary += f"\n🕒时长: {readable_time(info['duration'])}"
        info["summary"] = summary.strip()

        media_size = int(info.get("video_size", 0)) + int(info.get("audio_size", 0))
        if not requested_formats:
            # Without requested_formats neither *_size key is set, so the limit
            # below would never trigger for single-format results.
            # NOTE(review): assumes yt-dlp exposes the selected format's
            # filesize / filesize_approx on the top-level info dict - confirm.
            media_size = int(info.get("filesize") or info.get("filesize_approx") or 0)
        if media_size > YTDLP_DOWNLOAD_MAX_FILE_BYTES:
            info["ytdlp_error"] = f"{summary.strip()}\n**⚠️视频文件过大: {readable_size(media_size)}**\n**⚠️机器硬盘限制: {readable_size(YTDLP_DOWNLOAD_MAX_FILE_BYTES)}**"

    except Exception as e:
        # Boundary handler: every failure is reported to the caller through
        # the "ytdlp_error" key instead of an exception.
        logger.error(f"Failed to download video info: {e}")
        info = {"ytdlp_error": str(e)}
    logger.trace(info)
    return info
146
147
def retry(func, max_retries=5):
    """Decorate ``func`` so that failures are retried and reported via a dict.

    ``func`` is called up to ``max_retries`` times, with a one-second pause
    between attempts. A ``DownloadError`` whose message contains one of the
    fatal markers (sign-in required, region/country restriction, deleted)
    stops the retries immediately.

    When every attempt fails, the last error message is stored under
    ``"ytdlp_error"`` in the shared result dict and that dict is returned.
    The result dict is the ``result`` keyword argument or, failing that, the
    third positional argument. If neither is a dict, a fresh dict holding the
    error is returned.

    Args:
        func: The callable to wrap.
        max_retries: Maximum number of attempts.

    Returns:
        The wrapping function. It returns whatever ``func`` returns on
        success.
    """
    from functools import wraps  # local import keeps this block self-contained

    fatal_markers = ("sign in", "请登录", "地区", "国家", "country", "删除", "deleted")

    @wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        retries = 0
        msg = ""
        while retries < max_retries:
            try:
                return func(*args, **kwargs)
            except ExtractorError as e:
                msg = f"ExtractorError: {str(e.orig_msg).removeprefix('ERROR: ')}"
            except DownloadError as e:
                msg = f"DownloadError: {str(e.msg).removeprefix('ERROR: ')}"
                if any(x in msg.lower() for x in fatal_markers):
                    # Retrying cannot fix these; give up right away.
                    retries += 1
                    break
            except YoutubeDLError as e:
                msg = f"YoutubeDLError: {str(e.msg).removeprefix('ERROR: ')}"
            except Exception as e:
                msg = f"{type(e).__name__}: {e} (Retrying {retries}/{max_retries})"
            retries += 1
            if retries < max_retries:
                # No point in sleeping after the final attempt.
                time.sleep(1)
        logger.error(f"Failed after {retries} retries: {msg}")
        if not msg:
            return {}
        # Angle brackets are replaced before the text is passed on.
        error = msg.replace("<", "[").replace(">", "]")
        # Find the shared result dict without assuming it was passed positionally.
        result = kwargs["result"] if "result" in kwargs else (args[2] if len(args) > 2 else None)
        if isinstance(result, dict):
            result["ytdlp_error"] = error
            return result
        return {"ytdlp_error": error}

    return wrapper
175
176
@retry
def download_video(json_path: str, ytdlp_opts: dict, result: dict) -> dict:
    """Download the media described by the info JSON at ``json_path``.

    Args:
        json_path: Path of the info JSON file. Its stem is the URL-quoted
            source URL.
        ytdlp_opts: Options passed to ``YoutubeDL``.
        result: Shared dict that receives ``"ytdlp_error"`` on failure.

    Returns:
        dict: The same ``result`` dict.
    """
    with YoutubeDL(ytdlp_opts) as downloader:
        # download_with_info_file returns 0 on success and 1 on error.
        exit_code = downloader.download_with_info_file(json_path)
        already_reported = bool(result.get("ytdlp_error"))
        if exit_code != 0 and not already_reported:
            # Recover the original URL from the quoted file name.
            source_url = unquote_plus(Path(json_path).stem)
            result["ytdlp_error"] = f"❌下载失败\n{source_url}"
    return result
185
186
async def download_video_async(json_path: str, ytdlp_opts: dict) -> str:
    """Run the blocking ``download_video`` in a worker thread and await it.

    A single ``asyncio.to_thread`` call is used, so the download does not
    occupy one thread for the work and a second one just to join it. An
    exception escaping ``download_video`` propagates to the awaiting caller
    rather than being lost in a background thread.

    Args:
        json_path: Path of the info JSON file to download from.
        ytdlp_opts: Options passed on to ``download_video``.

    Returns:
        str: The ``"ytdlp_error"`` message, or an empty string if none was set.
    """
    # Shared dictionary that download_video fills with the outcome.
    result: dict = {}
    await asyncio.to_thread(download_video, json_path, ytdlp_opts, result)
    return result.get("ytdlp_error", "")
198
199
def create_hook(message: Message | None, loop, *, detail_progress: bool):
    """Build a yt-dlp progress hook that forwards status text to ``modify_progress``.

    Args:
        message: Message passed to ``modify_progress`` as its first argument.
        loop: Event loop on which the ``modify_progress`` coroutine is scheduled.
        detail_progress: Forwarded to ``modify_progress``.

    Returns:
        A callable taking the progress dict that yt-dlp passes to its hooks.
    """

    def _as_number(payload, key):
        # Missing, None and zero values all collapse to 0.
        raw = payload.get(key)
        return float(raw) if raw else 0

    def hook(d):
        title = d.get("info_dict", {}).get("title", "")
        is_video = d.get("info_dict", {}).get("video_ext", "").lower() != "none"
        ftype, emoji = ("视频", "🎬") if is_video else ("音频", "🎧")
        status = d.get("status", "")

        msg = ""
        if status == "finished":
            msg = f"✅{ftype}下载完成\n{emoji}{title}"
        elif status == "error":
            msg = f"❌{ftype}下载失败\n{emoji}{title}"
        elif status == "downloading":
            downloaded_bytes = _as_number(d, "downloaded_bytes")
            # Use whichever of the exact and the estimated total is larger.
            total = max(_as_number(d, "total_bytes"), _as_number(d, "total_bytes_estimate"))
            eta = _as_number(d, "eta")  # seconds
            speed = _as_number(d, "speed")  # bytes/second
            finished = downloaded_bytes / total if total > 0 else 0
            msg = (
                f"⏬{ftype}下载: {readable_size(downloaded_bytes)} / {readable_size(total)} ({finished:.2%})\n"
                f"⚡️当前网速: {readable_size(speed)}/s\n"
                f"🕒剩余时长: {readable_time(eta)}\n"
                f"{emoji}{title}"
            )

        # Hand the coroutine to the given loop in a thread-safe way.
        update = modify_progress(message, msg.strip(), detail_progress=detail_progress)
        asyncio.run_coroutine_threadsafe(update, loop)

    return hook
227 return hook