Files

597 lines
20 KiB
Python

import audioop
import logging
import math
import os
import socket
import subprocess
import time
from typing import Optional, Tuple
import pymumble_py3 as pymumble
logging.basicConfig(
level=os.getenv("LOG_LEVEL", "INFO"),
format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("evs-mumble-bridge")
def getenv_bool(name: str, default: bool) -> bool:
value = os.getenv(name)
if value is None:
return default
return value.strip().lower() in {"1", "true", "yes", "on"}
DEVICE_ID = os.getenv("DEVICE_ID", "esp32-evs-1")
UDP_LISTEN_HOST = os.getenv("UDP_LISTEN_HOST", "0.0.0.0")
UDP_LISTEN_PORT = int(os.getenv("UDP_LISTEN_PORT", "5004"))
INPUT_SAMPLE_RATE = int(os.getenv("INPUT_SAMPLE_RATE", "16000"))
INPUT_CHANNELS = int(os.getenv("INPUT_CHANNELS", "1"))
MUMBLE_SAMPLE_RATE = int(os.getenv("MUMBLE_SAMPLE_RATE", "48000"))
FRAME_MS = int(os.getenv("MUMBLE_FRAME_MS", "20"))
MUMBLE_AUDIO_GAIN = float(os.getenv("MUMBLE_AUDIO_GAIN", "1.0"))
MUMBLE_HOST = os.getenv("MUMBLE_HOST", "")
MUMBLE_PORT = int(os.getenv("MUMBLE_PORT", "64738"))
MUMBLE_USERNAME = os.getenv("MUMBLE_USERNAME", f"EVS-{DEVICE_ID}")
MUMBLE_PASSWORD = os.getenv("MUMBLE_PASSWORD", "")
MUMBLE_CERTFILE = os.getenv("MUMBLE_CERTFILE", "").strip()
MUMBLE_KEYFILE = os.getenv("MUMBLE_KEYFILE", "").strip()
MUMBLE_AUTO_CERT = getenv_bool("MUMBLE_AUTO_CERT", True)
MUMBLE_CERT_DIR = os.getenv("MUMBLE_CERT_DIR", "/data/certs").strip()
MUMBLE_CERT_DAYS = int(os.getenv("MUMBLE_CERT_DAYS", "3650"))
MUMBLE_CERT_SUBJECT = os.getenv("MUMBLE_CERT_SUBJECT", "").strip()
MUMBLE_CERT_AUTO_RENEW = getenv_bool("MUMBLE_CERT_AUTO_RENEW", False)
MUMBLE_CERT_RENEW_BEFORE_DAYS = int(os.getenv("MUMBLE_CERT_RENEW_BEFORE_DAYS", "30"))
MUMBLE_CHANNEL = os.getenv("MUMBLE_CHANNEL", "").strip()
MUMBLE_CHANNEL_ID = int(os.getenv("MUMBLE_CHANNEL_ID", "0"))
MUMBLE_RECONNECT_SEC = int(os.getenv("MUMBLE_RECONNECT_SEC", "5"))
MUMBLE_VERBOSE = getenv_bool("MUMBLE_VERBOSE", False)
MUMBLE_CONNECT_TIMEOUT_SEC = int(os.getenv("MUMBLE_CONNECT_TIMEOUT_SEC", "30"))
MUMBLE_CONNECT_STRICT = getenv_bool("MUMBLE_CONNECT_STRICT", False)
BRIDGE_STATS_INTERVAL_SEC = int(os.getenv("BRIDGE_STATS_INTERVAL_SEC", "5"))
VAD_ENABLED = getenv_bool("VAD_ENABLED", False)
VAD_RMS_THRESHOLD = int(os.getenv("VAD_RMS_THRESHOLD", "700"))
VAD_OPEN_FRAMES = int(os.getenv("VAD_OPEN_FRAMES", "2"))
VAD_CLOSE_FRAMES = int(os.getenv("VAD_CLOSE_FRAMES", "20"))
HPF_ENABLED = getenv_bool("HPF_ENABLED", True)
HPF_CUTOFF_HZ = float(os.getenv("HPF_CUTOFF_HZ", "120.0"))
TX_BUFFER_MAX_MS = int(os.getenv("TX_BUFFER_MAX_MS", "800"))
TX_BUFFER_TARGET_MS = int(os.getenv("TX_BUFFER_TARGET_MS", "300"))
def _channel_name(ch) -> str:
name = getattr(ch, "name", None)
if isinstance(name, str) and name:
return name
get_prop = getattr(ch, "get_property", None)
if callable(get_prop):
try:
value = get_prop("name")
if isinstance(value, str):
return value
except Exception:
pass
return ""
def _channel_parent(ch):
parent = getattr(ch, "parent", None)
if parent is not None:
return parent
get_prop = getattr(ch, "get_property", None)
if callable(get_prop):
try:
return get_prop("parent")
except Exception:
pass
return None
def _channel_id(ch):
for attr in ("channel_id", "id"):
value = getattr(ch, attr, None)
if isinstance(value, int):
return value
get_prop = getattr(ch, "get_property", None)
if callable(get_prop):
try:
value = get_prop("channel_id")
if isinstance(value, int):
return value
except Exception:
pass
return None
def resolve_channel(mumble: pymumble.Mumble, channel_expr: str):
# Supports single name ("EVS") and path ("KHNM/EVS").
parts = [p.strip() for p in channel_expr.split("/") if p.strip()]
if not parts:
return None
channels_obj = getattr(mumble, "channels", None)
if channels_obj is None:
return None
values_fn = getattr(channels_obj, "values", None)
if not callable(values_fn):
try:
return channels_obj.find_by_name(channel_expr)
except Exception:
return None
channels = list(values_fn())
if not channels:
return None
if len(parts) == 1:
try:
return channels_obj.find_by_name(parts[0])
except Exception:
return None
# Path traversal with parent matching when possible.
candidates = channels
parent_id = None
for part in parts:
next_candidates = []
for ch in candidates:
if _channel_name(ch) != part:
continue
ch_parent = _channel_parent(ch)
if parent_id is None or ch_parent == parent_id:
next_candidates.append(ch)
if not next_candidates:
# Fallback for trees where parent property is unavailable.
next_candidates = [ch for ch in channels if _channel_name(ch) == part]
if not next_candidates:
return None
chosen = next_candidates[0]
parent_id = _channel_id(chosen)
candidates = channels
return chosen
def resolve_channel_by_id(mumble: pymumble.Mumble, channel_id: int):
if channel_id <= 0:
return None
channels_obj = getattr(mumble, "channels", None)
if channels_obj is None:
return None
values_fn = getattr(channels_obj, "values", None)
if not callable(values_fn):
return None
for ch in values_fn():
if _channel_id(ch) == channel_id:
return ch
return None
def get_current_channel_id(mumble: pymumble.Mumble):
users_obj = getattr(mumble, "users", None)
if users_obj is None:
return None
me = getattr(users_obj, "myself", None)
if me is None:
return None
for attr in ("channel_id", "channel"):
value = getattr(me, attr, None)
if isinstance(value, int):
return value
get_prop = getattr(me, "get_property", None)
if callable(get_prop):
for key in ("channel_id", "channel"):
try:
value = get_prop(key)
if isinstance(value, int):
return value
except Exception:
pass
return None
def connect_mumble() -> pymumble.Mumble:
if not MUMBLE_HOST:
raise RuntimeError("MUMBLE_HOST is required")
log.info(
"connecting mumble: host=%s port=%s user=%s channel=%s channel_id=%s cert=%s key=%s",
MUMBLE_HOST,
MUMBLE_PORT,
MUMBLE_USERNAME,
MUMBLE_CHANNEL or "<unchanged>",
MUMBLE_CHANNEL_ID,
MUMBLE_CERTFILE or "<none>",
MUMBLE_KEYFILE or "<none>",
)
certfile, keyfile = resolve_cert_paths()
ensure_cert_material(certfile, keyfile)
mumble = pymumble.Mumble(
MUMBLE_HOST,
MUMBLE_USERNAME,
password=MUMBLE_PASSWORD or None,
port=MUMBLE_PORT,
certfile=certfile,
keyfile=keyfile,
reconnect=True,
debug=MUMBLE_VERBOSE,
)
mumble.set_application_string("EVS Mumble Bridge")
mumble.start()
deadline = time.time() + MUMBLE_CONNECT_TIMEOUT_SEC
ready = False
while time.time() < deadline:
try:
if mumble.is_ready():
ready = True
break
except Exception:
pass
# Fallback: some servers/auth flows do not expose ready quickly.
# If we already have a valid session, continue instead of reconnect-looping.
users_obj = getattr(mumble, "users", None)
me = getattr(users_obj, "myself", None) if users_obj is not None else None
session = getattr(me, "session", None) if me is not None else None
if isinstance(session, int) and session > 0:
ready = True
log.warning("ready fallback via session detected: session=%s", session)
break
time.sleep(0.1)
if not ready:
if MUMBLE_CONNECT_STRICT:
raise RuntimeError("mumble not ready after connect timeout")
log.warning(
"mumble not ready after %ss, continuing because MUMBLE_CONNECT_STRICT=false",
MUMBLE_CONNECT_TIMEOUT_SEC,
)
try:
mumble.sound_output.set_audio_per_packet(FRAME_MS / 1000.0)
except Exception:
# Optional depending on library version.
pass
if MUMBLE_CHANNEL_ID > 0:
try:
channel = resolve_channel_by_id(mumble, MUMBLE_CHANNEL_ID)
if channel is None:
channels_obj = getattr(mumble, "channels", None)
if channels_obj is not None:
try:
channel = channels_obj[MUMBLE_CHANNEL_ID]
except Exception:
channel = None
if channel is not None:
channel.move_in()
log.info("moved to channel id: %s", MUMBLE_CHANNEL_ID)
else:
log.warning("channel id not found: %s", MUMBLE_CHANNEL_ID)
except Exception:
log.exception("failed to move channel by id")
elif MUMBLE_CHANNEL:
try:
channel = resolve_channel(mumble, MUMBLE_CHANNEL)
if channel is not None:
channel.move_in()
log.info("moved to channel: %s", MUMBLE_CHANNEL)
else:
log.warning("channel not found: %s", MUMBLE_CHANNEL)
except Exception:
log.exception("failed to move channel")
current_channel_id = get_current_channel_id(mumble)
if current_channel_id is not None:
log.info("current mumble channel id: %s", current_channel_id)
return mumble
def resolve_cert_paths() -> Tuple[Optional[str], Optional[str]]:
certfile = MUMBLE_CERTFILE or None
keyfile = MUMBLE_KEYFILE or None
if certfile is None and MUMBLE_AUTO_CERT:
certfile = os.path.join(MUMBLE_CERT_DIR, f"{DEVICE_ID}.crt")
if keyfile is None and MUMBLE_AUTO_CERT:
keyfile = os.path.join(MUMBLE_CERT_DIR, f"{DEVICE_ID}.key")
return certfile, keyfile
def ensure_cert_material(certfile: Optional[str], keyfile: Optional[str]) -> None:
if certfile is None and keyfile is None:
return
cert_exists = bool(certfile and os.path.exists(certfile))
key_exists = bool(keyfile and os.path.exists(keyfile))
should_renew = False
if cert_exists and key_exists and MUMBLE_CERT_AUTO_RENEW:
should_renew = cert_needs_renewal(certfile)
if should_renew:
log.warning(
"cert renewal required: cert=%s renew_before_days=%d",
certfile,
MUMBLE_CERT_RENEW_BEFORE_DAYS,
)
if cert_exists and key_exists and not should_renew:
return
if not MUMBLE_AUTO_CERT:
if certfile and not cert_exists:
raise RuntimeError(f"MUMBLE_CERTFILE not found: {certfile}")
if keyfile and not key_exists:
raise RuntimeError(f"MUMBLE_KEYFILE not found: {keyfile}")
return
if not certfile or not keyfile:
raise RuntimeError("auto cert generation requires both certfile and keyfile paths")
os.makedirs(os.path.dirname(certfile), exist_ok=True)
os.makedirs(os.path.dirname(keyfile), exist_ok=True)
subject = MUMBLE_CERT_SUBJECT or f"/CN={MUMBLE_USERNAME}"
cmd = [
"openssl",
"req",
"-x509",
"-newkey",
"rsa:2048",
"-nodes",
"-keyout",
keyfile,
"-out",
certfile,
"-days",
str(MUMBLE_CERT_DAYS),
"-subj",
subject,
]
log.info("creating self-signed mumble cert: cert=%s key=%s subject=%s", certfile, keyfile, subject)
try:
subprocess.run(cmd, check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as exc:
raise RuntimeError(
f"openssl cert generation failed (rc={exc.returncode}): {exc.stderr.strip()}"
) from exc
def cert_needs_renewal(certfile: str) -> bool:
if MUMBLE_CERT_RENEW_BEFORE_DAYS <= 0:
return False
check_seconds = MUMBLE_CERT_RENEW_BEFORE_DAYS * 86400
cmd = [
"openssl",
"x509",
"-in",
certfile,
"-checkend",
str(check_seconds),
"-noout",
]
try:
# openssl x509 -checkend returns:
# 0 -> certificate valid longer than check_seconds
# 1 -> certificate will expire within check_seconds
result = subprocess.run(cmd, capture_output=True, text=True, check=False)
return result.returncode != 0
except Exception:
log.exception("failed to check cert expiration, forcing renewal")
return True
def open_udp_socket() -> socket.socket:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_LISTEN_HOST, UDP_LISTEN_PORT))
sock.settimeout(1.0)
log.info("udp listening: %s:%s", UDP_LISTEN_HOST, UDP_LISTEN_PORT)
return sock
def apply_highpass_pcm16(data: bytes, prev_x: float, prev_y: float, sample_rate: int) -> Tuple[bytes, float, float]:
if len(data) < 2 or sample_rate <= 0:
return data, prev_x, prev_y
if HPF_CUTOFF_HZ <= 0.0:
return data, prev_x, prev_y
# 1st-order DC-block / high-pass:
# y[n] = alpha * (y[n-1] + x[n] - x[n-1])
rc = 1.0 / (2.0 * math.pi * HPF_CUTOFF_HZ)
dt = 1.0 / float(sample_rate)
alpha = rc / (rc + dt)
samples = memoryview(data).cast("h")
out = bytearray(len(data))
out_mv = memoryview(out).cast("h")
x_prev = prev_x
y_prev = prev_y
for i, x in enumerate(samples):
y = alpha * (y_prev + float(x) - x_prev)
if y > 32767.0:
y = 32767.0
elif y < -32768.0:
y = -32768.0
out_mv[i] = int(y)
x_prev = float(x)
y_prev = y
return bytes(out), x_prev, y_prev
def process_audio_chunk(
raw_pcm: bytes,
rate_state: Optional[Tuple],
hp_prev_x: float,
hp_prev_y: float,
) -> Tuple[bytes, Optional[Tuple], float, float]:
data = raw_pcm
if INPUT_SAMPLE_RATE != MUMBLE_SAMPLE_RATE:
data, rate_state = audioop.ratecv(
data,
2, # sample width in bytes (PCM16LE)
INPUT_CHANNELS,
INPUT_SAMPLE_RATE,
MUMBLE_SAMPLE_RATE,
rate_state,
)
if MUMBLE_AUDIO_GAIN != 1.0:
data = audioop.mul(data, 2, MUMBLE_AUDIO_GAIN)
if HPF_ENABLED:
data, hp_prev_x, hp_prev_y = apply_highpass_pcm16(data, hp_prev_x, hp_prev_y, MUMBLE_SAMPLE_RATE)
return data, rate_state, hp_prev_x, hp_prev_y
def run() -> None:
udp = open_udp_socket()
mumble = None
rate_state = None
hp_prev_x = 0.0
hp_prev_y = 0.0
out_buffer = bytearray()
frame_bytes = int((MUMBLE_SAMPLE_RATE * 2 * FRAME_MS) / 1000) # mono, 16-bit
frame_period_s = FRAME_MS / 1000.0
next_send_ts = time.monotonic()
udp_packets = 0
udp_bytes = 0
frames_sent = 0
send_errors = 0
dropped_bytes = 0
dropped_frames = 0
vad_dropped_packets = 0
vad_open = not VAD_ENABLED
vad_voice_frames = 0
vad_silence_frames = 0
stats_t0 = time.time()
last_udp_from = None
while True:
if mumble is None:
try:
mumble = connect_mumble()
log.info("mumble ready")
# Reset ratecv state when reconnecting so timing is clean.
rate_state = None
hp_prev_x = 0.0
hp_prev_y = 0.0
next_send_ts = time.monotonic()
except Exception:
log.exception("mumble connect failed, retrying in %ss", MUMBLE_RECONNECT_SEC)
time.sleep(MUMBLE_RECONNECT_SEC)
continue
try:
packet, addr = udp.recvfrom(8192)
except socket.timeout:
# No UDP data right now; keep loop alive.
packet = None
except Exception:
log.exception("udp receive failed")
continue
now = time.time()
if BRIDGE_STATS_INTERVAL_SEC > 0 and (now - stats_t0) >= BRIDGE_STATS_INTERVAL_SEC:
dt = max(0.001, now - stats_t0)
log.info(
"bridge stats: udp_packets=%d udp_bytes=%d udp_kbps=%.1f frames_sent=%d send_errors=%d buffer_bytes=%d dropped_frames=%d dropped_bytes=%d vad_open=%s vad_dropped=%d last_from=%s",
udp_packets,
udp_bytes,
(udp_bytes * 8.0 / 1000.0) / dt,
frames_sent,
send_errors,
len(out_buffer),
dropped_frames,
dropped_bytes,
vad_open,
vad_dropped_packets,
last_udp_from or "-",
)
udp_packets = 0
udp_bytes = 0
frames_sent = 0
send_errors = 0
dropped_bytes = 0
dropped_frames = 0
vad_dropped_packets = 0
stats_t0 = now
if not packet:
continue
try:
udp_packets += 1
udp_bytes += len(packet)
if last_udp_from != str(addr):
last_udp_from = str(addr)
log.info("udp source: %s", last_udp_from)
if VAD_ENABLED:
rms = audioop.rms(packet, 2)
if rms >= VAD_RMS_THRESHOLD:
vad_voice_frames += 1
vad_silence_frames = 0
if not vad_open and vad_voice_frames >= VAD_OPEN_FRAMES:
vad_open = True
log.info("vad open: rms=%d threshold=%d", rms, VAD_RMS_THRESHOLD)
else:
vad_silence_frames += 1
vad_voice_frames = 0
if vad_open and vad_silence_frames >= VAD_CLOSE_FRAMES:
vad_open = False
out_buffer.clear()
log.info("vad close: rms=%d threshold=%d", rms, VAD_RMS_THRESHOLD)
if not vad_open:
vad_dropped_packets += 1
continue
processed, rate_state, hp_prev_x, hp_prev_y = process_audio_chunk(
packet,
rate_state,
hp_prev_x,
hp_prev_y,
)
out_buffer.extend(processed)
if TX_BUFFER_MAX_MS > 0:
max_bytes = int((MUMBLE_SAMPLE_RATE * 2 * TX_BUFFER_MAX_MS) / 1000)
target_bytes = int((MUMBLE_SAMPLE_RATE * 2 * TX_BUFFER_TARGET_MS) / 1000)
if target_bytes < frame_bytes:
target_bytes = frame_bytes
if len(out_buffer) > max_bytes:
to_drop = len(out_buffer) - target_bytes
# Keep alignment at 16-bit boundaries.
to_drop -= to_drop % 2
if to_drop > 0:
del out_buffer[:to_drop]
dropped_bytes += to_drop
dropped_frames += to_drop // frame_bytes
while len(out_buffer) >= frame_bytes and time.monotonic() >= next_send_ts:
frame = bytes(out_buffer[:frame_bytes])
del out_buffer[:frame_bytes]
mumble.sound_output.add_sound(frame)
frames_sent += 1
next_send_ts += frame_period_s
# Prevent "runaway catchup" if scheduler was stalled.
now_mono = time.monotonic()
if (now_mono - next_send_ts) > (frame_period_s * 5):
next_send_ts = now_mono
except Exception:
send_errors += 1
log.exception("audio processing/send failed")
mumble = None
time.sleep(MUMBLE_RECONNECT_SEC)
if __name__ == "__main__":
run()