main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import asyncio
4import json
5import threading
6import time
7from pathlib import Path
8from typing import Literal
9from urllib.parse import quote_plus, unquote_plus
10
11from glom import Coalesce, glom
12from loguru import logger
13from pyrogram.types import Message
14from yt_dlp import YoutubeDL
15from yt_dlp.utils import DownloadError, ExtractorError, YoutubeDLError
16
17from config import DOWNLOAD_DIR, PROXY, YTDLP_DOWNLOAD_MAX_FILE_BYTES
18from messages.progress import modify_progress
19from utils import readable_size, readable_time, true
20from ytdlp.utils import ProxyError, find_thumbnail, get_ytdlp_opts, platform_emoji, uploader_url
21
22
async def ytdlp_download(
    url: str,
    platform: Literal["youtube", "bilibili", "ytdlp"] | None = None,
    proxy: str | None = None,
    *,
    use_aria2: bool = False,
    **kwargs,
) -> dict:
    """Download the media behind ``url`` with yt-dlp and report progress.

    Metadata is extracted first and written to a JSON file under ``DOWNLOAD_DIR``;
    the actual download is then driven from that JSON file.

    Args:
        url: Page URL to download.
        platform: Platform hint forwarded to ``get_ytdlp_opts``.
        proxy: Proxy forwarded to ``get_ytdlp_opts``.
        use_aria2: Forwarded to ``get_ytdlp_opts``.
        **kwargs: ``ytdlp_download_video``, ``show_progress``, ``progress`` and
            ``detail_progress`` are read here; the whole mapping is also
            forwarded to ``modify_progress``.

    Returns:
        dict: downloaded info.
        {
            "video_path": Path("video_path"),
            "audio_path": Path("audio_path"),
            "thumb": str(thumbnail_path),
            "author": "author",
            "author_url": "author_url",
            "title": "title",
            "duration": 123,
            "extractor": "youtube",
            "id": "id",
            "json_path": "json_path",
            "summary": "summary",
        }
        When extraction or download reports an error, a placeholder holding only
        non-existent ``video_path``/``audio_path`` and ``thumb=None`` is returned.

    Raises:
        ProxyError: extraction reported an error while ``PROXY.YTDLP_FALLBACK``
            is set and differs from ``proxy``.
    """
    placeholder = {"video_path": Path("/non-exist"), "audio_path": Path("/non-exist"), "thumb": None}
    ytdlp_opts = await get_ytdlp_opts(url=url, platform=platform, proxy=proxy, video=true(kwargs.get("ytdlp_download_video")), use_aria2=use_aria2)
    if kwargs.get("show_progress"):
        loop = asyncio.get_running_loop()
        hook = create_hook(kwargs.get("progress"), loop, detail_progress=true(kwargs.get("detail_progress")))
        ytdlp_opts["progress_hooks"] = [hook]
    logger.info(f"Downloading via proxy: {ytdlp_opts['proxy']} of {url}")

    # Download the info JSON first. Extraction is blocking, so run it in a worker
    # thread instead of stalling the event loop.
    json_path = f"{DOWNLOAD_DIR}/{quote_plus(url)}.json"
    info = await asyncio.to_thread(download_video_info, url, ytdlp_opts, json_path)
    if ytdlp_error := info.get("ytdlp_error"):
        if PROXY.YTDLP_FALLBACK and proxy != PROXY.YTDLP_FALLBACK:
            # A different fallback proxy is configured: clear the status and raise.
            await modify_progress(del_status=True, **kwargs)
            raise ProxyError(ytdlp_error)
        await modify_progress(text=ytdlp_error, force_update=True, **kwargs)
        return placeholder

    await modify_progress(text=f"⏬正在下载:\n{info['summary']}", force_update=True, **kwargs)
    ytdlp_error = await download_video_async(json_path, ytdlp_opts)
    if ytdlp_error:
        await modify_progress(text=ytdlp_error, force_update=True, **kwargs)
        return placeholder

    msg = f"✅下载成功:\n{info['summary']}"
    logger.success(f"{msg!r}")
    info["thumb"] = find_thumbnail(info["video_path"], info["audio_path"])

    # An audio track stored with a .mp4 suffix is renamed to .m4a.
    if info["audio_path"].suffix == ".mp4":
        new_path = info["audio_path"].with_suffix(".m4a")
        info["audio_path"].rename(new_path)
        info["audio_path"] = new_path

    # Delete per-format intermediate files that use the video container
    # (e.g. the video-only stream); they are no longer needed.
    format_id = info.get("format_id") or ""  # e.g. "299+140"
    video_ext = info["video_path"].suffix  # e.g. ".mp4"
    keep = {info["video_path"], info["audio_path"]}
    for fmt_id in (part.strip() for part in format_id.split("+")):
        if not fmt_id:
            continue
        leftover = Path(DOWNLOAD_DIR).joinpath(f"{info['id']}.f{fmt_id}{video_ext}")
        # When audio and video share a container the candidate can be the audio
        # file itself; never delete a path that is returned to the caller.
        if leftover in keep:
            continue
        leftover.unlink(missing_ok=True)

    await modify_progress(text=msg.strip(), force_update=True, **kwargs)
    return info
87
88
def download_video_info(url: str, ytdlp_opts: dict, json_path: str | Path) -> dict:
    """Extract metadata for ``url`` without downloading and save it as JSON.

    The sanitized info dict is written to ``json_path``. The returned dict is the
    yt-dlp info dict enriched with custom fields: ``extractor`` (lower-cased),
    ``author``, ``author_url``, ``title``, ``duration`` (rounded seconds), ``id``,
    ``json_path``, ``video_path``, ``audio_path``, ``summary`` and, when separate
    formats are requested, ``video_size`` / ``audio_size``.

    Args:
        url: Page URL to inspect.
        ytdlp_opts: Options passed to ``YoutubeDL``.
        json_path: Destination of the info JSON file.

    Returns:
        The enriched info dict. ``"ytdlp_error"`` is set when the estimated media
        size exceeds ``YTDLP_DOWNLOAD_MAX_FILE_BYTES``. If anything raises, the
        result is just ``{"ytdlp_error": <message>}``.
    """
    try:
        with YoutubeDL(ytdlp_opts) as ydl:
            info: dict = ydl.extract_info(url, download=False)
            # ensure_ascii=False writes raw non-ASCII characters, so pin UTF-8
            # rather than relying on the locale's default encoding.
            with Path(json_path).open("w", encoding="utf-8") as f:
                json.dump(ydl.sanitize_info(info), f, ensure_ascii=False, indent=2)

        # add custom fields
        # `or` fallbacks cover keys that are present but hold None.
        info["extractor"] = (info.get("extractor") or "").lower()
        info["author"] = glom(info, Coalesce("uploader", "series", "extractor"))
        info["author_url"] = uploader_url(info, info["extractor"])
        info["title"] = info.get("title", "")
        info["duration"] = round(float(info.get("duration") or 0))
        info["id"] = info.get("id", "")
        info["json_path"] = Path(json_path).as_posix()

        video_info = {}
        audio_info = {}
        if requested_formats := info.get("requested_formats", []):
            # both video and audio are requested
            video_info = next((x for x in requested_formats if x["video_ext"].lower() != "none"), {})
            audio_info = next((x for x in requested_formats if x["audio_ext"].lower() != "none"), {})
            video_ext = video_info.get("ext", "")
            audio_ext = audio_info.get("ext", "")
            audio_format_id = audio_info.get("format_id", "")
            info["video_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{video_ext}"
            info["audio_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.f{audio_format_id}.{audio_ext}"
        elif (info.get("video_ext") or "").lower() != "none":  # only video
            video_ext = info.get("ext", "")
            info["video_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{video_ext}"
            info["audio_path"] = Path("/non-exist")
        elif (info.get("audio_ext") or "").lower() != "none":  # only audio
            audio_ext = info.get("ext", "")
            info["audio_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{audio_ext}"
            info["video_path"] = Path("/non-exist")
        else:
            info["video_path"] = Path("/non-exist")
            info["audio_path"] = Path("/non-exist")

        # Build the human-readable summary shown in progress messages.
        summary = ""
        if info["author"]:
            summary += f"\n{platform_emoji(info['extractor'])}{info['author']}"
        if info["title"]:
            summary += f"\n📝{info['title']}"
        if video_info:
            info["video_size"] = video_info.get("filesize") or video_info.get("filesize_approx") or 0
            # Drop the size suffix when the size is unknown (formats as "0.0 B").
            summary += f"\n🎬视频: {video_info['format']} ({readable_size(info['video_size'])})".removesuffix(" (0.0 B)")
        if audio_info:
            info["audio_size"] = audio_info.get("filesize") or audio_info.get("filesize_approx") or 0
            summary += f"\n🎧音频: {audio_info['format']} ({readable_size(info['audio_size'])})".removesuffix(" (0.0 B)")
        if info["duration"]:
            summary += f"\n🕒时长: {readable_time(info['duration'])}"
        info["summary"] = summary.strip()

        # Refuse downloads whose estimated size exceeds the configured limit.
        media_size = int(info.get("video_size", 0)) + int(info.get("audio_size", 0))
        if media_size > YTDLP_DOWNLOAD_MAX_FILE_BYTES:
            info["ytdlp_error"] = f"{summary.strip()}\n**⚠️视频文件过大: {readable_size(media_size)}**\n**⚠️机器硬盘限制: {readable_size(YTDLP_DOWNLOAD_MAX_FILE_BYTES)}**"

    except Exception as e:
        # Any failure is reported to the caller through "ytdlp_error".
        logger.error(f"Failed to download video info: {e}")
        info = {"ytdlp_error": str(e)}
    logger.trace(info)
    return info
148
149
def retry(func, max_retries=5):
    """Decorate ``func`` so failures are retried and reported via a result dict.

    ``func`` is called up to ``max_retries`` times with a one-second pause
    between attempts. A ``DownloadError`` whose message contains one of the
    markers below stops the retries immediately.

    Args:
        func: Callable to wrap. It is expected to receive a mutable result dict
            as its third positional argument or as a ``result`` keyword.
        max_retries: Maximum number of attempts.

    Returns:
        The wrapper. On success it returns whatever ``func`` returns. On failure
        it stores the last error text under ``"ytdlp_error"`` in the result dict
        and returns that dict (a new dict if the caller supplied none). It
        returns ``{}`` when no attempt produced an error message.
    """
    from functools import wraps  # local import keeps this block self-contained

    # Messages mentioning sign-in, region/country or deletion are not retried.
    fatal_markers = ("sign in", "请登录", "地区", "国家", "country", "删除", "deleted")

    @wraps(func)
    def wrapper(*args, **kwargs):
        retries = 0
        msg = ""
        while retries < max_retries:
            try:
                return func(*args, **kwargs)
            except ExtractorError as e:
                msg = f"ExtractorError: {str(e.orig_msg).removeprefix('ERROR: ')}"
            except DownloadError as e:
                msg = f"DownloadError: {str(e.msg).removeprefix('ERROR: ')}"
                if any(x in msg.lower() for x in fatal_markers):
                    retries += 1
                    break
            except YoutubeDLError as e:
                msg = f"YoutubeDLError: {str(e.msg).removeprefix('ERROR: ')}"
            except Exception as e:
                msg = f"{type(e).__name__}: {e} (Retrying {retries}/{max_retries})"
            retries += 1
            # Only pause when another attempt will actually follow.
            if retries < max_retries:
                time.sleep(1)
        logger.error(f"Failed after {retries} retries: {msg}")
        if not msg:
            return {}
        # Locate the caller's result dict: keyword first, then third positional.
        if "result" in kwargs:
            result = kwargs["result"]
        elif len(args) > 2:
            result = args[2]
        else:
            result = None
        if not isinstance(result, dict):
            result = {}
        # Angle brackets are replaced with square brackets in the stored message.
        result["ytdlp_error"] = msg.replace("<", "[").replace(">", "]")
        return result

    return wrapper
177
178
@retry
def download_video(json_path: str, ytdlp_opts: dict, result: dict) -> dict:
    """Download the media described by a previously saved info JSON file.

    Args:
        json_path: Path of the info JSON file; its stem is the URL-quoted source URL.
        ytdlp_opts: Options passed to ``YoutubeDL``.
        result: Shared dict that receives ``"ytdlp_error"`` on failure.

    Returns:
        The same ``result`` dict.
    """
    with YoutubeDL(ytdlp_opts) as downloader:
        # 0: success, 1: error
        status = downloader.download_with_info_file(json_path)
        failed = status != 0
        if failed and not result.get("ytdlp_error"):
            # Recover the original URL from the file name for the error text.
            source_url = unquote_plus(Path(json_path).stem)
            result["ytdlp_error"] = f"❌下载失败\n{source_url}"
    return result
187
188
async def download_video_async(json_path: str, ytdlp_opts: dict) -> str:
    """Run the blocking ``download_video`` in a worker thread and await it.

    Args:
        json_path: Path of the info JSON file to download from.
        ytdlp_opts: Options passed on to ``download_video``.

    Returns:
        The ``"ytdlp_error"`` text recorded by the download, or ``""`` if none.
    """
    # Shared dictionary that download_video fills with "ytdlp_error" on failure.
    result: dict = {}
    # One worker thread is enough; no separate Thread plus a thread to join it.
    await asyncio.to_thread(download_video, json_path, ytdlp_opts, result)
    return result.get("ytdlp_error", "")
200
201
def create_hook(message: Message | None, loop, *, detail_progress: bool):
    """Build a yt-dlp progress hook that forwards status text to ``modify_progress``.

    Args:
        message: Message passed through to ``modify_progress``.
        loop: Event loop on which the ``modify_progress`` coroutine is scheduled.
        detail_progress: Passed through to ``modify_progress``.

    Returns:
        A callable taking the progress dict and scheduling a progress update.
    """

    def _number(payload: dict, key: str):
        # Truthy values become floats; missing or falsy values become integer 0.
        raw = payload.get(key)
        return float(raw) if raw else 0

    def hook(event):
        title = event.get("info_dict", {}).get("title", "")
        is_video = event.get("info_dict", {}).get("video_ext", "").lower() != "none"
        ftype, emoji = ("视频", "🎬") if is_video else ("音频", "🎧")
        status = event.get("status", "")

        text = ""
        if status == "downloading":
            downloaded_bytes = _number(event, "downloaded_bytes")
            # Use whichever of the exact and estimated totals is larger.
            total = max(_number(event, "total_bytes"), _number(event, "total_bytes_estimate"))
            eta = _number(event, "eta")  # seconds
            speed = _number(event, "speed")  # bytes/second
            finished = downloaded_bytes / total if total > 0 else 0
            lines = [
                f"⏬{ftype}下载: {readable_size(downloaded_bytes)} / {readable_size(total)} ({finished:.2%})",
                f"⚡️当前网速: {readable_size(speed)}/s",
                f"🕒剩余时长: {readable_time(eta)}",
                f"{emoji}{title}",
            ]
            text = "\n".join(lines)
        elif status == "finished":
            text = f"✅{ftype}下载完成\n{emoji}{title}"
        elif status == "error":
            text = f"❌{ftype}下载失败\n{emoji}{title}"

        # Schedule the update on the supplied loop from the calling thread.
        update = modify_progress(message, text.strip(), detail_progress=detail_progress)
        asyncio.run_coroutine_threadsafe(update, loop)

    return hook
229 return hook