Improve bridge routing logs and stabilize control portal MQTT reconnect
Some checks failed
Build and Push EVS Bridge Image / docker (push) Has been cancelled
Some checks failed
Build and Push EVS Bridge Image / docker (push) Has been cancelled
This commit is contained in:
@@ -166,6 +166,7 @@ services:
|
|||||||
MQTT_BASE_TOPIC: "evs"
|
MQTT_BASE_TOPIC: "evs"
|
||||||
MQTT_TTS_TOPIC: "evs/+/play_pcm16le"
|
MQTT_TTS_TOPIC: "evs/+/play_pcm16le"
|
||||||
MQTT_STATUS_RETAIN: "true"
|
MQTT_STATUS_RETAIN: "true"
|
||||||
|
WS_TX_CHUNK_BYTES: "1024"
|
||||||
DEVICE_PAIR_MAP: '{"esp32-evs-1-mic":"esp32-evs-1-spk"}'
|
DEVICE_PAIR_MAP: '{"esp32-evs-1-mic":"esp32-evs-1-spk"}'
|
||||||
HA_WEBHOOK_URL: ""
|
HA_WEBHOOK_URL: ""
|
||||||
SAVE_SESSIONS: "true"
|
SAVE_SESSIONS: "true"
|
||||||
|
|||||||
@@ -42,6 +42,7 @@ MQTT_BASE_TOPIC = os.getenv("MQTT_BASE_TOPIC", "evs")
|
|||||||
MQTT_TTS_TOPIC = os.getenv("MQTT_TTS_TOPIC", f"{MQTT_BASE_TOPIC}/+/play_pcm16le")
|
MQTT_TTS_TOPIC = os.getenv("MQTT_TTS_TOPIC", f"{MQTT_BASE_TOPIC}/+/play_pcm16le")
|
||||||
MQTT_STATUS_RETAIN = getenv_bool("MQTT_STATUS_RETAIN", True)
|
MQTT_STATUS_RETAIN = getenv_bool("MQTT_STATUS_RETAIN", True)
|
||||||
DEVICE_PAIR_MAP_JSON = os.getenv("DEVICE_PAIR_MAP", "").strip()
|
DEVICE_PAIR_MAP_JSON = os.getenv("DEVICE_PAIR_MAP", "").strip()
|
||||||
|
WS_TX_CHUNK_BYTES = int(os.getenv("WS_TX_CHUNK_BYTES", "1024"))
|
||||||
|
|
||||||
HA_WEBHOOK_URL = os.getenv("HA_WEBHOOK_URL", "").strip()
|
HA_WEBHOOK_URL = os.getenv("HA_WEBHOOK_URL", "").strip()
|
||||||
SAVE_SESSIONS = getenv_bool("SAVE_SESSIONS", True)
|
SAVE_SESSIONS = getenv_bool("SAVE_SESSIONS", True)
|
||||||
@@ -118,7 +119,11 @@ class BridgeState:
|
|||||||
if not session:
|
if not session:
|
||||||
return False
|
return False
|
||||||
try:
|
try:
|
||||||
await session.ws.send(pcm_data)
|
if not pcm_data:
|
||||||
|
return True
|
||||||
|
chunk_size = max(128, WS_TX_CHUNK_BYTES)
|
||||||
|
for i in range(0, len(pcm_data), chunk_size):
|
||||||
|
await session.ws.send(pcm_data[i : i + chunk_size])
|
||||||
return True
|
return True
|
||||||
except Exception:
|
except Exception:
|
||||||
log.exception("ws send to device failed")
|
log.exception("ws send to device failed")
|
||||||
@@ -457,8 +462,18 @@ async def handle_binary_message(device_id: str, session: DeviceSession, data: by
|
|||||||
if ECHO_ENABLED:
|
if ECHO_ENABLED:
|
||||||
target_device = paired_output_device(device_id)
|
target_device = paired_output_device(device_id)
|
||||||
ok = await state.send_binary_to_device(target_device, data)
|
ok = await state.send_binary_to_device(target_device, data)
|
||||||
if not ok and target_device != device_id:
|
if ok:
|
||||||
log.debug("paired output device not connected: src=%s target=%s", device_id, target_device)
|
if target_device != device_id:
|
||||||
|
log.info(
|
||||||
|
"audio routed: src=%s -> target=%s bytes=%s",
|
||||||
|
device_id,
|
||||||
|
target_device,
|
||||||
|
len(data),
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
log.debug("audio echo self: device=%s bytes=%s", device_id, len(data))
|
||||||
|
elif target_device != device_id:
|
||||||
|
log.warning("paired output device not connected: src=%s target=%s", device_id, target_device)
|
||||||
|
|
||||||
|
|
||||||
def parse_device_id(path: str) -> str:
|
def parse_device_id(path: str) -> str:
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
|
|
||||||
@@ -13,13 +14,37 @@ MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", "")
|
|||||||
MQTT_BASE_TOPIC = os.getenv("MQTT_BASE_TOPIC", "evs")
|
MQTT_BASE_TOPIC = os.getenv("MQTT_BASE_TOPIC", "evs")
|
||||||
PORTAL_BIND_HOST = os.getenv("PORTAL_BIND_HOST", "0.0.0.0")
|
PORTAL_BIND_HOST = os.getenv("PORTAL_BIND_HOST", "0.0.0.0")
|
||||||
PORTAL_BIND_PORT = int(os.getenv("PORTAL_BIND_PORT", "8088"))
|
PORTAL_BIND_PORT = int(os.getenv("PORTAL_BIND_PORT", "8088"))
|
||||||
|
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
|
||||||
|
|
||||||
|
|
||||||
|
logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s %(levelname)s %(message)s")
|
||||||
|
log = logging.getLogger("evs-control-portal")
|
||||||
|
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
|
mqtt_connected = False
|
||||||
mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="evs-control-portal")
|
mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="evs-control-portal")
|
||||||
if MQTT_USER:
|
if MQTT_USER:
|
||||||
mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
|
mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
|
||||||
mqtt_client.connect(MQTT_HOST, MQTT_PORT, keepalive=30)
|
|
||||||
|
|
||||||
|
def _on_connect(_client, _userdata, _flags, reason_code, _properties=None):
|
||||||
|
global mqtt_connected
|
||||||
|
mqtt_connected = reason_code == 0
|
||||||
|
if mqtt_connected:
|
||||||
|
log.info("mqtt connected: %s:%s", MQTT_HOST, MQTT_PORT)
|
||||||
|
else:
|
||||||
|
log.error("mqtt connect failed: reason_code=%s", reason_code)
|
||||||
|
|
||||||
|
|
||||||
|
def _on_disconnect(_client, _userdata, reason_code, _properties=None):
|
||||||
|
global mqtt_connected
|
||||||
|
mqtt_connected = False
|
||||||
|
log.warning("mqtt disconnected: reason_code=%s", reason_code)
|
||||||
|
|
||||||
|
|
||||||
|
mqtt_client.on_connect = _on_connect
|
||||||
|
mqtt_client.on_disconnect = _on_disconnect
|
||||||
|
mqtt_client.connect_async(MQTT_HOST, MQTT_PORT, keepalive=30)
|
||||||
mqtt_client.loop_start()
|
mqtt_client.loop_start()
|
||||||
|
|
||||||
|
|
||||||
@@ -28,6 +53,12 @@ def _topic_for_device(device_id: str) -> str:
|
|||||||
|
|
||||||
|
|
||||||
def _publish(device_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
|
def _publish(device_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
|
if not mqtt_connected:
|
||||||
|
return {
|
||||||
|
"ok": False,
|
||||||
|
"error": f"mqtt not connected to {MQTT_HOST}:{MQTT_PORT}",
|
||||||
|
"mqtt_connected": False,
|
||||||
|
}
|
||||||
topic = _topic_for_device(device_id)
|
topic = _topic_for_device(device_id)
|
||||||
encoded = json.dumps(payload, separators=(",", ":"))
|
encoded = json.dumps(payload, separators=(",", ":"))
|
||||||
info = mqtt_client.publish(topic, encoded, qos=0, retain=False)
|
info = mqtt_client.publish(topic, encoded, qos=0, retain=False)
|
||||||
@@ -36,6 +67,7 @@ def _publish(device_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
|
|||||||
"topic": topic,
|
"topic": topic,
|
||||||
"payload": payload,
|
"payload": payload,
|
||||||
"mqtt_rc": info.rc,
|
"mqtt_rc": info.rc,
|
||||||
|
"mqtt_connected": mqtt_connected,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ framework = arduino
|
|||||||
monitor_speed = 115200
|
monitor_speed = 115200
|
||||||
lib_deps =
|
lib_deps =
|
||||||
gilmaimon/ArduinoWebsockets @ ^0.5.4
|
gilmaimon/ArduinoWebsockets @ ^0.5.4
|
||||||
|
knolleary/PubSubClient @ ^2.8
|
||||||
|
|
||||||
; Stable baseline (your current toolchain family)
|
; Stable baseline (your current toolchain family)
|
||||||
[env:esp32dev_core2]
|
[env:esp32dev_core2]
|
||||||
|
|||||||
Reference in New Issue
Block a user