Files

177 lines
6.0 KiB
Python

"""
Streaming Cache Implementations.
In-memory and file-based caching for video streaming optimization.
"""
import asyncio
import logging
from typing import Optional, Dict, Tuple
from datetime import datetime, timedelta
import hashlib
from ..domain.interfaces import StreamingCache
from ..domain.models import StreamRange
class InMemoryStreamingCache(StreamingCache):
"""In-memory cache for video streaming"""
def __init__(self, max_size_mb: int = 100, max_age_minutes: int = 30):
self.max_size_bytes = max_size_mb * 1024 * 1024
self.max_age = timedelta(minutes=max_age_minutes)
self.logger = logging.getLogger(__name__)
# Cache storage: {cache_key: (data, timestamp, size)}
self._cache: Dict[str, Tuple[bytes, datetime, int]] = {}
self._current_size = 0
self._lock = asyncio.Lock()
async def get_cached_range(
self,
file_id: str,
range_request: StreamRange
) -> Optional[bytes]:
"""Get cached byte range"""
cache_key = self._generate_cache_key(file_id, range_request)
async with self._lock:
if cache_key in self._cache:
data, timestamp, size = self._cache[cache_key]
# Check if cache entry is still valid
if datetime.now() - timestamp <= self.max_age:
self.logger.debug(f"Cache hit for {file_id} range {range_request.start}-{range_request.end}")
return data
else:
# Remove expired entry
del self._cache[cache_key]
self._current_size -= size
self.logger.debug(f"Cache entry expired for {file_id}")
return None
async def cache_range(
self,
file_id: str,
range_request: StreamRange,
data: bytes
) -> None:
"""Cache byte range data"""
cache_key = self._generate_cache_key(file_id, range_request)
data_size = len(data)
async with self._lock:
# Check if we need to make space
while self._current_size + data_size > self.max_size_bytes and self._cache:
await self._evict_oldest()
# Add to cache
self._cache[cache_key] = (data, datetime.now(), data_size)
self._current_size += data_size
self.logger.debug(f"Cached {data_size} bytes for {file_id} range {range_request.start}-{range_request.end}")
async def invalidate_file(self, file_id: str) -> None:
"""Invalidate all cached data for a file"""
async with self._lock:
keys_to_remove = [key for key in self._cache.keys() if key.startswith(f"{file_id}:")]
for key in keys_to_remove:
_, _, size = self._cache[key]
del self._cache[key]
self._current_size -= size
if keys_to_remove:
self.logger.info(f"Invalidated {len(keys_to_remove)} cache entries for {file_id}")
async def cleanup_cache(self, max_size_mb: int = 100) -> int:
"""Clean up cache to stay under size limit"""
target_size = max_size_mb * 1024 * 1024
entries_removed = 0
async with self._lock:
# Remove expired entries first
current_time = datetime.now()
expired_keys = [
key for key, (_, timestamp, _) in self._cache.items()
if current_time - timestamp > self.max_age
]
for key in expired_keys:
_, _, size = self._cache[key]
del self._cache[key]
self._current_size -= size
entries_removed += 1
# Remove oldest entries if still over limit
while self._current_size > target_size and self._cache:
await self._evict_oldest()
entries_removed += 1
if entries_removed > 0:
self.logger.info(f"Cache cleanup removed {entries_removed} entries")
return entries_removed
async def _evict_oldest(self) -> None:
"""Evict the oldest cache entry"""
if not self._cache:
return
# Find oldest entry
oldest_key = min(self._cache.keys(), key=lambda k: self._cache[k][1])
_, _, size = self._cache[oldest_key]
del self._cache[oldest_key]
self._current_size -= size
self.logger.debug(f"Evicted cache entry: {oldest_key}")
def _generate_cache_key(self, file_id: str, range_request: StreamRange) -> str:
"""Generate cache key for file and range"""
range_str = f"{range_request.start}-{range_request.end}"
return f"{file_id}:{range_str}"
async def get_cache_stats(self) -> dict:
"""Get cache statistics"""
async with self._lock:
return {
"entries": len(self._cache),
"size_bytes": self._current_size,
"size_mb": self._current_size / (1024 * 1024),
"max_size_mb": self.max_size_bytes / (1024 * 1024),
"utilization_percent": (self._current_size / self.max_size_bytes) * 100
}
class NoOpStreamingCache(StreamingCache):
"""No-operation cache that doesn't actually cache anything"""
def __init__(self):
self.logger = logging.getLogger(__name__)
async def get_cached_range(
self,
file_id: str,
range_request: StreamRange
) -> Optional[bytes]:
"""Always return None (no cache)"""
return None
async def cache_range(
self,
file_id: str,
range_request: StreamRange,
data: bytes
) -> None:
"""No-op caching"""
pass
async def invalidate_file(self, file_id: str) -> None:
"""No-op invalidation"""
pass
async def cleanup_cache(self, max_size_mb: int = 100) -> int:
"""No-op cleanup"""
return 0