""" MQTT Client for the USDA Vision Camera System. This module provides MQTT connectivity and message handling for machine state updates. """ import threading import time import logging from typing import Dict, Optional, Any import paho.mqtt.client as mqtt from ..core.config import Config, MQTTConfig from ..core.state_manager import StateManager from ..core.events import EventSystem, EventType from .handlers import MQTTMessageHandler class MQTTClient: """MQTT client for receiving machine state updates""" def __init__(self, config: Config, state_manager: StateManager, event_system: EventSystem): self.config = config self.mqtt_config = config.mqtt self.state_manager = state_manager self.event_system = event_system self.logger = logging.getLogger(__name__) # MQTT client self.client: Optional[mqtt.Client] = None self.connected = False self.running = False # Threading self._thread: Optional[threading.Thread] = None self._stop_event = threading.Event() # Message handler self.message_handler = MQTTMessageHandler(state_manager, event_system) # Connection retry settings self.reconnect_delay = 5 # seconds self.max_reconnect_attempts = 10 # Topic mapping (topic -> machine_name) self.topic_to_machine = {topic: machine_name for machine_name, topic in self.mqtt_config.topics.items()} # Status tracking self.start_time = None self.message_count = 0 self.error_count = 0 self.last_message_time = None def start(self) -> bool: """Start the MQTT client in a separate thread""" if self.running: self.logger.warning("MQTT client is already running") return True self.logger.info("Starting MQTT client...") self.running = True self._stop_event.clear() self.start_time = time.time() # Start in separate thread self._thread = threading.Thread(target=self._run_loop, daemon=True) self._thread.start() # Wait a moment to see if connection succeeds time.sleep(2) return self.connected def stop(self) -> None: """Stop the MQTT client""" if not self.running: return self.logger.info("Stopping MQTT client...") self.running = False self._stop_event.set() if self.client and self.connected: self.client.disconnect() if self._thread and self._thread.is_alive(): self._thread.join(timeout=5) self.logger.info("MQTT client stopped") def _run_loop(self) -> None: """Main MQTT client loop""" reconnect_attempts = 0 while self.running and not self._stop_event.is_set(): try: if not self.connected: if self._connect(): reconnect_attempts = 0 else: reconnect_attempts += 1 if reconnect_attempts >= self.max_reconnect_attempts: self.logger.error(f"Max reconnection attempts ({self.max_reconnect_attempts}) reached") break self.logger.warning(f"Reconnection attempt {reconnect_attempts}/{self.max_reconnect_attempts} in {self.reconnect_delay}s") if self._stop_event.wait(self.reconnect_delay): break continue # Process MQTT messages if self.client: self.client.loop(timeout=1.0) # Small delay to prevent busy waiting if self._stop_event.wait(0.1): break except Exception as e: self.logger.error(f"Error in MQTT loop: {e}") self.connected = False if self._stop_event.wait(self.reconnect_delay): break self.running = False self.logger.info("MQTT client loop ended") def _connect(self) -> bool: """Connect to MQTT broker""" try: # Create new client instance self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1) # Set callbacks self.client.on_connect = self._on_connect self.client.on_disconnect = self._on_disconnect self.client.on_message = self._on_message # Set authentication if provided if self.mqtt_config.username and self.mqtt_config.password: self.client.username_pw_set(self.mqtt_config.username, self.mqtt_config.password) # Connect to broker self.logger.info(f"Connecting to MQTT broker at {self.mqtt_config.broker_host}:{self.mqtt_config.broker_port}") self.client.connect(self.mqtt_config.broker_host, self.mqtt_config.broker_port, 60) return True except Exception as e: self.logger.error(f"Failed to connect to MQTT broker: {e}") return False def _subscribe_to_topics(self) -> None: """Subscribe to all configured topics""" if not self.client or not self.connected: return for machine_name, topic in self.mqtt_config.topics.items(): try: result, mid = self.client.subscribe(topic) if result == mqtt.MQTT_ERR_SUCCESS: self.logger.info(f"📋 MQTT SUBSCRIBED: {topic} (machine: {machine_name})") print(f"📋 MQTT SUBSCRIBED: {machine_name} → {topic}") else: self.logger.error(f"❌ MQTT SUBSCRIPTION FAILED: {topic}") print(f"❌ MQTT SUBSCRIPTION FAILED: {topic}") except Exception as e: self.logger.error(f"Error subscribing to topic {topic}: {e}") def _on_connect(self, client, userdata, flags, rc) -> None: """Callback for when the client connects to the broker""" if rc == 0: self.connected = True self.state_manager.set_mqtt_connected(True) self.event_system.publish(EventType.MQTT_CONNECTED, "mqtt_client") self.logger.info(f"🔗 MQTT CONNECTED to broker successfully at {self.mqtt_config.broker_host}:{self.mqtt_config.broker_port}") print(f"🔗 MQTT CONNECTED: {self.mqtt_config.broker_host}:{self.mqtt_config.broker_port}") # Subscribe to topics immediately after connection self._subscribe_to_topics() self.logger.info(f"📋 MQTT subscribed to {len(self.mqtt_config.topics)} topics") else: self.connected = False self.logger.error(f"❌ MQTT CONNECTION FAILED with return code {rc} to {self.mqtt_config.broker_host}:{self.mqtt_config.broker_port}") print(f"❌ MQTT CONNECTION FAILED: {self.mqtt_config.broker_host}:{self.mqtt_config.broker_port} (code: {rc})") def _on_disconnect(self, client, userdata, rc) -> None: """Callback for when the client disconnects from the broker""" self.connected = False self.state_manager.set_mqtt_connected(False) self.event_system.publish(EventType.MQTT_DISCONNECTED, "mqtt_client") if rc != 0: self.logger.warning(f"⚠️ MQTT DISCONNECTED unexpectedly (rc: {rc})") print(f"⚠️ MQTT DISCONNECTED: Unexpected disconnection (code: {rc})") else: self.logger.info("🔌 MQTT DISCONNECTED gracefully") print("🔌 MQTT DISCONNECTED: Graceful disconnection") def _on_message(self, client, userdata, msg) -> None: """Callback for when a message is received""" try: topic = msg.topic payload = msg.payload.decode("utf-8").strip() # Log at INFO level so we can see messages in production self.logger.info(f"📡 MQTT MESSAGE RECEIVED - Topic: {topic}, Payload: '{payload}'") # Update MQTT activity and tracking self.state_manager.update_mqtt_activity() self.message_count += 1 self.last_message_time = time.time() # Get machine name from topic machine_name = self.topic_to_machine.get(topic) if not machine_name: self.logger.warning(f"❓ MQTT UNKNOWN TOPIC: {topic} (payload: '{payload}')") print(f"❓ MQTT UNKNOWN TOPIC: {topic} (payload: '{payload}')") return # Show MQTT message on console with machine name print(f"📡 MQTT MESSAGE: {machine_name} → {payload}") self.logger.info(f"📡 Processing MQTT message for machine '{machine_name}': '{payload}'") # Handle the message self.message_handler.handle_message(machine_name, topic, payload) except Exception as e: self.error_count += 1 self.logger.error(f"❌ Error processing MQTT message: {e}", exc_info=True) def publish_message(self, topic: str, payload: str, qos: int = 0, retain: bool = False) -> bool: """Publish a message to MQTT broker""" if not self.client or not self.connected: self.logger.warning("Cannot publish: MQTT client not connected") return False try: result = self.client.publish(topic, payload, qos, retain) if result.rc == mqtt.MQTT_ERR_SUCCESS: self.logger.debug(f"Published message to {topic}: {payload}") return True else: self.logger.error(f"Failed to publish message to {topic}") return False except Exception as e: self.logger.error(f"Error publishing message: {e}") return False def get_status(self) -> Dict[str, Any]: """Get MQTT client status""" uptime_seconds = None last_message_time_str = None if self.start_time: uptime_seconds = time.time() - self.start_time if self.last_message_time: from datetime import datetime last_message_time_str = datetime.fromtimestamp(self.last_message_time).isoformat() return {"connected": self.connected, "running": self.running, "broker_host": self.mqtt_config.broker_host, "broker_port": self.mqtt_config.broker_port, "subscribed_topics": list(self.mqtt_config.topics.values()), "topic_mappings": self.topic_to_machine, "message_count": self.message_count, "error_count": self.error_count, "last_message_time": last_message_time_str, "uptime_seconds": uptime_seconds} def is_connected(self) -> bool: """Check if MQTT client is connected""" return self.connected def is_running(self) -> bool: """Check if MQTT client is running""" return self.running