Files
usda-vision/media-api/main.py
salirezav 2bce817b4e Enhance media API with video file validation and Docker configuration update
- Added a function to check if video files are complete and valid using ffprobe, preventing errors during thumbnail generation.
- Updated thumbnail generation logic to skip incomplete or corrupted files, improving robustness.
- Modified docker-compose.yml to include a restart policy for the camera management API service, ensuring better container reliability.
2025-12-03 14:56:18 -05:00

777 lines
29 KiB
Python

import os
import pathlib
import subprocess
import threading
import time
import urllib.parse
from typing import List, Optional, Tuple
from fastapi import FastAPI, HTTPException, Response, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse
# Root directory that is scanned for recordings; override with MEDIA_VIDEOS_DIR.
MEDIA_DIR = pathlib.Path(os.getenv("MEDIA_VIDEOS_DIR", "/mnt/videos")).resolve()
# Thumbnail cache directory; defaults to a ".thumbnails" folder inside MEDIA_DIR
# and can be overridden with MEDIA_THUMBS_DIR.
THUMBS_DIR = pathlib.Path(os.getenv("MEDIA_THUMBS_DIR", MEDIA_DIR / ".thumbnails")).resolve()
# Limit concurrent transcoding operations to prevent resource exhaustion
# Adjust based on your CPU cores and available memory
MAX_CONCURRENT_TRANSCODING = int(os.getenv("MAX_CONCURRENT_TRANSCODING", "2"))
# Process-wide semaphore; generate_transcoded_stream acquires it without blocking
# and reports "server busy" when no slot is free.
transcoding_semaphore = threading.Semaphore(MAX_CONCURRENT_TRANSCODING)
app = FastAPI(title="Media API", version="0.1.0")
# CORS for dashboard - allow all origins to support access from different IPs/hostnames
# NOTE(review): wildcard origins are combined with allow_credentials=True here.
# Browsers do not accept a literal "*" origin on credentialed requests - confirm
# whether credentials are actually needed, or list the dashboard origins explicitly.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allow all origins for flexibility
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
)
# Background thumbnail generator
# Run flag polled by thumbnail_generator_worker; set in startup_event and
# cleared in shutdown_event.
_thumbnail_generator_running = False
# Handle of the worker thread; assigned in startup_event, joined in shutdown_event.
_thumbnail_generator_thread = None
def thumbnail_generator_worker():
    """
    Background worker that periodically scans for videos and generates missing thumbnails.

    Loops until the module-level ``_thumbnail_generator_running`` flag is
    cleared. Each pass lists the video files, skips those already handled in
    this process, and asks ``generate_thumbnail_background`` to create the
    thumbnail. Files whose generation fails are retried on the next pass
    (they may still be recording).

    The pause between passes is ``THUMBNAIL_SCAN_INTERVAL`` seconds
    (default 30). It is slept in short slices so that clearing the run flag
    stops the worker promptly instead of after a full interval.
    """
    print("[Thumbnail Generator] Background thumbnail generator started", flush=True)

    # A dict is used as an insertion-ordered set so that trimming can drop
    # the oldest entries deterministically (a plain set has no order).
    processed_files = {}

    # Scan interval (in seconds)
    scan_interval = int(os.getenv("THUMBNAIL_SCAN_INTERVAL", "30"))  # Default: scan every 30 seconds

    while _thumbnail_generator_running:
        try:
            for video_path in list_video_files():
                if not _thumbnail_generator_running:
                    break
                try:
                    file_id = str(video_path)
                    if file_id in processed_files:
                        continue
                    if generate_thumbnail_background(video_path):
                        processed_files[file_id] = None
                        print(f"[Thumbnail Generator] Generated thumbnail for: {video_path.name}", flush=True)
                    # If failed, we'll try again on next scan (file might still be recording)
                except Exception as e:
                    # Log error but continue processing other files
                    print(f"[Thumbnail Generator] Error processing {video_path}: {e}", flush=True)

            # Bound memory: once past 1000 entries keep only the 500 most
            # recently recorded ones.
            if len(processed_files) > 1000:
                for stale_id in list(processed_files)[:-500]:
                    del processed_files[stale_id]
        except Exception as e:
            print(f"[Thumbnail Generator] Error in scan loop: {e}", flush=True)

        # Sleep before next scan, waking regularly to notice a shutdown request.
        deadline = time.monotonic() + scan_interval
        while _thumbnail_generator_running:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(0.5, remaining))

    print("[Thumbnail Generator] Background thumbnail generator stopped", flush=True)
@app.on_event("startup")
def startup_event():
    """Launch the background thumbnail generator thread on application start."""
    global _thumbnail_generator_running, _thumbnail_generator_thread

    # The worker loop polls this flag, so it has to be set before the thread runs.
    _thumbnail_generator_running = True

    worker = threading.Thread(
        name="ThumbnailGenerator",
        target=thumbnail_generator_worker,
        daemon=True,
    )
    _thumbnail_generator_thread = worker
    worker.start()

    print("[Media API] Background thumbnail generator started", flush=True)
@app.on_event("shutdown")
def shutdown_event():
    """Signal the background thumbnail generator to stop and wait briefly for it."""
    global _thumbnail_generator_running

    # Clearing the flag asks the worker loop to exit.
    _thumbnail_generator_running = False

    worker = _thumbnail_generator_thread
    if worker is not None:
        # Bounded wait so application shutdown is never held up indefinitely.
        worker.join(timeout=5)

    print("[Media API] Background thumbnail generator stopped", flush=True)
def list_video_files() -> List[pathlib.Path]:
    """
    Return every video file under MEDIA_DIR, newest first.

    A file counts as a video when its suffix (case-insensitive) is one of
    .mp4, .avi, .mov or .mkv. The thumbnail directory is excluded by exact
    path and pruned from the walk, so sibling directories whose names merely
    start with the same prefix are still scanned.

    Files that disappear between being listed and being stat'ed (for example
    a recording that was just deleted) are skipped instead of aborting the
    whole scan.
    """
    exts = {".mp4", ".avi", ".mov", ".mkv"}
    dated: List[Tuple[float, pathlib.Path]] = []

    for root, dirnames, fnames in os.walk(MEDIA_DIR):
        root_path = pathlib.Path(root)
        # skip thumbnails dir (in-place edit stops os.walk from descending into it)
        dirnames[:] = [d for d in dirnames if root_path / d != THUMBS_DIR]
        if root_path == THUMBS_DIR:
            continue
        for f in fnames:
            p = root_path / f
            if p.suffix.lower() not in exts:
                continue
            try:
                mtime = p.stat().st_mtime
            except OSError:
                # File vanished or is unreadable; leave it out of this listing.
                continue
            dated.append((mtime, p))

    # Sort on the cached mtime only, so each file is stat'ed exactly once.
    dated.sort(key=lambda entry: entry[0], reverse=True)
    return [p for _, p in dated]
def file_id_from_path(p: pathlib.Path) -> str:
    """Return the URL-safe file id for ``p``: its MEDIA_DIR-relative POSIX path, quote_plus-encoded."""
    relative = p.relative_to(MEDIA_DIR)
    return urllib.parse.quote_plus(relative.as_posix())
def path_from_file_id(fid: str) -> pathlib.Path:
    """
    Resolve a file id (raw, URL-encoded or double-encoded) to a video path.

    The id is decoded repeatedly until it stops changing and every
    intermediate form is kept as a candidate. Candidates are tried from most
    decoded to least decoded, so ids that resolved before still resolve to the
    same file. File names that contain a literal ``+`` or ``%XX`` sequence,
    which decoding to the fixed point would destroy, are now found through a
    less-decoded candidate.

    Every candidate must resolve to an existing regular file located inside
    MEDIA_DIR; this rejects ``..`` traversal and absolute paths.

    Raises:
        HTTPException: 404 when no candidate names a file inside MEDIA_DIR.
    """
    # Handle double-encoding: decode until no more changes, remembering each step.
    candidates = [fid]
    while True:
        decoded = urllib.parse.unquote_plus(candidates[-1])
        if decoded == candidates[-1]:
            break
        candidates.append(decoded)

    for rel in reversed(candidates):
        try:
            p = (MEDIA_DIR / rel).resolve()
        except (OSError, ValueError, RuntimeError):
            # Unresolvable input (e.g. embedded NUL, symlink loop): not a match.
            continue
        if p.is_file() and MEDIA_DIR in p.parents:
            return p

    raise HTTPException(status_code=404, detail="Video not found")
def is_video_file_complete(p: pathlib.Path) -> bool:
    """
    Report whether ffprobe can read the container of ``p``.

    Runs ffprobe (5 second limit) asking for the format duration and treats a
    zero exit status as "complete and valid". Any failure to run the probe
    (timeout, missing binary, other error) counts as "not complete".
    """
    probe_cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        str(p),
    ]
    try:
        completed = subprocess.run(
            probe_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=5,
        )
    except Exception:
        # Deliberately broad: this is a best-effort check that must never raise.
        return False
    # Only the exit status matters; the printed duration is not inspected.
    return completed.returncode == 0
def generate_thumbnail_background(p: pathlib.Path, width: int = 320, height: int = 180) -> bool:
    """
    Generate the thumbnail for ``p`` on a best-effort basis.

    Args:
        p: Video file located under MEDIA_DIR.
        width: Thumbnail width in pixels.
        height: Thumbnail height in pixels.

    Returns:
        True when a non-empty thumbnail exists afterwards (already present or
        freshly generated), False when the video is missing, empty, apparently
        still being written, rejected by ffprobe, or ffmpeg failed.

    FFmpeg writes to a temporary file next to the final thumbnail, and the
    file is moved into place only after it was produced successfully. A run
    that times out or crashes therefore cannot leave a truncated image that
    later checks would mistake for a valid thumbnail.
    """
    try:
        THUMBS_DIR.mkdir(parents=True, exist_ok=True)
    except OSError:
        return False  # Cannot create the cache directory

    thumb_name = file_id_from_path(p) + f"_{width}x{height}.jpg"
    out = THUMBS_DIR / thumb_name

    # Check if thumbnail already exists and is valid
    if out.exists():
        try:
            if out.stat().st_size > 0:
                return True  # Thumbnail already exists and is valid
            # Empty thumbnail, remove it
            out.unlink()
        except OSError:
            pass  # Ignore errors checking/removing

    # Check if video file exists and is readable
    if not p.exists():
        return False
    try:
        file_size = p.stat().st_size
    except OSError:
        return False  # Cannot access file
    if file_size == 0:
        return False  # File is empty, skip

    # For very small files (< 1MB), they might still be recording
    # Wait a bit and check if size is stable
    if file_size < 1024 * 1024:
        time.sleep(1.0)  # Wait 1 second
        try:
            new_size = p.stat().st_size
        except OSError:
            return False
        # If file grew significantly, it's likely being written - skip for now
        if new_size > file_size * 1.1:
            return False

    # Check if video file is complete and valid before attempting thumbnail generation
    # This prevents errors with incomplete/corrupted files
    if not is_video_file_complete(p):
        return False

    # Per-thread temporary name; it keeps the .jpg suffix so ffmpeg still
    # selects the image output format from the file extension.
    tmp_out = out.with_name(f"{out.stem}.{threading.get_ident()}.part.jpg")

    try:
        # Try to generate thumbnail - try 1s first, then 0s if that fails
        for seek_time in (1.0, 0.0):
            cmd = [
                "ffmpeg", "-y",
                "-ss", str(seek_time),
                "-i", str(p),
                "-frames:v", "1",
                "-update", "1",  # Required for single image output with image2 muxer
                "-vf", f"scale='min({width},iw)':-2,scale={width}:{height}:force_original_aspect_ratio=decrease,pad={width}:{height}:(ow-iw)/2:(oh-ih)/2",
                str(tmp_out),
            ]
            try:
                subprocess.run(
                    cmd,
                    check=True,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                    timeout=30,
                )
                # Publish only a complete, non-empty image.
                if tmp_out.exists() and tmp_out.stat().st_size > 0:
                    tmp_out.replace(out)
                    return True
            except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
                # Try next seek time or give up
                continue
            except Exception:
                # Any other error, skip this file
                return False
        return False  # Failed to generate thumbnail
    finally:
        # Remove whatever a failed attempt left behind.
        try:
            tmp_out.unlink()
        except OSError:
            pass
def ensure_thumbnail(p: pathlib.Path, width: int = 320, height: int = 180) -> pathlib.Path:
    """
    Return the path of the thumbnail for ``p``, generating it on demand.

    Used by the API endpoints. A cached, non-empty thumbnail is returned
    directly; otherwise generation is attempted once.

    Raises:
        HTTPException: 500 when no usable thumbnail exists after the attempt.
    """
    THUMBS_DIR.mkdir(parents=True, exist_ok=True)
    out = THUMBS_DIR / (file_id_from_path(p) + f"_{width}x{height}.jpg")

    def _usable() -> bool:
        # A zero-byte file is treated the same as a missing thumbnail.
        return out.exists() and out.stat().st_size > 0

    if _usable():
        return out

    # Try to generate on-demand
    if generate_thumbnail_background(p, width, height) and _usable():
        return out

    raise HTTPException(status_code=500, detail="Failed to generate thumbnail")
@app.get("/health")
def health() -> dict:
    """Liveness probe: always answers with a static OK payload."""
    payload = {"status": "ok"}
    return payload
@app.get("/videos/")
def list_videos(camera_name: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None,
                limit: int = 24, page: int = 1) -> dict:
    """
    Return one page of video metadata, newest first.

    Args:
        camera_name: When given, only files whose path contains this text are listed.
        start_date: Accepted but not applied yet (filtering placeholder).
        end_date: Accepted but not applied yet (filtering placeholder).
        limit: Page size; negative values are treated as 0.
        page: 1-based page number; values below 1 yield the first page.

    Returns:
        Dict with ``videos`` (list of metadata dicts), ``total_count``,
        ``page`` and ``total_pages``.

    A file that disappears between the directory scan and the metadata lookup
    is left out of the page instead of failing the whole request.
    """
    files = list_video_files()

    # basic filtering placeholder (camera_name can be inferred from path segments)
    if camera_name:
        files = [p for p in files if camera_name in p.as_posix()]

    total = len(files)
    limit = max(0, limit)  # a negative page size would give a nonsensical slice and page count
    start = max(0, (page - 1) * limit)

    items = []
    for p in files[start:start + limit]:
        try:
            stat = p.stat()
        except OSError:
            # Deleted or unreadable since the scan; skip it.
            continue
        items.append({
            "file_id": file_id_from_path(p),
            "camera_name": p.parent.name,
            "filename": p.name,
            "file_size_bytes": stat.st_size,
            "format": p.suffix.lstrip('.'),
            "status": "completed",
            "created_at": int(stat.st_mtime),
            "is_streamable": True,
            "needs_conversion": False,
        })

    # Ceiling division; a page size of 0 is reported as a single page.
    total_pages = (total + limit - 1) // limit if limit else 1
    return {
        "videos": items,
        "total_count": total,
        "page": page,
        "total_pages": total_pages,
    }
@app.get("/videos/{file_id:path}/thumbnail")
def get_thumbnail(file_id: str, width: int = 320, height: int = 180):
    """Serve the JPEG thumbnail of the video identified by ``file_id`` in the URL path."""
    video_path = path_from_file_id(file_id)
    return FileResponse(
        ensure_thumbnail(video_path, width, height),
        media_type="image/jpeg",
    )
# Convenience endpoint: pass file_id via query instead of path (accepts raw or URL-encoded)
@app.get("/videos/thumbnail")
def get_thumbnail_q(file_id: str, width: int = 320, height: int = 180):
    """Serve the JPEG thumbnail of the video identified by the ``file_id`` query parameter."""
    thumb_path = ensure_thumbnail(path_from_file_id(file_id), width, height)
    return FileResponse(thumb_path, media_type="image/jpeg")
def get_video_mime_type(path: pathlib.Path) -> str:
    """Map the file suffix (case-insensitive) to a video MIME type, defaulting to video/mp4."""
    known_types = {
        ".mp4": "video/mp4",
        ".avi": "video/x-msvideo",
        ".mov": "video/quicktime",
        ".mkv": "video/x-matroska",
        ".webm": "video/webm",
    }
    suffix = path.suffix.lower()
    if suffix in known_types:
        return known_types[suffix]
    # Unknown or missing suffix: fall back to MP4.
    return "video/mp4"
def generate_file_range(path: pathlib.Path, start: int, end: Optional[int], chunk_size: int = 8192):
    """
    Yield the bytes ``start``..``end`` (inclusive) of ``path`` in chunks.

    ``end`` of None, or one past the end of the file, means "up to the last
    byte". At most ``chunk_size`` bytes are held in memory at a time, so large
    videos can be streamed without loading them fully. Iteration stops early
    if the file turns out shorter than expected.
    """
    size = path.stat().st_size
    last = size - 1 if end is None or end >= size else end
    bytes_left = last - start + 1

    with open(path, 'rb') as handle:
        handle.seek(start)
        while bytes_left > 0:
            data = handle.read(min(chunk_size, bytes_left))
            if not data:
                # Hit end of file before the requested range was exhausted.
                return
            yield data
            bytes_left -= len(data)
@app.head("/videos/{file_id:path}/stream")
@app.get("/videos/{file_id:path}/stream")
def stream_file(request: Request, file_id: str):
    """
    Serve a video file as-is, with support for HTTP byte-range requests.

    * HEAD: headers only, with the full file size as Content-Length.
    * GET without Range: the whole file (200).
    * GET with Range: the requested slice (206), streamed in chunks.

    Supported single-range forms are ``bytes=START-END``, ``bytes=START-`` and
    the suffix form ``bytes=-N`` (the last N bytes of the file).

    Raises:
        HTTPException: 404 if the file id does not resolve, 400 for a
            malformed Range header, 416 (with ``Content-Range: bytes */SIZE``)
            when the range cannot be satisfied.
    """
    p = path_from_file_id(file_id)
    file_size = p.stat().st_size
    content_type = get_video_mime_type(p)
    range_header = request.headers.get("range")

    # Base headers for all responses
    base_headers = {
        "Accept-Ranges": "bytes",
        "Content-Type": content_type,
        "Cache-Control": "public, max-age=3600",
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET, HEAD, OPTIONS",
        "Access-Control-Allow-Headers": "Range, Content-Type",
        "Access-Control-Expose-Headers": "Content-Range, Accept-Ranges, Content-Length",
    }
    whole_file_headers = {
        **base_headers,
        "Content-Length": str(file_size),
    }

    # For HEAD requests, just return headers without body
    if request.method == "HEAD":
        return Response(status_code=200, headers=whole_file_headers)

    if not range_header:
        # No range request - return full file with proper headers
        return FileResponse(
            p,
            media_type=content_type,
            headers=whole_file_headers,
            status_code=200
        )

    # Parse range request: bytes=START-END, bytes=START- or bytes=-SUFFIX_LENGTH
    range_value = range_header.strip().lower().replace("bytes=", "")
    start_str, _, end_str = range_value.partition("-")
    try:
        start = int(start_str) if start_str else None
        end = int(end_str) if end_str else None
    except ValueError:
        raise HTTPException(status_code=400, detail="Bad Range header")

    # Tell the client the real size when its range cannot be served.
    not_satisfiable = HTTPException(
        status_code=416,
        detail="Range Not Satisfiable",
        headers={"Content-Range": f"bytes */{file_size}"},
    )

    if start is None:
        if end is None:
            # Neither bound given: serve from the beginning.
            start = 0
        else:
            # Suffix range: "-N" asks for the final N bytes, not bytes 0..N.
            if end <= 0:
                raise not_satisfiable
            start = max(0, file_size - end)
            end = None

    # Validate range
    if start < 0:
        start = 0
    if end is None or end >= file_size:
        end = file_size - 1
    if start > end:
        raise not_satisfiable

    # Calculate content length
    content_length = end - start + 1

    # Use streaming response to avoid loading entire chunk into memory
    # This prevents MemoryError for large video files
    headers = {
        **base_headers,
        "Content-Range": f"bytes {start}-{end}/{file_size}",
        "Content-Length": str(content_length),
    }
    return StreamingResponse(
        generate_file_range(p, start, end),
        media_type=content_type,
        headers=headers,
        status_code=206
    )
# Convenience endpoint: pass file_id via query instead of path (accepts raw or URL-encoded)
@app.head("/videos/stream")
@app.get("/videos/stream")
def stream_file_q(request: Request, file_id: str):
    """Query-parameter variant of ``stream_file``: same response, id taken from ``?file_id=``."""
    response = stream_file(request, file_id)
    return response
# Handle OPTIONS requests for CORS preflight
@app.options("/videos/{file_id:path}/stream")
@app.options("/videos/stream")
@app.options("/videos/{file_id:path}/stream-transcoded")
@app.options("/videos/stream-transcoded")
def stream_options():
    """Answer CORS preflight requests for all four streaming routes with an empty 200."""
    preflight_headers = {
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET, HEAD, OPTIONS",
        "Access-Control-Allow-Headers": "Range, Content-Type",
        # Value is in seconds (one day).
        "Access-Control-Max-Age": "86400",
    }
    return Response(status_code=200, headers=preflight_headers)
def get_video_info(file_path: pathlib.Path) -> Tuple[float, Optional[int]]:
    """
    Get video duration and bitrate using ffprobe.

    A single ffprobe run (10 second limit) queries both values; the output is
    parsed as ``key=value`` lines so it does not rely on field ordering.

    Returns:
        ``(duration_seconds, bitrate)`` where ``bitrate`` is None when ffprobe
        does not report a numeric bit rate.

    If ffprobe is missing, fails, times out or reports no usable duration, the
    duration is estimated from the file size (20 seconds per MiB, minimum 10
    seconds) and the bitrate is None.
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration,bit_rate",
        "-of", "default=noprint_wrappers=1",
        str(file_path)
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=10)

        fields = {}
        for line in result.stdout.splitlines():
            key, sep, value = line.partition("=")
            if sep:
                fields[key.strip()] = value.strip()

        # A missing or "N/A" duration raises ValueError and triggers the fallback.
        duration = float(fields.get("duration", ""))
        bitrate_str = fields.get("bit_rate", "")
        bitrate = int(bitrate_str) if bitrate_str and bitrate_str.isdigit() else None
        return duration, bitrate
    except (subprocess.SubprocessError, OSError, ValueError):
        # Fallback: estimate from file size (very rough estimate)
        file_size_mb = file_path.stat().st_size / (1024 * 1024)
        duration = max(10.0, file_size_mb * 20)  # Rough estimate: 20 seconds per MB
        return duration, None
def _stop_transcoder(process) -> None:
    """Terminate a still-running FFmpeg child (escalating to kill) and reap it."""
    if process is None or process.poll() is not None:
        return
    try:
        process.terminate()
        try:
            # Give FFmpeg a moment to exit cleanly before forcing it.
            process.wait(timeout=0.5)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait(timeout=5)
    except Exception:
        # Best effort: never let cleanup mask the original error.
        try:
            process.kill()
        except Exception:
            pass


def generate_transcoded_stream(file_path: pathlib.Path, start_time: float = 0.0, duration: Optional[float] = None):
    """
    Transcode video to H.264 on-the-fly using FFmpeg.
    Streams H.264/MP4 that browsers can actually play.
    Uses semaphore to limit concurrent transcoding operations.

    Args:
        file_path: Source video.
        start_time: Seek position in seconds; applied as input seeking when > 0.
        duration: Optional limit, in seconds, on the amount of output produced.

    Yields:
        Chunks (up to 8192 bytes) of fragmented MP4 read from FFmpeg's stdout.

    Raises:
        HTTPException: 404 if the file is missing, 500 if it is empty or
            FFmpeg fails before producing any output, 503 if all transcoding
            slots are in use.

    The FFmpeg child is always terminated, reaped and has its pipes closed
    when the generator finishes, fails, or is closed early (for example on
    client disconnect), and the transcoding slot is always released.

    NOTE(review): this is a generator, so the checks above run when iteration
    starts, not when the function is called. When the generator is used as a
    streaming response body the HTTP status may already have been sent by
    then - confirm how callers surface these errors.
    """
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="Video file not found")
    if file_path.stat().st_size == 0:
        raise HTTPException(status_code=500, detail="Video file is empty (0 bytes)")

    # Acquire semaphore to limit concurrent transcoding
    # This prevents resource exhaustion from too many simultaneous FFmpeg processes
    if not transcoding_semaphore.acquire(blocking=False):
        raise HTTPException(
            status_code=503,
            detail=f"Server busy: Maximum concurrent transcoding operations ({MAX_CONCURRENT_TRANSCODING}) reached. Please try again in a moment."
        )

    try:
        # FFmpeg command to transcode to H.264 with web-optimized settings
        # Use fragmented MP4 for HTTP streaming (doesn't require seekable output)
        # frag_keyframe: fragment at keyframes
        # dash: use DASH-compatible fragmentation
        # omit_tfhd_offset: avoid seeking by omitting track fragment header offset
        # Optimized for resource usage: ultrafast preset, limited threads
        # Build command with proper order: input seeking first, then input, then filters/codecs
        cmd = ["ffmpeg"]

        # If seeking to specific time, use input seeking (before -i)
        if start_time > 0:
            cmd.extend(["-ss", str(start_time)])

        # Input file
        cmd.extend(["-i", str(file_path)])

        # Video codec settings
        cmd.extend([
            "-c:v", "libx264",  # H.264 codec
            "-preset", "ultrafast",  # Fast encoding for real-time (lowest CPU usage)
            "-tune", "zerolatency",  # Low latency
            "-crf", "23",  # Quality (18-28, lower = better)
            "-threads", "2",  # Limit threads to reduce CPU usage (adjust based on CPU cores)
            "-max_muxing_queue_size", "1024",  # Prevent buffer overflow
        ])

        # If duration is specified (for range requests), limit output duration
        if duration is not None:
            cmd.extend(["-t", str(duration)])

        # Audio codec settings
        cmd.extend([
            "-c:a", "aac",  # AAC audio if present
            "-b:a", "128k",  # Limit audio bitrate to save resources
        ])

        # Output format settings
        cmd.extend([
            "-movflags", "frag_keyframe+dash+omit_tfhd_offset",  # Fragmented MP4 optimized for HTTP streaming
            "-f", "mp4",  # MP4 container
            "-"  # Output to stdout
        ])

        process = None
        stderr_thread = None
        stderr_data = []
        bytes_yielded = 0
        chunk_size = 8192

        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                bufsize=0  # Unbuffered for better streaming
            )

            # Drain stderr in the background so FFmpeg never blocks on a full pipe.
            def read_stderr():
                while True:
                    chunk = process.stderr.read(1024)
                    if not chunk:
                        break
                    stderr_data.append(chunk)

            stderr_thread = threading.Thread(target=read_stderr, daemon=True)
            stderr_thread.start()

            # Stream chunks
            while True:
                chunk = process.stdout.read(chunk_size)
                if not chunk:
                    break
                bytes_yielded += len(chunk)
                yield chunk

            # Wait for process to finish and stderr thread to complete
            process.wait()
            stderr_thread.join(timeout=1)

            # Check for errors
            if process.returncode != 0:
                stderr = b''.join(stderr_data).decode('utf-8', errors='ignore')
                # Extract actual error message (skip version banner)
                error_lines = stderr.split('\n')
                banner_prefixes = ('ffmpeg version', 'built with', 'configuration:', 'libav')
                error_msg = '\n'.join([
                    line for line in error_lines
                    if line and not line.startswith(banner_prefixes) and 'Copyright' not in line
                ])

                # If no meaningful error found, use last few lines
                if not error_msg.strip():
                    error_msg = '\n'.join(error_lines[-10:])

                print(f"FFmpeg error (code {process.returncode}): Full stderr:\n{stderr}")
                print(f"FFmpeg command was: {' '.join(cmd)}")

                # Once data has been sent the failure can only be logged.
                if bytes_yielded == 0:
                    # Show first 500 chars of actual error (not just version info)
                    error_detail = error_msg[:500] if error_msg else stderr[:500]
                    raise HTTPException(status_code=500, detail=f"FFmpeg transcoding failed: {error_detail}")
        except HTTPException:
            raise
        except Exception as e:
            print(f"FFmpeg transcoding error: {e}")
            raise HTTPException(status_code=500, detail=f"Transcoding error: {str(e)}")
        finally:
            # Runs on normal completion, on errors and on GeneratorExit (consumer
            # closed the stream): stop and reap FFmpeg so no zombie is left behind.
            _stop_transcoder(process)
            if stderr_thread is not None:
                stderr_thread.join(timeout=1)
            if process is not None:
                pipes = [process.stdout]
                # Only close stderr once its reader thread has finished with it.
                if stderr_thread is None or not stderr_thread.is_alive():
                    pipes.append(process.stderr)
                for pipe in pipes:
                    if pipe is not None:
                        try:
                            pipe.close()
                        except Exception:
                            pass
    finally:
        # Always release semaphore when done (success or error)
        transcoding_semaphore.release()
@app.head("/videos/{file_id:path}/stream-transcoded")
@app.get("/videos/{file_id:path}/stream-transcoded")
@app.head("/videos/stream-transcoded")
@app.get("/videos/stream-transcoded")
def stream_transcoded(request: Request, file_id: str, start_time: float = 0.0):
    """
    Stream video transcoded to H.264 on-the-fly.
    This endpoint converts MPEG-4 Part 2 videos to H.264 for browser compatibility.
    Supports seeking via HTTP Range requests or start_time parameter.

    Args:
        request: Incoming request; its method and Range header are inspected.
        file_id: Raw or URL-encoded id of the video.
        start_time: Position in seconds at which playback should begin.

    Byte offsets of a Range request are converted to a time position using an
    estimated transcoded size, and that position is added to ``start_time``.
    A request that carries both a Range header and ``start_time`` therefore
    still starts at the requested time. With the default ``start_time`` of 0
    the behaviour is unchanged.

    NOTE(review): Content-Length and Content-Range are estimates here; the
    real transcoded size is not known in advance, so the body length can
    differ from the declared one - confirm clients and the server tolerate that.
    """
    p = path_from_file_id(file_id)
    content_type = "video/mp4"

    # Get video duration and bitrate for range request handling
    video_duration, original_bitrate = get_video_info(p)

    # Base headers
    headers = {
        "Content-Type": content_type,
        "Cache-Control": "no-cache, no-store, must-revalidate",
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET, HEAD, OPTIONS",
        "Access-Control-Allow-Headers": "Range, Content-Type",
        "Access-Control-Expose-Headers": "Content-Range, Accept-Ranges, Content-Length",
        "Accept-Ranges": "bytes",
    }

    # For HEAD requests, return headers (estimate size)
    if request.method == "HEAD":
        estimated_size = int(video_duration * 50000)  # ~50KB per second estimate
        headers["Content-Length"] = str(estimated_size)
        return Response(status_code=200, headers=headers)

    # Handle Range requests for seeking
    byte_start = None
    byte_end = None
    range_header = request.headers.get("range")
    if range_header:
        # Parse range request: bytes=START-END
        range_value = range_header.strip().lower().replace("bytes=", "")
        start_str, _, end_str = range_value.partition("-")
        try:
            byte_start = int(start_str) if start_str else 0
            byte_end = int(end_str) if end_str else None
        except ValueError:
            # Invalid range, ignore and stream from start
            byte_start = None

    if byte_start is not None:
        # For seeking, convert byte range to time-based seeking
        # Use original bitrate if available, otherwise estimate
        if original_bitrate:
            # Assume the transcode needs about 75% of the original bitrate
            transcoded_bitrate = int(original_bitrate * 0.75)
        else:
            # Default estimate: 2 Mbps
            transcoded_bitrate = 2000000

        estimated_total_bytes = int(video_duration * transcoded_bitrate / 8)

        if estimated_total_bytes > 0 and byte_start < estimated_total_bytes:
            # Calculate time position from byte offset, relative to the
            # caller's requested start_time (0 by default).
            byte_offset_sec = (byte_start / estimated_total_bytes) * video_duration
            time_start_sec = max(0.0, start_time) + byte_offset_sec
            time_start_sec = max(0.0, min(time_start_sec, video_duration - 0.5))

            # For seeking, don't limit duration - stream to end
            # The browser will handle buffering
            duration_sec = None  # None means stream to end

            # Update headers for range response
            # For seeking, we typically don't know the exact end, so estimate
            actual_byte_end = min(byte_end or estimated_total_bytes - 1, estimated_total_bytes - 1)
            headers["Content-Range"] = f"bytes {byte_start}-{actual_byte_end}/{estimated_total_bytes}"
            headers["Content-Length"] = str(actual_byte_end - byte_start + 1)

            # Stream from the calculated time position using FFmpeg's -ss flag
            # Duration is None, so it will stream to the end
            return StreamingResponse(
                generate_transcoded_stream(p, time_start_sec, duration_sec),
                media_type=content_type,
                headers=headers,
                status_code=206  # Partial Content
            )

    # No range request or invalid range - stream from start_time
    return StreamingResponse(
        generate_transcoded_stream(p, start_time),
        media_type=content_type,
        headers=headers
    )