Commit b6009e9
Changed files (12)
src/asr/ali_asr.py
@@ -37,7 +37,7 @@ async def ali_asr(path: str | Path) -> dict:
logger.debug(f"阿里云ASR {path} via model: {model}")
api_key = random.choice(api_keys)
if model.startswith("paraformer-realtime-"):
- return ali_realtime_asr(model, path, api_key)
+ return await ali_realtime_asr(model, path, api_key)
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "X-DashScope-Async": "enable"}
path = Path(path).expanduser().resolve()
@@ -45,7 +45,7 @@ async def ali_asr(path: str | Path) -> dict:
url = FILE_SERVER.removesuffix("/") + "/" + path.name
elif ASR.ALI_FS_ENGINE.lower() == "uguu":
if path.stat().st_size > 100 * 1024 * 1024: # 100 MB
- path = downsampe_audio(path)
+ path = await downsampe_audio(path)
url = await upload_uguu(path) # max 100 MB for Uguu
elif ASR.ALI_FS_ENGINE.lower() == "alist":
url = await upload_alist(path)
@@ -111,11 +111,11 @@ async def query_ali_asr(task_id: str, api_key: str, query_times: int = 0) -> dic
return {"error": "❌" + glom(result, "output.message", default="语音识别失败")}
-def ali_realtime_asr(model: str, path: str | Path, api_key: str) -> dict:
+async def ali_realtime_asr(model: str, path: str | Path, api_key: str) -> dict:
# convert audio file
sample_rate = 8000 if "8k" in model else 16000
ext = "opus"
- audio_path = downsampe_audio(path, ext=ext, sample_rate=sample_rate, ac=1)
+ audio_path = await downsampe_audio(path, ext=ext, sample_rate=sample_rate, ac=1)
recognition = Recognition(model=model, format=ext, sample_rate=sample_rate, callback=RecognitionCallback(), api_key=api_key)
result = recognition.call(Path(audio_path).as_posix())
if result.status_code != 200:
src/asr/tecent_asr.py
@@ -152,7 +152,7 @@ async def tencent_async_asr(path: str | Path, engine: str) -> dict:
url = FILE_SERVER.removesuffix("/") + "/" + path.name
elif ASR.TENCENT_FS_ENGINE.lower() == "uguu":
if path.stat().st_size > 100 * 1024 * 1024: # 100 MB
- path = downsampe_audio(path)
+ path = await downsampe_audio(path)
url = await upload_uguu(path) # max 100 MB for Uguu
elif ASR.TENCENT_FS_ENGINE.lower() == "alist":
url = await upload_alist(path)
src/asr/utils.py
@@ -70,11 +70,11 @@ def get_gemini_asr_method(duration: float) -> tuple[str, list[str]]:
return "gemini", ["aac", "aiff", "flac", "mp3", "oga", "ogg", "opus", "wav"]
-def downsampe_audio(path: str | Path, ext: str = "opus", codec: str = "libopus", sample_rate: int = 16000, **kwargs) -> Path:
+async def downsampe_audio(path: str | Path, ext: str = "opus", codec: str = "libopus", sample_rate: int = 16000, **kwargs) -> Path:
path = Path(path).expanduser().resolve()
if not path.is_file():
return path
- return convert_to_audio(path, ext=ext, codec=codec, ar=sample_rate, **kwargs)
+ return await convert_to_audio(path, ext=ext, codec=codec, ar=sample_rate, **kwargs)
def is_english_word(text: str) -> bool:
src/asr/voice_recognition.py
@@ -182,7 +182,7 @@ async def asr_file(
path = Path(path).expanduser().resolve()
if not path.is_file():
return {"error": f"{path} is not exist"}
- info = parse_media_info(path)
+ info = await parse_media_info(path)
if duration == 0:
duration = info["duration"]
asr_method, supported_ext = get_asr_method(duration, file_size=Path(path).stat().st_size, force_engine=engine)
@@ -194,7 +194,7 @@ async def asr_file(
if info["audio_codec"].split("/")[-1] in supported_ext and not info["video_codec"]:
voice_format = info["audio_codec"].split("/")[-1]
else:
- path = convert_to_audio(path, ext="opus", codec="libopus")
+ path = await convert_to_audio(path, ext="opus", codec="libopus")
voice_format = "opus"
# match again based on converted file
asr_method, supported_ext = get_asr_method(duration, file_size=Path(path).stat().st_size, force_engine=engine)
src/messages/preprocess.py
@@ -10,7 +10,7 @@ from messages.utils import count_without_entities, smart_split
from multimedia import fix_video_rotation, generate_cover, is_valid_video_or_audio, parse_media_info, split_large_video, split_long_img, validate_img
-def preprocess_media(media: list[dict]) -> list[dict]:
+async def preprocess_media(media: list[dict]) -> list[dict]:
"""Filter out invalid media files.
- photo must be at most 10 MB in size.
@@ -60,7 +60,7 @@ def preprocess_media(media: list[dict]) -> list[dict]:
done_photos.append(data)
continue
if photo_path := data.get("photo"):
- valid_photos = [validate_img(photo) for photo in split_long_img(photo_path) if validate_img(photo)]
+ valid_photos = [await validate_img(photo) for photo in await split_long_img(photo_path) if await validate_img(photo)]
done_photos.extend({"photo": valid_photo} for valid_photo in valid_photos)
# Step-2: Videos
@@ -71,19 +71,19 @@ def preprocess_media(media: list[dict]) -> list[dict]:
continue
thumb = data.get("thumb") # thumb is provided
if video_path := data.get("video"):
- video_path = fix_video_rotation(video_path)
+ video_path = await fix_video_rotation(video_path)
if not is_valid_video_or_audio(video_path):
logger.warning(f"Video is invalid: {video_path}")
continue
# split large video files ( < 2GB)
- valid_videos = [x for x in split_large_video(video_path) if is_valid_video_or_audio(x)]
+ valid_videos = [x for x in await split_large_video(video_path) if is_valid_video_or_audio(x)]
# generate thumbnails for each video if thumb is not provided
- thumbs = [valid_thumb for _ in valid_videos] if (valid_thumb := validate_img(thumb)) else [generate_cover(x) for x in valid_videos]
+ thumbs = [valid_thumb for _ in valid_videos] if (valid_thumb := await validate_img(thumb)) else [await generate_cover(x) for x in valid_videos]
for vpath, tpath in zip(valid_videos, thumbs, strict=True):
- video_info = parse_media_info(vpath)
- thumb = valid_thumb if (valid_thumb := validate_img(tpath)) else None
+ video_info = await parse_media_info(vpath)
+ thumb = valid_thumb if (valid_thumb := await validate_img(tpath)) else None
done_videos.append({"video": vpath.as_posix(), "width": video_info["width"], "height": video_info["height"], "duration": video_info["duration"], "thumb": thumb})
# Step-3: Audios
done_audios = []
@@ -95,16 +95,16 @@ def preprocess_media(media: list[dict]) -> list[dict]:
if not is_valid_video_or_audio(audio_path):
logger.warning(f"Audio is invalid: {audio_path}")
continue
- audio_info = parse_media_info(audio_path)
+ audio_info = await parse_media_info(audio_path)
thumb = data.get("thumb") # thumb is provided
- thumb = valid_thumb if (valid_thumb := validate_img(thumb)) else generate_cover(data["audio"])
+ thumb = valid_thumb if (valid_thumb := await validate_img(thumb)) else await generate_cover(data["audio"])
done_audios.append(
{
"audio": audio_path,
"performer": data.get("performer", "Performer"),
"title": data.get("title", audio_path.stem),
"duration": audio_info["duration"],
- "thumb": thumb if validate_img(thumb) else None,
+ "thumb": thumb if await validate_img(thumb) else None,
}
)
src/messages/sender.py
@@ -70,7 +70,7 @@ async def send2tg(
target_chat = to_int(target_chat)
reply_parameters = get_reply_to(message.id, reply_msg_id)
media = media or []
- media = preprocess_media(media)
+ media = await preprocess_media(media)
texts = texts or ""
if comments: # append comments to texts
texts = texts + "".join(comments) + BLOCKQUOTE_EXPANDABLE_END_DELIM
src/others/download_external.py
@@ -66,7 +66,7 @@ async def download_url_in_message(client: Client, message: Message, extra_prefix
if path.suffix != suffix:
path.rename(path.with_suffix(suffix))
path = path.with_suffix(suffix)
- if img := validate_img(path, force_jpg=False, delete=False):
+ if img := await validate_img(path, force_jpg=False, delete=False):
await modify_progress(text=f"🏞图片下载成功: {readable_size(path=img)}", force_update=True, **kwargs)
success = await send2tg(client, message, target_chat, reply_msg_id, texts=caption, media=[{"photo": img}])
elif path.suffix in [".m4a", ".mp3", ".wav", ".ogg", ".opus", ".flac", ".aac"]:
src/others/extract_audio.py
@@ -60,7 +60,7 @@ async def extract_audio_file(client: Client, message: Message, **kwargs) -> None
else:
await modify_progress(text=f"[{info['mtype']}]下载失败, 请稍后重试...", force_update=True, **kwargs)
return
- path = convert_to_audio(video, ext="m4a")
+ path = await convert_to_audio(video, ext="m4a")
if not Path(path).expanduser().resolve().is_file():
logger.trace(f"File not found: {path}")
return
@@ -69,7 +69,8 @@ async def extract_audio_file(client: Client, message: Message, **kwargs) -> None
path.rename(path.with_name(path.name.replace(".final.m4a", ".m4a")))
path = path.with_name(path.name.replace(".final.m4a", ".m4a"))
await modify_progress(text="🎧音频提取已完成, 开始上传...", force_update=True, **kwargs)
- duration = parse_media_info(path).get("duration", 0)
+ minfo = await parse_media_info(path)
+ duration = minfo.get("duration", 0)
target_chat = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id
reply_msg_id = kwargs.get("reply_msg_id", 0)
src/others/raw_img_file.py
@@ -31,7 +31,7 @@ async def convert_raw_img_file(client: Client, message: Message, *, convert_need
logger.info(f"Convert raw image: {info['file_name']}")
path: str = await trigger_message.download() # type: ignore
logger.trace(f"Image {info['file_name']} downloaded to {path}")
- photos = split_long_img(path)
+ photos = await split_long_img(path)
media = [{"photo": photo.as_posix()} for photo in photos if photo.is_file()]
# send splits
if len(media) == 1:
src/preview/instagram.py
@@ -141,7 +141,7 @@ async def preview_ddinstagram(client: Client, message: Message, url: str, post_t
if img_url:
raw_url = f"{API.DDINSTAGRAM}{img_url}"
media["photo"] = await download_file(raw_url, path=f"{DOWNLOAD_DIR}/{post_id}.jpg", proxy=PROXY.INSTAGRAM, **kwargs)
- if not bool(validate_img(media["photo"])):
+ if not bool(await validate_img(media["photo"])):
await send_to_social_media_bridge(client, message, text=url, **kwargs)
return
src/preview/ytdlp.py
@@ -208,15 +208,15 @@ async def preview_ytdlp(
target_chat = to_int(target_chat)
reply_msg_id = kwargs.get("reply_msg_id", 0)
reply_parameters = get_reply_to(message.id, reply_msg_id)
- thumb = generate_cover(video_path) if video_path.is_file() else generate_cover(audio_path)
+ thumb = await generate_cover(video_path) if video_path.is_file() else await generate_cover(audio_path)
if not Path(thumb).is_file():
thumb = None
# split large videos into multiple parts (less than 2GB)
if video_path.is_file() and not transcription_only:
- video_path = convert_to_h264(video_path, re_encoding=True, max_file_size=YTDLP_RE_ENCODING_MAX_FILE_BYTES, skip_h264=True)
+ video_path = await convert_to_h264(video_path, re_encoding=True, max_file_size=YTDLP_RE_ENCODING_MAX_FILE_BYTES, skip_h264=True)
if video_path.stat().st_size > MAX_FILE_BYTES:
await modify_progress(text="🎬视频大小超过Telegram限制(2000MB), 正在切分...", **kwargs)
- videos = preprocess_media([{"video": video_path, "thumb": thumb}])
+ videos = await preprocess_media([{"video": video_path, "thumb": thumb}])
for idx, video in enumerate(videos):
video["thumb"] = thumb
caption = texts.replace("📝[", f"📝[P{idx + 1}-") if len(videos) > 1 else texts
src/multimedia.py
@@ -5,7 +5,8 @@ import json
import math
from pathlib import Path
-from ffmpeg import FFmpeg, FFmpegError, Progress
+from ffmpeg import FFmpegError, Progress
+from ffmpeg.asyncio import FFmpeg
from loguru import logger
from PIL import Image
@@ -13,7 +14,7 @@ from config import MAX_FILE_BYTES
from utils import readable_size
-def parse_media_info(path: str | Path | None) -> dict:
+async def parse_media_info(path: str | Path | None) -> dict:
"""Given a media filepath, parse necessary information."""
if path is None or not Path(path).expanduser().resolve().is_file():
logger.error(f"File not found: {path}")
@@ -23,7 +24,7 @@ def parse_media_info(path: str | Path | None) -> dict:
ffprobe = FFmpeg(executable="ffprobe").input(path.as_posix(), print_format="json", show_streams=None)
info = {}
try:
- metadata = json.loads(ffprobe.execute())
+ metadata = json.loads(await ffprobe.execute())
streams = metadata.get("streams", [])
audio_stream = next((x for x in streams if x.get("codec_name") and x.get("codec_type", "") == "audio"), {})
video_stream = next((x for x in streams if x.get("codec_name") and x.get("codec_type", "") == "video"), {})
@@ -51,13 +52,13 @@ def parse_media_info(path: str | Path | None) -> dict:
return info
-def split_long_img(path: str | Path | None, max_height: float = 2500, max_ratio: float = 3, target_ratio: float = 2.17, overlap: float = 15, *, delete: bool = True) -> list[Path]:
+async def split_long_img(path: str | Path | None, max_height: float = 2500, max_ratio: float = 3, target_ratio: float = 2.17, overlap: float = 15, *, delete: bool = True) -> list[Path]:
if path is None or not Path(path).expanduser().resolve().is_file():
return []
path = Path(path).expanduser().resolve()
logger.debug(f"Checking long image: {path.name} [{readable_size(path=path)}]")
photos = []
- path = convert_img_to_telegram_format(path, delete=delete)
+ path = await convert_img_to_telegram_format(path, delete=delete)
try:
img = Image.open(path)
img_width, img_height = img.size
@@ -99,7 +100,7 @@ def split_long_img(path: str | Path | None, max_height: float = 2500, max_ratio:
return photos
-def split_large_video(path: str | Path | None, *, delete: bool = True) -> list[Path]:
+async def split_large_video(path: str | Path | None, *, delete: bool = True) -> list[Path]:
if path is None or not Path(path).expanduser().resolve().is_file():
return []
path = Path(path).expanduser().resolve()
@@ -119,8 +120,8 @@ def split_large_video(path: str | Path | None, *, delete: bool = True) -> list[P
try:
logger.debug(f"Splitting P{idx + 1}: {path.name} -> {out_path.name}")
ffmpeg = FFmpeg().option("y").input(path, ss=f"{start_time * 1000:.0f}ms").output(out_path, acodec="copy", vcodec="copy", fs=split_size)
- ffmpeg.execute()
- if probe := parse_media_info(out_path):
+ await ffmpeg.execute()
+ if probe := await parse_media_info(out_path):
videos.append(out_path)
start_time += probe["duration"]
except Exception as e:
@@ -130,7 +131,7 @@ def split_large_video(path: str | Path | None, *, delete: bool = True) -> list[P
return videos
-def convert_to_h264(
+async def convert_to_h264(
path: str | Path | None,
*,
re_encoding: bool = False,
@@ -158,7 +159,7 @@ def convert_to_h264(
return Path("")
path = Path(path).expanduser().resolve()
logger.debug(f"Checking H264 codec: {path.name}")
- info = parse_media_info(path)
+ info = await parse_media_info(path)
tmp_path = path.with_suffix(f".tmp.{ext}")
mp4_path = path.with_suffix(f".h264.{ext}")
success = True
@@ -176,7 +177,7 @@ def convert_to_h264(
if not re_encoding:
logger.debug(f"Convert video to H264 (copy): {path.name} -> {tmp_path.name}")
ffmpeg = FFmpeg().option("y").input(path).output(tmp_path, codec="copy", movflags="+faststart", f=ext)
- ffmpeg.execute()
+ await ffmpeg.execute()
else:
logger.warning(f"Convert video to H264 (re-encoding): {path.name} -> {tmp_path.name}")
ffmpeg = FFmpeg().option("y").input(path).output(tmp_path, acodec=audio_codec, vcodec="libx264", f=ext)
@@ -189,7 +190,7 @@ def convert_to_h264(
def on_completed():
logger.debug("completed")
- ffmpeg.execute()
+ await ffmpeg.execute()
if delete:
path.unlink(missing_ok=True)
tmp_path.rename(mp4_path)
@@ -209,12 +210,12 @@ def convert_to_h264(
return path
-def convert_to_audio(path: str | Path | None, ext: str = "m4a", *, codec: str = "aac", delete: bool = True, **kwargs) -> Path:
+async def convert_to_audio(path: str | Path | None, ext: str = "m4a", *, codec: str = "aac", delete: bool = True, **kwargs) -> Path:
if path is None or not Path(path).expanduser().resolve().is_file():
return Path("")
path = Path(path).expanduser().resolve()
logger.debug(f"Converting to audio {ext}: {path.name}")
- info = parse_media_info(path)
+ info = await parse_media_info(path)
tmp_path = path.with_suffix(f".tmp.{ext}")
final_path = path.with_suffix(f".final.{ext}")
success = True
@@ -222,7 +223,7 @@ def convert_to_audio(path: str | Path | None, ext: str = "m4a", *, codec: str =
if info["audio_codec"] == codec:
logger.debug(f"Audio stream is already {codec}, without re-encoding: {path.name} -> {tmp_path.name}")
ffmpeg = FFmpeg().option("y").input(path).output(tmp_path, vn=None, acodec="copy", **kwargs)
- ffmpeg.execute()
+ await ffmpeg.execute()
else:
logger.warning(f"Re-encoding audio: {path.name} -> {tmp_path.name}")
ffmpeg = FFmpeg().option("y").input(path).output(tmp_path, vn=None, acodec=codec, **kwargs)
@@ -235,7 +236,7 @@ def convert_to_audio(path: str | Path | None, ext: str = "m4a", *, codec: str =
def on_completed():
logger.success(f"Converted audio: {path} to {final_path}, {codec=}")
- ffmpeg.execute()
+ await ffmpeg.execute()
if delete:
path.unlink(missing_ok=True)
tmp_path.rename(final_path)
@@ -254,7 +255,7 @@ def convert_to_audio(path: str | Path | None, ext: str = "m4a", *, codec: str =
return path
-def generate_cover(path: Path | str) -> str:
+async def generate_cover(path: Path | str) -> str:
"""Generate cover image base on media file path.
Must be jpg format.
@@ -275,14 +276,14 @@ def generate_cover(path: Path | str) -> str:
for ext in [".webp", ".png", ".heic", ".bmp"]:
cover_path = Path(path).with_suffix(ext)
if cover_path.is_file():
- converted = convert_img_to_telegram_format(cover_path)
+ converted = await convert_img_to_telegram_format(cover_path)
logger.debug(f"Converted cover image: {cover_path.name} -> {converted.name}")
return converted.as_posix()
logger.debug(f"Generate cover image from the first frame of {path}")
with contextlib.suppress(Exception):
ffmpeg = FFmpeg().option("y").option("loglevel", "warning").input(path).output(jpg_path, vframes=1)
- ffmpeg.execute()
+ await ffmpeg.execute()
return jpg_path.as_posix() if jpg_path.is_file() else ""
logger.error(f"Failed to generate cover image for: {path}")
@@ -318,7 +319,7 @@ def convert_jpg_via_pillow(path: str | Path | None, *, delete: bool = True) -> t
return True, save_path
-def convert_jpg_via_ffmpeg(path: str | Path | None, *, delete: bool = True) -> tuple[bool, Path]:
+async def convert_jpg_via_ffmpeg(path: str | Path | None, *, delete: bool = True) -> tuple[bool, Path]:
"""Returns: is_success, out_path."""
if path is None or not Path(path).expanduser().resolve().is_file():
return False, Path("")
@@ -327,7 +328,7 @@ def convert_jpg_via_ffmpeg(path: str | Path | None, *, delete: bool = True) -> t
logger.debug(f"Converting {path.name} -> {save_path.name}")
try:
ffmpeg = FFmpeg().option("y").option("loglevel", "warning").input(path).output(save_path, vframes=1)
- ffmpeg.execute()
+ await ffmpeg.execute()
except Exception as e:
logger.error(f"Failed convert {path.name} -> {save_path.name}: {e}")
return False, path
@@ -336,7 +337,7 @@ def convert_jpg_via_ffmpeg(path: str | Path | None, *, delete: bool = True) -> t
return True, save_path
-def convert_img_to_telegram_format(path: str | Path | None, *, force_jpg: bool = True, delete: bool = True) -> Path:
+async def convert_img_to_telegram_format(path: str | Path | None, *, force_jpg: bool = True, delete: bool = True) -> Path:
if path is None or not Path(path).expanduser().resolve().is_file():
return Path("")
path = Path(path).expanduser().resolve()
@@ -352,7 +353,7 @@ def convert_img_to_telegram_format(path: str | Path | None, *, force_jpg: bool =
return out_path
logger.warning(f"Failed to convert {path.name} via PIL, try FFmpeg ...")
- success, out_path = convert_jpg_via_ffmpeg(path, delete=delete)
+ success, out_path = await convert_jpg_via_ffmpeg(path, delete=delete)
if success:
logger.success(f"Converted {path.name} via FFmpeg: {out_path.name}")
return out_path
@@ -403,7 +404,7 @@ def convert_img_match_telegram_rules(path: str | Path, num_bytes: int = 10485760
return convert_img_match_telegram_rules(save_path, num_bytes, wh_total, max_ratio, delete=delete)
-def validate_img(path: str | Path | None, *, force_jpg: bool = True, delete: bool = True) -> str:
+async def validate_img(path: str | Path | None, *, force_jpg: bool = True, delete: bool = True) -> str:
"""Check if the image is valid.
0. format must be in ["heic", "jpg", "jpeg", "png", "webp"]
@@ -421,7 +422,7 @@ def validate_img(path: str | Path | None, *, force_jpg: bool = True, delete: boo
if path.suffix.lower() not in [".heic", ".jpg", ".jpeg", ".png", ".webp", ".bmp", ".gif"]:
logger.warning(f"Invalid image format: {path.name}")
return ""
- path = convert_img_to_telegram_format(path, force_jpg=force_jpg, delete=delete)
+ path = await convert_img_to_telegram_format(path, force_jpg=force_jpg, delete=delete)
if not path.is_file():
logger.warning(f"Invalid image: {path}")
@@ -452,7 +453,7 @@ def is_valid_video_or_audio(path: str | Path | None, *, delete: bool = True) ->
return False
-def fix_video_rotation(path: str | Path | None) -> Path:
+async def fix_video_rotation(path: str | Path | None) -> Path:
    """Fix video rotation for iOS devices.
Some videos (Weibo's livephotos) are displayed in the wrong direction on the Telegram iOS client.
@@ -461,18 +462,20 @@ def fix_video_rotation(path: str | Path | None) -> Path:
return Path("")
path = Path(path).expanduser().resolve()
logger.trace(f"Checking video rotation: {path.name} [{readable_size(path=path)}]")
- probe_info = parse_media_info(path)
+ probe_info = await parse_media_info(path)
if not probe_info: # video can't parse by ffprobe
logger.warning(f"Invalid video: {path}")
return path
if probe_info.get("rotation") in [-90, 90]:
logger.warning(f"Fixing video rotation from {probe_info['height']}x{probe_info['width']}")
- path = convert_to_h264(path, re_encoding=True)
+ path = await convert_to_h264(path, re_encoding=True)
return path
if __name__ == "__main__":
+ import asyncio
+
# print(convert_to_h264("~/tests/test.mov"))
# is_valid_video_or_audio("~/tests/test.jpg")
# convert_img_match_telegram_rules("~/tests/test.large.jpg")
- print(convert_img_to_telegram_format("~/tests/test.heic"))
+ print(asyncio.run(convert_img_to_telegram_format("~/tests/test.heic")))