Massive update - API and other modules added

This commit is contained in:
Alireza Vaezi
2025-07-25 21:39:07 -04:00
parent 172f46d44d
commit f6d6ba612e
70 changed files with 7276 additions and 15 deletions

View File

@@ -0,0 +1,12 @@
"""
Camera module for the USDA Vision Camera System.
This module handles GigE camera discovery, management, monitoring, and recording
using the python demo library (mvsdk).
"""
from .manager import CameraManager
from .recorder import CameraRecorder
from .monitor import CameraMonitor
__all__ = ["CameraManager", "CameraRecorder", "CameraMonitor"]

View File

@@ -0,0 +1,320 @@
"""
Camera Manager for the USDA Vision Camera System.
This module manages GigE camera discovery, initialization, and coordination
with the recording system based on machine state changes.
"""
import sys
import os
import threading
import logging
from typing import Dict, List, Optional, Tuple, Any
from datetime import datetime
# Add python demo to path
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python demo'))
import mvsdk
from ..core.config import Config, CameraConfig
from ..core.state_manager import StateManager, CameraStatus
from ..core.events import EventSystem, EventType, Event, publish_camera_status_changed
from ..core.timezone_utils import format_filename_timestamp
from .recorder import CameraRecorder
from .monitor import CameraMonitor
class CameraManager:
"""Manages all cameras in the system"""
def __init__(self, config: Config, state_manager: StateManager, event_system: EventSystem):
self.config = config
self.state_manager = state_manager
self.event_system = event_system
self.logger = logging.getLogger(__name__)
# Camera management
self.available_cameras: List[Any] = [] # mvsdk camera device info
self.camera_recorders: Dict[str, CameraRecorder] = {} # camera_name -> recorder
self.camera_monitor: Optional[CameraMonitor] = None
# Threading
self._lock = threading.RLock()
self.running = False
# Subscribe to machine state changes
self.event_system.subscribe(EventType.MACHINE_STATE_CHANGED, self._on_machine_state_changed)
# Initialize camera discovery
self._discover_cameras()
# Create camera monitor
self.camera_monitor = CameraMonitor(
config=config,
state_manager=state_manager,
event_system=event_system,
camera_manager=self
)
def start(self) -> bool:
"""Start the camera manager"""
if self.running:
self.logger.warning("Camera manager is already running")
return True
self.logger.info("Starting camera manager...")
self.running = True
# Start camera monitor
if self.camera_monitor:
self.camera_monitor.start()
# Initialize camera recorders
self._initialize_recorders()
self.logger.info("Camera manager started successfully")
return True
def stop(self) -> None:
"""Stop the camera manager"""
if not self.running:
return
self.logger.info("Stopping camera manager...")
self.running = False
# Stop camera monitor
if self.camera_monitor:
self.camera_monitor.stop()
# Stop all active recordings
with self._lock:
for recorder in self.camera_recorders.values():
if recorder.is_recording():
recorder.stop_recording()
recorder.cleanup()
self.logger.info("Camera manager stopped")
def _discover_cameras(self) -> None:
"""Discover available GigE cameras"""
try:
self.logger.info("Discovering GigE cameras...")
# Enumerate cameras using mvsdk
device_list = mvsdk.CameraEnumerateDevice()
self.available_cameras = device_list
self.logger.info(f"Found {len(device_list)} camera(s)")
for i, dev_info in enumerate(device_list):
try:
name = dev_info.GetFriendlyName()
port_type = dev_info.GetPortType()
serial = getattr(dev_info, 'acSn', 'Unknown')
self.logger.info(f" Camera {i}: {name} ({port_type}) - Serial: {serial}")
# Update state manager with discovered camera
camera_name = f"camera{i+1}" # Default naming
self.state_manager.update_camera_status(
name=camera_name,
status="available",
device_info={
"friendly_name": name,
"port_type": port_type,
"serial_number": serial,
"device_index": i
}
)
except Exception as e:
self.logger.error(f"Error processing camera {i}: {e}")
except Exception as e:
self.logger.error(f"Error discovering cameras: {e}")
self.available_cameras = []
def _initialize_recorders(self) -> None:
"""Initialize camera recorders for configured cameras"""
with self._lock:
for camera_config in self.config.cameras:
if not camera_config.enabled:
continue
try:
# Find matching physical camera
device_info = self._find_camera_device(camera_config.name)
if device_info is None:
self.logger.warning(f"No physical camera found for configured camera: {camera_config.name}")
continue
# Create recorder
recorder = CameraRecorder(
camera_config=camera_config,
device_info=device_info,
state_manager=self.state_manager,
event_system=self.event_system
)
self.camera_recorders[camera_config.name] = recorder
self.logger.info(f"Initialized recorder for camera: {camera_config.name}")
except Exception as e:
self.logger.error(f"Error initializing recorder for {camera_config.name}: {e}")
def _find_camera_device(self, camera_name: str) -> Optional[Any]:
"""Find physical camera device for a configured camera"""
# For now, use simple mapping: camera1 -> device 0, camera2 -> device 1, etc.
# This could be enhanced to use serial numbers or other identifiers
camera_index_map = {
"camera1": 0,
"camera2": 1,
"camera3": 2,
"camera4": 3
}
device_index = camera_index_map.get(camera_name)
if device_index is not None and device_index < len(self.available_cameras):
return self.available_cameras[device_index]
return None
def _on_machine_state_changed(self, event: Event) -> None:
"""Handle machine state change events"""
try:
machine_name = event.data.get("machine_name")
new_state = event.data.get("state")
if not machine_name or not new_state:
return
self.logger.info(f"Handling machine state change: {machine_name} -> {new_state}")
# Find camera associated with this machine
camera_config = None
for config in self.config.cameras:
if config.machine_topic == machine_name:
camera_config = config
break
if not camera_config:
self.logger.warning(f"No camera configured for machine: {machine_name}")
return
# Get the recorder for this camera
recorder = self.camera_recorders.get(camera_config.name)
if not recorder:
self.logger.warning(f"No recorder found for camera: {camera_config.name}")
return
# Handle state change
if new_state == "on":
self._start_recording(camera_config.name, recorder)
elif new_state in ["off", "error"]:
self._stop_recording(camera_config.name, recorder)
except Exception as e:
self.logger.error(f"Error handling machine state change: {e}")
def _start_recording(self, camera_name: str, recorder: CameraRecorder) -> None:
"""Start recording for a camera"""
try:
if recorder.is_recording():
self.logger.info(f"Camera {camera_name} is already recording")
return
# Generate filename with Atlanta timezone timestamp
timestamp = format_filename_timestamp()
filename = f"{camera_name}_recording_{timestamp}.avi"
# Start recording
success = recorder.start_recording(filename)
if success:
self.logger.info(f"Started recording for camera {camera_name}: {filename}")
else:
self.logger.error(f"Failed to start recording for camera {camera_name}")
except Exception as e:
self.logger.error(f"Error starting recording for {camera_name}: {e}")
def _stop_recording(self, camera_name: str, recorder: CameraRecorder) -> None:
"""Stop recording for a camera"""
try:
if not recorder.is_recording():
self.logger.info(f"Camera {camera_name} is not recording")
return
# Stop recording
success = recorder.stop_recording()
if success:
self.logger.info(f"Stopped recording for camera {camera_name}")
else:
self.logger.error(f"Failed to stop recording for camera {camera_name}")
except Exception as e:
self.logger.error(f"Error stopping recording for {camera_name}: {e}")
def get_camera_status(self, camera_name: str) -> Optional[Dict[str, Any]]:
"""Get status of a specific camera"""
recorder = self.camera_recorders.get(camera_name)
if not recorder:
return None
return recorder.get_status()
def get_all_camera_status(self) -> Dict[str, Dict[str, Any]]:
"""Get status of all cameras"""
status = {}
with self._lock:
for camera_name, recorder in self.camera_recorders.items():
status[camera_name] = recorder.get_status()
return status
def manual_start_recording(self, camera_name: str, filename: Optional[str] = None) -> bool:
"""Manually start recording for a camera"""
recorder = self.camera_recorders.get(camera_name)
if not recorder:
self.logger.error(f"Camera not found: {camera_name}")
return False
if not filename:
timestamp = format_filename_timestamp()
filename = f"{camera_name}_manual_{timestamp}.avi"
return recorder.start_recording(filename)
def manual_stop_recording(self, camera_name: str) -> bool:
"""Manually stop recording for a camera"""
recorder = self.camera_recorders.get(camera_name)
if not recorder:
self.logger.error(f"Camera not found: {camera_name}")
return False
return recorder.stop_recording()
def get_available_cameras(self) -> List[Dict[str, Any]]:
"""Get list of available physical cameras"""
cameras = []
for i, dev_info in enumerate(self.available_cameras):
try:
cameras.append({
"index": i,
"name": dev_info.GetFriendlyName(),
"port_type": dev_info.GetPortType(),
"serial_number": getattr(dev_info, 'acSn', 'Unknown')
})
except Exception as e:
self.logger.error(f"Error getting info for camera {i}: {e}")
return cameras
def refresh_camera_discovery(self) -> int:
"""Refresh camera discovery and return number of cameras found"""
self._discover_cameras()
return len(self.available_cameras)
def is_running(self) -> bool:
"""Check if camera manager is running"""
return self.running

View File

@@ -0,0 +1,267 @@
"""
Camera Monitor for the USDA Vision Camera System.
This module monitors camera status and availability at regular intervals.
"""
import sys
import os
import threading
import time
import logging
from typing import Dict, List, Optional, Any
# Add python demo to path
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python demo'))
import mvsdk
from ..core.config import Config
from ..core.state_manager import StateManager, CameraStatus
from ..core.events import EventSystem, publish_camera_status_changed
class CameraMonitor:
"""Monitors camera status and availability"""
def __init__(self, config: Config, state_manager: StateManager, event_system: EventSystem, camera_manager=None):
self.config = config
self.state_manager = state_manager
self.event_system = event_system
self.camera_manager = camera_manager # Reference to camera manager
self.logger = logging.getLogger(__name__)
# Monitoring settings
self.check_interval = config.system.camera_check_interval_seconds
# Threading
self.running = False
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
# Status tracking
self.last_check_time: Optional[float] = None
self.check_count = 0
self.error_count = 0
def start(self) -> bool:
"""Start camera monitoring"""
if self.running:
self.logger.warning("Camera monitor is already running")
return True
self.logger.info(f"Starting camera monitor (check interval: {self.check_interval}s)")
self.running = True
self._stop_event.clear()
# Start monitoring thread
self._thread = threading.Thread(target=self._monitoring_loop, daemon=True)
self._thread.start()
return True
def stop(self) -> None:
"""Stop camera monitoring"""
if not self.running:
return
self.logger.info("Stopping camera monitor...")
self.running = False
self._stop_event.set()
if self._thread and self._thread.is_alive():
self._thread.join(timeout=5)
self.logger.info("Camera monitor stopped")
def _monitoring_loop(self) -> None:
"""Main monitoring loop"""
self.logger.info("Camera monitoring loop started")
while self.running and not self._stop_event.is_set():
try:
self.last_check_time = time.time()
self.check_count += 1
# Check all configured cameras
self._check_all_cameras()
# Wait for next check
if self._stop_event.wait(self.check_interval):
break
except Exception as e:
self.error_count += 1
self.logger.error(f"Error in camera monitoring loop: {e}")
# Wait a bit before retrying
if self._stop_event.wait(min(self.check_interval, 10)):
break
self.logger.info("Camera monitoring loop ended")
def _check_all_cameras(self) -> None:
"""Check status of all configured cameras"""
for camera_config in self.config.cameras:
if not camera_config.enabled:
continue
try:
self._check_camera_status(camera_config.name)
except Exception as e:
self.logger.error(f"Error checking camera {camera_config.name}: {e}")
def _check_camera_status(self, camera_name: str) -> None:
"""Check status of a specific camera"""
try:
# Get current status from state manager
current_info = self.state_manager.get_camera_status(camera_name)
# Perform actual camera check
status, details, device_info = self._perform_camera_check(camera_name)
# Update state if changed
old_status = current_info.status.value if current_info else "unknown"
if old_status != status:
self.state_manager.update_camera_status(
name=camera_name,
status=status,
error=details if status == "error" else None,
device_info=device_info
)
# Publish status change event
publish_camera_status_changed(
camera_name=camera_name,
status=status,
details=details
)
self.logger.info(f"Camera {camera_name} status changed: {old_status} -> {status}")
except Exception as e:
self.logger.error(f"Error checking camera {camera_name}: {e}")
# Update to error state
self.state_manager.update_camera_status(
name=camera_name,
status="error",
error=str(e)
)
def _perform_camera_check(self, camera_name: str) -> tuple[str, str, Optional[Dict[str, Any]]]:
"""Perform actual camera availability check"""
try:
# Get camera device info from camera manager
if not self.camera_manager:
return "error", "Camera manager not available", None
device_info = self.camera_manager._find_camera_device(camera_name)
if not device_info:
return "disconnected", "Camera device not found", None
# Check if camera is already opened by another process
if mvsdk.CameraIsOpened(device_info):
# Camera is opened - check if it's our recorder
recorder = self.camera_manager.camera_recorders.get(camera_name)
if recorder and recorder.hCamera:
return "available", "Camera initialized and ready", self._get_device_info_dict(device_info)
else:
return "busy", "Camera opened by another process", self._get_device_info_dict(device_info)
# Try to initialize camera briefly to test availability
try:
hCamera = mvsdk.CameraInit(device_info, -1, -1)
# Quick test - try to get one frame
try:
mvsdk.CameraSetTriggerMode(hCamera, 0)
mvsdk.CameraPlay(hCamera)
# Try to capture with short timeout
pRawData, FrameHead = mvsdk.CameraGetImageBuffer(hCamera, 500)
mvsdk.CameraReleaseImageBuffer(hCamera, pRawData)
# Success - camera is available
mvsdk.CameraUnInit(hCamera)
return "available", "Camera test successful", self._get_device_info_dict(device_info)
except mvsdk.CameraException as e:
mvsdk.CameraUnInit(hCamera)
if e.error_code == mvsdk.CAMERA_STATUS_TIME_OUT:
return "available", "Camera available but slow response", self._get_device_info_dict(device_info)
else:
return "error", f"Camera test failed: {e.message}", self._get_device_info_dict(device_info)
except mvsdk.CameraException as e:
return "error", f"Camera initialization failed: {e.message}", self._get_device_info_dict(device_info)
except Exception as e:
return "error", f"Camera check failed: {str(e)}", None
def _get_device_info_dict(self, device_info) -> Dict[str, Any]:
"""Convert device info to dictionary"""
try:
return {
"friendly_name": device_info.GetFriendlyName(),
"port_type": device_info.GetPortType(),
"serial_number": getattr(device_info, 'acSn', 'Unknown'),
"last_checked": time.time()
}
except Exception as e:
self.logger.error(f"Error getting device info: {e}")
return {"error": str(e)}
def check_camera_now(self, camera_name: str) -> Dict[str, Any]:
"""Manually check a specific camera status"""
try:
status, details, device_info = self._perform_camera_check(camera_name)
# Update state
self.state_manager.update_camera_status(
name=camera_name,
status=status,
error=details if status == "error" else None,
device_info=device_info
)
return {
"camera_name": camera_name,
"status": status,
"details": details,
"device_info": device_info,
"check_time": time.time()
}
except Exception as e:
error_msg = f"Manual camera check failed: {e}"
self.logger.error(error_msg)
return {
"camera_name": camera_name,
"status": "error",
"details": error_msg,
"device_info": None,
"check_time": time.time()
}
def check_all_cameras_now(self) -> Dict[str, Dict[str, Any]]:
"""Manually check all cameras"""
results = {}
for camera_config in self.config.cameras:
if camera_config.enabled:
results[camera_config.name] = self.check_camera_now(camera_config.name)
return results
def get_monitoring_stats(self) -> Dict[str, Any]:
"""Get monitoring statistics"""
return {
"running": self.running,
"check_interval_seconds": self.check_interval,
"total_checks": self.check_count,
"error_count": self.error_count,
"last_check_time": self.last_check_time,
"success_rate": (self.check_count - self.error_count) / max(self.check_count, 1) * 100
}
def is_running(self) -> bool:
"""Check if monitor is running"""
return self.running

View File

@@ -0,0 +1,372 @@
"""
Camera Recorder for the USDA Vision Camera System.
This module handles video recording from GigE cameras using the python demo library (mvsdk).
"""
import sys
import os
import threading
import time
import logging
import cv2
import numpy as np
from typing import Optional, Dict, Any
from datetime import datetime
from pathlib import Path
# Add python demo to path
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python demo'))
import mvsdk
from ..core.config import CameraConfig
from ..core.state_manager import StateManager
from ..core.events import EventSystem, publish_recording_started, publish_recording_stopped, publish_recording_error
from ..core.timezone_utils import now_atlanta, format_filename_timestamp
class CameraRecorder:
"""Handles video recording for a single camera"""
def __init__(self, camera_config: CameraConfig, device_info: Any, state_manager: StateManager, event_system: EventSystem):
self.camera_config = camera_config
self.device_info = device_info
self.state_manager = state_manager
self.event_system = event_system
self.logger = logging.getLogger(f"{__name__}.{camera_config.name}")
# Camera handle and properties
self.hCamera: Optional[int] = None
self.cap = None
self.monoCamera = False
self.frame_buffer = None
self.frame_buffer_size = 0
# Recording state
self.recording = False
self.video_writer: Optional[cv2.VideoWriter] = None
self.output_filename: Optional[str] = None
self.frame_count = 0
self.start_time: Optional[datetime] = None
# Threading
self._recording_thread: Optional[threading.Thread] = None
self._stop_recording_event = threading.Event()
self._lock = threading.RLock()
# Initialize camera
self._initialize_camera()
def _initialize_camera(self) -> bool:
"""Initialize the camera with configured settings"""
try:
self.logger.info(f"Initializing camera: {self.camera_config.name}")
# Initialize camera
self.hCamera = mvsdk.CameraInit(self.device_info, -1, -1)
self.logger.info("Camera initialized successfully")
# Get camera capabilities
self.cap = mvsdk.CameraGetCapability(self.hCamera)
self.monoCamera = self.cap.sIspCapacity.bMonoSensor != 0
self.logger.info(f"Camera type: {'Monochrome' if self.monoCamera else 'Color'}")
# Set output format
if self.monoCamera:
mvsdk.CameraSetIspOutFormat(self.hCamera, mvsdk.CAMERA_MEDIA_TYPE_MONO8)
else:
mvsdk.CameraSetIspOutFormat(self.hCamera, mvsdk.CAMERA_MEDIA_TYPE_BGR8)
# Configure camera settings
self._configure_camera_settings()
# Allocate frame buffer
self.frame_buffer_size = (self.cap.sResolutionRange.iWidthMax *
self.cap.sResolutionRange.iHeightMax *
(1 if self.monoCamera else 3))
self.frame_buffer = mvsdk.CameraAlignMalloc(self.frame_buffer_size, 16)
# Start camera
mvsdk.CameraPlay(self.hCamera)
self.logger.info("Camera started successfully")
return True
except mvsdk.CameraException as e:
self.logger.error(f"Camera initialization failed({e.error_code}): {e.message}")
return False
except Exception as e:
self.logger.error(f"Unexpected error during camera initialization: {e}")
return False
def _configure_camera_settings(self) -> None:
"""Configure camera settings from config"""
try:
# Set trigger mode (continuous acquisition)
mvsdk.CameraSetTriggerMode(self.hCamera, 0)
# Set manual exposure
mvsdk.CameraSetAeState(self.hCamera, 0) # Disable auto exposure
exposure_us = int(self.camera_config.exposure_ms * 1000) # Convert ms to microseconds
mvsdk.CameraSetExposureTime(self.hCamera, exposure_us)
# Set analog gain
gain_value = int(self.camera_config.gain * 100) # Convert to camera units
mvsdk.CameraSetAnalogGain(self.hCamera, gain_value)
self.logger.info(f"Camera settings configured - Exposure: {exposure_us}μs, Gain: {gain_value}")
except Exception as e:
self.logger.warning(f"Error configuring camera settings: {e}")
def start_recording(self, filename: str) -> bool:
"""Start video recording"""
with self._lock:
if self.recording:
self.logger.warning("Already recording!")
return False
if not self.hCamera:
self.logger.error("Camera not initialized")
return False
try:
# Prepare output path
output_path = os.path.join(self.camera_config.storage_path, filename)
Path(self.camera_config.storage_path).mkdir(parents=True, exist_ok=True)
# Test camera capture before starting recording
if not self._test_camera_capture():
self.logger.error("Camera capture test failed")
return False
# Initialize recording state
self.output_filename = output_path
self.frame_count = 0
self.start_time = now_atlanta() # Use Atlanta timezone
self._stop_recording_event.clear()
# Start recording thread
self._recording_thread = threading.Thread(target=self._recording_loop, daemon=True)
self._recording_thread.start()
# Update state
self.recording = True
recording_id = self.state_manager.start_recording(self.camera_config.name, output_path)
# Publish event
publish_recording_started(self.camera_config.name, output_path)
self.logger.info(f"Started recording to: {output_path}")
return True
except Exception as e:
self.logger.error(f"Error starting recording: {e}")
publish_recording_error(self.camera_config.name, str(e))
return False
def _test_camera_capture(self) -> bool:
"""Test if camera can capture frames"""
try:
# Try to capture one frame
pRawData, FrameHead = mvsdk.CameraGetImageBuffer(self.hCamera, 1000) # 1 second timeout
mvsdk.CameraImageProcess(self.hCamera, pRawData, self.frame_buffer, FrameHead)
mvsdk.CameraReleaseImageBuffer(self.hCamera, pRawData)
return True
except Exception as e:
self.logger.error(f"Camera capture test failed: {e}")
return False
def stop_recording(self) -> bool:
"""Stop video recording"""
with self._lock:
if not self.recording:
self.logger.warning("Not currently recording")
return False
try:
# Signal recording thread to stop
self._stop_recording_event.set()
# Wait for recording thread to finish
if self._recording_thread and self._recording_thread.is_alive():
self._recording_thread.join(timeout=5)
# Update state
self.recording = False
# Calculate duration and file size
duration = 0
file_size = 0
if self.start_time:
duration = (now_atlanta() - self.start_time).total_seconds()
if self.output_filename and os.path.exists(self.output_filename):
file_size = os.path.getsize(self.output_filename)
# Update state manager
if self.output_filename:
self.state_manager.stop_recording(self.output_filename, file_size, self.frame_count)
# Publish event
publish_recording_stopped(
self.camera_config.name,
self.output_filename or "unknown",
duration
)
self.logger.info(f"Stopped recording - Duration: {duration:.1f}s, Frames: {self.frame_count}")
return True
except Exception as e:
self.logger.error(f"Error stopping recording: {e}")
return False
def _recording_loop(self) -> None:
"""Main recording loop running in separate thread"""
try:
# Initialize video writer
if not self._initialize_video_writer():
self.logger.error("Failed to initialize video writer")
return
self.logger.info("Recording loop started")
while not self._stop_recording_event.is_set():
try:
# Capture frame
pRawData, FrameHead = mvsdk.CameraGetImageBuffer(self.hCamera, 200) # 200ms timeout
# Process frame
mvsdk.CameraImageProcess(self.hCamera, pRawData, self.frame_buffer, FrameHead)
# Convert to OpenCV format
frame = self._convert_frame_to_opencv(FrameHead)
# Write frame to video
if frame is not None and self.video_writer:
self.video_writer.write(frame)
self.frame_count += 1
# Release buffer
mvsdk.CameraReleaseImageBuffer(self.hCamera, pRawData)
# Control frame rate
time.sleep(1.0 / self.camera_config.target_fps)
except mvsdk.CameraException as e:
if e.error_code == mvsdk.CAMERA_STATUS_TIME_OUT:
continue # Timeout is normal, continue
else:
self.logger.error(f"Camera error during recording: {e.message}")
break
except Exception as e:
self.logger.error(f"Error in recording loop: {e}")
break
self.logger.info("Recording loop ended")
except Exception as e:
self.logger.error(f"Fatal error in recording loop: {e}")
publish_recording_error(self.camera_config.name, str(e))
finally:
self._cleanup_recording()
def _initialize_video_writer(self) -> bool:
"""Initialize OpenCV video writer"""
try:
# Get frame dimensions by capturing a test frame
pRawData, FrameHead = mvsdk.CameraGetImageBuffer(self.hCamera, 1000)
mvsdk.CameraImageProcess(self.hCamera, pRawData, self.frame_buffer, FrameHead)
mvsdk.CameraReleaseImageBuffer(self.hCamera, pRawData)
# Set up video writer
fourcc = cv2.VideoWriter_fourcc(*'XVID')
frame_size = (FrameHead.iWidth, FrameHead.iHeight)
self.video_writer = cv2.VideoWriter(
self.output_filename,
fourcc,
self.camera_config.target_fps,
frame_size
)
if not self.video_writer.isOpened():
self.logger.error(f"Failed to open video writer for {self.output_filename}")
return False
self.logger.info(f"Video writer initialized - Size: {frame_size}, FPS: {self.camera_config.target_fps}")
return True
except Exception as e:
self.logger.error(f"Error initializing video writer: {e}")
return False
def _convert_frame_to_opencv(self, frame_head) -> Optional[np.ndarray]:
"""Convert camera frame to OpenCV format"""
try:
if self.monoCamera:
# Monochrome camera - convert to BGR
frame_data = np.frombuffer(self.frame_buffer, dtype=np.uint8)
frame = frame_data.reshape((frame_head.iHeight, frame_head.iWidth))
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
else:
# Color camera - already in BGR format
frame_data = np.frombuffer(self.frame_buffer, dtype=np.uint8)
frame_bgr = frame_data.reshape((frame_head.iHeight, frame_head.iWidth, 3))
return frame_bgr
except Exception as e:
self.logger.error(f"Error converting frame: {e}")
return None
def _cleanup_recording(self) -> None:
"""Clean up recording resources"""
try:
if self.video_writer:
self.video_writer.release()
self.video_writer = None
self.recording = False
except Exception as e:
self.logger.error(f"Error during recording cleanup: {e}")
def cleanup(self) -> None:
"""Clean up camera resources"""
try:
# Stop recording if active
if self.recording:
self.stop_recording()
# Clean up camera
if self.hCamera:
mvsdk.CameraUnInit(self.hCamera)
self.hCamera = None
# Free frame buffer
if self.frame_buffer:
mvsdk.CameraAlignFree(self.frame_buffer)
self.frame_buffer = None
self.logger.info("Camera resources cleaned up")
except Exception as e:
self.logger.error(f"Error during cleanup: {e}")
def is_recording(self) -> bool:
"""Check if currently recording"""
return self.recording
def get_status(self) -> Dict[str, Any]:
"""Get recorder status"""
return {
"camera_name": self.camera_config.name,
"is_recording": self.recording,
"current_file": self.output_filename,
"frame_count": self.frame_count,
"start_time": self.start_time.isoformat() if self.start_time else None,
"camera_initialized": self.hCamera is not None,
"storage_path": self.camera_config.storage_path
}