Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions music_assistant/providers/tidal/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import json
from collections.abc import AsyncGenerator
from datetime import datetime
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -188,6 +189,13 @@ async def get_stream_details(
"""Return the content details for the given track when it will be streamed."""
return await self.streaming.get_stream_details(item_id)

async def get_audio_stream(
self, streamdetails: StreamDetails, seek_position: int = 0
) -> AsyncGenerator[bytes, None]:
"""Return the audio stream for the provider item."""
async for chunk in self.streaming.get_audio_stream(streamdetails, seek_position):
yield chunk

def get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
"""Create a generic item mapping."""
return ItemMapping(
Expand Down
196 changes: 131 additions & 65 deletions music_assistant/providers/tidal/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@

from __future__ import annotations

import asyncio
import base64
import binascii
import uuid
from collections.abc import Callable, Coroutine
from collections.abc import AsyncGenerator
from sqlite3 import OperationalError
from typing import TYPE_CHECKING, Any
from xml.etree import ElementTree as ET
Comment thread
jozefKruszynski marked this conversation as resolved.
Outdated

from aiohttp import web
import aiohttp
from music_assistant_models.enums import ContentType, ExternalID, StreamType
from music_assistant_models.errors import MediaNotFoundError
from music_assistant_models.media_items import AudioFormat
Expand All @@ -28,6 +26,9 @@

from .provider import TidalProvider

# DASH MPD namespace
_MPD_NS = "urn:mpeg:dash:schema:mpd:2011"


class TidalStreamingManager:
"""Manages Tidal streaming operations."""
Expand Down Expand Up @@ -89,7 +90,18 @@ async def get_stream_details(self, item_id: str) -> StreamDetails:
quality,
)

# 4. Parse stream URL
# 4. Parse stream URL and build StreamDetails
return await self._build_stream_details(stream_data, track, cache_key)

async def _build_stream_details(
self, stream_data: dict[str, Any], track: Track, cache_key: str
) -> StreamDetails:
"""Build StreamDetails from Tidal playback info.

:param stream_data: Parsed Tidal playback info response.
:param track: The track being streamed.
:param cache_key: Cache key for playback info (used to evict on decode error).
"""
manifest_type = stream_data.get("manifestMimeType", "")
self.provider.logger.debug(
"Tidal playback info for track %s: manifestMimeType=%s audioQuality=%s codec=%s",
Expand All @@ -98,17 +110,11 @@ async def get_stream_details(self, item_id: str) -> StreamDetails:
stream_data.get("audioQuality"),
stream_data.get("codec"),
)
dash_data: dict[str, Any] | None = None
if "dash+xml" in manifest_type and "manifest" in stream_data:
# Tidal returns the DASH manifest as inline base64 content. Passing a data: URI
# directly to ffmpeg is unreliable — its DASH demuxer stops processing after
# buffering an initial batch of segments and never fetches the rest, resulting in
# only a fraction of the track being played. We therefore serve the decoded
# manifest XML from MA's in-memory stream server so that ffmpeg receives a proper
# HTTP URL. ffmpeg then connects directly to Tidal's CDN for all audio segments
# without MA acting as a proxy for the audio data.
try:
manifest_bytes = base64.b64decode(stream_data["manifest"])
except (binascii.Error, TypeError, ValueError) as err:
except Exception as err:
self.provider.logger.warning(
"Invalid DASH manifest for track %s, evicting cache entry: %s",
track.item_id,
Expand All @@ -122,40 +128,27 @@ async def get_stream_details(self, item_id: str) -> StreamDetails:
raise MediaNotFoundError(
f"Invalid DASH manifest for track {track.item_id}"
) from err
manifest_id = uuid.uuid4().hex
route_path = f"/tidal-dash/{manifest_id}.mpd"
unregister = self.mass.streams.register_dynamic_route(
route_path,
self._make_manifest_handler(manifest_bytes, self.provider.logger, track.item_id),
method="GET",
)
url = f"{self.mass.streams.base_url}{route_path}"
self.provider.logger.debug(
"Using DASH manifest (stream server route %s) for track %s",
route_path,
track.item_id,
)
# Unregister the route once the track duration has elapsed.
self.mass.create_task(
self._async_unregister_manifest_route(
unregister, route_path, (track.duration or 600) + 60
)
)
dash_data = self._parse_dash_segments(manifest_bytes, track.item_id)
stream_type = StreamType.CUSTOM
content_type = ContentType.MP4
url = None
else:
urls = stream_data.get("urls", [])
if not urls:
raise MediaNotFoundError("No stream URL found")
url = urls[0]
stream_type = StreamType.HTTP
content_type = None
self.provider.logger.debug("Using direct URL for track %s", track.item_id)

# 5. Determine format
audio_quality = stream_data.get("audioQuality")
if audio_quality in ("HIRES_LOSSLESS", "HI_RES_LOSSLESS", "LOSSLESS"):
content_type = ContentType.FLAC
elif codec := stream_data.get("codec"):
content_type = ContentType.try_parse(codec)
else:
content_type = ContentType.MP4
if content_type is None:
audio_quality = stream_data.get("audioQuality")
if audio_quality in ("HIRES_LOSSLESS", "HI_RES_LOSSLESS", "LOSSLESS"):
content_type = ContentType.FLAC
elif codec := stream_data.get("codec"):
content_type = ContentType.try_parse(codec)
else:
content_type = ContentType.MP4

resolved_audio_format = AudioFormat(
content_type=content_type,
Expand All @@ -164,49 +157,122 @@ async def get_stream_details(self, item_id: str) -> StreamDetails:
channels=2,
)

# Never block or fail playback on DB issues.
self.mass.create_task(
self._async_update_provider_mapping_audio_format(
provider_track_id=track.item_id,
resolved_audio_format=resolved_audio_format,
)
)

return StreamDetails(
details = StreamDetails(
item_id=track.item_id,
provider=self.provider.instance_id,
audio_format=resolved_audio_format,
stream_type=StreamType.HTTP,
stream_type=stream_type,
duration=track.duration,
path=url,
can_seek=True,
allow_seek=True,
)
if stream_type == StreamType.CUSTOM:
details.data = dash_data
return details

def _parse_dash_segments(self, manifest_bytes: bytes, track_id: str) -> dict[str, Any]:
"""Parse a DASH manifest and return segment info with timing.

:param manifest_bytes: Decoded DASH manifest XML bytes.
:param track_id: Track ID for logging.
"""
root = ET.fromstring(manifest_bytes) # noqa: S314

rep = root.find(f".//{{{_MPD_NS}}}Representation")
if rep is None:
raise MediaNotFoundError(f"No Representation in DASH manifest for track {track_id}")
Comment thread
jozefKruszynski marked this conversation as resolved.
Outdated
seg_tpl = rep.find(f"{{{_MPD_NS}}}SegmentTemplate")
if seg_tpl is None:
raise MediaNotFoundError(f"No SegmentTemplate in DASH manifest for track {track_id}")

init_url = seg_tpl.get("initialization", "")
media_template = seg_tpl.get("media", "")
start_number = int(seg_tpl.get("startNumber", "1"))
timescale = int(seg_tpl.get("timescale", "1"))

# Build segment list with start times from the SegmentTimeline.
segments: list[dict[str, Any]] = []
timeline = seg_tpl.find(f"{{{_MPD_NS}}}SegmentTimeline")
time_pos = 0 # running position in timescale units
seg_idx = 0
if timeline is not None:
for s_elem in timeline.findall(f"{{{_MPD_NS}}}S"):
duration = int(s_elem.get("d", "0"))
repeat = int(s_elem.get("r", "0"))
for _ in range(1 + repeat):
url = media_template.replace("$Number$", str(start_number + seg_idx))
segments.append({"url": url, "start": time_pos / timescale})
time_pos += duration
seg_idx += 1

self.provider.logger.debug(
"Parsed DASH manifest for track %s: %d segments", track_id, len(segments)
)
return {"init_url": init_url, "segments": segments}

@staticmethod
def _make_manifest_handler(
manifest_bytes: bytes,
logger: Any,
track_id: str,
) -> Callable[[web.Request], Coroutine[Any, Any, web.Response]]:
"""Return an aiohttp request handler that serves the given manifest bytes."""

async def _handler(_request: web.Request) -> web.Response:
logger.debug("Serving DASH manifest to ffmpeg for track %s", track_id)
return web.Response(
body=manifest_bytes,
content_type="application/dash+xml",
async def get_audio_stream(
self,
streamdetails: StreamDetails,
seek_position: int = 0,
) -> AsyncGenerator[bytes, None]:
"""Stream DASH segments as a single contiguous fMP4 byte stream.

Downloads the init segment (always needed for codec config) followed by
media segments starting from the one closest to ``seek_position``.
ffmpeg receives plain fMP4 on stdin, completely bypassing its DASH demuxer.
"""
dash_data: dict[str, Any] = streamdetails.data
init_url: str = dash_data["init_url"]
segments: list[dict[str, Any]] = dash_data["segments"]

# Determine which segment to start from for seeking.
start_seg = 0
if seek_position > 0 and segments:
for i, seg in enumerate(segments):
if seg["start"] > seek_position:
break
start_seg = i

if start_seg > 0:
self.provider.logger.debug(
"Seeking to segment %d (%.1fs) for track %s (requested %ds)",
start_seg,
segments[start_seg]["start"],
streamdetails.item_id,
seek_position,
)

return _handler
session = self.mass.http_session
timeout = aiohttp.ClientTimeout(total=30, sock_read=15)

async def _async_unregister_manifest_route(
self, unregister: Callable[[], None], route_path: str, delay: float
) -> None:
"""Call unregister after delay seconds to clean up the temporary manifest route."""
await asyncio.sleep(delay)
unregister()
self.provider.logger.debug("Unregistered DASH manifest route %s", route_path)
# Always send the init segment first (contains codec config).
async with session.get(init_url, timeout=timeout) as resp:
if resp.status != 200:
raise MediaNotFoundError(
f"DASH init segment returned HTTP {resp.status} for track "
f"{streamdetails.item_id}"
)
async for chunk in resp.content.iter_any():
yield chunk

# Stream media segments from start_seg onwards.
for seg in segments[start_seg:]:
async with session.get(seg["url"], timeout=timeout) as resp:
if resp.status != 200:
raise MediaNotFoundError(
f"DASH segment at {seg['start']:.1f}s returned HTTP {resp.status} "
f"for track {streamdetails.item_id}"
)
async for chunk in resp.content.iter_any():
yield chunk

async def _async_update_provider_mapping_audio_format(
self,
Expand Down
Loading
Loading