Files
usda-vision/camera-management-api/usda_vision_system/mqtt/client.py
salirezav 5070d9b2ca Enhance media API transcoding and video streaming capabilities
- Added support for limiting concurrent transcoding operations in the media API to prevent resource exhaustion.
- Implemented functions to retrieve video duration and bitrate using ffprobe for improved streaming performance.
- Enhanced the generate_transcoded_stream function to handle HTTP range requests, allowing for more efficient video playback.
- Updated VideoModal component to disable fluid and responsive modes, ensuring proper container boundaries during video playback.
- Improved logging throughout the transcoding process for better error tracking and debugging.
2025-11-04 11:55:27 -05:00

271 lines
11 KiB
Python

"""
MQTT Client for the USDA Vision Camera System.
This module provides MQTT connectivity and message handling for machine state updates.
"""
import threading
import time
import logging
from typing import Dict, Optional, Any
import paho.mqtt.client as mqtt
from ..core.config import Config, MQTTConfig
from ..core.state_manager import StateManager
from ..core.events import EventSystem, EventType
from .handlers import MQTTMessageHandler
class MQTTClient:
    """MQTT client for receiving machine state updates.

    Wraps a paho-mqtt client that is driven from a background daemon thread.
    Incoming messages are mapped from topic to machine name and forwarded to
    an ``MQTTMessageHandler``; connection changes are reported to the state
    manager and published on the event system.
    """

    # Maximum time (seconds) to wait for the broker to acknowledge a new
    # connection before the attempt is counted as failed.
    _CONNACK_TIMEOUT = 10.0
    # Maximum time (seconds) start() blocks waiting for the first connection.
    _STARTUP_WAIT = 2.0

    def __init__(self, config: Config, state_manager: StateManager, event_system: EventSystem):
        """Store collaborators and initialise connection bookkeeping.

        No network activity happens here; call ``start()`` to connect.

        Args:
            config: Application configuration; its ``mqtt`` section supplies
                broker address, credentials and the machine -> topic mapping.
            state_manager: Receives connection-state and activity updates.
            event_system: Receives MQTT connected/disconnected events.
        """
        self.config = config
        self.mqtt_config = config.mqtt
        self.state_manager = state_manager
        self.event_system = event_system
        self.logger = logging.getLogger(__name__)

        # MQTT client
        self.client: Optional[mqtt.Client] = None
        self.connected = False
        self.running = False

        # Threading
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()

        # Message handler
        self.message_handler = MQTTMessageHandler(state_manager, event_system)

        # Connection retry settings
        self.reconnect_delay = 5  # seconds
        self.max_reconnect_attempts = 10

        # Topic mapping (topic -> machine_name)
        self.topic_to_machine = {topic: machine_name for machine_name, topic in self.mqtt_config.topics.items()}

        # Status tracking
        self.start_time = None
        self.message_count = 0
        self.error_count = 0
        self.last_message_time = None

    def start(self) -> bool:
        """Start the MQTT client in a separate thread.

        Blocks for at most ``_STARTUP_WAIT`` seconds while the worker thread
        attempts the first connection, returning as soon as the outcome is
        known.

        Returns:
            True if the client is already running or is connected when the
            wait ends, False otherwise (the worker keeps retrying).
        """
        if self.running:
            self.logger.warning("MQTT client is already running")
            return True

        self.logger.info("Starting MQTT client...")
        self.running = True
        self._stop_event.clear()
        self.start_time = time.time()

        # Start in separate thread
        worker = threading.Thread(target=self._run_loop, daemon=True)
        self._thread = worker
        worker.start()

        # Poll instead of sleeping a fixed interval so a fast connection does
        # not hold the caller up for the whole startup window.
        deadline = time.monotonic() + self._STARTUP_WAIT
        while not self.connected and worker.is_alive():
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(0.05, remaining))

        return self.connected

    def stop(self) -> None:
        """Stop the MQTT client and wait up to 5 seconds for the worker thread."""
        if not self.running:
            return

        self.logger.info("Stopping MQTT client...")
        self.running = False
        self._stop_event.set()

        if self.client and self.connected:
            try:
                self.client.disconnect()
            except Exception as e:
                # A failed disconnect must not prevent joining the worker.
                self.logger.warning(f"Error disconnecting MQTT client: {e}")

        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=5)

        self.logger.info("MQTT client stopped")

    def _run_loop(self) -> None:
        """Main MQTT client loop (runs on the worker thread).

        Connects when disconnected, pumps the paho network loop, and retries
        with ``reconnect_delay`` between failed attempts. Gives up after
        ``max_reconnect_attempts`` consecutive failures. An attempt only
        counts as successful once the broker has acknowledged it, so a
        rejected or unanswered connection is retried with the delay and is
        counted towards the limit.
        """
        reconnect_attempts = 0

        while self.running and not self._stop_event.is_set():
            try:
                if not self.connected:
                    if self._connect() and self._wait_for_connack():
                        reconnect_attempts = 0
                    else:
                        if self._stop_event.is_set():
                            # Shutdown was requested; not a failed attempt.
                            break

                        reconnect_attempts += 1
                        if reconnect_attempts >= self.max_reconnect_attempts:
                            self.logger.error(f"Max reconnection attempts ({self.max_reconnect_attempts}) reached")
                            break

                        self.logger.warning(f"Reconnection attempt {reconnect_attempts}/{self.max_reconnect_attempts} in {self.reconnect_delay}s")
                        if self._stop_event.wait(self.reconnect_delay):
                            break
                        continue

                # Process MQTT messages
                if self.client:
                    self.client.loop(timeout=1.0)

                # Small delay to prevent busy waiting
                if self._stop_event.wait(0.1):
                    break

            except Exception as e:
                self.logger.error(f"Error in MQTT loop: {e}")
                self.connected = False
                if self._stop_event.wait(self.reconnect_delay):
                    break

        self.running = False
        self.logger.info("MQTT client loop ended")

    def _wait_for_connack(self) -> bool:
        """Pump the network loop until the connection is acknowledged.

        Returns:
            True once ``_on_connect`` has marked the client connected; False
            if the network loop reports an error, ``_CONNACK_TIMEOUT``
            elapses, or a stop is requested first.
        """
        deadline = time.monotonic() + self._CONNACK_TIMEOUT
        while not self.connected and not self._stop_event.is_set():
            remaining = deadline - time.monotonic()
            if remaining <= 0 or not self.client:
                break
            # paho's loop() returns MQTT_ERR_SUCCESS (0) or an error code > 0.
            if self.client.loop(timeout=min(1.0, remaining)) != 0:
                break
        return self.connected

    def _release_client(self) -> None:
        """Disconnect the previous paho client, if any, before replacing it."""
        stale = self.client
        if stale is None:
            return
        try:
            # Detach our callback first so closing the stale client is not
            # reported as a disconnect of the connection about to be made.
            stale.on_disconnect = None
            stale.disconnect()
        except Exception as e:
            self.logger.debug(f"Ignoring error while releasing old MQTT client: {e}")

    def _connect(self) -> bool:
        """Create a fresh paho client and initiate a connection to the broker.

        Returns:
            True if the connect call completed without raising. The session
            is only usable once ``_on_connect`` reports success.
        """
        try:
            # Close whatever client a previous attempt left behind
            self._release_client()

            # Create new client instance
            self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)

            # Set callbacks
            self.client.on_connect = self._on_connect
            self.client.on_disconnect = self._on_disconnect
            self.client.on_message = self._on_message

            # Set authentication if provided
            if self.mqtt_config.username and self.mqtt_config.password:
                self.client.username_pw_set(self.mqtt_config.username, self.mqtt_config.password)

            # Connect to broker
            self.logger.info(f"Connecting to MQTT broker at {self.mqtt_config.broker_host}:{self.mqtt_config.broker_port}")
            self.client.connect(self.mqtt_config.broker_host, self.mqtt_config.broker_port, 60)

            return True

        except Exception as e:
            self.logger.error(f"Failed to connect to MQTT broker: {e}")
            return False

    def _subscribe_to_topics(self) -> None:
        """Subscribe to all configured topics (no-op unless connected)."""
        if not self.client or not self.connected:
            return

        for machine_name, topic in self.mqtt_config.topics.items():
            try:
                result, mid = self.client.subscribe(topic)
                if result == mqtt.MQTT_ERR_SUCCESS:
                    self.logger.info(f"📋 MQTT SUBSCRIBED: {topic} (machine: {machine_name})")
                    print(f"📋 MQTT SUBSCRIBED: {machine_name} -> {topic}")
                else:
                    self.logger.error(f"❌ MQTT SUBSCRIPTION FAILED: {topic}")
                    print(f"❌ MQTT SUBSCRIPTION FAILED: {topic}")
            except Exception as e:
                self.logger.error(f"Error subscribing to topic {topic}: {e}")

    def _on_connect(self, client, userdata, flags, rc) -> None:
        """Callback for when the client connects to the broker.

        On ``rc == 0`` marks the client connected, notifies the state manager
        and event system, and subscribes to the configured topics. Any other
        return code leaves the client marked as disconnected.
        """
        if rc == 0:
            self.connected = True
            self.state_manager.set_mqtt_connected(True)
            self.event_system.publish(EventType.MQTT_CONNECTED, "mqtt_client")
            self.logger.info(f"🔗 MQTT CONNECTED to broker successfully at {self.mqtt_config.broker_host}:{self.mqtt_config.broker_port}")
            print(f"🔗 MQTT CONNECTED: {self.mqtt_config.broker_host}:{self.mqtt_config.broker_port}")

            # Subscribe to topics immediately after connection
            self._subscribe_to_topics()
            self.logger.info(f"📋 MQTT subscribed to {len(self.mqtt_config.topics)} topics")
        else:
            self.connected = False
            self.logger.error(f"❌ MQTT CONNECTION FAILED with return code {rc} to {self.mqtt_config.broker_host}:{self.mqtt_config.broker_port}")
            print(f"❌ MQTT CONNECTION FAILED: {self.mqtt_config.broker_host}:{self.mqtt_config.broker_port} (code: {rc})")

    def _on_disconnect(self, client, userdata, rc) -> None:
        """Callback for when the client disconnects from the broker.

        Always marks the client disconnected and notifies the state manager
        and event system; ``rc != 0`` is logged as an unexpected disconnect.
        """
        self.connected = False
        self.state_manager.set_mqtt_connected(False)
        self.event_system.publish(EventType.MQTT_DISCONNECTED, "mqtt_client")

        if rc != 0:
            self.logger.warning(f"⚠️ MQTT DISCONNECTED unexpectedly (rc: {rc})")
            print(f"⚠️ MQTT DISCONNECTED: Unexpected disconnection (code: {rc})")
        else:
            self.logger.info("🔌 MQTT DISCONNECTED gracefully")
            print("🔌 MQTT DISCONNECTED: Graceful disconnection")

    def _on_message(self, client, userdata, msg) -> None:
        """Callback for when a message is received.

        Decodes the payload as UTF-8, updates activity counters, resolves the
        machine name from the topic and hands the message to the handler.
        Messages on unmapped topics are logged and dropped. Any exception is
        counted in ``error_count`` and logged rather than propagated.
        """
        try:
            topic = msg.topic
            payload = msg.payload.decode("utf-8").strip()

            # Log at INFO level so we can see messages in production
            self.logger.info(f"📡 MQTT MESSAGE RECEIVED - Topic: {topic}, Payload: '{payload}'")

            # Update MQTT activity and tracking
            self.state_manager.update_mqtt_activity()
            self.message_count += 1
            self.last_message_time = time.time()

            # Get machine name from topic
            machine_name = self.topic_to_machine.get(topic)
            if not machine_name:
                self.logger.warning(f"❓ MQTT UNKNOWN TOPIC: {topic} (payload: '{payload}')")
                print(f"❓ MQTT UNKNOWN TOPIC: {topic} (payload: '{payload}')")
                return

            # Show MQTT message on console with machine name
            print(f"📡 MQTT MESSAGE: {machine_name} -> {payload}")
            self.logger.info(f"📡 Processing MQTT message for machine '{machine_name}': '{payload}'")

            # Handle the message
            self.message_handler.handle_message(machine_name, topic, payload)

        except Exception as e:
            self.error_count += 1
            self.logger.error(f"❌ Error processing MQTT message: {e}", exc_info=True)

    def publish_message(self, topic: str, payload: str, qos: int = 0, retain: bool = False) -> bool:
        """Publish a message to the MQTT broker.

        Args:
            topic: Topic to publish on.
            payload: Message body.
            qos: Quality-of-service level passed to paho.
            retain: Whether the broker should retain the message.

        Returns:
            True if paho accepted the message, False if the client is not
            connected, paho reported an error, or an exception was raised.
        """
        if not self.client or not self.connected:
            self.logger.warning("Cannot publish: MQTT client not connected")
            return False

        try:
            result = self.client.publish(topic, payload, qos, retain)
            if result.rc == mqtt.MQTT_ERR_SUCCESS:
                self.logger.debug(f"Published message to {topic}: {payload}")
                return True
            else:
                self.logger.error(f"Failed to publish message to {topic}")
                return False
        except Exception as e:
            self.logger.error(f"Error publishing message: {e}")
            return False

    def get_status(self) -> Dict[str, Any]:
        """Get a snapshot of the MQTT client status.

        Returns:
            A dict with connection flags, broker address, subscribed topics,
            the topic -> machine mapping (a copy, safe for the caller to
            modify), message/error counters, the ISO-formatted time of the
            last message (or None) and uptime in seconds since ``start()``
            (or None if never started).
        """
        uptime_seconds = None
        last_message_time_str = None

        if self.start_time:
            uptime_seconds = time.time() - self.start_time

        if self.last_message_time:
            from datetime import datetime

            last_message_time_str = datetime.fromtimestamp(self.last_message_time).isoformat()

        return {
            "connected": self.connected,
            "running": self.running,
            "broker_host": self.mqtt_config.broker_host,
            "broker_port": self.mqtt_config.broker_port,
            "subscribed_topics": list(self.mqtt_config.topics.values()),
            # Copy so callers cannot mutate the routing table used by _on_message.
            "topic_mappings": dict(self.topic_to_machine),
            "message_count": self.message_count,
            "error_count": self.error_count,
            "last_message_time": last_message_time_str,
            "uptime_seconds": uptime_seconds,
        }

    def is_connected(self) -> bool:
        """Check if MQTT client is connected"""
        return self.connected

    def is_running(self) -> bool:
        """Check if MQTT client is running"""
        return self.running