main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import asyncio
4import json
5from datetime import timedelta
6from decimal import ROUND_DOWN, Decimal, getcontext
7from pathlib import Path
8
9from ffmpeg import Progress
10from ffmpeg.asyncio import FFmpeg
11from loguru import logger
12from pyrogram.client import Client
13from pyrogram.types import Message
14
15from config import DOWNLOAD_DIR, PREFIX
16from messages.parser import parse_msg
17from messages.progress import modify_progress
18from messages.sender import send2tg
19from messages.utils import equal_prefix, remove_prefix, startswith_prefix
20from multimedia import convert_to_h264, generate_cover, parse_media_info
21from utils import readable_size, seconds_to_time
22
23
24async def ffmpeg_cut(client: Client, message: Message, **kwargs):
25 """Cut video or audio file."""
26 docs = f"""✂️**视频切片**
27该功能需要使用`{PREFIX.FFMPEG_CUT}`命令回复一条视频消息
28
29**命令格式**:
30`{PREFIX.FFMPEG_CUT} start_time [end_time]`
31时间格式可以为:
32- 秒数: `10`
33- 分秒: `1:00`
34- 时分秒: `1:00:00`
35(秒数可以为小数, 如: `1.5`)
36
37**示例**:
38- `{PREFIX.FFMPEG_CUT} 10 20`: 第10秒至第20秒
39- `{PREFIX.FFMPEG_CUT} 10 1:30`: 第10秒至1分30秒
40- `{PREFIX.FFMPEG_CUT} 1:30.25 300`: 第1分30秒250豪秒至300秒
41"""
42 if not startswith_prefix(message.content, prefix=PREFIX.FFMPEG_CUT):
43 return
44 # send docs if message == "/cut"
45 if equal_prefix(message.content, prefix=PREFIX.FFMPEG_CUT):
46 await send2tg(client, message, texts=docs, **kwargs)
47 return
48 media_message = message.reply_to_message if message.reply_to_message else message
49 info = parse_msg(media_message)
50
51 if info["mtype"] != "video":
52 await send2tg(client, message, texts="❌请回复一条视频消息\n\n" + docs, **kwargs)
53 return
54 times = remove_prefix(message.content, prefix=PREFIX.FFMPEG_CUT).split()
55 if len(times) not in [1, 2]:
56 await send2tg(client, message, texts="❌时间格式错误\n\n" + docs, **kwargs)
57 return
58
59 fpath = Path(DOWNLOAD_DIR).joinpath(f"{info['mid']}-{info['cid']}-{info['file_name']}").as_posix()
60 if not Path(fpath).is_file():
61 kwargs["progress"] = kwargs.get("progress", await message.reply_text("⏬正在下载视频...", quote=True))
62 fpath: str = await client.download_media(media_message, file_name=fpath) # type: ignore
63 if not Path(fpath).is_file():
64 await modify_progress(text="❌视频下载失败", force_update=True, **kwargs)
65 return
66 vinfo = await parse_media_info(fpath)
67 ext = Path(fpath).suffix or ".mp4"
68 start_time = sanitize_time(times[0])
69 out_path = Path(DOWNLOAD_DIR).joinpath(f"ffmpeg-cut-{info['file_name']}").as_posix()
70 if len(times) == 1:
71 end_time = sanitize_time(vinfo["duration"])
72 cmd = f"✅视频下载完成 ({readable_size(path=fpath)})\n✂️执行切片命令:\n`ffmpeg -ss {start_time} -i input{ext} -c:v libx264 -c:a aac output{ext}`"
73 ffmpeg = FFmpeg().option("y").input(fpath, ss=start_time).output(out_path, acodec="aac", vcodec="libx264", f="mp4")
74 else:
75 end_time = sanitize_time(times[1])
76 cmd = f"✅视频下载完成 ({readable_size(path=fpath)})\n✂️执行切片命令:\n`ffmpeg -ss {start_time} -to {end_time} -i input{ext} -c:v libx264 -c:a aac output{ext}`"
77 ffmpeg = FFmpeg().option("y").input(fpath, ss=start_time, to=end_time).output(out_path, acodec="aac", vcodec="libx264", f="mp4")
78
79 await modify_progress(text=cmd, force_update=True, **kwargs)
80
81 @ffmpeg.on("progress")
82 def on_progress(p: Progress):
83 logger.trace(f"Converted time: {seconds_to_time(p.time.seconds)}, size: {readable_size(p.size)}, speed: {p.speed}")
84
85 await ffmpeg.execute()
86 if not Path(out_path).is_file():
87 await modify_progress(text="❌切片失败", force_update=True, **kwargs)
88 return
89 """
90 {
91 "video": "path/to/video.mp4",
92 "width": int,
93 "height": int,
94 "duration": int,
95 "thumb": "path/to/thumbnail.jpg" | None,
96 },
97 {
98 "audio": "path/to/audio.mp3",
99 "performer": str,
100 "title": str,
101 "duration": int,
102 "thumb": "path/to/thumbnail.jpg" | None,
103 }
104 """
105 minfo = await parse_media_info(out_path)
106 media = [{"video": out_path, "thumb": generate_cover(fpath)}]
107 start = start_time.removeprefix("00:")
108 end = end_time.removeprefix("00:")
109 duration = f"{minfo['raw_duration']:.3f}".rstrip("0").rstrip(".")
110 caption = f"✂️切片时间: {start} - {end}\n⏳切片时长: {duration}秒\n💾切片大小: {minfo['filesize']}"
111 logger.success("✅切片完成\n" + caption)
112 await send2tg(client, message, texts=caption, media=media, **kwargs)
113 await modify_progress(del_status=True, **kwargs)
114
115
116def sanitize_time(t: str | float) -> str:
117 """Sanitize time string (HH:MM:SS.MILLISECONDS).
118
119 10 -> 00:00:10
120 10.25 -> 00:00:10.25
121 12:00 -> 00:12:00
122 12:34:00 -> 12:34:00
123 """
124 t = str(t)
125 ctx = getcontext()
126 ctx.prec = 3
127 ctx.rounding = ROUND_DOWN
128 parts = t.replace(":", ":").split(":") # noqa: RUF001
129 hours = 0
130 minutes = 0
131 seconds = Decimal(0)
132 if len(parts) == 1:
133 seconds = Decimal(parts[0])
134 elif len(parts) == 2:
135 minutes = int(parts[0])
136 seconds = Decimal(parts[1])
137 elif len(parts) == 3:
138 hours = int(parts[0])
139 minutes = int(parts[1])
140 seconds = Decimal(parts[2])
141 else:
142 return ""
143 delta = timedelta(hours=hours, minutes=minutes, seconds=float(seconds))
144 total_seconds = int(delta.total_seconds())
145 microseconds = delta.microseconds
146 h = total_seconds // 3600
147 m = (total_seconds % 3600) // 60
148 s = total_seconds % 60
149 time_str = f"{h:02d}:{m:02d}:{s:02d}"
150 if microseconds > 0:
151 if "." in t:
152 fractional_part = t.split(".")[-1]
153 time_str += f".{fractional_part}"
154 else:
155 time_str += f".{microseconds:06d}".rstrip("0")
156 return time_str
157
158
159async def ffmpeg_h264(client: Client, message: Message, **kwargs):
160 """Convert video to h264."""
161 if not startswith_prefix(message.content, prefix=PREFIX.FFMPEG_H264):
162 return
163 # send docs if message == "/h264"
164 if equal_prefix(message.content, prefix=PREFIX.FFMPEG_H264) and not message.reply_to_message:
165 await send2tg(client, message, texts=f"🔄使用 `{PREFIX.FFMPEG_H264}` 回复视频消息将其转换为H264编码", **kwargs)
166 return
167 media_message = message.reply_to_message if message.reply_to_message else message
168 info = parse_msg(media_message)
169 if info["mtype"] != "video":
170 await send2tg(client, message, texts="❌请回复一条视频消息", **kwargs)
171 return
172 fpath = Path(DOWNLOAD_DIR).joinpath(f"{info['mid']}-{info['cid']}-{info['file_name']}").as_posix()
173 if not Path(fpath).is_file():
174 kwargs["progress"] = kwargs.get("progress", await message.reply_text("⏬正在下载视频文件...", quote=True))
175 fpath: str = await client.download_media(media_message, file_name=fpath) # type: ignore
176 if not Path(fpath).is_file():
177 await modify_progress(text="❌视频文件下载失败", force_update=True, **kwargs)
178 return
179
180 media_info = await parse_media_info(fpath)
181 if media_info["video_codec"] == "h264" and not equal_prefix(message.content, prefix=f"{PREFIX.FFMPEG_H264} force"):
182 await modify_progress(text=f"⚠️该视频已经是H264编码, 无需转换\n使用 `{PREFIX.FFMPEG_H264} force` 强制进行转换", force_update=True, **kwargs)
183 return
184
185 h264_path = await convert_to_h264(fpath, force_re_encoding=True)
186 await send2tg(client, media_message, media=[{"video": h264_path}], **kwargs)
187 await modify_progress(del_status=True, **kwargs)
188
189
190async def ffprobe(client: Client, message: Message, **kwargs):
191 """Get media info."""
192 if not equal_prefix(message.content, prefix=PREFIX.FFPROBE):
193 return
194 media_message = message.reply_to_message if message.reply_to_message else message
195 info = parse_msg(media_message)
196 if info["mtype"] not in ["video", "audio", "photo"]:
197 await send2tg(client, media_message, texts="❌ffprobe不支持该媒体类型", **kwargs)
198 return
199 kwargs["progress"] = kwargs.get("progress", await message.reply_text("⏬正在下载媒体文件...", quote=True))
200 fpath = Path(DOWNLOAD_DIR).joinpath(f"{info['mid']}-{info['cid']}-{info['file_name']}").as_posix()
201 if not Path(fpath).is_file():
202 fpath: str = await client.download_media(media_message, file_name=fpath) # type: ignore
203 if not Path(fpath).is_file():
204 await modify_progress(text="❌文件下载失败", force_update=True, **kwargs)
205 return
206 try:
207 cmd = ["ffprobe", "-show_streams", "-print_format", "json", fpath]
208 process = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
209 stdout_data, _ = await process.communicate()
210 metadata = json.loads(stdout_data)
211 await send2tg(client, media_message, texts=json.dumps(metadata, ensure_ascii=False, indent=2), **kwargs)
212 except Exception as e:
213 await send2tg(client, media_message, texts=f"❌获取媒体信息失败: {e}", **kwargs)
214 await modify_progress(del_status=True, **kwargs)