"""Bridge raw PCM16LE audio received over UDP into a Mumble channel.

Configuration is read from environment variables at import time (see the
constants below). ``run()`` is the entry point: it listens for UDP datagrams,
converts them to the Mumble sample rate and feeds them to the Mumble client
in fixed-size frames, reconnecting when sending fails.
"""

import audioop
import logging
import os
import socket
import time
from typing import Optional, Tuple

import pymumble_py3 as pymumble

logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO"),
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("evs-mumble-bridge")


def getenv_bool(name: str, default: bool) -> bool:
    """Return environment variable *name* parsed as a boolean.

    Unset variables yield *default*. A set variable is true only when its
    stripped, lower-cased value is one of ``1``, ``true``, ``yes`` or ``on``.
    """
    value = os.getenv(name)
    if value is None:
        return default
    return value.strip().lower() in {"1", "true", "yes", "on"}


DEVICE_ID = os.getenv("DEVICE_ID", "esp32-evs-1")
UDP_LISTEN_HOST = os.getenv("UDP_LISTEN_HOST", "0.0.0.0")
UDP_LISTEN_PORT = int(os.getenv("UDP_LISTEN_PORT", "5004"))
INPUT_SAMPLE_RATE = int(os.getenv("INPUT_SAMPLE_RATE", "16000"))
INPUT_CHANNELS = int(os.getenv("INPUT_CHANNELS", "1"))
MUMBLE_SAMPLE_RATE = int(os.getenv("MUMBLE_SAMPLE_RATE", "48000"))
FRAME_MS = int(os.getenv("MUMBLE_FRAME_MS", "20"))
MUMBLE_AUDIO_GAIN = float(os.getenv("MUMBLE_AUDIO_GAIN", "1.0"))
MUMBLE_HOST = os.getenv("MUMBLE_HOST", "")
MUMBLE_PORT = int(os.getenv("MUMBLE_PORT", "64738"))
MUMBLE_USERNAME = os.getenv("MUMBLE_USERNAME", f"EVS-{DEVICE_ID}")
MUMBLE_PASSWORD = os.getenv("MUMBLE_PASSWORD", "")
MUMBLE_CHANNEL = os.getenv("MUMBLE_CHANNEL", "").strip()
MUMBLE_CHANNEL_ID = int(os.getenv("MUMBLE_CHANNEL_ID", "0"))
MUMBLE_RECONNECT_SEC = int(os.getenv("MUMBLE_RECONNECT_SEC", "5"))
MUMBLE_VERBOSE = getenv_bool("MUMBLE_VERBOSE", False)
MUMBLE_CONNECT_TIMEOUT_SEC = int(os.getenv("MUMBLE_CONNECT_TIMEOUT_SEC", "30"))
MUMBLE_CONNECT_STRICT = getenv_bool("MUMBLE_CONNECT_STRICT", False)

# Width of one PCM16LE sample in bytes.
_SAMPLE_WIDTH_BYTES = 2


def _safe_get_property(obj, key: str):
    """Return ``obj.get_property(key)``, or None if unavailable or failing."""
    get_prop = getattr(obj, "get_property", None)
    if not callable(get_prop):
        return None
    try:
        return get_prop(key)
    except Exception:
        # Best effort: the property API differs between library versions.
        return None


def _channel_name(ch) -> str:
    """Return the channel's name, or ``""`` when it cannot be determined."""
    name = getattr(ch, "name", None)
    if isinstance(name, str) and name:
        return name
    value = _safe_get_property(ch, "name")
    return value if isinstance(value, str) else ""


def _channel_parent(ch):
    """Return the channel's ``parent`` attribute/property, or None."""
    parent = getattr(ch, "parent", None)
    if parent is not None:
        return parent
    return _safe_get_property(ch, "parent")


def _channel_id(ch):
    """Return the channel's integer id, or None when it cannot be determined."""
    for attr in ("channel_id", "id"):
        value = getattr(ch, attr, None)
        if isinstance(value, int):
            return value
    value = _safe_get_property(ch, "channel_id")
    return value if isinstance(value, int) else None


def resolve_channel(mumble: pymumble.Mumble, channel_expr: str):
    """Find a channel by name or by ``/``-separated path.

    Supports single name ("EVS") and path ("KHNM/EVS"). Returns the channel
    object, or None when nothing matches or the lookup fails.
    """
    parts = [p.strip() for p in channel_expr.split("/") if p.strip()]
    if not parts:
        return None

    channels_obj = getattr(mumble, "channels", None)
    if channels_obj is None:
        return None

    values_fn = getattr(channels_obj, "values", None)
    if not callable(values_fn):
        # Cannot enumerate channels; fall back to the library's name lookup.
        try:
            return channels_obj.find_by_name(channel_expr)
        except Exception:
            return None

    channels = list(values_fn())
    if not channels:
        return None

    if len(parts) == 1:
        try:
            return channels_obj.find_by_name(parts[0])
        except Exception:
            return None

    # Path traversal with parent matching when possible.
    chosen = None
    parent_id = None
    for part in parts:
        named = [ch for ch in channels if _channel_name(ch) == part]
        matches = [
            ch
            for ch in named
            if parent_id is None or _channel_parent(ch) == parent_id
        ]
        if not matches:
            # Fallback for trees where parent property is unavailable.
            matches = named
        if not matches:
            return None
        chosen = matches[0]
        parent_id = _channel_id(chosen)
    return chosen


def resolve_channel_by_id(mumble: pymumble.Mumble, channel_id: int):
    """Return the channel whose id equals *channel_id*, or None.

    Ids less than or equal to zero are treated as "not configured".
    """
    if channel_id <= 0:
        return None
    channels_obj = getattr(mumble, "channels", None)
    if channels_obj is None:
        return None
    values_fn = getattr(channels_obj, "values", None)
    if not callable(values_fn):
        return None
    for ch in values_fn():
        if _channel_id(ch) == channel_id:
            return ch
    return None


def get_current_channel_id(mumble: pymumble.Mumble):
    """Return the integer id of the channel our own user is in, or None."""
    users_obj = getattr(mumble, "users", None)
    if users_obj is None:
        return None
    me = getattr(users_obj, "myself", None)
    if me is None:
        return None

    for attr in ("channel_id", "channel"):
        value = getattr(me, attr, None)
        if isinstance(value, int):
            return value

    for key in ("channel_id", "channel"):
        value = _safe_get_property(me, key)
        if isinstance(value, int):
            return value
    return None


def _stop_mumble(mumble) -> None:
    """Stop a Mumble client if it exposes ``stop()``; never raises.

    The client is created with ``reconnect=True``, so dropping the reference
    without stopping it would leave its background thread running.
    """
    if mumble is None:
        return
    stop = getattr(mumble, "stop", None)
    if not callable(stop):
        return
    try:
        stop()
    except Exception:
        log.exception("failed to stop mumble client")


def _wait_until_ready(mumble: pymumble.Mumble) -> bool:
    """Poll until the client is ready or the connect timeout elapses."""
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + MUMBLE_CONNECT_TIMEOUT_SEC
    while time.monotonic() < deadline:
        # NOTE(review): some pymumble versions appear to implement is_ready()
        # as a blocking wait that returns None - confirm. The session check
        # below covers that case.
        try:
            if mumble.is_ready():
                return True
        except Exception:
            pass

        # Fallback: some servers/auth flows do not expose ready quickly.
        # If we already have a valid session, continue instead of reconnect-looping.
        users_obj = getattr(mumble, "users", None)
        me = getattr(users_obj, "myself", None) if users_obj is not None else None
        session = getattr(me, "session", None) if me is not None else None
        if isinstance(session, int) and session > 0:
            log.warning("ready fallback via session detected: session=%s", session)
            return True
        time.sleep(0.1)
    return False


def _join_configured_channel(mumble: pymumble.Mumble) -> None:
    """Move into MUMBLE_CHANNEL_ID or MUMBLE_CHANNEL; failures are only logged."""
    if MUMBLE_CHANNEL_ID > 0:
        try:
            channel = resolve_channel_by_id(mumble, MUMBLE_CHANNEL_ID)
            if channel is None:
                channels_obj = getattr(mumble, "channels", None)
                if channels_obj is not None:
                    try:
                        channel = channels_obj[MUMBLE_CHANNEL_ID]
                    except Exception:
                        channel = None
            if channel is not None:
                channel.move_in()
                log.info("moved to channel id: %s", MUMBLE_CHANNEL_ID)
            else:
                log.warning("channel id not found: %s", MUMBLE_CHANNEL_ID)
        except Exception:
            log.exception("failed to move channel by id")
    elif MUMBLE_CHANNEL:
        try:
            channel = resolve_channel(mumble, MUMBLE_CHANNEL)
            if channel is not None:
                channel.move_in()
                log.info("moved to channel: %s", MUMBLE_CHANNEL)
            else:
                log.warning("channel not found: %s", MUMBLE_CHANNEL)
        except Exception:
            log.exception("failed to move channel")


def connect_mumble() -> pymumble.Mumble:
    """Create, start and configure the Mumble client.

    Returns:
        The started client, moved into the configured channel when possible.

    Raises:
        RuntimeError: if MUMBLE_HOST is empty, or if the client is not ready
            within MUMBLE_CONNECT_TIMEOUT_SEC while MUMBLE_CONNECT_STRICT is
            enabled (the client is stopped before raising).
    """
    if not MUMBLE_HOST:
        raise RuntimeError("MUMBLE_HOST is required")

    log.info(
        "connecting mumble: host=%s port=%s user=%s channel=%s channel_id=%s",
        MUMBLE_HOST,
        MUMBLE_PORT,
        MUMBLE_USERNAME,
        MUMBLE_CHANNEL or "",
        MUMBLE_CHANNEL_ID,
    )
    mumble = pymumble.Mumble(
        MUMBLE_HOST,
        MUMBLE_USERNAME,
        password=MUMBLE_PASSWORD or None,
        port=MUMBLE_PORT,
        reconnect=True,
        debug=MUMBLE_VERBOSE,
    )
    mumble.set_application_string("EVS Mumble Bridge")
    mumble.start()

    if not _wait_until_ready(mumble):
        if MUMBLE_CONNECT_STRICT:
            # Do not leak the started client: the caller retries with a new one.
            _stop_mumble(mumble)
            raise RuntimeError("mumble not ready after connect timeout")
        log.warning(
            "mumble not ready after %ss, continuing because MUMBLE_CONNECT_STRICT=false",
            MUMBLE_CONNECT_TIMEOUT_SEC,
        )

    try:
        mumble.sound_output.set_audio_per_packet(FRAME_MS / 1000.0)
    except Exception:
        # Optional depending on library version.
        pass

    _join_configured_channel(mumble)

    current_channel_id = get_current_channel_id(mumble)
    if current_channel_id is not None:
        log.info("current mumble channel id: %s", current_channel_id)
    return mumble


def open_udp_socket() -> socket.socket:
    """Bind the UDP listen socket with a 1 s receive timeout and return it."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind((UDP_LISTEN_HOST, UDP_LISTEN_PORT))
        sock.settimeout(1.0)
    except Exception:
        # Do not leak the descriptor when binding fails.
        sock.close()
        raise
    log.info("udp listening: %s:%s", UDP_LISTEN_HOST, UDP_LISTEN_PORT)
    return sock


def process_audio_chunk(raw_pcm: bytes, rate_state: Optional[Tuple]) -> Tuple[bytes, Optional[Tuple]]:
    """Convert one chunk of input PCM16LE into mono PCM at the Mumble rate.

    Args:
        raw_pcm: Interleaved 16-bit PCM with INPUT_CHANNELS channels.
        rate_state: Resampler state returned by the previous call, or None.

    Returns:
        ``(converted_pcm, new_rate_state)``.

    Trailing bytes that do not form a whole sample frame are dropped so that a
    truncated datagram cannot make ``audioop`` raise. Stereo input is downmixed
    to mono because the sender slices the output into mono frames.
    """
    frame_width = _SAMPLE_WIDTH_BYTES * max(INPUT_CHANNELS, 1)
    usable = len(raw_pcm) - (len(raw_pcm) % frame_width)
    data = raw_pcm[:usable]
    if not data:
        return b"", rate_state

    channels = INPUT_CHANNELS
    if channels == 2:
        data = audioop.tomono(data, _SAMPLE_WIDTH_BYTES, 0.5, 0.5)
        channels = 1

    if INPUT_SAMPLE_RATE != MUMBLE_SAMPLE_RATE:
        data, rate_state = audioop.ratecv(
            data,
            _SAMPLE_WIDTH_BYTES,
            channels,
            INPUT_SAMPLE_RATE,
            MUMBLE_SAMPLE_RATE,
            rate_state,
        )

    if MUMBLE_AUDIO_GAIN != 1.0:
        data = audioop.mul(data, _SAMPLE_WIDTH_BYTES, MUMBLE_AUDIO_GAIN)

    return data, rate_state


def run() -> None:
    """Receive UDP audio forever and forward it to Mumble in fixed frames.

    Raises:
        RuntimeError: if the configured sample rate and frame length yield an
            empty frame (the send loop could never make progress).
    """
    frame_bytes = int((MUMBLE_SAMPLE_RATE * _SAMPLE_WIDTH_BYTES * FRAME_MS) / 1000)  # mono, 16-bit
    frame_bytes -= frame_bytes % _SAMPLE_WIDTH_BYTES  # keep frames sample-aligned
    if frame_bytes <= 0:
        raise RuntimeError("MUMBLE_SAMPLE_RATE and MUMBLE_FRAME_MS must yield a non-empty frame")
    if INPUT_CHANNELS not in (1, 2):
        log.warning("INPUT_CHANNELS=%s is not downmixed; expected 1 or 2", INPUT_CHANNELS)

    udp = open_udp_socket()
    mumble = None
    rate_state = None
    out_buffer = bytearray()

    try:
        while True:
            if mumble is None:
                try:
                    mumble = connect_mumble()
                    log.info("mumble ready")
                except Exception:
                    log.exception("mumble connect failed, retrying in %ss", MUMBLE_RECONNECT_SEC)
                    time.sleep(MUMBLE_RECONNECT_SEC)
                    continue

            try:
                packet, _addr = udp.recvfrom(8192)
            except socket.timeout:
                # No UDP data right now; keep loop alive.
                continue
            except Exception:
                log.exception("udp receive failed")
                # Avoid a busy loop if the socket error is persistent.
                time.sleep(0.1)
                continue

            if not packet:
                continue

            # A bad datagram is not a Mumble failure: drop it and stay connected.
            try:
                processed, rate_state = process_audio_chunk(packet, rate_state)
            except Exception:
                log.exception("audio processing failed, dropping packet")
                rate_state = None
                continue

            out_buffer.extend(processed)
            try:
                while len(out_buffer) >= frame_bytes:
                    frame = bytes(out_buffer[:frame_bytes])
                    del out_buffer[:frame_bytes]
                    mumble.sound_output.add_sound(frame)
            except Exception:
                log.exception("audio send failed, reconnecting in %ss", MUMBLE_RECONNECT_SEC)
                # Stop the old client so its reconnect thread does not linger.
                _stop_mumble(mumble)
                mumble = None
                # Buffered audio would be stale by the time we are back.
                out_buffer.clear()
                time.sleep(MUMBLE_RECONNECT_SEC)
    finally:
        _stop_mumble(mumble)
        udp.close()


if __name__ == "__main__":
    run()