Add 'api/' from commit '14ac229098e65aa643f84e8e17e0c5f1aaf8d639'

git-subtree-dir: api
git-subtree-mainline: 4743f19aef
git-subtree-split: 14ac229098
This commit is contained in:
Alireza Vaezi
2025-08-07 20:57:34 -04:00
146 changed files with 31249 additions and 0 deletions

View File

@@ -0,0 +1,80 @@
#!/usr/bin/env python3
"""
Test script to verify the frame conversion fix works correctly.
"""
import sys
import os
import numpy as np
# Add the current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Add camera SDK to path
sys.path.append(os.path.join(os.path.dirname(__file__), "camera_sdk"))
try:
import mvsdk
print("✅ mvsdk imported successfully")
except ImportError as e:
print(f"❌ Failed to import mvsdk: {e}")
sys.exit(1)
def test_frame_conversion():
"""Test the frame conversion logic"""
print("🧪 Testing frame conversion logic...")
# Simulate frame data
width, height = 640, 480
frame_size = width * height * 3 # RGB
# Create mock frame data
mock_frame_data = np.random.randint(0, 255, frame_size, dtype=np.uint8)
# Create a mock frame buffer (simulate memory address)
frame_buffer = mock_frame_data.ctypes.data
# Create mock FrameHead
class MockFrameHead:
def __init__(self):
self.iWidth = width
self.iHeight = height
self.uBytes = frame_size
frame_head = MockFrameHead()
try:
# Test the conversion logic (similar to what's in streamer.py)
frame_data_buffer = (mvsdk.c_ubyte * frame_head.uBytes).from_address(frame_buffer)
frame_data = np.frombuffer(frame_data_buffer, dtype=np.uint8)
frame = frame_data.reshape((frame_head.iHeight, frame_head.iWidth, 3))
print(f"✅ Frame conversion successful!")
print(f" Frame shape: {frame.shape}")
print(f" Frame dtype: {frame.dtype}")
print(f" Frame size: {frame.size} bytes")
return True
except Exception as e:
print(f"❌ Frame conversion failed: {e}")
return False
def main():
print("🔧 Frame Conversion Test")
print("=" * 40)
success = test_frame_conversion()
if success:
print("\n✅ Frame conversion fix is working correctly!")
print("📋 The streaming issue should be resolved after system restart.")
else:
print("\n❌ Frame conversion fix needs more work.")
print("\n💡 To apply the fix:")
print("1. Restart the USDA vision system")
print("2. Test streaming again")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,131 @@
#!/usr/bin/env python3
"""
Test script to demonstrate maximum FPS capture functionality.
"""
import requests
import json
import time
from datetime import datetime
BASE_URL = "http://localhost:8000"
def test_fps_modes():
"""Test different FPS modes to demonstrate the functionality"""
print("=" * 60)
print("Testing Maximum FPS Capture Functionality")
print("=" * 60)
# Test configurations
test_configs = [
{
"name": "Normal FPS (3.0)",
"data": {
"filename": "normal_fps_test.avi",
"exposure_ms": 1.0,
"gain": 3.0,
"fps": 3.0
}
},
{
"name": "High FPS (10.0)",
"data": {
"filename": "high_fps_test.avi",
"exposure_ms": 0.5,
"gain": 2.0,
"fps": 10.0
}
},
{
"name": "Maximum FPS (fps=0)",
"data": {
"filename": "max_fps_test.avi",
"exposure_ms": 0.1, # Very short exposure for max speed
"gain": 1.0, # Low gain to avoid overexposure
"fps": 0 # Maximum speed - no delay
}
},
{
"name": "Default FPS (omitted)",
"data": {
"filename": "default_fps_test.avi",
"exposure_ms": 1.0,
"gain": 3.0
# fps omitted - uses camera config default
}
}
]
for i, config in enumerate(test_configs, 1):
print(f"\n{i}. Testing {config['name']}")
print("-" * 40)
# Start recording
try:
response = requests.post(
f"{BASE_URL}/cameras/camera1/start-recording",
json=config['data'],
headers={"Content-Type": "application/json"}
)
if response.status_code == 200:
result = response.json()
if result.get('success'):
print(f"✅ Recording started successfully")
print(f" Filename: {result.get('filename')}")
print(f" Settings: {json.dumps(config['data'], indent=6)}")
# Record for a short time
print(f" Recording for 3 seconds...")
time.sleep(3)
# Stop recording
stop_response = requests.post(f"{BASE_URL}/cameras/camera1/stop-recording")
if stop_response.status_code == 200:
stop_result = stop_response.json()
if stop_result.get('success'):
print(f"✅ Recording stopped successfully")
if 'duration_seconds' in stop_result:
print(f" Duration: {stop_result['duration_seconds']:.1f}s")
else:
print(f"❌ Failed to stop recording: {stop_result.get('message')}")
else:
print(f"❌ Stop request failed: {stop_response.status_code}")
else:
print(f"❌ Recording failed: {result.get('message')}")
else:
print(f"❌ Request failed: {response.status_code} - {response.text}")
except requests.exceptions.ConnectionError:
print(f"❌ Could not connect to {BASE_URL}")
print("Make sure the API server is running with: python main.py")
break
except Exception as e:
print(f"❌ Error: {e}")
# Wait between tests
if i < len(test_configs):
print(" Waiting 2 seconds before next test...")
time.sleep(2)
print("\n" + "=" * 60)
print("FPS Test Summary:")
print("=" * 60)
print("• fps > 0: Controlled frame rate with sleep delay")
print("• fps = 0: MAXIMUM speed capture (no delay between frames)")
print("• fps omitted: Uses camera config default")
print("• Video files with fps=0 are saved with 30 FPS metadata")
print("• Actual capture rate with fps=0 depends on:")
print(" - Camera hardware capabilities")
print(" - Exposure time (shorter = faster)")
print(" - Processing overhead")
print("=" * 60)
if __name__ == "__main__":
print("USDA Vision Camera System - Maximum FPS Test")
print("This script demonstrates fps=0 for maximum capture speed")
print("\nMake sure the system is running with: python main.py")
test_fps_modes()

View File

@@ -0,0 +1,199 @@
#!/usr/bin/env python3
"""
Test script for camera streaming functionality.
This script tests the new streaming capabilities without interfering with recording.
"""
import sys
import os
import time
import requests
import threading
from datetime import datetime
# Add the current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def test_api_endpoints():
"""Test the streaming API endpoints"""
base_url = "http://localhost:8000"
print("🧪 Testing Camera Streaming API Endpoints")
print("=" * 50)
# Test system status
try:
response = requests.get(f"{base_url}/system/status", timeout=5)
if response.status_code == 200:
print("✅ System status endpoint working")
data = response.json()
print(f" System: {data.get('status', 'Unknown')}")
print(f" Camera Manager: {'Running' if data.get('camera_manager_running') else 'Stopped'}")
else:
print(f"❌ System status endpoint failed: {response.status_code}")
except Exception as e:
print(f"❌ System status endpoint error: {e}")
# Test camera list
try:
response = requests.get(f"{base_url}/cameras", timeout=5)
if response.status_code == 200:
print("✅ Camera list endpoint working")
cameras = response.json()
print(f" Found {len(cameras)} cameras: {list(cameras.keys())}")
# Test streaming for each camera
for camera_name in cameras.keys():
test_camera_streaming(base_url, camera_name)
else:
print(f"❌ Camera list endpoint failed: {response.status_code}")
except Exception as e:
print(f"❌ Camera list endpoint error: {e}")
def test_camera_streaming(base_url, camera_name):
"""Test streaming for a specific camera"""
print(f"\n🎥 Testing streaming for {camera_name}")
print("-" * 30)
# Test start streaming
try:
response = requests.post(f"{base_url}/cameras/{camera_name}/start-stream", timeout=10)
if response.status_code == 200:
print(f"✅ Start stream endpoint working for {camera_name}")
data = response.json()
print(f" Response: {data.get('message', 'No message')}")
else:
print(f"❌ Start stream failed for {camera_name}: {response.status_code}")
print(f" Error: {response.text}")
return
except Exception as e:
print(f"❌ Start stream error for {camera_name}: {e}")
return
# Wait a moment for stream to initialize
time.sleep(2)
# Test stream endpoint (just check if it responds)
try:
response = requests.get(f"{base_url}/cameras/{camera_name}/stream", timeout=5, stream=True)
if response.status_code == 200:
print(f"✅ Stream endpoint responding for {camera_name}")
print(f" Content-Type: {response.headers.get('content-type', 'Unknown')}")
# Read a small amount of data to verify it's working
chunk_count = 0
for chunk in response.iter_content(chunk_size=1024):
chunk_count += 1
if chunk_count >= 3: # Read a few chunks then stop
break
print(f" Received {chunk_count} data chunks")
else:
print(f"❌ Stream endpoint failed for {camera_name}: {response.status_code}")
except Exception as e:
print(f"❌ Stream endpoint error for {camera_name}: {e}")
# Test stop streaming
try:
response = requests.post(f"{base_url}/cameras/{camera_name}/stop-stream", timeout=5)
if response.status_code == 200:
print(f"✅ Stop stream endpoint working for {camera_name}")
data = response.json()
print(f" Response: {data.get('message', 'No message')}")
else:
print(f"❌ Stop stream failed for {camera_name}: {response.status_code}")
except Exception as e:
print(f"❌ Stop stream error for {camera_name}: {e}")
def test_concurrent_recording_and_streaming():
"""Test that streaming doesn't interfere with recording"""
base_url = "http://localhost:8000"
print("\n🔄 Testing Concurrent Recording and Streaming")
print("=" * 50)
try:
# Get available cameras
response = requests.get(f"{base_url}/cameras", timeout=5)
if response.status_code != 200:
print("❌ Cannot get camera list for concurrent test")
return
cameras = response.json()
if not cameras:
print("❌ No cameras available for concurrent test")
return
camera_name = list(cameras.keys())[0] # Use first camera
print(f"Using camera: {camera_name}")
# Start streaming
print("1. Starting streaming...")
response = requests.post(f"{base_url}/cameras/{camera_name}/start-stream", timeout=10)
if response.status_code != 200:
print(f"❌ Failed to start streaming: {response.text}")
return
time.sleep(2)
# Start recording
print("2. Starting recording...")
response = requests.post(f"{base_url}/cameras/{camera_name}/start-recording",
json={"filename": "test_concurrent_recording.avi"}, timeout=10)
if response.status_code == 200:
print("✅ Recording started successfully while streaming")
else:
print(f"❌ Failed to start recording while streaming: {response.text}")
# Let both run for a few seconds
print("3. Running both streaming and recording for 5 seconds...")
time.sleep(5)
# Stop recording
print("4. Stopping recording...")
response = requests.post(f"{base_url}/cameras/{camera_name}/stop-recording", timeout=5)
if response.status_code == 200:
print("✅ Recording stopped successfully")
else:
print(f"❌ Failed to stop recording: {response.text}")
# Stop streaming
print("5. Stopping streaming...")
response = requests.post(f"{base_url}/cameras/{camera_name}/stop-stream", timeout=5)
if response.status_code == 200:
print("✅ Streaming stopped successfully")
else:
print(f"❌ Failed to stop streaming: {response.text}")
print("✅ Concurrent test completed successfully!")
except Exception as e:
print(f"❌ Concurrent test error: {e}")
def main():
"""Main test function"""
print("🚀 USDA Vision Camera Streaming Test")
print("=" * 50)
print(f"Test started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print()
# Wait for system to be ready
print("⏳ Waiting for system to be ready...")
time.sleep(3)
# Run tests
test_api_endpoints()
test_concurrent_recording_and_streaming()
print("\n" + "=" * 50)
print("🏁 Test completed!")
print("\n📋 Next Steps:")
print("1. Open camera_preview.html in your browser")
print("2. Click 'Start Stream' for any camera")
print("3. Verify live preview works without blocking recording")
print("4. Test concurrent recording and streaming")
if __name__ == "__main__":
main()