From 9dc1ac3099e37aa7955cc334c15084ac8804fba7 Mon Sep 17 00:00:00 2001 From: Kai Date: Fri, 13 Feb 2026 16:27:46 +0100 Subject: [PATCH] Buffer WAV segments in RAM and raise default segment size to 20MB --- bridge/.env.example | 2 +- bridge/README.md | 8 +++-- bridge/app.py | 83 ++++++++++++++++----------------------------- 3 files changed, 36 insertions(+), 57 deletions(-) diff --git a/bridge/.env.example b/bridge/.env.example index e1ee79a..5bd68ca 100644 --- a/bridge/.env.example +++ b/bridge/.env.example @@ -21,5 +21,5 @@ SAVE_SESSIONS=true SESSIONS_DIR=/data/sessions PCM_SAMPLE_RATE=16000 MAX_SESSION_BYTES=16000000 -WAV_SEGMENT_MAX_BYTES=2097152 +WAV_SEGMENT_MAX_BYTES=20971520 WAV_KEEP_FILES=10 diff --git a/bridge/README.md b/bridge/README.md index 8534bc2..c094c14 100644 --- a/bridge/README.md +++ b/bridge/README.md @@ -80,8 +80,10 @@ You can build automations on these events (for STT/TTS pipelines or Node-RED han - Audio format: PCM16LE, mono, 16 kHz - `SAVE_SESSIONS=true` stores `.wav` files in `bridge/data/sessions` -- Recording is written during `start`..`stop` and rotated automatically: - - `WAV_SEGMENT_MAX_BYTES` max size per `.wav` file (default: `2097152` = 2 MB) +- Recording is buffered in RAM during `start`..`stop` and rotated automatically: + - PCM data is collected in memory and written as one WAV file when the segment limit is reached or when recording stops + - This reduces how often the disk is written to + - `WAV_SEGMENT_MAX_BYTES` max size per `.wav` file (default: `20971520` = 20 MB) - `WAV_KEEP_FILES` max number of `.wav` files to keep (default: `10`) - `MAX_SESSION_BYTES` is only used if session file saving is disabled - MQTT is recommended for control/events, WebSocket for streaming audio @@ -131,7 +133,7 @@ services: SAVE_SESSIONS: "true" SESSIONS_DIR: "/data/sessions" PCM_SAMPLE_RATE: "16000" - WAV_SEGMENT_MAX_BYTES: "2097152" + WAV_SEGMENT_MAX_BYTES: "20971520" WAV_KEEP_FILES: "10" volumes: - evs_bridge_data:/data diff --git a/bridge/app.py b/bridge/app.py index 
9a230b5..1461c60 100644 --- a/bridge/app.py +++ b/bridge/app.py @@ -4,7 +4,6 @@ import json import logging import os import time -import wave from dataclasses import dataclass, field from pathlib import Path from typing import Dict, List, Optional @@ -48,7 +47,7 @@ SAVE_SESSIONS = getenv_bool("SAVE_SESSIONS", True) SESSIONS_DIR = Path(os.getenv("SESSIONS_DIR", "/data/sessions")) PCM_SAMPLE_RATE = int(os.getenv("PCM_SAMPLE_RATE", "16000")) MAX_SESSION_BYTES = int(os.getenv("MAX_SESSION_BYTES", "16000000")) -WAV_SEGMENT_MAX_BYTES = int(os.getenv("WAV_SEGMENT_MAX_BYTES", str(2 * 1024 * 1024))) +WAV_SEGMENT_MAX_BYTES = int(os.getenv("WAV_SEGMENT_MAX_BYTES", str(20 * 1024 * 1024))) WAV_KEEP_FILES = int(os.getenv("WAV_KEEP_FILES", "10")) WAV_HEADER_BYTES = 44 @@ -63,9 +62,7 @@ class DeviceSession: last_rx_ts: float = field(default_factory=time.time) rx_bytes_total: int = 0 segment_index: int = 0 - segment_pcm_bytes: int = 0 - current_wav_path: Optional[str] = None - current_wav: Optional[wave.Wave_write] = None + segment_pcm_buffer: bytearray = field(default_factory=bytearray) saved_wavs: List[str] = field(default_factory=list) @@ -144,41 +141,28 @@ def enforce_wav_retention() -> None: log.exception("failed to enforce wav retention") -def close_active_wav(session: DeviceSession) -> None: - if not session.current_wav: - return - try: - session.current_wav.close() - except Exception: - log.exception("failed closing wav for %s", session.device_id) - finally: - session.current_wav = None - session.current_wav_path = None - session.segment_pcm_bytes = 0 - - -def open_new_wav_segment(session: DeviceSession) -> Optional[str]: - if not SAVE_SESSIONS: +def write_wav_segment(session: DeviceSession, pcm: bytes) -> Optional[str]: + if not SAVE_SESSIONS or not pcm: return None try: SESSIONS_DIR.mkdir(parents=True, exist_ok=True) ts_ms = int(time.time() * 1000) name = f"{session.device_id}_{ts_ms}_part{session.segment_index:03d}.wav" path = SESSIONS_DIR / name + import wave + wf = 
wave.open(str(path), "wb") wf.setnchannels(1) wf.setsampwidth(2) wf.setframerate(PCM_SAMPLE_RATE) - session.current_wav = wf - session.current_wav_path = str(path) - session.segment_pcm_bytes = 0 + wf.writeframes(pcm) + wf.close() session.saved_wavs.append(str(path)) enforce_wav_retention() session.segment_index += 1 return str(path) except Exception: - log.exception("failed to open wav segment for %s", session.device_id) - close_active_wav(session) + log.exception("failed to write wav segment for %s", session.device_id) return None @@ -190,27 +174,21 @@ def append_pcm_with_rotation(session: DeviceSession, data: bytes) -> None: return max_pcm_per_file = WAV_SEGMENT_MAX_BYTES - WAV_HEADER_BYTES - offset = 0 - total = len(data) - while offset < total: - if not session.current_wav: - opened = open_new_wav_segment(session) - if not opened: - return - free_bytes = max_pcm_per_file - session.segment_pcm_bytes - if free_bytes <= 0: - close_active_wav(session) - continue - chunk_len = min(free_bytes, total - offset) - chunk = data[offset : offset + chunk_len] - try: - session.current_wav.writeframesraw(chunk) - session.segment_pcm_bytes += chunk_len - except Exception: - log.exception("failed writing wav segment for %s", session.device_id) - close_active_wav(session) - return - offset += chunk_len + session.segment_pcm_buffer.extend(data) + while len(session.segment_pcm_buffer) >= max_pcm_per_file: + chunk = bytes(session.segment_pcm_buffer[:max_pcm_per_file]) + del session.segment_pcm_buffer[:max_pcm_per_file] + write_wav_segment(session, chunk) + + +def flush_pending_segment(session: DeviceSession) -> None: + if not SAVE_SESSIONS: + return + if not session.segment_pcm_buffer: + return + chunk = bytes(session.segment_pcm_buffer) + session.segment_pcm_buffer.clear() + write_wav_segment(session, chunk) async def handle_text_message(device_id: str, session: DeviceSession, raw: str) -> None: @@ -227,19 +205,16 @@ async def handle_text_message(device_id: str, session: 
DeviceSession, raw: str) session.rx_bytes_total = 0 session.saved_wavs.clear() session.segment_index = 0 - close_active_wav(session) - first_path = open_new_wav_segment(session) + session.segment_pcm_buffer.clear() payload = {"type": "start", "ts": time.time(), "device_id": device_id} - if first_path: - payload["wav_path"] = first_path state.publish_status(device_id, payload) await call_ha_webhook("start", payload) - log.info("start: device=%s wav=%s", device_id, first_path or "-") + log.info("start: device=%s", device_id) return if msg_type == "stop": session.ptt_active = False - close_active_wav(session) + flush_pending_segment(session) metrics = build_metrics(device_id, session) payload = {"type": "stop", "ts": time.time(), "device_id": device_id, **metrics} if session.saved_wavs: @@ -335,7 +310,9 @@ async def ws_handler(ws: WebSocketServerProtocol, path: str) -> None: except websockets.ConnectionClosed: pass finally: - close_active_wav(session) + if session.ptt_active: + flush_pending_segment(session) + session.ptt_active = False if state.devices.get(device_id) is session: del state.devices[device_id] state.publish_status(device_id, {"type": "disconnected", "ts": time.time(), "device_id": device_id})