main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import os
4from pathlib import Path
5from typing import Literal
6from urllib.parse import urlparse
7
8from glom import Coalesce, glom
9from loguru import logger
10from pyrogram.enums import ParseMode
11from pyrogram.types import Message
12from yt_dlp.utils import YoutubeDLError
13
14from asr.voice_recognition import asr_file
15from config import CAPTION_LENGTH, COOKIE, DOWNLOAD_DIR, PROXY
16from cookies import ytdlp_bilibili_cookie
17from messages.utils import smart_split
18from multimedia import convert_img_to_telegram_format, generate_cover
19from networking import match_social_media_link
20from subtitles.base import fetch_subtitle
21from utils import count_subtitles, remove_none_values
22
23
class ProxyError(Exception):
    """Exception type for proxy-related failures."""
26
27
def get_ytdlp_proxy(platform: Literal["youtube", "bilibili", "ytdlp"] | None = None, url: str = "", proxy: str | None = None) -> str | None:
    """Resolve the proxy yt-dlp should use.

    Lookup order: the explicit ``proxy`` argument, then the environment
    variable ``YTDLP_PROXY_<PLATFORM>`` (upper-cased), then ``PROXY.YTDLP``.
    An empty string found in the argument or the environment variable means
    "no proxy" and is returned as None.

    Args:
        platform: Name used to build the environment variable name. When None,
            it is taken from the second-to-last label of the URL's host
            (``www.youtube.com`` -> ``youtube``).
        url: URL used for platform detection when ``platform`` is None.
        proxy: Explicit proxy. None means "not set".

    Returns:
        The proxy string, or None when proxying is disabled. When ``platform``
        is None and cannot be derived from ``url``, ``PROXY.YTDLP`` is
        returned directly.
    """
    if platform is None:  # detect platform from url
        if not url:
            logger.warning("No url provided, fallback to default proxy")
            return PROXY.YTDLP
        # `hostname` (unlike `netloc`) carries no port or credentials.
        host = urlparse(url).hostname or ""
        labels = host.split(".")
        if len(labels) < 2:
            # Scheme-less URLs parse to an empty host, and single-label hosts
            # such as "localhost" have no platform label to pick.
            logger.warning(f"Cannot detect platform from url: {url}, fallback to default proxy")
            return PROXY.YTDLP
        platform = labels[-2]  # type: ignore

    if proxy is None:  # proxy is not set
        proxy = os.getenv(f"YTDLP_PROXY_{platform}".upper())

    # empty: no proxy
    # None: default ytdlp proxy
    if proxy is None:  # fall back to the default proxy when unset
        proxy = PROXY.YTDLP
    elif proxy == "":  # empty string means no proxy
        proxy = None
    logger.debug(f"YTDLP Proxy of {platform}: {proxy}")
    return proxy
49
50
async def get_ytdlp_opts(platform: Literal["youtube", "bilibili", "ytdlp"] | None = None, url: str = "", proxy: str | None = None, *, video: bool = True) -> dict:
    """Build the option dict passed to yt-dlp.

    Args:
        platform: Platform name; forwarded to the proxy lookup and used to
            decide whether the bilibili cookie file is attached.
        url: URL forwarded to the proxy lookup.
        proxy: Explicit proxy. A falsy value triggers ``get_ytdlp_proxy``.
        video: When True, formats are picked by ``video_selector``; otherwise
            the audio-only format string is used.

    Returns:
        The yt-dlp options.
    """
    # An unset or empty proxy goes through the argument/env/default chain.
    resolved_proxy = proxy if proxy else get_ytdlp_proxy(platform=platform, url=url, proxy=proxy)
    format_spec = video_selector if video else "m4a/bestaudio/best"
    youtube_args = {
        "lang": ["zh-CN", "zh-HK", "zh-TW", "en", "en-GB"],
        "player_client": ["default", "-tv_simply"],  # tv_simply is broken
    }
    opts = {
        "paths": {"home": DOWNLOAD_DIR},
        "cachedir": DOWNLOAD_DIR,
        "simulate": False,
        "skip_download": False,
        "keepvideo": True,
        "format": format_spec,
        "writethumbnail": True,
        # Filesystem limit for a filename is 255 bytes; a UTF-8 char is 1-4 bytes.
        "trim_file_name": 60,
        "proxy": resolved_proxy,
        "extractor_args": {"youtube": youtube_args},
        "ignore_no_formats_error": False,
        "live_from_start": False,
        "retries": 5,
        "retry_sleep_functions": {"http": lambda _: 1},  # sleep 1 second between retries
        "nocheckcertificate": True,
        "source_address": "0.0.0.0",  # force-ipv4 # noqa: S104
        "outtmpl": "%(id)s.%(ext)s",
        "noplaylist": True,
        "color": "no_color-tty",
        "logger": logger,
    }

    wants_bilibili_cookie = platform == "bilibili" and COOKIE.YTDLP_BILIBILI_USE_COOKIE
    if not wants_bilibili_cookie:
        return opts

    cookie_path = await ytdlp_bilibili_cookie()
    logger.trace(f"Use cookie file: {cookie_path}")
    opts["cookiefile"] = cookie_path
    opts["external_downloader"] = {"default": "aria2c"}
    return opts
88
89
def video_selector(ctx):
    """Select the best format.

    For the best compatibility, we choose .mp4 extension with AVC codec for
    video, .m4a extension for audio. When no compatible stream of a kind
    exists, the best available stream of that kind is used instead.

    Args:
        ctx: Mapping whose ``"formats"`` entry lists the extracted formats,
            sorted worst to best.

    Yields:
        A single dict with ``format_id``, ``ext``, ``requested_formats`` and
        ``protocol`` describing the chosen video and/or audio stream.

    Raises:
        YoutubeDLError: If there are no formats at all, or neither a video nor
            an audio format can be found.
    """
    # Formats are already sorted worst to best. A missing or None entry is
    # treated as empty so it reaches the explicit error below.
    formats = (ctx.get("formats") or [])[::-1]
    if not formats:
        msg = "No format found."
        raise YoutubeDLError(msg)
    formats = remove_none_values(formats)
    logger.trace(f"Choose best format from {len(formats)} extracted formats")
    # acodec='none' means there is no audio
    # find compatible extension, VP9 is not supported by iOS, use AVC instead
    all_videos = [f for f in formats if f.get("video_ext", "").lower() != "none"]
    all_audios = [f for f in formats if f.get("audio_ext", "").lower() != "none"]
    videos = [f for f in all_videos if f.get("video_ext", "").lower() == "mp4" and f.get("acodec", "").lower() == "none" and f.get("vcodec", "").lower().startswith("avc")]
    audios = [f for f in all_audios if (f.get("resolution", "").lower() == "audio only" and f.get("audio_ext", "").lower() == "m4a")]
    logger.trace(f"Found {len(videos)} video formats")
    logger.trace(f"Found {len(audios)} audio formats")

    # if no compatible format found, fallback to the best format
    videos = videos or all_videos
    audios = audios or all_audios
    if not videos and not audios:
        msg = "No video and audio format found."
        raise YoutubeDLError(msg)

    # Video first, then audio: the first selected stream decides the extension.
    selected = []
    if videos:
        best_video = videos[0]
        logger.debug(f"Use video format: {best_video['format']}")
        selected.append(best_video)
    if audios:
        best_audio = audios[0]
        logger.debug(f"Use audio format: {best_audio['format']}")
        selected.append(best_audio)

    yield {
        "format_id": "+".join(f"{f['format_id']}" for f in selected),
        "ext": selected[0]["ext"],
        "requested_formats": selected,
        "protocol": "+".join(f"{f['protocol']}" for f in selected),
    }
149
150
def uploader_url(info: dict, extractor: str) -> str:
    """Return a link to the uploader's page.

    ``info["uploader_url"]`` wins when it is truthy. Otherwise the link is
    built from ``info["uploader_id"]`` for youtube and bilibili extractors.
    An empty string is returned when no link can be derived.
    """
    direct_link = info.get("uploader_url")
    if direct_link:
        return direct_link

    uid = info.get("uploader_id")
    if not uid:
        return ""
    if "youtube" in extractor:
        return f"https://www.youtube.com/{uid}"
    if "bilibili" in extractor:
        return f"https://space.bilibili.com/{uid}"
    return ""
160
161
def platform_emoji(extractor: str) -> str:
    """Return the emoji badge for an extractor name.

    The first keyword contained in ``extractor`` wins, checked in the order
    bilibili, youtube, twitch, facebook. Anything else gets the generic badge.
    """
    badges = (
        ("bilibili", "🅱️"),
        ("youtube", "🔴"),
        ("twitch", "🟣"),
        ("facebook", "🔵"),
    )
    return next((badge for keyword, badge in badges if keyword in extractor), "🆔")
172
173
def find_thumbnail(video_path: str | Path, audio_path: str | Path) -> str | None:
    """Locate a thumbnail for a downloaded video/audio pair, or generate one.

    Existing images are looked up first next to the video (same name,
    different suffix), then next to the audio (the stem up to its first dot).
    If none exists, a cover is generated from the video when that file exists,
    otherwise from the audio path.

    Returns:
        POSIX path of the converted thumbnail, or None when no image file
        could be found or generated.
    """
    video_path = Path(video_path)
    audio_path = Path(audio_path)
    image_suffixes = (".jpg", ".png", ".jpeg", ".webp")

    # Collect candidates in priority order: video siblings, then audio siblings.
    candidates: list[Path] = []
    if video_path.is_file():
        candidates.extend(video_path.with_suffix(ext) for ext in image_suffixes)
    if audio_path.is_file():
        base = audio_path.stem.split(".")[0]  # remove format_id
        candidates.extend(audio_path.parent.joinpath(base + ext) for ext in image_suffixes)

    for candidate in candidates:
        if candidate.is_file():
            return convert_img_to_telegram_format(candidate).as_posix()

    # no thumbnail found, generate one
    cover_source = video_path if video_path.is_file() else audio_path
    thumb = generate_cover(cover_source)
    if not Path(thumb).is_file():
        return None
    return convert_img_to_telegram_format(thumb).as_posix()
191
192
async def get_subtitles(audio_path: str | Path, url: str, asr_engine: str, vinfo: dict) -> str:
    """Get subtitles for a video, preferring the platform API over ASR.

    For bilibili and youtube links the subtitle API is queried first. When
    that yields nothing, or for any other platform, the audio file is
    transcribed with ``asr_file``. A result counting fewer than 20 subtitles
    is discarded.

    Returns:
        The subtitle text, or an empty string.
    """
    matched = await match_social_media_link(url)
    reference = generate_prompt(vinfo, "correction")

    subtitles = ""
    if matched["platform"] in ("bilibili", "youtube"):  # get subtitle from API first
        fetched = await fetch_subtitle(url=url, reference=reference)
        subtitles = fetched.get("subtitles", "")  # only subtitles, no Bilibili's AI summary
    if not subtitles:
        transcript = await asr_file(audio_path, asr_engine, corrector_reference=reference, silent=True)
        subtitles = transcript.get("texts", "")

    # ignore too short transcription
    return subtitles if count_subtitles(subtitles) >= 20 else ""
207
208
async def append_subtitle(name: str, sent_messages: dict) -> dict:
    """Add subtitle to sent messages.

    Every message's caption is rebuilt with ``name`` inserted and then edited
    in place.

    sent_messages:
    {
        "video": list[Message],
        "audio": Message,
    }

    Returns:
        A dict with a "video" entry when every edited video result is a
        Message, and an "audio" entry when the edited audio result is a
        Message.
    """

    async def caption_with_name(msg: Message) -> str:
        # insert name after description
        rows = glom(msg, "content.html", default="").split("\n")
        # Without a description row the index stays -1, which places the name
        # just before the last row.
        insert_at = next((idx + 1 for idx, row in enumerate(rows) if row.startswith("📝<a href=")), -1)
        rows.insert(insert_at, name)
        pieces = await smart_split("\n".join(rows), CAPTION_LENGTH)
        first = pieces[0]
        # Splitting may cut off the closing tag of an expandable blockquote.
        needs_closing = "<blockquote expandable>" in first and "</blockquote>" not in first
        return first + "</blockquote>" if needs_closing else first

    video_msgs = []
    audio_msg = None
    for kind, payload in sent_messages.items():
        if kind != "video":
            audio_msg = await payload.edit_caption(await caption_with_name(payload), parse_mode=ParseMode.HTML)
            continue
        edited = []
        for item in payload:
            text = await caption_with_name(item)
            edited.append(await item.edit_caption(text, parse_mode=ParseMode.HTML))
        video_msgs = edited

    modified = {}
    if all(isinstance(entry, Message) for entry in video_msgs):
        modified["video"] = video_msgs
    if isinstance(audio_msg, Message):
        modified["audio"] = audio_msg
    return modified
248
249
def generate_prompt(info: dict, target: Literal["summary", "correction"]) -> str:
    """Generate prompt for AI summary or correction.

    The prompt opens with a platform line built from ``info["extractor"]``,
    then lists author, title, publish date (``pubdate`` or ``upload_date``)
    and description when present. For "summary" a closing instruction is
    appended.

    Raises:
        KeyError: If ``info`` has no "extractor" entry.
    """
    is_summary = target == "summary"
    platform = info["extractor"].title()

    parts = [f"以上是{platform}视频" if is_summary else f"本次转录稿为{platform}平台"]

    author = info.get("author")
    if author:
        parts.append(f"作者【{author}】")
    parts.append("的一期节目的文字稿。该期节目详情如下:\n")

    title = info.get("title")
    if title:
        parts.append(f"节目标题: {title}\n")

    pubdate = glom(info, Coalesce("pubdate", "upload_date"), default="")
    if pubdate:
        parts.append(f"发布日期: {pubdate}\n")

    desc = info.get("description")
    if desc:
        parts.append(f"节目简介: {desc}\n")

    if is_summary:
        parts.append("\n请解读本期节目内容。要求: 直接输出节目内容解读, 以“该节目讲述了”开头")
    return "".join(parts)
264
265
def cleanup_ytdlp(vid: str) -> None:
    """Delete every file in ``DOWNLOAD_DIR`` whose name starts with ``vid``.

    Args:
        vid: Video id used as the filename prefix. A falsy value is a no-op.
    """
    if not vid:
        return
    from glob import escape as glob_escape

    logger.debug(f"Cleaning up: {vid}")
    # Escape glob metacharacters ("*", "?", "[") so an id containing them is
    # matched literally instead of being read as a pattern.
    pattern = f"{glob_escape(vid)}*"
    for p in Path(DOWNLOAD_DIR).glob(pattern):
        if p.is_file():
            logger.trace(f"Deleting ytdlp files: {p}")
            p.unlink(missing_ok=True)