Files

221 lines
7.6 KiB
Python

"""
Video Format Converters.
Implementations for converting video formats using FFmpeg.
"""
import asyncio
import logging
import shutil
from typing import Optional
from pathlib import Path
from datetime import datetime, timedelta
from ..domain.interfaces import VideoConverter
from ..domain.models import VideoFormat
class FFmpegVideoConverter(VideoConverter):
    """FFmpeg-based video converter.

    Runs the ``ffmpeg`` executable found on ``PATH`` as a subprocess. When the
    executable is missing the instance is still constructed, but
    :meth:`convert` always returns ``False``.
    """

    # Quality label used when the caller passes ``None`` or an unknown label.
    _DEFAULT_QUALITY = "medium"
    # CRF value per quality label (lower CRF = higher quality, larger file).
    _MP4_CRF = {"high": "18", "medium": "23", "low": "28"}
    _WEBM_CRF = {"high": "15", "medium": "20", "low": "25"}

    def __init__(self, temp_dir: Optional[Path] = None):
        """Create the converter and make sure the working directory exists.

        Args:
            temp_dir: Directory that receives converted files. Defaults to
                ``/tmp/video_conversions``; it is created if missing.
        """
        self.logger = logging.getLogger(__name__)
        self.temp_dir = temp_dir or Path("/tmp/video_conversions")
        self.temp_dir.mkdir(parents=True, exist_ok=True)
        # Check if FFmpeg is available
        self._ffmpeg_available = shutil.which("ffmpeg") is not None
        if not self._ffmpeg_available:
            self.logger.warning("FFmpeg not found - video conversion will be disabled")

    async def convert(
        self,
        source_path: Path,
        target_path: Path,
        target_format: VideoFormat,
        quality: Optional[str] = None
    ) -> bool:
        """Convert a video to the target format by running FFmpeg.

        Args:
            source_path: Input video file.
            target_path: Output file; its parent directory is created when
                missing and an existing file is overwritten (``-y``).
            target_format: Desired output format.
            quality: ``"high"``, ``"medium"`` or ``"low"``; anything else is
                treated as medium.

        Returns:
            ``True`` when FFmpeg exits with status 0, ``False`` when FFmpeg is
            unavailable, exits non-zero, or any other error occurs.
        """
        if not self._ffmpeg_available:
            self.logger.error("FFmpeg not available for conversion")
            return False
        try:
            # Ensure target directory exists
            target_path.parent.mkdir(parents=True, exist_ok=True)
            cmd = self._build_ffmpeg_command(source_path, target_path, target_format, quality)
            self.logger.info(f"Converting {source_path} to {target_path} using FFmpeg")
            process = await asyncio.create_subprocess_exec(
                *cmd,
                # Do not let the child inherit our stdin: FFmpeg reads it for
                # interactive commands, which can stall a background job.
                stdin=asyncio.subprocess.DEVNULL,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            try:
                _stdout, stderr = await process.communicate()
            except asyncio.CancelledError:
                # The awaiting task was cancelled: do not leave an orphaned
                # encoder running in the background.
                if process.returncode is None:
                    try:
                        process.kill()
                    except ProcessLookupError:
                        pass  # Exited between the check and the kill.
                    await process.wait()
                raise
            if process.returncode == 0:
                self.logger.info(f"Successfully converted {source_path} to {target_path}")
                return True
            # FFmpeg echoes file names and metadata that need not be valid
            # UTF-8; never let decoding hide the real error message.
            error_msg = stderr.decode(errors="replace") if stderr else "Unknown FFmpeg error"
            self.logger.error(f"FFmpeg conversion failed: {error_msg}")
            return False
        except Exception as e:
            self.logger.error(f"Error during video conversion: {e}")
            return False

    async def is_conversion_needed(
        self,
        source_format: VideoFormat,
        target_format: VideoFormat
    ) -> bool:
        """Return ``True`` when the source and target formats differ."""
        return source_format != target_format

    async def get_converted_path(
        self,
        original_path: Path,
        target_format: VideoFormat
    ) -> Path:
        """Build the output path for a converted file inside ``temp_dir``.

        The name is ``<stem>_<timestamp>.<format>``. The timestamp includes
        microseconds so that two conversions of the same source started within
        the same second do not share (and overwrite) one output file.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        stem = original_path.stem
        converted_filename = f"{stem}_{timestamp}.{target_format.value}"
        return self.temp_dir / converted_filename

    async def cleanup_converted_files(self, max_age_hours: int = 24) -> int:
        """Delete files in ``temp_dir`` older than ``max_age_hours``.

        Age is judged by modification time. A failure on one file (including
        the file disappearing mid-scan) is skipped and does not abort the
        sweep.

        Returns:
            Number of files actually removed.
        """
        files_removed = 0
        try:
            if not self.temp_dir.exists():
                return 0
            cutoff_time = datetime.now() - timedelta(hours=max_age_hours)
            for file_path in self.temp_dir.iterdir():
                try:
                    if not file_path.is_file():
                        continue
                    file_mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
                    if file_mtime >= cutoff_time:
                        continue
                    file_path.unlink()
                except FileNotFoundError:
                    # Removed by someone else between listing and unlinking;
                    # the desired end state is already reached.
                    continue
                except Exception as e:
                    self.logger.warning(f"Could not remove {file_path}: {e}")
                    continue
                files_removed += 1
                self.logger.debug(f"Removed old converted file: {file_path}")
            self.logger.info(f"Cleaned up {files_removed} old converted files")
        except Exception as e:
            self.logger.error(f"Error during converted files cleanup: {e}")
        # Report what was really deleted, even if the sweep ended early.
        return files_removed

    def _build_ffmpeg_command(
        self,
        source_path: Path,
        target_path: Path,
        target_format: VideoFormat,
        quality: Optional[str] = None
    ) -> list:
        """Build the FFmpeg argument list for one conversion.

        Args:
            source_path: Input file passed to ``-i``.
            target_path: Output file (last argument).
            target_format: Selects the codec options; formats other than MP4
                and WebM get no explicit codec and rely on FFmpeg defaults.
            quality: ``"high"``, ``"medium"`` or ``"low"``; unknown values and
                ``None`` fall back to medium.

        Returns:
            The command as a list of strings suitable for ``exec``.
        """
        cmd = ["ffmpeg", "-i", str(source_path)]
        if target_format == VideoFormat.MP4:
            crf = self._MP4_CRF.get(quality, self._MP4_CRF[self._DEFAULT_QUALITY])
            cmd.extend([
                "-c:v", "libx264",  # H.264 video codec
                "-c:a", "aac",  # AAC audio codec
                "-movflags", "+faststart",  # Enable progressive download
                "-crf", crf,
            ])
        elif target_format == VideoFormat.WEBM:
            crf = self._WEBM_CRF.get(quality, self._WEBM_CRF[self._DEFAULT_QUALITY])
            cmd.extend([
                "-c:v", "libvpx-vp9",  # VP9 video codec
                "-c:a", "libopus",  # Opus audio codec
                "-crf", crf,
                "-b:v", "0",  # Bitrate 0 so CRF alone drives quality
            ])
        # "-preset" is an option of the x264-family encoders; libvpx-vp9 does
        # not define it, so it is omitted for WebM. Other formats keep it, as
        # before.
        if target_format != VideoFormat.WEBM:
            cmd.extend(["-preset", "fast"])  # Encoding speed vs compression
        cmd.extend([
            "-y",  # Overwrite output file
            str(target_path),
        ])
        return cmd
class NoOpVideoConverter(VideoConverter):
    """Fallback converter used when FFmpeg is not available.

    It cannot transcode: a file is only copied when its extension already
    matches the requested format.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    async def convert(
        self,
        source_path: Path,
        target_path: Path,
        target_format: VideoFormat,
        quality: Optional[str] = None
    ) -> bool:
        """Copy the source to the target when no real conversion is required.

        Args:
            source_path: Input video file.
            target_path: Destination; its parent directory is created when
                missing.
            target_format: Requested format, compared case-insensitively with
                the source file extension.
            quality: Accepted for interface compatibility; unused.

        Returns:
            ``True`` when the file was copied or already is the target,
            ``False`` when the formats differ or the copy failed.
        """
        try:
            if source_path.suffix.lower().lstrip('.') != target_format.value:
                self.logger.warning(f"Cannot convert {source_path} to {target_format} - no converter available")
                return False
            target_path.parent.mkdir(parents=True, exist_ok=True)
            # Video files can be large; copy in a worker thread so the event
            # loop is not blocked for the duration of the copy.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, shutil.copy2, source_path, target_path)
            return True
        except shutil.SameFileError:
            # get_converted_path() yields the source path itself when the
            # extension already matches, so the file is already in place.
            return True
        except Exception as e:
            self.logger.error(f"Error in no-op conversion: {e}")
            return False

    async def is_conversion_needed(
        self,
        source_format: VideoFormat,
        target_format: VideoFormat
    ) -> bool:
        """Return ``True`` when the source and target formats differ."""
        return source_format != target_format

    async def get_converted_path(
        self,
        original_path: Path,
        target_format: VideoFormat
    ) -> Path:
        """Return ``original_path`` with its suffix replaced by the target format."""
        return original_path.with_suffix(f".{target_format.value}")

    async def cleanup_converted_files(self, max_age_hours: int = 24) -> int:
        """Do nothing and report zero removed files."""
        return 0