Files
usda-vision/test_rtsp.py
salirezav b7adc3788a Implement RTSP streaming functionality for cameras
- Added endpoints to start and stop RTSP streaming for cameras in the API.
- Enhanced CameraManager and CameraStreamer classes to manage RTSP streaming state and processes.
- Updated API documentation to include new RTSP streaming commands.
- Modified Docker configurations to include FFmpeg for RTSP streaming support.
- Adjusted MediaMTX settings for improved stream handling and timeout configurations.
2025-11-01 12:35:25 -04:00

114 lines
4.1 KiB
Python

#!/usr/bin/env python3
"""
Quick test script to verify RTSP streaming is working.
Tests API endpoints and MediaMTX status.
"""
import requests
import json
import time
import sys
API_BASE = "http://exp-dash:8000"
MEDIAMTX_BASE = "http://exp-dash:8889"
CAMERA = "camera1"
def test_api_rtsp():
"""Test RTSP start endpoint"""
print(f"\n{'='*60}")
print(f"Testing RTSP streaming for {CAMERA}")
print(f"{'='*60}\n")
# 1. Check if camera exists
print(f"1. Checking camera status...")
try:
resp = requests.get(f"{API_BASE}/cameras/{CAMERA}/status", timeout=5)
if resp.status_code == 200:
print(f" ✅ Camera {CAMERA} found")
print(f" Status: {json.dumps(resp.json(), indent=2)}")
else:
print(f" ❌ Camera {CAMERA} not found (status: {resp.status_code})")
return False
except Exception as e:
print(f" ❌ Error checking camera: {e}")
return False
# 2. Start RTSP streaming
print(f"\n2. Starting RTSP streaming...")
try:
resp = requests.post(f"{API_BASE}/cameras/{CAMERA}/start-rtsp", timeout=10)
print(f" Response status: {resp.status_code}")
print(f" Response: {json.dumps(resp.json(), indent=2)}")
if resp.status_code == 200 and resp.json().get("success"):
print(f" ✅ RTSP streaming started successfully")
rtsp_url = resp.json().get("rtsp_url", "N/A")
print(f" RTSP URL: {rtsp_url}")
else:
print(f" ❌ Failed to start RTSP streaming")
return False
except Exception as e:
print(f" ❌ Error starting RTSP: {e}")
return False
# 3. Wait a moment for FFmpeg to start
print(f"\n3. Waiting for stream to initialize (5 seconds)...")
time.sleep(5)
# 4. Check MediaMTX for the stream
print(f"\n4. Checking MediaMTX for stream...")
try:
resp = requests.get(f"{MEDIAMTX_BASE}/v2/paths/list", timeout=5)
if resp.status_code == 200:
paths = resp.json()
print(f" MediaMTX paths: {json.dumps(paths, indent=2)}")
# Check if our camera path exists
if isinstance(paths, dict) and "items" in paths:
items = paths.get("items", {})
if CAMERA in items:
path_info = items[CAMERA]
source_ready = path_info.get("sourceReady", False)
readers = path_info.get("readers", [])
print(f" ✅ Path '{CAMERA}' found in MediaMTX")
print(f" Source ready: {source_ready}")
print(f" Active readers: {len(readers)}")
if source_ready:
print(f" ✅ Stream is ready!")
else:
print(f" ⚠️ Stream not ready yet (sourceReady=false)")
else:
print(f" ❌ Path '{CAMERA}' not found in MediaMTX")
print(f" Available paths: {list(items.keys())}")
else:
print(f" ⚠️ Unexpected response format from MediaMTX")
else:
print(f" ⚠️ MediaMTX API returned status {resp.status_code}")
except Exception as e:
print(f" ⚠️ Error checking MediaMTX: {e}")
# 5. Check specific path
print(f"\n5. Getting detailed info for path '{CAMERA}'...")
try:
resp = requests.get(f"{MEDIAMTX_BASE}/v2/paths/get/{CAMERA}", timeout=5)
if resp.status_code == 200:
info = resp.json()
print(f" Path info: {json.dumps(info, indent=2)}")
else:
print(f" Path not found (status: {resp.status_code})")
except Exception as e:
print(f" Error: {e}")
print(f"\n{'='*60}")
print(f"Test complete!")
print(f"Try viewing the stream with:")
print(f" vlc rtsp://exp-dash:8554/{CAMERA}")
print(f" ffplay -rtsp_transport tcp rtsp://exp-dash:8554/{CAMERA}")
print(f"{'='*60}\n")
return True
if __name__ == "__main__":
test_api_rtsp()