main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3from pathlib import Path
  4
  5from loguru import logger
  6from pyrogram.types import InputMediaAudio, InputMediaDocument, InputMediaPhoto, InputMediaVideo
  7
  8from config import CAPTION_LENGTH
  9from messages.utils import count_without_entities, smart_split
 10from multimedia import fix_ctts_invalid, fix_video_rotation, generate_cover, is_valid_video_or_audio, parse_media_info, split_large_video, split_long_img, validate_img
 11
 12
 13async def preprocess_media(media: list[dict]) -> list[dict]:
 14    """Filter out invalid media files.
 15
 16    - photo must be at most 10 MB in size.
 17    - photo's width and height must not exceed 10000 in total.
 18    - photo's width and height ratio must be at most 20.
 19    - filesize < 2GB for video
 20
 21    Args:
 22        media (list[dict]): The list of media info.
 23            format: {"photo": "path/to/photo.jpg"}
 24                OR  { "video": "path/to/video.mp4",
 25                      "thumb"(optional): "path/to/thumbnail.jpg" (generate a new one if unset)
 26                    }
 27                OR { "audio": "path/to/audio.mp3"
 28                    "performer"(optional): "Alice", ("Performer" if unset)
 29                    "title"(optional): "audio", (filename if unset)
 30                    "thumb"(optional): "path/to/thumbnail.jpg" (generate a new one if unset)
 31                    }
 32                OR {"document": "path/to/document.pdf"}
 33
 34    Returns:
 35        list[dict]: The filtered media info.
 36            {"photo": "path/to/photo.jpg"},
 37            {
 38                "video": "path/to/video.mp4",
 39                "width": int,
 40                "height": int,
 41                "duration": int,
 42                "thumb": "path/to/thumbnail.jpg" | None,
 43            },
 44            {
 45                "audio": "path/to/audio.mp3",
 46                "performer": str,
 47                "title": str,
 48                "duration": int,
 49                "thumb": "path/to/thumbnail.jpg" | None,
 50            }
 51            {"document": "path/to/document.pdf"}
 52    """
 53    num_before = len(media)
 54    logger.trace(f"{num_before} media info before preprocess: {media}")
 55    # Step-1: Photos
 56    done_photos = []
 57    for data in media:
 58        if not data.get("photo"):
 59            done_photos.append(data)
 60            continue
 61        if photo_path := data.get("photo"):
 62            valid_photos = [validate_img(photo) for photo in split_long_img(photo_path) if validate_img(photo)]
 63            done_photos.extend({"photo": valid_photo, "has_spoiler": data.get("has_spoiler", False)} for valid_photo in valid_photos)
 64
 65    # Step-2: Videos
 66    done_videos = []
 67    for data in done_photos:
 68        if not data.get("video"):
 69            done_videos.append(data)
 70            continue
 71        thumb = data.get("thumb")  # thumb is provided
 72        if video_path := data.get("video"):
 73            video_path = await fix_video_rotation(video_path)
 74            video_path = await fix_ctts_invalid(video_path)
 75            if not await is_valid_video_or_audio(video_path):
 76                logger.warning(f"Video is invalid: {video_path}")
 77                continue
 78
 79            # split large video files ( < 2GB)
 80            valid_videos = [x for x in await split_large_video(video_path) if await is_valid_video_or_audio(x)]
 81
 82            # generate thumbnails for each video if thumb is not provided
 83            thumbs = [valid_thumb for _ in valid_videos] if (valid_thumb := validate_img(thumb)) else [generate_cover(x) for x in valid_videos]
 84            for vpath, tpath in zip(valid_videos, thumbs, strict=True):
 85                video_info = await parse_media_info(vpath)
 86                thumb = valid_thumb if (valid_thumb := validate_img(tpath)) else None
 87                done_videos.append(
 88                    {
 89                        "video": vpath.as_posix(),
 90                        "width": video_info["width"],
 91                        "height": video_info["height"],
 92                        "duration": video_info["duration"],
 93                        "thumb": thumb,
 94                        "has_spoiler": data.get("has_spoiler", False),
 95                    }
 96                )
 97    # Step-3: Audios
 98    done_audios = []
 99    for data in done_videos:
100        if not data.get("audio"):
101            done_audios.append(data)
102            continue
103        audio_path = Path(data.get("audio", ""))
104        if not await is_valid_video_or_audio(audio_path):
105            logger.warning(f"Audio is invalid: {audio_path}")
106            continue
107        audio_info = await parse_media_info(audio_path)
108        thumb = data.get("thumb")  # thumb is provided
109        thumb = valid_thumb if (valid_thumb := validate_img(thumb)) else generate_cover(data["audio"])
110        done_audios.append(
111            {
112                "audio": audio_path,
113                "performer": data.get("performer", "Performer"),
114                "title": data.get("title", audio_path.stem),
115                "duration": audio_info["duration"],
116                "thumb": thumb if validate_img(thumb) else None,
117            }
118        )
119
120    logger.debug(f"Filtered out {num_before - len(done_audios)} invalid media files")
121    logger.trace(f"{len(done_audios)} media info after preprocess: {done_audios}")
122    return done_audios
123
124
def _to_input_media(item: dict, caption: str | None = None, *, caption_above: bool = False):
    """Build the InputMedia* object for one media dict, or None if unsupported.

    ``caption`` is attached only when it is not None. The caption-placement
    flag is forwarded for photos and videos only. The input dict is never
    modified.
    """
    text = {} if caption is None else {"caption": caption}
    placement = {} if caption is None else {"show_caption_above_media": caption_above}

    if item.get("photo"):
        # Forward the spoiler flag only when the entry actually carries one.
        spoiler = {"has_spoiler": item["has_spoiler"]} if "has_spoiler" in item else {}
        return InputMediaPhoto(item["photo"], **text, **placement, **spoiler)
    if item.get("video"):
        params = dict(item)  # shallow copy: keep the caller's dict intact
        params["media"] = params.pop("video")
        return InputMediaVideo(**text, **placement, **params)
    if item.get("audio"):
        params = dict(item)  # shallow copy: keep the caller's dict intact
        params["media"] = params.pop("audio")
        return InputMediaAudio(**text, **params)
    if item.get("document"):
        return InputMediaDocument(item["document"], **text)
    return None


async def warp_media_group(media: list[dict], caption: str = "", *, caption_above: bool = False) -> list:
    """Wrap media files into a list of media group objects.

    item in media:
    {
        "photo": "path/to/photo.jpg",
        "has_spoiler"(optional): bool,
    }
    {
        "video": "path/to/video.mp4",
        "width": int,
        "height": int,
        "duration": int,
        "thumb": "path/to/thumbnail.jpg" | None,
    }
    {
        "audio": "path/to/audio.mp3",
        "performer": str,
        "title": str,
        "duration": int,
        "thumb": "path/to/thumbnail.jpg" | None,
    }
    {"document": "path/to/document.pdf"}

    For video and audio items every key other than "video"/"audio" is passed
    to the InputMedia constructor as a keyword argument.

    Args:
        media: Media info dicts; fewer than 2 yields an empty list, more than
            10 are truncated to the first 10. The dicts are not modified.
        caption: Caption text; shortened to its first ``smart_split`` chunk
            when its entity-free length exceeds ``CAPTION_LENGTH``.
        caption_above: Forwarded as ``show_caption_above_media`` for a
            captioned photo or video.

    Returns:
        list: InputMedia* objects. The caption goes on the first item, except
        when every item is a document, in which case it goes on the last one.
    """
    if len(media) < 2:
        logger.error(f"Media group requires at least 2 items, number of media: {len(media)}")
        return []
    if await count_without_entities(caption) > CAPTION_LENGTH:
        logger.warning(f"Caption too long, length: {len(caption)}, caption: {caption}")
        caption = (await smart_split(caption, CAPTION_LENGTH))[0]
    if len(media) > 10:
        logger.warning(f"Too many media files, number of media: {len(media)}")
        media = media[:10]

    # If all media are documents, the caption is added to the last item.
    if all(x.get("document") for x in media):
        group = [InputMediaDocument(x["document"]) for x in media[:-1]]
        group.append(InputMediaDocument(media[-1]["document"], caption=caption))
        return group

    # Otherwise the caption is added to the first item of the group only.
    group = []
    for item in media:
        if group:
            wrapped = _to_input_media(item)
        else:
            wrapped = _to_input_media(item, caption, caption_above=caption_above)
        if wrapped is None:
            logger.warning(f"Unsupported media item skipped: {item}")
            continue
        group.append(wrapped)
    return group
187    return group