main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import os
  4from pathlib import Path
  5from typing import Literal
  6from urllib.parse import urlparse
  7
  8from glom import Coalesce, glom
  9from loguru import logger
 10from pyrogram.enums import ParseMode
 11from pyrogram.types import Message
 12from yt_dlp.utils import YoutubeDLError
 13
 14from asr.voice_recognition import asr_file
 15from config import CAPTION_LENGTH, DOWNLOAD_DIR, PROXY
 16from cookies import ytdlp_bilibili_cookie
 17from messages.utils import smart_split
 18from multimedia import convert_img_to_telegram_format, generate_cover
 19from networking import match_social_media_link
 20from subtitles.base import fetch_subtitle
 21from utils import count_subtitles, remove_none_values
 22
 23
 24class ProxyError(Exception):
 25    pass
 26
 27
 28def get_ytdlp_proxy(platform: Literal["youtube", "bilibili", "ytdlp"] | None = None, url: str = "", proxy: str | None = None) -> str | None:
 29    """Get ytdlp proxy."""
 30    if platform is None:  # detect platform from url
 31        if not url:
 32            logger.warning("No url provided, fallback to default proxy")
 33            return PROXY.YTDLP
 34        parsed = urlparse(url)
 35        host = parsed.netloc  # www.youtube.com
 36        platform = host.split(".")[-2]  # type: ignore
 37
 38    if proxy is None:  # proxy is not set
 39        proxy = os.getenv(f"YTDLP_PROXY_{platform}".upper())
 40
 41    # empty: no proxy
 42    # None: default ytdlp proxy
 43    if proxy is None:  # fallback to default proxy is unset
 44        proxy = PROXY.YTDLP
 45    elif proxy == "":  # empty string means no proxy
 46        proxy = None
 47    logger.debug(f"YTDLP Proxy of {platform}: {proxy}")
 48    return proxy
 49
 50
 51async def get_ytdlp_opts(platform: Literal["youtube", "bilibili", "ytdlp"] | None = None, url: str = "", proxy: str | None = None, *, video: bool = True, use_aria2: bool = False) -> dict:
 52    """Get ytdlp options."""
 53    if not proxy:
 54        proxy = get_ytdlp_proxy(platform=platform, url=url, proxy=proxy)
 55    ytdlp_opts = {
 56        "paths": {"home": DOWNLOAD_DIR},
 57        "cachedir": DOWNLOAD_DIR,
 58        "simulate": False,
 59        "skip_download": False,
 60        "keepvideo": True,
 61        "format": video_selector if video else "m4a/bestaudio/best",
 62        "writethumbnail": True,
 63        "trim_file_name": 60,  # filesystem limit for filename is 255 bytes. UFT-8 char is 1-4 bytes.
 64        "proxy": proxy,
 65        "extractor_args": {
 66            "youtube": {
 67                "lang": ["zh-CN", "zh-HK", "zh-TW", "en", "en-GB"],
 68                "player_client": ["default", "-tv_simply"],  # tv_simply is broken
 69            }
 70        },
 71        "ignore_no_formats_error": False,
 72        "live_from_start": False,
 73        "retries": 20,
 74        "retry_sleep_functions": {"http": lambda _: 1},  # sleep 1 second between retries
 75        "nocheckcertificate": True,
 76        "source_address": "0.0.0.0",  # force-ipv4  # noqa: S104
 77        "outtmpl": "%(id)s.%(ext)s",
 78        "noplaylist": True,
 79        "color": "no_color-tty",
 80        "logger": logger,
 81    }
 82    if platform == "bilibili":
 83        cookiefile = await ytdlp_bilibili_cookie()
 84        logger.trace(f"Use cookie file: {cookiefile}")
 85        ytdlp_opts["cookiefile"] = cookiefile
 86        if use_aria2:
 87            ytdlp_opts["external_downloader"] = {"default": "aria2c"}
 88    return ytdlp_opts
 89
 90
 91def video_selector(ctx):
 92    """Select the best format.
 93
 94    For the best compatibility, we choose .mp4 extension with AVC codec for video, .m4a extension for audio.
 95    """
 96    # formats are already sorted worst to best
 97    formats = ctx.get("formats")[::-1]
 98    if not formats:
 99        msg = "No format found."
100        raise YoutubeDLError(msg)
101    formats = remove_none_values(formats)
102    logger.trace(f"Choose best format from {len(formats)} extracted formats")
103    # acodec='none' means there is no audio
104    # find compatible extension, VP9 is not supported by iOS, use AVC instead
105    all_videos = [f for f in formats if f.get("video_ext", "").lower() != "none"]
106    all_audios = [f for f in formats if f.get("audio_ext", "").lower() != "none"]
107    videos = [f for f in all_videos if f.get("video_ext", "").lower() == "mp4" and f.get("acodec", "").lower() == "none" and f.get("vcodec", "").lower().startswith("avc")]
108    audios = [f for f in all_audios if (f.get("resolution", "").lower() == "audio only" and f.get("audio_ext", "").lower() == "m4a")]
109    logger.trace(f"Found {len(videos)} video formats")
110    logger.trace(f"Found {len(audios)} video formats")
111
112    # if no compatible format found, fallback to the best format
113    if not videos:
114        videos = all_videos
115    if not audios:
116        audios = all_audios
117
118    if not videos and not audios:
119        msg = "No video and audio format found."
120        raise YoutubeDLError(msg)
121    elif not videos:
122        best_audio = audios[0]
123        logger.debug(f"Use audio format: {best_audio['format']}")
124        yield {
125            "format_id": f"{best_audio['format_id']}",
126            "ext": best_audio["ext"],
127            "requested_formats": [best_audio],
128            "protocol": f"{best_audio['protocol']}",
129        }
130    elif not audios:
131        best_video = videos[0]
132        logger.debug(f"Use video format: {best_video['format']}")
133        yield {
134            "format_id": f"{best_video['format_id']}",
135            "ext": best_video["ext"],
136            "requested_formats": [best_video],
137            "protocol": f"{best_video['protocol']}",
138        }
139    else:
140        best_video = videos[0]
141        best_audio = audios[0]
142        logger.debug(f"Use video format: {best_video['format']}")
143        logger.debug(f"Use audio format: {best_audio['format']}")
144        yield {
145            "format_id": f"{best_video['format_id']}+{best_audio['format_id']}",
146            "ext": best_video["ext"],
147            "requested_formats": [best_video, best_audio],
148            "protocol": f"{best_video['protocol']}+{best_audio['protocol']}",
149        }
150
151
def uploader_url(info: dict, extractor: str) -> str:
    """Return the uploader's page URL.

    A truthy ``info["uploader_url"]`` is returned as is. Otherwise the URL is
    built from ``info["uploader_id"]`` for extractors whose name contains
    "youtube" or "bilibili". Returns ``""`` when nothing can be derived.
    """
    direct = info.get("uploader_url")
    if direct:
        return direct

    uid = info.get("uploader_id")
    if not uid:
        return ""

    bases = (
        ("youtube", "https://www.youtube.com/"),
        ("bilibili", "https://space.bilibili.com/"),
    )
    for keyword, base in bases:
        if keyword in extractor:
            return f"{base}{uid}"
    return ""
161
162
def platform_emoji(extractor: str) -> str:
    """Return the emoji that tags the platform named by ``extractor``.

    The first keyword contained in ``extractor`` wins (bilibili, youtube,
    twitch, facebook, in that order); any other extractor gets the "ID" emoji.
    """
    # The literals had been corrupted by a wrong re-encoding (mojibake).
    # They are written as escapes so they survive any future transcoding.
    table = (
        ("bilibili", "\U0001F171\uFE0F"),  # B button
        ("youtube", "\U0001F534"),  # red circle
        ("twitch", "\U0001F7E3"),  # purple circle
        ("facebook", "\U0001F535"),  # blue circle
    )
    for keyword, emoji in table:
        if keyword in extractor:
            return emoji
    return "\U0001F194"  # ID button
173
174
def find_thumbnail(video_path: str | Path, audio_path: str | Path) -> str | None:
    """Locate (or generate) a thumbnail for a downloaded media file.

    Looks for an image next to the video (same name, image suffix), then next
    to the audio (audio stem cut at its first dot, plus image suffix). If none
    exists, a cover is generated from the video when that file exists,
    otherwise from the audio path.

    Returns:
        POSIX path of the image after ``convert_img_to_telegram_format``, or
        ``None`` when no image could be found or generated.
    """
    image_suffixes = (".jpg", ".png", ".jpeg", ".webp")
    video_file = Path(video_path)
    audio_file = Path(audio_path)

    def candidates():
        # Lazily yield candidate image paths: video siblings first, then audio siblings.
        if video_file.is_file():
            for ext in image_suffixes:
                yield video_file.with_suffix(ext)
        if audio_file.is_file():
            base = audio_file.stem.split(".")[0]  # remove format_id
            for ext in image_suffixes:
                yield audio_file.parent.joinpath(base + ext)

    for image in candidates():
        if image.is_file():
            return convert_img_to_telegram_format(image).as_posix()

    # no thumbnail found, generate one
    source = video_file if video_file.is_file() else audio_file
    cover = generate_cover(source)
    if not Path(cover).is_file():
        return None
    return convert_img_to_telegram_format(cover).as_posix()
192
193
async def get_subtitles(audio_path: str | Path, url: str, asr_engine: str, vinfo: dict, *, enable_corrector: bool = False) -> str:
    """Return the subtitles / transcript for a video.

    For bilibili and youtube links the subtitles are fetched through
    ``fetch_subtitle`` first. When that yields nothing (or for other
    platforms) the audio file is transcribed with ``asr_file``; a
    transcription for which ``count_subtitles`` is below 200 is discarded.

    Returns:
        The subtitles, or ``""`` when none are available.
    """
    link = await match_social_media_link(url)
    hint = generate_prompt(vinfo)

    if link["platform"] in ("bilibili", "youtube"):  # get subtitle from API first
        fetched = await fetch_subtitle(url=url, reference=hint, enable_corrector=enable_corrector)
        text = fetched.get("subtitles", "")  # only subtitles, no Bilibili's AI summary
        if text:
            return text

    recognized = await asr_file(audio_path, asr_engine, corrector_reference=hint, enable_corrector=enable_corrector, silent=True)
    text = recognized.get("texts", "")
    # ignore too short transcription
    return "" if count_subtitles(text) < 200 else text
208
209
async def append_tag(name: str, sent_messages: dict) -> dict:
    """Insert ``name`` into the caption of already-sent messages.

    ``name`` goes on the line right after the description line (the line that
    starts with the memo emoji followed by ``<a href=``). When no such line
    exists it is inserted before the last line (``list.insert(-1, ...)``).
    The new caption is the first chunk returned by ``smart_split`` for
    ``CAPTION_LENGTH``; an unclosed expandable blockquote is closed.

    Args:
        name: Text to insert as its own caption line.
        sent_messages: ``{"video": list[Message], "audio": Message}``. Every
            key other than ``"video"`` is handled as a single message.

    Returns:
        ``{"video": [...], "audio": ...}`` holding the edit results; a key is
        only present when its results are ``Message`` instances (``"video"``
        is therefore an empty list when no video message was edited).
    """
    # The marker literal had been corrupted by a wrong re-encoding (mojibake).
    # NOTE(review): restored as U+1F4DD (memo); the last UTF-8 byte was lost in
    # the corruption, so confirm against the code that builds the caption.
    description_marker = "\U0001F4DD<a href="

    async def new_caption(m: Message) -> str:
        # insert name after description
        html = glom(m, "content.html", default="")
        lines = html.split("\n")
        pos = next((i + 1 for i, line in enumerate(lines) if line.startswith(description_marker)), -1)
        lines.insert(pos, name)
        captions = await smart_split("\n".join(lines), CAPTION_LENGTH)
        caption = captions[0]
        if "<blockquote expandable>" in caption and "</blockquote>" not in caption:
            caption += "</blockquote>"
        return caption

    video_msgs = []
    audio_msg = None
    for k, message in sent_messages.items():
        if k == "video":
            video_msgs = [await msg.edit_caption(await new_caption(msg), parse_mode=ParseMode.HTML) for msg in message]
        else:
            audio_msg = await message.edit_caption(await new_caption(message), parse_mode=ParseMode.HTML)
    modified = {}
    if all(isinstance(x, Message) for x in video_msgs):
        modified["video"] = video_msgs
    if isinstance(audio_msg, Message):
        modified["audio"] = audio_msg
    return modified
249
250
def generate_prompt(info: dict) -> str:
    """Generate prompt for AI summary or correction.

    Builds a Chinese description of the video: "This transcript corresponds to
    an episode [by author X] on the <Extractor> platform, details below",
    followed by one line each for title, publish date and description when
    present.

    Args:
        info: Video metadata. ``"extractor"`` is required; ``"author"``,
            ``"title"``, ``"pubdate"`` / ``"upload_date"`` and
            ``"description"`` are optional.

    Returns:
        The prompt text.

    Raises:
        KeyError: If ``info`` has no ``"extractor"`` key.
    """
    # The string literals had been corrupted by a wrong re-encoding (mojibake);
    # they are restored to the intended Chinese text.
    parts = [f"该转录稿对应于{info['extractor'].title()}平台"]
    if author := info.get("author"):
        parts.append(f"作者【{author}】")
    parts.append("的一期节目，节目详情如下:\n")
    if title := info.get("title"):
        parts.append(f"节目标题: {title}\n")
    # "pubdate" wins whenever the key exists (even if empty); otherwise use "upload_date".
    pubdate = info["pubdate"] if "pubdate" in info else info.get("upload_date", "")
    if pubdate:
        parts.append(f"发布日期: {pubdate}\n")
    if desc := info.get("description"):
        parts.append(f"节目简介: {desc}\n")
    return "".join(parts)
264
265
def cleanup_ytdlp(vid: str):
    """Delete the files in ``DOWNLOAD_DIR`` whose name starts with ``vid``.

    Does nothing when ``vid`` is empty. Only regular files are removed;
    directories are left alone.
    """
    from glob import escape  # local import keeps this block self-contained

    if not vid:
        return
    logger.debug(f"Cleaning up: {vid}")
    # Escape glob metacharacters (*, ?, [) so the id is matched literally and
    # an unusual id cannot widen the pattern to unrelated files.
    for p in Path(DOWNLOAD_DIR).glob(f"{escape(vid)}*"):
        if p.is_file():
            logger.trace(f"Deleting ytdlp files: {p}")
            p.unlink(missing_ok=True)