Buffer WAV segments in RAM and raise default segment size to 20MB
Some checks failed
Build and Push EVS Bridge Image / docker (push) Has been cancelled

This commit is contained in:
Kai
2026-02-13 16:27:46 +01:00
parent 2ce87e8cce
commit 9dc1ac3099
3 changed files with 36 additions and 57 deletions

View File

@@ -21,5 +21,5 @@ SAVE_SESSIONS=true
SESSIONS_DIR=/data/sessions SESSIONS_DIR=/data/sessions
PCM_SAMPLE_RATE=16000 PCM_SAMPLE_RATE=16000
MAX_SESSION_BYTES=16000000 MAX_SESSION_BYTES=16000000
WAV_SEGMENT_MAX_BYTES=2097152 WAV_SEGMENT_MAX_BYTES=20971520
WAV_KEEP_FILES=10 WAV_KEEP_FILES=10

View File

@@ -80,8 +80,10 @@ You can build automations on these events (for STT/TTS pipelines or Node-RED han
- Audio format: PCM16LE, mono, 16 kHz - Audio format: PCM16LE, mono, 16 kHz
- `SAVE_SESSIONS=true` stores `.wav` files in `bridge/data/sessions` - `SAVE_SESSIONS=true` stores `.wav` files in `bridge/data/sessions`
- Recording is written during `start`..`stop` and rotated automatically: - Recording is buffered in RAM during `start`..`stop` and rotated automatically:
- `WAV_SEGMENT_MAX_BYTES` max size per `.wav` file (default: `2097152` = 2 MB) - PCM data is collected in memory and written as one WAV file when the segment limit is reached
- this reduces write frequency on disk
- `WAV_SEGMENT_MAX_BYTES` max size per `.wav` file (default: `20971520` = 20 MB)
- `WAV_KEEP_FILES` max number of `.wav` files to keep (default: `10`) - `WAV_KEEP_FILES` max number of `.wav` files to keep (default: `10`)
- `MAX_SESSION_BYTES` is only used if session file saving is disabled - `MAX_SESSION_BYTES` is only used if session file saving is disabled
- MQTT is recommended for control/events, WebSocket for streaming audio - MQTT is recommended for control/events, WebSocket for streaming audio
@@ -131,7 +133,7 @@ services:
SAVE_SESSIONS: "true" SAVE_SESSIONS: "true"
SESSIONS_DIR: "/data/sessions" SESSIONS_DIR: "/data/sessions"
PCM_SAMPLE_RATE: "16000" PCM_SAMPLE_RATE: "16000"
WAV_SEGMENT_MAX_BYTES: "2097152" WAV_SEGMENT_MAX_BYTES: "20971520"
WAV_KEEP_FILES: "10" WAV_KEEP_FILES: "10"
volumes: volumes:
- evs_bridge_data:/data - evs_bridge_data:/data

View File

@@ -4,7 +4,6 @@ import json
import logging import logging
import os import os
import time import time
import wave
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional from typing import Dict, List, Optional
@@ -48,7 +47,7 @@ SAVE_SESSIONS = getenv_bool("SAVE_SESSIONS", True)
SESSIONS_DIR = Path(os.getenv("SESSIONS_DIR", "/data/sessions")) SESSIONS_DIR = Path(os.getenv("SESSIONS_DIR", "/data/sessions"))
PCM_SAMPLE_RATE = int(os.getenv("PCM_SAMPLE_RATE", "16000")) PCM_SAMPLE_RATE = int(os.getenv("PCM_SAMPLE_RATE", "16000"))
MAX_SESSION_BYTES = int(os.getenv("MAX_SESSION_BYTES", "16000000")) MAX_SESSION_BYTES = int(os.getenv("MAX_SESSION_BYTES", "16000000"))
WAV_SEGMENT_MAX_BYTES = int(os.getenv("WAV_SEGMENT_MAX_BYTES", str(2 * 1024 * 1024))) WAV_SEGMENT_MAX_BYTES = int(os.getenv("WAV_SEGMENT_MAX_BYTES", str(20 * 1024 * 1024)))
WAV_KEEP_FILES = int(os.getenv("WAV_KEEP_FILES", "10")) WAV_KEEP_FILES = int(os.getenv("WAV_KEEP_FILES", "10"))
WAV_HEADER_BYTES = 44 WAV_HEADER_BYTES = 44
@@ -63,9 +62,7 @@ class DeviceSession:
last_rx_ts: float = field(default_factory=time.time) last_rx_ts: float = field(default_factory=time.time)
rx_bytes_total: int = 0 rx_bytes_total: int = 0
segment_index: int = 0 segment_index: int = 0
segment_pcm_bytes: int = 0 segment_pcm_buffer: bytearray = field(default_factory=bytearray)
current_wav_path: Optional[str] = None
current_wav: Optional[wave.Wave_write] = None
saved_wavs: List[str] = field(default_factory=list) saved_wavs: List[str] = field(default_factory=list)
@@ -144,41 +141,28 @@ def enforce_wav_retention() -> None:
log.exception("failed to enforce wav retention") log.exception("failed to enforce wav retention")
def close_active_wav(session: DeviceSession) -> None: def write_wav_segment(session: DeviceSession, pcm: bytes) -> Optional[str]:
if not session.current_wav: if not SAVE_SESSIONS or not pcm:
return
try:
session.current_wav.close()
except Exception:
log.exception("failed closing wav for %s", session.device_id)
finally:
session.current_wav = None
session.current_wav_path = None
session.segment_pcm_bytes = 0
def open_new_wav_segment(session: DeviceSession) -> Optional[str]:
if not SAVE_SESSIONS:
return None return None
try: try:
SESSIONS_DIR.mkdir(parents=True, exist_ok=True) SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
ts_ms = int(time.time() * 1000) ts_ms = int(time.time() * 1000)
name = f"{session.device_id}_{ts_ms}_part{session.segment_index:03d}.wav" name = f"{session.device_id}_{ts_ms}_part{session.segment_index:03d}.wav"
path = SESSIONS_DIR / name path = SESSIONS_DIR / name
import wave
wf = wave.open(str(path), "wb") wf = wave.open(str(path), "wb")
wf.setnchannels(1) wf.setnchannels(1)
wf.setsampwidth(2) wf.setsampwidth(2)
wf.setframerate(PCM_SAMPLE_RATE) wf.setframerate(PCM_SAMPLE_RATE)
session.current_wav = wf wf.writeframes(pcm)
session.current_wav_path = str(path) wf.close()
session.segment_pcm_bytes = 0
session.saved_wavs.append(str(path)) session.saved_wavs.append(str(path))
enforce_wav_retention() enforce_wav_retention()
session.segment_index += 1 session.segment_index += 1
return str(path) return str(path)
except Exception: except Exception:
log.exception("failed to open wav segment for %s", session.device_id) log.exception("failed to write wav segment for %s", session.device_id)
close_active_wav(session)
return None return None
@@ -190,27 +174,21 @@ def append_pcm_with_rotation(session: DeviceSession, data: bytes) -> None:
return return
max_pcm_per_file = WAV_SEGMENT_MAX_BYTES - WAV_HEADER_BYTES max_pcm_per_file = WAV_SEGMENT_MAX_BYTES - WAV_HEADER_BYTES
offset = 0 session.segment_pcm_buffer.extend(data)
total = len(data) while len(session.segment_pcm_buffer) >= max_pcm_per_file:
while offset < total: chunk = bytes(session.segment_pcm_buffer[:max_pcm_per_file])
if not session.current_wav: del session.segment_pcm_buffer[:max_pcm_per_file]
opened = open_new_wav_segment(session) write_wav_segment(session, chunk)
if not opened:
def flush_pending_segment(session: DeviceSession) -> None:
if not SAVE_SESSIONS:
return return
free_bytes = max_pcm_per_file - session.segment_pcm_bytes if not session.segment_pcm_buffer:
if free_bytes <= 0:
close_active_wav(session)
continue
chunk_len = min(free_bytes, total - offset)
chunk = data[offset : offset + chunk_len]
try:
session.current_wav.writeframesraw(chunk)
session.segment_pcm_bytes += chunk_len
except Exception:
log.exception("failed writing wav segment for %s", session.device_id)
close_active_wav(session)
return return
offset += chunk_len chunk = bytes(session.segment_pcm_buffer)
session.segment_pcm_buffer.clear()
write_wav_segment(session, chunk)
async def handle_text_message(device_id: str, session: DeviceSession, raw: str) -> None: async def handle_text_message(device_id: str, session: DeviceSession, raw: str) -> None:
@@ -227,19 +205,16 @@ async def handle_text_message(device_id: str, session: DeviceSession, raw: str)
session.rx_bytes_total = 0 session.rx_bytes_total = 0
session.saved_wavs.clear() session.saved_wavs.clear()
session.segment_index = 0 session.segment_index = 0
close_active_wav(session) session.segment_pcm_buffer.clear()
first_path = open_new_wav_segment(session)
payload = {"type": "start", "ts": time.time(), "device_id": device_id} payload = {"type": "start", "ts": time.time(), "device_id": device_id}
if first_path:
payload["wav_path"] = first_path
state.publish_status(device_id, payload) state.publish_status(device_id, payload)
await call_ha_webhook("start", payload) await call_ha_webhook("start", payload)
log.info("start: device=%s wav=%s", device_id, first_path or "-") log.info("start: device=%s", device_id)
return return
if msg_type == "stop": if msg_type == "stop":
session.ptt_active = False session.ptt_active = False
close_active_wav(session) flush_pending_segment(session)
metrics = build_metrics(device_id, session) metrics = build_metrics(device_id, session)
payload = {"type": "stop", "ts": time.time(), "device_id": device_id, **metrics} payload = {"type": "stop", "ts": time.time(), "device_id": device_id, **metrics}
if session.saved_wavs: if session.saved_wavs:
@@ -335,7 +310,9 @@ async def ws_handler(ws: WebSocketServerProtocol, path: str) -> None:
except websockets.ConnectionClosed: except websockets.ConnectionClosed:
pass pass
finally: finally:
close_active_wav(session) if session.ptt_active:
flush_pending_segment(session)
session.ptt_active = False
if state.devices.get(device_id) is session: if state.devices.get(device_id) is session:
del state.devices[device_id] del state.devices[device_id]
state.publish_status(device_id, {"type": "disconnected", "ts": time.time(), "device_id": device_id}) state.publish_status(device_id, {"type": "disconnected", "ts": time.time(), "device_id": device_id})