From bd3b73e387ec184b1dc9a391c5401dfd3e930df8 Mon Sep 17 00:00:00 2001 From: Kai Date: Fri, 13 Feb 2026 15:53:45 +0100 Subject: [PATCH] Add start/stop tones and rotate WAV sessions --- bridge/.env.example | 3 + bridge/README.md | 6 ++ bridge/app.py | 163 ++++++++++++++++++++++++++++++++----- include/secrets.example.h | 14 ++++ src/main.cpp | 167 ++++++++++++++++++++++++++++++++++---- 5 files changed, 316 insertions(+), 37 deletions(-) diff --git a/bridge/.env.example b/bridge/.env.example index 195b5a6..e1ee79a 100644 --- a/bridge/.env.example +++ b/bridge/.env.example @@ -20,3 +20,6 @@ HA_WEBHOOK_URL= SAVE_SESSIONS=true SESSIONS_DIR=/data/sessions PCM_SAMPLE_RATE=16000 +MAX_SESSION_BYTES=16000000 +WAV_SEGMENT_MAX_BYTES=2097152 +WAV_KEEP_FILES=10 diff --git a/bridge/README.md b/bridge/README.md index da40e6f..8534bc2 100644 --- a/bridge/README.md +++ b/bridge/README.md @@ -80,6 +80,10 @@ You can build automations on these events (for STT/TTS pipelines or Node-RED han - Audio format: PCM16LE, mono, 16 kHz - `SAVE_SESSIONS=true` stores `.wav` files in `bridge/data/sessions` +- Recording is written during `start`..`stop` and rotated automatically: + - `WAV_SEGMENT_MAX_BYTES` max size per `.wav` file (default: `2097152` = 2 MB) + - `WAV_KEEP_FILES` max number of `.wav` files to keep (default: `10`) +- `MAX_SESSION_BYTES` is only used if session file saving is disabled - MQTT is recommended for control/events, WebSocket for streaming audio ## 7) Build and push to Gitea registry @@ -127,6 +131,8 @@ services: SAVE_SESSIONS: "true" SESSIONS_DIR: "/data/sessions" PCM_SAMPLE_RATE: "16000" + WAV_SEGMENT_MAX_BYTES: "2097152" + WAV_KEEP_FILES: "10" volumes: - evs_bridge_data:/data diff --git a/bridge/app.py b/bridge/app.py index cb2834b..9a230b5 100644 --- a/bridge/app.py +++ b/bridge/app.py @@ -4,9 +4,10 @@ import json import logging import os import time +import wave from dataclasses import dataclass, field from pathlib import Path -from typing import Dict, Optional +from typing import Dict, List, Optional import aiohttp import paho.mqtt.client as mqtt @@ -46,6 +47,10 @@ HA_WEBHOOK_URL = os.getenv("HA_WEBHOOK_URL", "").strip() SAVE_SESSIONS = getenv_bool("SAVE_SESSIONS", True) SESSIONS_DIR = Path(os.getenv("SESSIONS_DIR", "/data/sessions")) PCM_SAMPLE_RATE = int(os.getenv("PCM_SAMPLE_RATE", "16000")) +MAX_SESSION_BYTES = int(os.getenv("MAX_SESSION_BYTES", "16000000")) +WAV_SEGMENT_MAX_BYTES = int(os.getenv("WAV_SEGMENT_MAX_BYTES", str(2 * 1024 * 1024))) +WAV_KEEP_FILES = int(os.getenv("WAV_KEEP_FILES", "10")) +WAV_HEADER_BYTES = 44 @dataclass @@ -56,6 +61,12 @@ class DeviceSession: ptt_active: bool = False pcm_bytes: bytearray = field(default_factory=bytearray) last_rx_ts: float = field(default_factory=time.time) + rx_bytes_total: int = 0 + segment_index: int = 0 + segment_pcm_bytes: int = 0 + current_wav_path: Optional[str] = None + current_wav: Optional[wave.Wave_write] = None + saved_wavs: List[str] = field(default_factory=list) class BridgeState: @@ -89,12 +100,12 @@ state = BridgeState() def build_metrics(device_id: str, session: DeviceSession) -> dict: - samples = len(session.pcm_bytes) // 2 + samples = session.rx_bytes_total // 2 seconds = samples / float(PCM_SAMPLE_RATE) return { "device_id": device_id, "ptt_active": session.ptt_active, - "rx_bytes": len(session.pcm_bytes), + "rx_bytes": session.rx_bytes_total, "duration_s": round(seconds, 3), "last_rx_ts": session.last_rx_ts, } @@ -113,26 +124,95 @@ async def call_ha_webhook(event: str, payload: dict) -> None: log.exception("ha webhook call failed") -def save_session_wav(device_id: str, pcm: bytes) -> Optional[str]: - if not SAVE_SESSIONS or not pcm: +def enforce_wav_retention() -> None: + if not SAVE_SESSIONS: + return + try: + SESSIONS_DIR.mkdir(parents=True, exist_ok=True) + wavs = sorted( + [p for p in SESSIONS_DIR.glob("*.wav") if p.is_file()], + key=lambda p: p.stat().st_mtime, + ) + while len(wavs) > WAV_KEEP_FILES: + oldest = wavs.pop(0) + try: + oldest.unlink() + log.info("deleted old wav: %s", oldest) + except Exception: + log.exception("failed to delete old wav: %s", oldest) + except Exception: + log.exception("failed to enforce wav retention") + + +def close_active_wav(session: DeviceSession) -> None: + if not session.current_wav: + return + try: + session.current_wav.close() + except Exception: + log.exception("failed closing wav for %s", session.device_id) + finally: + session.current_wav = None + session.current_wav_path = None + session.segment_pcm_bytes = 0 + + +def open_new_wav_segment(session: DeviceSession) -> Optional[str]: + if not SAVE_SESSIONS: return None try: SESSIONS_DIR.mkdir(parents=True, exist_ok=True) - ts = int(time.time()) - path = SESSIONS_DIR / f"{device_id}_{ts}.wav" - import wave - - with wave.open(str(path), "wb") as wf: - wf.setnchannels(1) - wf.setsampwidth(2) - wf.setframerate(PCM_SAMPLE_RATE) - wf.writeframes(pcm) + ts_ms = int(time.time() * 1000) + name = f"{session.device_id}_{ts_ms}_part{session.segment_index:03d}.wav" + path = SESSIONS_DIR / name + wf = wave.open(str(path), "wb") + wf.setnchannels(1) + wf.setsampwidth(2) + wf.setframerate(PCM_SAMPLE_RATE) + session.current_wav = wf + session.current_wav_path = str(path) + session.segment_pcm_bytes = 0 + session.saved_wavs.append(str(path)) + enforce_wav_retention() + session.segment_index += 1 return str(path) except Exception: - log.exception("failed to save wav") + log.exception("failed to open wav segment for %s", session.device_id) + close_active_wav(session) return None +def append_pcm_with_rotation(session: DeviceSession, data: bytes) -> None: + if not SAVE_SESSIONS or not data: + return + if WAV_SEGMENT_MAX_BYTES <= WAV_HEADER_BYTES: + log.warning("WAV_SEGMENT_MAX_BYTES too small, minimum is %s", WAV_HEADER_BYTES + 2) + return + + max_pcm_per_file = WAV_SEGMENT_MAX_BYTES - WAV_HEADER_BYTES + offset = 0 + total = len(data) + while offset < total: + if not session.current_wav: + opened = open_new_wav_segment(session) + if not opened: + return + free_bytes = max_pcm_per_file - session.segment_pcm_bytes + if free_bytes <= 0: + close_active_wav(session) + continue + chunk_len = min(free_bytes, total - offset) + chunk = data[offset : offset + chunk_len] + try: + session.current_wav.writeframesraw(chunk) + session.segment_pcm_bytes += chunk_len + except Exception: + log.exception("failed writing wav segment for %s", session.device_id) + close_active_wav(session) + return + offset += chunk_len + + async def handle_text_message(device_id: str, session: DeviceSession, raw: str) -> None: try: msg = json.loads(raw) @@ -144,33 +224,77 @@ async def handle_text_message(device_id: str, session: DeviceSession, raw: str) if msg_type == "start": session.ptt_active = True session.pcm_bytes.clear() + session.rx_bytes_total = 0 + session.saved_wavs.clear() + session.segment_index = 0 + close_active_wav(session) + first_path = open_new_wav_segment(session) payload = {"type": "start", "ts": time.time(), "device_id": device_id} + if first_path: + payload["wav_path"] = first_path state.publish_status(device_id, payload) await call_ha_webhook("start", payload) + log.info("start: device=%s wav=%s", device_id, first_path or "-") return if msg_type == "stop": session.ptt_active = False + close_active_wav(session) metrics = build_metrics(device_id, session) - wav_path = save_session_wav(device_id, bytes(session.pcm_bytes)) payload = {"type": "stop", "ts": time.time(), "device_id": device_id, **metrics} - if wav_path: - payload["wav_path"] = wav_path + if session.saved_wavs: + payload["wav_path"] = session.saved_wavs[-1] + payload["wav_paths"] = session.saved_wavs state.publish_status(device_id, payload) await call_ha_webhook("stop", payload) + log.info( + "stop: device=%s bytes=%s duration_s=%s wav_count=%s last_wav=%s", + device_id, + metrics.get("rx_bytes", 0), + metrics.get("duration_s", 0), + len(session.saved_wavs), + session.saved_wavs[-1] if session.saved_wavs else "-", + ) return if msg_type == "ping": await session.ws.send(json.dumps({"type": "pong", "ts": time.time()})) return + if msg_type == "mic_level": + payload = { + "type": "mic_level", + "ts": time.time(), + "device_id": device_id, + "peak": msg.get("peak", 0), + "avg_abs": msg.get("avg_abs", 0), + "samples": msg.get("samples", 0), + } + state.publish_status(device_id, payload) + log.info( + "mic_level: device=%s peak=%s avg_abs=%s samples=%s", + device_id, + payload["peak"], + payload["avg_abs"], + payload["samples"], + ) + return + log.info("text msg from %s: %s", device_id, msg) async def handle_binary_message(device_id: str, session: DeviceSession, data: bytes) -> None: session.last_rx_ts = time.time() if session.ptt_active: - session.pcm_bytes.extend(data) + session.rx_bytes_total += len(data) + if SAVE_SESSIONS: + append_pcm_with_rotation(session, data) + else: + session.pcm_bytes.extend(data) + if len(session.pcm_bytes) > MAX_SESSION_BYTES: + # Keep newest data within cap to avoid unbounded memory growth. + drop = len(session.pcm_bytes) - MAX_SESSION_BYTES + del session.pcm_bytes[:drop] if ECHO_ENABLED: await session.ws.send(data) @@ -211,6 +335,7 @@ async def ws_handler(ws: WebSocketServerProtocol, path: str) -> None: except websockets.ConnectionClosed: pass finally: + close_active_wav(session) if state.devices.get(device_id) is session: del state.devices[device_id] state.publish_status(device_id, {"type": "disconnected", "ts": time.time(), "device_id": device_id}) diff --git a/include/secrets.example.h b/include/secrets.example.h index 03c3ed9..23c5f5f 100644 --- a/include/secrets.example.h +++ b/include/secrets.example.h @@ -17,5 +17,19 @@ static const char* EVS_DEVICE_ID = "esp32-room-name"; // Connectivity behavior static constexpr uint32_t EVS_RECONNECT_MS = 5000; +static constexpr bool EVS_DEFAULT_STREAM_MODE = true; +static constexpr bool EVS_SERIAL_COMMAND_ECHO = true; + +// INMP441 tuning +// L/R pin on mic: +// - GND usually = left channel +// - 3V3 usually = right channel +static constexpr bool EVS_MIC_USE_RIGHT_CHANNEL = false; +// 24-bit sample to 16-bit conversion shift. Start with 8. +static constexpr int EVS_MIC_S24_TO_S16_SHIFT = 8; +// Digital mic gain (avoid clipping). Typical 0.2 .. 0.8. +static constexpr float EVS_MIC_GAIN = 0.35f; +// Extra gain only for local loopback monitor path. +static constexpr float EVS_LOOPBACK_MONITOR_GAIN = 0.35f; #endif // EVS_SECRETS_H diff --git a/src/main.cpp b/src/main.cpp index 6ceb124..b699f54 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,10 +1,18 @@ #include #include #include +#include #include "driver/i2s.h" #include "secrets.h" using namespace websockets; +static constexpr bool kDefaultStreamMode = EVS_DEFAULT_STREAM_MODE; +static constexpr bool kSerialCommandEcho = EVS_SERIAL_COMMAND_ECHO; +static constexpr bool kMicUseRightChannel = EVS_MIC_USE_RIGHT_CHANNEL; +static constexpr int kMicS24ToS16Shift = EVS_MIC_S24_TO_S16_SHIFT; +static constexpr float kMicGain = EVS_MIC_GAIN; +static constexpr float kLoopbackMonitorGain = EVS_LOOPBACK_MONITOR_GAIN; + // --------------------------- // Project config // --------------------------- @@ -24,6 +32,8 @@ static constexpr int PWM_CHANNEL = 0; static constexpr uint32_t PWM_FREQ = 22050; static constexpr uint8_t PWM_RES_BITS = 8; static constexpr uint32_t SPEAKER_SAMPLE_RATE = 16000; +static constexpr uint32_t MIC_TELEMETRY_INTERVAL_MS = 1000; +static constexpr float PI_F = 3.14159265358979323846f; // WiFi / WebSocket @@ -40,6 +50,7 @@ static bool g_wsConnected = false; static uint32_t g_lastConnectTryMs = 0; static uint32_t g_nextOutUs = 0; static bool g_streamingActive = false; +static uint32_t g_lastMicTelemetryMs = 0; static constexpr size_t RX_SAMPLES_CAP = 16000; static int16_t g_rxSamples[RX_SAMPLES_CAP]; @@ -52,7 +63,7 @@ static bool initMicI2s() { .mode = (i2s_mode_t)(I2S_MODE_MASTER | I2S_MODE_RX), .sample_rate = MIC_SAMPLE_RATE, .bits_per_sample = MIC_BITS, - .channel_format = I2S_CHANNEL_FMT_ONLY_LEFT, + .channel_format = kMicUseRightChannel ? I2S_CHANNEL_FMT_ONLY_RIGHT : I2S_CHANNEL_FMT_ONLY_LEFT, .communication_format = I2S_COMM_FORMAT_STAND_I2S, .intr_alloc_flags = 0, .dma_buf_count = 8, @@ -129,6 +140,49 @@ static void enqueuePcmFrame(const int16_t* frame, size_t count) { } } +static void enqueueTone(uint16_t freqHz, uint16_t durationMs, int16_t amplitude) { + if (freqHz == 0 || durationMs == 0) { + return; + } + const uint32_t sampleCount = (uint32_t)SPEAKER_SAMPLE_RATE * durationMs / 1000U; + const float phaseStep = 2.0f * PI_F * (float)freqHz / (float)SPEAKER_SAMPLE_RATE; + float phase = 0.0f; + for (uint32_t i = 0; i < sampleCount; ++i) { + const float s = sinf(phase); + const int16_t sample = (int16_t)(s * (float)amplitude); + if (!enqueuePcmSample(sample)) { + return; + } + phase += phaseStep; + if (phase > 2.0f * PI_F) { + phase -= 2.0f * PI_F; + } + } +} + +static void enqueueSilenceMs(uint16_t durationMs) { + const uint32_t sampleCount = (uint32_t)SPEAKER_SAMPLE_RATE * durationMs / 1000U; + for (uint32_t i = 0; i < sampleCount; ++i) { + if (!enqueuePcmSample(0)) { + return; + } + } +} + +static void playStartTone() { + // Ascending double beep: A5 -> C6. + enqueueTone(880, 90, 5000); + enqueueSilenceMs(35); + enqueueTone(1047, 110, 6000); +} + +static void playStopTone() { + // Descending double beep: C6 -> A5. + enqueueTone(1047, 90, 5000); + enqueueSilenceMs(35); + enqueueTone(880, 110, 6000); +} + static void onWsMessageCallback(WebsocketsMessage message) { if (!message.isBinary()) { return; @@ -157,9 +211,13 @@ static void onWsEventCallback(WebsocketsEvent event, String) { if (g_mode == DeviceMode::StreamToServer && !g_streamingActive) { g_ws.send("{\"type\":\"start\"}"); g_streamingActive = true; + playStartTone(); } Serial.println("WS connected"); } else if (event == WebsocketsEvent::ConnectionClosed) { + if (g_streamingActive) { + playStopTone(); + } g_wsConnected = false; g_streamingActive = false; Serial.println("WS disconnected"); @@ -206,11 +264,13 @@ static void setMode(DeviceMode mode) { if (g_mode == DeviceMode::StreamToServer && g_wsConnected && g_streamingActive) { g_ws.send("{\"type\":\"stop\"}"); g_streamingActive = false; + playStopTone(); } g_mode = mode; if (g_mode == DeviceMode::StreamToServer && g_wsConnected && !g_streamingActive) { g_ws.send("{\"type\":\"start\"}"); g_streamingActive = true; + playStartTone(); } } @@ -222,19 +282,71 @@ static void handleFrameForServer(const int16_t* frame, size_t count) { g_ws.sendBinary(reinterpret_cast(frame), count * sizeof(int16_t)); } -static void serviceSpeaker() { - const uint32_t periodUs = 1000000UL / SPEAKER_SAMPLE_RATE; - const uint32_t now = micros(); - if ((int32_t)(now - g_nextOutUs) < 0) { +static void publishMicTelemetryIfDue(const int16_t* frame, size_t count) { + if (count == 0) { return; } - g_nextOutUs += periodUs; - int16_t s = 0; - if (dequeuePcmSample(s)) { - pwmWrite(pcm16ToPwm8(s)); - } else { - pwmWrite(128); + int16_t peak = 0; + int64_t sum_abs = 0; + for (size_t i = 0; i < count; ++i) { + int16_t v = frame[i]; + int16_t av = (v < 0) ? (int16_t)(-v) : v; + if (av > peak) peak = av; + sum_abs += av; + } + const uint32_t avg_abs = (uint32_t)(sum_abs / (int64_t)count); + + const uint32_t now = millis(); + if ((now - g_lastMicTelemetryMs) < MIC_TELEMETRY_INTERVAL_MS) { + return; + } + g_lastMicTelemetryMs = now; + + Serial.print("MIC peak="); + Serial.print(peak); + Serial.print(" avg_abs="); + Serial.print(avg_abs); + Serial.print(" ws="); + Serial.println(g_wsConnected ? "connected" : "disconnected"); + + if (g_wsConnected) { + String msg = "{\"type\":\"mic_level\",\"peak\":"; + msg += String(peak); + msg += ",\"avg_abs\":"; + msg += String(avg_abs); + msg += ",\"samples\":"; + msg += String((unsigned)count); + msg += "}"; + g_ws.send(msg); + } +} + +static inline int16_t convertMicSampleToPcm16(int32_t raw32) { + // INMP441 uses 24-bit signed samples packed into 32-bit I2S slots. + int32_t s24 = raw32 >> 8; + int32_t s16 = s24 >> kMicS24ToS16Shift; + float scaled = (float)s16 * kMicGain; + if (scaled > 32767.0f) scaled = 32767.0f; + if (scaled < -32768.0f) scaled = -32768.0f; + return (int16_t)scaled; +} + +static void serviceSpeaker() { + const uint32_t periodUs = 1000000UL / SPEAKER_SAMPLE_RATE; + uint32_t now = micros(); + int processed = 0; + // Catch up if we lagged behind, but cap work per loop iteration. + while ((int32_t)(now - g_nextOutUs) >= 0 && processed < 8) { + g_nextOutUs += periodUs; + int16_t s = 0; + if (dequeuePcmSample(s)) { + pwmWrite(pcm16ToPwm8(s)); + } else { + pwmWrite(128); + } + now = micros(); + ++processed; } } @@ -242,15 +354,21 @@ static void printHelp() { Serial.println(); Serial.println("Commands:"); Serial.println(" i = idle"); - Serial.println(" s = stream mode (stub)"); + Serial.println(" s = stream mode"); Serial.println(" l = local loopback mode"); Serial.println(" p = print network status"); Serial.println(" h = help"); + Serial.print("Default on boot: "); + Serial.println(kDefaultStreamMode ? "StreamToServer" : "LocalLoopback"); } static void handleSerialCommands() { while (Serial.available()) { const char c = (char)Serial.read(); + if (kSerialCommandEcho && c != '\r' && c != '\n') { + Serial.print("RX cmd: "); + Serial.println(c); + } if (c == 'i') { setMode(DeviceMode::Idle); Serial.println("Mode -> Idle"); @@ -269,6 +387,8 @@ static void handleSerialCommands() { Serial.println(g_wsConnected ? "connected" : "disconnected"); } else if (c == 'h') { printHelp(); + } else if (c != '\r' && c != '\n') { + Serial.println("Unknown command"); } } } @@ -291,7 +411,11 @@ void setup() { g_ws.onEvent(onWsEventCallback); g_nextOutUs = micros(); - setMode(DeviceMode::LocalLoopback); + if (kDefaultStreamMode) { + setMode(DeviceMode::StreamToServer); + } else { + setMode(DeviceMode::LocalLoopback); + } Serial.println("Audio init ok"); Serial.println("Set local environment values in include/secrets.h"); printHelp(); @@ -299,8 +423,10 @@ void setup() { void loop() { handleSerialCommands(); - ensureConnectivity(); - g_ws.poll(); + if (g_mode != DeviceMode::LocalLoopback) { + ensureConnectivity(); + g_ws.poll(); + } serviceSpeaker(); size_t bytesRead = 0; @@ -313,14 +439,19 @@ void loop() { const size_t sampleCount = bytesRead / sizeof(int32_t); static int16_t pcm16[MIC_FRAME_SAMPLES]; for (size_t i = 0; i < sampleCount; ++i) { - // INMP441 delivers meaningful data in the high bits for 32-bit slot formats. - pcm16[i] = (int16_t)(g_micBuffer[i] >> 14); + pcm16[i] = convertMicSampleToPcm16(g_micBuffer[i]); } if (g_mode == DeviceMode::StreamToServer) { handleFrameForServer(pcm16, sampleCount); + publishMicTelemetryIfDue(pcm16, sampleCount); } else if (g_mode == DeviceMode::LocalLoopback) { - enqueuePcmFrame(pcm16, sampleCount); + for (size_t i = 0; i < sampleCount; ++i) { + float v = (float)pcm16[i] * kLoopbackMonitorGain; + if (v > 32767.0f) v = 32767.0f; + if (v < -32768.0f) v = -32768.0f; + enqueuePcmSample((int16_t)v); + } } else { // idle }