main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import os
4from pathlib import Path
5from typing import Literal
6from urllib.parse import urlparse
7
8from glom import Coalesce, glom
9from loguru import logger
10from pyrogram.enums import ParseMode
11from pyrogram.types import Message
12from yt_dlp.utils import YoutubeDLError
13
14from asr.voice_recognition import asr_file
15from config import CAPTION_LENGTH, DOWNLOAD_DIR, PROXY
16from cookies import ytdlp_bilibili_cookie
17from messages.utils import smart_split
18from multimedia import convert_img_to_telegram_format, generate_cover
19from networking import match_social_media_link
20from subtitles.base import fetch_subtitle
21from utils import count_subtitles, remove_none_values
22
23
class ProxyError(Exception):
    """Exception type for proxy-related errors.

    Adds nothing on top of ``Exception``; presumably raised by callers
    elsewhere in the project so proxy failures can be caught separately.
    """
26
27
def get_ytdlp_proxy(platform: Literal["youtube", "bilibili", "ytdlp"] | None = None, url: str = "", proxy: str | None = None) -> str | None:
    """Resolve the proxy that yt-dlp should use.

    Resolution order:

    1. An explicit ``proxy`` argument (``""`` means "no proxy").
    2. The environment variable ``YTDLP_PROXY_<PLATFORM>`` (upper-cased).
    3. The default ``PROXY.YTDLP``.

    Args:
        platform: Platform name. When ``None`` it is derived from ``url`` as
            the second-to-last dot-separated label of the host
            (``www.youtube.com`` -> ``youtube``).
        url: URL used for platform detection when ``platform`` is ``None``.
        proxy: Explicit proxy. ``None`` means "not set", ``""`` means
            "do not use a proxy".

    Returns:
        The proxy string, or ``None`` when no proxy should be used. When
        ``platform`` is ``None`` and no platform can be derived from ``url``
        (empty url, scheme-less garbage, dot-less host), ``PROXY.YTDLP`` is
        returned directly.
    """
    if platform is None:  # detect platform from url
        if not url:
            logger.warning("No url provided, fallback to default proxy")
            return PROXY.YTDLP
        try:
            # Prefix scheme-less input with "//" so urlparse still reports a host.
            # `hostname` also drops any userinfo/port that `netloc` would keep.
            host = urlparse(url if "://" in url else f"//{url}").hostname or ""
        except ValueError:  # malformed netloc, e.g. unbalanced IPv6 brackets
            host = ""
        labels = host.split(".")  # www.youtube.com -> ["www", "youtube", "com"]
        if len(labels) < 2:
            # Previously this case raised IndexError on labels[-2].
            logger.warning(f"Cannot detect platform from url {url!r}, fallback to default proxy")
            return PROXY.YTDLP
        platform = labels[-2]  # type: ignore

    if proxy is None:  # proxy is not set
        proxy = os.getenv(f"YTDLP_PROXY_{platform}".upper())

    # empty: no proxy
    # None: default ytdlp proxy
    if proxy is None:  # still unset: fall back to the default proxy
        proxy = PROXY.YTDLP
    elif proxy == "":  # empty string means no proxy
        proxy = None
    logger.debug(f"YTDLP Proxy of {platform}: {proxy}")
    return proxy
49
50
async def get_ytdlp_opts(platform: Literal["youtube", "bilibili", "ytdlp"] | None = None, url: str = "", proxy: str | None = None, *, video: bool = True, use_aria2: bool = False) -> dict:
    """Build the options dict passed to yt-dlp.

    Args:
        platform: Platform name; ``"bilibili"`` additionally attaches a cookie file.
        url: Forwarded to ``get_ytdlp_proxy`` for proxy resolution.
        proxy: Proxy to use; a falsy value (``None`` or ``""``) is resolved
            through ``get_ytdlp_proxy``.
        video: Use ``video_selector`` as the format when true, otherwise the
            audio format string ``"m4a/bestaudio/best"``.
        use_aria2: Route downloads through ``aria2c`` as external downloader.

    Returns:
        The yt-dlp options dict.
    """
    resolved_proxy = proxy or get_ytdlp_proxy(platform=platform, url=url, proxy=proxy)
    format_spec = video_selector if video else "m4a/bestaudio/best"
    youtube_args = {
        "lang": ["zh-CN", "zh-HK", "zh-TW", "en", "en-GB"],
        "player_client": ["default", "-tv_simply"],  # tv_simply is broken
    }

    options = {
        "paths": {"home": DOWNLOAD_DIR},
        "cachedir": DOWNLOAD_DIR,
        "simulate": False,
        "skip_download": False,
        "keepvideo": True,
        "format": format_spec,
        "writethumbnail": True,
        # filesystem limit for a filename is 255 bytes; a UTF-8 char takes 1-4 bytes
        "trim_file_name": 60,
        "proxy": resolved_proxy,
        "extractor_args": {"youtube": youtube_args},
        "ignore_no_formats_error": False,
        "live_from_start": False,
        "retries": 20,
        # sleep 1 second between retries
        "retry_sleep_functions": {"http": lambda _: 1},
        "nocheckcertificate": True,
        "source_address": "0.0.0.0",  # force-ipv4  # noqa: S104
        "outtmpl": "%(id)s.%(ext)s",
        "noplaylist": True,
        "color": "no_color-tty",
        "logger": logger,
    }

    if platform == "bilibili":
        cookie_path = await ytdlp_bilibili_cookie()
        logger.trace(f"Use cookie file: {cookie_path}")
        options["cookiefile"] = cookie_path
    if use_aria2:
        options["external_downloader"] = {"default": "aria2c"}
    return options
89
90
def video_selector(ctx):
    """Select the best format (used as the yt-dlp ``format`` option).

    For the best compatibility, we choose .mp4 extension with AVC codec for
    video and .m4a extension for audio. When no compatible candidate exists,
    the best available video/audio format is used instead.

    Args:
        ctx: Selector context dict; only its ``"formats"`` list is read.

    Yields:
        A single dict with ``format_id``, ``ext``, ``requested_formats`` and
        ``protocol`` describing the chosen video, audio, or video+audio pair.

    Raises:
        YoutubeDLError: If there are no formats at all, or neither a video
            nor an audio format can be found.
    """

    def field(fmt: dict, key: str) -> str:
        # Lower-cased string field, empty when the key is absent.
        return fmt.get(key, "").lower()

    # formats are already sorted worst to best; reverse so index 0 is the best.
    # A missing/None "formats" entry is treated as empty instead of failing on the slice.
    formats = (ctx.get("formats") or [])[::-1]
    if not formats:
        msg = "No format found."
        raise YoutubeDLError(msg)
    formats = remove_none_values(formats)
    logger.trace(f"Choose best format from {len(formats)} extracted formats")

    # acodec='none' means there is no audio
    # find compatible extension, VP9 is not supported by iOS, use AVC instead
    all_videos = [f for f in formats if field(f, "video_ext") != "none"]
    all_audios = [f for f in formats if field(f, "audio_ext") != "none"]
    videos = [f for f in all_videos if field(f, "video_ext") == "mp4" and field(f, "acodec") == "none" and field(f, "vcodec").startswith("avc")]
    audios = [f for f in all_audios if field(f, "resolution") == "audio only" and field(f, "audio_ext") == "m4a"]
    logger.trace(f"Found {len(videos)} video formats")
    logger.trace(f"Found {len(audios)} audio formats")

    # if no compatible format found, fallback to the best format
    videos = videos or all_videos
    audios = audios or all_audios
    if not videos and not audios:
        msg = "No video and audio format found."
        raise YoutubeDLError(msg)

    # Video (when present) goes first so that it decides the output extension.
    chosen = []
    if videos:
        logger.debug(f"Use video format: {videos[0]['format']}")
        chosen.append(videos[0])
    if audios:
        logger.debug(f"Use audio format: {audios[0]['format']}")
        chosen.append(audios[0])

    yield {
        "format_id": "+".join(f"{f['format_id']}" for f in chosen),
        "ext": chosen[0]["ext"],
        "requested_formats": chosen,
        "protocol": "+".join(f"{f['protocol']}" for f in chosen),
    }
150
151
def uploader_url(info: dict, extractor: str) -> str:
    """Return the uploader's profile URL, or ``""`` when it cannot be built.

    ``info["uploader_url"]`` wins when it is truthy. Otherwise the URL is
    built from ``info["uploader_id"]`` for extractors whose name contains
    ``"youtube"`` or ``"bilibili"``.
    """
    direct = info.get("uploader_url")
    if direct:
        return direct

    author_id = info.get("uploader_id")
    if not author_id:
        return ""

    profile_templates = (
        ("youtube", "https://www.youtube.com/{}"),
        ("bilibili", "https://space.bilibili.com/{}"),
    )
    for site, template in profile_templates:
        if site in extractor:
            return template.format(author_id)
    return ""
161
162
def platform_emoji(extractor: str) -> str:
    """Return the emoji that marks the platform named in ``extractor``.

    The first keyword contained in ``extractor`` wins; a generic emoji is
    returned when none matches.

    NOTE(review): the emoji literals were mis-encoded (and one was broken
    across two lines). They are restored here as ASCII-safe escapes. The four
    platform emojis decode unambiguously in context; the generic fallback is
    presumably the link emoji -- confirm against the upstream file.
    """
    markers = (
        ("bilibili", "\U0001F171\uFE0F"),  # negative squared letter B + emoji variation selector
        ("youtube", "\U0001F534"),  # large red circle
        ("twitch", "\U0001F7E3"),  # large purple circle
        ("facebook", "\U0001F535"),  # large blue circle
    )
    for keyword, emoji in markers:
        if keyword in extractor:
            return emoji
    return "\U0001F517"  # link symbol (best-guess reconstruction, see note above)
173
174
def find_thumbnail(video_path: str | Path, audio_path: str | Path) -> str | None:
    """Locate (or generate) a thumbnail and return its path as a posix string.

    Lookup order:

    1. An image next to the video file with the same name and an image suffix.
    2. An image next to the audio file, named after the audio stem up to its
       first dot (drops the format_id part).
    3. A cover generated from the video file if it exists, else from the audio path.

    Every hit is passed through ``convert_img_to_telegram_format``.

    Returns:
        The converted thumbnail path, or ``None`` if the generated cover is
        not a file.
    """
    image_suffixes = (".jpg", ".png", ".jpeg", ".webp")
    video_file = Path(video_path)
    audio_file = Path(audio_path)

    if video_file.is_file():
        for ext in image_suffixes:
            candidate = video_file.with_suffix(ext)
            if candidate.is_file():
                return convert_img_to_telegram_format(candidate).as_posix()

    if audio_file.is_file():
        base_name = audio_file.stem.split(".")[0]  # remove format_id
        for ext in image_suffixes:
            candidate = audio_file.parent.joinpath(base_name + ext)
            if candidate.is_file():
                return convert_img_to_telegram_format(candidate).as_posix()

    # no thumbnail found, generate one
    cover_source = video_file if video_file.is_file() else audio_file
    cover = generate_cover(cover_source)
    if not Path(cover).is_file():
        return None
    return convert_img_to_telegram_format(cover).as_posix()
192
193
async def get_subtitles(audio_path: str | Path, url: str, asr_engine: str, vinfo: dict, *, enable_corrector: bool = False) -> str:
    """Get subtitles for a media item, preferring the platform API over ASR.

    For bilibili/youtube links the subtitle API is tried first; when it yields
    nothing (or for other platforms) the audio file is transcribed with
    ``asr_file``. Results for which ``count_subtitles`` is below 200 are discarded.

    Args:
        audio_path: Audio file handed to ASR.
        url: Media URL, used for platform matching and subtitle fetching.
        asr_engine: Engine name forwarded to ``asr_file``.
        vinfo: Video info used to build the reference prompt.
        enable_corrector: Forwarded to both the subtitle fetcher and ASR.

    Returns:
        The subtitles, or ``""`` when nothing usable was obtained.
    """
    link_info = await match_social_media_link(url)
    reference = generate_prompt(vinfo)

    text = ""
    if link_info["platform"] in ("bilibili", "youtube"):  # get subtitle from API first
        api_result = await fetch_subtitle(url=url, reference=reference, enable_corrector=enable_corrector)
        text = api_result.get("subtitles", "")  # only subtitles, no Bilibili's AI summary
    if not text:
        asr_result = await asr_file(audio_path, asr_engine, corrector_reference=reference, enable_corrector=enable_corrector, silent=True)
        text = asr_result.get("texts", "")

    # ignore too short transcription
    return "" if count_subtitles(text) < 200 else text
208
209
async def append_tag(name: str, sent_messages: dict) -> dict:
    """Insert ``name`` into the caption of every sent message and re-edit them.

    sent_messages:
    {
        "video": list[Message],
        "audio": Message,
    }

    Args:
        name: Text line to insert into each caption.
        sent_messages: Mapping as shown above. The value under ``"video"`` is
            iterated as a list of messages; the value under any other key is
            treated as a single message.

    Returns:
        A dict with the edited messages: ``"video"`` (list) when every edit
        result is a ``Message``, and ``"audio"`` when its edit result is a
        ``Message``.
    """

    async def new_caption(m: Message) -> str:
        # Build the new caption HTML for one message.
        # insert name after description
        html = glom(m, "content.html", default="")
        lines = html.split("\n")
        # pos stays -1 when no marker line is found; list.insert(-1, ...) then
        # places the name just before the last line.
        pos = -1
        for i, line in enumerate(lines):
            # NOTE(review): the marker literal below appears mis-encoded in this
            # copy of the file; confirm it matches the emoji used when the
            # caption is built.
            if line.startswith("๐<a href="):
                pos = i + 1
                break
        lines.insert(pos, name)
        # Only the first chunk produced by smart_split is kept as the caption.
        captions = await smart_split("\n".join(lines), CAPTION_LENGTH)
        caption = captions[0]
        # Re-close an expandable blockquote whose closing tag is missing from the kept chunk.
        if "<blockquote expandable>" in caption and "</blockquote>" not in caption:
            caption += "</blockquote>"
        return caption

    video_msgs = []
    audio_msg = None
    for k, message in sent_messages.items():
        if k == "video":
            video_msgs = [await msg.edit_caption(await new_caption(msg), parse_mode=ParseMode.HTML) for msg in message]
        else:
            audio_msg = await message.edit_caption(await new_caption(message), parse_mode=ParseMode.HTML)
    modified = {}
    # NOTE(review): all() over an empty list is True, so "video" is set to []
    # when there were no video messages -- confirm callers expect that.
    if all(isinstance(x, Message) for x in video_msgs):
        modified["video"] = video_msgs
    if isinstance(audio_msg, Message):
        modified["audio"] = audio_msg
    return modified
249
250
def generate_prompt(info: dict) -> str:
    """Generate prompt for AI summary or correction.

    Builds a Chinese reference text from the video info: the platform
    (``info["extractor"]``, title-cased), then optionally the author, title,
    publish date (``pubdate``, else ``upload_date``) and description.

    NOTE(review): the string literals were mis-encoded (UTF-8 shown through a
    single-byte code page, two of them broken across lines) and are restored
    here. The bracket characters around the author are a best-guess
    reconstruction -- confirm against the upstream file.

    Args:
        info: Video info dict. ``"extractor"`` is required; ``"author"``,
            ``"title"``, ``"pubdate"``/``"upload_date"`` and ``"description"``
            are optional.

    Returns:
        The prompt text.

    Raises:
        KeyError: If ``info`` has no ``"extractor"`` key.
    """
    prompt = f"该转录稿对应于{info['extractor'].title()}平台"
    if author := info.get("author"):
        prompt += f"作者「{author}」"
    prompt += "的一期节目，节目详情如下:\n"
    if title := info.get("title"):
        prompt += f"节目标题: {title}\n"
    if pubdate := glom(info, Coalesce("pubdate", "upload_date"), default=""):
        prompt += f"发布日期: {pubdate}\n"
    if desc := info.get("description"):
        prompt += f"节目简介: {desc}\n"
    return prompt
264
265
def cleanup_ytdlp(vid: str):
    """Delete every file in ``DOWNLOAD_DIR`` whose name starts with ``vid``.

    Does nothing when ``vid`` is empty. Only regular files are removed;
    directories are left alone.

    Args:
        vid: Video ID used as the filename prefix.
    """
    if not vid:
        return
    # Local import: keeps this block self-contained without touching file-level imports.
    from glob import escape

    logger.debug(f"Cleaning up: {vid}")
    # Escape glob metacharacters so an ID containing "[", "]", "*" or "?" is
    # matched literally instead of being read as a pattern.
    for p in Path(DOWNLOAD_DIR).glob(f"{escape(vid)}*"):
        if p.is_file():
            logger.trace(f"Deleting ytdlp files: {p}")
            p.unlink(missing_ok=True)