From 5294c24b084396a916c370cafef42b2086956136 Mon Sep 17 00:00:00 2001 From: Kai Date: Fri, 13 Feb 2026 17:49:26 +0100 Subject: [PATCH] Add MQTT-based STT worker for VAD segments --- bridge/.env.example | 11 +++ bridge/README.md | 43 ++++++++- bridge/docker-compose.yml | 23 +++++ stt-worker/Dockerfile | 26 ++++++ stt-worker/README.md | 14 +++ stt-worker/app.py | 175 ++++++++++++++++++++++++++++++++++++ stt-worker/requirements.txt | 2 + 7 files changed, 292 insertions(+), 2 deletions(-) create mode 100644 stt-worker/Dockerfile create mode 100644 stt-worker/README.md create mode 100644 stt-worker/app.py create mode 100644 stt-worker/requirements.txt diff --git a/bridge/.env.example b/bridge/.env.example index fd373d1..2da85c5 100644 --- a/bridge/.env.example +++ b/bridge/.env.example @@ -33,3 +33,14 @@ VAD_POSTROLL_MS=1000 VAD_START_THRESHOLD=900 VAD_STOP_THRESHOLD=600 VAD_MIN_SPEECH_MS=300 + +# STT worker settings (faster-whisper) +MQTT_VAD_TOPIC=evs/+/vad_segment +MQTT_TRANSCRIPT_TOPIC_TEMPLATE=evs/{device_id}/transcript +MQTT_STT_ERROR_TOPIC_TEMPLATE=evs/{device_id}/stt_error +STT_MODEL=small +STT_DEVICE=cpu +STT_COMPUTE_TYPE=int8 +STT_BEAM_SIZE=1 +STT_LANGUAGE=de +STT_CONDITION_ON_PREV_TEXT=false diff --git a/bridge/README.md b/bridge/README.md index 6de4ca4..767500e 100644 --- a/bridge/README.md +++ b/bridge/README.md @@ -8,6 +8,7 @@ It provides: - MQTT playback input (`evs//play_pcm16le`) - Optional Home Assistant webhook callbacks (`connected`, `start`, `stop`, `disconnected`) - VAD auto-segmentation (`vad_segment`) with pre-roll/post-roll +- Optional STT worker (`vad_segment` -> `transcript`) via MQTT ## 1) Start the bridge @@ -53,7 +54,7 @@ Then upload firmware. 1. Flash ESP32 2. Open serial monitor -3. Send `s` (stream mode) +3. Wait for WS connect (client switches to stream mode automatically) 4. In bridge logs, you should see the device connection 5. 
If `ECHO_ENABLED=true`, incoming audio is returned to ESP32 speaker @@ -63,7 +64,8 @@ Then upload firmware. - `evs//status` (connection/start/stop/disconnect) - `evs//mic_level` (mic telemetry) - `evs//vad_segment` (finalized speech segments) - - reserved for next steps: `evs//transcript`, `evs//stt_error` + - `evs//transcript` (text from stt-worker) + - `evs//stt_error` (stt-worker errors) - Playback input to device: - `evs//play_pcm16le` - payload options: @@ -101,6 +103,20 @@ You can build automations on these events (for STT/TTS pipelines or Node-RED han - `VAD_KEEP_FILES=200` limits number of stored VAD WAV files - `VAD_MAX_AGE_DAYS=7` deletes VAD WAV files older than 7 days - MQTT is recommended for control/events, WebSocket for streaming audio +- STT worker: + - subscribes: `evs//vad_segment` + - reads `wav_path` from event JSON + - transcribes with `faster-whisper` + - publishes transcript to `evs//transcript` + +## 6.1) STT Worker Config + +Use these env values (in `.env` or Portainer): +- `STT_MODEL` (`tiny`, `base`, `small`, `medium`, `large-v3`) +- `STT_DEVICE` (`cpu` or `cuda`) +- `STT_COMPUTE_TYPE` (`int8`, `float16`, ...) 
+- `STT_LANGUAGE` (`de` or empty for auto-detect) +- `MQTT_VAD_TOPIC`, `MQTT_TRANSCRIPT_TOPIC_TEMPLATE`, `MQTT_STT_ERROR_TOPIC_TEMPLATE` ## 7) Build and push to Gitea registry @@ -161,6 +177,29 @@ services: volumes: - evs_bridge_data:/data + evs-stt-worker: + image: git.khnm-zimmerling.de/kai/evs-stt-worker:latest + container_name: evs-stt-worker + restart: unless-stopped + environment: + LOG_LEVEL: "INFO" + MQTT_HOST: "10.100.3.247" + MQTT_PORT: "1883" + MQTT_USER: "" + MQTT_PASSWORD: "" + MQTT_BASE_TOPIC: "evs" + MQTT_VAD_TOPIC: "evs/+/vad_segment" + MQTT_TRANSCRIPT_TOPIC_TEMPLATE: "evs/{device_id}/transcript" + MQTT_STT_ERROR_TOPIC_TEMPLATE: "evs/{device_id}/stt_error" + STT_MODEL: "small" + STT_DEVICE: "cpu" + STT_COMPUTE_TYPE: "int8" + STT_BEAM_SIZE: "1" + STT_LANGUAGE: "de" + STT_CONDITION_ON_PREV_TEXT: "false" + volumes: + - evs_bridge_data:/data + volumes: evs_bridge_data: ``` diff --git a/bridge/docker-compose.yml b/bridge/docker-compose.yml index 38b0f13..0b432b0 100644 --- a/bridge/docker-compose.yml +++ b/bridge/docker-compose.yml @@ -9,3 +9,26 @@ services: - "${WS_PORT:-8765}:${WS_PORT:-8765}" volumes: - ./data:/data + + evs-stt-worker: + build: ../stt-worker + container_name: evs-stt-worker + restart: unless-stopped + environment: + LOG_LEVEL: "INFO" + MQTT_HOST: "${MQTT_HOST:-localhost}" + MQTT_PORT: "${MQTT_PORT:-1883}" + MQTT_USER: "${MQTT_USER:-}" + MQTT_PASSWORD: "${MQTT_PASSWORD:-}" + MQTT_BASE_TOPIC: "${MQTT_BASE_TOPIC:-evs}" + MQTT_VAD_TOPIC: "${MQTT_VAD_TOPIC:-evs/+/vad_segment}" + MQTT_TRANSCRIPT_TOPIC_TEMPLATE: "${MQTT_TRANSCRIPT_TOPIC_TEMPLATE:-evs/{device_id}/transcript}" + MQTT_STT_ERROR_TOPIC_TEMPLATE: "${MQTT_STT_ERROR_TOPIC_TEMPLATE:-evs/{device_id}/stt_error}" + STT_MODEL: "${STT_MODEL:-small}" + STT_DEVICE: "${STT_DEVICE:-cpu}" + STT_COMPUTE_TYPE: "${STT_COMPUTE_TYPE:-int8}" + STT_BEAM_SIZE: "${STT_BEAM_SIZE:-1}" + STT_LANGUAGE: "${STT_LANGUAGE:-de}" + STT_CONDITION_ON_PREV_TEXT: "${STT_CONDITION_ON_PREV_TEXT:-false}" + volumes: + - 
./data:/data diff --git a/stt-worker/Dockerfile b/stt-worker/Dockerfile new file mode 100644 index 0000000..949d24b --- /dev/null +++ b/stt-worker/Dockerfile @@ -0,0 +1,26 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY app.py . + +ENV LOG_LEVEL=INFO \ + MQTT_HOST=localhost \ + MQTT_PORT=1883 \ + MQTT_USER= \ + MQTT_PASSWORD= \ + MQTT_BASE_TOPIC=evs \ + MQTT_VAD_TOPIC=evs/+/vad_segment \ + MQTT_TRANSCRIPT_TOPIC_TEMPLATE=evs/{device_id}/transcript \ + MQTT_STT_ERROR_TOPIC_TEMPLATE=evs/{device_id}/stt_error \ + STT_MODEL=small \ + STT_DEVICE=cpu \ + STT_COMPUTE_TYPE=int8 \ + STT_BEAM_SIZE=1 \ + STT_LANGUAGE=de \ + STT_CONDITION_ON_PREV_TEXT=false + +CMD ["python", "app.py"] diff --git a/stt-worker/README.md b/stt-worker/README.md new file mode 100644 index 0000000..bc257bf --- /dev/null +++ b/stt-worker/README.md @@ -0,0 +1,14 @@ +# EVS STT Worker + +This worker subscribes to VAD events from MQTT, transcribes the referenced WAV files, and publishes text back to MQTT. 
+
+Flow:
+- input topic: `evs/<device_id>/vad_segment`
+- reads: `wav_path` from JSON payload
+- output topic: `evs/<device_id>/transcript`
+- error topic: `evs/<device_id>/stt_error`
+
+Default model:
+- `STT_MODEL=small`
+- `STT_DEVICE=cpu`
+- `STT_COMPUTE_TYPE=int8`
diff --git a/stt-worker/app.py b/stt-worker/app.py
new file mode 100644
index 0000000..b9ff1d3
--- /dev/null
+++ b/stt-worker/app.py
@@ -0,0 +1,241 @@
+"""MQTT worker that turns EVS VAD segments into transcripts.
+
+Subscribes to VAD segment events, transcribes the WAV file each event points
+at with faster-whisper, and publishes the transcript (or an error event) back
+to MQTT.
+"""
+
+import json
+import logging
+import os
+import queue
+import threading
+import time
+from pathlib import Path
+from typing import Optional
+
+import paho.mqtt.client as mqtt
+from faster_whisper import WhisperModel
+
+
+logging.basicConfig(
+    # logging rejects lower-case or empty level names, so normalise the value.
+    level=os.getenv("LOG_LEVEL", "INFO").strip().upper() or "INFO",
+    format="%(asctime)s %(levelname)s %(message)s",
+)
+log = logging.getLogger("evs-stt-worker")
+
+
+def getenv_bool(name: str, default: bool) -> bool:
+    """Read a boolean flag from the environment.
+
+    Returns ``default`` when the variable is unset; otherwise True only for
+    "1", "true", "yes" or "on" (case-insensitive, surrounding space ignored).
+    """
+    val = os.getenv(name)
+    if val is None:
+        return default
+    return val.strip().lower() in {"1", "true", "yes", "on"}
+
+
+MQTT_HOST = os.getenv("MQTT_HOST", "localhost")
+MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))
+MQTT_USER = os.getenv("MQTT_USER", "")
+MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", "")
+MQTT_BASE_TOPIC = os.getenv("MQTT_BASE_TOPIC", "evs")
+MQTT_VAD_TOPIC = os.getenv("MQTT_VAD_TOPIC", f"{MQTT_BASE_TOPIC}/+/vad_segment")
+# Both templates are expanded with str.format(device_id=...).
+MQTT_TRANSCRIPT_TOPIC_TEMPLATE = os.getenv(
+    "MQTT_TRANSCRIPT_TOPIC_TEMPLATE", f"{MQTT_BASE_TOPIC}" + "/{device_id}/transcript"
+)
+MQTT_STT_ERROR_TOPIC_TEMPLATE = os.getenv(
+    "MQTT_STT_ERROR_TOPIC_TEMPLATE", f"{MQTT_BASE_TOPIC}" + "/{device_id}/stt_error"
+)
+
+STT_MODEL = os.getenv("STT_MODEL", "small")
+STT_DEVICE = os.getenv("STT_DEVICE", "cpu")
+STT_COMPUTE_TYPE = os.getenv("STT_COMPUTE_TYPE", "int8")
+STT_BEAM_SIZE = int(os.getenv("STT_BEAM_SIZE", "1"))
+# Empty: no language is passed to transcribe() (auto-detect per the README).
+STT_LANGUAGE = os.getenv("STT_LANGUAGE", "").strip()
+STT_CONDITION_ON_PREV_TEXT = getenv_bool("STT_CONDITION_ON_PREV_TEXT", False)
+
+
+class WorkerState:
+    """Process-wide state shared by the MQTT callbacks and the STT thread."""
+
+    def __init__(self) -> None:
+        self.client: Optional[mqtt.Client] = None
+        self.model: Optional[WhisperModel] = None
+        self.last_wav_path: str = ""
+        # (device_id, wav_path) jobs handed from the MQTT thread to the STT thread.
+        self.jobs: queue.Queue = queue.Queue()
+
+    def model_instance(self) -> WhisperModel:
+        """Return the Whisper model, loading it on first use."""
+        if self.model is None:
+            log.info(
+                "loading model: model=%s device=%s compute_type=%s",
+                STT_MODEL,
+                STT_DEVICE,
+                STT_COMPUTE_TYPE,
+            )
+            self.model = WhisperModel(STT_MODEL, device=STT_DEVICE, compute_type=STT_COMPUTE_TYPE)
+            log.info("model loaded")
+        return self.model
+
+
+state = WorkerState()
+
+
+def publish_json(topic: str, payload: dict) -> None:
+    """Publish ``payload`` as JSON (QoS 0, not retained); errors are logged, not raised."""
+    if not state.client:
+        return
+    try:
+        info = state.client.publish(topic, json.dumps(payload), qos=0, retain=False)
+    except Exception:
+        log.exception("mqtt publish failed: topic=%s", topic)
+        return
+    # publish() reports some failures (e.g. no connection) via rc, not by raising.
+    if info.rc != mqtt.MQTT_ERR_SUCCESS:
+        log.warning("mqtt publish returned error: topic=%s rc=%s", topic, info.rc)
+
+
+def topic_for_transcript(device_id: str) -> str:
+    """Expand the transcript topic template for ``device_id``."""
+    return MQTT_TRANSCRIPT_TOPIC_TEMPLATE.format(device_id=device_id)
+
+
+def topic_for_error(device_id: str) -> str:
+    """Expand the STT error topic template for ``device_id``."""
+    return MQTT_STT_ERROR_TOPIC_TEMPLATE.format(device_id=device_id)
+
+
+def publish_stt_error(device_id: str, wav_path: str, error: str) -> None:
+    """Publish an ``stt_error`` event for ``wav_path`` on the device's error topic."""
+    payload = {
+        "type": "stt_error",
+        "ts": time.time(),
+        "device_id": device_id,
+        "wav_path": wav_path,
+        "error": error,
+    }
+    publish_json(topic_for_error(device_id), payload)
+
+
+def transcribe_wav(device_id: str, wav_path: str) -> None:
+    """Transcribe one WAV file and publish the transcript or an error event.
+
+    A missing file is reported as ``wav_not_found``. A path equal to the
+    previously processed one is skipped, so a repeated event is not
+    transcribed twice in a row.
+    """
+    path = Path(wav_path)
+    if not path.is_file():
+        publish_stt_error(device_id, wav_path, "wav_not_found")
+        log.warning("wav not found: device=%s path=%s", device_id, wav_path)
+        return
+
+    if wav_path == state.last_wav_path:
+        return
+    state.last_wav_path = wav_path
+
+    try:
+        model = state.model_instance()
+        kwargs = {
+            "beam_size": STT_BEAM_SIZE,
+            "condition_on_previous_text": STT_CONDITION_ON_PREV_TEXT,
+        }
+        if STT_LANGUAGE:
+            kwargs["language"] = STT_LANGUAGE
+
+        segments, info = model.transcribe(str(path), **kwargs)
+        # Join the non-blank segment texts with single spaces.
+        pieces = (seg.text.strip() for seg in segments if seg.text)
+        text = " ".join(piece for piece in pieces if piece)
+        payload = {
+            "type": "transcript",
+            "ts": time.time(),
+            "device_id": device_id,
+            "wav_path": wav_path,
+            "text": text,
+            "language": getattr(info, "language", ""),
+            "language_probability": getattr(info, "language_probability", 0.0),
+            "model": STT_MODEL,
+        }
+        publish_json(topic_for_transcript(device_id), payload)
+        log.info("transcript: device=%s chars=%s wav=%s", device_id, len(text), wav_path)
+    except Exception as exc:
+        publish_stt_error(device_id, wav_path, str(exc))
+        log.exception("transcription failed: device=%s wav=%s", device_id, wav_path)
+
+
+def stt_worker_loop() -> None:
+    """Run queued transcription jobs forever on a dedicated thread.
+
+    Transcription (and the first model load) can take far longer than the MQTT
+    keepalive, so it must not run inside the paho network-loop callbacks.
+    """
+    while True:
+        device_id, wav_path = state.jobs.get()
+        try:
+            transcribe_wav(device_id, wav_path)
+        except Exception:
+            # Keep the worker alive; one bad job must not stop later ones.
+            log.exception("stt job crashed: device=%s wav=%s", device_id, wav_path)
+        finally:
+            state.jobs.task_done()
+
+
+def on_mqtt_connect(client: mqtt.Client, _userdata, _flags, reason_code, _properties=None):
+    """Subscribe to the VAD topic whenever a connection attempt succeeds."""
+    if reason_code == 0:
+        log.info("mqtt connected")
+        client.subscribe(MQTT_VAD_TOPIC, qos=0)
+        log.info("subscribed: %s", MQTT_VAD_TOPIC)
+    else:
+        log.error("mqtt connect failed reason=%s", reason_code)
+
+
+def on_mqtt_message(_client: mqtt.Client, _userdata, msg: mqtt.MQTTMessage):
+    """Validate a VAD segment event and queue it for transcription.
+
+    Malformed payloads are dropped here: with paho's default settings an
+    exception escaping this callback is re-raised out of the network loop.
+    """
+    try:
+        payload = json.loads(msg.payload.decode("utf-8"))
+    except ValueError:  # covers UnicodeDecodeError and json.JSONDecodeError
+        log.warning("ignoring non-JSON payload: topic=%s", msg.topic)
+        return
+    # Valid JSON may still be a list, string or number rather than an object.
+    if not isinstance(payload, dict) or payload.get("type") != "vad_segment":
+        return
+
+    device_id = payload.get("device_id", "")
+    wav_path = payload.get("wav_path", "")
+    if not (isinstance(device_id, str) and isinstance(wav_path, str)):
+        log.warning("ignoring vad_segment with non-string fields: topic=%s", msg.topic)
+        return
+    if not device_id or not wav_path:
+        return
+    state.jobs.put((device_id, wav_path))
+
+
+def main() -> None:
+    """Start the STT thread, connect to the broker and run the MQTT loop."""
+    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="evs-stt-worker")
+    if MQTT_USER:
+        client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
+    client.on_connect = on_mqtt_connect
+    client.on_message = on_mqtt_message
+    state.client = client
+    # Daemon thread: it must not keep the process alive if the MQTT loop exits.
+    threading.Thread(target=stt_worker_loop, name="stt-worker", daemon=True).start()
+    client.connect(MQTT_HOST, MQTT_PORT, keepalive=30)
+    client.loop_forever()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/stt-worker/requirements.txt b/stt-worker/requirements.txt
new file mode 100644
index 0000000..4f92f09
--- /dev/null
+++ b/stt-worker/requirements.txt
@@ -0,0 +1,2 @@
+paho-mqtt==2.1.0
+faster-whisper==1.1.1