""" Camera Monitor for the USDA Vision Camera System. This module monitors camera status and availability at regular intervals. """ import sys import os import threading import time import logging from typing import Dict, List, Optional, Any # Add camera SDK to path sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..", "camera_sdk")) import mvsdk from ..core.config import Config from ..core.state_manager import StateManager, CameraStatus from ..core.events import EventSystem, publish_camera_status_changed from .sdk_config import ensure_sdk_initialized from .utils import suppress_camera_errors from .constants import CAMERA_TEST_CAPTURE_TIMEOUT class CameraMonitor: """Monitors camera status and availability""" def __init__(self, config: Config, state_manager: StateManager, event_system: EventSystem, camera_manager=None): self.config = config self.state_manager = state_manager self.event_system = event_system self.camera_manager = camera_manager # Reference to camera manager self.logger = logging.getLogger(__name__) # Monitoring settings self.check_interval = config.system.camera_check_interval_seconds # Threading self.running = False self._thread: Optional[threading.Thread] = None self._stop_event = threading.Event() # Status tracking self.last_check_time: Optional[float] = None self.check_count = 0 self.error_count = 0 def start(self) -> bool: """Start camera monitoring""" if self.running: self.logger.warning("Camera monitor is already running") return True self.logger.info(f"Starting camera monitor (check interval: {self.check_interval}s)") self.running = True self._stop_event.clear() # Start monitoring thread self._thread = threading.Thread(target=self._monitoring_loop, daemon=True) self._thread.start() return True def stop(self) -> None: """Stop camera monitoring""" if not self.running: return self.logger.info("Stopping camera monitor...") self.running = False self._stop_event.set() if self._thread and self._thread.is_alive(): self._thread.join(timeout=5) self.logger.info("Camera monitor stopped") def _monitoring_loop(self) -> None: """Main monitoring loop""" self.logger.info("Camera monitoring loop started") while self.running and not self._stop_event.is_set(): try: self.last_check_time = time.time() self.check_count += 1 # Check all configured cameras self._check_all_cameras() # Wait for next check if self._stop_event.wait(self.check_interval): break except Exception as e: self.error_count += 1 self.logger.error(f"Error in camera monitoring loop: {e}") # Wait a bit before retrying if self._stop_event.wait(min(self.check_interval, 10)): break self.logger.info("Camera monitoring loop ended") def _check_all_cameras(self) -> None: """Check status of all configured cameras""" for camera_config in self.config.cameras: if not camera_config.enabled: continue try: self._check_camera_status(camera_config.name) except Exception as e: self.logger.error(f"Error checking camera {camera_config.name}: {e}") def _check_camera_status(self, camera_name: str) -> None: """Check status of a specific camera""" try: # Get current status from state manager current_info = self.state_manager.get_camera_status(camera_name) # Perform actual camera check status, details, device_info = self._perform_camera_check(camera_name) # Update state if changed old_status = current_info.status.value if current_info else "unknown" if old_status != status: self.state_manager.update_camera_status(name=camera_name, status=status, error=details if status == "error" else None, device_info=device_info) # Publish status change event publish_camera_status_changed(camera_name=camera_name, status=status, details=details) self.logger.info(f"Camera {camera_name} status changed: {old_status} -> {status}") except Exception as e: self.logger.error(f"Error checking camera {camera_name}: {e}") # Update to error state self.state_manager.update_camera_status(name=camera_name, status="error", error=str(e)) def _perform_camera_check(self, camera_name: str) -> tuple[str, str, Optional[Dict[str, Any]]]: """Perform actual camera availability check""" try: # Get camera device info from camera manager if not self.camera_manager: return "error", "Camera manager not available", None device_info = self.camera_manager._find_camera_device(camera_name) if not device_info: return "disconnected", "Camera device not found", None # ALWAYS check our streamer state first, before doing any camera availability tests streamer = self.camera_manager.camera_streamers.get(camera_name) self.logger.info(f"Checking streamer for {camera_name}: {streamer}") if streamer and streamer.is_streaming(): self.logger.info(f"Camera {camera_name} is streaming - setting status to streaming") return "streaming", "Camera streaming (live preview)", self._get_device_info_dict(device_info) # Also check if our recorder is active recorder = self.camera_manager.camera_recorders.get(camera_name) if recorder and recorder.hCamera and recorder.recording: self.logger.info(f"Camera {camera_name} is recording - setting status to available") return "available", "Camera recording (in use by system)", self._get_device_info_dict(device_info) # Check if camera is already opened by another process try: self.logger.info(f"Checking if camera {camera_name} is opened...") is_opened = mvsdk.CameraIsOpened(device_info) self.logger.info(f"CameraIsOpened result for {camera_name}: {is_opened}") if is_opened: self.logger.info(f"Camera {camera_name} is opened by another process - setting status to busy") return "busy", "Camera opened by another process", self._get_device_info_dict(device_info) else: self.logger.info(f"Camera {camera_name} is not opened, will try initialization") # Camera is not opened, so we can try to initialize it pass except Exception as e: self.logger.warning(f"CameraIsOpened failed for {camera_name}: {e}") # If we can't determine the status, try to initialize to see what happens self.logger.info(f"CameraIsOpened failed for {camera_name}, will try initialization: {e}") # Try to initialize camera briefly to test availability try: # Ensure SDK is initialized ensure_sdk_initialized() self.logger.info(f"Attempting to initialize camera {camera_name} for availability test...") # Check if camera is already in use by recorder or streamer before trying to initialize recorder = self.camera_manager.camera_recorders.get(camera_name) if self.camera_manager else None streamer = self.camera_manager.camera_streamers.get(camera_name) if self.camera_manager else None camera_in_use = False if recorder and recorder.hCamera: try: # Check if recorder has camera open if mvsdk.CameraIsOpened(recorder.hCamera): camera_in_use = True self.logger.info(f"Camera {camera_name} is already in use by recorder (handle: {recorder.hCamera})") except: pass if not camera_in_use and streamer and streamer.hCamera: try: # Check if streamer has camera open if mvsdk.CameraIsOpened(streamer.hCamera): camera_in_use = True self.logger.info(f"Camera {camera_name} is already in use by streamer (handle: {streamer.hCamera})") except: pass # If camera is already in use, mark as available (since it's working, just occupied) if camera_in_use: self.logger.info(f"Camera {camera_name} is in use by system components - marking as available") return "available", "Camera is in use by system", self._get_device_info_dict(device_info) # Suppress output to avoid MVCAMAPI error messages during camera testing hCamera = None try: with suppress_camera_errors(): hCamera = mvsdk.CameraInit(device_info, -1, -1) self.logger.info(f"Camera {camera_name} initialized successfully, starting test capture...") except mvsdk.CameraException as init_e: error_msg = f"CameraInit failed for {camera_name}: {init_e.message} (error_code: {init_e.error_code})" # Special handling for error code 32774 (camera already in use) if init_e.error_code == 32774: error_msg += " - Camera may be in use by another process or resource conflict. " error_msg += "This camera may still be functional if accessed through existing recorder/streamer." self.logger.warning(error_msg) # Mark as "available" but with warning, since it might be usable through existing connections # The UI can show a warning but camera operations might still work try: device_info_dict = self._get_device_info_dict(device_info) device_info_dict["init_error"] = "Camera appears in use (error 32774) but may be accessible" device_info_dict["init_error_code"] = 32774 except Exception as dev_info_e: self.logger.warning(f"Failed to get device info dict after CameraInit failure: {dev_info_e}") device_info_dict = None return "available", "Camera may be in use (error 32774) - check if recorder/streamer is active", device_info_dict else: self.logger.warning(error_msg) # Get device info dict before returning - wrap in try/except in case device_info is corrupted try: device_info_dict = self._get_device_info_dict(device_info) except Exception as dev_info_e: self.logger.warning(f"Failed to get device info dict after CameraInit failure: {dev_info_e}") device_info_dict = None return "error", f"Camera initialization failed: {init_e.message}", device_info_dict # Quick test - try to get one frame try: mvsdk.CameraSetTriggerMode(hCamera, 0) mvsdk.CameraPlay(hCamera) self.logger.info(f"Camera {camera_name} test: Attempting to capture frame with {CAMERA_TEST_CAPTURE_TIMEOUT}ms timeout...") # Try to capture with short timeout pRawData, FrameHead = mvsdk.CameraGetImageBuffer(hCamera, CAMERA_TEST_CAPTURE_TIMEOUT) mvsdk.CameraReleaseImageBuffer(hCamera, pRawData) # Success - camera is available mvsdk.CameraUnInit(hCamera) self.logger.info(f"Camera {camera_name} test successful - camera is available") return "available", "Camera test successful", self._get_device_info_dict(device_info) except mvsdk.CameraException as capture_e: if hCamera: mvsdk.CameraUnInit(hCamera) self.logger.warning(f"Camera {camera_name} capture test failed: {capture_e.message} (error_code: {capture_e.error_code})") if capture_e.error_code == mvsdk.CAMERA_STATUS_TIME_OUT: return "available", "Camera available but slow response", self._get_device_info_dict(device_info) else: return "error", f"Camera test failed: {capture_e.message}", self._get_device_info_dict(device_info) except mvsdk.CameraException as e: self.logger.error(f"CameraException during initialization test for {camera_name}: {e.message} (error_code: {e.error_code})") return "error", f"Camera initialization failed: {e.message}", self._get_device_info_dict(device_info) if device_info else None except Exception as e: self.logger.error(f"Unexpected exception during camera check for {camera_name}: {e}", exc_info=True) return "error", f"Camera check failed: {str(e)}", None def _get_device_info_dict(self, device_info) -> Dict[str, Any]: """Convert device info to dictionary""" if device_info is None: return {"error": "device_info is None"} try: # Safely access device info methods - wrap each in try/except to prevent segfaults friendly_name = "Unknown" port_type = "Unknown" serial_number = "Unknown" try: friendly_name = device_info.GetFriendlyName() except Exception as e: self.logger.warning(f"Failed to get friendly name: {e}") try: port_type = device_info.GetPortType() except Exception as e: self.logger.warning(f"Failed to get port type: {e}") try: serial_number = getattr(device_info, "acSn", "Unknown") except Exception as e: self.logger.warning(f"Failed to get serial number: {e}") return { "friendly_name": friendly_name, "port_type": port_type, "serial_number": serial_number, "last_checked": time.time() } except Exception as e: self.logger.error(f"Error getting device info: {e}", exc_info=True) return {"error": str(e)} def check_camera_now(self, camera_name: str) -> Dict[str, Any]: """Manually check a specific camera status""" try: status, details, device_info = self._perform_camera_check(camera_name) # Update state self.state_manager.update_camera_status(name=camera_name, status=status, error=details if status == "error" else None, device_info=device_info) return {"camera_name": camera_name, "status": status, "details": details, "device_info": device_info, "check_time": time.time()} except Exception as e: error_msg = f"Manual camera check failed: {e}" self.logger.error(error_msg) return {"camera_name": camera_name, "status": "error", "details": error_msg, "device_info": None, "check_time": time.time()} def check_all_cameras_now(self) -> Dict[str, Dict[str, Any]]: """Manually check all cameras""" results = {} for camera_config in self.config.cameras: if camera_config.enabled: results[camera_config.name] = self.check_camera_now(camera_config.name) return results def get_monitoring_stats(self) -> Dict[str, Any]: """Get monitoring statistics""" return {"running": self.running, "check_interval_seconds": self.check_interval, "total_checks": self.check_count, "error_count": self.error_count, "last_check_time": self.last_check_time, "success_rate": (self.check_count - self.error_count) / max(self.check_count, 1) * 100} def is_running(self) -> bool: """Check if monitor is running""" return self.running