Commit e4305ae

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-07-28 16:31:35
fix(subtitle): align with the latest version of `youtube_transcript_api`
1 parent ac97c84
Changed files (1)
src
subtitles
src/subtitles/base.py
@@ -5,13 +5,15 @@ from datetime import timedelta
 from loguru import logger
 from pyrogram.client import Client
 from pyrogram.types import Message
-from youtube_transcript_api import YouTubeTranscriptApi  # type: ignore
+from youtube_transcript_api import IpBlocked, RequestBlocked, YouTubeTranscriptApi
+from youtube_transcript_api.proxies import GenericProxyConfig
 
 from config import API, PREFIX, PROXY, READING_SPEED, TOKEN, cache
 from messages.parser import parse_msg
 from messages.utils import startswith_prefix
 from networking import hx_req, match_social_media_link
 from preview.utils import bilibili_subtitle_and_summary
+from utils import seconds_to_time
 
 
 async def match_url(client: Client, message: Message) -> str:
@@ -64,13 +66,18 @@ async def fetch_subtitle(url: str, provider: str) -> dict:
     if matched["platform"] == "bilibili":
         return await bilibili_subtitle_and_summary(url)
     video_id = matched["vid"]
-    try:
-        if "free" in provider:
-            proxy = {"http": PROXY.SUBTITLE, "https": PROXY.SUBTITLE} if PROXY.SUBTITLE else None
-            logger.info(f"Fetch Subtitle via YouTubeTranscriptApi for {video_id=}, {proxy=}")
-            subtitles: list[dict] = YouTubeTranscriptApi.get_transcript(video_id=video_id, languages=["zh-CN", "zh-Hans", "zh", "zh-HK", "zh-TW", "zh-Hant", "en"], proxies=proxy)
+    if "free" in provider:
+        try:
+            proxy = GenericProxyConfig(http_url=PROXY.SUBTITLE, https_url=PROXY.SUBTITLE) if PROXY.SUBTITLE else None
+            logger.info(f"Fetch Subtitle via YouTubeTranscriptApi for {video_id=}, proxy={PROXY.SUBTITLE}")
+            ytt_api = YouTubeTranscriptApi(proxy_config=proxy)
+            resp = ytt_api.fetch(video_id, languages=["zh-CN", "zh-Hans", "zh", "zh-HK", "zh-TW", "zh-Hant", "en"])
+            subtitles: list[dict] = resp.to_raw_data()
             succ = True
-        if not succ and "tikhub" in provider:  # try tikhub
+        except (IpBlocked, RequestBlocked):
+            logger.warning(f"Subtitle API IP blocked: {video_id=}")
+    if not succ and "tikhub" in provider:  # try tikhub
+        try:
             logger.info(f"Fetch Subtitle via TikHub for {video_id=}")
             api_url = f"{API.TIKHUB}/api/v1/youtube/web/get_video_subtitles?video_id={video_id}"
             headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
@@ -81,14 +88,14 @@ async def fetch_subtitle(url: str, provider: str) -> dict:
             if error := resp["data"].get("detail", []):
                 return {"error": error}
             subtitles = resp["data"].get("subtitles", [])
-    except Exception as e:
-        logger.error(f"Failed to get subtitle: {e}")
-        return {"error": error}
-    return await to_transcription(subtitles)
+        except Exception as e:
+            logger.error(f"Failed to get subtitle: {e}")
+            return {"error": error}
+    return to_transcription(subtitles)
 
 
-async def to_transcription(subtitles: list[dict]) -> dict:
-    """Converts subtitles to "[minute:second] transcription" format.
+def to_transcription(subtitles: list[dict]) -> dict:
+    """Converts subtitles to "[hh:mm:ss] transcription" format.
 
     sample subtitles = [
         {'text': 'hello', 'start': 0.056, 'duration': 2.88},
@@ -97,7 +104,7 @@ async def to_transcription(subtitles: list[dict]) -> dict:
 
     Returns:
         dict: {
-            "subtitles": "[minute:second] texts",
+            "subtitles": "[hh:mm:ss] texts",
             "num_chars": len(texts),
             "reading_minutes": 2,
             }
@@ -109,9 +116,8 @@ async def to_transcription(subtitles: list[dict]) -> dict:
     num_chars = 0
 
     for subtitle in subtitles:
-        minutes = int(float(subtitle["start"]) // 60)
-        seconds = int(float(subtitle["start"]) % 60)
-        sentences.append(f"[{minutes}:{seconds:02d}] {subtitle['text']}")
+        seconds = subtitle["start"]
+        sentences.append(f"[{seconds_to_time(seconds)}] {subtitle['text']}")
         num_chars += len(subtitle["text"])
     return {
         "subtitles": "\n".join(sentences),