import os
import pathlib
import subprocess
import threading
import time
import urllib.parse
from typing import List, Optional, Tuple

from fastapi import FastAPI, HTTPException, Response, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse

MEDIA_DIR = pathlib.Path(os.getenv("MEDIA_VIDEOS_DIR", "/mnt/videos")).resolve()
# `or` (rather than a getenv default) makes an empty MEDIA_THUMBS_DIR behave
# like an unset one instead of resolving to the current working directory.
THUMBS_DIR = pathlib.Path(
    os.getenv("MEDIA_THUMBS_DIR") or MEDIA_DIR / ".thumbnails"
).resolve()

# File extensions (lower-case, with dot) that are listed as videos.
_VIDEO_EXTENSIONS = frozenset({".mp4", ".avi", ".mov", ".mkv"})

# Upper bound for request-supplied thumbnail width/height, so a request cannot
# ask ffmpeg for an arbitrarily large image.
MAX_THUMBNAIL_DIMENSION = int(os.getenv("MAX_THUMBNAIL_DIMENSION", "4096"))

# Limit concurrent transcoding operations to prevent resource exhaustion
# Adjust based on your CPU cores and available memory
MAX_CONCURRENT_TRANSCODING = int(os.getenv("MAX_CONCURRENT_TRANSCODING", "2"))
transcoding_semaphore = threading.Semaphore(MAX_CONCURRENT_TRANSCODING)

app = FastAPI(title="Media API", version="0.1.0")

# CORS for dashboard - allow all origins to support access from different IPs/hostnames
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive - confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins for flexibility
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Background thumbnail generator state
_thumbnail_generator_running = False
_thumbnail_generator_thread = None
# Set on shutdown so the worker wakes from its inter-scan wait immediately.
_thumbnail_generator_stop = threading.Event()


def thumbnail_generator_worker():
    """
    Background worker that periodically scans for videos and generates
    missing thumbnails.

    Runs until ``_thumbnail_generator_running`` becomes false. Between scans it
    waits ``THUMBNAIL_SCAN_INTERVAL`` seconds (default 30) on the stop event,
    so a shutdown request interrupts the wait instead of sleeping it out.
    """
    print("[Thumbnail Generator] Background thumbnail generator started", flush=True)

    # Paths already handled. A dict (insertion-ordered) is used instead of a
    # set so that trimming really drops the oldest entries.
    processed_files = {}

    # Scan interval (in seconds)
    scan_interval = int(os.getenv("THUMBNAIL_SCAN_INTERVAL", "30"))

    while _thumbnail_generator_running:
        try:
            for video_path in list_video_files():
                if not _thumbnail_generator_running:
                    break

                try:
                    file_id = str(video_path)
                    if file_id in processed_files:
                        continue

                    # On failure nothing is recorded, so the file is retried on
                    # the next scan (it might still be recording).
                    if generate_thumbnail_background(video_path):
                        processed_files[file_id] = None
                        print(
                            f"[Thumbnail Generator] Generated thumbnail for: {video_path.name}",
                            flush=True,
                        )
                except Exception as e:
                    # Log error but continue processing other files
                    print(
                        f"[Thumbnail Generator] Error processing {video_path}: {e}",
                        flush=True,
                    )
                    continue

            # Bound memory: once past 1000 entries keep the 500 newest.
            if len(processed_files) > 1000:
                processed_files = dict.fromkeys(list(processed_files)[-500:])

        except Exception as e:
            print(f"[Thumbnail Generator] Error in scan loop: {e}", flush=True)

        # Wait before the next scan; returns early when shutdown sets the event.
        _thumbnail_generator_stop.wait(scan_interval)

    print("[Thumbnail Generator] Background thumbnail generator stopped", flush=True)


@app.on_event("startup")
def startup_event():
    """Start background thumbnail generator when app starts"""
    global _thumbnail_generator_running, _thumbnail_generator_thread

    _thumbnail_generator_stop.clear()
    _thumbnail_generator_running = True
    _thumbnail_generator_thread = threading.Thread(
        target=thumbnail_generator_worker,
        daemon=True,
        name="ThumbnailGenerator",
    )
    _thumbnail_generator_thread.start()
    print("[Media API] Background thumbnail generator started", flush=True)


@app.on_event("shutdown")
def shutdown_event():
    """Stop background thumbnail generator when app shuts down"""
    global _thumbnail_generator_running

    _thumbnail_generator_running = False
    # Wake the worker if it is waiting between scans so the join below can
    # succeed within its timeout.
    _thumbnail_generator_stop.set()
    if _thumbnail_generator_thread:
        _thumbnail_generator_thread.join(timeout=5)
    print("[Media API] Background thumbnail generator stopped", flush=True)


def list_video_files() -> List[pathlib.Path]:
    """
    Return every video file under MEDIA_DIR, newest (by mtime) first.

    The thumbnails directory is not descended into. Files that disappear
    between the directory walk and the stat call are skipped rather than
    failing the whole listing.
    """
    entries = []  # (mtime, path) pairs
    for root, dirnames, fnames in os.walk(MEDIA_DIR):
        root_path = pathlib.Path(root)

        # Skip the thumbnails dir by exact path (a substring test would also
        # hide unrelated directories such as ".thumbnails2") and prune it so
        # os.walk does not visit its children.
        if root_path == THUMBS_DIR:
            dirnames[:] = []
            continue

        for fname in fnames:
            candidate = root_path / fname
            if candidate.suffix.lower() not in _VIDEO_EXTENSIONS:
                continue
            try:
                mtime = candidate.stat().st_mtime
            except OSError:
                continue  # removed or unreadable since the walk listed it
            entries.append((mtime, candidate))

    entries.sort(key=lambda entry: entry[0], reverse=True)
    return [path for _, path in entries]


def file_id_from_path(p: pathlib.Path) -> str:
    """
    Build the public file id for ``p``: its path relative to MEDIA_DIR,
    URL-encoded with ``quote_plus``.

    Raises ValueError (from ``relative_to``) if ``p`` is not under MEDIA_DIR.
    """
    return urllib.parse.quote_plus(p.relative_to(MEDIA_DIR).as_posix())


def path_from_file_id(fid: str) -> pathlib.Path:
    """
    Resolve a file id (raw, URL-encoded, or multiply encoded) to an existing
    file inside MEDIA_DIR.

    The id is decoded repeatedly until it stops changing. The fully decoded
    form is tried first; if it does not name a file, the less-decoded forms
    are tried, which makes names containing a literal ``+`` or ``%xx``
    reachable.

    Raises:
        HTTPException(404): no candidate names a regular file inside MEDIA_DIR.
    """
    # candidates[0] is the id as received; each later entry is one more
    # round of decoding.
    candidates = [fid]
    while True:
        decoded = urllib.parse.unquote_plus(candidates[-1])
        if decoded == candidates[-1]:
            break
        candidates.append(decoded)

    for rel in reversed(candidates):
        try:
            p = (MEDIA_DIR / rel).resolve()
            # The containment check rejects "../" traversal and absolute paths.
            if p.is_file() and MEDIA_DIR in p.parents:
                return p
        except (OSError, ValueError):
            # e.g. embedded NUL byte or an over-long name: treat as not found
            continue

    raise HTTPException(status_code=404, detail="Video not found")


def is_video_file_complete(p: pathlib.Path) -> bool:
    """
    Check if video file is complete and valid using ffprobe.

    Returns True when ffprobe exits with status 0 within 5 seconds, False
    otherwise (including when ffprobe cannot be started).
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        str(p),
    ]
    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=5,
        )
    except (subprocess.SubprocessError, OSError, ValueError):
        return False
    return result.returncode == 0


def _thumbnail_path(p: pathlib.Path, width: int, height: int) -> pathlib.Path:
    """Return the cache location of the thumbnail of ``p`` at the given size."""
    return THUMBS_DIR / (file_id_from_path(p) + f"_{width}x{height}.jpg")


def _is_nonempty_file(path: pathlib.Path) -> bool:
    """Return True if ``path`` can be stat'ed and has a size above zero."""
    try:
        return path.stat().st_size > 0
    except OSError:
        return False


def generate_thumbnail_background(p: pathlib.Path, width: int = 320, height: int = 180) -> bool:
    """
    Generate thumbnail in background (non-blocking, doesn't raise exceptions).

    ffmpeg writes to a temporary file in THUMBS_DIR which is then renamed onto
    the final name, so a concurrent reader never sees a partially written
    image.

    Returns True if the thumbnail exists afterwards (already present or newly
    created), False otherwise.
    """
    try:
        THUMBS_DIR.mkdir(parents=True, exist_ok=True)
        out = _thumbnail_path(p, width, height)
    except (OSError, ValueError):
        # Cannot create the cache dir, or p is not under MEDIA_DIR
        return False

    # Reuse an existing non-empty thumbnail; remove an empty leftover.
    try:
        if out.stat().st_size > 0:
            return True
        out.unlink()
    except OSError:
        pass  # missing (the common case) or not removable

    # Source must exist, be readable and be non-empty.
    try:
        file_size = p.stat().st_size
    except OSError:
        return False
    if file_size == 0:
        return False

    # For very small files (< 1MB), they might still be recording:
    # wait a second and skip the file if it grew by more than 10%.
    if file_size < 1024 * 1024:
        time.sleep(1.0)
        try:
            new_size = p.stat().st_size
        except OSError:
            return False
        if new_size > file_size * 1.1:
            return False

    # Skip incomplete/corrupted files before spending time in ffmpeg.
    if not is_video_file_complete(p):
        return False

    # Per-process/per-thread temp name; keeps the ".jpg" suffix so ffmpeg
    # still selects the image output format from the file name.
    tmp_out = out.with_name(f".{os.getpid()}-{threading.get_ident()}-{out.name}")

    try:
        # Try a frame at 1s first, then the very first frame.
        for seek_time in (1.0, 0.0):
            cmd = [
                "ffmpeg",
                "-y",
                "-ss", str(seek_time),
                "-i", str(p),
                "-frames:v", "1",
                "-update", "1",  # Required for single image output with image2 muxer
                "-vf", f"scale='min({width},iw)':-2,scale={width}:{height}:force_original_aspect_ratio=decrease,pad={width}:{height}:(ow-iw)/2:(oh-ih)/2",
                str(tmp_out),
            ]
            try:
                subprocess.run(
                    cmd,
                    check=True,
                    stdin=subprocess.DEVNULL,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                    timeout=30,
                )
            except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
                continue  # Try next seek time or give up
            except (OSError, ValueError):
                return False  # ffmpeg could not be started at all

            try:
                if tmp_out.stat().st_size > 0:
                    os.replace(tmp_out, out)  # atomic publish
                    return True
            except OSError:
                continue

        return False  # Failed to generate thumbnail
    finally:
        # Remove the temp file if it was not renamed into place.
        try:
            tmp_out.unlink()
        except OSError:
            pass


def ensure_thumbnail(p: pathlib.Path, width: int = 320, height: int = 180) -> pathlib.Path:
    """
    Ensure thumbnail exists (for API requests) and return its path.

    Raises:
        HTTPException(400): width or height is outside
            1..MAX_THUMBNAIL_DIMENSION.
        HTTPException(500): the thumbnail is missing and could not be
            generated.
    """
    if not (1 <= width <= MAX_THUMBNAIL_DIMENSION and 1 <= height <= MAX_THUMBNAIL_DIMENSION):
        raise HTTPException(
            status_code=400,
            detail=f"width and height must be between 1 and {MAX_THUMBNAIL_DIMENSION}",
        )

    out = _thumbnail_path(p, width, height)
    if _is_nonempty_file(out):
        return out

    # Try to generate on-demand
    if generate_thumbnail_background(p, width, height) and _is_nonempty_file(out):
        return out

    raise HTTPException(status_code=500, detail="Failed to generate thumbnail")


@app.get("/health")
def health() -> dict:
    """Liveness probe: always reports ``{"status": "ok"}``."""
    return {"status": "ok"}


@app.get("/videos/")
def list_videos(
    camera_name: Optional[str] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    limit: int = 24,
    page: int = 1,
) -> dict:
    """
    Return one page of video metadata, newest first.

    ``camera_name`` is matched as a substring of the path relative to
    MEDIA_DIR. ``start_date`` and ``end_date`` are accepted but currently not
    applied. A ``limit`` of zero or less yields an empty page.
    """
    files = list_video_files()

    # basic filtering placeholder (camera_name can be inferred from path segments)
    # Match against the relative path so the storage root itself can never
    # satisfy the filter.
    if camera_name:
        files = [p for p in files if camera_name in p.relative_to(MEDIA_DIR).as_posix()]

    limit = max(limit, 0)
    total = len(files)
    start = max(0, (page - 1) * limit)

    items = []
    for p in files[start:start + limit]:
        try:
            stat = p.stat()
        except OSError:
            continue  # file vanished after it was listed
        items.append({
            "file_id": file_id_from_path(p),
            "camera_name": p.parent.name,
            "filename": p.name,
            "file_size_bytes": stat.st_size,
            "format": p.suffix.lstrip('.'),
            "status": "completed",
            "created_at": int(stat.st_mtime),
            "is_streamable": True,
            "needs_conversion": False,
        })

    total_pages = (total + limit - 1) // limit if limit else 1
    return {
        "videos": items,
        "total_count": total,
        "page": page,
        "total_pages": total_pages,
    }


@app.get("/videos/{file_id:path}/thumbnail")
def get_thumbnail(file_id: str, width: int = 320, height: int = 180):
    """Serve the JPEG thumbnail of the video named by ``file_id`` (path form)."""
    p = path_from_file_id(file_id)
    thumb = ensure_thumbnail(p, width, height)
    return FileResponse(thumb, media_type="image/jpeg")


# Convenience endpoint: pass file_id via query instead of path (accepts raw or URL-encoded)
@app.get("/videos/thumbnail")
def get_thumbnail_q(file_id: str, width: int = 320, height: int = 180):
    """Serve the JPEG thumbnail of the video named by the ``file_id`` query parameter."""
    p = path_from_file_id(file_id)
    thumb = ensure_thumbnail(p, width, height)
    return FileResponse(thumb, media_type="image/jpeg")


def get_video_mime_type(path: pathlib.Path) -> str:
    """Get MIME type based on file extension (``video/mp4`` when unknown)."""
    mime_types = {
        ".mp4": "video/mp4",
        ".avi": "video/x-msvideo",
        ".mov": "video/quicktime",
        ".mkv": "video/x-matroska",
        ".webm": "video/webm",
    }
    return mime_types.get(path.suffix.lower(), "video/mp4")


def generate_file_range(path: pathlib.Path, start: int, end: Optional[int], chunk_size: int = 8192):
    """
    Yield the bytes ``start``..``end`` (inclusive) of ``path`` in chunks.

    ``end`` of None, or past the end of the file, means "through the last
    byte". Reading chunk by chunk keeps memory use flat for large videos.
    """
    file_size = path.stat().st_size
    if end is None or end >= file_size:
        end = file_size - 1

    remaining = end - start + 1
    with open(path, 'rb') as f:
        f.seek(start)
        while remaining > 0:
            chunk = f.read(min(chunk_size, remaining))
            if not chunk:
                break  # file shorter than expected (e.g. truncated meanwhile)
            yield chunk
            remaining -= len(chunk)


def _parse_byte_range(range_header: str, file_size: int) -> Optional[Tuple[int, int]]:
    """
    Parse a single-range ``Range`` header value against a file of
    ``file_size`` bytes.

    Returns the inclusive ``(start, end)`` pair, or None when the range cannot
    be satisfied. Raises ValueError when the numbers are malformed.
    """
    spec = range_header.strip().lower().replace("bytes=", "")
    first, _, last = spec.partition("-")
    first, last = first.strip(), last.strip()

    if not first and last:
        # Suffix form "bytes=-N": the final N bytes of the file.
        suffix_length = int(last)
        start, end = max(0, file_size - suffix_length), file_size - 1
        return (start, end) if suffix_length > 0 and start <= end else None

    start = max(0, int(first)) if first else 0
    end = int(last) if last else None
    if end is None or end >= file_size:
        end = file_size - 1
    if start > end:
        return None
    return start, end


@app.head("/videos/{file_id:path}/stream")
@app.get("/videos/{file_id:path}/stream")
def stream_file(request: Request, file_id: str):
    """
    Serve a video file as-is, honouring single-range ``Range`` requests.

    HEAD returns only headers; GET without ``Range`` returns the whole file
    (200); GET with ``Range`` returns the requested slice (206).

    Raises:
        HTTPException(400): the Range header is malformed.
        HTTPException(404): the file id does not resolve (from
            ``path_from_file_id``).
        HTTPException(416): the range lies outside the file.
    """
    p = path_from_file_id(file_id)
    file_size = p.stat().st_size
    content_type = get_video_mime_type(p)
    range_header = request.headers.get("range")

    # Base headers for all responses
    base_headers = {
        "Accept-Ranges": "bytes",
        "Content-Type": content_type,
        "Cache-Control": "public, max-age=3600",
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET, HEAD, OPTIONS",
        "Access-Control-Allow-Headers": "Range, Content-Type",
        "Access-Control-Expose-Headers": "Content-Range, Accept-Ranges, Content-Length",
    }

    # For HEAD requests, just return headers without body
    if request.method == "HEAD":
        headers = {**base_headers, "Content-Length": str(file_size)}
        return Response(status_code=200, headers=headers)

    if not range_header:
        # No range request - return full file with proper headers
        headers = {**base_headers, "Content-Length": str(file_size)}
        return FileResponse(p, media_type=content_type, headers=headers, status_code=200)

    try:
        byte_range = _parse_byte_range(range_header, file_size)
    except ValueError:
        raise HTTPException(status_code=400, detail="Bad Range header")

    if byte_range is None:
        # Tell the client the real size so it can retry with a valid range.
        raise HTTPException(
            status_code=416,
            detail="Range Not Satisfiable",
            headers={"Content-Range": f"bytes */{file_size}"},
        )

    start, end = byte_range
    headers = {
        **base_headers,
        "Content-Range": f"bytes {start}-{end}/{file_size}",
        "Content-Length": str(end - start + 1),
    }

    # Stream the slice chunk by chunk instead of loading it into memory.
    return StreamingResponse(
        generate_file_range(p, start, end),
        media_type=content_type,
        headers=headers,
        status_code=206,
    )


# Convenience endpoint: pass file_id via query instead of path (accepts raw or URL-encoded)
@app.head("/videos/stream")
@app.get("/videos/stream")
def stream_file_q(request: Request, file_id: str):
    """Query-parameter variant of ``stream_file``."""
    return stream_file(request, file_id)


# Handle OPTIONS requests for CORS preflight
@app.options("/videos/{file_id:path}/stream")
@app.options("/videos/stream")
@app.options("/videos/{file_id:path}/stream-transcoded")
@app.options("/videos/stream-transcoded")
def stream_options():
    """Answer CORS preflight requests for the streaming endpoints."""
    return Response(
        status_code=200,
        headers={
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "GET, HEAD, OPTIONS",
            "Access-Control-Allow-Headers": "Range, Content-Type",
            "Access-Control-Max-Age": "86400",
        },
    )


def get_video_info(file_path: pathlib.Path) -> Tuple[float, Optional[int]]:
    """
    Get video duration (seconds) and bitrate (bits/s) using ffprobe.

    Both values come from a single ffprobe run. The bitrate is None when
    ffprobe does not report a plain integer. If ffprobe fails, times out,
    cannot be started, or reports no usable duration, the duration is
    estimated from the file size and the bitrate is None.
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration,bit_rate",
        # Keys are kept ("duration=12.3") so parsing does not depend on the
        # order in which ffprobe prints the fields.
        "-of", "default=noprint_wrappers=1",
        str(file_path),
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=15)
        fields = dict(
            line.split("=", 1) for line in result.stdout.splitlines() if "=" in line
        )
        duration = float(fields["duration"].strip())
    except (subprocess.SubprocessError, OSError, ValueError, KeyError):
        # Fallback: estimate from file size (very rough estimate)
        file_size_mb = file_path.stat().st_size / (1024 * 1024)
        return max(10.0, file_size_mb * 20), None  # ~20 seconds per MB, at least 10s

    bitrate_str = fields.get("bit_rate", "").strip()
    bitrate = int(bitrate_str) if bitrate_str.isdigit() else None
    return duration, bitrate


# Prefixes of ffmpeg banner lines that carry no error information.
_FFMPEG_BANNER_PREFIXES = ("ffmpeg version", "built with", "configuration:", "libav")


def _build_transcode_command(file_path: pathlib.Path, start_time: float, duration: Optional[float]) -> List[str]:
    """Assemble the ffmpeg argv that transcodes ``file_path`` to fragmented H.264/AAC MP4 on stdout."""
    cmd = ["ffmpeg"]

    # Seek on the input side (option placed before -i) when a start offset is given.
    if start_time > 0:
        cmd.extend(["-ss", str(start_time)])

    cmd.extend(["-i", str(file_path)])

    # Video codec settings
    cmd.extend([
        "-c:v", "libx264",                 # H.264 codec
        "-preset", "ultrafast",            # Fast encoding for real-time (lowest CPU usage)
        "-tune", "zerolatency",            # Low latency
        "-crf", "23",                      # Quality (18-28, lower = better)
        "-threads", "2",                   # Limit threads to reduce CPU usage
        "-max_muxing_queue_size", "1024",  # Prevent buffer overflow
    ])

    # If duration is specified, limit output duration
    if duration is not None:
        cmd.extend(["-t", str(duration)])

    # Audio codec settings
    cmd.extend([
        "-c:a", "aac",   # AAC audio if present
        "-b:a", "128k",  # Limit audio bitrate to save resources
    ])

    # Fragmented MP4 does not need a seekable output, so it can go to a pipe.
    cmd.extend([
        "-movflags", "frag_keyframe+dash+omit_tfhd_offset",
        "-f", "mp4",
        "-",  # Output to stdout
    ])
    return cmd


def _drain_pipe(pipe, sink: list) -> None:
    """Read ``pipe`` to EOF in 1 KiB chunks, appending each chunk to ``sink``."""
    try:
        while True:
            chunk = pipe.read(1024)
            if not chunk:
                break
            sink.append(chunk)
    except (OSError, ValueError):
        pass  # pipe was closed underneath us during cleanup


def _stop_process(process: subprocess.Popen, grace_seconds: float = 0.5) -> None:
    """Terminate ``process`` if still running, escalate to kill after the grace period, and reap it."""
    if process.poll() is None:
        try:
            process.terminate()
            process.wait(timeout=grace_seconds)
        except subprocess.TimeoutExpired:
            try:
                process.kill()
            except OSError:
                pass
        except OSError:
            pass
    # Reap the child so it does not linger as a zombie.
    try:
        process.wait(timeout=grace_seconds)
    except (subprocess.TimeoutExpired, OSError):
        pass


def generate_transcoded_stream(file_path: pathlib.Path, start_time: float = 0.0, duration: Optional[float] = None):
    """
    Transcode video to H.264 on-the-fly using FFmpeg and yield the fragmented
    MP4 output in chunks.

    A non-blocking acquire on ``transcoding_semaphore`` limits the number of
    concurrent transcodes; the slot is released when the generator finishes,
    fails or is closed. The ffmpeg process is always stopped, reaped and its
    pipes closed on exit.

    NOTE(review): this is a generator, so the checks below only run when the
    first chunk is requested - i.e. after the caller has already constructed
    its response. Confirm whether the 404/503 raised here can still reach the
    client as a status code.

    Raises (during iteration):
        HTTPException(404): the file does not exist.
        HTTPException(500): the file is empty, ffmpeg failed before producing
            any output, or an unexpected error occurred.
        HTTPException(503): all transcoding slots are in use.
    """
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="Video file not found")

    if file_path.stat().st_size == 0:
        raise HTTPException(status_code=500, detail="Video file is empty (0 bytes)")

    # Refuse instead of queueing when all transcoding slots are taken.
    if not transcoding_semaphore.acquire(blocking=False):
        raise HTTPException(
            status_code=503,
            detail=f"Server busy: Maximum concurrent transcoding operations ({MAX_CONCURRENT_TRANSCODING}) reached. Please try again in a moment."
        )

    try:
        cmd = _build_transcode_command(file_path, start_time, duration)
        process = None
        stderr_thread = None
        stderr_data = []

        try:
            process = subprocess.Popen(
                cmd,
                stdin=subprocess.DEVNULL,  # never let ffmpeg read the server's stdin
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                bufsize=0,  # Unbuffered for better streaming
            )

            # Drain stderr concurrently so a full pipe cannot stall ffmpeg.
            stderr_thread = threading.Thread(
                target=_drain_pipe, args=(process.stderr, stderr_data), daemon=True
            )
            stderr_thread.start()

            chunk_size = 8192
            bytes_yielded = 0
            while True:
                chunk = process.stdout.read(chunk_size)
                if not chunk:
                    break
                bytes_yielded += len(chunk)
                yield chunk

            # Output ended: collect the exit status and the rest of stderr.
            process.wait()
            stderr_thread.join(timeout=1)

            if process.returncode != 0:
                stderr = b''.join(stderr_data).decode('utf-8', errors='ignore')
                error_lines = stderr.split('\n')
                # Drop the version/configuration banner to surface the real error.
                error_msg = '\n'.join(
                    line for line in error_lines
                    if line
                    and not line.startswith(_FFMPEG_BANNER_PREFIXES)
                    and 'Copyright' not in line
                )
                # If no meaningful error found, use last few lines
                if not error_msg.strip():
                    error_msg = '\n'.join(error_lines[-10:])

                print(f"FFmpeg error (code {process.returncode}): Full stderr:\n{stderr}")
                print(f"FFmpeg command was: {' '.join(cmd)}")

                # Only report a failure if nothing was delivered at all.
                if bytes_yielded == 0:
                    error_detail = error_msg[:500] if error_msg else stderr[:500]
                    raise HTTPException(status_code=500, detail=f"FFmpeg transcoding failed: {error_detail}")

        except HTTPException:
            raise
        except Exception as e:
            print(f"FFmpeg transcoding error: {e}")
            raise HTTPException(status_code=500, detail=f"Transcoding error: {str(e)}")
        finally:
            # Runs on normal completion, on errors and when the consumer
            # closes the generator (client disconnect).
            if process is not None:
                _stop_process(process)
                if stderr_thread is not None:
                    stderr_thread.join(timeout=1)
                for pipe in (process.stdout, process.stderr):
                    try:
                        pipe.close()
                    except (OSError, ValueError):
                        pass
    finally:
        # Always give the transcoding slot back.
        transcoding_semaphore.release()


@app.head("/videos/{file_id:path}/stream-transcoded")
@app.get("/videos/{file_id:path}/stream-transcoded")
@app.head("/videos/stream-transcoded")
@app.get("/videos/stream-transcoded")
def stream_transcoded(request: Request, file_id: str, start_time: float = 0.0):
    """
    Stream video transcoded to H.264 on-the-fly.

    Seeking is supported through the ``start_time`` parameter or through an
    HTTP Range header. Because the transcoded size is unknown, a byte offset
    from a Range header is mapped to a time offset using an estimated total
    size (duration x estimated bitrate), and the stream then runs to the end
    of the video.
    """
    p = path_from_file_id(file_id)
    content_type = "video/mp4"

    # Get video duration and bitrate for range request handling
    video_duration, original_bitrate = get_video_info(p)

    # Base headers
    headers = {
        "Content-Type": content_type,
        "Cache-Control": "no-cache, no-store, must-revalidate",
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET, HEAD, OPTIONS",
        "Access-Control-Allow-Headers": "Range, Content-Type",
        "Access-Control-Expose-Headers": "Content-Range, Accept-Ranges, Content-Length",
        "Accept-Ranges": "bytes",
    }

    # For HEAD requests, return headers with an estimated size
    if request.method == "HEAD":
        estimated_size = int(video_duration * 50000)  # ~50KB per second estimate
        headers["Content-Length"] = str(estimated_size)
        return Response(status_code=200, headers=headers)

    # Handle Range requests for seeking
    byte_start = 0
    byte_end = None
    range_header = request.headers.get("range")
    if range_header:
        # Parse range request: bytes=START-END
        range_value = range_header.strip().lower().replace("bytes=", "")
        start_str, _, end_str = range_value.partition("-")
        try:
            byte_start = int(start_str) if start_str else 0
            byte_end = int(end_str) if end_str else None
        except ValueError:
            # Invalid range, ignore and stream from start
            range_header = None

    if range_header:
        # Estimate the transcoded bitrate: 75% of the original when known,
        # otherwise assume 2 Mbps.
        if original_bitrate:
            transcoded_bitrate = int(original_bitrate * 0.75)
        else:
            transcoded_bitrate = 2000000

        estimated_total_bytes = int(video_duration * transcoded_bitrate / 8)

        if estimated_total_bytes > 0 and byte_start < estimated_total_bytes:
            # Calculate time position from byte offset
            time_start_sec = (byte_start / estimated_total_bytes) * video_duration
            time_start_sec = max(0.0, min(time_start_sec, video_duration - 0.5))

            # `is None` (not truthiness) so an explicit end of 0 is respected;
            # an end before the start is treated as open-ended so the
            # advertised length can never be negative.
            last_byte = estimated_total_bytes - 1
            if byte_end is None or byte_end < byte_start:
                actual_byte_end = last_byte
            else:
                actual_byte_end = min(byte_end, last_byte)

            # NOTE(review): these values are estimates; the real transcoded
            # stream will almost never match the advertised Content-Length.
            # Confirm how the server and clients cope with that.
            headers["Content-Range"] = f"bytes {byte_start}-{actual_byte_end}/{estimated_total_bytes}"
            headers["Content-Length"] = str(actual_byte_end - byte_start + 1)

            # Duration None: stream from the computed position to the end.
            return StreamingResponse(
                generate_transcoded_stream(p, time_start_sec, None),
                media_type=content_type,
                headers=headers,
                status_code=206,  # Partial Content
            )

    # No range request or invalid range - stream from start_time
    return StreamingResponse(
        generate_transcoded_stream(p, start_time),
        media_type=content_type,
        headers=headers,
    )