main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import random
4from pathlib import Path
5
6import anyio
7from glom import flatten, glom
8from loguru import logger
9
10from asr.utils import convert_single_channel, downsampe_audio
11from config import ASR
12from networking import hx_req
13from utils import strings_list, zhcn
14
15
16async def deepgram_asr(path: str | Path) -> dict:
17 """Deepgram ASR.
18
19 https://developers.deepgram.com/docs/pre-recorded-audio
20 """
21 path = Path(path).expanduser().resolve()
22 if not path.is_file():
23 return {"texts": "", "error": "File not found."}
24 supported_ext = [".mp3", ".aac", ".flac", ".m4a", ".mp2", ".mp4", ".ogg", ".opus", ".oga", ".pcm", ".wav", ".webm"]
25 audio_path = path if path.suffix.lower() in supported_ext else await downsampe_audio(path, ext="wav", codec="pcm_s16le")
26 audio_path = await convert_single_channel(audio_path, ext="wav", codec="pcm_s16le")
27 api_keys = strings_list(ASR.DEEPGRAM_API, shuffle=True)
28 if not api_keys:
29 return {"error": "请配置DeepGram语音识别的API Key"}
30 headers = {"Authorization": f"Token {random.choice(api_keys)}"}
31 url = "https://api.deepgram.com/v1/listen"
32 params = {"model": "nova-3-general", "detect_language": True, "punctuate": True, "smart_format": True}
33 async with await anyio.open_file(path, "rb") as f:
34 res = await hx_req(
35 url,
36 method="POST",
37 headers=headers,
38 content_data=await f.read(),
39 params=params,
40 timeout=600,
41 check_keys=["results.channels.0.alternatives.0.words"],
42 )
43 if res.get("hx_error"):
44 return {"error": res["hx_error"]}
45 try:
46 start_seconds = flatten(glom(res, "results.channels.*.alternatives.0.words.*.start"))
47 sentences = flatten(glom(res, "results.channels.*.alternatives.0.words.*.punctuated_word"))
48 res = ""
49 indexs = list(range(len(sentences)))
50 for idx, start_time, sentence in zip(indexs, start_seconds, sentences, strict=True):
51 if not sentence:
52 continue
53 if idx == 0 or res.endswith((".", "。", "?", "?")): # noqa: RUF001
54 start_seconds = float(start_time)
55 minutes = int(start_seconds // 60)
56 seconds = int(start_seconds % 60)
57 res += f"\n[{minutes:02d}:{seconds:02d}] {sentence}"
58 else:
59 res += sentence
60 except Exception as e:
61 logger.error(e)
62 return {"error": str(e)}
63 return {"texts": zhcn(res.strip())}