diff --git a/api-tests.http b/api-tests.http index 1cd62ba..b6bffbf 100644 --- a/api-tests.http +++ b/api-tests.http @@ -46,6 +46,7 @@ POST {{API}}/cameras/camera1/stop-rtsp ### RTSP stream URL (use with VLC/ffplay): # rtsp://{{host}}:{{rtsp_port}}/camera1 + ### getting a list of all videos GET {{MEDIA}}/videos/?page=10&limit=1 diff --git a/camera-management-api/usda_vision_system/camera/manager.py b/camera-management-api/usda_vision_system/camera/manager.py index a39838a..3302cec 100644 --- a/camera-management-api/usda_vision_system/camera/manager.py +++ b/camera-management-api/usda_vision_system/camera/manager.py @@ -167,7 +167,9 @@ class CameraManager: continue # Create recorder (uses lazy initialization - camera will be initialized when recording starts) - recorder = CameraRecorder(camera_config=camera_config, device_info=device_info, state_manager=self.state_manager, event_system=self.event_system) + # Get corresponding streamer for frame sharing + streamer = self.camera_streamers.get(camera_config.name) + recorder = CameraRecorder(camera_config=camera_config, device_info=device_info, state_manager=self.state_manager, event_system=self.event_system, streamer=streamer) # Add recorder to the list (camera will be initialized lazily when needed) self.camera_recorders[camera_config.name] = recorder @@ -291,6 +293,13 @@ class CameraManager: self.logger.error(f"Camera not found: {camera_name}") return False + # Ensure streamer reference is set (in case recorder was created before streamer) + if not recorder.streamer: + streamer = self.camera_streamers.get(camera_name) + if streamer: + recorder.streamer = streamer + self.logger.debug(f"Updated streamer reference for recorder {camera_name}") + # Update camera settings if provided if exposure_ms is not None or gain is not None or fps is not None: settings_updated = recorder.update_camera_settings(exposure_ms=exposure_ms, gain=gain, target_fps=fps) @@ -444,7 +453,9 @@ class CameraManager: return False # Create new recorder (uses lazy initialization) - recorder = CameraRecorder(camera_config=camera_config, device_info=device_info, state_manager=self.state_manager, event_system=self.event_system) + # Get corresponding streamer for frame sharing + streamer = self.camera_streamers.get(camera_config.name) + recorder = CameraRecorder(camera_config=camera_config, device_info=device_info, state_manager=self.state_manager, event_system=self.event_system, streamer=streamer) # Success - add to recorders (camera will be initialized lazily when needed) self.camera_recorders[camera_name] = recorder @@ -481,6 +492,13 @@ class CameraManager: # Add streamer to the list self.camera_streamers[camera_config.name] = streamer + + # Update recorder's streamer reference if recorder already exists + recorder = self.camera_recorders.get(camera_config.name) + if recorder: + recorder.streamer = streamer + self.logger.debug(f"Updated streamer reference for recorder {camera_config.name}") + self.logger.info(f"Successfully created streamer for camera: {camera_config.name}") except Exception as e: diff --git a/camera-management-api/usda_vision_system/camera/recorder.py b/camera-management-api/usda_vision_system/camera/recorder.py index 797b150..cd00ff1 100644 --- a/camera-management-api/usda_vision_system/camera/recorder.py +++ b/camera-management-api/usda_vision_system/camera/recorder.py @@ -54,12 +54,13 @@ def suppress_camera_errors(): class CameraRecorder: """Handles video recording for a single camera""" - def __init__(self, camera_config: CameraConfig, device_info: Any, state_manager: StateManager, event_system: EventSystem, storage_manager=None): + def __init__(self, camera_config: CameraConfig, device_info: Any, state_manager: StateManager, event_system: EventSystem, storage_manager=None, streamer=None): self.camera_config = camera_config self.device_info = device_info self.state_manager = state_manager self.event_system = event_system self.storage_manager = storage_manager + self.streamer = streamer # Reference to CameraStreamer for frame sharing self.logger = logging.getLogger(f"{__name__}.{camera_config.name}") # Camera handle and properties @@ -476,8 +477,20 @@ class CameraRecorder: self.logger.warning("Already recording!") return False - # Initialize camera if not already initialized (lazy initialization) - if not self.hCamera: + # Check if streamer is active - if so, we can share frames without opening a new camera connection + use_streamer_frames = False + if self.streamer: + if self.streamer.streaming: + self.logger.info("Streamer is active - will share frames instead of opening separate camera connection") + use_streamer_frames = True + # Don't initialize camera - we'll use frames from streamer + else: + self.logger.debug(f"Streamer exists but not streaming (streaming={self.streamer.streaming})") + else: + self.logger.debug("No streamer reference available - will use direct camera capture") + + # Initialize camera only if streamer is not active + if not use_streamer_frames and not self.hCamera: self.logger.info("Camera not initialized, initializing now...") if not self._initialize_camera(): self.logger.error("Failed to initialize camera for recording") @@ -488,10 +501,11 @@ class CameraRecorder: output_path = os.path.join(self.camera_config.storage_path, filename) Path(self.camera_config.storage_path).mkdir(parents=True, exist_ok=True) - # Test camera capture before starting recording - if not self._test_camera_capture(): - self.logger.error("Camera capture test failed") - return False + # Test camera capture before starting recording (only if not using streamer frames) + if not use_streamer_frames: + if not self._test_camera_capture(): + self.logger.error("Camera capture test failed") + return False # Initialize recording state self.output_filename = output_path @@ -499,8 +513,8 @@ class CameraRecorder: self.start_time = now_atlanta() # Use Atlanta timezone self._stop_recording_event.clear() - # Start recording thread - self._recording_thread = threading.Thread(target=self._recording_loop, daemon=True) + # Start recording thread (pass use_streamer_frames flag) + self._recording_thread = threading.Thread(target=self._recording_loop, daemon=True, kwargs={"use_streamer_frames": use_streamer_frames}) self._recording_thread.start() # Update state @@ -535,75 +549,130 @@ class CameraRecorder: with self._lock: if not self.recording: self.logger.warning("Not currently recording") + # Check if thread is still alive (might be a race condition) + if self._recording_thread and self._recording_thread.is_alive(): + self.logger.warning("Recording flag is False but thread is still alive - forcing stop") + self._stop_recording_event.set() + self._recording_thread.join(timeout=5) + self._cleanup_recording() return False - try: - # Signal recording thread to stop - self._stop_recording_event.set() - - # Wait for recording thread to finish - if self._recording_thread and self._recording_thread.is_alive(): - self._recording_thread.join(timeout=5) + # Signal recording thread to stop + self.logger.info("Setting stop event for recording thread...") + self._stop_recording_event.set() + # Save state before releasing lock + thread_to_join = self._recording_thread + output_filename = self.output_filename + start_time = self.start_time + frame_count = self.frame_count + use_streamer_frames = (self.streamer and self.streamer.streaming) if self.streamer else False + + # Release lock while waiting for thread (to avoid deadlock) + # The recording loop might need the lock for cleanup + try: + if thread_to_join and thread_to_join.is_alive(): + self.logger.info("Waiting for recording thread to finish (timeout: 10s)...") + thread_to_join.join(timeout=10) + if thread_to_join.is_alive(): + self.logger.warning("Recording thread did not stop within timeout - may still be running") + else: + self.logger.info("Recording thread stopped successfully") + + # Re-acquire lock for state updates + with self._lock: # Update state self.recording = False # Calculate duration and file size duration = 0 file_size = 0 - if self.start_time: - duration = (now_atlanta() - self.start_time).total_seconds() + if start_time: + duration = (now_atlanta() - start_time).total_seconds() - if self.output_filename and os.path.exists(self.output_filename): - file_size = os.path.getsize(self.output_filename) + if output_filename and os.path.exists(output_filename): + file_size = os.path.getsize(output_filename) # Update state manager - if self.output_filename: - self.state_manager.stop_recording(self.output_filename, file_size, self.frame_count) + if output_filename: + self.state_manager.stop_recording(output_filename, file_size, frame_count) # Publish event - publish_recording_stopped(self.camera_config.name, self.output_filename or "unknown", duration) + publish_recording_stopped(self.camera_config.name, output_filename or "unknown", duration) - # Clean up camera resources after recording (lazy cleanup) - self._cleanup_camera() - self.logger.info("Camera resources cleaned up after recording") + # Clean up camera resources after recording (only if we opened our own camera connection) + # Don't cleanup if we were using streamer frames (streamer owns the camera) + if not use_streamer_frames: + self._cleanup_camera() + self.logger.info("Camera resources cleaned up after recording") + else: + self.logger.info("Skipping camera cleanup - using shared streamer connection") - self.logger.info(f"Stopped recording - Duration: {duration:.1f}s, Frames: {self.frame_count}") + self.logger.info(f"Stopped recording - Duration: {duration:.1f}s, Frames: {frame_count}") return True - except Exception as e: - self.logger.error(f"Error stopping recording: {e}") - return False + except Exception as e: + self.logger.error(f"Error stopping recording: {e}") + import traceback + self.logger.error(f"Traceback: {traceback.format_exc()}") + # Ensure recording flag is cleared even on error + with self._lock: + self.recording = False + return False - def _recording_loop(self) -> None: - """Main recording loop running in separate thread""" + def _recording_loop(self, use_streamer_frames: bool = False) -> None: + """Main recording loop running in separate thread + + Args: + use_streamer_frames: If True, read frames from streamer's frame queue instead of capturing directly + """ try: # Initialize video writer if not self._initialize_video_writer(): self.logger.error("Failed to initialize video writer") return - self.logger.info("Recording loop started") + self.logger.info(f"Recording loop started (using {'streamer frames' if use_streamer_frames else 'direct capture'})") while not self._stop_recording_event.is_set(): try: - # Capture frame - pRawData, FrameHead = mvsdk.CameraGetImageBuffer(self.hCamera, 200) # 200ms timeout + if use_streamer_frames and self.streamer: + # Get frame from streamer's recording frame queue (shared frames, doesn't affect MJPEG/RTSP) + # Check if streamer is still active - if not, fall back to direct capture + if not self.streamer.streaming: + self.logger.warning("Streamer stopped while recording - cannot continue with shared frames") + break + + # Check stop event before blocking on queue + if self._stop_recording_event.is_set(): + break + + try: + # Use shorter timeout to check stop event more frequently + frame = self.streamer._recording_frame_queue.get(timeout=0.1) + except Exception: + # Timeout or queue empty - check stop event and continue if not stopping + if self._stop_recording_event.is_set(): + break + continue + else: + # Capture frame directly from camera + pRawData, FrameHead = mvsdk.CameraGetImageBuffer(self.hCamera, 200) # 200ms timeout - # Process frame - mvsdk.CameraImageProcess(self.hCamera, pRawData, self.frame_buffer, FrameHead) + # Process frame + mvsdk.CameraImageProcess(self.hCamera, pRawData, self.frame_buffer, FrameHead) - # Convert to OpenCV format - frame = self._convert_frame_to_opencv(FrameHead) + # Convert to OpenCV format + frame = self._convert_frame_to_opencv(FrameHead) + + # Release buffer + mvsdk.CameraReleaseImageBuffer(self.hCamera, pRawData) # Write frame to video if frame is not None and self.video_writer: self.video_writer.write(frame) self.frame_count += 1 - # Release buffer - mvsdk.CameraReleaseImageBuffer(self.hCamera, pRawData) - # Control frame rate (skip sleep if target_fps is 0 for maximum speed) if self.camera_config.target_fps > 0: time.sleep(1.0 / self.camera_config.target_fps) @@ -618,13 +687,16 @@ class CameraRecorder: self.logger.error(f"Error in recording loop: {e}") break - self.logger.info("Recording loop ended") + self.logger.info("Recording loop ended - stop event was set") except Exception as e: self.logger.error(f"Fatal error in recording loop: {e}") publish_recording_error(self.camera_config.name, str(e)) finally: + self.logger.info("Cleaning up recording resources...") self._cleanup_recording() + # Note: Don't set self.recording = False here - let stop_recording() handle it + # to avoid race conditions where stop_recording thinks recording already stopped def _initialize_video_writer(self) -> bool: """Initialize OpenCV video writer""" @@ -709,8 +781,10 @@ class CameraRecorder: if self.video_writer: self.video_writer.release() self.video_writer = None + self.logger.debug("Video writer released") - self.recording = False + # Note: Don't set self.recording = False here - let stop_recording() control the flag + # to maintain proper state synchronization except Exception as e: self.logger.error(f"Error during recording cleanup: {e}") diff --git a/camera-management-api/usda_vision_system/camera/streamer.py b/camera-management-api/usda_vision_system/camera/streamer.py index d2dc6d3..7cf09a1 100644 --- a/camera-management-api/usda_vision_system/camera/streamer.py +++ b/camera-management-api/usda_vision_system/camera/streamer.py @@ -76,8 +76,9 @@ class CameraStreamer: self._rtsp_thread: Optional[threading.Thread] = None self._stop_streaming_event = threading.Event() self._stop_rtsp_event = threading.Event() - self._frame_queue = queue.Queue(maxsize=5) # Buffer for latest frames + self._frame_queue = queue.Queue(maxsize=5) # Buffer for latest frames (for MJPEG streaming) self._rtsp_frame_queue = queue.Queue(maxsize=10) # Buffer for RTSP frames (larger buffer for smoother streaming) + self._recording_frame_queue = queue.Queue(maxsize=30) # Buffer for recording frames (shared with recorder) self._lock = threading.RLock() # Stream settings (optimized for preview) @@ -383,6 +384,21 @@ class CameraStreamer: self._rtsp_frame_queue.put_nowait(frame) except queue.Empty: pass + + # Add frame to recording queue (for concurrent recording) + # Always populate this queue - recorder will consume if needed + try: + # Put frame into recording queue (recorder can consume without affecting MJPEG/RTSP) + frame_copy = frame.copy() if hasattr(frame, 'copy') else frame + self._recording_frame_queue.put_nowait(frame_copy) + except queue.Full: + # Recording queue full - remove oldest and add new + try: + self._recording_frame_queue.get_nowait() + frame_copy = frame.copy() if hasattr(frame, 'copy') else frame + self._recording_frame_queue.put_nowait(frame_copy) + except queue.Empty: + pass # Release buffer mvsdk.CameraReleaseImageBuffer(self.hCamera, pRawData)