Files
usda-vision/media-api/main.py
salirezav 00d4e5b275 Enhance video remote service and UI components
- Updated docker-compose.yml to include new media-api and mediamtx services for improved video handling.
- Modified package.json and package-lock.json to add TypeScript types for React and React DOM.
- Refactored video-related components (VideoCard, VideoList, VideoModal) for better user experience and responsiveness.
- Improved FiltersBar and Pagination components with enhanced styling and functionality.
- Added loading and error states in VideoList for better user feedback during data fetching.
- Enhanced CSS styles for a more polished look across the application.
2025-10-31 18:06:40 -04:00

169 lines
5.7 KiB
Python

import os
import pathlib
import subprocess
import urllib.parse
from typing import Iterator, List, Optional

from fastapi import FastAPI, HTTPException, Response, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse
# Root directory that is scanned for video files.
# An unset *or empty* MEDIA_VIDEOS_DIR falls back to the default: an empty
# string would otherwise become pathlib.Path(""), which resolves to the
# process's current working directory.
MEDIA_DIR = pathlib.Path(os.getenv("MEDIA_VIDEOS_DIR") or "/mnt/videos").resolve()

# Cache directory for generated thumbnails; defaults to a hidden folder
# inside MEDIA_DIR. Empty values are treated as unset for the same reason.
THUMBS_DIR = pathlib.Path(os.getenv("MEDIA_THUMBS_DIR") or MEDIA_DIR / ".thumbnails").resolve()

app = FastAPI(title="Media API", version="0.1.0")

# CORS for dashboard at exp-dash:8080 (and localhost for convenience)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://exp-dash:8080", "http://localhost:8080"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
def list_video_files(
    media_dir: Optional[pathlib.Path] = None,
    thumbs_dir: Optional[pathlib.Path] = None,
) -> List[pathlib.Path]:
    """Return every video file under the media root, newest first.

    Args:
        media_dir: Directory to scan. Defaults to the module-level ``MEDIA_DIR``.
        thumbs_dir: Directory (and everything below it) to exclude from the
            scan. Defaults to the module-level ``THUMBS_DIR``.

    Returns:
        Paths whose suffix is one of .mp4/.avi/.mov/.mkv (case-insensitive),
        sorted by modification time, most recent first.
    """
    root_dir = MEDIA_DIR if media_dir is None else pathlib.Path(media_dir).resolve()
    skip_dir = THUMBS_DIR if thumbs_dir is None else pathlib.Path(thumbs_dir).resolve()
    exts = {".mp4", ".avi", ".mov", ".mkv"}

    dated = []  # (mtime, path) pairs; stat is taken once per file
    for root, dirnames, fnames in os.walk(root_dir):
        here = pathlib.Path(root)
        # Compare real path components instead of substrings, so a sibling
        # such as ".thumbnails_old" is not mistaken for the thumbnail cache.
        if here == skip_dir or skip_dir in here.parents:
            dirnames[:] = []  # prune: do not descend into the cache at all
            continue
        for fname in fnames:
            candidate = here / fname
            if candidate.suffix.lower() not in exts:
                continue
            try:
                mtime = candidate.stat().st_mtime
            except OSError:
                # File vanished between the walk and the stat (or is a broken
                # symlink); skip it rather than failing the whole listing.
                continue
            dated.append((mtime, candidate))

    dated.sort(key=lambda pair: pair[0], reverse=True)
    return [path for _, path in dated]
def file_id_from_path(p: pathlib.Path) -> str:
    """Build the URL-safe identifier for a video file.

    The identifier is the POSIX-style path of ``p`` relative to ``MEDIA_DIR``,
    percent-encoded with ``urllib.parse.quote_plus``.

    Raises:
        ValueError: from ``Path.relative_to`` when ``p`` is not located under
            ``MEDIA_DIR``.
    """
    relative = p.relative_to(MEDIA_DIR)
    posix_form = relative.as_posix()
    return urllib.parse.quote_plus(posix_form)
def path_from_file_id(fid: str, media_dir: Optional[pathlib.Path] = None) -> pathlib.Path:
    """Resolve a file id back to an existing video file inside the media root.

    Args:
        fid: Identifier as produced by ``file_id_from_path``. It may arrive
            still URL-encoded or already decoded by the web framework.
        media_dir: Root that the result must live under. Defaults to the
            module-level ``MEDIA_DIR``.

    Returns:
        The resolved absolute path of the file.

    Raises:
        HTTPException: 404 when no candidate resolves to a regular file
            located strictly inside the media root.
    """
    root = MEDIA_DIR if media_dir is None else pathlib.Path(media_dir).resolve()

    # Try the decoded form first (the historic behaviour), then the raw value.
    # The raw fallback matters when the id was already decoded upstream:
    # decoding "a+b.mp4" a second time would turn it into "a b.mp4".
    candidates = [urllib.parse.unquote_plus(fid)]
    if fid not in candidates:
        candidates.append(fid)

    for rel in candidates:
        try:
            p = (root / rel).resolve()
            # Containment is checked on the resolved path, so ".." segments,
            # absolute paths and symlinks cannot escape the media root.
            if root in p.parents and p.is_file():
                return p
        except (OSError, ValueError):
            # e.g. embedded NUL byte or over-long name: treat as "not found".
            continue
    raise HTTPException(status_code=404, detail="Video not found")
def ensure_thumbnail(p: pathlib.Path, width: int = 320, height: int = 180) -> pathlib.Path:
    """Return the cached JPEG thumbnail for a video, generating it if needed.

    The thumbnail is stored in ``THUMBS_DIR`` under a name derived from the
    video's file id and the requested size. A frame is grabbed with ffmpeg at
    the 1 s mark; if that yields nothing (clip shorter than 1 s, or ffmpeg
    reported an error) the first frame is used instead.

    Args:
        p: Video file; must be located under ``MEDIA_DIR``.
        width: Thumbnail width in pixels.
        height: Thumbnail height in pixels.

    Returns:
        Path of the thumbnail file.

    Raises:
        HTTPException: 400 when the requested size is not within 1..4096 on
            both axes; 500 when ffmpeg cannot be run or produces no image.
    """
    # The size comes from query parameters: reject non-positive values (they
    # are meaningless for ffmpeg's scale/pad filters) and cap the upper bound
    # so a request cannot force arbitrarily large images into the cache.
    max_dim = 4096
    if not (0 < width <= max_dim and 0 < height <= max_dim):
        raise HTTPException(status_code=400, detail="Invalid thumbnail size")

    THUMBS_DIR.mkdir(parents=True, exist_ok=True)
    thumb_name = file_id_from_path(p) + f"_{width}x{height}.jpg"
    out = THUMBS_DIR / thumb_name
    # An empty file is the leftover of a failed run, not a usable thumbnail.
    if out.is_file() and out.stat().st_size > 0:
        return out

    vf = f"scale='min({width},iw)':-2,scale={width}:{height}:force_original_aspect_ratio=decrease,pad={width}:{height}:(ow-iw)/2:(oh-ih)/2"
    # Render into a uniquely named scratch file and move it into place only
    # once complete, so neither a failed nor a concurrent run can expose a
    # partial image at the cache path. The ".jpg" suffix is kept because
    # ffmpeg picks the output format from the file extension.
    tmp = THUMBS_DIR / f".tmp-{os.urandom(8).hex()}.jpg"
    try:
        # generate from 1s offset or 0 if too short
        for offset in ("1", "0"):
            cmd = [
                "ffmpeg", "-y",
                "-ss", offset,
                "-i", str(p),
                "-vframes", "1",
                "-vf", vf,
                str(tmp),
            ]
            try:
                subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            except subprocess.CalledProcessError:
                continue  # try the next offset
            except OSError as exc:
                # ffmpeg is missing or not executable; retrying cannot help.
                raise HTTPException(status_code=500, detail="Failed to generate thumbnail") from exc
            # Seeking past the end of a short clip can exit successfully
            # without writing a frame, so verify the output explicitly.
            if tmp.is_file() and tmp.stat().st_size > 0:
                os.replace(tmp, out)
                return out
        raise HTTPException(status_code=500, detail="Failed to generate thumbnail")
    finally:
        # Remove the scratch file if it was not moved into place.
        try:
            tmp.unlink()
        except OSError:
            pass
@app.get("/health")
def health() -> dict:
    """Liveness probe: report a constant "ok" status."""
    payload = {"status": "ok"}
    return payload
@app.get("/videos/")
def list_videos(camera_name: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None,
                limit: int = 24, page: int = 1) -> dict:
    """Return one page of video metadata, newest first.

    Args:
        camera_name: Optional substring filter, matched against each video's
            path relative to ``MEDIA_DIR`` (directories and file name).
        start_date: Accepted for API compatibility; not applied yet.
        end_date: Accepted for API compatibility; not applied yet.
        limit: Page size. ``0`` yields an empty page and ``total_pages == 1``.
        page: 1-based page number; values below 1 are treated as 1.

    Returns:
        A dict with ``videos`` (list of metadata dicts), ``total_count``
        (matches before paging), ``page`` and ``total_pages``.

    Raises:
        HTTPException: 400 when ``limit`` is negative.
    """
    if limit < 0:
        # A negative limit would turn into a negative slice bound and a
        # negative page count; reject it instead of returning nonsense.
        raise HTTPException(status_code=400, detail="limit must be >= 0")
    page = max(1, page)

    files = list_video_files()
    # basic filtering placeholder (camera_name can be inferred from path segments)
    if camera_name:
        # Match on the path below MEDIA_DIR only, so the filter cannot hit
        # components of the media root itself (e.g. "videos" in /mnt/videos).
        files = [p for p in files if camera_name in p.relative_to(MEDIA_DIR).as_posix()]

    total = len(files)
    start = (page - 1) * limit
    items = []
    for p in files[start:start + limit]:
        try:
            stat = p.stat()
        except OSError:
            # File disappeared after it was listed; leave it out of the page.
            continue
        items.append({
            "file_id": file_id_from_path(p),
            "camera_name": p.parent.name,
            "filename": p.name,
            "file_size_bytes": stat.st_size,
            "format": p.suffix.lstrip('.'),
            "status": "completed",
            "created_at": int(stat.st_mtime),
            "is_streamable": True,
            "needs_conversion": False,
        })

    total_pages = (total + limit - 1) // limit if limit else 1
    return {
        "videos": items,
        "total_count": total,
        "page": page,
        "total_pages": total_pages,
    }
@app.get("/videos/{file_id:path}/thumbnail")
def get_thumbnail(file_id: str, width: int = 320, height: int = 180):
    """Serve the JPEG thumbnail of the video identified by ``file_id``.

    The id is resolved with ``path_from_file_id`` and the image is obtained
    from ``ensure_thumbnail`` at the requested ``width`` x ``height``; errors
    raised by either helper propagate to the client.
    """
    video_path = path_from_file_id(file_id)
    thumb_path = ensure_thumbnail(video_path, width, height)
    return FileResponse(thumb_path, media_type="image/jpeg")
# Convenience endpoint: pass file_id via query instead of path (accepts raw or URL-encoded)
@app.get("/videos/thumbnail")
def get_thumbnail_q(file_id: str, width: int = 320, height: int = 180):
    """Query-string variant of the thumbnail endpoint.

    Takes ``file_id``, ``width`` and ``height`` as query parameters and
    delegates to ``get_thumbnail``, so both routes behave identically.
    """
    return get_thumbnail(file_id, width, height)
def open_file_range(path: pathlib.Path, start: int, end: Optional[int]):
    """Read an inclusive byte range from a file.

    Args:
        path: File to read.
        start: Offset of the first byte to return.
        end: Offset of the last byte to return (inclusive). ``None`` or a
            value past the end of the file means "up to the last byte".

    Returns:
        A tuple ``(chunk, file_size, start, end)`` where ``chunk`` holds the
        bytes read and ``end`` is the clamped, effective end offset. The
        whole range is held in memory.

    Raises:
        HTTPException: 416 when the range selects no bytes, i.e. ``start``
            is negative, lies beyond the (clamped) ``end``, or the file is
            empty.
    """
    file_size = path.stat().st_size
    if end is None or end >= file_size:
        end = file_size - 1
    # Without this guard an inverted range gives a negative length, and
    # f.read(<negative>) silently returns everything up to EOF.
    if start < 0 or start > end:
        raise HTTPException(
            status_code=416,
            detail="Requested range not satisfiable",
            headers={"Content-Range": f"bytes */{file_size}"},
        )
    length = end - start + 1
    with open(path, 'rb') as f:
        f.seek(start)
        chunk = f.read(length)
    return chunk, file_size, start, end
def _iter_file_range(path: pathlib.Path, start: int, length: int,
                     chunk_size: int = 1024 * 1024) -> Iterator[bytes]:
    """Yield ``length`` bytes of ``path`` from offset ``start`` in bounded chunks."""
    with open(path, "rb") as f:
        f.seek(start)
        remaining = length
        while remaining > 0:
            data = f.read(min(chunk_size, remaining))
            if not data:
                break  # file became shorter while streaming
            remaining -= len(data)
            yield data


@app.get("/videos/{file_id:path}/stream")
def stream_file(request: Request, file_id: str):
    """Serve a video, honouring a single HTTP byte-range request.

    Without a ``Range`` header the whole file is returned. With one, a
    ``206 Partial Content`` response carrying the selected bytes is streamed
    in fixed-size chunks, so large ranges (e.g. ``bytes=0-``) are never held
    in memory at once.

    Supported forms: ``bytes=START-END``, ``bytes=START-`` and the suffix
    form ``bytes=-N`` (last N bytes).

    Raises:
        HTTPException: 404 when the id does not resolve to a video (from
            ``path_from_file_id``); 400 when the header cannot be parsed;
            416 when the range selects no bytes of the file.
    """
    p = path_from_file_id(file_id)
    range_header = request.headers.get("range")
    if not range_header:
        return FileResponse(p, media_type="video/mp4")

    size = p.stat().st_size

    # parse bytes=START-END
    range_value = range_header.strip().lower().replace("bytes=", "")
    start_str, dash, end_str = range_value.partition("-")
    start_str, end_str = start_str.strip(), end_str.strip()
    try:
        if not dash or not (start_str or end_str):
            raise ValueError(range_value)
        if start_str:
            start = int(start_str)
            end = int(end_str) if end_str else size - 1
        else:
            # Suffix form "bytes=-N": the last N bytes of the file.
            suffix_len = int(end_str)
            if suffix_len < 0:
                raise ValueError(range_value)
            start = max(0, size - suffix_len)
            end = size - 1
    except ValueError:
        raise HTTPException(status_code=400, detail="Bad Range header") from None

    end = min(end, size - 1)
    if start > end:
        # Covers start past EOF, inverted ranges, "bytes=-0" and empty files.
        raise HTTPException(
            status_code=416,
            detail="Requested range not satisfiable",
            headers={"Content-Range": f"bytes */{size}"},
        )

    length = end - start + 1
    headers = {
        "Content-Range": f"bytes {start}-{end}/{size}",
        "Accept-Ranges": "bytes",
        "Content-Length": str(length),
        # NOTE(review): the type is fixed to MP4 although .avi/.mov/.mkv
        # files are also listed; confirm whether it should follow the suffix.
        "Content-Type": "video/mp4",
        "Cache-Control": "no-cache, no-store, must-revalidate",
        # NOTE(review): wildcard origin is broader than the CORS middleware
        # configuration of this app; kept as-is, confirm it is intended.
        "Access-Control-Allow-Origin": "*",
    }
    return StreamingResponse(_iter_file_range(p, start, length), status_code=206, headers=headers)
# Convenience endpoint: pass file_id via query instead of path (accepts raw or URL-encoded)
@app.get("/videos/stream")
def stream_file_q(request: Request, file_id: str):
    """Query-string variant of the streaming endpoint; delegates to ``stream_file``."""
    response = stream_file(request, file_id)
    return response