Files
usda-vision/camera-management-api/test_mqtt_simple.py
salirezav 4acad772f9 Update camera management and MQTT logging for improved functionality
- Changed log level in configuration from WARNING to INFO for better visibility of system operations.
- Enhanced StandaloneAutoRecorder initialization to accept camera manager, state manager, and event system for improved modularity.
- Updated recording routes to handle optional request bodies and improved error logging for better debugging.
- Added checks in CameraMonitor to determine if a camera is already in use before initialization, enhancing resource management.
- Improved MQTT client logging to provide more detailed connection and message handling information.
- Added new MQTT event handling capabilities to the VisionApiClient for better tracking of machine states.
2025-11-03 16:56:53 -05:00

163 lines
5.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Simple MQTT Test Script
This script tests MQTT connectivity and message reception.
It connects to the broker and listens for messages on the configured topics.
Usage:
python test_mqtt_simple.py
"""
import paho.mqtt.client as mqtt
import time
import signal
import sys
from datetime import datetime
# Broker connection settings (kept in sync by hand with config.json).
MQTT_BROKER_HOST = "192.168.1.110"
MQTT_BROKER_PORT = 1883

# Credentials; None means no username/password is configured.
MQTT_USERNAME = None
MQTT_PASSWORD = None

# Machine name -> state topic to subscribe to (mirrors config.json).
MQTT_TOPICS = {
    "vibratory_conveyor": "vision/vibratory_conveyor/state",
    "blower_separator": "vision/blower_separator/state",
}
class SimpleMQTTTester:
    """Console diagnostic that subscribes to MQTT_TOPICS and prints every message.

    The paho client is created in start(); the on_* methods are registered
    on it as callbacks. The wait loop in start() runs until ``running`` is
    set to False (by a signal handler) or Ctrl+C is pressed.
    """

    # Lower-cased payload spellings accepted as a machine state.
    _ON_PAYLOADS = frozenset({"on", "true", "1"})
    _OFF_PAYLOADS = frozenset({"off", "false", "0"})

    def __init__(self):
        """Set up counters; no network activity happens here."""
        self.client = None  # paho client instance, assigned in start()
        self.message_count = 0  # number of messages seen by on_message
        self.running = True  # start() keeps waiting while this is True

    @staticmethod
    def _decode_payload(raw):
        """Return *raw* bytes as stripped text, never raising on bad UTF-8.

        Undecodable bytes become U+FFFD so a binary or mis-encoded payload
        is still counted and shown instead of being reported as an error.
        """
        return raw.decode("utf-8", errors="replace").strip()

    @classmethod
    def _classify_state(cls, payload):
        """Return "ON" or "OFF" for a recognised payload, otherwise None.

        Matching is case-insensitive against on/off, true/false and 1/0.
        """
        lowered = payload.lower()
        if lowered in cls._ON_PAYLOADS:
            return "ON"
        if lowered in cls._OFF_PAYLOADS:
            return "OFF"
        return None

    def on_connect(self, client, userdata, flags, rc):
        """Callback when client connects.

        On success (rc == 0) subscribes to every topic in MQTT_TOPICS and
        reports the outcome of each subscription; otherwise prints rc.
        """
        if rc != 0:
            print(f"❌ Connection failed with return code {rc}")
            return

        print(f"✅ Connected to MQTT broker: {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}")
        # Subscribe to all topics
        for machine_name, topic in MQTT_TOPICS.items():
            result, _mid = client.subscribe(topic)
            if result == mqtt.MQTT_ERR_SUCCESS:
                print(f"📋 Subscribed to: {topic} (machine: {machine_name})")
            else:
                print(f"❌ Failed to subscribe to {topic}: {result}")

    def on_disconnect(self, client, userdata, rc):
        """Callback when client disconnects; a non-zero rc is reported as unexpected."""
        if rc != 0:
            print(f"⚠️ Unexpected disconnection (rc: {rc})")
        else:
            print("🔌 Disconnected from broker")

    def on_message(self, client, userdata, msg):
        """Callback when a message is received.

        Increments message_count, prints topic/payload details and whether
        the payload is one of the recognised on/off spellings.
        """
        # Broad catch is deliberate: an error while formatting one message
        # is reported and the tester keeps listening.
        try:
            topic = msg.topic
            payload = self._decode_payload(msg.payload)
            timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]
            self.message_count += 1

            # Find machine name (reverse lookup of the configured topic)
            machine_name = next(
                (name for name, configured in MQTT_TOPICS.items() if configured == topic),
                "unknown",
            )

            # Display message
            print(f"\n📡 [{timestamp}] Message #{self.message_count}")
            print(f" 🏭 Machine: {machine_name}")
            print(f" 📍 Topic: {topic}")
            print(f" 📄 Payload: '{payload}'")
            print(f" 📊 Total messages received: {self.message_count}")

            # Check if payload is valid on/off
            state = self._classify_state(payload)
            if state is not None:
                print(f" ✅ Valid state: {state}")
            else:
                print(f" ⚠️ Unusual payload format: '{payload}'")
            print("-" * 60)
        except Exception as e:
            print(f"❌ Error processing message: {e}")

    def start(self):
        """Start the MQTT tester and block until stopped.

        Returns:
            False if the initial connect() call raised, True once the wait
            loop has ended and the client has been shut down.
        """
        print("🧪 Starting MQTT Test")
        print("=" * 60)
        print(f"Broker: {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}")
        print(f"Topics to monitor: {len(MQTT_TOPICS)}")
        for name, topic in MQTT_TOPICS.items():
            print(f" - {name}: {topic}")
        print("=" * 60)
        print("\nWaiting for messages... (Press Ctrl+C to stop)\n")

        # Create client. CallbackAPIVersion only exists in paho-mqtt >= 2.0;
        # on 1.x the plain constructor already uses the same callback
        # signatures as VERSION1.
        callback_api = getattr(mqtt, "CallbackAPIVersion", None)
        if callback_api is not None:
            self.client = mqtt.Client(callback_api.VERSION1)
        else:
            self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message

        # Set authentication if provided. A username alone is enough:
        # paho accepts password=None for username-only brokers.
        if MQTT_USERNAME:
            self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)

        # Connect
        try:
            self.client.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT, 60)
        except Exception as e:
            print(f"❌ Failed to connect: {e}")
            return False

        # Start the background network loop, then wait for messages
        self.client.loop_start()
        try:
            while self.running:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n\n🛑 Stopping test...")
            self.running = False
        finally:
            # Cleanup runs however the wait loop ends
            self.client.loop_stop()
            self.client.disconnect()

        print("\n📊 Test Summary:")
        print(f" Total messages received: {self.message_count}")
        print("✅ Test completed")
        return True
def main():
    """Main entry point.

    Builds a SimpleMQTTTester, routes SIGINT and SIGTERM to a handler that
    asks it to stop, runs it, and exits with status 0 if start() returned
    a truthy value and 1 otherwise.
    """
    tester = SimpleMQTTTester()

    def _request_shutdown(signum, stack_frame):
        # Only clear the flag here; the tester's wait loop checks it and
        # performs the actual shutdown.
        print("\n\n🛑 Received interrupt signal, shutting down...")
        tester.running = False

    # Same handler for Ctrl+C and for a termination request
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, _request_shutdown)

    exit_code = 0 if tester.start() else 1
    sys.exit(exit_code)
# Run the tester only when executed as a script, not when imported.
if __name__ == "__main__":
    main()