Skip to content
231 changes: 218 additions & 13 deletions music_assistant/providers/digitally_incorporated/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@
- ClassicalRadio
- ZenRadio

The provider requires a premium account and listen key for authentication.
The provider requires a premium account and listen key for streaming. Library
favorites are read from each network's listen host ``favorites.pls`` (read-only;
edit favorites on the service website).
"""

from __future__ import annotations

import re
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse

import aiohttp
from music_assistant_models.enums import (
Expand Down Expand Up @@ -73,6 +77,26 @@
CACHE_CHANNELS = 86400 # 24 hours
CACHE_GENRES = 86400 # 24 hours
CACHE_STREAM_URL = 3600 # 1 hour
CACHE_FAVORITES = 300 # 5 minutes

# Favorites playlist on listen.* (premium tier; ``public3`` is legacy and not used here).
_FILE_LINE_RE = re.compile(r"^\s*File\d+\s*=\s*(.+?)\s*$", re.IGNORECASE)
_PLS_CHANNEL_SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9_-]*$", re.IGNORECASE)

# Root ``/favorites.pls`` (no tier) often returns HTTP 406 — use premium only.
FAVORITES_PLS_PATHS: tuple[str, ...] = ("premium/favorites.pls",)
FAVORITES_PLS_EXTRA_PARAMS: dict[str, str] = {"download": "1"}

# Premium PLS may use codec-specific mounts (``pianojazz_aac``) while API ``key`` is ``pianojazz``.
PLS_KEY_CODEC_SUFFIXES: tuple[str, ...] = ("_aac", "_mp3")

# Optional short listen-host basename prefixes (besides ``3rdparty_{network}_`` / ``{network}_``).
NETWORK_PLS_SHORT_PREFIXES: dict[str, tuple[str, ...]] = {
"radiotunes": ("rt_",),
"rockradio": ("rr_",),
"classicalradio": ("cr_",),
"zenradio": ("zr_",),
}

# Rate limiting
RATE_LIMIT = 2 # requests per period
Expand Down Expand Up @@ -139,7 +163,10 @@ async def get_config_entries(
key="listen_key",
type=ConfigEntryType.STRING,
label="Listen Key",
description="Your premium listen key. Get this from your account settings.",
description=(
"Premium listen key from account settings. Used for playback and to load "
"your favorites (from favorites.pls); favorites cannot be edited here."
),
required=True,
)
)
Expand Down Expand Up @@ -272,13 +299,25 @@ async def search(
return results

async def get_library_radios(self) -> AsyncGenerator[Radio, None]:
"""Retrieve all radio stations from active networks."""
"""Retrieve user's favorite radio stations from active networks."""
for network_key in self._get_active_networks():
try:
channels = await self._get_channels(network_key)
channel_keys = await self._get_favorite_channel_keys(network_key)
if not channel_keys:
continue

for channel_data in channels:
yield self._channel_to_radio(channel_data, network_key)
channels = await self._get_channels(network_key)
channels_by_key = {
str(ch["key"]): ch for ch in channels if isinstance(ch, dict) and ch.get("key")
}
channels_lower_by_key = {k.lower(): v for k, v in channels_by_key.items()}

for channel_key in channel_keys:
ch_data = self._channel_data_for_pls_key(
channel_key, channels_by_key, channels_lower_by_key
)
if ch_data:
yield self._channel_to_radio(ch_data, network_key)

except (
ProviderUnavailableError,
Expand All @@ -288,7 +327,10 @@ async def get_library_radios(self) -> AsyncGenerator[Radio, None]:
KeyError,
) as err:
self.logger.debug(
"%s: Failed to get channels for network %s: %s", self.domain, network_key, err
"%s: Failed to get favorites for network %s: %s",
self.domain,
network_key,
err,
)
continue

Expand Down Expand Up @@ -459,7 +501,9 @@ async def _api_request(
self,
network_key: str,
endpoint: str,
method: str = "GET",
use_https: bool = True,
json_body: dict[str, Any] | None = None,
**params: Any,
) -> Any:
"""Make a generic API request to Digitally Incorporated."""
Expand All @@ -471,7 +515,9 @@ async def _api_request(

async with (
self._throttler,
self.mass.http_session.get(url, params=params, timeout=timeout) as resp,
self.mass.http_session.request(
method, url, params=params, json=json_body, timeout=timeout
) as resp,
):
if resp.status == 403:
msg = f"{self.domain}: Access denied - check your listen key and subscription"
Expand All @@ -484,8 +530,163 @@ async def _api_request(
raise ProviderUnavailableError(msg)

resp.raise_for_status()

if resp.status == 204:
return None
return await resp.json()

async def _fetch_favorites_pls(self, network_key: str) -> str:
"""Download favorites.pls from the listen.* host for this network."""
domain = NETWORKS[network_key]["domain"]
listen_key = self.config.get_value("listen_key")
timeout = aiohttp.ClientTimeout(total=API_TIMEOUT)
params: dict[str, str] = {"listen_key": str(listen_key), **FAVORITES_PLS_EXTRA_PARAMS}
try:
for rel in FAVORITES_PLS_PATHS:
url = f"https://listen.{domain}/{rel}"
async with (
self._throttler,
self.mass.http_session.get(url, params=params, timeout=timeout) as resp,
):
if resp.status == 200:
return await resp.text()
if resp.status == 403:
self.logger.debug(
"%s: favorites.pls access denied for network %s (%s)",
self.domain,
network_key,
rel,
)
return ""
if resp.status in (404, 406):
continue
if resp.status >= 500:
self.logger.debug(
"%s: favorites.pls HTTP %s for network %s (%s)",
self.domain,
resp.status,
network_key,
rel,
)
return ""
resp.raise_for_status()
self.logger.debug(
"%s: favorites.pls not usable for network %s (tried %s)",
self.domain,
network_key,
", ".join(FAVORITES_PLS_PATHS),
)
return ""
except (aiohttp.ClientError, aiohttp.ServerTimeoutError) as err:
self.logger.debug(
"%s: favorites.pls request failed for network %s: %s",
self.domain,
network_key,
err,
)
return ""

def _pls_url_basename(self, stream_url: str) -> str | None:
"""Last path segment of a stream URL (stream mount / slug)."""
parsed = urlparse(stream_url.strip())
path = parsed.path.strip("/")
if not path:
return None
return path.rsplit("/", maxsplit=1)[-1]

def _pls_basename_prefixes(self, network_key: str) -> tuple[str, ...]:
"""Prefixes to strip from PLS stream basenames; longest match wins."""
parts: list[str] = []
parts.extend(NETWORK_PLS_SHORT_PREFIXES.get(network_key, ()))
parts.append(f"3rdparty_{network_key}_")
parts.append(f"{network_key}_")
# Dedupe, then longest first so e.g. ``3rdparty_jazzradio_`` beats ``jazzradio_``.
unique = dict.fromkeys(parts)
return tuple(sorted(unique, key=len, reverse=True))

def _stream_url_to_channel_key(self, stream_url: str, network_key: str) -> str | None:
"""Map a favorites.pls stream URL to a slug we can match to API channel ``key``."""
basename = self._pls_url_basename(stream_url)
if not basename:
return None
for prefix in self._pls_basename_prefixes(network_key):
if basename.startswith(prefix):
key = basename[len(prefix) :]
if key:
return key
# Premium PLS often uses the API channel key alone (e.g. ``ambient``), not ``di_ambient``.
if _PLS_CHANNEL_SLUG_RE.fullmatch(basename):
return basename
return None

def _channel_data_for_pls_key(
self,
pls_key: str,
channels_by_key: dict[str, dict[str, Any]],
channels_lower_by_key: dict[str, dict[str, Any]],
) -> dict[str, Any] | None:
"""Match a PLS-derived slug to channel JSON (case-insensitive; ``*_aac`` / ``*_mp3``)."""
if pls_key in channels_by_key:
return channels_by_key[pls_key]
# Build a lowercase-keyed view for case-insensitive matching.
lower_channels_by_key = {key.lower(): value for key, value in channels_by_key.items()}
pls_lower = pls_key.lower()
if pls_lower in channels_lower_by_key:
return channels_lower_by_key[pls_lower]
for suffix in PLS_KEY_CODEC_SUFFIXES:
if pls_lower.endswith(suffix):
trimmed_lower = pls_lower[: -len(suffix)]
if trimmed_lower in channels_lower_by_key:
return channels_lower_by_key[trimmed_lower]
return None

def _parse_favorites_pls_channel_keys(self, pls_body: str, network_key: str) -> list[str]:
"""Parse PLS text; return ordered unique channel keys derived from FileN= URLs."""
ordered: list[str] = []
seen_keys: set[str] = set()
unmapped_basenames: set[str] = set()
for line in pls_body.splitlines():
match = _FILE_LINE_RE.match(line)
if not match:
continue
raw_url = match.group(1)
channel_key = self._stream_url_to_channel_key(raw_url, network_key)
if channel_key and channel_key not in seen_keys:
seen_keys.add(channel_key)
ordered.append(channel_key)
elif not channel_key:
bn = self._pls_url_basename(raw_url)
if bn:
unmapped_basenames.add(bn)
if unmapped_basenames:
sample = ", ".join(sorted(unmapped_basenames)[:12])
extra = len(unmapped_basenames) - 12
more = f" (+{extra} more)" if extra > 0 else ""
self.logger.debug(
"%s: PLS for network %s: could not map %d path(s) to channel keys: %s%s",
self.domain,
network_key,
len(unmapped_basenames),
sample,
more,
)
return ordered

@use_cache(CACHE_FAVORITES)
async def _get_favorite_channel_keys(self, network_key: str) -> list[str]:
"""Channel keys for user favorites, from listen host favorites.pls."""
pls_body = await self._fetch_favorites_pls(network_key)
if not pls_body.strip():
return []
keys = self._parse_favorites_pls_channel_keys(pls_body, network_key)
self.logger.debug(
"%s: Parsed %d favorite channel keys for network %s from PLS",
self.domain,
len(keys),
network_key,
)
return keys

@use_cache(CACHE_CHANNELS)
async def _get_channels(self, network_key: str) -> list[dict[str, Any]]:
"""Get channels for a specific network with enriched genre data."""
Expand Down Expand Up @@ -618,19 +819,23 @@ async def _get_stream_url(self, network_key: str, channel_key: str) -> str:
raise ProviderUnavailableError(msg)

try:
params = {"listen_key": listen_key}
playlist = await self._api_request(
network_key, f"listen/premium_high/{channel_key}", use_https=True, **params
network_key,
f"listen/premium_high/{channel_key}",
method="GET",
use_https=True,
listen_key=str(listen_key),
)

# Use the first stream URL from the playlist
self.logger.debug(
"%s: Digitally Incorporated playlist returned %d URLs", self.domain, len(playlist)
)
if not playlist or not isinstance(playlist, list):
msg = f"{self.domain}: No stream URLs returned from Digitally Incorporated API"
raise MediaNotFoundError(msg)

self.logger.debug(
"%s: Digitally Incorporated playlist returned %d URLs", self.domain, len(playlist)
)

# Log all available URLs for debugging
for i, url in enumerate(playlist):
self.logger.debug("%s: Available stream URL %d: %s", self.domain, i + 1, url)
Expand Down
Loading