main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from pathlib import Path
4
5from loguru import logger
6from pyrogram.types import InputMediaAudio, InputMediaDocument, InputMediaPhoto, InputMediaVideo
7
8from config import CAPTION_LENGTH
9from messages.utils import count_without_entities, smart_split
10from multimedia import fix_ctts_invalid, fix_video_rotation, generate_cover, is_valid_video_or_audio, parse_media_info, split_large_video, split_long_img, validate_img
11
12
async def preprocess_media(media: list[dict]) -> list[dict]:
    """Filter out invalid media files and enrich the valid ones.

    Constraints the preprocessing is meant to enforce:

    - photo must be at most 10 MB in size.
    - photo's width and height must not exceed 10000 in total.
    - photo's width and height ratio must be at most 20.
    - filesize < 2GB for video

    Each input item is handled according to the first of the keys
    ``photo``, ``video``, ``audio`` that holds a truthy value; items with
    none of them (e.g. documents) are passed through untouched. The relative
    order of the input items is preserved, although one input item may expand
    into several output items (split photos / split videos) or into none
    (invalid media).

    Args:
        media (list[dict]): The list of media info.
            format: {"photo": "path/to/photo.jpg"}
            OR { "video": "path/to/video.mp4",
                 "thumb"(optional): "path/to/thumbnail.jpg" (generate a new one if unset)
               }
            OR { "audio": "path/to/audio.mp3"
                 "performer"(optional): "Alice", ("Performer" if unset)
                 "title"(optional): "audio", (filename if unset)
                 "thumb"(optional): "path/to/thumbnail.jpg" (generate a new one if unset)
               }
            OR {"document": "path/to/document.pdf"}
            Photo and video items may also carry "has_spoiler" (defaults to False).

    Returns:
        list[dict]: The filtered media info.
            {"photo": "path/to/photo.jpg", "has_spoiler": bool},
            {
                "video": "path/to/video.mp4",
                "width": int,
                "height": int,
                "duration": int,
                "thumb": "path/to/thumbnail.jpg" | None,
                "has_spoiler": bool,
            },
            {
                "audio": "path/to/audio.mp3",
                "performer": str,
                "title": str,
                "duration": int,
                "thumb": "path/to/thumbnail.jpg" | None,
            }
            {"document": "path/to/document.pdf"}
    """
    num_before = len(media)
    logger.trace(f"{num_before} media info before preprocess: {media}")

    processed: list[dict] = []
    for data in media:
        # Priority photo > video > audio mirrors the original staged pipeline.
        if data.get("photo"):
            processed.extend(_preprocess_photo(data))
        elif data.get("video"):
            processed.extend(await _preprocess_video(data))
        elif data.get("audio"):
            processed.extend(await _preprocess_audio(data))
        else:
            processed.append(data)

    # NOTE: this number can be negative when photos/videos were split into several parts.
    logger.debug(f"Filtered out {num_before - len(processed)} invalid media files")
    logger.trace(f"{len(processed)} media info after preprocess: {processed}")
    return processed


def _preprocess_photo(data: dict) -> list[dict]:
    """Split an over-long photo and keep only the pieces that pass validation."""
    has_spoiler = data.get("has_spoiler", False)
    # validate_img is called exactly once per piece; its (truthy) return value
    # is what gets stored as the photo path.
    return [
        {"photo": valid_photo, "has_spoiler": has_spoiler}
        for piece in split_long_img(data["photo"])
        if (valid_photo := validate_img(piece))
    ]


async def _preprocess_video(data: dict) -> list[dict]:
    """Repair, validate and split one video item, attaching metadata and a thumbnail to each part."""
    video_path = await fix_video_rotation(data["video"])
    video_path = await fix_ctts_invalid(video_path)
    if not await is_valid_video_or_audio(video_path):
        logger.warning(f"Video is invalid: {video_path}")
        return []

    # split large video files ( < 2GB) and drop any part that is not playable
    valid_videos = [part for part in await split_large_video(video_path) if await is_valid_video_or_audio(part)]

    # A valid caller-provided thumbnail is shared by every part;
    # otherwise a cover is generated per part.
    provided_thumb = validate_img(data.get("thumb"))
    has_spoiler = data.get("has_spoiler", False)

    results: list[dict] = []
    for vpath in valid_videos:
        cover = provided_thumb or generate_cover(vpath)
        video_info = await parse_media_info(vpath)
        results.append(
            {
                "video": vpath.as_posix(),
                "width": video_info["width"],
                "height": video_info["height"],
                "duration": video_info["duration"],
                # A generated cover may itself be unusable -> fall back to no thumbnail.
                "thumb": validate_img(cover) or None,
                "has_spoiler": has_spoiler,
            }
        )
    return results


async def _preprocess_audio(data: dict) -> list[dict]:
    """Validate one audio item and fill in performer, title, duration and thumbnail."""
    audio_path = Path(data["audio"])
    if not await is_valid_video_or_audio(audio_path):
        logger.warning(f"Audio is invalid: {audio_path}")
        return []

    audio_info = await parse_media_info(audio_path)
    # Prefer the provided thumbnail when it validates; otherwise generate a cover.
    thumb = validate_img(data.get("thumb")) or generate_cover(data["audio"])
    return [
        {
            # Return a string path, as documented and as the video branch does
            # (previously a Path object leaked out here).
            "audio": audio_path.as_posix(),
            "performer": data.get("performer", "Performer"),
            "title": data.get("title", audio_path.stem),
            "duration": audio_info["duration"],
            "thumb": thumb if validate_img(thumb) else None,
        }
    ]
123
124
async def warp_media_group(media: list[dict], caption: str = "", *, caption_above: bool = False) -> list:
    """Warp media files into a list of media group objects.

    The caption is attached to the first item only. The one exception is a
    group made up entirely of documents, where the caption is attached to
    the last item instead. The input dicts are never modified.

    item in media:
        {
            "photo": "path/to/photo.jpg",
            "has_spoiler"(optional): bool,
        }
        {
            "video": "path/to/video.mp4",
            "width": int,
            "height": int,
            "duration": int,
            "thumb": "path/to/thumbnail.jpg" | None,
        }
        {
            "audio": "path/to/audio.mp3",
            "performer": str,
            "title": str,
            "duration": int,
            "thumb": "path/to/thumbnail.jpg" | None,
        }
        {"document": "path/to/document.pdf"}

    Args:
        media (list[dict]): Media info items, typically the output of ``preprocess_media``.
        caption (str): Caption of the group; cut to its first ``smart_split`` chunk
            when it is longer than ``CAPTION_LENGTH``.
        caption_above (bool): Forwarded as ``show_caption_above_media`` for a
            captioned photo or video.

    Returns:
        list: The ``InputMedia*`` objects (at most 10), or an empty list when
            fewer than 2 media items are given.
    """
    if len(media) < 2:
        logger.error(f"Media group requires at least 2 items, number of media: {len(media)}")
        return []
    if await count_without_entities(caption) > CAPTION_LENGTH:
        logger.warning(f"Caption too long, length: {len(caption)}, caption: {caption}")
        caption = (await smart_split(caption, CAPTION_LENGTH))[0]
    if len(media) > 10:
        logger.warning(f"Too many media files, number of media: {len(media)}")
        media = media[:10]

    # If all media are documents, the caption goes on the last item.
    if all(x.get("document") for x in media):
        group = [InputMediaDocument(x["document"]) for x in media[:-1]]
        group.append(InputMediaDocument(media[-1]["document"], caption=caption))
        return group

    # Otherwise the caption goes on the first item; DO NOT add captions for remaining media.
    group = []
    for index, item in enumerate(media):
        wrapped = _build_input_media(item, caption if index == 0 else None, caption_above=caption_above)
        if wrapped is None:
            logger.warning(f"Unsupported media item skipped: {item}")
            continue
        group.append(wrapped)
    return group


def _build_input_media(item: dict, caption: str | None = None, *, caption_above: bool = False):
    """Build the InputMedia object for one item, or return None for an unrecognised item.

    ``caption=None`` means "no caption": neither ``caption`` nor
    ``show_caption_above_media`` is passed to the constructor.
    """
    text = {} if caption is None else {"caption": caption}
    placement = {} if caption is None else {"show_caption_above_media": caption_above}

    if item.get("photo"):
        # Forward the spoiler flag, as the video branch already does via **kwargs.
        return InputMediaPhoto(item["photo"], has_spoiler=item.get("has_spoiler", False), **text, **placement)
    if item.get("video"):
        # Work on a shallow copy so the caller's dict keeps its "video" key.
        kwargs = dict(item)
        kwargs["media"] = kwargs.pop("video")
        return InputMediaVideo(**text, **placement, **kwargs)
    if item.get("audio"):
        kwargs = dict(item)
        kwargs["media"] = kwargs.pop("audio")
        return InputMediaAudio(**text, **kwargs)
    if item.get("document"):
        return InputMediaDocument(item["document"], **text)
    return None
187 return group