Implement RTSP streaming functionality for cameras

- Added endpoints to start and stop RTSP streaming for cameras in the API.
- Enhanced CameraManager and CameraStreamer classes to manage RTSP streaming state and processes.
- Updated API documentation to include new RTSP streaming commands.
- Modified Docker configurations to include FFmpeg for RTSP streaming support.
- Adjusted MediaMTX settings for improved stream handling and timeout configurations.
This commit is contained in:
salirezav
2025-11-01 12:35:25 -04:00
parent 70f614e9ff
commit b7adc3788a
10 changed files with 628 additions and 2 deletions

View File

@@ -7,6 +7,7 @@ This module provides REST API endpoints and WebSocket support for dashboard inte
import asyncio
import logging
import json
import os
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
import threading
@@ -347,6 +348,45 @@ class APIServer:
self.logger.error(f"Error stopping camera stream: {e}")
raise HTTPException(status_code=500, detail=str(e))
@self.app.post("/cameras/{camera_name}/start-rtsp")
async def start_camera_rtsp_stream(camera_name: str):
"""Start RTSP streaming for a camera to MediaMTX"""
try:
if not self.camera_manager:
raise HTTPException(status_code=503, detail="Camera manager not available")
success = self.camera_manager.start_camera_rtsp_streaming(camera_name)
if success:
rtsp_url = f"rtsp://{os.getenv('MEDIAMTX_HOST', 'localhost')}:{os.getenv('MEDIAMTX_RTSP_PORT', '8554')}/{camera_name}"
return {
"success": True,
"message": f"Started RTSP streaming for camera {camera_name}",
"rtsp_url": rtsp_url
}
else:
return {"success": False, "message": f"Failed to start RTSP streaming for camera {camera_name}"}
except Exception as e:
self.logger.error(f"Error starting RTSP stream: {e}")
raise HTTPException(status_code=500, detail=str(e))
@self.app.post("/cameras/{camera_name}/stop-rtsp")
async def stop_camera_rtsp_stream(camera_name: str):
"""Stop RTSP streaming for a camera"""
try:
if not self.camera_manager:
raise HTTPException(status_code=503, detail="Camera manager not available")
success = self.camera_manager.stop_camera_rtsp_streaming(camera_name)
if success:
return {"success": True, "message": f"Stopped RTSP streaming for camera {camera_name}"}
else:
return {"success": False, "message": f"Failed to stop RTSP streaming for camera {camera_name}"}
except Exception as e:
self.logger.error(f"Error stopping RTSP stream: {e}")
raise HTTPException(status_code=500, detail=str(e))
@self.app.get("/cameras/{camera_name}/config", response_model=CameraConfigResponse)
async def get_camera_config(camera_name: str):
"""Get camera configuration"""

View File

@@ -520,6 +520,32 @@ class CameraManager:
return streamer.is_streaming()
def start_camera_rtsp_streaming(self, camera_name: str) -> bool:
"""Start RTSP streaming for a specific camera"""
streamer = self.camera_streamers.get(camera_name)
if not streamer:
self.logger.error(f"Camera streamer not found: {camera_name}")
return False
return streamer.start_rtsp_streaming()
def stop_camera_rtsp_streaming(self, camera_name: str) -> bool:
"""Stop RTSP streaming for a specific camera"""
streamer = self.camera_streamers.get(camera_name)
if not streamer:
self.logger.error(f"Camera streamer not found: {camera_name}")
return False
return streamer.stop_rtsp_streaming()
def is_camera_rtsp_streaming(self, camera_name: str) -> bool:
"""Check if a camera is currently RTSP streaming"""
streamer = self.camera_streamers.get(camera_name)
if not streamer:
return False
return streamer.is_rtsp_streaming()
def get_camera_config(self, camera_name: str) -> Optional[CameraConfig]:
"""Get camera configuration"""
return self.config.get_camera_by_name(camera_name)

View File

@@ -13,6 +13,7 @@ import logging
import cv2
import numpy as np
import contextlib
import subprocess
from typing import Optional, Dict, Any, Generator
from datetime import datetime
import queue
@@ -70,14 +71,26 @@ class CameraStreamer:
# Streaming state
self.streaming = False
self.rtsp_streaming = False # RTSP streaming state
self._streaming_thread: Optional[threading.Thread] = None
self._rtsp_thread: Optional[threading.Thread] = None
self._stop_streaming_event = threading.Event()
self._stop_rtsp_event = threading.Event()
self._frame_queue = queue.Queue(maxsize=5) # Buffer for latest frames
self._rtsp_frame_queue = queue.Queue(maxsize=10) # Buffer for RTSP frames (larger buffer for smoother streaming)
self._lock = threading.RLock()
# Stream settings (optimized for preview)
self.preview_fps = 10.0 # Lower FPS for preview to reduce load
self.preview_quality = 70 # JPEG quality for streaming
# RTSP settings
self.rtsp_fps = 15.0 # RTSP FPS (can be higher than MJPEG preview)
self.rtsp_host = os.getenv("MEDIAMTX_HOST", "localhost")
self.rtsp_port = int(os.getenv("MEDIAMTX_RTSP_PORT", "8554"))
self._rtsp_process: Optional[subprocess.Popen] = None
self._rtsp_frame_width = 0
self._rtsp_frame_height = 0
def start_streaming(self) -> bool:
"""Start streaming preview frames"""
@@ -130,6 +143,88 @@ class CameraStreamer:
self.logger.error(f"Error stopping streaming: {e}")
return False
def start_rtsp_streaming(self) -> bool:
"""Start RTSP streaming to MediaMTX"""
with self._lock:
if self.rtsp_streaming:
self.logger.warning("RTSP streaming already active")
return True
# Ensure camera is initialized for streaming
if not self.streaming:
if not self.start_streaming():
return False
try:
# Get frame dimensions from the first frame if available
if self._frame_queue.empty():
self.logger.warning("No frames available yet, will initialize RTSP with default dimensions")
# Will update dimensions when first frame arrives
else:
# Peek at a frame to get dimensions (don't remove it)
try:
test_frame = self._frame_queue.queue[0]
self._rtsp_frame_height, self._rtsp_frame_width = test_frame.shape[:2]
except (IndexError, AttributeError):
pass
# Start RTSP thread
self._stop_rtsp_event.clear()
self._rtsp_thread = threading.Thread(target=self._rtsp_streaming_loop, daemon=True)
self._rtsp_thread.start()
self.rtsp_streaming = True
self.logger.info(f"Started RTSP streaming for camera: {self.camera_config.name} -> rtsp://{self.rtsp_host}:{self.rtsp_port}/{self.camera_config.name}")
return True
except Exception as e:
self.logger.error(f"Error starting RTSP streaming: {e}")
return False
def stop_rtsp_streaming(self) -> bool:
"""Stop RTSP streaming"""
with self._lock:
if not self.rtsp_streaming:
return True
try:
# Signal RTSP thread to stop
self._stop_rtsp_event.set()
# Wait for thread to finish
if self._rtsp_thread and self._rtsp_thread.is_alive():
self._rtsp_thread.join(timeout=5.0)
# Kill FFmpeg process if still running
if self._rtsp_process:
try:
self._rtsp_process.terminate()
self._rtsp_process.wait(timeout=2.0)
except subprocess.TimeoutExpired:
self._rtsp_process.kill()
except Exception as e:
self.logger.warning(f"Error stopping RTSP process: {e}")
self._rtsp_process = None
# Clear RTSP frame queue
while not self._rtsp_frame_queue.empty():
try:
self._rtsp_frame_queue.get_nowait()
except queue.Empty:
break
self.rtsp_streaming = False
self.logger.info(f"Stopped RTSP streaming for camera: {self.camera_config.name}")
return True
except Exception as e:
self.logger.error(f"Error stopping RTSP streaming: {e}")
return False
def is_rtsp_streaming(self) -> bool:
"""Check if RTSP streaming is active"""
return self.rtsp_streaming
def get_latest_frame(self) -> Optional[bytes]:
"""Get the latest frame as JPEG bytes for streaming"""
try:
@@ -258,7 +353,12 @@ class CameraStreamer:
frame = self._convert_frame_to_opencv(FrameHead)
if frame is not None:
# Add frame to queue (replace oldest if queue is full)
# Update RTSP frame dimensions if not set
if self.rtsp_streaming and self._rtsp_frame_width == 0:
self._rtsp_frame_height, self._rtsp_frame_width = frame.shape[:2]
self.logger.info(f"RTSP frame dimensions set: {self._rtsp_frame_width}x{self._rtsp_frame_height}")
# Add frame to MJPEG queue (replace oldest if queue is full)
try:
self._frame_queue.put_nowait(frame)
except queue.Full:
@@ -269,6 +369,18 @@ class CameraStreamer:
except queue.Empty:
pass
# Add frame to RTSP queue if RTSP is active
if self.rtsp_streaming:
try:
self._rtsp_frame_queue.put_nowait(frame)
except queue.Full:
# Remove oldest frame and add new one
try:
self._rtsp_frame_queue.get_nowait()
self._rtsp_frame_queue.put_nowait(frame)
except queue.Empty:
pass
# Release buffer
mvsdk.CameraReleaseImageBuffer(self.hCamera, pRawData)
@@ -406,7 +518,167 @@ class CameraStreamer:
except Exception as e:
self.logger.warning(f"Error configuring advanced settings: {e}")
def _rtsp_streaming_loop(self):
"""Main RTSP streaming loop that feeds frames to FFmpeg"""
self.logger.info("Starting RTSP streaming loop")
# Wait for frame dimensions to be set
timeout = 10.0 # Wait up to 10 seconds for first frame
start_time = time.time()
while self._rtsp_frame_width == 0 and (time.time() - start_time) < timeout:
if self._stop_rtsp_event.is_set():
return
time.sleep(0.1)
if self._rtsp_frame_width == 0:
self.logger.error("Could not determine frame dimensions for RTSP streaming")
self.rtsp_streaming = False
return
rtsp_url = f"rtsp://{self.rtsp_host}:{self.rtsp_port}/{self.camera_config.name}"
self.logger.info(f"Publishing RTSP stream to {rtsp_url} with dimensions {self._rtsp_frame_width}x{self._rtsp_frame_height} @ {self.rtsp_fps}fps")
try:
# FFmpeg command to encode frames from stdin and publish to RTSP
# Input: raw video from stdin (BGR24 format)
# Output: H.264 encoded stream via RTSP
# Using TCP transport for better reliability
cmd = [
"ffmpeg",
"-f", "rawvideo",
"-pixel_format", "bgr24",
"-video_size", f"{self._rtsp_frame_width}x{self._rtsp_frame_height}",
"-framerate", str(self.rtsp_fps),
"-i", "-", # Read from stdin
"-c:v", "libx264",
"-preset", "ultrafast", # Fast encoding for low latency
"-tune", "zerolatency", # Zero latency tuning
"-g", "30", # GOP size (keyframe interval)
"-crf", "23", # Quality (lower = better, but larger file)
"-f", "rtsp",
"-rtsp_transport", "tcp", # Use TCP instead of UDP for reliability
"-muxdelay", "0.1", # Reduce mux delay
rtsp_url
]
# Start FFmpeg process
self._rtsp_process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0 # Unbuffered for low latency
)
self.logger.info(f"FFmpeg RTSP process started (PID: {self._rtsp_process.pid})")
# Start thread to monitor FFmpeg stderr for errors
def monitor_ffmpeg_stderr():
if self._rtsp_process.stderr:
try:
for line in iter(self._rtsp_process.stderr.readline, b''):
if line:
line_str = line.decode('utf-8', errors='ignore').strip()
if line_str:
self.logger.debug(f"FFmpeg stderr: {line_str}")
# Log errors at warning level
if any(keyword in line_str.lower() for keyword in ['error', 'failed', 'fatal']):
self.logger.warning(f"FFmpeg error: {line_str}")
except Exception as e:
self.logger.debug(f"FFmpeg stderr monitor ended: {e}")
stderr_thread = threading.Thread(target=monitor_ffmpeg_stderr, daemon=True)
stderr_thread.start()
frame_interval = 1.0 / self.rtsp_fps
last_frame_time = time.time()
frames_sent = 0
frames_skipped = 0
while not self._stop_rtsp_event.is_set():
try:
# Check if FFmpeg process is still alive
if self._rtsp_process.poll() is not None:
self.logger.error(f"FFmpeg process exited with code {self._rtsp_process.returncode}")
# Read stderr to see what went wrong
if self._rtsp_process.stderr:
try:
stderr_output = self._rtsp_process.stderr.read().decode('utf-8', errors='ignore')
if stderr_output:
self.logger.error(f"FFmpeg stderr output: {stderr_output[-500:]}") # Last 500 chars
except:
pass
break
# Get frame from queue with timeout
try:
frame = self._rtsp_frame_queue.get(timeout=0.1)
except queue.Empty:
frames_skipped += 1
# Log warning if we've skipped many frames (might indicate queue isn't being filled)
if frames_skipped % 100 == 0 and frames_skipped > 0:
self.logger.warning(f"RTSP frame queue empty ({frames_skipped} skipped, {frames_sent} sent)")
continue
# Ensure frame dimensions match (resize if necessary)
if frame.shape[1] != self._rtsp_frame_width or frame.shape[0] != self._rtsp_frame_height:
frame = cv2.resize(frame, (self._rtsp_frame_width, self._rtsp_frame_height))
# Write frame to FFmpeg stdin
if self._rtsp_process.stdin is None:
self.logger.error("FFmpeg stdin is None")
break
try:
self._rtsp_process.stdin.write(frame.tobytes())
self._rtsp_process.stdin.flush()
frames_sent += 1
if frames_sent % 100 == 0:
self.logger.debug(f"Sent {frames_sent} frames to FFmpeg")
except BrokenPipeError:
self.logger.error(f"FFmpeg process pipe broken (sent {frames_sent} frames before error)")
break
except Exception as e:
self.logger.error(f"Error writing frame to FFmpeg: {e} (sent {frames_sent} frames)")
break
# Control frame rate
current_time = time.time()
elapsed = current_time - last_frame_time
if elapsed < frame_interval:
time.sleep(frame_interval - elapsed)
last_frame_time = time.time()
except Exception as e:
if not self._stop_rtsp_event.is_set():
self.logger.error(f"Error in RTSP streaming loop: {e}")
time.sleep(0.1)
except Exception as e:
self.logger.error(f"Fatal error in RTSP streaming loop: {e}")
finally:
# Cleanup FFmpeg process
if self._rtsp_process:
try:
if self._rtsp_process.stdin:
self._rtsp_process.stdin.close()
except Exception:
pass
try:
self._rtsp_process.terminate()
self._rtsp_process.wait(timeout=2.0)
except subprocess.TimeoutExpired:
self._rtsp_process.kill()
except Exception as e:
self.logger.warning(f"Error stopping FFmpeg process: {e}")
self._rtsp_process = None
self.logger.info("RTSP streaming loop ended")
def __del__(self):
"""Destructor to ensure cleanup"""
if self.rtsp_streaming:
self.stop_rtsp_streaming()
if self.streaming:
self.stop_streaming()