From 7cfbdc4f2a0860957b84579bd1ff25c6b1a4e637 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 21:45:46 +0000 Subject: [PATCH] Add radio_frequency platform to Broadlink MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Exposes one radio_frequency.RadioFrequencyTransmitterEntity per RF-capable Broadlink device (RM Pro / RM4 Pro). The entity accepts a RadioFrequencyCommand, encodes its raw OOK timings into Broadlink's pulse-length wire format (type byte 0xB2 for the 433 MHz ISM band or 0xB4 for the 315 MHz band), and transmits via the existing Broadlink client's send_data() method. Non-OOK modulation and out-of-band frequencies are rejected with a HomeAssistantError before any bytes are sent. symbol_rate and output_power on the command are advisory and ignored — the Broadlink RF front-end is fixed-rate OOK-only hardware. The existing remote.send_command path is unchanged. --- homeassistant/components/broadlink/const.py | 1 + .../components/broadlink/radio_frequency.py | 139 ++++++++ .../components/broadlink/strings.json | 11 + .../broadlink/test_radio_frequency.py | 315 ++++++++++++++++++ 4 files changed, 466 insertions(+) create mode 100644 homeassistant/components/broadlink/radio_frequency.py create mode 100644 tests/components/broadlink/test_radio_frequency.py diff --git a/homeassistant/components/broadlink/const.py b/homeassistant/components/broadlink/const.py index 602a3693b7b355..e5865168fdf30b 100644 --- a/homeassistant/components/broadlink/const.py +++ b/homeassistant/components/broadlink/const.py @@ -7,6 +7,7 @@ DOMAINS_AND_TYPES = { Platform.CLIMATE: {"HYS"}, Platform.LIGHT: {"LB1", "LB2"}, + Platform.RADIO_FREQUENCY: {"RM4PRO", "RMPRO"}, Platform.REMOTE: {"RM4MINI", "RM4PRO", "RMMINI", "RMMINIB", "RMPRO"}, Platform.SELECT: {"HYS"}, Platform.SENSOR: { diff --git a/homeassistant/components/broadlink/radio_frequency.py b/homeassistant/components/broadlink/radio_frequency.py new file mode 100644 index 00000000000000..aefbb549fb36a7 --- /dev/null +++ b/homeassistant/components/broadlink/radio_frequency.py @@ -0,0 +1,139 @@ +"""Radio Frequency platform for Broadlink.""" + +from __future__ import annotations + +import logging + +from broadlink.exceptions import BroadlinkException +from rf_protocols import ModulationType, RadioFrequencyCommand + +from homeassistant.components.radio_frequency import RadioFrequencyTransmitterEntity +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant +from homeassistant.exceptions import HomeAssistantError +from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback + +from .const import DOMAIN +from .device import BroadlinkDevice +from .entity import BroadlinkEntity + +_LOGGER = logging.getLogger(__name__) + +PARALLEL_UPDATES = 0 + +_TICK_US = 32.84 + +_RF_433_TYPE_BYTE = 0xB2 +_RF_315_TYPE_BYTE = 0xB4 + +_RF_433_RANGE = (433_050_000, 434_790_000) +_RF_315_RANGE = (314_950_000, 315_250_000) + +SUPPORTED_FREQUENCY_RANGES: list[tuple[int, int]] = [_RF_433_RANGE, _RF_315_RANGE] + + +def _type_byte_for_frequency(frequency: int) -> int: + """Return the Broadlink RF type byte for a given carrier frequency.""" + if _RF_433_RANGE[0] <= frequency <= _RF_433_RANGE[1]: + return _RF_433_TYPE_BYTE + if _RF_315_RANGE[0] <= frequency <= _RF_315_RANGE[1]: + return _RF_315_TYPE_BYTE + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="frequency_not_supported", + translation_placeholders={"frequency": str(frequency)}, + ) + + +def encode_rf_packet( + *, + type_byte: int, + repeat_count: int, + timings_us: list[int], +) -> bytes: + """Encode raw OOK timings as a Broadlink RF pulse-length packet. + + The layout is:: + + byte 0 type byte (0xB2 for 433 MHz, 0xB4 for 315 MHz) + byte 1 repeat count (additional transmissions after the first) + bytes 2..3 payload length (little-endian), counted from byte 4 + bytes 4..N-1 pulses: 1 byte when ticks < 256, otherwise + 0x00 followed by a 2-byte big-endian tick count + + Each pulse is expressed as multiples of 32.84 µs ticks, which is the + timing resolution of the Broadlink RF front-end. + """ + buf = bytearray([type_byte, repeat_count & 0xFF, 0, 0]) + for duration in timings_us: + ticks = round(abs(duration) / _TICK_US) + div, mod = divmod(ticks, 256) + if div: + buf.append(0x00) + buf.append(div) + buf.append(mod) + payload_len = len(buf) - 4 + buf[2] = payload_len & 0xFF + buf[3] = (payload_len >> 8) & 0xFF + return bytes(buf) + + +async def async_setup_entry( + hass: HomeAssistant, + config_entry: ConfigEntry, + async_add_entities: AddConfigEntryEntitiesCallback, +) -> None: + """Set up a Broadlink radio frequency transmitter.""" + # Uses legacy hass.data[DOMAIN] pattern + # pylint: disable-next=hass-use-runtime-data + device: BroadlinkDevice = hass.data[DOMAIN].devices[config_entry.entry_id] + async_add_entities([BroadlinkRadioFrequency(device)]) + + +class BroadlinkRadioFrequency(BroadlinkEntity, RadioFrequencyTransmitterEntity): + """Representation of a Broadlink RF transmitter.""" + + _attr_has_entity_name = True + _attr_name = None + + def __init__(self, device: BroadlinkDevice) -> None: + """Initialize the entity.""" + super().__init__(device) + self._attr_unique_id = device.unique_id + + @property + def supported_frequency_ranges(self) -> list[tuple[int, int]]: + """Return the Broadlink-supported narrow RF bands.""" + return SUPPORTED_FREQUENCY_RANGES + + async def async_send_command(self, command: RadioFrequencyCommand) -> None: + """Encode an OOK command and transmit it via the Broadlink device.""" + if command.modulation is not ModulationType.OOK: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="modulation_not_supported", + translation_placeholders={"modulation": str(command.modulation)}, + ) + + type_byte = _type_byte_for_frequency(command.frequency) + packet = encode_rf_packet( + type_byte=type_byte, + repeat_count=command.repeat_count, + timings_us=command.get_raw_timings(), + ) + _LOGGER.debug( + "Transmitting RF packet: %d bytes on %d Hz (repeat=%d)", + len(packet), + command.frequency, + command.repeat_count, + ) + + device = self._device + try: + await device.async_request(device.api.send_data, packet) + except (BroadlinkException, OSError) as err: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="transmit_failed", + translation_placeholders={"error": str(err)}, + ) from err diff --git a/homeassistant/components/broadlink/strings.json b/homeassistant/components/broadlink/strings.json index a019f350ec066d..19ec812e144cf2 100644 --- a/homeassistant/components/broadlink/strings.json +++ b/homeassistant/components/broadlink/strings.json @@ -48,6 +48,17 @@ } } }, + "exceptions": { + "frequency_not_supported": { + "message": "Broadlink devices cannot transmit on {frequency} Hz" + }, + "modulation_not_supported": { + "message": "Broadlink devices only support OOK modulation, got {modulation}" + }, + "transmit_failed": { + "message": "Failed to transmit RF command: {error}" + } + }, "entity": { "select": { "day_of_week": { diff --git a/tests/components/broadlink/test_radio_frequency.py b/tests/components/broadlink/test_radio_frequency.py new file mode 100644 index 00000000000000..33579fce411e54 --- /dev/null +++ b/tests/components/broadlink/test_radio_frequency.py @@ -0,0 +1,315 @@ +"""Tests for the Broadlink radio_frequency platform.""" + +from __future__ import annotations + +from base64 import b64decode +from unittest.mock import MagicMock, call + +from broadlink.exceptions import BroadlinkException +import pytest +from rf_protocols import OOKCommand + +from homeassistant.components import radio_frequency +from homeassistant.components.broadlink.const import DOMAIN +from homeassistant.components.broadlink.radio_frequency import ( + _RF_315_TYPE_BYTE, + _RF_433_TYPE_BYTE, + _TICK_US, + encode_rf_packet, +) +from homeassistant.const import STATE_UNAVAILABLE, Platform +from homeassistant.core import HomeAssistant +from homeassistant.exceptions import HomeAssistantError +from homeassistant.helpers import device_registry as dr, entity_registry as er + +from . import get_device + +# Real-world learned captures used for the encoder round-trip. Each capture +# starts with the 0xB1 0xC0 learning wrapper, followed by sweep telemetry, +# followed by the actual OOK pulse bytes. The declared payload length field +# ends just inside the capture, so we slice to that length for the round trip. +_PLUS_CAPTURE_B64 = ( + "scDSAACfBgAEAAQDBAAC4PO/DA0WGQsNFxgMDBcYDA0WGQsNFhkLDRcZCwACAAsN" + "FxcNDBcYDAwXGAwMFxkLDRcYDAwXGAsAAgELDRcYDAwYGAsMGBcNDBcYCw0XGAwM" + "FxkLAAIADAwYFwwMFxgMDRcYCwwYGAwMFxgMDBgXDAACAAwNFhgMDBgYDAsYGAwM" + "FxgLDhYZCwwYGAwAAgALDBgXDQwWGQsNFxgMDBcZCwwYFwwNFxgLAAIADA0WGQsM" + "GBgMDBcXDQwYFwwNFhkLDBgYCwAF3A==" +) +_PLUS_PULSE_START = 18 # end of 14-byte sweep telemetry (00 9f 06 00 ... f3 bf) + +_MINUS_CAPTURE_B64 = ( + "scCcAACfBgAGAAFuBC4EUQQ59L8MDBgXDA0WGQsNFxkLDBcYDA0WGAwYCw4WAAIA" + "DAwYFw0MFhgMDRcYDAwYFwwMGBcNFwsOFgACAAwMGBcMDBgXDA0XGAsNFxgMDBgX" + "DBgMDBcAAgAMDBcZCwwYFw0MFhkLDRcYDAwXGAwMGBcNFwsOFgACAAwMGBcMDBgX" + "DA0XGAsNFxgMDBgXDBgMDBcAAgAMDBcZCwwYFw0MFhkLDRcYDAwXGAwMGBcXDQwX" + "AAIADAwXGAwMGBcNCxgXDQwWGQsNFxgMGAsNFwAF3A==" +) +_MINUS_PULSE_START = 20 # end of 16-byte sweep telemetry (00 9f 06 00 ... f4 bf) + +_RF_DEVICES = ["Office", "Garage"] # RMPRO / RM4PRO +_NON_RF_DEVICES = ["Entrance", "Living Room"] # RMMINI / RMMINIB + +_FREQ_433 = 433_920_000 +_FREQ_315 = 315_000_000 + + +def _ticks_to_timings(ticks: list[int]) -> list[int]: + """Convert Broadlink ticks to signed alternating microseconds. + + Even indices are marks (positive), odd indices are spaces (negative), + matching the rf_protocols raw-timings convention. + """ + return [ + round(value * _TICK_US) * (1 if index % 2 == 0 else -1) + for index, value in enumerate(ticks) + ] + + +def _decode_pulse_bytes(pulse_bytes: bytes) -> list[int]: + """Parse a slice of Broadlink pulse bytes into tick counts.""" + result: list[int] = [] + index = 0 + while index < len(pulse_bytes): + value = pulse_bytes[index] + index += 1 + if value == 0: + value = (pulse_bytes[index] << 8) | pulse_bytes[index + 1] + index += 2 + result.append(value) + return result + + +def _extract_capture(b64: str, pulse_start: int) -> tuple[int, bytes, list[int]]: + """Return (repeat_count, pulse_bytes, timings_us) from a learned capture.""" + raw = b64decode(b64) + repeat_count = raw[1] + payload_len = raw[2] | (raw[3] << 8) + pulse_bytes = raw[pulse_start : 4 + payload_len] + ticks = _decode_pulse_bytes(pulse_bytes) + return repeat_count, pulse_bytes, _ticks_to_timings(ticks) + + +async def test_radio_frequency_setup_for_rf_devices( + hass: HomeAssistant, + device_registry: dr.DeviceRegistry, + entity_registry: er.EntityRegistry, +) -> None: + """Test that RF-capable Broadlink devices get a radio_frequency entity.""" + for device in map(get_device, _RF_DEVICES): + mock_setup = await device.setup_entry(hass) + + device_entry = device_registry.async_get_device( + identifiers={(DOMAIN, mock_setup.entry.unique_id)} + ) + entries = er.async_entries_for_device(entity_registry, device_entry.id) + rf_entities = [ + entry for entry in entries if entry.domain == Platform.RADIO_FREQUENCY + ] + assert len(rf_entities) == 1 + assert rf_entities[0].unique_id == device.mac + + +async def test_radio_frequency_not_registered_for_non_rf_devices( + hass: HomeAssistant, + device_registry: dr.DeviceRegistry, + entity_registry: er.EntityRegistry, +) -> None: + """Test that non-RF Broadlink devices don't get a radio_frequency entity.""" + for device in map(get_device, _NON_RF_DEVICES): + mock_setup = await device.setup_entry(hass) + + device_entry = device_registry.async_get_device( + identifiers={(DOMAIN, mock_setup.entry.unique_id)} + ) + entries = er.async_entries_for_device(entity_registry, device_entry.id) + rf_entities = [ + entry for entry in entries if entry.domain == Platform.RADIO_FREQUENCY + ] + assert rf_entities == [] + + +async def test_encoder_matches_simple_fixture() -> None: + """Test the encoder against a hand-crafted packet with escape-encoded pulses.""" + # A short, hand-crafted OOK train: one small pulse (12 ticks = 394 µs), + # one escape-encoded pulse (300 ticks = 9852 µs), then another small one. + # Ticks: 12, 300, 12 -> bytes: 0x0c, 0x00 0x01 0x2c, 0x0c -> 5 bytes payload. + timings = [round(12 * _TICK_US), round(300 * _TICK_US), round(12 * _TICK_US)] + + packet = encode_rf_packet( + type_byte=_RF_433_TYPE_BYTE, + repeat_count=3, + timings_us=timings, + ) + + assert packet == bytes([0xB2, 0x03, 0x05, 0x00, 0x0C, 0x00, 0x01, 0x2C, 0x0C]) + + +@pytest.mark.parametrize( + ("b64", "pulse_start"), + [ + (_PLUS_CAPTURE_B64, _PLUS_PULSE_START), + (_MINUS_CAPTURE_B64, _MINUS_PULSE_START), + ], + ids=["plus", "minus"], +) +async def test_encoder_round_trips_real_capture(b64: str, pulse_start: int) -> None: + """Re-encoded transmit payload is byte-identical to the captured pulses. + + The captures start with a 0xB1 0xC0 learning wrapper plus sweep telemetry. + Strip the telemetry, decode the pulse bytes, feed the resulting timings + to the encoder with a 0xB2 transmit type byte, and assert the output + equals the original header + pulses slice. + """ + repeat_count, pulse_bytes, timings = _extract_capture(b64, pulse_start) + + encoded = encode_rf_packet( + type_byte=_RF_433_TYPE_BYTE, + repeat_count=repeat_count, + timings_us=timings, + ) + + expected = ( + bytes( + [ + _RF_433_TYPE_BYTE, + repeat_count, + len(pulse_bytes) & 0xFF, + (len(pulse_bytes) >> 8) & 0xFF, + ] + ) + + pulse_bytes + ) + assert encoded == expected + + +async def test_encoder_uses_315_mhz_type_byte() -> None: + """A 315 MHz-band encoded packet uses type byte 0xB4.""" + packet = encode_rf_packet( + type_byte=_RF_315_TYPE_BYTE, + repeat_count=0, + timings_us=[round(12 * _TICK_US), round(12 * _TICK_US)], + ) + assert packet[0] == 0xB4 + + +async def _setup_rf_device(hass: HomeAssistant) -> tuple[MagicMock, str]: + """Set up a single RF-capable Broadlink device, return its api mock and entity_id.""" + device = get_device("Office") # RMPRO + mock_setup = await device.setup_entry(hass) + entity_registry = er.async_get(hass) + device_registry = dr.async_get(hass) + device_entry = device_registry.async_get_device( + identifiers={(DOMAIN, mock_setup.entry.unique_id)} + ) + entries = er.async_entries_for_device(entity_registry, device_entry.id) + rf_entity = next( + entry for entry in entries if entry.domain == Platform.RADIO_FREQUENCY + ) + return mock_setup.api, rf_entity.entity_id + + +async def test_async_send_command_transmits_once(hass: HomeAssistant) -> None: + """Sending an OOK command invokes send_data once with the encoded packet.""" + api, entity_id = await _setup_rf_device(hass) + + timings = [400, -800, 400, -800] + command = OOKCommand(frequency=_FREQ_433, timings=timings) + await radio_frequency.async_send_command(hass, entity_id, command) + + expected_packet = encode_rf_packet( + type_byte=_RF_433_TYPE_BYTE, + repeat_count=0, + timings_us=timings, + ) + assert api.send_data.call_count == 1 + assert api.send_data.call_args == call(expected_packet) + + +async def test_async_send_command_uses_315_band(hass: HomeAssistant) -> None: + """A command on the 315 MHz band produces a packet with type byte 0xB4.""" + api, entity_id = await _setup_rf_device(hass) + + command = OOKCommand(frequency=_FREQ_315, timings=[400, -800]) + await radio_frequency.async_send_command(hass, entity_id, command) + + assert api.send_data.call_count == 1 + sent_packet = api.send_data.call_args.args[0] + assert sent_packet[0] == _RF_315_TYPE_BYTE + + +async def test_async_send_command_rejects_unsupported_frequency( + hass: HomeAssistant, +) -> None: + """A command outside the two Broadlink bands is rejected before send.""" + api, entity_id = await _setup_rf_device(hass) + + command = OOKCommand(frequency=868_000_000, timings=[400, -800]) + with pytest.raises(HomeAssistantError): + await radio_frequency.async_send_command(hass, entity_id, command) + + assert api.send_data.call_count == 0 + + +async def test_async_send_command_rejects_non_ook_modulation( + hass: HomeAssistant, +) -> None: + """A non-OOK command is rejected before send. + + The public domain ``async_send_command`` short-circuits via + ``supports_modulation`` before dispatching to the entity, so a + synthetic non-OOK command still raises ``HomeAssistantError`` and + ``send_data`` is never called. + """ + api, entity_id = await _setup_rf_device(hass) + + command = OOKCommand(frequency=_FREQ_433, timings=[400, -800]) + # Bypass the StrEnum to simulate a hypothetical future modulation. + command.modulation = "FSK" + + with pytest.raises(HomeAssistantError): + await radio_frequency.async_send_command(hass, entity_id, command) + + assert api.send_data.call_count == 0 + + +async def test_async_send_command_transmit_failure_raises( + hass: HomeAssistant, +) -> None: + """A broadlink exception from send_data surfaces as HomeAssistantError.""" + api, entity_id = await _setup_rf_device(hass) + + api.send_data.side_effect = BroadlinkException("nope") + + command = OOKCommand(frequency=_FREQ_433, timings=[400, -800]) + with pytest.raises(HomeAssistantError): + await radio_frequency.async_send_command(hass, entity_id, command) + + +async def test_radio_frequency_entity_availability(hass: HomeAssistant) -> None: + """The entity is unavailable when the underlying device is unavailable.""" + device = get_device("Office") + mock_setup = await device.setup_entry(hass) + + entity_registry = er.async_get(hass) + device_registry = dr.async_get(hass) + device_entry = device_registry.async_get_device( + identifiers={(DOMAIN, mock_setup.entry.unique_id)} + ) + entries = er.async_entries_for_device(entity_registry, device_entry.id) + rf_entity = next( + entry for entry in entries if entry.domain == Platform.RADIO_FREQUENCY + ) + + state = hass.states.get(rf_entity.entity_id) + assert state is not None + assert state.state != STATE_UNAVAILABLE + + # Force the coordinator into a failed state and trigger a refresh. + broadlink_device = hass.data[DOMAIN].devices[mock_setup.entry.entry_id] + broadlink_device.update_manager.available = False + broadlink_device.update_manager.coordinator.async_update_listeners() + await hass.async_block_till_done() + + state = hass.states.get(rf_entity.entity_id) + assert state is not None + assert state.state == STATE_UNAVAILABLE