"""
Video Format Converters.

Implementations for converting video formats using FFmpeg.
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import shutil
|
|
from typing import Optional
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
|
|
from ..domain.interfaces import VideoConverter
|
|
from ..domain.models import VideoFormat
|
|
|
|
|
|
class FFmpegVideoConverter(VideoConverter):
    """FFmpeg-based video converter.

    Runs the ``ffmpeg`` executable found on ``PATH`` as a subprocess.
    Converted files are named by :meth:`get_converted_path` and live in
    ``temp_dir``; :meth:`cleanup_converted_files` removes old ones.
    """

    # Quality name used when the caller passes None or an unknown name.
    _DEFAULT_QUALITY = "medium"
    # CRF (constant rate factor) per quality name; lower means higher quality.
    _X264_CRF = {"high": "18", "medium": "23", "low": "28"}
    _VP9_CRF = {"high": "15", "medium": "20", "low": "25"}

    def __init__(self, temp_dir: Optional[Path] = None):
        """Create the working directory and probe for FFmpeg.

        Args:
            temp_dir: Directory for converted files. Defaults to
                ``/tmp/video_conversions``. Created if missing.
        """
        self.logger = logging.getLogger(__name__)
        self.temp_dir = temp_dir or Path("/tmp/video_conversions")
        self.temp_dir.mkdir(parents=True, exist_ok=True)

        # Check if FFmpeg is available
        self._ffmpeg_available = shutil.which("ffmpeg") is not None
        if not self._ffmpeg_available:
            self.logger.warning("FFmpeg not found - video conversion will be disabled")

    async def convert(
        self,
        source_path: Path,
        target_path: Path,
        target_format: VideoFormat,
        quality: Optional[str] = None
    ) -> bool:
        """Convert video to target format.

        Args:
            source_path: Input video file.
            target_path: Output file; its parent directory is created.
            target_format: Format to encode to.
            quality: ``"high"``, ``"medium"`` or ``"low"``; anything else
                (including ``None``) is treated as medium.

        Returns:
            True if FFmpeg exited with status 0, False on any failure
            (FFmpeg missing, non-zero exit, or an unexpected exception).

        Raises:
            asyncio.CancelledError: re-raised after the FFmpeg child process
                has been killed, so cancellation does not orphan it.
        """
        if not self._ffmpeg_available:
            self.logger.error("FFmpeg not available for conversion")
            return False

        process = None
        try:
            # Ensure target directory exists
            target_path.parent.mkdir(parents=True, exist_ok=True)

            cmd = self._build_ffmpeg_command(source_path, target_path, target_format, quality)
            self.logger.info(f"Converting {source_path} to {target_path} using FFmpeg")

            process = await asyncio.create_subprocess_exec(
                *cmd,
                # Do not let ffmpeg read (or block on) the parent's stdin.
                stdin=asyncio.subprocess.DEVNULL,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            _stdout, stderr = await process.communicate()

            if process.returncode == 0:
                self.logger.info(f"Successfully converted {source_path} to {target_path}")
                return True

            # FFmpeg echoes file names/metadata that may not be valid UTF-8;
            # never let a decode error hide the real failure message.
            error_msg = stderr.decode(errors="replace") if stderr else "Unknown FFmpeg error"
            self.logger.error(f"FFmpeg conversion failed: {error_msg}")
            return False

        except asyncio.CancelledError:
            # The task was cancelled while FFmpeg was running: stop the child
            # and reap it before propagating the cancellation.
            if process is not None and process.returncode is None:
                try:
                    process.kill()
                except ProcessLookupError:
                    pass  # exited between the check and the kill
                await process.wait()
            raise
        except Exception as e:
            self.logger.error(f"Error during video conversion: {e}")
            return False

    async def is_conversion_needed(
        self,
        source_format: VideoFormat,
        target_format: VideoFormat
    ) -> bool:
        """Return True when the source and target formats differ."""
        return source_format != target_format

    async def get_converted_path(
        self,
        original_path: Path,
        target_format: VideoFormat
    ) -> Path:
        """Return a path inside ``temp_dir`` for the converted file.

        The name is ``<stem>_<timestamp>.<target_format.value>``. The
        timestamp includes microseconds so that two conversions of the same
        stem started within one second do not overwrite each other.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        converted_filename = f"{original_path.stem}_{timestamp}.{target_format.value}"
        return self.temp_dir / converted_filename

    async def cleanup_converted_files(self, max_age_hours: int = 24) -> int:
        """Delete files in ``temp_dir`` older than ``max_age_hours``.

        Only regular files directly inside ``temp_dir`` are considered; age
        is judged by modification time. A file that cannot be inspected or
        removed is logged and skipped without aborting the sweep.

        Returns:
            Number of files removed, or 0 if the sweep itself failed.
        """
        try:
            cutoff_time = datetime.now() - timedelta(hours=max_age_hours)
            files_removed = 0

            if not self.temp_dir.exists():
                return 0

            for file_path in self.temp_dir.iterdir():
                try:
                    if not file_path.is_file():
                        continue
                    # stat() is inside the per-file handler: the file may
                    # vanish between the listing and this call.
                    file_mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
                    if file_mtime >= cutoff_time:
                        continue
                    file_path.unlink()
                except OSError as e:
                    self.logger.warning(f"Could not remove {file_path}: {e}")
                    continue

                files_removed += 1
                self.logger.debug(f"Removed old converted file: {file_path}")

            self.logger.info(f"Cleaned up {files_removed} old converted files")
            return files_removed

        except Exception as e:
            self.logger.error(f"Error during converted files cleanup: {e}")
            return 0

    def _build_ffmpeg_command(
        self,
        source_path: Path,
        target_path: Path,
        target_format: VideoFormat,
        quality: Optional[str] = None
    ) -> list:
        """Build the FFmpeg argument list for one conversion.

        MP4 is encoded with libx264/AAC and ``+faststart``; WebM with
        libvpx-vp9/Opus in constant-quality mode (``-b:v 0``). Any other
        format gets no explicit codec options. Unknown or missing
        ``quality`` falls back to the medium CRF.

        Returns:
            Argument list starting with ``"ffmpeg"`` and ending with the
            output path.
        """
        cmd = ["ffmpeg", "-i", str(source_path)]

        if target_format == VideoFormat.MP4:
            crf = self._X264_CRF.get(quality, self._X264_CRF[self._DEFAULT_QUALITY])
            cmd.extend([
                "-c:v", "libx264",          # H.264 video codec
                "-c:a", "aac",              # AAC audio codec
                "-movflags", "+faststart",  # Enable progressive download
                "-crf", crf,
            ])
        elif target_format == VideoFormat.WEBM:
            crf = self._VP9_CRF.get(quality, self._VP9_CRF[self._DEFAULT_QUALITY])
            cmd.extend([
                "-c:v", "libvpx-vp9",       # VP9 video codec
                "-c:a", "libopus",          # Opus audio codec
                "-crf", crf,
                "-b:v", "0",                # Constant-quality mode
            ])

        # "-preset" is an x264-family option; libvpx-vp9 has no such option,
        # so it is left out for WebM instead of being passed and ignored.
        if target_format != VideoFormat.WEBM:
            cmd.extend(["-preset", "fast"])  # Encoding speed vs compression

        cmd.extend([
            "-y",  # Overwrite output file
            str(target_path),
        ])
        return cmd
|
|
|
|
|
|
class NoOpVideoConverter(VideoConverter):
    """No-operation converter for when FFmpeg is not available.

    It cannot transcode. ``convert`` only succeeds when the source file's
    extension already equals the target format, in which case the file is
    copied to the target path.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    async def convert(
        self,
        source_path: Path,
        target_path: Path,
        target_format: VideoFormat,
        quality: Optional[str] = None
    ) -> bool:
        """Copy the file if it is already in the target format.

        Args:
            source_path: Input video file.
            target_path: Destination; its parent directory is created.
            target_format: Requested format, compared against the source
                file's lower-cased extension.
            quality: Ignored; accepted for interface compatibility.

        Returns:
            True if the file is already in the target format and is now
            available at ``target_path``; False if a real conversion would
            be required or the copy failed.
        """
        try:
            source_ext = source_path.suffix.lower().lstrip('.')
            if source_ext != target_format.value:
                self.logger.warning(f"Cannot convert {source_path} to {target_format} - no converter available")
                return False

            target_path.parent.mkdir(parents=True, exist_ok=True)
            try:
                # Same format, just copy
                shutil.copy2(source_path, target_path)
            except shutil.SameFileError:
                # get_converted_path() only swaps the suffix, so for a
                # matching format the target is the source itself. The file
                # is already where it needs to be; that is success.
                pass
            return True
        except Exception as e:
            self.logger.error(f"Error in no-op conversion: {e}")
            return False

    async def is_conversion_needed(
        self,
        source_format: VideoFormat,
        target_format: VideoFormat
    ) -> bool:
        """Return True when the source and target formats differ."""
        return source_format != target_format

    async def get_converted_path(
        self,
        original_path: Path,
        target_format: VideoFormat
    ) -> Path:
        """Return ``original_path`` with its suffix replaced by the target format."""
        return original_path.with_suffix(f".{target_format.value}")

    async def cleanup_converted_files(self, max_age_hours: int = 24) -> int:
        """No-op cleanup; always reports 0 files removed."""
        return 0
|