main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import asyncio
4import contextlib
5import json
6import math
7from pathlib import Path
8
9from ffmpeg import FFmpeg, FFmpegError, Progress
10from ffmpeg.asyncio import FFmpeg as FFmpegAsync
11from loguru import logger
12from PIL import Image
13
14from config import MAX_FILE_BYTES
15from utils import readable_size, seconds_to_time
16
17
async def parse_media_info(path: str | Path | None) -> dict:
    """Probe a media file with ``ffprobe`` and summarise its streams.

    Args:
        path: media file path; ``~`` is expanded and the path is resolved.

    Returns:
        dict: empty when the file does not exist or when probing/parsing fails
        (including files that expose no streams at all). Otherwise it holds
        ``name``, ``path``, ``duration`` (floored seconds), ``raw_duration``,
        ``width``, ``height``, ``audio_codec``, ``video_codec``, ``rotation``,
        ``filesize`` and ``ctts_invalid``.
    """
    if path is None or not Path(path).expanduser().resolve().is_file():
        logger.error(f"File not found: {path}")
        return {}
    path = Path(path).expanduser().resolve()
    logger.trace(f"Parsing media info: {path.name} [{readable_size(path=path)}]")
    info = {}
    try:
        cmd = ["ffprobe", "-show_streams", "-print_format", "json", path.as_posix()]
        process = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
        stdout_data, stderr_data = await process.communicate()
        metadata = json.loads(stdout_data)
        streams = metadata.get("streams", [])
        audio_stream = next((x for x in streams if x.get("codec_name") and x.get("codec_type", "") == "audio"), {})
        video_stream = next((x for x in streams if x.get("codec_name") and x.get("codec_type", "") == "video"), {})
        # Use the longest duration of any stream (some files embed it in the subtitle stream only).
        # With no streams max() raises on purpose: the except below then reports the file as unparsable.
        duration = max(float(x.get("duration", 0)) for x in streams)
        width = video_stream.get("width", "0")
        height = video_stream.get("height", "0")
        side_data = video_stream.get("side_data_list", [])
        # The rotation can sit in any side-data entry, not necessarily the first one.
        rotation = next((entry["rotation"] for entry in side_data if "rotation" in entry), 0)
        info = {
            "name": path.stem,
            "path": path.as_posix(),
            "duration": math.floor(duration),
            "raw_duration": duration,
            "width": round(float(width)),
            "height": round(float(height)),
            "audio_codec": audio_stream.get("codec_name", ""),
            "video_codec": video_stream.get("codec_name", ""),
            "rotation": round(float(rotation)),
            "filesize": readable_size(path=path),
            # ffprobe reports broken composition-time tables on stderr
            "ctts_invalid": "ctts invalid" in stderr_data.decode(errors="ignore").lower(),
        }
    except Exception as e:
        logger.error(f"Failed to parse media file info: {e}")
    logger.debug(f"Parsed media info: {info}")
    return info
58
59
60def split_long_img(path: str | Path | None, max_height: float = 2500, max_ratio: float = 3, target_ratio: float = 2.17, overlap: float = 15, *, delete: bool = True) -> list[Path]:
61 if path is None or not Path(path).expanduser().resolve().is_file():
62 return []
63 path = Path(path).expanduser().resolve()
64 logger.debug(f"Checking long image: {path.name} [{readable_size(path=path)}]")
65 photos = []
66 path = convert_img_to_telegram_format(path, delete=delete)
67 try:
68 img = Image.open(path)
69 img_width, img_height = img.size
70 hw_ratio = img_height / img_width
71 logger.trace(f"Height: {img_height}, Width: {img_width}, H/W Ratio: {hw_ratio:.2f}")
72 if img_height <= float(max_height) or hw_ratio <= float(max_ratio):
73 photos.append(path)
74 else:
75 logger.warning(f"Long image detected: {path.name}, Splitting ...")
76 # Calculate the effective number of splits
77 split_height = round(img_width * target_ratio)
78 step = split_height - overlap
79 num_splits = math.ceil((img_height - overlap) / step)
80
81 # Adjust step to distribute remaining height across splits
82 total_overlap = (num_splits - 1) * overlap
83 adjusted_step = round((img_height - total_overlap) / num_splits)
84 logger.debug(f"Splitting {path} into {num_splits} splits!")
85
86 for idx in range(num_splits):
87 top = idx * (adjusted_step)
88 bottom = top + split_height
89 if bottom > img_height: # Adjust for the last split
90 bottom = img_height
91 top = max(0, bottom - split_height)
92 box = (0, top, img_width, bottom)
93 cropped_image = img.crop(box)
94 logger.trace(cropped_image)
95 save_path = Path(path).with_suffix(f".s{idx}.jpg")
96 cropped_image.convert("RGB").save(save_path)
97 photos.append(save_path)
98 logger.debug(f"split {idx} saved to {save_path}")
99 if delete:
100 path.unlink(missing_ok=True)
101 img.close()
102 except Exception as e:
103 logger.error(f"Failed to split long image: {e}")
104 return [path]
105 return photos
106
107
108async def split_large_video(path: str | Path | None, *, delete: bool = True) -> list[Path]:
109 if path is None or not Path(path).expanduser().resolve().is_file():
110 return []
111 path = Path(path).expanduser().resolve()
112 logger.trace(f"Checking large video: {path.name} [{readable_size(path=path)}]")
113 file_size = path.stat().st_size
114 if file_size <= MAX_FILE_BYTES:
115 logger.trace(f"Video is already under 2GB limit: {path.name}")
116 return [path]
117 split_size = MAX_FILE_BYTES - 20 * 1024 * 1024 # reduce a little bit (50MB)
118 videos = []
119
120 num_split = (file_size // split_size) + 1
121 logger.warning(f"Split video file: {path.name} into {num_split} parts.")
122 start_time = 0
123 for idx in range(num_split):
124 out_path = path.with_stem(f"{path.stem}_{idx + 1:02}")
125 try:
126 logger.debug(f"Splitting P{idx + 1}: {path.name} -> {out_path.name}")
127 ffmpeg = FFmpegAsync().option("y").input(path, ss=f"{start_time * 1000:.0f}ms").output(out_path, acodec="copy", vcodec="copy", fs=split_size)
128 await ffmpeg.execute()
129 if probe := await parse_media_info(out_path):
130 videos.append(out_path)
131 start_time += probe["duration"]
132 except Exception as e:
133 logger.error(f"Failed to split P{idx + 1}: {path.name} -> {out_path.name} : {e}")
134 if delete:
135 path.unlink(missing_ok=True)
136 return videos
137
138
139async def convert_to_h264(
140 path: str | Path | None,
141 *,
142 allow_re_encoding: bool = False,
143 force_re_encoding: bool = False,
144 max_file_size: int = 0,
145 skip_h264: bool = False,
146 audio_codec: str = "aac",
147 ext: str = "mp4",
148 delete: bool = True,
149) -> Path:
150 """Convert video to H264 format.
151
152 Args:
153 path (str | Path | None): video file path
154 allow_re_encoding (bool, optional): re-encode video. Defaults to False.
155 force_re_encoding (bool, optional): force re-encode video. Defaults to False.
156 max_file_size (int, optional): limit the max file size for re-encoding. Defaults to 0 (no limit).
157 skip_h264 (bool, optional): skip conversion if video is already H264. Defaults to False.
158 audio_codec (str, optional): audio codec used in re-encoding. Defaults to "aac".
159 ext (str, optional): output format. Defaults to "mp4".
160 delete (bool, optional): delete original file. Defaults to True.
161
162 Returns:
163 Path: output video path
164 """
165 if path is None or not Path(path).expanduser().resolve().is_file():
166 return Path("")
167 path = Path(path).expanduser().resolve()
168 logger.debug(f"Checking H264 codec: {path.name}")
169 info = await parse_media_info(path)
170 tmp_path = path.with_suffix(f".tmp.{ext}")
171 mp4_path = path.with_suffix(f".h264.{ext}")
172 success = True
173 if info["video_codec"] == "h264":
174 if skip_h264:
175 logger.debug(f"Video is already H264, skip conversion: {path.name}")
176 return path
177 logger.debug("Video is already H264, skip re-encoding")
178 allow_re_encoding = False
179 if max_file_size > 0 and path.stat().st_size > max_file_size:
180 logger.warning(f"Video file size is too large: {path.stat().st_size}, skip re-encoding")
181 allow_re_encoding = False
182
183 try:
184 if not allow_re_encoding and not force_re_encoding:
185 logger.debug(f"Convert video to H264 (copy): {path.name} -> {tmp_path.name}")
186 ffmpeg = FFmpegAsync().option("y").input(path).output(tmp_path, codec="copy", movflags="+faststart", f=ext)
187 await ffmpeg.execute()
188 else:
189 logger.warning(f"Convert video to H264 (re-encoding): {path.name} -> {tmp_path.name}")
190 ffmpeg = FFmpegAsync().option("y").input(path).output(tmp_path, acodec=audio_codec, vcodec="libx264", f=ext)
191
192 @ffmpeg.on("progress")
193 def on_progress(p: Progress):
194 logger.trace(f"Converted time: {seconds_to_time(p.time.seconds)}, size: {readable_size(p.size)}, speed: {p.speed}")
195
196 @ffmpeg.on("completed")
197 def on_completed():
198 logger.debug("completed")
199
200 await ffmpeg.execute()
201 if delete:
202 path.unlink(missing_ok=True)
203 tmp_path.rename(mp4_path)
204 except FFmpegError as e:
205 logger.error(f"Failed to convert mp4 {path.name}: {e.message}")
206 success = False
207 except Exception as e:
208 logger.error(f"Failed to convert mp4 {path.name}: {e}")
209 success = False
210 finally: # always delete tmp file
211 tmp_path.unlink(missing_ok=True)
212 if success:
213 # delete original file
214 if delete:
215 path.unlink(missing_ok=True)
216 return mp4_path if mp4_path.is_file() else path
217 return path
218
219
220async def convert_to_audio(path: str | Path | None, ext: str = "m4a", *, codec: str = "aac", delete: bool = True, **kwargs) -> Path:
221 if path is None or not Path(path).expanduser().resolve().is_file():
222 return Path("")
223 path = Path(path).expanduser().resolve()
224 logger.debug(f"Converting to audio {ext}: {path.name}")
225 info = await parse_media_info(path)
226 tmp_path = path.with_suffix(f".tmp.{ext}")
227 final_path = path.with_suffix(f".final.{ext}")
228 success = True
229 try:
230 if info["audio_codec"] == codec:
231 logger.debug(f"Audio stream is already {codec}, without re-encoding: {path.name} -> {tmp_path.name}")
232 ffmpeg = FFmpegAsync().option("y").input(path).output(tmp_path, vn=None, acodec="copy", **kwargs)
233 await ffmpeg.execute()
234 else:
235 logger.warning(f"Re-encoding audio: {path.name} -> {tmp_path.name}")
236 ffmpeg = FFmpegAsync().option("y").input(path).output(tmp_path, vn=None, acodec=codec, **kwargs)
237
238 @ffmpeg.on("progress")
239 def on_progress(p: Progress):
240 logger.trace(f"Converted time: {seconds_to_time(p.time.seconds)}, size: {readable_size(p.size)}, speed: {p.speed}")
241
242 @ffmpeg.on("completed")
243 def on_completed():
244 logger.success(f"Converted audio: {path} to {final_path}, {codec=}")
245
246 await ffmpeg.execute()
247 if delete:
248 path.unlink(missing_ok=True)
249 tmp_path.rename(final_path)
250 except FFmpegError as e:
251 logger.error(f"Failed to convert m4a {path.name}: {e.message}")
252 success = False
253 except Exception as e:
254 logger.error(f"Failed to convert m4a {path.name}: {e}")
255 success = False
256 finally: # always delete tmp file
257 tmp_path.unlink(missing_ok=True)
258 if success:
259 if delete:
260 path.unlink(missing_ok=True)
261 return final_path if final_path.is_file() else path
262 return path
263
264
def generate_cover(path: Path | str) -> str:
    """Generate cover image base on media file path.

    Lookup order: an existing ``.jpg``/``.jpeg`` file next to the media file,
    then an existing ``.webp``/``.png``/``.heic``/``.bmp`` sidecar passed
    through ``convert_img_to_telegram_format``, and finally the first frame of
    the media file extracted with FFmpeg.

    Args:
        path (Path): media file path

    Returns:
        str: cover path, or "" when the media file is missing or no cover
        could be produced
    """
    logger.debug(f"Generate cover for: {path}")
    # Expand "~" once so the existence check, the sidecar lookups and FFmpeg
    # all operate on the same location.
    media_path = Path(path).expanduser()
    if not media_path.resolve().is_file():
        return ""
    jpg_path = media_path.with_suffix(".jpg")
    for ext in (".jpg", ".jpeg"):
        cover_path = media_path.with_suffix(ext)
        if cover_path.is_file():
            logger.debug(f"JPG cover image already exists: {cover_path.as_posix()}")
            return cover_path.as_posix()
    for ext in (".webp", ".png", ".heic", ".bmp"):
        cover_path = media_path.with_suffix(ext)
        if cover_path.is_file():
            # NOTE(review): .png/.bmp are returned unchanged by the converter, so the
            # result is not always a JPG -- confirm callers accept that.
            converted = convert_img_to_telegram_format(cover_path)
            logger.debug(f"Converted cover image: {cover_path.name} -> {converted.name}")
            return converted.as_posix()

    logger.debug(f"Generate cover image from the first frame of {path}")
    try:
        ffmpeg = FFmpeg().option("y").option("loglevel", "warning").input(media_path).output(jpg_path, vframes=1)
        ffmpeg.execute()
    except Exception as e:
        # Report the cause instead of swallowing it silently.
        logger.error(f"Failed to generate cover image for: {path}: {e}")
        return ""
    return jpg_path.as_posix() if jpg_path.is_file() else ""
300
301
302def convert_jpg_via_pillow(path: str | Path | None, *, delete: bool = True) -> tuple[bool, Path]:
303 """Returns: is_success, out_path."""
304 if path is None or not Path(path).expanduser().resolve().is_file():
305 return False, Path("")
306 path = Path(path).expanduser().resolve()
307
308 if path.suffix.lower() == ".heic":
309 try:
310 from pillow_heif import register_heif_opener # type: ignore
311 except ModuleNotFoundError:
312 logger.warning("Please install 'pillow_heif' package for PIL's heic support.")
313 logger.warning("Docs: https://pillow-heif.readthedocs.io/en/latest/installation.html")
314 return False, path
315 register_heif_opener()
316
317 save_path = path.with_suffix(".jpg")
318 logger.debug(f"Converting {path.name} -> {save_path.name}")
319 try:
320 img = Image.open(path)
321 img.convert("RGB").save(save_path)
322 img.close()
323 except Exception as e:
324 logger.error(f"Failed convert {path.name} -> {save_path.name}: {e}")
325 return False, path
326 if delete:
327 path.unlink(missing_ok=True)
328 return True, save_path
329
330
331def convert_jpg_via_ffmpeg(path: str | Path | None, *, delete: bool = True) -> tuple[bool, Path]:
332 """Returns: is_success, out_path."""
333 if path is None or not Path(path).expanduser().resolve().is_file():
334 return False, Path("")
335 path = Path(path).expanduser().resolve()
336 save_path = path.with_suffix(".jpg")
337 logger.debug(f"Converting {path.name} -> {save_path.name}")
338 try:
339 ffmpeg = FFmpeg().option("y").option("loglevel", "warning").input(path).output(save_path, vframes=1)
340 ffmpeg.execute()
341 except Exception as e:
342 logger.error(f"Failed convert {path.name} -> {save_path.name}: {e}")
343 return False, path
344 if delete:
345 path.unlink(missing_ok=True)
346 return True, save_path
347
348
349def convert_img_to_telegram_format(path: str | Path | None, *, delete: bool = True) -> Path:
350 if path is None or not Path(path).expanduser().resolve().is_file():
351 return Path("")
352 path = Path(path).expanduser().resolve()
353 if path.suffix.lower() in [".jpg", ".jpeg", ".png", ".bmp", ".gif"]:
354 return path
355
356 success, out_path = convert_jpg_via_pillow(path, delete=delete)
357 if success:
358 logger.success(f"Converted {path.name} via PIL: {out_path.name}")
359 return out_path
360 logger.warning(f"Failed to convert {path.name} via PIL, try FFmpeg ...")
361
362 success, out_path = convert_jpg_via_ffmpeg(path, delete=delete)
363 if success:
364 logger.success(f"Converted {path.name} via FFmpeg: {out_path.name}")
365 return out_path
366 logger.error(f"Failed to convert {path.name} via FFmpeg.")
367
368 return path
369
370
371def convert_img_match_telegram_rules(path: str | Path, num_bytes: int = 10485760, wh_total: int = 10000, max_ratio: float = 20, *, delete: bool = True) -> Path:
372 """Convert image to meet Telegram photo requirements.
373
374 1. photo must be at most 10 MB in size.
375 2. photo's width and height must not exceed 10000 in total.
376 3. photo's width and height ratio must be at most 20.
377 """
378 path = Path(path).expanduser().resolve()
379 if not path.is_file():
380 return Path("")
381 min_ratio = 1 / max_ratio
382 filesize = path.stat().st_size
383 save_path = path.with_stem(f"{path.stem}X")
384 try:
385 img = Image.open(path)
386 width, height = img.size
387 ratio = width / height
388 logger.trace(f"{path.name}: {width}x{height} (r={ratio:.2f}), {filesize} bytes ({readable_size(path=path)})")
389 if filesize < num_bytes and width + height < wh_total and min_ratio < ratio < max_ratio:
390 logger.debug(f"Image is already under limit: {path.name}")
391 img.close()
392 return path
393 new_width = round(0.85 * width)
394 if ratio > max_ratio:
395 new_height = round(new_width / max_ratio)
396 elif ratio < min_ratio:
397 new_height = round(new_width * min_ratio)
398 else:
399 new_height = round(new_width / ratio)
400
401 img = img.resize((new_width, new_height))
402 img.save(save_path)
403 img.close()
404 except Exception as e:
405 logger.error(f"Failed to reduce image size: {path}, {e}")
406 return path
407 if delete:
408 path.unlink(missing_ok=True)
409 logger.debug(f"Reduced image size: {save_path.stat().st_size} bytes ({readable_size(path=save_path)})")
410 return convert_img_match_telegram_rules(save_path, num_bytes, wh_total, max_ratio, delete=delete)
411
412
def validate_img(path: str | Path | None, *, delete: bool = True) -> str:
    """Check if the image is valid.

    0. format must be in ["heic", "jpg", "jpeg", "png", "webp", "bmp", "gif"]
    1. photo must be at most 10 MB in size.
    2. photo's width and height must not exceed 10000 in total.
    3. photo's width and height ratio must be at most 20.

    The image is converted to a sendable format first, verified with Pillow,
    then reduced until it meets the limits above; the reduced file takes the
    place of the converted one.

    Args:
        path: image file path.
        delete: forwarded to the conversion helpers; also removes an image
            that turns out to be broken.

    Returns:
        str: posix path of the valid image, or "" when the image is missing,
        of an unsupported format, or broken.
    """
    if path is None or not Path(path).expanduser().resolve().is_file():
        logger.warning(f"Image path not found: {path}")
        return ""
    path = Path(path).expanduser().resolve()
    logger.trace(f"Checking image: {path.name} [{readable_size(path=path)}]")
    # Telegram support image format: JPEG, PNG, BMP, and GIF
    # For other formats, we should convert them to .jpg
    if path.suffix.lower() not in [".heic", ".jpg", ".jpeg", ".png", ".webp", ".bmp", ".gif"]:
        logger.warning(f"Invalid image format: {path.name}")
        return ""
    path = convert_img_to_telegram_format(path, delete=delete)

    if not path.is_file():
        logger.warning(f"Invalid image: {path}")
        return ""
    try:
        # Close the handle even when verify() raises, so the unlink below cannot
        # be blocked by an open file.
        with Image.open(path) as img:
            img.verify()
        valid_path = convert_img_match_telegram_rules(path, delete=delete)
        # Only move a real, different file back; the helper returns Path("") when
        # its input is gone, and that must never be renamed onto the image.
        if valid_path.is_file() and valid_path.as_posix() != path.as_posix():
            # replace() overwrites an existing target on every platform
            valid_path.replace(path)
    except Exception as e:
        logger.error(f"Broken image: {path}, {e}")
        if delete:
            path.unlink(missing_ok=True)
        return ""
    return path.as_posix() if path.is_file() else ""
449
450
async def is_valid_video_or_audio(path: str | Path | None, *, delete: bool = True) -> bool:
    """Check if the video (or audio) file can be parsed by ``parse_media_info``.

    Args:
        path: media file path.
        delete: remove the file when it turns out to be invalid.

    Returns:
        bool: True when media info could be parsed, False otherwise.
    """
    if await parse_media_info(path):
        return True

    logger.error(f"Invalid video: {path}")
    if delete and path is not None:
        # Expand "~" like the probe does, and only ever unlink a regular file:
        # a None path must not become the literal file "None", and a directory
        # would make unlink() raise.
        target = Path(path).expanduser()
        if target.is_file():
            target.unlink(missing_ok=True)
    return False
460
461
462async def fix_video_rotation(path: str | Path | None) -> Path:
463 """Fix video rotation for iOS devcies.
464
465 Some videos (Weibo's livephotos) are displayed in the wrong direction on the Telegram iOS client.
466 """
467 if path is None or not Path(path).expanduser().resolve().is_file():
468 return Path("")
469 path = Path(path).expanduser().resolve()
470 logger.trace(f"Checking video rotation: {path.name} [{readable_size(path=path)}]")
471 probe_info = await parse_media_info(path)
472 if not probe_info: # video can't parse by ffprobe
473 logger.warning(f"Invalid video: {path}")
474 return path
475 if probe_info.get("rotation") in [-90, 90]:
476 logger.warning(f"Fixing video rotation from {probe_info['height']}x{probe_info['width']}")
477 path = await convert_to_h264(path, allow_re_encoding=True)
478 return path
479
480
481async def fix_ctts_invalid(path: str | Path | None) -> Path:
482 """Convert CTTS invalid video to H264.
483
484 CTTS invalid videos can't be played on Telegram iOS client.
485 """
486 if path is None or not Path(path).expanduser().resolve().is_file():
487 return Path("")
488 path = Path(path).expanduser().resolve()
489 probe_info = await parse_media_info(path)
490 if not probe_info or not probe_info.get("ctts_invalid", False):
491 return path
492 logger.warning(f"Converting CTTS invalid video: {path.name}")
493 return await convert_to_h264(path, force_re_encoding=True)
494
495
if __name__ == "__main__":
    # Manual smoke test: convert a sample HEIC image and print the resulting path.
    # Other helpers that can be tried the same way:
    #   convert_to_h264("~/tests/test.mov")
    #   is_valid_video_or_audio("~/tests/test.jpg")
    #   convert_img_match_telegram_rules("~/tests/test.large.jpg")
    demo_result = convert_img_to_telegram_format("~/tests/test.heic")
    print(demo_result)