Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,299 changes: 21 additions & 1,278 deletions music_assistant/providers/apple_music/__init__.py

Large diffs are not rendered by default.

180 changes: 180 additions & 0 deletions music_assistant/providers/apple_music/api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
"""Apple Music API client."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from music_assistant_models.enums import MediaType
from music_assistant_models.errors import (
MediaNotFoundError,
MusicAssistantError,
ResourceTemporarilyUnavailable,
)

from music_assistant.helpers.json import json_loads
from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries

from .helpers.utils import is_library_id, translate_media_type_to_apple_type

if TYPE_CHECKING:
from .provider import AppleMusicProvider

_APPLE_API_BASE = "https://api.music.apple.com/v1"


class AppleMusicAPIClient:
"""Handles all HTTP communication with the Apple Music API."""

throttler = ThrottlerManager(rate_limit=1, period=2, initial_backoff=15)

def __init__(self, provider: AppleMusicProvider) -> None:
"""Initialize the API client."""
self.provider = provider
self.logger = provider.logger

@property
def _headers(self) -> dict[str, str]:
"""Return standard auth headers."""
return {
"Authorization": f"Bearer {self.provider._music_app_token}",
"Music-User-Token": self.provider._music_user_token,
}

@throttle_with_retries
async def get_data(self, endpoint: str, **kwargs: Any) -> dict[str, Any]:
"""GET data from the Apple Music API."""
url = f"{_APPLE_API_BASE}/{endpoint}"
async with (
self.provider.mass.http_session.get(
url, headers=self._headers, params=kwargs, ssl=True, timeout=120
) as response,
):
if response.status == 404 and "limit" in kwargs and "offset" in kwargs:
return {}
if response.status == 404:
raise MediaNotFoundError(f"{endpoint} not found")
if response.status == 504:
self.provider.logger.debug(
"Apple Music API Timeout: url=%s, params=%s, response_headers=%s",
url,
kwargs,
response.headers,
)
raise ResourceTemporarilyUnavailable("Apple Music API Timeout")
if response.status == 429:
self.provider.logger.debug(
"Apple Music Rate Limiter. Headers: %s", response.headers
)
raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
if response.status == 500:
raise MusicAssistantError("Unexpected server error when calling Apple Music")
response.raise_for_status()
return await response.json(loads=json_loads)

@throttle_with_retries
async def delete_data(self, endpoint: str, data: Any = None, **kwargs: Any) -> None:
"""DELETE data from the Apple Music API."""
url = f"{_APPLE_API_BASE}/{endpoint}"
async with (
self.provider.mass.http_session.delete(
url, headers=self._headers, params=kwargs, json=data, ssl=True, timeout=120
) as response,
):
if response.status == 404:
raise MediaNotFoundError(f"{endpoint} not found")
if response.status == 429:
self.provider.logger.debug(
"Apple Music Rate Limiter. Headers: %s", response.headers
)
raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
response.raise_for_status()

@throttle_with_retries
async def put_data(self, endpoint: str, data: Any = None, **kwargs: Any) -> dict[str, Any]:
"""PUT data to the Apple Music API."""
url = f"{_APPLE_API_BASE}/{endpoint}"
async with (
self.provider.mass.http_session.put(
url, headers=self._headers, params=kwargs, json=data, ssl=True, timeout=120
) as response,
):
if response.status == 404:
raise MediaNotFoundError(f"{endpoint} not found")
if response.status == 429:
self.provider.logger.debug(
"Apple Music Rate Limiter. Headers: %s", response.headers
)
raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
response.raise_for_status()
if response.content_length:
return await response.json(loads=json_loads)
return {}

@throttle_with_retries
async def post_data(self, endpoint: str, data: Any = None, **kwargs: Any) -> dict[str, Any]:
"""POST data to the Apple Music API."""
url = f"{_APPLE_API_BASE}/{endpoint}"
async with (
self.provider.mass.http_session.post(
url, headers=self._headers, params=kwargs, json=data, ssl=True, timeout=120
) as response,
):
if response.status == 404:
raise MediaNotFoundError(f"{endpoint} not found")
if response.status == 429:
self.provider.logger.debug(
"Apple Music Rate Limiter. Headers: %s", response.headers
)
raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
response.raise_for_status()
return await response.json(loads=json_loads)

async def get_all_items(self, endpoint: str, key: str = "data", **kwargs: Any) -> list[dict]:
"""Get all items from a paged list."""
limit = 50
offset = 0
all_items: list[dict] = []
while True:
kwargs["limit"] = limit
kwargs["offset"] = offset
result = await self.get_data(endpoint, **kwargs)
if key not in result:
break
all_items += result[key]
if not result.get("next"):
break
offset += limit
return all_items

async def get_user_storefront(self) -> str:
"""Return the user's storefront identifier."""
locale = self.provider.mass.metadata.locale.replace("_", "-")
language = locale.split("-")[0]
result = await self.get_data("me/storefront", l=language)
return result["data"][0]["id"]

async def get_ratings(self, item_ids: list[str], media_type: MediaType) -> dict[str, bool]:
"""Return a mapping of item_id → is_favourite for a list of IDs."""
if media_type == MediaType.ARTIST:
raise NotImplementedError(
"Ratings are not available for artist in the Apple Music API."
)
if not item_ids:
return {}
apple_type = translate_media_type_to_apple_type(media_type)
endpoint = apple_type if not is_library_id(item_ids[0]) else f"library-{apple_type}"
max_ids_per_request = 200
results: dict[str, bool] = {}
for i in range(0, len(item_ids), max_ids_per_request):
batch_ids = item_ids[i : i + max_ids_per_request]
response = await self.get_data(
f"me/ratings/{endpoint}",
ids=",".join(batch_ids),
)
results.update(
{
item["id"]: bool(item["attributes"].get("value", False) == 1)
for item in response.get("data", [])
}
)
return results
38 changes: 38 additions & 0 deletions music_assistant/providers/apple_music/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Constants for Apple Music provider."""

from music_assistant_models.enums import ProviderFeature

from music_assistant.helpers.app_vars import app_var

SUPPORTED_FEATURES = {
ProviderFeature.LIBRARY_ARTISTS,
ProviderFeature.LIBRARY_ALBUMS,
ProviderFeature.LIBRARY_TRACKS,
ProviderFeature.LIBRARY_PLAYLISTS,
ProviderFeature.BROWSE,
ProviderFeature.SEARCH,
ProviderFeature.RECOMMENDATIONS,
ProviderFeature.ARTIST_ALBUMS,
ProviderFeature.ARTIST_TOPTRACKS,
ProviderFeature.SIMILAR_TRACKS,
ProviderFeature.LIBRARY_ALBUMS_EDIT,
ProviderFeature.LIBRARY_ARTISTS_EDIT,
ProviderFeature.LIBRARY_PLAYLISTS_EDIT,
ProviderFeature.LIBRARY_TRACKS_EDIT,
ProviderFeature.PLAYLIST_TRACKS_EDIT,
ProviderFeature.FAVORITE_ALBUMS_EDIT,
ProviderFeature.FAVORITE_TRACKS_EDIT,
ProviderFeature.FAVORITE_PLAYLISTS_EDIT,
}

MUSIC_APP_TOKEN = app_var(8)
WIDEVINE_BASE_PATH = "/usr/local/bin/widevine_cdm"
DECRYPT_CLIENT_ID_FILENAME = "client_id.bin"
DECRYPT_PRIVATE_KEY_FILENAME = "private_key.pem"
UNKNOWN_PLAYLIST_NAME = "Unknown Apple Music Playlist"
CONF_MUSIC_APP_TOKEN = "music_app_token"
CONF_MUSIC_USER_TOKEN = "music_user_token"
CONF_MUSIC_USER_MANUAL_TOKEN = "music_user_manual_token"
CONF_MUSIC_USER_TOKEN_TIMESTAMP = "music_user_token_timestamp"
CACHE_CATEGORY_DECRYPT_KEY = 1
MAX_ARTWORK_DIMENSION = 1000
8 changes: 7 additions & 1 deletion music_assistant/providers/apple_music/helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
"""Various Apple Music utils/helpers."""

from .browse import browse_playlists
from .utils import is_catalog_id, is_library_id, translate_media_type_to_apple_type

__all__ = ["browse_playlists"]
__all__ = [
"browse_playlists",
"is_catalog_id",
"is_library_id",
"translate_media_type_to_apple_type",
]
15 changes: 9 additions & 6 deletions music_assistant/providers/apple_music/helpers/browse.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
from music_assistant_models.errors import MediaNotFoundError
from music_assistant_models.media_items import BrowseFolder, Playlist, ProviderMapping

from music_assistant.providers.apple_music.helpers.utils import is_library_id
from music_assistant.providers.apple_music.parsers import parse_playlist

if TYPE_CHECKING:
from music_assistant.providers.apple_music import AppleMusicProvider
from music_assistant.providers.apple_music.provider import AppleMusicProvider

ROOT_PLAYLIST_FOLDER_ID = "p.playlistsroot"
# Apple exposes the entire playlist hierarchy under this synthetic root. We walk the
Expand Down Expand Up @@ -72,7 +75,7 @@ async def _fetch_playlist_folder_children(
apple_folder_id = folder_id or ROOT_PLAYLIST_FOLDER_ID
endpoint = f"me/library/playlist-folders/{apple_folder_id}/children"
try:
children = await provider._get_all_items(endpoint)
children = await provider.api_client.get_all_items(endpoint)
except MediaNotFoundError:
children = []
folders: list[AppleMusicPlaylistFolder] = []
Expand All @@ -93,11 +96,11 @@ async def _fetch_playlist_folder_children(
)
elif child_type == "library-playlists":
playlist_entries.append(child)
if provider.is_library_id(child_id):
if is_library_id(child_id):
library_playlist_ids.append(child_id)
ratings: dict[str, Any] = {}
if library_playlist_ids:
ratings = await provider._get_ratings(library_playlist_ids, MediaType.PLAYLIST)
ratings = await provider.api_client.get_ratings(library_playlist_ids, MediaType.PLAYLIST)
playlists: list[Playlist] = []
for playlist_entry in playlist_entries:
playlist_id = playlist_entry.get("id")
Expand All @@ -109,7 +112,7 @@ async def _fetch_playlist_folder_children(
# Start with the original entry, potentially modify it below
playlist_obj = playlist_entry

if attributes.get("hasCatalog") and global_id and not provider.is_library_id(global_id):
if attributes.get("hasCatalog") and global_id and not is_library_id(global_id):
try:
playlist = await provider.get_playlist(global_id, is_favourite)
except MediaNotFoundError:
Expand All @@ -121,7 +124,7 @@ async def _fetch_playlist_folder_children(
else:
playlists.append(_apply_library_id(playlist, playlist_id, provider))
continue
playlists.append(provider._parse_playlist(playlist_obj, is_favourite))
playlists.append(parse_playlist(provider, playlist_obj, is_favourite))
playlists.sort(key=lambda item: (item.name or "").casefold())
folders.sort(key=lambda folder: folder.name.casefold())
return folders, playlists
Expand Down
35 changes: 35 additions & 0 deletions music_assistant/providers/apple_music/helpers/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Utility helpers for the Apple Music provider."""

from __future__ import annotations

import re
from typing import Any

from music_assistant_models.enums import MediaType
from music_assistant_models.errors import MusicAssistantError


def is_library_id(library_id: Any) -> bool:
"""Return True if the ID matches the Apple Music library ID format."""
if not isinstance(library_id, str):
return False
return bool(re.fullmatch(r"[ailp]\.[a-zA-Z0-9]+", library_id))


def is_catalog_id(catalog_id: str) -> bool:
"""Return True if the ID is a catalog ID (numeric or starts with 'pl.')."""
return catalog_id.isnumeric() or catalog_id.startswith("pl.")


def translate_media_type_to_apple_type(media_type: MediaType) -> str:
"""Translate a MediaType to the Apple Music API endpoint segment."""
match media_type:
case MediaType.ARTIST:
return "artists"
case MediaType.ALBUM:
return "albums"
case MediaType.TRACK:
return "songs"
case MediaType.PLAYLIST:
return "playlists"
raise MusicAssistantError(f"Unsupported media type: {media_type}")
Loading
Loading