Commit fe984a8
Changed files (6)
src
src/messages/preprocess.py
@@ -0,0 +1,171 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from pathlib import Path
+
+from loguru import logger
+from pyrogram.types import InputMediaAudio, InputMediaDocument, InputMediaPhoto, InputMediaVideo
+
+from config import CAPTION_LENGTH
+from multimedia import fix_video_rotation, generate_cover, is_valid_video_or_audio, parse_media_info, split_large_video, split_long_img, validate_img
+
+
+def preprocess_media(media: list[dict]) -> list[dict]:
+ """Filter out invalid media files.
+
+ - photo must be at most 10 MB in size.
+ - photo's width and height must not exceed 10000 in total.
+ - photo's width and height ratio must be at most 20.
+ - filesize < 2GB for video
+
+ Args:
+ media (list[dict]): The list of media info.
+ format: {"photo": "path/to/photo.jpg"}
+ OR { "video": "path/to/video.mp4",
+ "thumb"(optional): "path/to/thumbnail.jpg" (generate a new one if unset)
+ }
+            OR  { "audio": "path/to/audio.mp3",
+ "performer"(optional): "Alice", ("Performer" if unset)
+ "title"(optional): "audio", (filename if unset)
+ "thumb"(optional): "path/to/thumbnail.jpg" (generate a new one if unset)
+ }
+ OR {"document": "path/to/document.pdf"}
+
+ Returns:
+ list[dict]: The filtered media info.
+ {"photo": "path/to/photo.jpg"},
+ {
+ "video": "path/to/video.mp4",
+ "width": int,
+ "height": int,
+ "duration": int,
+ "thumb": "path/to/thumbnail.jpg" | None,
+ },
+ {
+ "audio": "path/to/audio.mp3",
+ "performer": str,
+ "title": str,
+ "duration": int,
+ "thumb": "path/to/thumbnail.jpg" | None,
+ }
+ {"document": "path/to/document.pdf"}
+ """
+ num_before = len(media)
+ logger.trace(f"{num_before} media info before preprocess: {media}")
+
+ # Step-1: Photos
+ done_photos = []
+ for data in media:
+ if not data.get("photo"):
+ done_photos.append(data)
+ continue
+ if photo_path := data.get("photo"):
+ valid_photos = [validate_img(photo) for photo in split_long_img(photo_path) if validate_img(photo)]
+ done_photos.extend({"photo": valid_photo} for valid_photo in valid_photos)
+
+ # Step-2: Videos
+ done_videos = []
+ for data in done_photos:
+ if not data.get("video"):
+ done_videos.append(data)
+ continue
+ thumb = data.get("thumb") # thumb is provided
+ if video_path := data.get("video"):
+ video_path = fix_video_rotation(video_path)
+ if not is_valid_video_or_audio(video_path):
+ logger.warning(f"Video is invalid: {video_path}")
+ continue
+
+ # split large video files ( < 2GB)
+ valid_videos = [x for x in split_large_video(video_path) if is_valid_video_or_audio(x)]
+
+ # generate thumbnails for each video if thumb is not provided
+ thumbs = [valid_thumb for _ in valid_videos] if (valid_thumb := validate_img(thumb)) else [generate_cover(x) for x in valid_videos]
+ for vpath, tpath in zip(valid_videos, thumbs, strict=True):
+ video_info = parse_media_info(vpath)
+ thumb = valid_thumb if (valid_thumb := validate_img(tpath)) else None
+ done_videos.append({"video": vpath.as_posix(), "width": video_info["width"], "height": video_info["height"], "duration": video_info["duration"], "thumb": thumb})
+ # Step-3: Audios
+ done_audios = []
+ for data in done_videos:
+ if not data.get("audio"):
+ done_audios.append(data)
+ continue
+ audio_path = Path(data.get("audio", ""))
+ if not is_valid_video_or_audio(audio_path):
+ logger.warning(f"Audio is invalid: {audio_path}")
+ continue
+ audio_info = parse_media_info(audio_path)
+ thumb = data.get("thumb") # thumb is provided
+ thumb = valid_thumb if (valid_thumb := validate_img(thumb)) else generate_cover(data["audio"])
+ done_audios.append(
+ {
+ "audio": audio_path,
+ "performer": data.get("performer", "Performer"),
+ "title": data.get("title", audio_path.stem),
+ "duration": audio_info["duration"],
+ "thumb": thumb if validate_img(thumb) else None,
+ }
+ )
+
+ logger.debug(f"Filtered out {num_before - len(done_audios)} invalid media files")
+ logger.trace(f"{len(done_audios)} media info after preprocess: {done_audios}")
+ return done_audios
+
+
+def warp_media_group(media: list[dict], caption: str = "") -> list:
+    """Wrap media files into a list of media group objects.
+
+ item in media:
+ {
+ "photo": "path/to/photo.jpg",
+ }
+ {
+ "video": "path/to/video.mp4",
+ "width": int,
+ "height": int,
+ "duration": int,
+ "thumb": "path/to/thumbnail.jpg" | None,
+ }
+ {
+ "audio": "path/to/audio.mp3",
+ "performer": str,
+ "title": str,
+ "duration": int,
+ "thumb": "path/to/thumbnail.jpg" | None,
+ }
+ {"document": "path/to/document.pdf"}
+ """
+ group = []
+ if len(media) < 2:
+ logger.error(f"Media group requires at least 2 items, number of media: {len(media)}")
+ return []
+ if len(caption) > CAPTION_LENGTH:
+ logger.warning(f"Caption too long, length: {len(caption)}, caption: {caption}")
+ caption = caption[:CAPTION_LENGTH]
+ if len(media) > 10:
+ logger.warning(f"Too many media files, number of media: {len(media)}")
+ media = media[:10]
+ # add caption to the first item
+ if media[0].get("photo"):
+ group.append(InputMediaPhoto(media[0]["photo"], caption=caption))
+ elif media[0].get("video"):
+ media[0]["media"] = media[0].pop("video")
+ group.append(InputMediaVideo(caption=caption, **media[0]))
+ elif media[0].get("audio"):
+ media[0]["media"] = media[0].pop("audio")
+ group.append(InputMediaAudio(caption=caption, **media[0]))
+ elif media[0].get("document"):
+ group.append(InputMediaDocument(media[0]["document"], caption=caption))
+ # DO NOT add captions for remaining media
+ for x in media[1:]:
+ if x.get("photo"):
+ group.append(InputMediaPhoto(x["photo"]))
+ elif x.get("video"):
+ x["media"] = x.pop("video")
+ group.append(InputMediaVideo(**x))
+ elif x.get("audio"):
+ x["media"] = x.pop("audio")
+ group.append(InputMediaAudio(**x))
+ elif x.get("document"):
+ group.append(InputMediaDocument(x["document"]))
+ return group
src/messages/sender.py
@@ -6,182 +6,15 @@ from pathlib import Path
from loguru import logger
from pyrogram.client import Client
-from pyrogram.types import InputMediaPhoto, InputMediaVideo, Message, ReplyParameters
+from pyrogram.types import Message, ReplyParameters
from config import CAPTION_LENGTH
+from messages.preprocess import preprocess_media, warp_media_group
from messages.progress import modify_progress, telegram_uploading
from messages.utils import get_reply_to, summay_media
-from multimedia import fix_video_rotation, generate_cover, is_valid_video, parse_media_info, split_large_video, split_long_img, validate_img
from utils import smart_split, to_int
-def warp_media_group(media: list[dict], caption: str = "") -> list:
- """Warp media files into a list of InputMediaPhoto or InputMediaVideo objects.
-
- item in media:
- {
- "photo": "path/to/photo.jpg",
- }
- or
- {
- "video": "path/to/video.mp4",
- "width": int,
- "height": int,
- "duration": int,
- "thumb": "path/to/thumbnail.jpg" | None,
- }
- """
- group = []
- if len(media) < 2:
- logger.error(f"Media group requires at least 2 items, number of media: {len(media)}")
- return []
- if len(caption) > CAPTION_LENGTH:
- logger.warning(f"Caption too long, length: {len(caption)}, caption: {caption}")
- caption = caption[:CAPTION_LENGTH]
- if len(media) > 10:
- logger.warning(f"Too many media files, number of media: {len(media)}")
- media = media[:10]
- # add caption to the first item
- if media[0].get("photo"):
- group.append(InputMediaPhoto(media[0]["photo"], caption=caption))
- elif media[0].get("video"):
- media[0]["media"] = media[0].pop("video")
- group.append(InputMediaVideo(caption=caption, **media[0]))
-
- # DO NOT add captions for remaining media
- for x in media[1:]:
- if x.get("photo"):
- group.append(InputMediaPhoto(x["photo"]))
- elif x.get("video"):
- x["media"] = x.pop("video")
- group.append(InputMediaVideo(**x))
- return group
-
-
-def preprocess_media(media: list[dict]) -> list[dict]:
- """Filter out invalid media files.
-
- - photo must be at most 10 MB in size.
- - photo's width and height must not exceed 10000 in total.
- - photo's width and height ratio must be at most 20.
- - filesize < 2GB for video
-
- Args:
- media (list[dict]): The list of media info.
- format: {"photo": "path/to/photo.jpg"}
- OR { "video": "path/to/video.mp4",
- "thumb"(optional): "path/to/thumbnail.jpg" (if thumb is passed, use it. Otherwise, generate a new one)
- }
-
- Returns:
- list[dict]: The filtered media info.
- {"photo": "path/to/photo.jpg"}
- or
- {
- "video": "path/to/video.mp4",
- "width": int,
- "height": int,
- "duration": int,
- "thumb": "path/to/thumbnail.jpg" | None,
- }
- """
- num_before = len(media)
- logger.trace(f"{num_before} media info before preprocess: {media}")
- results = []
-
- # Step-1: Photos
- step1_res = []
- for data in media:
- if photo_path := data.get("photo"):
- valid_photos = [validate_img(photo) for photo in split_long_img(photo_path) if validate_img(photo)]
- step1_res.extend({"photo": valid_photo} for valid_photo in valid_photos)
- continue
- step1_res.append(data) # other type
-
- # Step-2: Videos
- for data in step1_res:
- thumb = data.get("thumb") # thumb is provided
- if video_path := data.get("video"):
- video_path = fix_video_rotation(video_path)
- if not is_valid_video(video_path):
- logger.warning(f"Video is invalid: {video_path}")
- continue
-
- # split large video files ( < 2GB)
- valid_videos = [x for x in split_large_video(video_path) if is_valid_video(x)]
-
- # generate thumbnails for each video if thumb is not provided
- thumbs = [valid_thumb for _ in valid_videos] if (valid_thumb := validate_img(thumb)) else [generate_cover(x) for x in valid_videos]
- for vpath, tpath in zip(valid_videos, thumbs, strict=True):
- video_info = parse_media_info(vpath)
- thumb = valid_thumb if (valid_thumb := validate_img(tpath)) else None
- results.append({"video": vpath.as_posix(), "width": video_info["width"], "height": video_info["height"], "duration": video_info["duration"], "thumb": thumb})
- continue
- results.append(data) # other type
-
- logger.debug(f"Filtered out {num_before - len(results)} invalid media files")
- logger.trace(f"{len(results)} media info after preprocess: {results}")
- return results
-
-
-async def send_texts(
- client: Client,
- target_chat: int | str,
- reply_parameters: ReplyParameters,
- *,
- texts: str = "",
- cooldown: float = 0,
-) -> list[Message | None]:
- sent_messages: list[Message | None] = []
- logger.trace(f"Sending {len(texts)} texts only")
- for idx, msg in enumerate(smart_split(texts.strip())):
- if not msg:
- continue
- if idx == 0:
- sent_messages.append(await client.send_message(target_chat, msg, reply_parameters=reply_parameters))
- else: # disbale reply
- sent_messages.append(await client.send_message(target_chat, msg, reply_parameters=ReplyParameters()))
- await asyncio.sleep(cooldown)
- return sent_messages
-
-
-async def send_single_media(
- client: Client,
- target_chat: int | str,
- reply_parameters: ReplyParameters,
- *,
- media: dict,
- texts: str = "",
- cooldown: float = 0,
- **kwargs,
-) -> list[Message | None]:
- sent_messages: list[Message | None] = []
- logger.trace(f"Sending single media with {len(texts)} texts")
- caption = smart_split(texts, CAPTION_LENGTH)[0]
- remaining_texts = texts.removeprefix(caption)
- if photo := media.get("photo"):
- sent_messages.append(await client.send_photo(chat_id=target_chat, photo=photo, caption=caption, reply_parameters=reply_parameters))
- elif video := media.get("video"):
- sent_messages.append(
- await client.send_video(
- chat_id=target_chat,
- reply_parameters=reply_parameters,
- caption=caption,
- progress=telegram_uploading,
- progress_args=(kwargs.get("progress", False), video, kwargs.get("detail_progress", True)),
- **media,
- )
- )
- if remaining_texts:
- sent_messages.extend(await send_texts(client, target_chat, ReplyParameters(), texts=remaining_texts, cooldown=cooldown))
-
- for key in ["path", "thumb", "audio", "photo", "video"]:
- if media.get(key) and Path(media[key]).is_file():
- logger.trace(f"Deleting: {media[key]}")
- Path(media[key]).unlink(missing_ok=True)
- return sent_messages
-
-
async def send2tg(
client: Client,
message: Message,
@@ -290,3 +123,83 @@ async def send2tg(
logger.trace(f"Deleting: {x[key]}")
Path(x[key]).unlink(missing_ok=True)
return sent_messages
+
+
+async def send_texts(
+ client: Client,
+ target_chat: int | str,
+ reply_parameters: ReplyParameters,
+ *,
+ texts: str = "",
+ cooldown: float = 0,
+) -> list[Message | None]:
+ sent_messages: list[Message | None] = []
+ logger.trace(f"Sending {len(texts)} texts only")
+ for idx, msg in enumerate(smart_split(texts.strip())):
+ if not msg:
+ continue
+ if idx == 0:
+ sent_messages.append(await client.send_message(target_chat, msg, reply_parameters=reply_parameters))
+        else:  # disable reply
+ sent_messages.append(await client.send_message(target_chat, msg, reply_parameters=ReplyParameters()))
+ await asyncio.sleep(cooldown)
+ return sent_messages
+
+
+async def send_single_media(
+ client: Client,
+ target_chat: int | str,
+ reply_parameters: ReplyParameters,
+ *,
+ media: dict,
+ texts: str = "",
+ cooldown: float = 0,
+ **kwargs,
+) -> list[Message | None]:
+ sent_messages: list[Message | None] = []
+ logger.trace(f"Sending single media with {len(texts)} texts")
+ caption = smart_split(texts, CAPTION_LENGTH)[0]
+ remaining_texts = texts.removeprefix(caption)
+ if photo := media.get("photo"):
+ sent_messages.append(await client.send_photo(chat_id=target_chat, photo=photo, caption=caption, reply_parameters=reply_parameters))
+ elif video := media.get("video"):
+ sent_messages.append(
+ await client.send_video(
+ chat_id=target_chat,
+ reply_parameters=reply_parameters,
+ caption=caption,
+ progress=telegram_uploading,
+ progress_args=(kwargs.get("progress", False), video, kwargs.get("detail_progress", True)),
+ **media,
+ )
+ )
+ elif audio := media.get("audio"):
+ sent_messages.append(
+ await client.send_audio(
+ chat_id=target_chat,
+ reply_parameters=reply_parameters,
+ caption=caption,
+ progress=telegram_uploading,
+ progress_args=(kwargs.get("progress", False), audio, kwargs.get("detail_progress", True)),
+ **media,
+ )
+ )
+ elif document := media.get("document"):
+ sent_messages.append(
+ await client.send_document(
+ chat_id=target_chat,
+ reply_parameters=reply_parameters,
+ caption=caption,
+ progress=telegram_uploading,
+ progress_args=(kwargs.get("progress", False), document, kwargs.get("detail_progress", True)),
+ **media,
+ )
+ )
+ if remaining_texts:
+ sent_messages.extend(await send_texts(client, target_chat, ReplyParameters(), texts=remaining_texts, cooldown=cooldown))
+
+ for key in ["path", "thumb", "audio", "photo", "video"]:
+ if media.get(key) and Path(media[key]).is_file():
+ logger.trace(f"Deleting: {media[key]}")
+ Path(media[key]).unlink(missing_ok=True)
+ return sent_messages
src/others/download_external.py
@@ -15,7 +15,7 @@ from messages.parser import parse_msg
from messages.progress import modify_progress
from messages.sender import send2tg
from messages.utils import equal_prefix, get_reply_to, startswith_prefix
-from multimedia import is_valid_video, validate_img
+from multimedia import is_valid_video_or_audio, validate_img
from networking import download_file
from utils import https_url, readable_size, to_int
@@ -71,7 +71,7 @@ async def download_url_in_message(client: Client, message: Message, **kwargs):
elif Path(path).suffix in [".m4a", ".mp3", ".wav", ".ogg", ".opus", ".flac", ".aac"]:
await modify_progress(text=f"π§ι³ι’δΈθ½½ζε: {readable_size(path=path)}", force_update=True, **kwargs)
success = await client.send_audio(target_chat, Path(path).as_posix(), caption=url, reply_parameters=reply_parameters)
- elif is_valid_video(path, delete=False):
+ elif is_valid_video_or_audio(path, delete=False):
await modify_progress(text=f"π¬θ§ι’δΈθ½½ζε: {readable_size(path=path)}", force_update=True, **kwargs)
success = await send2tg(client, message, target_chat, reply_msg_id, texts=url, media=[{"video": path}])
elif Path(path).stat().st_size < MAX_FILE_BYTES:
src/preview/instagram.py
@@ -16,7 +16,7 @@ from messages.database import copy_messages_from_db, save_messages
from messages.progress import modify_progress
from messages.sender import send2tg
from messages.utils import summay_media
-from multimedia import is_valid_video, validate_img
+from multimedia import is_valid_video_or_audio, validate_img
from networking import download_file, download_media, hx_req
from utils import true
@@ -157,7 +157,7 @@ async def preview_ddinstagram(client: Client, message: Message, url: str, post_t
if video_url:
raw_url = f"{API.DDINSTAGRAM}{video_url}"
media["video"] = await download_file(raw_url, path=f"{DOWNLOAD_DIR}/{post_id}.mp4", workers_proxy=True, **kwargs)
- if not is_valid_video(media["video"]):
+ if not is_valid_video_or_audio(media["video"]):
await send_to_social_media_bridge(client, message, text=url, **kwargs)
return
src/preview/ytdlp.py
@@ -20,8 +20,9 @@ from yt_dlp.utils import DownloadError, ExtractorError, YoutubeDLError
from config import API, CAPTION_LENGTH, DB, DOWNLOAD_DIR, MAX_FILE_BYTES, PROVIDER, PROXY, TID, TOKEN, cache
from database import get_db
from messages.database import copy_messages_from_db, save_messages
+from messages.preprocess import preprocess_media
from messages.progress import modify_progress, telegram_uploading
-from messages.sender import preprocess_media, send2tg
+from messages.sender import send2tg
from messages.utils import get_reply_to
from multimedia import generate_cover
from networking import hx_req
src/multimedia.py
@@ -404,7 +404,7 @@ def validate_img(path: str | Path | None, *, delete: bool = True) -> str:
return path.as_posix() if path.is_file() else ""
-def is_valid_video(path: str | Path | None, *, delete: bool = True) -> bool:
+def is_valid_video_or_audio(path: str | Path | None, *, delete: bool = True) -> bool:
"""Check if the video is valid."""
if parse_media_info(path):
return True
@@ -436,6 +436,6 @@ def fix_video_rotation(path: str | Path | None) -> Path:
if __name__ == "__main__":
# print(convert_to_h264("~/tests/test.mov"))
- # is_valid_video("~/tests/test.jpg")
+ # is_valid_video_or_audio("~/tests/test.jpg")
# convert_img_match_telegram_rules("~/tests/test.large.jpg")
print(convert_img_to_telegram_format("~/tests/test.heic"))