"""
FastAPI Server for the USDA Vision Camera System.

This module provides REST API endpoints and WebSocket support for dashboard integration.
"""

import asyncio
import logging
import json
import os
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
import threading

from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Depends, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
import uvicorn

from ..core.config import Config
from ..core.state_manager import StateManager
from ..core.events import EventSystem, EventType, Event
from ..storage.manager import StorageManager
from ..video.integration import create_video_module, VideoModule
from .models import *  # kept as-is: other modules may rely on these names being re-exported here
from .routes import (
    register_system_routes,
    register_camera_routes,
    register_recording_routes,
    register_mqtt_routes,
    register_storage_routes,
    register_auto_recording_routes,
    register_recordings_routes,
)

# Application log level -> uvicorn log level. DEBUG and INFO are deliberately
# mapped to "warning" so uvicorn's connection open/close chatter is suppressed
# while application loggers keep using the configured level.
_UVICORN_LOG_LEVELS: Dict[str, str] = {
    "DEBUG": "warning",
    "INFO": "warning",
    "WARNING": "warning",
    "ERROR": "error",
    "CRITICAL": "critical",
}

# Upper bound on how long stop() waits for the server thread to exit.
_SERVER_SHUTDOWN_TIMEOUT_SECONDS = 5.0


class WebSocketManager:
    """Manages WebSocket connections for real-time updates."""

    def __init__(self) -> None:
        self.active_connections: List[WebSocket] = []
        self.logger = logging.getLogger(f"{__name__}.WebSocketManager")

    async def connect(self, websocket: WebSocket) -> None:
        """Accept the WebSocket handshake and start tracking the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)
        self.logger.debug(f"WebSocket connected. Total connections: {len(self.active_connections)}")

    def disconnect(self, websocket: WebSocket) -> None:
        """Stop tracking a connection; a no-op if it is not currently tracked."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
            self.logger.debug(f"WebSocket disconnected. Total connections: {len(self.active_connections)}")

    async def send_personal_message(self, message: dict, websocket: WebSocket) -> None:
        """Send a JSON-encoded message to one connection.

        Failures (serialization or send) are logged and swallowed so a single
        bad message does not tear down the caller's receive loop.
        """
        try:
            await websocket.send_text(json.dumps(message))
        except Exception as e:
            self.logger.error(f"Error sending personal message: {e}")

    async def broadcast(self, message: dict) -> None:
        """Send a JSON-encoded message to every tracked connection.

        Connections whose send fails are removed from the tracked list. A
        message that cannot be serialized is logged and dropped without
        affecting any connection.
        """
        if not self.active_connections:
            return

        # Serialize once, outside the per-connection try block: a payload that
        # cannot be encoded is a message problem, not a connection problem, and
        # must not cause every client to be dropped.
        try:
            payload = json.dumps(message)
        except (TypeError, ValueError) as e:
            self.logger.error(f"Error serializing broadcast message: {e}")
            return

        disconnected = []
        # Iterate over a snapshot: connect()/disconnect() may run while this
        # coroutine is suspended in send_text() and mutate the live list.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(payload)
            except Exception as e:
                self.logger.error(f"Error broadcasting to connection: {e}")
                disconnected.append(connection)

        # Remove disconnected connections
        for connection in disconnected:
            self.disconnect(connection)


class APIServer:
    """FastAPI server for the USDA Vision Camera System.

    Builds the FastAPI application, registers the REST and WebSocket routes,
    forwards every event-system event to connected WebSocket clients, and runs
    uvicorn on a background daemon thread.
    """

    def __init__(
        self,
        config: Config,
        state_manager: StateManager,
        event_system: EventSystem,
        camera_manager,
        mqtt_client,
        storage_manager: StorageManager,
        auto_recording_manager=None,
    ):
        self.config = config
        self.state_manager = state_manager
        self.event_system = event_system
        self.camera_manager = camera_manager
        self.mqtt_client = mqtt_client
        self.storage_manager = storage_manager
        self.auto_recording_manager = auto_recording_manager
        self.logger = logging.getLogger(__name__)

        # Initialize video module (must happen before routes are registered,
        # because _setup_routes() passes it to the system routes).
        self.video_module: Optional[VideoModule] = None
        self._initialize_video_module()

        # FastAPI app
        self.app = FastAPI(
            title="USDA Vision Camera System API",
            description="API for monitoring and controlling the USDA vision camera system",
            version="1.0.0",
        )

        # WebSocket manager
        self.websocket_manager = WebSocketManager()

        # Server state
        self.server_start_time = datetime.now()
        self.running = False
        self._server_thread: Optional[threading.Thread] = None
        self._event_loop: Optional[asyncio.AbstractEventLoop] = None
        # Live uvicorn server, set while _run_server() is serving so stop()
        # can ask it to shut down.
        self._server: Optional[uvicorn.Server] = None

        # Setup CORS
        # NOTE(review): wildcard origins combined with allow_credentials=True is
        # very permissive - configure appropriately for production.
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

        # Setup routes
        self._setup_routes()

        # Subscribe to events for WebSocket broadcasting
        self._setup_event_subscriptions()

    def _initialize_video_module(self) -> None:
        """Initialize the modular video streaming system.

        On failure the error is logged and ``self.video_module`` is left as
        ``None``; the rest of the API still comes up without video routes.
        """
        try:
            self.video_module = create_video_module(
                config=self.config,
                storage_manager=self.storage_manager,
                enable_caching=True,
                enable_conversion=True,
            )
            self.logger.info("Video module initialized successfully")
        except Exception as e:
            self.logger.error(f"Failed to initialize video module: {e}")
            self.video_module = None

    def _setup_routes(self) -> None:
        """Register all REST routes, the WebSocket endpoint and debug routes."""
        # Register routes from modules
        register_system_routes(
            app=self.app,
            state_manager=self.state_manager,
            video_module=self.video_module,
            server_start_time=self.server_start_time,
            logger=self.logger,
        )
        register_camera_routes(
            app=self.app,
            config=self.config,
            state_manager=self.state_manager,
            camera_manager=self.camera_manager,
            logger=self.logger,
        )
        register_recording_routes(
            app=self.app,
            camera_manager=self.camera_manager,
            logger=self.logger,
        )
        register_mqtt_routes(
            app=self.app,
            mqtt_client=self.mqtt_client,
            state_manager=self.state_manager,
            logger=self.logger,
        )
        register_storage_routes(
            app=self.app,
            storage_manager=self.storage_manager,
            logger=self.logger,
        )
        register_auto_recording_routes(
            app=self.app,
            config=self.config,
            state_manager=self.state_manager,
            auto_recording_manager=self.auto_recording_manager,
            logger=self.logger,
        )
        register_recordings_routes(
            app=self.app,
            state_manager=self.state_manager,
            logger=self.logger,
        )

        # WebSocket endpoint (not in route modules)
        @self.app.websocket("/ws")
        async def websocket_endpoint(websocket: WebSocket):
            """WebSocket endpoint for real-time updates"""
            await self.websocket_manager.connect(websocket)
            try:
                while True:
                    # Keep connection alive and handle incoming messages
                    data = await websocket.receive_text()
                    # Echo back for now - could implement commands later
                    await self.websocket_manager.send_personal_message(
                        {"type": "echo", "data": data}, websocket
                    )
            except WebSocketDisconnect:
                pass
            finally:
                # Always drop the connection from the manager, including when
                # the loop ends with an exception other than
                # WebSocketDisconnect; otherwise the dead socket would linger
                # in active_connections.
                self.websocket_manager.disconnect(websocket)

        # Include video module routes if available
        if self.video_module:
            try:
                video_routes = self.video_module.get_api_routes()
                admin_video_routes = self.video_module.get_admin_routes()
                self.app.include_router(video_routes)
                self.app.include_router(admin_video_routes)
                self.logger.info("Video streaming routes added successfully")
            except Exception as e:
                self.logger.error(f"Failed to add video routes: {e}")

        @self.app.get("/debug/camera-manager")
        async def debug_camera_manager():
            """Debug endpoint to check camera manager state"""
            try:
                if not self.camera_manager:
                    return {"error": "Camera manager not available"}

                return {
                    "available_cameras": len(self.camera_manager.available_cameras),
                    "camera_recorders": list(self.camera_manager.camera_recorders.keys()),
                    "camera_streamers": list(self.camera_manager.camera_streamers.keys()),
                    "streamer_states": {
                        name: {
                            "exists": streamer is not None,
                            "is_streaming": streamer.is_streaming() if streamer else False,
                            "streaming": getattr(streamer, 'streaming', False) if streamer else False,
                        }
                        for name, streamer in self.camera_manager.camera_streamers.items()
                    },
                }
            except Exception as e:
                # Debug endpoint: report the failure in the body instead of raising.
                return {"error": str(e)}

    def _setup_event_subscriptions(self) -> None:
        """Subscribe a broadcasting handler to every ``EventType``."""

        def broadcast_event(event: Event):
            """Broadcast event to all WebSocket connections"""
            try:
                message = {
                    "type": "event",
                    "event_type": event.event_type.value,
                    "source": event.source,
                    "data": event.data,
                    "timestamp": event.timestamp.isoformat(),
                }

                # Snapshot the loop reference: the server thread resets
                # self._event_loop to None when it exits, possibly between the
                # check and the use below.
                loop = self._event_loop
                if loop is not None and not loop.is_closed():
                    # Event handlers run outside the server's loop thread, so
                    # hand the coroutine over with run_coroutine_threadsafe.
                    asyncio.run_coroutine_threadsafe(self.websocket_manager.broadcast(message), loop)
                else:
                    self.logger.debug("Event loop not available for broadcasting")
            except Exception as e:
                self.logger.error(f"Error broadcasting event: {e}")

        # Subscribe to all event types for broadcasting
        for event_type in EventType:
            self.event_system.subscribe(event_type, broadcast_event)

    def start(self) -> bool:
        """Start the API server on a background daemon thread.

        Returns:
            True if the server thread was started (or was already running),
            False if the API is disabled in configuration or the thread could
            not be started.
        """
        if self.running:
            self.logger.warning("API server is already running")
            return True

        if not self.config.system.enable_api:
            self.logger.info("API server disabled in configuration")
            return False

        try:
            self.logger.info(f"Starting API server on {self.config.system.api_host}:{self.config.system.api_port}")
            self.running = True

            # Start server in separate thread
            self._server_thread = threading.Thread(target=self._run_server, daemon=True)
            self._server_thread.start()
            return True
        except Exception as e:
            # Do not report "running" if the thread never started.
            self.running = False
            self.logger.error(f"Error starting API server: {e}")
            return False

    def stop(self) -> None:
        """Stop the API server and clean up the video module.

        Asks uvicorn to exit, waits a bounded time for the server thread to
        finish, then runs the video module's asynchronous cleanup. A no-op if
        the server is not running.
        """
        if not self.running:
            return

        self.logger.info("Stopping API server...")
        self.running = False

        # Ask uvicorn to leave its serve loop; the flag is polled by the
        # server, so the thread exits shortly afterwards.
        server = self._server
        if server is not None:
            server.should_exit = True

        thread = self._server_thread
        if thread is not None and thread.is_alive() and thread is not threading.current_thread():
            thread.join(timeout=_SERVER_SHUTDOWN_TIMEOUT_SECONDS)
            if thread.is_alive():
                self.logger.warning("API server thread did not exit within the shutdown timeout")

        # Clean up video module
        if self.video_module:
            try:
                # Note: This is synchronous cleanup - in a real async context you'd await this
                asyncio.run(self.video_module.cleanup())
                self.logger.info("Video module cleanup completed")
            except Exception as e:
                self.logger.error(f"Error during video module cleanup: {e}")

        self.logger.info("API server stopped")

    def _run_server(self) -> None:
        """Run the uvicorn server on this thread until it exits.

        The server is driven explicitly on a loop created here (rather than
        via ``uvicorn.run()``, which manages its own loop) so that
        ``self._event_loop`` is guaranteed to be the loop the server actually
        runs on; event broadcasts scheduled from other threads depend on that.
        """
        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)

            config_log_level = self.config.system.log_level.upper()
            uvicorn_log_level = _UVICORN_LOG_LEVELS.get(config_log_level, "warning")

            server = uvicorn.Server(
                uvicorn.Config(
                    self.app,
                    host=self.config.system.api_host,
                    port=self.config.system.api_port,
                    log_level=uvicorn_log_level,
                    access_log=False,  # Disable access logs (GET, POST, etc.) to reduce noise
                )
            )
            self._server = server
            # Publish the loop for thread-safe event broadcasting
            self._event_loop = loop

            # stop() may have been called before the server object existed.
            if not self.running:
                return

            loop.run_until_complete(server.serve())
        except Exception as e:
            self.logger.error(f"Error running API server: {e}")
        finally:
            self.running = False
            # Unpublish before closing so broadcasters stop targeting this loop.
            self._event_loop = None
            self._server = None
            loop.close()

    def is_running(self) -> bool:
        """Check if API server is running"""
        return self.running

    def get_server_info(self) -> Dict[str, Any]:
        """Return a snapshot of server status.

        Returns:
            Dict with ``running``, ``host``, ``port``, ``start_time`` (ISO
            string), ``uptime_seconds`` and ``websocket_connections``.
        """
        return {
            "running": self.running,
            "host": self.config.system.api_host,
            "port": self.config.system.api_port,
            "start_time": self.server_start_time.isoformat(),
            "uptime_seconds": (datetime.now() - self.server_start_time).total_seconds(),
            "websocket_connections": len(self.websocket_manager.active_connections),
        }