Commit 9f060b9
Changed files (15)
src/asr/voice_recognition.py
@@ -149,7 +149,7 @@ async def voice_to_text(
await modify_progress(text=msg, force_update=True, **kwargs)
return
- res = await asr_file(path, engine=asr_engine, duration=asr_msg_info["duration"], tencent_language=tencent_language, client=client, message=msg_to_asr, **kwargs)
+ res = await asr_file(path, engine=asr_engine, tencent_language=tencent_language, client=client, message=msg_to_asr, **kwargs)
if error := res.get("error"):
await modify_progress(kwargs.get("progress"), text=error, force_update=True)
return
@@ -188,7 +188,6 @@ async def voice_to_text(
async def asr_file(
path: str | Path,
engine: str = "",
- duration: float = 0,
*,
tencent_language: str = "16k_zh-PY",
delete_local_file: bool = True,
src/messages/utils.py
@@ -230,7 +230,7 @@ async def set_reaction(client: Client, message: Message, reaction: str | list[st
await client.set_reaction(message.chat.id, message.id)
-async def delete_message(message: Message):
+async def delete_message(message: Message | None):
if not isinstance(message, Message):
return
with contextlib.suppress(Exception):
src/preview/bilibili.py
@@ -1,21 +1,33 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+"""This file contains the code for extracting information from Bilibili videos.
-from bilibili_api import opus
-from glom import glom
+But not for downloading Bilibili videos.
+For downloading Bilibili videos, please see `src/preview/ytdlp.py`.
+"""
+
+import re
+from datetime import datetime
+from pathlib import Path
+from zoneinfo import ZoneInfo
+
+from bilibili_api import ApiException, Credential, comment, opus, video
+from glom import Coalesce, flatten, glom
from loguru import logger
from pyrogram.client import Client
+from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM
from pyrogram.types import Message
-from config import DB, cache
+from config import DB, READING_SPEED, TZ, cache
+from cookies import bilibili_cookie_dict
from database.database import get_db
from messages.database import copy_messages_from_db, save_messages
from messages.progress import modify_progress
from messages.sender import send2tg
from messages.utils import summay_media
-from networking import download_file, download_media
+from networking import download_file, download_media, hx_req
from others.emoji import emojify
-from utils import ts_to_dt
+from utils import av2bv, count_subtitles, https_url, number_to_emoji, readable_count, seconds_to_hms, ts_to_dt
async def preview_bilibili(
@@ -29,8 +41,6 @@ async def preview_bilibili(
):
"""Preview bilibili info in the message.
- This scripit is NOT for bilibili videos. For videos, see `ytdlp.py`
-
Args:
client (Client): The Pyrogram client.
message (Message): The trigger message object.
@@ -117,3 +127,233 @@ async def parse_bilibili_opus(post_id: str, **kwargs) -> dict: # type: ignore
logger.warning(f"Bilibili Opus parse failed: {e}")
return {"error_msg": str(e)}
return info
+
+
+@cache.memoize(ttl=120)
+async def get_bilibili_vinfo(url_or_vid: int | str) -> dict:
+ """Get Bilibili video info.
+
+ Returns:
+ {
+ "downloadable": (bool),
+ "error_msg": (str),
+ "title": (str),
+ "description": (str),
+ "author": (str),
+ "channel": (str) channel url,
+ "pubdate": (str)
+ "upload_date": (str)
+ "view_count": (int),
+ "like_count": (int),
+ "favorite_count": (int),
+ "coin_count": (int),
+ "comment_count": (int),
+ "statistics": (str) "👁100K 👍100K ⭐️100K 💬100K",
+ "emoji": (str) "🅱️"
+ }
+
+ """
+ if not url_or_vid:
+ return {"downloadable": False, "error_msg": "❌未提供VideoID"}
+ info = {"downloadable": False, "error_msg": "❌无法获取此视频信息"}
+ try:
+ logger.info(f"Fetch Bilibili video info for {url_or_vid}")
+ vid = bilibili_url2vid(url_or_vid)
+ v = video.Video(bvid=av2bv(vid))
+ info = await v.get_info()
+ info["title"] = info.get("title", "Title")
+ info["description"] = glom(info, Coalesce("desc", "desc_v2.0.raw_text", default=""))
+ info["author"] = glom(info, "owner.name", default="B站UP主")
+ info["channel"] = f"https://space.bilibili.com/{glom(info, 'owner.mid', default='')}"
+ info["pubdate"] = datetime.fromtimestamp(info["pubdate"], tz=ZoneInfo(TZ)).strftime("%Y-%m-%d %H:%M:%S")
+ info["upload_date"] = datetime.fromtimestamp(info["ctime"], tz=ZoneInfo(TZ)).strftime("%Y-%m-%d %H:%M:%S")
+
+ # statistics
+ info |= {
+ "view_count": int(glom(info, "stat.view", default=0)),
+ "like_count": int(glom(info, "stat.like", default=0)),
+ "favorite_count": int(glom(info, "stat.favorite", default=0)),
+ "coin_count": int(glom(info, "stat.coin", default=0)),
+ "comment_count": int(glom(info, "stat.reply", default=0)),
+ }
+ statistics = ""
+ if view := info.get("view_count"):
+ statistics += f"👁{readable_count(view)}"
+ if like := info.get("like_count"):
+ statistics += f"👍{readable_count(like)}"
+ if coin := info.get("coin_count"):
+ statistics += f"🪙{readable_count(coin)}"
+ if favorite := info.get("favorite_count"):
+ statistics += f"⭐️{readable_count(favorite)}"
+ if comment := info.get("comment_count"):
+ statistics += f"💬{readable_count(comment)}"
+ info["statistics"] = statistics
+
+ info |= {"downloadable": True, "error_msg": ""}
+
+ except ApiException as e:
+ logger.error(f"Failed to get video info: {e}")
+ return {"downloadable": False, "error_msg": "❌" + str(e.msg)}
+ except Exception as e:
+ logger.error(f"Failed to get video info: {e}")
+ return info
+ return info | {"emoji": "🅱️"}
+
+
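A minimal consumption sketch for the dict documented above (illustrative only; the video id is hypothetical and a configured bilibili_api/cache environment is assumed):

    # Hypothetical usage, not part of this commit.
    import asyncio
    from preview.bilibili import get_bilibili_vinfo

    async def demo() -> None:
        info = await get_bilibili_vinfo("BV1GJ411x7h7")  # hypothetical id
        if not info.get("downloadable"):
            print(info.get("error_msg", "unknown error"))
            return
        # One-line caption built from the documented keys.
        print(f"{info['emoji']}{info['author']} | {info['title']} | {info['statistics']}")

    asyncio.run(demo())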
+async def get_bilibili_subtitle(url_or_vid: int | str) -> dict:
+ """(Depracated) Get Bilibili subtitle.
+
+ This function is deprecated, it only returns the subtitle url,
+ We need to download it from the url and parse it.
+ Please use `bilibili_subtitle_and_summary` instead, it can get subtitles directly with AI summary.
+
+ Returns:
+ dict: {
+ "subtitles": "[minute:second] texts",
+ "num_chars": len(texts),
+ "reading_minutes": 2,
+ }
+ """
+ try:
+ # url to vid
+ info = await get_bilibili_vinfo(url_or_vid)
+ cid = info["cid"]
+ cookie = await bilibili_cookie_dict()
+ credential = Credential(sessdata=cookie["SESSDATA"])
+ v = video.Video(bvid=bilibili_url2vid(url_or_vid), credential=credential)
+ res = await v.get_subtitle(cid=cid)
+ if subtitles := res.get("subtitles", []):
+ subtitle_url = next((x.get("subtitle_url", "") for x in subtitles if "中文" in x.get("lan_doc", "")), "")
+ data = await hx_req(https_url(subtitle_url), check_keys=["body"])
+ items = data["body"]
+ sentences = []
+ num_chars = 0
+ for subtitle in items:
+ sentences.append(f"[{seconds_to_hms(subtitle['from'])}] {subtitle['content']}")
+ num_chars += len(subtitle["content"])
+ return {
+ "subtitles": "\n".join(sentences),
+ "num_chars": num_chars,
+ "reading_minutes": num_chars / READING_SPEED,
+ }
+ except Exception as e:
+ logger.error(e)
+ return {"error": "下载B站内嵌字幕失败"}
+
+
+async def get_bilibili_comments(url_or_vid: int | str) -> list[str]:
+ """Get Bilibili comments."""
+ comments = []
+ try:
+ # url to vid
+ cookie = await bilibili_cookie_dict()
+ credential = Credential(sessdata=cookie["SESSDATA"])
+ info = await get_bilibili_vinfo(url_or_vid)
+ response = await comment.get_comments_lazy(oid=info["aid"], type_=comment.CommentResourceType.VIDEO, order=comment.OrderType.LIKE, credential=credential)
+ data = response.get("replies", [])
+ data = sorted(data, key=lambda x: x.get("like", 0), reverse=True)
+ except Exception as e:
+ logger.error(f"Failed to get Bilibili comments: {e}")
+ return []
+ try:
+ for idx, x in enumerate(data):
+ name = glom(x, "member.uname", default="匿名")
+ if uid := glom(x, "member.mid", default=""):
+ name = f"[{name}](https://space.bilibili.com/{uid})"
+ location = glom(x, "reply_control.location", default="").removeprefix("IP属地:") # noqa: RUF001
+ location = f"({location})" if location else ""
+ if cmt := glom(x, "content.message", default=""):
+ if idx == 0:
+ comments.append(f"\n{BLOCKQUOTE_EXPANDABLE_DELIM}💬**点此展开评论区**:")
+ comments.append(f"\n💬**{name}**{location}: {emojify(cmt)}")
+ except Exception as e:
+ logger.error(f"Failed to get Bilibili comments: {e}")
+ return []
+ return comments
+
+
+async def bilibili_subtitle_and_summary(url_or_vid: int | str) -> dict:
+ """Get Bilibili subtitles and AI summary.
+
+ Returns:
+ dict: {
+ "summary": "AI summary texts",
+ "subtitles": "[minute:second] texts",
+ "num_chars": len(texts),
+ "reading_minutes": 2,
+ "full": "summary first, followed by subtitles",
+ }
+ """
+ try:
+ # url to vid
+ info = await get_bilibili_vinfo(url_or_vid)
+ cid = info["cid"]
+ cookie = await bilibili_cookie_dict()
+ credential = Credential(sessdata=cookie["SESSDATA"])
+ v = video.Video(bvid=bilibili_url2vid(url_or_vid), credential=credential)
+ res = await v.get_ai_conclusion(cid=cid, up_mid=glom(info, "owner.mid", default=None))
+ # First, get subtitles
+ if not glom(res, "model_result.subtitle.0.part_subtitle.0", default=None):
+ final = await get_bilibili_subtitle(url_or_vid) # use `get_bilibili_subtitle`
+ subtitles = final.get("subtitles", "")
+ else:
+ subtitles = ""
+ for item in flatten(glom(res, "model_result.subtitle.*.part_subtitle.*", default=None)):
+ if item.get("content", ""):
+ subtitles += f"\n[{seconds_to_hms(item['start_timestamp'])}] {item['content']}"
+ final = {"subtitles": subtitles.strip(), "num_chars": count_subtitles(subtitles), "reading_minutes": count_subtitles(subtitles) / READING_SPEED}
+
+ # Then get AI summary
+ summary = ""
+ if glom(res, "code", default=-1) == 0 and glom(res, "model_result.result_type", default=None) != 0: # has summary
+ summary += glom(res, "model_result.summary", default="")
+ outlines = glom(res, "model_result.outline", default=[])
+ for idx, outline in enumerate(outlines):
+ summary += f"\n\n{number_to_emoji(idx + 1)} {outline.get('title', '')}"
+ for item in glom(outline, "part_outline", default=[]):
+ summary += f"\n[{seconds_to_hms(item['timestamp'])}] {item['content']}"
+ if summary:
+ final["summary"] = summary.strip()
+ if summary and subtitles:
+ final["full"] = f"AI总结(B站版):\n{summary}\n\n\n外挂字幕(B站版):\n{subtitles.strip()}" # noqa: RUF001
+ except Exception as e:
+ logger.error(e)
+ return {"error": "下载B站AI总结失败"}
+ return final
+
+
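The "full"/"subtitles" fields above are what downstream callers (e.g. fetch_subtitle) consume; a hedged sketch of that pattern, with a hypothetical video id:

    # Hypothetical usage, not part of this commit.
    import asyncio
    from preview.bilibili import bilibili_subtitle_and_summary

    async def demo() -> None:
        res = await bilibili_subtitle_and_summary("BV1GJ411x7h7")  # hypothetical id
        if "error" in res:
            print(res["error"])
            return
        # Prefer the combined summary+subtitles text when the AI summary exists.
        text = res.get("full") or res.get("subtitles", "")
        print(f"{res.get('num_chars', 0)} chars, ~{res.get('reading_minutes', 0):.1f} min")
        print(text[:200])

    asyncio.run(demo())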
+def make_bvid_clickable(texts: str) -> str:
+ """Make bvid in texts clickable.
+
+ "BV1234567890" -> [BV1234567890](https://www.bilibili.com/video/BV1234567890)
+
+ bvid format: https://github.com/SocialSisterYi/bilibili-API-collect/blob/18c1efb/docs/misc/bvid_desc.md
+ Args:
+ texts (str): The texts to process.
+
+ Returns:
+ str: bvid with markdown url.
+ """
+ if not texts:
+ return ""
+
+ def markdown_url(match):
+ if match.group(1): # full url
+ bvid = match.group(3)
+ return f"[{bvid}](https://www.bilibili.com/video/{bvid})"
+ # bvid only
+ bvid = match.group(0)
+ return f"[{bvid}](https://www.bilibili.com/video/{bvid})"
+
+ # match bilibili links or bvid only
+ pattern = r"(https?://)?(:?m\.|www\.)?bilibili\.com/video/(BV1[a-zA-Z0-9]{9})\b|\bBV1[a-zA-Z0-9]{9}\b"
+ return re.sub(pattern, markdown_url, texts)
+
+
+def bilibili_url2vid(url: str | int) -> str:
+ if matched := re.search(r"(https?://)?(:?m\.|www\.)?bilibili\.com/video/([^,,.。\s]+)", str(url)): # noqa: RUF001
+ base_url = matched.group(0).split("?")[0]
+ return Path(base_url).stem
+
+ # already vid
+ return av2bv(url)
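A quick illustrative check of the two URL helpers above (inputs are hypothetical):

    # Hypothetical examples, not part of this commit.
    from preview.bilibili import bilibili_url2vid, make_bvid_clickable

    text = "Watch this: https://www.bilibili.com/video/BV1GJ411x7h7?p=2 and also BV1GJ411x7h7"
    # Both the full URL and the bare id are wrapped as markdown links.
    print(make_bvid_clickable(text))
    # The query string and path are stripped, leaving only the video id.
    print(bilibili_url2vid("https://m.bilibili.com/video/BV1GJ411x7h7?spm_id_from=333"))  # -> "BV1GJ411x7h7"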
src/preview/utils.py
@@ -1,224 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-import asyncio
import re
-from datetime import UTC, datetime
-from pathlib import Path
-from zoneinfo import ZoneInfo
-
-from bilibili_api import Credential, comment, video
-from glom import flatten, glom
-from loguru import logger
-from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM
-
-from config import PROXY, READING_SPEED, TOKEN, TZ, cache
-from cookies import bilibili_cookie_dict
-from networking import hx_req
-from others.emoji import emojify
-from utils import av2bv, count_subtitles, https_url, number_to_emoji, seconds_to_hms
-
-
-def make_bvid_clickable(texts: str) -> str:
- """Make bvid in texts clickable.
-
- "BV1234567890" -> [BV1234567890](https://www.bilibili.com/video/BV1234567890)
-
- bvid format: https://github.com/SocialSisterYi/bilibili-API-collect/blob/18c1efb/docs/misc/bvid_desc.md
- Args:
- texts (str): The texts to process.
-
- Returns:
- str: bvid with markdown url.
- """
- if not texts:
- return ""
-
- def markdown_url(match):
- if match.group(1): # full url
- bvid = match.group(3)
- return f"[{bvid}](https://www.bilibili.com/video/{bvid})"
- # bvid only
- bvid = match.group(0)
- return f"[{bvid}](https://www.bilibili.com/video/{bvid})"
-
- # match bilibili links or bvid only
- pattern = r"(https?://)?(:?m\.|www\.)?bilibili\.com/video/(BV1[a-zA-Z0-9]{9})\b|\bBV1[a-zA-Z0-9]{9}\b"
- return re.sub(pattern, markdown_url, texts)
-
-
-def bilibili_url2vid(url: str | int) -> str:
- if matched := re.search(r"(https?://)?(:?m\.|www\.)?bilibili\.com/video/([^,,.。\s]+)", str(url)): # noqa: RUF001
- base_url = matched.group(0).split("?")[0]
- return Path(base_url).stem
-
- # already vid
- return av2bv(url)
-
-
-@cache.memoize(ttl=120)
-async def get_bilibili_video_info(url_or_vid: int | str) -> dict:
- """Get Bilibili video info."""
- vid = bilibili_url2vid(url_or_vid)
- v = video.Video(bvid=av2bv(vid))
- info = await v.get_info()
- info["author"] = glom(info, "owner.name", default="匿名UP")
- info["channel"] = f"https://space.bilibili.com/{glom(info, 'owner.mid', default=1)}"
- info["date"] = datetime.fromtimestamp(info["ctime"], tz=ZoneInfo(TZ))
- info["emoji"] = "🅱️"
- return info
-
-
-async def get_bilibili_subtitle(url_or_vid: int | str) -> dict:
- """(Depracated) Get Bilibili subtitle.
-
- This function is deprecated, it just get the subtitle url first,
- then we need to download the subtitle and parse it.
- Please use `bilibili_subtitle_and_summary` instead, it can get subtitles directly with AI summary.
-
- Returns:
- dict: {
- "subtitles": "[minute:second] texts",
- "num_chars": len(texts),
- "reading_minutes": 2,
- }
- """
- try:
- # url to vid
- info = await get_bilibili_video_info(url_or_vid)
- cid = info["cid"]
- cookie = await bilibili_cookie_dict()
- credential = Credential(sessdata=cookie["SESSDATA"])
- v = video.Video(bvid=bilibili_url2vid(url_or_vid), credential=credential)
- res = await v.get_subtitle(cid=cid)
- if subtitles := res.get("subtitles", []):
- subtitle_url = next((x.get("subtitle_url", "") for x in subtitles if "中文" in x.get("lan_doc", "")), "")
- data = await hx_req(https_url(subtitle_url), check_keys=["body"])
- items = data["body"]
- sentences = []
- num_chars = 0
- for subtitle in items:
- sentences.append(f"[{seconds_to_hms(subtitle['from'])}] {subtitle['content']}")
- num_chars += len(subtitle["content"])
- return {
- "subtitles": "\n".join(sentences),
- "num_chars": num_chars,
- "reading_minutes": num_chars / READING_SPEED,
- }
- except Exception as e:
- logger.error(e)
- return {"error": "下载B站内嵌字幕失败"}
-
-
-async def get_bilibili_comments(url_or_vid: int | str) -> list[str]:
- """Get Bilibili comments."""
- comments = []
- try:
- # url to vid
- cookie = await bilibili_cookie_dict()
- credential = Credential(sessdata=cookie["SESSDATA"])
- info = await get_bilibili_video_info(url_or_vid)
- response = await comment.get_comments_lazy(oid=info["aid"], type_=comment.CommentResourceType.VIDEO, order=comment.OrderType.LIKE, credential=credential)
- data = response.get("replies", [])
- data = sorted(data, key=lambda x: x.get("like", 0), reverse=True)
- except Exception as e:
- logger.error(f"Failed to get Bilibili comments: {e}")
- return []
- try:
- for idx, x in enumerate(data):
- name = glom(x, "member.uname", default="匿名")
- if uid := glom(x, "member.mid", default=""):
- name = f"[{name}](https://space.bilibili.com/{uid})"
- location = glom(x, "reply_control.location", default="").removeprefix("IP属地:") # noqa: RUF001
- location = f"({location})" if location else ""
- if cmt := glom(x, "content.message", default=""):
- if idx == 0:
- comments.append(f"\n{BLOCKQUOTE_EXPANDABLE_DELIM}💬**点此展开评论区**:")
- comments.append(f"\n💬**{name}**{location}: {emojify(cmt)}")
- except Exception as e:
- logger.error(f"Failed to get Bilibili comments: {e}")
- return []
- return comments
-
-
-async def bilibili_subtitle_and_summary(url_or_vid: int | str) -> dict:
- """Get Bilibili subtitles and AI summary.
-
- Returns:
- dict: {
- "summary": "AI summary texts",
- "subtitles": "[minute:second] texts",
- "num_chars": len(texts),
- "reading_minutes": 2,
- "full": "summary first, followed by subtitles",
- }
- """
- try:
- # url to vid
- info = await get_bilibili_video_info(url_or_vid)
- cid = info["cid"]
- cookie = await bilibili_cookie_dict()
- credential = Credential(sessdata=cookie["SESSDATA"])
- v = video.Video(bvid=bilibili_url2vid(url_or_vid), credential=credential)
- res = await v.get_ai_conclusion(cid=cid, up_mid=glom(info, "owner.mid", default=None))
- # First, get subtitles
- if not glom(res, "model_result.subtitle.0.part_subtitle.0", default=None):
- final = await get_bilibili_subtitle(url_or_vid) # use `get_bilibili_subtitle`
- subtitles = final.get("subtitles", "")
- else:
- subtitles = ""
- for item in flatten(glom(res, "model_result.subtitle.*.part_subtitle.*", default=None)):
- if item.get("content", ""):
- subtitles += f"\n[{seconds_to_hms(item['start_timestamp'])}] {item['content']}"
- final = {"subtitles": subtitles.strip(), "num_chars": count_subtitles(subtitles), "reading_minutes": count_subtitles(subtitles) / READING_SPEED}
-
- # Then get AI summary
- summary = ""
- if glom(res, "code", default=-1) == 0 and glom(res, "model_result.result_type", default=None) != 0: # has summary
- summary += glom(res, "model_result.summary", default="")
- outlines = glom(res, "model_result.outline", default=[])
- for idx, outline in enumerate(outlines):
- summary += f"\n\n{number_to_emoji(idx + 1)} {outline.get('title', '')}"
- for item in glom(outline, "part_outline", default=[]):
- summary += f"\n[{seconds_to_hms(item['timestamp'])}] {item['content']}"
- if summary:
- final["summary"] = summary.strip()
- if summary and subtitles:
- final["full"] = f"AI总结(B站版):\n{summary}\n\n\n外挂字幕(B站版):\n{subtitles}".strip() # noqa: RUF001
- except Exception as e:
- logger.error(e)
- return {"error": "下载B站AI总结失败"}
- return final
-
-
-@cache.memoize(ttl=120)
-async def fetch_youtube_video_info(video_id: str) -> dict:
- """Fetch YouTube video info."""
- if not video_id:
- return {}
- try:
- logger.info(f"Fetch Video info for {video_id=}, proxy={PROXY.SUBTITLE}")
- api = "https://www.googleapis.com/youtube/v3/videos"
- params = {"key": TOKEN.YOUTUBE_API_KEY, "part": "snippet", "id": video_id, "hl": "zh-CN"}
- resp = await hx_req(api, proxy=PROXY.SUBTITLE, params=params, check_keys=["items"], max_retry=0)
- if resp.get("hx_error"):
- logger.warning(f"YouTube Videos API failed: {resp['hx_error']}")
- return {}
- title = glom(resp, "items.0.snippet.title")
- desc = glom(resp, "items.0.snippet.description")
- author = glom(resp, "items.0.snippet.channelTitle")
- channel = glom(resp, "items.0.snippet.channelId")
- pubdate = glom(resp, "items.0.snippet.publishedAt")
- except Exception as e:
- logger.error(f"Failed to get video info: {e}")
- return {}
- return {
- "title": title,
- "description": desc,
- "author": author,
- "channel": f"https://www.youtube.com/channel/{channel}",
- "date": datetime.strptime(pubdate, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC).astimezone(ZoneInfo(TZ)),
- "emoji": "🔴",
- }
def has_markdown_img(text: str) -> bool:
@@ -228,7 +10,3 @@ def has_markdown_img(text: str) -> bool:
"""
pattern = r"!\[.*?\]\(.*?\)"
return bool(re.search(pattern, text))
-
-
-if __name__ == "__main__":
- asyncio.run(get_bilibili_subtitle("BV1nYVyz1Er8"))
src/preview/youtube.py
@@ -0,0 +1,148 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""This file contains the code for extracting information from YouTube videos.
+
+But not for downloading YouTube videos.
+For downloading YouTube videos, please see `src/preview/ytdlp.py`.
+"""
+
+from datetime import UTC, datetime
+from zoneinfo import ZoneInfo
+
+from glom import glom
+from loguru import logger
+from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM
+
+from config import PROXY, TOKEN, TZ, cache
+from networking import hx_req
+from utils import nowstr, readable_count, true
+
+
+@cache.memoize(ttl=60)
+async def get_youtube_comments(vid: str | None) -> list[str]:
+ if not vid:
+ return []
+ api = "https://www.googleapis.com/youtube/v3/commentThreads"
+ params = {"key": TOKEN.YOUTUBE_API_KEY, "maxResults": 100, "textFormat": "plainText", "part": "snippet", "videoId": vid}
+ comments = []
+ try:
+ resp = await hx_req(api, proxy=PROXY.GOOGLE_SEARCH, params=params, check_keys=["items"])
+ if resp.get("hx_error"):
+ logger.warning(f"YouTube Comments API failed: {resp['hx_error']}")
+ return []
+ data = resp["items"]
+ for idx, x in enumerate(data):
+ name = glom(x, "snippet.topLevelComment.snippet.authorDisplayName", default="匿名")
+ name = name.removeprefix("@")
+ if author_url := glom(x, "snippet.topLevelComment.snippet.authorChannelUrl", default=""):
+ name = f"[{name}]({author_url})"
+ if cmt := glom(x, "snippet.topLevelComment.snippet.textDisplay", default=""):
+ if idx == 0:
+ comments.append(f"\n{BLOCKQUOTE_EXPANDABLE_DELIM}💬**点此展开评论区**:")
+ comments.append(f"\n💬**{name}**: {cmt}")
+ except Exception as e:
+ logger.error(f"Failed to get YouTube comments: {e}")
+ return []
+ return comments
+
+
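The returned list is meant to be appended to a caption line by line; a hedged sketch of that pattern (video id hypothetical):

    # Hypothetical usage, not part of this commit.
    import asyncio
    from preview.youtube import get_youtube_comments

    async def demo() -> None:
        caption = "🔴Some caption built elsewhere"
        # Each entry already starts with a newline and markdown formatting.
        for line in await get_youtube_comments("dQw4w9WgXcQ"):  # hypothetical id
            caption += line
        print(caption)

    asyncio.run(demo())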
+@cache.memoize(ttl=120)
+async def get_youtube_vinfo(video_id: str) -> dict:
+ """Fetch YouTube video info.
+
+ Returns:
+ {
+ "downloadable": (bool),
+ "error_msg": (str),
+ "title": (str),
+ "description": (str),
+ "author": (str),
+ "channel": (str) channel url,
+ "pubdate": (str)
+ "has_subtitle": (bool),
+ "is_live": (bool),
+ "live_start": (datetime),
+ "live_end": (datetime),
+ "scheduled_start": (datetime),
+ "view_count": (int),
+ "like_count": (int),
+ "favorite_count": (int),
+ "comment_count": (int),
+ "statistics": (str) "👁100K 👍100K ⭐️100K 💬100K",
+ "emoji": (str) "🔴"
+ }
+ """
+ if not video_id:
+ return {"downloadable": False, "error_msg": "❌未提供VideoID"}
+ info = {"downloadable": False, "error_msg": "❌无法获取此视频信息"}
+ try:
+ logger.info(f"Fetch YouTube video info for {video_id=}, proxy={PROXY.GOOGLE_SEARCH}")
+ api = "https://www.googleapis.com/youtube/v3/videos"
+ params = {"key": TOKEN.YOUTUBE_API_KEY, "part": "snippet,status,contentDetails,liveStreamingDetails,statistics", "id": video_id, "hl": "zh-CN"}
+ resp = await hx_req(api, proxy=PROXY.GOOGLE_SEARCH, params=params, check_keys=["items.0.snippet"], max_retry=3)
+ if resp.get("hx_error"):
+ logger.warning(f"YouTube Videos API failed: {resp['hx_error']}")
+ return {"downloadable": False, "error_msg": "❌无法获取此视频信息"}
+ if not glom(resp, "items.0.snippet", default={}):
+ logger.warning("YouTube Videos API failed: Video not found")
+ return {"downloadable": False, "error_msg": "❌无法获取此视频信息"}
+
+ # basic info
+ info["title"] = glom(resp, "items.0.snippet.title", default="Title")
+ info["description"] = glom(resp, "items.0.snippet.description", default="")
+ info["author"] = glom(resp, "items.0.snippet.channelTitle", default="YouTuber")
+ channel = glom(resp, "items.0.snippet.channelId", default="")
+ info["channel"] = f"https://www.youtube.com/channel/{channel}"
+ if pubdate := glom(resp, "items.0.snippet.publishedAt", default=""):
+ dt = datetime.strptime(pubdate, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC).astimezone(ZoneInfo(TZ))
+ info["pubdate"] = f"{dt:%Y-%m-%d %H:%M:%S}"
+ else:
+ info["pubdate"] = nowstr(TZ)
+ info["has_subtitle"] = true(glom(resp, "items.0.contentDetails.caption", default=False))
+
+ # livestreaming
+ info |= {"is_live": False, "live_start": "", "live_end": "", "scheduled_start": ""}
+ if live_details := glom(resp, "items.0.liveStreamingDetails", default={}):
+ info["is_live"] = True
+ if live_start := live_details.get("actualStartTime"):
+ info["live_start"] = datetime.strptime(live_start, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC).astimezone(ZoneInfo(TZ))
+ if live_end := live_details.get("actualEndTime"):
+ info["live_end"] = datetime.strptime(live_end, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC).astimezone(ZoneInfo(TZ))
+ if scheduled_start := live_details.get("scheduledStartTime"):
+ info["scheduled_start"] = datetime.strptime(scheduled_start, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC).astimezone(ZoneInfo(TZ))
+
+ # statistics
+ info |= {
+ "view_count": int(glom(resp, "items.0.statistics.viewCount", default=0)),
+ "like_count": int(glom(resp, "items.0.statistics.likeCount", default=0)),
+ "favorite_count": int(glom(resp, "items.0.statistics.favoriteCount", default=0)),
+ "comment_count": int(glom(resp, "items.0.statistics.commentCount", default=0)),
+ }
+ statistics = ""
+ if view := info.get("view_count"):
+ statistics += f"👁{readable_count(view)}"
+ if like := info.get("like_count"):
+ statistics += f"👍{readable_count(like)}"
+ if favorite := info.get("favorite_count"):
+ statistics += f"⭐️{readable_count(favorite)}"
+ if comment := info.get("comment_count"):
+ statistics += f"💬{readable_count(comment)}"
+ info["statistics"] = statistics
+
+ # downloadable
+ info |= {"downloadable": True, "error_msg": ""}
+ privacy = glom(resp, "items.0.status.privacyStatus", default="private") # public, private, unlisted
+ status = glom(resp, "items.0.status.uploadStatus", default="failed") # deleted, failed, processed, uploaded, rejected
+ if privacy not in ["public", "unlisted"]:
+ info |= {"downloadable": False, "error_msg": "❌私享视频不可下载"}
+ if status != "processed":
+ info |= {"downloadable": False, "error_msg": f"❌转码视频未完成, 当前状态: {status}"}
+ if glom(resp, "items.0.snippet.liveBroadcastContent", default="") in ["live", "upcoming"]:
+ info |= {"downloadable": False, "error_msg": f"❌直播还未完成, 当前状态: {glom(resp, 'items.0.snippet.liveBroadcastContent')}"}
+ if info["is_live"] and not info["live_end"]:
+ info |= {"downloadable": False, "error_msg": f"❌直播还未完成, 当前状态: {glom(resp, 'items.0.snippet.liveBroadcastContent')}"}
+
+ except Exception as e:
+ logger.error(f"Failed to get video info: {e}")
+ return {"downloadable": False, "error_msg": "❌无法获取此视频信息"}
+ return info | {"emoji": "🔴"}
src/preview/ytdlp.py
@@ -1,566 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-import asyncio
-import io
-import json
-import os
-import threading
-import time
-import warnings
-from pathlib import Path
-from urllib.parse import quote_plus, unquote_plus, urlparse
-
-from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
-from glom import Coalesce, glom
-from loguru import logger
-from pyrogram.client import Client
-from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM
-from pyrogram.types import Message, ReplyParameters
-from yt_dlp import YoutubeDL
-from yt_dlp.utils import DownloadError, ExtractorError, YoutubeDLError
-
-from asr.voice_recognition import asr_file
-from config import (
- ASR,
- CAPTION_LENGTH,
- COOKIE,
- DB,
- DOWNLOAD_DIR,
- MAX_FILE_BYTES,
- PROVIDER,
- PROXY,
- READING_SPEED,
- TEXT_LENGTH,
- TID,
- TOKEN,
- YTDLP_DOWNLOAD_MAX_FILE_BYTES,
- YTDLP_RE_ENCODING_MAX_FILE_BYTES,
- cache,
-)
-from cookies import ytdlp_bilibili_cookie
-from database.database import get_db
-from messages.database import copy_messages_from_db, save_messages
-from messages.preprocess import preprocess_media
-from messages.progress import modify_progress, telegram_uploading
-from messages.sender import send2tg
-from messages.utils import blockquote, count_without_entities, get_reply_to, smart_split, warp_comments
-from multimedia import convert_to_h264, generate_cover
-from networking import hx_req
-from preview.utils import fetch_youtube_video_info, get_bilibili_comments, make_bvid_clickable
-from publish import publish_telegraph
-from subtitles.base import fetch_subtitle
-from utils import count_subtitles, nowdt, readable_size, readable_time, remove_none_values, soup_to_text, to_int, true, ts_to_dt, unicode_to_ascii
-
-
-class ProxyError(Exception):
- pass
-
-
-async def preview_ytdlp(
- client: Client,
- message: Message,
- url: str = "",
- *,
- platform: str = "",
- use_db: bool = True,
- ytdlp_audio_only: bool = False,
- ytdlp_send_video: bool = True,
- ytdlp_send_audio: bool = False,
- bilibili_comments: bool = True,
- youtube_comments_provider: str = PROVIDER.YOUTUBE_COMMENTS,
- proxy: str | None = None,
- append_transcription: bool = True,
- ytdlp_asr_engine: str = "",
- transcription_only: bool = False,
- transcription_force_file: bool = False,
- to_telegraph: bool = True,
- **kwargs,
-) -> list[Message]:
- """Preview ytdlp link in the message.
-
- Args:
- client (Client): The Pyrogram client.
- message (Message): The trigger message object.
- url (str, optional): ytdlp link.
- use_db (bool, optional): Whether to use database to cache the result. Defaults to True.
- ytdlp_audio_only (bool, optional): Download audio only. Defaults to True.
- ytdlp_send_video (bool, optional): Send video. Defaults to True.
- ytdlp_send_audio (bool, optional): Send audio. Defaults to False.
- bilibili_comments (bool, optional): The bilibili comments extractor: "free", "tikhub" or "false"
- youtube_comments_provider (str, optional): The youtube comments extractor: "free" or "false".
- proxy (str, optional): Proxy to use. Defaults to None.
- append_transcription (bool, optional): Also append transcription.
- ytdlp_asr_engine (str, optional): Method to get transcription.
- transcription_only (str, optional): If True, skip send video and audio file.
- transcription_force_file (str, optional): If True, force to send transcription as file.
- to_telegraph (bool, optional): Whether to publish the subtitle or transcription to telegraph.
- delete_files (bool, optional): Whether to delete video & audio after uploading.
- """
- logger.trace(f"{url=} {kwargs=}")
- if kwargs.get("show_progress") and not kwargs.get("progress"):
- res = await send2tg(client, message, texts=f"🔗正在解析链接\n{url}", **kwargs)
- kwargs["progress"] = res[0]
- db_key = url
- if use_db and (kv := await get_db(db_key)):
- logger.debug(f"YT-DLP preview {DB.ENGINE} cache hit for key={db_key}")
- if db_msgs := await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
- return db_msgs
- await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)
-
- # set download & upload options
- if ytdlp_audio_only:
- ytdlp_send_video = False
- if not ytdlp_send_video:
- ytdlp_send_audio = True
-
- if proxy is None:
- proxy = get_ytdlp_proxy(url)
- ydl_opts = {
- "paths": {"home": DOWNLOAD_DIR},
- "cachedir": DOWNLOAD_DIR,
- "simulate": False,
- "skip_download": False,
- "keepvideo": True,
- "format": "m4a/bestaudio/best" if ytdlp_audio_only or transcription_only else video_selector,
- "writethumbnail": True,
- "trim_file_name": 60, # filesystem limit for filename is 255 bytes. UFT-8 char is 1-4 bytes.
- "proxy": proxy,
- "extractor_args": {"youtube": {"lang": ["zh-CN", "zh-HK", "zh-TW", "en", "en-GB"]}},
- "ignore_no_formats_error": False,
- "live_from_start": False,
- "retries": 10,
- "retry_sleep_functions": {"http": lambda _: 1}, # sleep 1 second between retries
- "nocheckcertificate": True,
- "source_address": "0.0.0.0", # force-ipv4 # noqa: S104
- "outtmpl": "%(id)s.%(ext)s",
- "noplaylist": True,
- "color": "no_color-tty",
- "logger": logger,
- }
- if platform == "bilibili" and COOKIE.YTDLP_BILIBILI_USE_COOKIE:
- cookiefile = await ytdlp_bilibili_cookie()
- logger.trace(f"Use cookie file: {cookiefile}")
- ydl_opts["cookiefile"] = cookiefile
- if kwargs.get("show_progress"):
- loop = asyncio.get_running_loop()
- hook = create_hook(kwargs.get("progress"), loop, detail_progress=true(kwargs.get("detail_progress")))
- ydl_opts["progress_hooks"] = [hook]
- json_file = f"{DOWNLOAD_DIR}/{quote_plus(url)}.json"
- info = download_video_info(url, ydl_opts, json_file)
- if ytdlp_error := info.get("ytdlp_error"):
- if proxy != PROXY.YTDLP_FALLBACK:
- await modify_progress(del_status=True, **kwargs)
- raise ProxyError(ytdlp_error)
- await modify_progress(text=ytdlp_error, force_update=True, **kwargs)
- return []
- await modify_progress(text=f"⏬正在下载:\n{info['summary']}", force_update=True, **kwargs)
- ytdlp_error = await download_video_async(json_file, ydl_opts)
- if ytdlp_error:
- await modify_progress(text=ytdlp_error, force_update=True, **kwargs)
- return []
- video_path = info.get("video_path", Path(""))
- audio_path = info.get("audio_path", Path(""))
- # only save messages when both video and audio are uploaded
- save_to_db = bool(use_db and video_path.is_file() and audio_path.is_file())
- msg = f"✅下载成功:\n{info['summary']}"
- logger.success(f"{msg!r}")
- await modify_progress(text=msg.strip(), **kwargs)
-
- duration = round(float(info.get("duration", "0")))
- texts = kwargs.get("send_from_user") or ""
- emoji = platform_emoji(info["extractor"])
- # author
- if true(kwargs.get("no_author")):
- pass
- elif info["author"] and info["author_url"]:
- texts += f"{emoji}[{info['author']}]({info['author_url']})"
- elif info["author"]:
- texts += f"{emoji}{info['author']}"
-
- # date
- create_time = ""
- if dt := ts_to_dt(info.get("timestamp")):
- create_time = f"{dt:%Y-%m-%d %H:%M:%S}"
- elif info.get("upload_date"):
- create_time = info["update_date"]
- elif "youtube" in info["extractor"]:
- vinfo = await fetch_youtube_video_info(kwargs.get("vid", ""))
- dt = vinfo.get("date", nowdt())
- create_time = f"{dt:%Y-%m-%d %H:%M:%S}"
- if not true(kwargs.get("no_date")):
- texts += f"\n🕒{create_time}"
-
- # title
- if not true(kwargs.get("no_title")) and info["title"]:
- texts += f"\n📝[{info['title']}]({url})"
-
- # desc
- if not true(kwargs.get("no_description")) and (desc := info.get("description")) and (desc != "-"):
- warnings.simplefilter("ignore", MarkupResemblesLocatorWarning)
- soup = BeautifulSoup(desc, "html.parser")
- desc_text = soup_to_text(soup)
- texts += f"\n{make_bvid_clickable(desc_text)}"
- # comments
- comments = []
- if "bilibili" in info["extractor"]:
- comments = await get_bilibili_comments(kwargs.get("bvid", url)) if bilibili_comments else []
- if "youtube" in info["extractor"]:
- comments = await get_youtube_comments(kwargs.get("vid"), youtube_comments_provider)
-
- for comment in comments:
- if await count_without_entities(f"{texts}{comment}") < CAPTION_LENGTH:
- texts += comment
- texts = texts.strip()
- sent_messages = [] # 把发送的消息都记录下来
- target_chat = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id
- target_chat = to_int(target_chat)
- reply_msg_id = kwargs.get("reply_msg_id", 0)
- reply_parameters = get_reply_to(message.id, reply_msg_id)
- thumb = await generate_cover(video_path) if video_path.is_file() else await generate_cover(audio_path)
- if not Path(thumb).is_file():
- thumb = None
- # split large videos into multiple parts (less than 2GB)
- if video_path.is_file() and not transcription_only:
- video_path = await convert_to_h264(video_path, re_encoding=True, max_file_size=YTDLP_RE_ENCODING_MAX_FILE_BYTES, skip_h264=True)
- if video_path.stat().st_size > MAX_FILE_BYTES:
- await modify_progress(text="🎬视频大小超过Telegram限制(2000MB), 正在切分...", **kwargs)
- videos = await preprocess_media([{"video": video_path, "thumb": thumb}])
- for idx, video in enumerate(videos):
- video["thumb"] = thumb
- caption = texts.replace("📝[", f"📝[P{idx + 1}-") if len(videos) > 1 else texts
- caption = (await smart_split(caption, CAPTION_LENGTH))[0]
- await modify_progress(text=f"🎬视频上传中-P{idx + 1}: {readable_size(path=video['video'])}", force_update=True, **kwargs)
- sent_messages.append(
- await client.send_video(
- chat_id=target_chat,
- caption=warp_comments(caption),
- reply_parameters=reply_parameters,
- progress=telegram_uploading,
- progress_args=(kwargs.get("progress", False), video["video"], true(kwargs.get("detail_progress"))), # message, path, detail_progress
- **video,
- )
- )
- if audio_path.is_file() and not transcription_only:
- audio_target_chat = target_chat if ytdlp_send_audio else TID.CHANNEL_YTDLP_BACKUP # backup to channel if not send audio, so we can save it to db
- await modify_progress(text=f"🎧音频上传中: {readable_size(path=audio_path)}", force_update=True, **kwargs)
- caption = (await smart_split(texts, CAPTION_LENGTH))[0]
- sent_messages.append(
- await client.send_audio(
- chat_id=to_int(audio_target_chat),
- audio=audio_path.as_posix(),
- caption=warp_comments(caption),
- performer=info["author"],
- title=info["title"],
- duration=duration,
- reply_parameters=reply_parameters,
- progress=telegram_uploading,
- progress_args=(kwargs.get("progress", False), audio_path, true(kwargs.get("detail_progress"))), # message, path, detail_progress
- thumb=thumb, # type: ignore
- )
- )
- await modify_progress(del_status=True, **kwargs)
- if save_to_db:
- metadata = {}
- for k in ["author", "author_url", "title", "url", "create_time", "duration", "description"]:
- if v := locals().get(k):
- metadata[k] = unicode_to_ascii(v)
- await save_messages(messages=sent_messages, key=url, metadata=metadata)
- if any(x in info["extractor"] for x in ["youtube", "bilibili"]) and append_transcription and (video_path.is_file() or audio_path.is_file()):
- res = await fetch_subtitle(url=url, provider="free")
- subtitles = glom(res, Coalesce("full", "subtitles"), default="")
- if not subtitles:
- if not ytdlp_asr_engine:
- # bypass censorship
- ytdlp_asr_engine = kwargs.get("asr_engine", "uncensored") if "youtube" in info["extractor"] else ASR.DEFAULT_ENGINE
- res = await asr_file(audio_path, ytdlp_asr_engine, duration, client=client, message=message, silent=True)
- subtitles = res.get("texts", "")
- if count_subtitles(subtitles) < 20:
- subtitles = "" # ignore too short transcription
- if subtitles:
- if len(subtitles) > TEXT_LENGTH or transcription_force_file:
- caption = f"{emoji}[{info['author']}]({info['author_url']})\n🕒{create_time}"
- caption += f"\n📝[{info['title']}]({url})\n#️⃣字符数: {count_subtitles(subtitles)}\n⏳阅读时长: {readable_time(60 * count_subtitles(subtitles) / READING_SPEED)}"
- if to_telegraph:
- html = "\n".join([f"<p>{s}</p>" for s in subtitles.split("\n")])
- if telegraph_url := await publish_telegraph(title=info["title"], html=html, author=info["author"], url=url):
- caption += f"\n⚡️[即时预览]({telegraph_url})"
- with io.BytesIO(subtitles.encode("utf-8")) as f:
- sent_messages.append(await client.send_document(to_int(target_chat), f, file_name=f"{info['title']}.txt", caption=caption))
- else:
- first_msg: Message = sent_messages[0] if sent_messages else message # type: ignore
- sent_messages.append(await client.send_message(first_msg.chat.id, blockquote(subtitles), reply_parameters=ReplyParameters(message_id=first_msg.id)))
-
- Path(json_file).unlink(missing_ok=True)
- cleanup_ytdlp(info["id"])
- return sent_messages
-
-
-def get_ytdlp_proxy(url: str = "", platform: str = "") -> str | None:
- if platform:
- proxy = os.getenv(f"YTDLP_PROXY_{platform}".upper())
- else:
- parsed = urlparse(url)
- host = parsed.netloc # www.youtube.com
- platform = host.split(".")[-2] # youtube
- proxy = os.getenv(f"YTDLP_PROXY_{platform}".upper())
- if proxy is None: # fallback to default proxy is unset
- proxy = PROXY.YTDLP
- if proxy == "": # empty string means no proxy
- proxy = None
- logger.debug(f"YTDLP Proxy of {platform}: {proxy}")
- return proxy
-
-
-def video_selector(ctx):
- """Select the best format.
-
- For the best compatibility, we choose .mp4 extension with AVC codec for video, .m4a extension for audio.
- """
- # formats are already sorted worst to best
- formats = ctx.get("formats")[::-1]
- if not formats:
- msg = "No format found."
- raise YoutubeDLError(msg)
- formats = remove_none_values(formats)
- logger.trace(f"Choose best format from {len(formats)} extracted formats")
- # acodec='none' means there is no audio
- # find compatible extension, VP9 is not supported by iOS, use AVC instead
- all_videos = [f for f in formats if f.get("video_ext", "").lower() != "none"]
- all_audios = [f for f in formats if f.get("audio_ext", "").lower() != "none"]
- videos = [f for f in all_videos if f.get("video_ext", "").lower() == "mp4" and f.get("acodec", "").lower() == "none" and f.get("vcodec", "").lower().startswith("avc")]
- audios = [f for f in all_audios if (f.get("resolution", "").lower() == "audio only" and f.get("audio_ext", "").lower() == "m4a")]
- logger.trace(f"Found {len(videos)} video formats")
- logger.trace(f"Found {len(audios)} video formats")
-
- # if no compatible format found, fallback to the best format
- if not videos:
- videos = all_videos
- if not audios:
- audios = all_audios
-
- if not videos and not audios:
- msg = "No video and audio format found."
- raise YoutubeDLError(msg)
- elif not videos:
- best_audio = audios[0]
- logger.debug(f"Use audio format: {best_audio['format']}")
- yield {
- "format_id": f"{best_audio['format_id']}",
- "ext": best_audio["ext"],
- "requested_formats": [best_audio],
- "protocol": f"{best_audio['protocol']}",
- }
- elif not audios:
- best_video = videos[0]
- logger.debug(f"Use video format: {best_video['format']}")
- yield {
- "format_id": f"{best_video['format_id']}",
- "ext": best_video["ext"],
- "requested_formats": [best_video],
- "protocol": f"{best_video['protocol']}",
- }
- else:
- best_video = videos[0]
- best_audio = audios[0]
- logger.debug(f"Use video format: {best_video['format']}")
- logger.debug(f"Use audio format: {best_audio['format']}")
- yield {
- "format_id": f"{best_video['format_id']}+{best_audio['format_id']}",
- "ext": best_video["ext"],
- "requested_formats": [best_video, best_audio],
- "protocol": f"{best_video['protocol']}+{best_audio['protocol']}",
- }
-
-
-def create_hook(message: Message | None, loop, *, detail_progress: bool):
- """Hook to show downloading progress."""
-
- def hook(d):
- msg = ""
- title = d.get("info_dict", {}).get("title", "")
- ftype = "视频" if d.get("info_dict", {}).get("video_ext", "").lower() != "none" else "音频"
- emoji = "🎬" if ftype == "视频" else "🎧"
- status = d.get("status", "")
- if status == "downloading":
- downloaded_bytes = float(d.get("downloaded_bytes")) if d.get("downloaded_bytes") else 0
- total_bytes = float(d.get("total_bytes")) if d.get("total_bytes") else 0
- total_bytes_estimate = float(d.get("total_bytes_estimate")) if d.get("total_bytes_estimate") else 0
- total = max(total_bytes, total_bytes_estimate)
- eta = float(d.get("eta")) if d.get("eta") else 0 # seconds
- speed = float(d.get("speed")) if d.get("speed") else 0 # bytes/second
- finished = downloaded_bytes / total if total > 0 else 0
- msg += f"⏬{ftype}下载: {readable_size(downloaded_bytes)} / {readable_size(total)} ({finished:.2%})\n"
- msg += f"⚡️当前网速: {readable_size(speed)}/s\n"
- msg += f"🕒剩余时长: {readable_time(eta)}\n"
- msg += f"{emoji}{title}"
- elif status == "finished":
- msg = f"✅{ftype}下载完成\n{emoji}{title}"
- elif status == "error":
- msg = f"❌{ftype}下载失败\n{emoji}{title}"
- asyncio.run_coroutine_threadsafe(modify_progress(message, msg.strip(), detail_progress=detail_progress), loop)
-
- return hook
-
-
-def download_video_info(url: str, ydl_opts: dict, json_path: str | Path) -> dict:
- try:
- with YoutubeDL(ydl_opts) as ydl:
- info: dict = ydl.extract_info(url, download=False) # type: ignore
- with Path(json_path).open("w") as f:
- json.dump(ydl.sanitize_info(info), f, ensure_ascii=False, indent=2)
- # add custom fields
- info["extractor"] = info.get("extractor", "").lower()
- info["author"] = info.get("uploader", info.get("series", info["extractor"]))
- info["author_url"] = uploader_url(info, info["extractor"])
- info["title"] = info.get("title", "")
- info["duration"] = round(float(info.get("duration", "0")))
- info["id"] = info.get("id", "")
- video_info = {}
- audio_info = {}
- if requested_formats := info.get("requested_formats", []):
- # both video and audio are requested
- video_info = next((x for x in requested_formats if x["video_ext"].lower() != "none"), {})
- audio_info = next((x for x in requested_formats if x["audio_ext"].lower() != "none"), {})
- video_ext = video_info.get("ext", "")
- audio_ext = audio_info.get("ext", "")
- audio_format_id = audio_info.get("format_id", "")
- info["video_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{video_ext}"
- info["audio_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.f{audio_format_id}.{audio_ext}"
- elif info.get("video_ext", "").lower() != "none": # only video
- video_ext = info.get("ext", "")
- info["video_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{video_ext}"
- elif info.get("audio_ext", "").lower() != "none": # only audio
- audio_ext = info.get("ext", "")
- info["audio_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{audio_ext}"
- summary = ""
- if info["author"]:
- summary += f"\n{platform_emoji(info['extractor'])}{info['author']}"
- if info["title"]:
- summary += f"\n📝{info['title']}"
- if video_info:
- info["video_size"] = video_info.get("filesize") or video_info.get("filesize_approx") or 0
- summary += f"\n🎬视频: {video_info['format']} ({readable_size(info['video_size'])})".removesuffix(" (0.0 B)")
- if audio_info:
- info["audio_size"] = audio_info.get("filesize") or audio_info.get("filesize_approx") or 0
- summary += f"\n🎧音频: {audio_info['format']} ({readable_size(info['audio_size'])})".removesuffix(" (0.0 B)")
- if info["duration"]:
- summary += f"\n🕒时长: {readable_time(info['duration'])}"
- info["summary"] = summary.strip()
- media_size = int(info.get("video_size", 0)) + int(info.get("audio_size", 0))
- if media_size > YTDLP_DOWNLOAD_MAX_FILE_BYTES:
- info["ytdlp_error"] = f"{summary.strip()}\n**⚠️视频文件过大: {readable_size(media_size)}**\n**⚠️机器硬盘限制: {readable_size(YTDLP_DOWNLOAD_MAX_FILE_BYTES)}**"
-
- except Exception as e:
- logger.error(f"Failed to download video info: {e}")
- info = {"ytdlp_error": str(e)}
- logger.trace(info)
- return info
-
-
-def retry(func, max_retries=5):
- def wrapper(*args, **kwargs):
- retries = 0
- msg = ""
- while retries < max_retries:
- try:
- return func(*args, **kwargs)
- except ExtractorError as e:
- msg = f"ExtractorError: {str(e.orig_msg).removeprefix('ERROR: ')}"
- except DownloadError as e:
- msg = f"DownloadError: {str(e.msg).removeprefix('ERROR: ')}"
- if any(x in msg.lower() for x in ["sign in", "请登录", "地区", "国家", "country", "删除", "deleted"]):
- retries += 1
- break
- except YoutubeDLError as e:
- msg = f"YoutubeDLError: {str(e.msg).removeprefix('ERROR: ')}"
- except Exception as e:
- msg = f"{type(e).__name__}: {e} (Retrying {retries}/{max_retries})"
- retries += 1
- time.sleep(1)
- logger.error(f"Failed after {retries} retries: {msg}")
- if msg:
- args[2]["ytdlp_error"] = msg.replace("<", "[").replace(">", "]")
- return args[2]
- return {}
-
- return wrapper
-
-
-@retry
-def download_video(json_path: str, ydl_opts: dict, result: dict) -> dict:
- with YoutubeDL(ydl_opts) as ydl:
- error_code = ydl.download_with_info_file(json_path) # 0: success, 1: error
- if error_code != 0 and not result.get("ytdlp_error"):
- url = unquote_plus(Path(json_path).stem)
- result["ytdlp_error"] = f"❌下载失败\n{url}"
- return result
-
-
-async def download_video_async(json_path: str, ydl_opts: dict) -> str:
- """Wrapper to run the download function in a thread.
-
- Generated by GPT-4o.
- """
- # Shared dictionary to hold the results
- result = {}
- download_thread = threading.Thread(target=download_video, args=(json_path, ydl_opts, result))
- download_thread.start()
- await asyncio.to_thread(download_thread.join)
- return result.get("ytdlp_error", "")
-
-
-def uploader_url(info: dict, extractor: str) -> str:
- if url := info.get("uploader_url"):
- return url
- if author_id := info.get("uploader_id"):
- if "youtube" in extractor:
- return f"https://www.youtube.com/{author_id}"
- if "bilibili" in extractor:
- return f"https://space.bilibili.com/{author_id}"
- return ""
-
-
-def platform_emoji(extractor: str) -> str:
- if "bilibili" in extractor:
- return "🅱️"
- if "youtube" in extractor:
- return "🔴"
- return "🆔"
-
-
-@cache.memoize(ttl=60)
-async def get_youtube_comments(vid: str | None, provider: str = PROVIDER.YOUTUBE_COMMENTS) -> list[str]:
- if not vid or not true(provider):
- return []
- api = "https://www.googleapis.com/youtube/v3/commentThreads"
- params = {"key": TOKEN.YOUTUBE_API_KEY, "maxResults": 100, "textFormat": "plainText", "part": "snippet", "videoId": vid}
- comments = []
- try:
- resp = await hx_req(api, proxy=get_ytdlp_proxy(platform="youtube"), params=params, check_keys=["items"])
- if resp.get("hx_error"):
- logger.warning(f"YouTube Comments API failed: {resp['hx_error']}")
- return []
- data = resp["items"]
- for idx, x in enumerate(data):
- name = glom(x, "snippet.topLevelComment.snippet.authorDisplayName", default="匿名")
- name = name.removeprefix("@")
- if author_url := glom(x, "snippet.topLevelComment.snippet.authorChannelUrl", default=""):
- name = f"[{name}]({author_url})"
- if cmt := glom(x, "snippet.topLevelComment.snippet.textDisplay", default=""):
- if idx == 0:
- comments.append(f"\n{BLOCKQUOTE_EXPANDABLE_DELIM}💬**点此展开评论区**:")
- comments.append(f"\n💬**{name}**: {cmt}")
- except Exception as e:
- logger.error(f"Failed to get YouTube comments: {e}")
- return []
- return comments
-
-
-def cleanup_ytdlp(vid: str):
- if not vid:
- return
- logger.debug(f"Cleaning up: {vid}")
- for p in Path(DOWNLOAD_DIR).glob(f"{vid}*"):
- if p.is_file():
- logger.trace(f"Deleting ytdlp files: {p}")
- p.unlink(missing_ok=True)
src/subtitles/base.py
@@ -8,11 +8,11 @@ from pyrogram.types import Message
from youtube_transcript_api import IpBlocked, RequestBlocked, YouTubeTranscriptApi
from youtube_transcript_api.proxies import GenericProxyConfig
-from config import API, PREFIX, PROXY, READING_SPEED, TOKEN, cache
+from config import PREFIX, PROXY, READING_SPEED, cache
from messages.parser import parse_msg
from messages.utils import startswith_prefix
-from networking import hx_req, match_social_media_link
-from preview.utils import bilibili_subtitle_and_summary
+from networking import match_social_media_link
+from preview.bilibili import bilibili_subtitle_and_summary
from utils import seconds_to_time
@@ -49,7 +49,7 @@ async def match_url(client: Client, message: Message) -> str:
@cache.memoize(ttl=120)
-async def fetch_subtitle(url: str, provider: str) -> dict:
+async def fetch_subtitle(url: str) -> dict:
"""Fetch subtitles from Bilibili or YouTube.
Returns:
@@ -59,41 +59,23 @@ async def fetch_subtitle(url: str, provider: str) -> dict:
"reading_minutes": 2,
}
"""
- succ = False
- error = "❌下载内嵌字幕失败\n🔄尝试使用语音转文字获取字幕"
subtitles = []
matched = await match_social_media_link(url)
if matched["platform"] == "bilibili":
return await bilibili_subtitle_and_summary(url)
video_id = matched["vid"]
- if "free" in provider:
- try:
- proxy = GenericProxyConfig(http_url=PROXY.SUBTITLE, https_url=PROXY.SUBTITLE) if PROXY.SUBTITLE else None
- logger.info(f"Fetch Subtitle via YouTubeTranscriptApi for {video_id=}, proxy={PROXY.SUBTITLE}")
- ytt_api = YouTubeTranscriptApi(proxy_config=proxy)
- resp = ytt_api.fetch(video_id, languages=["zh-CN", "zh-Hans", "zh", "zh-HK", "zh-TW", "zh-Hant", "en"])
- subtitles: list[dict] = resp.to_raw_data()
- succ = True
- except (IpBlocked, RequestBlocked):
- logger.warning(f"Subtitle API IP blocked: {video_id=}")
- except Exception as e:
- logger.error(f"Failed to get subtitle: {e}")
- if not succ and "tikhub" in provider: # try tikhub
- try:
- logger.info(f"Fetch Subtitle via TikHub for {video_id=}")
- api_url = f"{API.TIKHUB}/api/v1/youtube/web/get_video_subtitles?video_id={video_id}"
- headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
- resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200}, max_retry=0)
- if resp.get("hx_error"):
- logger.warning(f"Subtitle API failed: {resp['hx_error']}")
- return {"error": resp["hx_error"]}
- if error := resp["data"].get("detail", []):
- return {"error": error}
- subtitles = resp["data"].get("subtitles", [])
- except Exception as e:
- logger.error(f"Failed to get subtitle: {e}")
+ try:
+ proxy = GenericProxyConfig(http_url=PROXY.SUBTITLE, https_url=PROXY.SUBTITLE) if PROXY.SUBTITLE else None
+ logger.info(f"Fetch Subtitle via YouTubeTranscriptApi for {video_id=}, proxy={PROXY.SUBTITLE}")
+ ytt_api = YouTubeTranscriptApi(proxy_config=proxy)
+ resp = ytt_api.fetch(video_id, languages=["zh-CN", "zh-Hans", "zh", "zh-HK", "zh-TW", "zh-Hant", "en"])
+ subtitles: list[dict] = resp.to_raw_data()
+ except (IpBlocked, RequestBlocked):
+ logger.warning(f"Subtitle API IP blocked: {video_id=}")
+ except Exception as e:
+ logger.error(f"Failed to get subtitle: {e}")
if not subtitles:
- return {"error": error}
+ return {"error": "❌下载内嵌字幕失败\n🔄尝试使用语音转文字获取字幕"}
return to_transcription(subtitles)
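With the provider argument removed, call sites now pass only the URL; a hedged sketch of the updated usage (URL hypothetical):

    # Hypothetical usage, not part of this commit.
    import asyncio
    from subtitles.base import fetch_subtitle

    async def demo() -> None:
        res = await fetch_subtitle("https://www.youtube.com/watch?v=dQw4w9WgXcQ")  # hypothetical URL
        if "error" in res:
            print(res["error"])  # callers fall back to ASR in this case
            return
        print(f"{res['num_chars']} chars, ~{res['reading_minutes']:.1f} min")

    asyncio.run(demo())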
src/subtitles/subtitle.py
@@ -11,18 +11,19 @@ from pyrogram.types import Message
from pyrogram.types.messages_and_media.message import Str
from asr.voice_recognition import asr_file
-from config import ASR, DOWNLOAD_DIR, PREFIX, PROVIDER, READING_SPEED, TEXT_LENGTH, cache
+from config import ASR, DOWNLOAD_DIR, PREFIX, READING_SPEED, TEXT_LENGTH, cache
from llm.gpt import gpt_response
from messages.parser import parse_msg
from messages.progress import modify_progress
from messages.sender import send2tg
-from messages.utils import equal_prefix
+from messages.utils import delete_message, equal_prefix
from networking import match_social_media_link
-from preview.utils import fetch_youtube_video_info, get_bilibili_video_info
-from preview.ytdlp import preview_ytdlp
+from preview.bilibili import get_bilibili_vinfo
+from preview.youtube import get_youtube_vinfo
from publish import publish_telegraph
from subtitles.base import fetch_subtitle, match_url
from utils import count_subtitles, rand_number, readable_time, to_int
+from ytdlp.download import ytdlp_download
HELP = f"""📃**提取字幕**
使用说明:
@@ -38,16 +39,7 @@ HELP = f"""📃**提取字幕**
""" # noqa: RUF001
-async def get_subtitle(
- client: Client,
- message: Message,
- youtube_subtitle_provider: str = PROVIDER.YOUTUBE_SUBTITLE,
- *,
- to_telegraph: bool = True,
- ai_summary: bool = True,
- force_file: bool = True,
- **kwargs,
-):
+async def get_subtitle(client: Client, message: Message, *, to_telegraph: bool = True, ai_summary: bool = True, **kwargs):
"""Get YouTube Subtitle."""
target_chat = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id
# send docs if message == "/subtitle", without reply
@@ -64,9 +56,9 @@ async def get_subtitle(
matched = await match_social_media_link(url)
platform = matched["platform"]
vid = glom(matched, Coalesce("vid", "bvid"), default=url)
- vinfo = await fetch_youtube_video_info(vid) if platform == "youtube" else await get_bilibili_video_info(vid)
+ vinfo = await get_youtube_vinfo(vid) if platform == "youtube" else await get_bilibili_vinfo(vid)
description = glom(vinfo, Coalesce("description", "desc"), default="")
- caption = f"{vinfo['emoji']}[{vinfo['author']}]({vinfo['channel']})\n🕒{vinfo['date']:%Y-%m-%d %H:%M:%S}\n📝[{vinfo['title']}]({url})"
+ caption = f"{vinfo['emoji']}[{vinfo['author']}]({vinfo['channel']})\n🕒{vinfo['pubdate']}\n📝[{vinfo['title']}]({url})"
msg = f"🔍**正在获取字幕:**\n{caption}"[:TEXT_LENGTH]
if kwargs.get("show_progress"):
status_msg = (await send2tg(client, message, texts=msg, **kwargs))[0]
@@ -74,15 +66,9 @@ async def get_subtitle(
this_info = parse_msg(message, silent=True)
reply_info = parse_msg(message.reply_to_message, silent=True) if message.reply_to_message else {}
-
# Fetch subtitle via API
- res = await fetch_subtitle(url, youtube_subtitle_provider)
- subtitle_file_sent = False
- subtitle_msg = None
- status_msg = kwargs.get("progress")
- subtitles = ""
- # API failed
- if error := res.get("error", ""):
+ res = await fetch_subtitle(url)
+ if error := res.get("error", ""): # API failed
asr_engine = ASR.DEFAULT_ENGINE
if platform == "youtube": # bypass censorship
asr_engine = kwargs.get("asr_engine", "uncensored")
@@ -99,45 +85,37 @@ async def get_subtitle(
res |= {"subtitles": res["texts"], "num_chars": count_subtitles(res["texts"]), "reading_minutes": count_subtitles(res["texts"]) / READING_SPEED}
else:
await modify_progress(text=error + "\n正在通过下载音频后ASR识别字幕", force_update=True, **kwargs)
- kwargs |= {
- "progress": None,
- "url": url,
- "append_transcription": True,
- "transcription_only": True,
- "transcription_force_file": force_file,
- "youtube_comments_provider": False,
- "bilibili_comments": False,
- "proxy": None,
- "use_db": False,
- "ytdlp_asr_engine": asr_engine,
- }
- # Download and send subtitle file via ytdlp
- subtitle_msg = (await preview_ytdlp(client=client, message=message, **kwargs))[0]
- data: BytesIO = await client.download_media(subtitle_msg, in_memory=True) # type: ignore
- subtitles = data.getvalue().decode("utf-8")
- subtitle_file_sent = True
+ downloaded = await ytdlp_download(url, platform, ytdlp_download_video=False)
+ if not downloaded["audio_path"].is_file():
+ await modify_progress(text="❌下载音频失败", force_update=True, **kwargs)
+ return
+ prompt = f"请转录{matched['platform'].title()}视频作者【{vinfo['author']}】的一期节目的音频。\n该期节目标题: {vinfo['title']}\n节目简介: {description}"
+ res = await asr_file(downloaded["audio_path"], engine=asr_engine, prompt=prompt, client=client, message=message, silent=True, **kwargs)
+ if res.get("error"):
+ await modify_progress(text=res["error"], force_update=True, **kwargs)
+ return
+ res |= {"subtitles": res["texts"], "num_chars": count_subtitles(res["texts"]), "reading_minutes": count_subtitles(res["texts"]) / READING_SPEED}
# Send subtitle file
- if not subtitle_file_sent:
- subtitles = glom(res, Coalesce("full", "subtitles", "summary"), default="")
- if not subtitles:
- await modify_progress(del_status=True, **kwargs)
- return
- logger.success(subtitles)
- caption = f"{vinfo['emoji']}[{vinfo['author']}]({vinfo['channel']})\n🕒{vinfo['date']:%Y-%m-%d %H:%M:%S}\n"
- caption += f"📝[{vinfo['title']}]({url})\n#️⃣字符数: {res['num_chars']}\n⏳阅读时长: {readable_time(60 * res['reading_minutes'])}"
- if to_telegraph:
- html = "\n".join([f"<p>{s}</p>" for s in subtitles.split("\n")])
- if telegraph_url := await publish_telegraph(title=vinfo["title"], html=html, author=vinfo["author"], url=url):
- caption += f"\n⚡️[即时预览]({telegraph_url})"
- with BytesIO(subtitles.encode("utf-8")) as f:
- subtitle_msg = await client.send_document(to_int(target_chat), f, file_name=f"{vinfo['title']}.txt", caption=caption)
+ subtitles = glom(res, Coalesce("full", "subtitles", "summary"), default="")
+ if not subtitles:
+ await modify_progress(del_status=True, **kwargs)
+ return
+ logger.success(subtitles)
+ caption = f"{vinfo['emoji']}[{vinfo['author']}]({vinfo['channel']})\n🕒{vinfo['pubdate']}\n"
+ caption += f"📝[{vinfo['title']}]({url})\n#️⃣字符数: {res['num_chars']}\n⏳阅读时长: {readable_time(60 * res['reading_minutes'])}"
+ if to_telegraph:
+ html = "\n".join([f"<p>{s}</p>" for s in subtitles.split("\n")])
+ if telegraph_url := await publish_telegraph(title=vinfo["title"], html=html, author=vinfo["author"], url=url):
+ caption += f"\n⚡️[即时预览]({telegraph_url})"
+ with BytesIO(subtitles.encode("utf-8")) as f:
+ subtitle_msg = await client.send_document(to_int(target_chat), f, file_name=f"{vinfo['title']}.txt", caption=caption)
if ai_summary and isinstance(subtitle_msg, Message):
# use real subtitle (without AI summary by Bilibili)
subtitles = re.sub(r"(.*?)AI总结(B站版):", "", subtitles, flags=re.DOTALL).strip() # noqa: RUF001
prompt = f"以上是{matched['platform'].title()}视频作者【{vinfo['author']}】的一期节目的文字稿。该期节目详情如下:\n"
- prompt += f"节目标题: {vinfo['title']}\n发布日期: {vinfo['date']:%Y-%m-%d %H:%M:%S}\n"
+ prompt += f"节目标题: {vinfo['title']}\n发布日期: {vinfo['pubdate']}\n"
if description.strip():
prompt += f"节目简介: {description}\n"
prompt += "\n请解读本期节目内容。要求: 直接输出节目内容解读, 以“该节目讲述了”开头"
@@ -152,5 +130,5 @@ async def get_subtitle(
kwargs["include_thoughts"] = False
await gpt_response(client, ai_msg, **kwargs)
with contextlib.suppress(Exception):
- [await modify_progress(msg, del_status=True) for msg in res.get("sent_messages", [])]
- await modify_progress(status_msg, del_status=True)
+ [await delete_message(msg) for msg in res.get("sent_messages", [])]
+ await delete_message(kwargs.get("progress"))
src/ytdlp/download.py
@@ -0,0 +1,213 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+import json
+import threading
+import time
+from pathlib import Path
+from typing import Literal
+from urllib.parse import quote_plus, unquote_plus
+
+from glom import Coalesce, glom
+from loguru import logger
+from pyrogram.types import Message
+from yt_dlp import YoutubeDL
+from yt_dlp.utils import DownloadError, ExtractorError, YoutubeDLError
+
+from config import DOWNLOAD_DIR, PROXY, YTDLP_DOWNLOAD_MAX_FILE_BYTES
+from messages.progress import modify_progress
+from utils import readable_size, readable_time, true
+from ytdlp.utils import ProxyError, get_ytdlp_opts, platform_emoji, uploader_url
+
+
+async def ytdlp_download(
+ url: str,
+ platform: Literal["youtube", "bilibili", "ytdlp"] | None = None,
+ proxy: str | None = None,
+ **kwargs,
+) -> dict:
+ """Download video from url.
+
+ Returns:
+ dict: downloaded info.
+ {
+ "video_path": Path("video_path"),
+ "audio_path": Path("audio_path"),
+ "author": "author",
+ "author_url": "author_url",
+ "title": "title",
+ "duration": 123,
+ "extractor": "youtube",
+ "id": "id",
+ "json_path": "json_path",
+ "summary": "summary",
+ }
+ """
+ ytdlp_opts = await get_ytdlp_opts(url=url, platform=platform, proxy=proxy, video=true(kwargs.get("ytdlp_download_video")))
+ if kwargs.get("show_progress"):
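+        # yt-dlp invokes progress hooks from the download thread, so the hook keeps a reference to the running loop to schedule Telegram progress edits back onto it (see create_hook below)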
+ loop = asyncio.get_running_loop()
+ hook = create_hook(kwargs.get("progress"), loop, detail_progress=true(kwargs.get("detail_progress")))
+ ytdlp_opts["progress_hooks"] = [hook]
+    logger.info(f"Downloading {url} via proxy: {ytdlp_opts['proxy']}")
+ # download json first
+ json_path = f"{DOWNLOAD_DIR}/{quote_plus(url)}.json"
+ info = download_video_info(url, ytdlp_opts, json_path)
+ if ytdlp_error := info.get("ytdlp_error"):
+ if PROXY.YTDLP_FALLBACK and proxy != PROXY.YTDLP_FALLBACK:
+ await modify_progress(del_status=True, **kwargs)
+ raise ProxyError(ytdlp_error)
+ await modify_progress(text=ytdlp_error, force_update=True, **kwargs)
+ return {}
+ await modify_progress(text=f"⏬正在下载:\n{info['summary']}", force_update=True, **kwargs)
+ ytdlp_error = await download_video_async(json_path, ytdlp_opts)
+ if ytdlp_error:
+ await modify_progress(text=ytdlp_error, force_update=True, **kwargs)
+ return {}
+ msg = f"✅下载成功:\n{info['summary']}"
+ logger.success(f"{msg!r}")
+ await modify_progress(text=msg.strip(), force_update=True, **kwargs)
+ return info
+
+
+def download_video_info(url: str, ytdlp_opts: dict, json_path: str | Path) -> dict:
+ try:
+ with YoutubeDL(ytdlp_opts) as ydl:
+ info: dict = ydl.extract_info(url, download=False) # type: ignore
+ with Path(json_path).open("w") as f:
+ json.dump(ydl.sanitize_info(info), f, ensure_ascii=False, indent=2)
+ # add custom fields
+ info["extractor"] = info.get("extractor", "").lower()
+ info["author"] = glom(info, Coalesce("uploader", "series", "extractor"))
+ info["author_url"] = uploader_url(info, info["extractor"])
+ info["title"] = info.get("title", "")
+        info["duration"] = round(float(info.get("duration") or 0))
+ info["id"] = info.get("id", "")
+ info["json_path"] = Path(json_path).as_posix()
+ video_info = {}
+ audio_info = {}
+ if requested_formats := info.get("requested_formats", []):
+ # both video and audio are requested
+ video_info = next((x for x in requested_formats if x["video_ext"].lower() != "none"), {})
+ audio_info = next((x for x in requested_formats if x["audio_ext"].lower() != "none"), {})
+ video_ext = video_info.get("ext", "")
+ audio_ext = audio_info.get("ext", "")
+ audio_format_id = audio_info.get("format_id", "")
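+            # the paths below match yt-dlp's on-disk naming with keepvideo=True: the merged video ends up as <id>.<ext>, the kept audio track as <id>.f<format_id>.<ext>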
+ info["video_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{video_ext}"
+ info["audio_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.f{audio_format_id}.{audio_ext}"
+ elif info.get("video_ext", "").lower() != "none": # only video
+ video_ext = info.get("ext", "")
+ info["video_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{video_ext}"
+ info["audio_path"] = Path("/non-exist")
+ elif info.get("audio_ext", "").lower() != "none": # only audio
+ audio_ext = info.get("ext", "")
+ info["audio_path"] = Path(DOWNLOAD_DIR) / f"{info['id']}.{audio_ext}"
+ info["video_path"] = Path("/non-exist")
+ else:
+ info["video_path"] = Path("/non-exist")
+ info["audio_path"] = Path("/non-exist")
+
+ summary = ""
+ if info["author"]:
+ summary += f"\n{platform_emoji(info['extractor'])}{info['author']}"
+ if info["title"]:
+ summary += f"\n📝{info['title']}"
+ if video_info:
+ info["video_size"] = video_info.get("filesize") or video_info.get("filesize_approx") or 0
+ summary += f"\n🎬视频: {video_info['format']} ({readable_size(info['video_size'])})".removesuffix(" (0.0 B)")
+ if audio_info:
+ info["audio_size"] = audio_info.get("filesize") or audio_info.get("filesize_approx") or 0
+ summary += f"\n🎧音频: {audio_info['format']} ({readable_size(info['audio_size'])})".removesuffix(" (0.0 B)")
+ if info["duration"]:
+ summary += f"\n🕒时长: {readable_time(info['duration'])}"
+ info["summary"] = summary.strip()
+ media_size = int(info.get("video_size", 0)) + int(info.get("audio_size", 0))
+ if media_size > YTDLP_DOWNLOAD_MAX_FILE_BYTES:
+ info["ytdlp_error"] = f"{summary.strip()}\n**⚠️视频文件过大: {readable_size(media_size)}**\n**⚠️机器硬盘限制: {readable_size(YTDLP_DOWNLOAD_MAX_FILE_BYTES)}**"
+
+ except Exception as e:
+ logger.error(f"Failed to download video info: {e}")
+ info = {"ytdlp_error": str(e)}
+ logger.trace(info)
+ return info
+
+
+def retry(func, max_retries=5):
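+    """Retry wrapper used by download_video: retries failed downloads up to max_retries, stops early on non-retryable yt-dlp errors (login/region/deleted), and reports the last error via the shared result dict passed as the third positional argument."""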
+ def wrapper(*args, **kwargs):
+ retries = 0
+ msg = ""
+ while retries < max_retries:
+ try:
+ return func(*args, **kwargs)
+ except ExtractorError as e:
+ msg = f"ExtractorError: {str(e.orig_msg).removeprefix('ERROR: ')}"
+ except DownloadError as e:
+ msg = f"DownloadError: {str(e.msg).removeprefix('ERROR: ')}"
+ if any(x in msg.lower() for x in ["sign in", "请登录", "地区", "国家", "country", "删除", "deleted"]):
+ retries += 1
+ break
+ except YoutubeDLError as e:
+ msg = f"YoutubeDLError: {str(e.msg).removeprefix('ERROR: ')}"
+ except Exception as e:
+ msg = f"{type(e).__name__}: {e} (Retrying {retries}/{max_retries})"
+ retries += 1
+ time.sleep(1)
+ logger.error(f"Failed after {retries} retries: {msg}")
+ if msg:
+ args[2]["ytdlp_error"] = msg.replace("<", "[").replace(">", "]")
+ return args[2]
+ return {}
+
+ return wrapper
+
+
+@retry
+def download_video(json_path: str, ytdlp_opts: dict, result: dict) -> dict:
+ with YoutubeDL(ytdlp_opts) as ydl:
+ error_code = ydl.download_with_info_file(json_path) # 0: success, 1: error
+ if error_code != 0 and not result.get("ytdlp_error"):
+ url = unquote_plus(Path(json_path).stem)
+ result["ytdlp_error"] = f"❌下载失败\n{url}"
+ return result
+
+
+async def download_video_async(json_path: str, ytdlp_opts: dict) -> str:
+ """Wrapper to run the download function in a thread.
+
+ Generated by GPT-4o.
+ """
+ # Shared dictionary to hold the results
+ result = {}
+ download_thread = threading.Thread(target=download_video, args=(json_path, ytdlp_opts, result))
+ download_thread.start()
+ await asyncio.to_thread(download_thread.join)
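+    # download_video (wrapped by @retry) records any failure in result["ytdlp_error"]; an empty string means the download succeeded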
+ return result.get("ytdlp_error", "")
+
+
+def create_hook(message: Message | None, loop, *, detail_progress: bool):
+ """Hook to show downloading progress."""
+
+ def hook(d):
+ msg = ""
+ title = d.get("info_dict", {}).get("title", "")
+ ftype = "视频" if d.get("info_dict", {}).get("video_ext", "").lower() != "none" else "音频"
+ emoji = "🎬" if ftype == "视频" else "🎧"
+ status = d.get("status", "")
+ if status == "downloading":
+ downloaded_bytes = float(d.get("downloaded_bytes")) if d.get("downloaded_bytes") else 0
+ total_bytes = float(d.get("total_bytes")) if d.get("total_bytes") else 0
+ total_bytes_estimate = float(d.get("total_bytes_estimate")) if d.get("total_bytes_estimate") else 0
+ total = max(total_bytes, total_bytes_estimate)
+ eta = float(d.get("eta")) if d.get("eta") else 0 # seconds
+ speed = float(d.get("speed")) if d.get("speed") else 0 # bytes/second
+ finished = downloaded_bytes / total if total > 0 else 0
+ msg += f"⏬{ftype}下载: {readable_size(downloaded_bytes)} / {readable_size(total)} ({finished:.2%})\n"
+ msg += f"⚡️当前网速: {readable_size(speed)}/s\n"
+ msg += f"🕒剩余时长: {readable_time(eta)}\n"
+ msg += f"{emoji}{title}"
+ elif status == "finished":
+ msg = f"✅{ftype}下载完成\n{emoji}{title}"
+ elif status == "error":
+ msg = f"❌{ftype}下载失败\n{emoji}{title}"
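+        # the hook runs in yt-dlp's download thread, so push the progress edit onto the bot's event loop thread-safely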
+ asyncio.run_coroutine_threadsafe(modify_progress(message, msg.strip(), detail_progress=detail_progress), loop)
+
+ return hook
src/ytdlp/main.py
@@ -0,0 +1,314 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import io
+import warnings
+from pathlib import Path
+from typing import Literal
+
+from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
+from glom import Coalesce, glom
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import Message, ReplyParameters
+
+from asr.voice_recognition import asr_file
+from config import ASR, CAPTION_LENGTH, DB, MAX_FILE_BYTES, READING_SPEED, TEXT_LENGTH, YTDLP_RE_ENCODING_MAX_FILE_BYTES
+from database.database import get_db
+from messages.database import copy_messages_from_db, save_messages
+from messages.preprocess import preprocess_media
+from messages.progress import modify_progress, telegram_uploading
+from messages.sender import send2tg
+from messages.utils import blockquote, count_without_entities, get_reply_to, smart_split, warp_comments
+from multimedia import convert_to_h264, generate_cover
+from preview.bilibili import get_bilibili_comments, get_bilibili_vinfo, make_bvid_clickable
+from preview.youtube import get_youtube_comments, get_youtube_vinfo
+from publish import publish_telegraph
+from subtitles.base import fetch_subtitle
+from utils import count_subtitles, readable_size, readable_time, soup_to_text, to_int, true, ts_to_dt, unicode_to_ascii
+from ytdlp.download import ytdlp_download
+from ytdlp.utils import cleanup_ytdlp, platform_emoji
+
+
+async def preview_ytdlp(
+ client: Client,
+ message: Message,
+ url: str = "",
+ *,
+ platform: Literal["youtube", "bilibili", "ytdlp"] = "ytdlp",
+ vid: str = "",
+ bvid: str = "",
+ use_db: bool = True,
+ ytdlp_download_video: bool = True,
+ ytdlp_send_video: bool = True,
+ ytdlp_send_audio: bool = True,
+ bilibili_comments: bool = True,
+ youtube_comments: bool = True,
+ proxy: str | None = None,
+ ytdlp_video_target: str | int | None = None,
+ ytdlp_audio_target: str | int | None = None,
+ ytdlp_subtitle_target: str | int | None = None,
+ ytdlp_send_subtitle: bool = False,
+ subtitle_force_file: bool = False,
+ to_telegraph: bool = True,
+ **kwargs,
+) -> list[Message]:
+ """Preview ytdlp link in the message.
+
+ Args:
+ client (Client): The Pyrogram client.
+ message (Message): The trigger message object.
+ url (str, optional): ytdlp link.
+ platform (str, optional): The platform of the video.
+ vid (str, optional): The YouTube video id.
+ bvid (str, optional): The Bilibili video id.
+ use_db (bool, optional): Whether to use database to cache the result. Defaults to True.
+ ytdlp_download_video (bool, optional): Download video. Defaults to True.
+ ytdlp_send_video (bool, optional): Send video. Defaults to True.
+        ytdlp_send_audio (bool, optional): Send audio. Defaults to True.
+ bilibili_comments (bool, optional): Enable bilibili comments
+ youtube_comments (bool, optional): Enable youtube comments
+ proxy (str, optional): Proxy to use. Defaults to None.
+ ytdlp_video_target (str | int, optional): The target chat id to send video.
+ ytdlp_audio_target (str | int, optional): The target chat id to send audio.
+ ytdlp_send_subtitle (bool, optional): Send subtitle. Defaults to False.
+        subtitle_force_file (bool, optional): If True, force to send transcription as file.
+ to_telegraph (bool, optional): Whether to publish the subtitle or transcription to telegraph.
+ """
+ logger.trace(f"{url=} {kwargs=}")
+ if kwargs.get("show_progress") and not kwargs.get("progress"):
+ res = await send2tg(client, message, texts=f"🔗正在解析链接\n{url}", **kwargs)
+ kwargs["progress"] = res[0]
+ # try cache
+ db_key = url
+ if true(use_db) and (kv := await get_db(db_key)):
+ logger.debug(f"YT-DLP preview {DB.ENGINE} cache hit for key={db_key}")
+ if db_msgs := await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
+ return db_msgs
+ await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)
+
+ # get video info from API first
+ if platform == "youtube":
+ vinfo = await get_youtube_vinfo(vid)
+ elif platform == "bilibili":
+ vinfo = await get_bilibili_vinfo(bvid)
+ else:
+ vinfo = {}
+ if platform in ["youtube", "bilibili"] and not vinfo.get("downloadable"):
+ await modify_progress(text=vinfo.get("error_msg") or "❌视频无法下载", force_update=True, **kwargs)
+ return []
+
+ info = await ytdlp_download(url, proxy=proxy, platform=platform, ytdlp_download_video=ytdlp_download_video, **kwargs)
+ if not info:
+ return []
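+    # fields from the platform API (pubdate, statistics, emoji, ...) take precedence over the yt-dlp extraction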
+ info |= vinfo # merge video info
+ captions = await generate_captions(info, url, platform, vid, bvid, bilibili_comments=bilibili_comments, youtube_comments=youtube_comments)
+ # add send_from_user prefix to caption
+ prefix = kwargs.get("send_from_user", "")
+ texts = f"{prefix}{captions['caption']}"
+ info["caption"] = texts
+
+ sent_messages = await send_media(client, message, info, ytdlp_video_target, ytdlp_audio_target, ytdlp_send_video=ytdlp_send_video, ytdlp_send_audio=ytdlp_send_audio, **kwargs)
+
+ # send subtitles
+ subtitles = ""
+ if true(ytdlp_send_subtitle) and info["audio_path"].is_file():
+ if platform in ["bilibili", "youtube"]: # get subtitle from API first
+ res = await fetch_subtitle(url=url)
+ subtitles = glom(res, Coalesce("full", "subtitles"), default="")
+ if not subtitles:
+ asr_engine = kwargs.get("asr_engine", "uncensored") if "youtube" in info["extractor"] else ASR.DEFAULT_ENGINE
+ res = await asr_file(info["audio_path"], asr_engine, client=client, message=message, silent=True)
+ subtitles = res.get("texts", "")
+ if count_subtitles(subtitles) < 20:
+ subtitles = "" # ignore too short transcription
+ if subtitles:
+ subtitle_msg = None
+ subtitle_target = ytdlp_subtitle_target or kwargs.get("target_chat") or message.chat.id
+ if len(subtitles) > TEXT_LENGTH or true(subtitle_force_file):
+ caption = f"{captions['caption']}\n#️⃣字符数: {count_subtitles(subtitles)}\n⏳阅读时长: {readable_time(60 * count_subtitles(subtitles) / READING_SPEED)}"
+ if true(to_telegraph):
+ html = "\n".join([f"<p>{s}</p>" for s in subtitles.split("\n")])
+ if telegraph_url := await publish_telegraph(title=info["title"], html=html, author=info["author"], url=url):
+ caption += f"\n⚡️[即时预览]({telegraph_url})"
+ with io.BytesIO(subtitles.encode("utf-8")) as f:
+ subtitle_msg = await client.send_document(to_int(subtitle_target), f, file_name=f"{info['title']}.txt", caption=caption)
+ else:
+ # get reply msg id
+ if sent_messages.get("video"):
+ reply_mid = sent_messages["video"][0].id
+ elif sent_messages.get("audio"):
+ reply_mid = sent_messages["audio"].id
+ else:
+ reply_mid = message.id
+ subtitle_msg = await client.send_message(subtitle_target, blockquote(subtitles), reply_parameters=ReplyParameters(message_id=reply_mid))
+ if isinstance(subtitle_msg, Message):
+ sent_messages["caption"] = subtitle_msg
+
+ # only save messages when both video and audio are uploaded
+ messages = [msg for msgs in sent_messages.values() for msg in (msgs if isinstance(msgs, list) else [msgs])]
+    if true(use_db) and info["video_path"].is_file() and info["audio_path"].is_file():
+        metadata = {}
+        # look up metadata in the merged info/captions dicts (raw yt-dlp/API fields win over the formatted caption fields)
+        merged = {**captions, **info, "url": url}
+        for k in ["author", "author_url", "title", "url", "create_time", "duration", "description"]:
+            if v := merged.get(k):
+                metadata[k] = unicode_to_ascii(v)
+ await save_messages(messages=messages, key=url, metadata=metadata)
+
+ Path(info["json_path"]).unlink(missing_ok=True)
+ cleanup_ytdlp(info["id"])
+ return messages
+
+
+async def generate_captions(info: dict, url: str, platform: str, vid: str, bvid: str, *, bilibili_comments: bool, youtube_comments: bool) -> dict:
+ """Generate captions."""
+ captions = ""
+ results = {}
+ emoji = platform_emoji(info["extractor"])
+ results["emoji"] = emoji
+
+ # author
+ if info.get("author") and info["author_url"]:
+ results["author"] = f"{emoji}[{info['author']}]({info['author_url']})"
+ elif info.get("author"):
+ results["author"] = f"{emoji}[{info['author']}]({url})"
+ else:
+ results["author"] = f"{emoji}[原始链接]({url})"
+ captions += f"{results['author']}\n"
+
+ # date
+ if info.get("pubdate"):
+ results["create_time"] = "🕒" + info["pubdate"]
+ elif dt := ts_to_dt(info.get("timestamp")):
+ results["create_time"] = f"🕒{dt:%Y-%m-%d %H:%M:%S}"
+ elif info.get("upload_date"):
+        results["create_time"] = "🕒" + info["upload_date"]
+ else:
+ results["create_time"] = ""
+ if results["create_time"]:
+ captions += f"{results['create_time']}\n"
+
+ results["statistics"] = info.get("statistics", "")
+ if results["statistics"]:
+ captions += f"{results['statistics']}\n"
+
+ # title
+ if info.get("title"):
+ results["title"] = f"📝[{info['title']}]({url})"
+ captions += f"{results['title']}\n"
+ else:
+ results["title"] = ""
+
+ # desc
+ if (desc := info.get("description")) and (desc != "-"):
+ warnings.simplefilter("ignore", MarkupResemblesLocatorWarning)
+ soup = BeautifulSoup(desc, "html.parser")
+ desc_text = soup_to_text(soup)
+ results["description"] = make_bvid_clickable(desc_text)
+ captions += f"{results['description']}\n"
+ else:
+ results["description"] = ""
+
+ # comments
+ comment_list = []
+ comments = ""
+ if true(bilibili_comments) and platform == "bilibili":
+ comment_list = await get_bilibili_comments(bvid)
+ elif true(youtube_comments) and platform == "youtube":
+ comment_list = await get_youtube_comments(vid)
+ for comment in comment_list:
+        if await count_without_entities(f"{captions}{comments}{comment}") < CAPTION_LENGTH - 15:  # leave some margin for other info
+ comments += comment
+ results["comments"] = comments.strip()
+ captions = f"{captions}{comments.strip()}"
+ results["caption"] = captions
+ return results
+
+
+def get_target_chats(message: Message, video_target: str | int | None = None, audio_target: str | int | None = None, **kwargs) -> tuple[int | str, int | str]:
+ """Get target chats of video and audio messages.
+
+ Returns:
+ (video_target_chat, audio_target_chat)
+ """
+ main_target = kwargs.get("target_chat") or message.chat.id
+ if video_target is None:
+ video_target = main_target
+ if audio_target is None:
+ audio_target = main_target
+ return to_int(video_target), to_int(audio_target)
+
+
+async def send_media(
+ client: Client,
+ message: Message,
+ info: dict,
+ ytdlp_video_target: str | int | None = None,
+ ytdlp_audio_target: str | int | None = None,
+ *,
+ ytdlp_send_video: bool = True,
+ ytdlp_send_audio: bool = False,
+ **kwargs,
+) -> dict:
+ """Send media to target chats.
+
+ Returns:
+ {
+ "video": list[Message],
+ "audio": Message,
+ }
+ """
+ video_path: Path = info["video_path"]
+ audio_path: Path = info["audio_path"]
+ video_messages = []
+ audio_message = None
+ video_target, audio_target = get_target_chats(message, ytdlp_video_target, ytdlp_audio_target, **kwargs)
+
+ reply_msg_id = kwargs.get("reply_msg_id", 0)
+ reply_parameters = get_reply_to(message.id, reply_msg_id)
+ thumb = await generate_cover(video_path) if video_path.is_file() else await generate_cover(audio_path)
+ if not Path(thumb).is_file():
+ thumb = None
+
+ # split large videos into multiple parts (less than 2GB)
+ if true(ytdlp_send_video) and video_path.is_file():
+ video_path = await convert_to_h264(video_path, re_encoding=True, max_file_size=YTDLP_RE_ENCODING_MAX_FILE_BYTES, skip_h264=True)
+ if video_path.stat().st_size > MAX_FILE_BYTES:
+ await modify_progress(text=f"🎬视频大小超过Telegram限制({MAX_FILE_BYTES / 1024 / 1024:.0f}MB), 正在切分...", **kwargs)
+ videos = await preprocess_media([{"video": video_path, "thumb": thumb}])
+ for idx, video in enumerate(videos):
+ video["thumb"] = thumb
+ caption = info["caption"].replace("📝[", f"📝[P{idx + 1}-") if len(videos) > 1 else info["caption"]
+ caption = (await smart_split(caption, CAPTION_LENGTH))[0]
+ await modify_progress(text=f"🎬视频上传中-P{idx + 1}: {readable_size(path=video['video'])}", force_update=True, **kwargs)
+ video_messages.append(
+ await client.send_video(
+ chat_id=to_int(video_target),
+ caption=warp_comments(caption),
+ reply_parameters=reply_parameters,
+ progress=telegram_uploading,
+ progress_args=(kwargs.get("progress", False), video["video"], true(kwargs.get("detail_progress"))), # message, path, detail_progress
+ **video,
+ )
+ )
+ # don't need to split audio
+ if true(ytdlp_send_audio) and audio_path.is_file():
+ await modify_progress(text=f"🎧音频上传中: {readable_size(path=audio_path)}", force_update=True, **kwargs)
+ caption = (await smart_split(info["caption"], CAPTION_LENGTH))[0]
+ audio_message = await client.send_audio(
+ chat_id=to_int(audio_target),
+ audio=audio_path.as_posix(),
+ caption=warp_comments(caption),
+ performer=info["author"],
+ title=info["title"],
+ duration=round(float(info.get("duration", "0"))),
+ reply_parameters=reply_parameters,
+ progress=telegram_uploading,
+ progress_args=(kwargs.get("progress", False), audio_path, true(kwargs.get("detail_progress"))), # message, path, detail_progress
+ thumb=thumb, # type: ignore
+ )
+ await modify_progress(del_status=True, **kwargs)
+ sent_messages = {}
+ if all(isinstance(x, Message) for x in video_messages):
+ sent_messages["video"] = video_messages
+ if isinstance(audio_message, Message):
+ sent_messages["audio"] = audio_message
+ return sent_messages
src/ytdlp/utils.py
@@ -0,0 +1,167 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import os
+from pathlib import Path
+from typing import Literal
+from urllib.parse import urlparse
+
+from loguru import logger
+from yt_dlp.utils import YoutubeDLError
+
+from config import COOKIE, DOWNLOAD_DIR, PROXY
+from cookies import ytdlp_bilibili_cookie
+from utils import remove_none_values
+
+
+class ProxyError(Exception):
+ pass
+
+
+def get_ytdlp_proxy(platform: Literal["youtube", "bilibili", "ytdlp"] | None = None, url: str = "", proxy: str | None = None) -> str | None:
+ """Get ytdlp proxy."""
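+    # priority: explicit argument > YTDLP_PROXY_<PLATFORM> env var > PROXY.YTDLP; an empty string means "no proxy"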
+ if platform is None: # detect platform from url
+ if not url:
+ logger.warning("No url provided, fallback to default proxy")
+ return PROXY.YTDLP
+ parsed = urlparse(url)
+ host = parsed.netloc # www.youtube.com
+ platform = host.split(".")[-2] # type: ignore
+
+ if proxy is None: # proxy is not set
+ proxy = os.getenv(f"YTDLP_PROXY_{platform}".upper())
+
+ # empty: no proxy
+ # None: default ytdlp proxy
+    if proxy is None:  # fallback to default proxy if unset
+ proxy = PROXY.YTDLP
+ elif proxy == "": # empty string means no proxy
+ proxy = None
+ logger.debug(f"YTDLP Proxy of {platform}: {proxy}")
+ return proxy
+
+
+async def get_ytdlp_opts(platform: Literal["youtube", "bilibili", "ytdlp"] | None = None, url: str = "", proxy: str | None = None, *, video: bool = True) -> dict:
+ """Get ytdlp options."""
+ if not proxy:
+ proxy = get_ytdlp_proxy(platform=platform, url=url, proxy=proxy)
+ ytdlp_opts = {
+ "paths": {"home": DOWNLOAD_DIR},
+ "cachedir": DOWNLOAD_DIR,
+ "simulate": False,
+ "skip_download": False,
+ "keepvideo": True,
+ "format": video_selector if video else "m4a/bestaudio/best",
+ "writethumbnail": True,
+        "trim_file_name": 60,  # filesystem limit for filename is 255 bytes. UTF-8 char is 1-4 bytes.
+ "proxy": proxy,
+ "extractor_args": {"youtube": {"lang": ["zh-CN", "zh-HK", "zh-TW", "en", "en-GB"]}},
+ "ignore_no_formats_error": False,
+ "live_from_start": False,
+ "retries": 5,
+ "retry_sleep_functions": {"http": lambda _: 1}, # sleep 1 second between retries
+ "nocheckcertificate": True,
+ "source_address": "0.0.0.0", # force-ipv4 # noqa: S104
+ "outtmpl": "%(id)s.%(ext)s",
+ "noplaylist": True,
+ "color": "no_color-tty",
+ "logger": logger,
+ }
+ if platform == "bilibili" and COOKIE.YTDLP_BILIBILI_USE_COOKIE:
+ cookiefile = await ytdlp_bilibili_cookie()
+ logger.trace(f"Use cookie file: {cookiefile}")
+ ytdlp_opts["cookiefile"] = cookiefile
+ return ytdlp_opts
+
+
+def video_selector(ctx):
+ """Select the best format.
+
+ For the best compatibility, we choose .mp4 extension with AVC codec for video, .m4a extension for audio.
+ """
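+    # yt-dlp accepts a callable as the "format" option: it is called with the extracted info as ctx and must yield the chosen format spec (see the yielded dicts below)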
+ # formats are already sorted worst to best
+    formats = (ctx.get("formats") or [])[::-1]
+ if not formats:
+ msg = "No format found."
+ raise YoutubeDLError(msg)
+ formats = remove_none_values(formats)
+ logger.trace(f"Choose best format from {len(formats)} extracted formats")
+ # acodec='none' means there is no audio
+ # find compatible extension, VP9 is not supported by iOS, use AVC instead
+ all_videos = [f for f in formats if f.get("video_ext", "").lower() != "none"]
+ all_audios = [f for f in formats if f.get("audio_ext", "").lower() != "none"]
+ videos = [f for f in all_videos if f.get("video_ext", "").lower() == "mp4" and f.get("acodec", "").lower() == "none" and f.get("vcodec", "").lower().startswith("avc")]
+ audios = [f for f in all_audios if (f.get("resolution", "").lower() == "audio only" and f.get("audio_ext", "").lower() == "m4a")]
+ logger.trace(f"Found {len(videos)} video formats")
+    logger.trace(f"Found {len(audios)} audio formats")
+
+ # if no compatible format found, fallback to the best format
+ if not videos:
+ videos = all_videos
+ if not audios:
+ audios = all_audios
+
+ if not videos and not audios:
+ msg = "No video and audio format found."
+ raise YoutubeDLError(msg)
+ elif not videos:
+ best_audio = audios[0]
+ logger.debug(f"Use audio format: {best_audio['format']}")
+ yield {
+ "format_id": f"{best_audio['format_id']}",
+ "ext": best_audio["ext"],
+ "requested_formats": [best_audio],
+ "protocol": f"{best_audio['protocol']}",
+ }
+ elif not audios:
+ best_video = videos[0]
+ logger.debug(f"Use video format: {best_video['format']}")
+ yield {
+ "format_id": f"{best_video['format_id']}",
+ "ext": best_video["ext"],
+ "requested_formats": [best_video],
+ "protocol": f"{best_video['protocol']}",
+ }
+ else:
+ best_video = videos[0]
+ best_audio = audios[0]
+ logger.debug(f"Use video format: {best_video['format']}")
+ logger.debug(f"Use audio format: {best_audio['format']}")
+ yield {
+ "format_id": f"{best_video['format_id']}+{best_audio['format_id']}",
+ "ext": best_video["ext"],
+ "requested_formats": [best_video, best_audio],
+ "protocol": f"{best_video['protocol']}+{best_audio['protocol']}",
+ }
+
+
+def uploader_url(info: dict, extractor: str) -> str:
+ if url := info.get("uploader_url"):
+ return url
+ if author_id := info.get("uploader_id"):
+ if "youtube" in extractor:
+ return f"https://www.youtube.com/{author_id}"
+ if "bilibili" in extractor:
+ return f"https://space.bilibili.com/{author_id}"
+ return ""
+
+
+def platform_emoji(extractor: str) -> str:
+ if "bilibili" in extractor:
+ return "🅱️"
+ if "youtube" in extractor:
+ return "🔴"
+ if "twitch" in extractor:
+ return "🟣"
+ if "facebook" in extractor:
+ return "🔵"
+ return "🆔"
+
+
+def cleanup_ytdlp(vid: str):
+ if not vid:
+ return
+ logger.debug(f"Cleaning up: {vid}")
+ for p in Path(DOWNLOAD_DIR).glob(f"{vid}*"):
+ if p.is_file():
+ logger.trace(f"Deleting ytdlp files: {p}")
+ p.unlink(missing_ok=True)
src/config.py
@@ -143,8 +143,6 @@ class PROVIDER: # default API provider
INSTAGRAM = os.getenv("INSTAGRAM_PROVIDER", "tikhub-ddinstagram-bridge").lower() # tikhub, ddinstagram, bridge
INSTAGRAM_COMMENTS = os.getenv("INSTAGRAM_COMMENTS_PROVIDER", "tikhub").lower() # tikhub or a false value (0, false, none, null, etc.)
WEIBO_COMMENTS = os.getenv("WEIBO_COMMENTS_PROVIDER", "free").lower() # free or a false value (0, false, none, null, etc.)
- YOUTUBE_COMMENTS = os.getenv("YOUTUBE_COMMENTS_PROVIDER", "free").lower() # free or a false value (0, false, none, null, etc.)
- YOUTUBE_SUBTITLE = os.getenv("YOUTUBE_SUBTITLE_PROVIDER", "free-tikhub").lower() # free or tikhub
class TOKEN:
@@ -204,7 +202,6 @@ class TID: # see more TID usecase in `src/permission.py`
ADMIN = os.getenv("TID_ADMIN", "") # comma separated userid or @username
HISTORY_ADMIN = os.getenv("TID_HISTORY_ADMIN", "") # comma separated userid (@username is NOT supported!)
# back up ytdlp audio if the user does not request it
- CHANNEL_YTDLP_BACKUP = os.getenv("TID_CHANNEL_YTDLP_BACKUP", "me")
DAILY_SUMMARY = os.getenv("TID_DAILY_SUMMARY", "{}") # {"source-chat-id": "target-chat-id"}, e.g. '{"-1001234567890": "-1009876543210"}'
GEMINI_CHATS = os.getenv("TID_GEMINI_CHATS", "") # comma separated chat ids to always use gemini models (no need `/gemini`)
OPENAI_CHATS = os.getenv("TID_OPENAI_CHATS", "") # comma separated chat ids to always use openai models (no need `/gpt`)
src/handler.py
@@ -38,12 +38,13 @@ from preview.v2ex import preview_v2ex
from preview.wechat import preview_wechat
from preview.weibo import preview_weibo
from preview.xiaohongshu import preview_xhs
-from preview.ytdlp import ProxyError, preview_ytdlp
from price.entrypoint import get_asset_price
from quotly.quotly import quote_message
from subtitles.subtitle import get_subtitle
from tts.tts import text_to_speech
from utils import to_int, true
+from ytdlp.main import preview_ytdlp
+from ytdlp.utils import ProxyError
async def handle_utilities(
@@ -286,11 +287,30 @@ async def handle_social_media(
sent_messages = await preview_ytdlp(client, message, proxy=PROXY.YTDLP_FALLBACK, **kwargs)
if warn_msg:
await warn_msg.delete()
- if not sent_messages and startswith_prefix(this_texts, prefix=cmd_prefix):
+ # if ytdlp failed, download directly
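+        # (skip the generic webpage-download fallback for platforms that already have dedicated preview handlers)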
+ if (
+ not sent_messages
+ and startswith_prefix(this_texts, prefix=cmd_prefix)
+ and matched["platform"]
+ not in [
+ "bilibili",
+ "douyin",
+ "github",
+ "instagram",
+ "music163",
+ "reddit",
+ "spotify",
+ "tiktok",
+ "v2ex",
+ "weibo",
+ "x",
+ "xiaohongshu",
+ "youtube",
+ ]
+ ):
if kwargs.get("show_progress"):
kwargs["progress"] = await client.send_message(info["cid"], text="⚠️暂时不支持解析链接, 尝试直接下载该网页")
await download_url_in_message(client, this_msg, extra_prefix=cmd_prefix, **kwargs)
-
except Exception as e:
logger.exception(e)
src/permission.py
@@ -85,6 +85,24 @@ async def check_category(client: Client, message: Message, ctype: str) -> dict:
return permission
+@cache.memoize(ttl=0)
+def global_permissions() -> dict:
+ """Set permissions for all chats.
+
+ GLOBAL_YTDLP_SEND_AUDIO=0 # disable ytdlp_send_audio
+ GLOBAL_TWITTER_PROVIDER=vxtwitter-fxtwitter # set twitter provider to `vxtwitter-fxtwitter`
+ """
+ envs = [x for x in os.environ if x.upper().startswith("GLOBAL_")]
+ permission = {}
+ for key in envs:
+ value = os.environ[key]
+ option = key.removeprefix("GLOBAL_").lower()
+ permission[option] = to_bool(value) # type: ignore
+ logger.warning(f"Set `{option}` to {to_bool(value)}")
+ logger.success(f"Global permission: {permission}")
+ return permission
+
+
@cache.memoize(ttl=0)
def check_service(cid: int | str, ctype: str) -> dict:
if not cid or not ctype:
@@ -127,7 +145,7 @@ def check_service(cid: int | str, ctype: str) -> dict:
"favorite": True,
"convert_chinese": True,
"quotly": True,
- }
+ } | global_permissions()
if ctype == "PRIVATE":
permission["ai"] = True
@@ -194,22 +212,22 @@ def check_service(cid: int | str, ctype: str) -> dict:
permission["quotly"] = False
"""
- Set specific service
+ Set for specific chat
SET_111111_AI=1
SET_111111_DOUYIN=0
SET_111111_DOUYIN_PROVIDER=tikhub
"""
-
- def to_bool(v: str) -> bool | str:
- if str(v).lower() in {"1", "true", "t", "yes", "y", "on", "0", "n", "no", "f", "false", "off"}:
- return true(v)
- return v
-
- envs = [x for x in os.environ if x.startswith((f"SET_{cid}_", f"set_{cid}_"))]
+ envs = [x for x in os.environ if x.upper().startswith(f"SET_{cid}_")]
for key in envs:
value = os.environ[key]
- option = key.removeprefix(f"SET_{cid}_").removeprefix(f"set_{cid}_").lower()
+ option = key.removeprefix(f"SET_{cid}_").lower()
permission[option] = to_bool(value) # type: ignore
logger.warning(f"Set `{option}` for chat={cid} to {to_bool(value)}")
logger.success(f"Permission for chat={cid}: {permission}")
return permission
+
+
+def to_bool(v: str) -> bool | str:
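+    """Convert common truthy/falsy strings to bool; anything else (e.g. provider names) is returned unchanged."""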
+ if str(v).lower() in {"1", "true", "t", "yes", "y", "on", "0", "n", "no", "f", "false", "off"}:
+ return true(v)
+ return v
src/utils.py
@@ -283,6 +283,16 @@ def readable_size(num_bytes: str | float = 0, path: str | Path | None = None) ->
return f"{num_bytes:.1f} MB"
+def readable_count(num: int | str) -> str:
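+    """Render large counts in 万 units, e.g. 123456 -> "12.3万"; smaller counts are returned as-is."""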
+ count = to_int(num)
+ if not isinstance(count, int):
+ return str(num)
+ if count > 10000:
+ m, n = divmod(count, 10000)
+ return f"{m}万" if n < 1000 else f"{m}.{n // 1000}万"
+ return str(count)
+
+
def find_url(text: str) -> str:
if not isinstance(text, str):
return ""