Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CODEOWNERS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions homeassistant/components/specialized_turbo/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Specialized Turbo BLE integration for Home Assistant."""

from __future__ import annotations

from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_ADDRESS, Platform
from homeassistant.core import HomeAssistant

from .const import CONF_PIN
from .coordinator import SpecializedTurboCoordinator

PLATFORMS: list[Platform] = [Platform.SENSOR]

type SpecializedTurboConfigEntry = ConfigEntry[SpecializedTurboCoordinator]


async def async_setup_entry(
hass: HomeAssistant, entry: SpecializedTurboConfigEntry
) -> bool:
"""Set up Specialized Turbo from a config entry."""
address: str = entry.data[CONF_ADDRESS]
pin: str | None = entry.data.get(CONF_PIN)

coordinator = SpecializedTurboCoordinator(
hass,
address=address,
pin=pin,
)

entry.runtime_data = coordinator

await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

entry.async_on_unload(coordinator.async_start())

return True


async def async_unload_entry(
hass: HomeAssistant, entry: SpecializedTurboConfigEntry
) -> bool:
"""Unload a Specialized Turbo config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
await entry.runtime_data.async_shutdown()
return unload_ok
165 changes: 165 additions & 0 deletions homeassistant/components/specialized_turbo/config_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
"""Config flow for Specialized Turbo integration."""

from __future__ import annotations

from typing import Any

from bleak import BleakClient
from bleak.exc import BleakError
from bleak_retry_connector import establish_connection
from specialized_turbo import is_specialized_advertisement
import voluptuous as vol

from homeassistant.components.bluetooth import (
BluetoothServiceInfoBleak,
async_ble_device_from_address,
async_discovered_service_info,
)
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_ADDRESS
from homeassistant.helpers.device_registry import format_mac

from .const import CONF_PIN, DOMAIN


class SpecializedTurboConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Specialized Turbo bikes."""

VERSION = 1

def __init__(self) -> None:
"""Initialize the config flow."""
self._discovery_info: BluetoothServiceInfoBleak | None = None
self._discovered_devices: dict[str, BluetoothServiceInfoBleak] = {}

async def _async_test_connection(self, address: str) -> bool:
"""Attempt a BLE connection to verify the device is reachable."""
ble_device = async_ble_device_from_address(self.hass, address, connectable=True)
if ble_device is None:
return False
try:
client = await establish_connection(BleakClient, ble_device, address)
await client.disconnect()
except BleakError, TimeoutError:
return False
return True

async def async_step_bluetooth(
self, discovery_info: BluetoothServiceInfoBleak
) -> ConfigFlowResult:
"""Handle a Bluetooth discovery."""
await self.async_set_unique_id(format_mac(discovery_info.address))
self._abort_if_unique_id_configured()

self._discovery_info = discovery_info
self.context["title_placeholders"] = {
"name": discovery_info.name or "Specialized Turbo",
"address": discovery_info.address,
}
return await self.async_step_bluetooth_confirm()

async def async_step_bluetooth_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm Bluetooth discovery and collect PIN."""
assert self._discovery_info is not None
errors: dict[str, str] = {}

if user_input is not None:
if not await self._async_test_connection(self._discovery_info.address):
errors["base"] = "cannot_connect"
else:
return self.async_create_entry(
title=self._discovery_info.name or "Specialized Turbo",
data={
CONF_ADDRESS: self._discovery_info.address,
CONF_PIN: user_input.get(CONF_PIN),
},
)

return self.async_show_form(
step_id="bluetooth_confirm",
data_schema=vol.Schema(
{
vol.Optional(CONF_PIN): str,
}
),
description_placeholders={
"name": self._discovery_info.name or "Specialized Turbo",
"address": self._discovery_info.address,
},
errors=errors,
)

async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a user-initiated flow."""
errors: dict[str, str] = {}

if user_input is not None:
address = user_input[CONF_ADDRESS]
await self.async_set_unique_id(format_mac(address), raise_on_progress=False)
self._abort_if_unique_id_configured()

if not await self._async_test_connection(address):
errors["base"] = "cannot_connect"
else:
return self.async_create_entry(
title=self._discovered_devices[address].name or "Specialized Turbo",
data={
CONF_ADDRESS: address,
CONF_PIN: user_input.get(CONF_PIN),
},
)

# Discover available Specialized bikes
current_addresses = self._async_current_ids()
for info in async_discovered_service_info(self.hass):
if format_mac(info.address) in current_addresses:
continue
if _is_specialized_service_info(info):
self._discovered_devices[info.address] = info

if not self._discovered_devices:
return self.async_abort(reason="no_devices_found")

address_options = {
addr: f"{info.name or 'Specialized Turbo'} ({addr})"
for addr, info in self._discovered_devices.items()
}

return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_ADDRESS): vol.In(address_options),
vol.Optional(CONF_PIN): str,
}
),
errors=errors,
)

async def async_step_reconfigure(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle reconfiguration to update the pairing PIN."""
if user_input is not None:
return self.async_update_reload_and_abort(
self._get_reconfigure_entry(),
data_updates={CONF_PIN: user_input.get(CONF_PIN)},
)

return self.async_show_form(
step_id="reconfigure",
data_schema=vol.Schema(
{
vol.Optional(CONF_PIN): str,
}
),
)


def _is_specialized_service_info(info: BluetoothServiceInfoBleak) -> bool:
"""Check if a BluetoothServiceInfoBleak is a Specialized bike."""
return bool(is_specialized_advertisement(info.manufacturer_data))
5 changes: 5 additions & 0 deletions homeassistant/components/specialized_turbo/const.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Constants for the Specialized Turbo integration."""

DOMAIN = "specialized_turbo"

CONF_PIN = "pin"
168 changes: 168 additions & 0 deletions homeassistant/components/specialized_turbo/coordinator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""BLE coordinator for Specialized Turbo bikes.

Connects over BLE, subscribes to GATT notifications, parses incoming
telemetry, and pushes updates to HA entities.
"""

from __future__ import annotations

import logging

from bleak import BleakClient, BleakError
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak_retry_connector import establish_connection
from specialized_turbo import CHAR_NOTIFY, TelemetrySnapshot, parse_message

from homeassistant.components import bluetooth
from homeassistant.components.bluetooth.active_update_coordinator import (
ActiveBluetoothDataUpdateCoordinator,
)
from homeassistant.core import HomeAssistant, callback

_LOGGER = logging.getLogger(__name__)


class SpecializedTurboCoordinator(ActiveBluetoothDataUpdateCoordinator[None]):
"""Manages the BLE connection and notification subscription for one bike."""

def __init__(
self,
hass: HomeAssistant,
*,
address: str,
pin: str | None = None,
) -> None:
"""Initialize the coordinator."""
super().__init__(
hass=hass,
logger=_LOGGER,
address=address,
needs_poll_method=self._needs_poll,
poll_method=self._do_poll,
mode=bluetooth.BluetoothScanningMode.ACTIVE,
connectable=True,
)
self._address = address
self._pin = pin
self.snapshot = TelemetrySnapshot()
self._client: BleakClient | None = None
self._was_unavailable = False

@callback
def _needs_poll(
self,
service_info: bluetooth.BluetoothServiceInfoBleak,
seconds_since_last_update: float | None,
) -> bool:
"""True if we need to (re)connect to the bike."""
return self._client is None or not self._client.is_connected

async def _do_poll(
self,
service_info: bluetooth.BluetoothServiceInfoBleak | None = None,
) -> None:
"""Connect to the bike and subscribe to notifications."""
try:
await self._ensure_connected(service_info)
except BleakError as err:
_LOGGER.debug("BLE connection unavailable for %s: %s", self._address, err)
self._client = None
raise

async def _ensure_connected(
self,
service_info: bluetooth.BluetoothServiceInfoBleak | None = None,
) -> None:
"""Establish BLE connection and subscribe to notifications."""
if self._client and self._client.is_connected:
return

_LOGGER.debug("Connecting to Specialized Turbo at %s", self._address)

ble_device = (
service_info.device
if service_info is not None
else bluetooth.async_ble_device_from_address(
self.hass, self._address, connectable=True
)
)

if ble_device is None:
if not self._was_unavailable:
_LOGGER.info("Specialized Turbo at %s is unavailable", self._address)
self._was_unavailable = True
return

client = await establish_connection(
BleakClient,
ble_device,
self._address,
disconnected_callback=self._on_disconnect,
)
self._client = client

if self._was_unavailable:
_LOGGER.info("Specialized Turbo at %s is available again", self._address)
self._was_unavailable = False

# Trigger pairing if PIN is provided
if self._pin is not None:
try:
await client.pair(protection_level=2)
_LOGGER.debug("Pairing completed")
except NotImplementedError:
_LOGGER.debug("Backend does not support programmatic pairing")
except Exception: # noqa: BLE001
_LOGGER.warning("Pairing failed", exc_info=True)

# Subscribe to telemetry notifications
await client.start_notify(CHAR_NOTIFY, self._notification_handler)
_LOGGER.debug("Subscribed to telemetry notifications")

def _notification_handler(
self, sender: BleakGATTCharacteristic | int, data: bytearray
) -> None:
"""Parse a BLE notification and push the update to HA."""
try:
msg = parse_message(data)
except Exception: # noqa: BLE001
_LOGGER.debug("Failed to parse notification: %s", data.hex(), exc_info=True)
return

self.snapshot.update_from_message(msg)

if msg.field_name:
_LOGGER.debug("%s = %s %s", msg.field_name, msg.converted_value, msg.unit)
Comment on lines +132 to +135
Copy link

Copilot AI Apr 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move notification parsing/state updates onto the Home Assistant event loop (or guard with a lock) to avoid mutating self.snapshot from a Bleak callback thread.

Copilot uses AI. Check for mistakes.

# Push the update to HA — schedule on the event loop since
# bleak notification callbacks may come from a background thread.
self.hass.loop.call_soon_threadsafe(self.async_update_listeners)

@property
def connected(self) -> bool:
"""Return True if the BLE client is connected."""
return self._client is not None and self._client.is_connected

def _on_disconnect(self, client: BleakClient) -> None:
"""Handle unexpected disconnection."""
if not self._was_unavailable:
_LOGGER.info("Disconnected from Specialized Turbo at %s", self._address)
self._was_unavailable = True
self._client = None
# Notify listeners so entities mark themselves unavailable.
# Schedule on the event loop since bleak disconnect callbacks
# may come from a background thread.
self.hass.loop.call_soon_threadsafe(self.async_update_listeners)

Comment on lines +146 to +156
Copy link

Copilot AI Apr 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Schedule disconnect state changes on the Home Assistant event loop (or protect them with a lock) so _client/availability flags aren't modified from a Bleak callback thread.

Suggested change
def _on_disconnect(self, client: BleakClient) -> None:
"""Handle unexpected disconnection."""
if not self._was_unavailable:
_LOGGER.info("Disconnected from Specialized Turbo at %s", self._address)
self._was_unavailable = True
self._client = None
# Notify listeners so entities mark themselves unavailable.
# Schedule on the event loop since bleak disconnect callbacks
# may come from a background thread.
self.hass.loop.call_soon_threadsafe(self.async_update_listeners)
@callback
def _handle_disconnect(self) -> None:
"""Handle unexpected disconnection on the HA event loop."""
if not self._was_unavailable:
_LOGGER.info("Disconnected from Specialized Turbo at %s", self._address)
self._was_unavailable = True
self._client = None
# Notify listeners so entities mark themselves unavailable.
self.async_update_listeners()
def _on_disconnect(self, client: BleakClient) -> None:
"""Schedule unexpected disconnection handling on the HA event loop."""
# bleak disconnect callbacks may come from a background thread.
self.hass.loop.call_soon_threadsafe(self._handle_disconnect)

Copilot uses AI. Check for mistakes.
async def async_shutdown(self) -> None:
"""Clean up BLE connection on unload."""
if self._client and self._client.is_connected:
try:
await self._client.stop_notify(CHAR_NOTIFY)
except Exception: # noqa: BLE001
_LOGGER.debug("Error stopping notifications", exc_info=True)
try:
await self._client.disconnect()
except Exception: # noqa: BLE001
_LOGGER.debug("Error disconnecting", exc_info=True)
self._client = None
Loading
Loading