Files
usda-vision/camera-management-api/test_mqtt_simple.py
salirezav de46753f15 Update session summary and clean up unused files
- Added additional notes to SESSION_SUMMARY.md regarding MQTT debugging and enhanced logging.
- Removed outdated SQL seed files related to Phase 2 JC Experiments and Meyer Experiments to streamline the codebase.
- Updated the CLI version in the Supabase configuration for consistency.
- Cleaned up test files in the camera management API to improve maintainability.
2025-11-03 23:07:22 -05:00

165 lines
5.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Simple MQTT Test Script
This script tests MQTT connectivity and message reception.
It connects to the broker and listens for messages on the configured topics.
Usage:
python test_mqtt_simple.py
"""
import paho.mqtt.client as mqtt
import time
import signal
import sys
from datetime import datetime
# MQTT Configuration (from config.json)
MQTT_BROKER_HOST, MQTT_BROKER_PORT = "192.168.1.110", 1883
# Both credentials are unset here.
MQTT_USERNAME = MQTT_PASSWORD = None

# Topics to monitor (from config.json), keyed by machine name
MQTT_TOPICS = {
    "vibratory_conveyor": "vision/vibratory_conveyor/state",
    "blower_separator": "vision/blower_separator/state",
}
class SimpleMQTTTester:
    """Connect to the configured MQTT broker and print every message received.

    Instance fields:
        client: the paho client; ``None`` until ``start()`` creates it.
        message_count: number of messages handled by ``on_message``.
        running: ``start()`` keeps waiting while this is true; set it to
            ``False`` (e.g. from a signal handler) to make ``start()`` return.
    """

    # Payloads (compared case-insensitively) that count as a valid state.
    _ON_PAYLOADS = frozenset({"on", "true", "1"})
    _OFF_PAYLOADS = frozenset({"off", "false", "0"})

    def __init__(self):
        self.client = None
        self.message_count = 0
        self.running = True

    def on_connect(self, client, userdata, flags, rc):
        """Callback when client connects; subscribes to every configured topic.

        Uses the v1 callback signature, where ``rc == 0`` means the
        connection was accepted.
        """
        if rc != 0:
            print(f"❌ Connection failed with return code {rc}")
            return

        print(f"✅ Connected to MQTT broker: {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}")
        # Subscribe to all topics
        for machine_name, topic in MQTT_TOPICS.items():
            result, _mid = client.subscribe(topic)
            if result == mqtt.MQTT_ERR_SUCCESS:
                print(f"📋 Subscribed to: {topic} (machine: {machine_name})")
            else:
                print(f"❌ Failed to subscribe to {topic}: {result}")

    def on_disconnect(self, client, userdata, rc):
        """Callback when client disconnects; a non-zero ``rc`` is reported as unexpected."""
        if rc != 0:
            print(f"⚠️ Unexpected disconnection (rc: {rc})")
        else:
            print("🔌 Disconnected from broker")

    def on_message(self, client, userdata, msg):
        """Callback when a message is received; prints it and counts it.

        The payload is decoded as UTF-8 with undecodable bytes replaced, so a
        binary or mis-encoded payload is still counted and displayed (and
        flagged as unusual) instead of being dropped with an error.
        """
        # Broad catch on purpose: this is a callback boundary, and one bad
        # message must not raise out of the callback.
        try:
            topic = msg.topic
            payload = msg.payload.decode("utf-8", errors="replace").strip()
            self.message_count += 1
            # Millisecond precision: drop the last three microsecond digits.
            timestamp = datetime.now().strftime('%H:%M:%S.%f')[:-3]

            # Find machine name
            machine_name = next(
                (
                    name
                    for name, configured_topic in MQTT_TOPICS.items()
                    if configured_topic == topic
                ),
                "unknown",
            )

            # Display message
            print(f"\n📡 [{timestamp}] Message #{self.message_count}")
            print(f" 🏭 Machine: {machine_name}")
            print(f" 📍 Topic: {topic}")
            print(f" 📄 Payload: '{payload}'")
            print(f" 📊 Total messages received: {self.message_count}")

            # Check if payload is valid on/off
            payload_lower = payload.lower()
            if payload_lower in self._ON_PAYLOADS:
                print(f" ✅ Valid state: {'ON'}")
            elif payload_lower in self._OFF_PAYLOADS:
                print(f" ✅ Valid state: {'OFF'}")
            else:
                print(f" ⚠️ Unusual payload format: '{payload}'")
            print("-" * 60)
        except Exception as e:
            print(f"❌ Error processing message: {e}")

    def _create_client(self):
        """Build a paho client that uses the v1 callback signatures.

        paho-mqtt 2.x requires the callback API version to be passed
        explicitly; paho-mqtt 1.x has no ``CallbackAPIVersion`` and only
        supports the v1 signatures, so the bare constructor is equivalent.
        """
        callback_api = getattr(mqtt, "CallbackAPIVersion", None)
        if callback_api is None:
            return mqtt.Client()
        return mqtt.Client(callback_api.VERSION1)

    def start(self):
        """Start the MQTT tester.

        Connects to the broker, runs the network loop in the background and
        blocks until ``self.running`` becomes false or Ctrl+C is pressed.

        Returns:
            ``False`` if the initial connection attempt raised, ``True`` after
            a normal shutdown.
        """
        print("🧪 Starting MQTT Test")
        print("=" * 60)
        print(f"Broker: {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}")
        print(f"Topics to monitor: {len(MQTT_TOPICS)}")
        for name, topic in MQTT_TOPICS.items():
            print(f" - {name}: {topic}")
        print("=" * 60)
        print("\nWaiting for messages... (Press Ctrl+C to stop)\n")

        # Create client
        self.client = self._create_client()
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message

        # Set authentication if provided
        if MQTT_USERNAME and MQTT_PASSWORD:
            self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)

        # Connect
        try:
            self.client.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT, 60)
        except Exception as e:
            print(f"❌ Failed to connect: {e}")
            return False

        # Start loop
        self.client.loop_start()

        # Wait for messages
        try:
            while self.running:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n\n🛑 Stopping test...")
            self.running = False
        finally:
            # Cleanup runs on every exit path so the background loop is
            # always stopped and the connection closed.
            self.client.loop_stop()
            self.client.disconnect()

        print("\n📊 Test Summary:")
        print(f" Total messages received: {self.message_count}")
        print("✅ Test completed")
        return True
def main():
    """Run the tester and exit with status 0 on success, 1 otherwise."""
    tester = SimpleMQTTTester()

    def request_shutdown(signum, stack_frame):
        # Clear the tester's running flag instead of raising, so the
        # shutdown is graceful.
        print("\n\n🛑 Received interrupt signal, shutting down...")
        tester.running = False

    # Ctrl+C and termination requests share the same graceful shutdown.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, request_shutdown)

    if tester.start():
        exit_code = 0
    else:
        exit_code = 1
    sys.exit(exit_code)


if __name__ == "__main__":
    main()