main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import os
  4from pathlib import Path
  5from typing import Literal
  6from urllib.parse import urlparse
  7
  8from glom import Coalesce, glom
  9from loguru import logger
 10from pyrogram.enums import ParseMode
 11from pyrogram.types import Message
 12from yt_dlp.utils import YoutubeDLError
 13
 14from asr.voice_recognition import asr_file
 15from config import CAPTION_LENGTH, COOKIE, DOWNLOAD_DIR, PROXY
 16from cookies import ytdlp_bilibili_cookie
 17from messages.utils import smart_split
 18from multimedia import convert_img_to_telegram_format, generate_cover
 19from networking import match_social_media_link
 20from subtitles.base import fetch_subtitle
 21from utils import count_subtitles, remove_none_values
 22
 23
 24class ProxyError(Exception):
 25    pass
 26
 27
 28def get_ytdlp_proxy(platform: Literal["youtube", "bilibili", "ytdlp"] | None = None, url: str = "", proxy: str | None = None) -> str | None:
 29    """Get ytdlp proxy."""
 30    if platform is None:  # detect platform from url
 31        if not url:
 32            logger.warning("No url provided, fallback to default proxy")
 33            return PROXY.YTDLP
 34        parsed = urlparse(url)
 35        host = parsed.netloc  # www.youtube.com
 36        platform = host.split(".")[-2]  # type: ignore
 37
 38    if proxy is None:  # proxy is not set
 39        proxy = os.getenv(f"YTDLP_PROXY_{platform}".upper())
 40
 41    # empty: no proxy
 42    # None: default ytdlp proxy
 43    if proxy is None:  # fallback to default proxy is unset
 44        proxy = PROXY.YTDLP
 45    elif proxy == "":  # empty string means no proxy
 46        proxy = None
 47    logger.debug(f"YTDLP Proxy of {platform}: {proxy}")
 48    return proxy
 49
 50
 51async def get_ytdlp_opts(platform: Literal["youtube", "bilibili", "ytdlp"] | None = None, url: str = "", proxy: str | None = None, *, video: bool = True) -> dict:
 52    """Get ytdlp options."""
 53    if not proxy:
 54        proxy = get_ytdlp_proxy(platform=platform, url=url, proxy=proxy)
 55    ytdlp_opts = {
 56        "paths": {"home": DOWNLOAD_DIR},
 57        "cachedir": DOWNLOAD_DIR,
 58        "simulate": False,
 59        "skip_download": False,
 60        "keepvideo": True,
 61        "format": video_selector if video else "m4a/bestaudio/best",
 62        "writethumbnail": True,
 63        "trim_file_name": 60,  # filesystem limit for filename is 255 bytes. UFT-8 char is 1-4 bytes.
 64        "proxy": proxy,
 65        "extractor_args": {
 66            "youtube": {
 67                "lang": ["zh-CN", "zh-HK", "zh-TW", "en", "en-GB"],
 68                "player_client": ["default", "-tv_simply"],  # tv_simply is broken
 69            }
 70        },
 71        "ignore_no_formats_error": False,
 72        "live_from_start": False,
 73        "retries": 5,
 74        "retry_sleep_functions": {"http": lambda _: 1},  # sleep 1 second between retries
 75        "nocheckcertificate": True,
 76        "source_address": "0.0.0.0",  # force-ipv4  # noqa: S104
 77        "outtmpl": "%(id)s.%(ext)s",
 78        "noplaylist": True,
 79        "color": "no_color-tty",
 80        "logger": logger,
 81    }
 82    if platform == "bilibili" and COOKIE.YTDLP_BILIBILI_USE_COOKIE:
 83        cookiefile = await ytdlp_bilibili_cookie()
 84        logger.trace(f"Use cookie file: {cookiefile}")
 85        ytdlp_opts["cookiefile"] = cookiefile
 86        ytdlp_opts["external_downloader"] = {"default": "aria2c"}
 87    return ytdlp_opts
 88
 89
 90def video_selector(ctx):
 91    """Select the best format.
 92
 93    For the best compatibility, we choose .mp4 extension with AVC codec for video, .m4a extension for audio.
 94    """
 95    # formats are already sorted worst to best
 96    formats = ctx.get("formats")[::-1]
 97    if not formats:
 98        msg = "No format found."
 99        raise YoutubeDLError(msg)
100    formats = remove_none_values(formats)
101    logger.trace(f"Choose best format from {len(formats)} extracted formats")
102    # acodec='none' means there is no audio
103    # find compatible extension, VP9 is not supported by iOS, use AVC instead
104    all_videos = [f for f in formats if f.get("video_ext", "").lower() != "none"]
105    all_audios = [f for f in formats if f.get("audio_ext", "").lower() != "none"]
106    videos = [f for f in all_videos if f.get("video_ext", "").lower() == "mp4" and f.get("acodec", "").lower() == "none" and f.get("vcodec", "").lower().startswith("avc")]
107    audios = [f for f in all_audios if (f.get("resolution", "").lower() == "audio only" and f.get("audio_ext", "").lower() == "m4a")]
108    logger.trace(f"Found {len(videos)} video formats")
109    logger.trace(f"Found {len(audios)} video formats")
110
111    # if no compatible format found, fallback to the best format
112    if not videos:
113        videos = all_videos
114    if not audios:
115        audios = all_audios
116
117    if not videos and not audios:
118        msg = "No video and audio format found."
119        raise YoutubeDLError(msg)
120    elif not videos:
121        best_audio = audios[0]
122        logger.debug(f"Use audio format: {best_audio['format']}")
123        yield {
124            "format_id": f"{best_audio['format_id']}",
125            "ext": best_audio["ext"],
126            "requested_formats": [best_audio],
127            "protocol": f"{best_audio['protocol']}",
128        }
129    elif not audios:
130        best_video = videos[0]
131        logger.debug(f"Use video format: {best_video['format']}")
132        yield {
133            "format_id": f"{best_video['format_id']}",
134            "ext": best_video["ext"],
135            "requested_formats": [best_video],
136            "protocol": f"{best_video['protocol']}",
137        }
138    else:
139        best_video = videos[0]
140        best_audio = audios[0]
141        logger.debug(f"Use video format: {best_video['format']}")
142        logger.debug(f"Use audio format: {best_audio['format']}")
143        yield {
144            "format_id": f"{best_video['format_id']}+{best_audio['format_id']}",
145            "ext": best_video["ext"],
146            "requested_formats": [best_video, best_audio],
147            "protocol": f"{best_video['protocol']}+{best_audio['protocol']}",
148        }
149
150
def uploader_url(info: dict, extractor: str) -> str:
    """Return the uploader's profile URL.

    Uses ``info["uploader_url"]`` when it is truthy. Otherwise builds the URL
    from ``info["uploader_id"]`` for extractors whose name contains
    ``youtube`` or ``bilibili`` (checked in that order).

    Returns:
        The URL, or an empty string when it cannot be determined.
    """
    direct = info.get("uploader_url")
    if direct:
        return direct

    author_id = info.get("uploader_id")
    if not author_id:
        return ""

    profile_prefixes = (
        ("youtube", "https://www.youtube.com/"),
        ("bilibili", "https://space.bilibili.com/"),
    )
    for keyword, prefix in profile_prefixes:
        if keyword in extractor:
            return f"{prefix}{author_id}"
    return ""
160
161
def platform_emoji(extractor: str) -> str:
    """Return the emoji badge for the platform named inside ``extractor``.

    Keywords are matched as case-sensitive substrings, in table order; the
    first hit wins. Unknown extractors get the generic badge.
    """
    badges = (
        ("bilibili", "🅱️"),
        ("youtube", "🔴"),
        ("twitch", "🟣"),
        ("facebook", "🔵"),
    )
    return next((badge for keyword, badge in badges if keyword in extractor), "🆔")
172
173
def find_thumbnail(video_path: str | Path, audio_path: str | Path) -> str | None:
    """Locate a thumbnail for a downloaded media file, generating one if needed.

    Candidate images are looked up next to the video first (same name, image
    suffix), then next to the audio (stem cut at its first dot, which per the
    original note removes the format id). If none exists, a cover is generated
    from the video when it exists, otherwise from the audio.

    Returns:
        POSIX path of the image after ``convert_img_to_telegram_format``, or
        None when no image exists and the generated cover is not a file.
    """
    video_file = Path(video_path)
    audio_file = Path(audio_path)
    image_suffixes = (".jpg", ".png", ".jpeg", ".webp")

    def sidecar_images():
        # Lazy on purpose: the audio file is only probed once every
        # video-side candidate has been rejected.
        if video_file.is_file():
            for ext in image_suffixes:
                yield video_file.with_suffix(ext)
        if audio_file.is_file():
            base = audio_file.stem.split(".")[0]  # remove format_id
            for ext in image_suffixes:
                yield audio_file.parent.joinpath(base + ext)

    for image in sidecar_images():
        if image.is_file():
            return convert_img_to_telegram_format(image).as_posix()

    # no thumbnail found, generate one
    cover_source = video_file if video_file.is_file() else audio_file
    cover = generate_cover(cover_source)
    if not Path(cover).is_file():
        return None
    return convert_img_to_telegram_format(cover).as_posix()
191
192
async def get_subtitles(audio_path: str | Path, url: str, asr_engine: str, vinfo: dict) -> str:
    """Return subtitles for a video, preferring platform subtitles over ASR.

    For Bilibili and YouTube links the subtitles are fetched first. When that
    yields nothing (or for other platforms), the audio file is transcribed
    with ``asr_file``; transcriptions counting fewer than 20 subtitles are
    discarded.

    Returns:
        The subtitle text, or an empty string when nothing usable was found.
    """
    link = await match_social_media_link(url)
    correction_hint = generate_prompt(vinfo, "correction")

    if link["platform"] in ["bilibili", "youtube"]:  # get subtitle from API first
        fetched = await fetch_subtitle(url=url, reference=correction_hint)
        official = fetched.get("subtitles", "")  # only subtitles, no Bilibili's AI summary
        if official:
            return official

    recognized = await asr_file(audio_path, asr_engine, corrector_reference=correction_hint, silent=True)
    transcript = recognized.get("texts", "")
    # ignore too short transcription
    return "" if count_subtitles(transcript) < 20 else transcript
207
208
async def append_subtitle(name: str, sent_messages: dict) -> dict:
    """Insert ``name`` into the caption of every already-sent message.

    sent_messages:
    {
        "video": list[Message],
        "audio": Message,
    }

    The value under ``"video"`` is treated as a list of messages; the value
    under any other key is treated as the single audio message.

    Returns:
        Dict with the edited messages: ``"video"`` when every video edit
        returned a ``Message`` and ``"audio"`` when the audio edit did.
    """

    async def caption_with_name(m: Message) -> str:
        rows = glom(m, "content.html", default="").split("\n")
        # Insert right after the description row; with no such row the index
        # stays -1, i.e. the name lands just before the last row.
        slot = next((i + 1 for i, row in enumerate(rows) if row.startswith("📝<a href=")), -1)
        rows.insert(slot, name)
        pieces = await smart_split("\n".join(rows), CAPTION_LENGTH)
        head = pieces[0]
        # The split may cut an expandable blockquote open; close it again.
        if "<blockquote expandable>" in head and "</blockquote>" not in head:
            head += "</blockquote>"
        return head

    edited_videos = []
    edited_audio = None
    for key, payload in sent_messages.items():
        if key != "video":
            edited_audio = await payload.edit_caption(await caption_with_name(payload), parse_mode=ParseMode.HTML)
            continue
        batch = []
        for msg in payload:
            batch.append(await msg.edit_caption(await caption_with_name(msg), parse_mode=ParseMode.HTML))
        edited_videos = batch

    result = {}
    videos_ok = all(isinstance(item, Message) for item in edited_videos)
    if videos_ok:
        result["video"] = edited_videos
    if isinstance(edited_audio, Message):
        result["audio"] = edited_audio
    return result
248
249
def generate_prompt(info: dict, target: Literal["summary", "correction"]) -> str:
    """Generate prompt for AI summary or correction.

    Args:
        info: Video metadata. ``extractor`` is required; ``author``,
            ``title``, ``pubdate``/``upload_date`` and ``description`` are
            included when present and truthy.
        target: ``"summary"`` builds a summarisation prompt (with a closing
            instruction); any other value builds the reference text used for
            transcript correction.

    Returns:
        The prompt text (in Chinese).

    Raises:
        KeyError: If ``info`` has no ``extractor`` entry.
    """
    platform = info["extractor"].title()
    prompt = f"以上是{platform}视频" if target == "summary" else f"本次转录稿为{platform}平台"
    if author := info.get("author"):
        # The author name is wrapped in a closed 【…】 pair.
        prompt += f"作者【{author}】"
    prompt += "的一期节目的文字稿。该期节目详情如下:\n"
    if title := info.get("title"):
        prompt += f"节目标题: {title}\n"
    # First of pubdate / upload_date that exists in info.
    if pubdate := glom(info, Coalesce("pubdate", "upload_date"), default=""):
        prompt += f"发布日期: {pubdate}\n"
    if desc := info.get("description"):
        prompt += f"节目简介: {desc}\n"
    if target == "summary":
        prompt += "\n请解读本期节目内容。要求: 直接输出节目内容解读, 以“该节目讲述了”开头"
    return prompt
264
265
def cleanup_ytdlp(vid: str):
    """Delete every regular file in ``DOWNLOAD_DIR`` whose name starts with ``vid``.

    Does nothing when ``vid`` is empty. Directories are left untouched.

    Args:
        vid: Video id used as the file-name prefix.
    """
    if not vid:
        return
    # Local import keeps this block self-contained; the file has no top-level glob import.
    from glob import escape as glob_escape

    logger.debug(f"Cleaning up: {vid}")
    # Escape the id so glob metacharacters in it ("[", "?", "*") are matched
    # literally instead of matching nothing or unrelated files.
    for p in Path(DOWNLOAD_DIR).glob(f"{glob_escape(vid)}*"):
        if p.is_file():
            logger.trace(f"Deleting ytdlp files: {p}")
            p.unlink(missing_ok=True)
273            p.unlink(missing_ok=True)