Commit a266fec

benny-dou <60535774+benny-dou@users.noreply.github.com>
2026-02-11 06:36:36
fix(asr): handle large audio files by processing chunks sequentially instead of in parallel to avoid OOM errors
1 parent 08edee7
Changed files (2)
src/asr/cloudflare.py
@@ -80,8 +80,8 @@ async def cloudflare_single_file(path_or_bytes: Path | bytes, model: str | None
                 }
                 for seg in glom(resp, "result.segments", default=[])
             ]
-            resp["raw_texts"] = " ".join(x["text"] for x in resp["segments"])
-            resp["texts"] = "\n".join(f"[{seconds_to_time(x['start'])}] {x['text'].lstrip()}" for x in resp["segments"])  # with timestamp
+            resp["raw_texts"] = " ".join(str(x["text"]) for x in resp["segments"])
+            resp["texts"] = "\n".join(f"[{seconds_to_time(float(x['start']))}] {str(x['text']).lstrip()}" for x in resp["segments"])  # with timestamp
             if resp.get("hx_error"):
                 resp["error"] = resp.pop("hx_error")
         except Exception as e:
@@ -136,12 +136,18 @@ async def cloudflare_file_chunks(
             tasks.append(audio_chunk_to_bytes(chunk, sr))
             offset_list.append(int(start / sr))
         bytes_list = await asyncio.gather(*tasks)  # convert chunks to bytes
-        tasks = []
-        for audio_bytes, offset_seconds in zip(bytes_list, offset_list, strict=True):
-            task = cloudflare_single_file(audio_bytes, model, offset_seconds=offset_seconds)
-            tasks.append(run_with_semaphore(task))
-        results = await asyncio.gather(*tasks)
-        results = [r for r in results if r.get("segments")]
+        # Old parallel version, kept for reference only. DO NOT re-enable: it causes OOM on large audio files; chunks are now processed sequentially below.
+        # tasks = []
+        # for audio_bytes, offset_seconds in zip(bytes_list, offset_list, strict=True):
+        #     task = cloudflare_single_file(audio_bytes, model, offset_seconds=offset_seconds)
+        #     tasks.append(run_with_semaphore(task))
+        # results = await asyncio.gather(*tasks)
+        # results = [r for r in results if r.get("segments")]
+        results = []
+        for audio_bytes, offset in zip(bytes_list, offset_list, strict=True):
+            res = await cloudflare_single_file(audio_bytes, model=model, offset_seconds=offset)
+            if res.get("segments"):
+                results.append(res)
         transcription = merge_transcripts(sorted(results, key=lambda x: x["segments"][0]["start"]))
     except Exception as e:
         logger.error(e)
src/asr/groq.py
@@ -82,8 +82,8 @@ async def groq_single_file(
         }
         for seg in resp.get("segments", [])
     ]
-    resp["raw_texts"] = " ".join(x["text"] for x in resp["segments"])
-    resp["texts"] = "\n".join(f"[{seconds_to_time(x['start'])}] {x['text'].lstrip()}" for x in resp["segments"])  # with timestamp
+    resp["raw_texts"] = " ".join(str(x["text"]) for x in resp["segments"])
+    resp["texts"] = "\n".join(f"[{seconds_to_time(float(x['start']))}] {str(x['text']).lstrip()}" for x in resp["segments"])  # with timestamp
     if resp.get("hx_error"):
         resp["error"] = resp.pop("hx_error")
     return resp
@@ -290,18 +290,32 @@ async def groq_file_chunks(
             tasks.append(audio_chunk_to_bytes(chunk, sr))
             offset_list.append(int(start / sr))
         bytes_list = await asyncio.gather(*tasks)  # convert chunks to bytes
-        tasks = [
-            groq_single_file(
+
+        # Old parallel version, kept for reference only. DO NOT re-enable: it causes OOM on large audio files; chunks are now processed sequentially below.
+        # tasks = [
+        #     groq_single_file(
+        #         audio_bytes,
+        #         start_seconds=offset,
+        #         model=model,
+        #         temperature=temperature,
+        #         language=language,
+        #     )
+        #     for audio_bytes, offset in zip(bytes_list, offset_list, strict=True)
+        # ]
+        # results = await asyncio.gather(*tasks)
+        # results = [r for r in results if r.get("segments")]
+        results = []
+        for audio_bytes, offset in zip(bytes_list, offset_list, strict=True):
+            res = await groq_single_file(
                 audio_bytes,
                 start_seconds=offset,
                 model=model,
                 temperature=temperature,
                 language=language,
             )
-            for audio_bytes, offset in zip(bytes_list, offset_list, strict=True)
-        ]
-        results = await asyncio.gather(*tasks)
-        results = [r for r in results if r.get("segments")]
+            if res.get("segments"):
+                results.append(res)
+
         transcription = merge_transcripts(sorted(results, key=lambda x: x["segments"][0]["start"]))
     except Exception as e:
         logger.error(e)