Files
usda-vision/ai_agent/examples/notebooks/gige_camera_advanced.ipynb
Alireza Vaezi ff7cb2c8f3 Add comprehensive tests for camera streaming, time synchronization, and auto-recording functionality
- Implemented test script for camera streaming functionality, covering API endpoints and concurrent recording.
- Created time verification script to check system time synchronization against multiple APIs.
- Developed timezone utility tests to validate timezone functions and logging.
- Added integration tests for system components, including configuration, camera discovery, and API endpoints.
- Enhanced MQTT logging and API endpoint tests for machine and MQTT status.
- Established auto-recording tests to simulate state changes and verify automatic recording behavior.
- Created simple tests for auto-recording configuration and API model validation.
2025-07-29 11:15:10 -04:00

386 lines
14 KiB
Plaintext

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Advanced GigE Camera Configuration\n",
"\n",
"This notebook provides advanced testing and configuration for GigE cameras.\n",
"\n",
"## Features:\n",
"- Network interface detection\n",
"- GigE camera discovery\n",
"- Camera parameter configuration\n",
"- Performance testing\n",
"- Dual camera synchronization testing"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import cv2\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"import subprocess\n",
"import socket\n",
"import threading\n",
"import time\n",
"from datetime import datetime\n",
"import os\n",
"from pathlib import Path\n",
"import json\n",
"\n",
"print(\"✅ Imports successful!\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Network Interface Detection"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_network_interfaces():\n",
"    \"\"\"Print the host's network interface information.\n",
"\n",
"    Runs `ip addr show` and then `ifconfig`, printing the output of each.\n",
"    The two commands are handled independently because either tool may be\n",
"    absent on a given host: a missing or failing tool is reported and\n",
"    skipped instead of aborting the remaining check.\n",
"\n",
"    Returns:\n",
"        None. Everything is printed.\n",
"    \"\"\"\n",
"    commands = [\n",
"        (['ip', 'addr', 'show'], \"🌐 Network Interfaces:\"),\n",
"        (['ifconfig'], \"\\n📡 Interface Configuration:\"),\n",
"    ]\n",
"\n",
"    for command, heading in commands:\n",
"        tool = command[0]\n",
"        try:\n",
"            # The timeout keeps a hung tool from blocking the notebook.\n",
"            result = subprocess.run(command, capture_output=True, text=True, timeout=10)\n",
"        except FileNotFoundError:\n",
"            print(f\"⚠️ '{tool}' is not installed, skipping\")\n",
"            continue\n",
"        except Exception as e:\n",
"            print(f\"❌ Error getting network info: {e}\")\n",
"            continue\n",
"\n",
"        if result.returncode == 0:\n",
"            print(heading)\n",
"            print(result.stdout)\n",
"        else:\n",
"            print(f\"❌ '{tool}' exited with code {result.returncode}: {result.stderr.strip()}\")\n",
"\n",
"get_network_interfaces()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## GigE Camera Discovery"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def discover_gige_cameras(max_devices=5):\n",
"    \"\"\"Probe for cameras through OpenCV and print what responds.\n",
"\n",
"    Two probes are run:\n",
"\n",
"    1. Device indices 0..max_devices-1 are opened with each OpenCV capture\n",
"       backend (GStreamer, V4L2, FFmpeg, default).\n",
"    2. A fixed list of GStreamer pipeline strings is opened through the\n",
"       GStreamer backend.\n",
"\n",
"    Each capture handle is released in a `finally` clause so a failing\n",
"    read cannot leave a device open, and probe errors are printed instead\n",
"    of being discarded.\n",
"\n",
"    Args:\n",
"        max_devices: How many device indices to try per backend.\n",
"\n",
"    Returns:\n",
"        None. Everything is printed.\n",
"    \"\"\"\n",
"    print(\"🔍 Discovering GigE cameras...\")\n",
"\n",
"    print(\"\\n1. Testing OpenCV backends:\")\n",
"    backends = [\n",
"        (cv2.CAP_GSTREAMER, \"GStreamer\"),\n",
"        (cv2.CAP_V4L2, \"V4L2\"),\n",
"        (cv2.CAP_FFMPEG, \"FFmpeg\"),\n",
"        (cv2.CAP_ANY, \"Default\")\n",
"    ]\n",
"\n",
"    for backend_id, backend_name in backends:\n",
"        print(f\" Testing {backend_name}...\")\n",
"        for cam_id in range(max_devices):\n",
"            cap = None\n",
"            try:\n",
"                cap = cv2.VideoCapture(cam_id, backend_id)\n",
"                if cap.isOpened():\n",
"                    ret, frame = cap.read()\n",
"                    if ret:\n",
"                        print(f\" ✅ Camera {cam_id} accessible via {backend_name}\")\n",
"                        print(f\" Resolution: {frame.shape[1]}x{frame.shape[0]}\")\n",
"            except Exception as e:\n",
"                # An index that simply has no device does not raise; report\n",
"                # anything that does so real backend faults stay visible.\n",
"                print(f\" ⚠️ Camera {cam_id} via {backend_name} raised: {e}\")\n",
"            finally:\n",
"                if cap is not None:\n",
"                    cap.release()\n",
"\n",
"    print(\"\\n2. Testing GStreamer pipelines:\")\n",
"    # Common GigE camera GStreamer pipelines\n",
"    gstreamer_pipelines = [\n",
"        \"v4l2src device=/dev/video0 ! videoconvert ! appsink\",\n",
"        \"v4l2src device=/dev/video1 ! videoconvert ! appsink\",\n",
"        \"tcambin ! videoconvert ! appsink\",  # For TIS cameras\n",
"        \"aravis ! videoconvert ! appsink\",  # For Aravis-supported cameras\n",
"    ]\n",
"\n",
"    for pipeline in gstreamer_pipelines:\n",
"        cap = None\n",
"        try:\n",
"            print(f\" Testing: {pipeline}\")\n",
"            cap = cv2.VideoCapture(pipeline, cv2.CAP_GSTREAMER)\n",
"            if cap.isOpened():\n",
"                ret, frame = cap.read()\n",
"                if ret:\n",
"                    print(f\" ✅ Pipeline works! Frame shape: {frame.shape}\")\n",
"                else:\n",
"                    print(f\" ⚠️ Pipeline opened but no frames\")\n",
"            else:\n",
"                print(f\" ❌ Pipeline failed\")\n",
"        except Exception as e:\n",
"            print(f\" ❌ Error: {e}\")\n",
"        finally:\n",
"            if cap is not None:\n",
"                cap.release()\n",
"\n",
"discover_gige_cameras()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Camera Parameter Configuration"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def configure_camera_parameters(camera_id, backend=cv2.CAP_ANY):\n",
"    \"\"\"Read a camera's parameters, try common settings, and save a test frame.\n",
"\n",
"    The parameters are read right after the camera is opened. The function\n",
"    then requests a resolution and a frame rate from short candidate lists\n",
"    (stopping at the first request for which cap.set() reports success),\n",
"    grabs one frame, displays it, and writes the frame and the parameter\n",
"    dict to /storage/camera<camera_id>/.\n",
"\n",
"    The capture handle is released in a `finally` clause so an error while\n",
"    displaying or saving cannot leave the camera open. The output directory\n",
"    is created when missing, and save failures are reported.\n",
"\n",
"    Args:\n",
"        camera_id: Device identifier passed to cv2.VideoCapture.\n",
"        backend: OpenCV capture backend constant.\n",
"\n",
"    Returns:\n",
"        dict of the parameters as read before any change was requested,\n",
"        or None when the camera cannot be opened.\n",
"    \"\"\"\n",
"    print(f\"⚙️ Configuring camera {camera_id}...\")\n",
"\n",
"    cap = cv2.VideoCapture(camera_id, backend)\n",
"    try:\n",
"        if not cap.isOpened():\n",
"            print(f\"❌ Cannot open camera {camera_id}\")\n",
"            return None\n",
"\n",
"        # Get current parameters\n",
"        current_params = {\n",
"            'width': cap.get(cv2.CAP_PROP_FRAME_WIDTH),\n",
"            'height': cap.get(cv2.CAP_PROP_FRAME_HEIGHT),\n",
"            'fps': cap.get(cv2.CAP_PROP_FPS),\n",
"            'brightness': cap.get(cv2.CAP_PROP_BRIGHTNESS),\n",
"            'contrast': cap.get(cv2.CAP_PROP_CONTRAST),\n",
"            'saturation': cap.get(cv2.CAP_PROP_SATURATION),\n",
"            'hue': cap.get(cv2.CAP_PROP_HUE),\n",
"            'gain': cap.get(cv2.CAP_PROP_GAIN),\n",
"            'exposure': cap.get(cv2.CAP_PROP_EXPOSURE),\n",
"            'auto_exposure': cap.get(cv2.CAP_PROP_AUTO_EXPOSURE),\n",
"            'white_balance': cap.get(cv2.CAP_PROP_WHITE_BALANCE_BLUE_U),\n",
"        }\n",
"\n",
"        print(\"📊 Current Camera Parameters:\")\n",
"        for param, value in current_params.items():\n",
"            print(f\" {param}: {value}\")\n",
"\n",
"        # Test setting some parameters\n",
"        print(\"\\n🔧 Testing parameter changes:\")\n",
"\n",
"        # Try to set resolution (common GigE resolutions)\n",
"        test_resolutions = [(1920, 1080), (1280, 720), (640, 480)]\n",
"        for width, height in test_resolutions:\n",
"            if cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) and cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height):\n",
"                actual_width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)\n",
"                actual_height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)\n",
"                print(f\" Resolution {width}x{height}: Set to {actual_width}x{actual_height}\")\n",
"                break\n",
"\n",
"        # Test FPS settings\n",
"        for fps in [30, 60, 120]:\n",
"            if cap.set(cv2.CAP_PROP_FPS, fps):\n",
"                actual_fps = cap.get(cv2.CAP_PROP_FPS)\n",
"                print(f\" FPS {fps}: Set to {actual_fps}\")\n",
"                break\n",
"\n",
"        # Capture test frame with new settings\n",
"        ret, frame = cap.read()\n",
"        if not ret:\n",
"            print(\"❌ Failed to capture test frame\")\n",
"            return current_params\n",
"\n",
"        print(f\"\\n✅ Test frame captured: {frame.shape}\")\n",
"\n",
"        # Display frame\n",
"        plt.figure(figsize=(10, 6))\n",
"        if len(frame.shape) == 3:\n",
"            plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))\n",
"        else:\n",
"            plt.imshow(frame, cmap='gray')\n",
"        plt.title(f\"Camera {camera_id} - Configured\")\n",
"        plt.axis('off')\n",
"        plt.show()\n",
"\n",
"        # Save configuration and test image\n",
"        timestamp = datetime.now().strftime(\"%Y%m%d_%H%M%S\")\n",
"        output_dir = f\"/storage/camera{camera_id}\"\n",
"\n",
"        try:\n",
"            os.makedirs(output_dir, exist_ok=True)\n",
"\n",
"            # Save image; cv2.imwrite signals failure through its return value.\n",
"            img_path = f\"{output_dir}/configured_test_{timestamp}.jpg\"\n",
"            if cv2.imwrite(img_path, frame):\n",
"                print(f\"💾 Test image saved: {img_path}\")\n",
"            else:\n",
"                print(f\"❌ Failed to save test image: {img_path}\")\n",
"\n",
"            # Save configuration. current_params was read before the\n",
"            # resolution/FPS requests above, so the file records the\n",
"            # camera's state as it was when opened.\n",
"            config_path = f\"{output_dir}/config_{timestamp}.json\"\n",
"            with open(config_path, 'w') as f:\n",
"                json.dump(current_params, f, indent=2)\n",
"            print(f\"💾 Configuration saved: {config_path}\")\n",
"        except OSError as e:\n",
"            print(f\"❌ Could not save results to {output_dir}: {e}\")\n",
"\n",
"        return current_params\n",
"    finally:\n",
"        cap.release()\n",
"\n",
"# Test configuration (change camera_id as needed)\n",
"camera_to_configure = 0\n",
"config = configure_camera_parameters(camera_to_configure)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Dual Camera Testing"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def test_dual_cameras(camera1_id=0, camera2_id=1, duration=5):\n",
"    \"\"\"Capture from two cameras at once and record a short video from each.\n",
"\n",
"    Opens both cameras, grabs one frame from each, shows the frames side\n",
"    by side, saves them as JPEGs, then reads frame pairs for `duration`\n",
"    seconds and writes them to two MP4 files. Output goes to\n",
"    /storage/camera1/ and /storage/camera2/, which are created if missing.\n",
"\n",
"    The function stops early (after printing the reason) when a camera\n",
"    cannot be opened, when the initial frames cannot be read, or when the\n",
"    output directories or video writers cannot be set up. Captures and\n",
"    writers are always released, whichever path is taken.\n",
"\n",
"    Args:\n",
"        camera1_id: Device identifier of the first camera.\n",
"        camera2_id: Device identifier of the second camera.\n",
"        duration: Recording length in seconds.\n",
"\n",
"    Returns:\n",
"        None. Everything is printed or written to disk.\n",
"    \"\"\"\n",
"    print(f\"📷📷 Testing dual camera capture (cameras {camera1_id} and {camera2_id})...\")\n",
"\n",
"    # Open both cameras\n",
"    cap1 = cv2.VideoCapture(camera1_id)\n",
"    cap2 = cv2.VideoCapture(camera2_id)\n",
"    out1 = out2 = None\n",
"\n",
"    try:\n",
"        if not cap1.isOpened():\n",
"            print(f\"❌ Cannot open camera {camera1_id}\")\n",
"            return\n",
"\n",
"        if not cap2.isOpened():\n",
"            print(f\"❌ Cannot open camera {camera2_id}\")\n",
"            return\n",
"\n",
"        print(\"✅ Both cameras opened successfully\")\n",
"\n",
"        # Capture test frames\n",
"        ret1, frame1 = cap1.read()\n",
"        ret2, frame2 = cap2.read()\n",
"\n",
"        if not (ret1 and ret2):\n",
"            # The video writers take their frame size from these frames,\n",
"            # so recording cannot proceed without them.\n",
"            print(\"❌ Failed to capture from one or both cameras\")\n",
"            return\n",
"\n",
"        print(f\"📊 Camera {camera1_id}: {frame1.shape}\")\n",
"        print(f\"📊 Camera {camera2_id}: {frame2.shape}\")\n",
"\n",
"        # Display both frames side by side\n",
"        fig, axes = plt.subplots(1, 2, figsize=(15, 6))\n",
"        for ax, frame, cam_id in zip(axes, (frame1, frame2), (camera1_id, camera2_id)):\n",
"            if len(frame.shape) == 3:\n",
"                ax.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))\n",
"            else:\n",
"                ax.imshow(frame, cmap='gray')\n",
"            ax.set_title(f\"Camera {cam_id}\")\n",
"            ax.axis('off')\n",
"\n",
"        plt.tight_layout()\n",
"        plt.show()\n",
"\n",
"        timestamp = datetime.now().strftime(\"%Y%m%d_%H%M%S\")\n",
"        dir1 = \"/storage/camera1\"\n",
"        dir2 = \"/storage/camera2\"\n",
"        try:\n",
"            os.makedirs(dir1, exist_ok=True)\n",
"            os.makedirs(dir2, exist_ok=True)\n",
"        except OSError as e:\n",
"            print(f\"❌ Cannot create output directories: {e}\")\n",
"            return\n",
"\n",
"        # Save test images; cv2.imwrite signals failure through its return value.\n",
"        saved1 = cv2.imwrite(f\"{dir1}/dual_test_{timestamp}.jpg\", frame1)\n",
"        saved2 = cv2.imwrite(f\"{dir2}/dual_test_{timestamp}.jpg\", frame2)\n",
"        if saved1 and saved2:\n",
"            print(f\"💾 Dual camera test images saved with timestamp {timestamp}\")\n",
"        else:\n",
"            print(\"❌ Failed to save one or both test images\")\n",
"\n",
"        # Test synchronized recording\n",
"        print(f\"\\n🎥 Testing synchronized recording for {duration} seconds...\")\n",
"\n",
"        # Setup video writers\n",
"        fourcc = cv2.VideoWriter_fourcc(*'mp4v')\n",
"        fps = 30\n",
"\n",
"        h1, w1 = frame1.shape[:2]\n",
"        h2, w2 = frame2.shape[:2]\n",
"        out1 = cv2.VideoWriter(f\"{dir1}/sync_test_{timestamp}.mp4\", fourcc, fps, (w1, h1))\n",
"        out2 = cv2.VideoWriter(f\"{dir2}/sync_test_{timestamp}.mp4\", fourcc, fps, (w2, h2))\n",
"        if not (out1.isOpened() and out2.isOpened()):\n",
"            print(\"❌ Cannot open one or both video writers\")\n",
"            return\n",
"\n",
"        # Record synchronized video\n",
"        start_time = time.time()\n",
"        frame_count = 0\n",
"        dropped = 0\n",
"\n",
"        while time.time() - start_time < duration:\n",
"            ret1, frame1 = cap1.read()\n",
"            ret2, frame2 = cap2.read()\n",
"\n",
"            if ret1 and ret2:\n",
"                out1.write(frame1)\n",
"                out2.write(frame2)\n",
"                frame_count += 1\n",
"            else:\n",
"                # Count drops and report once at the end; a short pause keeps\n",
"                # a persistently failing camera from spinning the loop.\n",
"                dropped += 1\n",
"                time.sleep(0.01)\n",
"\n",
"        elapsed = time.time() - start_time\n",
"        # duration=0 can leave elapsed at 0; avoid dividing by zero.\n",
"        actual_fps = frame_count / elapsed if elapsed > 0 else 0.0\n",
"\n",
"        print(\"✅ Synchronized recording complete\")\n",
"        print(f\"📊 Recorded {frame_count} frames in {elapsed:.2f}s\")\n",
"        print(f\"📊 Actual FPS: {actual_fps:.2f}\")\n",
"        if dropped:\n",
"            print(f\"⚠️ {dropped} frame pair(s) dropped\")\n",
"        print(f\"💾 Videos saved with timestamp {timestamp}\")\n",
"    finally:\n",
"        # Cleanup\n",
"        cap1.release()\n",
"        cap2.release()\n",
"        for writer in (out1, out2):\n",
"            if writer is not None:\n",
"                writer.release()\n",
"\n",
"# Test dual cameras (adjust camera IDs as needed)\n",
"test_dual_cameras(0, 1, duration=3)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "usda-vision-cameras",
"language": "python",
"name": "usda-vision-cameras"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.0"
}
},
"nbformat": 4,
"nbformat_minor": 4
}