Commit 205c40d
Changed files (3)
src
src/others/subtitle.py
@@ -1,9 +1,8 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-
-
import io
-from datetime import timedelta
+from datetime import UTC, datetime, timedelta
+from zoneinfo import ZoneInfo
from glom import glom
from loguru import logger
@@ -11,7 +10,7 @@ from pyrogram.client import Client
from pyrogram.types import Message
from youtube_transcript_api import YouTubeTranscriptApi # type: ignore
-from config import API, PREFIX, PROXY, READING_SPEED, TOKEN
+from config import API, PREFIX, PROVIDER, PROXY, READING_SPEED, TOKEN, TZ
from database import cache
from messages.parser import parse_msg
from messages.progress import modify_progress
@@ -29,7 +28,7 @@ HELP = f"""📃**提取字幕**
"""
-async def get_subtitle(client: Client, message: Message, **kwargs):
+async def get_subtitle(client: Client, message: Message, youtube_subtitle_provider: str = PROVIDER.YOUTUBE_SUBTITLE, **kwargs):
"""Get YouTube Subtitle."""
target_chat = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id
# send docs if message == "/subtitle", without reply
@@ -50,10 +49,7 @@ async def get_subtitle(client: Client, message: Message, **kwargs):
return
cache.set(f"subtitle-{message.chat.id}-{media_group_id}", "1", ttl=120)
- res = await fetch_subtitle(vid)
- if not res:
- await modify_progress(text="❌获取字幕失败", force_update=True, **kwargs)
- return
+ res = await fetch_subtitle(vid, youtube_subtitle_provider)
if error := res.get("error", ""):
await modify_progress(text=error, force_update=True, **kwargs)
return
@@ -62,7 +58,8 @@ async def get_subtitle(client: Client, message: Message, **kwargs):
subtitles = res.get("subtitle", "")
logger.success(subtitles)
if vinfo := await fetch_youtube_video_info(vid):
- caption = f"[{vinfo['title']}]({yt_url})\n字符数: {res['num_chars']}\n阅读时长: {res['reading_minutes']:.1f}分钟"
+ caption = f"🔴[{vinfo['author']}]({vinfo['channel']})\n🕒{vinfo['date']:%Y-%m-%d %H:%M:%S}\n"
+ caption += f"📝[{vinfo['title']}]({yt_url})\n字符数: {res['num_chars']}\n阅读时长: {res['reading_minutes']:.1f}分钟"
with io.BytesIO(subtitles.encode("utf-8")) as f:
await client.send_document(to_int(target_chat), f, file_name="字幕文件.txt", caption=caption)
else:
@@ -104,30 +101,30 @@ async def find_yt_vid(client: Client, message: Message) -> str:
return ""
-async def fetch_subtitle(video_id: str) -> dict:
-    proxy = {"http": PROXY.SUBTITLE, "https": PROXY.SUBTITLE} if PROXY.SUBTITLE else None
-    logger.info(f"Fetch Subtitle for {video_id=}, {proxy=}")
+async def fetch_subtitle(video_id: str, provider: str) -> dict:
+    """Fetch subtitles for a YouTube video using the providers named in ``provider``.
+
+    ``provider`` is matched by substring: if it contains "free",
+    YouTubeTranscriptApi is tried first (through PROXY.SUBTITLE when set); if it
+    contains "tikhub", the TikHub API is used when the free provider was not
+    requested or failed.
+
+    Args:
+        video_id: The YouTube video id.
+        provider: Provider selector, e.g. "free", "tikhub" or "free-tikhub".
+
+    Returns:
+        The dict produced by ``to_webvtt`` on success, otherwise ``{"error": ...}``
+        describing why no subtitle could be fetched.
+    """
+    succ = False
+    subtitles: list[dict] = []
     try:
-        subtitles: list[dict] = YouTubeTranscriptApi.get_transcript(video_id=video_id, languages=["zh-CN", "zh-Hans", "zh", "zh-HK", "zh-TW", "zh-Hant", "en"], proxies=proxy)
-        return to_webvtt(subtitles)
+        if "free" in provider:
+            proxy = {"http": PROXY.SUBTITLE, "https": PROXY.SUBTITLE} if PROXY.SUBTITLE else None
+            logger.info(f"Fetch Subtitle via YouTubeTranscriptApi for {video_id=}, {proxy=}")
+            try:
+                subtitles = YouTubeTranscriptApi.get_transcript(video_id=video_id, languages=["zh-CN", "zh-Hans", "zh", "zh-HK", "zh-TW", "zh-Hant", "en"], proxies=proxy)
+                succ = True
+            except Exception as e:
+                # A failure here must not reach the outer handler when TikHub is
+                # also configured, otherwise the fallback below is never tried.
+                if "tikhub" not in provider:
+                    raise
+                logger.warning(f"YouTubeTranscriptApi failed, falling back to TikHub: {e}")
+        if not succ and "tikhub" in provider:  # try tikhub
+            logger.info(f"Fetch Subtitle via TikHub for {video_id=}")
+            api_url = f"{API.TIKHUB}/api/v1/youtube/web/get_video_subtitles?video_id={video_id}"
+            headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
+            resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200}, max_retry=0)
+            if resp.get("hx_error"):
+                logger.warning(f"Subtitle API failed: {resp['hx_error']}")
+                return {"error": resp["hx_error"]}
+            if error := resp["data"].get("detail", []):
+                return {"error": error}
+            subtitles = resp["data"].get("subtitles", [])
     except Exception as e:
         logger.error(f"Failed to get subtitle: {e}")
-        return await fetch_subtitle_tikhub(video_id)
-
-
-async def fetch_subtitle_tikhub(video_id: str) -> dict:
-    logger.info(f"Fetch Subtitle for {video_id=}")
-    api_url = f"{API.TIKHUB}/api/v1/youtube/web/get_video_subtitles?video_id={video_id}"
-    headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
-    resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200}, max_retry=0)
-    if resp.get("hx_error"):
-        logger.warning(f"Subtitle API failed: {resp['hx_error']}")
-        return {}
-    if subtitles := resp["data"].get("subtitles", []):
-        return to_webvtt(subtitles)
-    if error := resp["data"].get("detail", []):
-        return {"error": error}
-    return {}
+        return {"error": str(e)}
+    if not subtitles:
+        # Callers no longer guard against an empty result, so report the
+        # failure explicitly instead of handing them an empty dict.
+        return {"error": "❌获取字幕失败"}
+    return to_webvtt(subtitles)
def to_webvtt(subtitles: list[dict]) -> dict:
@@ -145,6 +142,8 @@ def to_webvtt(subtitles: list[dict]) -> dict:
"num_tokens": 2,
}
"""
+ if not subtitles:
+ return {}
def format_timestamp(seconds: str | float) -> str:
"""Converts seconds to WebVTT timestamp format (hh:mm:ss.mmm)."""
@@ -185,7 +184,16 @@ async def fetch_youtube_video_info(video_id: str) -> dict:
return {}
title = glom(resp, "items.0.snippet.title")
desc = glom(resp, "items.0.snippet.description")
+ author = glom(resp, "items.0.snippet.channelTitle")
+ channel = glom(resp, "items.0.snippet.channelId")
+ pubdate = glom(resp, "items.0.snippet.publishedAt")
except Exception as e:
logger.error(f"Failed to get video info: {e}")
return {}
- return {"title": title, "description": desc}
+ return {
+ "title": title,
+ "description": desc,
+ "author": author,
+ "channel": f"https://www.youtube.com/channel/{channel}",
+ "date": datetime.strptime(pubdate, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC).astimezone(ZoneInfo(TZ)),
+ }
src/preview/ytdlp.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
+import io
import json
import os
import threading
@@ -29,6 +30,7 @@ from messages.utils import count_without_entities, get_reply_to, smart_split, wa
from multimedia import convert_to_h264, generate_cover
from networking import hx_req
from others.emoji import emojify
+from others.subtitle import fetch_subtitle
from preview.utils import bv2av, make_bvid_clickable
from utils import readable_size, readable_time, remove_none_values, soup_to_text, to_int, true, ts_to_dt, unicode_to_ascii
@@ -48,6 +50,7 @@ async def preview_ytdlp(
bilibili_comments_provider: str = PROVIDER.BILIBILI_COMMENTS,
youtube_comments_provider: str = PROVIDER.YOUTUBE_COMMENTS,
proxy: str | None = None,
+ append_youtube_subtitle: bool = True,
**kwargs,
):
"""Preview ytdlp link in the message.
@@ -62,6 +65,7 @@ async def preview_ytdlp(
bilibili_comments_provider (str, optional): The bilibili comments extractor: "free", "tikhub" or "false"
youtube_comments_provider (str, optional): The youtube comments extractor: "free" or "false".
proxy (str, optional): Proxy to use. Defaults to None.
+ append_youtube_subtitle (bool, optional): Also send youtube subtitle.
"""
logger.trace(f"{url=} {kwargs=}")
if kwargs.get("show_progress") and "progress" not in kwargs:
@@ -198,13 +202,12 @@ async def preview_ytdlp(
)
)
if audio_path.is_file():
- target_chat = target_chat if ytdlp_send_audio else TID.CHANNEL_YTDLP_BACKUP # backup to channel if not send audio, so we can save it to db
+ audio_target_chat = target_chat if ytdlp_send_audio else TID.CHANNEL_YTDLP_BACKUP # backup to channel if not send audio, so we can save it to db
await modify_progress(text=f"🎧音频上传中: {readable_size(path=audio_path)}", force_update=True, **kwargs)
- target_chat = to_int(target_chat)
caption = (await smart_split(texts, CAPTION_LENGTH))[0]
sent_messages.append(
await client.send_audio(
- chat_id=target_chat,
+ chat_id=to_int(audio_target_chat),
audio=audio_path.as_posix(),
caption=warp_comments(caption),
performer=info["author"],
@@ -223,7 +226,12 @@ async def preview_ytdlp(
if v := locals().get(k):
metadata[k] = unicode_to_ascii(v)
await save_messages(messages=sent_messages, key=url, metadata=metadata)
-
+ if "youtube" in info["extractor"] and append_youtube_subtitle and (video_path.is_file() or audio_path.is_file()):
+ res = await fetch_subtitle(video_id=info["id"], provider="free")
+ if subtitles := res.get("subtitle"):
+ caption = f"{emoji}[{info['author']}]({info['author_url']})\n🕒{create_time}\n📝[{info['title']}]({url})\n字符数: {res['num_chars']}\n阅读时长: {res['reading_minutes']:.1f}分钟"
+ with io.BytesIO(subtitles.encode("utf-8")) as f:
+ await client.send_document(to_int(target_chat), f, file_name="字幕文件.txt", caption=caption)
Path(json_file).unlink(missing_ok=True)
cleanup_ytdlp(info["id"])
src/config.py
@@ -21,7 +21,7 @@ MAX_FILE_BYTES = int(os.getenv("MAX_FILE_BYTES", "2000")) * 1024 * 1024 # 4000
ASR_MAX_DURATION = int(os.getenv("ASR_MAX_DURATION", "600"))
MAX_MESSAGE_RETRIEVED = int(os.getenv("MAX_MESSAGE_RETRIEVED", "1000000")) # Maximum number of messages to retrieve
MAX_MESSAGE_SUMMARY = int(os.getenv("MAX_MESSAGE_SUMMARY", "9999")) # Maximum number of messages to summarize
-READING_SPEED = int(os.getenv("READING_SPEED", "300")) # words per minute
+READING_SPEED = int(os.getenv("READING_SPEED", "600")) # words per minute
DAILY_MESSAGES = os.getenv("DAILY_MESSAGES", "{}") # Useful for daily checkin for some services. Should be a json string: '{"chat-1": "msg-1", "chat-2": "msg-2"}'
# For ytdlp downloaded video, re-encoding to H264 format. This set the max file size for re-encoding. Default: 1PB
YTDLP_RE_ENCODING_MAX_FILE_BYTES = int(os.getenv("YTDLP_RE_ENCODING_MAX_FILE_BYTES", "1125899906842624"))
@@ -102,6 +102,7 @@ class PROVIDER: # default API provider
WEIBO_COMMENTS = os.getenv("WEIBO_COMMENTS_PROVIDER", "free").lower() # free or a false value (0, false, none, null, etc.)
BILIBILI_COMMENTS = os.getenv("BILIBILI_COMMENTS_PROVIDER", "cookie-free-tikhub").lower() # or a false value to disable (0, false, none, null, etc.)
YOUTUBE_COMMENTS = os.getenv("YOUTUBE_COMMENTS_PROVIDER", "free").lower() # free or a false value (0, false, none, null, etc.)
+ YOUTUBE_SUBTITLE = os.getenv("YOUTUBE_SUBTITLE_PROVIDER", "free-tikhub").lower() # any combination of "free" and "tikhub" (matched as substrings), e.g. "free-tikhub"
class TOKEN: