from __future__ import annotations import asyncio import json import logging import socket import voluptuous as vol from homeassistant.components.sensor import PLATFORM_SCHEMA, SensorEntity from homeassistant.const import CONF_NAME, CONF_VALUE_TEMPLATE, CONF_UNIT_OF_MEASUREMENT, EVENT_HOMEASSISTANT_STOP from homeassistant.core import HomeAssistant, callback import homeassistant.helpers.config_validation as cv from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType _LOGGER = logging.getLogger(__name__) CONF_HOST = "host" CONF_PORT = "port" CONF_DEFAULT_PORT = 23 DEFAULT_NAME = "TCP Serial Sensor" PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( { vol.Required(CONF_HOST): cv.string, vol.Optional(CONF_PORT, default=CONF_DEFAULT_PORT): cv.port, vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string, vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string, vol.Optional(CONF_VALUE_TEMPLATE): cv.template, } ) async def async_setup_platform( hass: HomeAssistant, config: ConfigType, async_add_entities: AddEntitiesCallback, discovery_info: DiscoveryInfoType | None = None, ) -> None: """Set up the Serial sensor platform.""" name = config.get(CONF_NAME) host = config.get(CONF_HOST) unit = config.get(CONF_UNIT_OF_MEASUREMENT), port = config.get(CONF_PORT) if (value_template := config.get(CONF_VALUE_TEMPLATE)) is not None: value_template.hass = hass sensor = TCPSerialSensor( name, unit, host, port, value_template ) hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, sensor.stop_serial_read) async_add_entities([sensor], True) class TCPSerialSensor(SensorEntity): """Representation of a Serial sensor.""" _attr_should_poll = False def __init__( self, name, unit, host, port, value_template, ): """Initialize the Serial sensor.""" self._name = name self._unit = unit self._host = host self._state = None self._port = port self._serial_loop_task = None self._template = value_template self._attributes = None async def async_added_to_hass(self) -> None: """Handle when an entity is about to be added to Home Assistant.""" self._serial_loop_task = self.hass.loop.create_task( self.serial_read(self._host, self._port) ) async def serial_read(self, host, port): """Read the data from the port.""" logged_error = False while True: try: reader, _ = await asyncio.open_connection(host, port) except (OSError, socket.gaierror) as exc: if not logged_error: _LOGGER.exception( "Unable to connect to the serial device %s: %s. Will retry", device, exc, ) logged_error = True await self._handle_error() else: _LOGGER.info("Serial device %s connected", device) while True: try: line = await reader.readline() except (OSError, asyncio.CancelledError) as exc: _LOGGER.exception( "Error while reading serial device %s: %s", device, exc ) await self._handle_error() break else: line = line.decode("utf-8").strip() try: data = json.loads(line) except ValueError: pass else: if isinstance(data, dict): self._attributes = data if self._template is not None: line = self._template.async_render_with_possible_json_value( line ) _LOGGER.debug("Received: %s", line) self._state = line self.async_write_ha_state() async def _handle_error(self): """Handle error for serial connection.""" self._state = None self._attributes = None self.async_write_ha_state() await asyncio.sleep(5) @callback def stop_serial_read(self, event): """Close resources.""" if self._serial_loop_task: self._serial_loop_task.cancel() @property def name(self): """Return the name of the sensor.""" return self._name @property def extra_state_attributes(self): """Return the attributes of the entity (if any JSON present).""" return self._attributes @property def native_unit_of_measurement(self) -> str | None: """Return the unit of measurement of this entity.""" return self._unit @property def native_value(self): """Return the state of the sensor.""" return self._state