diff --git a/music_assistant/providers/dlna_receiver/__init__.py b/music_assistant/providers/dlna_receiver/__init__.py new file mode 100644 index 0000000000..90f9e8ae0d --- /dev/null +++ b/music_assistant/providers/dlna_receiver/__init__.py @@ -0,0 +1,114 @@ +""" +DLNA Receiver — Music Assistant Plugin Provider. + +Exposes Music Assistant as a UPnP/DLNA MediaRenderer so that external +applications (Qobuz, BubbleUPnP, foobar2000, mconnect, etc.) can discover +and cast audio streams to any MA player. + +Architecture +~~~~~~~~~~~~ +1. SSDP advertisement — announces virtual MediaRenderers on the LAN +2. UPnP HTTP server — serves device/service XML descriptions and + accepts SOAP control actions (AVTransport, + RenderingControl, ConnectionManager) +3. PluginSource bridge — received audio URL is fed into the MA streaming + pipeline as a PluginSource, routed to the + corresponding target player + +Multi-player mode +~~~~~~~~~~~~~~~~~ +When ``target_players`` contains multiple comma-separated player_id values +(or the special value ``*``), the provider creates one virtual DLNA +renderer per player, each with a unique UDN and HTTP port. DLNA control +points see each renderer as a separate device — e.g. +"Music Assistant — Kitchen", "Music Assistant — Living Room". +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from music_assistant_models.config_entries import ConfigEntry, ConfigValueType +from music_assistant_models.enums import ConfigEntryType + +from .constants import ( + CONF_BIND_IP, + CONF_FRIENDLY_NAME, + CONF_HTTP_PORT, + CONF_TARGET_PLAYERS, + DEFAULT_FRIENDLY_NAME, + DEFAULT_HTTP_PORT, +) + +if TYPE_CHECKING: + from music_assistant_models.config_entries import ProviderConfig + from music_assistant_models.provider import ProviderManifest + + from music_assistant.mass import MusicAssistant + from music_assistant.models import ProviderInstanceType + + +async def get_config_entries( + mass: MusicAssistant, # noqa: ARG001 + instance_id: str | None = None, # noqa: ARG001 + action: str | None = None, # noqa: ARG001 + values: dict[str, ConfigValueType] | None = None, # noqa: ARG001 +) -> tuple[ConfigEntry, ...]: + """Return Config entries to setup this provider.""" + return ( + ConfigEntry( + key=CONF_FRIENDLY_NAME, + type=ConfigEntryType.STRING, + label="Friendly name prefix", + description=( + "Prefix for DLNA renderer names shown on the network. " + "Player name is appended automatically in multi-player mode." + ), + default_value=DEFAULT_FRIENDLY_NAME, + required=True, + ), + ConfigEntry( + key=CONF_TARGET_PLAYERS, + type=ConfigEntryType.STRING, + label="Target players", + description=( + "Comma-separated MA player_ids to expose as DLNA renderers. " + 'Use "*" to auto-create a renderer for every MA player. ' + "Leave empty for a single renderer without a fixed target." + ), + required=False, + ), + ConfigEntry( + key=CONF_BIND_IP, + type=ConfigEntryType.STRING, + label="Bind IP address", + description=( + "IP address to bind the UPnP HTTP server and SSDP listener. " + "Leave empty to auto-detect." + ), + required=False, + ), + ConfigEntry( + key=CONF_HTTP_PORT, + type=ConfigEntryType.INTEGER, + label="HTTP base port", + description=( + "Base port for UPnP HTTP servers. In multi-player mode, " + "each renderer uses an incrementing port (8298, 8299, …)." + ), + default_value=DEFAULT_HTTP_PORT, + required=True, + ), + ) + + +async def setup( + mass: MusicAssistant, + manifest: ProviderManifest, + config: ProviderConfig, +) -> ProviderInstanceType: + """Set up the DLNA Receiver provider.""" + # Deferred to avoid loading music_assistant internals at module import time. + from .provider import DLNAReceiverProvider # noqa: PLC0415, RUF100 + + return DLNAReceiverProvider(mass, manifest, config) diff --git a/music_assistant/providers/dlna_receiver/constants.py b/music_assistant/providers/dlna_receiver/constants.py new file mode 100644 index 0000000000..0c2cf210b7 --- /dev/null +++ b/music_assistant/providers/dlna_receiver/constants.py @@ -0,0 +1,51 @@ +"""Constants for the DLNA Receiver plugin provider.""" + +from __future__ import annotations + +# UPnP device and service identifiers +UPNP_DEVICE_TYPE = "urn:schemas-upnp-org:device:MediaRenderer:1" +UPNP_SERVICE_AV_TRANSPORT = "urn:schemas-upnp-org:service:AVTransport:1" +UPNP_SERVICE_RENDERING_CONTROL = "urn:schemas-upnp-org:service:RenderingControl:1" +UPNP_SERVICE_CONNECTION_MANAGER = "urn:schemas-upnp-org:service:ConnectionManager:1" + +# SSDP +SSDP_MULTICAST_ADDR = "239.255.255.250" +SSDP_PORT = 1900 +SSDP_MAX_AGE = 1800 # seconds + +# Config entry keys +CONF_FRIENDLY_NAME = "friendly_name" +CONF_TARGET_PLAYER = "target_player" +CONF_TARGET_PLAYERS = "target_players" +CONF_BIND_IP = "bind_ip" +CONF_HTTP_PORT = "http_port" + +# Defaults +DEFAULT_FRIENDLY_NAME = "Music Assistant" +DEFAULT_HTTP_PORT = 8298 # UPnP renderer HTTP port (separate from MA stream port) + +# Supported MIME types for incoming streams +SUPPORTED_MIME_TYPES = [ + "audio/flac", + "audio/x-flac", + "audio/wav", + "audio/x-wav", + "audio/mpeg", + "audio/mp3", + "audio/mp4", + "audio/aac", + "audio/ogg", + "audio/vorbis", + "audio/L16", + "audio/*", +] + +# UPnP transport states +TRANSPORT_STATE_STOPPED = "STOPPED" +TRANSPORT_STATE_PLAYING = "PLAYING" +TRANSPORT_STATE_PAUSED = "PAUSED_PLAYBACK" +TRANSPORT_STATE_TRANSITIONING = "TRANSITIONING" +TRANSPORT_STATE_NO_MEDIA = "NO_MEDIA_PRESENT" + +# UUID namespace for deterministic UDN generation +UDN_NAMESPACE = "ma-dlna-receiver" diff --git a/music_assistant/providers/dlna_receiver/eventing.py b/music_assistant/providers/dlna_receiver/eventing.py new file mode 100644 index 0000000000..1028e3e7b8 --- /dev/null +++ b/music_assistant/providers/dlna_receiver/eventing.py @@ -0,0 +1,302 @@ +"""UPnP GENA Eventing — subscription management and NOTIFY dispatch. + +Implements the General Event Notification Architecture (GENA) protocol +for UPnP service state variable change notifications. +""" + +from __future__ import annotations + +import asyncio +import contextlib +import logging +import uuid +from dataclasses import dataclass, field +from time import monotonic +from xml.sax.saxutils import escape + +import aiohttp + +LOGGER = logging.getLogger(__name__) + +DEFAULT_SUBSCRIPTION_TIMEOUT = 1800 # seconds + + +@dataclass +class Subscription: + """Represents a single GENA event subscription.""" + + sid: str + callback_urls: list[str] + timeout: int + created_at: float = field(default_factory=monotonic) + seq: int = 0 + + @property + def is_expired(self) -> bool: + """Check if this subscription has expired.""" + return (monotonic() - self.created_at) > self.timeout + + +class EventingManager: + """Manages GENA subscriptions and sends NOTIFY events. + + Each UPnP service has its own EventingManager instance keyed by + service name (e.g., "AVTransport", "RenderingControl"). + """ + + def __init__(self, session: aiohttp.ClientSession | None = None) -> None: + """Initialize with no subscriptions and (optionally) a shared session. + + If ``session`` is provided, NOTIFY traffic reuses it and the manager + does not own its lifecycle; this lets a single renderer instance + share one connector/DNS cache across all three services, and lets + the provider pass down ``mass.http_session`` in the typical case. + When ``session`` is None we fall back to owning a private session + (kept for standalone/test use) that is closed in ``stop()``. + """ + self._subscriptions: dict[str, Subscription] = {} + self._cleanup_task: asyncio.Task[None] | None = None + self._pending_tasks: set[asyncio.Task[None]] = set() + self._session: aiohttp.ClientSession | None = session + self._owns_session: bool = session is None + + async def start(self) -> None: + """Start the periodic subscription cleanup task.""" + if self._session is None: + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=5), + ) + self._owns_session = True + self._cleanup_task = asyncio.create_task(self._cleanup_loop()) + + async def stop(self) -> None: + """Stop the cleanup task and clear all subscriptions.""" + if self._cleanup_task: + self._cleanup_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._cleanup_task + self._cleanup_task = None + if self._pending_tasks: + # Snapshot: done-callbacks mutate self._pending_tasks. + pending = list(self._pending_tasks) + for task in pending: + task.cancel() + await asyncio.gather(*pending, return_exceptions=True) + self._pending_tasks.clear() + # Only close the session if we created it ourselves; an injected + # (shared) session is owned by the caller (e.g. Music Assistant). + if self._session is not None and self._owns_session: + await self._session.close() + self._session = None + self._subscriptions.clear() + + def subscribe( + self, + callback_header: str, + timeout_header: str | None = None, + ) -> tuple[str, int]: + """Register a new subscription. + + Args: + callback_header: CALLBACK header value, e.g., '' + timeout_header: TIMEOUT header value, e.g., 'Second-1800' + + Returns: + Tuple of (SID, actual_timeout_seconds). + """ + callback_urls = self._parse_callback_header(callback_header) + if not callback_urls: + raise ValueError("No valid callback URLs in CALLBACK header") + + timeout = self._parse_timeout(timeout_header) + sid = f"uuid:{uuid.uuid4()}" + + self._subscriptions[sid] = Subscription( + sid=sid, + callback_urls=callback_urls, + timeout=timeout, + ) + LOGGER.info( + "New GENA subscription: SID=%s, callbacks=%s, timeout=%ds", + sid, + callback_urls, + timeout, + ) + return sid, timeout + + def renew( + self, + sid: str, + timeout_header: str | None = None, + ) -> int: + """Renew an existing subscription. + + Args: + sid: Subscription ID to renew. + timeout_header: New TIMEOUT header value. + + Returns: + Actual timeout seconds. + + Raises: + KeyError: If SID not found. + """ + sub = self._subscriptions.get(sid) + if sub is None: + raise KeyError(f"Unknown SID: {sid}") + if sub.is_expired: + # Per UPnP spec, renew of an expired SID is a 412 Precondition Failed + self._subscriptions.pop(sid, None) + raise KeyError(f"Expired SID: {sid}") + + timeout = self._parse_timeout(timeout_header) + sub.timeout = timeout + sub.created_at = monotonic() + LOGGER.debug("Renewed subscription %s, timeout=%ds", sid, timeout) + return timeout + + def unsubscribe(self, sid: str) -> None: + """Remove a subscription.""" + removed = self._subscriptions.pop(sid, None) + if removed: + LOGGER.info("Removed GENA subscription: %s", sid) + else: + LOGGER.debug("Unsubscribe for unknown SID: %s", sid) + + async def notify(self, changed_vars: dict[str, str]) -> None: + """Send NOTIFY to all active subscribers with changed state variables. + + Args: + changed_vars: Mapping of state variable name to new value. + """ + if not changed_vars or not self._subscriptions: + return + + xml_body = self._build_propertyset(changed_vars) + + expired_sids: list[str] = [] + for sid, sub in self._subscriptions.items(): + if sub.is_expired: + expired_sids.append(sid) + continue + task: asyncio.Task[None] = asyncio.create_task( + self._send_notify(sub, xml_body), + ) + self._pending_tasks.add(task) + task.add_done_callback(self._pending_tasks.discard) + + for sid in expired_sids: + self._subscriptions.pop(sid, None) + + async def notify_initial(self, sid: str, all_vars: dict[str, str]) -> None: + """Send initial event (SEQ=0) to a newly subscribed control point.""" + sub = self._subscriptions.get(sid) + if sub is None: + return + xml_body = self._build_propertyset(all_vars) + await self._send_notify(sub, xml_body) + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + async def _send_notify(self, sub: Subscription, xml_body: str) -> None: + """Send a GENA NOTIFY to a single subscriber.""" + session = self._session + if session is None or session.closed: + LOGGER.debug("NOTIFY skipped — session not started") + return + + headers = { + "Content-Type": 'text/xml; charset="utf-8"', + "NT": "upnp:event", + "NTS": "upnp:propchange", + "SID": sub.sid, + "SEQ": str(sub.seq), + } + sub.seq += 1 + + # Per-request timeout so we don't depend on the injected session's + # default (mass.http_session has no 5s cap) and stay consistent with + # the timeout we used when owning our own session. + timeout = aiohttp.ClientTimeout(total=5) + + # UDA §4.1.2: CALLBACK lists URLs in preference order. Use the first + # one that actually accepts the NOTIFY (2xx); on HTTP errors or network + # failures, fall through to the next URL. + for url in sub.callback_urls: + try: + async with session.request( + "NOTIFY", + url, + headers=headers, + data=xml_body, + timeout=timeout, + ) as resp: + if resp.status >= 300: + LOGGER.warning( + "NOTIFY to %s returned %s", + url, + resp.status, + ) + continue + LOGGER.debug("NOTIFY sent to %s (SEQ=%d)", url, sub.seq - 1) + return + except Exception: + LOGGER.debug("NOTIFY to %s failed, trying next callback", url) + + LOGGER.warning("All NOTIFY callbacks failed for SID %s", sub.sid) + + @staticmethod + def _build_propertyset(variables: dict[str, str]) -> str: + """Build a GENA propertyset XML body.""" + props = "" + for name, value in variables.items(): + props += f"<{name}>{escape(value)}" + return ( + '' + '' + f"{props}" + "" + ) + + @staticmethod + def _parse_callback_header(header: str) -> list[str]: + """Parse CALLBACK header: '' -> ['url1', 'url2']. + + Per UDA §4.1.2 a GENA CALLBACK URL must use the http/https scheme; + a bare ``startswith("http")`` check would also let ``httpx://`` or + ``httpfake://`` through and then fail at NOTIFY time. + """ + urls: list[str] = [] + for raw in header.split(">"): + part = raw.strip() + if part.startswith("<"): + url = part[1:] + if url.startswith(("http://", "https://")): + urls.append(url) + return urls + + @staticmethod + def _parse_timeout(header: str | None) -> int: + """Parse TIMEOUT header: 'Second-1800' -> 1800.""" + if not header: + return DEFAULT_SUBSCRIPTION_TIMEOUT + header = header.strip() + if header.lower() == "infinite": + return DEFAULT_SUBSCRIPTION_TIMEOUT + if header.lower().startswith("second-"): + try: + return int(header.split("-", 1)[1]) + except (ValueError, IndexError): + pass + return DEFAULT_SUBSCRIPTION_TIMEOUT + + async def _cleanup_loop(self) -> None: + """Periodically remove expired subscriptions.""" + while True: + await asyncio.sleep(60) + expired = [sid for sid, sub in self._subscriptions.items() if sub.is_expired] + for sid in expired: + self._subscriptions.pop(sid, None) + LOGGER.debug("Cleaned up expired subscription: %s", sid) diff --git a/music_assistant/providers/dlna_receiver/manifest.json b/music_assistant/providers/dlna_receiver/manifest.json new file mode 100644 index 0000000000..70aadfd197 --- /dev/null +++ b/music_assistant/providers/dlna_receiver/manifest.json @@ -0,0 +1,15 @@ +{ + "type": "plugin", + "domain": "dlna_receiver", + "name": "DLNA Receiver", + "description": "Expose Music Assistant as a UPnP/DLNA MediaRenderer so external apps can cast audio to any MA player.", + "codeowners": ["@trudenboy"], + "credits": [ + "[async-upnp-client](https://github.com/StevenLooman/async_upnp_client)" + ], + "requirements": [], + "documentation": "https://trudenboy.github.io/ma-provider-dlna-receiver/", + "stage": "alpha", + "multi_instance": false, + "builtin": false +} diff --git a/music_assistant/providers/dlna_receiver/provider.py b/music_assistant/providers/dlna_receiver/provider.py new file mode 100644 index 0000000000..8ef0fa1511 --- /dev/null +++ b/music_assistant/providers/dlna_receiver/provider.py @@ -0,0 +1,822 @@ +"""DLNA Receiver — Main provider implementation. + +Registers as a PluginProvider with AUDIO_SOURCE feature so that audio +received from external DLNA control points is routed through the MA +streaming pipeline to any configured player. + +Supports multi-player mode: one virtual DLNA renderer per MA player, +each with a unique UDN, HTTP port, and SSDP advertisement. +""" + +from __future__ import annotations + +import asyncio +import contextlib +import ipaddress +import logging +import time +import uuid +import xml.etree.ElementTree as ET +from collections.abc import AsyncGenerator +from dataclasses import dataclass +from html import unescape +from typing import TYPE_CHECKING + +import aiohttp +from music_assistant_models.config_entries import ConfigValueType # noqa: F401 +from music_assistant_models.enums import ContentType, ProviderFeature +from music_assistant_models.errors import SetupFailedError +from music_assistant_models.media_items.audio_format import AudioFormat +from music_assistant_models.streamdetails import StreamMetadata + +from music_assistant.helpers.util import get_ip_addresses +from music_assistant.models.plugin import PluginProvider, PluginSource + +from .constants import ( + CONF_BIND_IP, + CONF_FRIENDLY_NAME, + CONF_HTTP_PORT, + CONF_TARGET_PLAYER, + CONF_TARGET_PLAYERS, + DEFAULT_FRIENDLY_NAME, + DEFAULT_HTTP_PORT, + UDN_NAMESPACE, +) +from .renderer import UPnPRenderer +from .ssdp import SSDPAdvertiser +from .urls import redact_url as _redact_url +from .urls import validate_stream_url as _validate_stream_url + +if TYPE_CHECKING: + from music_assistant_models.config_entries import ProviderConfig + from music_assistant_models.provider import ProviderManifest + + from music_assistant.mass import MusicAssistant + +LOGGER = logging.getLogger(__name__) + +# DIDL-Lite metadata is normally < 4 KiB; bound input (measured in characters) +# to guard CPU/memory on parse. +_MAX_DIDL_CHARS = 64 * 1024 + + +def _is_concrete_ipv4(value: str) -> bool: + """Return True iff ``value`` is a non-wildcard, non-loopback IPv4 literal. + + SSDP uses ``socket.inet_aton`` + ``IP_ADD_MEMBERSHIP`` which require a + concrete IPv4 interface; ``0.0.0.0`` joins the multicast group on the + wrong interface (silently dropping alive packets on multi-homed hosts), + ``127.0.0.1`` joins on the loopback interface where SSDP multicast + never reaches real DLNA control points, and IPv6 / hostnames fail + outright. + """ + if not value: + return False + try: + addr = ipaddress.ip_address(value) + except ValueError: + return False + return ( + isinstance(addr, ipaddress.IPv4Address) and not addr.is_unspecified and not addr.is_loopback + ) + + +@dataclass +class RendererInstance: + """Per-player renderer state: UPnP renderer + SSDP + streaming context.""" + + player_id: str + player_name: str + renderer: UPnPRenderer + ssdp: SSDPAdvertiser + current_stream_url: str | None = None + current_metadata: dict[str, str | None] | None = None + plugin_source: PluginSource | None = None + + +class DLNAReceiverProvider(PluginProvider): + """DLNA Receiver plugin provider for Music Assistant. + + Exposes MA as one or more UPnP MediaRenderers on the local network + so that external apps can send audio streams which are then played + on the corresponding MA player. + """ + + SUPPORTED_FEATURES = {ProviderFeature.AUDIO_SOURCE} + + def __init__( + self, + mass: MusicAssistant, + manifest: ProviderManifest, + config: ProviderConfig, + ) -> None: + """Initialize provider state; renderer instances are created in loaded_in_mass.""" + super().__init__(mass, manifest, config, self.SUPPORTED_FEATURES) + self._instances: dict[str, RendererInstance] = {} + self._plugin_source: PluginSource | None = None + # Playback state for elapsed time tracking + self._active_player_id: str | None = None + self._play_start_time: float | None = None + self._elapsed_offset: int = 0 + self._metadata_task: asyncio.Task[None] | None = None + self._discovery_task: asyncio.Task[None] | None = None + # Monotonically bumped per renderer; assigned lazily in loaded_in_mass. + self._next_port: int = 0 + + @property + def supported_features(self) -> set[ProviderFeature]: + """Return supported features.""" + return {ProviderFeature.AUDIO_SOURCE} + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def loaded_in_mass(self) -> None: + """Initialize renderer instances when loaded in Music Assistant.""" + self._friendly_prefix = str( + self.config.get_value(CONF_FRIENDLY_NAME) or DEFAULT_FRIENDLY_NAME, + ) + configured_ip = str(self.config.get_value(CONF_BIND_IP) or "") + if configured_ip: + self._bind_ip = configured_ip + else: + # Reuse MA's shared primary-IP helper (threaded probe + ranked + # interface scan) instead of rolling our own. Prefer the first + # IPv4 entry: SSDP needs a routable IPv4 for the multicast + # interface and the LOCATION header. + ip_addresses = await get_ip_addresses() + self._bind_ip = next( + (ip for ip in ip_addresses if _is_concrete_ipv4(ip)), + "", + ) + # Fail fast with a clear message instead of letting SSDP + # inet_aton / IP_ADD_MEMBERSHIP explode on 0.0.0.0, an IPv6 + # literal, or a typo from the user. + if not _is_concrete_ipv4(self._bind_ip): + msg = ( + "DLNA Receiver requires a concrete IPv4 bind address; got " + f"{self._bind_ip!r}. Configure a valid IPv4 for this host, " + "or run MA on a network that exposes an IPv4 interface." + ) + raise SetupFailedError(msg) + self._base_port = int( + self.config.get_value(CONF_HTTP_PORT) or DEFAULT_HTTP_PORT # type: ignore[arg-type] + ) + self._next_port = self._base_port + + raw_target = self._raw_target() + player_specs = self._resolve_player_specs() + + if player_specs: + for pid, pname in player_specs: + await self._create_instance( + player_id=pid, + player_name=pname, + friendly_prefix=self._friendly_prefix, + bind_ip=self._bind_ip, + http_port=self._next_port, + ) + self._next_port += 1 + else: + # Fallback: single unbound renderer so DLNA discovery works + # immediately. In target_players=* mode this is retired as soon + # as the first real player renderer registers, so we don't keep + # advertising an unroutable renderer once real ones exist. + await self._create_instance( + player_id="", + player_name="", + friendly_prefix=self._friendly_prefix, + bind_ip=self._bind_ip, + http_port=self._next_port, + ) + self._next_port += 1 + + # For target_players=*, keep looking for late-registering players in the + # background instead of blocking startup for up to 72 seconds. + if raw_target == "*": + self._discovery_task = asyncio.create_task(self._adopt_late_players()) + + LOGGER.info( + "DLNA Receiver started: %d renderer(s) on %s (base port %s)", + len(self._instances), + self._bind_ip, + self._base_port, + ) + + async def _adopt_late_players(self) -> None: + """Poll for newly-registered MA players and spin up renderers for them. + + Runs only when ``target_players=*``. Caps the total wait at ~5 minutes + so stale provider instances don't poll forever. The first time a real + player renderer is created we retire the unbound ``__default__`` + fallback so we aren't advertising a renderer that can't route audio. + """ + known_ids = {inst.player_id for inst in self._instances.values() if inst.player_id} + default_retired = False + try: + for _ in range(60): # ~5 minutes at 5s intervals + await asyncio.sleep(5) + specs = self._resolve_player_specs() + new = [(pid, name) for pid, name in specs if pid and pid not in known_ids] + for pid, name in new: + await self._create_instance( + player_id=pid, + player_name=name, + friendly_prefix=self._friendly_prefix, + bind_ip=self._bind_ip, + http_port=self._next_port, + ) + self._next_port += 1 + known_ids.add(pid) + if not default_retired: + await self._retire_default_instance() + default_retired = True + except asyncio.CancelledError: + pass + except Exception: + LOGGER.debug("Late-player discovery loop error", exc_info=True) + + async def _retire_default_instance(self) -> None: + """Stop and drop the unbound ``__default__`` renderer, if any. + + Called once from ``_adopt_late_players`` after the first real player + renderer is created so control points no longer see a fallback + renderer that would silently swallow Play commands. + """ + default = self._instances.pop("__default__", None) + if default is None: + return + await default.ssdp.stop() + await default.renderer.stop() + LOGGER.info("Retired unbound fallback renderer — real players registered") + + async def unload(self, is_removed: bool = False) -> None: + """Unload the provider — stop all renderer instances. + + Cancels and *awaits* background tasks before touching ``_instances``: + otherwise ``_adopt_late_players`` could still be mid-``_create_instance`` + and append a new entry to the dict while we're iterating it, which + raises ``RuntimeError`` and leaks sockets. + """ + for task in (self._metadata_task, self._discovery_task): + if task is None or task.done(): + continue + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + self._metadata_task = None + self._discovery_task = None + # Snapshot before iterating: no more background mutation possible now, + # but cheap defense against future concurrent shutdown paths. + for inst in list(self._instances.values()): + await inst.ssdp.stop() + await inst.renderer.stop() + self._instances.clear() + LOGGER.info("DLNA Receiver provider unloaded") + + # ------------------------------------------------------------------ + # Instance management + # ------------------------------------------------------------------ + + async def _create_instance( + self, + player_id: str, + player_name: str, + friendly_prefix: str, + bind_ip: str, + http_port: int, + ) -> RendererInstance: + """Create and start a single renderer instance for a player.""" + friendly_name = f"{friendly_prefix} — {player_name}" if player_name else friendly_prefix + + udn = self._deterministic_udn(player_id) + + renderer = UPnPRenderer( + friendly_name=friendly_name, + bind_ip=bind_ip, + http_port=http_port, + udn=udn, + # Reuse MA's shared HTTP session so every GENA NOTIFY across + # all three services on every renderer shares one connector. + session=self.mass.http_session, + ) + + inst = RendererInstance( + player_id=player_id, + player_name=player_name, + renderer=renderer, + ssdp=SSDPAdvertiser( + udn=udn, + description_url=renderer.description_url, + bind_ip=bind_ip, + ), + ) + + # Wire SOAP callbacks bound to this instance + renderer.on_set_av_transport_uri = lambda uri, meta: self._on_set_transport_uri( + inst, uri, meta + ) + renderer.on_play = lambda: self._on_play(inst) + renderer.on_pause = lambda: self._on_pause(inst) + renderer.on_stop = lambda: self._on_stop(inst) + renderer.on_seek = lambda unit, target: self._on_seek(inst, unit, target) + renderer.on_set_volume = lambda vol: self._on_set_volume(inst, vol) + renderer.on_set_mute = lambda m: self._on_set_mute(inst, m) + + # Roll back a partially-started instance so a failed SSDP start + # (port collision, multicast permission denied, etc.) doesn't + # leave the HTTP runner holding the port across reloads. + await renderer.start() + # If http_port was 0, renderer.start() just learned the ephemeral + # port from the bound socket; refresh the cached SSDP LOCATION so + # alive/M-SEARCH responses advertise the real port instead of + # whatever description_url resolved to at construction time. + inst.ssdp.description_url = renderer.description_url + try: + await inst.ssdp.start() + except Exception: + with contextlib.suppress(Exception): + await inst.ssdp.stop() + with contextlib.suppress(Exception): + await renderer.stop() + raise + + key = player_id or "__default__" + self._instances[key] = inst + + LOGGER.info( + "Renderer '%s' → player '%s' on port %d (UDN: %s)", + friendly_name, + player_id or "(none)", + http_port, + udn, + ) + return inst + + def _raw_target(self) -> str: + """Return the configured target spec, honoring the legacy config key. + + Centralizes the CONF_TARGET_PLAYERS → CONF_TARGET_PLAYER fallback so + every call site (late-player adoption check, spec resolution) sees + the same value; otherwise a legacy ``"*"`` config would skip the + _adopt_late_players background task. + """ + raw = str(self.config.get_value(CONF_TARGET_PLAYERS) or "").strip() + if not raw: + raw = str(self.config.get_value(CONF_TARGET_PLAYER) or "").strip() + return raw + + def _resolve_player_specs(self) -> list[tuple[str, str]]: + """Resolve configured player targets to (player_id, display_name) pairs. + + Supports: + - Empty / not set → empty list (single unbound renderer) + - "*" → all currently known MA players + - Comma-separated player_ids + - Legacy CONF_TARGET_PLAYER (single player_id, backward compat) + """ + raw = self._raw_target() + + if not raw: + return [] + + if raw == "*": + return self._get_all_players() + + # Comma-separated list (order-preserving dedupe: repeated ids would + # collide on the same instance key and leak sockets on each rebind). + specs: list[tuple[str, str]] = [] + seen: set[str] = set() + for raw_pid in raw.split(","): + pid = raw_pid.strip() + if not pid or pid in seen: + continue + seen.add(pid) + name = self._get_player_name(pid) + specs.append((pid, name)) + return specs + + def _get_all_players(self) -> list[tuple[str, str]]: + """Get all MA players as (player_id, display_name) pairs. + + Includes unavailable players (they may come online later). + Filters out protocol players and our own DLNA Receiver renderers + (which the UPnP player provider discovers as "up" players). + """ + try: + players = self.mass.players.all_players( + return_unavailable=True, + return_protocol_players=False, + ) + all_pids = {p.player_id for p in players} + + # For every player_id, compute the UPnP player_id that would + # result from discovering our deterministic-UDN renderer. + # This catches ALL recursion levels without needing _instances. + own_renderer_pids: set[str] = set() + for pid in all_pids: + udn = self._deterministic_udn(pid) + upnp_pid = "up" + udn.replace("uuid:", "").replace("-", "") + own_renderer_pids.add(upnp_pid) + + # Also filter already-created instances (belt-and-suspenders) + own_renderer_pids.update( + inst.player_id for inst in self._instances.values() if inst.player_id + ) + + result = [] + for p in players: + if p.player_id in own_renderer_pids: + LOGGER.debug( + "Filtering out own renderer player: %s (%s)", + p.player_id, + p.display_name or p.name, + ) + continue + result.append((p.player_id, p.display_name or p.name or p.player_id)) + return result + except Exception: + LOGGER.warning("Could not enumerate MA players", exc_info=True) + return [] + + def _get_player_name(self, player_id: str) -> str: + """Get the display name for a player, falling back to the id.""" + try: + player = self.mass.players.get_player(player_id) + if player: + return player.display_name or player.name or player_id + return player_id + except Exception: + return player_id + + # ------------------------------------------------------------------ + # PluginProvider audio source interface + # ------------------------------------------------------------------ + + def get_source(self) -> PluginSource: + """Return the plugin source descriptor for this DLNA receiver. + + Returns a persistent instance so metadata updates are visible + to the MA controller between calls. + """ + if self._plugin_source is None: + # Upstream DLNA senders push arbitrary compressed formats + # (FLAC/MP3/AAC/WAV/etc). Declare content_type/codec as UNKNOWN + # so MA's ffmpeg input pipeline probes the actual codec from + # stdin instead of misinterpreting compressed bytes as PCM. + self._plugin_source = PluginSource( + id=self.instance_id, + name=self.name or "DLNA Receiver", + passive=True, + can_play_pause=True, + audio_format=AudioFormat( + content_type=ContentType.UNKNOWN, + codec_type=ContentType.UNKNOWN, + ), + ) + return self._plugin_source + + async def get_audio_stream( + self, + player_id: str, + ) -> AsyncGenerator[bytes, None]: + """Yield audio bytes from the received DLNA stream. + + MA calls this when the plugin source is activated on a player. + We proxy the external URL through aiohttp and yield raw bytes. + """ + inst = self._instances.get(player_id) or self._instances.get("__default__") + stream_url = inst.current_stream_url if inst else None + + if not stream_url: + LOGGER.warning( + "get_audio_stream(%s) called but no stream URL set", + player_id, + ) + return + + LOGGER.debug("Proxying DLNA stream for %s: %s", player_id, _redact_url(stream_url)) + # total=None: streams may be long-running; bound connect + per-chunk read only. + timeout = aiohttp.ClientTimeout(total=None, sock_connect=10, sock_read=30) + # Reuse MA's shared HTTP session (matches streams/audio.py) so we + # don't open a fresh TCP connector + DNS cache per activation. + try: + async with self.mass.http_session.get(stream_url, timeout=timeout) as resp: + # Re-validate after any redirects: the final URL still has to + # be an http(s) endpoint, otherwise refuse to stream. (aiohttp + # won't follow non-http schemes, but belt-and-suspenders.) + final_url = str(resp.url) + if _validate_stream_url(final_url) is None: + LOGGER.warning( + "Upstream DLNA source redirected to disallowed URL: %s", + _redact_url(final_url), + ) + return + # Accept any 2xx (e.g. 206 Partial Content is common for audio). + if not 200 <= resp.status < 300: + LOGGER.warning( + "Upstream DLNA source returned HTTP %s for %s", + resp.status, + _redact_url(stream_url), + ) + return + async for chunk in resp.content.iter_any(): + yield chunk + except (aiohttp.ClientError, TimeoutError): + LOGGER.warning( + "Error proxying DLNA stream %s", + _redact_url(stream_url), + exc_info=True, + ) + return + LOGGER.debug("DLNA stream ended for %s", player_id) + + # ------------------------------------------------------------------ + # DIDL-Lite metadata parsing + # ------------------------------------------------------------------ + + @staticmethod + def _parse_didl_metadata(metadata: str | None) -> dict[str, str | None]: + """Parse DIDL-Lite XML and extract title, artist, album, image_url, duration.""" + result: dict[str, str | None] = { + "title": None, + "artist": None, + "album": None, + "image_url": None, + "duration": None, + } + if not metadata: + return result + + # Bound untrusted input before parsing (normal DIDL is well under this). + if len(metadata) > _MAX_DIDL_CHARS: + LOGGER.info( + "DIDL metadata truncated from %d to %d chars", + len(metadata), + _MAX_DIDL_CHARS, + ) + metadata = metadata[:_MAX_DIDL_CHARS] + + # SOAP bodies may contain XML-escaped DIDL-Lite content + metadata = unescape(metadata) + + # Defence-in-depth: reject DOCTYPE/ENTITY declarations before parsing + # with the stdlib XML parser (defusedxml is not yet a dependency) to + # avoid billion-laughs / external-entity DoS on untrusted LAN input. + lowered = metadata.lower() + if " element + res_el = item.find("didl:res", ns) + if res_el is not None: + dur = res_el.get("duration") + if dur: + result["duration"] = dur + + return result + + # ------------------------------------------------------------------ + # Renderer callbacks (per-instance) + # ------------------------------------------------------------------ + + async def _on_set_transport_uri( + self, + inst: RendererInstance, + uri: str, + metadata: str | None, + ) -> None: + """Handle SetAVTransportURI for a specific renderer instance. + + Raises: + ValueError: if the URI is not a safe http(s) stream URL. The + renderer turns this into SOAP fault 716 so the control + point sees the rejection instead of a silent 200 OK. + """ + safe_url = _validate_stream_url(uri) + if safe_url is None: + LOGGER.warning( + "Rejecting transport URI for '%s' (unsupported scheme/host): %s", + inst.player_name or "(default)", + _redact_url(uri), + ) + raise ValueError("unsupported URI scheme or missing host") + LOGGER.info( + "Received transport URI for '%s': %s", + inst.player_name or "(default)", + _redact_url(safe_url), + ) + inst.current_stream_url = safe_url + inst.current_metadata = self._parse_didl_metadata(metadata) + + async def _on_play(self, inst: RendererInstance) -> None: + """Handle Play — start streaming to this instance's player.""" + target = inst.player_id + if not target: + LOGGER.warning("No target player bound — ignoring Play") + return + + if not inst.current_stream_url: + LOGGER.warning("Play received but no stream URL for %s", target) + return + + LOGGER.info("Starting playback on player %s", target) + meta = inst.current_metadata or {} + LOGGER.debug("DIDL metadata for %s: %s", target, meta) + duration = self._parse_duration(meta.get("duration")) + + # Update plugin source metadata for MA UI display + source = self.get_source() + source.in_use_by = target + source.metadata = StreamMetadata( + title=meta.get("title") or "DLNA Stream", + artist=meta.get("artist"), + album=meta.get("album"), + image_url=meta.get("image_url"), + duration=duration, + uri=inst.current_stream_url, + elapsed_time=0, + elapsed_time_last_updated=time.time(), + ) + + # Track playback state for elapsed time + self._active_player_id = target + self._play_start_time = time.time() + self._elapsed_offset = 0 + self._ensure_metadata_task() + + # Route through the registered PluginSource so MA pulls bytes via + # our get_audio_stream() proxy instead of handing the raw upstream + # URI to the player (which would bypass the SSRF/redirect guards + # and fail on players that require MA to serve the stream). + await self.mass.players.select_source(target, self.instance_id) + + async def _on_pause(self, inst: RendererInstance) -> None: + """Handle Pause for this instance's player.""" + if inst.player_id: + self._freeze_elapsed() + await self.mass.players.cmd_pause(inst.player_id) + + async def _on_stop(self, inst: RendererInstance) -> None: + """Handle Stop for this instance's player.""" + if inst.player_id: + await self.mass.players.cmd_stop(inst.player_id) + inst.current_stream_url = None + self._clear_playback_state() + + async def _on_seek(self, inst: RendererInstance, unit: str, target: str) -> None: + """Handle Seek for this instance's player.""" + if not inst.player_id: + return + position = self._parse_duration(target) + if position is not None: + LOGGER.info("Seeking player %s to %ds", inst.player_id, position) + await self.mass.players.cmd_seek(inst.player_id, position) + else: + LOGGER.warning("Could not parse seek target: %s", target) + + async def _on_set_volume(self, inst: RendererInstance, volume: int) -> None: + """Handle volume change for this instance's player.""" + if inst.player_id: + await self.mass.players.cmd_volume_set(inst.player_id, volume) + + async def _on_set_mute(self, inst: RendererInstance, mute: bool) -> None: + """Handle mute change for this instance's player.""" + if inst.player_id: + await self.mass.players.cmd_volume_mute(inst.player_id, mute) + + # ------------------------------------------------------------------ + # Playback state & metadata helpers + # ------------------------------------------------------------------ + + def _freeze_elapsed(self) -> None: + """Freeze elapsed time at the current playback position.""" + if self._play_start_time: + self._elapsed_offset += int(time.time() - self._play_start_time) + self._play_start_time = None + + def _clear_playback_state(self) -> None: + """Clear all playback state and metadata.""" + self._play_start_time = None + self._elapsed_offset = 0 + self._active_player_id = None + if self._plugin_source: + self._plugin_source.metadata = None + self._plugin_source.in_use_by = None + # Drop the reference after cancel so the next playback cycle creates + # a fresh task. Without this, a canceled-but-not-yet-done task still + # reports done() == False to _ensure_metadata_task for a brief + # window, and a rapid stop→play would skip restarting the loop. + if self._metadata_task is not None: + if not self._metadata_task.done(): + self._metadata_task.cancel() + self._metadata_task = None + + def _ensure_metadata_task(self) -> None: + """Start the metadata update loop if not already running.""" + if self._metadata_task and not self._metadata_task.done(): + return + self._metadata_task = asyncio.create_task(self._metadata_update_loop()) + + async def _metadata_update_loop(self) -> None: + """Periodically update elapsed time and trigger UI refresh.""" + try: + while True: + await asyncio.sleep(2) + source = self._plugin_source + if not source or not source.metadata: + break + + now = time.time() + + if self._play_start_time: + elapsed = self._elapsed_offset + int(now - self._play_start_time) + source.metadata.elapsed_time = elapsed + source.metadata.elapsed_time_last_updated = now + else: + # Paused — keep last_updated fresh to freeze UI display + source.metadata.elapsed_time = self._elapsed_offset + source.metadata.elapsed_time_last_updated = now + + # Check if player still exists and source is still active + if self._active_player_id: + player = self.mass.players.get_player(self._active_player_id) + if not player: + LOGGER.debug("Metadata loop: player %s gone", self._active_player_id) + self._clear_playback_state() + break + + # Trigger player update so UI reflects metadata changes + if self._active_player_id: + self.mass.players.trigger_player_update(self._active_player_id) + except asyncio.CancelledError: + pass + except Exception: + LOGGER.debug("Metadata update loop error", exc_info=True) + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + @staticmethod + def _parse_duration(value: str | None) -> int | None: + """Parse UPnP duration string (H:MM:SS or H:MM:SS.xxx) to seconds.""" + if not value: + return None + try: + parts = value.split(":") + if len(parts) == 3: + h, m, s = parts + return int(h) * 3600 + int(m) * 60 + int(float(s)) + if len(parts) == 2: + m, s = parts + return int(m) * 60 + int(float(s)) + return int(float(value)) + except (ValueError, TypeError): + return None + + @staticmethod + def _deterministic_udn(player_id: str) -> str: + """Generate a deterministic UDN from the player_id. + + Uses UUID5 so the same player always gets the same UDN, + keeping DLNA control point bookmarks stable across restarts. + """ + namespace = uuid.uuid5(uuid.NAMESPACE_URL, UDN_NAMESPACE) + return f"uuid:{uuid.uuid5(namespace, player_id or '__default__')}" diff --git a/music_assistant/providers/dlna_receiver/renderer.py b/music_assistant/providers/dlna_receiver/renderer.py new file mode 100644 index 0000000000..ba71d363a9 --- /dev/null +++ b/music_assistant/providers/dlna_receiver/renderer.py @@ -0,0 +1,763 @@ +"""DLNA Receiver — UPnP MediaRenderer implementation. + +This module contains the HTTP server that serves UPnP device/service XML +descriptions and processes incoming SOAP control actions from DLNA +control points. Includes GENA eventing for state change notifications. +""" + +from __future__ import annotations + +import logging +import re +import uuid +from collections.abc import Awaitable, Callable +from pathlib import Path +from xml.etree.ElementTree import Element, ParseError, SubElement, fromstring, tostring +from xml.sax.saxutils import escape + +import aiohttp +from aiohttp import web + +from .constants import ( + DEFAULT_HTTP_PORT, + SUPPORTED_MIME_TYPES, + TRANSPORT_STATE_NO_MEDIA, + TRANSPORT_STATE_PAUSED, + TRANSPORT_STATE_PLAYING, + TRANSPORT_STATE_STOPPED, + UPNP_DEVICE_TYPE, + UPNP_SERVICE_AV_TRANSPORT, + UPNP_SERVICE_CONNECTION_MANAGER, + UPNP_SERVICE_RENDERING_CONTROL, +) +from .eventing import EventingManager + +LOGGER = logging.getLogger(__name__) + +SCPD_DIR = Path(__file__).parent / "scpd" + +SoapCallback = Callable[..., Awaitable[None]] + +# Extra entity mapping for XML attribute values (default escape() handles only &, <, >). +_ATTR_ENTITIES = {'"': """} + +# Upper bound on SOAP body size we agree to parse (normal UPnP bodies are < 4 KiB). +_MAX_SOAP_BODY_CHARS = 64 * 1024 + +# Strip a leading XML declaration so we can safely wrap the remaining body in a +# synthetic root for ElementTree parsing. +_XML_DECLARATION_RE = re.compile(r"^\s*<\?xml[^?]*\?>\s*", re.IGNORECASE) + + +class UPnPRenderer: + """Virtual UPnP MediaRenderer with SOAP action handling.""" + + def __init__( + self, + friendly_name: str, + bind_ip: str, + http_port: int = DEFAULT_HTTP_PORT, + udn: str | None = None, + session: aiohttp.ClientSession | None = None, + ) -> None: + """Create a renderer bound to the given IP/port with a stable UDN. + + ``session`` — optional shared aiohttp session. When supplied (the + typical provider path passes ``mass.http_session``), all three + GENA eventing managers reuse the same connector/DNS cache instead + of each spinning up its own. In multi-player mode this collapses + ``3 * N`` sessions down to one. + """ + self.friendly_name = friendly_name + self.bind_ip = bind_ip + self.http_port = http_port + self.udn = udn or f"uuid:{uuid.uuid4()}" + + # Transport state + self.transport_state: str = TRANSPORT_STATE_NO_MEDIA + self.current_uri: str = "" + self.current_uri_metadata: str = "" + self.volume: int = 50 + self.mute: bool = False + + # HTTP server + self._app = web.Application() + self._runner: web.AppRunner | None = None + # Pre-load SCPD XML bytes once: each request should not do sync + # file I/O on the event loop. + self._scpd_cache: dict[str, bytes] = { + name: (SCPD_DIR / name).read_bytes() + for name in ("AVTransport.xml", "RenderingControl.xml", "ConnectionManager.xml") + } + self._setup_routes() + + # GENA eventing managers (one per service). Share the provided + # aiohttp session across all three so NOTIFY traffic reuses a + # single connector instead of spawning one per service per renderer. + self._evt_av_transport = EventingManager(session=session) + self._evt_rendering_control = EventingManager(session=session) + self._evt_connection_manager = EventingManager(session=session) + + # Callbacks (set by provider) + self.on_set_av_transport_uri: SoapCallback | None = None + self.on_play: SoapCallback | None = None + self.on_pause: SoapCallback | None = None + self.on_stop: SoapCallback | None = None + self.on_seek: SoapCallback | None = None + self.on_set_volume: SoapCallback | None = None + self.on_set_mute: SoapCallback | None = None + + def _setup_routes(self) -> None: + """Register HTTP routes for UPnP description, control, and eventing.""" + self._app.router.add_get("/description.xml", self._handle_description) + # SCPD routes + self._app.router.add_get( + "/AVTransport/description.xml", + self._handle_av_transport_scpd, + ) + self._app.router.add_get( + "/RenderingControl/description.xml", + self._handle_rendering_control_scpd, + ) + self._app.router.add_get( + "/ConnectionManager/description.xml", + self._handle_connection_manager_scpd, + ) + # SOAP control routes + self._app.router.add_post("/AVTransport/control", self._handle_av_transport) + self._app.router.add_post( + "/RenderingControl/control", + self._handle_rendering_control, + ) + self._app.router.add_post( + "/ConnectionManager/control", + self._handle_connection_manager, + ) + # GENA event subscription routes + self._app.router.add_route( + "SUBSCRIBE", + "/AVTransport/event", + self._handle_subscribe_av_transport, + ) + self._app.router.add_route( + "UNSUBSCRIBE", + "/AVTransport/event", + self._handle_unsubscribe_av_transport, + ) + self._app.router.add_route( + "SUBSCRIBE", + "/RenderingControl/event", + self._handle_subscribe_rendering_control, + ) + self._app.router.add_route( + "UNSUBSCRIBE", + "/RenderingControl/event", + self._handle_unsubscribe_rendering_control, + ) + self._app.router.add_route( + "SUBSCRIBE", + "/ConnectionManager/event", + self._handle_subscribe_connection_manager, + ) + self._app.router.add_route( + "UNSUBSCRIBE", + "/ConnectionManager/event", + self._handle_unsubscribe_connection_manager, + ) + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def start(self) -> None: + """Start the UPnP HTTP server and eventing managers.""" + self._runner = web.AppRunner(self._app) + await self._runner.setup() + site = web.TCPSite(self._runner, self.bind_ip, self.http_port) + await site.start() + # If the caller requested an ephemeral port (http_port == 0), learn + # the actual bound port from the runner so description_url and the + # SSDP LOCATION header advertise a routable port instead of ":0". + if self.http_port == 0: + for address in self._runner.addresses: + if isinstance(address, tuple) and len(address) >= 2: + self.http_port = int(address[1]) + break + await self._evt_av_transport.start() + await self._evt_rendering_control.start() + await self._evt_connection_manager.start() + LOGGER.info( + "UPnP renderer HTTP server listening on %s:%s", + self.bind_ip, + self.http_port, + ) + + async def stop(self) -> None: + """Stop the UPnP HTTP server and eventing managers.""" + await self._evt_av_transport.stop() + await self._evt_rendering_control.stop() + await self._evt_connection_manager.stop() + if self._runner: + await self._runner.cleanup() + self._runner = None + LOGGER.info("UPnP renderer HTTP server stopped") + + @property + def description_url(self) -> str: + """Return the device description URL. + + IPv6 literals need square brackets in URL host components + (RFC 3986 §3.2.2); without them the resulting URL would be + unparsable by strict control points consuming SSDP LOCATION. + """ + host = f"[{self.bind_ip}]" if ":" in self.bind_ip else self.bind_ip + return f"http://{host}:{self.http_port}/description.xml" + + # ------------------------------------------------------------------ + # UPnP Device Description + # ------------------------------------------------------------------ + + async def _handle_description(self, _request: web.Request) -> web.Response: + """Return the root UPnP device description XML.""" + root = Element("root", xmlns="urn:schemas-upnp-org:device-1-0") + spec = SubElement(root, "specVersion") + SubElement(spec, "major").text = "1" + SubElement(spec, "minor").text = "0" + + device = SubElement(root, "device") + SubElement(device, "deviceType").text = UPNP_DEVICE_TYPE + SubElement(device, "friendlyName").text = self.friendly_name + SubElement(device, "manufacturer").text = "Music Assistant" + SubElement(device, "modelName").text = "DLNA Receiver" + SubElement(device, "modelDescription").text = "Music Assistant DLNA Receiver Bridge" + SubElement(device, "UDN").text = self.udn + + service_list = SubElement(device, "serviceList") + for svc_type, svc_id, scpd_url, ctrl_url, event_url in [ + ( + UPNP_SERVICE_AV_TRANSPORT, + "urn:upnp-org:serviceId:AVTransport", + "/AVTransport/description.xml", + "/AVTransport/control", + "/AVTransport/event", + ), + ( + UPNP_SERVICE_RENDERING_CONTROL, + "urn:upnp-org:serviceId:RenderingControl", + "/RenderingControl/description.xml", + "/RenderingControl/control", + "/RenderingControl/event", + ), + ( + UPNP_SERVICE_CONNECTION_MANAGER, + "urn:upnp-org:serviceId:ConnectionManager", + "/ConnectionManager/description.xml", + "/ConnectionManager/control", + "/ConnectionManager/event", + ), + ]: + svc = SubElement(service_list, "service") + SubElement(svc, "serviceType").text = svc_type + SubElement(svc, "serviceId").text = svc_id + SubElement(svc, "SCPDURL").text = scpd_url + SubElement(svc, "controlURL").text = ctrl_url + SubElement(svc, "eventSubURL").text = event_url + + xml_bytes = b'' + tostring(root, encoding="unicode").encode() + return web.Response(body=xml_bytes, content_type="text/xml") + + # ------------------------------------------------------------------ + # Service SCPDs (served from static XML files) + # ------------------------------------------------------------------ + + async def _handle_av_transport_scpd(self, _request: web.Request) -> web.Response: + """Return AVTransport service description.""" + return self._serve_scpd("AVTransport.xml") + + async def _handle_rendering_control_scpd( + self, + _request: web.Request, + ) -> web.Response: + """Return RenderingControl service description.""" + return self._serve_scpd("RenderingControl.xml") + + async def _handle_connection_manager_scpd( + self, + _request: web.Request, + ) -> web.Response: + """Return ConnectionManager service description.""" + return self._serve_scpd("ConnectionManager.xml") + + def _serve_scpd(self, filename: str) -> web.Response: + """Serve a SCPD XML file from the startup-populated cache.""" + return web.Response(body=self._scpd_cache[filename], content_type="text/xml") + + # ------------------------------------------------------------------ + # SOAP Action Handlers + # ------------------------------------------------------------------ + + async def _handle_av_transport(self, request: web.Request) -> web.Response: + """Handle AVTransport SOAP actions.""" + body = await request.text() + soap_action = request.headers.get("SOAPACTION", "").strip('"') + action_name = soap_action.rsplit("#", 1)[-1] if "#" in soap_action else "" + LOGGER.debug("AVTransport action: %s", action_name) + + if action_name == "SetAVTransportURI": + uri = self._extract_xml_value(body, "CurrentURI") or "" + metadata = self._extract_xml_value(body, "CurrentURIMetaData") + LOGGER.debug("SetAVTransportURI raw metadata (first 500): %s", (metadata or "")[:500]) + # Validate before mutating state: if the callback rejects the URI + # (raises ValueError), keep the prior transport state intact and + # surface a SOAP fault so the control point knows it was refused, + # instead of returning 200 OK and silently ignoring the request. + if self.on_set_av_transport_uri: + try: + await self.on_set_av_transport_uri(uri, metadata) + except ValueError as exc: + LOGGER.info("SetAVTransportURI rejected: %s", exc) + return self._soap_error(716, "Illegal URI") + self.current_uri = uri + self.current_uri_metadata = metadata or "" + self.transport_state = TRANSPORT_STATE_STOPPED + await self._notify_av_transport_change() + return self._soap_response(action_name, UPNP_SERVICE_AV_TRANSPORT) + + if action_name == "Play": + self.transport_state = TRANSPORT_STATE_PLAYING + if self.on_play: + await self.on_play() + await self._notify_av_transport_change() + return self._soap_response(action_name, UPNP_SERVICE_AV_TRANSPORT) + + if action_name == "Pause": + self.transport_state = TRANSPORT_STATE_PAUSED + if self.on_pause: + await self.on_pause() + await self._notify_av_transport_change() + return self._soap_response(action_name, UPNP_SERVICE_AV_TRANSPORT) + + if action_name == "Stop": + self.transport_state = TRANSPORT_STATE_STOPPED + if self.on_stop: + await self.on_stop() + await self._notify_av_transport_change() + return self._soap_response(action_name, UPNP_SERVICE_AV_TRANSPORT) + + if action_name == "Seek": + unit = self._extract_xml_value(body, "Unit") or "" + target = self._extract_xml_value(body, "Target") or "" + LOGGER.info("Seek requested: Unit=%s, Target=%s", unit, target) + if self.on_seek: + await self.on_seek(unit, target) + return self._soap_response(action_name, UPNP_SERVICE_AV_TRANSPORT) + + if action_name == "GetTransportInfo": + return self._soap_response( + action_name, + UPNP_SERVICE_AV_TRANSPORT, + { + "CurrentTransportState": self.transport_state, + "CurrentTransportStatus": "OK", + "CurrentSpeed": "1", + }, + ) + + if action_name == "GetPositionInfo": + return self._soap_response( + action_name, + UPNP_SERVICE_AV_TRANSPORT, + { + "Track": "1", + "TrackDuration": "00:00:00", + "TrackMetaData": self.current_uri_metadata, + "TrackURI": self.current_uri, + "RelTime": "00:00:00", + "AbsTime": "00:00:00", + "RelCount": "0", + "AbsCount": "0", + }, + ) + + if action_name == "GetMediaInfo": + return self._soap_response( + action_name, + UPNP_SERVICE_AV_TRANSPORT, + { + "NrTracks": "1", + "MediaDuration": "00:00:00", + "CurrentURI": self.current_uri, + "CurrentURIMetaData": self.current_uri_metadata, + "NextURI": "", + "NextURIMetaData": "", + "PlayMedium": "NETWORK", + "RecordMedium": "NOT_IMPLEMENTED", + "WriteStatus": "NOT_IMPLEMENTED", + }, + ) + + LOGGER.warning("Unhandled AVTransport action: %s", action_name) + return self._soap_error(401, "Invalid Action") + + async def _handle_rendering_control(self, request: web.Request) -> web.Response: + """Handle RenderingControl SOAP actions.""" + body = await request.text() + soap_action = request.headers.get("SOAPACTION", "").strip('"') + action_name = soap_action.rsplit("#", 1)[-1] if "#" in soap_action else "" + LOGGER.debug("RenderingControl action: %s", action_name) + + if action_name == "GetVolume": + return self._soap_response( + action_name, + UPNP_SERVICE_RENDERING_CONTROL, + {"CurrentVolume": str(self.volume)}, + ) + + if action_name == "SetVolume": + vol_str = self._extract_xml_value(body, "DesiredVolume") + if vol_str is not None: + try: + vol = int(vol_str.strip()) + except (ValueError, TypeError): + LOGGER.warning("Invalid DesiredVolume value: %r", vol_str) + return self._soap_error(402, "Invalid Args") + self.volume = max(0, min(100, vol)) + if self.on_set_volume: + await self.on_set_volume(self.volume) + await self._notify_rendering_control_change() + return self._soap_response( + action_name, + UPNP_SERVICE_RENDERING_CONTROL, + ) + + if action_name == "GetMute": + return self._soap_response( + action_name, + UPNP_SERVICE_RENDERING_CONTROL, + {"CurrentMute": "1" if self.mute else "0"}, + ) + + if action_name == "SetMute": + mute_str = self._extract_xml_value(body, "DesiredMute") + if mute_str is not None: + self.mute = mute_str.strip().lower() in {"1", "true", "yes"} + if self.on_set_mute: + await self.on_set_mute(self.mute) + await self._notify_rendering_control_change() + return self._soap_response( + action_name, + UPNP_SERVICE_RENDERING_CONTROL, + ) + + LOGGER.warning("Unhandled RenderingControl action: %s", action_name) + return self._soap_error(401, "Invalid Action") + + async def _handle_connection_manager(self, request: web.Request) -> web.Response: + """Handle ConnectionManager SOAP actions.""" + soap_action = request.headers.get("SOAPACTION", "").strip('"') + action_name = soap_action.rsplit("#", 1)[-1] if "#" in soap_action else "" + LOGGER.debug("ConnectionManager action: %s", action_name) + + if action_name == "GetProtocolInfo": + sink_protocols = ",".join(f"http-get:*:{mime}:*" for mime in SUPPORTED_MIME_TYPES) + return self._soap_response( + action_name, + UPNP_SERVICE_CONNECTION_MANAGER, + {"Source": "", "Sink": sink_protocols}, + ) + + if action_name == "GetCurrentConnectionIDs": + return self._soap_response( + action_name, + UPNP_SERVICE_CONNECTION_MANAGER, + {"ConnectionIDs": "0"}, + ) + + if action_name == "GetCurrentConnectionInfo": + sink_protocols = ",".join(f"http-get:*:{mime}:*" for mime in SUPPORTED_MIME_TYPES) + return self._soap_response( + action_name, + UPNP_SERVICE_CONNECTION_MANAGER, + { + "RcsID": "0", + "AVTransportID": "0", + "ProtocolInfo": sink_protocols, + "PeerConnectionManager": "", + "PeerConnectionID": "-1", + "Direction": "Input", + "Status": "OK", + }, + ) + + LOGGER.warning("Unhandled ConnectionManager action: %s", action_name) + return self._soap_error(401, "Invalid Action") + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + @staticmethod + def _extract_xml_value(xml_str: str, tag: str) -> str | None: + """Extract a value from a SOAP XML body by tag name. + + Accepts fragments (tests) or full envelopes: strips a leading + ```` declaration, wraps the remainder in a synthetic + root so ElementTree can always parse it, and searches + namespace-agnostically via the ``{*}tag`` wildcard. + + Defence-in-depth: rejects oversized bodies and anything carrying a + DOCTYPE/ENTITY declaration, since we parse untrusted LAN input with + the stdlib parser (defusedxml is not yet a dependency). + """ + if len(xml_str) > _MAX_SOAP_BODY_CHARS: + return None + lowered = xml_str.lower() + if "{body}") # noqa: S314 + except ParseError: + return None + elem = root.find(f".//{{*}}{tag}") + if elem is None: + return None + return elem.text or "" + + @staticmethod + def _soap_response( + action_name: str, + service_type: str, + values: dict[str, str] | None = None, + ) -> web.Response: + """Build a UPnP SOAP response envelope.""" + body = f""" + + + """ + if values: + for key, val in values.items(): + body += f"\n <{key}>{escape(val)}" + body += f""" + + +""" + return web.Response(body=body, content_type="text/xml", charset="utf-8") + + @staticmethod + def _soap_error(code: int, description: str) -> web.Response: + """Build a UPnP SOAP error response.""" + body = f""" + + + + s:Client + UPnPError + + + {code} + {escape(description)} + + + + +""" + return web.Response( + body=body, + status=500, + content_type="text/xml", + charset="utf-8", + ) + + # ------------------------------------------------------------------ + # GENA Event Subscription Handlers + # ------------------------------------------------------------------ + + async def _handle_subscribe_av_transport( + self, + request: web.Request, + ) -> web.Response: + """Handle SUBSCRIBE for AVTransport events.""" + return await self._handle_subscribe( + request, + self._evt_av_transport, + self._get_av_transport_vars(), + ) + + async def _handle_unsubscribe_av_transport( + self, + request: web.Request, + ) -> web.Response: + """Handle UNSUBSCRIBE for AVTransport events.""" + return self._handle_unsubscribe(request, self._evt_av_transport) + + async def _handle_subscribe_rendering_control( + self, + request: web.Request, + ) -> web.Response: + """Handle SUBSCRIBE for RenderingControl events.""" + return await self._handle_subscribe( + request, + self._evt_rendering_control, + self._get_rendering_control_vars(), + ) + + async def _handle_unsubscribe_rendering_control( + self, + request: web.Request, + ) -> web.Response: + """Handle UNSUBSCRIBE for RenderingControl events.""" + return self._handle_unsubscribe(request, self._evt_rendering_control) + + async def _handle_subscribe_connection_manager( + self, + request: web.Request, + ) -> web.Response: + """Handle SUBSCRIBE for ConnectionManager events.""" + sink_protocols = ",".join(f"http-get:*:{mime}:*" for mime in SUPPORTED_MIME_TYPES) + initial_vars = { + "SourceProtocolInfo": "", + "SinkProtocolInfo": sink_protocols, + "CurrentConnectionIDs": "0", + } + return await self._handle_subscribe( + request, + self._evt_connection_manager, + initial_vars, + ) + + async def _handle_unsubscribe_connection_manager( + self, + request: web.Request, + ) -> web.Response: + """Handle UNSUBSCRIBE for ConnectionManager events.""" + return self._handle_unsubscribe(request, self._evt_connection_manager) + + async def _handle_subscribe( + self, + request: web.Request, + manager: EventingManager, + initial_vars: dict[str, str], + ) -> web.Response: + """Handle SUBSCRIBE requests for a UPnP service.""" + sid = request.headers.get("SID") + + if sid: + # Renewal + try: + timeout = manager.renew( + sid, + request.headers.get("TIMEOUT"), + ) + except KeyError: + return web.Response(status=412, text="Invalid SID") + return web.Response( + status=200, + headers={ + "SID": sid, + "TIMEOUT": f"Second-{timeout}", + }, + ) + + # New subscription + callback = request.headers.get("CALLBACK") + if not callback: + return web.Response(status=412, text="Missing CALLBACK header") + + try: + sid, timeout = manager.subscribe( + callback, + request.headers.get("TIMEOUT"), + ) + except ValueError as exc: + return web.Response(status=412, text=str(exc)) + + # Send initial event with current state + await manager.notify_initial(sid, initial_vars) + + return web.Response( + status=200, + headers={ + "SID": sid, + "TIMEOUT": f"Second-{timeout}", + "Server": "UPnP/1.0 MusicAssistant/1.0", + }, + ) + + @staticmethod + def _handle_unsubscribe( + request: web.Request, + manager: EventingManager, + ) -> web.Response: + """Handle UNSUBSCRIBE requests for a UPnP service.""" + sid = request.headers.get("SID") + if not sid: + return web.Response(status=412, text="Missing SID header") + manager.unsubscribe(sid) + return web.Response(status=200) + + # ------------------------------------------------------------------ + # Event Notification Helpers + # ------------------------------------------------------------------ + + def _get_av_transport_vars(self) -> dict[str, str]: + """Get current AVTransport state as a LastChange XML fragment.""" + last_change = self._build_last_change( + "urn:schemas-upnp-org:service:AVTransport:1", + { + "TransportState": self.transport_state, + "TransportStatus": "OK", + "TransportPlaySpeed": "1", + "CurrentTrackURI": self.current_uri, + "AVTransportURI": self.current_uri, + "AVTransportURIMetaData": self.current_uri_metadata, + "CurrentTrackMetaData": self.current_uri_metadata, + }, + ) + return {"LastChange": last_change} + + def _get_rendering_control_vars(self) -> dict[str, str]: + """Get current RenderingControl state as a LastChange XML fragment.""" + last_change = self._build_last_change( + "urn:schemas-upnp-org:service:RenderingControl:1", + { + "Volume": str(self.volume), + "Mute": "1" if self.mute else "0", + }, + channel="Master", + ) + return {"LastChange": last_change} + + async def _notify_av_transport_change(self) -> None: + """Notify AVTransport subscribers of state changes.""" + await self._evt_av_transport.notify(self._get_av_transport_vars()) + + async def _notify_rendering_control_change(self) -> None: + """Notify RenderingControl subscribers of state changes.""" + await self._evt_rendering_control.notify( + self._get_rendering_control_vars(), + ) + + @staticmethod + def _build_last_change( + namespace: str, + variables: dict[str, str], + channel: str | None = None, + ) -> str: + """Build a LastChange XML value for GENA eventing. + + The LastChange event wraps state variable changes in an + structure as required by UPnP spec. + """ + parts: list[str] = [] + for name, value in variables.items(): + attrs = f'val="{escape(value, _ATTR_ENTITIES)}"' + if channel: + attrs += f' channel="{channel}"' + parts.append(f"<{name} {attrs}/>") + + return ( + f'{"".join(parts)}' + ) diff --git a/music_assistant/providers/dlna_receiver/scpd/AVTransport.xml b/music_assistant/providers/dlna_receiver/scpd/AVTransport.xml new file mode 100644 index 0000000000..3cc047d0a5 --- /dev/null +++ b/music_assistant/providers/dlna_receiver/scpd/AVTransport.xml @@ -0,0 +1,339 @@ + + + + 1 + 0 + + + + SetAVTransportURI + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + CurrentURI + in + AVTransportURI + + + CurrentURIMetaData + in + AVTransportURIMetaData + + + + + GetTransportInfo + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + CurrentTransportState + out + TransportState + + + CurrentTransportStatus + out + TransportStatus + + + CurrentSpeed + out + TransportPlaySpeed + + + + + Play + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + Speed + in + TransportPlaySpeed + + + + + Pause + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + + + Stop + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + + + Seek + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + Unit + in + A_ARG_TYPE_SeekMode + + + Target + in + A_ARG_TYPE_SeekTarget + + + + + GetPositionInfo + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + Track + out + CurrentTrack + + + TrackDuration + out + CurrentTrackDuration + + + TrackMetaData + out + CurrentTrackMetaData + + + TrackURI + out + CurrentTrackURI + + + RelTime + out + RelativeTimePosition + + + AbsTime + out + AbsoluteTimePosition + + + RelCount + out + RelativeCounterPosition + + + AbsCount + out + AbsoluteCounterPosition + + + + + GetMediaInfo + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + NrTracks + out + NumberOfTracks + + + MediaDuration + out + CurrentMediaDuration + + + CurrentURI + out + AVTransportURI + + + CurrentURIMetaData + out + AVTransportURIMetaData + + + NextURI + out + NextAVTransportURI + + + NextURIMetaData + out + NextAVTransportURIMetaData + + + PlayMedium + out + PlaybackStorageMedium + + + RecordMedium + out + RecordStorageMedium + + + WriteStatus + out + RecordMediumWriteStatus + + + + + + + A_ARG_TYPE_InstanceID + ui4 + + + A_ARG_TYPE_SeekMode + string + + REL_TIME + TRACK_NR + + + + A_ARG_TYPE_SeekTarget + string + + + TransportState + string + + STOPPED + PLAYING + TRANSITIONING + PAUSED_PLAYBACK + NO_MEDIA_PRESENT + + + + TransportStatus + string + + OK + ERROR_OCCURRED + + + + TransportPlaySpeed + string + 1 + + + CurrentTrack + ui4 + 0 + + + CurrentTrackDuration + string + 00:00:00 + + + CurrentTrackMetaData + string + + + CurrentTrackURI + string + + + AVTransportURI + string + + + AVTransportURIMetaData + string + + + NextAVTransportURI + string + + + NextAVTransportURIMetaData + string + + + RelativeTimePosition + string + 00:00:00 + + + AbsoluteTimePosition + string + 00:00:00 + + + RelativeCounterPosition + i4 + 0 + + + AbsoluteCounterPosition + i4 + 0 + + + NumberOfTracks + ui4 + 0 + + + CurrentMediaDuration + string + 00:00:00 + + + PlaybackStorageMedium + string + NETWORK + + + RecordStorageMedium + string + NOT_IMPLEMENTED + + + RecordMediumWriteStatus + string + NOT_IMPLEMENTED + + + LastChange + string + + + diff --git a/music_assistant/providers/dlna_receiver/scpd/ConnectionManager.xml b/music_assistant/providers/dlna_receiver/scpd/ConnectionManager.xml new file mode 100644 index 0000000000..8f1fd80ca7 --- /dev/null +++ b/music_assistant/providers/dlna_receiver/scpd/ConnectionManager.xml @@ -0,0 +1,133 @@ + + + + 1 + 0 + + + + GetProtocolInfo + + + Source + out + SourceProtocolInfo + + + Sink + out + SinkProtocolInfo + + + + + GetCurrentConnectionIDs + + + ConnectionIDs + out + CurrentConnectionIDs + + + + + GetCurrentConnectionInfo + + + ConnectionID + in + A_ARG_TYPE_ConnectionID + + + RcsID + out + A_ARG_TYPE_RcsID + + + AVTransportID + out + A_ARG_TYPE_AVTransportID + + + ProtocolInfo + out + A_ARG_TYPE_ProtocolInfo + + + PeerConnectionManager + out + A_ARG_TYPE_ConnectionManager + + + PeerConnectionID + out + A_ARG_TYPE_ConnectionID + + + Direction + out + A_ARG_TYPE_Direction + + + Status + out + A_ARG_TYPE_ConnectionStatus + + + + + + + SourceProtocolInfo + string + + + SinkProtocolInfo + string + + + CurrentConnectionIDs + string + 0 + + + A_ARG_TYPE_ConnectionID + i4 + + + A_ARG_TYPE_RcsID + i4 + + + A_ARG_TYPE_AVTransportID + i4 + + + A_ARG_TYPE_ProtocolInfo + string + + + A_ARG_TYPE_ConnectionManager + string + + + A_ARG_TYPE_Direction + string + + Input + Output + + + + A_ARG_TYPE_ConnectionStatus + string + + OK + ContentFormatMismatch + InsufficientBandwidth + UnreliableChannel + Unknown + + + + diff --git a/music_assistant/providers/dlna_receiver/scpd/RenderingControl.xml b/music_assistant/providers/dlna_receiver/scpd/RenderingControl.xml new file mode 100644 index 0000000000..7f470ba7d2 --- /dev/null +++ b/music_assistant/providers/dlna_receiver/scpd/RenderingControl.xml @@ -0,0 +1,121 @@ + + + + 1 + 0 + + + + GetVolume + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + Channel + in + A_ARG_TYPE_Channel + + + CurrentVolume + out + Volume + + + + + SetVolume + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + Channel + in + A_ARG_TYPE_Channel + + + DesiredVolume + in + Volume + + + + + GetMute + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + Channel + in + A_ARG_TYPE_Channel + + + CurrentMute + out + Mute + + + + + SetMute + + + InstanceID + in + A_ARG_TYPE_InstanceID + + + Channel + in + A_ARG_TYPE_Channel + + + DesiredMute + in + Mute + + + + + + + A_ARG_TYPE_InstanceID + ui4 + + + A_ARG_TYPE_Channel + string + + Master + + + + Volume + ui2 + + 0 + 100 + 1 + + 50 + + + Mute + boolean + 0 + + + LastChange + string + + + diff --git a/music_assistant/providers/dlna_receiver/ssdp.py b/music_assistant/providers/dlna_receiver/ssdp.py new file mode 100644 index 0000000000..a140621e55 --- /dev/null +++ b/music_assistant/providers/dlna_receiver/ssdp.py @@ -0,0 +1,321 @@ +"""DLNA Receiver — SSDP advertisement for the virtual MediaRenderer.""" + +from __future__ import annotations + +import asyncio +import contextlib +import logging +import secrets +import socket + +from .constants import ( + SSDP_MAX_AGE, + SSDP_MULTICAST_ADDR, + SSDP_PORT, + UPNP_DEVICE_TYPE, + UPNP_SERVICE_AV_TRANSPORT, + UPNP_SERVICE_CONNECTION_MANAGER, + UPNP_SERVICE_RENDERING_CONTROL, +) + +LOGGER = logging.getLogger(__name__) + +# UPnP spec allows MX up to 120, but honoring that would stall discovery; +# cap at 5 s so control points still see our response within their window. +_MX_MAX_SECONDS = 5 + + +class SSDPAdvertiser: + """Advertise a UPnP MediaRenderer via SSDP on the local network.""" + + def __init__( + self, + udn: str, + description_url: str, + bind_ip: str, + ) -> None: + """Store config; sockets and tasks are created in start().""" + self.udn = udn + self.description_url = description_url + self.bind_ip = bind_ip + self._transport: asyncio.DatagramTransport | None = None + self._recv_transport: asyncio.DatagramTransport | None = None + self._advertise_task: asyncio.Task[None] | None = None + self._pending_responses: set[asyncio.Task[None]] = set() + self._running = False + + async def start(self) -> None: + """Start SSDP advertisement.""" + self._running = True + loop = asyncio.get_running_loop() + + # Sending socket — for NOTIFY alive/byebye to multicast group + send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + send_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + send_sock.setsockopt( + socket.IPPROTO_IP, + socket.IP_MULTICAST_IF, + socket.inet_aton(self.bind_ip), + ) + send_sock.setblocking(False) + self._transport, _ = await loop.create_datagram_endpoint( + _SSDPSendProtocol, + sock=send_sock, + ) + + # Receiving socket — join multicast group on port 1900 for M-SEARCH. + # In multi-player mode each instance binds its own ("", 1900) socket, + # which only works if SO_REUSEPORT is available: track whether we + # managed to enable it so we can turn the subsequent bind() error + # into a clear, actionable message instead of a raw EADDRINUSE. + recv_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + recv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + reuse_port_enabled = False + try: + recv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + reuse_port_enabled = True + except (AttributeError, OSError): + reuse_port_enabled = False + try: + recv_sock.bind(("", SSDP_PORT)) + except OSError as err: + recv_sock.close() + # EADDRINUSE: 48 on macOS/BSD, 98 on Linux, 10048 on Windows. + if not reuse_port_enabled and err.errno in (48, 98, 10048): + msg = ( + f"Unable to bind SSDP receive socket to port {SSDP_PORT}: " + "this platform does not support sharing the SSDP port " + "across multiple DLNA receiver instances because " + "SO_REUSEPORT could not be enabled. Run with a single " + "target player, or use a platform that supports " + "SO_REUSEPORT." + ) + raise RuntimeError(msg) from err + raise + mreq = socket.inet_aton(SSDP_MULTICAST_ADDR) + socket.inet_aton(self.bind_ip) + recv_sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + recv_sock.setblocking(False) + self._recv_transport, _ = await loop.create_datagram_endpoint( + lambda: _SSDPRecvProtocol(self), + sock=recv_sock, + ) + + # Send initial alive notifications + await self._send_alive() + + # Periodic re-advertisement + self._advertise_task = asyncio.create_task(self._periodic_alive()) + LOGGER.info("SSDP advertiser started for %s", self.udn) + + async def stop(self) -> None: + """Stop SSDP advertisement and send byebye.""" + self._running = False + if self._advertise_task: + advertise_task = self._advertise_task + self._advertise_task = None + advertise_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await advertise_task + if self._pending_responses: + # Snapshot: done-callbacks mutate self._pending_responses. + pending = list(self._pending_responses) + for task in pending: + task.cancel() + await asyncio.gather(*pending, return_exceptions=True) + self._pending_responses.clear() + await self._send_byebye() + if self._recv_transport: + self._recv_transport.close() + self._recv_transport = None + if self._transport: + self._transport.close() + self._transport = None + LOGGER.info("SSDP advertiser stopped") + + async def _periodic_alive(self) -> None: + """Re-send alive notifications periodically.""" + while self._running: + await asyncio.sleep(SSDP_MAX_AGE // 2) + if self._running: + await self._send_alive() + + async def _send_alive(self) -> None: + """Send ssdp:alive NOTIFY messages for all service types.""" + notification_types = [ + "upnp:rootdevice", + self.udn, + UPNP_DEVICE_TYPE, + UPNP_SERVICE_AV_TRANSPORT, + UPNP_SERVICE_RENDERING_CONTROL, + UPNP_SERVICE_CONNECTION_MANAGER, + ] + for nt in notification_types: + usn = f"{self.udn}::{nt}" if nt != self.udn else self.udn + message = ( + "NOTIFY * HTTP/1.1\r\n" + f"HOST: {SSDP_MULTICAST_ADDR}:{SSDP_PORT}\r\n" + f"CACHE-CONTROL: max-age={SSDP_MAX_AGE}\r\n" + f"LOCATION: {self.description_url}\r\n" + f"NT: {nt}\r\n" + "NTS: ssdp:alive\r\n" + f"SERVER: Music Assistant DLNA Receiver/1.0 UPnP/1.0\r\n" + f"USN: {usn}\r\n" + "\r\n" + ) + self._send_datagram(message) + LOGGER.debug("Sent SSDP alive notifications") + + async def _send_byebye(self) -> None: + """Send ssdp:byebye NOTIFY messages.""" + notification_types = [ + "upnp:rootdevice", + self.udn, + UPNP_DEVICE_TYPE, + ] + for nt in notification_types: + usn = f"{self.udn}::{nt}" if nt != self.udn else self.udn + message = ( + "NOTIFY * HTTP/1.1\r\n" + f"HOST: {SSDP_MULTICAST_ADDR}:{SSDP_PORT}\r\n" + f"NT: {nt}\r\n" + "NTS: ssdp:byebye\r\n" + f"USN: {usn}\r\n" + "\r\n" + ) + self._send_datagram(message) + LOGGER.debug("Sent SSDP byebye notifications") + + def _send_datagram(self, message: str) -> None: + """Send a UDP datagram to the SSDP multicast address.""" + if self._transport and not self._transport.is_closing(): + self._transport.sendto( + message.encode("utf-8"), + (SSDP_MULTICAST_ADDR, SSDP_PORT), + ) + + def handle_search(self, data: bytes, addr: tuple[str, int]) -> None: + """Respond to M-SEARCH requests matching our device/service types. + + Per UPnP Device Architecture 1.1 §1.3.3, the responder must wait a + random interval in [0, MX] seconds before replying so a roomful of + devices doesn't flood the requester simultaneously. We cap MX at + ``_MX_MAX_SECONDS`` to keep discovery snappy. + """ + text = data.decode("utf-8", errors="ignore") + if "M-SEARCH" not in text: + return + + st = "" + mx_raw = "" + for line in text.splitlines(): + upper = line.upper() + if upper.startswith("ST:"): + st = line.split(":", 1)[1].strip() + elif upper.startswith("MX:"): + mx_raw = line.split(":", 1)[1].strip() + + # Check if the search target matches any of our types + match_targets = { + "ssdp:all", + "upnp:rootdevice", + UPNP_DEVICE_TYPE, + UPNP_SERVICE_AV_TRANSPORT, + UPNP_SERVICE_RENDERING_CONTROL, + UPNP_SERVICE_CONNECTION_MANAGER, + self.udn, + } + if st not in match_targets: + return + + usn = f"{self.udn}::{st}" if st != self.udn else self.udn + response = ( + "HTTP/1.1 200 OK\r\n" + f"CACHE-CONTROL: max-age={SSDP_MAX_AGE}\r\n" + f"LOCATION: {self.description_url}\r\n" + f"ST: {st}\r\n" + f"USN: {usn}\r\n" + f"SERVER: Music Assistant DLNA Receiver/1.0 UPnP/1.0\r\n" + "\r\n" + ).encode() + + delay = _parse_mx_delay(mx_raw) + if delay <= 0: + self._send_response(response, addr, st) + return + try: + task: asyncio.Task[None] = asyncio.create_task( + self._delayed_response(response, addr, st, delay), + ) + except RuntimeError: + # No running loop (e.g. invoked from a test) — send immediately. + self._send_response(response, addr, st) + return + self._pending_responses.add(task) + task.add_done_callback(self._pending_responses.discard) + + async def _delayed_response( + self, + response: bytes, + addr: tuple[str, int], + st: str, + delay: float, + ) -> None: + """Sleep for the MX-derived jitter then send the prepared response.""" + try: + await asyncio.sleep(delay) + except asyncio.CancelledError: + return + self._send_response(response, addr, st) + + def _send_response(self, response: bytes, addr: tuple[str, int], st: str) -> None: + """Send a prebuilt M-SEARCH response datagram to the requester.""" + if self._recv_transport and not self._recv_transport.is_closing(): + self._recv_transport.sendto(response, addr) + LOGGER.debug("Responded to M-SEARCH from %s for %s", addr, st) + + +def _parse_mx_delay(header: str) -> float: + """Parse an SSDP ``MX:`` header into a jitter delay in seconds. + + Returns 0.0 if the header is missing, malformed, or non-positive — in + which case we respond immediately. A valid MX is clamped to + ``_MX_MAX_SECONDS`` and then multiplied by a uniform random in [0, 1) + using ``secrets`` (we don't need cryptographic strength, but ``secrets`` + avoids pulling the ``random`` module just for this one spot and the + quality is more than adequate for jitter). + """ + if not header: + return 0.0 + try: + mx = int(header) + except ValueError: + return 0.0 + if mx <= 0: + return 0.0 + capped = min(mx, _MX_MAX_SECONDS) + # randbelow(1000)/1000 gives a float in [0, 1) without importing `random`. + return capped * (secrets.randbelow(1000) / 1000) + + +class _SSDPSendProtocol(asyncio.DatagramProtocol): + """Asyncio UDP protocol for the SSDP sending socket (NOTIFY).""" + + def error_received(self, exc: Exception) -> None: + """Handle transport errors.""" + LOGGER.warning("SSDP send transport error: %s", exc) + + +class _SSDPRecvProtocol(asyncio.DatagramProtocol): + """Asyncio UDP protocol for receiving SSDP M-SEARCH requests.""" + + def __init__(self, advertiser: SSDPAdvertiser) -> None: + """Hold a reference to the advertiser that receives M-SEARCH datagrams.""" + self._advertiser = advertiser + + def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None: + """Handle incoming UDP datagrams.""" + self._advertiser.handle_search(data, addr) + + def error_received(self, exc: Exception) -> None: + """Handle transport errors.""" + LOGGER.warning("SSDP recv transport error: %s", exc) diff --git a/music_assistant/providers/dlna_receiver/urls.py b/music_assistant/providers/dlna_receiver/urls.py new file mode 100644 index 0000000000..0fd3662be1 --- /dev/null +++ b/music_assistant/providers/dlna_receiver/urls.py @@ -0,0 +1,57 @@ +"""URL helpers shared by provider and renderer. + +Lives in its own module (no Music Assistant dependency) so its pure, +security-sensitive helpers can be exercised by unit tests without +requiring the full MA runtime to be importable. +""" + +from __future__ import annotations + +from urllib.parse import urlsplit, urlunsplit + +ALLOWED_STREAM_SCHEMES = frozenset({"http", "https"}) + + +def validate_stream_url(uri: str) -> str | None: + """Return the URI if it is a safe http(s) stream URL, else None.""" + if not uri: + return None + try: + parts = urlsplit(uri) + except ValueError: + return None + if parts.scheme.lower() not in ALLOWED_STREAM_SCHEMES: + return None + if not parts.hostname: + return None + return uri + + +def redact_url(uri: str) -> str: + """Return a log-safe copy of a URL: strip userinfo and query/fragment. + + The query string on DLNA/streaming URLs commonly carries bearer tokens + (``?token=...``), pre-signed GET parameters (``?sig=...&expires=...``), + session keys, etc.; logging them would defeat the point of redacting + userinfo. Drop both query and fragment unconditionally and replace any + ``user[:pass]@`` with ``***@`` — callers only need something the + operator can correlate, not a replayable URL. + """ + try: + parts = urlsplit(uri) + except ValueError: + return "" + host = parts.hostname or "" + # urlsplit strips the enclosing brackets from IPv6 hostnames; restore them + # so the reconstructed URL is syntactically valid. + if ":" in host: + host = f"[{host}]" + netloc = host + if parts.port: + netloc = f"{netloc}:{parts.port}" + if parts.username or parts.password: + netloc = f"***@{netloc}" + # Drop query + fragment (positions 3 and 4) regardless of whether + # userinfo was present — query params are the common source of + # accidental secret leakage into logs. + return urlunsplit((parts.scheme, netloc, parts.path, "", "")) diff --git a/tests/providers/dlna_receiver/__init__.py b/tests/providers/dlna_receiver/__init__.py new file mode 100644 index 0000000000..944595c8b9 --- /dev/null +++ b/tests/providers/dlna_receiver/__init__.py @@ -0,0 +1 @@ +"""Tests for the DLNA Receiver provider.""" diff --git a/tests/providers/dlna_receiver/test_eventing.py b/tests/providers/dlna_receiver/test_eventing.py new file mode 100644 index 0000000000..22d472ab56 --- /dev/null +++ b/tests/providers/dlna_receiver/test_eventing.py @@ -0,0 +1,179 @@ +"""Tests for the GENA eventing module.""" + +from __future__ import annotations + +import pytest + +from music_assistant.providers.dlna_receiver.eventing import EventingManager + + +@pytest.fixture +def manager() -> EventingManager: + """Create a fresh eventing manager.""" + return EventingManager() + + +def test_subscribe_returns_sid_and_timeout(manager: EventingManager) -> None: + """subscribe() returns a uuid SID and the default timeout.""" + sid, timeout = manager.subscribe("") + assert sid.startswith("uuid:") + assert timeout == 1800 + + +def test_subscribe_custom_timeout(manager: EventingManager) -> None: + """subscribe() honors a custom Second-N timeout header.""" + _sid, timeout = manager.subscribe( + "", + "Second-300", + ) + assert timeout == 300 + + +def test_subscribe_multiple_callbacks(manager: EventingManager) -> None: + """subscribe() stores all callback URLs from a multi-URL CALLBACK header.""" + sid, _ = manager.subscribe( + "", + ) + sub = manager._subscriptions[sid] + assert len(sub.callback_urls) == 2 + + +def test_subscribe_no_callback_raises(manager: EventingManager) -> None: + """subscribe() rejects an empty CALLBACK header.""" + with pytest.raises(ValueError, match="No valid callback URLs"): + manager.subscribe("") + + +def test_unsubscribe(manager: EventingManager) -> None: + """unsubscribe() removes the subscription by SID.""" + sid, _ = manager.subscribe("") + assert sid in manager._subscriptions + manager.unsubscribe(sid) + assert sid not in manager._subscriptions + + +def test_unsubscribe_unknown_is_noop(manager: EventingManager) -> None: + """unsubscribe() on an unknown SID is a no-op rather than an error.""" + manager.unsubscribe("uuid:nonexistent") # should not raise + + +def test_renew(manager: EventingManager) -> None: + """renew() updates the timeout for an active subscription.""" + sid, _ = manager.subscribe("", "Second-100") + new_timeout = manager.renew(sid, "Second-600") + assert new_timeout == 600 + + +def test_renew_unknown_raises(manager: EventingManager) -> None: + """renew() on an unknown SID raises KeyError (412 Precondition Failed).""" + with pytest.raises(KeyError): + manager.renew("uuid:nonexistent") + + +def test_renew_expired_raises_and_removes(manager: EventingManager) -> None: + """renew() on an expired SID raises KeyError AND evicts the stale entry. + + Per UPnP spec, renewing an expired subscription must fail with 412 + Precondition Failed — the renderer surfaces the KeyError as 412, and + the manager must not keep the dead subscription around. + """ + sid, _ = manager.subscribe("", "Second-100") + # Force expiry by backdating the subscription's creation timestamp. + manager._subscriptions[sid].created_at -= 1000 + assert manager._subscriptions[sid].is_expired + + with pytest.raises(KeyError): + manager.renew(sid, "Second-1800") + + assert sid not in manager._subscriptions + + +def test_parse_callback_header() -> None: + """_parse_callback_header splits angle-bracketed URLs into a list.""" + urls = EventingManager._parse_callback_header( + "", + ) + assert urls == ["http://192.168.1.5:8080/event", "http://10.0.0.1:9000/ev"] + + +def test_parse_callback_header_single() -> None: + """_parse_callback_header handles a single URL.""" + urls = EventingManager._parse_callback_header("") + assert urls == ["http://host:1234/cb"] + + +def test_parse_callback_header_rejects_non_http_schemes() -> None: + """Only http:// and https:// schemes are valid GENA CALLBACK URLs.""" + urls = EventingManager._parse_callback_header( + "" + "", + ) + assert urls == ["http://ok/cb", "https://secure/cb"] + + +def test_parse_timeout_default() -> None: + """_parse_timeout falls back to the default when header is missing or empty.""" + assert EventingManager._parse_timeout(None) == 1800 + assert EventingManager._parse_timeout("") == 1800 + + +def test_parse_timeout_infinite() -> None: + """_parse_timeout maps 'infinite' to the default timeout.""" + assert EventingManager._parse_timeout("infinite") == 1800 + + +def test_parse_timeout_seconds() -> None: + """_parse_timeout extracts the integer from a 'Second-N' header.""" + assert EventingManager._parse_timeout("Second-300") == 300 + assert EventingManager._parse_timeout("Second-7200") == 7200 + + +def test_build_propertyset() -> None: + """_build_propertyset wraps variables in GENA XML structure.""" + xml = EventingManager._build_propertyset({"Volume": "75", "Mute": "0"}) + assert "e:propertyset" in xml + assert "75" in xml + assert "0" in xml + + +def test_build_propertyset_escapes_values() -> None: + """_build_propertyset escapes XML-special characters in values.""" + xml = EventingManager._build_propertyset({"Title": "Tom & Jerry"}) + assert "Tom & Jerry" in xml + + +async def test_notify_no_subscribers(manager: EventingManager) -> None: + """Notify with no subscribers should be a no-op.""" + await manager.notify({"TransportState": "PLAYING"}) + + +async def test_injected_session_is_not_closed_on_stop() -> None: + """A caller-owned session must survive ``stop()`` unchanged. + + Provider-level wiring injects ``mass.http_session`` and closing it + would break the rest of MA. Only managers that created their own + session are allowed to close it. + """ + import aiohttp # noqa: PLC0415 + + shared = aiohttp.ClientSession() + try: + mgr = EventingManager(session=shared) + await mgr.start() + # start() must reuse the injected session, not overwrite it. + assert mgr._session is shared + await mgr.stop() + # stop() must not close a session it does not own. + assert not shared.closed + finally: + await shared.close() + + +async def test_owned_session_is_closed_on_stop() -> None: + """When no session is injected, the manager creates + closes its own.""" + mgr = EventingManager() + await mgr.start() + owned = mgr._session + assert owned is not None + await mgr.stop() + assert owned.closed diff --git a/tests/providers/dlna_receiver/test_provider.py b/tests/providers/dlna_receiver/test_provider.py new file mode 100644 index 0000000000..4e015a54ba --- /dev/null +++ b/tests/providers/dlna_receiver/test_provider.py @@ -0,0 +1,183 @@ +"""Tests for the multi-player provider logic. + +Provider module imports music_assistant.models which is only available +when running inside MA. We test the pure utility functions directly +and guard the full-provider import with pytest.importorskip. +""" + +from __future__ import annotations + +import uuid + +import pytest + +from music_assistant.providers.dlna_receiver.constants import ( + CONF_TARGET_PLAYER, + CONF_TARGET_PLAYERS, + UDN_NAMESPACE, +) +from music_assistant.providers.dlna_receiver.renderer import UPnPRenderer + + +def _deterministic_udn(player_id: str) -> str: + """Replicate the UDN generation logic for standalone testing.""" + namespace = uuid.uuid5(uuid.NAMESPACE_URL, UDN_NAMESPACE) + return f"uuid:{uuid.uuid5(namespace, player_id or '__default__')}" + + +def test_deterministic_udn_same_input() -> None: + """Same player_id always produces the same UDN.""" + udn1 = _deterministic_udn("player_kitchen") + udn2 = _deterministic_udn("player_kitchen") + assert udn1 == udn2 + assert udn1.startswith("uuid:") + + +def test_deterministic_udn_different_inputs() -> None: + """Different player_ids produce different UDNs.""" + udn1 = _deterministic_udn("player_kitchen") + udn2 = _deterministic_udn("player_bedroom") + assert udn1 != udn2 + + +def test_deterministic_udn_default() -> None: + """Empty player_id produces a stable UDN for the default instance.""" + udn1 = _deterministic_udn("") + udn2 = _deterministic_udn("") + assert udn1 == udn2 + assert udn1 != _deterministic_udn("some_player") + + +def test_deterministic_udn_is_valid_uuid() -> None: + """Generated UDN contains a valid UUID5.""" + udn = _deterministic_udn("test_player") + uuid_str = udn.replace("uuid:", "") + parsed = uuid.UUID(uuid_str) + assert parsed.version == 5 + + +def test_multiple_renderers_different_ports() -> None: + """Verify multiple renderers can bind to different ports.""" + r1 = UPnPRenderer("Player 1", "127.0.0.1", 9001) + r2 = UPnPRenderer("Player 2", "127.0.0.1", 9002) + assert r1.http_port != r2.http_port + assert r1.udn != r2.udn + + +def test_renderer_with_explicit_udn() -> None: + """Renderer uses provided UDN instead of generating one.""" + udn = _deterministic_udn("test_player") + r = UPnPRenderer("Test", "127.0.0.1", 9999, udn=udn) + assert r.udn == udn + + +# --------------------------------------------------------------------- +# Provider-level tests (require full MA import; skip otherwise) +# --------------------------------------------------------------------- + + +@pytest.fixture +def provider_cls(): # type: ignore[no-untyped-def] + """Return DLNAReceiverProvider class, skipping if MA isn't installed.""" + provider_mod = pytest.importorskip("music_assistant.providers.dlna_receiver.provider") + return provider_mod.DLNAReceiverProvider + + +class _StubConfig: + """Minimal ProviderConfig stand-in for testing config lookups.""" + + def __init__( + self, + values: dict[str, str], + instance_id: str = "dlna_receiver_test", + name: str = "DLNA Receiver", + ) -> None: + self._values = values + self.instance_id = instance_id + self.name = name + + def get_value(self, key: str) -> str | None: + return self._values.get(key) + + +def _make_provider(cls, values: dict[str, str]): # type: ignore[no-untyped-def] + inst = cls.__new__(cls) + inst.config = _StubConfig(values) + return inst + + +def test_raw_target_prefers_new_key(provider_cls) -> None: # type: ignore[no-untyped-def] + """_raw_target uses CONF_TARGET_PLAYERS when set.""" + inst = _make_provider(provider_cls, {CONF_TARGET_PLAYERS: "p1,p2"}) + assert inst._raw_target() == "p1,p2" + + +def test_raw_target_falls_back_to_legacy_key(provider_cls) -> None: # type: ignore[no-untyped-def] + """Legacy CONF_TARGET_PLAYER with '*' must surface via _raw_target.""" + inst = _make_provider( + provider_cls, + {CONF_TARGET_PLAYERS: "", CONF_TARGET_PLAYER: "*"}, + ) + assert inst._raw_target() == "*" + + +def test_raw_target_empty_when_neither_set(provider_cls) -> None: # type: ignore[no-untyped-def] + """No configured targets → empty string (single unbound renderer).""" + inst = _make_provider(provider_cls, {}) + assert inst._raw_target() == "" + + +def test_get_source_uses_unknown_content_type(provider_cls) -> None: # type: ignore[no-untyped-def] + """PluginSource.audio_format must let ffmpeg probe incoming codec. + + Upstream DLNA senders push FLAC/MP3/AAC/PCM etc.; declaring a concrete + PCM format would cause ffmpeg to misread compressed bytes as raw PCM. + """ + from music_assistant_models.enums import ContentType # noqa: PLC0415 + + inst = _make_provider(provider_cls, {}) + inst._plugin_source = None + + source = provider_cls.get_source(inst) + + assert source.audio_format is not None + assert source.audio_format.content_type == ContentType.UNKNOWN + assert source.audio_format.codec_type == ContentType.UNKNOWN + + +# --------------------------------------------------------------------- +# _is_concrete_ipv4 helper +# --------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "value", + ["192.168.1.5", "10.0.0.1", "8.8.8.8", "172.16.0.1"], +) +def test_is_concrete_ipv4_accepts_routable_addresses(value: str) -> None: + """Concrete non-wildcard, non-loopback IPv4 addresses are accepted.""" + from music_assistant.providers.dlna_receiver.provider import _is_concrete_ipv4 # noqa: PLC0415 + + assert _is_concrete_ipv4(value) is True + + +@pytest.mark.parametrize( + "value", + [ + "", + "0.0.0.0", # wildcard — SSDP would join multicast on wrong interface + "127.0.0.1", # IPv4 loopback — multicast on lo never reaches real CPs + "127.1.2.3", # entire 127.0.0.0/8 is loopback + "::1", # IPv6 loopback — inet_aton rejects + "fe80::1", # IPv6 link-local + "2001:db8::1", # IPv6 documentation + "localhost", # hostname, not an IP literal + "192.168.1", # malformed + "not-an-ip", + ], +) +def test_is_concrete_ipv4_rejects_non_routable(value: str) -> None: + """Empty / wildcard / loopback / IPv6 / hostname / garbage all rejected.""" + from music_assistant.providers.dlna_receiver.provider import _is_concrete_ipv4 # noqa: PLC0415 + + assert _is_concrete_ipv4(value) is False diff --git a/tests/providers/dlna_receiver/test_renderer.py b/tests/providers/dlna_receiver/test_renderer.py new file mode 100644 index 0000000000..f55a952959 --- /dev/null +++ b/tests/providers/dlna_receiver/test_renderer.py @@ -0,0 +1,329 @@ +"""Tests for the UPnP renderer SOAP handling.""" + +from __future__ import annotations + +from collections.abc import AsyncGenerator +from typing import TYPE_CHECKING + +import pytest +from aiohttp.test_utils import TestClient, TestServer + +from music_assistant.providers.dlna_receiver.renderer import UPnPRenderer + +if TYPE_CHECKING: + from aiohttp.web import Application, Request + + +@pytest.fixture +def renderer() -> UPnPRenderer: + """Create a test renderer instance.""" + return UPnPRenderer( + friendly_name="Test Renderer", + bind_ip="127.0.0.1", + http_port=0, + ) + + +@pytest.fixture +async def client( + renderer: UPnPRenderer, +) -> AsyncGenerator[TestClient[Request, Application], None]: + """Create an aiohttp test client for the renderer.""" + server = TestServer(renderer._app) + _client = TestClient(server) + await _client.start_server() + yield _client + await _client.close() + + +async def test_device_description(client: TestClient[Request, Application]) -> None: + """GET /description.xml returns the MediaRenderer device XML.""" + resp = await client.get("/description.xml") + assert resp.status == 200 + text = await resp.text() + assert "MediaRenderer" in text + assert "Test Renderer" in text + + +async def test_get_transport_info(client: TestClient[Request, Application]) -> None: + """GetTransportInfo returns NO_MEDIA_PRESENT before any URI is set.""" + resp = await client.post( + "/AVTransport/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:AVTransport:1#GetTransportInfo"', + }, + data="", + ) + assert resp.status == 200 + text = await resp.text() + assert "NO_MEDIA_PRESENT" in text + + +async def test_set_volume(client: TestClient[Request, Application], renderer: UPnPRenderer) -> None: + """SetVolume updates renderer state and invokes the on_set_volume callback.""" + volume_received: list[int] = [] + + async def _on_volume(v: int) -> None: + volume_received.append(v) + + renderer.on_set_volume = _on_volume + + resp = await client.post( + "/RenderingControl/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:RenderingControl:1#SetVolume"', + }, + data="75", + ) + assert resp.status == 200 + assert renderer.volume == 75 + assert volume_received == [75] + + +async def test_get_protocol_info(client: TestClient[Request, Application]) -> None: + """GetProtocolInfo advertises supported sink audio mime types.""" + resp = await client.post( + "/ConnectionManager/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:ConnectionManager:1#GetProtocolInfo"', + }, + data="", + ) + assert resp.status == 200 + text = await resp.text() + assert "audio/flac" in text + + +# ------------------------------------------------------------------ +# SCPD tests — verify full service descriptions are served +# ------------------------------------------------------------------ + + +async def test_av_transport_scpd(client: TestClient[Request, Application]) -> None: + """AVTransport SCPD exposes the expected actions and state variables.""" + resp = await client.get("/AVTransport/description.xml") + assert resp.status == 200 + text = await resp.text() + assert "SetAVTransportURI" in text + assert "Play" in text + assert "Seek" in text + # Verify state variables are present (not just action names) + assert "TransportState" in text + assert "serviceStateTable" in text + assert "argumentList" in text + + +async def test_rendering_control_scpd(client: TestClient[Request, Application]) -> None: + """RenderingControl SCPD exposes volume/mute actions and allowed ranges.""" + resp = await client.get("/RenderingControl/description.xml") + assert resp.status == 200 + text = await resp.text() + assert "GetVolume" in text + assert "SetMute" in text + assert "Volume" in text + assert "allowedValueRange" in text + + +async def test_connection_manager_scpd(client: TestClient[Request, Application]) -> None: + """ConnectionManager SCPD exposes GetProtocolInfo and connection info.""" + resp = await client.get("/ConnectionManager/description.xml") + assert resp.status == 200 + text = await resp.text() + assert "GetProtocolInfo" in text + assert "GetCurrentConnectionInfo" in text + assert "SinkProtocolInfo" in text + + +# ------------------------------------------------------------------ +# SOAP action tests +# ------------------------------------------------------------------ + + +async def test_play_pause_stop( + client: TestClient[Request, Application], renderer: UPnPRenderer +) -> None: + """Test transport state transitions via SOAP actions.""" + # SetAVTransportURI + resp = await client.post( + "/AVTransport/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:AVTransport:1#SetAVTransportURI"', + }, + data="http://example.com/stream.flac", + ) + assert resp.status == 200 + assert renderer.current_uri == "http://example.com/stream.flac" + assert renderer.transport_state == "STOPPED" + + # Play + resp = await client.post( + "/AVTransport/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:AVTransport:1#Play"', + }, + data="", + ) + assert resp.status == 200 + assert renderer.transport_state == "PLAYING" + + # Pause + resp = await client.post( + "/AVTransport/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:AVTransport:1#Pause"', + }, + data="", + ) + assert resp.status == 200 + assert renderer.transport_state == "PAUSED_PLAYBACK" + + # Stop + resp = await client.post( + "/AVTransport/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:AVTransport:1#Stop"', + }, + data="", + ) + assert resp.status == 200 + assert renderer.transport_state == "STOPPED" + + +async def test_seek_action(client: TestClient[Request, Application]) -> None: + """Test that Seek action returns success (no-op).""" + resp = await client.post( + "/AVTransport/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:AVTransport:1#Seek"', + }, + data="REL_TIME00:01:30", + ) + assert resp.status == 200 + + +async def test_get_position_info(client: TestClient[Request, Application]) -> None: + """GetPositionInfo returns a SOAP response containing RelTime.""" + resp = await client.post( + "/AVTransport/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:AVTransport:1#GetPositionInfo"', + }, + data="", + ) + assert resp.status == 200 + text = await resp.text() + assert "RelTime" in text + + +async def test_get_connection_info(client: TestClient[Request, Application]) -> None: + """Test GetCurrentConnectionInfo action.""" + resp = await client.post( + "/ConnectionManager/control", + headers={ + "SOAPACTION": ( + '"urn:schemas-upnp-org:service:ConnectionManager:1#GetCurrentConnectionInfo"' + ), + }, + data="0", + ) + assert resp.status == 200 + text = await resp.text() + assert "Direction" in text + assert "Input" in text + + +async def test_invalid_action(client: TestClient[Request, Application]) -> None: + """Test that unknown actions return SOAP error.""" + resp = await client.post( + "/AVTransport/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:AVTransport:1#NonExistentAction"', + }, + data="", + ) + assert resp.status == 500 + text = await resp.text() + assert "Invalid Action" in text + + +async def test_set_av_transport_uri_rejected( + client: TestClient[Request, Application], + renderer: UPnPRenderer, +) -> None: + """A callback that raises ValueError causes a 716 SOAP fault and no state change. + + Previously the renderer eagerly wrote ``current_uri`` and returned 200 OK + before invoking the callback, so a silent SSRF-guard rejection in the + provider left control points thinking the URI was accepted. + """ + renderer.current_uri = "http://prior.example/stream.flac" + + async def _reject(_uri: str, _metadata: str | None) -> None: + raise ValueError("unsupported URI scheme or missing host") + + renderer.on_set_av_transport_uri = _reject + + resp = await client.post( + "/AVTransport/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:AVTransport:1#SetAVTransportURI"', + }, + data="file:///etc/passwd", + ) + assert resp.status == 500 + text = await resp.text() + assert "716" in text + # State was NOT mutated by the rejected request. + assert renderer.current_uri == "http://prior.example/stream.flac" + + +def test_description_url_brackets_ipv6() -> None: + """IPv6 bind_ip must be wrapped in brackets per RFC 3986 §3.2.2.""" + r = UPnPRenderer("ipv6 renderer", bind_ip="::1", http_port=9999) + assert r.description_url == "http://[::1]:9999/description.xml" + + +def test_description_url_ipv4_no_brackets() -> None: + """Plain IPv4 addresses are not bracketed.""" + r = UPnPRenderer("ipv4 renderer", bind_ip="192.168.1.5", http_port=8080) + assert r.description_url == "http://192.168.1.5:8080/description.xml" + + +async def test_start_learns_ephemeral_port() -> None: + """Binding on http_port=0 must update self.http_port from the bound socket. + + Without this, description_url and the SSDP LOCATION header advertise + ``:0`` and nothing can reach the renderer. + """ + r = UPnPRenderer("ephemeral", bind_ip="127.0.0.1", http_port=0) + try: + await r.start() + assert r.http_port != 0 + assert 1 <= r.http_port <= 65535 + assert f":{r.http_port}" in r.description_url + finally: + await r.stop() + + +async def test_set_mute(client: TestClient[Request, Application], renderer: UPnPRenderer) -> None: + """SetMute updates renderer state and GetMute reflects the change.""" + resp = await client.post( + "/RenderingControl/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:RenderingControl:1#SetMute"', + }, + data="1", + ) + assert resp.status == 200 + assert renderer.mute is True + + resp = await client.post( + "/RenderingControl/control", + headers={ + "SOAPACTION": '"urn:schemas-upnp-org:service:RenderingControl:1#GetMute"', + }, + data="", + ) + assert resp.status == 200 + text = await resp.text() + assert "1" in text diff --git a/tests/providers/dlna_receiver/test_ssdp.py b/tests/providers/dlna_receiver/test_ssdp.py new file mode 100644 index 0000000000..c99c823942 --- /dev/null +++ b/tests/providers/dlna_receiver/test_ssdp.py @@ -0,0 +1,62 @@ +"""Tests for SSDP advertiser.""" + +from __future__ import annotations + +from music_assistant.providers.dlna_receiver.ssdp import ( + _MX_MAX_SECONDS, + SSDPAdvertiser, + _parse_mx_delay, +) + + +def test_ssdp_advertiser_init() -> None: + """SSDPAdvertiser stores its configured UDN, bind IP, and description URL.""" + adv = SSDPAdvertiser( + udn="uuid:test-1234", + description_url="http://192.168.1.100:8298/description.xml", + bind_ip="192.168.1.100", + ) + assert adv.udn == "uuid:test-1234" + assert adv.bind_ip == "192.168.1.100" + assert "8298" in adv.description_url + + +def test_handle_search_ignores_non_matching() -> None: + """Non-M-SEARCH datagrams are silently dropped without raising.""" + adv = SSDPAdvertiser( + udn="uuid:test-1234", + description_url="http://192.168.1.100:8298/description.xml", + bind_ip="192.168.1.100", + ) + adv.handle_search(b"NOTIFY * HTTP/1.1\r\n", ("192.168.1.1", 1900)) + + +def test_parse_mx_delay_missing_is_zero() -> None: + """A missing MX header means respond immediately (delay 0).""" + assert _parse_mx_delay("") == 0.0 + + +def test_parse_mx_delay_non_integer_is_zero() -> None: + """A malformed MX (non-integer) falls back to immediate response.""" + assert _parse_mx_delay("abc") == 0.0 + assert _parse_mx_delay("3.5") == 0.0 + + +def test_parse_mx_delay_non_positive_is_zero() -> None: + """Zero or negative MX values fall back to immediate response.""" + assert _parse_mx_delay("0") == 0.0 + assert _parse_mx_delay("-1") == 0.0 + + +def test_parse_mx_delay_within_cap() -> None: + """For MX ≤ cap, the returned delay is bounded by MX itself.""" + for _ in range(20): + delay = _parse_mx_delay("3") + assert 0.0 <= delay < 3.0 + + +def test_parse_mx_delay_caps_large_mx() -> None: + """Large MX values are clamped to _MX_MAX_SECONDS to keep discovery snappy.""" + for _ in range(20): + delay = _parse_mx_delay("120") + assert 0.0 <= delay < _MX_MAX_SECONDS diff --git a/tests/providers/dlna_receiver/test_urls.py b/tests/providers/dlna_receiver/test_urls.py new file mode 100644 index 0000000000..2f0b0bce10 --- /dev/null +++ b/tests/providers/dlna_receiver/test_urls.py @@ -0,0 +1,99 @@ +"""Tests for URL validation and redaction helpers.""" + +from __future__ import annotations + +import pytest + +from music_assistant.providers.dlna_receiver.urls import redact_url, validate_stream_url + + +@pytest.mark.parametrize( + "uri", + [ + "http://stream.example.com/audio.flac", + "https://stream.example.com:8443/audio.mp3", + "HTTP://Stream.Example.com/upper.flac", + "http://10.0.0.5:8080/qobuz-stream?t=abc", + ], +) +def test_validate_stream_url_accepts_http_and_https(uri: str) -> None: + """Stream URLs with http/https schemes and a host are accepted as-is.""" + assert validate_stream_url(uri) == uri + + +@pytest.mark.parametrize( + "uri", + [ + "", + "file:///etc/passwd", + "gopher://evil.example/1", + "ftp://files.example.com/song.flac", + "javascript:alert(1)", + "data:audio/mp3;base64,AAAA", + "ws://example.com/stream", + ], +) +def test_validate_stream_url_rejects_non_http_schemes(uri: str) -> None: + """Any non-http(s) scheme (or empty input) is rejected.""" + assert validate_stream_url(uri) is None + + +def test_validate_stream_url_rejects_missing_host() -> None: + """A URL without a hostname is rejected even if the scheme is http(s).""" + assert validate_stream_url("http:///no-host") is None + assert validate_stream_url("https://") is None + + +def test_redact_url_strips_query_without_userinfo() -> None: + """Query params are dropped even when there is no userinfo to mask. + + Signed URLs / bearer tokens commonly live in the query string; keeping + them in logs would defeat the purpose of redact_url. + """ + redacted = redact_url("http://example.com:8080/path?token=secret&sig=abc") + assert "secret" not in redacted + assert "sig" not in redacted + assert "token" not in redacted + assert redacted == "http://example.com:8080/path" + + +def test_redact_url_strips_fragment() -> None: + """Fragment is dropped (may also contain sensitive data in some flows).""" + redacted = redact_url("https://example.com/foo#access_token=secret") + assert "secret" not in redacted + assert redacted == "https://example.com/foo" + + +def test_redact_url_masks_user_and_password_and_drops_query() -> None: + """user:pass@host is replaced with ***@host; query is dropped entirely.""" + redacted = redact_url("http://alice:secret@example.com:8080/stream?token=xyz") + assert "alice" not in redacted + assert "secret" not in redacted + assert "xyz" not in redacted + assert redacted == "http://***@example.com:8080/stream" + + +def test_redact_url_masks_user_only() -> None: + """A bare user (no password) still triggers redaction.""" + redacted = redact_url("https://alice@example.com/foo") + assert "alice" not in redacted + assert redacted == "https://***@example.com/foo" + + +def test_redact_url_invalid_returns_placeholder() -> None: + """A completely unparsable URL yields the sentinel placeholder.""" + # urlsplit is quite permissive; use a string that provokes ValueError. + redacted = redact_url("http://[invalid-ipv6") + assert redacted == "" + + +def test_redact_url_preserves_ipv6_brackets() -> None: + """IPv6 hosts keep their brackets when userinfo is stripped.""" + redacted = redact_url("http://user:pass@[::1]:8080/x") + assert redacted == "http://***@[::1]:8080/x" + + +def test_redact_url_preserves_ipv6_brackets_no_port() -> None: + """IPv6 hosts without a port also keep brackets.""" + redacted = redact_url("https://alice@[2001:db8::1]/foo") + assert redacted == "https://***@[2001:db8::1]/foo"