Files
usda-vision/camera-management-api/usda_vision_system/core/config.py
salirezav f6a37ca1ba Remove deprecated files and scripts to streamline the codebase
- Deleted unused API test files, RTSP diagnostic scripts, and development utility scripts to reduce clutter.
- Removed outdated database schema and modularization proposal documents to maintain focus on current architecture.
- Cleaned up configuration files and logging scripts that are no longer in use, enhancing project maintainability.
2025-11-02 10:07:59 -05:00

234 lines
8.8 KiB
Python

"""
Configuration management for the USDA Vision Camera System.
This module handles all configuration settings including MQTT broker settings,
camera configurations, storage paths, and system parameters.
"""
import os
import json
import logging
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from pathlib import Path
@dataclass
class MQTTConfig:
"""MQTT broker configuration"""
broker_host: str = "192.168.1.110"
broker_port: int = 1883
username: Optional[str] = None
password: Optional[str] = None
topics: Optional[Dict[str, str]] = None
def __post_init__(self):
if self.topics is None:
self.topics = {"vibratory_conveyor": "vision/vibratory_conveyor/state", "blower_separator": "vision/blower_separator/state"}
@dataclass
class CameraConfig:
"""Individual camera configuration"""
name: str
machine_topic: str # Which MQTT topic triggers this camera
storage_path: str
exposure_ms: float = 1.0
gain: float = 3.5
target_fps: float = 3.0
enabled: bool = True
# Video recording settings
video_format: str = "mp4" # Video file format (mp4, avi)
video_codec: str = "mp4v" # Video codec (mp4v for MP4, XVID for AVI)
video_quality: int = 95 # Video quality (0-100, higher is better)
# Auto-recording settings
auto_start_recording_enabled: bool = False # Enable automatic recording when machine turns on
auto_recording_max_retries: int = 3 # Maximum retry attempts for failed auto-recording starts
auto_recording_retry_delay_seconds: int = 5 # Delay between retry attempts
# Image Quality Settings
sharpness: int = 100 # 0-200, default 100 (no sharpening)
contrast: int = 100 # 0-200, default 100 (normal contrast)
saturation: int = 100 # 0-200, default 100 (normal saturation, color cameras only)
gamma: int = 100 # 0-300, default 100 (normal gamma)
# Noise Reduction
noise_filter_enabled: bool = True # Enable basic noise filtering
denoise_3d_enabled: bool = False # Enable advanced 3D denoising (may reduce FPS)
# Color Settings (for color cameras)
auto_white_balance: bool = True # Enable automatic white balance
color_temperature_preset: int = 0 # 0=auto, 1=daylight, 2=fluorescent, etc.
# Manual White Balance RGB Gains (for manual white balance mode)
wb_red_gain: float = 1.0 # Red channel gain (0.0-3.99, default 1.0)
wb_green_gain: float = 1.0 # Green channel gain (0.0-3.99, default 1.0)
wb_blue_gain: float = 1.0 # Blue channel gain (0.0-3.99, default 1.0)
# Advanced Settings
anti_flicker_enabled: bool = True # Reduce artificial lighting flicker
light_frequency: int = 1 # 0=50Hz, 1=60Hz (match local power frequency)
# Bit Depth & Format
bit_depth: int = 8 # 8, 10, 12, or 16 bits per channel
# HDR Settings
hdr_enabled: bool = False # Enable High Dynamic Range
hdr_gain_mode: int = 0 # HDR processing mode
@dataclass
class StorageConfig:
"""Storage configuration"""
base_path: str = "/mnt/nfs_share"
max_file_size_mb: int = 1000 # Max size per video file
max_recording_duration_minutes: int = 60 # Max recording duration
cleanup_older_than_days: int = 30 # Auto cleanup old files
@dataclass
class SystemConfig:
"""System-wide configuration"""
camera_check_interval_seconds: int = 2
log_level: str = "WARNING"
log_file: str = "usda_vision_system.log"
api_host: str = "0.0.0.0"
api_port: int = 8000
enable_api: bool = True
timezone: str = "America/New_York"
# Auto-recording system settings
auto_recording_enabled: bool = True # Global enable/disable for auto-recording feature # Atlanta, Georgia timezone
class Config:
"""Main configuration manager"""
def __init__(self, config_file: Optional[str] = None):
self.config_file = config_file or "config.json"
self.logger = logging.getLogger(__name__)
# Default configurations
self.mqtt = MQTTConfig()
self.storage = StorageConfig()
self.system = SystemConfig()
# Camera configurations - will be populated from config file or defaults
self.cameras: List[CameraConfig] = []
# Load configuration
self.load_config()
# Ensure storage directories exist
self._ensure_storage_directories()
def load_config(self) -> None:
"""Load configuration from file"""
config_path = Path(self.config_file)
if config_path.exists():
try:
with open(config_path, "r") as f:
config_data = json.load(f)
# Load MQTT config
if "mqtt" in config_data:
mqtt_data = config_data["mqtt"]
self.mqtt = MQTTConfig(**mqtt_data)
# Load storage config
if "storage" in config_data:
storage_data = config_data["storage"]
self.storage = StorageConfig(**storage_data)
# Load system config
if "system" in config_data:
system_data = config_data["system"]
self.system = SystemConfig(**system_data)
# Load camera configs
if "cameras" in config_data:
self.cameras = []
for cam_data in config_data["cameras"]:
# Set defaults for new video format fields if not present
cam_data.setdefault("video_format", "mp4")
cam_data.setdefault("video_codec", "mp4v")
cam_data.setdefault("video_quality", 95)
self.cameras.append(CameraConfig(**cam_data))
else:
self._create_default_camera_configs()
self.logger.info(f"Configuration loaded from {config_path}")
except Exception as e:
self.logger.error(f"Error loading config from {config_path}: {e}")
self._create_default_camera_configs()
else:
self.logger.info(f"Config file {config_path} not found, using defaults")
self._create_default_camera_configs()
self.save_config() # Save default config
def _create_default_camera_configs(self) -> None:
"""Create default camera configurations"""
self.cameras = [CameraConfig(name="camera1", machine_topic="vibratory_conveyor", storage_path=os.path.join(self.storage.base_path, "camera1")), CameraConfig(name="camera2", machine_topic="blower_separator", storage_path=os.path.join(self.storage.base_path, "camera2"))]
def save_config(self) -> None:
"""Save current configuration to file"""
config_data = {"mqtt": asdict(self.mqtt), "storage": asdict(self.storage), "system": asdict(self.system), "cameras": [asdict(cam) for cam in self.cameras]}
try:
with open(self.config_file, "w") as f:
json.dump(config_data, f, indent=2)
self.logger.info(f"Configuration saved to {self.config_file}")
except Exception as e:
self.logger.error(f"Error saving config to {self.config_file}: {e}")
def _ensure_storage_directories(self) -> None:
"""Ensure all storage directories exist"""
try:
# Create base storage directory
Path(self.storage.base_path).mkdir(parents=True, exist_ok=True)
# Create camera-specific directories
for camera in self.cameras:
Path(camera.storage_path).mkdir(parents=True, exist_ok=True)
self.logger.info("Storage directories verified/created")
except Exception as e:
self.logger.error(f"Error creating storage directories: {e}")
def get_camera_by_topic(self, topic: str) -> Optional[CameraConfig]:
"""Get camera configuration by MQTT topic"""
for camera in self.cameras:
if camera.machine_topic == topic:
return camera
return None
def get_camera_by_name(self, name: str) -> Optional[CameraConfig]:
"""Get camera configuration by name"""
for camera in self.cameras:
if camera.name == name:
return camera
return None
def update_camera_config(self, name: str, **kwargs) -> bool:
"""Update camera configuration"""
camera = self.get_camera_by_name(name)
if camera:
for key, value in kwargs.items():
if hasattr(camera, key):
setattr(camera, key, value)
self.save_config()
return True
return False
def to_dict(self) -> Dict[str, Any]:
"""Convert configuration to dictionary"""
return {"mqtt": asdict(self.mqtt), "storage": asdict(self.storage), "system": asdict(self.system), "cameras": [asdict(cam) for cam in self.cameras]}