Add start/stop tones and rotate WAV sessions
Some checks failed
Build and Push EVS Bridge Image / docker (push) Has been cancelled

This commit is contained in:
Kai
2026-02-13 15:53:45 +01:00
parent 72c0fa19c6
commit bd3b73e387
5 changed files with 316 additions and 37 deletions

View File

@@ -4,9 +4,10 @@ import json
import logging
import os
import time
import wave
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Optional
from typing import Dict, List, Optional
import aiohttp
import paho.mqtt.client as mqtt
@@ -46,6 +47,10 @@ HA_WEBHOOK_URL = os.getenv("HA_WEBHOOK_URL", "").strip()
SAVE_SESSIONS = getenv_bool("SAVE_SESSIONS", True)
SESSIONS_DIR = Path(os.getenv("SESSIONS_DIR", "/data/sessions"))
PCM_SAMPLE_RATE = int(os.getenv("PCM_SAMPLE_RATE", "16000"))
MAX_SESSION_BYTES = int(os.getenv("MAX_SESSION_BYTES", "16000000"))
WAV_SEGMENT_MAX_BYTES = int(os.getenv("WAV_SEGMENT_MAX_BYTES", str(2 * 1024 * 1024)))
WAV_KEEP_FILES = int(os.getenv("WAV_KEEP_FILES", "10"))
WAV_HEADER_BYTES = 44
@dataclass
@@ -56,6 +61,12 @@ class DeviceSession:
ptt_active: bool = False
pcm_bytes: bytearray = field(default_factory=bytearray)
last_rx_ts: float = field(default_factory=time.time)
rx_bytes_total: int = 0
segment_index: int = 0
segment_pcm_bytes: int = 0
current_wav_path: Optional[str] = None
current_wav: Optional[wave.Wave_write] = None
saved_wavs: List[str] = field(default_factory=list)
class BridgeState:
@@ -89,12 +100,12 @@ state = BridgeState()
def build_metrics(device_id: str, session: DeviceSession) -> dict:
samples = len(session.pcm_bytes) // 2
samples = session.rx_bytes_total // 2
seconds = samples / float(PCM_SAMPLE_RATE)
return {
"device_id": device_id,
"ptt_active": session.ptt_active,
"rx_bytes": len(session.pcm_bytes),
"rx_bytes": session.rx_bytes_total,
"duration_s": round(seconds, 3),
"last_rx_ts": session.last_rx_ts,
}
@@ -113,26 +124,95 @@ async def call_ha_webhook(event: str, payload: dict) -> None:
log.exception("ha webhook call failed")
def save_session_wav(device_id: str, pcm: bytes) -> Optional[str]:
if not SAVE_SESSIONS or not pcm:
def enforce_wav_retention() -> None:
if not SAVE_SESSIONS:
return
try:
SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
wavs = sorted(
[p for p in SESSIONS_DIR.glob("*.wav") if p.is_file()],
key=lambda p: p.stat().st_mtime,
)
while len(wavs) > WAV_KEEP_FILES:
oldest = wavs.pop(0)
try:
oldest.unlink()
log.info("deleted old wav: %s", oldest)
except Exception:
log.exception("failed to delete old wav: %s", oldest)
except Exception:
log.exception("failed to enforce wav retention")
def close_active_wav(session: DeviceSession) -> None:
if not session.current_wav:
return
try:
session.current_wav.close()
except Exception:
log.exception("failed closing wav for %s", session.device_id)
finally:
session.current_wav = None
session.current_wav_path = None
session.segment_pcm_bytes = 0
def open_new_wav_segment(session: DeviceSession) -> Optional[str]:
if not SAVE_SESSIONS:
return None
try:
SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
ts = int(time.time())
path = SESSIONS_DIR / f"{device_id}_{ts}.wav"
import wave
with wave.open(str(path), "wb") as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(PCM_SAMPLE_RATE)
wf.writeframes(pcm)
ts_ms = int(time.time() * 1000)
name = f"{session.device_id}_{ts_ms}_part{session.segment_index:03d}.wav"
path = SESSIONS_DIR / name
wf = wave.open(str(path), "wb")
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(PCM_SAMPLE_RATE)
session.current_wav = wf
session.current_wav_path = str(path)
session.segment_pcm_bytes = 0
session.saved_wavs.append(str(path))
enforce_wav_retention()
session.segment_index += 1
return str(path)
except Exception:
log.exception("failed to save wav")
log.exception("failed to open wav segment for %s", session.device_id)
close_active_wav(session)
return None
def append_pcm_with_rotation(session: DeviceSession, data: bytes) -> None:
if not SAVE_SESSIONS or not data:
return
if WAV_SEGMENT_MAX_BYTES <= WAV_HEADER_BYTES:
log.warning("WAV_SEGMENT_MAX_BYTES too small, minimum is %s", WAV_HEADER_BYTES + 2)
return
max_pcm_per_file = WAV_SEGMENT_MAX_BYTES - WAV_HEADER_BYTES
offset = 0
total = len(data)
while offset < total:
if not session.current_wav:
opened = open_new_wav_segment(session)
if not opened:
return
free_bytes = max_pcm_per_file - session.segment_pcm_bytes
if free_bytes <= 0:
close_active_wav(session)
continue
chunk_len = min(free_bytes, total - offset)
chunk = data[offset : offset + chunk_len]
try:
session.current_wav.writeframesraw(chunk)
session.segment_pcm_bytes += chunk_len
except Exception:
log.exception("failed writing wav segment for %s", session.device_id)
close_active_wav(session)
return
offset += chunk_len
async def handle_text_message(device_id: str, session: DeviceSession, raw: str) -> None:
try:
msg = json.loads(raw)
@@ -144,33 +224,77 @@ async def handle_text_message(device_id: str, session: DeviceSession, raw: str)
if msg_type == "start":
session.ptt_active = True
session.pcm_bytes.clear()
session.rx_bytes_total = 0
session.saved_wavs.clear()
session.segment_index = 0
close_active_wav(session)
first_path = open_new_wav_segment(session)
payload = {"type": "start", "ts": time.time(), "device_id": device_id}
if first_path:
payload["wav_path"] = first_path
state.publish_status(device_id, payload)
await call_ha_webhook("start", payload)
log.info("start: device=%s wav=%s", device_id, first_path or "-")
return
if msg_type == "stop":
session.ptt_active = False
close_active_wav(session)
metrics = build_metrics(device_id, session)
wav_path = save_session_wav(device_id, bytes(session.pcm_bytes))
payload = {"type": "stop", "ts": time.time(), "device_id": device_id, **metrics}
if wav_path:
payload["wav_path"] = wav_path
if session.saved_wavs:
payload["wav_path"] = session.saved_wavs[-1]
payload["wav_paths"] = session.saved_wavs
state.publish_status(device_id, payload)
await call_ha_webhook("stop", payload)
log.info(
"stop: device=%s bytes=%s duration_s=%s wav_count=%s last_wav=%s",
device_id,
metrics.get("rx_bytes", 0),
metrics.get("duration_s", 0),
len(session.saved_wavs),
session.saved_wavs[-1] if session.saved_wavs else "-",
)
return
if msg_type == "ping":
await session.ws.send(json.dumps({"type": "pong", "ts": time.time()}))
return
if msg_type == "mic_level":
payload = {
"type": "mic_level",
"ts": time.time(),
"device_id": device_id,
"peak": msg.get("peak", 0),
"avg_abs": msg.get("avg_abs", 0),
"samples": msg.get("samples", 0),
}
state.publish_status(device_id, payload)
log.info(
"mic_level: device=%s peak=%s avg_abs=%s samples=%s",
device_id,
payload["peak"],
payload["avg_abs"],
payload["samples"],
)
return
log.info("text msg from %s: %s", device_id, msg)
async def handle_binary_message(device_id: str, session: DeviceSession, data: bytes) -> None:
session.last_rx_ts = time.time()
if session.ptt_active:
session.pcm_bytes.extend(data)
session.rx_bytes_total += len(data)
if SAVE_SESSIONS:
append_pcm_with_rotation(session, data)
else:
session.pcm_bytes.extend(data)
if len(session.pcm_bytes) > MAX_SESSION_BYTES:
# Keep newest data within cap to avoid unbounded memory growth.
drop = len(session.pcm_bytes) - MAX_SESSION_BYTES
del session.pcm_bytes[:drop]
if ECHO_ENABLED:
await session.ws.send(data)
@@ -211,6 +335,7 @@ async def ws_handler(ws: WebSocketServerProtocol, path: str) -> None:
except websockets.ConnectionClosed:
pass
finally:
close_active_wav(session)
if state.devices.get(device_id) is session:
del state.devices[device_id]
state.publish_status(device_id, {"type": "disconnected", "ts": time.time(), "device_id": device_id})