diff --git a/music_assistant/providers/tidal/provider.py b/music_assistant/providers/tidal/provider.py index f19b0fc093..14e53e7e0e 100644 --- a/music_assistant/providers/tidal/provider.py +++ b/music_assistant/providers/tidal/provider.py @@ -3,6 +3,7 @@ from __future__ import annotations import json +from collections.abc import AsyncGenerator from datetime import datetime from typing import TYPE_CHECKING, Any @@ -188,6 +189,13 @@ async def get_stream_details( """Return the content details for the given track when it will be streamed.""" return await self.streaming.get_stream_details(item_id) + async def get_audio_stream( + self, streamdetails: StreamDetails, seek_position: int = 0 + ) -> AsyncGenerator[bytes, None]: + """Return the audio stream for the provider item.""" + async for chunk in self.streaming.get_audio_stream(streamdetails, seek_position): + yield chunk + def get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping: """Create a generic item mapping.""" return ItemMapping( diff --git a/music_assistant/providers/tidal/streaming.py b/music_assistant/providers/tidal/streaming.py index 44ce1813cd..92baa15190 100644 --- a/music_assistant/providers/tidal/streaming.py +++ b/music_assistant/providers/tidal/streaming.py @@ -2,15 +2,13 @@ from __future__ import annotations -import asyncio import base64 -import binascii -import uuid -from collections.abc import Callable, Coroutine +from collections.abc import AsyncGenerator from sqlite3 import OperationalError from typing import TYPE_CHECKING, Any -from aiohttp import web +import aiohttp +from defusedxml import ElementTree from music_assistant_models.enums import ContentType, ExternalID, StreamType from music_assistant_models.errors import MediaNotFoundError from music_assistant_models.media_items import AudioFormat @@ -28,6 +26,9 @@ from .provider import TidalProvider +# DASH MPD namespace +_MPD_NS = "urn:mpeg:dash:schema:mpd:2011" + class TidalStreamingManager: """Manages Tidal streaming operations.""" @@ -89,7 +90,18 @@ async def get_stream_details(self, item_id: str) -> StreamDetails: quality, ) - # 4. Parse stream URL + # 4. Parse stream URL and build StreamDetails + return await self._build_stream_details(stream_data, track, cache_key) + + async def _build_stream_details( + self, stream_data: dict[str, Any], track: Track, cache_key: str + ) -> StreamDetails: + """Build StreamDetails from Tidal playback info. + + :param stream_data: Parsed Tidal playback info response. + :param track: The track being streamed. + :param cache_key: Cache key for playback info (used to evict on decode error). + """ manifest_type = stream_data.get("manifestMimeType", "") self.provider.logger.debug( "Tidal playback info for track %s: manifestMimeType=%s audioQuality=%s codec=%s", @@ -98,17 +110,14 @@ async def get_stream_details(self, item_id: str) -> StreamDetails: stream_data.get("audioQuality"), stream_data.get("codec"), ) + dash_data: dict[str, Any] | None = None if "dash+xml" in manifest_type and "manifest" in stream_data: - # Tidal returns the DASH manifest as inline base64 content. Passing a data: URI - # directly to ffmpeg is unreliable — its DASH demuxer stops processing after - # buffering an initial batch of segments and never fetches the rest, resulting in - # only a fraction of the track being played. We therefore serve the decoded - # manifest XML from MA's in-memory stream server so that ffmpeg receives a proper - # HTTP URL. ffmpeg then connects directly to Tidal's CDN for all audio segments - # without MA acting as a proxy for the audio data. try: manifest_bytes = base64.b64decode(stream_data["manifest"]) - except (binascii.Error, TypeError, ValueError) as err: + dash_data = self._parse_dash_segments(manifest_bytes, track.item_id) + except MediaNotFoundError: + raise + except Exception as err: self.provider.logger.warning( "Invalid DASH manifest for track %s, evicting cache entry: %s", track.item_id, @@ -122,40 +131,26 @@ async def get_stream_details(self, item_id: str) -> StreamDetails: raise MediaNotFoundError( f"Invalid DASH manifest for track {track.item_id}" ) from err - manifest_id = uuid.uuid4().hex - route_path = f"/tidal-dash/{manifest_id}.mpd" - unregister = self.mass.streams.register_dynamic_route( - route_path, - self._make_manifest_handler(manifest_bytes, self.provider.logger, track.item_id), - method="GET", - ) - url = f"{self.mass.streams.base_url}{route_path}" - self.provider.logger.debug( - "Using DASH manifest (stream server route %s) for track %s", - route_path, - track.item_id, - ) - # Unregister the route once the track duration has elapsed. - self.mass.create_task( - self._async_unregister_manifest_route( - unregister, route_path, (track.duration or 600) + 60 - ) - ) + stream_type = StreamType.CUSTOM + content_type = ContentType.MP4 + url = None else: urls = stream_data.get("urls", []) if not urls: raise MediaNotFoundError("No stream URL found") url = urls[0] + stream_type = StreamType.HTTP + content_type = None self.provider.logger.debug("Using direct URL for track %s", track.item_id) - # 5. Determine format - audio_quality = stream_data.get("audioQuality") - if audio_quality in ("HIRES_LOSSLESS", "HI_RES_LOSSLESS", "LOSSLESS"): - content_type = ContentType.FLAC - elif codec := stream_data.get("codec"): - content_type = ContentType.try_parse(codec) - else: - content_type = ContentType.MP4 + if content_type is None: + audio_quality = stream_data.get("audioQuality") + if audio_quality in ("HIRES_LOSSLESS", "HI_RES_LOSSLESS", "LOSSLESS"): + content_type = ContentType.FLAC + elif codec := stream_data.get("codec"): + content_type = ContentType.try_parse(codec) + else: + content_type = ContentType.MP4 resolved_audio_format = AudioFormat( content_type=content_type, @@ -164,7 +159,6 @@ async def get_stream_details(self, item_id: str) -> StreamDetails: channels=2, ) - # Never block or fail playback on DB issues. self.mass.create_task( self._async_update_provider_mapping_audio_format( provider_track_id=track.item_id, @@ -172,41 +166,115 @@ async def get_stream_details(self, item_id: str) -> StreamDetails: ) ) - return StreamDetails( + details = StreamDetails( item_id=track.item_id, provider=self.provider.instance_id, audio_format=resolved_audio_format, - stream_type=StreamType.HTTP, + stream_type=stream_type, duration=track.duration, path=url, can_seek=True, allow_seek=True, ) + if stream_type == StreamType.CUSTOM: + details.data = dash_data + return details + + def _parse_dash_segments(self, manifest_bytes: bytes, track_id: str) -> dict[str, Any]: + """Parse a DASH manifest and return segment info with timing. + + :param manifest_bytes: Decoded DASH manifest XML bytes. + :param track_id: Track ID for logging. + """ + root = ElementTree.fromstring(manifest_bytes) + + rep = root.find(f".//{{{_MPD_NS}}}Representation") + if rep is None: + raise MediaNotFoundError(f"No Representation in DASH manifest for track {track_id}") + seg_tpl = rep.find(f"{{{_MPD_NS}}}SegmentTemplate") + if seg_tpl is None: + raise MediaNotFoundError(f"No SegmentTemplate in DASH manifest for track {track_id}") + + init_url = seg_tpl.get("initialization", "") + media_template = seg_tpl.get("media", "") + start_number = int(seg_tpl.get("startNumber", "1")) + timescale = int(seg_tpl.get("timescale", "1")) + + # Build segment list with start times from the SegmentTimeline. + segments: list[dict[str, Any]] = [] + timeline = seg_tpl.find(f"{{{_MPD_NS}}}SegmentTimeline") + time_pos = 0 # running position in timescale units + seg_idx = 0 + if timeline is not None: + for s_elem in timeline.findall(f"{{{_MPD_NS}}}S"): + duration = int(s_elem.get("d", "0")) + repeat = int(s_elem.get("r", "0")) + for _ in range(1 + repeat): + url = media_template.replace("$Number$", str(start_number + seg_idx)) + segments.append({"url": url, "start": time_pos / timescale}) + time_pos += duration + seg_idx += 1 + + self.provider.logger.debug( + "Parsed DASH manifest for track %s: %d segments", track_id, len(segments) + ) + return {"init_url": init_url, "segments": segments} - @staticmethod - def _make_manifest_handler( - manifest_bytes: bytes, - logger: Any, - track_id: str, - ) -> Callable[[web.Request], Coroutine[Any, Any, web.Response]]: - """Return an aiohttp request handler that serves the given manifest bytes.""" - - async def _handler(_request: web.Request) -> web.Response: - logger.debug("Serving DASH manifest to ffmpeg for track %s", track_id) - return web.Response( - body=manifest_bytes, - content_type="application/dash+xml", + async def get_audio_stream( + self, + streamdetails: StreamDetails, + seek_position: int = 0, + ) -> AsyncGenerator[bytes, None]: + """Stream DASH segments as a single contiguous fMP4 byte stream. + + Downloads the init segment (always needed for codec config) followed by + media segments starting from the one closest to ``seek_position``. + ffmpeg receives plain fMP4 on stdin, completely bypassing its DASH demuxer. + """ + dash_data: dict[str, Any] = streamdetails.data + init_url: str = dash_data["init_url"] + segments: list[dict[str, Any]] = dash_data["segments"] + + # Determine which segment to start from for seeking. + start_seg = 0 + if seek_position > 0 and segments: + for i, seg in enumerate(segments): + if seg["start"] > seek_position: + break + start_seg = i + + if start_seg > 0: + self.provider.logger.debug( + "Seeking to segment %d (%.1fs) for track %s (requested %ds)", + start_seg, + segments[start_seg]["start"], + streamdetails.item_id, + seek_position, ) - return _handler + session = self.mass.http_session + timeout = aiohttp.ClientTimeout(total=30, sock_read=15) - async def _async_unregister_manifest_route( - self, unregister: Callable[[], None], route_path: str, delay: float - ) -> None: - """Call unregister after delay seconds to clean up the temporary manifest route.""" - await asyncio.sleep(delay) - unregister() - self.provider.logger.debug("Unregistered DASH manifest route %s", route_path) + # Always send the init segment first (contains codec config). + async with session.get(init_url, timeout=timeout) as resp: + if resp.status != 200: + raise MediaNotFoundError( + f"DASH init segment returned HTTP {resp.status} for track " + f"{streamdetails.item_id}" + ) + async for chunk in resp.content.iter_any(): + yield chunk + + # Stream media segments from start_seg onwards. + for seg in segments[start_seg:]: + async with session.get(seg["url"], timeout=timeout) as resp: + if resp.status != 200: + raise MediaNotFoundError( + f"DASH segment at {seg['start']:.1f}s returned HTTP {resp.status} " + f"for track {streamdetails.item_id}" + ) + async for chunk in resp.content.iter_any(): + yield chunk async def _async_update_provider_mapping_audio_format( self, diff --git a/tests/providers/tidal/test_streaming.py b/tests/providers/tidal/test_streaming.py index 63cd953a56..b71de783ef 100644 --- a/tests/providers/tidal/test_streaming.py +++ b/tests/providers/tidal/test_streaming.py @@ -1,10 +1,10 @@ """Test Tidal Streaming Manager.""" import base64 -from collections.abc import Coroutine +from collections.abc import AsyncGenerator, Coroutine from sqlite3 import OperationalError from typing import Any -from unittest.mock import AsyncMock, MagicMock, Mock, patch +from unittest.mock import AsyncMock, MagicMock, Mock import pytest from music_assistant_models.enums import ContentType, ExternalID, StreamType @@ -13,6 +13,23 @@ from music_assistant.providers.tidal.streaming import TidalStreamingManager +# A minimal valid DASH manifest for testing. +_TEST_MANIFEST_XML = ( + '' + '' + "" + '' + '' + "" + '' + "" + "" + "" + "" +) +_TEST_MANIFEST_B64 = base64.b64encode(_TEST_MANIFEST_XML.encode()).decode() + @pytest.fixture def provider_mock() -> Mock: @@ -39,9 +56,6 @@ def provider_mock() -> Mock: provider.mass.cache.set = AsyncMock() provider.mass.cache.delete = AsyncMock() provider.mass.music.tracks.get_library_item_by_prov_id = AsyncMock(return_value=None) - provider.mass.streams.base_url = "http://localhost:8095" - provider.mass.streams.register_dynamic_route = Mock(return_value=Mock()) - return provider @@ -120,13 +134,11 @@ async def test_get_stream_details_hires( async def test_get_stream_details_with_dash_manifest( streaming_manager: TidalStreamingManager, provider_mock: Mock, mock_track: Mock ) -> None: - """Test get_stream_details with DASH manifest serves via HTTP route, not data: URI.""" + """Test get_stream_details with DASH manifest uses CUSTOM stream with parsed segments.""" provider_mock.get_track.return_value = mock_track - manifest_xml = b"dummy manifest" - manifest_b64 = base64.b64encode(manifest_xml).decode() provider_mock.api.get.return_value = { "manifestMimeType": "application/dash+xml", - "manifest": manifest_b64, + "manifest": _TEST_MANIFEST_B64, "audioQuality": "HIGH", "sampleRate": 44100, "bitDepth": 16, @@ -134,16 +146,18 @@ async def test_get_stream_details_with_dash_manifest( stream_details = await streaming_manager.get_stream_details("123") - assert isinstance(stream_details.path, str) - assert stream_details.path.startswith("http://localhost:8095/tidal-dash/") - assert stream_details.path.endswith(".mpd") - assert "data:" not in stream_details.path - - # Route must be registered with the streams server. - provider_mock.mass.streams.register_dynamic_route.assert_called_once() - call_args = provider_mock.mass.streams.register_dynamic_route.call_args - assert call_args[0][0].startswith("/tidal-dash/") - assert call_args[1]["method"] == "GET" + assert stream_details.stream_type == StreamType.CUSTOM + assert stream_details.audio_format.content_type == ContentType.MP4 + assert stream_details.can_seek is True + assert stream_details.allow_seek is True + # Segment info stored in data field + assert isinstance(stream_details.data, dict) + assert stream_details.data["init_url"] == "https://cdn.tidal.com/init.mp4" + segments = stream_details.data["segments"] + assert len(segments) == 3 + assert segments[0]["url"] == "https://cdn.tidal.com/1.mp4" + assert segments[0]["start"] == 0.0 + assert segments[2]["url"] == "https://cdn.tidal.com/3.mp4" async def test_get_stream_details_with_codec( @@ -568,15 +582,14 @@ async def test_async_update_provider_mapping_audio_format_unexpected_error_logs_ provider_mock.logger.exception.assert_called() -async def test_get_stream_details_dash_schedules_cleanup_task( +async def test_get_stream_details_dash_only_creates_mapping_task( streaming_manager: TidalStreamingManager, provider_mock: Mock, mock_track: Mock ) -> None: - """Verify a cleanup task is scheduled to unregister the DASH manifest route.""" + """Verify only the mapping update task is created (no route cleanup for CUSTOM streams).""" provider_mock.get_track.return_value = mock_track - manifest_b64 = base64.b64encode(b"").decode() provider_mock.api.get.return_value = { "manifestMimeType": "application/dash+xml", - "manifest": manifest_b64, + "manifest": _TEST_MANIFEST_B64, "audioQuality": "HIGH", "sampleRate": 44100, "bitDepth": 16, @@ -587,33 +600,179 @@ async def test_get_stream_details_dash_schedules_cleanup_task( await streaming_manager.get_stream_details("123") - # create_task is called twice: once for the mapping update, once for route cleanup. - assert provider_mock.mass.create_task.call_count == 2 + # Only the mapping update task should be created. + assert provider_mock.mass.create_task.call_count == 1 -async def test_make_manifest_handler_serves_manifest_bytes() -> None: - """Verify _make_manifest_handler returns an async handler that serves the given bytes.""" - manifest_bytes = b"test manifest" - logger = Mock() - handler = TidalStreamingManager._make_manifest_handler(manifest_bytes, logger, "track_42") +async def test_get_stream_details_dash_repeated_calls_independent( + streaming_manager: TidalStreamingManager, provider_mock: Mock, mock_track: Mock +) -> None: + """Repeated calls for the same DASH track produce independent stream details.""" + provider_mock.get_track.return_value = mock_track + provider_mock.mass.cache.get.return_value = { + "manifestMimeType": "application/dash+xml", + "manifest": _TEST_MANIFEST_B64, + "audioQuality": "HIGH", + "sampleRate": 44100, + "bitDepth": 16, + } - response = await handler(MagicMock()) + sd1 = await streaming_manager.get_stream_details("123") + sd2 = await streaming_manager.get_stream_details("123") - assert response.body == manifest_bytes - assert "dash+xml" in response.content_type - logger.debug.assert_called() + # Both should be CUSTOM streams with the same segment data. + assert sd1.stream_type == StreamType.CUSTOM + assert sd2.stream_type == StreamType.CUSTOM + assert sd1.data["init_url"] == sd2.data["init_url"] + assert len(sd1.data["segments"]) == len(sd2.data["segments"]) -async def test_async_unregister_manifest_route_calls_unregister( +async def test_parse_dash_segments_extracts_urls_with_timing( streaming_manager: TidalStreamingManager, ) -> None: - """Verify the route unregister callable is called after the delay.""" - unregister = Mock() - with patch("music_assistant.providers.tidal.streaming.asyncio.sleep", new_callable=AsyncMock): - await streaming_manager._async_unregister_manifest_route( - unregister, "/tidal-dash/test.mpd", 300.0 - ) - unregister.assert_called_once() + """Verify _parse_dash_segments correctly extracts segment URLs and start times.""" + result = streaming_manager._parse_dash_segments(_TEST_MANIFEST_XML.encode(), "123") + + assert result["init_url"] == "https://cdn.tidal.com/init.mp4" + segments = result["segments"] + assert len(segments) == 3 # r="2" means 3 repetitions + assert segments[0]["url"] == "https://cdn.tidal.com/1.mp4" + assert segments[0]["start"] == 0.0 + assert segments[1]["url"] == "https://cdn.tidal.com/2.mp4" + # 176128 / 44100 ≈ 3.993 seconds + assert abs(segments[1]["start"] - 176128 / 44100) < 0.001 + assert segments[2]["url"] == "https://cdn.tidal.com/3.mp4" + + +async def test_get_audio_stream_yields_init_then_segments( + streaming_manager: TidalStreamingManager, provider_mock: Mock +) -> None: + """Verify get_audio_stream yields init segment followed by media segments.""" + mock_streamdetails = Mock() + mock_streamdetails.item_id = "123" + mock_streamdetails.data = { + "init_url": "https://cdn.tidal.com/init.mp4", + "segments": [ + {"url": "https://cdn.tidal.com/1.mp4", "start": 0.0}, + {"url": "https://cdn.tidal.com/2.mp4", "start": 4.0}, + ], + } + + # Mock the HTTP session to return predictable bytes per URL. + responses: dict[str, bytes] = { + "https://cdn.tidal.com/init.mp4": b"INIT", + "https://cdn.tidal.com/1.mp4": b"SEG1", + "https://cdn.tidal.com/2.mp4": b"SEG2", + } + + async def _mock_content(data: bytes) -> AsyncGenerator[bytes, None]: + yield data + + def _mock_get(url: str, **_kwargs: object) -> MagicMock: + ctx = MagicMock() + resp = MagicMock() + resp.status = 200 + resp.content.iter_any = lambda: _mock_content(responses[url]) + ctx.__aenter__ = AsyncMock(return_value=resp) + ctx.__aexit__ = AsyncMock(return_value=None) + return ctx + + mock_session = MagicMock() + mock_session.get = _mock_get + provider_mock.mass.http_session = mock_session + + chunks = [] + async for chunk in streaming_manager.get_audio_stream(mock_streamdetails): + chunks.append(chunk) + + assert chunks == [b"INIT", b"SEG1", b"SEG2"] + + +async def test_get_audio_stream_seeks_to_correct_segment( + streaming_manager: TidalStreamingManager, provider_mock: Mock +) -> None: + """Verify get_audio_stream skips segments before the seek position.""" + mock_streamdetails = Mock() + mock_streamdetails.item_id = "123" + mock_streamdetails.data = { + "init_url": "https://cdn.tidal.com/init.mp4", + "segments": [ + {"url": "https://cdn.tidal.com/1.mp4", "start": 0.0}, + {"url": "https://cdn.tidal.com/2.mp4", "start": 4.0}, + {"url": "https://cdn.tidal.com/3.mp4", "start": 8.0}, + ], + } + + fetched_urls: list[str] = [] + + async def _mock_content(data: bytes) -> AsyncGenerator[bytes, None]: + yield data + + def _mock_get(url: str, **_kwargs: object) -> MagicMock: + fetched_urls.append(url) + ctx = MagicMock() + resp = MagicMock() + resp.status = 200 + resp.content.iter_any = lambda: _mock_content(b"X") + ctx.__aenter__ = AsyncMock(return_value=resp) + ctx.__aexit__ = AsyncMock(return_value=None) + return ctx + + mock_session = MagicMock() + mock_session.get = _mock_get + provider_mock.mass.http_session = mock_session + + chunks = [] + async for chunk in streaming_manager.get_audio_stream(mock_streamdetails, seek_position=5): + chunks.append(chunk) + + # Should fetch init + segments starting from segment 2 (start=4.0, which is <= 5) + assert fetched_urls == [ + "https://cdn.tidal.com/init.mp4", + "https://cdn.tidal.com/2.mp4", + "https://cdn.tidal.com/3.mp4", + ] + + +async def test_get_audio_stream_raises_on_segment_error( + streaming_manager: TidalStreamingManager, provider_mock: Mock +) -> None: + """Verify get_audio_stream raises MediaNotFoundError on HTTP errors.""" + mock_streamdetails = Mock() + mock_streamdetails.item_id = "123" + mock_streamdetails.data = { + "init_url": "https://cdn.tidal.com/init.mp4", + "segments": [{"url": "https://cdn.tidal.com/1.mp4", "start": 0.0}], + } + + async def _mock_content(data: bytes) -> AsyncGenerator[bytes, None]: + yield data + + call_count = 0 + + def _mock_get(_url: str, **_kwargs: object) -> MagicMock: + nonlocal call_count + call_count += 1 + ctx = MagicMock() + resp = MagicMock() + if call_count == 1: + # Init segment succeeds + resp.status = 200 + resp.content.iter_any = lambda: _mock_content(b"INIT") + else: + # Media segment fails + resp.status = 403 + ctx.__aenter__ = AsyncMock(return_value=resp) + ctx.__aexit__ = AsyncMock(return_value=None) + return ctx + + mock_session = MagicMock() + mock_session.get = _mock_get + provider_mock.mass.http_session = mock_session + + with pytest.raises(MediaNotFoundError, match="HTTP 403"): + async for _ in streaming_manager.get_audio_stream(mock_streamdetails): + pass async def test_get_stream_details_playback_info_cache_hit(