Files
usda-vision/camera-management-api/usda_vision_system/api/routes/mqtt_routes.py
salirezav f1a9cb0c1e Refactor API route setup and enhance modularity
- Consolidated API route definitions by registering routes from separate modules for better organization and maintainability.
- Removed redundant route definitions from the APIServer class, improving code clarity.
- Updated camera monitoring and recording modules to utilize a shared context manager for suppressing camera SDK errors, enhancing error handling.
- Adjusted timeout settings in camera operations for improved reliability during frame capture.
- Enhanced logging and error handling across camera operations to facilitate better debugging and monitoring.
2025-11-01 15:53:01 -04:00

73 lines
2.6 KiB
Python

"""
MQTT-related API routes.
"""
import logging
from typing import Dict
from fastapi import FastAPI, HTTPException, Query
from ...core.state_manager import StateManager
from ...mqtt.client import MQTTClient
from ..models import MQTTStatusResponse, MQTTEventsHistoryResponse, MQTTEventResponse
def register_mqtt_routes(
    app: FastAPI,
    mqtt_client: MQTTClient,
    state_manager: StateManager,
    logger: logging.Logger,
) -> None:
    """Register MQTT-related routes on *app*.

    Two GET endpoints are added:

    * ``/mqtt/status`` -- status and statistics taken from
      ``mqtt_client.get_status()``.
    * ``/mqtt/events`` -- recent MQTT events taken from ``state_manager``.

    Any exception raised inside a handler is logged (with traceback) and
    reported to the client as HTTP 500 whose ``detail`` is ``str(exception)``.

    Args:
        app: FastAPI application the routes are attached to.
        mqtt_client: Client whose ``get_status()`` mapping backs
            ``/mqtt/status``.
        state_manager: Provides ``get_recent_mqtt_events(limit)`` and
            ``get_mqtt_event_count()`` for ``/mqtt/events``.
        logger: Logger that receives handler failures.
    """

    @app.get("/mqtt/status", response_model=MQTTStatusResponse)
    async def get_mqtt_status():
        """Get MQTT client status and statistics.

        Raises:
            HTTPException: 500 if the status cannot be read or is missing
                one of the expected keys.
        """
        try:
            status = mqtt_client.get_status()
            # Keys are read explicitly so a missing one fails here rather
            # than being silently defaulted by the response model.
            return MQTTStatusResponse(
                connected=status["connected"],
                broker_host=status["broker_host"],
                broker_port=status["broker_port"],
                subscribed_topics=status["subscribed_topics"],
                last_message_time=status["last_message_time"],
                message_count=status["message_count"],
                error_count=status["error_count"],
                uptime_seconds=status["uptime_seconds"],
            )
        except Exception as e:
            # logger.exception logs at ERROR level and appends the traceback,
            # which a plain logger.error call would drop.
            logger.exception("Error getting MQTT status: %s", e)
            raise HTTPException(status_code=500, detail=str(e)) from e

    @app.get("/mqtt/events", response_model=MQTTEventsHistoryResponse)
    async def get_mqtt_events(
        limit: int = Query(
            default=5,
            ge=1,
            le=50,
            description="Number of recent events to retrieve",
        )
    ):
        """Get recent MQTT events history.

        Args:
            limit: Number of recent events to return (1-50, default 5).

        Raises:
            HTTPException: 500 if the events cannot be read or converted.
        """
        try:
            events = state_manager.get_recent_mqtt_events(limit)
            total_events = state_manager.get_mqtt_event_count()

            # Convert events to response format
            event_responses = [
                MQTTEventResponse(
                    machine_name=event.machine_name,
                    topic=event.topic,
                    payload=event.payload,
                    normalized_state=event.normalized_state,
                    timestamp=event.timestamp.isoformat(),
                    message_number=event.message_number,
                )
                for event in events
            ]

            # NOTE(review): takes the first returned event's timestamp, so
            # this presumably relies on get_recent_mqtt_events() returning
            # newest-first -- confirm against StateManager.
            last_updated = events[0].timestamp.isoformat() if events else None

            return MQTTEventsHistoryResponse(
                events=event_responses,
                total_events=total_events,
                last_updated=last_updated,
            )
        except Exception as e:
            # Same handling as the status route: keep the traceback in the
            # log and chain the cause onto the HTTP error.
            logger.exception("Error getting MQTT events: %s", e)
            raise HTTPException(status_code=500, detail=str(e)) from e