Add USDA Vision Camera Streaming API and related functionality

- Implemented streaming API endpoints for starting, stopping, and retrieving live streams from cameras.
- Added support for concurrent streaming and recording operations.
- Created test scripts for frame conversion and streaming functionality.
- Developed a CameraStreamer class to manage live preview streaming without blocking recording.
- Included error handling and logging for camera operations.
- Added configuration endpoints for camera settings and real-time updates.
- Enhanced testing scenarios for various camera configurations and error handling.
This commit is contained in:
Alireza Vaezi
2025-07-28 18:09:48 -04:00
parent 7bc8138f24
commit ef0f9f85c5
20 changed files with 3594 additions and 4 deletions

View File

@@ -8,5 +8,6 @@ using the camera SDK library (mvsdk).
from .manager import CameraManager
from .recorder import CameraRecorder
from .monitor import CameraMonitor
from .streamer import CameraStreamer
__all__ = ["CameraManager", "CameraRecorder", "CameraMonitor"]
__all__ = ["CameraManager", "CameraRecorder", "CameraMonitor", "CameraStreamer"]

View File

@@ -22,6 +22,7 @@ from ..core.events import EventSystem, EventType, Event, publish_camera_status_c
from ..core.timezone_utils import format_filename_timestamp
from .recorder import CameraRecorder
from .monitor import CameraMonitor
from .streamer import CameraStreamer
from .sdk_config import initialize_sdk_with_suppression
@@ -40,6 +41,7 @@ class CameraManager:
# Camera management
self.available_cameras: List[Any] = [] # mvsdk camera device info
self.camera_recorders: Dict[str, CameraRecorder] = {} # camera_name -> recorder
self.camera_streamers: Dict[str, CameraStreamer] = {} # camera_name -> streamer
self.camera_monitor: Optional[CameraMonitor] = None
# Threading
@@ -71,6 +73,9 @@ class CameraManager:
# Initialize camera recorders
self._initialize_recorders()
# Initialize camera streamers
self._initialize_streamers()
self.logger.info("Camera manager started successfully")
return True
@@ -93,6 +98,12 @@ class CameraManager:
recorder.stop_recording()
recorder.cleanup()
# Stop all active streaming
with self._lock:
for streamer in self.camera_streamers.values():
if streamer.is_streaming():
streamer.stop_streaming()
self.logger.info("Camera manager stopped")
def _discover_cameras(self) -> None:
@@ -427,3 +438,104 @@ class CameraManager:
self.logger.error(f"Error reinitializing camera {camera_name}: {e}")
self.state_manager.update_camera_status(name=camera_name, status="error", device_info={"error": str(e)})
return False
def _initialize_streamers(self) -> None:
"""Initialize camera streamers for configured cameras"""
with self._lock:
for camera_config in self.config.cameras:
if not camera_config.enabled:
continue
try:
# Find matching physical camera
device_info = self._find_camera_device(camera_config.name)
if device_info is None:
self.logger.warning(f"No physical camera found for streaming: {camera_config.name}")
continue
# Create streamer
streamer = CameraStreamer(camera_config=camera_config, device_info=device_info, state_manager=self.state_manager, event_system=self.event_system)
# Add streamer to the list
self.camera_streamers[camera_config.name] = streamer
self.logger.info(f"Successfully created streamer for camera: {camera_config.name}")
except Exception as e:
self.logger.error(f"Error initializing streamer for {camera_config.name}: {e}")
def get_camera_streamer(self, camera_name: str) -> Optional[CameraStreamer]:
"""Get camera streamer for a specific camera"""
return self.camera_streamers.get(camera_name)
def start_camera_streaming(self, camera_name: str) -> bool:
"""Start streaming for a specific camera"""
streamer = self.camera_streamers.get(camera_name)
if not streamer:
self.logger.error(f"Camera streamer not found: {camera_name}")
return False
return streamer.start_streaming()
def stop_camera_streaming(self, camera_name: str) -> bool:
"""Stop streaming for a specific camera"""
streamer = self.camera_streamers.get(camera_name)
if not streamer:
self.logger.error(f"Camera streamer not found: {camera_name}")
return False
return streamer.stop_streaming()
def is_camera_streaming(self, camera_name: str) -> bool:
"""Check if a camera is currently streaming"""
streamer = self.camera_streamers.get(camera_name)
if not streamer:
return False
return streamer.is_streaming()
def get_camera_config(self, camera_name: str) -> Optional[CameraConfig]:
"""Get camera configuration"""
return self.config.get_camera_by_name(camera_name)
def update_camera_config(self, camera_name: str, **kwargs) -> bool:
"""Update camera configuration and save to config file"""
try:
# Update the configuration
success = self.config.update_camera_config(camera_name, **kwargs)
if success:
self.logger.info(f"Updated configuration for camera {camera_name}: {kwargs}")
return True
else:
self.logger.error(f"Failed to update configuration for camera {camera_name}")
return False
except Exception as e:
self.logger.error(f"Error updating camera configuration: {e}")
return False
def apply_camera_config(self, camera_name: str) -> bool:
"""Apply current configuration to active camera (requires camera restart)"""
try:
# Get the recorder for this camera
recorder = self.camera_recorders.get(camera_name)
if not recorder:
self.logger.error(f"Camera recorder not found: {camera_name}")
return False
# Stop recording if active
was_recording = recorder.is_recording()
if was_recording:
recorder.stop_recording()
# Reinitialize the camera with new settings
success = self.reinitialize_failed_camera(camera_name)
if success:
self.logger.info(f"Successfully applied configuration to camera {camera_name}")
return True
else:
self.logger.error(f"Failed to apply configuration to camera {camera_name}")
return False
except Exception as e:
self.logger.error(f"Error applying camera configuration: {e}")
return False

View File

@@ -328,6 +328,117 @@ class CameraRecorder:
self.logger.error(f"Error updating camera settings: {e}")
return False
def update_advanced_camera_settings(self, **kwargs) -> bool:
"""Update advanced camera settings dynamically"""
if not self.hCamera:
self.logger.error("Camera not initialized")
return False
try:
settings_updated = False
# Update basic settings
if "exposure_ms" in kwargs and kwargs["exposure_ms"] is not None:
mvsdk.CameraSetAeState(self.hCamera, 0)
exposure_us = int(kwargs["exposure_ms"] * 1000)
mvsdk.CameraSetExposureTime(self.hCamera, exposure_us)
self.camera_config.exposure_ms = kwargs["exposure_ms"]
settings_updated = True
if "gain" in kwargs and kwargs["gain"] is not None:
gain_value = int(kwargs["gain"] * 100)
mvsdk.CameraSetAnalogGain(self.hCamera, gain_value)
self.camera_config.gain = kwargs["gain"]
settings_updated = True
if "target_fps" in kwargs and kwargs["target_fps"] is not None:
self.camera_config.target_fps = kwargs["target_fps"]
settings_updated = True
# Update image quality settings
if "sharpness" in kwargs and kwargs["sharpness"] is not None:
mvsdk.CameraSetSharpness(self.hCamera, kwargs["sharpness"])
self.camera_config.sharpness = kwargs["sharpness"]
settings_updated = True
if "contrast" in kwargs and kwargs["contrast"] is not None:
mvsdk.CameraSetContrast(self.hCamera, kwargs["contrast"])
self.camera_config.contrast = kwargs["contrast"]
settings_updated = True
if "gamma" in kwargs and kwargs["gamma"] is not None:
mvsdk.CameraSetGamma(self.hCamera, kwargs["gamma"])
self.camera_config.gamma = kwargs["gamma"]
settings_updated = True
if "saturation" in kwargs and kwargs["saturation"] is not None and not self.monoCamera:
mvsdk.CameraSetSaturation(self.hCamera, kwargs["saturation"])
self.camera_config.saturation = kwargs["saturation"]
settings_updated = True
# Update noise reduction settings
if "noise_filter_enabled" in kwargs and kwargs["noise_filter_enabled"] is not None:
# Note: Noise filter settings may require camera restart to take effect
self.camera_config.noise_filter_enabled = kwargs["noise_filter_enabled"]
settings_updated = True
if "denoise_3d_enabled" in kwargs and kwargs["denoise_3d_enabled"] is not None:
# Note: 3D denoise settings may require camera restart to take effect
self.camera_config.denoise_3d_enabled = kwargs["denoise_3d_enabled"]
settings_updated = True
# Update color settings (for color cameras)
if not self.monoCamera:
if "auto_white_balance" in kwargs and kwargs["auto_white_balance"] is not None:
mvsdk.CameraSetWbMode(self.hCamera, kwargs["auto_white_balance"])
self.camera_config.auto_white_balance = kwargs["auto_white_balance"]
settings_updated = True
if "color_temperature_preset" in kwargs and kwargs["color_temperature_preset"] is not None:
if not self.camera_config.auto_white_balance:
mvsdk.CameraSetPresetClrTemp(self.hCamera, kwargs["color_temperature_preset"])
self.camera_config.color_temperature_preset = kwargs["color_temperature_preset"]
settings_updated = True
# Update advanced settings
if "anti_flicker_enabled" in kwargs and kwargs["anti_flicker_enabled"] is not None:
mvsdk.CameraSetAntiFlick(self.hCamera, kwargs["anti_flicker_enabled"])
self.camera_config.anti_flicker_enabled = kwargs["anti_flicker_enabled"]
settings_updated = True
if "light_frequency" in kwargs and kwargs["light_frequency"] is not None:
mvsdk.CameraSetLightFrequency(self.hCamera, kwargs["light_frequency"])
self.camera_config.light_frequency = kwargs["light_frequency"]
settings_updated = True
# Update HDR settings (if supported)
if "hdr_enabled" in kwargs and kwargs["hdr_enabled"] is not None:
try:
mvsdk.CameraSetHDR(self.hCamera, 1 if kwargs["hdr_enabled"] else 0)
self.camera_config.hdr_enabled = kwargs["hdr_enabled"]
settings_updated = True
except AttributeError:
self.logger.warning("HDR functions not available in this SDK version")
if "hdr_gain_mode" in kwargs and kwargs["hdr_gain_mode"] is not None:
try:
if self.camera_config.hdr_enabled:
mvsdk.CameraSetHDRGainMode(self.hCamera, kwargs["hdr_gain_mode"])
self.camera_config.hdr_gain_mode = kwargs["hdr_gain_mode"]
settings_updated = True
except AttributeError:
self.logger.warning("HDR gain mode functions not available in this SDK version")
if settings_updated:
updated_settings = [k for k, v in kwargs.items() if v is not None]
self.logger.info(f"Updated camera settings: {updated_settings}")
return settings_updated
except Exception as e:
self.logger.error(f"Error updating advanced camera settings: {e}")
return False
def start_recording(self, filename: str) -> bool:
"""Start video recording"""
with self._lock:

View File

@@ -0,0 +1,320 @@
"""
Camera Streamer for the USDA Vision Camera System.
This module provides live preview streaming from GigE cameras without blocking recording.
It creates a separate camera connection for streaming that doesn't interfere with recording.
"""
import sys
import os
import threading
import time
import logging
import cv2
import numpy as np
import contextlib
from typing import Optional, Dict, Any, Generator
from datetime import datetime
import queue
# Add camera SDK to path
sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..", "camera_sdk"))
import mvsdk
from ..core.config import CameraConfig
from ..core.state_manager import StateManager
from ..core.events import EventSystem
from .sdk_config import ensure_sdk_initialized
@contextlib.contextmanager
def suppress_camera_errors():
"""Context manager to temporarily suppress camera SDK error output"""
# Save original file descriptors
original_stderr = os.dup(2)
original_stdout = os.dup(1)
try:
# Redirect stderr and stdout to devnull
devnull = os.open(os.devnull, os.O_WRONLY)
os.dup2(devnull, 2) # stderr
os.dup2(devnull, 1) # stdout (in case SDK uses stdout)
os.close(devnull)
yield
finally:
# Restore original file descriptors
os.dup2(original_stderr, 2)
os.dup2(original_stdout, 1)
os.close(original_stderr)
os.close(original_stdout)
class CameraStreamer:
"""Provides live preview streaming from cameras without blocking recording"""
def __init__(self, camera_config: CameraConfig, device_info: Any, state_manager: StateManager, event_system: EventSystem):
self.camera_config = camera_config
self.device_info = device_info
self.state_manager = state_manager
self.event_system = event_system
self.logger = logging.getLogger(f"{__name__}.{camera_config.name}")
# Camera handle and properties (separate from recorder)
self.hCamera: Optional[int] = None
self.cap = None
self.monoCamera = False
self.frame_buffer = None
self.frame_buffer_size = 0
# Streaming state
self.streaming = False
self._streaming_thread: Optional[threading.Thread] = None
self._stop_streaming_event = threading.Event()
self._frame_queue = queue.Queue(maxsize=5) # Buffer for latest frames
self._lock = threading.RLock()
# Stream settings (optimized for preview)
self.preview_fps = 10.0 # Lower FPS for preview to reduce load
self.preview_quality = 70 # JPEG quality for streaming
def start_streaming(self) -> bool:
"""Start streaming preview frames"""
with self._lock:
if self.streaming:
self.logger.warning("Streaming already active")
return True
try:
# Initialize camera for streaming
if not self._initialize_camera():
return False
# Start streaming thread
self._stop_streaming_event.clear()
self._streaming_thread = threading.Thread(target=self._streaming_loop, daemon=True)
self._streaming_thread.start()
self.streaming = True
self.logger.info(f"Started streaming for camera: {self.camera_config.name}")
return True
except Exception as e:
self.logger.error(f"Error starting streaming: {e}")
self._cleanup_camera()
return False
def stop_streaming(self) -> bool:
"""Stop streaming preview frames"""
with self._lock:
if not self.streaming:
return True
try:
# Signal streaming thread to stop
self._stop_streaming_event.set()
# Wait for thread to finish
if self._streaming_thread and self._streaming_thread.is_alive():
self._streaming_thread.join(timeout=5.0)
# Cleanup camera resources
self._cleanup_camera()
self.streaming = False
self.logger.info(f"Stopped streaming for camera: {self.camera_config.name}")
return True
except Exception as e:
self.logger.error(f"Error stopping streaming: {e}")
return False
def get_latest_frame(self) -> Optional[bytes]:
"""Get the latest frame as JPEG bytes for streaming"""
try:
# Get latest frame from queue (non-blocking)
frame = self._frame_queue.get_nowait()
# Encode as JPEG
_, buffer = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self.preview_quality])
return buffer.tobytes()
except queue.Empty:
return None
except Exception as e:
self.logger.error(f"Error getting latest frame: {e}")
return None
def get_frame_generator(self) -> Generator[bytes, None, None]:
"""Generator for MJPEG streaming"""
while self.streaming:
frame_bytes = self.get_latest_frame()
if frame_bytes:
yield (b"--frame\r\n" b"Content-Type: image/jpeg\r\n\r\n" + frame_bytes + b"\r\n")
else:
time.sleep(0.1) # Wait a bit if no frame available
def _initialize_camera(self) -> bool:
"""Initialize camera for streaming (separate from recording)"""
try:
self.logger.info(f"Initializing camera for streaming: {self.camera_config.name}")
# Ensure SDK is initialized
ensure_sdk_initialized()
# Check if device_info is valid
if self.device_info is None:
self.logger.error("No device info provided for camera initialization")
return False
# Initialize camera (suppress output to avoid MVCAMAPI error messages)
with suppress_camera_errors():
self.hCamera = mvsdk.CameraInit(self.device_info, -1, -1)
self.logger.info("Camera initialized successfully for streaming")
# Get camera capabilities
self.cap = mvsdk.CameraGetCapability(self.hCamera)
# Determine if camera is monochrome
self.monoCamera = self.cap.sIspCapacity.bMonoSensor != 0
# Set output format based on camera type and bit depth
if self.monoCamera:
mvsdk.CameraSetIspOutFormat(self.hCamera, mvsdk.CAMERA_MEDIA_TYPE_MONO8)
else:
mvsdk.CameraSetIspOutFormat(self.hCamera, mvsdk.CAMERA_MEDIA_TYPE_BGR8)
# Configure camera settings for streaming (optimized for preview)
self._configure_streaming_settings()
# Allocate frame buffer
bytes_per_pixel = 1 if self.monoCamera else 3
self.frame_buffer_size = self.cap.sResolutionRange.iWidthMax * self.cap.sResolutionRange.iHeightMax * bytes_per_pixel
self.frame_buffer = mvsdk.CameraAlignMalloc(self.frame_buffer_size, 16)
# Start camera
mvsdk.CameraPlay(self.hCamera)
self.logger.info("Camera started successfully for streaming")
return True
except Exception as e:
self.logger.error(f"Error initializing camera for streaming: {e}")
self._cleanup_camera()
return False
def _configure_streaming_settings(self):
"""Configure camera settings optimized for streaming"""
try:
# Set trigger mode to free run for continuous streaming
mvsdk.CameraSetTriggerMode(self.hCamera, 0)
# Set exposure (use a reasonable default for preview)
exposure_us = int(self.camera_config.exposure_ms * 1000)
mvsdk.CameraSetExposureTime(self.hCamera, exposure_us)
# Set gain
mvsdk.CameraSetAnalogGain(self.hCamera, int(self.camera_config.gain))
# Set frame rate for streaming (lower than recording)
if hasattr(mvsdk, "CameraSetFrameSpeed"):
mvsdk.CameraSetFrameSpeed(self.hCamera, int(self.preview_fps))
self.logger.info(f"Streaming settings configured: exposure={self.camera_config.exposure_ms}ms, gain={self.camera_config.gain}, fps={self.preview_fps}")
except Exception as e:
self.logger.warning(f"Could not configure some streaming settings: {e}")
def _streaming_loop(self):
"""Main streaming loop that captures frames continuously"""
self.logger.info("Starting streaming loop")
try:
while not self._stop_streaming_event.is_set():
try:
# Capture frame with timeout
pRawData, FrameHead = mvsdk.CameraGetImageBuffer(self.hCamera, 200) # 200ms timeout
# Process frame
mvsdk.CameraImageProcess(self.hCamera, pRawData, self.frame_buffer, FrameHead)
# Convert to OpenCV format
frame = self._convert_frame_to_opencv(FrameHead)
if frame is not None:
# Add frame to queue (replace oldest if queue is full)
try:
self._frame_queue.put_nowait(frame)
except queue.Full:
# Remove oldest frame and add new one
try:
self._frame_queue.get_nowait()
self._frame_queue.put_nowait(frame)
except queue.Empty:
pass
# Release buffer
mvsdk.CameraReleaseImageBuffer(self.hCamera, pRawData)
# Control frame rate
time.sleep(1.0 / self.preview_fps)
except Exception as e:
if not self._stop_streaming_event.is_set():
self.logger.error(f"Error in streaming loop: {e}")
time.sleep(0.1) # Brief pause before retrying
except Exception as e:
self.logger.error(f"Fatal error in streaming loop: {e}")
finally:
self.logger.info("Streaming loop ended")
def _convert_frame_to_opencv(self, FrameHead) -> Optional[np.ndarray]:
"""Convert camera frame to OpenCV format"""
try:
# Convert the frame buffer memory address to a proper buffer
# that numpy can work with using mvsdk.c_ubyte
frame_data_buffer = (mvsdk.c_ubyte * FrameHead.uBytes).from_address(self.frame_buffer)
if self.monoCamera:
# Monochrome camera
frame_data = np.frombuffer(frame_data_buffer, dtype=np.uint8)
frame = frame_data.reshape((FrameHead.iHeight, FrameHead.iWidth))
# Convert to 3-channel for consistency
frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
else:
# Color camera (BGR format)
frame_data = np.frombuffer(frame_data_buffer, dtype=np.uint8)
frame = frame_data.reshape((FrameHead.iHeight, FrameHead.iWidth, 3))
return frame
except Exception as e:
self.logger.error(f"Error converting frame: {e}")
return None
def _cleanup_camera(self):
"""Clean up camera resources"""
try:
if self.frame_buffer:
mvsdk.CameraAlignFree(self.frame_buffer)
self.frame_buffer = None
if self.hCamera is not None:
mvsdk.CameraUnInit(self.hCamera)
self.hCamera = None
self.logger.info("Camera resources cleaned up for streaming")
except Exception as e:
self.logger.error(f"Error cleaning up camera resources: {e}")
def is_streaming(self) -> bool:
"""Check if streaming is active"""
return self.streaming
def __del__(self):
"""Destructor to ensure cleanup"""
if self.streaming:
self.stop_streaming()