Enhance camera recording functionality with streamer integration

- Updated CameraRecorder to support frame sharing from CameraStreamer, allowing for more efficient video recording.
- Modified CameraManager to ensure streamer references are correctly assigned to recorders.
- Enhanced CameraStreamer to include a recording frame queue for concurrent access during recording.
- Improved logging for better tracking of recording states and streamer activity.
- Updated API tests to include new functionality for retrieving video lists.
This commit is contained in:
salirezav
2025-11-01 13:49:16 -04:00
parent 3cb6a8e76e
commit 43e1dace8c
4 changed files with 156 additions and 47 deletions

View File

@@ -46,6 +46,7 @@ POST {{API}}/cameras/camera1/stop-rtsp
### RTSP stream URL (use with VLC/ffplay): ### RTSP stream URL (use with VLC/ffplay):
# rtsp://{{host}}:{{rtsp_port}}/camera1 # rtsp://{{host}}:{{rtsp_port}}/camera1
### getting a list of all videos ### getting a list of all videos
GET {{MEDIA}}/videos/?page=10&limit=1 GET {{MEDIA}}/videos/?page=10&limit=1

View File

@@ -167,7 +167,9 @@ class CameraManager:
continue continue
# Create recorder (uses lazy initialization - camera will be initialized when recording starts) # Create recorder (uses lazy initialization - camera will be initialized when recording starts)
recorder = CameraRecorder(camera_config=camera_config, device_info=device_info, state_manager=self.state_manager, event_system=self.event_system) # Get corresponding streamer for frame sharing
streamer = self.camera_streamers.get(camera_config.name)
recorder = CameraRecorder(camera_config=camera_config, device_info=device_info, state_manager=self.state_manager, event_system=self.event_system, streamer=streamer)
# Add recorder to the list (camera will be initialized lazily when needed) # Add recorder to the list (camera will be initialized lazily when needed)
self.camera_recorders[camera_config.name] = recorder self.camera_recorders[camera_config.name] = recorder
@@ -291,6 +293,13 @@ class CameraManager:
self.logger.error(f"Camera not found: {camera_name}") self.logger.error(f"Camera not found: {camera_name}")
return False return False
# Ensure streamer reference is set (in case recorder was created before streamer)
if not recorder.streamer:
streamer = self.camera_streamers.get(camera_name)
if streamer:
recorder.streamer = streamer
self.logger.debug(f"Updated streamer reference for recorder {camera_name}")
# Update camera settings if provided # Update camera settings if provided
if exposure_ms is not None or gain is not None or fps is not None: if exposure_ms is not None or gain is not None or fps is not None:
settings_updated = recorder.update_camera_settings(exposure_ms=exposure_ms, gain=gain, target_fps=fps) settings_updated = recorder.update_camera_settings(exposure_ms=exposure_ms, gain=gain, target_fps=fps)
@@ -444,7 +453,9 @@ class CameraManager:
return False return False
# Create new recorder (uses lazy initialization) # Create new recorder (uses lazy initialization)
recorder = CameraRecorder(camera_config=camera_config, device_info=device_info, state_manager=self.state_manager, event_system=self.event_system) # Get corresponding streamer for frame sharing
streamer = self.camera_streamers.get(camera_config.name)
recorder = CameraRecorder(camera_config=camera_config, device_info=device_info, state_manager=self.state_manager, event_system=self.event_system, streamer=streamer)
# Success - add to recorders (camera will be initialized lazily when needed) # Success - add to recorders (camera will be initialized lazily when needed)
self.camera_recorders[camera_name] = recorder self.camera_recorders[camera_name] = recorder
@@ -481,6 +492,13 @@ class CameraManager:
# Add streamer to the list # Add streamer to the list
self.camera_streamers[camera_config.name] = streamer self.camera_streamers[camera_config.name] = streamer
# Update recorder's streamer reference if recorder already exists
recorder = self.camera_recorders.get(camera_config.name)
if recorder:
recorder.streamer = streamer
self.logger.debug(f"Updated streamer reference for recorder {camera_config.name}")
self.logger.info(f"Successfully created streamer for camera: {camera_config.name}") self.logger.info(f"Successfully created streamer for camera: {camera_config.name}")
except Exception as e: except Exception as e:

View File

@@ -54,12 +54,13 @@ def suppress_camera_errors():
class CameraRecorder: class CameraRecorder:
"""Handles video recording for a single camera""" """Handles video recording for a single camera"""
def __init__(self, camera_config: CameraConfig, device_info: Any, state_manager: StateManager, event_system: EventSystem, storage_manager=None): def __init__(self, camera_config: CameraConfig, device_info: Any, state_manager: StateManager, event_system: EventSystem, storage_manager=None, streamer=None):
self.camera_config = camera_config self.camera_config = camera_config
self.device_info = device_info self.device_info = device_info
self.state_manager = state_manager self.state_manager = state_manager
self.event_system = event_system self.event_system = event_system
self.storage_manager = storage_manager self.storage_manager = storage_manager
self.streamer = streamer # Reference to CameraStreamer for frame sharing
self.logger = logging.getLogger(f"{__name__}.{camera_config.name}") self.logger = logging.getLogger(f"{__name__}.{camera_config.name}")
# Camera handle and properties # Camera handle and properties
@@ -476,8 +477,20 @@ class CameraRecorder:
self.logger.warning("Already recording!") self.logger.warning("Already recording!")
return False return False
# Initialize camera if not already initialized (lazy initialization) # Check if streamer is active - if so, we can share frames without opening a new camera connection
if not self.hCamera: use_streamer_frames = False
if self.streamer:
if self.streamer.streaming:
self.logger.info("Streamer is active - will share frames instead of opening separate camera connection")
use_streamer_frames = True
# Don't initialize camera - we'll use frames from streamer
else:
self.logger.debug(f"Streamer exists but not streaming (streaming={self.streamer.streaming})")
else:
self.logger.debug("No streamer reference available - will use direct camera capture")
# Initialize camera only if streamer is not active
if not use_streamer_frames and not self.hCamera:
self.logger.info("Camera not initialized, initializing now...") self.logger.info("Camera not initialized, initializing now...")
if not self._initialize_camera(): if not self._initialize_camera():
self.logger.error("Failed to initialize camera for recording") self.logger.error("Failed to initialize camera for recording")
@@ -488,10 +501,11 @@ class CameraRecorder:
output_path = os.path.join(self.camera_config.storage_path, filename) output_path = os.path.join(self.camera_config.storage_path, filename)
Path(self.camera_config.storage_path).mkdir(parents=True, exist_ok=True) Path(self.camera_config.storage_path).mkdir(parents=True, exist_ok=True)
# Test camera capture before starting recording # Test camera capture before starting recording (only if not using streamer frames)
if not self._test_camera_capture(): if not use_streamer_frames:
self.logger.error("Camera capture test failed") if not self._test_camera_capture():
return False self.logger.error("Camera capture test failed")
return False
# Initialize recording state # Initialize recording state
self.output_filename = output_path self.output_filename = output_path
@@ -499,8 +513,8 @@ class CameraRecorder:
self.start_time = now_atlanta() # Use Atlanta timezone self.start_time = now_atlanta() # Use Atlanta timezone
self._stop_recording_event.clear() self._stop_recording_event.clear()
# Start recording thread # Start recording thread (pass use_streamer_frames flag)
self._recording_thread = threading.Thread(target=self._recording_loop, daemon=True) self._recording_thread = threading.Thread(target=self._recording_loop, daemon=True, kwargs={"use_streamer_frames": use_streamer_frames})
self._recording_thread.start() self._recording_thread.start()
# Update state # Update state
@@ -535,75 +549,130 @@ class CameraRecorder:
with self._lock: with self._lock:
if not self.recording: if not self.recording:
self.logger.warning("Not currently recording") self.logger.warning("Not currently recording")
# Check if thread is still alive (might be a race condition)
if self._recording_thread and self._recording_thread.is_alive():
self.logger.warning("Recording flag is False but thread is still alive - forcing stop")
self._stop_recording_event.set()
self._recording_thread.join(timeout=5)
self._cleanup_recording()
return False return False
try: # Signal recording thread to stop
# Signal recording thread to stop self.logger.info("Setting stop event for recording thread...")
self._stop_recording_event.set() self._stop_recording_event.set()
# Wait for recording thread to finish
if self._recording_thread and self._recording_thread.is_alive():
self._recording_thread.join(timeout=5)
# Save state before releasing lock
thread_to_join = self._recording_thread
output_filename = self.output_filename
start_time = self.start_time
frame_count = self.frame_count
use_streamer_frames = (self.streamer and self.streamer.streaming) if self.streamer else False
# Release lock while waiting for thread (to avoid deadlock)
# The recording loop might need the lock for cleanup
try:
if thread_to_join and thread_to_join.is_alive():
self.logger.info("Waiting for recording thread to finish (timeout: 10s)...")
thread_to_join.join(timeout=10)
if thread_to_join.is_alive():
self.logger.warning("Recording thread did not stop within timeout - may still be running")
else:
self.logger.info("Recording thread stopped successfully")
# Re-acquire lock for state updates
with self._lock:
# Update state # Update state
self.recording = False self.recording = False
# Calculate duration and file size # Calculate duration and file size
duration = 0 duration = 0
file_size = 0 file_size = 0
if self.start_time: if start_time:
duration = (now_atlanta() - self.start_time).total_seconds() duration = (now_atlanta() - start_time).total_seconds()
if self.output_filename and os.path.exists(self.output_filename): if output_filename and os.path.exists(output_filename):
file_size = os.path.getsize(self.output_filename) file_size = os.path.getsize(output_filename)
# Update state manager # Update state manager
if self.output_filename: if output_filename:
self.state_manager.stop_recording(self.output_filename, file_size, self.frame_count) self.state_manager.stop_recording(output_filename, file_size, frame_count)
# Publish event # Publish event
publish_recording_stopped(self.camera_config.name, self.output_filename or "unknown", duration) publish_recording_stopped(self.camera_config.name, output_filename or "unknown", duration)
# Clean up camera resources after recording (lazy cleanup) # Clean up camera resources after recording (only if we opened our own camera connection)
self._cleanup_camera() # Don't cleanup if we were using streamer frames (streamer owns the camera)
self.logger.info("Camera resources cleaned up after recording") if not use_streamer_frames:
self._cleanup_camera()
self.logger.info("Camera resources cleaned up after recording")
else:
self.logger.info("Skipping camera cleanup - using shared streamer connection")
self.logger.info(f"Stopped recording - Duration: {duration:.1f}s, Frames: {self.frame_count}") self.logger.info(f"Stopped recording - Duration: {duration:.1f}s, Frames: {frame_count}")
return True return True
except Exception as e: except Exception as e:
self.logger.error(f"Error stopping recording: {e}") self.logger.error(f"Error stopping recording: {e}")
return False import traceback
self.logger.error(f"Traceback: {traceback.format_exc()}")
# Ensure recording flag is cleared even on error
with self._lock:
self.recording = False
return False
def _recording_loop(self) -> None: def _recording_loop(self, use_streamer_frames: bool = False) -> None:
"""Main recording loop running in separate thread""" """Main recording loop running in separate thread
Args:
use_streamer_frames: If True, read frames from streamer's frame queue instead of capturing directly
"""
try: try:
# Initialize video writer # Initialize video writer
if not self._initialize_video_writer(): if not self._initialize_video_writer():
self.logger.error("Failed to initialize video writer") self.logger.error("Failed to initialize video writer")
return return
self.logger.info("Recording loop started") self.logger.info(f"Recording loop started (using {'streamer frames' if use_streamer_frames else 'direct capture'})")
while not self._stop_recording_event.is_set(): while not self._stop_recording_event.is_set():
try: try:
# Capture frame if use_streamer_frames and self.streamer:
pRawData, FrameHead = mvsdk.CameraGetImageBuffer(self.hCamera, 200) # 200ms timeout # Get frame from streamer's recording frame queue (shared frames, doesn't affect MJPEG/RTSP)
# Check if streamer is still active - if not, fall back to direct capture
if not self.streamer.streaming:
self.logger.warning("Streamer stopped while recording - cannot continue with shared frames")
break
# Check stop event before blocking on queue
if self._stop_recording_event.is_set():
break
try:
# Use shorter timeout to check stop event more frequently
frame = self.streamer._recording_frame_queue.get(timeout=0.1)
except Exception:
# Timeout or queue empty - check stop event and continue if not stopping
if self._stop_recording_event.is_set():
break
continue
else:
# Capture frame directly from camera
pRawData, FrameHead = mvsdk.CameraGetImageBuffer(self.hCamera, 200) # 200ms timeout
# Process frame # Process frame
mvsdk.CameraImageProcess(self.hCamera, pRawData, self.frame_buffer, FrameHead) mvsdk.CameraImageProcess(self.hCamera, pRawData, self.frame_buffer, FrameHead)
# Convert to OpenCV format # Convert to OpenCV format
frame = self._convert_frame_to_opencv(FrameHead) frame = self._convert_frame_to_opencv(FrameHead)
# Release buffer
mvsdk.CameraReleaseImageBuffer(self.hCamera, pRawData)
# Write frame to video # Write frame to video
if frame is not None and self.video_writer: if frame is not None and self.video_writer:
self.video_writer.write(frame) self.video_writer.write(frame)
self.frame_count += 1 self.frame_count += 1
# Release buffer
mvsdk.CameraReleaseImageBuffer(self.hCamera, pRawData)
# Control frame rate (skip sleep if target_fps is 0 for maximum speed) # Control frame rate (skip sleep if target_fps is 0 for maximum speed)
if self.camera_config.target_fps > 0: if self.camera_config.target_fps > 0:
time.sleep(1.0 / self.camera_config.target_fps) time.sleep(1.0 / self.camera_config.target_fps)
@@ -618,13 +687,16 @@ class CameraRecorder:
self.logger.error(f"Error in recording loop: {e}") self.logger.error(f"Error in recording loop: {e}")
break break
self.logger.info("Recording loop ended") self.logger.info("Recording loop ended - stop event was set")
except Exception as e: except Exception as e:
self.logger.error(f"Fatal error in recording loop: {e}") self.logger.error(f"Fatal error in recording loop: {e}")
publish_recording_error(self.camera_config.name, str(e)) publish_recording_error(self.camera_config.name, str(e))
finally: finally:
self.logger.info("Cleaning up recording resources...")
self._cleanup_recording() self._cleanup_recording()
# Note: Don't set self.recording = False here - let stop_recording() handle it
# to avoid race conditions where stop_recording thinks recording already stopped
def _initialize_video_writer(self) -> bool: def _initialize_video_writer(self) -> bool:
"""Initialize OpenCV video writer""" """Initialize OpenCV video writer"""
@@ -709,8 +781,10 @@ class CameraRecorder:
if self.video_writer: if self.video_writer:
self.video_writer.release() self.video_writer.release()
self.video_writer = None self.video_writer = None
self.logger.debug("Video writer released")
self.recording = False # Note: Don't set self.recording = False here - let stop_recording() control the flag
# to maintain proper state synchronization
except Exception as e: except Exception as e:
self.logger.error(f"Error during recording cleanup: {e}") self.logger.error(f"Error during recording cleanup: {e}")

View File

@@ -76,8 +76,9 @@ class CameraStreamer:
self._rtsp_thread: Optional[threading.Thread] = None self._rtsp_thread: Optional[threading.Thread] = None
self._stop_streaming_event = threading.Event() self._stop_streaming_event = threading.Event()
self._stop_rtsp_event = threading.Event() self._stop_rtsp_event = threading.Event()
self._frame_queue = queue.Queue(maxsize=5) # Buffer for latest frames self._frame_queue = queue.Queue(maxsize=5) # Buffer for latest frames (for MJPEG streaming)
self._rtsp_frame_queue = queue.Queue(maxsize=10) # Buffer for RTSP frames (larger buffer for smoother streaming) self._rtsp_frame_queue = queue.Queue(maxsize=10) # Buffer for RTSP frames (larger buffer for smoother streaming)
self._recording_frame_queue = queue.Queue(maxsize=30) # Buffer for recording frames (shared with recorder)
self._lock = threading.RLock() self._lock = threading.RLock()
# Stream settings (optimized for preview) # Stream settings (optimized for preview)
@@ -383,6 +384,21 @@ class CameraStreamer:
self._rtsp_frame_queue.put_nowait(frame) self._rtsp_frame_queue.put_nowait(frame)
except queue.Empty: except queue.Empty:
pass pass
# Add frame to recording queue (for concurrent recording)
# Always populate this queue - recorder will consume if needed
try:
# Put frame into recording queue (recorder can consume without affecting MJPEG/RTSP)
frame_copy = frame.copy() if hasattr(frame, 'copy') else frame
self._recording_frame_queue.put_nowait(frame_copy)
except queue.Full:
# Recording queue full - remove oldest and add new
try:
self._recording_frame_queue.get_nowait()
frame_copy = frame.copy() if hasattr(frame, 'copy') else frame
self._recording_frame_queue.put_nowait(frame_copy)
except queue.Empty:
pass
# Release buffer # Release buffer
mvsdk.CameraReleaseImageBuffer(self.hCamera, pRawData) mvsdk.CameraReleaseImageBuffer(self.hCamera, pRawData)