main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import asyncio
  4import json
  5from datetime import timedelta
  6from decimal import ROUND_DOWN, Decimal, getcontext
  7from pathlib import Path
  8
  9from ffmpeg import Progress
 10from ffmpeg.asyncio import FFmpeg
 11from loguru import logger
 12from pyrogram.client import Client
 13from pyrogram.types import Message
 14
 15from config import DOWNLOAD_DIR, PREFIX
 16from messages.parser import parse_msg
 17from messages.progress import modify_progress
 18from messages.sender import send2tg
 19from messages.utils import equal_prefix, remove_prefix, startswith_prefix
 20from multimedia import convert_to_h264, generate_cover, parse_media_info
 21from utils import readable_size, seconds_to_time
 22
 23
 24async def ffmpeg_cut(client: Client, message: Message, **kwargs):
 25    """Cut video or audio file."""
 26    docs = f"""✂️**视频切片**
 27该功能需要使用`{PREFIX.FFMPEG_CUT}`命令回复一条视频消息
 28
 29**命令格式**:
 30`{PREFIX.FFMPEG_CUT} start_time [end_time]`
 31时间格式可以为:
 32- 秒数: `10`
 33- 分秒: `1:00`
 34- 时分秒: `1:00:00`
 35(秒数可以为小数, 如: `1.5`)
 36
 37**示例**:
 38- `{PREFIX.FFMPEG_CUT} 10 20`: 第10秒至第20秒
 39- `{PREFIX.FFMPEG_CUT} 10 1:30`: 第10秒至1分30秒
 40- `{PREFIX.FFMPEG_CUT} 1:30.25 300`: 第1分30秒250豪秒至300秒
 41"""
 42    if not startswith_prefix(message.content, prefix=PREFIX.FFMPEG_CUT):
 43        return
 44    # send docs if message == "/cut"
 45    if equal_prefix(message.content, prefix=PREFIX.FFMPEG_CUT):
 46        await send2tg(client, message, texts=docs, **kwargs)
 47        return
 48    media_message = message.reply_to_message if message.reply_to_message else message
 49    info = parse_msg(media_message)
 50
 51    if info["mtype"] != "video":
 52        await send2tg(client, message, texts="❌请回复一条视频消息\n\n" + docs, **kwargs)
 53        return
 54    times = remove_prefix(message.content, prefix=PREFIX.FFMPEG_CUT).split()
 55    if len(times) not in [1, 2]:
 56        await send2tg(client, message, texts="❌时间格式错误\n\n" + docs, **kwargs)
 57        return
 58
 59    fpath = Path(DOWNLOAD_DIR).joinpath(f"{info['mid']}-{info['cid']}-{info['file_name']}").as_posix()
 60    if not Path(fpath).is_file():
 61        kwargs["progress"] = kwargs.get("progress", await message.reply_text("⏬正在下载视频...", quote=True))
 62        fpath: str = await client.download_media(media_message, file_name=fpath)  # type: ignore
 63    if not Path(fpath).is_file():
 64        await modify_progress(text="❌视频下载失败", force_update=True, **kwargs)
 65        return
 66    vinfo = await parse_media_info(fpath)
 67    ext = Path(fpath).suffix or ".mp4"
 68    start_time = sanitize_time(times[0])
 69    out_path = Path(DOWNLOAD_DIR).joinpath(f"ffmpeg-cut-{info['file_name']}").as_posix()
 70    if len(times) == 1:
 71        end_time = sanitize_time(vinfo["duration"])
 72        cmd = f"✅视频下载完成 ({readable_size(path=fpath)})\n✂️执行切片命令:\n`ffmpeg -ss {start_time} -i input{ext} -c:v libx264 -c:a aac output{ext}`"
 73        ffmpeg = FFmpeg().option("y").input(fpath, ss=start_time).output(out_path, acodec="aac", vcodec="libx264", f="mp4")
 74    else:
 75        end_time = sanitize_time(times[1])
 76        cmd = f"✅视频下载完成 ({readable_size(path=fpath)})\n✂️执行切片命令:\n`ffmpeg -ss {start_time} -to {end_time} -i input{ext} -c:v libx264 -c:a aac output{ext}`"
 77        ffmpeg = FFmpeg().option("y").input(fpath, ss=start_time, to=end_time).output(out_path, acodec="aac", vcodec="libx264", f="mp4")
 78
 79    await modify_progress(text=cmd, force_update=True, **kwargs)
 80
 81    @ffmpeg.on("progress")
 82    def on_progress(p: Progress):
 83        logger.trace(f"Converted time: {seconds_to_time(p.time.seconds)}, size: {readable_size(p.size)}, speed: {p.speed}")
 84
 85    await ffmpeg.execute()
 86    if not Path(out_path).is_file():
 87        await modify_progress(text="❌切片失败", force_update=True, **kwargs)
 88        return
 89    """
 90                {
 91                    "video": "path/to/video.mp4",
 92                    "width": int,
 93                    "height": int,
 94                    "duration": int,
 95                    "thumb": "path/to/thumbnail.jpg" | None,
 96                },
 97                {
 98                    "audio": "path/to/audio.mp3",
 99                    "performer": str,
100                    "title": str,
101                    "duration": int,
102                    "thumb": "path/to/thumbnail.jpg" | None,
103                }
104    """
105    minfo = await parse_media_info(out_path)
106    media = [{"video": out_path, "thumb": generate_cover(fpath)}]
107    start = start_time.removeprefix("00:")
108    end = end_time.removeprefix("00:")
109    duration = f"{minfo['raw_duration']:.3f}".rstrip("0").rstrip(".")
110    caption = f"✂️切片时间: {start} - {end}\n⏳切片时长: {duration}\n💾切片大小: {minfo['filesize']}"
111    logger.success("✅切片完成\n" + caption)
112    await send2tg(client, message, texts=caption, media=media, **kwargs)
113    await modify_progress(del_status=True, **kwargs)
114
115
116def sanitize_time(t: str | float) -> str:
117    """Sanitize time string (HH:MM:SS.MILLISECONDS).
118
119    10 -> 00:00:10
120    10.25 -> 00:00:10.25
121    12:00 -> 00:12:00
122    12:34:00 -> 12:34:00
123    """
124    t = str(t)
125    ctx = getcontext()
126    ctx.prec = 3
127    ctx.rounding = ROUND_DOWN
128    parts = t.replace("", ":").split(":")  # noqa: RUF001
129    hours = 0
130    minutes = 0
131    seconds = Decimal(0)
132    if len(parts) == 1:
133        seconds = Decimal(parts[0])
134    elif len(parts) == 2:
135        minutes = int(parts[0])
136        seconds = Decimal(parts[1])
137    elif len(parts) == 3:
138        hours = int(parts[0])
139        minutes = int(parts[1])
140        seconds = Decimal(parts[2])
141    else:
142        return ""
143    delta = timedelta(hours=hours, minutes=minutes, seconds=float(seconds))
144    total_seconds = int(delta.total_seconds())
145    microseconds = delta.microseconds
146    h = total_seconds // 3600
147    m = (total_seconds % 3600) // 60
148    s = total_seconds % 60
149    time_str = f"{h:02d}:{m:02d}:{s:02d}"
150    if microseconds > 0:
151        if "." in t:
152            fractional_part = t.split(".")[-1]
153            time_str += f".{fractional_part}"
154        else:
155            time_str += f".{microseconds:06d}".rstrip("0")
156    return time_str
157
158
async def ffmpeg_h264(client: Client, message: Message, **kwargs):
    """Convert a video message to H.264 and send the converted file.

    The handler only reacts to messages starting with ``PREFIX.FFMPEG_H264``.
    The video is taken from the replied-to message, or from the command
    message itself when it is not a reply. A video whose codec is already
    reported as ``h264`` is skipped unless the command is
    ``<PREFIX.FFMPEG_H264> force``.

    Args:
        client: Client used to download the source video.
        message: The command message.
        **kwargs: Forwarded to ``send2tg`` and ``modify_progress``. A
            ``progress`` entry is added when a download status message is
            created and the caller did not already provide one.
    """
    if not startswith_prefix(message.content, prefix=PREFIX.FFMPEG_H264):
        return
    # Bare command that is not a reply: there is no video, so show the usage hint.
    if equal_prefix(message.content, prefix=PREFIX.FFMPEG_H264) and not message.reply_to_message:
        await send2tg(client, message, texts=f"🔄使用 `{PREFIX.FFMPEG_H264}` 回复视频消息将其转换为H264编码", **kwargs)
        return
    media_message = message.reply_to_message or message
    info = parse_msg(media_message)
    if info["mtype"] != "video":
        await send2tg(client, message, texts="❌请回复一条视频消息", **kwargs)
        return
    fpath = Path(DOWNLOAD_DIR).joinpath(f"{info['mid']}-{info['cid']}-{info['file_name']}").as_posix()
    if not Path(fpath).is_file():
        # Create the status message only when the caller did not pass one;
        # an `await` used as the dict.get() default would always be executed.
        if "progress" not in kwargs:
            kwargs["progress"] = await message.reply_text("⏬正在下载视频文件...", quote=True)
        fpath = await client.download_media(media_message, file_name=fpath)  # type: ignore
    # download_media may return None on failure, which Path() cannot accept.
    if not fpath or not Path(fpath).is_file():
        await modify_progress(text="❌视频文件下载失败", force_update=True, **kwargs)
        return

    media_info = await parse_media_info(fpath)
    forced = equal_prefix(message.content, prefix=f"{PREFIX.FFMPEG_H264} force")
    if media_info["video_codec"] == "h264" and not forced:
        await modify_progress(text=f"⚠️该视频已经是H264编码, 无需转换\n使用 `{PREFIX.FFMPEG_H264} force` 强制进行转换", force_update=True, **kwargs)
        return

    # Re-encoding is always forced here: the "already h264" case was handled above.
    h264_path = await convert_to_h264(fpath, force_re_encoding=True)
    await send2tg(client, media_message, media=[{"video": h264_path}], **kwargs)
    await modify_progress(del_status=True, **kwargs)
188
189
async def ffprobe(client: Client, message: Message, **kwargs):
    """Run ``ffprobe`` on a media message and reply with its stream info as JSON.

    The handler only reacts when the message equals ``PREFIX.FFPROBE``. The
    media is taken from the replied-to message, or from the command message
    itself when it is not a reply; only video, audio and photo are accepted.

    Args:
        client: Client used to download the media file.
        message: The command message.
        **kwargs: Forwarded to ``send2tg`` and ``modify_progress``. A
            ``progress`` entry is added when the caller did not provide one.
    """
    if not equal_prefix(message.content, prefix=PREFIX.FFPROBE):
        return
    media_message = message.reply_to_message or message
    info = parse_msg(media_message)
    if info["mtype"] not in ["video", "audio", "photo"]:
        await send2tg(client, media_message, texts="❌ffprobe不支持该媒体类型", **kwargs)
        return
    # Create the status message only when the caller did not pass one;
    # an `await` used as the dict.get() default would always be executed.
    if "progress" not in kwargs:
        kwargs["progress"] = await message.reply_text("⏬正在下载媒体文件...", quote=True)
    fpath = Path(DOWNLOAD_DIR).joinpath(f"{info['mid']}-{info['cid']}-{info['file_name']}").as_posix()
    if not Path(fpath).is_file():
        fpath = await client.download_media(media_message, file_name=fpath)  # type: ignore
    # download_media may return None on failure, which Path() cannot accept.
    if not fpath or not Path(fpath).is_file():
        await modify_progress(text="❌文件下载失败", force_update=True, **kwargs)
        return
    # "-v error" keeps stderr limited to real errors so it can be shown to the user.
    cmd = ["ffprobe", "-v", "error", "-show_streams", "-print_format", "json", fpath]
    try:
        process = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
        stdout_data, stderr_data = await process.communicate()
        if process.returncode:
            # A non-zero exit means the probe failed; report ffprobe's own
            # message instead of whatever it left on stdout.
            detail = stderr_data.decode(errors="replace").strip()
            raise RuntimeError(detail or f"ffprobe exited with code {process.returncode}")
        metadata = json.loads(stdout_data)
        await send2tg(client, media_message, texts=json.dumps(metadata, ensure_ascii=False, indent=2), **kwargs)
    except Exception as e:  # handler boundary: report any failure to the chat
        await send2tg(client, media_message, texts=f"❌获取媒体信息失败: {e}", **kwargs)
    await modify_progress(del_status=True, **kwargs)