diff --git a/music_assistant/providers/digitally_incorporated/__init__.py b/music_assistant/providers/digitally_incorporated/__init__.py index f85c88ed96..588cf8b051 100644 --- a/music_assistant/providers/digitally_incorporated/__init__.py +++ b/music_assistant/providers/digitally_incorporated/__init__.py @@ -9,13 +9,17 @@ - ClassicalRadio - ZenRadio -The provider requires a premium account and listen key for authentication. +The provider requires a premium account and listen key for streaming. Library +favorites are read from each network's listen host ``favorites.pls`` (read-only; +edit favorites on the service website). """ from __future__ import annotations +import re from collections.abc import AsyncGenerator from typing import TYPE_CHECKING, Any +from urllib.parse import urlparse import aiohttp from music_assistant_models.enums import ( @@ -73,6 +77,26 @@ CACHE_CHANNELS = 86400 # 24 hours CACHE_GENRES = 86400 # 24 hours CACHE_STREAM_URL = 3600 # 1 hour +CACHE_FAVORITES = 300 # 5 minutes + +# Favorites playlist on listen.* (premium tier; ``public3`` is legacy and not used here). +_FILE_LINE_RE = re.compile(r"^\s*File\d+\s*=\s*(.+?)\s*$", re.IGNORECASE) +_PLS_CHANNEL_SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9_-]*$", re.IGNORECASE) + +# Root ``/favorites.pls`` (no tier) often returns HTTP 406 — use premium only. +FAVORITES_PLS_PATHS: tuple[str, ...] = ("premium/favorites.pls",) +FAVORITES_PLS_EXTRA_PARAMS: dict[str, str] = {"download": "1"} + +# Premium PLS may use codec-specific mounts (``pianojazz_aac``) while API ``key`` is ``pianojazz``. +PLS_KEY_CODEC_SUFFIXES: tuple[str, ...] = ("_aac", "_mp3") + +# Optional short listen-host basename prefixes (besides ``3rdparty_{network}_`` / ``{network}_``). +NETWORK_PLS_SHORT_PREFIXES: dict[str, tuple[str, ...]] = { + "radiotunes": ("rt_",), + "rockradio": ("rr_",), + "classicalradio": ("cr_",), + "zenradio": ("zr_",), +} # Rate limiting RATE_LIMIT = 2 # requests per period @@ -139,7 +163,10 @@ async def get_config_entries( key="listen_key", type=ConfigEntryType.STRING, label="Listen Key", - description="Your premium listen key. Get this from your account settings.", + description=( + "Premium listen key from account settings. Used for playback and to load " + "your favorites (from favorites.pls); favorites cannot be edited here." + ), required=True, ) ) @@ -272,13 +299,25 @@ async def search( return results async def get_library_radios(self) -> AsyncGenerator[Radio, None]: - """Retrieve all radio stations from active networks.""" + """Retrieve user's favorite radio stations from active networks.""" for network_key in self._get_active_networks(): try: - channels = await self._get_channels(network_key) + channel_keys = await self._get_favorite_channel_keys(network_key) + if not channel_keys: + continue - for channel_data in channels: - yield self._channel_to_radio(channel_data, network_key) + channels = await self._get_channels(network_key) + channels_by_key = { + str(ch["key"]): ch for ch in channels if isinstance(ch, dict) and ch.get("key") + } + channels_lower_by_key = {k.lower(): v for k, v in channels_by_key.items()} + + for channel_key in channel_keys: + ch_data = self._channel_data_for_pls_key( + channel_key, channels_by_key, channels_lower_by_key + ) + if ch_data: + yield self._channel_to_radio(ch_data, network_key) except ( ProviderUnavailableError, @@ -288,7 +327,10 @@ async def get_library_radios(self) -> AsyncGenerator[Radio, None]: KeyError, ) as err: self.logger.debug( - "%s: Failed to get channels for network %s: %s", self.domain, network_key, err + "%s: Failed to get favorites for network %s: %s", + self.domain, + network_key, + err, ) continue @@ -459,7 +501,9 @@ async def _api_request( self, network_key: str, endpoint: str, + method: str = "GET", use_https: bool = True, + json_body: dict[str, Any] | None = None, **params: Any, ) -> Any: """Make a generic API request to Digitally Incorporated.""" @@ -471,7 +515,9 @@ async def _api_request( async with ( self._throttler, - self.mass.http_session.get(url, params=params, timeout=timeout) as resp, + self.mass.http_session.request( + method, url, params=params, json=json_body, timeout=timeout + ) as resp, ): if resp.status == 403: msg = f"{self.domain}: Access denied - check your listen key and subscription" @@ -484,8 +530,163 @@ async def _api_request( raise ProviderUnavailableError(msg) resp.raise_for_status() + + if resp.status == 204: + return None return await resp.json() + async def _fetch_favorites_pls(self, network_key: str) -> str: + """Download favorites.pls from the listen.* host for this network.""" + domain = NETWORKS[network_key]["domain"] + listen_key = self.config.get_value("listen_key") + timeout = aiohttp.ClientTimeout(total=API_TIMEOUT) + params: dict[str, str] = {"listen_key": str(listen_key), **FAVORITES_PLS_EXTRA_PARAMS} + try: + for rel in FAVORITES_PLS_PATHS: + url = f"https://listen.{domain}/{rel}" + async with ( + self._throttler, + self.mass.http_session.get(url, params=params, timeout=timeout) as resp, + ): + if resp.status == 200: + return await resp.text() + if resp.status == 403: + self.logger.debug( + "%s: favorites.pls access denied for network %s (%s)", + self.domain, + network_key, + rel, + ) + return "" + if resp.status in (404, 406): + continue + if resp.status >= 500: + self.logger.debug( + "%s: favorites.pls HTTP %s for network %s (%s)", + self.domain, + resp.status, + network_key, + rel, + ) + return "" + resp.raise_for_status() + self.logger.debug( + "%s: favorites.pls not usable for network %s (tried %s)", + self.domain, + network_key, + ", ".join(FAVORITES_PLS_PATHS), + ) + return "" + except (aiohttp.ClientError, aiohttp.ServerTimeoutError) as err: + self.logger.debug( + "%s: favorites.pls request failed for network %s: %s", + self.domain, + network_key, + err, + ) + return "" + + def _pls_url_basename(self, stream_url: str) -> str | None: + """Last path segment of a stream URL (stream mount / slug).""" + parsed = urlparse(stream_url.strip()) + path = parsed.path.strip("/") + if not path: + return None + return path.rsplit("/", maxsplit=1)[-1] + + def _pls_basename_prefixes(self, network_key: str) -> tuple[str, ...]: + """Prefixes to strip from PLS stream basenames; longest match wins.""" + parts: list[str] = [] + parts.extend(NETWORK_PLS_SHORT_PREFIXES.get(network_key, ())) + parts.append(f"3rdparty_{network_key}_") + parts.append(f"{network_key}_") + # Dedupe, then longest first so e.g. ``3rdparty_jazzradio_`` beats ``jazzradio_``. + unique = dict.fromkeys(parts) + return tuple(sorted(unique, key=len, reverse=True)) + + def _stream_url_to_channel_key(self, stream_url: str, network_key: str) -> str | None: + """Map a favorites.pls stream URL to a slug we can match to API channel ``key``.""" + basename = self._pls_url_basename(stream_url) + if not basename: + return None + for prefix in self._pls_basename_prefixes(network_key): + if basename.startswith(prefix): + key = basename[len(prefix) :] + if key: + return key + # Premium PLS often uses the API channel key alone (e.g. ``ambient``), not ``di_ambient``. + if _PLS_CHANNEL_SLUG_RE.fullmatch(basename): + return basename + return None + + def _channel_data_for_pls_key( + self, + pls_key: str, + channels_by_key: dict[str, dict[str, Any]], + channels_lower_by_key: dict[str, dict[str, Any]], + ) -> dict[str, Any] | None: + """Match a PLS-derived slug to channel JSON (case-insensitive; ``*_aac`` / ``*_mp3``).""" + if pls_key in channels_by_key: + return channels_by_key[pls_key] + # Build a lowercase-keyed view for case-insensitive matching. + lower_channels_by_key = {key.lower(): value for key, value in channels_by_key.items()} + pls_lower = pls_key.lower() + if pls_lower in channels_lower_by_key: + return channels_lower_by_key[pls_lower] + for suffix in PLS_KEY_CODEC_SUFFIXES: + if pls_lower.endswith(suffix): + trimmed_lower = pls_lower[: -len(suffix)] + if trimmed_lower in channels_lower_by_key: + return channels_lower_by_key[trimmed_lower] + return None + + def _parse_favorites_pls_channel_keys(self, pls_body: str, network_key: str) -> list[str]: + """Parse PLS text; return ordered unique channel keys derived from FileN= URLs.""" + ordered: list[str] = [] + seen_keys: set[str] = set() + unmapped_basenames: set[str] = set() + for line in pls_body.splitlines(): + match = _FILE_LINE_RE.match(line) + if not match: + continue + raw_url = match.group(1) + channel_key = self._stream_url_to_channel_key(raw_url, network_key) + if channel_key and channel_key not in seen_keys: + seen_keys.add(channel_key) + ordered.append(channel_key) + elif not channel_key: + bn = self._pls_url_basename(raw_url) + if bn: + unmapped_basenames.add(bn) + if unmapped_basenames: + sample = ", ".join(sorted(unmapped_basenames)[:12]) + extra = len(unmapped_basenames) - 12 + more = f" (+{extra} more)" if extra > 0 else "" + self.logger.debug( + "%s: PLS for network %s: could not map %d path(s) to channel keys: %s%s", + self.domain, + network_key, + len(unmapped_basenames), + sample, + more, + ) + return ordered + + @use_cache(CACHE_FAVORITES) + async def _get_favorite_channel_keys(self, network_key: str) -> list[str]: + """Channel keys for user favorites, from listen host favorites.pls.""" + pls_body = await self._fetch_favorites_pls(network_key) + if not pls_body.strip(): + return [] + keys = self._parse_favorites_pls_channel_keys(pls_body, network_key) + self.logger.debug( + "%s: Parsed %d favorite channel keys for network %s from PLS", + self.domain, + len(keys), + network_key, + ) + return keys + @use_cache(CACHE_CHANNELS) async def _get_channels(self, network_key: str) -> list[dict[str, Any]]: """Get channels for a specific network with enriched genre data.""" @@ -618,19 +819,23 @@ async def _get_stream_url(self, network_key: str, channel_key: str) -> str: raise ProviderUnavailableError(msg) try: - params = {"listen_key": listen_key} playlist = await self._api_request( - network_key, f"listen/premium_high/{channel_key}", use_https=True, **params + network_key, + f"listen/premium_high/{channel_key}", + method="GET", + use_https=True, + listen_key=str(listen_key), ) # Use the first stream URL from the playlist - self.logger.debug( - "%s: Digitally Incorporated playlist returned %d URLs", self.domain, len(playlist) - ) if not playlist or not isinstance(playlist, list): msg = f"{self.domain}: No stream URLs returned from Digitally Incorporated API" raise MediaNotFoundError(msg) + self.logger.debug( + "%s: Digitally Incorporated playlist returned %d URLs", self.domain, len(playlist) + ) + # Log all available URLs for debugging for i, url in enumerate(playlist): self.logger.debug("%s: Available stream URL %d: %s", self.domain, i + 1, url)