Implement video processing module with FFmpeg conversion, OpenCV metadata extraction, and file system repository

- Added FFmpegVideoConverter for converting recordings to web-friendly formats (e.g., AVI to MP4) with FFmpeg.
- Implemented NoOpVideoConverter as a fallback for hosts where FFmpeg is unavailable.
- Created OpenCVMetadataExtractor for extracting duration, resolution, FPS, codec, and thumbnails.
- Developed FileSystemVideoRepository for locating and reading recorded video files on disk.
- Integrated the video services via dependency injection in VideoModule (see the composition sketch after the commit metadata).
- Established API routes for video management and streaming, including HTTP range requests.
- Added request/response schemas for video metadata and streaming information.
- Implemented in-memory caching of streamed byte ranges.
- Included error handling and logging throughout the module.
Alireza Vaezi
2025-08-04 16:44:53 -04:00
parent 28400fbfb8
commit 37553163db
41 changed files with 4497 additions and 38 deletions
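For reference, a minimal composition sketch of the module described above, assuming the package is importable as usda_vision_system and a valid config.json and storage layout exist (paths are placeholders):
from usda_vision_system.core.config import Config
from usda_vision_system.core.state_manager import StateManager
from usda_vision_system.core.events import EventSystem
from usda_vision_system.storage.manager import StorageManager
from usda_vision_system.video import create_video_module
# Compose the same dependencies APIServer receives, then build the module.
config = Config("config.json")
storage = StorageManager(config, StateManager(), EventSystem())
video_module = create_video_module(config=config, storage_manager=storage, enable_caching=True, enable_conversion=True)
# These are the routers APIServer mounts in the diff below.
video_routes = video_module.get_api_routes()
admin_routes = video_module.get_admin_routes()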

View File

@@ -142,6 +142,11 @@ class CameraConfigResponse(BaseModel):
gain: float
target_fps: float
# Video recording settings
video_format: str
video_codec: str
video_quality: int
# Image Quality Settings
sharpness: int
contrast: int

View File

@@ -20,6 +20,7 @@ from ..core.config import Config
from ..core.state_manager import StateManager
from ..core.events import EventSystem, EventType, Event
from ..storage.manager import StorageManager
from ..video.integration import create_video_module, VideoModule
from .models import *
@@ -76,6 +77,10 @@ class APIServer:
self.auto_recording_manager = auto_recording_manager
self.logger = logging.getLogger(__name__)
# Initialize video module
self.video_module: Optional[VideoModule] = None
self._initialize_video_module()
# FastAPI app
self.app = FastAPI(title="USDA Vision Camera System API", description="API for monitoring and controlling the USDA vision camera system", version="1.0.0")
@@ -97,6 +102,15 @@ class APIServer:
# Subscribe to events for WebSocket broadcasting
self._setup_event_subscriptions()
def _initialize_video_module(self):
"""Initialize the modular video streaming system"""
try:
self.video_module = create_video_module(config=self.config, storage_manager=self.storage_manager, enable_caching=True, enable_conversion=True)
self.logger.info("Video module initialized successfully")
except Exception as e:
self.logger.error(f"Failed to initialize video module: {e}")
self.video_module = None
def _setup_routes(self):
"""Setup API routes"""
@@ -120,6 +134,20 @@ class APIServer:
self.logger.error(f"Error getting system status: {e}")
raise HTTPException(status_code=500, detail=str(e))
@self.app.get("/system/video-module")
async def get_video_module_status():
"""Get video module status and configuration"""
try:
if self.video_module:
status = self.video_module.get_module_status()
status["enabled"] = True
return status
else:
return {"enabled": False, "error": "Video module not initialized"}
except Exception as e:
self.logger.error(f"Error getting video module status: {e}")
raise HTTPException(status_code=500, detail=str(e))
@self.app.get("/machines", response_model=Dict[str, MachineStatusResponse])
async def get_machines():
"""Get all machine statuses"""
@@ -343,6 +371,10 @@ class APIServer:
exposure_ms=config.exposure_ms,
gain=config.gain,
target_fps=config.target_fps,
# Video recording settings
video_format=config.video_format,
video_codec=config.video_codec,
video_quality=config.video_quality,
# Image Quality Settings
sharpness=config.sharpness,
contrast=config.contrast,
@@ -643,6 +675,19 @@ class APIServer:
except WebSocketDisconnect:
self.websocket_manager.disconnect(websocket)
# Include video module routes if available
if self.video_module:
try:
video_routes = self.video_module.get_api_routes()
admin_video_routes = self.video_module.get_admin_routes()
self.app.include_router(video_routes)
self.app.include_router(admin_video_routes)
self.logger.info("Video streaming routes added successfully")
except Exception as e:
self.logger.error(f"Failed to add video routes: {e}")
def _setup_event_subscriptions(self):
"""Setup event subscriptions for WebSocket broadcasting"""
@@ -697,6 +742,15 @@ class APIServer:
self.logger.info("Stopping API server...")
self.running = False
# Clean up video module
if self.video_module:
try:
# Note: This is synchronous cleanup - in a real async context you'd await this
asyncio.run(self.video_module.cleanup())
self.logger.info("Video module cleanup completed")
except Exception as e:
self.logger.error(f"Error during video module cleanup: {e}")
# Note: uvicorn doesn't have a clean way to stop from another thread
# In production, you might want to use a process manager like gunicorn
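A quick way to exercise the new status endpoint once the server is running (host and port are placeholders, not taken from this diff):
import requests
resp = requests.get("http://localhost:8000/system/video-module", timeout=5)
print(resp.status_code, resp.json())  # {"enabled": true, ...} or {"enabled": false, "error": "..."}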

View File

@@ -223,7 +223,9 @@ class CameraManager:
# Generate filename with Atlanta timezone timestamp
timestamp = format_filename_timestamp()
filename = f"{camera_name}_recording_{timestamp}.avi"
camera_config = self.config.get_camera_by_name(camera_name)
video_format = camera_config.video_format if camera_config else "mp4"
filename = f"{camera_name}_recording_{timestamp}.{video_format}"
# Start recording
success = recorder.start_recording(filename)
@@ -283,11 +285,14 @@ class CameraManager:
# Generate filename with datetime prefix
timestamp = format_filename_timestamp()
camera_config = self.config.get_camera_by_name(camera_name)
video_format = camera_config.video_format if camera_config else "mp4"
if filename:
# Always prepend datetime to the provided filename
filename = f"{timestamp}_{filename}"
else:
filename = f"{camera_name}_manual_{timestamp}.avi"
filename = f"{camera_name}_manual_{timestamp}.{video_format}"
return recorder.start_recording(filename)

View File

@@ -634,15 +634,23 @@ class CameraRecorder:
mvsdk.CameraImageProcess(self.hCamera, pRawData, self.frame_buffer, FrameHead)
mvsdk.CameraReleaseImageBuffer(self.hCamera, pRawData)
# Set up video writer
fourcc = cv2.VideoWriter_fourcc(*"XVID")
# Set up video writer with configured codec
fourcc = cv2.VideoWriter_fourcc(*self.camera_config.video_codec)
frame_size = (FrameHead.iWidth, FrameHead.iHeight)
# Use 30 FPS for video writer if target_fps is 0 (unlimited)
video_fps = self.camera_config.target_fps if self.camera_config.target_fps > 0 else 30.0
# Create video writer with quality settings
self.video_writer = cv2.VideoWriter(self.output_filename, fourcc, video_fps, frame_size)
# Set quality if supported (for some codecs)
if hasattr(self.video_writer, "set") and self.camera_config.video_quality:
try:
self.video_writer.set(cv2.VIDEOWRITER_PROP_QUALITY, self.camera_config.video_quality)
except:
pass # Quality setting not supported for this codec
if not self.video_writer.isOpened():
self.logger.error(f"Failed to open video writer for {self.output_filename}")
return False

View File

@@ -40,6 +40,11 @@ class CameraConfig:
target_fps: float = 3.0
enabled: bool = True
# Video recording settings
video_format: str = "mp4" # Video file format (mp4, avi)
video_codec: str = "mp4v" # Video codec (mp4v for MP4, XVID for AVI)
video_quality: int = 95 # Video quality (0-100, higher is better)
# Auto-recording settings
auto_start_recording_enabled: bool = False # Enable automatic recording when machine turns on
auto_recording_max_retries: int = 3 # Maximum retry attempts for failed auto-recording starts
@@ -149,7 +154,13 @@ class Config:
# Load camera configs
if "cameras" in config_data:
self.cameras = [CameraConfig(**cam_data) for cam_data in config_data["cameras"]]
self.cameras = []
for cam_data in config_data["cameras"]:
# Set defaults for new video format fields if not present
cam_data.setdefault("video_format", "mp4")
cam_data.setdefault("video_codec", "mp4v")
cam_data.setdefault("video_quality", 95)
self.cameras.append(CameraConfig(**cam_data))
else:
self._create_default_camera_configs()

View File

@@ -19,7 +19,7 @@ from .core.timezone_utils import log_time_info, check_time_sync
from .mqtt.client import MQTTClient
from .camera.manager import CameraManager
from .storage.manager import StorageManager
from .recording.auto_manager import AutoRecordingManager
from .recording.standalone_auto_recorder import StandaloneAutoRecorder
from .api.server import APIServer
@@ -46,7 +46,7 @@ class USDAVisionSystem:
self.storage_manager = StorageManager(self.config, self.state_manager, self.event_system)
self.mqtt_client = MQTTClient(self.config, self.state_manager, self.event_system)
self.camera_manager = CameraManager(self.config, self.state_manager, self.event_system)
self.auto_recording_manager = AutoRecordingManager(self.config, self.state_manager, self.event_system, self.camera_manager)
self.auto_recording_manager = StandaloneAutoRecorder(config=self.config)
self.api_server = APIServer(self.config, self.state_manager, self.event_system, self.camera_manager, self.mqtt_client, self.storage_manager, self.auto_recording_manager)
# System state

View File

@@ -199,7 +199,7 @@ class AutoRecordingManager:
# Generate filename with timestamp and machine info
timestamp = format_filename_timestamp()
machine_name = camera_config.machine_topic.replace("_", "-")
filename = f"{camera_name}_auto_{machine_name}_{timestamp}.avi"
filename = f"{camera_name}_auto_{machine_name}_{timestamp}.{camera_config.video_format}"
# Use camera manager to start recording with the camera's default configuration
# Pass the camera's configured settings from config.json

View File

@@ -0,0 +1,373 @@
#!/usr/bin/env python3
"""
Standalone Auto-Recording System for USDA Vision Cameras
This is a simplified, reliable auto-recording system that:
1. Monitors MQTT messages directly
2. Starts/stops camera recordings based on machine state
3. Works independently of the main system
4. Is easy to debug and maintain
Usage:
sudo python -m usda_vision_system.recording.standalone_auto_recorder
"""
import json
import logging
import signal
import sys
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional
import paho.mqtt.client as mqtt
# Add the project root to the path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from usda_vision_system.core.config import Config
from usda_vision_system.camera.recorder import CameraRecorder
from usda_vision_system.core.state_manager import StateManager
from usda_vision_system.core.events import EventSystem
class StandaloneAutoRecorder:
"""Standalone auto-recording system that monitors MQTT and controls cameras directly"""
def __init__(self, config_path: str = "config.json", config: Optional[Config] = None):
# Load configuration
if config:
self.config = config
else:
self.config = Config(config_path)
# Setup logging (only if not already configured)
if not logging.getLogger().handlers:
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.FileHandler("standalone_auto_recorder.log"), logging.StreamHandler()])
self.logger = logging.getLogger(__name__)
# Initialize components
self.state_manager = StateManager()
self.event_system = EventSystem()
# MQTT client
self.mqtt_client: Optional[mqtt.Client] = None
# Camera recorders
self.camera_recorders: Dict[str, CameraRecorder] = {}
self.active_recordings: Dict[str, str] = {} # camera_name -> filename
# Machine to camera mapping
self.machine_camera_map = self._build_machine_camera_map()
# Threading
self.running = False
self._stop_event = threading.Event()
self.logger.info("Standalone Auto-Recorder initialized")
self.logger.info(f"Machine-Camera mapping: {self.machine_camera_map}")
def _build_machine_camera_map(self) -> Dict[str, str]:
"""Build mapping from machine topics to camera names"""
mapping = {}
for camera_config in self.config.cameras:
if camera_config.enabled and camera_config.auto_start_recording_enabled:
machine_name = camera_config.machine_topic
if machine_name:
mapping[machine_name] = camera_config.name
self.logger.info(f"Auto-recording enabled: {machine_name} -> {camera_config.name}")
return mapping
def _setup_mqtt(self) -> bool:
"""Setup MQTT client"""
try:
self.mqtt_client = mqtt.Client()
self.mqtt_client.on_connect = self._on_mqtt_connect
self.mqtt_client.on_message = self._on_mqtt_message
self.mqtt_client.on_disconnect = self._on_mqtt_disconnect
# Connect to MQTT broker
self.logger.info(f"Connecting to MQTT broker at {self.config.mqtt.broker_host}:{self.config.mqtt.broker_port}")
self.mqtt_client.connect(self.config.mqtt.broker_host, self.config.mqtt.broker_port, 60)
# Start MQTT loop in background
self.mqtt_client.loop_start()
return True
except Exception as e:
self.logger.error(f"Failed to setup MQTT: {e}")
return False
def _on_mqtt_connect(self, client, userdata, flags, rc):
"""MQTT connection callback"""
if rc == 0:
self.logger.info("Connected to MQTT broker")
# Subscribe to machine state topics
for machine_name in self.machine_camera_map.keys():
if hasattr(self.config.mqtt, "topics") and self.config.mqtt.topics:
topic = self.config.mqtt.topics.get(machine_name)
if topic:
client.subscribe(topic)
self.logger.info(f"Subscribed to: {topic}")
else:
self.logger.warning(f"No MQTT topic configured for machine: {machine_name}")
else:
# Fallback to default topic format
topic = f"vision/{machine_name}/state"
client.subscribe(topic)
self.logger.info(f"Subscribed to: {topic} (default format)")
else:
self.logger.error(f"Failed to connect to MQTT broker: {rc}")
def _on_mqtt_disconnect(self, client, userdata, rc):
"""MQTT disconnection callback"""
self.logger.warning(f"Disconnected from MQTT broker: {rc}")
def _on_mqtt_message(self, client, userdata, msg):
"""MQTT message callback"""
try:
topic = msg.topic
payload = msg.payload.decode("utf-8").strip().lower()
# Extract machine name from topic (vision/{machine_name}/state)
topic_parts = topic.split("/")
if len(topic_parts) >= 3 and topic_parts[0] == "vision" and topic_parts[2] == "state":
machine_name = topic_parts[1]
self.logger.info(f"MQTT: {machine_name} -> {payload}")
# Handle state change
self._handle_machine_state_change(machine_name, payload)
except Exception as e:
self.logger.error(f"Error processing MQTT message: {e}")
def _handle_machine_state_change(self, machine_name: str, state: str):
"""Handle machine state change"""
try:
# Check if we have a camera for this machine
camera_name = self.machine_camera_map.get(machine_name)
if not camera_name:
return
self.logger.info(f"Handling state change: {machine_name} ({camera_name}) -> {state}")
if state == "on":
self._start_recording(camera_name, machine_name)
elif state == "off":
self._stop_recording(camera_name, machine_name)
except Exception as e:
self.logger.error(f"Error handling machine state change: {e}")
def _start_recording(self, camera_name: str, machine_name: str):
"""Start recording for a camera"""
try:
# Check if already recording
if camera_name in self.active_recordings:
self.logger.warning(f"Camera {camera_name} is already recording")
return
# Get or create camera recorder
recorder = self._get_camera_recorder(camera_name)
if not recorder:
self.logger.error(f"Failed to get recorder for camera {camera_name}")
return
# Generate filename with timestamp and machine info
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
camera_config = self.config.get_camera_by_name(camera_name)
video_format = camera_config.video_format if camera_config else "mp4"
filename = f"{camera_name}_auto_{machine_name}_{timestamp}.{video_format}"
# Start recording
success = recorder.start_recording(filename)
if success:
self.active_recordings[camera_name] = filename
self.logger.info(f"✅ Started recording: {camera_name} -> {filename}")
else:
self.logger.error(f"❌ Failed to start recording for camera {camera_name}")
except Exception as e:
self.logger.error(f"Error starting recording for {camera_name}: {e}")
def _stop_recording(self, camera_name: str, machine_name: str):
"""Stop recording for a camera"""
try:
# Check if recording
if camera_name not in self.active_recordings:
self.logger.warning(f"Camera {camera_name} is not recording")
return
# Get recorder
recorder = self._get_camera_recorder(camera_name)
if not recorder:
self.logger.error(f"Failed to get recorder for camera {camera_name}")
return
# Stop recording
filename = self.active_recordings.pop(camera_name)
success = recorder.stop_recording()
if success:
self.logger.info(f"✅ Stopped recording: {camera_name} -> {filename}")
else:
self.logger.error(f"❌ Failed to stop recording for camera {camera_name}")
except Exception as e:
self.logger.error(f"Error stopping recording for {camera_name}: {e}")
def _get_camera_recorder(self, camera_name: str) -> Optional[CameraRecorder]:
"""Get or create camera recorder"""
try:
# Return existing recorder
if camera_name in self.camera_recorders:
return self.camera_recorders[camera_name]
# Find camera config
camera_config = None
for config in self.config.cameras:
if config.name == camera_name:
camera_config = config
break
if not camera_config:
self.logger.error(f"No configuration found for camera {camera_name}")
return None
# Find device info (simplified camera discovery)
device_info = self._find_camera_device(camera_name)
if not device_info:
self.logger.error(f"No device found for camera {camera_name}")
return None
# Create recorder
recorder = CameraRecorder(camera_config=camera_config, device_info=device_info, state_manager=self.state_manager, event_system=self.event_system)
self.camera_recorders[camera_name] = recorder
self.logger.info(f"Created recorder for camera {camera_name}")
return recorder
except Exception as e:
self.logger.error(f"Error creating recorder for {camera_name}: {e}")
return None
def _find_camera_device(self, camera_name: str):
"""Simplified camera device discovery"""
try:
# Import camera SDK
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..", "camera_sdk"))
import mvsdk
# Initialize SDK
mvsdk.CameraSdkInit(1)
# Enumerate cameras
device_list = mvsdk.CameraEnumerateDevice()
# For now, map by index (camera1 = index 0, camera2 = index 1)
camera_index = int(camera_name.replace("camera", "")) - 1
if 0 <= camera_index < len(device_list):
return device_list[camera_index]
else:
self.logger.error(f"Camera index {camera_index} not found (total: {len(device_list)})")
return None
except Exception as e:
self.logger.error(f"Error finding camera device: {e}")
return None
def start(self) -> bool:
"""Start the standalone auto-recorder"""
try:
self.logger.info("Starting Standalone Auto-Recorder...")
# Setup MQTT
if not self._setup_mqtt():
return False
# Wait for MQTT connection
time.sleep(2)
self.running = True
self.logger.info("✅ Standalone Auto-Recorder started successfully")
return True
except Exception as e:
self.logger.error(f"Failed to start auto-recorder: {e}")
return False
def stop(self) -> bool:
"""Stop the standalone auto-recorder"""
try:
self.logger.info("Stopping Standalone Auto-Recorder...")
self.running = False
self._stop_event.set()
# Stop all active recordings
for camera_name in list(self.active_recordings.keys()):
self._stop_recording(camera_name, "system_shutdown")
# Cleanup camera recorders
for recorder in self.camera_recorders.values():
try:
recorder.cleanup()
except:
pass
# Stop MQTT
if self.mqtt_client:
self.mqtt_client.loop_stop()
self.mqtt_client.disconnect()
self.logger.info("✅ Standalone Auto-Recorder stopped")
return True
except Exception as e:
self.logger.error(f"Error stopping auto-recorder: {e}")
return False
def run(self):
"""Run the auto-recorder (blocking)"""
if not self.start():
return False
try:
# Setup signal handlers
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
self.logger.info("Auto-recorder running... Press Ctrl+C to stop")
# Main loop
while self.running and not self._stop_event.is_set():
time.sleep(1)
except KeyboardInterrupt:
self.logger.info("Received keyboard interrupt")
finally:
self.stop()
def _signal_handler(self, signum, frame):
"""Handle shutdown signals"""
self.logger.info(f"Received signal {signum}, shutting down...")
self.running = False
self._stop_event.set()
def main():
"""Main entry point"""
recorder = StandaloneAutoRecorder()
recorder.run()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,13 @@
"""
Video Module for USDA Vision Camera System.
This module provides modular video streaming, processing, and management capabilities
following clean architecture principles.
"""
from .domain.models import VideoFile, VideoMetadata, StreamRange
from .application.video_service import VideoService
from .application.streaming_service import StreamingService
from .integration import VideoModule, create_video_module
__all__ = ["VideoFile", "VideoMetadata", "StreamRange", "VideoService", "StreamingService", "VideoModule", "create_video_module"]

View File

@@ -0,0 +1,14 @@
"""
Video Application Layer.
Contains use cases and application services that orchestrate domain logic
and coordinate between domain and infrastructure layers.
"""
from .video_service import VideoService
from .streaming_service import StreamingService
__all__ = [
"VideoService",
"StreamingService",
]

View File

@@ -0,0 +1,160 @@
"""
Video Streaming Application Service.
Handles video streaming use cases including range requests and caching.
"""
import asyncio
import logging
from typing import Optional, Tuple
from ..domain.interfaces import VideoRepository, StreamingCache
from ..domain.models import VideoFile, StreamRange
class StreamingService:
"""Application service for video streaming"""
def __init__(
self,
video_repository: VideoRepository,
streaming_cache: Optional[StreamingCache] = None
):
self.video_repository = video_repository
self.streaming_cache = streaming_cache
self.logger = logging.getLogger(__name__)
async def stream_video_range(
self,
file_id: str,
range_request: Optional[StreamRange] = None
) -> Tuple[Optional[bytes], Optional[VideoFile], Optional[StreamRange]]:
"""
Stream video data for a specific range.
Returns:
Tuple of (data, video_file, actual_range)
"""
try:
# Get video file
video_file = await self.video_repository.get_by_id(file_id)
if not video_file or not video_file.is_streamable:
return None, None, None
# If no range specified, create range for entire file
if range_request is None:
range_request = StreamRange(start=0, end=video_file.file_size_bytes - 1)
# Validate and adjust range
actual_range = self._validate_range(range_request, video_file.file_size_bytes)
if not actual_range:
return None, video_file, None
# Try to get from cache first
if self.streaming_cache:
cached_data = await self.streaming_cache.get_cached_range(file_id, actual_range)
if cached_data:
self.logger.debug(f"Serving cached range for {file_id}")
return cached_data, video_file, actual_range
# Read from file
data = await self.video_repository.get_file_range(video_file, actual_range)
# Cache the data if caching is enabled
if self.streaming_cache and data:
await self.streaming_cache.cache_range(file_id, actual_range, data)
return data, video_file, actual_range
except Exception as e:
self.logger.error(f"Error streaming video range for {file_id}: {e}")
return None, None, None
async def get_video_info(self, file_id: str) -> Optional[VideoFile]:
"""Get video information for streaming"""
try:
video_file = await self.video_repository.get_by_id(file_id)
if not video_file or not video_file.is_streamable:
return None
return video_file
except Exception as e:
self.logger.error(f"Error getting video info for {file_id}: {e}")
return None
async def invalidate_cache(self, file_id: str) -> bool:
"""Invalidate cached data for a video file"""
try:
if self.streaming_cache:
await self.streaming_cache.invalidate_file(file_id)
self.logger.info(f"Invalidated cache for {file_id}")
return True
return False
except Exception as e:
self.logger.error(f"Error invalidating cache for {file_id}: {e}")
return False
async def cleanup_cache(self, max_size_mb: int = 100) -> int:
"""Clean up streaming cache"""
try:
if self.streaming_cache:
return await self.streaming_cache.cleanup_cache(max_size_mb)
return 0
except Exception as e:
self.logger.error(f"Error cleaning up cache: {e}")
return 0
def _validate_range(self, range_request: StreamRange, file_size: int) -> Optional[StreamRange]:
"""Validate and adjust range request for file size"""
try:
start = range_request.start
end = range_request.end
# Validate start position
if start < 0:
start = 0
elif start >= file_size:
return None
# Validate end position
if end is None or end >= file_size:
end = file_size - 1
elif end < start:
return None
return StreamRange(start=start, end=end)
except Exception as e:
self.logger.error(f"Error validating range: {e}")
return None
def calculate_content_range_header(
self,
range_request: StreamRange,
file_size: int
) -> str:
"""Calculate Content-Range header value"""
return f"bytes {range_request.start}-{range_request.end}/{file_size}"
def should_use_partial_content(self, range_request: Optional[StreamRange], file_size: int) -> bool:
"""Determine if response should use 206 Partial Content"""
if not range_request:
return False
# Use partial content if not requesting the entire file
return not (range_request.start == 0 and range_request.end == file_size - 1)
async def get_optimal_chunk_size(self, file_size: int) -> int:
"""Get optimal chunk size for streaming based on file size"""
# Adaptive chunk sizing
if file_size < 1024 * 1024: # < 1MB
return 64 * 1024 # 64KB chunks
elif file_size < 10 * 1024 * 1024: # < 10MB
return 256 * 1024 # 256KB chunks
elif file_size < 100 * 1024 * 1024: # < 100MB
return 512 * 1024 # 512KB chunks
else:
return 1024 * 1024 # 1MB chunks for large files
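Illustrative only: the range/header helpers above are pure, so they can be exercised without a repository wired in (passing None below is a stand-in for this sketch, not how the server composes the service):
import asyncio
from usda_vision_system.video.domain.models import StreamRange
from usda_vision_system.video.application.streaming_service import StreamingService
svc = StreamingService(video_repository=None)
file_size = 50 * 1024 * 1024  # 50 MB file
rng = StreamRange(start=0, end=1_048_575)  # first 1 MiB
print(svc.calculate_content_range_header(rng, file_size))  # "bytes 0-1048575/52428800"
print(svc.should_use_partial_content(rng, file_size))      # True (not the whole file)
print(asyncio.run(svc.get_optimal_chunk_size(file_size)))  # 524288 (512 KB chunks)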

View File

@@ -0,0 +1,228 @@
"""
Video Application Service.
Orchestrates video-related use cases and business logic.
"""
import asyncio
import logging
from typing import List, Optional
from datetime import datetime
from ..domain.interfaces import VideoRepository, MetadataExtractor, VideoConverter
from ..domain.models import VideoFile, VideoMetadata, VideoFormat
class VideoService:
"""Application service for video management"""
def __init__(
self,
video_repository: VideoRepository,
metadata_extractor: MetadataExtractor,
video_converter: VideoConverter
):
self.video_repository = video_repository
self.metadata_extractor = metadata_extractor
self.video_converter = video_converter
self.logger = logging.getLogger(__name__)
async def get_video_by_id(self, file_id: str) -> Optional[VideoFile]:
"""Get video file by ID with metadata"""
try:
video_file = await self.video_repository.get_by_id(file_id)
if not video_file:
return None
# Ensure metadata is available
if not video_file.metadata:
await self._ensure_metadata(video_file)
return video_file
except Exception as e:
self.logger.error(f"Error getting video {file_id}: {e}")
return None
async def get_videos_by_camera(
self,
camera_name: str,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
limit: Optional[int] = None,
include_metadata: bool = False
) -> List[VideoFile]:
"""Get videos for a camera with optional metadata"""
try:
videos = await self.video_repository.get_by_camera(
camera_name=camera_name,
start_date=start_date,
end_date=end_date,
limit=limit
)
if include_metadata:
# Extract metadata for videos that don't have it
await self._ensure_metadata_for_videos(videos)
return videos
except Exception as e:
self.logger.error(f"Error getting videos for camera {camera_name}: {e}")
return []
async def get_all_videos(
self,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
limit: Optional[int] = None,
include_metadata: bool = False
) -> List[VideoFile]:
"""Get all videos with optional metadata"""
try:
videos = await self.video_repository.get_all(
start_date=start_date,
end_date=end_date,
limit=limit
)
if include_metadata:
await self._ensure_metadata_for_videos(videos)
return videos
except Exception as e:
self.logger.error(f"Error getting all videos: {e}")
return []
async def get_video_thumbnail(
self,
file_id: str,
timestamp_seconds: float = 1.0,
size: tuple = (320, 240)
) -> Optional[bytes]:
"""Get thumbnail for video"""
try:
video_file = await self.video_repository.get_by_id(file_id)
if not video_file or not video_file.is_streamable:
return None
return await self.metadata_extractor.extract_thumbnail(
video_file.file_path,
timestamp_seconds=timestamp_seconds,
size=size
)
except Exception as e:
self.logger.error(f"Error getting thumbnail for {file_id}: {e}")
return None
async def prepare_for_streaming(self, file_id: str) -> Optional[VideoFile]:
"""Prepare video for web streaming (convert if needed)"""
try:
video_file = await self.video_repository.get_by_id(file_id)
if not video_file:
return None
# Ensure metadata is available
await self._ensure_metadata(video_file)
# Check if conversion is needed for web compatibility
if video_file.needs_conversion():
converted_file = await self._convert_for_web(video_file)
return converted_file if converted_file else video_file
return video_file
except Exception as e:
self.logger.error(f"Error preparing video {file_id} for streaming: {e}")
return None
async def validate_video(self, file_id: str) -> bool:
"""Validate that video file is accessible and valid"""
try:
video_file = await self.video_repository.get_by_id(file_id)
if not video_file:
return False
# Check file exists and is readable
if not video_file.file_path.exists():
return False
# Validate video format
return await self.metadata_extractor.is_valid_video(video_file.file_path)
except Exception as e:
self.logger.error(f"Error validating video {file_id}: {e}")
return False
async def _ensure_metadata(self, video_file: VideoFile) -> None:
"""Ensure video has metadata extracted"""
if video_file.metadata:
return
try:
metadata = await self.metadata_extractor.extract(video_file.file_path)
if metadata:
# Update video file with metadata
# Note: In a real implementation, you might want to persist this
video_file.metadata = metadata
self.logger.debug(f"Extracted metadata for {video_file.file_id}")
except Exception as e:
self.logger.warning(f"Could not extract metadata for {video_file.file_id}: {e}")
async def _ensure_metadata_for_videos(self, videos: List[VideoFile]) -> None:
"""Extract metadata for multiple videos concurrently"""
tasks = []
for video in videos:
if not video.metadata:
tasks.append(self._ensure_metadata(video))
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
async def _convert_for_web(self, video_file: VideoFile) -> Optional[VideoFile]:
"""Convert video to web-compatible format"""
try:
target_format = video_file.web_compatible_format
# Get path for converted file
converted_path = await self.video_converter.get_converted_path(
video_file.file_path,
target_format
)
# Perform conversion
success = await self.video_converter.convert(
source_path=video_file.file_path,
target_path=converted_path,
target_format=target_format,
quality="medium"
)
if success and converted_path.exists():
# Create new VideoFile object for converted file
converted_video = VideoFile(
file_id=f"{video_file.file_id}_converted",
camera_name=video_file.camera_name,
filename=converted_path.name,
file_path=converted_path,
file_size_bytes=converted_path.stat().st_size,
created_at=video_file.created_at,
status=video_file.status,
format=target_format,
metadata=video_file.metadata,
start_time=video_file.start_time,
end_time=video_file.end_time,
machine_trigger=video_file.machine_trigger
)
self.logger.info(f"Successfully converted {video_file.file_id} to {target_format.value}")
return converted_video
return None
except Exception as e:
self.logger.error(f"Error converting video {video_file.file_id}: {e}")
return None

View File

@@ -0,0 +1,18 @@
"""
Video Domain Layer.
Contains pure business logic and domain models for video operations.
No external dependencies - only Python standard library and domain concepts.
"""
from .models import VideoFile, VideoMetadata, StreamRange
from .interfaces import VideoRepository, VideoConverter, MetadataExtractor
__all__ = [
"VideoFile",
"VideoMetadata",
"StreamRange",
"VideoRepository",
"VideoConverter",
"MetadataExtractor",
]

View File

@@ -0,0 +1,157 @@
"""
Video Domain Interfaces.
Abstract interfaces that define contracts for video operations.
These interfaces allow dependency inversion - domain logic doesn't depend on infrastructure.
"""
from abc import ABC, abstractmethod
from typing import List, Optional, BinaryIO
from datetime import datetime
from pathlib import Path
from .models import VideoFile, VideoMetadata, StreamRange, VideoFormat
class VideoRepository(ABC):
"""Abstract repository for video file access"""
@abstractmethod
async def get_by_id(self, file_id: str) -> Optional[VideoFile]:
"""Get video file by ID"""
pass
@abstractmethod
async def get_by_camera(
self,
camera_name: str,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
limit: Optional[int] = None
) -> List[VideoFile]:
"""Get video files for a camera with optional filters"""
pass
@abstractmethod
async def get_all(
self,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
limit: Optional[int] = None
) -> List[VideoFile]:
"""Get all video files with optional filters"""
pass
@abstractmethod
async def exists(self, file_id: str) -> bool:
"""Check if video file exists"""
pass
@abstractmethod
async def get_file_stream(self, video_file: VideoFile) -> BinaryIO:
"""Get file stream for reading video data"""
pass
@abstractmethod
async def get_file_range(
self,
video_file: VideoFile,
range_request: StreamRange
) -> bytes:
"""Get specific byte range from video file"""
pass
class VideoConverter(ABC):
"""Abstract video format converter"""
@abstractmethod
async def convert(
self,
source_path: Path,
target_path: Path,
target_format: VideoFormat,
quality: Optional[str] = None
) -> bool:
"""Convert video to target format"""
pass
@abstractmethod
async def is_conversion_needed(
self,
source_format: VideoFormat,
target_format: VideoFormat
) -> bool:
"""Check if conversion is needed"""
pass
@abstractmethod
async def get_converted_path(
self,
original_path: Path,
target_format: VideoFormat
) -> Path:
"""Get path for converted file"""
pass
@abstractmethod
async def cleanup_converted_files(self, max_age_hours: int = 24) -> int:
"""Clean up old converted files"""
pass
class MetadataExtractor(ABC):
"""Abstract video metadata extractor"""
@abstractmethod
async def extract(self, file_path: Path) -> Optional[VideoMetadata]:
"""Extract metadata from video file"""
pass
@abstractmethod
async def extract_thumbnail(
self,
file_path: Path,
timestamp_seconds: float = 1.0,
size: tuple = (320, 240)
) -> Optional[bytes]:
"""Extract thumbnail image from video"""
pass
@abstractmethod
async def is_valid_video(self, file_path: Path) -> bool:
"""Check if file is a valid video"""
pass
class StreamingCache(ABC):
"""Abstract cache for streaming optimization"""
@abstractmethod
async def get_cached_range(
self,
file_id: str,
range_request: StreamRange
) -> Optional[bytes]:
"""Get cached byte range"""
pass
@abstractmethod
async def cache_range(
self,
file_id: str,
range_request: StreamRange,
data: bytes
) -> None:
"""Cache byte range data"""
pass
@abstractmethod
async def invalidate_file(self, file_id: str) -> None:
"""Invalidate all cached data for a file"""
pass
@abstractmethod
async def cleanup_cache(self, max_size_mb: int = 100) -> int:
"""Clean up cache to stay under size limit"""
pass

View File

@@ -0,0 +1,162 @@
"""
Video Domain Models.
Pure business entities and value objects for video operations.
These models contain no external dependencies and represent core business concepts.
"""
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple
from enum import Enum
class VideoFormat(Enum):
"""Supported video formats"""
AVI = "avi"
MP4 = "mp4"
WEBM = "webm"
class VideoStatus(Enum):
"""Video file status"""
RECORDING = "recording"
COMPLETED = "completed"
PROCESSING = "processing"
ERROR = "error"
UNKNOWN = "unknown"
@dataclass(frozen=True)
class VideoMetadata:
"""Video metadata value object"""
duration_seconds: float
width: int
height: int
fps: float
codec: str
bitrate: Optional[int] = None
@property
def resolution(self) -> Tuple[int, int]:
"""Get video resolution as tuple"""
return (self.width, self.height)
@property
def aspect_ratio(self) -> float:
"""Calculate aspect ratio"""
return self.width / self.height if self.height > 0 else 0.0
@dataclass(frozen=True)
class StreamRange:
"""HTTP range request value object"""
start: int
end: Optional[int] = None
def __post_init__(self):
if self.start < 0:
raise ValueError("Start byte cannot be negative")
if self.end is not None and self.end < self.start:
raise ValueError("End byte cannot be less than start byte")
@property
def size(self) -> Optional[int]:
"""Get range size in bytes"""
if self.end is not None:
return self.end - self.start + 1
return None
@classmethod
def from_header(cls, range_header: str, file_size: int) -> 'StreamRange':
"""Parse HTTP Range header"""
if not range_header.startswith('bytes='):
raise ValueError("Invalid range header format")
range_spec = range_header[6:] # Remove 'bytes='
if '-' not in range_spec:
raise ValueError("Invalid range specification")
start_str, end_str = range_spec.split('-', 1)
if start_str:
start = int(start_str)
else:
# Suffix range (e.g., "-500" means last 500 bytes)
if not end_str:
raise ValueError("Invalid range specification")
suffix_length = int(end_str)
start = max(0, file_size - suffix_length)
end = file_size - 1
return cls(start=start, end=end)
if end_str:
end = min(int(end_str), file_size - 1)
else:
end = file_size - 1
return cls(start=start, end=end)
@dataclass
class VideoFile:
"""Video file entity"""
file_id: str
camera_name: str
filename: str
file_path: Path
file_size_bytes: int
created_at: datetime
status: VideoStatus
format: VideoFormat
metadata: Optional[VideoMetadata] = None
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
machine_trigger: Optional[str] = None
error_message: Optional[str] = None
def __post_init__(self):
"""Validate video file data"""
if not self.file_id:
raise ValueError("File ID cannot be empty")
if not self.camera_name:
raise ValueError("Camera name cannot be empty")
if self.file_size_bytes < 0:
raise ValueError("File size cannot be negative")
@property
def duration_seconds(self) -> Optional[float]:
"""Get video duration from metadata"""
return self.metadata.duration_seconds if self.metadata else None
@property
def is_streamable(self) -> bool:
"""Check if video can be streamed"""
return (
self.status in [VideoStatus.COMPLETED, VideoStatus.RECORDING] and
self.file_path.exists() and
self.file_size_bytes > 0
)
@property
def web_compatible_format(self) -> VideoFormat:
"""Get web-compatible format for this video"""
# AVI files should be converted to MP4 for web compatibility
if self.format == VideoFormat.AVI:
return VideoFormat.MP4
return self.format
def needs_conversion(self) -> bool:
"""Check if video needs format conversion for web streaming"""
return self.format != self.web_compatible_format
def get_converted_filename(self) -> str:
"""Get filename for converted version"""
if not self.needs_conversion():
return self.filename
# Replace extension with web-compatible format
stem = Path(self.filename).stem
return f"{stem}.{self.web_compatible_format.value}"

View File

@@ -0,0 +1,18 @@
"""
Video Infrastructure Layer.
Contains implementations of domain interfaces using external dependencies
like file systems, FFmpeg, OpenCV, etc.
"""
from .repositories import FileSystemVideoRepository
from .converters import FFmpegVideoConverter
from .metadata_extractors import OpenCVMetadataExtractor
from .caching import InMemoryStreamingCache
__all__ = [
"FileSystemVideoRepository",
"FFmpegVideoConverter",
"OpenCVMetadataExtractor",
"InMemoryStreamingCache",
]

View File

@@ -0,0 +1,176 @@
"""
Streaming Cache Implementations.
In-memory and file-based caching for video streaming optimization.
"""
import asyncio
import logging
from typing import Optional, Dict, Tuple
from datetime import datetime, timedelta
import hashlib
from ..domain.interfaces import StreamingCache
from ..domain.models import StreamRange
class InMemoryStreamingCache(StreamingCache):
"""In-memory cache for video streaming"""
def __init__(self, max_size_mb: int = 100, max_age_minutes: int = 30):
self.max_size_bytes = max_size_mb * 1024 * 1024
self.max_age = timedelta(minutes=max_age_minutes)
self.logger = logging.getLogger(__name__)
# Cache storage: {cache_key: (data, timestamp, size)}
self._cache: Dict[str, Tuple[bytes, datetime, int]] = {}
self._current_size = 0
self._lock = asyncio.Lock()
async def get_cached_range(
self,
file_id: str,
range_request: StreamRange
) -> Optional[bytes]:
"""Get cached byte range"""
cache_key = self._generate_cache_key(file_id, range_request)
async with self._lock:
if cache_key in self._cache:
data, timestamp, size = self._cache[cache_key]
# Check if cache entry is still valid
if datetime.now() - timestamp <= self.max_age:
self.logger.debug(f"Cache hit for {file_id} range {range_request.start}-{range_request.end}")
return data
else:
# Remove expired entry
del self._cache[cache_key]
self._current_size -= size
self.logger.debug(f"Cache entry expired for {file_id}")
return None
async def cache_range(
self,
file_id: str,
range_request: StreamRange,
data: bytes
) -> None:
"""Cache byte range data"""
cache_key = self._generate_cache_key(file_id, range_request)
data_size = len(data)
async with self._lock:
# Check if we need to make space
while self._current_size + data_size > self.max_size_bytes and self._cache:
await self._evict_oldest()
# Add to cache
self._cache[cache_key] = (data, datetime.now(), data_size)
self._current_size += data_size
self.logger.debug(f"Cached {data_size} bytes for {file_id} range {range_request.start}-{range_request.end}")
async def invalidate_file(self, file_id: str) -> None:
"""Invalidate all cached data for a file"""
async with self._lock:
keys_to_remove = [key for key in self._cache.keys() if key.startswith(f"{file_id}:")]
for key in keys_to_remove:
_, _, size = self._cache[key]
del self._cache[key]
self._current_size -= size
if keys_to_remove:
self.logger.info(f"Invalidated {len(keys_to_remove)} cache entries for {file_id}")
async def cleanup_cache(self, max_size_mb: int = 100) -> int:
"""Clean up cache to stay under size limit"""
target_size = max_size_mb * 1024 * 1024
entries_removed = 0
async with self._lock:
# Remove expired entries first
current_time = datetime.now()
expired_keys = [
key for key, (_, timestamp, _) in self._cache.items()
if current_time - timestamp > self.max_age
]
for key in expired_keys:
_, _, size = self._cache[key]
del self._cache[key]
self._current_size -= size
entries_removed += 1
# Remove oldest entries if still over limit
while self._current_size > target_size and self._cache:
await self._evict_oldest()
entries_removed += 1
if entries_removed > 0:
self.logger.info(f"Cache cleanup removed {entries_removed} entries")
return entries_removed
async def _evict_oldest(self) -> None:
"""Evict the oldest cache entry"""
if not self._cache:
return
# Find oldest entry
oldest_key = min(self._cache.keys(), key=lambda k: self._cache[k][1])
_, _, size = self._cache[oldest_key]
del self._cache[oldest_key]
self._current_size -= size
self.logger.debug(f"Evicted cache entry: {oldest_key}")
def _generate_cache_key(self, file_id: str, range_request: StreamRange) -> str:
"""Generate cache key for file and range"""
range_str = f"{range_request.start}-{range_request.end}"
return f"{file_id}:{range_str}"
async def get_cache_stats(self) -> dict:
"""Get cache statistics"""
async with self._lock:
return {
"entries": len(self._cache),
"size_bytes": self._current_size,
"size_mb": self._current_size / (1024 * 1024),
"max_size_mb": self.max_size_bytes / (1024 * 1024),
"utilization_percent": (self._current_size / self.max_size_bytes) * 100
}
class NoOpStreamingCache(StreamingCache):
"""No-operation cache that doesn't actually cache anything"""
def __init__(self):
self.logger = logging.getLogger(__name__)
async def get_cached_range(
self,
file_id: str,
range_request: StreamRange
) -> Optional[bytes]:
"""Always return None (no cache)"""
return None
async def cache_range(
self,
file_id: str,
range_request: StreamRange,
data: bytes
) -> None:
"""No-op caching"""
pass
async def invalidate_file(self, file_id: str) -> None:
"""No-op invalidation"""
pass
async def cleanup_cache(self, max_size_mb: int = 100) -> int:
"""No-op cleanup"""
return 0
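A small asyncio sketch exercising the in-memory cache above (import path assumed from the infrastructure package layout):
import asyncio
from usda_vision_system.video.domain.models import StreamRange
from usda_vision_system.video.infrastructure.caching import InMemoryStreamingCache
async def demo():
    cache = InMemoryStreamingCache(max_size_mb=1, max_age_minutes=5)
    rng = StreamRange(start=0, end=1023)
    await cache.cache_range("video-123", rng, b"\x00" * 1024)    # cache 1 KiB
    print(len(await cache.get_cached_range("video-123", rng)))  # 1024 (hit)
    print(await cache.get_cached_range("other-file", rng))      # None (miss)
    print(await cache.get_cache_stats())
asyncio.run(demo())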

View File

@@ -0,0 +1,220 @@
"""
Video Format Converters.
Implementations for converting video formats using FFmpeg.
"""
import asyncio
import logging
import shutil
from typing import Optional
from pathlib import Path
from datetime import datetime, timedelta
from ..domain.interfaces import VideoConverter
from ..domain.models import VideoFormat
class FFmpegVideoConverter(VideoConverter):
"""FFmpeg-based video converter"""
def __init__(self, temp_dir: Optional[Path] = None):
self.logger = logging.getLogger(__name__)
self.temp_dir = temp_dir or Path("/tmp/video_conversions")
self.temp_dir.mkdir(parents=True, exist_ok=True)
# Check if FFmpeg is available
self._ffmpeg_available = shutil.which("ffmpeg") is not None
if not self._ffmpeg_available:
self.logger.warning("FFmpeg not found - video conversion will be disabled")
async def convert(
self,
source_path: Path,
target_path: Path,
target_format: VideoFormat,
quality: Optional[str] = None
) -> bool:
"""Convert video to target format"""
if not self._ffmpeg_available:
self.logger.error("FFmpeg not available for conversion")
return False
try:
# Ensure target directory exists
target_path.parent.mkdir(parents=True, exist_ok=True)
# Build FFmpeg command
cmd = self._build_ffmpeg_command(source_path, target_path, target_format, quality)
self.logger.info(f"Converting {source_path} to {target_path} using FFmpeg")
# Run FFmpeg conversion
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
self.logger.info(f"Successfully converted {source_path} to {target_path}")
return True
else:
error_msg = stderr.decode() if stderr else "Unknown FFmpeg error"
self.logger.error(f"FFmpeg conversion failed: {error_msg}")
return False
except Exception as e:
self.logger.error(f"Error during video conversion: {e}")
return False
async def is_conversion_needed(
self,
source_format: VideoFormat,
target_format: VideoFormat
) -> bool:
"""Check if conversion is needed"""
return source_format != target_format
async def get_converted_path(
self,
original_path: Path,
target_format: VideoFormat
) -> Path:
"""Get path for converted file"""
# Place converted files in temp directory with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
stem = original_path.stem
converted_filename = f"{stem}_{timestamp}.{target_format.value}"
return self.temp_dir / converted_filename
async def cleanup_converted_files(self, max_age_hours: int = 24) -> int:
"""Clean up old converted files"""
try:
cutoff_time = datetime.now() - timedelta(hours=max_age_hours)
files_removed = 0
if not self.temp_dir.exists():
return 0
for file_path in self.temp_dir.iterdir():
if file_path.is_file():
# Get file modification time
file_mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
if file_mtime < cutoff_time:
try:
file_path.unlink()
files_removed += 1
self.logger.debug(f"Removed old converted file: {file_path}")
except Exception as e:
self.logger.warning(f"Could not remove {file_path}: {e}")
self.logger.info(f"Cleaned up {files_removed} old converted files")
return files_removed
except Exception as e:
self.logger.error(f"Error during converted files cleanup: {e}")
return 0
def _build_ffmpeg_command(
self,
source_path: Path,
target_path: Path,
target_format: VideoFormat,
quality: Optional[str] = None
) -> list:
"""Build FFmpeg command for conversion"""
cmd = ["ffmpeg", "-i", str(source_path)]
# Add format-specific options
if target_format == VideoFormat.MP4:
cmd.extend([
"-c:v", "libx264", # H.264 video codec
"-c:a", "aac", # AAC audio codec
"-movflags", "+faststart", # Enable progressive download
])
# Quality settings
if quality == "high":
cmd.extend(["-crf", "18"])
elif quality == "medium":
cmd.extend(["-crf", "23"])
elif quality == "low":
cmd.extend(["-crf", "28"])
else:
cmd.extend(["-crf", "23"]) # Default medium quality
elif target_format == VideoFormat.WEBM:
cmd.extend([
"-c:v", "libvpx-vp9", # VP9 video codec
"-c:a", "libopus", # Opus audio codec
])
# Quality settings for WebM
if quality == "high":
cmd.extend(["-crf", "15", "-b:v", "0"])
elif quality == "medium":
cmd.extend(["-crf", "20", "-b:v", "0"])
elif quality == "low":
cmd.extend(["-crf", "25", "-b:v", "0"])
else:
cmd.extend(["-crf", "20", "-b:v", "0"]) # Default medium quality
# Common options
cmd.extend([
"-preset", "fast", # Encoding speed vs compression trade-off
"-y", # Overwrite output file
str(target_path)
])
return cmd
class NoOpVideoConverter(VideoConverter):
"""No-operation converter for when FFmpeg is not available"""
def __init__(self):
self.logger = logging.getLogger(__name__)
async def convert(
self,
source_path: Path,
target_path: Path,
target_format: VideoFormat,
quality: Optional[str] = None
) -> bool:
"""No-op conversion - just copy file if formats match"""
try:
if source_path.suffix.lower().lstrip('.') == target_format.value:
# Same format, just copy
shutil.copy2(source_path, target_path)
return True
else:
self.logger.warning(f"Cannot convert {source_path} to {target_format} - no converter available")
return False
except Exception as e:
self.logger.error(f"Error in no-op conversion: {e}")
return False
async def is_conversion_needed(
self,
source_format: VideoFormat,
target_format: VideoFormat
) -> bool:
"""Check if conversion is needed"""
return source_format != target_format
async def get_converted_path(
self,
original_path: Path,
target_format: VideoFormat
) -> Path:
"""Get path for converted file"""
return original_path.with_suffix(f".{target_format.value}")
async def cleanup_converted_files(self, max_age_hours: int = 24) -> int:
"""No-op cleanup"""
return 0
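For reference, the command _build_ffmpeg_command produces for an AVI-to-MP4 pass at "medium" quality (file names are placeholders; calling the private helper here is purely illustrative):
from pathlib import Path
from usda_vision_system.video.domain.models import VideoFormat
from usda_vision_system.video.infrastructure.converters import FFmpegVideoConverter
converter = FFmpegVideoConverter()
cmd = converter._build_ffmpeg_command(Path("camera1_recording.avi"), Path("camera1_recording.mp4"), VideoFormat.MP4, quality="medium")
print(" ".join(cmd))
# ffmpeg -i camera1_recording.avi -c:v libx264 -c:a aac -movflags +faststart -crf 23 -preset fast -y camera1_recording.mp4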

View File

@@ -0,0 +1,201 @@
"""
Video Metadata Extractors.
Implementations for extracting video metadata using OpenCV and other tools.
"""
import asyncio
import logging
from typing import Optional
from pathlib import Path
import cv2
import numpy as np
from ..domain.interfaces import MetadataExtractor
from ..domain.models import VideoMetadata
class OpenCVMetadataExtractor(MetadataExtractor):
"""OpenCV-based metadata extractor"""
def __init__(self):
self.logger = logging.getLogger(__name__)
async def extract(self, file_path: Path) -> Optional[VideoMetadata]:
"""Extract metadata from video file using OpenCV"""
try:
# Run OpenCV operations in thread pool to avoid blocking
return await asyncio.get_event_loop().run_in_executor(
None, self._extract_sync, file_path
)
except Exception as e:
self.logger.error(f"Error extracting metadata from {file_path}: {e}")
return None
def _extract_sync(self, file_path: Path) -> Optional[VideoMetadata]:
"""Synchronous metadata extraction"""
cap = None
try:
cap = cv2.VideoCapture(str(file_path))
if not cap.isOpened():
self.logger.warning(f"Could not open video file: {file_path}")
return None
# Get video properties
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Calculate duration
duration_seconds = frame_count / fps if fps > 0 else 0.0
# Get codec information
fourcc = int(cap.get(cv2.CAP_PROP_FOURCC))
codec = self._fourcc_to_string(fourcc)
# Try to get bitrate (not always available)
bitrate = cap.get(cv2.CAP_PROP_BITRATE)
bitrate = int(bitrate) if bitrate > 0 else None
return VideoMetadata(
duration_seconds=duration_seconds,
width=width,
height=height,
fps=fps,
codec=codec,
bitrate=bitrate
)
except Exception as e:
self.logger.error(f"Error in sync metadata extraction: {e}")
return None
finally:
if cap is not None:
cap.release()
async def extract_thumbnail(
self,
file_path: Path,
timestamp_seconds: float = 1.0,
size: tuple = (320, 240)
) -> Optional[bytes]:
"""Extract thumbnail image from video"""
try:
return await asyncio.get_event_loop().run_in_executor(
None, self._extract_thumbnail_sync, file_path, timestamp_seconds, size
)
except Exception as e:
self.logger.error(f"Error extracting thumbnail from {file_path}: {e}")
return None
def _extract_thumbnail_sync(
self,
file_path: Path,
timestamp_seconds: float,
size: tuple
) -> Optional[bytes]:
"""Synchronous thumbnail extraction"""
cap = None
try:
cap = cv2.VideoCapture(str(file_path))
if not cap.isOpened():
return None
# Get video FPS to calculate frame number
fps = cap.get(cv2.CAP_PROP_FPS)
if fps <= 0:
fps = 30 # Default fallback
# Calculate target frame
target_frame = int(timestamp_seconds * fps)
# Set position to target frame
cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
# Read frame
ret, frame = cap.read()
if not ret or frame is None:
# Fallback to first frame
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
ret, frame = cap.read()
if not ret or frame is None:
return None
# Resize frame to thumbnail size
thumbnail = cv2.resize(frame, size)
# Encode as JPEG
success, buffer = cv2.imencode('.jpg', thumbnail, [cv2.IMWRITE_JPEG_QUALITY, 85])
if success:
return buffer.tobytes()
return None
except Exception as e:
self.logger.error(f"Error in sync thumbnail extraction: {e}")
return None
finally:
if cap is not None:
cap.release()
async def is_valid_video(self, file_path: Path) -> bool:
"""Check if file is a valid video"""
try:
return await asyncio.get_event_loop().run_in_executor(
None, self._is_valid_video_sync, file_path
)
except Exception as e:
self.logger.error(f"Error validating video {file_path}: {e}")
return False
def _is_valid_video_sync(self, file_path: Path) -> bool:
"""Synchronous video validation"""
cap = None
try:
if not file_path.exists():
return False
cap = cv2.VideoCapture(str(file_path))
if not cap.isOpened():
return False
# Try to read first frame
ret, frame = cap.read()
return ret and frame is not None
except Exception:
return False
finally:
if cap is not None:
cap.release()
def _fourcc_to_string(self, fourcc: int) -> str:
"""Convert OpenCV fourcc code to string"""
try:
# Convert fourcc integer to 4-character string
fourcc_bytes = [
(fourcc & 0xFF),
((fourcc >> 8) & 0xFF),
((fourcc >> 16) & 0xFF),
((fourcc >> 24) & 0xFF)
]
# Convert to string, handling non-printable characters
codec_chars = []
for byte_val in fourcc_bytes:
if 32 <= byte_val <= 126: # Printable ASCII
codec_chars.append(chr(byte_val))
else:
codec_chars.append('?')
return ''.join(codec_chars).strip()
except Exception:
return "UNKNOWN"

View File

@@ -0,0 +1,183 @@
"""
Video Repository Implementations.
File system-based implementation of video repository interface.
"""
import asyncio
import logging
from typing import List, Optional, BinaryIO
from datetime import datetime
from pathlib import Path
import aiofiles
from ..domain.interfaces import VideoRepository
from ..domain.models import VideoFile, VideoFormat, VideoStatus, StreamRange
from ...core.config import Config
from ...storage.manager import StorageManager
class FileSystemVideoRepository(VideoRepository):
"""File system implementation of video repository"""
def __init__(self, config: Config, storage_manager: StorageManager):
self.config = config
self.storage_manager = storage_manager
self.logger = logging.getLogger(__name__)
async def get_by_id(self, file_id: str) -> Optional[VideoFile]:
"""Get video file by ID"""
try:
# Get file info from storage manager
file_info = self.storage_manager.get_file_info(file_id)
if not file_info:
return None
return self._convert_to_video_file(file_info)
except Exception as e:
self.logger.error(f"Error getting video by ID {file_id}: {e}")
return None
async def get_by_camera(
self,
camera_name: str,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
limit: Optional[int] = None
) -> List[VideoFile]:
"""Get video files for a camera with optional filters"""
try:
# Use storage manager to get files
files = self.storage_manager.get_recording_files(
camera_name=camera_name,
start_date=start_date,
end_date=end_date,
limit=limit
)
return [self._convert_to_video_file(file_info) for file_info in files]
except Exception as e:
self.logger.error(f"Error getting videos for camera {camera_name}: {e}")
return []
async def get_all(
self,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
limit: Optional[int] = None
) -> List[VideoFile]:
"""Get all video files with optional filters"""
try:
# Get files from all cameras
files = self.storage_manager.get_recording_files(
camera_name=None, # All cameras
start_date=start_date,
end_date=end_date,
limit=limit
)
return [self._convert_to_video_file(file_info) for file_info in files]
except Exception as e:
self.logger.error(f"Error getting all videos: {e}")
return []
async def exists(self, file_id: str) -> bool:
"""Check if video file exists"""
try:
video_file = await self.get_by_id(file_id)
return video_file is not None and video_file.file_path.exists()
except Exception as e:
self.logger.error(f"Error checking if video exists {file_id}: {e}")
return False
async def get_file_stream(self, video_file: VideoFile) -> BinaryIO:
"""Get file stream for reading video data"""
try:
            # aiofiles.open() is awaited so an open async file handle is returned;
            # the caller is responsible for closing it when streaming is done
            return await aiofiles.open(video_file.file_path, 'rb')
except Exception as e:
self.logger.error(f"Error opening file stream for {video_file.file_id}: {e}")
raise
async def get_file_range(
self,
video_file: VideoFile,
range_request: StreamRange
) -> bytes:
"""Get specific byte range from video file"""
try:
async with aiofiles.open(video_file.file_path, 'rb') as f:
# Seek to start position
await f.seek(range_request.start)
# Calculate how many bytes to read
if range_request.end is not None:
bytes_to_read = range_request.end - range_request.start + 1
data = await f.read(bytes_to_read)
else:
# Read to end of file
data = await f.read()
return data
except Exception as e:
self.logger.error(f"Error reading file range for {video_file.file_id}: {e}")
raise
def _convert_to_video_file(self, file_info: dict) -> VideoFile:
"""Convert storage manager file info to VideoFile domain model"""
try:
file_path = Path(file_info["filename"])
            # Determine video format from extension (avoid shadowing the builtin `format`)
            extension = file_path.suffix.lower().lstrip('.')
            extension_to_format = {
                'avi': VideoFormat.AVI,
                'mp4': VideoFormat.MP4,
                'webm': VideoFormat.WEBM,
            }
            video_format = extension_to_format.get(extension, VideoFormat.AVI)  # AVI fallback
# Parse status
status_str = file_info.get("status", "unknown")
try:
status = VideoStatus(status_str)
except ValueError:
status = VideoStatus.UNKNOWN
# Parse timestamps
start_time = None
if file_info.get("start_time"):
start_time = datetime.fromisoformat(file_info["start_time"])
end_time = None
if file_info.get("end_time"):
end_time = datetime.fromisoformat(file_info["end_time"])
created_at = start_time or datetime.now()
return VideoFile(
file_id=file_info["file_id"],
camera_name=file_info["camera_name"],
filename=file_info["filename"],
file_path=file_path,
file_size_bytes=file_info.get("file_size_bytes", 0),
created_at=created_at,
status=status,
                format=video_format,
start_time=start_time,
end_time=end_time,
machine_trigger=file_info.get("machine_trigger"),
error_message=None # Could be added to storage manager later
)
except Exception as e:
self.logger.error(f"Error converting file info to VideoFile: {e}")
raise
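A minimal sketch of how a caller might use the repository's range reads, assuming `StreamRange` accepts inclusive `start`/`end` byte offsets (the constructor call here is an assumption, matching the `end - start + 1` arithmetic above) and that the file ID resolves through the storage manager:

```python
import asyncio

async def read_first_kilobyte(repo: FileSystemVideoRepository, file_id: str) -> bytes:
    """Fetch the first 1 KiB of a recording, e.g. to sniff the container header."""
    video = await repo.get_by_id(file_id)
    if video is None:
        raise FileNotFoundError(file_id)
    # StreamRange(start=..., end=...) with inclusive offsets is assumed here
    return await repo.get_file_range(video, StreamRange(start=0, end=1023))

# asyncio.run(read_first_kilobyte(repo, "camera1_recording_20250804_143022.avi"))
```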

View File

@@ -0,0 +1,197 @@
"""
Video Module Integration.
Integrates the modular video system with the existing USDA Vision Camera System.
This module handles dependency injection and service composition.
"""
import logging
from typing import Optional
from ..core.config import Config
from ..storage.manager import StorageManager
# Domain interfaces
from .domain.interfaces import VideoRepository, VideoConverter, MetadataExtractor, StreamingCache
# Infrastructure implementations
from .infrastructure.repositories import FileSystemVideoRepository
from .infrastructure.converters import FFmpegVideoConverter, NoOpVideoConverter
from .infrastructure.metadata_extractors import OpenCVMetadataExtractor
from .infrastructure.caching import InMemoryStreamingCache, NoOpStreamingCache
# Application services
from .application.video_service import VideoService
from .application.streaming_service import StreamingService
# Presentation layer
from .presentation.controllers import VideoController, StreamingController
from .presentation.routes import create_video_routes, create_admin_video_routes
class VideoModuleConfig:
"""Configuration for video module"""
def __init__(
self,
enable_caching: bool = True,
cache_size_mb: int = 100,
cache_max_age_minutes: int = 30,
enable_conversion: bool = True,
conversion_quality: str = "medium"
):
self.enable_caching = enable_caching
self.cache_size_mb = cache_size_mb
self.cache_max_age_minutes = cache_max_age_minutes
self.enable_conversion = enable_conversion
self.conversion_quality = conversion_quality
class VideoModule:
"""
Main video module that provides dependency injection and service composition.
This class follows the composition root pattern, creating and wiring up
all dependencies for the video streaming functionality.
"""
def __init__(
self,
config: Config,
storage_manager: StorageManager,
video_config: Optional[VideoModuleConfig] = None
):
self.config = config
self.storage_manager = storage_manager
self.video_config = video_config or VideoModuleConfig()
self.logger = logging.getLogger(__name__)
# Initialize services
self._initialize_services()
self.logger.info("Video module initialized successfully")
def _initialize_services(self):
"""Initialize all video services with proper dependency injection"""
# Infrastructure layer
self.video_repository = self._create_video_repository()
self.video_converter = self._create_video_converter()
self.metadata_extractor = self._create_metadata_extractor()
self.streaming_cache = self._create_streaming_cache()
# Application layer
self.video_service = VideoService(
video_repository=self.video_repository,
metadata_extractor=self.metadata_extractor,
video_converter=self.video_converter
)
self.streaming_service = StreamingService(
video_repository=self.video_repository,
streaming_cache=self.streaming_cache
)
# Presentation layer
self.video_controller = VideoController(self.video_service)
self.streaming_controller = StreamingController(
streaming_service=self.streaming_service,
video_service=self.video_service
)
def _create_video_repository(self) -> VideoRepository:
"""Create video repository implementation"""
return FileSystemVideoRepository(
config=self.config,
storage_manager=self.storage_manager
)
def _create_video_converter(self) -> VideoConverter:
"""Create video converter implementation"""
if self.video_config.enable_conversion:
try:
return FFmpegVideoConverter()
except Exception as e:
self.logger.warning(f"FFmpeg converter not available, using no-op converter: {e}")
return NoOpVideoConverter()
else:
return NoOpVideoConverter()
def _create_metadata_extractor(self) -> MetadataExtractor:
"""Create metadata extractor implementation"""
return OpenCVMetadataExtractor()
def _create_streaming_cache(self) -> StreamingCache:
"""Create streaming cache implementation"""
if self.video_config.enable_caching:
return InMemoryStreamingCache(
max_size_mb=self.video_config.cache_size_mb,
max_age_minutes=self.video_config.cache_max_age_minutes
)
else:
return NoOpStreamingCache()
def get_api_routes(self):
"""Get FastAPI routes for video functionality"""
return create_video_routes(
video_controller=self.video_controller,
streaming_controller=self.streaming_controller
)
def get_admin_routes(self):
"""Get admin routes for video management"""
return create_admin_video_routes(
streaming_controller=self.streaming_controller
)
async def cleanup(self):
"""Clean up video module resources"""
try:
# Clean up cache
if self.streaming_cache:
await self.streaming_cache.cleanup_cache()
# Clean up converted files
if self.video_converter:
await self.video_converter.cleanup_converted_files()
self.logger.info("Video module cleanup completed")
except Exception as e:
self.logger.error(f"Error during video module cleanup: {e}")
def get_module_status(self) -> dict:
"""Get status information about the video module"""
return {
"video_repository": type(self.video_repository).__name__,
"video_converter": type(self.video_converter).__name__,
"metadata_extractor": type(self.metadata_extractor).__name__,
"streaming_cache": type(self.streaming_cache).__name__,
"caching_enabled": self.video_config.enable_caching,
"conversion_enabled": self.video_config.enable_conversion,
"cache_size_mb": self.video_config.cache_size_mb
}
def create_video_module(
config: Config,
storage_manager: StorageManager,
enable_caching: bool = True,
enable_conversion: bool = True
) -> VideoModule:
"""
Factory function to create a configured video module.
This is the main entry point for integrating video functionality
into the existing USDA Vision Camera System.
"""
video_config = VideoModuleConfig(
enable_caching=enable_caching,
enable_conversion=enable_conversion
)
return VideoModule(
config=config,
storage_manager=storage_manager,
video_config=video_config
)
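A minimal sketch of wiring this module into an existing FastAPI application, assuming `get_api_routes()` and `get_admin_routes()` return `APIRouter` instances as defined above; the helper name `mount_video_module` is illustrative:

```python
from fastapi import FastAPI

def mount_video_module(app: FastAPI, config, storage_manager) -> VideoModule:
    """Wire the video module into an existing FastAPI app (illustrative helper)."""
    video_module = create_video_module(
        config=config,
        storage_manager=storage_manager,
        enable_caching=True,
        enable_conversion=True,
    )
    # Mount the routers produced by the presentation layer
    app.include_router(video_module.get_api_routes())
    app.include_router(video_module.get_admin_routes())

    # Release cache and converted files when the app shuts down
    @app.on_event("shutdown")
    async def _shutdown_video_module():
        await video_module.cleanup()

    return video_module
```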

View File

@@ -0,0 +1,18 @@
"""
Video Presentation Layer.
Contains HTTP controllers, request/response models, and API route definitions.
"""
from .controllers import VideoController, StreamingController
from .schemas import VideoInfoResponse, VideoListResponse, StreamingInfoResponse
from .routes import create_video_routes
__all__ = [
"VideoController",
"StreamingController",
"VideoInfoResponse",
"VideoListResponse",
"StreamingInfoResponse",
"create_video_routes",
]

View File

@@ -0,0 +1,207 @@
"""
Video HTTP Controllers.
Handle HTTP requests and responses for video operations.
"""
import logging
from typing import Optional
from datetime import datetime
from fastapi import HTTPException, Request, Response
from ..application.video_service import VideoService
from ..application.streaming_service import StreamingService
from ..domain.models import StreamRange, VideoFile
from .schemas import (
VideoInfoResponse, VideoListResponse, VideoListRequest,
StreamingInfoResponse, ThumbnailRequest, VideoMetadataResponse
)
class VideoController:
"""Controller for video management operations"""
def __init__(self, video_service: VideoService):
self.video_service = video_service
self.logger = logging.getLogger(__name__)
async def get_video_info(self, file_id: str) -> VideoInfoResponse:
"""Get video information"""
video_file = await self.video_service.get_video_by_id(file_id)
if not video_file:
raise HTTPException(status_code=404, detail=f"Video {file_id} not found")
return self._convert_to_response(video_file)
async def list_videos(self, request: VideoListRequest) -> VideoListResponse:
"""List videos with optional filters"""
if request.camera_name:
videos = await self.video_service.get_videos_by_camera(
camera_name=request.camera_name,
start_date=request.start_date,
end_date=request.end_date,
limit=request.limit,
include_metadata=request.include_metadata
)
else:
videos = await self.video_service.get_all_videos(
start_date=request.start_date,
end_date=request.end_date,
limit=request.limit,
include_metadata=request.include_metadata
)
video_responses = [self._convert_to_response(video) for video in videos]
return VideoListResponse(
videos=video_responses,
total_count=len(video_responses)
)
async def get_video_thumbnail(
self,
file_id: str,
thumbnail_request: ThumbnailRequest
) -> Response:
"""Get video thumbnail"""
thumbnail_data = await self.video_service.get_video_thumbnail(
file_id=file_id,
timestamp_seconds=thumbnail_request.timestamp_seconds,
size=(thumbnail_request.width, thumbnail_request.height)
)
if not thumbnail_data:
raise HTTPException(status_code=404, detail=f"Could not generate thumbnail for {file_id}")
return Response(
content=thumbnail_data,
media_type="image/jpeg",
headers={
"Cache-Control": "public, max-age=3600", # Cache for 1 hour
"Content-Length": str(len(thumbnail_data))
}
)
async def validate_video(self, file_id: str) -> dict:
"""Validate video file"""
is_valid = await self.video_service.validate_video(file_id)
return {"file_id": file_id, "is_valid": is_valid}
def _convert_to_response(self, video_file: VideoFile) -> VideoInfoResponse:
"""Convert domain model to response model"""
metadata_response = None
if video_file.metadata:
metadata_response = VideoMetadataResponse(
duration_seconds=video_file.metadata.duration_seconds,
width=video_file.metadata.width,
height=video_file.metadata.height,
fps=video_file.metadata.fps,
codec=video_file.metadata.codec,
bitrate=video_file.metadata.bitrate,
aspect_ratio=video_file.metadata.aspect_ratio
)
return VideoInfoResponse(
file_id=video_file.file_id,
camera_name=video_file.camera_name,
filename=video_file.filename,
file_size_bytes=video_file.file_size_bytes,
format=video_file.format.value,
status=video_file.status.value,
created_at=video_file.created_at,
start_time=video_file.start_time,
end_time=video_file.end_time,
machine_trigger=video_file.machine_trigger,
metadata=metadata_response,
is_streamable=video_file.is_streamable,
needs_conversion=video_file.needs_conversion()
)
class StreamingController:
"""Controller for video streaming operations"""
def __init__(self, streaming_service: StreamingService, video_service: VideoService):
self.streaming_service = streaming_service
self.video_service = video_service
self.logger = logging.getLogger(__name__)
async def get_streaming_info(self, file_id: str) -> StreamingInfoResponse:
"""Get streaming information for a video"""
video_file = await self.streaming_service.get_video_info(file_id)
if not video_file:
raise HTTPException(status_code=404, detail=f"Video {file_id} not found")
chunk_size = await self.streaming_service.get_optimal_chunk_size(video_file.file_size_bytes)
content_type = self._get_content_type(video_file)
return StreamingInfoResponse(
file_id=file_id,
file_size_bytes=video_file.file_size_bytes,
content_type=content_type,
supports_range_requests=True,
chunk_size_bytes=chunk_size
)
async def stream_video(self, file_id: str, request: Request) -> Response:
"""Stream video with range request support"""
# Prepare video for streaming (convert if needed)
video_file = await self.video_service.prepare_for_streaming(file_id)
if not video_file:
raise HTTPException(status_code=404, detail=f"Video {file_id} not found or not streamable")
# Parse range header
range_header = request.headers.get("range")
range_request = None
if range_header:
try:
range_request = StreamRange.from_header(range_header, video_file.file_size_bytes)
except ValueError as e:
raise HTTPException(status_code=416, detail=f"Invalid range request: {e}")
# Get video data
data, _, actual_range = await self.streaming_service.stream_video_range(file_id, range_request)
if data is None:
raise HTTPException(status_code=500, detail="Failed to read video data")
# Determine response type and headers
content_type = self._get_content_type(video_file)
headers = {
"Accept-Ranges": "bytes",
"Content-Length": str(len(data)),
"Cache-Control": "public, max-age=3600"
}
# Use partial content if range was requested
if actual_range and self.streaming_service.should_use_partial_content(actual_range, video_file.file_size_bytes):
headers["Content-Range"] = self.streaming_service.calculate_content_range_header(
actual_range, video_file.file_size_bytes
)
status_code = 206 # Partial Content
else:
status_code = 200 # OK
return Response(
content=data,
status_code=status_code,
headers=headers,
media_type=content_type
)
async def invalidate_cache(self, file_id: str) -> dict:
"""Invalidate streaming cache for a video"""
success = await self.streaming_service.invalidate_cache(file_id)
return {"file_id": file_id, "cache_invalidated": success}
def _get_content_type(self, video_file: VideoFile) -> str:
"""Get MIME content type for video file"""
format_to_mime = {
"avi": "video/x-msvideo",
"mp4": "video/mp4",
"webm": "video/webm"
}
return format_to_mime.get(video_file.format.value, "application/octet-stream")
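The controller above delegates header parsing to `StreamRange.from_header`, which is not shown in this hunk; a rough sketch of the expected single-range parsing, assuming inclusive byte offsets and open-ended ranges running to end of file (the helper is hypothetical):

```python
def parse_range_header(header: str, file_size: int) -> tuple:
    """Parse a single 'bytes=start-end' header into inclusive offsets (hypothetical helper)."""
    unit, _, spec = header.partition("=")
    if unit.strip().lower() != "bytes" or "," in spec:
        raise ValueError(f"Unsupported range header: {header}")
    start_s, _, end_s = spec.strip().partition("-")
    if start_s == "":                       # suffix range: last N bytes
        length = int(end_s)
        return max(file_size - length, 0), file_size - 1
    start = int(start_s)
    end = int(end_s) if end_s else file_size - 1
    if start > end or end >= file_size:
        raise ValueError(f"Range not satisfiable: {header}")
    return start, end

assert parse_range_header("bytes=0-262143", 52428800) == (0, 262143)
assert parse_range_header("bytes=-1024", 52428800) == (52427776, 52428799)
```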

View File

@@ -0,0 +1,167 @@
"""
Video API Routes.
FastAPI route definitions for video streaming and management.
"""
from typing import Optional
from datetime import datetime
from fastapi import APIRouter, Query, Request
from .controllers import VideoController, StreamingController
from .schemas import (
VideoInfoResponse, VideoListResponse, VideoListRequest,
StreamingInfoResponse, ThumbnailRequest
)
def create_video_routes(
video_controller: VideoController,
streaming_controller: StreamingController
) -> APIRouter:
"""Create video API routes with dependency injection"""
router = APIRouter(prefix="/videos", tags=["videos"])
@router.get("/", response_model=VideoListResponse)
async def list_videos(
camera_name: Optional[str] = Query(None, description="Filter by camera name"),
start_date: Optional[datetime] = Query(None, description="Filter by start date"),
end_date: Optional[datetime] = Query(None, description="Filter by end date"),
limit: Optional[int] = Query(50, description="Maximum number of results"),
include_metadata: bool = Query(False, description="Include video metadata")
):
"""
List videos with optional filters.
- **camera_name**: Filter videos by camera name
- **start_date**: Filter videos created after this date
- **end_date**: Filter videos created before this date
- **limit**: Maximum number of videos to return
- **include_metadata**: Whether to include video metadata (duration, resolution, etc.)
"""
request = VideoListRequest(
camera_name=camera_name,
start_date=start_date,
end_date=end_date,
limit=limit,
include_metadata=include_metadata
)
return await video_controller.list_videos(request)
@router.get("/{file_id}", response_model=VideoInfoResponse)
async def get_video_info(file_id: str):
"""
Get detailed information about a specific video.
- **file_id**: Unique identifier for the video file
"""
return await video_controller.get_video_info(file_id)
@router.get("/{file_id}/stream")
async def stream_video(file_id: str, request: Request):
"""
Stream video with HTTP range request support.
Supports:
- **Range requests**: For seeking and progressive download
- **Partial content**: 206 responses for range requests
- **Format conversion**: Automatic conversion to web-compatible formats
- **Caching**: Intelligent caching for better performance
Usage in HTML5:
```html
<video controls>
<source src="/videos/{file_id}/stream" type="video/mp4">
</video>
```
"""
return await streaming_controller.stream_video(file_id, request)
@router.get("/{file_id}/info", response_model=StreamingInfoResponse)
async def get_streaming_info(file_id: str):
"""
Get streaming information for a video.
Returns technical details needed for optimal streaming:
- File size and content type
- Range request support
- Recommended chunk size
"""
return await streaming_controller.get_streaming_info(file_id)
@router.get("/{file_id}/thumbnail")
async def get_video_thumbnail(
file_id: str,
timestamp: float = Query(1.0, description="Timestamp in seconds to extract thumbnail from"),
width: int = Query(320, description="Thumbnail width in pixels"),
height: int = Query(240, description="Thumbnail height in pixels")
):
"""
Generate and return a thumbnail image from the video.
- **file_id**: Video file identifier
- **timestamp**: Time position in seconds to extract thumbnail from
- **width**: Thumbnail width in pixels
- **height**: Thumbnail height in pixels
Returns JPEG image data.
"""
thumbnail_request = ThumbnailRequest(
timestamp_seconds=timestamp,
width=width,
height=height
)
return await video_controller.get_video_thumbnail(file_id, thumbnail_request)
@router.post("/{file_id}/validate")
async def validate_video(file_id: str):
"""
Validate that a video file is accessible and playable.
- **file_id**: Video file identifier
Returns validation status and any issues found.
"""
return await video_controller.validate_video(file_id)
@router.post("/{file_id}/cache/invalidate")
async def invalidate_video_cache(file_id: str):
"""
Invalidate cached data for a video file.
Useful when a video file has been updated or replaced.
- **file_id**: Video file identifier
"""
return await streaming_controller.invalidate_cache(file_id)
return router
def create_admin_video_routes(streaming_controller: StreamingController) -> APIRouter:
"""Create admin routes for video management"""
router = APIRouter(prefix="/admin/videos", tags=["admin", "videos"])
@router.post("/cache/cleanup")
async def cleanup_video_cache(
max_size_mb: int = Query(100, description="Maximum cache size in MB")
):
"""
Clean up video streaming cache.
Removes old cached data to keep cache size under the specified limit.
- **max_size_mb**: Maximum cache size to maintain
"""
entries_removed = await streaming_controller.streaming_service.cleanup_cache(max_size_mb)
return {
"cache_cleaned": True,
"entries_removed": entries_removed,
"max_size_mb": max_size_mb
}
return router
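Illustrative client calls against these routes, assuming the routers are mounted at the application root on `localhost:8000`; the file ID is an example only:

```python
import requests

BASE_URL = "http://localhost:8000"                          # assumed host/port of the API server
FILE_ID = "camera1_recording_20250804_143022.avi"           # example identifier only

# Streaming details: size, content type, recommended chunk size
info = requests.get(f"{BASE_URL}/videos/{FILE_ID}/info").json()

# Fetch the first 256 KiB; a range request should be answered with 206 Partial Content
resp = requests.get(
    f"{BASE_URL}/videos/{FILE_ID}/stream",
    headers={"Range": "bytes=0-262143"},
)
print(resp.status_code, resp.headers.get("Content-Range"))
```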

View File

@@ -0,0 +1,138 @@
"""
Video API Request/Response Schemas.
Pydantic models for API serialization and validation.
"""
from typing import List, Optional
from datetime import datetime
from pydantic import BaseModel, Field
class VideoMetadataResponse(BaseModel):
"""Video metadata response model"""
duration_seconds: float = Field(..., description="Video duration in seconds")
width: int = Field(..., description="Video width in pixels")
height: int = Field(..., description="Video height in pixels")
fps: float = Field(..., description="Video frame rate")
codec: str = Field(..., description="Video codec")
bitrate: Optional[int] = Field(None, description="Video bitrate in bps")
aspect_ratio: float = Field(..., description="Video aspect ratio")
class Config:
schema_extra = {
"example": {
"duration_seconds": 120.5,
"width": 1920,
"height": 1080,
"fps": 30.0,
"codec": "XVID",
"bitrate": 5000000,
"aspect_ratio": 1.777
}
}
class VideoInfoResponse(BaseModel):
"""Video file information response"""
file_id: str = Field(..., description="Unique file identifier")
camera_name: str = Field(..., description="Camera that recorded the video")
filename: str = Field(..., description="Original filename")
file_size_bytes: int = Field(..., description="File size in bytes")
format: str = Field(..., description="Video format (avi, mp4, webm)")
status: str = Field(..., description="Video status")
created_at: datetime = Field(..., description="Creation timestamp")
start_time: Optional[datetime] = Field(None, description="Recording start time")
end_time: Optional[datetime] = Field(None, description="Recording end time")
machine_trigger: Optional[str] = Field(None, description="Machine that triggered recording")
metadata: Optional[VideoMetadataResponse] = Field(None, description="Video metadata")
is_streamable: bool = Field(..., description="Whether video can be streamed")
needs_conversion: bool = Field(..., description="Whether video needs format conversion")
class Config:
schema_extra = {
"example": {
"file_id": "camera1_recording_20250804_143022.avi",
"camera_name": "camera1",
"filename": "camera1_recording_20250804_143022.avi",
"file_size_bytes": 52428800,
"format": "avi",
"status": "completed",
"created_at": "2025-08-04T14:30:22",
"start_time": "2025-08-04T14:30:22",
"end_time": "2025-08-04T14:32:22",
"machine_trigger": "vibratory_conveyor",
"is_streamable": True,
"needs_conversion": True
}
}
class VideoListResponse(BaseModel):
"""Video list response"""
videos: List[VideoInfoResponse] = Field(..., description="List of videos")
total_count: int = Field(..., description="Total number of videos")
class Config:
schema_extra = {
"example": {
"videos": [],
"total_count": 0
}
}
class StreamingInfoResponse(BaseModel):
"""Streaming information response"""
file_id: str = Field(..., description="Video file ID")
file_size_bytes: int = Field(..., description="Total file size")
content_type: str = Field(..., description="MIME content type")
supports_range_requests: bool = Field(..., description="Whether range requests are supported")
chunk_size_bytes: int = Field(..., description="Recommended chunk size for streaming")
class Config:
schema_extra = {
"example": {
"file_id": "camera1_recording_20250804_143022.avi",
"file_size_bytes": 52428800,
"content_type": "video/x-msvideo",
"supports_range_requests": True,
"chunk_size_bytes": 262144
}
}
class VideoListRequest(BaseModel):
"""Video list request parameters"""
camera_name: Optional[str] = Field(None, description="Filter by camera name")
start_date: Optional[datetime] = Field(None, description="Filter by start date")
end_date: Optional[datetime] = Field(None, description="Filter by end date")
limit: Optional[int] = Field(50, description="Maximum number of results")
include_metadata: bool = Field(False, description="Include video metadata")
class Config:
schema_extra = {
"example": {
"camera_name": "camera1",
"start_date": "2025-08-04T00:00:00",
"end_date": "2025-08-04T23:59:59",
"limit": 50,
"include_metadata": True
}
}
class ThumbnailRequest(BaseModel):
"""Thumbnail generation request"""
timestamp_seconds: float = Field(1.0, description="Timestamp to extract thumbnail from")
width: int = Field(320, description="Thumbnail width")
height: int = Field(240, description="Thumbnail height")
class Config:
schema_extra = {
"example": {
"timestamp_seconds": 5.0,
"width": 320,
"height": 240
}
}
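A small sketch exercising the request model above; the field names come from the schema, the values are illustrative, and `.dict()` assumes the Pydantic v1 API implied by the `schema_extra` Config blocks:

```python
from datetime import datetime

request = VideoListRequest(
    camera_name="camera1",
    start_date=datetime(2025, 8, 4, 0, 0, 0),
    end_date=datetime(2025, 8, 4, 23, 59, 59),
    limit=50,
    include_metadata=True,
)

# Pydantic validates on construction and serializes via the v1-style .dict()
print(request.dict())
```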