Initial EVS client + bridge setup

This commit is contained in:
Kai
2026-02-13 10:07:11 +01:00
commit c742d74860
14 changed files with 889 additions and 0 deletions

22
bridge/.env.example Normal file
View File

@@ -0,0 +1,22 @@
WS_HOST=0.0.0.0
WS_PORT=8765
WS_PATH=/audio
ECHO_ENABLED=true
LOG_LEVEL=INFO
MQTT_ENABLED=true
MQTT_HOST=homeassistant.local
MQTT_PORT=1883
MQTT_USER=
MQTT_PASSWORD=
MQTT_BASE_TOPIC=evs
MQTT_TTS_TOPIC=evs/+/play_pcm16le
MQTT_STATUS_RETAIN=true
# Optional webhook in Home Assistant:
# HA settings -> Automations & Scenes -> Webhooks
HA_WEBHOOK_URL=
SAVE_SESSIONS=true
SESSIONS_DIR=/data/sessions
PCM_SAMPLE_RATE=16000

10
bridge/Dockerfile Normal file
View File

@@ -0,0 +1,10 @@
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app.py .
CMD ["python", "app.py"]

73
bridge/README.md Normal file
View File

@@ -0,0 +1,73 @@
# EVS Bridge (Home Assistant + MQTT + WebSocket)
This service is the audio bridge between your ESP32 client and your Home Assistant stack.
It provides:
- WebSocket endpoint for raw PCM audio (`/audio`)
- MQTT status/events (`evs/<device_id>/status`)
- MQTT playback input (`evs/<device_id>/play_pcm16le`)
- Optional Home Assistant webhook callbacks (`connected`, `start`, `stop`, `disconnected`)
## 1) Start the bridge
1. Copy env template:
```bash
cp .env.example .env
```
2. Edit `.env`:
- `MQTT_HOST`, `MQTT_USER`, `MQTT_PASSWORD`
- `HA_WEBHOOK_URL` (optional)
3. Start:
```bash
docker compose up -d --build
```
## 2) Configure ESP32
In `src/main.cpp`:
- no environment-specific values should be edited directly
In `include/secrets.h`:
- copy from `include/secrets.example.h`
- set WiFi credentials
- set bridge host
- set WS port/path
- set unique `EVS_DEVICE_ID`
Then upload firmware.
## 3) Test flow
1. Flash ESP32
2. Open serial monitor
3. Send `s` (stream mode)
4. In bridge logs, you should see the device connection
5. If `ECHO_ENABLED=true`, incoming audio is returned to ESP32 speaker
## 4) MQTT topics
- Status/events published by bridge:
- `evs/<device_id>/status` (JSON)
- Playback input to device:
- `evs/<device_id>/play_pcm16le`
- payload options:
- raw binary PCM16LE
- JSON `{ "pcm16le_b64": "<base64>" }`
## 5) Home Assistant integration
Use webhook for event hooks:
- Configure `HA_WEBHOOK_URL` in `.env`
- Bridge sends JSON with event and metadata on:
- `connected`
- `start`
- `stop`
- `disconnected`
You can build automations on these events (for STT/TTS pipelines or Node-RED handoff).
## 6) Notes
- Audio format: PCM16LE, mono, 16 kHz
- `SAVE_SESSIONS=true` stores `.wav` files in `bridge/data/sessions`
- MQTT is recommended for control/events, WebSocket for streaming audio

291
bridge/app.py Normal file
View File

@@ -0,0 +1,291 @@
import asyncio
import base64
import json
import logging
import os
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Optional
import aiohttp
import paho.mqtt.client as mqtt
import websockets
from websockets.server import WebSocketServerProtocol
logging.basicConfig(
level=os.getenv("LOG_LEVEL", "INFO"),
format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("evs-bridge")
def getenv_bool(name: str, default: bool) -> bool:
val = os.getenv(name)
if val is None:
return default
return val.strip().lower() in {"1", "true", "yes", "on"}
WS_HOST = os.getenv("WS_HOST", "0.0.0.0")
WS_PORT = int(os.getenv("WS_PORT", "8765"))
WS_PATH = os.getenv("WS_PATH", "/audio")
ECHO_ENABLED = getenv_bool("ECHO_ENABLED", True)
MQTT_ENABLED = getenv_bool("MQTT_ENABLED", True)
MQTT_HOST = os.getenv("MQTT_HOST", "localhost")
MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))
MQTT_USER = os.getenv("MQTT_USER", "")
MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", "")
MQTT_BASE_TOPIC = os.getenv("MQTT_BASE_TOPIC", "evs")
MQTT_TTS_TOPIC = os.getenv("MQTT_TTS_TOPIC", f"{MQTT_BASE_TOPIC}/+/play_pcm16le")
MQTT_STATUS_RETAIN = getenv_bool("MQTT_STATUS_RETAIN", True)
HA_WEBHOOK_URL = os.getenv("HA_WEBHOOK_URL", "").strip()
SAVE_SESSIONS = getenv_bool("SAVE_SESSIONS", True)
SESSIONS_DIR = Path(os.getenv("SESSIONS_DIR", "/data/sessions"))
PCM_SAMPLE_RATE = int(os.getenv("PCM_SAMPLE_RATE", "16000"))
@dataclass
class DeviceSession:
device_id: str
ws: WebSocketServerProtocol
connected_at: float = field(default_factory=time.time)
ptt_active: bool = False
pcm_bytes: bytearray = field(default_factory=bytearray)
last_rx_ts: float = field(default_factory=time.time)
class BridgeState:
def __init__(self) -> None:
self.loop: Optional[asyncio.AbstractEventLoop] = None
self.devices: Dict[str, DeviceSession] = {}
self.mqtt_client: Optional[mqtt.Client] = None
def publish_status(self, device_id: str, payload: dict) -> None:
if not self.mqtt_client:
return
topic = f"{MQTT_BASE_TOPIC}/{device_id}/status"
try:
self.mqtt_client.publish(topic, json.dumps(payload), qos=0, retain=MQTT_STATUS_RETAIN)
except Exception:
log.exception("mqtt publish failed")
async def send_binary_to_device(self, device_id: str, pcm_data: bytes) -> bool:
session = self.devices.get(device_id)
if not session:
return False
try:
await session.ws.send(pcm_data)
return True
except Exception:
log.exception("ws send to device failed")
return False
state = BridgeState()
def build_metrics(device_id: str, session: DeviceSession) -> dict:
samples = len(session.pcm_bytes) // 2
seconds = samples / float(PCM_SAMPLE_RATE)
return {
"device_id": device_id,
"ptt_active": session.ptt_active,
"rx_bytes": len(session.pcm_bytes),
"duration_s": round(seconds, 3),
"last_rx_ts": session.last_rx_ts,
}
async def call_ha_webhook(event: str, payload: dict) -> None:
if not HA_WEBHOOK_URL:
return
data = {"event": event, **payload}
try:
async with aiohttp.ClientSession() as client:
async with client.post(HA_WEBHOOK_URL, json=data, timeout=10) as resp:
if resp.status >= 400:
log.warning("ha webhook error status=%s", resp.status)
except Exception:
log.exception("ha webhook call failed")
def save_session_wav(device_id: str, pcm: bytes) -> Optional[str]:
if not SAVE_SESSIONS or not pcm:
return None
try:
SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
ts = int(time.time())
path = SESSIONS_DIR / f"{device_id}_{ts}.wav"
import wave
with wave.open(str(path), "wb") as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(PCM_SAMPLE_RATE)
wf.writeframes(pcm)
return str(path)
except Exception:
log.exception("failed to save wav")
return None
async def handle_text_message(device_id: str, session: DeviceSession, raw: str) -> None:
try:
msg = json.loads(raw)
except Exception:
log.warning("invalid text frame from %s: %s", device_id, raw[:80])
return
msg_type = msg.get("type")
if msg_type == "start":
session.ptt_active = True
session.pcm_bytes.clear()
payload = {"type": "start", "ts": time.time(), "device_id": device_id}
state.publish_status(device_id, payload)
await call_ha_webhook("start", payload)
return
if msg_type == "stop":
session.ptt_active = False
metrics = build_metrics(device_id, session)
wav_path = save_session_wav(device_id, bytes(session.pcm_bytes))
payload = {"type": "stop", "ts": time.time(), "device_id": device_id, **metrics}
if wav_path:
payload["wav_path"] = wav_path
state.publish_status(device_id, payload)
await call_ha_webhook("stop", payload)
return
if msg_type == "ping":
await session.ws.send(json.dumps({"type": "pong", "ts": time.time()}))
return
log.info("text msg from %s: %s", device_id, msg)
async def handle_binary_message(device_id: str, session: DeviceSession, data: bytes) -> None:
session.last_rx_ts = time.time()
if session.ptt_active:
session.pcm_bytes.extend(data)
if ECHO_ENABLED:
await session.ws.send(data)
def parse_device_id(path: str) -> str:
# expected:
# /audio
# /audio?device_id=esp32-kitchen
if "?" not in path:
return "esp32-unknown"
try:
from urllib.parse import parse_qs, urlsplit
q = parse_qs(urlsplit(path).query)
return q.get("device_id", ["esp32-unknown"])[0]
except Exception:
return "esp32-unknown"
async def ws_handler(ws: WebSocketServerProtocol, path: str) -> None:
if not path.startswith(WS_PATH):
await ws.close(code=1008, reason="Invalid path")
return
device_id = parse_device_id(path)
session = DeviceSession(device_id=device_id, ws=ws)
state.devices[device_id] = session
state.publish_status(device_id, {"type": "connected", "ts": time.time(), "device_id": device_id})
await call_ha_webhook("connected", {"device_id": device_id, "ts": time.time()})
log.info("device connected: %s", device_id)
try:
async for message in ws:
if isinstance(message, bytes):
await handle_binary_message(device_id, session, message)
else:
await handle_text_message(device_id, session, message)
except websockets.ConnectionClosed:
pass
finally:
if state.devices.get(device_id) is session:
del state.devices[device_id]
state.publish_status(device_id, {"type": "disconnected", "ts": time.time(), "device_id": device_id})
await call_ha_webhook("disconnected", {"device_id": device_id, "ts": time.time()})
log.info("device disconnected: %s", device_id)
def on_mqtt_connect(client: mqtt.Client, _userdata, _flags, reason_code, _properties=None):
if reason_code == 0:
log.info("mqtt connected")
client.subscribe(MQTT_TTS_TOPIC, qos=0)
else:
log.error("mqtt connect failed reason=%s", reason_code)
def on_mqtt_message(_client: mqtt.Client, _userdata, msg: mqtt.MQTTMessage):
# topic: evs/<device_id>/play_pcm16le
try:
parts = msg.topic.split("/")
if len(parts) < 3:
return
device_id = parts[1]
# payload options:
# 1) raw binary PCM16LE
# 2) json {"pcm16le_b64":"..."}
payload = msg.payload
if payload.startswith(b"{"):
doc = json.loads(payload.decode("utf-8"))
b64 = doc.get("pcm16le_b64", "")
if not b64:
return
pcm = base64.b64decode(b64)
else:
pcm = bytes(payload)
if not state.loop:
return
fut = asyncio.run_coroutine_threadsafe(state.send_binary_to_device(device_id, pcm), state.loop)
_ = fut.result(timeout=2)
except Exception:
log.exception("mqtt message handling failed")
def setup_mqtt(loop: asyncio.AbstractEventLoop) -> Optional[mqtt.Client]:
if not MQTT_ENABLED:
log.info("mqtt disabled")
return None
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="evs-bridge")
if MQTT_USER:
client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
client.on_connect = on_mqtt_connect
client.on_message = on_mqtt_message
client.connect(MQTT_HOST, MQTT_PORT, keepalive=30)
client.loop_start()
log.info("mqtt connecting to %s:%s", MQTT_HOST, MQTT_PORT)
return client
async def main():
state.loop = asyncio.get_running_loop()
state.mqtt_client = setup_mqtt(state.loop)
ws_server = await websockets.serve(ws_handler, WS_HOST, WS_PORT, max_size=2**22)
log.info("ws listening on ws://%s:%s%s", WS_HOST, WS_PORT, WS_PATH)
try:
await ws_server.wait_closed()
finally:
if state.mqtt_client:
state.mqtt_client.loop_stop()
state.mqtt_client.disconnect()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass

11
bridge/docker-compose.yml Normal file
View File

@@ -0,0 +1,11 @@
services:
evs-bridge:
build: .
container_name: evs-bridge
restart: unless-stopped
env_file:
- .env
ports:
- "${WS_PORT:-8765}:${WS_PORT:-8765}"
volumes:
- ./data:/data

3
bridge/requirements.txt Normal file
View File

@@ -0,0 +1,3 @@
websockets==12.0
paho-mqtt==2.1.0
aiohttp==3.10.11