Commit ec0bdcd
Changed files (9)
src/tts/engines.py
@@ -0,0 +1,191 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import random
+from collections import defaultdict
+
# Registry of every available TTS voice.
# Each entry: name (voice ID), desc (human-readable description),
# engine (provider: gemini / qwen / sambert), sex ("male" or "female").
ENGINES = [
    # Gemini
    {"name": "Achernar", "desc": "Soft", "engine": "gemini", "sex": "male"},
    {"name": "Achird", "desc": "Friendly", "engine": "gemini", "sex": "female"},
    {"name": "Algenib", "desc": "Gravelly", "engine": "gemini", "sex": "female"},
    {"name": "Algieba", "desc": "Smooth", "engine": "gemini", "sex": "male"},
    {"name": "Alnilam", "desc": "Firm", "engine": "gemini", "sex": "male"},
    {"name": "Aoede", "desc": "Breezy", "engine": "gemini", "sex": "female"},
    {"name": "Autonoe", "desc": "Bright", "engine": "gemini", "sex": "male"},
    {"name": "Callirrhoe", "desc": "Easy-going", "engine": "gemini", "sex": "female"},
    {"name": "Charon", "desc": "Informative", "engine": "gemini", "sex": "male"},
    {"name": "Despina", "desc": "Smooth", "engine": "gemini", "sex": "female"},
    {"name": "Enceladus", "desc": "Breathy", "engine": "gemini", "sex": "male"},
    {"name": "Erinome", "desc": "Clear", "engine": "gemini", "sex": "female"},
    {"name": "Fenrir", "desc": "Excitable", "engine": "gemini", "sex": "male"},
    {"name": "Gacrux", "desc": "Mature", "engine": "gemini", "sex": "male"},
    {"name": "Iapetus", "desc": "Clear", "engine": "gemini", "sex": "male"},
    {"name": "Kore", "desc": "Firm", "engine": "gemini", "sex": "female"},
    {"name": "Laomedeia", "desc": "Upbeat", "engine": "gemini", "sex": "female"},
    {"name": "Leda", "desc": "Youthful", "engine": "gemini", "sex": "female"},
    {"name": "Orus", "desc": "Firm", "engine": "gemini", "sex": "male"},
    {"name": "Puck", "desc": "Upbeat", "engine": "gemini", "sex": "male"},
    {"name": "Pulcherrima", "desc": "Forward", "engine": "gemini", "sex": "female"},
    {"name": "Rasalgethi", "desc": "Informative", "engine": "gemini", "sex": "male"},
    {"name": "Sadachbia", "desc": "Lively", "engine": "gemini", "sex": "male"},
    {"name": "Sadaltager", "desc": "Knowledgeable", "engine": "gemini", "sex": "male"},
    {"name": "Schedar", "desc": "Even", "engine": "gemini", "sex": "male"},
    {"name": "Sulafat", "desc": "Warm", "engine": "gemini", "sex": "female"},
    {"name": "Umbriel", "desc": "Easy-going", "engine": "gemini", "sex": "male"},
    {"name": "Vindemiatrix", "desc": "Gentle", "engine": "gemini", "sex": "female"},
    {"name": "Zephyr", "desc": "Bright", "engine": "gemini", "sex": "female"},
    {"name": "Zubenelgenubi", "desc": "Casual", "engine": "gemini", "sex": "male"},
    # Qwen
    {"name": "Chelsie", "desc": "圆润、甜美", "engine": "qwen", "sex": "female"},
    {"name": "Cherry", "desc": "元气少女", "engine": "qwen", "sex": "female"},
    {"name": "Ethan", "desc": "年轻、清亮", "engine": "qwen", "sex": "male"},
    {"name": "Serena", "desc": "甜美、活泼", "engine": "qwen", "sex": "female"},
    {"name": "Dylan", "desc": "【方言】北京话", "engine": "qwen", "sex": "male"},
    {"name": "Jada", "desc": "【方言】吴语", "engine": "qwen", "sex": "female"},
    {"name": "Sunny", "desc": "【方言】四川话", "engine": "qwen", "sex": "female"},
    # Sambert
    {"name": "知楠", "desc": "通用场景, 广告男声", "engine": "sambert", "sex": "male"},
    # BUG fix: sex was the typo "f emale", which breaks sex filtering and emoji lookup
    {"name": "知琪", "desc": "通用场景, 温柔女声", "engine": "sambert", "sex": "female"},
    {"name": "知厨", "desc": "新闻播报, 舌尖男声", "engine": "sambert", "sex": "male"},
    {"name": "知德", "desc": "新闻播报, 新闻男声", "engine": "sambert", "sex": "male"},
    {"name": "知佳", "desc": "新闻播报, 标准女声", "engine": "sambert", "sex": "female"},
    {"name": "知茹", "desc": "新闻播报, 新闻女声", "engine": "sambert", "sex": "female"},
    {"name": "知倩", "desc": "配音解说, 资讯女声", "engine": "sambert", "sex": "female"},
    {"name": "知祥", "desc": "配音解说, 磁性男声", "engine": "sambert", "sex": "male"},
    {"name": "知薇", "desc": "产品简介, 萝莉女声", "engine": "sambert", "sex": "female"},
    {"name": "知浩", "desc": "通用场景, 咨询男声", "engine": "sambert", "sex": "male"},
    {"name": "知婧", "desc": "通用场景, 严厉女声", "engine": "sambert", "sex": "female"},
    {"name": "知茗", "desc": "通用场景, 诙谐男声", "engine": "sambert", "sex": "male"},
    {"name": "知墨", "desc": "通用场景, 情感男声", "engine": "sambert", "sex": "male"},
    {"name": "知娜", "desc": "通用场景, 浙普女声", "engine": "sambert", "sex": "female"},
    {"name": "知树", "desc": "通用场景, 资讯男声", "engine": "sambert", "sex": "male"},
    {"name": "知莎", "desc": "通用场景, 知性女声", "engine": "sambert", "sex": "female"},
    {"name": "知婷", "desc": "通用场景, 电台女声", "engine": "sambert", "sex": "female"},
    {"name": "知笑", "desc": "通用场景, 资讯女声", "engine": "sambert", "sex": "female"},
    {"name": "知雅", "desc": "通用场景, 严厉女声", "engine": "sambert", "sex": "female"},
    {"name": "知晔", "desc": "通用场景, 青年男声", "engine": "sambert", "sex": "male"},
    {"name": "知颖", "desc": "通用场景, 软萌童声", "engine": "sambert", "sex": "male"},
    {"name": "知媛", "desc": "通用场景, 知心姐姐", "engine": "sambert", "sex": "female"},
    {"name": "知悦", "desc": "客服, 温柔女声", "engine": "sambert", "sex": "female"},
    {"name": "知柜", "desc": "阅读产品简介, 直播女声", "engine": "sambert", "sex": "female"},
    {"name": "知硕", "desc": "数字人, 自然男声", "engine": "sambert", "sex": "male"},
    {"name": "知妙", "desc": "产品简介、数字人、直播", "engine": "sambert", "sex": "female"},
    {"name": "知猫", "desc": "产品简介、数字人、直播", "engine": "sambert", "sex": "female"},
    {"name": "知伦", "desc": "配音解说, 悬疑解说", "engine": "sambert", "sex": "male"},
    {"name": "知飞", "desc": "配音解说, 激昂解说", "engine": "sambert", "sex": "male"},
    {"name": "知达", "desc": "新闻播报, 标准男声", "engine": "sambert", "sex": "male"},
]
+
# Voices that are only available on specific models.
# Qwen dialect voices require the dated/latest qwen-tts snapshots; each
# Sambert voice maps to exactly one "sambert-<pinyin>-v1" model ID.
# Voices absent from this table have no model restriction.
LIMIT_FOR_MODEL = {
    "Dylan": ["qwen-tts-latest", "qwen-tts-2025-05-22"],
    "Jada": ["qwen-tts-latest", "qwen-tts-2025-05-22"],
    "Sunny": ["qwen-tts-latest", "qwen-tts-2025-05-22"],
    "知楠": ["sambert-zhinan-v1"],
    "知琪": ["sambert-zhiqi-v1"],
    "知厨": ["sambert-zhichu-v1"],
    "知德": ["sambert-zhide-v1"],
    "知佳": ["sambert-zhijia-v1"],
    "知茹": ["sambert-zhiru-v1"],
    "知倩": ["sambert-zhiqian-v1"],
    "知祥": ["sambert-zhixiang-v1"],
    "知薇": ["sambert-zhiwei-v1"],
    "知浩": ["sambert-zhihao-v1"],
    "知婧": ["sambert-zhijing-v1"],
    "知茗": ["sambert-zhiming-v1"],
    "知墨": ["sambert-zhimo-v1"],
    "知娜": ["sambert-zhina-v1"],
    "知树": ["sambert-zhishu-v1"],
    "知莎": ["sambert-zhistella-v1"],
    "知婷": ["sambert-zhiting-v1"],
    "知笑": ["sambert-zhixiao-v1"],
    "知雅": ["sambert-zhiya-v1"],
    "知晔": ["sambert-zhiye-v1"],
    "知颖": ["sambert-zhiying-v1"],
    "知媛": ["sambert-zhiyuan-v1"],
    "知悦": ["sambert-zhiyue-v1"],
    "知柜": ["sambert-zhigui-v1"],
    "知硕": ["sambert-zhishuo-v1"],
    "知妙": ["sambert-zhimiao-emo-v1"],
    "知猫": ["sambert-zhimao-v1"],
    "知伦": ["sambert-zhilun-v1"],
    "知飞": ["sambert-zhifei-v1"],
    "知达": ["sambert-zhida-v1"],
}
+
+
def get_random_one(engine: str = "", sex: str = "") -> dict:
    """Pick a random voice entry, optionally filtered by engine and/or sex.

    Falls back to the full ENGINES pool when the filters match nothing.
    """
    candidates = [
        item
        for item in ENGINES
        if (not engine or item["engine"] == engine) and (not sex or item["sex"] == sex)
    ]
    return random.choice(candidates or ENGINES)
+
+
def sex_emoji(name: str) -> str:
    """Map a voice name (case-insensitive) to a gender emoji; '❓' when unknown."""
    lowered = name.lower()
    for entry in ENGINES:
        if entry["name"].lower() == lowered:
            return "🚹" if entry["sex"] == "male" else "🚺"
    return "❓"
+
+
def list_engines() -> str:
    """Render every voice as a markdown list, grouped by provider then by sex."""
    lines = ["👤音色名称: 描述"]
    grouped = defaultdict(list)
    for entry in ENGINES:
        grouped[entry["engine"]].append(entry)
    for provider, entries in grouped.items():
        lines.append(f"🏷️提供商: **{provider.capitalize()}**")
        # sort by sex so voices of the same gender are listed together
        for entry in sorted(entries, key=lambda e: e["sex"]):
            lines.append(f"{sex_emoji(entry['name'])} `{entry['name'].capitalize()}`: {entry['desc']}")
    return "\n".join(lines) + "\n"
+
+
def get_tts_config(texts: str) -> tuple[str, str, str, str]:
    """Get TTS config from texts.

    Parses an optional leading "@<selector>" and resolves it to a voice,
    engine and model. Selectors, checked in this order:
    "@男"/"@male" and "@女"/"@female" (random voice of that sex),
    "@gemini"/"@qwen"/"@sambert" (random voice of that engine),
    then "@<voice name>" (exact voice, case-insensitive prefix match).

    Examples:
        >>> get_tts_config("@Cherry 你好")
        ("Cherry", "qwen", "", "你好")

    Args:
        texts (str): Texts to parse.

    Returns:
        (voice_name, engine, model, texts): model is "" when the chosen voice
        has no entry in LIMIT_FOR_MODEL; voice_name is "" when no selector matched.
    """
    # use gemini by default
    engine = "gemini"
    if not texts.startswith("@"):
        return "", engine, "", texts
    if texts.startswith(("@男", "@male")):
        info = get_random_one(sex="male")
        # voices without a model restriction fall back to "" (engine default)
        model = random.choice(LIMIT_FOR_MODEL.get(info["name"], [""]))
        # only one of the two prefixes is present, so the other removeprefix is a no-op
        return info["name"], info["engine"], model, texts.removeprefix("@男").removeprefix("@male").lstrip()
    if texts.startswith(("@女", "@female")):
        info = get_random_one(sex="female")
        model = random.choice(LIMIT_FOR_MODEL.get(info["name"], [""]))
        return info["name"], info["engine"], model, texts.removeprefix("@女").removeprefix("@female").lstrip()
    if texts.lower().startswith("@gemini"):
        info = get_random_one(engine="gemini")
        model = random.choice(LIMIT_FOR_MODEL.get(info["name"], [""]))
        # slice lengths match the literal prefixes: len("@gemini") == 7, etc.
        return info["name"], info["engine"], model, texts[7:].lstrip()
    if texts.lower().startswith("@qwen"):
        info = get_random_one(engine="qwen")
        model = random.choice(LIMIT_FOR_MODEL.get(info["name"], [""]))
        return info["name"], info["engine"], model, texts[5:].lstrip()
    if texts.lower().startswith("@sambert"):
        info = get_random_one(engine="sambert")
        model = random.choice(LIMIT_FOR_MODEL.get(info["name"], [""]))
        return info["name"], info["engine"], model, texts[8:].lstrip()

    texts = texts.removeprefix("@").lstrip()

    # exact voice name: case-insensitive prefix match against the registry
    for x in ENGINES:
        if texts.lower().startswith(x["name"].lower()):
            model = random.choice(LIMIT_FOR_MODEL.get(x["name"], [""]))
            return x["name"], x["engine"], model, texts.removeprefix(x["name"]).lstrip()
    return "", engine, "", texts
src/tts/gemini.py
@@ -0,0 +1,111 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+import wave
+from pathlib import Path
+
+from glom import glom
+from google import genai
+from google.genai import types
+from google.genai.types import HttpOptions
+from loguru import logger
+from pyrogram.enums import ParseMode
+from pyrogram.types import Message
+
+from config import CAPTION_LENGTH, DOWNLOAD_DIR, GEMINI, TTS
+from llm.hooks import hook_gemini_httpoptions
+from messages.utils import blockquote, smart_split
+from multimedia import convert_to_audio
+from utils import markdown_to_text, rand_string, strings_list
+
+
async def gemini_tts(message: Message, texts: str, model: str = "", voice_name: str = "") -> dict:
    """Gemini TTS.

    https://ai.google.dev/gemini-api/docs/speech-generation

    Short inputs go out as a single request; longer ones are split, synthesized
    concurrently as raw PCM chunks, then merged into one ogg/opus file.

    Args:
        message (Message): Incoming Telegram message (used for per-message HTTP hooks).
        texts (str): Text (possibly markdown) to synthesize.
        model (str, optional): Model ID; defaults to TTS.GEMINI_MODEL.
        voice_name (str, optional): Voice name; defaults to TTS.GEMINI_VOICE.

    Returns:
        {"voice": str, "duration": int, "caption": str}
    """
    model = model or TTS.GEMINI_MODEL
    voice_name = voice_name or TTS.GEMINI_VOICE
    raw_texts = markdown_to_text(texts)
    num_token = await count_token(raw_texts, model)
    if num_token < TTS.GEMINI_INPUT_TOKEN_LIMIT:
        # fits in one request; gemini_tts_real handles the ogg conversion itself
        return await gemini_tts_real(message, texts, model, voice_name, return_bytes=False)
    # split
    text_list = await smart_split(texts, chars_per_string=TTS.GEMINI_SPLIT_LENGTH, mode=ParseMode.DISABLED)
    resp = await asyncio.gather(*[gemini_tts_real(message, text, model, voice_name) for text in text_list])
    save_path = Path(DOWNLOAD_DIR) / f"{rand_string(16)}.wav"
    # NOTE(review): a failed chunk returns {}, so r["voice"] would raise KeyError here — confirm upstream handling
    combined_data = b"".join([r["voice"] for r in resp])
    save_wave_file(save_path, combined_data)
    ogg = await convert_to_audio(save_path, ext="ogg", codec="libopus")
    caption = f"🗣音色: {voice_name}\n🤖引擎: {model}\n{blockquote(texts[: CAPTION_LENGTH - 20])}"
    return {"voice": ogg, "duration": calculate_duration(combined_data), "caption": caption}
+
+
async def gemini_tts_real(message: Message, texts: str, model: str, voice_name: str, *, return_bytes: bool = True) -> dict:
    """Gemini TTS single request.

    Tries each configured API key (in random order) until one succeeds.

    Args:
        return_bytes (bool, optional): If True, return raw PCM bytes in "voice";
            otherwise save to wav, convert to ogg/opus, and return the file path.
            Defaults to True.

    Returns:
        {"voice": str or bytes, "duration": int, "caption": str}; {} when every key failed.
    """
    for api_key in strings_list(GEMINI.API_KEY, shuffle=True):
        try:
            logger.debug(f"TTS via {model}, proxy={GEMINI.PROXY}, voice: {voice_name}, texts: {texts}")
            http_options = HttpOptions(base_url=GEMINI.BASE_URL, async_client_args={"proxy": GEMINI.PROXY})
            http_options = hook_gemini_httpoptions(http_options, message)  # per-message HTTP overrides
            app = genai.Client(api_key=api_key, http_options=http_options)
            response = await app.aio.models.generate_content(
                model=model,
                contents=markdown_to_text(texts),
                config=types.GenerateContentConfig(
                    response_modalities=["AUDIO"],
                    speech_config=types.SpeechConfig(
                        voice_config=types.VoiceConfig(
                            prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=voice_name),
                        ),
                    ),
                ),
            )
            # the audio payload lives in the first candidate's inline_data
            if data := glom(response, "candidates.0.content.parts.0.inline_data.data", default=None):
                caption = f"🗣音色: {voice_name}\n🤖引擎: {model}\n{blockquote(texts[: CAPTION_LENGTH - 20])}"
                if return_bytes:
                    return {"voice": data, "duration": calculate_duration(data), "caption": caption}
                save_path = Path(DOWNLOAD_DIR) / f"{rand_string(16)}.wav"
                save_wave_file(save_path, data)
                ogg = await convert_to_audio(save_path, ext="ogg", codec="libopus")
                return {"voice": ogg, "duration": calculate_duration(data), "caption": caption}
        except Exception as e:
            # log and try the next key
            logger.error(e)
    return {}
+
+
+def save_wave_file(path: Path | str, pcm: bytes, channels: int = 1, rate: float = 24000, sample_width: int = 2):
+ """Save PCM data to a wave file."""
+ path = Path(path).as_posix()
+ with wave.open(path, "wb") as wf:
+ wf.setnchannels(channels)
+ wf.setsampwidth(sample_width)
+ wf.setframerate(rate)
+ wf.writeframes(pcm)
+
+
def calculate_duration(pcm: bytes, channels: int = 1, rate: float = 24000, sample_width: int = 2) -> int:
    """Return the playback length of raw PCM data, rounded to whole seconds."""
    frame_size = sample_width * channels  # bytes per audio frame
    if not frame_size:
        # degenerate parameters: avoid division by zero
        return 0
    return round(len(pcm) / frame_size / rate)
+
+
async def count_token(texts: str, model_id: str = "") -> int:
    """Count tokens of *texts* via the Gemini count_tokens API.

    Uses one random API key from the configured pool; returns 0 when the
    response carries no total.
    """
    model = model_id or TTS.GEMINI_MODEL
    http_options = HttpOptions(async_client_args={"proxy": GEMINI.PROXY})
    app = genai.Client(api_key=strings_list(GEMINI.API_KEY, shuffle=True)[0], http_options=http_options)
    response = await app.aio.models.count_tokens(model=model, contents=texts)
    return response.total_tokens or 0
src/tts/qwen.py
@@ -0,0 +1,97 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+from pathlib import Path
+
+import soundfile as sf
+from dashscope import get_tokenizer
+from glom import glom
+from loguru import logger
+from pyrogram.enums import ParseMode
+
+from config import CAPTION_LENGTH, DOWNLOAD_DIR, TTS
+from messages.utils import blockquote, smart_split
+from multimedia import convert_to_audio
+from networking import download_file, hx_req
+from utils import markdown_to_text, rand_string, strings_list
+
+
async def qwen_tts(texts: str, model: str = "", voice_name: str = "") -> dict:
    """Qwen TTS.

    https://help.aliyun.com/zh/model-studio/qwen-tts

    Short inputs go out as a single request; longer ones are split, synthesized
    concurrently and merged into one ogg/opus file.

    Returns:
        {"voice": str, "duration": int, "caption": str}
    """
    # pick a random model from the configured pool when none was given
    model = model or strings_list(TTS.QWEN_MODEL, shuffle=True)[0]
    voice_name = voice_name or TTS.QWEN_VOICE
    raw_texts = markdown_to_text(texts)
    num_token = count_token(raw_texts, model)
    if num_token < TTS.QWEN_INPUT_TOKEN_LIMIT:
        # fits in one request; convert straight to ogg
        return await qwen_tts_real(texts, model, voice_name, convert_ogg=True)
    # split
    text_list = await smart_split(texts, chars_per_string=TTS.QWEN_SPLIT_LENGTH, mode=ParseMode.DISABLED)
    resp = await asyncio.gather(*[qwen_tts_real(text, model, voice_name) for text in text_list])
    save_path = Path(DOWNLOAD_DIR) / f"{rand_string(16)}.wav"
    # NOTE(review): a failed chunk returns {}, so r["voice"] would raise KeyError here — confirm upstream handling
    merge_wav([r["voice"] for r in resp], save_path)
    ogg = await convert_to_audio(save_path, ext="ogg", codec="libopus")
    caption = f"🗣音色: {voice_name}\n🤖引擎: {model}\n{blockquote(texts[: CAPTION_LENGTH - 20])}"
    return {"voice": ogg, "duration": sum([r["duration"] for r in resp]), "caption": caption}
+
+
async def qwen_tts_real(texts: str, model: str, voice_name: str, *, convert_ogg: bool = False) -> dict:
    """Qwen TTS single request.

    Tries each configured API key in random order; on failure moves on to the
    next key instead of aborting.

    Args:
        convert_ogg (bool, optional): If True, convert the downloaded wav to
            ogg/opus before returning. Defaults to False.

    Returns:
        {"voice": str, "duration": int, "caption": str} on success, {} if every key failed.
    """
    for api_key in strings_list(TTS.ALI_API_KEY, shuffle=True):
        try:
            logger.debug(f"TTS via {model}, voice: {voice_name}, texts: {texts}")
            response = await hx_req(
                "https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation",
                "POST",
                headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
                json_data={"model": model, "input": {"text": markdown_to_text(texts), "voice": voice_name}},
                proxy=TTS.ALI_PROXY,
                check_keys=["output.audio.url", "usage.output_tokens"],
            )
            url = glom(response, "output.audio.url", default="")
            save_path = await download_file(url, proxy=TTS.ALI_PROXY)
            duration = glom(response, "usage.output_tokens", default=0) / 50  # 1s = 50 tokens
            caption = f"🗣音色: {voice_name}\n🤖引擎: {model}\n{blockquote(texts[: CAPTION_LENGTH - 20])}"
            if convert_ogg:
                save_path = await convert_to_audio(save_path, ext="ogg", codec="libopus")
        except Exception as e:
            # BUG fix: previously the loop fell through to the return below with
            # save_path/duration/caption unbound, raising NameError; retry the next key instead.
            logger.error(e)
            continue
        return {"voice": save_path, "duration": duration, "caption": caption}
    return {}
+
+
def merge_wav(wav_paths: list[str], save_path: str | Path):
    """Merge wav files into single one.

    Audio parameters (sample rate, channels, subtype) are taken from the first
    file; any input whose rate or channel count differs is skipped with a warning.
    """
    # detect sample rate, channels, subtype from the first input file
    with sf.SoundFile(wav_paths[0], "r") as f:
        samplerate = f.samplerate
        channels = f.channels
        subtype = f.subtype

    # write one by one, block-wise, so large files are not loaded whole into memory
    with sf.SoundFile(save_path, "w", samplerate=samplerate, channels=channels, subtype=subtype) as outfile:
        for file_path in wav_paths:
            with sf.SoundFile(file_path, "r") as infile:
                if infile.samplerate != samplerate or infile.channels != channels:
                    logger.warning(f"{file_path}的参数不匹配")
                    continue
                for block in infile.blocks(blocksize=1024):
                    outfile.write(block)
+
+
def count_token(texts: str, model: str = "") -> int:
    """Count tokens in *texts* using the dashscope tokenizer for *model*."""
    return len(get_tokenizer(model).encode(texts))
src/tts/sambert.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+import random
+from pathlib import Path
+
+import anyio
+import soundfile as sf
+from dashscope.audio.tts import SpeechSynthesizer
+from glom import glom
+from loguru import logger
+from pyrogram.enums import ParseMode
+
+from config import CAPTION_LENGTH, DOWNLOAD_DIR, TTS
+from messages.utils import blockquote, smart_split
+from multimedia import convert_to_audio
+from tts.engines import LIMIT_FOR_MODEL, get_random_one
+from utils import markdown_to_text, rand_string, strings_list
+
+
async def sambert_tts(texts: str, model: str = "", voice_name: str = "") -> dict:
    """Sambert TTS.

    https://help.aliyun.com/zh/model-studio/text-to-speech

    When no model is given, a random Sambert voice is picked and resolved to
    its model ID. Long inputs are split, synthesized concurrently and merged.

    Returns:
        {"voice": str, "duration": int, "caption": str}
    """
    if not model:
        # pick a random sambert voice; each voice maps to exactly one model
        config = get_random_one(engine="sambert")
        voice_name = config["name"]
        # BUG fix: the fallback used to be the voice name "知琪" (not a model ID),
        # which the API would reject; use its actual model "sambert-zhiqi-v1"
        model = random.choice(LIMIT_FOR_MODEL.get(voice_name, ["sambert-zhiqi-v1"]))

    raw_texts = markdown_to_text(texts)
    if len(raw_texts) < TTS.SAMBERT_LENGTH_LIMIT:
        return await sambert_tts_real(texts, model, voice_name, convert_ogg=True)
    # split long text, synthesize chunks concurrently, then merge into one file
    text_list = await smart_split(texts, chars_per_string=TTS.SAMBERT_LENGTH_LIMIT, mode=ParseMode.DISABLED)
    resp = await asyncio.gather(*[sambert_tts_real(text, model, voice_name) for text in text_list])
    save_path = Path(DOWNLOAD_DIR) / f"{rand_string(16)}.wav"
    merge_wav([r["voice"] for r in resp], save_path)
    ogg = await convert_to_audio(save_path, ext="ogg", codec="libopus")
    caption = f"🗣音色: {voice_name}\n🤖引擎: {model}\n{blockquote(texts[: CAPTION_LENGTH - 20])}"
    return {"voice": ogg, "duration": sum([r["duration"] for r in resp]), "caption": caption}
+
+
async def sambert_tts_real(texts: str, model: str, voice_name: str, *, convert_ogg: bool = False) -> dict:
    """Sambert TTS single request.

    Tries each configured API key in random order; on failure (or an empty
    audio payload) moves on to the next key instead of aborting.

    Args:
        convert_ogg (bool, optional): If True, convert the wav result to
            ogg/opus before returning. Defaults to False.

    Returns:
        {"voice": str, "duration": int, "caption": str} on success, {} if every key failed.
    """
    for api_key in strings_list(TTS.ALI_API_KEY, shuffle=True):
        try:
            logger.debug(f"TTS via {model}, voice: {voice_name}, texts: {texts}")
            # SpeechSynthesizer.call is blocking; run it in a worker thread
            response = await asyncio.to_thread(SpeechSynthesizer.call, model, markdown_to_text(texts), format="wav", word_timestamp_enabled=True, api_key=api_key)
            if response.get_audio_data() is None:
                continue  # no audio returned; try the next key
            save_path = Path(DOWNLOAD_DIR) / f"{rand_string(16)}.wav"
            async with await anyio.open_file(save_path, "wb") as f:
                await f.write(response.get_audio_data())
            duration = 0
            if timestamps := response.get_timestamps():
                # end time of the last word, milliseconds -> seconds
                duration = glom(timestamps, "-1.end_time", default=0) / 1000
            caption = f"🗣音色: {voice_name}\n🤖引擎: {model}\n{blockquote(texts[: CAPTION_LENGTH - 20])}"
            if convert_ogg:
                save_path = await convert_to_audio(save_path, ext="ogg", codec="libopus")
        except Exception as e:
            # BUG fix: previously execution fell through to the return below with
            # save_path/duration/caption unbound, raising NameError; retry the next key instead.
            logger.error(e)
            continue
        return {"voice": save_path, "duration": duration, "caption": caption}
    return {}
+
+
def merge_wav(wav_paths: list[str], save_path: str | Path):
    """Merge wav files into single one.

    Audio parameters (sample rate, channels, subtype) are taken from the first
    file; any input whose rate or channel count differs is skipped with a warning.
    (Identical to merge_wav in src/tts/qwen.py — candidate for a shared helper.)
    """
    # detect sample rate, channels, subtype from the first input file
    with sf.SoundFile(wav_paths[0], "r") as f:
        samplerate = f.samplerate
        channels = f.channels
        subtype = f.subtype

    # write one by one, block-wise, so large files are not loaded whole into memory
    with sf.SoundFile(save_path, "w", samplerate=samplerate, channels=channels, subtype=subtype) as outfile:
        for file_path in wav_paths:
            with sf.SoundFile(file_path, "r") as infile:
                if infile.samplerate != samplerate or infile.channels != channels:
                    logger.warning(f"{file_path}的参数不匹配")
                    continue
                for block in infile.blocks(blocksize=1024):
                    outfile.write(block)
src/tts/tts.py
@@ -0,0 +1,74 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from pathlib import Path
+
+from pyrogram.client import Client
+from pyrogram.types import Message
+
+from config import PREFIX, TTS
+from messages.parser import parse_msg
+from messages.sender import send2tg
+from messages.utils import blockquote, equal_prefix, set_reaction, startswith_prefix
+from tts.engines import get_tts_config, list_engines
+from tts.gemini import gemini_tts
+from tts.qwen import qwen_tts
+from tts.sambert import sambert_tts
+from utils import read_text
+
# Help text for the /tts command, sent when the bare prefix is used without a reply.
HELP = f"""🗣**文字转语音**
使用说明:
1. `{PREFIX.TTS}` + 文字或txt文件
2. `{PREFIX.TTS}` 回复 `文字消息` 或 `txt文件消息`
3. `{PREFIX.TTS} @音色名` 可以指定音色, 默认音色: {TTS.GEMINI_VOICE}

特殊用法:
- `{PREFIX.TTS}` + @男 或 @male: 随机一款男声
- `{PREFIX.TTS}` + @女 或 @female: 随机一款女声
- `{PREFIX.TTS} @gemini`: 随机一款Gemini音色
- `{PREFIX.TTS} @qwen`: 随机一款通义千问音色
- `{PREFIX.TTS} @sambert`: 随机一款阿里Sambert音色
{blockquote(list_engines())}
"""
+
+
async def text_to_speech(client: Client, message: Message, **kwargs):
    """Handle the /tts command: synthesize text (or a replied txt file) into a voice message.

    Sends the help text when the bare prefix is used without a reply; otherwise
    resolves the requested voice/engine/model, collects the input text, runs the
    matching TTS engine, and replies with the resulting voice message.
    """
    info = parse_msg(message, silent=True)
    if not startswith_prefix(info["text"], prefix=PREFIX.TTS):
        return
    # send docs if message == "/tts", without reply
    if info["mtype"] == "text" and equal_prefix(message.text, prefix=PREFIX.TTS) and not message.reply_to_message:
        await send2tg(client, message, texts=HELP, **kwargs)
        return

    voice_name, engine, model, texts = get_tts_config(info["text"].removeprefix(PREFIX.TTS).lstrip())
    reaction_msg = message
    if message.reply_to_message:
        # synthesize the replied-to message instead of the command message
        message = message.reply_to_message
        info = parse_msg(message, silent=True, use_cache=False)  # parse again

    # text file input (.txt/.md or any text/* mime type)
    if info["mtype"] == "document":
        if info["mime_type"].startswith("text/") or Path(info["file_name"]).suffix.lower() in [".txt", ".md"]:
            fpath: str = await client.download_media(message, in_memory=False)  # type: ignore
            texts = read_text(fpath).strip()
        else:
            await reaction_msg.reply(text="不支持该文件格式, 请以 `.txt` 格式发送", quote=True)
            return
    elif reaction_msg.id != info["mid"]:
        # replying to a plain text message: use its text instead of the command's
        texts = info["text"]
    await set_reaction(client, reaction_msg, reaction="👌")
    resp: dict = {}  # ensure resp is always bound, whatever engine was resolved
    if engine == "gemini":
        resp = await gemini_tts(message, texts, model, voice_name)
    elif engine == "qwen":
        resp = await qwen_tts(texts, model, voice_name)
    elif engine == "sambert":
        resp = await sambert_tts(texts, model, voice_name)

    path = Path(resp.get("voice", ""))
    if path.is_file():
        resp["duration"] = round(resp["duration"])
        await message.reply_voice(**resp, quote=True)
        await set_reaction(client, reaction_msg, reaction="")
    else:
        await set_reaction(client, reaction_msg, reaction="💔")
        # BUG fix: on failure resp["voice"] may be absent, and Path("") is the
        # cwd "." — unlink(".") raises IsADirectoryError (missing_ok only covers
        # FileNotFoundError). Only clean up when there is an actual leftover path.
        if resp.get("voice"):
            path.unlink(missing_ok=True)
src/config.py
@@ -64,6 +64,7 @@ class ENABLE: # see fine-grained permission in `src/permission.py`
CACHE_PRICE_SYMBOLS = os.getenv("ENABLE_CACHE_PRICE_SYMBOLS", "0").lower() in ["1", "y", "yes", "t", "true", "on"]
QUERY_DANMU = os.getenv("ENABLE_QUERY_DANMU", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
FAVORITE = os.getenv("ENABLE_FAVORITE", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
+ TTS = os.getenv("ENABLE_TTS", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
class PREFIX:
@@ -87,6 +88,7 @@ class PREFIX:
DANMU = os.getenv("PREFIX_DANMU", "/danmu").lower()
FAYAN = os.getenv("PREFIX_FAYAN", "/fa").lower()
HISTORY = "/history, /hist"
+ TTS = os.getenv("PREFIX_TTS", "/tts").lower()
class API:
@@ -294,6 +296,22 @@ class FAVORITE:
TIDS_ALLOW_SAVE = os.getenv("FAVORITE_TIDS_ALLOW_SAVE", "") # comma separated telegram uids
class TTS:
    """TTS (text-to-speech) settings, all overridable via environment variables."""

    GEMINI_MODEL = os.getenv("TTS_GEMINI_MODEL", "gemini-2.5-flash-preview-tts")
    GEMINI_INPUT_TOKEN_LIMIT = int(os.getenv("TTS_GEMINI_INPUT_TOKEN_LIMIT", "8192"))  # token limit of the tts model
    GEMINI_SPLIT_LENGTH = int(os.getenv("TTS_GEMINI_SPLIT_LENGTH", "8192"))  # split token limit of the tts model
    GEMINI_VOICE = os.getenv("TTS_GEMINI_VOICE", "Sulafat")
    ALI_API_KEY = os.getenv("TTS_ALI_API_KEY", "")  # comma separated keys for load balance. e.g. "key1,key2,key3"
    ALI_PROXY = os.getenv("TTS_ALI_PROXY", None)  # Banned oversea IP, need a back to China proxy
    QWEN_MODEL = os.getenv("TTS_QWEN_MODEL", "qwen-tts,qwen-tts-latest")  # comma separated models for load balance.
    QWEN_INPUT_TOKEN_LIMIT = int(os.getenv("TTS_QWEN_INPUT_TOKEN_LIMIT", "512"))  # token limit of the tts model
    QWEN_SPLIT_LENGTH = int(os.getenv("TTS_QWEN_SPLIT_LENGTH", "512"))  # split token limit of the tts model
    QWEN_VOICE = os.getenv("TTS_QWEN_VOICE", "Chelsie")
    # BUG fix: default was the typo "ramdom", which never matched the documented "random" sentinel
    SAMBERT_MODEL = os.getenv("TTS_SAMBERT_MODEL", "random")  # comma separated models for load balance. use "random" to randomly choose a model
    SAMBERT_LENGTH_LIMIT = int(os.getenv("TTS_SAMBERT_LENGTH_LIMIT", "20000"))  # character limit of the tts model
+
+
class GPT:
"""This is for OpenAI compatible API.
src/handler.py
@@ -8,7 +8,7 @@ from pyrogram.types import Message
from asr.voice_recognition import voice_to_text
from bridge.ocr import send_to_ocr_bridge
-from config import ENABLE, PREFIX, PROXY
+from config import ENABLE, FAVORITE, PREFIX, PROXY
from danmu.entrypoint import query_danmu
from database.database import del_db
from history.query import query_chat_history
@@ -36,6 +36,7 @@ from preview.xiaohongshu import preview_xhs
from preview.ytdlp import ProxyError, preview_ytdlp
from price.entrypoint import get_asset_price
from subtitles.subtitle import get_subtitle
+from tts.tts import text_to_speech
from utils import to_int, true
@@ -56,6 +57,7 @@ async def handle_utilities(
price: bool = True,
subtitle: bool = True,
summary: bool = True,
+ tts: bool = True,
wget: bool = True,
ytb: bool = True,
raw_img: bool = True,
@@ -81,6 +83,7 @@ async def handle_utilities(
ytb (bool, optional): Enable YouTube Search. Defaults to True.
history (bool, optional): Enable History Search. Defaults to True.
subtitle (bool, optional): Enable YouTube subtitle. Defaults to True.
+ tts (bool, optional): Enable TTS. Defaults to True.
wget (bool, optional): Enable WGET. Defaults to True.
ocr (bool, optional): Enable OCR. Defaults to True.
price (bool, optional): Enable Asset price. Defaults to True.
@@ -119,6 +122,8 @@ async def handle_utilities(
if favorite:
await save_favorite(client, message, **kwargs) # /save
await send_favorite(client, message, **kwargs) # /fav
+ if tts:
+ await text_to_speech(client, message, **kwargs) # /tts
if raw_img:
await convert_raw_img_file(client, message, **kwargs)
@@ -184,6 +189,9 @@ async def handle_social_media(
PREFIX.VOICE,
PREFIX.WGET,
PREFIX.FAYAN,
+ PREFIX.TTS,
+ FAVORITE.SAVE_PREFIX,
+ FAVORITE.SEND_PREFIX,
]
info = parse_msg(message)
@@ -328,6 +336,8 @@ def get_social_media_help(chat_id: int | str, ctype: str, prefix: str):
msg += f"\n📖**AI总结**: 发送 `{PREFIX.AI_SUMMARY}` 查看详细教程"
if permission["asr"]:
msg += f"\n🗣**语音转文字**: `{PREFIX.ASR}` + 语音消息"
+ if permission["tts"]:
+ msg += f"\n🗣**文字转语音**: `{PREFIX.TTS}` + 文字"
if permission["audio"]:
msg += f"\n🎧**提取音频或语音**: `{PREFIX.AUDIO}` `{PREFIX.VOICE}` + 视频/语音消息"
if permission["ocr"]:
src/permission.py
@@ -105,6 +105,7 @@ def check_service(cid: int | str, ctype: str) -> dict:
"ocr": True,
"price": True,
"raw_img": True,
+ "tts": True,
"ytb": True,
"google": True,
"show_progress": True,
@@ -171,6 +172,8 @@ def check_service(cid: int | str, ctype: str) -> dict:
permission["history"] = False
if not ENABLE.FAVORITE:
permission["favorite"] = False
+ if not ENABLE.TTS:
+ permission["tts"] = False
"""
Set specific service
src/utils.py
@@ -13,9 +13,11 @@ from typing import Any
from zoneinfo import ZoneInfo
import chardet
+import markdown
import puremagic
import zhconv
from bilibili_api.utils.aid_bvid_transformer import aid2bvid, bvid2aid
+from bs4 import BeautifulSoup
from bs4.element import PageElement
from glom import PathAccessError, glom
from loguru import logger
@@ -167,6 +169,14 @@ def soup_to_text(soup: PageElement) -> str:
return text
def markdown_to_text(mkdown: str) -> str:
    """Strip markdown formatting and return plain text.

    Renders the markdown to HTML and extracts the text; on any failure the
    input is returned unchanged (best effort).
    """
    with contextlib.suppress(Exception):
        html = markdown.markdown(mkdown).replace("\n", "<br>")
        # NOTE(review): get_text() emits nothing for <br> tags, so the newlines
        # substituted above are dropped from the result — confirm this is intended
        soup = BeautifulSoup(html, "html.parser")
        return soup.get_text()
    return mkdown
+
+
def number_to_emoji(num: int | str, default: str | None = None) -> str:
"""Convert a number to an emoji."""
num = str(num)