Commit 8da8a97
src/preview/ytdlp.py
@@ -1,12 +1,13 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
+import json
import re
import threading
import time
import warnings
-from collections import Counter
from pathlib import Path
+from urllib.parse import quote_plus, unquote_plus
from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
from loguru import logger
@@ -60,7 +61,7 @@ async def preview_ytdlp(
youtube_comments_provider (str, optional): The youtube comments extractor: "free" or "false".
proxy (str, optional): Proxy to use. Defaults to None.
"""
- logger.trace(f"url: {url} kwargs: {kwargs}")
+ logger.trace(f"{url=} {proxy=} {kwargs=}")
if kwargs.get("show_progress") and "progress" not in kwargs:
res = await send2tg(client, message, texts=f"🔗正在解析{platform}链接\n{url}", **kwargs)
kwargs["progress"] = res[0]
@@ -75,7 +76,6 @@ async def preview_ytdlp(
if ytdlp_audio_only:
ytdlp_send_video = False
if not ytdlp_send_video:
- ytdlp_audio_only = True
ytdlp_send_audio = True
ydl_opts = {
@@ -95,7 +95,7 @@ async def preview_ytdlp(
"retry_sleep_functions": {"http": lambda _: 1}, # sleep 1 second between retries
"nocheckcertificate": True,
"source_address": "0.0.0.0", # force-ipv4 # noqa: S104
- "outtmpl": "%(title)s.%(ext)s",
+ "outtmpl": "%(id)s.%(ext)s",
"noplaylist": True,
"color": "no_color-tty",
"logger": logger,
@@ -104,64 +104,36 @@ async def preview_ytdlp(
loop = asyncio.get_running_loop()
hook = create_hook(kwargs.get("progress"), loop, detail_progress=true(kwargs.get("detail_progress")))
ydl_opts["progress_hooks"] = [hook]
-
- await modify_progress(text="⏬正在下载, 请稍候...", **kwargs)
- error_msg, info = await download_video_async(url, ydl_opts)
- if error_msg:
- if proxy != PROXY.YTDLP_FALLBACK and ("Sign in" in error_msg or "请登录" in error_msg):
- raise ProxyError(error_msg)
- cache.delete("modify_progress")
- await modify_progress(text=error_msg, force_update=True, **kwargs)
- return
- logger.trace(info)
- download_info = info["requested_downloads"][0]
- if not download_info:
- await modify_progress(text="❌下载失败, 请重试", force_update=True, **kwargs)
+ json_file = f"{DOWNLOAD_DIR}/{quote_plus(url)}.json"
+ # download_video_info is a synchronous yt-dlp call; run it in a worker thread so the event loop is not stalled
+ info = await asyncio.to_thread(download_video_info, url, ydl_opts, json_file)
+ if ytdlp_error := info.get("ytdlp_error"):
+ if proxy != PROXY.YTDLP_FALLBACK and ("Sign in" in ytdlp_error or "请登录" in ytdlp_error):
+ raise ProxyError(ytdlp_error)
+ await modify_progress(text=ytdlp_error, force_update=True, **kwargs)
return
-
- final_path = Path(download_info.get("filepath", "")) # maybe video or audio
- if not final_path.is_file():
+ await modify_progress(text=f"⏬正在下载:\n{info['summary']}", force_update=True, **kwargs)
+ ytdlp_error = await download_video_async(json_file, ydl_opts)
+ if ytdlp_error:
+ # the info JSON is otherwise only removed at the end of the success path; drop it here so a failed download does not leave it behind
+ Path(json_file).unlink(missing_ok=True)
+ await modify_progress(text=ytdlp_error, force_update=True, **kwargs)
return
+ video_path = info.get("video_path", Path(""))
+ audio_path = info.get("audio_path", Path(""))
# only save messages when both video and audio are uploaded
- save_to_db = False
- if ytdlp_audio_only: # audio only
- ytdlp_send_video = False
- video_path = Path("")
- audio_path = final_path
- else: # video and audio
- video_path = final_path
- audio_info = next((x for x in download_info.get("requested_formats", []) if x["audio_ext"].lower() != "none"), {})
- audio_format_id = audio_info.get("format_id", "")
- audio_ext = audio_info.get("audio_ext", "")
- audio_path = video_path.with_suffix(f".f{audio_format_id}.{audio_ext}")
- if video_path.is_file() and audio_path.is_file():
- save_to_db = True
- msg = "✅下载成功:"
- if video_path.is_file():
- msg += f"\n🎬视频大小: {readable_size(path=video_path)}"
- if audio_path.is_file():
- msg += f"\n🎧音频大小: {readable_size(path=audio_path)}"
- title = info.get("title", "")
- msg += f"\n📝{title}"
+ save_to_db = bool(video_path.is_file() and audio_path.is_file())
+ msg = f"✅下载成功:\n{info['summary']}"
logger.success(f"{msg!r}")
await modify_progress(text=msg.strip(), **kwargs)
- author = info.get("uploader", info.get("series", info.get("extractor", "")))
- author_url = info.get("uploader_url", "")
- if not author_url:
- author_id = info.get("uploader_id", "")
- author_url = f"https://www.youtube.com/{author_id}" if platform == "youtube" else f"https://space.bilibili.com/{author_id}"
-
duration = round(float(info.get("duration", "0")))
texts = kwargs.get("send_from_user") or ""
- platform_emoji = "🅱️" if platform == "bilibili" else "🔴"
+ emoji = platform_emoji(info["extractor"])
# author
if true(kwargs.get("no_author")):
pass
- elif author and author_url:
- texts += f"{platform_emoji}[{author}]({author_url})"
- elif author:
- texts += f"{platform_emoji}{author}"
+ elif info["author"] and info["author_url"]:
+ texts += f"{emoji}[{info['author']}]({info['author_url']})"
+ elif info["author"]:
+ texts += f"{emoji}{info['author']}"
# date
create_time = ""
@@ -173,8 +145,8 @@ async def preview_ytdlp(
texts += f"\n🕒{create_time}"
# title
- if not true(kwargs.get("no_title")) and title:
- texts += f"\n📝[{title}]({url})"
+ if not true(kwargs.get("no_title")) and info["title"]:
+ texts += f"\n📝[{info['title']}]({url})"
# desc
if not true(kwargs.get("no_description")) and (desc := info.get("description")) and (desc != "-"):
@@ -184,9 +156,9 @@ async def preview_ytdlp(
texts += f"\n{make_bvid_clickable(desc_text)}"
# comments
comments = []
- if platform == "bilibili":
+ if "bilibili" in info["extractor"]:
comments = await get_bilibili_comments(kwargs.get("bvid"), bilibili_comments_provider)
- if platform == "youtube":
+ if "youtube" in info["extractor"]:
comments = await get_youtube_comments(kwargs.get("vid"), youtube_comments_provider)
for comment in comments:
@@ -197,24 +169,18 @@ async def preview_ytdlp(
target_chat = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id
reply_msg_id = kwargs.get("reply_msg_id", 0)
reply_parameters = get_reply_to(message.id, reply_msg_id)
- thumb = generate_cover(final_path) # generate cover based on final_path
+ thumb = generate_cover(video_path) if video_path.is_file() else generate_cover(audio_path)
if not Path(thumb).is_file():
thumb = None
# split large videos into multiple parts (less than 2GB)
if video_path.is_file():
- if video_path.stat().st_size < MAX_FILE_BYTES:
- await modify_progress(text=f"🎬视频大小: {readable_size(path=video_path)}", **kwargs)
- else:
+ # the notice below announces splitting, so show it only when the file reaches the size limit
+ if video_path.stat().st_size >= MAX_FILE_BYTES:
await modify_progress(text="🎬视频大小超过Telegram限制(2000MB), 正在切分...", **kwargs)
videos = preprocess_media([{"video": video_path, "thumb": thumb}])
- if len(videos) > 1:
- await modify_progress(text=f"🎬视频已切分为{len(videos)}份, 开始上传...", **kwargs)
- await asyncio.sleep(1)
-
for idx, video in enumerate(videos):
video["thumb"] = thumb
caption = texts.replace("📝[", f"📝[P{idx + 1}-") if len(videos) > 1 else texts
- await modify_progress(text=f"⏫视频上传中-P{idx + 1}: {readable_size(path=video['video'])}\n🎬{Path(video['video']).name}", force_update=True, **kwargs)
+ await modify_progress(text=f"🎬视频上传中-P{idx + 1}: {readable_size(path=video['video'])}", force_update=True, **kwargs)
sent_messages.append(
await client.send_video(
chat_id=to_int(target_chat),
@@ -227,15 +193,15 @@ async def preview_ytdlp(
)
if audio_path.is_file():
target_chat = target_chat if ytdlp_send_audio else TID.CHANNEL_YTDLP_BACKUP # backup to channel if not send audio, so we can save it to db
- await modify_progress(text=f"⏫音频上传中: {readable_size(path=audio_path)}\n🎧{audio_path.name}", force_update=True, **kwargs)
+ await modify_progress(text=f"🎧音频上传中: {readable_size(path=audio_path)}", force_update=True, **kwargs)
target_chat = to_int(target_chat)
sent_messages.append(
await client.send_audio(
chat_id=target_chat,
audio=audio_path.as_posix(),
caption=texts[:CAPTION_LENGTH],
- performer=author,
- title=title,
+ performer=info["author"],
+ title=info["title"],
duration=duration,
reply_parameters=reply_parameters,
progress=telegram_uploading,
@@ -251,7 +217,8 @@ async def preview_ytdlp(
metadata[k] = unicode_to_ascii(v)
await save_messages(messages=sent_messages, key=url, metadata=metadata)
- cleanup_ytdlp(title)
+ Path(json_file).unlink(missing_ok=True)
+ cleanup_ytdlp(info["id"])
def get_ytdlp_proxy(platform: str) -> str | None:
@@ -356,6 +323,57 @@ def create_hook(message: Message | None, loop, *, detail_progress: bool):
return hook
+def download_video_info(url: str, ydl_opts: dict, json_path: str | Path) -> dict:
+    """Extract metadata for ``url`` without downloading and save it as JSON.
+
+    The sanitized yt-dlp info dict is written to ``json_path``. The returned dict
+    is the raw info dict enriched with the custom keys ``extractor``, ``author``,
+    ``author_url``, ``title``, ``duration``, ``id`` and ``summary``, plus
+    ``video_path`` / ``audio_path`` / ``video_size`` / ``audio_size`` when the
+    selected formats allow them to be derived.
+
+    Args:
+        url: Page URL handed to yt-dlp.
+        ydl_opts: Options passed to ``YoutubeDL``.
+        json_path: Destination of the info JSON.
+
+    Returns:
+        The enriched info dict, or ``{"ytdlp_error": message}`` if anything fails.
+    """
+    try:
+        with YoutubeDL(ydl_opts) as ydl:
+            info: dict = ydl.extract_info(url, download=False)  # type: ignore
+            if not info:
+                # fail with a readable message instead of an AttributeError further down
+                raise ValueError(f"yt-dlp returned no info for {url}")
+            # non-ASCII text is written verbatim (ensure_ascii=False), so pin the
+            # encoding instead of relying on the locale default
+            with Path(json_path).open("w", encoding="utf-8") as f:
+                json.dump(ydl.sanitize_info(info), f, ensure_ascii=False, indent=2)
+        # add custom fields; `or` fallbacks also cover keys that exist with a None value
+        info["extractor"] = info.get("extractor") or ""
+        info["author"] = info.get("uploader") or info.get("series") or info["extractor"]
+        info["author_url"] = uploader_url(info, info["extractor"])
+        info["title"] = info.get("title") or ""
+        info["duration"] = round(float(info.get("duration") or 0))
+        info["id"] = info.get("id") or ""
+        video_info = {}
+        audio_info = {}
+        if requested_formats := info.get("requested_formats") or []:
+            # both video and audio are requested
+            video_info = next((x for x in requested_formats if (x.get("video_ext") or "none").lower() != "none"), {})
+            audio_info = next((x for x in requested_formats if (x.get("audio_ext") or "none").lower() != "none"), {})
+            video_ext = video_info.get("ext", "")
+            audio_ext = audio_info.get("ext", "")
+            audio_format_id = audio_info.get("format_id", "")
+            # NOTE(review): assumes the merged file keeps the video stream's extension and that the
+            # audio stream stays on disk as <id>.f<format_id>.<ext> - confirm against ydl_opts
+            info["video_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{video_ext}"
+            info["audio_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.f{audio_format_id}.{audio_ext}"
+        elif (info.get("video_ext") or "").lower() != "none":  # only video
+            video_ext = info.get("ext", "")
+            info["video_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{video_ext}"
+        elif (info.get("audio_ext") or "").lower() != "none":  # only audio
+            audio_ext = info.get("ext", "")
+            info["audio_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{audio_ext}"
+        summary = ""
+        if info["author"]:
+            summary += f"\n{platform_emoji(info['extractor'])}{info['author']}"
+        if info["title"]:
+            summary += f"\n📝{info['title']}"
+        if video_info:
+            info["video_size"] = video_info.get("filesize") or video_info.get("filesize_approx") or 0
+            # an unknown size is stored as 0; its " (0.0 B)" rendering is stripped from the line
+            summary += f"\n🎬视频: {video_info.get('format', '')} ({readable_size(info['video_size'])})".removesuffix(" (0.0 B)")
+        if audio_info:
+            info["audio_size"] = audio_info.get("filesize") or audio_info.get("filesize_approx") or 0
+            summary += f"\n🎧音频: {audio_info.get('format', '')} ({readable_size(info['audio_size'])})".removesuffix(" (0.0 B)")
+        if info["duration"]:
+            summary += f"\n🕒时长: {readable_time(info['duration'])}"
+        info["summary"] = summary.strip()
+    except Exception as e:
+        # every failure is reported to the caller through the "ytdlp_error" key
+        logger.error(f"Failed to download video info: {e}")
+        info = {"ytdlp_error": str(e)}
+    logger.trace(info)
+    return info
+
+
def retry(func, max_retries=5):
def wrapper(*args, **kwargs):
retries = 0
@@ -378,7 +396,7 @@ def retry(func, max_retries=5):
time.sleep(1)
logger.error(f"Failed after {retries} retries: {msg}")
if msg:
- args[2]["error_msg"] = msg.replace("<", "[").replace(">", "]")
+ args[2]["ytdlp_error"] = msg.replace("<", "[").replace(">", "]")
return args[2]
return {}
@@ -386,27 +404,47 @@ def retry(func, max_retries=5):
@retry
-def download_video(url: str, ydl_opts: dict, result: dict) -> dict:
+def download_video(json_path: str, ydl_opts: dict, result: dict) -> dict:
+ """Download the media described by the yt-dlp info JSON at ``json_path``.
+
+ ``result`` is returned; when yt-dlp reports a non-zero code and ``result`` holds no
+ ``ytdlp_error`` yet, a failure message naming the URL is stored under that key.
+ """
with YoutubeDL(ydl_opts) as ydl:
- info: dict = ydl.extract_info(url, download=True, process=True) # type: ignore
- result["info"] = info
+ error_code = ydl.download_with_info_file(json_path) # 0: success, 1: error
+ if error_code != 0 and not result.get("ytdlp_error"):
+ # presumably the file stem is the quote_plus-encoded page URL - verify against the caller
+ url = unquote_plus(Path(json_path).stem)
+ result["ytdlp_error"] = f"❌下载失败\n{url}"
return result
-async def download_video_async(url: str, ydl_opts: dict) -> tuple[str, dict]:
+async def download_video_async(json_path: str, ydl_opts: dict) -> str:
"""Wrapper to run the download function in a thread.
Generated by GPT-4o.
"""
# Shared dictionary to hold the results
result = {}
- # Create and start the thread
- download_thread = threading.Thread(target=download_video, args=(url, ydl_opts, result))
+ download_thread = threading.Thread(target=download_video, args=(json_path, ydl_opts, result))
download_thread.start()
- # Wait for the thread to finish
+ # join() blocks, so it is awaited through to_thread rather than called directly in the coroutine
await asyncio.to_thread(download_thread.join)
- # Return the result
- return result.get("error_msg", ""), result.get("info", {})
+ # "" when no "ytdlp_error" was stored in the shared dict
+ return result.get("ytdlp_error", "")
+
+
+def uploader_url(info: dict, extractor: str) -> str:
+    """Return the uploader's profile URL, or "" when none can be built.
+
+    A non-empty ``uploader_url`` in ``info`` is returned as-is. Otherwise a URL is
+    built from ``uploader_id`` for extractors whose lower-cased name contains
+    "youtube" or "bilibili".
+    """
+    explicit = info.get("uploader_url")
+    if explicit:
+        return explicit
+    author_id = info.get("uploader_id")
+    if not author_id:
+        return ""
+    name = extractor.lower()
+    # checked in order; the first site name found in the extractor wins
+    for site, base in (("youtube", "https://www.youtube.com/"), ("bilibili", "https://space.bilibili.com/")):
+        if site in name:
+            return f"{base}{author_id}"
+    return ""
+
+
+def platform_emoji(extractor: str) -> str:
+    """Return the emoji used to tag a platform, chosen by substring of the extractor name.
+
+    The match is case-insensitive; "bilibili" is tested before "youtube", and any
+    other extractor gets the generic "🆔".
+    """
+    name = extractor.lower()
+    for site, icon in (("bilibili", "🅱️"), ("youtube", "🔴")):
+        if site in name:
+            return icon
+    return "🆔"
@cache.memoize(ttl=60)
@@ -516,19 +554,11 @@ def make_bvid_clickable(texts: str) -> str:
return re.sub(pattern, markdown_url, texts)
-def cleanup_ytdlp(title: str):
- """Clean up ytdlp files.
-
- Some unicode characters can't be matched with title, so we use common characters to match.
- """
- if not title:
+def cleanup_ytdlp(vid: str):
+ """Delete every regular file in DOWNLOAD_DIR whose name is ``<vid>.<anything>``.
+
+ The id is escaped before globbing, so ids containing ``*``, ``?`` or ``[`` match only literally.
+ Does nothing when ``vid`` is empty.
+ """
+ from glob import escape # local import: keeps this fix independent of the file-level import block
+ if not vid:
return
- logger.debug(f"Cleaning up: {title}")
- for p in Path(DOWNLOAD_DIR).glob("*"):
- if not p.is_file():
- continue
- fname = re.sub(r"(.*)\.f\d+$", r"\1", p.stem) # remove format id ( title.f137.m4a -> title.m4a )
- common_char = sum((Counter(fname) & Counter(title)).values())
- if common_char / len(fname) > 0.8: # filename overlaps more than 80%
+ logger.debug(f"Cleaning up: {vid}")
+ for p in Path(DOWNLOAD_DIR).glob(f"{escape(vid)}.*"):
+ if p.is_file():
logger.trace(f"Deleting ytdlp files: {p}")
p.unlink(missing_ok=True)
src/config.py
@@ -108,9 +108,10 @@ class PROXY: # format: socks5://127.0.0.1:7890
SUBTITLE = os.getenv("SUBTITLE_PROXY", None)
DOWNLOAD = os.getenv("DOWNLOAD_PROXY", None)
WEIBO_COOKIE = os.getenv("WEIBO_COOKIE_PROXY", None) # Weibo visitor cookie
+ YTDLP = os.getenv("YTDLP_PROXY", None) # general proxy for ytdlp
+ YTDLP_FALLBACK = os.getenv("YTDLP_FALLBACK_PROXY", None)
BILIBILI = os.getenv("BILIBILI_PROXY", None)
YOUTUBE = os.getenv("YOUTUBE_PROXY", None)
- YTDLP_FALLBACK = os.getenv("YTDLP_FALLBACK_PROXY", None)
class COOKIE: # See: https://github.com/easychen/CookieCloud
src/utils.py
@@ -250,6 +250,7 @@ if __name__ == "__main__":
print(rand_string())
print(rand_number())
print(cleanup_old_files())
+ print(readable_size(0))
print(readable_size(2000 * 1024 * 1024))
print(unicode_to_ascii("你好"))
print(unicode_to_ascii(1.1))