main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from pathlib import Path
4
5from loguru import logger
6from pyrogram.client import Client
7from pyrogram.types import Message
8
9from config import PREFIX
10from messages.parser import parse_msg
11from messages.progress import modify_progress
12from messages.sender import send2tg
13from messages.utils import equal_prefix, get_reply_to, startswith_prefix
14from multimedia import convert_to_audio, parse_media_info
15from networking import match_social_media_link
16from utils import to_int
17from ytdlp.download import ytdlp_download
18
# Usage text for the audio/voice commands; sent when the command has no replied message.
# Built line by line; the leading and trailing newlines are part of the text.
HELP = (
    "\n"
    "🎧**消息转音频或语音**\n"
    "使用方式:\n"
    f"1. `{PREFIX.AUDIO}` 回复消息转换为音频文件\n"
    f"2. `{PREFIX.VOICE}` 回复消息转换为语音消息\n"
    f"3. `{PREFIX.AUDIO}` 或 `{PREFIX.VOICE}` + 油管/B站链接: 下载其中的音频\n"
)
26
27
async def extract_audio_file(client: Client, message: Message, **kwargs) -> None:
    """Extract audio from a media message or a supported link and send it back.

    The command message must start with ``PREFIX.AUDIO`` or ``PREFIX.VOICE``.
    The source is the replied-to message when there is one, otherwise the
    command message itself.  Video/audio/voice media is downloaded and passed
    to ``convert_to_audio``; for any other message type the text is matched
    against social-media links and the audio is fetched via ``ytdlp_download``.
    The resulting file is sent with ``send_audio`` or ``send_voice`` depending
    on the command prefix and is removed from disk afterwards.

    Args:
        client: Client used to send the resulting audio/voice message.
        message: The command message.
        **kwargs: Forwarded to ``send2tg``, ``modify_progress`` and
            ``ytdlp_download``.  Keys read directly here: ``progress``
            (existing status message; one is created if missing/falsy),
            ``target_chat`` (destination chat, defaults to the command's chat)
            and ``reply_msg_id`` (defaults to 0).
    """
    prefixes = [PREFIX.AUDIO, PREFIX.VOICE]

    # send docs if message == "/audio", without reply
    if equal_prefix(message.text, prefix=prefixes) and not message.reply_to_message:
        await send2tg(client, message, texts=HELP, **kwargs)
        return
    if not startswith_prefix(message.content, prefix=prefixes):
        return

    media_msg = message.reply_to_message or message
    info = parse_msg(media_msg)
    mtype = info["mtype"]

    # The command lives in `message`; when replying, `media_msg` is the replied
    # media whose caption is not the command, so test the command message.
    if mtype == "audio" and equal_prefix(message.content, prefix=PREFIX.AUDIO):
        await message.reply("🎧该消息已经为音频, 无需提取")
        return
    if mtype == "voice" and equal_prefix(message.content, prefix=PREFIX.VOICE):
        await message.reply("🎧该消息已经为语音, 无需提取")
        return

    if mtype in ("video", "audio", "voice"):
        kwargs["progress"] = kwargs.get("progress") or await message.reply_text(f"[{mtype}]收到消息, 开始提取🎧音频...", quote=True)
        video: str = await media_msg.download()  # type: ignore
        # Guard against a falsy result before building a Path from it.
        if not video or not Path(video).expanduser().resolve().is_file():
            await modify_progress(text=f"❌{mtype}下载失败, 请稍后重试...", force_update=True, **kwargs)
            return
        path = await convert_to_audio(video, ext="m4a")
    else:  # match social media links
        # `or` short-circuits: only send a status reply when the caller has
        # not already provided a progress message.
        kwargs["progress"] = kwargs.get("progress") or await message.reply_text("使用ytdlp提取🎧音频...", quote=True)
        matched = await match_social_media_link(info["text"])
        if matched["platform"] not in ["youtube", "bilibili", "ytdlp"]:
            await modify_progress(text="❌未找到ytdlp支持的链接", force_update=True, **kwargs)
            return
        downloaded = await ytdlp_download(matched["url"], ytdlp_download_video=False, **kwargs)
        path = downloaded["audio_path"]

    path = Path(path).expanduser().resolve()
    if not path.is_file():
        logger.trace(f"File not found: {path}")
        await modify_progress(text="❌ytdlp下载失败", force_update=True, **kwargs)
        return
    if path.name.endswith(".final.m4a"):  # remove final suffix
        renamed = path.with_name(path.name.replace(".final.m4a", ".m4a"))
        path.rename(renamed)
        path = renamed

    try:
        await modify_progress(text="🎧音频提取已完成, 开始上传...", force_update=True, **kwargs)
        minfo = await parse_media_info(path)
        duration = minfo.get("duration", 0)

        target_chat = kwargs.get("target_chat") or message.chat.id
        reply_msg_id = kwargs.get("reply_msg_id", 0)
        reply_parameters = get_reply_to(message.id, reply_msg_id)
        if startswith_prefix(message.content, prefix=PREFIX.AUDIO):
            await client.send_audio(to_int(target_chat), path.as_posix(), duration=duration, reply_parameters=reply_parameters)
        elif startswith_prefix(message.content, prefix=PREFIX.VOICE):
            await client.send_voice(to_int(target_chat), path.as_posix(), duration=duration, reply_parameters=reply_parameters)
    finally:
        # Always remove the produced file, even when the upload fails.
        path.unlink(missing_ok=True)
    await modify_progress(del_status=True, **kwargs)