"""EVS control portal: a small Flask app that relays JSON commands to MQTT.

Routes:
    GET  /             renders ``index.html`` with the broker host, port and
                       base topic.
    POST /api/publish  publishes the ``payload`` object of the JSON request
                       body to ``<MQTT_BASE_TOPIC>/<device_id>/command``.

Importing this module has side effects: it configures logging, creates the
MQTT client, starts an asynchronous connection attempt and starts the client's
background network loop.
"""
import json
import logging
import os
from typing import Any, Dict

from flask import Flask, jsonify, render_template, request
import paho.mqtt.client as mqtt

# Broker and portal settings, each overridable through the environment.
MQTT_HOST = os.getenv("MQTT_HOST", "127.0.0.1")
MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))
MQTT_USER = os.getenv("MQTT_USER", "")
MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", "")
MQTT_BASE_TOPIC = os.getenv("MQTT_BASE_TOPIC", "evs")
PORTAL_BIND_HOST = os.getenv("PORTAL_BIND_HOST", "0.0.0.0")
PORTAL_BIND_PORT = int(os.getenv("PORTAL_BIND_PORT", "8088"))
# logging only recognises upper-case level names, so "info" would raise
# ValueError at import time without the normalisation.
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()

# Characters that must never reach the topic string: "/" would let a caller
# address arbitrary sub-topics, "+" and "#" are MQTT wildcards (rejected by
# paho's publish() with ValueError), and NUL is not allowed in topic names.
_INVALID_DEVICE_ID_CHARS = frozenset("/+#\x00")

logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("evs-control-portal")

app = Flask(__name__)

# Updated by the MQTT callbacks below and read by _publish().
mqtt_connected = False

mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="evs-control-portal")
if MQTT_USER:
    mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)


def _on_connect(_client, _userdata, _flags, reason_code, _properties=None):
    """Record the outcome of a connection attempt and log it.

    ``mqtt_connected`` becomes True only when ``reason_code`` compares equal
    to 0; any other reason code is logged as a failure.
    """
    global mqtt_connected
    mqtt_connected = reason_code == 0
    if mqtt_connected:
        log.info("mqtt connected: %s:%s", MQTT_HOST, MQTT_PORT)
    else:
        log.error("mqtt connect failed: reason_code=%s", reason_code)


def _on_disconnect(_client, _userdata, _disconnect_flags, reason_code, _properties=None):
    """Mark the client as disconnected and log the reason code.

    The client is created with ``CallbackAPIVersion.VERSION2``, whose
    disconnect callback receives ``disconnect_flags`` before ``reason_code``.
    Omitting that parameter makes the callback raise ``TypeError`` on every
    disconnect, so the flag would never be cleared.
    """
    global mqtt_connected
    mqtt_connected = False
    log.warning("mqtt disconnected: reason_code=%s", reason_code)


mqtt_client.on_connect = _on_connect
mqtt_client.on_disconnect = _on_disconnect
# connect_async + loop_start: the connection is attempted from the client's
# background loop rather than blocking module import.
mqtt_client.connect_async(MQTT_HOST, MQTT_PORT, keepalive=30)
mqtt_client.loop_start()


def _topic_for_device(device_id: str) -> str:
    """Return the command topic for ``device_id`` under ``MQTT_BASE_TOPIC``."""
    return f"{MQTT_BASE_TOPIC}/{device_id}/command"


def _publish(device_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Publish ``payload`` as compact JSON to the device's command topic.

    Args:
        device_id: Identifier inserted into the topic; callers are expected
            to have validated it (see ``api_publish``).
        payload: JSON-serialisable mapping to send.

    Returns:
        A result dict. When the client is not connected it contains ``ok``
        (False), ``error`` and ``mqtt_connected``. Otherwise it contains
        ``ok`` (whether publish() returned ``MQTT_ERR_SUCCESS``), ``topic``,
        ``payload``, ``mqtt_rc`` and ``mqtt_connected``.
    """
    if not mqtt_connected:
        return {
            "ok": False,
            "error": f"mqtt not connected to {MQTT_HOST}:{MQTT_PORT}",
            "mqtt_connected": False,
        }

    topic = _topic_for_device(device_id)
    encoded = json.dumps(payload, separators=(",", ":"))
    info = mqtt_client.publish(topic, encoded, qos=0, retain=False)
    return {
        "ok": info.rc == mqtt.MQTT_ERR_SUCCESS,
        "topic": topic,
        "payload": payload,
        "mqtt_rc": info.rc,
        "mqtt_connected": mqtt_connected,
    }


@app.get("/")
def index():
    """Render the portal page with the current broker settings."""
    return render_template(
        "index.html",
        mqtt_host=MQTT_HOST,
        mqtt_port=MQTT_PORT,
        base_topic=MQTT_BASE_TOPIC,
    )


@app.post("/api/publish")
def api_publish():
    """Validate the JSON request body and forward its payload to MQTT.

    Expects a JSON object with ``device_id`` and ``payload`` (itself a JSON
    object). Responds with HTTP 400 and ``{"ok": false, "error": ...}`` when
    validation fails; otherwise responds with the dict from ``_publish``.
    """
    # force=True parses regardless of Content-Type; silent=True yields None
    # on malformed JSON, which the `or {}` turns into an empty object.
    body = request.get_json(force=True, silent=True) or {}
    if not isinstance(body, dict):
        # Valid JSON that is not an object (list, string, number) has no
        # .get(); reject it explicitly instead of failing with a 500.
        return jsonify({"ok": False, "error": "request body must be a JSON object"}), 400

    raw_device_id = body.get("device_id")
    # Treat JSON null like a missing field rather than the literal "None".
    device_id = "" if raw_device_id is None else str(raw_device_id).strip()
    payload = body.get("payload")

    if not device_id:
        return jsonify({"ok": False, "error": "device_id is required"}), 400
    if _INVALID_DEVICE_ID_CHARS.intersection(device_id):
        return (
            jsonify({"ok": False, "error": "device_id must not contain '/', '+', '#' or NUL"}),
            400,
        )
    if not isinstance(payload, dict):
        return jsonify({"ok": False, "error": "payload must be a JSON object"}), 400

    return jsonify(_publish(device_id, payload))


if __name__ == "__main__":
    app.run(host=PORTAL_BIND_HOST, port=PORTAL_BIND_PORT, debug=False)