feat: Add MQTT publisher and tester scripts for USDA Vision Camera System

- Implemented mqtt_publisher_test.py for manual MQTT message publishing
- Created mqtt_test.py to test MQTT message reception and display statistics
- Developed test_api_changes.py to verify API changes for camera settings and filename handling
- Added test_camera_recovery_api.py for testing camera recovery API endpoints
- Introduced test_max_fps.py to demonstrate maximum FPS capture functionality
- Implemented test_mqtt_events_api.py to test MQTT events API endpoint
- Created test_mqtt_logging.py for enhanced MQTT logging and API endpoint testing
- Added sdk_config.py for SDK initialization and configuration with error suppression
This commit is contained in:
Alireza Vaezi
2025-07-28 16:30:14 -04:00
parent e2acebc056
commit 9cb043ef5f
40 changed files with 4485 additions and 838 deletions

View File

@@ -15,6 +15,7 @@ from datetime import datetime
# Add the current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def test_imports():
"""Test that all modules can be imported"""
print("Testing imports...")
@@ -27,46 +28,49 @@ def test_imports():
from usda_vision_system.storage.manager import StorageManager
from usda_vision_system.api.server import APIServer
from usda_vision_system.main import USDAVisionSystem
print("✅ All imports successful")
return True
except Exception as e:
print(f"❌ Import failed: {e}")
return False
def test_configuration():
"""Test configuration loading"""
print("\nTesting configuration...")
try:
from usda_vision_system.core.config import Config
# Test default config
config = Config()
print(f"✅ Default config loaded")
print(f" MQTT broker: {config.mqtt.broker_host}:{config.mqtt.broker_port}")
print(f" Storage path: {config.storage.base_path}")
print(f" Cameras configured: {len(config.cameras)}")
# Test config file if it exists
if os.path.exists("config.json"):
config_file = Config("config.json")
print(f"✅ Config file loaded")
return True
except Exception as e:
print(f"❌ Configuration test failed: {e}")
return False
def test_camera_discovery():
"""Test camera discovery"""
print("\nTesting camera discovery...")
try:
sys.path.append('./python demo')
sys.path.append("./python demo")
import mvsdk
devices = mvsdk.CameraEnumerateDevice()
print(f"✅ Camera discovery successful")
print(f" Found {len(devices)} camera(s)")
for i, device in enumerate(devices):
try:
name = device.GetFriendlyName()
@@ -74,13 +78,14 @@ def test_camera_discovery():
print(f" Camera {i}: {name} ({port_type})")
except Exception as e:
print(f" Camera {i}: Error getting info - {e}")
return True
except Exception as e:
print(f"❌ Camera discovery failed: {e}")
print(" Make sure GigE cameras are connected and python demo library is available")
return False
def test_storage_setup():
"""Test storage directory setup"""
print("\nTesting storage setup...")
@@ -88,22 +93,23 @@ def test_storage_setup():
from usda_vision_system.core.config import Config
from usda_vision_system.storage.manager import StorageManager
from usda_vision_system.core.state_manager import StateManager
config = Config()
state_manager = StateManager()
storage_manager = StorageManager(config, state_manager)
# Test storage statistics
stats = storage_manager.get_storage_statistics()
print(f"✅ Storage manager initialized")
print(f" Base path: {stats.get('base_path', 'Unknown')}")
print(f" Total files: {stats.get('total_files', 0)}")
return True
except Exception as e:
print(f"❌ Storage setup failed: {e}")
return False
def test_mqtt_config():
"""Test MQTT configuration (without connecting)"""
print("\nTesting MQTT configuration...")
@@ -112,45 +118,47 @@ def test_mqtt_config():
from usda_vision_system.mqtt.client import MQTTClient
from usda_vision_system.core.state_manager import StateManager
from usda_vision_system.core.events import EventSystem
config = Config()
state_manager = StateManager()
event_system = EventSystem()
mqtt_client = MQTTClient(config, state_manager, event_system)
status = mqtt_client.get_status()
print(f"✅ MQTT client initialized")
print(f" Broker: {status['broker_host']}:{status['broker_port']}")
print(f" Topics: {len(status['subscribed_topics'])}")
for topic in status['subscribed_topics']:
for topic in status["subscribed_topics"]:
print(f" - {topic}")
return True
except Exception as e:
print(f"❌ MQTT configuration test failed: {e}")
return False
def test_system_initialization():
"""Test full system initialization (without starting)"""
print("\nTesting system initialization...")
try:
from usda_vision_system.main import USDAVisionSystem
# Create system instance
system = USDAVisionSystem()
# Check system status
status = system.get_system_status()
print(f"✅ System initialized successfully")
print(f" Running: {status['running']}")
print(f" Components initialized: {len(status['components'])}")
return True
except Exception as e:
print(f"❌ System initialization failed: {e}")
return False
def test_api_endpoints():
"""Test API endpoints if server is running"""
print("\nTesting API endpoints...")
@@ -159,7 +167,7 @@ def test_api_endpoints():
response = requests.get("http://localhost:8000/health", timeout=5)
if response.status_code == 200:
print("✅ API server is running")
# Test system status endpoint
try:
response = requests.get("http://localhost:8000/system/status", timeout=5)
@@ -172,7 +180,7 @@ def test_api_endpoints():
print(f"⚠️ System status endpoint returned {response.status_code}")
except Exception as e:
print(f"⚠️ System status test failed: {e}")
return True
else:
print(f"⚠️ API server returned status {response.status_code}")
@@ -184,34 +192,27 @@ def test_api_endpoints():
print(f"❌ API test failed: {e}")
return False
def main():
"""Run all tests"""
print("USDA Vision Camera System - Test Suite")
print("=" * 50)
tests = [
test_imports,
test_configuration,
test_camera_discovery,
test_storage_setup,
test_mqtt_config,
test_system_initialization,
test_api_endpoints
]
tests = [test_imports, test_configuration, test_camera_discovery, test_storage_setup, test_mqtt_config, test_system_initialization, test_api_endpoints]
passed = 0
total = len(tests)
for test in tests:
try:
if test():
passed += 1
except Exception as e:
print(f"❌ Test {test.__name__} crashed: {e}")
print("\n" + "=" * 50)
print(f"Test Results: {passed}/{total} tests passed")
if passed == total:
print("🎉 All tests passed! System appears to be working correctly.")
return 0
@@ -219,5 +220,6 @@ def main():
print("⚠️ Some tests failed. Check the output above for details.")
return 1
if __name__ == "__main__":
sys.exit(main())