main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import asyncio
  4import contextlib
  5import json
  6import math
  7from pathlib import Path
  8
  9from ffmpeg import FFmpeg, FFmpegError, Progress
 10from ffmpeg.asyncio import FFmpeg as FFmpegAsync
 11from loguru import logger
 12from PIL import Image
 13
 14from config import MAX_FILE_BYTES
 15from utils import readable_size, seconds_to_time
 16
 17
 18async def parse_media_info(path: str | Path | None) -> dict:
 19    """Given a media filepath, parse necessary information."""
 20    if path is None or not Path(path).expanduser().resolve().is_file():
 21        logger.error(f"File not found: {path}")
 22        return {}
 23    path = Path(path).expanduser().resolve()
 24    logger.trace(f"Parsing media info: {path.name} [{readable_size(path=path)}]")
 25    # ffprobe = FFmpegAsync(executable="ffprobe").input(path.as_posix(), print_format="json", show_streams=None)
 26    info = {}
 27    try:
 28        # metadata = json.loads(await ffprobe.execute())
 29        cmd = ["ffprobe", "-show_streams", "-print_format", "json", path.as_posix()]
 30        process = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
 31        stdout_data, stderr_data = await process.communicate()
 32        metadata = json.loads(stdout_data)
 33        streams = metadata.get("streams", [])
 34        audio_stream = next((x for x in streams if x.get("codec_name") and x.get("codec_type", "") == "audio"), {})
 35        video_stream = next((x for x in streams if x.get("codec_name") and x.get("codec_type", "") == "video"), {})
 36        durations = [x.get("duration", 0) for x in streams]  # all channels duration (some file embed the duration in subtitle stream)
 37        duration = max(map(float, durations))
 38        width = video_stream.get("width", "0")
 39        height = video_stream.get("height", "0")
 40        side_data = video_stream.get("side_data_list", [])
 41        info = {
 42            "name": path.stem,
 43            "path": path.resolve().as_posix(),
 44            "duration": math.floor(float(duration)),
 45            "raw_duration": float(duration),
 46            "width": round(float(width)),
 47            "height": round(float(height)),
 48            "audio_codec": audio_stream.get("codec_name", ""),
 49            "video_codec": video_stream.get("codec_name", ""),
 50            "rotation": round(side_data[0].get("rotation", 0)) if side_data else 0,
 51            "filesize": readable_size(path=path),
 52            "ctts_invalid": "ctts invalid" in stderr_data.decode(errors="ignore").lower(),
 53        }
 54    except Exception as e:
 55        logger.error(f"Failed to parse media file info: {e}")
 56    logger.debug(f"Parsed media info: {info}")
 57    return info
 58
 59
 60def split_long_img(path: str | Path | None, max_height: float = 2500, max_ratio: float = 3, target_ratio: float = 2.17, overlap: float = 15, *, delete: bool = True) -> list[Path]:
 61    if path is None or not Path(path).expanduser().resolve().is_file():
 62        return []
 63    path = Path(path).expanduser().resolve()
 64    logger.debug(f"Checking long image: {path.name} [{readable_size(path=path)}]")
 65    photos = []
 66    path = convert_img_to_telegram_format(path, delete=delete)
 67    try:
 68        img = Image.open(path)
 69        img_width, img_height = img.size
 70        hw_ratio = img_height / img_width
 71        logger.trace(f"Height: {img_height}, Width: {img_width}, H/W Ratio: {hw_ratio:.2f}")
 72        if img_height <= float(max_height) or hw_ratio <= float(max_ratio):
 73            photos.append(path)
 74        else:
 75            logger.warning(f"Long image detected: {path.name}, Splitting ...")
 76            # Calculate the effective number of splits
 77            split_height = round(img_width * target_ratio)
 78            step = split_height - overlap
 79            num_splits = math.ceil((img_height - overlap) / step)
 80
 81            # Adjust step to distribute remaining height across splits
 82            total_overlap = (num_splits - 1) * overlap
 83            adjusted_step = round((img_height - total_overlap) / num_splits)
 84            logger.debug(f"Splitting {path} into {num_splits} splits!")
 85
 86            for idx in range(num_splits):
 87                top = idx * (adjusted_step)
 88                bottom = top + split_height
 89                if bottom > img_height:  # Adjust for the last split
 90                    bottom = img_height
 91                    top = max(0, bottom - split_height)
 92                box = (0, top, img_width, bottom)
 93                cropped_image = img.crop(box)
 94                logger.trace(cropped_image)
 95                save_path = Path(path).with_suffix(f".s{idx}.jpg")
 96                cropped_image.convert("RGB").save(save_path)
 97                photos.append(save_path)
 98                logger.debug(f"split {idx} saved to {save_path}")
 99            if delete:
100                path.unlink(missing_ok=True)
101        img.close()
102    except Exception as e:
103        logger.error(f"Failed to split long image: {e}")
104        return [path]
105    return photos
106
107
108async def split_large_video(path: str | Path | None, *, delete: bool = True) -> list[Path]:
109    if path is None or not Path(path).expanduser().resolve().is_file():
110        return []
111    path = Path(path).expanduser().resolve()
112    logger.trace(f"Checking large video: {path.name} [{readable_size(path=path)}]")
113    file_size = path.stat().st_size
114    if file_size <= MAX_FILE_BYTES:
115        logger.trace(f"Video is already under 2GB limit: {path.name}")
116        return [path]
117    split_size = MAX_FILE_BYTES - 20 * 1024 * 1024  # reduce a little bit (50MB)
118    videos = []
119
120    num_split = (file_size // split_size) + 1
121    logger.warning(f"Split video file: {path.name} into {num_split} parts.")
122    start_time = 0
123    for idx in range(num_split):
124        out_path = path.with_stem(f"{path.stem}_{idx + 1:02}")
125        try:
126            logger.debug(f"Splitting P{idx + 1}: {path.name} -> {out_path.name}")
127            ffmpeg = FFmpegAsync().option("y").input(path, ss=f"{start_time * 1000:.0f}ms").output(out_path, acodec="copy", vcodec="copy", fs=split_size)
128            await ffmpeg.execute()
129            if probe := await parse_media_info(out_path):
130                videos.append(out_path)
131                start_time += probe["duration"]
132        except Exception as e:
133            logger.error(f"Failed to split P{idx + 1}: {path.name} -> {out_path.name} : {e}")
134    if delete:
135        path.unlink(missing_ok=True)
136    return videos
137
138
139async def convert_to_h264(
140    path: str | Path | None,
141    *,
142    allow_re_encoding: bool = False,
143    force_re_encoding: bool = False,
144    max_file_size: int = 0,
145    skip_h264: bool = False,
146    audio_codec: str = "aac",
147    ext: str = "mp4",
148    delete: bool = True,
149) -> Path:
150    """Convert video to H264 format.
151
152    Args:
153        path (str | Path | None): video file path
154        allow_re_encoding (bool, optional): re-encode video. Defaults to False.
155        force_re_encoding (bool, optional): force re-encode video. Defaults to False.
156        max_file_size (int, optional): limit the max file size for re-encoding. Defaults to 0 (no limit).
157        skip_h264 (bool, optional): skip conversion if video is already H264. Defaults to False.
158        audio_codec (str, optional): audio codec used in re-encoding. Defaults to "aac".
159        ext (str, optional): output format. Defaults to "mp4".
160        delete (bool, optional): delete original file. Defaults to True.
161
162    Returns:
163        Path: output video path
164    """
165    if path is None or not Path(path).expanduser().resolve().is_file():
166        return Path("")
167    path = Path(path).expanduser().resolve()
168    logger.debug(f"Checking H264 codec: {path.name}")
169    info = await parse_media_info(path)
170    tmp_path = path.with_suffix(f".tmp.{ext}")
171    mp4_path = path.with_suffix(f".h264.{ext}")
172    success = True
173    if info["video_codec"] == "h264":
174        if skip_h264:
175            logger.debug(f"Video is already H264, skip conversion: {path.name}")
176            return path
177        logger.debug("Video is already H264, skip re-encoding")
178        allow_re_encoding = False
179    if max_file_size > 0 and path.stat().st_size > max_file_size:
180        logger.warning(f"Video file size is too large: {path.stat().st_size}, skip re-encoding")
181        allow_re_encoding = False
182
183    try:
184        if not allow_re_encoding and not force_re_encoding:
185            logger.debug(f"Convert video to H264 (copy): {path.name} -> {tmp_path.name}")
186            ffmpeg = FFmpegAsync().option("y").input(path).output(tmp_path, codec="copy", movflags="+faststart", f=ext)
187            await ffmpeg.execute()
188        else:
189            logger.warning(f"Convert video to H264 (re-encoding): {path.name} -> {tmp_path.name}")
190            ffmpeg = FFmpegAsync().option("y").input(path).output(tmp_path, acodec=audio_codec, vcodec="libx264", f=ext)
191
192            @ffmpeg.on("progress")
193            def on_progress(p: Progress):
194                logger.trace(f"Converted time: {seconds_to_time(p.time.seconds)}, size: {readable_size(p.size)}, speed: {p.speed}")
195
196            @ffmpeg.on("completed")
197            def on_completed():
198                logger.debug("completed")
199
200            await ffmpeg.execute()
201        if delete:
202            path.unlink(missing_ok=True)
203        tmp_path.rename(mp4_path)
204    except FFmpegError as e:
205        logger.error(f"Failed to convert mp4 {path.name}: {e.message}")
206        success = False
207    except Exception as e:
208        logger.error(f"Failed to convert mp4 {path.name}: {e}")
209        success = False
210    finally:  # always delete tmp file
211        tmp_path.unlink(missing_ok=True)
212    if success:
213        # delete original file
214        if delete:
215            path.unlink(missing_ok=True)
216        return mp4_path if mp4_path.is_file() else path
217    return path
218
219
220async def convert_to_audio(path: str | Path | None, ext: str = "m4a", *, codec: str = "aac", delete: bool = True, **kwargs) -> Path:
221    if path is None or not Path(path).expanduser().resolve().is_file():
222        return Path("")
223    path = Path(path).expanduser().resolve()
224    logger.debug(f"Converting to audio {ext}: {path.name}")
225    info = await parse_media_info(path)
226    tmp_path = path.with_suffix(f".tmp.{ext}")
227    final_path = path.with_suffix(f".final.{ext}")
228    success = True
229    try:
230        if info["audio_codec"] == codec:
231            logger.debug(f"Audio stream is already {codec}, without re-encoding: {path.name} -> {tmp_path.name}")
232            ffmpeg = FFmpegAsync().option("y").input(path).output(tmp_path, vn=None, acodec="copy", **kwargs)
233            await ffmpeg.execute()
234        else:
235            logger.warning(f"Re-encoding audio: {path.name} -> {tmp_path.name}")
236            ffmpeg = FFmpegAsync().option("y").input(path).output(tmp_path, vn=None, acodec=codec, **kwargs)
237
238            @ffmpeg.on("progress")
239            def on_progress(p: Progress):
240                logger.trace(f"Converted time: {seconds_to_time(p.time.seconds)}, size: {readable_size(p.size)}, speed: {p.speed}")
241
242            @ffmpeg.on("completed")
243            def on_completed():
244                logger.success(f"Converted audio: {path} to {final_path}, {codec=}")
245
246            await ffmpeg.execute()
247        if delete:
248            path.unlink(missing_ok=True)
249        tmp_path.rename(final_path)
250    except FFmpegError as e:
251        logger.error(f"Failed to convert m4a {path.name}: {e.message}")
252        success = False
253    except Exception as e:
254        logger.error(f"Failed to convert m4a {path.name}: {e}")
255        success = False
256    finally:  # always delete tmp file
257        tmp_path.unlink(missing_ok=True)
258    if success:
259        if delete:
260            path.unlink(missing_ok=True)
261        return final_path if final_path.is_file() else path
262    return path
263
264
265def generate_cover(path: Path | str) -> str:
266    """Generate cover image base on media file path.
267
268    Must be jpg format.
269
270    Args:
271        path (Path): media file path
272
273    Returns:
274        str: cover path
275    """
276    logger.debug(f"Generate cover for: {path}")
277    if not Path(path).expanduser().resolve().is_file():
278        return ""
279    jpg_path = Path(path).with_suffix(".jpg")
280    for ext in [".jpg", ".jpeg"]:
281        cover_path = Path(path).with_suffix(ext)
282        if cover_path.is_file():
283            logger.debug(f"JPG cover image already exists: {cover_path.as_posix()}")
284            return cover_path.as_posix()
285    for ext in [".webp", ".png", ".heic", ".bmp"]:
286        cover_path = Path(path).with_suffix(ext)
287        if cover_path.is_file():
288            converted = convert_img_to_telegram_format(cover_path)
289            logger.debug(f"Converted cover image: {cover_path.name} -> {converted.name}")
290            return converted.as_posix()
291
292    logger.debug(f"Generate cover image from the first frame of {path}")
293    with contextlib.suppress(Exception):
294        ffmpeg = FFmpeg().option("y").option("loglevel", "warning").input(path).output(jpg_path, vframes=1)
295        ffmpeg.execute()
296        return jpg_path.as_posix() if jpg_path.is_file() else ""
297
298    logger.error(f"Failed to generate cover image for: {path}")
299    return ""
300
301
302def convert_jpg_via_pillow(path: str | Path | None, *, delete: bool = True) -> tuple[bool, Path]:
303    """Returns: is_success, out_path."""
304    if path is None or not Path(path).expanduser().resolve().is_file():
305        return False, Path("")
306    path = Path(path).expanduser().resolve()
307
308    if path.suffix.lower() == ".heic":
309        try:
310            from pillow_heif import register_heif_opener  # type: ignore
311        except ModuleNotFoundError:
312            logger.warning("Please install 'pillow_heif' package for PIL's heic support.")
313            logger.warning("Docs: https://pillow-heif.readthedocs.io/en/latest/installation.html")
314            return False, path
315        register_heif_opener()
316
317    save_path = path.with_suffix(".jpg")
318    logger.debug(f"Converting {path.name} -> {save_path.name}")
319    try:
320        img = Image.open(path)
321        img.convert("RGB").save(save_path)
322        img.close()
323    except Exception as e:
324        logger.error(f"Failed convert {path.name} -> {save_path.name}: {e}")
325        return False, path
326    if delete:
327        path.unlink(missing_ok=True)
328    return True, save_path
329
330
331def convert_jpg_via_ffmpeg(path: str | Path | None, *, delete: bool = True) -> tuple[bool, Path]:
332    """Returns: is_success, out_path."""
333    if path is None or not Path(path).expanduser().resolve().is_file():
334        return False, Path("")
335    path = Path(path).expanduser().resolve()
336    save_path = path.with_suffix(".jpg")
337    logger.debug(f"Converting {path.name} -> {save_path.name}")
338    try:
339        ffmpeg = FFmpeg().option("y").option("loglevel", "warning").input(path).output(save_path, vframes=1)
340        ffmpeg.execute()
341    except Exception as e:
342        logger.error(f"Failed convert {path.name} -> {save_path.name}: {e}")
343        return False, path
344    if delete:
345        path.unlink(missing_ok=True)
346    return True, save_path
347
348
349def convert_img_to_telegram_format(path: str | Path | None, *, delete: bool = True) -> Path:
350    if path is None or not Path(path).expanduser().resolve().is_file():
351        return Path("")
352    path = Path(path).expanduser().resolve()
353    if path.suffix.lower() in [".jpg", ".jpeg", ".png", ".bmp", ".gif"]:
354        return path
355
356    success, out_path = convert_jpg_via_pillow(path, delete=delete)
357    if success:
358        logger.success(f"Converted {path.name} via PIL: {out_path.name}")
359        return out_path
360    logger.warning(f"Failed to convert {path.name} via PIL, try FFmpeg ...")
361
362    success, out_path = convert_jpg_via_ffmpeg(path, delete=delete)
363    if success:
364        logger.success(f"Converted {path.name} via FFmpeg: {out_path.name}")
365        return out_path
366    logger.error(f"Failed to convert {path.name} via FFmpeg.")
367
368    return path
369
370
371def convert_img_match_telegram_rules(path: str | Path, num_bytes: int = 10485760, wh_total: int = 10000, max_ratio: float = 20, *, delete: bool = True) -> Path:
372    """Convert image to meet Telegram photo requirements.
373
374    1. photo must be at most 10 MB in size.
375    2. photo's width and height must not exceed 10000 in total.
376    3. photo's width and height ratio must be at most 20.
377    """
378    path = Path(path).expanduser().resolve()
379    if not path.is_file():
380        return Path("")
381    min_ratio = 1 / max_ratio
382    filesize = path.stat().st_size
383    save_path = path.with_stem(f"{path.stem}X")
384    try:
385        img = Image.open(path)
386        width, height = img.size
387        ratio = width / height
388        logger.trace(f"{path.name}: {width}x{height} (r={ratio:.2f}), {filesize} bytes ({readable_size(path=path)})")
389        if filesize < num_bytes and width + height < wh_total and min_ratio < ratio < max_ratio:
390            logger.debug(f"Image is already under limit: {path.name}")
391            img.close()
392            return path
393        new_width = round(0.85 * width)
394        if ratio > max_ratio:
395            new_height = round(new_width / max_ratio)
396        elif ratio < min_ratio:
397            new_height = round(new_width * min_ratio)
398        else:
399            new_height = round(new_width / ratio)
400
401        img = img.resize((new_width, new_height))
402        img.save(save_path)
403        img.close()
404    except Exception as e:
405        logger.error(f"Failed to reduce image size: {path}, {e}")
406        return path
407    if delete:
408        path.unlink(missing_ok=True)
409    logger.debug(f"Reduced image size: {save_path.stat().st_size} bytes ({readable_size(path=save_path)})")
410    return convert_img_match_telegram_rules(save_path, num_bytes, wh_total, max_ratio, delete=delete)
411
412
413def validate_img(path: str | Path | None, *, delete: bool = True) -> str:
414    """Check if the image is valid.
415
416    0. format must be in ["heic", "jpg", "jpeg", "png", "webp"]
417    1. photo must be at most 10 MB in size.
418    2. photo's width and height must not exceed 10000 in total.
419    3. photo's width and height ratio must be at most 20.
420    """
421    if path is None or not Path(path).expanduser().resolve().is_file():
422        logger.warning(f"Image path not found: {path}")
423        return ""
424    path = Path(path).expanduser().resolve()
425    logger.trace(f"Checking image: {path.name} [{readable_size(path=path)}]")
426    # Telegram support image format: JPEG, PNG, BMP, and GIF
427    # For other formats, we should convert them to .jpg
428    if path.suffix.lower() not in [".heic", ".jpg", ".jpeg", ".png", ".webp", ".bmp", ".gif"]:
429        logger.warning(f"Invalid image format: {path.name}")
430        return ""
431    path = convert_img_to_telegram_format(path, delete=delete)
432
433    if not path.is_file():
434        logger.warning(f"Invalid image: {path}")
435        return ""
436    try:
437        img = Image.open(path)
438        img.verify()
439        img.close()
440        valid_path = convert_img_match_telegram_rules(path, delete=delete)
441        if valid_path.as_posix() != path.as_posix():
442            valid_path.rename(path)
443    except Exception as e:
444        logger.error(f"Broken image: {path}, {e}")
445        if delete:
446            path.unlink(missing_ok=True)
447        return ""
448    return path.as_posix() if path.is_file() else ""
449
450
451async def is_valid_video_or_audio(path: str | Path | None, *, delete: bool = True) -> bool:
452    """Check if the video is valid."""
453    if await parse_media_info(path):
454        return True
455
456    logger.error(f"Invalid video: {path}")
457    if delete:
458        Path(str(path)).unlink(missing_ok=True)
459    return False
460
461
462async def fix_video_rotation(path: str | Path | None) -> Path:
463    """Fix video rotation for iOS devcies.
464
465    Some videos (Weibo's livephotos) are displayed in the wrong direction on the Telegram iOS client.
466    """
467    if path is None or not Path(path).expanduser().resolve().is_file():
468        return Path("")
469    path = Path(path).expanduser().resolve()
470    logger.trace(f"Checking video rotation: {path.name} [{readable_size(path=path)}]")
471    probe_info = await parse_media_info(path)
472    if not probe_info:  # video can't parse by ffprobe
473        logger.warning(f"Invalid video: {path}")
474        return path
475    if probe_info.get("rotation") in [-90, 90]:
476        logger.warning(f"Fixing video rotation from {probe_info['height']}x{probe_info['width']}")
477        path = await convert_to_h264(path, allow_re_encoding=True)
478    return path
479
480
481async def fix_ctts_invalid(path: str | Path | None) -> Path:
482    """Convert CTTS invalid video to H264.
483
484    CTTS invalid videos can't be played on Telegram iOS client.
485    """
486    if path is None or not Path(path).expanduser().resolve().is_file():
487        return Path("")
488    path = Path(path).expanduser().resolve()
489    probe_info = await parse_media_info(path)
490    if not probe_info or not probe_info.get("ctts_invalid", False):
491        return path
492    logger.warning(f"Converting CTTS invalid video: {path.name}")
493    return await convert_to_h264(path, force_re_encoding=True)
494
495
496if __name__ == "__main__":
497    # print(convert_to_h264("~/tests/test.mov"))
498    # is_valid_video_or_audio("~/tests/test.jpg")
499    # convert_img_match_telegram_rules("~/tests/test.large.jpg")
500    print(convert_img_to_telegram_format("~/tests/test.heic"))