diff --git a/CODEOWNERS b/CODEOWNERS index 2fbdd3519b2f77..2078968cbac81a 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -434,6 +434,8 @@ CLAUDE.md @home-assistant/core /tests/components/earn_e_p1/ @Miggets7 /homeassistant/components/easyenergy/ @klaasnicolaas /tests/components/easyenergy/ @klaasnicolaas +/homeassistant/components/easywave/ @eldateas +/tests/components/easywave/ @eldateas /homeassistant/components/ecoforest/ @pjanuario /tests/components/ecoforest/ @pjanuario /homeassistant/components/econet/ @w1ll1am23 diff --git a/homeassistant/components/easywave/__init__.py b/homeassistant/components/easywave/__init__.py new file mode 100644 index 00000000000000..6b8831c4cdc344 --- /dev/null +++ b/homeassistant/components/easywave/__init__.py @@ -0,0 +1,109 @@ +"""The Easywave integration.""" + +from __future__ import annotations + +from dataclasses import dataclass +import logging + +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import Platform +from homeassistant.core import HomeAssistant +from homeassistant.helpers import issue_registry as ir + +from .const import ( + CONF_DEVICE_PATH, + CONF_USB_PID, + DOMAIN, + get_frequency_for_pid, + is_country_allowed_for_frequency, +) +from .coordinator import EasywaveCoordinator +from .transceiver import RX11Transceiver + +_LOGGER = logging.getLogger(__name__) + + +@dataclass +class EasywaveRuntimeData: + """Runtime data for the Easywave integration.""" + + coordinator: EasywaveCoordinator + frequency: str | None + country: str | None + + +type EasywaveConfigEntry = ConfigEntry[EasywaveRuntimeData] + +PLATFORMS: list[Platform] = [Platform.SENSOR] + + +async def async_setup_entry(hass: HomeAssistant, entry: EasywaveConfigEntry) -> bool: + """Set up Easywave from a config entry.""" + + # ── Regulatory compliance check (868 MHz) ────────────────────────── + # The operating frequency is derived from the USB device's PID. + # If the configured HA country is outside the allowed region for that + # frequency, the integration must not start. + usb_pid = entry.data.get(CONF_USB_PID) + frequency = get_frequency_for_pid(usb_pid) + country_code = hass.config.country # ISO 3166-1 alpha-2 or None + + if frequency and not is_country_allowed_for_frequency(frequency, country_code): + _LOGGER.warning( + "This hardware operates on %s, which is not permitted in " + "your configured region (%s). Integration disabled for " + "regulatory compliance", + frequency, + country_code or "unknown", + ) + # Create a persistent repair issue visible in the HA dashboard + ir.async_create_issue( + hass, + DOMAIN, + f"frequency_not_permitted_{entry.entry_id}", + is_fixable=False, + severity=ir.IssueSeverity.ERROR, + translation_key="frequency_not_permitted", + translation_placeholders={ + "frequency": frequency, + "country": country_code or "unknown", + }, + ) + # Return False for regulatory compliance violation (not a setup error) + return False + + # If the check passed, make sure any stale repair issue is removed + # (e.g. user changed their country setting). + ir.async_delete_issue(hass, DOMAIN, f"frequency_not_permitted_{entry.entry_id}") + + # ── Initialize transceiver and coordinator ────────────────────────── + # Create transceiver instance, prefer user-selected serial device_path + transceiver = RX11Transceiver(hass, entry.data.get(CONF_DEVICE_PATH)) + + # Create coordinator for managing connection lifecycle & offline mode + coordinator = EasywaveCoordinator(hass, transceiver, entry) + + # _async_setup + first data refresh; raises ConfigEntryNotReady on failure + await coordinator.async_config_entry_first_refresh() + + # Set runtime data for the integration + entry.runtime_data = EasywaveRuntimeData( + coordinator=coordinator, + frequency=frequency, + country=country_code, + ) + + await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) + return True + + +async def async_unload_entry(hass: HomeAssistant, entry: EasywaveConfigEntry) -> bool: + """Unload a config entry.""" + # Unload platforms first; only shut down the coordinator if this succeeds + unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) + if not unload_ok: + return False + + await entry.runtime_data.coordinator.async_shutdown() + + return True diff --git a/homeassistant/components/easywave/config_flow.py b/homeassistant/components/easywave/config_flow.py new file mode 100644 index 00000000000000..55217a89293c90 --- /dev/null +++ b/homeassistant/components/easywave/config_flow.py @@ -0,0 +1,172 @@ +"""Config flow for the Easywave integration.""" + +from __future__ import annotations + +import logging +from typing import Any + +import serial.tools.list_ports +import voluptuous as vol + +from homeassistant.config_entries import ConfigFlow, ConfigFlowResult +from homeassistant.helpers.service_info.usb import UsbServiceInfo + +from .const import ( + CONF_DEVICE_PATH, + CONF_USB_MANUFACTURER, + CONF_USB_PID, + CONF_USB_PRODUCT, + CONF_USB_SERIAL_NUMBER, + CONF_USB_VID, + DOMAIN, + USB_DEVICE_NAMES, +) + +_LOGGER = logging.getLogger(__name__) + + +class EasywaveConfigFlow(ConfigFlow, domain=DOMAIN): + """Handle the config flow for Easywave.""" + + VERSION = 1 + + def __init__(self) -> None: + """Initialize.""" + self._device: dict[str, Any] = {} + + # ------------------------------------------------------------------ + # Manual setup: list all serial ports + # ------------------------------------------------------------------ + + async def async_step_user( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Show available serial ports and let the user pick one.""" + errors: dict[str, str] = {} + ports = await self.hass.async_add_executor_job(serial.tools.list_ports.comports) + port_list = { + p.device: ( + f"{p.device}" + f"{f', s/n: {p.serial_number}' if p.serial_number else ''}" + f"{f' - {p.manufacturer}' if p.manufacturer else ''}" + ) + for p in ports + } + + if not port_list: + return self.async_abort(reason="no_devices_found") + + if user_input is not None: + selected_path = user_input[CONF_DEVICE_PATH] + # Find the matching port to extract USB metadata + port = next( + (p for p in ports if p.device == selected_path), + None, + ) + if port is None: + errors["base"] = "device_no_longer_available" + else: + self._device = { + "device": port.device, + "vid": port.vid, + "pid": port.pid, + "serial_number": port.serial_number or "unknown", + "manufacturer": port.manufacturer or "unknown", + "product": ( + USB_DEVICE_NAMES[(port.vid, port.pid)]["product"] + if (port.vid, port.pid) in USB_DEVICE_NAMES + else port.product or "Easywave Device" + ), + } + return await self.async_step_confirm() + + return self.async_show_form( + step_id="user", + data_schema=vol.Schema({vol.Required(CONF_DEVICE_PATH): vol.In(port_list)}), + errors=errors, + ) + + # ------------------------------------------------------------------ + # USB auto-discovery (triggered by manifest `usb` matcher) + # ------------------------------------------------------------------ + + async def async_step_usb(self, discovery_info: UsbServiceInfo) -> ConfigFlowResult: + """Handle USB discovery.""" + vid = int(discovery_info.vid, 16) + pid = int(discovery_info.pid, 16) + serial_number = discovery_info.serial_number or "unknown" + + unique_id = ( + f"easywave_{serial_number}" + if serial_number != "unknown" + else f"easywave_{vid:04X}_{pid:04X}" + ) + await self.async_set_unique_id(unique_id) + self._abort_if_unique_id_configured() + + device_entry = USB_DEVICE_NAMES.get((vid, pid)) + mfr = device_entry["manufacturer"] if device_entry else "ELDAT EaS GmbH" + prod = device_entry["product"] if device_entry else "Unknown Easywave Device" + + self._device = { + "device": discovery_info.device, + "vid": vid, + "pid": pid, + "serial_number": serial_number, + "manufacturer": discovery_info.manufacturer or mfr, + "product": prod, + } + self.context["title_placeholders"] = {"name": prod} + return await self.async_step_confirm() + + # ------------------------------------------------------------------ + # Confirmation step + # ------------------------------------------------------------------ + + async def async_step_confirm( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Show confirmation dialog and create the entry on submit.""" + serial_number = self._device["serial_number"] + vid = self._device.get("vid") + pid = self._device.get("pid") + + if serial_number != "unknown": + unique_id = f"easywave_{serial_number}" + elif vid is not None and pid is not None: + unique_id = f"easywave_{vid:04X}_{pid:04X}" + else: + unique_id = f"easywave_{self._device['device'].replace('/', '_')}" + + await self.async_set_unique_id(unique_id) + self._abort_if_unique_id_configured() + + if user_input is not None: + return self._create_entry() + + return self.async_show_form( + step_id="confirm", + data_schema=vol.Schema({}), + description_placeholders={ + "name": self._device["product"], + "serial_number": serial_number, + "device": self._device["device"], + }, + ) + + # ------------------------------------------------------------------ + + def _create_entry(self) -> ConfigFlowResult: + """Create the config entry.""" + d = self._device + return self.async_create_entry( + title="Easywave Gateway", + data={ + CONF_DEVICE_PATH: d["device"], + CONF_USB_VID: d["vid"], + CONF_USB_PID: d["pid"], + CONF_USB_SERIAL_NUMBER: d["serial_number"], + CONF_USB_MANUFACTURER: d["manufacturer"], + CONF_USB_PRODUCT: d["product"], + }, + ) diff --git a/homeassistant/components/easywave/const.py b/homeassistant/components/easywave/const.py new file mode 100644 index 00000000000000..5ed78d404ce37b --- /dev/null +++ b/homeassistant/components/easywave/const.py @@ -0,0 +1,125 @@ +"""Constants for the Easywave integration.""" + +from __future__ import annotations + +from datetime import timedelta +from typing import Final + +DOMAIN: Final = "easywave" + +# ── USB Device Registry ────────────────────────────────────────────────────── +# Single source of truth for supported USB sticks. +# Adding a new device here is sufficient — config flow and discovery pick it up +# automatically. Also update the `usb` list in manifest.json. +# +# Key: (VID, PID) as int +# Value: {"manufacturer": str, "product": str} +USB_DEVICE_NAMES: Final[dict[tuple[int, int], dict[str, str]]] = { + (0x155A, 0x1014): { + "manufacturer": "ELDAT", + "product": "RX11 USB Transceiver", + }, +} + +SUPPORTED_USB_IDS: Final = frozenset(USB_DEVICE_NAMES.keys()) + +# ── Coordinator Update Interval ────────────────────────────────────────────── +# Periodic polling interval for USB device reconnection attempts +DEVICE_SCAN_INTERVAL: Final = timedelta(seconds=30) + + +# ── Config Entry Keys ──────────────────────────────────────────────────────── +CONF_DEVICE_PATH: Final = "device_path" +CONF_USB_VID: Final = "usb_vid" +CONF_USB_PID: Final = "usb_pid" +CONF_USB_SERIAL_NUMBER: Final = "usb_serial_number" +CONF_USB_MANUFACTURER: Final = "usb_manufacturer" +CONF_USB_PRODUCT: Final = "usb_product" + +# ── Radio Frequency / Regulatory Compliance ───────────────────────────────── +# RX11 operates at 868 MHz (EU ISM band). Only permitted in CEPT countries. +FREQUENCY_868MHZ: Final = "868 MHz" + +FREQUENCY_ALLOWED_COUNTRIES: Final = { + FREQUENCY_868MHZ: frozenset( + { + # EU Member States (CEPT) + "AT", + "BE", + "BG", + "HR", + "CY", + "CZ", + "DK", + "EE", + "FI", + "FR", + "DE", + "GR", + "HU", + "IE", + "IT", + "LV", + "LT", + "LU", + "MT", + "NL", + "PL", + "PT", + "RO", + "SK", + "SI", + "ES", + "SE", + # CEPT Members (non-EU) + "CH", + "NO", + "IS", + "LI", + # UK (post-Brexit) + "GB", + "UK", + } + ), +} + +# Legacy constant for backward compatibility +ALLOWED_COUNTRIES_868MHZ: Final = FREQUENCY_ALLOWED_COUNTRIES[FREQUENCY_868MHZ] + + +def is_country_allowed_for_frequency(frequency: str, country_code: str | None) -> bool: + """Check whether a country is permitted to operate on the given frequency. + + Args: + frequency: The frequency band (e.g., FREQUENCY_868MHZ) + country_code: ISO 3166-1 alpha-2 country code, or None if not configured + + Returns: + True if country is allowed or unknown, False if explicitly disallowed. + """ + # No country configured — cannot enforce + if country_code is None: + return True + + allowed = FREQUENCY_ALLOWED_COUNTRIES.get(frequency) + if allowed is None: + # Unknown frequency — conservative: allow + return True + + return country_code.upper() in allowed + + +def get_frequency_for_pid(pid: int | None) -> str | None: + """Get frequency band for a USB device PID. + + RX11 USB Transceiver (0x1014) operates at 868 MHz. + """ + if pid == 0x1014: + return FREQUENCY_868MHZ + return None + + +# ── Gateway Events ────────────────────────────────────────────────────────── +EVENT_GATEWAY_CONNECTED: Final = f"{DOMAIN}_gateway_connected" +EVENT_GATEWAY_DISCONNECTED: Final = f"{DOMAIN}_gateway_disconnected" +EVENT_GATEWAY_STATUS_CHANGED: Final = f"{DOMAIN}_gateway_status_changed" diff --git a/homeassistant/components/easywave/coordinator.py b/homeassistant/components/easywave/coordinator.py new file mode 100644 index 00000000000000..badfb1b061b170 --- /dev/null +++ b/homeassistant/components/easywave/coordinator.py @@ -0,0 +1,146 @@ +"""Coordinator for Easywave integration with automatic USB reconnect.""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any + +from homeassistant.core import HomeAssistant +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed + +from .const import DEVICE_SCAN_INTERVAL, DOMAIN +from .transceiver import RX11Transceiver + +if TYPE_CHECKING: + from . import EasywaveConfigEntry + +_LOGGER = logging.getLogger(__name__) + + +class EasywaveCoordinator(DataUpdateCoordinator[dict[str, Any]]): + """Coordinator for Easywave integration.""" + + config_entry: EasywaveConfigEntry + + def __init__( + self, + hass: HomeAssistant, + transceiver: RX11Transceiver, + config_entry: EasywaveConfigEntry, + ) -> None: + """Initialize coordinator.""" + super().__init__( + hass, + _LOGGER, + name=DOMAIN, + update_interval=DEVICE_SCAN_INTERVAL, + config_entry=config_entry, + ) + self.transceiver = transceiver + self.is_offline = not transceiver.is_connected + + async def _async_setup(self) -> None: + """Set up coordinator and attempt initial connection. + + Called by DataUpdateCoordinator before the first update. + Raises UpdateFailed if initialization fails completely. + """ + try: + connected = await self.transceiver.connect() + self.is_offline = not connected + + if connected: + self.transceiver.set_disconnect_callback( + self._on_transceiver_disconnect + ) + else: + _LOGGER.warning( + "RX11 device not found, entering offline mode. " + "Entities will be unavailable until device connects" + ) + except (OSError, TimeoutError) as err: + raise UpdateFailed(f"Setup failed: {err}") from err + + def _on_transceiver_disconnect(self) -> None: + """Called from transceiver when connection is lost. + + May be invoked from the event loop (health-check / RxModule + disconnect handler), so use call_soon_threadsafe to guarantee + thread safety regardless of the calling context. + """ + self.hass.loop.call_soon_threadsafe(self._handle_disconnect) + + def _handle_disconnect(self) -> None: + """Mark offline and push updated data to listeners immediately.""" + if self.is_offline: + return + _LOGGER.warning("Lost connection to RX11, entering offline mode") + self.is_offline = True + self.async_set_updated_data( + { + "is_connected": False, + "device_path": None, + } + ) + + async def _async_update_data(self) -> dict[str, Any]: + """Update device data periodically. + + This is called every DEVICE_SCAN_INTERVAL to: + - Check connection status + - Attempt reconnection if offline + - Detect disconnections of previously connected devices + """ + try: + # If offline, attempt reconnect + if self.is_offline: + connected = await self.transceiver.reconnect() + if connected: + self.is_offline = False + # Re-register disconnect callback for new connection + self.transceiver.set_disconnect_callback( + self._on_transceiver_disconnect + ) + # Return new device state; coordinator will notify listeners + return { + "is_connected": self.transceiver.is_connected, + "device_path": self.transceiver.device_path, + } + # Still offline, no need to log as error — offline mode is expected + return { + "is_connected": False, + "device_path": None, + } + # Verify transceiver still reports connected + # (disconnect callback handles immediate detection, + # this is a safety net for edge cases) + if not self.transceiver.is_connected: + _LOGGER.warning("Connection lost, entering offline mode") + self.is_offline = True + return { + "is_connected": False, + "device_path": None, + } + except UpdateFailed: + if not self.is_offline: + self.is_offline = True + raise + except (OSError, TimeoutError) as err: + _LOGGER.warning("Error updating coordinator data: %s", err) + self.is_offline = True + raise UpdateFailed(f"Update failed: {err}") from err + else: + return { + "is_connected": self.transceiver.is_connected, + "device_path": self.transceiver.device_path, + } + + async def async_shutdown(self) -> None: + """Shutdown coordinator and disconnect transceiver.""" + try: + await self.transceiver.dispose() + _LOGGER.debug("Coordinator shutdown complete") + except (OSError, TimeoutError) as err: + _LOGGER.error("Error during coordinator shutdown: %s", err) + finally: + await super().async_shutdown() diff --git a/homeassistant/components/easywave/manifest.json b/homeassistant/components/easywave/manifest.json new file mode 100644 index 00000000000000..329a8ae28bd6f9 --- /dev/null +++ b/homeassistant/components/easywave/manifest.json @@ -0,0 +1,22 @@ +{ + "domain": "easywave", + "name": "Easywave", + "after_dependencies": ["usb"], + "codeowners": ["@eldateas"], + "config_flow": true, + "documentation": "https://www.home-assistant.io/integrations/easywave", + "integration_type": "device", + "iot_class": "local_polling", + "loggers": ["serial"], + "quality_scale": "bronze", + "requirements": ["pyserial==3.5", "easywave-home-control==0.1.1"], + "single_config_entry": true, + "usb": [ + { + "description": "*rx11*", + "manufacturer": "eldat*", + "pid": "1014", + "vid": "155A" + } + ] +} diff --git a/homeassistant/components/easywave/quality_scale.yaml b/homeassistant/components/easywave/quality_scale.yaml new file mode 100644 index 00000000000000..056822488f161a --- /dev/null +++ b/homeassistant/components/easywave/quality_scale.yaml @@ -0,0 +1,24 @@ +rules: + # Bronze + action-setup: + status: exempt + comment: Integration does not register custom actions. + appropriate-polling: done + brands: done + common-modules: done + config-flow: done + config-flow-test-coverage: done + dependency-transparency: done + docs-actions: + status: exempt + comment: Integration does not provide any service actions. + docs-high-level-description: done + docs-installation-instructions: done + docs-removal-instructions: done + entity-event-setup: done + entity-unique-id: done + has-entity-name: done + runtime-data: done + test-before-configure: done + test-before-setup: done + unique-config-entry: done diff --git a/homeassistant/components/easywave/sensor.py b/homeassistant/components/easywave/sensor.py new file mode 100644 index 00000000000000..297a2ad8c84b6a --- /dev/null +++ b/homeassistant/components/easywave/sensor.py @@ -0,0 +1,308 @@ +"""Sensor platform for the Easywave Core integration.""" + +from __future__ import annotations + +import logging +from typing import Any + +from homeassistant.components.sensor import SensorDeviceClass, SensorEntity +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import ( + EVENT_CORE_CONFIG_UPDATE, + EVENT_HOMEASSISTANT_STARTED, + EntityCategory, +) +from homeassistant.core import CoreState, HomeAssistant, callback +from homeassistant.helpers import device_registry as dr +from homeassistant.helpers.device_registry import DeviceInfo +from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback +from homeassistant.helpers.update_coordinator import CoordinatorEntity + +from . import EasywaveConfigEntry +from .const import ( + CONF_DEVICE_PATH, + CONF_USB_MANUFACTURER, + CONF_USB_PID, + CONF_USB_PRODUCT, + CONF_USB_SERIAL_NUMBER, + CONF_USB_VID, + DOMAIN, + EVENT_GATEWAY_CONNECTED, + EVENT_GATEWAY_DISCONNECTED, + EVENT_GATEWAY_STATUS_CHANGED, + USB_DEVICE_NAMES, +) +from .coordinator import EasywaveCoordinator + +_LOGGER = logging.getLogger(__name__) + + +async def async_setup_entry( + hass: HomeAssistant, + entry: EasywaveConfigEntry, + async_add_entities: AddConfigEntryEntitiesCallback, +) -> None: + """Set up Easywave Core sensors.""" + coordinator = entry.runtime_data.coordinator + async_add_entities([EasywaveGatewaySensor(entry, coordinator)]) + + +class EasywaveGatewaySensor(CoordinatorEntity[EasywaveCoordinator], SensorEntity): + """Represents the RX11 USB gateway connectivity/state.""" + + STATUS_KEYS = [ + "connected", + "disconnected", + ] + + _attr_has_entity_name = True + _attr_translation_key = "gateway_status" + _attr_device_class = SensorDeviceClass.ENUM + _attr_entity_category = EntityCategory.DIAGNOSTIC + _attr_options = STATUS_KEYS + + def __init__(self, entry: ConfigEntry, coordinator: EasywaveCoordinator) -> None: + """Initialize the sensor.""" + super().__init__(coordinator) + self._entry = entry + self._attr_unique_id = f"{entry.entry_id}_rx11_gateway" + self._last_status = "disconnected" + self._ha_started = False + self._attr_icon = "mdi:close-thick" + + # Get USB device info — always use the canonical lookup table so + # manufacturer/product stay in sync with const.py (the config entry + # may still hold a stale value from the initial setup). + vid: int | None = entry.data.get(CONF_USB_VID) + pid: int | None = entry.data.get(CONF_USB_PID) + device_entry = USB_DEVICE_NAMES.get((vid, pid)) if vid and pid else None + + # Use USB registry if available, fall back to entry config values + if device_entry: + self._usb_manufacturer = device_entry["manufacturer"] + self._usb_product = device_entry["product"] + else: + self._usb_manufacturer = ( + entry.data.get(CONF_USB_MANUFACTURER) or "ELDAT EaS GmbH" + ) + self._usb_product = ( + entry.data.get(CONF_USB_PRODUCT) or "Unknown Easywave Device" + ) + + # Prefer live transceiver serial/versions (already available after + # coordinator.async_setup) over stale config entry values. + transceiver = coordinator.transceiver + self._usb_serial_number = transceiver.usb_serial_number or entry.data.get( + CONF_USB_SERIAL_NUMBER, "unknown" + ) + self._hw_version: str | None = transceiver.hw_version + self._sw_version: str | None = transceiver.fw_version + + # Keep _current_status as None until EVENT_HOMEASSISTANT_STARTED so the + # recorder/logbook can capture an initial "unknown" → "connected" transition + # instead of leaving the last shutdown "unavailable" state as the latest entry. + self._current_status: str | None = None + + def _connection_status(self) -> str: + """Get connection status as constant key (translated by HA frontend). + + Returns the current connection status from the coordinator: + - "connected": Device is currently connected + - "disconnected": Device is not found or offline + """ + # Check if device is offline (not found) + if self.coordinator.is_offline: + return "disconnected" + + # Check transceiver connection status + transceiver = self.coordinator.transceiver + if transceiver and transceiver.is_connected: + return "connected" + + return "disconnected" + + @callback + def _update_gateway_device_info(self) -> None: + """Check if USB serial/version changed and update device registry. + + Updates local cached values from the transceiver and pushes changes + to the Home Assistant device registry for persistence. + """ + if self.coordinator.transceiver is None: + return + + transceiver = self.coordinator.transceiver + changed = False + + # Update serial number if transceiver reports a different one + if ( + transceiver.usb_serial_number + and transceiver.usb_serial_number != self._usb_serial_number + ): + self._usb_serial_number = transceiver.usb_serial_number + changed = True + + # Update hardware/firmware versions if available + if transceiver.hw_version and transceiver.hw_version != self._hw_version: + self._hw_version = transceiver.hw_version + changed = True + if transceiver.fw_version and transceiver.fw_version != self._sw_version: + self._sw_version = transceiver.fw_version + changed = True + + if changed: + # Push updated info to the device registry so the UI reflects + # serial/version changes immediately (async_write_ha_state only + # updates the entity state, not the device entry). + registry = dr.async_get(self.hass) + registry.async_get_or_create( + config_entry_id=self._entry.entry_id, + identifiers={(DOMAIN, f"{self._entry.entry_id}_gateway")}, + serial_number=self._usb_serial_number, + hw_version=self._hw_version, + sw_version=self._sw_version, + ) + self.async_write_ha_state() + + @callback + def _handle_coordinator_update(self) -> None: + """Handle updated data from the coordinator.""" + new_status = self._connection_status() + self._update_gateway_device_info() + + # Only update the persisted status and fire events once HA is + # running. Coordinator updates can arrive during early startup + # before EVENT_HOMEASSISTANT_STARTED fires; ignoring them keeps the + # initial None (unknown) → connected/disconnected transition intact. + if self._ha_started: + if new_status != self._last_status: + old_status = self._last_status + _LOGGER.info("Gateway status: %s -> %s", old_status, new_status) + self._last_status = new_status + + registry = dr.async_get(self.hass) + device = registry.async_get_device( + identifiers={(DOMAIN, f"{self._entry.entry_id}_gateway")} + ) + device_id = device.id if device is not None else None + + event_data = { + "device_id": device_id, + "old_status": old_status, + "new_status": new_status, + "entry_id": self._entry.entry_id, + } + self.hass.bus.async_fire(EVENT_GATEWAY_STATUS_CHANGED, event_data) + if new_status == "connected": + self.hass.bus.async_fire(EVENT_GATEWAY_CONNECTED, event_data) + elif new_status == "disconnected": + self.hass.bus.async_fire(EVENT_GATEWAY_DISCONNECTED, event_data) + + self._current_status = new_status + + super()._handle_coordinator_update() + + async def async_added_to_hass(self) -> None: + """Called when entity is added to hass.""" + await super().async_added_to_hass() + + # Initialise last status. + self._last_status = self._connection_status() + + # Write the correct state once HA has fully started so the recorder + # captures a real unknown → connected transition. + # native_value returns None until this fires (see _current_status). + @callback + def _on_ha_started(_event: Any = None) -> None: + self._ha_started = True + self._handle_coordinator_update() + + if self.hass.state is CoreState.running: + # Added while HA was already fully running (e.g. via UI config flow). + # Defer by one event-loop tick so the entity is fully registered + # in the state machine before the write. + self.hass.loop.call_soon(_on_ha_started) + else: + # async_listen_once removes itself after firing — do NOT also wrap + # with async_on_remove or HA raises ValueError on the double-remove. + self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, _on_ha_started) + + # Listen for language/config changes to update translations dynamically. + @callback + def _handle_config_update(_event: Any) -> None: + """Handle core config updates (including language changes).""" + self.async_write_ha_state() + + self.async_on_remove( + self.hass.bus.async_listen(EVENT_CORE_CONFIG_UPDATE, _handle_config_update) + ) + + @property + def native_value(self) -> str | None: + """Return connection status key - translated by frontend via translation_key. + + Returns None before EVENT_HOMEASSISTANT_STARTED so the + recorder captures the state transition on first write. + """ + return self._current_status + + @property + def icon(self) -> str: + """Return icon based on connection status.""" + if self._current_status == "connected": + return "mdi:usb" + # None / disconnected + return "mdi:close-thick" + + @property + def available(self) -> bool: + """Gateway sensor is always available to show status.""" + # Gateway sensor should always be available so users can see the connection status. + return True + + @property + def extra_state_attributes(self) -> dict[str, Any]: + """Return extra state attributes with device details.""" + # Prefer the live device path from coordinator data over config entry + # data, since the path may change on reconnect. + coordinator_data = self.coordinator.data + device_path = ( + coordinator_data.get("device_path") + if isinstance(coordinator_data, dict) + else None + ) or self._entry.data.get(CONF_DEVICE_PATH) + attrs: dict[str, Any] = {"device_path": device_path} + + # Add serial number if available + if self._usb_serial_number and self._usb_serial_number != "unknown": + attrs["usb_serial_number"] = self._usb_serial_number + + # Add hardware version if available + if self._hw_version and self._hw_version not in ("unknown", "error"): + attrs["hardware_version"] = self._hw_version + + # Add firmware version if available + if self._sw_version and self._sw_version not in ("unknown", "error"): + attrs["firmware_version"] = self._sw_version + + # Add connection status + attrs["connected"] = self._connection_status() == "connected" + + return attrs + + @property + def device_info(self) -> DeviceInfo: + """Return device info for the gateway.""" + return DeviceInfo( + identifiers={(DOMAIN, f"{self._entry.entry_id}_gateway")}, + name=self._usb_product, + manufacturer=self._usb_manufacturer, + model=self._usb_product, + serial_number=( + self._usb_serial_number + if self._usb_serial_number != "unknown" + else None + ), + hw_version=self._hw_version, + sw_version=self._sw_version, + ) diff --git a/homeassistant/components/easywave/strings.json b/homeassistant/components/easywave/strings.json new file mode 100644 index 00000000000000..5fb744f8bad139 --- /dev/null +++ b/homeassistant/components/easywave/strings.json @@ -0,0 +1,44 @@ +{ + "config": { + "abort": { + "no_devices_found": "No serial ports found. Make sure the RX11 USB Transceiver is plugged in and recognized by the system.", + "single_instance_allowed": "[%key:common::config_flow::abort::single_instance_allowed%]" + }, + "error": { + "device_no_longer_available": "The selected device is no longer available. Please try again." + }, + "step": { + "confirm": { + "description": "Detected device:\n**{name} ({serial_number})**\n\nPort: {device}", + "title": "Set up RX11 USB Transceiver" + }, + "user": { + "data": { + "device_path": "[%key:common::config_flow::data::usb_path%]" + }, + "data_description": { + "device_path": "The serial port your RX11 USB Transceiver is connected to." + }, + "description": "Select the serial port of the RX11 USB Transceiver.", + "title": "Select Easywave device" + } + } + }, + "entity": { + "sensor": { + "gateway_status": { + "name": "Connection Status", + "state": { + "connected": "Connected", + "disconnected": "Not Connected" + } + } + } + }, + "issues": { + "frequency_not_permitted": { + "description": "This Easywave hardware operates on **{frequency}**, which is **not permitted** in your configured Home Assistant region (**{country}**).\n\nThe integration has been disabled for regulatory compliance.\n\nIf you believe this is incorrect, verify your country setting under **Settings → System → General → Country**. The 868 MHz band is permitted in EU/EEA countries, the UK, Switzerland, Norway, Iceland, and Liechtenstein.", + "title": "Easywave: {frequency} not permitted in {country}" + } + } +} diff --git a/homeassistant/components/easywave/transceiver.py b/homeassistant/components/easywave/transceiver.py new file mode 100644 index 00000000000000..ec627cd7cfe6ec --- /dev/null +++ b/homeassistant/components/easywave/transceiver.py @@ -0,0 +1,534 @@ +"""Transceiver abstraction for Easywave Core.""" + +from __future__ import annotations + +import asyncio +from collections.abc import Callable +import contextlib +import logging +import time + +from easywave_home_control import RX11Device, RX11ErrorCode +import serial +import serial.tools.list_ports + +from homeassistant.core import HomeAssistant + +from .const import SUPPORTED_USB_IDS + +_LOGGER = logging.getLogger(__name__) + +_SERIAL_ERRORS: tuple[type[Exception], ...] = ( + serial.SerialException, + serial.SerialTimeoutException, +) +_SERIAL_OR_OS_ERRORS: tuple[type[Exception], ...] = ( + *_SERIAL_ERRORS, + OSError, +) + + +class RX11Transceiver: + """ELDAT RX11 USB Transceiver implementation. + + Provides robust Connect/Disconnect, Hardware/Firmware version queries, + and serial number tracking with device swap detection. + + Implements connection health monitoring and graceful disconnect/reconnect + callbacks for notifying listeners of connection state changes. + """ + + # Connection health + HEALTH_CHECK_INTERVAL = 30.0 # seconds + + def __init__(self, hass: HomeAssistant, device_path: str | None = None) -> None: + """Initialize RX11 transceiver.""" + self.hass = hass + self.device_path = device_path + self.is_connected = False + + # USB device identification (for device swap detection) + self.usb_serial_number: str | None = None + + # Version information + self.hw_version: str | None = None + self.fw_version: str | None = None + + # Connection tracking + self._last_disconnect_time: float | None = None + self._reconnect_attempts = 0 + + # Device instance for protocol communication (async) + self._device: RX11Device | None = None + + # Callbacks for connection state changes + self._disconnect_callback: Callable[[], None] | None = None + + # Lock for async operations + self._lock = asyncio.Lock() + + # Health check task + self._health_check_task: asyncio.Task | None = None + self._health_check_stopping = False + + # Disposed flag + self._disposed = False + + def set_disconnect_callback(self, callback: Callable[[], None] | None) -> None: + """Set callback to be called when disconnect/hardware error occurs.""" + self._disconnect_callback = callback + + def _notify_disconnect(self) -> None: + """Notify disconnect callback if registered.""" + if self._disconnect_callback: + try: + self._disconnect_callback() + except (OSError, RuntimeError) as err: + _LOGGER.error("Error in disconnect callback: %s", err) + + async def connect(self) -> bool: + """Connect to RX11 transceiver using RX11Device. + + 1. Searches for RX11 device by VID/PID + 2. Creates RX11Device instance + 3. Connects and queries device info + 4. Starts health check + + Returns True if connected, False if device not found/error. + """ + if self._disposed: + return False + + async with self._lock: + if self.is_connected: + _LOGGER.debug("Already connected to RX11") + return True + + try: + # If specific device path provided, try it first + if self.device_path: + if await self._try_connect_to_path(self.device_path): + await self._refresh_usb_identity() + return True + + # Configured path failed or not set — search by VID/PID + port_info = await self.hass.async_add_executor_job( + self._find_usb_device + ) + + if port_info is None: + if self._reconnect_attempts == 0: + _LOGGER.warning("EASYWAVE device not found") + self.is_connected = False + return False + + device_path, serial_number = port_info + self.device_path = device_path + self.usb_serial_number = serial_number + + # Connect using RX11Device + if not await self._try_connect_to_path(device_path): + if self._reconnect_attempts == 0: + _LOGGER.warning("Failed to connect to RX11 at %s", device_path) + self.is_connected = False + return False + + # Only reset the attempt counter after a successful connection + self._reconnect_attempts = 0 + + except _SERIAL_OR_OS_ERRORS as err: + _LOGGER.debug("Cannot connect to RX11: %s", err) + self.is_connected = False + return False + else: + return True + + async def _try_connect_to_path(self, device_path: str) -> bool: + """Try to connect to RX11 at a specific path using RX11Device. + + Follows the proven connect sequence: + 1. RX11Device() — creates RX11Device instance + 2. RX11Device.connect() — connects and starts serial handler thread + 3. Short settle time — let serial interface stabilize + 4. Query device info — verify connection is alive + 5. Start health check + + Returns True if connection successful, False otherwise. + """ + try: + # Create RX11Device instance directly + self._device = RX11Device(port=device_path) + + if not self._device: + return False + + # Connect to the device - this opens the serial connection and starts the handler thread + connect_ok = await self._device.connect() + if not connect_ok: + _LOGGER.warning("Failed to connect RX11Device at %s", device_path) + if self._device: + with contextlib.suppress(*_SERIAL_ERRORS, OSError): + await self._device.disconnect() + self._device = None + return False + + self.is_connected = True + self.device_path = device_path + + # Wait for serial interface to be fully ready (avoid race conditions) + # This matches the HACS settle time that prevents startup errors + await asyncio.sleep(1.5) + + # Fetch versions BEFORE starting any continuous receive loops + versions_ok = await self._ensure_versions_fetched() + if not versions_ok: + _LOGGER.warning( + "Failed to query RX11 versions at %s; disconnecting", + device_path, + ) + if self._device: + with contextlib.suppress(*_SERIAL_ERRORS, OSError): + await self._device.disconnect() + self._device = None + self.is_connected = False + self.device_path = None + return False + + _LOGGER.info( + "Connected to RX11 at %s (SN:%s, HW=%s, FW=%s)", + device_path, + self.usb_serial_number, + self.hw_version or "unknown", + self.fw_version or "unknown", + ) + + # Setup device callbacks for library-detected disconnects + self._setup_device_callbacks() + + # Start health check + await self._start_health_check() + + except _SERIAL_OR_OS_ERRORS as err: + _LOGGER.warning("Error connecting to RX11 at %s: %s", device_path, err) + if self._device: + with contextlib.suppress(*_SERIAL_ERRORS, OSError): + await self._device.disconnect() + self._device = None + return False + else: + return True + + async def _refresh_usb_identity(self) -> None: + """Read USB serial number from the connected port and update identity.""" + if not self.device_path: + return + try: + + def _scan(): + for port in serial.tools.list_ports.comports(): + if port.device == self.device_path: + return port + return None + + port = await self.hass.async_add_executor_job(_scan) + if port: + old_serial = self.usb_serial_number + self.usb_serial_number = ( + port.serial_number or self.usb_serial_number or "unknown" + ) + if old_serial and old_serial != self.usb_serial_number: + _LOGGER.info( + "Device swap detected: %s -> %s", + old_serial, + self.usb_serial_number, + ) + self.hw_version = None + self.fw_version = None + except _SERIAL_OR_OS_ERRORS as e: + _LOGGER.debug("Could not refresh USB identity: %s", e) + + async def disconnect(self) -> None: + """Disconnect from the RX11 transceiver.""" + async with self._lock: + # Stop health check + await self._stop_health_check() + + # Clear device callbacks + if self._device: + self._device.set_disconnect_callback(None) + self._device.set_reconnect_callback(None) + + # Disconnect device + if self._device: + with contextlib.suppress(*_SERIAL_ERRORS, OSError): + await self._device.disconnect() + self._device = None + + self.is_connected = False + self._last_disconnect_time = time.time() + + async def dispose(self) -> None: + """Dispose of resources and clean up.""" + if self._disposed: + return + self._disposed = True + await self.disconnect() + + async def _start_health_check(self) -> None: + """Start health check task for device connection monitoring.""" + if self._health_check_task: + return # Already running + + self._health_check_stopping = False + self._health_check_task = self.hass.async_create_background_task( + self._health_check_loop(), "easywave health check" + ) + + async def _stop_health_check(self) -> None: + """Stop health check task.""" + self._health_check_stopping = True + if self._health_check_task: + task = self._health_check_task + self._health_check_task = None + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + + async def _health_check_loop(self) -> None: + """Monitor device connection health using periodic ping requests.""" + try: + consecutive_failures = 0 + max_consecutive_failures = 3 + + while not self._health_check_stopping: + try: + await asyncio.sleep(self.HEALTH_CHECK_INTERVAL) + + device = self._device # Local reference for thread safety + if not self.is_connected or not device: + consecutive_failures = 0 + continue + + # Health check: Ping device to verify connection is active + connected = False + with contextlib.suppress(*_SERIAL_OR_OS_ERRORS): + connected = await device.ping_request() + + if connected: + consecutive_failures = 0 + _LOGGER.debug("Device health check passed") + else: + consecutive_failures += 1 + _LOGGER.warning( + "Device health check failed (%d/%d)", + consecutive_failures, + max_consecutive_failures, + ) + + if consecutive_failures >= max_consecutive_failures: + _LOGGER.error( + "Device health check failed %d times, disconnecting", + max_consecutive_failures, + ) + await self._handle_device_disconnect() + break + + except asyncio.CancelledError: + break + except _SERIAL_OR_OS_ERRORS as e: + _LOGGER.debug("Error in health check: %s", e) + await asyncio.sleep(1.0) + finally: + self._health_check_task = None + + def _setup_device_callbacks(self) -> None: + """Setup device disconnect/reconnect callbacks.""" + if self._device: + # RxModule calls these when errors occur + self._device.set_disconnect_callback(self._on_device_disconnect) + self._device.set_reconnect_callback(self._on_device_reconnect) + + def _on_device_disconnect(self) -> None: + """Handle device disconnect detected by RxModule.""" + _LOGGER.warning("Device disconnect detected by RxModule") + + def _schedule_disconnect_handling() -> None: + """Schedule device disconnect handling on the event loop.""" + self.hass.async_create_task(self._handle_device_disconnect()) + + # Callback may be invoked from the library's serial handler thread; + # use call_soon_threadsafe to safely schedule onto the HA event loop. + self.hass.loop.call_soon_threadsafe(_schedule_disconnect_handling) + + def _on_device_reconnect(self) -> None: + """Handle device reconnect detected by RxModule.""" + _LOGGER.info("Device reconnect detected by RxModule") + + async def _handle_device_disconnect(self) -> None: + """Handle device disconnect on the Home Assistant event loop.""" + async with self._lock: + if not self.is_connected: + return + self.is_connected = False + + self._notify_disconnect() + + # Schedule full cleanup (stop health check, dispose device) outside + # the lock to avoid deadlocks. + async def _cleanup_disconnect() -> None: + """Perform full cleanup after a device-initiated disconnect.""" + await self.disconnect() + + self.hass.async_create_background_task( + _cleanup_disconnect(), "easywave device disconnect cleanup" + ) + + async def _ensure_versions_fetched(self) -> bool: + """Ensure hardware and firmware versions are fetched. + + Uses RxModule query methods to get hardware and firmware versions. + Includes retry with sleep between attempts. + + Returns True if at least one version was obtained successfully. + """ + if not self._device: + return False + + # Query hardware version with retry + hw_version = None + for attempt in range(3): + try: + result, hw_bytes = await self._device.query_hw_version(timeout=5.0) + if result == RX11ErrorCode.SUCCESS: + # Find null terminator and decode + null_idx = hw_bytes.find(0) + if null_idx >= 0: + hw_bytes = hw_bytes[:null_idx] + hw_str = hw_bytes.decode("ascii", errors="ignore").strip() + if hw_str: + hw_version = hw_str + break + except _SERIAL_OR_OS_ERRORS: + pass + + if attempt < 2: + wait_time = 0.5 * (attempt + 1) + _LOGGER.debug( + "Hardware version query failed (attempt %d/3), retrying in %.1fs", + attempt + 1, + wait_time, + ) + await asyncio.sleep(wait_time) + + # Query firmware version with retry + fw_version = None + for attempt in range(3): + try: + result, major, minor, incomplete = await self._device.query_fw_version( + timeout=5.0 + ) + if result == RX11ErrorCode.SUCCESS: + fw_version = f"{major}.{minor}" + if incomplete: + fw_version += " (incomplete)" + break + except _SERIAL_OR_OS_ERRORS: + pass + + if attempt < 2: + wait_time = 0.5 * (attempt + 1) + _LOGGER.debug( + "Firmware version query failed (attempt %d/3), retrying in %.1fs", + attempt + 1, + wait_time, + ) + await asyncio.sleep(wait_time) + + self.hw_version = hw_version or "unknown" + self.fw_version = fw_version or "unknown" + + if not hw_version and not fw_version: + _LOGGER.warning("Version query failed") + return False + return True + + def _find_usb_device(self) -> tuple[str, str] | None: + """Find EASYWAVE device by VID/PID. + + This is a blocking call and should only be executed in an executor. + Searches through all supported device types. + + Returns: (device_path, serial_number) or None if not found + """ + try: + ports = list(serial.tools.list_ports.comports()) + except _SERIAL_OR_OS_ERRORS: + _LOGGER.exception("Error while enumerating serial ports for EASYWAVE") + return None + + for port in ports: + if (port.vid, port.pid) in SUPPORTED_USB_IDS: + # Test open/close + if self._test_serial_port(port.device): + return ( + port.device, + port.serial_number or "unknown", + ) + + return None + + def _test_serial_port(self, port_path: str) -> bool: + """Test if a serial port can be opened (blocking).""" + try: + test_port = serial.Serial( + port=port_path, + baudrate=115200, + timeout=0.5, + ) + test_port.close() + except _SERIAL_ERRORS: + return False + else: + return True + + async def reconnect(self) -> bool: + """Reconnect to RX11 with delay and exponential backoff. + + Implements pattern from RxModule: + - Adds delay if recently disconnected for device reset + - Increments reconnect attempt counter + - Searches for the device again if needed + - Calls reconnect callback on success + + Returns: + True if connection successful, False otherwise + """ + if self._disposed: + return False + + try: + # Add delay if recently disconnected to allow device to reset + if self._last_disconnect_time: + time_since_disconnect = time.time() - self._last_disconnect_time + if time_since_disconnect < 1.0: # Less than 1 second + delay = 1.0 - time_since_disconnect + _LOGGER.debug("Waiting %.2fs for RX11 device to reset", delay) + await asyncio.sleep(delay) + + await self.disconnect() + self._reconnect_attempts += 1 + + if await self.connect(): + self._reconnect_attempts = 0 + return True + if self._reconnect_attempts == 1: + _LOGGER.debug( + "RX11 reconnect failed, retrying every %ds", + int(self.HEALTH_CHECK_INTERVAL), + ) + + except _SERIAL_ERRORS as err: + _LOGGER.debug("Error during reconnect: %s", err) + return False + else: + return False diff --git a/homeassistant/generated/config_flows.py b/homeassistant/generated/config_flows.py index 2ab646a1cb620f..36431e9c59a43a 100644 --- a/homeassistant/generated/config_flows.py +++ b/homeassistant/generated/config_flows.py @@ -174,6 +174,7 @@ "eafm", "earn_e_p1", "easyenergy", + "easywave", "ecobee", "ecoforest", "econet", diff --git a/homeassistant/generated/integrations.json b/homeassistant/generated/integrations.json index be5ca8a5fe8600..6510c7d6ed6807 100644 --- a/homeassistant/generated/integrations.json +++ b/homeassistant/generated/integrations.json @@ -1581,6 +1581,13 @@ "iot_class": "cloud_polling", "single_config_entry": true }, + "easywave": { + "name": "Easywave", + "integration_type": "device", + "config_flow": true, + "iot_class": "local_polling", + "single_config_entry": true + }, "ebox": { "name": "EBox", "integration_type": "hub", diff --git a/homeassistant/generated/usb.py b/homeassistant/generated/usb.py index 70da80846d8f5c..a18db9c768c131 100644 --- a/homeassistant/generated/usb.py +++ b/homeassistant/generated/usb.py @@ -4,6 +4,13 @@ """ USB = [ + { + "description": "*rx11*", + "domain": "easywave", + "manufacturer": "eldat*", + "pid": "1014", + "vid": "155A", + }, { "description": "*usb 300*", "domain": "enocean", diff --git a/requirements_all.txt b/requirements_all.txt index 48ca1650d978d1..22a8c9e5b91a94 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -860,6 +860,9 @@ earn-e-p1==0.1.0 # homeassistant.components.easyenergy easyenergy==2.2.0 +# homeassistant.components.easywave +easywave-home-control==0.1.1 + # homeassistant.components.ebusd ebusdpy==0.0.17 @@ -2474,6 +2477,9 @@ pysensibo==1.2.1 # homeassistant.components.senz pysenz==1.0.2 +# homeassistant.components.easywave +pyserial==3.5 + # homeassistant.components.sesame pysesame2==1.0.1 diff --git a/requirements_test_all.txt b/requirements_test_all.txt index 19ba408efa1c4a..94b1f3cf367b8a 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -769,6 +769,9 @@ earn-e-p1==0.1.0 # homeassistant.components.easyenergy easyenergy==2.2.0 +# homeassistant.components.easywave +easywave-home-control==0.1.1 + # homeassistant.components.egauge egauge-async==0.4.0 @@ -2118,6 +2121,9 @@ pysensibo==1.2.1 # homeassistant.components.senz pysenz==1.0.2 +# homeassistant.components.easywave +pyserial==3.5 + # homeassistant.components.seventeentrack pyseventeentrack==1.1.3 diff --git a/tests/components/easywave/__init__.py b/tests/components/easywave/__init__.py new file mode 100644 index 00000000000000..1352148a140cd8 --- /dev/null +++ b/tests/components/easywave/__init__.py @@ -0,0 +1 @@ +"""Tests for the Easywave Core integration.""" diff --git a/tests/components/easywave/conftest.py b/tests/components/easywave/conftest.py new file mode 100644 index 00000000000000..c3aecf636712f4 --- /dev/null +++ b/tests/components/easywave/conftest.py @@ -0,0 +1,83 @@ +"""Pytest configuration and fixtures for Easywave Core tests.""" + +from __future__ import annotations + +from collections.abc import Generator +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from homeassistant.components.easywave.const import ( + CONF_DEVICE_PATH, + CONF_USB_MANUFACTURER, + CONF_USB_PID, + CONF_USB_PRODUCT, + CONF_USB_SERIAL_NUMBER, + CONF_USB_VID, + DOMAIN, +) +from homeassistant.helpers.service_info.usb import UsbServiceInfo + +from tests.common import MockConfigEntry + +MOCK_ENTRY_DATA = { + CONF_DEVICE_PATH: "/dev/ttyACM0", + CONF_USB_VID: 0x155A, + CONF_USB_PID: 0x1014, + CONF_USB_SERIAL_NUMBER: "12345", + CONF_USB_MANUFACTURER: "ELDAT", + CONF_USB_PRODUCT: "RX11 USB Transceiver", +} + + +@pytest.fixture +def mock_config_entry() -> MockConfigEntry: + """Return a mock ConfigEntry.""" + return MockConfigEntry( + version=1, + domain=DOMAIN, + title="Easywave Gateway", + data=MOCK_ENTRY_DATA, + source="usb", + unique_id="easywave_12345", + ) + + +@pytest.fixture +def mock_usb_discovery_info() -> UsbServiceInfo: + """Return a mock USB discovery info.""" + return UsbServiceInfo( + device="/dev/ttyACM0", + vid="155A", + pid="1014", + serial_number="12345", + manufacturer="ELDAT", + description="RX11 USB Transceiver", + ) + + +@pytest.fixture +def mock_coordinator() -> MagicMock: + """Return a mock EasywaveCoordinator.""" + coordinator = MagicMock() + coordinator.async_setup = AsyncMock(return_value=True) + coordinator.async_shutdown = AsyncMock() + coordinator.async_add_listener = MagicMock(return_value=lambda: None) + coordinator.is_offline = False + coordinator.transceiver = MagicMock() + coordinator.transceiver.is_connected = True + coordinator.transceiver.usb_serial_number = "12345" + coordinator.transceiver.hw_version = "1.0" + coordinator.transceiver.fw_version = "2.0" + coordinator.transceiver.device_path = "/dev/ttyACM0" + return coordinator + + +@pytest.fixture +def mock_setup_entry() -> Generator[AsyncMock]: + """Override async_setup_entry.""" + with patch( + "homeassistant.components.easywave.async_setup_entry", + return_value=True, + ) as mock_setup: + yield mock_setup diff --git a/tests/components/easywave/test_config_flow.py b/tests/components/easywave/test_config_flow.py new file mode 100644 index 00000000000000..48dbc5ae46633e --- /dev/null +++ b/tests/components/easywave/test_config_flow.py @@ -0,0 +1,265 @@ +"""Tests for the config flow of the Easywave Core integration.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest + +from homeassistant.components.easywave.const import ( + CONF_DEVICE_PATH, + CONF_USB_PID, + CONF_USB_SERIAL_NUMBER, + CONF_USB_VID, + DOMAIN, +) +from homeassistant.config_entries import SOURCE_USB, SOURCE_USER +from homeassistant.core import HomeAssistant +from homeassistant.data_entry_flow import FlowResultType +from homeassistant.helpers.service_info.usb import UsbServiceInfo + +from tests.common import MockConfigEntry + +pytestmark = pytest.mark.usefixtures("mock_setup_entry") + +COMPORTS_PATH = ( + "homeassistant.components.easywave.config_flow.serial.tools.list_ports.comports" +) + + +def _make_port( + device: str = "/dev/ttyACM0", + vid: int = 0x155A, + pid: int = 0x1014, + serial_number: str = "12345", + manufacturer: str = "ELDAT", + product: str = "RX11 USB Transceiver", +) -> MagicMock: + """Create a mock serial port object.""" + port = MagicMock() + port.device = device + port.vid = vid + port.pid = pid + port.serial_number = serial_number + port.manufacturer = manufacturer + port.product = product + return port + + +async def test_user_flow_no_devices(hass: HomeAssistant) -> None: + """Test user flow aborts when no serial ports are found.""" + with patch(COMPORTS_PATH, return_value=[]): + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": SOURCE_USER} + ) + + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "no_devices_found" + + +async def test_user_flow_select_port(hass: HomeAssistant) -> None: + """Test user flow shows port selection form and proceeds to confirm.""" + port = _make_port() + + with patch(COMPORTS_PATH, return_value=[port]): + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": SOURCE_USER} + ) + + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "user" + + with patch(COMPORTS_PATH, return_value=[port]): + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_DEVICE_PATH: "/dev/ttyACM0"}, + ) + + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "confirm" + + +async def test_user_flow_creates_entry(hass: HomeAssistant) -> None: + """Test user flow creates entry after port selection and confirmation.""" + port = _make_port() + + with patch(COMPORTS_PATH, return_value=[port]): + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": SOURCE_USER} + ) + + with patch(COMPORTS_PATH, return_value=[port]): + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_DEVICE_PATH: "/dev/ttyACM0"}, + ) + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={} + ) + + assert result["type"] is FlowResultType.CREATE_ENTRY + assert result["title"] == "Easywave Gateway" + assert result["data"][CONF_DEVICE_PATH] == "/dev/ttyACM0" + assert result["data"][CONF_USB_VID] == 0x155A + assert result["data"][CONF_USB_PID] == 0x1014 + assert result["data"][CONF_USB_SERIAL_NUMBER] == "12345" + + +async def test_user_flow_multiple_ports(hass: HomeAssistant) -> None: + """Test user flow with multiple serial ports shows selection form.""" + port1 = _make_port() + port2 = _make_port(device="/dev/ttyACM1", serial_number="54321") + + with patch(COMPORTS_PATH, return_value=[port1, port2]): + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": SOURCE_USER} + ) + + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "user" + + +async def test_user_flow_already_configured( + hass: HomeAssistant, mock_config_entry: MockConfigEntry +) -> None: + """Test user flow aborts when integration is already configured.""" + mock_config_entry.add_to_hass(hass) + + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": SOURCE_USER} + ) + + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "single_instance_allowed" + + +async def test_user_flow_device_disappeared(hass: HomeAssistant) -> None: + """Test user step shows error when selected device is no longer available.""" + port1 = _make_port() + port2 = _make_port(device="/dev/ttyACM1", serial_number="54321") + + with patch(COMPORTS_PATH, return_value=[port1, port2]): + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": SOURCE_USER} + ) + + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "user" + + # On re-scan the selected port is gone + port3 = _make_port(device="/dev/ttyACM2", serial_number="99999") + with patch(COMPORTS_PATH, return_value=[port2, port3]): + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_DEVICE_PATH: "/dev/ttyACM0"}, + ) + + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "user" + assert result["errors"] == {"base": "device_no_longer_available"} + + +async def test_usb_discovery_flow( + hass: HomeAssistant, mock_usb_discovery_info: UsbServiceInfo +) -> None: + """Test USB auto-discovery shows confirm form.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": SOURCE_USB}, + data=mock_usb_discovery_info, + ) + + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "confirm" + + +async def test_usb_discovery_flow_creates_entry( + hass: HomeAssistant, mock_usb_discovery_info: UsbServiceInfo +) -> None: + """Test USB discovery creates entry after confirmation.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": SOURCE_USB}, + data=mock_usb_discovery_info, + ) + + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "confirm" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={} + ) + + assert result["type"] is FlowResultType.CREATE_ENTRY + assert result["title"] == "Easywave Gateway" + assert result["data"][CONF_USB_VID] == 0x155A + assert result["data"][CONF_USB_PID] == 0x1014 + + +async def test_usb_discovery_already_configured( + hass: HomeAssistant, + mock_config_entry: MockConfigEntry, + mock_usb_discovery_info: UsbServiceInfo, +) -> None: + """Test USB discovery aborts when already configured.""" + mock_config_entry.add_to_hass(hass) + + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": SOURCE_USB}, + data=mock_usb_discovery_info, + ) + + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "single_instance_allowed" + + +async def test_confirm_unique_id_from_vid_pid(hass: HomeAssistant) -> None: + """Test unique_id falls back to VID/PID when serial is unknown.""" + port = _make_port(serial_number="unknown") + + with patch(COMPORTS_PATH, return_value=[port]): + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": SOURCE_USER} + ) + + with patch(COMPORTS_PATH, return_value=[port]): + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_DEVICE_PATH: "/dev/ttyACM0"}, + ) + + assert result["step_id"] == "confirm" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={} + ) + + assert result["type"] is FlowResultType.CREATE_ENTRY + assert result["result"].unique_id == "easywave_155A_1014" + + +async def test_confirm_unique_id_from_device_path(hass: HomeAssistant) -> None: + """Test unique_id falls back to device path when serial is unknown and no VID/PID.""" + port = _make_port(vid=None, pid=None, serial_number="unknown") + + with patch(COMPORTS_PATH, return_value=[port]): + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": SOURCE_USER} + ) + + with patch(COMPORTS_PATH, return_value=[port]): + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={CONF_DEVICE_PATH: "/dev/ttyACM0"}, + ) + + assert result["step_id"] == "confirm" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], user_input={} + ) + + assert result["type"] is FlowResultType.CREATE_ENTRY + assert result["result"].unique_id == "easywave__dev_ttyACM0" diff --git a/tests/components/easywave/test_const.py b/tests/components/easywave/test_const.py new file mode 100644 index 00000000000000..1686256dc4d576 --- /dev/null +++ b/tests/components/easywave/test_const.py @@ -0,0 +1,118 @@ +"""Tests for constants in the Easywave Core integration.""" + +from __future__ import annotations + +from homeassistant.components.easywave.const import ( + ALLOWED_COUNTRIES_868MHZ, + DOMAIN, + EVENT_GATEWAY_CONNECTED, + EVENT_GATEWAY_DISCONNECTED, + EVENT_GATEWAY_STATUS_CHANGED, + FREQUENCY_868MHZ, + FREQUENCY_ALLOWED_COUNTRIES, + SUPPORTED_USB_IDS, + USB_DEVICE_NAMES, + get_frequency_for_pid, + is_country_allowed_for_frequency, +) + + +def test_domain() -> None: + """Test DOMAIN constant.""" + assert DOMAIN == "easywave" + + +def test_usb_device_names() -> None: + """Test USB_DEVICE_NAMES constant.""" + assert isinstance(USB_DEVICE_NAMES, dict) + assert (0x155A, 0x1014) in USB_DEVICE_NAMES + + device_info = USB_DEVICE_NAMES[(0x155A, 0x1014)] + assert device_info["manufacturer"] == "ELDAT" + assert device_info["product"] == "RX11 USB Transceiver" + + +def test_supported_usb_ids() -> None: + """Test SUPPORTED_USB_IDS constant.""" + assert isinstance(SUPPORTED_USB_IDS, frozenset) + assert (0x155A, 0x1014) in SUPPORTED_USB_IDS + + +def test_frequency_868mhz() -> None: + """Test FREQUENCY_868MHZ constant.""" + assert FREQUENCY_868MHZ == "868 MHz" + + +def test_frequency_allowed_countries() -> None: + """Test FREQUENCY_ALLOWED_COUNTRIES constant.""" + assert isinstance(FREQUENCY_ALLOWED_COUNTRIES, dict) + assert FREQUENCY_868MHZ in FREQUENCY_ALLOWED_COUNTRIES + + allowed = FREQUENCY_ALLOWED_COUNTRIES[FREQUENCY_868MHZ] + assert isinstance(allowed, frozenset) + # Check some key countries + assert "DE" in allowed # Germany + assert "FR" in allowed # France + assert "GB" in allowed # UK + assert "CH" in allowed # Switzerland + + +def test_allowed_countries_868mhz_legacy() -> None: + """Test ALLOWED_COUNTRIES_868MHZ backward compatibility.""" + assert FREQUENCY_ALLOWED_COUNTRIES[FREQUENCY_868MHZ] == ALLOWED_COUNTRIES_868MHZ + assert "DE" in ALLOWED_COUNTRIES_868MHZ + + +def test_is_country_allowed_for_frequency_allowed() -> None: + """Test is_country_allowed_for_frequency with allowed country.""" + assert is_country_allowed_for_frequency(FREQUENCY_868MHZ, "DE") is True + assert ( + is_country_allowed_for_frequency(FREQUENCY_868MHZ, "de") is True + ) # Case insensitive + assert is_country_allowed_for_frequency(FREQUENCY_868MHZ, "FR") is True + assert is_country_allowed_for_frequency(FREQUENCY_868MHZ, "GB") is True + + +def test_is_country_allowed_for_frequency_not_allowed() -> None: + """Test is_country_allowed_for_frequency with disallowed country.""" + assert is_country_allowed_for_frequency(FREQUENCY_868MHZ, "US") is False + assert is_country_allowed_for_frequency(FREQUENCY_868MHZ, "JP") is False + assert is_country_allowed_for_frequency(FREQUENCY_868MHZ, "BR") is False + + +def test_is_country_allowed_for_frequency_none() -> None: + """Test is_country_allowed_for_frequency with None country.""" + # No country configured — cannot enforce, so allow + assert is_country_allowed_for_frequency(FREQUENCY_868MHZ, None) is True + + +def test_is_country_allowed_for_frequency_unknown_frequency() -> None: + """Test is_country_allowed_for_frequency with unknown frequency.""" + # Unknown frequency — conservative: allow + assert is_country_allowed_for_frequency("unknown_freq", "US") is True + + +def test_get_frequency_for_pid_rx11() -> None: + """Test get_frequency_for_pid for RX11.""" + assert get_frequency_for_pid(0x1014) == FREQUENCY_868MHZ + + +def test_get_frequency_for_pid_unknown() -> None: + """Test get_frequency_for_pid with unknown PID.""" + assert get_frequency_for_pid(0x9999) is None + assert get_frequency_for_pid(None) is None + + +def test_event_gateway_connected() -> None: + """Test EVENT_GATEWAY_CONNECTED constant.""" + assert EVENT_GATEWAY_CONNECTED == "easywave_gateway_connected" + + +def test_event_gateway_disconnected() -> None: + """Test EVENT_GATEWAY_DISCONNECTED constant.""" + assert EVENT_GATEWAY_DISCONNECTED == "easywave_gateway_disconnected" + + +def test_event_gateway_status_changed() -> None: + """Test EVENT_GATEWAY_STATUS_CHANGED constant.""" + assert EVENT_GATEWAY_STATUS_CHANGED == "easywave_gateway_status_changed" diff --git a/tests/components/easywave/test_coordinator.py b/tests/components/easywave/test_coordinator.py new file mode 100644 index 00000000000000..6f8cca76a341fb --- /dev/null +++ b/tests/components/easywave/test_coordinator.py @@ -0,0 +1,274 @@ +"""Tests for the Easywave coordinator.""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from homeassistant.components.easywave.const import DEVICE_SCAN_INTERVAL, DOMAIN +from homeassistant.components.easywave.coordinator import EasywaveCoordinator +from homeassistant.core import HomeAssistant +from homeassistant.helpers.update_coordinator import UpdateFailed + +from tests.common import MockConfigEntry + + +@pytest.fixture +def mock_transceiver() -> MagicMock: + """Return a mock RX11Transceiver.""" + transceiver = MagicMock() + transceiver.is_connected = True + transceiver.device_path = "/dev/ttyACM0" + transceiver.usb_serial_number = "12345" + transceiver.hw_version = "1.0" + transceiver.fw_version = "2.0" + transceiver.connect = AsyncMock(return_value=True) + transceiver.reconnect = AsyncMock(return_value=True) + transceiver.disconnect = AsyncMock() + transceiver.dispose = AsyncMock() + transceiver.set_disconnect_callback = MagicMock() + return transceiver + + +@pytest.fixture +def mock_entry() -> MockConfigEntry: + """Return a mock config entry.""" + return MockConfigEntry( + domain=DOMAIN, + title="Easywave Gateway", + data={"device_path": "/dev/ttyACM0"}, + ) + + +@pytest.fixture +def coordinator( + hass: HomeAssistant, + mock_transceiver: MagicMock, + mock_entry: MockConfigEntry, +) -> EasywaveCoordinator: + """Return an EasywaveCoordinator instance.""" + mock_entry.add_to_hass(hass) + return EasywaveCoordinator(hass, mock_transceiver, mock_entry) + + +def test_coordinator_init( + coordinator: EasywaveCoordinator, + mock_transceiver: MagicMock, + mock_entry: MockConfigEntry, +) -> None: + """Test coordinator initialisation.""" + assert coordinator.transceiver is mock_transceiver + assert coordinator.config_entry is mock_entry + assert coordinator.name == DOMAIN + assert coordinator.update_interval == DEVICE_SCAN_INTERVAL + assert coordinator.is_offline is False + + +def test_coordinator_init_offline( + hass: HomeAssistant, + mock_entry: MockConfigEntry, +) -> None: + """Test coordinator initialises as offline when transceiver not connected.""" + mock_entry.add_to_hass(hass) + transceiver = MagicMock() + transceiver.is_connected = False + coord = EasywaveCoordinator(hass, transceiver, mock_entry) + assert coord.is_offline is True + + +# ── _async_setup ──────────────────────────────────────────────────────────── + + +async def test_async_setup_connected( + coordinator: EasywaveCoordinator, + mock_transceiver: MagicMock, +) -> None: + """Test successful setup when transceiver connects.""" + await coordinator._async_setup() + + assert coordinator.is_offline is False + mock_transceiver.connect.assert_awaited_once() + mock_transceiver.set_disconnect_callback.assert_called_once() + + +async def test_async_setup_offline( + coordinator: EasywaveCoordinator, + mock_transceiver: MagicMock, +) -> None: + """Test setup enters offline mode when transceiver cannot connect.""" + mock_transceiver.connect = AsyncMock(return_value=False) + + await coordinator._async_setup() + + assert coordinator.is_offline is True + mock_transceiver.set_disconnect_callback.assert_not_called() + + +async def test_async_setup_exception( + coordinator: EasywaveCoordinator, + mock_transceiver: MagicMock, +) -> None: + """Test setup raises UpdateFailed on exception.""" + mock_transceiver.connect = AsyncMock(side_effect=OSError("port error")) + + with pytest.raises(UpdateFailed): + await coordinator._async_setup() + + +# ── disconnect handling ───────────────────────────────────────────────────── + + +async def test_on_transceiver_disconnect( + hass: HomeAssistant, + coordinator: EasywaveCoordinator, +) -> None: + """Test _on_transceiver_disconnect schedules _handle_disconnect.""" + coordinator.is_offline = False + coordinator._on_transceiver_disconnect() + # Allow the call_soon_threadsafe callback to execute + await hass.async_block_till_done() + assert coordinator.is_offline is True + + +async def test_handle_disconnect_already_offline( + coordinator: EasywaveCoordinator, +) -> None: + """Test _handle_disconnect is a no-op when already offline.""" + coordinator.is_offline = True + # Should not raise or change anything + coordinator._handle_disconnect() + assert coordinator.is_offline is True + + +async def test_handle_disconnect_sets_offline( + coordinator: EasywaveCoordinator, +) -> None: + """Test _handle_disconnect marks offline and pushes data.""" + coordinator.is_offline = False + coordinator.async_set_updated_data = MagicMock() + + coordinator._handle_disconnect() + + assert coordinator.is_offline is True + coordinator.async_set_updated_data.assert_called_once_with( + { + "is_connected": False, + "device_path": None, + } + ) + + +# ── _async_update_data ────────────────────────────────────────────────────── + + +async def test_update_data_online( + coordinator: EasywaveCoordinator, + mock_transceiver: MagicMock, +) -> None: + """Test update returns connected data when online.""" + coordinator.is_offline = False + + data = await coordinator._async_update_data() + + assert data == { + "is_connected": True, + "device_path": "/dev/ttyACM0", + } + + +async def test_update_data_reconnect_success( + coordinator: EasywaveCoordinator, + mock_transceiver: MagicMock, +) -> None: + """Test update reconnects successfully from offline.""" + coordinator.is_offline = True + mock_transceiver.reconnect = AsyncMock(return_value=True) + + data = await coordinator._async_update_data() + + assert coordinator.is_offline is False + mock_transceiver.set_disconnect_callback.assert_called() + assert data["is_connected"] is True + + +async def test_update_data_reconnect_fails( + coordinator: EasywaveCoordinator, + mock_transceiver: MagicMock, +) -> None: + """Test update stays offline when reconnect fails.""" + coordinator.is_offline = True + mock_transceiver.reconnect = AsyncMock(return_value=False) + + data = await coordinator._async_update_data() + + assert coordinator.is_offline is True + assert data == {"is_connected": False, "device_path": None} + + +async def test_update_data_detects_lost_connection( + coordinator: EasywaveCoordinator, + mock_transceiver: MagicMock, +) -> None: + """Test update detects connection loss during poll.""" + coordinator.is_offline = False + mock_transceiver.is_connected = False + + data = await coordinator._async_update_data() + + assert coordinator.is_offline is True + assert data == {"is_connected": False, "device_path": None} + + +async def test_update_data_update_failed( + coordinator: EasywaveCoordinator, + mock_transceiver: MagicMock, +) -> None: + """Test UpdateFailed is re-raised and sets offline.""" + coordinator.is_offline = True + mock_transceiver.reconnect = AsyncMock(side_effect=UpdateFailed("fail")) + + with pytest.raises(UpdateFailed): + await coordinator._async_update_data() + + assert coordinator.is_offline is True + + +async def test_update_data_generic_exception( + coordinator: EasywaveCoordinator, + mock_transceiver: MagicMock, +) -> None: + """Test OS error during reconnect is wrapped in UpdateFailed.""" + coordinator.is_offline = True + mock_transceiver.reconnect = AsyncMock(side_effect=OSError("boom")) + + with pytest.raises(UpdateFailed, match="boom"): + await coordinator._async_update_data() + + assert coordinator.is_offline is True + + +# ── async_shutdown ────────────────────────────────────────────────────────── + + +async def test_async_shutdown( + coordinator: EasywaveCoordinator, + mock_transceiver: MagicMock, +) -> None: + """Test clean shutdown disposes transceiver.""" + await coordinator.async_shutdown() + + mock_transceiver.dispose.assert_awaited_once() + + +async def test_async_shutdown_error( + coordinator: EasywaveCoordinator, + mock_transceiver: MagicMock, +) -> None: + """Test shutdown handles errors gracefully.""" + mock_transceiver.dispose = AsyncMock(side_effect=OSError("port busy")) + + # Should not raise + await coordinator.async_shutdown() + + mock_transceiver.dispose.assert_awaited_once() diff --git a/tests/components/easywave/test_init.py b/tests/components/easywave/test_init.py new file mode 100644 index 00000000000000..b6cd7b6007f013 --- /dev/null +++ b/tests/components/easywave/test_init.py @@ -0,0 +1,145 @@ +"""Tests for the init module of the Easywave Core integration.""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock, patch + +from homeassistant.components.easywave import async_setup_entry, async_unload_entry +from homeassistant.components.easywave.const import DOMAIN +from homeassistant.core import HomeAssistant +from homeassistant.helpers import issue_registry as ir + +from tests.common import MockConfigEntry + + +def _patch_transceiver_and_coordinator() -> tuple: + """Return context managers patching RX11Transceiver and EasywaveCoordinator.""" + mock_transceiver = MagicMock() + mock_coordinator = AsyncMock() + mock_coordinator.async_config_entry_first_refresh = AsyncMock() + mock_coordinator.async_shutdown = AsyncMock() + + transceiver_patch = patch( + "homeassistant.components.easywave.RX11Transceiver", + return_value=mock_transceiver, + ) + coordinator_patch = patch( + "homeassistant.components.easywave.EasywaveCoordinator", + return_value=mock_coordinator, + ) + forward_patch = patch( + "homeassistant.config_entries.ConfigEntries.async_forward_entry_setups", + return_value=None, + ) + return transceiver_patch, coordinator_patch, forward_patch, mock_coordinator + + +async def test_setup_entry_success( + hass: HomeAssistant, mock_config_entry: MockConfigEntry +) -> None: + """Test successful setup of config entry.""" + mock_config_entry.add_to_hass(hass) + hass.config.country = "DE" + + t_patch, c_patch, f_patch, mock_coord = _patch_transceiver_and_coordinator() + with t_patch, c_patch, f_patch: + result = await async_setup_entry(hass, mock_config_entry) + + assert result is True + assert mock_coord.async_config_entry_first_refresh.called + assert mock_config_entry.runtime_data.coordinator is mock_coord + + +async def test_setup_entry_country_allowed( + hass: HomeAssistant, mock_config_entry: MockConfigEntry +) -> None: + """Test setup succeeds with allowed country.""" + mock_config_entry.add_to_hass(hass) + hass.config.country = "FR" + + t_patch, c_patch, f_patch, _ = _patch_transceiver_and_coordinator() + with t_patch, c_patch, f_patch: + result = await async_setup_entry(hass, mock_config_entry) + + assert result is True + + +async def test_setup_entry_country_not_allowed( + hass: HomeAssistant, mock_config_entry: MockConfigEntry +) -> None: + """Test setup returns False for disallowed country.""" + mock_config_entry.add_to_hass(hass) + hass.config.country = "US" + + result = await async_setup_entry(hass, mock_config_entry) + + assert result is False + + +async def test_setup_entry_creates_repair_issue( + hass: HomeAssistant, mock_config_entry: MockConfigEntry +) -> None: + """Test repair issue created when country is not allowed.""" + mock_config_entry.add_to_hass(hass) + hass.config.country = "US" + + result = await async_setup_entry(hass, mock_config_entry) + + assert result is False + issues = ir.async_get(hass) + issue = issues.async_get_issue( + DOMAIN, f"frequency_not_permitted_{mock_config_entry.entry_id}" + ) + assert issue is not None + assert issue.translation_key == "frequency_not_permitted" + + +async def test_setup_entry_deletes_stale_repair_issue( + hass: HomeAssistant, mock_config_entry: MockConfigEntry +) -> None: + """Test stale repair issue is removed on successful setup.""" + mock_config_entry.add_to_hass(hass) + hass.config.country = "DE" + + t_patch, c_patch, f_patch, _ = _patch_transceiver_and_coordinator() + with t_patch, c_patch, f_patch: + result = await async_setup_entry(hass, mock_config_entry) + + assert result is True + issues = ir.async_get(hass) + issue = issues.async_get_issue( + DOMAIN, f"frequency_not_permitted_{mock_config_entry.entry_id}" + ) + assert issue is None + + +async def test_setup_entry_no_country( + hass: HomeAssistant, mock_config_entry: MockConfigEntry +) -> None: + """Test setup succeeds when no country is configured.""" + mock_config_entry.add_to_hass(hass) + hass.config.country = None + + t_patch, c_patch, f_patch, _ = _patch_transceiver_and_coordinator() + with t_patch, c_patch, f_patch: + result = await async_setup_entry(hass, mock_config_entry) + + assert result is True + + +async def test_unload_entry( + hass: HomeAssistant, mock_config_entry: MockConfigEntry +) -> None: + """Test unload of config entry shuts down coordinator.""" + mock_config_entry.add_to_hass(hass) + hass.config.country = "DE" + + t_patch, c_patch, f_patch, mock_coord = _patch_transceiver_and_coordinator() + with t_patch, c_patch, f_patch: + await async_setup_entry(hass, mock_config_entry) + + with patch.object(hass.config_entries, "async_unload_platforms", return_value=True): + result = await async_unload_entry(hass, mock_config_entry) + + assert result is True + assert mock_coord.async_shutdown.called diff --git a/tests/components/easywave/test_regulatory_compliance.py b/tests/components/easywave/test_regulatory_compliance.py new file mode 100644 index 00000000000000..18bbf0d2138f8b --- /dev/null +++ b/tests/components/easywave/test_regulatory_compliance.py @@ -0,0 +1,212 @@ +"""Tests for regulatory compliance in the Easywave Core integration.""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, patch + +from homeassistant.components.easywave import async_setup_entry +from homeassistant.components.easywave.const import ( + DOMAIN, + FREQUENCY_868MHZ, + FREQUENCY_ALLOWED_COUNTRIES, + get_frequency_for_pid, + is_country_allowed_for_frequency, +) +from homeassistant.core import HomeAssistant +from homeassistant.helpers import issue_registry as ir + +from tests.common import MockConfigEntry + + +def _patch_for_successful_setup() -> tuple: + """Return context managers for a successful setup.""" + mock_coordinator = AsyncMock() + mock_coordinator.async_setup = AsyncMock(return_value=True) + mock_coordinator.async_shutdown = AsyncMock() + return ( + patch("homeassistant.components.easywave.RX11Transceiver"), + patch( + "homeassistant.components.easywave.EasywaveCoordinator", + return_value=mock_coordinator, + ), + patch( + "homeassistant.config_entries.ConfigEntries.async_forward_entry_setups", + return_value=None, + ), + ) + + +class TestCountryValidation: + """Test country validation for radio frequencies.""" + + def test_all_allowed_countries_in_frequency_list(self) -> None: + """Test that all expected 868MHz countries are in the list.""" + allowed = FREQUENCY_ALLOWED_COUNTRIES[FREQUENCY_868MHZ] + + essential_eu = {"DE", "FR", "IT", "ES", "NL", "BE", "AT", "CZ", "PL"} + assert essential_eu.issubset(allowed) + + nordic = {"SE", "NO", "DK", "FI"} + assert nordic.issubset(allowed) + + assert "GB" in allowed + + cept_non_eu = {"CH", "IS", "LI"} + assert cept_non_eu.issubset(allowed) + + def test_country_code_case_insensitive(self) -> None: + """Test that country code comparison is case-insensitive.""" + assert is_country_allowed_for_frequency(FREQUENCY_868MHZ, "de") is True + assert is_country_allowed_for_frequency(FREQUENCY_868MHZ, "DE") is True + assert is_country_allowed_for_frequency(FREQUENCY_868MHZ, "De") is True + + def test_disallowed_countries(self) -> None: + """Test that non-CEPT countries are blocked.""" + for country in ("US", "JP", "CN", "BR", "AU", "RU", "IN"): + assert is_country_allowed_for_frequency(FREQUENCY_868MHZ, country) is False + + def test_none_country_allowed(self) -> None: + """Test that None country (not configured) is allowed.""" + assert is_country_allowed_for_frequency(FREQUENCY_868MHZ, None) is True + + def test_unknown_frequency_allowed(self) -> None: + """Test that unknown frequency is allowed (conservative).""" + assert is_country_allowed_for_frequency("unknown", "US") is True + + +class TestFrequencyDetection: + """Test frequency detection from USB device PID.""" + + def test_rx11_pid_returns_868mhz(self) -> None: + """Test that RX11 PID returns 868 MHz.""" + assert get_frequency_for_pid(0x1014) == FREQUENCY_868MHZ + + def test_unknown_pid_returns_none(self) -> None: + """Test that unknown PID returns None.""" + assert get_frequency_for_pid(0x9999) is None + + def test_none_pid_returns_none(self) -> None: + """Test that None PID returns None.""" + assert get_frequency_for_pid(None) is None + + +class TestIntegrationSetupCompliance: + """Test regulatory compliance enforcement during setup.""" + + async def test_setup_succeeds_with_allowed_country( + self, hass: HomeAssistant, mock_config_entry: MockConfigEntry + ) -> None: + """Test that setup succeeds when country is allowed.""" + mock_config_entry.add_to_hass(hass) + hass.config.country = "DE" + + t_patch, c_patch, f_patch = _patch_for_successful_setup() + with t_patch, c_patch, f_patch: + result = await async_setup_entry(hass, mock_config_entry) + + assert result is True + + async def test_setup_fails_with_disallowed_country( + self, hass: HomeAssistant, mock_config_entry: MockConfigEntry + ) -> None: + """Test that setup fails when country is not allowed.""" + mock_config_entry.add_to_hass(hass) + hass.config.country = "US" + + result = await async_setup_entry(hass, mock_config_entry) + + assert result is False + + async def test_setup_succeeds_with_no_country_configured( + self, hass: HomeAssistant, mock_config_entry: MockConfigEntry + ) -> None: + """Test that setup succeeds when no country is configured.""" + mock_config_entry.add_to_hass(hass) + hass.config.country = None + + t_patch, c_patch, f_patch = _patch_for_successful_setup() + with t_patch, c_patch, f_patch: + result = await async_setup_entry(hass, mock_config_entry) + + assert result is True + + async def test_repair_issue_created_on_disallowed_country( + self, hass: HomeAssistant, mock_config_entry: MockConfigEntry + ) -> None: + """Test that a repair issue is created when country is not allowed.""" + mock_config_entry.add_to_hass(hass) + hass.config.country = "US" + + await async_setup_entry(hass, mock_config_entry) + + issues = ir.async_get(hass) + issue = issues.async_get_issue( + DOMAIN, f"frequency_not_permitted_{mock_config_entry.entry_id}" + ) + assert issue is not None + assert issue.translation_key == "frequency_not_permitted" + assert "868 MHz" in str(issue.translation_placeholders) + + async def test_stale_repair_issue_deleted_on_allowed_country( + self, hass: HomeAssistant, mock_config_entry: MockConfigEntry + ) -> None: + """Test that stale repair issues are removed when country is allowed.""" + mock_config_entry.add_to_hass(hass) + hass.config.country = "FR" + + t_patch, c_patch, f_patch = _patch_for_successful_setup() + with t_patch, c_patch, f_patch: + result = await async_setup_entry(hass, mock_config_entry) + + assert result is True + issues = ir.async_get(hass) + issue = issues.async_get_issue( + DOMAIN, f"frequency_not_permitted_{mock_config_entry.entry_id}" + ) + assert issue is None + + async def test_all_eu_countries_allowed( + self, hass: HomeAssistant, mock_config_entry: MockConfigEntry + ) -> None: + """Test that all EU member states are in the allowed list.""" + eu_countries = { + "AT", + "BE", + "BG", + "HR", + "CY", + "CZ", + "DK", + "EE", + "FI", + "FR", + "DE", + "GR", + "HU", + "IE", + "IT", + "LV", + "LT", + "LU", + "MT", + "NL", + "PL", + "PT", + "RO", + "SK", + "SI", + "ES", + "SE", + } + for country in eu_countries: + assert is_country_allowed_for_frequency(FREQUENCY_868MHZ, country) is True + + async def test_uk_and_post_brexit_aliases(self) -> None: + """Test that both GB and UK aliases work.""" + assert is_country_allowed_for_frequency(FREQUENCY_868MHZ, "GB") is True + assert is_country_allowed_for_frequency(FREQUENCY_868MHZ, "UK") is True + + async def test_cept_non_eu_members_allowed(self) -> None: + """Test that non-EU CEPT members are allowed.""" + for country in ("CH", "NO", "IS", "LI"): + assert is_country_allowed_for_frequency(FREQUENCY_868MHZ, country) is True diff --git a/tests/components/easywave/test_sensor.py b/tests/components/easywave/test_sensor.py new file mode 100644 index 00000000000000..69ded71beca2b7 --- /dev/null +++ b/tests/components/easywave/test_sensor.py @@ -0,0 +1,398 @@ +"""Tests for the sensor platform of the Easywave Core integration.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest + +from homeassistant.components.easywave import EasywaveRuntimeData +from homeassistant.components.easywave.const import ( + DOMAIN, + EVENT_GATEWAY_CONNECTED, + EVENT_GATEWAY_DISCONNECTED, + EVENT_GATEWAY_STATUS_CHANGED, +) +from homeassistant.components.easywave.sensor import ( + EasywaveGatewaySensor, + async_setup_entry, +) +from homeassistant.components.sensor import SensorDeviceClass +from homeassistant.const import EVENT_HOMEASSISTANT_STARTED +from homeassistant.core import CoreState, HomeAssistant +from homeassistant.helpers.entity import EntityCategory + +from tests.common import MockConfigEntry + + +@pytest.fixture +def gateway_sensor( + mock_config_entry: MockConfigEntry, + mock_coordinator: MagicMock, +) -> EasywaveGatewaySensor: + """Return a gateway sensor instance with a mocked coordinator.""" + return EasywaveGatewaySensor(mock_config_entry, mock_coordinator) + + +async def test_sensor_setup_entry( + hass: HomeAssistant, + mock_config_entry: MockConfigEntry, + mock_coordinator: MagicMock, +) -> None: + """Test sensor platform setup from runtime_data.""" + mock_config_entry.runtime_data = EasywaveRuntimeData( + coordinator=mock_coordinator, + frequency="868 MHz", + country="DE", + ) + async_add_entities = MagicMock() + + await async_setup_entry(hass, mock_config_entry, async_add_entities) + + assert async_add_entities.called + entities = async_add_entities.call_args[0][0] + assert len(entities) == 1 + assert isinstance(entities[0], EasywaveGatewaySensor) + assert entities[0].coordinator is mock_coordinator + + +def test_sensor_class_attributes(gateway_sensor: EasywaveGatewaySensor) -> None: + """Test sensor class attributes.""" + assert gateway_sensor._attr_has_entity_name is True + assert gateway_sensor._attr_translation_key == "gateway_status" + assert gateway_sensor._attr_device_class == SensorDeviceClass.ENUM + assert gateway_sensor._attr_entity_category == EntityCategory.DIAGNOSTIC + assert "connected" in gateway_sensor._attr_options + assert "disconnected" in gateway_sensor._attr_options + + +def test_sensor_unique_id( + gateway_sensor: EasywaveGatewaySensor, + mock_config_entry: MockConfigEntry, +) -> None: + """Test sensor unique ID format.""" + assert ( + gateway_sensor._attr_unique_id == f"{mock_config_entry.entry_id}_rx11_gateway" + ) + + +def test_sensor_initial_state(gateway_sensor: EasywaveGatewaySensor) -> None: + """Test sensor initial state is disconnected/None.""" + assert gateway_sensor._last_status == "disconnected" + assert gateway_sensor._current_status is None + + +def test_native_value_before_started(gateway_sensor: EasywaveGatewaySensor) -> None: + """Test native_value is None before HA started (recorder sees transition).""" + assert gateway_sensor.native_value is None + + +def test_native_value_after_update(gateway_sensor: EasywaveGatewaySensor) -> None: + """Test native_value returns current status after set.""" + gateway_sensor._current_status = "connected" + assert gateway_sensor.native_value == "connected" + + +def test_icon_connected(gateway_sensor: EasywaveGatewaySensor) -> None: + """Test icon when connected.""" + gateway_sensor._current_status = "connected" + assert gateway_sensor.icon == "mdi:usb" + + +def test_icon_disconnected(gateway_sensor: EasywaveGatewaySensor) -> None: + """Test icon when disconnected.""" + gateway_sensor._current_status = "disconnected" + assert gateway_sensor.icon == "mdi:close-thick" + + +def test_icon_none(gateway_sensor: EasywaveGatewaySensor) -> None: + """Test icon when status is None.""" + gateway_sensor._current_status = None + assert gateway_sensor.icon == "mdi:close-thick" + + +def test_available_always_true(gateway_sensor: EasywaveGatewaySensor) -> None: + """Test sensor availability is always True.""" + assert gateway_sensor.available is True + + +def test_extra_state_attributes(gateway_sensor: EasywaveGatewaySensor) -> None: + """Test extra state attributes contain expected keys.""" + attrs = gateway_sensor.extra_state_attributes + assert "device_path" in attrs + assert "connected" in attrs + + +def test_extra_state_attributes_with_details( + gateway_sensor: EasywaveGatewaySensor, +) -> None: + """Test extra state attributes include version info when available.""" + # The mock_coordinator fixture provides versions + attrs = gateway_sensor.extra_state_attributes + assert "usb_serial_number" in attrs + assert "hardware_version" in attrs + assert "firmware_version" in attrs + + +def test_device_info(gateway_sensor: EasywaveGatewaySensor) -> None: + """Test device_info from USB device registry.""" + info = gateway_sensor.device_info + assert info is not None + assert info["name"] == "RX11 USB Transceiver" + assert info["manufacturer"] == "ELDAT" + + +def test_device_info_serial_number(gateway_sensor: EasywaveGatewaySensor) -> None: + """Test device_info includes serial number.""" + info = gateway_sensor.device_info + assert info["serial_number"] == "12345" + + +def test_device_info_fallback_unknown_device( + mock_coordinator: MagicMock, +) -> None: + """Test device_info falls back to entry data for unknown VID/PID.""" + entry = MockConfigEntry( + domain=DOMAIN, + data={ + "device_path": "/dev/ttyACM0", + "usb_vid": 0x9999, + "usb_pid": 0x9999, + "usb_serial_number": "12345", + "usb_manufacturer": "ELDAT", + "usb_product": "RX11 USB Transceiver", + }, + ) + sensor = EasywaveGatewaySensor(entry, mock_coordinator) + info = sensor.device_info + assert info["manufacturer"] == "ELDAT" + + +def test_connection_status_connected(gateway_sensor: EasywaveGatewaySensor) -> None: + """Test _connection_status returns connected when transceiver is connected.""" + assert gateway_sensor._connection_status() == "connected" + + +def test_connection_status_disconnected_offline( + gateway_sensor: EasywaveGatewaySensor, +) -> None: + """Test _connection_status returns disconnected when offline.""" + gateway_sensor.coordinator.is_offline = True + assert gateway_sensor._connection_status() == "disconnected" + + +def test_handle_coordinator_update( + hass: HomeAssistant, + gateway_sensor: EasywaveGatewaySensor, +) -> None: + """Test _handle_coordinator_update updates current_status.""" + gateway_sensor.hass = hass + gateway_sensor.async_write_ha_state = MagicMock() + gateway_sensor._ha_started = True + + gateway_sensor._handle_coordinator_update() + + assert gateway_sensor._current_status == "connected" + assert gateway_sensor.async_write_ha_state.called + + +async def test_handle_coordinator_update_fires_connected_event( + hass: HomeAssistant, + gateway_sensor: EasywaveGatewaySensor, +) -> None: + """Test that transitioning to connected fires EVENT_GATEWAY_CONNECTED.""" + gateway_sensor.hass = hass + gateway_sensor.async_write_ha_state = MagicMock() + gateway_sensor._ha_started = True + gateway_sensor._last_status = "disconnected" + + events: list[str] = [] + hass.bus.async_listen( + EVENT_GATEWAY_CONNECTED, lambda e: events.append(e.event_type) + ) + hass.bus.async_listen( + EVENT_GATEWAY_STATUS_CHANGED, lambda e: events.append(e.event_type) + ) + + gateway_sensor._handle_coordinator_update() + await hass.async_block_till_done() + + assert EVENT_GATEWAY_STATUS_CHANGED in events + assert EVENT_GATEWAY_CONNECTED in events + + +async def test_handle_coordinator_update_fires_disconnected_event( + hass: HomeAssistant, + gateway_sensor: EasywaveGatewaySensor, +) -> None: + """Test that transitioning to disconnected fires EVENT_GATEWAY_DISCONNECTED.""" + gateway_sensor.hass = hass + gateway_sensor.async_write_ha_state = MagicMock() + gateway_sensor._ha_started = True + gateway_sensor._last_status = "connected" + gateway_sensor.coordinator.is_offline = True + gateway_sensor.coordinator.transceiver.is_connected = False + + events: list[str] = [] + hass.bus.async_listen( + EVENT_GATEWAY_DISCONNECTED, lambda e: events.append(e.event_type) + ) + hass.bus.async_listen( + EVENT_GATEWAY_STATUS_CHANGED, lambda e: events.append(e.event_type) + ) + + gateway_sensor._handle_coordinator_update() + await hass.async_block_till_done() + + assert EVENT_GATEWAY_STATUS_CHANGED in events + assert EVENT_GATEWAY_DISCONNECTED in events + + +# ── _connection_status edge cases ─────────────────────────────────────────── + + +def test_connection_status_transceiver_not_connected( + gateway_sensor: EasywaveGatewaySensor, +) -> None: + """Test _connection_status falls through to disconnected when transceiver not connected.""" + gateway_sensor.coordinator.is_offline = False + gateway_sensor.coordinator.transceiver.is_connected = False + assert gateway_sensor._connection_status() == "disconnected" + + +def test_connection_status_transceiver_none( + gateway_sensor: EasywaveGatewaySensor, +) -> None: + """Test _connection_status when transceiver is None.""" + gateway_sensor.coordinator.is_offline = False + gateway_sensor.coordinator.transceiver = None + assert gateway_sensor._connection_status() == "disconnected" + + +# ── _update_gateway_device_info ───────────────────────────────────────────── + + +def test_update_device_info_no_transceiver( + hass: HomeAssistant, + gateway_sensor: EasywaveGatewaySensor, +) -> None: + """Test _update_gateway_device_info returns early without transceiver.""" + gateway_sensor.hass = hass + gateway_sensor.coordinator.transceiver = None + # Should not raise + gateway_sensor._update_gateway_device_info() + + +def test_update_device_info_no_change( + hass: HomeAssistant, + gateway_sensor: EasywaveGatewaySensor, +) -> None: + """Test _update_gateway_device_info does nothing when values match.""" + gateway_sensor.hass = hass + gateway_sensor.async_write_ha_state = MagicMock() + # Values already match the mock (serial=12345, hw=1.0, fw=2.0) + gateway_sensor._update_gateway_device_info() + gateway_sensor.async_write_ha_state.assert_not_called() + + +def test_update_device_info_serial_change( + hass: HomeAssistant, + gateway_sensor: EasywaveGatewaySensor, + mock_config_entry: MockConfigEntry, +) -> None: + """Test _update_gateway_device_info updates serial number.""" + mock_config_entry.add_to_hass(hass) + gateway_sensor.hass = hass + gateway_sensor.async_write_ha_state = MagicMock() + gateway_sensor.coordinator.transceiver.usb_serial_number = "NEW_SERIAL" + + with patch("homeassistant.components.easywave.sensor.dr.async_get") as mock_dr: + mock_registry = MagicMock() + mock_dr.return_value = mock_registry + + gateway_sensor._update_gateway_device_info() + + assert gateway_sensor._usb_serial_number == "NEW_SERIAL" + mock_registry.async_get_or_create.assert_called_once() + gateway_sensor.async_write_ha_state.assert_called_once() + + +def test_update_device_info_hw_version_change( + hass: HomeAssistant, + gateway_sensor: EasywaveGatewaySensor, + mock_config_entry: MockConfigEntry, +) -> None: + """Test _update_gateway_device_info updates hardware version.""" + mock_config_entry.add_to_hass(hass) + gateway_sensor.hass = hass + gateway_sensor.async_write_ha_state = MagicMock() + gateway_sensor.coordinator.transceiver.hw_version = "3.5" + + with patch("homeassistant.components.easywave.sensor.dr.async_get") as mock_dr: + mock_dr.return_value = MagicMock() + gateway_sensor._update_gateway_device_info() + + assert gateway_sensor._hw_version == "3.5" + + +def test_update_device_info_fw_version_change( + hass: HomeAssistant, + gateway_sensor: EasywaveGatewaySensor, + mock_config_entry: MockConfigEntry, +) -> None: + """Test _update_gateway_device_info updates firmware version.""" + mock_config_entry.add_to_hass(hass) + gateway_sensor.hass = hass + gateway_sensor.async_write_ha_state = MagicMock() + gateway_sensor.coordinator.transceiver.fw_version = "4.2" + + with patch("homeassistant.components.easywave.sensor.dr.async_get") as mock_dr: + mock_dr.return_value = MagicMock() + gateway_sensor._update_gateway_device_info() + + assert gateway_sensor._sw_version == "4.2" + + +# ── async_added_to_hass ──────────────────────────────────────────────────── + + +async def test_async_added_to_hass_running( + hass: HomeAssistant, + gateway_sensor: EasywaveGatewaySensor, +) -> None: + """Test async_added_to_hass when HA is already running.""" + gateway_sensor.hass = hass + gateway_sensor.async_write_ha_state = MagicMock() + gateway_sensor.async_on_remove = MagicMock() + + await gateway_sensor.async_added_to_hass() + await hass.async_block_till_done() + + # _handle_coordinator_update should have been called via loop.call_soon + assert gateway_sensor._current_status is not None + # Should have registered listeners + assert gateway_sensor.async_on_remove.call_count >= 1 + + +async def test_async_added_to_hass_not_started( + hass: HomeAssistant, + gateway_sensor: EasywaveGatewaySensor, +) -> None: + """Test async_added_to_hass when HA has not started yet.""" + gateway_sensor.hass = hass + gateway_sensor.async_write_ha_state = MagicMock() + gateway_sensor.async_on_remove = MagicMock() + + # Simulate HA not yet fully running (still in starting phase) + hass.set_state(CoreState.starting) + await gateway_sensor.async_added_to_hass() + + # _current_status should still be None (waiting for STARTED event) + assert gateway_sensor._current_status is None + + # Now fire the start event + hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) + await hass.async_block_till_done() + + # Now the handler should have run + assert gateway_sensor._current_status is not None diff --git a/tests/components/easywave/test_transceiver.py b/tests/components/easywave/test_transceiver.py new file mode 100644 index 00000000000000..45847cddc8d09e --- /dev/null +++ b/tests/components/easywave/test_transceiver.py @@ -0,0 +1,947 @@ +"""Tests for the Easywave RX11Transceiver.""" + +from __future__ import annotations + +import asyncio +import logging +import time +from unittest.mock import AsyncMock, MagicMock, patch + +from easywave_home_control import RX11ErrorCode +import pytest +import serial + +from homeassistant.components.easywave.transceiver import RX11Transceiver +from homeassistant.core import HomeAssistant + +DEVICE_PATH = "/dev/ttyACM0" + +# Simulated library return values +_HW_BYTES = b"RX11 v1.0\x00\x00" +_HW_OK = (RX11ErrorCode.SUCCESS, _HW_BYTES) +_HW_FAIL = (RX11ErrorCode.ERR_RF_TIMEOUT, b"") +_FW_OK = (RX11ErrorCode.SUCCESS, 2, 5, False) # major=2, minor=5 +_FW_INCOMPLETE = (RX11ErrorCode.SUCCESS, 1, 0, True) # incomplete firmware +_FW_FAIL = (RX11ErrorCode.ERR_RF_TIMEOUT, 0, 0, False) + +# Keep a reference to the real asyncio.sleep before any patching. +_real_sleep = asyncio.sleep + + +@pytest.fixture +def mock_device() -> MagicMock: + """Return a mock RX11Device with all required methods pre-configured.""" + device = MagicMock() + device.connect = AsyncMock(return_value=True) + device.disconnect = AsyncMock() + device.ping_request = AsyncMock(return_value=True) + device.query_hw_version = AsyncMock(return_value=_HW_OK) + device.query_fw_version = AsyncMock(return_value=_FW_OK) + device.set_disconnect_callback = MagicMock() + device.set_reconnect_callback = MagicMock() + return device + + +@pytest.fixture +def transceiver(hass: HomeAssistant) -> RX11Transceiver: + """Return an RX11Transceiver with an explicit device path.""" + return RX11Transceiver(hass, DEVICE_PATH) + + +def _patch_device(mock_device: MagicMock): + """Context manager: patch RX11Device constructor to return mock_device.""" + return patch( + "homeassistant.components.easywave.transceiver.RX11Device", + return_value=mock_device, + ) + + +def _patch_sleep(): + """Context manager: replace asyncio.sleep with a fast version. + + Uses the real asyncio.sleep(0) so the event loop still yields once per + call — this ensures task cancellation works correctly in tests that start + a background health-check task via connect(). + """ + + async def _fast_sleep(_delay: float) -> None: + await _real_sleep(0) + + return patch("asyncio.sleep", new=_fast_sleep) + + +# ── connect / explicit device path ─────────────────────────────────────────── + + +async def test_connect_success( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test successful connect sets state, fetches versions, and registers callbacks.""" + with _patch_device(mock_device), _patch_sleep(): + result = await transceiver.connect() + + assert result is True + assert transceiver.is_connected is True + assert transceiver.device_path == DEVICE_PATH + assert transceiver.hw_version == "RX11 v1.0" + assert transceiver.fw_version == "2.5" + mock_device.connect.assert_awaited_once() + mock_device.set_disconnect_callback.assert_called_once_with( + transceiver._on_device_disconnect + ) + mock_device.set_reconnect_callback.assert_called_once_with( + transceiver._on_device_reconnect + ) + await transceiver.disconnect() + + +async def test_connect_already_connected( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test connect is a no-op when already connected.""" + with _patch_device(mock_device), _patch_sleep(): + await transceiver.connect() + result = await transceiver.connect() + await transceiver.disconnect() + + assert result is True + mock_device.connect.assert_awaited_once() + + +async def test_connect_when_disposed( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test connect returns False immediately after dispose.""" + transceiver._disposed = True + + with _patch_device(mock_device): + result = await transceiver.connect() + + assert result is False + mock_device.connect.assert_not_called() + + +async def test_connect_device_refuses_connection( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test connect returns False when RX11Device.connect() returns False.""" + mock_device.connect = AsyncMock(return_value=False) + + with _patch_device(mock_device): + result = await transceiver.connect() + + assert result is False + assert transceiver.is_connected is False + mock_device.disconnect.assert_awaited_once() + + +async def test_connect_version_fetch_fails( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test connect disconnects and returns False when version fetch fails.""" + mock_device.query_hw_version = AsyncMock(return_value=_HW_FAIL) + mock_device.query_fw_version = AsyncMock(return_value=_FW_FAIL) + + with _patch_device(mock_device), _patch_sleep(): + result = await transceiver.connect() + + assert result is False + assert transceiver.is_connected is False + mock_device.disconnect.assert_awaited() + + +# ── connect / device discovery ──────────────────────────────────────────────── + + +async def test_connect_finds_device_by_vid_pid( + hass: HomeAssistant, + mock_device: MagicMock, +) -> None: + """Test connect scans by VID/PID when no device path is configured.""" + transceiver = RX11Transceiver(hass) + mock_port = MagicMock() + mock_port.vid = 0x155A + mock_port.pid = 0x1014 + mock_port.device = DEVICE_PATH + mock_port.serial_number = "SN-42" + + with ( + _patch_device(mock_device), + _patch_sleep(), + patch( + "homeassistant.components.easywave.transceiver.serial.tools.list_ports.comports", + return_value=[mock_port], + ), + patch("homeassistant.components.easywave.transceiver.serial.Serial"), + ): + result = await transceiver.connect() + await transceiver.disconnect() + + assert result is True + assert transceiver.device_path == DEVICE_PATH + assert transceiver.usb_serial_number == "SN-42" + + +async def test_connect_no_device_found( + hass: HomeAssistant, +) -> None: + """Test connect returns False when VID/PID scan finds nothing.""" + transceiver = RX11Transceiver(hass) + + with patch( + "homeassistant.components.easywave.transceiver.serial.tools.list_ports.comports", + return_value=[], + ): + result = await transceiver.connect() + + assert result is False + assert transceiver.is_connected is False + + +# ── disconnect ──────────────────────────────────────────────────────────────── + + +async def test_disconnect_clears_state( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test disconnect clears callbacks, device reference, and connected flag.""" + with _patch_device(mock_device), _patch_sleep(): + await transceiver.connect() + await transceiver.disconnect() + + assert transceiver.is_connected is False + assert transceiver._device is None + mock_device.set_disconnect_callback.assert_called_with(None) + mock_device.set_reconnect_callback.assert_called_with(None) + mock_device.disconnect.assert_awaited() + + +async def test_disconnect_stops_health_check( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test disconnect cancels the running health check task.""" + with _patch_device(mock_device), _patch_sleep(): + await transceiver.connect() + assert transceiver._health_check_task is not None + await transceiver.disconnect() + + assert transceiver._health_check_task is None + + +async def test_disconnect_idempotent( + transceiver: RX11Transceiver, +) -> None: + """Test disconnect can safely be called multiple times without error.""" + await transceiver.disconnect() + await transceiver.disconnect() + + assert transceiver.is_connected is False + + +# ── dispose ─────────────────────────────────────────────────────────────────── + + +async def test_dispose_sets_disposed_flag_and_disconnects( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test dispose marks the transceiver as disposed and disconnects device.""" + with _patch_device(mock_device), _patch_sleep(): + await transceiver.connect() + + await transceiver.dispose() + + assert transceiver._disposed is True + assert transceiver.is_connected is False + mock_device.disconnect.assert_awaited() + + +async def test_dispose_blocks_future_connect( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test connect returns False after dispose.""" + await transceiver.dispose() + + with _patch_device(mock_device): + result = await transceiver.connect() + + assert result is False + mock_device.connect.assert_not_called() + + +async def test_dispose_blocks_future_reconnect( + transceiver: RX11Transceiver, +) -> None: + """Test reconnect returns False after dispose.""" + await transceiver.dispose() + + result = await transceiver.reconnect() + + assert result is False + + +async def test_dispose_idempotent( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test second dispose call is a no-op — device disconnect is not called again.""" + with _patch_device(mock_device), _patch_sleep(): + await transceiver.connect() + + await transceiver.dispose() + disconnect_count = mock_device.disconnect.await_count + + await transceiver.dispose() + + assert mock_device.disconnect.await_count == disconnect_count + + +# ── reconnect ───────────────────────────────────────────────────────────────── + + +async def test_reconnect_success( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test reconnect disconnects and re-connects successfully.""" + with _patch_device(mock_device), _patch_sleep(): + result = await transceiver.reconnect() + await transceiver.disconnect() + + assert result is True + assert transceiver._reconnect_attempts == 0 + + +async def test_reconnect_when_disposed( + transceiver: RX11Transceiver, +) -> None: + """Test reconnect returns False immediately when disposed.""" + transceiver._disposed = True + + result = await transceiver.reconnect() + + assert result is False + + +async def test_reconnect_when_connect_fails( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test reconnect returns False and increments attempt counter on failure.""" + mock_device.connect = AsyncMock(return_value=False) + + with _patch_device(mock_device), _patch_sleep(): + result = await transceiver.reconnect() + + assert result is False + assert transceiver._reconnect_attempts == 1 + + +# ── health check ────────────────────────────────────────────────────────────── + + +async def test_health_check_started_after_connect( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test a health check task is started after a successful connect.""" + with _patch_device(mock_device), _patch_sleep(): + await transceiver.connect() + assert transceiver._health_check_task is not None + await transceiver.disconnect() + + +async def test_health_check_ping_success( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test health check loop exits cleanly when stopping is requested after one ping.""" + transceiver._device = mock_device + transceiver.is_connected = True + + call_count = 0 + + async def one_shot_sleep(_: float) -> None: + nonlocal call_count + call_count += 1 + if call_count >= 1: + transceiver._health_check_stopping = True + + with patch("asyncio.sleep", side_effect=one_shot_sleep): + await transceiver._health_check_loop() + + mock_device.ping_request.assert_awaited_once() + assert transceiver.is_connected is True + + +async def test_health_check_three_failures_trigger_disconnect( + hass: HomeAssistant, + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test health check triggers disconnect after 3 consecutive ping failures.""" + mock_device.ping_request = AsyncMock(return_value=False) + disconnect_callback = MagicMock() + transceiver.set_disconnect_callback(disconnect_callback) + transceiver._device = mock_device + transceiver.is_connected = True + + with _patch_sleep(): + await transceiver._health_check_loop() + + await hass.async_block_till_done() + + disconnect_callback.assert_called_once() + assert mock_device.ping_request.await_count == 3 + + +async def test_health_check_resets_counter_on_recovery( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test health check failure counter resets after a successful ping (no disconnect).""" + call_count = 0 + + async def alternating_ping() -> bool: + nonlocal call_count + call_count += 1 + if call_count < 3: + return False # Two consecutive failures + transceiver._health_check_stopping = True + return True # Recovery — counter resets + + mock_device.ping_request = AsyncMock(side_effect=alternating_ping) + transceiver._device = mock_device + transceiver.is_connected = True + + with _patch_sleep(): + await transceiver._health_check_loop() + + # Two consecutive failures (< 3) must not trigger disconnect + assert transceiver.is_connected is True + + +# ── device callbacks ────────────────────────────────────────────────────────── + + +async def test_on_device_disconnect_notifies_callback( + hass: HomeAssistant, + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test library disconnect callback notifies registered listeners.""" + callback = MagicMock() + transceiver.set_disconnect_callback(callback) + transceiver._device = mock_device + transceiver.is_connected = True + + transceiver._on_device_disconnect() + await hass.async_block_till_done() + + callback.assert_called_once() + + +async def test_on_device_reconnect_logs_info( + transceiver: RX11Transceiver, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test library reconnect callback logs an info-level message.""" + with caplog.at_level( + logging.INFO, + logger="homeassistant.components.easywave.transceiver", + ): + transceiver._on_device_reconnect() + + assert "reconnect" in caplog.text.lower() + + +# ── version fetch ───────────────────────────────────────────────────────────── + + +async def test_ensure_versions_fetched_success( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test _ensure_versions_fetched parses hardware and firmware version strings.""" + transceiver._device = mock_device + + result = await transceiver._ensure_versions_fetched() + + assert result is True + assert transceiver.hw_version == "RX11 v1.0" + assert transceiver.fw_version == "2.5" + + +async def test_ensure_versions_fetched_no_device( + transceiver: RX11Transceiver, +) -> None: + """Test _ensure_versions_fetched returns False when no device is available.""" + result = await transceiver._ensure_versions_fetched() + + assert result is False + + +async def test_ensure_versions_fetched_incomplete_firmware( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test _ensure_versions_fetched appends '(incomplete)' for incomplete firmware.""" + mock_device.query_fw_version = AsyncMock(return_value=_FW_INCOMPLETE) + transceiver._device = mock_device + + result = await transceiver._ensure_versions_fetched() + + assert result is True + assert transceiver.fw_version is not None + assert "incomplete" in transceiver.fw_version + + +async def test_ensure_versions_fetched_all_queries_fail( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test _ensure_versions_fetched returns False when all queries fail.""" + mock_device.query_hw_version = AsyncMock(return_value=_HW_FAIL) + mock_device.query_fw_version = AsyncMock(return_value=_FW_FAIL) + transceiver._device = mock_device + + with _patch_sleep(): + result = await transceiver._ensure_versions_fetched() + + assert result is False + + +# ── notify_disconnect error handling ───────────────────────────────────────── + + +async def test_notify_disconnect_callback_raises_oserror( + transceiver: RX11Transceiver, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test _notify_disconnect logs an error when the callback raises OSError.""" + transceiver.set_disconnect_callback(MagicMock(side_effect=OSError("fail"))) + + with caplog.at_level( + logging.ERROR, + logger="homeassistant.components.easywave.transceiver", + ): + transceiver._notify_disconnect() + + assert "Error in disconnect callback" in caplog.text + + +async def test_notify_disconnect_callback_raises_runtime_error( + transceiver: RX11Transceiver, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test _notify_disconnect logs an error when the callback raises RuntimeError.""" + transceiver.set_disconnect_callback(MagicMock(side_effect=RuntimeError("boom"))) + + with caplog.at_level( + logging.ERROR, + logger="homeassistant.components.easywave.transceiver", + ): + transceiver._notify_disconnect() + + assert "Error in disconnect callback" in caplog.text + + +# ── connect / VID-PID path failure branches ─────────────────────────────────── + + +async def test_connect_vid_pid_try_connect_fails( + hass: HomeAssistant, + mock_device: MagicMock, +) -> None: + """Test connect returns False when _try_connect_to_path fails after VID/PID discovery.""" + transceiver = RX11Transceiver(hass) + mock_port = MagicMock() + mock_port.vid = 0x155A + mock_port.pid = 0x1014 + mock_port.device = DEVICE_PATH + mock_port.serial_number = "SN-99" + + mock_device.connect = AsyncMock(return_value=False) + + with ( + _patch_device(mock_device), + _patch_sleep(), + patch( + "homeassistant.components.easywave.transceiver.serial.tools.list_ports.comports", + return_value=[mock_port], + ), + patch( + "homeassistant.components.easywave.transceiver.serial.Serial", + ), + ): + result = await transceiver.connect() + + assert result is False + assert transceiver.is_connected is False + + +async def test_connect_serial_error_during_vid_pid_search( + hass: HomeAssistant, +) -> None: + """Test connect returns False when comports() raises inside _find_usb_device.""" + + transceiver = RX11Transceiver(hass) # no device_path → goes to VID/PID search + + with patch( + "homeassistant.components.easywave.transceiver.serial.tools.list_ports.comports", + side_effect=serial.SerialException("port scan failed"), + ): + result = await transceiver.connect() + + assert result is False + + +# ── _try_connect_to_path / serial error ────────────────────────────────────── + + +async def test_try_connect_device_constructor_returns_none( + transceiver: RX11Transceiver, +) -> None: + """Test _try_connect_to_path returns False when device constructor returns None.""" + with patch( + "homeassistant.components.easywave.transceiver.RX11Device", + return_value=None, + ): + result = await transceiver._try_connect_to_path(DEVICE_PATH) + + assert result is False + + +async def test_try_connect_serial_error_on_connect( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test _try_connect_to_path returns False when library connect() raises SerialException.""" + + mock_device.connect = AsyncMock(side_effect=serial.SerialException("port error")) + + with _patch_device(mock_device): + result = await transceiver._try_connect_to_path(DEVICE_PATH) + + assert result is False + mock_device.disconnect.assert_awaited_once() + + +# ── _refresh_usb_identity ───────────────────────────────────────────────────── + + +async def test_refresh_usb_identity_no_device_path( + hass: HomeAssistant, +) -> None: + """Test _refresh_usb_identity returns immediately when no device_path is set.""" + transceiver = RX11Transceiver(hass) + assert transceiver.device_path is None + + # Should not raise + await transceiver._refresh_usb_identity() + + assert transceiver.usb_serial_number is None + + +async def test_refresh_usb_identity_port_found( + transceiver: RX11Transceiver, +) -> None: + """Test _refresh_usb_identity updates usb_serial_number from the port.""" + transceiver.usb_serial_number = None + mock_port = MagicMock() + mock_port.device = DEVICE_PATH + mock_port.serial_number = "SN-NEW" + + with patch( + "homeassistant.components.easywave.transceiver.serial.tools.list_ports.comports", + return_value=[mock_port], + ): + await transceiver._refresh_usb_identity() + + assert transceiver.usb_serial_number == "SN-NEW" + + +async def test_refresh_usb_identity_device_swap_detected( + transceiver: RX11Transceiver, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test _refresh_usb_identity detects a device swap and clears version cache.""" + transceiver.usb_serial_number = "OLD-SN" + transceiver.hw_version = "1.0" + transceiver.fw_version = "2.0" + mock_port = MagicMock() + mock_port.device = DEVICE_PATH + mock_port.serial_number = "NEW-SN" + + with ( + caplog.at_level( + logging.INFO, + logger="homeassistant.components.easywave.transceiver", + ), + patch( + "homeassistant.components.easywave.transceiver.serial.tools.list_ports.comports", + return_value=[mock_port], + ), + ): + await transceiver._refresh_usb_identity() + + assert transceiver.usb_serial_number == "NEW-SN" + assert transceiver.hw_version is None + assert transceiver.fw_version is None + assert "swap" in caplog.text.lower() + + +async def test_refresh_usb_identity_serial_error( + transceiver: RX11Transceiver, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test _refresh_usb_identity handles SerialException gracefully.""" + + with ( + caplog.at_level( + logging.DEBUG, + logger="homeassistant.components.easywave.transceiver", + ), + patch( + "homeassistant.components.easywave.transceiver.serial.tools.list_ports.comports", + side_effect=serial.SerialException("read error"), + ), + ): + await transceiver._refresh_usb_identity() + + assert "Could not refresh USB identity" in caplog.text + + +# ── _start_health_check / already running ──────────────────────────────────── + + +async def test_start_health_check_already_running( + transceiver: RX11Transceiver, +) -> None: + """Test _start_health_check is a no-op when a task is already running.""" + sentinel = MagicMock() + transceiver._health_check_task = sentinel + + await transceiver._start_health_check() + + assert transceiver._health_check_task is sentinel + + +# ── health check / edge cases ───────────────────────────────────────────────── + + +async def test_health_check_skips_when_not_connected( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test health check loop skips ping when transceiver is marked disconnected.""" + transceiver._device = mock_device + transceiver.is_connected = False # already offline + + call_count = 0 + + async def one_shot_sleep(_: float) -> None: + nonlocal call_count + call_count += 1 + transceiver._health_check_stopping = True + + with patch("asyncio.sleep", side_effect=one_shot_sleep): + await transceiver._health_check_loop() + + mock_device.ping_request.assert_not_called() + + +async def test_health_check_ping_raises_serial_exception( + hass: HomeAssistant, + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test health check treats ping SerialException as a failure (not a crash).""" + + ping_count = 0 + + async def raise_then_stop() -> bool: + nonlocal ping_count + ping_count += 1 + if ping_count < 3: + raise serial.SerialException("port gone") + # On 3rd call stop the loop; still counts as failure + transceiver._health_check_stopping = True + raise serial.SerialException("port gone") + + mock_device.ping_request = AsyncMock(side_effect=raise_then_stop) + transceiver._device = mock_device + transceiver.is_connected = True + + with _patch_sleep(): + await transceiver._health_check_loop() + + await hass.async_block_till_done() + + +async def test_health_check_outer_serial_exception( + transceiver: RX11Transceiver, + mock_device: MagicMock, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test health check recovers from an unexpected serial error via outer handler.""" + + call_count = 0 + + async def sleep_with_exception(_: float) -> None: + nonlocal call_count + call_count += 1 + if call_count == 1: + raise serial.SerialException("unexpected error") + transceiver._health_check_stopping = True + + transceiver._device = mock_device + transceiver.is_connected = True + + with ( + caplog.at_level( + logging.DEBUG, + logger="homeassistant.components.easywave.transceiver", + ), + patch("asyncio.sleep", side_effect=sleep_with_exception), + ): + await transceiver._health_check_loop() + + assert "Error in health check" in caplog.text + + +# ── _handle_device_disconnect / already offline ─────────────────────────────── + + +async def test_handle_device_disconnect_already_offline( + transceiver: RX11Transceiver, +) -> None: + """Test _handle_device_disconnect is a no-op when already marked offline.""" + transceiver.is_connected = False + callback = MagicMock() + transceiver.set_disconnect_callback(callback) + + await transceiver._handle_device_disconnect() + + callback.assert_not_called() + + +# ── version fetch / exception paths ────────────────────────────────────────── + + +async def test_ensure_versions_fetched_hw_exception_then_success( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test _ensure_versions_fetched retries hw query after SerialException.""" + + call_count = 0 + + async def hw_fail_once(**kwargs: object) -> tuple: + nonlocal call_count + call_count += 1 + if call_count == 1: + raise serial.SerialException("timeout") + return _HW_OK + + mock_device.query_hw_version = AsyncMock(side_effect=hw_fail_once) + transceiver._device = mock_device + + with _patch_sleep(): + result = await transceiver._ensure_versions_fetched() + + assert result is True + assert transceiver.hw_version == "RX11 v1.0" + + +async def test_ensure_versions_fetched_fw_exception_then_success( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test _ensure_versions_fetched retries fw query after SerialException.""" + + call_count = 0 + + async def fw_fail_once(**kwargs: object) -> tuple: + nonlocal call_count + call_count += 1 + if call_count == 1: + raise serial.SerialException("timeout") + return _FW_OK + + mock_device.query_fw_version = AsyncMock(side_effect=fw_fail_once) + transceiver._device = mock_device + + with _patch_sleep(): + result = await transceiver._ensure_versions_fetched() + + assert result is True + assert transceiver.fw_version == "2.5" + + +# ── _test_serial_port / serial error ───────────────────────────────────────── + + +def test_test_serial_port_serial_exception( + transceiver: RX11Transceiver, +) -> None: + """Test _test_serial_port returns False when SerialException is raised.""" + + with patch( + "homeassistant.components.easywave.transceiver.serial.Serial", + side_effect=serial.SerialException("port busy"), + ): + result = transceiver._test_serial_port(DEVICE_PATH) + + assert result is False + + +# ── reconnect / delay and error paths ──────────────────────────────────────── + + +async def test_reconnect_applies_delay_after_recent_disconnect( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test reconnect waits for device reset when recently disconnected.""" + + # Simulate a very recent disconnect (0.1 seconds ago) + transceiver._last_disconnect_time = time.time() - 0.1 + + sleeps: list[float] = [] + + async def _fast_sleep_capture(delay: float) -> None: + sleeps.append(delay) + await _real_sleep(0) + + with _patch_device(mock_device), patch("asyncio.sleep", new=_fast_sleep_capture): + result = await transceiver.reconnect() + await transceiver.disconnect() + + assert result is True + # First sleep should be the reconnect delay (close to 0.9s) + assert sleeps[0] > 0.5 + + +async def test_reconnect_serial_error_returns_false( + transceiver: RX11Transceiver, + mock_device: MagicMock, +) -> None: + """Test reconnect returns False when a serial error occurs during disconnect.""" + + with patch.object( + transceiver, + "disconnect", + side_effect=serial.SerialException("port gone"), + ): + result = await transceiver.reconnect() + + assert result is False