Commit 429eb80
Changed files (6)
src
messages
preview
ytdlp
src/messages/preprocess.py
@@ -60,7 +60,7 @@ async def preprocess_media(media: list[dict]) -> list[dict]:
done_photos.append(data)
continue
if photo_path := data.get("photo"):
- valid_photos = [await validate_img(photo) for photo in await split_long_img(photo_path) if await validate_img(photo)]
+ valid_photos = [validate_img(photo) for photo in split_long_img(photo_path) if validate_img(photo)]
done_photos.extend({"photo": valid_photo} for valid_photo in valid_photos)
# Step-2: Videos
@@ -80,10 +80,10 @@ async def preprocess_media(media: list[dict]) -> list[dict]:
valid_videos = [x for x in await split_large_video(video_path) if await is_valid_video_or_audio(x)]
# generate thumbnails for each video if thumb is not provided
- thumbs = [valid_thumb for _ in valid_videos] if (valid_thumb := await validate_img(thumb)) else [await generate_cover(x) for x in valid_videos]
+ thumbs = [valid_thumb for _ in valid_videos] if (valid_thumb := validate_img(thumb)) else [generate_cover(x) for x in valid_videos]
for vpath, tpath in zip(valid_videos, thumbs, strict=True):
video_info = await parse_media_info(vpath)
- thumb = valid_thumb if (valid_thumb := await validate_img(tpath)) else None
+ thumb = valid_thumb if (valid_thumb := validate_img(tpath)) else None
done_videos.append({"video": vpath.as_posix(), "width": video_info["width"], "height": video_info["height"], "duration": video_info["duration"], "thumb": thumb})
# Step-3: Audios
done_audios = []
@@ -97,14 +97,14 @@ async def preprocess_media(media: list[dict]) -> list[dict]:
continue
audio_info = await parse_media_info(audio_path)
thumb = data.get("thumb") # thumb is provided
- thumb = valid_thumb if (valid_thumb := await validate_img(thumb)) else await generate_cover(data["audio"])
+ thumb = valid_thumb if (valid_thumb := validate_img(thumb)) else generate_cover(data["audio"])
done_audios.append(
{
"audio": audio_path,
"performer": data.get("performer", "Performer"),
"title": data.get("title", audio_path.stem),
"duration": audio_info["duration"],
- "thumb": thumb if await validate_img(thumb) else None,
+ "thumb": thumb if validate_img(thumb) else None,
}
)
src/others/download_external.py
@@ -65,7 +65,7 @@ async def download_url_in_message(client: Client, message: Message, extra_prefix
if path.suffix != suffix:
path.rename(path.with_suffix(suffix))
path = path.with_suffix(suffix)
- if img := await validate_img(path, force_jpg=False, delete=False):
+ if img := validate_img(path, force_jpg=False, delete=False):
await modify_progress(text=f"πεΎηδΈθ½½ζε: {readable_size(path=img)}", force_update=True, **kwargs)
success = await send2tg(client, message, target_chat, reply_msg_id, texts=caption, media=[{"photo": img}])
elif path.suffix in [".m4a", ".mp3", ".wav", ".ogg", ".opus", ".flac", ".aac"]:
src/others/raw_img_file.py
@@ -31,7 +31,7 @@ async def convert_raw_img_file(client: Client, message: Message, *, convert_need
logger.info(f"Convert raw image: {info['file_name']}")
path: str = await trigger_message.download() # type: ignore
logger.trace(f"Image {info['file_name']} downloaded to {path}")
- photos = await split_long_img(path)
+ photos = split_long_img(path)
media = [{"photo": photo.as_posix()} for photo in photos if photo.is_file()]
# send splits
if len(media) == 1:
src/preview/instagram.py
@@ -154,7 +154,7 @@ async def preview_ddinstagram(client: Client, message: Message, url: str, post_t
if (tag := soup.find("meta", attrs={"property": "twitter:image"})) and (img_url := tag.get("content")): # type: ignore
raw_url = f"{API.DDINSTAGRAM}{img_url}"
media["photo"] = await download_file(raw_url, path=f"{DOWNLOAD_DIR}/{post_id}.jpg", proxy=PROXY.INSTAGRAM, **kwargs)
- if not bool(await validate_img(media["photo"])):
+ if not bool(validate_img(media["photo"])):
await send_to_social_media_bridge(client, message, text=url, **kwargs)
return
src/ytdlp/main.py
@@ -265,7 +265,7 @@ async def send_media(
reply_msg_id = kwargs.get("reply_msg_id", 0)
reply_parameters = get_reply_to(message.id, reply_msg_id)
- thumb = await generate_cover(video_path) if video_path.is_file() else await generate_cover(audio_path)
+ thumb = generate_cover(video_path) if video_path.is_file() else generate_cover(audio_path)
if not Path(thumb).is_file():
thumb = None
src/multimedia.py
@@ -5,8 +5,8 @@ import json
import math
from pathlib import Path
-from ffmpeg import FFmpegError, Progress
-from ffmpeg.asyncio import FFmpeg
+from ffmpeg import FFmpeg, FFmpegError, Progress
+from ffmpeg.asyncio import FFmpeg as FFmpegAsync
from loguru import logger
from PIL import Image
@@ -21,7 +21,7 @@ async def parse_media_info(path: str | Path | None) -> dict:
return {}
path = Path(path).expanduser().resolve()
logger.trace(f"Parsing media info: {path.name} [{readable_size(path=path)}]")
- ffprobe = FFmpeg(executable="ffprobe").input(path.as_posix(), print_format="json", show_streams=None)
+ ffprobe = FFmpegAsync(executable="ffprobe").input(path.as_posix(), print_format="json", show_streams=None)
info = {}
try:
metadata = json.loads(await ffprobe.execute())
@@ -52,13 +52,13 @@ async def parse_media_info(path: str | Path | None) -> dict:
return info
-async def split_long_img(path: str | Path | None, max_height: float = 2500, max_ratio: float = 3, target_ratio: float = 2.17, overlap: float = 15, *, delete: bool = True) -> list[Path]:
+def split_long_img(path: str | Path | None, max_height: float = 2500, max_ratio: float = 3, target_ratio: float = 2.17, overlap: float = 15, *, delete: bool = True) -> list[Path]:
if path is None or not Path(path).expanduser().resolve().is_file():
return []
path = Path(path).expanduser().resolve()
logger.debug(f"Checking long image: {path.name} [{readable_size(path=path)}]")
photos = []
- path = await convert_img_to_telegram_format(path, delete=delete)
+ path = convert_img_to_telegram_format(path, delete=delete)
try:
img = Image.open(path)
img_width, img_height = img.size
@@ -119,7 +119,7 @@ async def split_large_video(path: str | Path | None, *, delete: bool = True) ->
out_path = path.with_stem(f"{path.stem}_{idx + 1:02}")
try:
logger.debug(f"Splitting P{idx + 1}: {path.name} -> {out_path.name}")
- ffmpeg = FFmpeg().option("y").input(path, ss=f"{start_time * 1000:.0f}ms").output(out_path, acodec="copy", vcodec="copy", fs=split_size)
+ ffmpeg = FFmpegAsync().option("y").input(path, ss=f"{start_time * 1000:.0f}ms").output(out_path, acodec="copy", vcodec="copy", fs=split_size)
await ffmpeg.execute()
if probe := await parse_media_info(out_path):
videos.append(out_path)
@@ -176,11 +176,11 @@ async def convert_to_h264(
try:
if not re_encoding:
logger.debug(f"Convert video to H264 (copy): {path.name} -> {tmp_path.name}")
- ffmpeg = FFmpeg().option("y").input(path).output(tmp_path, codec="copy", movflags="+faststart", f=ext)
+ ffmpeg = FFmpegAsync().option("y").input(path).output(tmp_path, codec="copy", movflags="+faststart", f=ext)
await ffmpeg.execute()
else:
logger.warning(f"Convert video to H264 (re-encoding): {path.name} -> {tmp_path.name}")
- ffmpeg = FFmpeg().option("y").input(path).output(tmp_path, acodec=audio_codec, vcodec="libx264", f=ext)
+ ffmpeg = FFmpegAsync().option("y").input(path).output(tmp_path, acodec=audio_codec, vcodec="libx264", f=ext)
@ffmpeg.on("progress")
def on_progress(p: Progress):
@@ -222,11 +222,11 @@ async def convert_to_audio(path: str | Path | None, ext: str = "m4a", *, codec:
try:
if info["audio_codec"] == codec:
logger.debug(f"Audio stream is already {codec}, without re-encoding: {path.name} -> {tmp_path.name}")
- ffmpeg = FFmpeg().option("y").input(path).output(tmp_path, vn=None, acodec="copy", **kwargs)
+ ffmpeg = FFmpegAsync().option("y").input(path).output(tmp_path, vn=None, acodec="copy", **kwargs)
await ffmpeg.execute()
else:
logger.warning(f"Re-encoding audio: {path.name} -> {tmp_path.name}")
- ffmpeg = FFmpeg().option("y").input(path).output(tmp_path, vn=None, acodec=codec, **kwargs)
+ ffmpeg = FFmpegAsync().option("y").input(path).output(tmp_path, vn=None, acodec=codec, **kwargs)
@ffmpeg.on("progress")
def on_progress(p: Progress):
@@ -255,7 +255,7 @@ async def convert_to_audio(path: str | Path | None, ext: str = "m4a", *, codec:
return path
-async def generate_cover(path: Path | str) -> str:
+def generate_cover(path: Path | str) -> str:
"""Generate cover image base on media file path.
Must be jpg format.
@@ -278,14 +278,14 @@ async def generate_cover(path: Path | str) -> str:
for ext in [".webp", ".png", ".heic", ".bmp"]:
cover_path = Path(path).with_suffix(ext)
if cover_path.is_file():
- converted = await convert_img_to_telegram_format(cover_path)
+ converted = convert_img_to_telegram_format(cover_path)
logger.debug(f"Converted cover image: {cover_path.name} -> {converted.name}")
return converted.as_posix()
logger.debug(f"Generate cover image from the first frame of {path}")
with contextlib.suppress(Exception):
ffmpeg = FFmpeg().option("y").option("loglevel", "warning").input(path).output(jpg_path, vframes=1)
- await ffmpeg.execute()
+ ffmpeg.execute()
return jpg_path.as_posix() if jpg_path.is_file() else ""
logger.error(f"Failed to generate cover image for: {path}")
@@ -321,7 +321,7 @@ def convert_jpg_via_pillow(path: str | Path | None, *, delete: bool = True) -> t
return True, save_path
-async def convert_jpg_via_ffmpeg(path: str | Path | None, *, delete: bool = True) -> tuple[bool, Path]:
+def convert_jpg_via_ffmpeg(path: str | Path | None, *, delete: bool = True) -> tuple[bool, Path]:
"""Returns: is_success, out_path."""
if path is None or not Path(path).expanduser().resolve().is_file():
return False, Path("")
@@ -330,7 +330,7 @@ async def convert_jpg_via_ffmpeg(path: str | Path | None, *, delete: bool = True
logger.debug(f"Converting {path.name} -> {save_path.name}")
try:
ffmpeg = FFmpeg().option("y").option("loglevel", "warning").input(path).output(save_path, vframes=1)
- await ffmpeg.execute()
+ ffmpeg.execute()
except Exception as e:
logger.error(f"Failed convert {path.name} -> {save_path.name}: {e}")
return False, path
@@ -339,7 +339,7 @@ async def convert_jpg_via_ffmpeg(path: str | Path | None, *, delete: bool = True
return True, save_path
-async def convert_img_to_telegram_format(path: str | Path | None, *, force_jpg: bool = True, delete: bool = True) -> Path:
+def convert_img_to_telegram_format(path: str | Path | None, *, force_jpg: bool = True, delete: bool = True) -> Path:
if path is None or not Path(path).expanduser().resolve().is_file():
return Path("")
path = Path(path).expanduser().resolve()
@@ -355,7 +355,7 @@ async def convert_img_to_telegram_format(path: str | Path | None, *, force_jpg:
return out_path
logger.warning(f"Failed to convert {path.name} via PIL, try FFmpeg ...")
- success, out_path = await convert_jpg_via_ffmpeg(path, delete=delete)
+ success, out_path = convert_jpg_via_ffmpeg(path, delete=delete)
if success:
logger.success(f"Converted {path.name} via FFmpeg: {out_path.name}")
return out_path
@@ -406,7 +406,7 @@ def convert_img_match_telegram_rules(path: str | Path, num_bytes: int = 10485760
return convert_img_match_telegram_rules(save_path, num_bytes, wh_total, max_ratio, delete=delete)
-async def validate_img(path: str | Path | None, *, force_jpg: bool = True, delete: bool = True) -> str:
+def validate_img(path: str | Path | None, *, force_jpg: bool = True, delete: bool = True) -> str:
"""Check if the image is valid.
0. format must be in ["heic", "jpg", "jpeg", "png", "webp"]
@@ -424,7 +424,7 @@ async def validate_img(path: str | Path | None, *, force_jpg: bool = True, delet
if path.suffix.lower() not in [".heic", ".jpg", ".jpeg", ".png", ".webp", ".bmp", ".gif"]:
logger.warning(f"Invalid image format: {path.name}")
return ""
- path = await convert_img_to_telegram_format(path, force_jpg=force_jpg, delete=delete)
+ path = convert_img_to_telegram_format(path, force_jpg=force_jpg, delete=delete)
if not path.is_file():
logger.warning(f"Invalid image: {path}")
@@ -475,9 +475,7 @@ async def fix_video_rotation(path: str | Path | None) -> Path:
if __name__ == "__main__":
- import asyncio
-
# print(convert_to_h264("~/tests/test.mov"))
# is_valid_video_or_audio("~/tests/test.jpg")
# convert_img_match_telegram_rules("~/tests/test.large.jpg")
- print(asyncio.run(convert_img_to_telegram_format("~/tests/test.heic")))
+ print(convert_img_to_telegram_format("~/tests/test.heic"))