Commit c3d6957
Changed files (10)
src/asr/voice_recognition.py
@@ -9,6 +9,7 @@ from pyrogram.types import Message
from asr.tecent_asr import Credential, FlashRecognitionRequest, FlashRecognizer
from config import ASR_MAX_DURATION, ENABLE, PREFIX, TOKEN, cache
+from messages.parser import parse_msg
from messages.progress import modify_progress
from messages.sender import send2tg
from messages.utils import equal_prefix, startswith_prefix
@@ -93,13 +94,14 @@ async def voice_to_text(
if not (trigger_message := get_trigger_message(message, asr_need_prefix, asr_skip_voice, asr_skip_audio, asr_skip_video)):
return
+ trigger_info = parse_msg(trigger_message)
asr_engine = "16k_zh-PY" # default: 中英粤
if matched := re.match(r"/asr\s+([^.。,,/\s]+)", str(message.text)): # /asr yue
asr_engine = f"16k_{matched.group(1)}"
asr_engine = asr_engine.replace("16k_fy", "16k_zh_dialect") # fix dialect engine code
- msg = f"Recieved {trigger_message.media.name} message, start recognizing by {ENGINE_MAP.get(asr_engine, 'Unknown')}..."
+ msg = f"Recieved {trigger_info['mtype']} message, start recognizing by {ENGINE_MAP.get(asr_engine, 'Unknown')}..."
logger.info(msg)
if kwargs.get("show_progress"):
res = await send2tg(client, message, texts=msg, **kwargs)
@@ -109,9 +111,9 @@ async def voice_to_text(
return
voice_format = ""
path: str | Path = await trigger_message.download() # type: ignore
- if trigger_message.media.name == "VOICE": # audio/ogg
+ if trigger_info["mtype"] == "voice": # audio/ogg
voice_format = str(trigger_message.voice.mime_type).split("/")[-1] # set voice format
- elif trigger_message.media.name in ["AUDIO", "VIDEO"]:
+ elif trigger_info["mtype"] in ["audio", "video"]:
path = convert_to_audio(path, ext="m4a")
voice_format = "m4a"
@@ -187,10 +189,10 @@ def get_trigger_message(
"""
if not ENABLE.ASR:
return None
+ info = parse_msg(message)
+ this_text = info["text"] # this message
- this_text = message.text or message.caption or "" # this message
-
- if message.chat.type.name in ["GROUP", "SUPERGROUP", "CHANNEL", "BOT"]:
+ if info["ctype"].lower() in ["group", "supergroup", "channel", "bot"]:  # ctype comes from ChatType.name (upper-case), so compare case-insensitively
asr_need_prefix = asr_need_prefix or True
asr_skip_voice = asr_skip_voice or False
asr_skip_audio = asr_skip_audio or False
@@ -207,11 +209,12 @@ def get_trigger_message(
# treat the reply_to_message as the real message need to be recognized
trigger_msg = message.reply_to_message if startswith_prefix(this_text, prefix=[PREFIX.ASR]) else message
# skip non voice/audio/video message
if not trigger_msg:
return None
+ trigger_info = parse_msg(trigger_msg)  # parse only after the None check: parse_msg dereferences message.chat
- if not trigger_msg.media or trigger_msg.media.name not in ["VOICE", "AUDIO", "VIDEO"]:
+ if trigger_info["mtype"] not in ["voice", "audio", "video"]:
return None
# always trigger if the message has "/asr" prefix
@@ -219,10 +222,10 @@ def get_trigger_message(
return trigger_msg
# match the asr_skip_* settings
- if asr_skip_voice and trigger_msg.media.name == "VOICE":
+ if asr_skip_voice and trigger_info["mtype"] == "voice":
return None
- if asr_skip_audio and trigger_msg.media.name == "AUDIO":
+ if asr_skip_audio and trigger_info["mtype"] == "audio":
return None
- if asr_skip_video and trigger_msg.media.name == "VIDEO":
+ if asr_skip_video and trigger_info["mtype"] == "video":
return None
return trigger_msg
src/bridge/ocr.py
@@ -8,6 +8,7 @@ from pyrogram.client import Client
from pyrogram.types import Message, ReplyParameters
from config import ENABLE, PREFIX, cache
+from messages.parser import parse_msg
from messages.sender import send2tg
from messages.utils import equal_prefix, startswith_prefix
from utils import i_am_bot
@@ -27,24 +28,28 @@ async def send_to_ocr_bridge(client: Client, message: Message, **kwargs):
if equal_prefix(message.text, prefix=[PREFIX.OCR]) and not message.reply_to_message:
await send2tg(client, message, texts=f"**图片转文字**: 以`{PREFIX.OCR}`回复图片消息即可提取文字", **kwargs)
return
- msg = message.text or message.caption or "" # /ocr args
- if not startswith_prefix(message.text or message.caption, prefix=[PREFIX.OCR]):
+ info = parse_msg(message)
+ msg = info["text"]
+ if not startswith_prefix(msg, prefix=[PREFIX.OCR]):
return
if await i_am_bot(client): # bot can't send message to other bots
return
- # get the img file_id
- if message.photo:
- img = message.photo.file_id
- elif message.reply_to_message and message.reply_to_message.photo:
- img = message.reply_to_message.photo.file_id
- else:
+
+ # reply a message with /ocr
+ if message.reply_to_message:
+ message = message.reply_to_message
+ info = parse_msg(message, silent=True) # parse again
+
+ if info["mtype"] != "photo":
return
+ # get the img file_id
+ file_id = info["file_id"]
cid = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id # MSG-A's cid
mid = kwargs.get("reply_msg_id", message.id) # MSG-A's mid
msg += f" \n#ID=({cid},{mid})".replace("None", "0")
logger.warning(f"OCR via 妙妙小工具 (@{OCR_BOT}): {msg!r}")
- await client.send_photo(chat_id=f"@{OCR_BOT}", photo=img, caption=msg)
+ await client.send_photo(chat_id=f"@{OCR_BOT}", photo=file_id, caption=msg)
@cache.memoize(ttl=10)
@@ -52,11 +57,18 @@ async def forward_ocr_results(client: Client, message: Message):
"""See docs in `bridge/README.md` for details."""
if message.from_user.username != OCR_BOT or not message.reply_to_message:
return
+ info = parse_msg(message)
reply_msg = message.reply_to_message
- reply_msg_text = reply_msg.text or reply_msg.caption or ""
+ reply_info = parse_msg(reply_msg)
+
+ # this message should be a photo with captions
+ if info["mtype"] != "photo" or not info["text"]:
+ return
+ # this message should reply to a photo message starting with "/ocr"
+ if reply_info["mtype"] != "photo" or not startswith_prefix(reply_info["text"], prefix=[PREFIX.OCR]):
+ return
- # forward ocr (result should be a photo)
- if message.photo and message.caption and reply_msg_text.startswith("/ocr") and (matched := re.search(r"#ID=\((-?\d+),(-?\d+)\)", str(reply_msg_text))):
+ if matched := re.search(r"#ID=\((-?\d+),(-?\d+)\)", reply_info["text"]):
target_cid = matched.group(1) # MSG-A's cid
target_mid = int(matched.group(2)) if int(matched.group(2)) != 0 else None # MSG-A's mid
cid = message.chat.id # result's cid
src/messages/chat_history.py
@@ -1,12 +1,9 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-from datetime import datetime
-from zoneinfo import ZoneInfo
from pyrogram.client import Client
from pyrogram.types import Message
-from config import TZ
from messages.parser import parse_msg
@@ -18,19 +15,14 @@ async def get_chat_history(client: Client, message: Message, offset_id: int, num
async for msg in client.get_chat_history(chat_id=message.chat.id, offset_id=offset_id, limit=num_history): # type: ignore
if msg.empty:
continue
- texts = msg.text or msg.caption or ""
+
info = parse_msg(msg, silent=True)
- time = ""
- dt = msg.date
- if isinstance(dt, datetime):
- time = f"({msg.date.replace(tzinfo=ZoneInfo(TZ)):%Y-%m-%d %H:%M:%S})"
- media = f"[{msg.media.name}]" if msg.media else ""
res = ""
if info["full_name"]:
res += f"@{info['full_name']} "
- if time:
- res += f"{time}\n"
- res += f"{media}{texts}"
- if res:
+ res += f"{info['time']}\n"
+ media = f"[{msg.media.name}]" if msg.media else ""
+ res += f"{media}{info['text']}"
+ if res.strip():
history.append(res)
return history[::-1]
src/messages/parser.py
@@ -2,36 +2,50 @@
# -*- coding: utf-8 -*-
+from datetime import datetime
+from zoneinfo import ZoneInfo
+
from loguru import logger
from pyrogram.enums import MessageEntityType
-from pyrogram.types import Message
+from pyrogram.types import Audio, Message
-from config import cache
+from config import TZ, cache
+from utils import nowdt
def parse_msg(message: Message, *, silent: bool = False, verbose: bool = False) -> dict:
+ """Parse a message object and return a dictionary of its attributes.
+
+ Abbreviations: c = chat, m = message, u = user
+ """
if cached := cache.get(f"parse_msg-{message.chat.id}-{message.id}"):
return cached
if not silent and verbose:
logger.trace(f"{message!r}")
- chat_type = message.chat.type.name if message.chat and message.chat.type else ""
- chat_title = message.chat.title if message.chat and message.chat.title else ""
+ mtype: str = message.media.value if message.media and hasattr(message.media, "value") else "text" # type: ignore
+ ctype = message.chat.type.name if message.chat and message.chat.type else ""  # truthiness guard: hasattr() cannot detect type=None
+ ctitle = message.chat.title if message.chat and message.chat.title else ""
uid = message.from_user.id if message.from_user else 0
cid = message.chat.id if message.chat else 0
mid = message.id if message.id else 0
is_bot = bool(message.from_user and message.from_user.is_bot)
- text = message.text if message.text else ""
+ text = message.text or message.caption or ""
+ dt = message.date.replace(tzinfo=ZoneInfo(TZ)) if isinstance(message.date, datetime) else nowdt(TZ)
+ time = f"{dt:%Y-%m-%d %H:%M:%S}"
+
+ # parse user attributes
first_name = message.from_user.first_name if message.from_user and message.from_user.first_name else ""
last_name = message.from_user.last_name if message.from_user and message.from_user.last_name else ""
handle = message.from_user.username if message.from_user and message.from_user.username else ""
full_name = f"{first_name} {last_name}".strip() if message.from_user else ""
- video_name = message.video.file_name if message.video else ""
- photo_id = message.photo.file_unique_id if message.photo else ""
- caption = message.caption if message.caption else ""
- gif = message.animation.file_name if message.animation else ""
- sticker = message.sticker.set_name if message.sticker else ""
- file_name = message.document.file_name if message.document else ""
- message_url = f"https://t.me/c/{str(cid).removeprefix('-100')}/{mid}"
+
+ # parse media attributes
+ media = getattr(message, mtype) if hasattr(message, mtype) else Audio(file_id="", file_unique_id="", duration=0) # placeholder
+ file_id = media.file_id if hasattr(media, "file_id") and media.file_id else ""
+ file_name = media.file_name if hasattr(media, "file_name") and media.file_name else ""
+ mime_type = media.mime_type if hasattr(media, "mime_type") and media.mime_type else ""
+ file_size = media.file_size if hasattr(media, "file_size") and media.file_size else 0
+ duration = media.duration if hasattr(media, "duration") and media.duration else 0
# Parse URL from message entities
entity_urls = []
@@ -39,40 +53,42 @@ def parse_msg(message: Message, *, silent: bool = False, verbose: bool = False)
entity_urls.extend(entity.url for entity in message.entities if entity.type == MessageEntityType.TEXT_LINK)
if message.caption_entities:
entity_urls.extend(entity.url for entity in message.caption_entities if entity.type == MessageEntityType.TEXT_LINK)
+ message_url = f"https://t.me/c/{str(cid).removeprefix('-100')}/{mid}"
- # log the summary to console
- chat_type_emoji = {
+ ctype_emoji = {
"BOT": "🤖",
"GROUP": "👥",
"SUPERGROUP": "👥",
"CHANNEL": "📡",
- "PRIVATE": "🔴",
- }.get(chat_type, "")
+ "PRIVATE": "👨",
+ }.get(ctype, "")
+ mtype_emoji = {
+ "text": "🔤",
+ "audio": "🎧",
+ "document": "📔",
+ "photo": "🏞",
+ "sticker": "🎨",
+ "video": "🎬",
+ "animation": "✨",
+ "voice": "🎤",
+ }.get(mtype, mtype)
+ # log the summary to console
summary = ""
- if chat_title:
- summary += f"{chat_type_emoji}{chat_title}[{mid}]"
- if first_name:
- summary += f"🤖{full_name}[{uid}]" if is_bot else f"👤{full_name}[{uid}]"
- if video_name:
- summary += f" 🎬{video_name}"
- if photo_id:
- summary += f" 🏞{photo_id}"
- if sticker:
- summary += f" 🎨{sticker}"
- if gif:
- summary += f" ✨{gif}"
- if file_name:
- summary += f" 📔{file_name}"
+ if ctitle: # group or channel
+ summary += f"{ctype_emoji}{ctitle}[{mid}]"
+
+ if full_name: # private chat
+ summary += f"🤖{full_name}(@{handle})[{uid}]" if is_bot else f"👨{full_name}(@{handle})[{uid}]"
+ summary += f" {mtype_emoji}{mtype}{file_name}".strip()
if text:
summary += f" 📝{text}"
- if caption:
- summary += f" 📝{caption}"
if not silent:
logger.info(f"{summary!r}")
info = { # ensure the type of each field
- "chat_type": str(chat_type),
- "chat_title": str(chat_title),
+ "mtype": str(mtype),
+ "ctype": str(ctype),
+ "ctitle": str(ctitle),
"uid": int(uid),
"cid": int(cid),
"mid": int(mid),
@@ -82,12 +98,13 @@ def parse_msg(message: Message, *, silent: bool = False, verbose: bool = False)
"last_name": str(last_name),
"full_name": str(full_name),
"handle": str(handle),
- "video_name": str(video_name),
+ "datetime": dt,
+ "time": str(time),
"file_name": str(file_name),
- "photo_id": str(photo_id),
- "caption": str(caption),
- "gif": str(gif),
- "sticker": str(sticker),
+ "file_id": str(file_id),
+ "mime_type": str(mime_type),
+ "file_size": int(file_size),
+ "duration": int(duration),
"summary": str(summary),
"message_url": str(message_url),
"entity_urls": entity_urls,
src/others/download_external.py
@@ -11,6 +11,7 @@ from pyrogram.client import Client
from pyrogram.types import Message
from config import ENABLE, MAX_FILE_BYTES, PREFIX
+from messages.parser import parse_msg
from messages.progress import modify_progress
from messages.sender import send2tg
from messages.utils import equal_prefix, get_reply_to, startswith_prefix
@@ -30,7 +31,8 @@ async def download_url_in_message(client: Client, message: Message, **kwargs):
"""Download the url from the message."""
if not ENABLE.WGET:
return
- if not startswith_prefix(message.text or message.caption, prefix=[PREFIX.WGET]):
+ info = parse_msg(message)
+ if not startswith_prefix(info["text"], prefix=[PREFIX.WGET]):
return
# send docs if message == "/wget", without reply
if equal_prefix(message.text, prefix=[PREFIX.WGET]) and not message.reply_to_message:
@@ -40,6 +42,7 @@ async def download_url_in_message(client: Client, message: Message, **kwargs):
# reply a message with /wget
if message.reply_to_message:
message = message.reply_to_message
+ info = parse_msg(message, silent=True) # parse again
target_chat = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id
target_chat = to_int(target_chat)
@@ -47,7 +50,7 @@ async def download_url_in_message(client: Client, message: Message, **kwargs):
reply_parameters = get_reply_to(message.id, reply_msg_id)
regex = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))" # noqa: RUF001
- if matched := re.findall(regex, message.text):
+ if matched := re.findall(regex, info["text"]):
url = https_url(matched[0][0])
logger.debug(f"URL found from message text: {url}")
src/others/extract_audio.py
@@ -34,16 +34,17 @@ async def extract_audio_file(client: Client, message: Message, **kwargs) -> None
await send2tg(client, message, texts=HELP, **kwargs)
return
- if not startswith_prefix(message.text or message.caption, prefix=[PREFIX.AUDIO]):
+ info = parse_msg(message)
+ if not startswith_prefix(info["text"], prefix=[PREFIX.AUDIO]):
return
# reply a message with /audio
if message.reply_to_message:
message = message.reply_to_message
+ info = parse_msg(message, silent=True) # parse again
- if not message.media or message.media.name != "VIDEO":
+ if info["mtype"] != "video":
return
- parse_msg(message, verbose=True)
msg = "🎬收到视频消息, 开始提取🎧音频..."
if kwargs.get("show_progress"):
src/others/gpt.py
@@ -14,6 +14,7 @@ from pyrogram.client import Client
from pyrogram.types import Message
from config import DOWNLOAD_DIR, ENABLE, GPT, PREFIX, PROXY, cache
+from messages.parser import parse_msg
from messages.progress import modify_progress
from messages.sender import send2tg
from messages.utils import equal_prefix, startswith_prefix
@@ -49,9 +50,9 @@ async def gpt_response(client: Client, message: Message, **kwargs):
"""
if not ENABLE.GPT:
return
-
+ info = parse_msg(message)
# send docs if message == "/ai", without reply
- if equal_prefix(message.text or message.caption, prefix=[PREFIX.GPT]) and not message.reply_to_message:
+ if equal_prefix(info["text"], prefix=[PREFIX.GPT]) and not message.reply_to_message:
await send2tg(client, message, texts=HELP, **kwargs)
return
@@ -290,15 +291,16 @@ def fix_doubao(contexts: list[dict]) -> list[dict]:
def is_valid_conversation(message: Message) -> bool:
- if startswith_prefix(message.text or message.caption, prefix=[PREFIX.GPT]):
+ info = parse_msg(message)
+ if startswith_prefix(info["text"], prefix=[PREFIX.GPT]):
return True
# is replying to gpt-bot response message?
if not message.reply_to_message:
return False
reply_msg = message.reply_to_message
- reply_text = reply_msg.text or reply_msg.caption or ""
- return reply_text.startswith("🤖")
+ reply_info = parse_msg(reply_msg, silent=True)
+ return reply_info["text"].startswith("🤖")
async def generate_single_msg_context(client: Client, message: Message) -> dict:
@@ -321,46 +323,48 @@ async def generate_single_msg_context(client: Client, message: Message) -> dict:
return ""
return re.sub(rf"(.*?){BOT_TIPS}\)", "", text.removeprefix(PREFIX.GPT)).strip()
- role = "assistant" if any(BOT_TIPS in texts for texts in [str(message.text), str(message.caption)]) else "user"
+ info = parse_msg(message, silent=True)
+ role = "assistant" if BOT_TIPS in info["text"] else "user"
# only text
if text := clean_text(message.text):
return {"role": role, "content": [{"type": "text", "text": text}]}
- if not message.media or message.media.name not in ["PHOTO", "VOICE", "AUDIO", "VIDEO", "DOCUMENT"]:
+ if info["mtype"] not in ["photo", "voice", "audio", "video", "document"]:
return {}
# has media
messages = await client.get_media_group(message.chat.id, message.id) if message.media_group_id else [message]
media = []
for msg in messages:
+ info = parse_msg(msg, silent=True)
try:
if GPT.MEDIA_FORMAT == "base64":
res: BytesIO = await client.download_media(msg, in_memory=True) # type: ignore
logger.debug(f"Downloaded GPT media: {res.name}")
ext = Path(res.name).suffix.removeprefix(".").replace("jpg", "jpeg")
b64 = base64.b64encode(res.getvalue()).decode("utf-8")
- if message.media.name == "PHOTO":
+ if info["mtype"] == "photo":
media.append({"type": "image_url", "image_url": {"url": f"data:image/{ext};base64,{b64}"}})
- elif message.media.name == "VIDEO":
+ elif info["mtype"] == "video":
media.append({"type": "video_url", "video_url": {"url": b64}})
- elif message.media.name == "DOCUMENT" and message.document.mime_type == "text/plain":
+ elif info["mtype"] == "document" and info["mime_type"] == "text/plain":
media.append({"type": "text", "text": res.getvalue().decode("utf-8")})
else:
logger.warning("Audio do not support base64, please use http")
else:
path: str = await client.download_media(msg) # type: ignore
logger.debug(f"Downloaded GPT media: {path}")
- if message.media.name == "PHOTO":
+ if info["mtype"] == "photo":
media.append({"type": "image_url", "image_url": {"url": f"{GPT.MEDIA_SERVER}/{Path(path).name}"}})
- elif message.media.name == "VIDEO":
+ elif info["mtype"] == "video":
media.append({"type": "video_url", "video_url": {"url": f"{GPT.MEDIA_SERVER}/{Path(path).name}"}})
- elif message.media.name in ["AUDIO", "VOICE"]:
+ elif info["mtype"] in ["audio", "voice"]:
mp3 = convert_to_audio(path, ext="mp3", codec="libmp3lame")
media.append({"audio": f"{GPT.MEDIA_SERVER}/{mp3.name}"})
- elif message.media.name == "DOCUMENT" and message.document.mime_type == "text/plain":
+ elif info["mtype"] == "document" and info["mime_type"] == "text/plain":
media.append({"type": "text", "text": Path(path).read_text()})
Path(path).unlink(missing_ok=True)
- if caption := msg.caption:
+ if caption := info["text"]:
media.append({"type": "text", "text": caption})
except Exception as e:
logger.warning(f"Download image from message failed: {e}")
src/others/subtitle.py
@@ -8,12 +8,12 @@ from datetime import timedelta
from loguru import logger
from pyrogram.client import Client
-from pyrogram.enums import MessageEntityType
from pyrogram.types import Message
from youtube_transcript_api import YouTubeTranscriptApi
from config import API, ENABLE, PREFIX, PROXY, TOKEN
from database import cache
+from messages.parser import parse_msg
from messages.progress import modify_progress
from messages.sender import send2tg
from messages.utils import equal_prefix, startswith_prefix
@@ -58,7 +58,7 @@ async def get_subtitle(client: Client, message: Message, **kwargs):
if subtitles := res.get("subtitle", ""):
length = len(subtitles)
with io.BytesIO(subtitles.encode("utf-8")) as f:
- await client.send_document(to_int(target_chat), f, file_name=f"{vid}.vtt-{length}字符.txt", caption=yt_url)
+ await client.send_document(to_int(target_chat), f, file_name=f"vtt字幕-{length}字符.txt", caption=f"{vid}[{yt_url}]")
elif error := res.get("error", ""):
await modify_progress(text=error, force_update=True, **kwargs)
await asyncio.sleep(3)
@@ -69,13 +69,16 @@ async def get_subtitle(client: Client, message: Message, **kwargs):
async def find_yt_vid(client: Client, message: Message) -> str:
- if not startswith_prefix(message.text or message.caption, prefix=[PREFIX.SUBTITLE]):
+ info = parse_msg(message)
+ if not startswith_prefix(info["text"], prefix=[PREFIX.SUBTITLE]):
return ""
- url = find_url_in_message(message)
+
# /subtitle "link"
- info = await match_social_media_link(url, flatten_first=True)
- if info["platform"] == "youtube":
- return info["vid"]
+ # prefer a text-link entity; otherwise use the raw text so a plainly pasted URL still matches
+ if url := (info["entity_urls"] or [info["text"].strip()])[0]:
+ matched = await match_social_media_link(url, flatten_first=True)
+ if matched["platform"] == "youtube":
+ return matched["vid"]
# is replying to message?
if not message.reply_to_message:
@@ -84,36 +87,27 @@ async def find_yt_vid(client: Client, message: Message) -> str:
# if reply to a media_group, fetch all messages in the group
reply_messages = await client.get_media_group(message.chat.id, message.id) if message.media_group_id else [reply_message]
for msg in reply_messages:
- url = find_url_in_message(msg)
- info = await match_social_media_link(url, flatten_first=True)
- if info["platform"] == "youtube":
- return info["vid"]
+ info = parse_msg(msg, silent=True)
+ url = (info["entity_urls"] or [info["text"].strip()])[0]  # text-link first, else the raw message text
+ if not url:
+ continue
+ matched = await match_social_media_link(url, flatten_first=True)
+ if matched["platform"] == "youtube":
+ return matched["vid"]
return ""
-def find_url_in_message(message: Message) -> str:
- # check first url in entities
- if message.entities:
- for entity in message.entities:
- if entity.type == MessageEntityType.TEXT_LINK:
- return entity.url
- if message.caption_entities:
- for entity in message.caption_entities:
- if entity.type == MessageEntityType.TEXT_LINK:
- return entity.url
- return str(message.text).strip() if message.text else str(message.caption).strip()
-
-
async def fetch_subtitle(video_id: str) -> dict:
proxy = {"http": PROXY.SUBTITLE, "https": PROXY.SUBTITLE} if PROXY.SUBTITLE else None
logger.info(f"Fetch Subtitle for {video_id=}, {proxy=}")
res = {}
try:
subtitles: list[dict] = YouTubeTranscriptApi.get_transcript(video_id=video_id, languages=["zh-CN", "zh-Hans", "zh", "zh-HK", "zh-TW", "zh-Hant", "en"], proxies=proxy)
- res["subtitle"] = to_webvtt(subtitles)
except Exception as e:
logger.error(f"Failed to get subtitle: {e}")
return await fetch_subtitle_tikhub(video_id)
+ if subtitles:
+ res["subtitle"] = to_webvtt(subtitles)
return res
@@ -154,7 +148,7 @@ def to_webvtt(subtitles: list[dict]) -> str:
for subtitle in subtitles:
start = format_timestamp(subtitle["start"])
end = format_timestamp(subtitle["start"] + subtitle["duration"])
- text = subtitle["text"]
+ text = subtitle.get("text", "")
vtt_output.append(f"{start} --> {end}")
vtt_output.append(text)
vtt_output.append("") # Add blank line between subtitles
src/handler.py
@@ -136,7 +136,7 @@ async def handle_social_media(
cmd_prefix.extend(PREFIX.MAIN)
ignore_prefix = ignore_prefix or []
info = parse_msg(message)
- this_texts = message.text or message.caption or "" # texts of the trigger message
+ this_texts = info["text"] # texts of the trigger message
if need_prefix and not startswith_prefix(this_texts, prefix=[*cmd_prefix, "/retry"], ignore_prefix=ignore_prefix):
return
kwargs |= params_from_msg_text(this_texts) # merge the parameters from the message text
@@ -151,6 +151,7 @@ async def handle_social_media(
return
# with reply, treat the reply_msg as the trigger to preview social media link
message = message.reply_to_message
+ info = parse_msg(message, silent=True) # parse again
warn_msg = None
if not need_prefix and startswith_prefix(this_texts, prefix=cmd_prefix, ignore_prefix=ignore_prefix):
@@ -162,8 +163,7 @@ async def handle_social_media(
# Caution: this format should be consistent with `save_messages` function in `message.database.py`
kwargs["send_from_user"] = f"👤[@{info['full_name']}](tg://user?id={info['uid']})//"
try:
- texts = message.text or message.caption or ""
- texts = await flatten_rediercts(texts)
+ texts = await flatten_rediercts(info["text"])
matched = await match_social_media_link(texts) # match "platform" and "url" (and other info)
kwargs |= matched
if startswith_prefix(this_texts, prefix=["/retry"], ignore_prefix=ignore_prefix):