Commit 4bb783f

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-09-22 03:14:50
feat(ffmpeg): add video/audio cutting feature via `ffmpeg`
1 parent c33e31f
src/others/ffmpeg.py
@@ -0,0 +1,152 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from datetime import timedelta
+from decimal import ROUND_DOWN, Decimal, getcontext
+from pathlib import Path
+
+from ffmpeg import Progress
+from ffmpeg.asyncio import FFmpeg
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import Message
+
+from config import DOWNLOAD_DIR, PREFIX
+from messages.parser import parse_msg
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from messages.utils import equal_prefix, remove_prefix, startswith_prefix
+from multimedia import generate_cover, parse_media_info
+from utils import readable_size, seconds_to_time
+
+HELP = f"""✂️**视频/音频切片**
+该功能需要使用`{PREFIX.FFMPEG_CUT}`命令回复一条视频或音频消息
+
+**命令格式**:
+`{PREFIX.FFMPEG_CUT} start_time [end_time]`
+时间格式可以为:
+- 秒数: `10`
+- 分秒: `1:00`
+- 时分秒: `1:00:00`
+(秒数可以为小数, 如: `1.5`)
+
+**示例**:
+- `{PREFIX.FFMPEG_CUT} 10 20`: 第10秒至第20秒
+- `{PREFIX.FFMPEG_CUT} 10 1:30`: 第10秒至1分30秒
+- `{PREFIX.FFMPEG_CUT} 1:30.25 300`: 第1分30秒250豪秒至300秒
+
+⚠️注意: 切片使用ffmpeg对视频流或音频流进行copy, 不会重新进行编码
+copy操作使用关键帧作为分割点, 因此切片的开始或结束位置并非精确匹配请求时间点
+"""
+
+
+async def ffmpeg_cut(client: Client, message: Message, **kwargs):
+    """Cut video or audio file."""
+    if not startswith_prefix(message.content, prefix=PREFIX.FFMPEG_CUT):
+        return
+    # send docs if message == "/cut"
+    if equal_prefix(message.content, prefix=PREFIX.FFMPEG_CUT):
+        await send2tg(client, message, texts=HELP, **kwargs)
+        return
+    media_message = message.reply_to_message if message.reply_to_message else message
+    info = parse_msg(media_message)
+
+    if info["mtype"] not in ["video", "audio"]:
+        await send2tg(client, message, texts="❌请回复一条视频或音频消息\n\n" + HELP, **kwargs)
+        return
+    times = remove_prefix(message.content, prefix=PREFIX.FFMPEG_CUT).split()
+    if len(times) not in [1, 2]:
+        await send2tg(client, message, texts="❌时间格式错误\n\n" + HELP, **kwargs)
+        return
+
+    fpath = Path(DOWNLOAD_DIR).joinpath(f"{info['mid']}-{info['cid']}-{info['file_name']}").as_posix()
+    if not Path(fpath).is_file():
+        kwargs["progress"] = kwargs.get("progress", await message.reply_text("⏬正在下载媒体文件...", quote=True))
+        fpath: str = await client.download_media(media_message, file_name=fpath)  # type: ignore
+    if not Path(fpath).is_file():
+        await modify_progress(texts="❌媒体文件下载失败", force_update=True, **kwargs)
+        return
+    ext = Path(fpath).suffix
+    start_time = sanitize_time(times[0])
+    out_path = Path(DOWNLOAD_DIR).joinpath(f"ffmpeg-cut-{info['file_name']}").as_posix()
+    if len(times) == 1:
+        cmd = f"✅文件下载完成 ({readable_size(path=fpath)})\n✂️执行切片命令:\n`ffmpeg -ss {start_time} -i input{ext} -c copy output{ext}`"
+        ffmpeg = FFmpeg().option("y").input(fpath, ss=start_time).output(out_path, acodec="copy", vcodec="copy")
+    else:
+        end_time = sanitize_time(times[1])
+        cmd = f"✅文件下载完成 ({readable_size(path=fpath)})\n✂️执行切片命令:\n`ffmpeg -ss {start_time} -to {end_time} -i input{ext} -c copy output{ext}`"
+        ffmpeg = FFmpeg().option("y").input(fpath, ss=start_time, to=end_time).output(out_path, acodec="copy", vcodec="copy")
+
+    await modify_progress(texts=cmd, force_update=True, **kwargs)
+
+    @ffmpeg.on("progress")
+    def on_progress(p: Progress):
+        logger.trace(f"Converted time: {seconds_to_time(p.time.seconds)}, size: {readable_size(p.size)}, speed: {p.speed}")
+
+    await ffmpeg.execute()
+    if not Path(out_path).is_file():
+        await modify_progress(texts="❌切片失败", force_update=True, **kwargs)
+        return
+    """
+                {
+                    "video": "path/to/video.mp4",
+                    "width": int,
+                    "height": int,
+                    "duration": int,
+                    "thumb": "path/to/thumbnail.jpg" | None,
+                },
+                {
+                    "audio": "path/to/audio.mp3",
+                    "performer": str,
+                    "title": str,
+                    "duration": int,
+                    "thumb": "path/to/thumbnail.jpg" | None,
+                }
+    """
+    minfo = await parse_media_info(out_path)
+    media = [{"video": out_path, "thumb": generate_cover(fpath)}] if info["mtype"] == "video" else [{"audio": out_path, "thumb": generate_cover(fpath)}]
+    caption = f"✂️切片大小: {minfo['filesize']}\n⏳切片时长: {seconds_to_time(minfo['duration'])}\n🎬视频编码: {minfo['video_codec']}\n🎧音频编码: {minfo['audio_codec']}"
+    logger.success("✅切片完成\n" + caption)
+    await send2tg(client, message, texts=caption, media=media, **kwargs)
+    await modify_progress(del_status=True, **kwargs)
+
+
+def sanitize_time(t: str) -> str:
+    """Sanitize time string (HH:MM:SS.MILLISECONDS).
+
+    10 -> 00:00:10
+    10.25 -> 00:00:10.25
+    12:00 -> 00:12:00
+    12:34:00 -> 12:34:00
+    """
+    ctx = getcontext()
+    ctx.prec = 3
+    ctx.rounding = ROUND_DOWN
+    parts = t.split(":")
+    hours = 0
+    minutes = 0
+    seconds = Decimal(0)
+    if len(parts) == 1:
+        seconds = Decimal(parts[0])
+    elif len(parts) == 2:
+        minutes = int(parts[0])
+        seconds = Decimal(parts[1])
+    elif len(parts) == 3:
+        hours = int(parts[0])
+        minutes = int(parts[1])
+        seconds = Decimal(parts[2])
+    else:
+        return ""
+    delta = timedelta(hours=hours, minutes=minutes, seconds=float(seconds))
+    total_seconds = int(delta.total_seconds())
+    microseconds = delta.microseconds
+    h = total_seconds // 3600
+    m = (total_seconds % 3600) // 60
+    s = total_seconds % 60
+    time_str = f"{h:02d}:{m:02d}:{s:02d}"
+    if microseconds > 0:
+        if "." in t:
+            fractional_part = t.split(".")[-1]
+            time_str += f".{fractional_part}"
+        else:
+            time_str += f".{microseconds:06d}".rstrip("0")
+    return time_str
src/config.py
@@ -73,6 +73,7 @@ class ENABLE:  # see fine-grained permission in `src/permission.py`
     CONVERT_CHINESE = os.getenv("ENABLE_CONVERT_CHINESE", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
     QUOTLY = os.getenv("ENABLE_QUOTLY", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
     TMDB = os.getenv("ENABLE_TMDB", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
+    FFMPEG = os.getenv("ENABLE_FFMPEG", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
 
 
 class PREFIX:
@@ -101,6 +102,7 @@ class PREFIX:
     CONVERT_TO_SC = os.getenv("PREFIX_CONVERT_TO_SC", "/sc, /cn").lower()
     QUOTLY = os.getenv("PREFIX_QUOTLY", "/quote").lower()
     TMDB = os.getenv("PREFIX_TMDB", "/tmdb").lower()
+    FFMPEG_CUT = os.getenv("PREFIX_FFMPEG_CUT", "/cut").lower()
 
 
 class API:
src/handler.py
@@ -23,6 +23,7 @@ from others.convert_chinese import chinese_conversion
 from others.download_external import download_url_in_message
 from others.extract_audio import extract_audio_file
 from others.favorite import save_favorite, send_favorite
+from others.ffmpeg import ffmpeg_cut
 from others.raw_img_file import convert_raw_img_file
 from others.search_google import search_google
 from others.search_ytb import search_youtube
@@ -72,6 +73,7 @@ async def handle_utilities(
     wget: bool = True,
     ytb: bool = True,
     tmdb: bool = True,
+    ffmpeg: bool = True,
     raw_img: bool = True,
     show_progress: bool = True,
     detail_progress: bool = False,
@@ -103,6 +105,7 @@ async def handle_utilities(
         price (bool, optional): Enable Asset price. Defaults to True.
         summary (bool, optional): Enable AI summary. Defaults to True.
         tmdb (bool, optional): Enable TMDB query. Defaults to True.
+        ffmpeg (bool, optional): Enable ffmpeg commands. Defaults to True.
         raw_img (bool, optional): Enable convert raw image. Defaults to False.
         show_progress (bool, optional): Show a progress message on Telegram. Defaults to True.
         detail_progress (bool, optional): Show detailed progress (Only if show_proress is set to True). Defaults to False.
@@ -147,6 +150,8 @@ async def handle_utilities(
         await search_tmdb(client, message, **kwargs)  # /tmdb
     if raw_img:
         await convert_raw_img_file(client, message, **kwargs)
+    if ffmpeg:
+        await ffmpeg_cut(client, message, **kwargs)
 
 
 async def handle_social_media(
@@ -430,6 +435,8 @@ def get_social_media_help(chat_id: int | str, ctype: str, prefix: str):
         msg += f"\n📖**查询直播合订本**: 发送 `{PREFIX.DANMU}`, `{PREFIX.FAYAN}` 查看详细教程"
     if permission["convert_chinese"]:
         msg += f"\n🔄**简繁转换**: `{PREFIX.CONVERT_TO_SC}` 或 `{PREFIX.CONVERT_TO_TC}`"
+    if permission["ffmpeg"]:
+        msg += f"\n✂️**视频切片**: `{PREFIX.FFMPEG_CUT}` 回复视频消息"
 
     msg += "\n\n单独发送每个命令前缀本身可查看该命令详细使用说明"
     return msg
src/multimedia.py
@@ -41,7 +41,7 @@ async def parse_media_info(path: str | Path | None) -> dict:
         info = {
             "name": path.stem,
             "path": path.resolve().as_posix(),
-            "duration": math.ceil(float(duration)),
+            "duration": math.floor(float(duration)),
             "width": round(float(width)),
             "height": round(float(height)),
             "audio_codec": audio_stream.get("codec_name", ""),
src/permission.py
@@ -146,6 +146,7 @@ def check_service(cid: int | str, ctype: str) -> dict:
         "convert_chinese": True,
         "quotly": True,
         "tmdb": True,
+        "ffmpeg": True,
     } | global_permissions()
 
     if ctype == "PRIVATE":
@@ -213,6 +214,8 @@ def check_service(cid: int | str, ctype: str) -> dict:
         permission["quotly"] = False
     if not ENABLE.TMDB:
         permission["tmdb"] = False
+    if not ENABLE.FFMPEG:
+        permission["ffmpeg"] = False
 
     """
     Set for specific chat