From 89f5009c6ff0e04683cc4d37d68c904e3b8afb99 Mon Sep 17 00:00:00 2001
From: xiasi0 <493355621@qq.com>
Date: Sat, 11 Apr 2026 01:56:09 +0800
Subject: [PATCH 01/11] Add NetEase Cloud Music provider
---
.../providers/neteasecloudmusic/__init__.py | 1902 +++++++++++++++++
.../providers/neteasecloudmusic/constants.py | 26 +
.../providers/neteasecloudmusic/icon.svg | 1 +
.../neteasecloudmusic/icon_monochrome.svg | 1 +
.../providers/neteasecloudmusic/manifest.json | 10 +
5 files changed, 1940 insertions(+)
create mode 100644 music_assistant/providers/neteasecloudmusic/__init__.py
create mode 100644 music_assistant/providers/neteasecloudmusic/constants.py
create mode 100644 music_assistant/providers/neteasecloudmusic/icon.svg
create mode 100644 music_assistant/providers/neteasecloudmusic/icon_monochrome.svg
create mode 100644 music_assistant/providers/neteasecloudmusic/manifest.json
diff --git a/music_assistant/providers/neteasecloudmusic/__init__.py b/music_assistant/providers/neteasecloudmusic/__init__.py
new file mode 100644
index 0000000000..a082261ec8
--- /dev/null
+++ b/music_assistant/providers/neteasecloudmusic/__init__.py
@@ -0,0 +1,1902 @@
+"""NetEase Cloud Music provider implementation (MVP)."""
+
+from __future__ import annotations
+
+import asyncio
+import base64
+import binascii
+import json
+import re
+import time
+from collections.abc import AsyncGenerator, Sequence
+from contextlib import suppress
+from datetime import UTC, datetime
+from typing import TYPE_CHECKING, Any
+from urllib.parse import parse_qs, urlparse
+
+from aiohttp import ClientError, ClientSession, ClientTimeout, web
+from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
+from music_assistant_models.enums import (
+ ConfigEntryType,
+ ContentType,
+ EventType,
+ ImageType,
+ MediaType,
+ ProviderFeature,
+ StreamType,
+)
+from music_assistant_models.errors import (
+ InvalidDataError,
+ LoginFailed,
+ MediaNotFoundError,
+ ResourceTemporarilyUnavailable,
+ UnplayableMediaError,
+)
+from music_assistant_models.media_items import (
+ Album,
+ Artist,
+ AudioFormat,
+ ItemMapping,
+ MediaItemImage,
+ Playlist,
+ ProviderMapping,
+ RecommendationFolder,
+ SearchResults,
+ Track,
+ UniqueList,
+)
+from music_assistant_models.streamdetails import StreamDetails
+
+from music_assistant.controllers.cache import use_cache
+from music_assistant.models.music_provider import MusicProvider
+
+from .constants import (
+ CONF_ACTION_CHECK_QR_AUTH,
+ CONF_ACTION_CLEAR_AUTH,
+ CONF_ACTION_START_QR_AUTH,
+ CONF_API_BASE_URL,
+ CONF_COOKIE,
+ CONF_QR_KEY,
+ CONF_QR_PAGE_URL,
+ CONF_QUALITY,
+ CONF_UID,
+ DEFAULT_API_BASE_URL,
+ QUALITY_EXHIGH,
+ QUALITY_HIGHER,
+ QUALITY_HIRES,
+ QUALITY_JYEFFECT,
+ QUALITY_JYMASTER,
+ QUALITY_LOSSLESS,
+ QUALITY_STANDARD,
+)
+
+if TYPE_CHECKING:
+ from music_assistant_models.config_entries import ProviderConfig
+ from music_assistant_models.provider import ProviderManifest
+
+ from music_assistant.mass import MusicAssistant
+ from music_assistant.models import ProviderInstanceType
+
+
+SUPPORTED_FEATURES = {
+ ProviderFeature.LIBRARY_ARTISTS,
+ ProviderFeature.LIBRARY_ALBUMS,
+ ProviderFeature.LIBRARY_TRACKS,
+ ProviderFeature.LIBRARY_PLAYLISTS,
+ ProviderFeature.RECOMMENDATIONS,
+ ProviderFeature.SEARCH,
+ ProviderFeature.ARTIST_ALBUMS,
+ ProviderFeature.ARTIST_TOPTRACKS,
+ ProviderFeature.LYRICS,
+}
+
+_QR_ROUTE_UNREGISTER: dict[str, Any] = {}
+_HTTP_TIMEOUT = ClientTimeout(total=20)
+_LRC_TIMESTAMP_PATTERN = re.compile(r"\[\d{1,2}:\d{2}(?:\.\d{1,3})?\]")
+_LRC_META_TAG_PATTERN = re.compile(r"^\[[a-zA-Z]+:.*\]$")
+_RECOMMEND_NEWSONG_TTL = 60 * 30
+_RECOMMEND_PLAYLIST_TTL = 60 * 60
+_RECOMMEND_DAILY_TTL = 60 * 30
+_RECOMMEND_PERSONAL_FM_TTL = 60 * 5
+_RECOMMEND_HEART_MODE_TTL = 60 * 60
+_HIRES_MARK_FLAG = 17179869184
+_PLAYLIST_PERSONAL_FM_ID = "personal_fm_dynamic"
+_PLAYLIST_HEART_MODE_PREFIX = "heart_mode_dynamic"
+_NCM_PROVIDER_ICON_URL = (
+ "https://raw.githubusercontent.com/NeteaseCloudMusicApiEnhanced/"
+ "api-enhanced/main/public/docs/netease.png"
+)
+
+
+def _to_positive_int(value: Any) -> int:
+ """Convert unknown value to positive int, otherwise return 0."""
+ if isinstance(value, bool):
+ return 0
+ if isinstance(value, (int, float)):
+ parsed = int(value)
+ return max(0, parsed)
+ if isinstance(value, str):
+ stripped = value.strip()
+ if not stripped:
+ return 0
+ with suppress(ValueError):
+ parsed = int(float(stripped))
+ return max(0, parsed)
+ return 0
+
+
+def _lrc_to_plain_text(lrc_text: str) -> str:
+ """Convert timestamped lrc to plain multi-line lyric text."""
+ lines: list[str] = []
+ for raw_line in lrc_text.splitlines():
+ line = _LRC_TIMESTAMP_PATTERN.sub("", raw_line).strip()
+ if not line or _LRC_META_TAG_PATTERN.match(line):
+ continue
+ lines.append(line)
+ return "\n".join(lines).strip()
+
+
+class NcmApiClient:
+ """Small async client for NeteaseCloudMusicApi-compatible endpoints."""
+
+ def __init__(self, session: ClientSession, base_url: str) -> None:
+ """Initialize API client."""
+ self._session = session
+ self._base_url = base_url.rstrip("/")
+
+ async def get(
+ self,
+ path: str,
+ *,
+ params: dict[str, Any] | None = None,
+ cookie: str | None = None,
+ allow_codes: set[int] | None = None,
+ ) -> dict[str, Any]:
+ """Perform GET request and validate common NCM response format."""
+ req_params: dict[str, Any] = {}
+ if params:
+ req_params.update(params)
+ headers: dict[str, str] = {
+ "User-Agent": (
+ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
+ "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
+ )
+ }
+ if cookie:
+ req_params["cookie"] = cookie
+ headers["Cookie"] = cookie
+ url = f"{self._base_url}/{path.lstrip('/')}"
+ try:
+ async with self._session.get(
+ url,
+ params=req_params,
+ headers=headers,
+ timeout=_HTTP_TIMEOUT,
+ ) as resp:
+ text = await resp.text()
+ if resp.status >= 400:
+ raise ResourceTemporarilyUnavailable(
+ f"Netease API HTTP {resp.status} for {path}",
+ backoff_time=20,
+ )
+ except TimeoutError as err:
+ raise ResourceTemporarilyUnavailable(
+ f"Netease API timeout for {url}. "
+ "Please verify API base URL and that the backend service is running.",
+ backoff_time=20,
+ ) from err
+ except ClientError as err:
+ raise ResourceTemporarilyUnavailable(
+ f"Netease API network error for {path}: {err}",
+ backoff_time=20,
+ ) from err
+ try:
+ payload = json.loads(text)
+ except json.JSONDecodeError as err:
+ raise InvalidDataError(f"Netease API returned invalid JSON for {path}") from err
+ if not isinstance(payload, dict):
+ raise InvalidDataError(f"Netease API payload is not an object for {path}")
+ code = _extract_code(payload)
+ if allow_codes and code in allow_codes:
+ return payload
+ if code not in (None, 200):
+ raise InvalidDataError(f"Netease API error code {code} for {path}")
+ return payload
+
+
+def _extract_code(payload: dict[str, Any]) -> int | None:
+ """Extract API code from payload."""
+ raw_code = payload.get("code")
+ if raw_code is None and isinstance(payload.get("data"), dict):
+ raw_code = payload["data"].get("code")
+ try:
+ return int(raw_code) if raw_code is not None else None
+ except (TypeError, ValueError):
+ return None
+
+
+def _extract_data(payload: dict[str, Any]) -> dict[str, Any]:
+ """Return payload.data when it is an object, otherwise payload itself."""
+ data = payload.get("data")
+ if isinstance(data, dict):
+ return data
+ return payload
+
+
+def _extract_cookie(payload: dict[str, Any]) -> str:
+ """Extract login cookie string from payload."""
+ data = _extract_data(payload)
+ for candidate in (data.get("cookie"), payload.get("cookie")):
+ if isinstance(candidate, str) and candidate.strip():
+ return candidate.strip()
+ return ""
+
+
+def _with_pc_os_cookie(cookie: str) -> str:
+ """Return cookie string with os=pc enforced (required for correct song/url quality)."""
+ if not cookie.strip():
+ return cookie
+ parts = [part.strip() for part in cookie.split(";") if part.strip()]
+ kept: list[str] = []
+ os_set = False
+ for part in parts:
+ if "=" not in part:
+ kept.append(part)
+ continue
+ key, value = part.split("=", 1)
+ if key.strip().lower() == "os":
+ kept.append("os=pc")
+ os_set = True
+ continue
+ kept.append(f"{key.strip()}={value.strip()}")
+ if not os_set:
+ kept.append("os=pc")
+ return "; ".join(kept)
+
+
+def _clear_qr_route(route_path: str | None) -> None:
+ """Unregister one temporary QR route if present."""
+ if not route_path:
+ return
+ if unregister := _QR_ROUTE_UNREGISTER.pop(route_path, None):
+ with suppress(Exception):
+ unregister()
+
+
+def _get_qr_route_path(values: dict[str, ConfigValueType]) -> str | None:
+ """Resolve current session QR route path from config values."""
+ qr_page_url = str(values.get(CONF_QR_PAGE_URL) or "")
+ if qr_page_url:
+ parsed = urlparse(qr_page_url)
+ path = parsed.path or qr_page_url
+ if path and not path.startswith("/"):
+ path = f"/{path}"
+ if path.startswith("/auth/neteasecloudmusic/qr/"):
+ return path
+ if session_id := values.get("session_id"):
+ return f"/auth/neteasecloudmusic/qr/{session_id}"
+ return None
+
+
+def _register_qr_auth_page(
+ mass: MusicAssistant,
+ session_id: str,
+ image_bytes: bytes,
+ mime_type: str,
+) -> str:
+ """Register a temporary web route for QR image and return client-safe URL."""
+ if not getattr(mass, "webserver", None):
+ b64 = base64.b64encode(image_bytes).decode("ascii")
+ return f"data:{mime_type};base64,{b64}"
+
+ route_path = f"/auth/neteasecloudmusic/qr/{session_id}"
+ _clear_qr_route(route_path)
+
+ async def _serve_qr(_: web.Request) -> web.Response:
+ return web.Response(
+ body=image_bytes,
+ content_type=mime_type,
+ headers={"Cache-Control": "no-store, max-age=0"},
+ )
+
+ unregister = mass.webserver.register_dynamic_route(route_path, _serve_qr, "GET")
+ _QR_ROUTE_UNREGISTER[route_path] = unregister
+ return f"{route_path.lstrip('/')}?ts={int(time.time())}"
+
+
+def _decode_qr_data_url(data_url: str) -> tuple[bytes, str] | None:
+ """Decode data:image/...;base64,... to bytes."""
+ if not data_url.startswith("data:image/"):
+ return None
+ if ";base64," not in data_url:
+ return None
+ meta, b64_data = data_url.split(";base64,", 1)
+ mime_type = meta.replace("data:", "", 1)
+ try:
+ return (base64.b64decode(b64_data), mime_type)
+ except (ValueError, binascii.Error):
+ return None
+
+
+def _clear_auth(values: dict[str, ConfigValueType]) -> None:
+ """Clear stored authentication fields."""
+ route_path = _get_qr_route_path(values)
+ values[CONF_COOKIE] = None
+ values[CONF_UID] = None
+ values[CONF_QR_KEY] = None
+ values[CONF_QR_PAGE_URL] = None
+ _clear_qr_route(route_path)
+
+
+def _has_qr_pending(values: dict[str, ConfigValueType]) -> bool:
+ """Return True if QR auth is currently pending."""
+ return bool(values.get(CONF_QR_KEY))
+
+
+def _is_verified(values: dict[str, ConfigValueType]) -> bool:
+ """Return True if auth fields are already complete."""
+ return bool(values.get(CONF_COOKIE) and values.get(CONF_UID))
+
+
+async def _resolve_uid(client: NcmApiClient, cookie: str) -> str:
+ """Resolve user id from login status endpoint."""
+ payload = await client.get("/login/status", cookie=cookie)
+ data = _extract_data(payload)
+ profile = data.get("profile")
+ if isinstance(profile, dict) and profile.get("userId") is not None:
+ return str(profile.get("userId"))
+ account = data.get("account")
+ if isinstance(account, dict) and account.get("id") is not None:
+ return str(account.get("id"))
+ raise LoginFailed("Login succeeded but user id is missing from login status")
+
+
+async def _start_qr_auth(mass: MusicAssistant, values: dict[str, ConfigValueType]) -> None:
+ """Start QR login flow and auto-poll once for quick setup."""
+ _clear_qr_route(_get_qr_route_path(values))
+ base_url = str(values.get(CONF_API_BASE_URL) or DEFAULT_API_BASE_URL).strip()
+ client = NcmApiClient(mass.http_session, base_url)
+
+ key_payload = await client.get("/login/qr/key", params={"timestamp": int(time.time() * 1000)})
+ key_data = _extract_data(key_payload)
+ qr_key = str(key_data.get("unikey") or key_data.get("key") or "").strip()
+ if not qr_key:
+ raise LoginFailed("Failed to generate NetEase QR key")
+
+ qr_payload = await client.get(
+ "/login/qr/create",
+ params={
+ "key": qr_key,
+ "qrimg": "true",
+ "timestamp": int(time.time() * 1000),
+ },
+ )
+ qr_data = _extract_data(qr_payload)
+ qrimg = str(qr_data.get("qrimg") or "").strip()
+ qrurl = str(qr_data.get("qrurl") or "").strip()
+
+ values[CONF_QR_KEY] = qr_key
+ values[CONF_QR_PAGE_URL] = None
+
+ if session_id := values.get("session_id"):
+ decoded = _decode_qr_data_url(qrimg)
+ if decoded:
+ image_bytes, mime_type = decoded
+ qr_page_url = _register_qr_auth_page(mass, str(session_id), image_bytes, mime_type)
+ values[CONF_QR_PAGE_URL] = qr_page_url
+ mass.signal_event(EventType.AUTH_SESSION, str(session_id), qr_page_url)
+ elif qrurl:
+ values[CONF_QR_PAGE_URL] = qrurl
+ mass.signal_event(EventType.AUTH_SESSION, str(session_id), qrurl)
+ elif qrurl:
+ values[CONF_QR_PAGE_URL] = qrurl
+
+ deadline = time.monotonic() + 120
+ while time.monotonic() < deadline:
+ try:
+ await _check_qr_auth(mass, values, raise_for_pending=False)
+ if _is_verified(values):
+ return
+ except InvalidDataError as err:
+ if "expired" in str(err).lower():
+ raise
+ await asyncio.sleep(1.0)
+ if values.get(CONF_QR_KEY):
+ raise InvalidDataError(
+ "Waiting for scan confirmation timed out. "
+ "You can click QR Login again or use Check QR status."
+ )
+
+
+async def _check_qr_auth(
+ mass: MusicAssistant,
+ values: dict[str, ConfigValueType],
+ *,
+ raise_for_pending: bool = True,
+) -> None:
+ """Check QR status and store cookie/uid when login completes."""
+ qr_key = str(values.get(CONF_QR_KEY) or "").strip()
+ if not qr_key:
+ raise InvalidDataError("Please generate a QR code first")
+ base_url = str(values.get(CONF_API_BASE_URL) or DEFAULT_API_BASE_URL).strip()
+ client = NcmApiClient(mass.http_session, base_url)
+
+ payload = await client.get(
+ "/login/qr/check",
+ params={"key": qr_key, "timestamp": int(time.time() * 1000)},
+ allow_codes={800, 801, 802, 803},
+ )
+ code = _extract_code(payload)
+ if code == 803:
+ cookie = _extract_cookie(payload)
+ if not cookie:
+ raise LoginFailed("QR login succeeded but API response did not include cookie")
+ uid = await _resolve_uid(client, cookie)
+ values[CONF_COOKIE] = cookie
+ values[CONF_UID] = uid
+ values[CONF_QR_KEY] = None
+ values[CONF_QR_PAGE_URL] = None
+ return
+ if code == 800:
+ values[CONF_QR_KEY] = None
+ values[CONF_QR_PAGE_URL] = None
+ raise InvalidDataError("QR code expired, please generate a new one")
+ if raise_for_pending:
+ if code == 801:
+ raise InvalidDataError("QR code not scanned yet")
+ if code == 802:
+ raise InvalidDataError("QR scanned, please confirm login in NetEase app")
+ raise LoginFailed("Unable to determine QR login status")
+
+
+def _build_config_entries(values: dict[str, ConfigValueType]) -> tuple[ConfigEntry, ...]:
+ """Build setup flow config entries."""
+ has_qr_pending = _has_qr_pending(values)
+ is_verified = _is_verified(values)
+ qr_page_url = str(values.get(CONF_QR_PAGE_URL) or "")
+ status_label = (
+ "NetEase Cloud Music login confirmed. Close the QR page and click Save."
+ if is_verified
+ else "QR generated. Open the popup page, scan in NetEase Cloud Music app, then confirm."
+ if has_qr_pending
+ else "Click QR Login to start authentication."
+ )
+ help_text = (
+ "Login flow: 1) Click QR Login. 2) In the newly opened page, scan with NetEase app and "
+ "confirm login. 3) Close QR page. 4) Click Save."
+ )
+ return (
+ ConfigEntry(key="auth_help", type=ConfigEntryType.LABEL, label=help_text),
+ ConfigEntry(key="auth_status", type=ConfigEntryType.LABEL, label=status_label),
+ ConfigEntry(
+ key=CONF_API_BASE_URL,
+ type=ConfigEntryType.STRING,
+ label="API base URL",
+ required=True,
+ default_value=DEFAULT_API_BASE_URL,
+ value=str(values.get(CONF_API_BASE_URL) or DEFAULT_API_BASE_URL),
+ description="Base URL of your NeteaseCloudMusicApi-compatible service.",
+ ),
+ ConfigEntry(
+ key=CONF_QR_PAGE_URL,
+ type=ConfigEntryType.STRING,
+ label="QR login page URL",
+ required=False,
+ hidden=not has_qr_pending or not qr_page_url,
+ value=qr_page_url,
+ ),
+ ConfigEntry(
+ key=CONF_QUALITY,
+ type=ConfigEntryType.STRING,
+ label="Preferred quality",
+ default_value=QUALITY_EXHIGH,
+ hidden=not is_verified,
+ options=[
+ ConfigValueOption("Standard", QUALITY_STANDARD),
+ ConfigValueOption("Higher", QUALITY_HIGHER),
+ ConfigValueOption("Exhigh", QUALITY_EXHIGH),
+ ConfigValueOption("Lossless", QUALITY_LOSSLESS),
+ ConfigValueOption("Hi-Res", QUALITY_HIRES),
+ ConfigValueOption("JY Effect", QUALITY_JYEFFECT),
+ ConfigValueOption("JY Master", QUALITY_JYMASTER),
+ ],
+ ),
+ ConfigEntry(
+ key=CONF_ACTION_START_QR_AUTH,
+ type=ConfigEntryType.ACTION,
+ label="QR Login",
+ action=CONF_ACTION_START_QR_AUTH,
+ description="Generate QR code and open login popup page.",
+ hidden=is_verified,
+ ),
+ ConfigEntry(
+ key=CONF_ACTION_CHECK_QR_AUTH,
+ type=ConfigEntryType.ACTION,
+ label="Check QR status",
+ action=CONF_ACTION_CHECK_QR_AUTH,
+ description="Manually check whether scan confirmation completed.",
+ hidden=not has_qr_pending or is_verified,
+ ),
+ ConfigEntry(
+ key=CONF_ACTION_CLEAR_AUTH,
+ type=ConfigEntryType.ACTION,
+ label="Reset authentication",
+ action=CONF_ACTION_CLEAR_AUTH,
+ hidden=not (has_qr_pending or is_verified),
+ ),
+ ConfigEntry(
+ key=CONF_COOKIE,
+ type=ConfigEntryType.SECURE_STRING,
+ label=CONF_COOKIE,
+ hidden=True,
+ required=False,
+ value=str(values.get(CONF_COOKIE) or ""),
+ ),
+ ConfigEntry(
+ key=CONF_UID,
+ type=ConfigEntryType.STRING,
+ label=CONF_UID,
+ hidden=True,
+ required=False,
+ value=str(values.get(CONF_UID) or ""),
+ ),
+ ConfigEntry(
+ key=CONF_QR_KEY,
+ type=ConfigEntryType.STRING,
+ label=CONF_QR_KEY,
+ hidden=True,
+ required=False,
+ value=str(values.get(CONF_QR_KEY) or ""),
+ ),
+ )
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Initialize provider instance with given configuration."""
+ return NeteaseCloudMusicProvider(mass, manifest, config, SUPPORTED_FEATURES)
+
+
+async def get_config_entries(
+ mass: MusicAssistant,
+ instance_id: str | None = None, # noqa: ARG001
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+) -> tuple[ConfigEntry, ...]:
+ """Return Config entries to setup this provider."""
+ if values is None:
+ values = {}
+ try:
+ if action == CONF_ACTION_CLEAR_AUTH:
+ _clear_auth(values)
+ elif action == CONF_ACTION_START_QR_AUTH:
+ await _start_qr_auth(mass, values)
+ elif action == CONF_ACTION_CHECK_QR_AUTH:
+ await _check_qr_auth(mass, values)
+ except ResourceTemporarilyUnavailable as err:
+ raise InvalidDataError(str(err)) from err
+ return _build_config_entries(values)
+
+
+class NeteaseCloudMusicProvider(MusicProvider):
+ """NetEase Cloud Music provider (MVP)."""
+
+ _client: NcmApiClient
+ _cookie: str
+ _uid: str
+ _recommend_payload_cache: dict[str, tuple[float, dict[str, Any]]]
+
+ async def handle_async_init(self) -> None:
+ """Handle async initialization of provider."""
+ self._cookie = str(self.config.get_value(CONF_COOKIE) or "").strip()
+ self._uid = str(self.config.get_value(CONF_UID) or "").strip()
+ if not self._cookie:
+ raise LoginFailed("No NetEase authentication configured, please login by QR code")
+
+ api_base_url = str(self.config.get_value(CONF_API_BASE_URL) or DEFAULT_API_BASE_URL).strip()
+ self._client = NcmApiClient(self.mass.http_session, api_base_url)
+ resolved_uid = await _resolve_uid(self._client, self._cookie)
+ if not self._uid:
+ self._uid = resolved_uid
+ self._recommend_payload_cache = {}
+ self.logger.info("NetEase Cloud Music authenticated for uid %s", self._uid)
+
+ async def unload(self, is_removed: bool = False) -> None:
+ """Handle unload/close of provider."""
+ if is_removed:
+ route_path = _get_qr_route_path(
+ {CONF_QR_PAGE_URL: str(self.config.get_value(CONF_QR_PAGE_URL) or "")}
+ )
+ _clear_qr_route(route_path)
+ self._recommend_payload_cache = {}
+ await super().unload(is_removed)
+
+ def _get_item_mapping(self, media_type: MediaType, item_id: str, name: str) -> ItemMapping:
+ """Create generic item mapping."""
+ return ItemMapping(
+ media_type=media_type, item_id=item_id, provider=self.instance_id, name=name
+ )
+
+ def _ensure_square_image_url(self, url: str, size: int = 500) -> str:
+ """Return image URL with square-size hint when supported by source."""
+ if not url or "param=" in url:
+ return url
+ separator = "&" if "?" in url else "?"
+ return f"{url}{separator}param={size}y{size}"
+
+ def _make_image_list(
+ self, url: str | None, *, force_square: bool = False
+ ) -> UniqueList[MediaItemImage]:
+ """Create image list for media item."""
+ if not url:
+ return UniqueList()
+ image_url = self._ensure_square_image_url(url) if force_square else url
+ return UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image_url,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ ]
+ )
+
+ def _get_quality_obj(self, song_obj: dict[str, Any], level: str) -> dict[str, Any] | None:
+ """Map level to corresponding quality object in song/detail payload."""
+ quality_key_map = {
+ QUALITY_STANDARD: "l",
+ QUALITY_HIGHER: "m",
+ QUALITY_EXHIGH: "h",
+ QUALITY_LOSSLESS: "sq",
+ QUALITY_HIRES: "hr",
+ QUALITY_JYEFFECT: "je",
+ QUALITY_JYMASTER: "jm",
+ }
+ quality_key = quality_key_map.get(level.lower())
+ if not quality_key:
+ return None
+ quality_obj = song_obj.get(quality_key)
+ return quality_obj if isinstance(quality_obj, dict) else None
+
+ def _infer_audio_format_from_level(
+ self, level: str, quality_obj: dict[str, Any] | None = None
+ ) -> tuple[AudioFormat, str | None]:
+ """Infer best-effort AudioFormat and optional quality label from level info."""
+ level_norm = level.lower()
+ if level_norm in (QUALITY_HIRES, QUALITY_JYEFFECT, QUALITY_JYMASTER):
+ content_type = ContentType.FLAC
+ bit_depth = 24
+ details = "Hi-Res"
+ elif level_norm == QUALITY_LOSSLESS:
+ content_type = ContentType.FLAC
+ bit_depth = 16
+ details = None
+ else:
+ content_type = ContentType.MP3
+ bit_depth = 16
+ details = None
+
+ sample_rate = _to_positive_int(quality_obj.get("sr")) if quality_obj else 0
+ bit_rate = _to_positive_int(quality_obj.get("br")) if quality_obj else 0
+ return (
+ AudioFormat(
+ content_type=content_type,
+ sample_rate=sample_rate or 44100,
+ bit_depth=bit_depth,
+ bit_rate=bit_rate or None,
+ ),
+ details,
+ )
+
+ def _normalize_level_name(self, value: Any) -> str | None:
+ """Normalize any level-like value to known quality levels."""
+ if not isinstance(value, str):
+ return None
+ level = value.strip().lower()
+ aliases = {
+ "hires": QUALITY_HIRES,
+ "hi_res": QUALITY_HIRES,
+ "hi-res": QUALITY_HIRES,
+ "dolby": QUALITY_HIRES,
+ "sky": QUALITY_HIRES,
+ "jyeffect": QUALITY_JYEFFECT,
+ "jymaster": QUALITY_JYMASTER,
+ "lossless": QUALITY_LOSSLESS,
+ "exhigh": QUALITY_EXHIGH,
+ "higher": QUALITY_HIGHER,
+ "standard": QUALITY_STANDARD,
+ }
+ return aliases.get(level)
+
+ def _detect_max_quality_level(self, song_obj: dict[str, Any]) -> str:
+ """Detect highest available quality for a track from song/detail fields."""
+ level_priority = [
+ QUALITY_JYMASTER,
+ QUALITY_JYEFFECT,
+ QUALITY_HIRES,
+ QUALITY_LOSSLESS,
+ QUALITY_EXHIGH,
+ QUALITY_HIGHER,
+ QUALITY_STANDARD,
+ ]
+
+ # 1) Prefer explicit quality objects from song/detail.
+ for level in level_priority:
+ quality_obj = self._get_quality_obj(song_obj, level)
+ if isinstance(quality_obj, dict):
+ if _to_positive_int(quality_obj.get("br")) or _to_positive_int(
+ quality_obj.get("sr")
+ ):
+ return level
+ if quality_obj:
+ return level
+
+ # 2) Fallback to privilege-reported max/play/download levels.
+ privilege = song_obj.get("privilege")
+ if isinstance(privilege, dict):
+ best_idx = len(level_priority)
+ best_level: str | None = None
+ for key in ("maxBrLevel", "dlLevel", "plLevel", "flLevel"):
+ normalized = self._normalize_level_name(privilege.get(key))
+ if not normalized:
+ continue
+ idx = level_priority.index(normalized)
+ if idx < best_idx:
+ best_idx = idx
+ best_level = normalized
+ if best_level:
+ return best_level
+
+ # 3) Fallback to mark bit flag (Hi-Res support).
+ mark = _to_positive_int(song_obj.get("mark"))
+ if mark and (mark & _HIRES_MARK_FLAG):
+ return QUALITY_HIRES
+
+ return QUALITY_STANDARD
+
+ def _apply_track_quality_from_song_detail(self, track: Track, song_obj: dict[str, Any]) -> None:
+ """Populate mapping quality/details from detailed song object."""
+ max_level = self._detect_max_quality_level(song_obj)
+ quality_obj = self._get_quality_obj(song_obj, max_level)
+ audio_format, quality_label = self._infer_audio_format_from_level(max_level, quality_obj)
+ for mapping in track.provider_mappings:
+ if mapping.provider_instance != self.instance_id:
+ continue
+ mapping.audio_format = audio_format
+ mapping.details = quality_label
+ break
+
+ def _parse_artist(self, artist_obj: dict[str, Any]) -> Artist:
+ """Parse artist object."""
+ artist_id = str(artist_obj.get("id") or artist_obj.get("artistId") or "").strip()
+ if not artist_id:
+ raise InvalidDataError("Artist object missing id")
+ name = str(artist_obj.get("name") or "Unknown Artist").strip()
+ artist = Artist(
+ item_id=artist_id,
+ provider=self.instance_id,
+ name=name,
+ provider_mappings={
+ ProviderMapping(
+ item_id=artist_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=f"https://music.163.com/#/artist?id={artist_id}",
+ )
+ },
+ )
+ image_url = (
+ artist_obj.get("picUrl")
+ or artist_obj.get("img1v1Url")
+ or artist_obj.get("cover")
+ or artist_obj.get("avatar")
+ )
+ if isinstance(image_url, str):
+ artist.metadata.images = self._make_image_list(image_url, force_square=True)
+ return artist
+
+ def _parse_album(self, album_obj: dict[str, Any]) -> Album:
+ """Parse album object."""
+ album_id = str(album_obj.get("id") or album_obj.get("albumId") or "").strip()
+ if not album_id:
+ raise InvalidDataError("Album object missing id")
+ name = str(album_obj.get("name") or "Unknown Album").strip()
+ album = Album(
+ item_id=album_id,
+ provider=self.instance_id,
+ name=name,
+ provider_mappings={
+ ProviderMapping(
+ item_id=album_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=f"https://music.163.com/#/album?id={album_id}",
+ )
+ },
+ )
+ if artists := album_obj.get("artists") or album_obj.get("ar"):
+ if isinstance(artists, list):
+ album.artists = UniqueList()
+ for artist_obj in artists:
+ if not isinstance(artist_obj, dict):
+ continue
+ artist_id = str(artist_obj.get("id") or "").strip()
+ artist_name = str(artist_obj.get("name") or "Unknown Artist").strip()
+ if artist_id:
+ album.artists.append(
+ self._get_item_mapping(MediaType.ARTIST, artist_id, artist_name)
+ )
+ image_url = (
+ album_obj.get("picUrl")
+ or album_obj.get("coverUrl")
+ or album_obj.get("blurPicUrl")
+ or album_obj.get("albumPic")
+ )
+ if isinstance(image_url, str):
+ album.metadata.images = self._make_image_list(image_url)
+ publish_time = album_obj.get("publishTime")
+ if isinstance(publish_time, int) and publish_time > 0:
+ with suppress(Exception):
+ album.year = datetime.fromtimestamp(publish_time / 1000, tz=UTC).year
+ return album
+
+ def _parse_track(self, song_obj: dict[str, Any]) -> Track:
+ """Parse song object."""
+ track_id = str(song_obj.get("id") or song_obj.get("songId") or "").strip()
+ if not track_id:
+ raise InvalidDataError("Track object missing id")
+ name = str(song_obj.get("name") or "Unknown Track").strip()
+ duration_ms = song_obj.get("dt") or song_obj.get("duration") or 0
+ duration = (
+ int(duration_ms / 1000) if isinstance(duration_ms, int) and duration_ms > 0 else 0
+ )
+ max_level = self._detect_max_quality_level(song_obj)
+ max_quality_obj = self._get_quality_obj(song_obj, max_level)
+ max_audio_format, max_quality_label = self._infer_audio_format_from_level(
+ max_level, max_quality_obj
+ )
+ track = Track(
+ item_id=track_id,
+ provider=self.instance_id,
+ name=name,
+ duration=duration,
+ provider_mappings={
+ ProviderMapping(
+ item_id=track_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ audio_format=max_audio_format,
+ url=f"https://music.163.com/#/song?id={track_id}",
+ details=max_quality_label,
+ )
+ },
+ )
+
+ artists_raw = song_obj.get("ar") or song_obj.get("artists")
+ if isinstance(artists_raw, list):
+ track.artists = UniqueList()
+ for artist_obj in artists_raw:
+ if not isinstance(artist_obj, dict):
+ continue
+ artist_id = str(artist_obj.get("id") or "").strip()
+ artist_name = str(artist_obj.get("name") or "Unknown Artist").strip()
+ if artist_id:
+ track.artists.append(
+ self._get_item_mapping(MediaType.ARTIST, artist_id, artist_name)
+ )
+
+ album_raw = song_obj.get("al") or song_obj.get("album")
+ if isinstance(album_raw, dict):
+ album_id = str(album_raw.get("id") or "").strip()
+ album_name = str(album_raw.get("name") or "Unknown Album").strip()
+ if album_id:
+ track.album = self._get_item_mapping(MediaType.ALBUM, album_id, album_name)
+ image_url = (
+ album_raw.get("picUrl")
+ or album_raw.get("coverUrl")
+ or album_raw.get("blurPicUrl")
+ or song_obj.get("picUrl")
+ or song_obj.get("albumPic")
+ )
+ if isinstance(image_url, str):
+ track.metadata.images = self._make_image_list(image_url)
+ return track
+
+ def _parse_playlist(self, playlist_obj: dict[str, Any]) -> Playlist:
+ """Parse playlist object."""
+ playlist_id = str(playlist_obj.get("id") or playlist_obj.get("playlistId") or "").strip()
+ if not playlist_id:
+ raise InvalidDataError("Playlist object missing id")
+ name = str(playlist_obj.get("name") or "Unknown Playlist").strip()
+ playlist = Playlist(
+ item_id=playlist_id,
+ provider=self.instance_id,
+ name=name,
+ provider_mappings={
+ ProviderMapping(
+ item_id=playlist_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=f"https://music.163.com/#/playlist?id={playlist_id}",
+ )
+ },
+ )
+ if isinstance(playlist_obj.get("description"), str):
+ playlist.metadata.description = str(playlist_obj["description"]).strip()
+ image_url = playlist_obj.get("coverImgUrl") or playlist_obj.get("picUrl")
+ if isinstance(image_url, str):
+ playlist.metadata.images = self._make_image_list(image_url)
+ return playlist
+
+ def _build_dynamic_playlist(
+ self, item_id: str, name: str, image_url: str | None = None
+ ) -> Playlist:
+ """Create a dynamic playlist entry for radio-like flows."""
+ playlist = Playlist(
+ item_id=item_id,
+ provider=self.instance_id,
+ name=name,
+ provider_mappings={
+ ProviderMapping(
+ item_id=item_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ is_dynamic=True,
+ )
+ # Keep a fixed provider icon for FM/heart-mode entries for consistent UI.
+ playlist.metadata.images = self._make_image_list(_NCM_PROVIDER_ICON_URL)
+ return playlist
+
+ def _parse_heart_mode_playlist_id(self, playlist_id: str) -> tuple[str, str] | None:
+ """Parse heart mode dynamic playlist id into (seed_song_id, playlist_id)."""
+ if not playlist_id.startswith(f"{_PLAYLIST_HEART_MODE_PREFIX}:"):
+ return None
+ parts = playlist_id.split(":")
+ if len(parts) != 3:
+ return None
+ seed_song_id, source_playlist_id = parts[1], parts[2]
+ if not seed_song_id.isdigit() or not source_playlist_id.isdigit():
+ return None
+ return seed_song_id, source_playlist_id
+
+ def _mark_track_source(self, track: Track, source_label: str) -> Track:
+ """Mark track title so users can identify current dynamic radio source."""
+ prefix = f"[{source_label}] "
+ if not track.name.startswith(prefix):
+ track.name = f"{prefix}{track.name}"
+ return track
+
+ async def _get_song_detail(self, ids: str) -> list[dict[str, Any]]:
+ """Fetch song details for one or many ids."""
+ payload = await self._client.get("/song/detail", params={"ids": ids}, cookie=self._cookie)
+ data = _extract_data(payload)
+ songs = data.get("songs")
+ if isinstance(songs, list):
+ return [item for item in songs if isinstance(item, dict)]
+ return []
+
+ async def _get_song_music_detail(self, song_id: str) -> dict[str, Any] | None:
+ """Fetch extended quality info (jm/je/hr...) for a single song."""
+ payload = await self._client.get(
+ "/song/music/detail",
+ params={"id": song_id},
+ cookie=self._cookie,
+ )
+ return _extract_data(payload)
+
+ def _merge_quality_objects(
+ self, base_song_obj: dict[str, Any], quality_song_obj: dict[str, Any]
+ ) -> dict[str, Any]:
+ """Merge quality objects from song/music/detail into song/detail object."""
+ merged = dict(base_song_obj)
+ for key in ("jm", "je", "hr", "sq", "h", "m", "l"):
+ value = quality_song_obj.get(key)
+ if isinstance(value, dict):
+ merged[key] = value
+ return merged
+
+ async def _enrich_tracks_with_cover(self, tracks: list[Track]) -> None:
+ """Enrich track cover/quality by querying song/detail in chunks."""
+ track_ids = [track.item_id for track in tracks if track.item_id]
+ if not track_ids:
+ return
+
+ details_by_id: dict[str, dict[str, Any]] = {}
+ chunk_size = 200
+ for idx in range(0, len(track_ids), chunk_size):
+ chunk = track_ids[idx : idx + chunk_size]
+ with suppress(Exception):
+ detail_rows = await self._get_song_detail(",".join(chunk))
+ for row in detail_rows:
+ row_id = str(row.get("id") or "").strip()
+ if row_id:
+ details_by_id[row_id] = row
+
+ for track in tracks:
+ detail_obj = details_by_id.get(track.item_id)
+ if not isinstance(detail_obj, dict):
+ continue
+ with suppress(Exception):
+ quality_obj = await self._get_song_music_detail(track.item_id)
+ if isinstance(quality_obj, dict):
+ detail_obj = self._merge_quality_objects(detail_obj, quality_obj)
+ if not track.metadata.images:
+ album_raw = detail_obj.get("al") or detail_obj.get("album")
+ if isinstance(album_raw, dict):
+ image_url = (
+ album_raw.get("picUrl")
+ or album_raw.get("coverUrl")
+ or album_raw.get("blurPicUrl")
+ or detail_obj.get("picUrl")
+ or detail_obj.get("albumPic")
+ )
+ if isinstance(image_url, str) and image_url:
+ track.metadata.images = self._make_image_list(image_url)
+ self._apply_track_quality_from_song_detail(track, detail_obj)
+
+ async def search( # noqa: PLR0915
+ self,
+ search_query: str,
+ media_types: list[MediaType],
+ limit: int = 5,
+ ) -> SearchResults:
+ """Perform search on NetEase Cloud Music."""
+ result = SearchResults()
+ track_results: list[Track] = []
+ artist_results: list[Artist] = []
+ album_results: list[Album] = []
+ playlist_results: list[Playlist] = []
+ search_plan: list[tuple[MediaType, int]] = []
+ if MediaType.TRACK in media_types:
+ search_plan.append((MediaType.TRACK, 1))
+ if MediaType.ARTIST in media_types:
+ search_plan.append((MediaType.ARTIST, 100))
+ if MediaType.ALBUM in media_types:
+ search_plan.append((MediaType.ALBUM, 10))
+ if MediaType.PLAYLIST in media_types:
+ search_plan.append((MediaType.PLAYLIST, 1000))
+
+ async def _search_single(type_code: int) -> dict[str, Any]:
+ return await self._client.get(
+ "/search",
+ params={"keywords": search_query, "type": type_code, "limit": limit},
+ cookie=self._cookie,
+ )
+
+ responses = await asyncio.gather(
+ *[_search_single(type_code) for _, type_code in search_plan],
+ return_exceptions=True,
+ )
+ for idx, (media_type, _) in enumerate(search_plan):
+ response = responses[idx]
+ if isinstance(response, BaseException):
+ self.logger.debug("NCM search failed for media_type=%s: %s", media_type, response)
+ continue
+ data = _extract_data(response)
+ search_result = data.get("result")
+ if not isinstance(search_result, dict):
+ continue
+ if media_type == MediaType.TRACK:
+ songs = search_result.get("songs")
+ if isinstance(songs, list):
+ for song in songs[:limit]:
+ if not isinstance(song, dict):
+ continue
+ with suppress(InvalidDataError):
+ track_results.append(self._parse_track(song))
+ elif media_type == MediaType.ARTIST:
+ artists = search_result.get("artists")
+ if isinstance(artists, list):
+ for artist in artists[:limit]:
+ if not isinstance(artist, dict):
+ continue
+ with suppress(InvalidDataError):
+ artist_results.append(self._parse_artist(artist))
+ elif media_type == MediaType.ALBUM:
+ albums = search_result.get("albums")
+ if isinstance(albums, list):
+ for album in albums[:limit]:
+ if not isinstance(album, dict):
+ continue
+ with suppress(InvalidDataError):
+ album_results.append(self._parse_album(album))
+ elif media_type == MediaType.PLAYLIST:
+ playlists = search_result.get("playlists")
+ if isinstance(playlists, list):
+ for playlist in playlists[:limit]:
+ if not isinstance(playlist, dict):
+ continue
+ with suppress(InvalidDataError):
+ playlist_results.append(self._parse_playlist(playlist))
+ result.tracks = track_results
+ await self._enrich_tracks_with_cover(track_results)
+ result.artists = artist_results
+ result.albums = album_results
+ result.playlists = playlist_results
+ return result
+
+ @use_cache(3600 * 24)
+ async def get_artist(self, prov_artist_id: str) -> Artist:
+ """Get full artist details by id."""
+ payload = await self._client.get(
+ "/artist/detail",
+ params={"id": prov_artist_id},
+ cookie=self._cookie,
+ )
+ data = _extract_data(payload)
+ artist_obj = data.get("artist")
+ if not isinstance(artist_obj, dict):
+ raise MediaNotFoundError(f"Artist {prov_artist_id} not found")
+ return self._parse_artist(artist_obj)
+
+ async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
+ """Get all albums for an artist."""
+ payload = await self._client.get(
+ "/artist/album",
+ params={"id": prov_artist_id, "limit": 100, "offset": 0},
+ cookie=self._cookie,
+ )
+ data = _extract_data(payload)
+ raw_albums = data.get("hotAlbums") or data.get("albums")
+ if not isinstance(raw_albums, list):
+ return []
+ albums: list[Album] = []
+ for album_obj in raw_albums:
+ if not isinstance(album_obj, dict):
+ continue
+ with suppress(InvalidDataError):
+ albums.append(self._parse_album(album_obj))
+ return albums
+
+ async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
+ """Get top tracks for given artist."""
+ payload = await self._client.get(
+ "/artist/top/song",
+ params={"id": prov_artist_id},
+ cookie=self._cookie,
+ )
+ data = _extract_data(payload)
+ songs = data.get("songs")
+ if not isinstance(songs, list):
+ return []
+ tracks: list[Track] = []
+ for song_obj in songs:
+ if not isinstance(song_obj, dict):
+ continue
+ with suppress(InvalidDataError):
+ tracks.append(self._parse_track(song_obj))
+ return tracks
+
+ @use_cache(3600 * 24)
+ async def get_album(self, prov_album_id: str) -> Album:
+ """Get full album details by id."""
+ payload = await self._client.get(
+ "/album",
+ params={"id": prov_album_id},
+ cookie=self._cookie,
+ )
+ data = _extract_data(payload)
+ album_obj = data.get("album")
+ if not isinstance(album_obj, dict):
+ raise MediaNotFoundError(f"Album {prov_album_id} not found")
+ return self._parse_album(album_obj)
+
+ @use_cache(3600 * 24)
+ async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
+ """Get album tracks for album id."""
+ payload = await self._client.get(
+ "/album",
+ params={"id": prov_album_id},
+ cookie=self._cookie,
+ )
+ data = _extract_data(payload)
+ songs = data.get("songs")
+ if not isinstance(songs, list):
+ return []
+ tracks: list[Track] = []
+ for song_obj in songs:
+ if not isinstance(song_obj, dict):
+ continue
+ with suppress(InvalidDataError):
+ tracks.append(self._parse_track(song_obj))
+ return tracks
+
+ @use_cache(3600 * 24)
+ async def get_track(self, prov_track_id: str) -> Track:
+ """Get full track details by id."""
+ songs = await self._get_song_detail(prov_track_id)
+ if not songs:
+ raise MediaNotFoundError(f"Track {prov_track_id} not found")
+ song_obj = songs[0]
+ with suppress(Exception):
+ quality_obj = await self._get_song_music_detail(prov_track_id)
+ if isinstance(quality_obj, dict):
+ song_obj = self._merge_quality_objects(song_obj, quality_obj)
+ track = self._parse_track(song_obj)
+ with suppress(Exception):
+ lyric_payload = await self._client.get(
+ "/lyric/new",
+ params={"id": prov_track_id},
+ cookie=self._cookie,
+ )
+ lyric_data = _extract_data(lyric_payload)
+ lrc = (
+ lyric_data.get("lrc", {}).get("lyric")
+ if isinstance(lyric_data.get("lrc"), dict)
+ else ""
+ )
+ yrc = (
+ lyric_data.get("yrc", {}).get("lyric")
+ if isinstance(lyric_data.get("yrc"), dict)
+ else ""
+ )
+ tlyric = (
+ lyric_data.get("tlyric", {}).get("lyric")
+ if isinstance(lyric_data.get("tlyric"), dict)
+ else ""
+ )
+ lrc_text = str(lrc or "").strip()
+ yrc_text = str(yrc or "").strip()
+ tlyric_text = str(tlyric or "").strip()
+ if lrc_text and _LRC_TIMESTAMP_PATTERN.search(lrc_text):
+ track.metadata.lrc_lyrics = lrc_text
+ track.metadata.lyrics = _lrc_to_plain_text(lrc_text) or lrc_text
+ elif lrc_text:
+ track.metadata.lyrics = lrc_text
+ elif yrc_text:
+ track.metadata.lyrics = yrc_text
+ elif tlyric_text:
+ track.metadata.lyrics = tlyric_text
+ return track
+
+ @use_cache(3600 * 24)
+ async def get_playlist(self, prov_playlist_id: str) -> Playlist:
+ """Get full playlist details by id."""
+ if prov_playlist_id == _PLAYLIST_PERSONAL_FM_ID:
+ return self._build_dynamic_playlist(_PLAYLIST_PERSONAL_FM_ID, "私人FM")
+ if heart_parts := self._parse_heart_mode_playlist_id(prov_playlist_id):
+ seed_song_id, source_playlist_id = heart_parts
+ return self._build_dynamic_playlist(
+ f"{_PLAYLIST_HEART_MODE_PREFIX}:{seed_song_id}:{source_playlist_id}",
+ "心动模式",
+ )
+ if prov_playlist_id == _PLAYLIST_HEART_MODE_PREFIX:
+ if playlist := await self._build_heart_mode_dynamic_playlist():
+ return playlist
+ raise MediaNotFoundError("心动模式不可用, 请稍后重试")
+
+ payload = await self._client.get(
+ "/playlist/detail",
+ params={"id": prov_playlist_id},
+ cookie=self._cookie,
+ )
+ data = _extract_data(payload)
+ playlist_obj = data.get("playlist")
+ if not isinstance(playlist_obj, dict):
+ raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found")
+ return self._parse_playlist(playlist_obj)
+
+ async def get_playlist_tracks(
+ self,
+ prov_playlist_id: str,
+ page: int = 0,
+ ) -> Sequence[Track]:
+ """Get all playlist tracks for given playlist id."""
+ if prov_playlist_id == _PLAYLIST_PERSONAL_FM_ID:
+ if page > 0:
+ return []
+ tracks = await self._pick_personal_fm_tracks(fresh=True, target_count=12)
+ for idx, track in enumerate(tracks, start=1):
+ self._mark_track_source(track, "私人FM")
+ track.position = idx
+ return tracks
+ if heart_parts := self._parse_heart_mode_playlist_id(prov_playlist_id):
+ if page > 0:
+ return []
+ seed_song_id, source_playlist_id = heart_parts
+ tracks = await self._pick_heart_mode_tracks(
+ seed_song_id,
+ source_playlist_id,
+ count=20,
+ )
+ for idx, track in enumerate(tracks, start=1):
+ self._mark_track_source(track, "心动模式")
+ track.position = idx
+ return tracks
+ if prov_playlist_id == _PLAYLIST_HEART_MODE_PREFIX:
+ if page > 0:
+ return []
+ if playlist := await self._build_heart_mode_dynamic_playlist():
+ heart_parts = self._parse_heart_mode_playlist_id(playlist.item_id)
+ if heart_parts:
+ seed_song_id, source_playlist_id = heart_parts
+ tracks = await self._pick_heart_mode_tracks(
+ seed_song_id,
+ source_playlist_id,
+ count=20,
+ )
+ for idx, track in enumerate(tracks, start=1):
+ self._mark_track_source(track, "心动模式")
+ track.position = idx
+ return tracks
+ return []
+
+ limit = 500
+ offset = page * limit
+ payload = await self._client.get(
+ "/playlist/track/all",
+ params={"id": prov_playlist_id, "limit": limit, "offset": offset},
+ cookie=self._cookie,
+ )
+ data = _extract_data(payload)
+ songs = data.get("songs")
+ if not isinstance(songs, list):
+ return []
+ result: list[Track] = []
+ for idx, song_obj in enumerate(songs, start=1):
+ if not isinstance(song_obj, dict):
+ continue
+ with suppress(InvalidDataError):
+ track = self._parse_track(song_obj)
+ track.position = offset + idx
+ result.append(track)
+ return result
+
+ async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
+ """Retrieve favorite artists from NCM."""
+ payload = await self._client.get(
+ "/artist/sublist",
+ params={"limit": 2000, "offset": 0},
+ cookie=self._cookie,
+ )
+ data = _extract_data(payload)
+ artists = data.get("data") or data.get("artists")
+ if not isinstance(artists, list):
+ artists = []
+ for artist_obj in artists:
+ if not isinstance(artist_obj, dict):
+ continue
+ with suppress(InvalidDataError):
+ yield self._parse_artist(artist_obj)
+
+ async def get_library_albums(self) -> AsyncGenerator[Album, None]:
+ """Retrieve favorite albums from NCM."""
+ payload = await self._client.get(
+ "/album/sublist",
+ params={"limit": 2000, "offset": 0},
+ cookie=self._cookie,
+ )
+ data = _extract_data(payload)
+ albums = data.get("data") or data.get("albums")
+ if not isinstance(albums, list):
+ albums = []
+ for album_obj in albums:
+ if not isinstance(album_obj, dict):
+ continue
+ with suppress(InvalidDataError):
+ yield self._parse_album(album_obj)
+
+ async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
+ """Retrieve liked tracks from NCM."""
+ payload = await self._client.get(
+ "/likelist",
+ params={"uid": self._uid},
+ cookie=self._cookie,
+ )
+ data = _extract_data(payload)
+ ids = data.get("ids") or payload.get("ids")
+ if not isinstance(ids, list):
+ return
+ track_ids = [str(item) for item in ids if str(item).isdigit()]
+ chunk_size = 200
+ for idx in range(0, len(track_ids), chunk_size):
+ chunk_ids = track_ids[idx : idx + chunk_size]
+ songs = await self._get_song_detail(",".join(chunk_ids))
+ for song_obj in songs:
+ with suppress(InvalidDataError):
+ yield self._parse_track(song_obj)
+
+ async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
+ """Retrieve user playlists from NCM."""
+ payload = await self._client.get(
+ "/user/playlist",
+ params={"uid": self._uid, "limit": 1000, "offset": 0},
+ cookie=self._cookie,
+ )
+ data = _extract_data(payload)
+ playlists = data.get("playlist")
+ if not isinstance(playlists, list):
+ return
+ for playlist_obj in playlists:
+ if not isinstance(playlist_obj, dict):
+ continue
+ with suppress(InvalidDataError):
+ yield self._parse_playlist(playlist_obj)
+
+ async def _get_recommend_payload_cached(
+ self,
+ key: str,
+ ttl: int,
+ path: str,
+ params: dict[str, Any] | None = None,
+ ) -> dict[str, Any]:
+ """Return recommendation payload from in-memory TTL cache or fetch fresh."""
+ if cached := self._recommend_payload_cache.get(key):
+ timestamp, payload = cached
+ if (time.time() - timestamp) < ttl:
+ self.logger.debug("NCM recommendations %s payload cache hit", key)
+ return payload
+ payload = await self._client.get(path, params=params, cookie=self._cookie)
+ self._recommend_payload_cache[key] = (time.time(), payload)
+ return payload
+
+ async def _pick_personal_fm_tracks(
+ self, *, fresh: bool = False, target_count: int = 1
+ ) -> list[Track]:
+ """Fetch personal FM tracks, optionally aggregate multiple fresh pulls."""
+ if not fresh:
+ fm_payload = await self._get_recommend_payload_cached(
+ "personal_fm",
+ _RECOMMEND_PERSONAL_FM_TTL,
+ "/personal_fm",
+ )
+ fm_data = _extract_data(fm_payload)
+ fm_songs = fm_data.get("data")
+ if not isinstance(fm_songs, list) or not fm_songs:
+ return []
+ cached_tracks: list[Track] = []
+ for fm_item in fm_songs:
+ if not isinstance(fm_item, dict):
+ continue
+ song_obj = fm_item.get("song") if isinstance(fm_item.get("song"), dict) else fm_item
+ if not isinstance(song_obj, dict):
+ continue
+ with suppress(InvalidDataError):
+ cached_tracks.append(self._parse_track(song_obj))
+ return cached_tracks
+
+ # Fresh mode for dynamic playback: call endpoint in bounded batches and deduplicate tracks.
+ result: list[Track] = []
+ seen_ids: set[str] = set()
+ attempts = max(2, min(max(target_count, 4), 8))
+ no_new_rounds = 0
+ for _ in range(attempts):
+ fm_payload = await self._client.get(
+ "/personal_fm",
+ params={"timestamp": int(time.time() * 1000)},
+ cookie=self._cookie,
+ )
+ fm_data = _extract_data(fm_payload)
+ fm_songs = fm_data.get("data")
+ if not isinstance(fm_songs, list) or not fm_songs:
+ no_new_rounds += 1
+ if no_new_rounds >= 2:
+ break
+ continue
+ before_count = len(result)
+ for fm_item in fm_songs:
+ if not isinstance(fm_item, dict):
+ continue
+ song_obj = fm_item.get("song") if isinstance(fm_item.get("song"), dict) else fm_item
+ if not isinstance(song_obj, dict):
+ continue
+ with suppress(InvalidDataError):
+ track = self._parse_track(song_obj)
+ if track.item_id in seen_ids:
+ continue
+ seen_ids.add(track.item_id)
+ result.append(track)
+ if len(result) == before_count:
+ no_new_rounds += 1
+ else:
+ no_new_rounds = 0
+ if len(result) >= target_count:
+ break
+ if no_new_rounds >= 2:
+ break
+ return result
+
+ async def _pick_personal_fm_track(self) -> Track | None:
+ """Pick one track from personal FM payload."""
+ tracks = await self._pick_personal_fm_tracks(target_count=1)
+ return tracks[0] if tracks else None
+
+ async def _get_heart_mode_seed(self) -> tuple[str, str] | None:
+ """Resolve heart mode seed ids as (seed_song_id, playlist_id)."""
+ daily_payload = await self._get_recommend_payload_cached(
+ "daily_songs",
+ _RECOMMEND_DAILY_TTL,
+ "/recommend/songs",
+ )
+ daily_data = _extract_data(daily_payload)
+ daily_songs = daily_data.get("dailySongs")
+ if not isinstance(daily_songs, list) or not daily_songs:
+ return None
+ seed_song = next(
+ (item for item in daily_songs if isinstance(item, dict) and item.get("id")),
+ None,
+ )
+ if not isinstance(seed_song, dict):
+ return None
+ seed_song_id = str(seed_song.get("id") or "").strip()
+ if not seed_song_id.isdigit():
+ return None
+
+ playlist_payload = await self._get_recommend_payload_cached(
+ "heart_mode_playlist",
+ _RECOMMEND_HEART_MODE_TTL,
+ "/user/playlist",
+ {"uid": self._uid, "limit": 1, "offset": 0},
+ )
+ playlist_data = _extract_data(playlist_payload)
+ playlist_rows = playlist_data.get("playlist")
+ if not isinstance(playlist_rows, list) or not playlist_rows:
+ return None
+ first_playlist = playlist_rows[0]
+ if not isinstance(first_playlist, dict):
+ return None
+ playlist_id = str(first_playlist.get("id") or "").strip()
+ if not playlist_id.isdigit():
+ return None
+
+ return seed_song_id, playlist_id
+
+ async def _build_heart_mode_dynamic_playlist(self) -> Playlist | None:
+ """Build heart mode dynamic playlist item."""
+ heart_parts = await self._get_heart_mode_seed()
+ if heart_parts is None:
+ return None
+ seed_song_id, playlist_id = heart_parts
+ return self._build_dynamic_playlist(
+ f"{_PLAYLIST_HEART_MODE_PREFIX}:{seed_song_id}:{playlist_id}",
+ "心动模式",
+ )
+
+ async def _pick_heart_mode_tracks(
+ self, seed_song_id: str, playlist_id: str, count: int = 20
+ ) -> list[Track]:
+ """Fetch heart mode recommendation tracks."""
+ payload = await self._client.get(
+ "/playmode/intelligence/list",
+ params={
+ "id": seed_song_id,
+ "pid": playlist_id,
+ "sid": seed_song_id,
+ "count": count,
+ },
+ cookie=self._cookie,
+ )
+ data = _extract_data(payload)
+ rows = data.get("data")
+ if not isinstance(rows, list) or not rows:
+ return []
+ result: list[Track] = []
+ for row in rows:
+ if not isinstance(row, dict):
+ continue
+ song_obj = None
+ for key in ("songInfo", "song", "songData", "trackData"):
+ if isinstance(row.get(key), dict):
+ song_obj = row[key]
+ break
+ if song_obj is None and isinstance(row.get("id"), (int, str)):
+ song_obj = row
+ if not isinstance(song_obj, dict):
+ continue
+ with suppress(InvalidDataError):
+ result.append(self._parse_track(song_obj))
+ return result
+
+ async def _build_dynamic_radio_folder(self) -> RecommendationFolder | None:
+ """Build recommendation folder with dynamic playlist items."""
+ folder = RecommendationFolder(
+ item_id="recommended_radios",
+ provider=self.instance_id,
+ name="个性电台",
+ icon="mdi:radio",
+ )
+ folder.items.append(self._build_dynamic_playlist(_PLAYLIST_PERSONAL_FM_ID, "私人FM"))
+ if heart_playlist := await self._build_heart_mode_dynamic_playlist():
+ folder.items.append(heart_playlist)
+ return folder if folder.items else None
+
+ async def recommendations(self) -> list[RecommendationFolder]:
+ """Get recommendation folders."""
+ folders: list[RecommendationFolder] = []
+
+ if folder := await self._build_dynamic_radio_folder():
+ folders.append(folder)
+
+ daily_payload = await self._get_recommend_payload_cached(
+ "daily_songs", _RECOMMEND_DAILY_TTL, "/recommend/songs"
+ )
+ daily_data = _extract_data(daily_payload)
+ daily_songs = daily_data.get("dailySongs")
+ if isinstance(daily_songs, list) and daily_songs:
+ folder = RecommendationFolder(
+ item_id="daily_songs",
+ provider=self.instance_id,
+ name="每日推荐",
+ icon="mdi:star",
+ )
+ for song_obj in daily_songs:
+ if not isinstance(song_obj, dict):
+ continue
+ with suppress(InvalidDataError):
+ folder.items.append(self._parse_track(song_obj))
+ if folder.items:
+ folders.append(folder)
+
+ new_song_payload = await self._get_recommend_payload_cached(
+ "recommended_newsong",
+ _RECOMMEND_NEWSONG_TTL,
+ "/personalized/newsong",
+ {"limit": 50},
+ )
+ new_song_data = _extract_data(new_song_payload)
+ raw_new_songs = new_song_data.get("result")
+ if isinstance(raw_new_songs, list) and raw_new_songs:
+ folder = RecommendationFolder(
+ item_id="recommended_new_songs",
+ provider=self.instance_id,
+ name="推荐新歌",
+ icon="mdi:music-note",
+ )
+ for item in raw_new_songs:
+ if not isinstance(item, dict):
+ continue
+ song_obj = item.get("song") if isinstance(item.get("song"), dict) else item
+ if not isinstance(song_obj, dict):
+ continue
+ with suppress(InvalidDataError):
+ folder.items.append(self._parse_track(song_obj))
+ if folder.items:
+ folders.append(folder)
+
+ playlist_payload = await self._get_recommend_payload_cached(
+ "recommended_playlists",
+ _RECOMMEND_PLAYLIST_TTL,
+ "/personalized",
+ {"limit": 25},
+ )
+ playlist_data = _extract_data(playlist_payload)
+ raw_playlists = playlist_data.get("result")
+ if isinstance(raw_playlists, list) and raw_playlists:
+ folder = RecommendationFolder(
+ item_id="recommended_playlists",
+ provider=self.instance_id,
+ name="推荐歌单",
+ icon="mdi:playlist-music",
+ )
+ for playlist_obj in raw_playlists:
+ if not isinstance(playlist_obj, dict):
+ continue
+ with suppress(InvalidDataError):
+ folder.items.append(self._parse_playlist(playlist_obj))
+ if folder.items:
+ folders.append(folder)
+
+ return folders
+
+ def _quality_candidates(self) -> list[str]:
+ """Return ordered quality levels based on config."""
+ raw_quality = str(self.config.get_value(CONF_QUALITY) or QUALITY_EXHIGH).lower()
+ explicit_levels = {
+ QUALITY_STANDARD,
+ QUALITY_HIGHER,
+ QUALITY_EXHIGH,
+ QUALITY_LOSSLESS,
+ QUALITY_HIRES,
+ QUALITY_JYEFFECT,
+ QUALITY_JYMASTER,
+ }
+ quality = (
+ raw_quality
+ if raw_quality in explicit_levels
+ else (self._normalize_level_name(raw_quality) or raw_quality)
+ )
+ if quality == QUALITY_JYMASTER:
+ return [
+ QUALITY_JYMASTER,
+ QUALITY_JYEFFECT,
+ QUALITY_HIRES,
+ QUALITY_LOSSLESS,
+ QUALITY_EXHIGH,
+ QUALITY_HIGHER,
+ QUALITY_STANDARD,
+ ]
+ if quality == QUALITY_JYEFFECT:
+ return [
+ QUALITY_JYEFFECT,
+ QUALITY_HIRES,
+ QUALITY_LOSSLESS,
+ QUALITY_EXHIGH,
+ QUALITY_HIGHER,
+ QUALITY_STANDARD,
+ ]
+ if quality == QUALITY_HIRES:
+ return [
+ QUALITY_JYMASTER,
+ QUALITY_JYEFFECT,
+ QUALITY_HIRES,
+ QUALITY_LOSSLESS,
+ QUALITY_EXHIGH,
+ QUALITY_HIGHER,
+ QUALITY_STANDARD,
+ ]
+ if quality == QUALITY_LOSSLESS:
+ return [QUALITY_LOSSLESS, QUALITY_EXHIGH, QUALITY_HIGHER, QUALITY_STANDARD]
+ if quality == QUALITY_EXHIGH:
+ return [QUALITY_EXHIGH, QUALITY_HIGHER, QUALITY_STANDARD]
+ if quality == QUALITY_HIGHER:
+ return [QUALITY_HIGHER, QUALITY_STANDARD]
+ if quality == QUALITY_STANDARD:
+ return [QUALITY_STANDARD]
+ return [QUALITY_EXHIGH, QUALITY_HIGHER, QUALITY_STANDARD]
+
+ def _parse_content_type(self, stream_type: str | None, url: str) -> ContentType:
+ """Map stream type/ext to MA content type."""
+ if stream_type:
+ lowered = stream_type.lower()
+ if lowered == "flac":
+ return ContentType.FLAC
+ if lowered in ("mp3", "mpeg"):
+ return ContentType.MP3
+ if lowered in ("aac", "m4a"):
+ return ContentType.AAC
+ path = urlparse(url).path.lower()
+ if path.endswith(".flac"):
+ return ContentType.FLAC
+ if path.endswith(".mp3"):
+ return ContentType.MP3
+ if path.endswith((".m4a", ".aac")):
+ return ContentType.AAC
+ return ContentType.UNKNOWN
+
+ def _is_preview_stream(
+ self,
+ stream_info: dict[str, Any],
+ track_duration_ms: int | None = None,
+ ) -> bool:
+ """Return True when stream payload indicates a trial/preview clip."""
+ # 1) Strong signal: explicit free trial fragment window.
+ free_trial_info = stream_info.get("freeTrialInfo")
+ if isinstance(free_trial_info, dict):
+ trial_start = _to_positive_int(free_trial_info.get("start"))
+ trial_end = _to_positive_int(free_trial_info.get("end"))
+ if trial_end > trial_start:
+ return True
+
+ # 2) Fallback signal: returned stream duration is much shorter than track duration.
+ # Some responses include `freeTrialPrivilege` even for playable tracks, so we do not
+ # treat its mere presence as preview.
+ stream_time_ms = _to_positive_int(stream_info.get("time"))
+ if track_duration_ms and stream_time_ms:
+ # Keep a small tolerance to avoid false positives on rounding differences.
+ if (stream_time_ms + 5000) < track_duration_ms:
+ return True
+
+ return False
+
+ async def _get_track_stream_details(
+ self,
+ track_id: str,
+ *,
+ stream_item_id: str,
+ media_type: MediaType,
+ allow_seek: bool,
+ ) -> StreamDetails:
+ """Resolve stream details for a concrete track id."""
+ detail_obj: dict[str, Any] | None = None
+ track_duration_ms: int | None = None
+ with suppress(Exception):
+ detail_rows = await self._get_song_detail(track_id)
+ if detail_rows:
+ detail_obj = detail_rows[0]
+ if isinstance(detail_obj, dict):
+ track_duration_ms = _to_positive_int(detail_obj.get("dt")) or None
+ preview_fallback: StreamDetails | None = None
+ for requested_level in self._quality_candidates():
+ payload = await self._client.get(
+ "/song/url/v1",
+ params={"id": track_id, "level": requested_level},
+ cookie=_with_pc_os_cookie(self._cookie),
+ )
+ data = _extract_data(payload)
+ stream_rows = data.get("data")
+ if not isinstance(stream_rows, list) or not stream_rows:
+ continue
+ stream_info = stream_rows[0] if isinstance(stream_rows[0], dict) else {}
+ if not stream_info:
+ continue
+ stream_url = str(stream_info.get("url") or "").strip()
+ if not stream_url:
+ continue
+ resolved_level = str(stream_info.get("level") or requested_level).lower()
+ normalized_level = self._normalize_level_name(resolved_level) or resolved_level
+ preview = self._is_preview_stream(stream_info, track_duration_ms)
+ level_quality_obj = (
+ self._get_quality_obj(detail_obj, normalized_level)
+ if isinstance(detail_obj, dict)
+ else None
+ )
+ inferred_format, _ = self._infer_audio_format_from_level(
+ normalized_level, level_quality_obj
+ )
+ detected_content_type = self._parse_content_type(
+ str(stream_info.get("type") or stream_info.get("encodeType") or ""),
+ stream_url,
+ )
+ audio_format = AudioFormat(
+ content_type=(
+ inferred_format.content_type
+ if detected_content_type == ContentType.UNKNOWN
+ else detected_content_type
+ ),
+ sample_rate=inferred_format.sample_rate,
+ bit_depth=inferred_format.bit_depth,
+ bit_rate=inferred_format.bit_rate,
+ )
+ bitrate = stream_info.get("br")
+ if isinstance(bitrate, int) and bitrate > 0:
+ audio_format.bit_rate = bitrate
+ stream_sr = _to_positive_int(stream_info.get("sr"))
+ if stream_sr > 0 and (
+ level_quality_obj is None or audio_format.sample_rate in (0, 44100)
+ ):
+ audio_format.sample_rate = stream_sr
+ expiration = 3600
+ with suppress(Exception):
+ parsed = parse_qs(urlparse(stream_url).query)
+ if expire_raw := parsed.get("expire", [None])[0]:
+ expiration = max(60, int(expire_raw) - int(time.time()))
+ details = StreamDetails(
+ provider=self.instance_id,
+ item_id=stream_item_id,
+ media_type=media_type,
+ audio_format=audio_format,
+ stream_type=StreamType.HTTP,
+ path=stream_url,
+ can_seek=allow_seek,
+ allow_seek=allow_seek,
+ expiration=expiration,
+ data={
+ "preview": preview,
+ "requested_level": requested_level,
+ "resolved_level": resolved_level,
+ "track_id": track_id,
+ },
+ )
+ self.logger.debug(
+ "NCM stream selected item=%s track=%s requested=%s resolved=%s preview=%s format=%s/%s bitrate=%s",
+ stream_item_id,
+ track_id,
+ requested_level,
+ resolved_level,
+ preview,
+ audio_format.sample_rate,
+ audio_format.bit_depth,
+ audio_format.bit_rate,
+ )
+ if not preview:
+ return details
+ if preview_fallback is None:
+ preview_fallback = details
+ if preview_fallback is not None:
+ return preview_fallback
+ raise UnplayableMediaError(f"No playable stream URL returned for track {track_id}")
+
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
+ """Return streamdetails for track."""
+ if media_type == MediaType.TRACK:
+ return await self._get_track_stream_details(
+ item_id,
+ stream_item_id=item_id,
+ media_type=MediaType.TRACK,
+ allow_seek=True,
+ )
+ raise MediaNotFoundError(f"Unsupported media type {media_type}")
diff --git a/music_assistant/providers/neteasecloudmusic/constants.py b/music_assistant/providers/neteasecloudmusic/constants.py
new file mode 100644
index 0000000000..cc3030dd98
--- /dev/null
+++ b/music_assistant/providers/neteasecloudmusic/constants.py
@@ -0,0 +1,26 @@
+"""Constants for the NetEase Cloud Music provider."""
+
+from __future__ import annotations
+
+from typing import Final
+
+CONF_API_BASE_URL: Final[str] = "api_base_url"
+CONF_COOKIE: Final[str] = "cookie"
+CONF_UID: Final[str] = "uid"
+CONF_QR_KEY: Final[str] = "qr_key"
+CONF_QR_PAGE_URL: Final[str] = "qr_page_url"
+CONF_QUALITY: Final[str] = "quality"
+
+CONF_ACTION_START_QR_AUTH: Final[str] = "start_qr_auth"
+CONF_ACTION_CHECK_QR_AUTH: Final[str] = "check_qr_auth"
+CONF_ACTION_CLEAR_AUTH: Final[str] = "clear_auth"
+
+QUALITY_STANDARD: Final[str] = "standard"
+QUALITY_HIGHER: Final[str] = "higher"
+QUALITY_EXHIGH: Final[str] = "exhigh"
+QUALITY_LOSSLESS: Final[str] = "lossless"
+QUALITY_HIRES: Final[str] = "hires"
+QUALITY_JYEFFECT: Final[str] = "jyeffect"
+QUALITY_JYMASTER: Final[str] = "jymaster"
+
+DEFAULT_API_BASE_URL: Final[str] = "http://127.0.0.1:3000"
diff --git a/music_assistant/providers/neteasecloudmusic/icon.svg b/music_assistant/providers/neteasecloudmusic/icon.svg
new file mode 100644
index 0000000000..924fd521b1
--- /dev/null
+++ b/music_assistant/providers/neteasecloudmusic/icon.svg
@@ -0,0 +1 @@
+
diff --git a/music_assistant/providers/neteasecloudmusic/icon_monochrome.svg b/music_assistant/providers/neteasecloudmusic/icon_monochrome.svg
new file mode 100644
index 0000000000..6035999a5e
--- /dev/null
+++ b/music_assistant/providers/neteasecloudmusic/icon_monochrome.svg
@@ -0,0 +1 @@
+
diff --git a/music_assistant/providers/neteasecloudmusic/manifest.json b/music_assistant/providers/neteasecloudmusic/manifest.json
new file mode 100644
index 0000000000..17ef029511
--- /dev/null
+++ b/music_assistant/providers/neteasecloudmusic/manifest.json
@@ -0,0 +1,10 @@
+{
+ "type": "music",
+ "domain": "neteasecloudmusic",
+ "stage": "beta",
+ "name": "NetEase Cloud Music",
+ "description": "Stream NetEase Cloud Music catalog, playlists, lyrics, and recommendations.",
+ "codeowners": ["@xiasi0"],
+ "requirements": [],
+ "multi_instance": true
+}
From e0bc192a979ebc5ea3c9f859696d9d1b47524db1 Mon Sep 17 00:00:00 2001
From: xiasi0 <493355621@qq.com>
Date: Sat, 11 Apr 2026 02:11:31 +0800
Subject: [PATCH 02/11] fix(netease): paginate artist album retrieval
---
.../providers/neteasecloudmusic/__init__.py | 43 +++++++++++++------
1 file changed, 29 insertions(+), 14 deletions(-)
diff --git a/music_assistant/providers/neteasecloudmusic/__init__.py b/music_assistant/providers/neteasecloudmusic/__init__.py
index a082261ec8..240fcf8b5c 100644
--- a/music_assistant/providers/neteasecloudmusic/__init__.py
+++ b/music_assistant/providers/neteasecloudmusic/__init__.py
@@ -1135,21 +1135,36 @@ async def get_artist(self, prov_artist_id: str) -> Artist:
async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
"""Get all albums for an artist."""
- payload = await self._client.get(
- "/artist/album",
- params={"id": prov_artist_id, "limit": 100, "offset": 0},
- cookie=self._cookie,
- )
- data = _extract_data(payload)
- raw_albums = data.get("hotAlbums") or data.get("albums")
- if not isinstance(raw_albums, list):
- return []
+ limit = 100
+ offset = 0
+ seen_album_ids: set[str] = set()
albums: list[Album] = []
- for album_obj in raw_albums:
- if not isinstance(album_obj, dict):
- continue
- with suppress(InvalidDataError):
- albums.append(self._parse_album(album_obj))
+ for _ in range(50):
+ payload = await self._client.get(
+ "/artist/album",
+ params={"id": prov_artist_id, "limit": limit, "offset": offset},
+ cookie=self._cookie,
+ )
+ data = _extract_data(payload)
+ raw_albums = data.get("hotAlbums") or data.get("albums")
+ if not isinstance(raw_albums, list) or not raw_albums:
+ break
+
+ for album_obj in raw_albums:
+ if not isinstance(album_obj, dict):
+ continue
+ album_id = str(album_obj.get("id") or album_obj.get("albumId") or "").strip()
+ if album_id and album_id in seen_album_ids:
+ continue
+ with suppress(InvalidDataError):
+ album = self._parse_album(album_obj)
+ albums.append(album)
+ seen_album_ids.add(album.item_id)
+
+ has_more = bool(data.get("more") or data.get("hasMore"))
+ offset += limit
+ if not has_more and len(raw_albums) < limit:
+ break
return albums
async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
From 8b5fcf2c605f5f3692107d917f9b891c68462407 Mon Sep 17 00:00:00 2001
From: xiasi0 <493355621@qq.com>
Date: Sat, 11 Apr 2026 12:37:42 +0800
Subject: [PATCH 03/11] docs(netease): clarify os cookie intent and API
dependency
---
.../providers/neteasecloudmusic/__init__.py | 16 ++++++++++++++--
1 file changed, 14 insertions(+), 2 deletions(-)
diff --git a/music_assistant/providers/neteasecloudmusic/__init__.py b/music_assistant/providers/neteasecloudmusic/__init__.py
index 240fcf8b5c..4c6bf39747 100644
--- a/music_assistant/providers/neteasecloudmusic/__init__.py
+++ b/music_assistant/providers/neteasecloudmusic/__init__.py
@@ -99,6 +99,9 @@
_RECOMMEND_DAILY_TTL = 60 * 30
_RECOMMEND_PERSONAL_FM_TTL = 60 * 5
_RECOMMEND_HEART_MODE_TTL = 60 * 60
+# NetEase song-detail payload uses this bit in `hr`/`h` mark metadata to indicate
+# that the track has a Hi-Res tier in catalog metadata.
+# Value observed from NeteaseCloudMusicApi-compatible responses.
_HIRES_MARK_FLAG = 17179869184
_PLAYLIST_PERSONAL_FM_ID = "personal_fm_dynamic"
_PLAYLIST_HEART_MODE_PREFIX = "heart_mode_dynamic"
@@ -233,7 +236,13 @@ def _extract_cookie(payload: dict[str, Any]) -> str:
def _with_pc_os_cookie(cookie: str) -> str:
- """Return cookie string with os=pc enforced (required for correct song/url quality)."""
+ """Return cookie string with os=pc for quality URL consistency.
+
+ Netease API may return lower-tier URLs for non-pc `os` cookies even for
+ entitled accounts. This hint only stabilizes server-side format selection;
+ entitlement still comes from upstream account/song permission checks and we
+ do not bypass locked content.
+ """
if not cookie.strip():
return cookie
parts = [part.strip() for part in cookie.split(";") if part.strip()]
@@ -475,7 +484,10 @@ def _build_config_entries(values: dict[str, ConfigValueType]) -> tuple[ConfigEnt
required=True,
default_value=DEFAULT_API_BASE_URL,
value=str(values.get(CONF_API_BASE_URL) or DEFAULT_API_BASE_URL),
- description="Base URL of your NeteaseCloudMusicApi-compatible service.",
+ description=(
+ "Base URL of your local NeteaseCloudMusicApi-compatible service "
+ "(for Home Assistant users, see companion add-on PR #16)."
+ ),
),
ConfigEntry(
key=CONF_QR_PAGE_URL,
From 02c3519fb83544b3cc717b80eab85b98a64c0e26 Mon Sep 17 00:00:00 2001
From: xiasi0 <493355621@qq.com>
Date: Sat, 11 Apr 2026 22:07:09 +0800
Subject: [PATCH 04/11] fix(netease): align review feedback on
cache/search/error handling
---
.../providers/neteasecloudmusic/__init__.py | 201 ++++++++++--------
.../providers/neteasecloudmusic/manifest.json | 1 +
2 files changed, 119 insertions(+), 83 deletions(-)
diff --git a/music_assistant/providers/neteasecloudmusic/__init__.py b/music_assistant/providers/neteasecloudmusic/__init__.py
index 4c6bf39747..77fcfa48f9 100644
--- a/music_assistant/providers/neteasecloudmusic/__init__.py
+++ b/music_assistant/providers/neteasecloudmusic/__init__.py
@@ -99,6 +99,7 @@
_RECOMMEND_DAILY_TTL = 60 * 30
_RECOMMEND_PERSONAL_FM_TTL = 60 * 5
_RECOMMEND_HEART_MODE_TTL = 60 * 60
+CACHE_CATEGORY_RECOMMENDATIONS = 1
# NetEase song-detail payload uses this bit in `hr`/`h` mark metadata to indicate
# that the track has a Hi-Res tier in catalog metadata.
# Value observed from NeteaseCloudMusicApi-compatible responses.
@@ -268,8 +269,9 @@ def _clear_qr_route(route_path: str | None) -> None:
if not route_path:
return
if unregister := _QR_ROUTE_UNREGISTER.pop(route_path, None):
- with suppress(Exception):
- unregister()
+ if callable(unregister):
+ with suppress(RuntimeError):
+ unregister()
def _get_qr_route_path(values: dict[str, ConfigValueType]) -> str | None:
@@ -597,7 +599,6 @@ class NeteaseCloudMusicProvider(MusicProvider):
_client: NcmApiClient
_cookie: str
_uid: str
- _recommend_payload_cache: dict[str, tuple[float, dict[str, Any]]]
async def handle_async_init(self) -> None:
"""Handle async initialization of provider."""
@@ -608,10 +609,8 @@ async def handle_async_init(self) -> None:
api_base_url = str(self.config.get_value(CONF_API_BASE_URL) or DEFAULT_API_BASE_URL).strip()
self._client = NcmApiClient(self.mass.http_session, api_base_url)
- resolved_uid = await _resolve_uid(self._client, self._cookie)
if not self._uid:
- self._uid = resolved_uid
- self._recommend_payload_cache = {}
+ self._uid = await _resolve_uid(self._client, self._cookie)
self.logger.info("NetEase Cloud Music authenticated for uid %s", self._uid)
async def unload(self, is_removed: bool = False) -> None:
@@ -621,7 +620,6 @@ async def unload(self, is_removed: bool = False) -> None:
{CONF_QR_PAGE_URL: str(self.config.get_value(CONF_QR_PAGE_URL) or "")}
)
_clear_qr_route(route_path)
- self._recommend_payload_cache = {}
await super().unload(is_removed)
def _get_item_mapping(self, media_type: MediaType, item_id: str, name: str) -> ItemMapping:
@@ -850,7 +848,7 @@ def _parse_album(self, album_obj: dict[str, Any]) -> Album:
album.metadata.images = self._make_image_list(image_url)
publish_time = album_obj.get("publishTime")
if isinstance(publish_time, int) and publish_time > 0:
- with suppress(Exception):
+ with suppress(OSError, OverflowError, ValueError):
album.year = datetime.fromtimestamp(publish_time / 1000, tz=UTC).year
return album
@@ -975,13 +973,6 @@ def _parse_heart_mode_playlist_id(self, playlist_id: str) -> tuple[str, str] | N
return None
return seed_song_id, source_playlist_id
- def _mark_track_source(self, track: Track, source_label: str) -> Track:
- """Mark track title so users can identify current dynamic radio source."""
- prefix = f"[{source_label}] "
- if not track.name.startswith(prefix):
- track.name = f"{prefix}{track.name}"
- return track
-
async def _get_song_detail(self, ids: str) -> list[dict[str, Any]]:
"""Fetch song details for one or many ids."""
payload = await self._client.get("/song/detail", params={"ids": ids}, cookie=self._cookie)
@@ -1021,7 +1012,7 @@ async def _enrich_tracks_with_cover(self, tracks: list[Track]) -> None:
chunk_size = 200
for idx in range(0, len(track_ids), chunk_size):
chunk = track_ids[idx : idx + chunk_size]
- with suppress(Exception):
+ with suppress(InvalidDataError, ResourceTemporarilyUnavailable):
detail_rows = await self._get_song_detail(",".join(chunk))
for row in detail_rows:
row_id = str(row.get("id") or "").strip()
@@ -1032,7 +1023,7 @@ async def _enrich_tracks_with_cover(self, tracks: list[Track]) -> None:
detail_obj = details_by_id.get(track.item_id)
if not isinstance(detail_obj, dict):
continue
- with suppress(Exception):
+ with suppress(InvalidDataError, ResourceTemporarilyUnavailable):
quality_obj = await self._get_song_music_detail(track.item_id)
if isinstance(quality_obj, dict):
detail_obj = self._merge_quality_objects(detail_obj, quality_obj)
@@ -1050,7 +1041,82 @@ async def _enrich_tracks_with_cover(self, tracks: list[Track]) -> None:
track.metadata.images = self._make_image_list(image_url)
self._apply_track_quality_from_song_detail(track, detail_obj)
- async def search( # noqa: PLR0915
+ def _search_plan(self, media_types: list[MediaType]) -> list[tuple[MediaType, int]]:
+ """Build NCM search type plan from requested media types."""
+ plan: list[tuple[MediaType, int]] = []
+ if MediaType.TRACK in media_types:
+ plan.append((MediaType.TRACK, 1))
+ if MediaType.ARTIST in media_types:
+ plan.append((MediaType.ARTIST, 100))
+ if MediaType.ALBUM in media_types:
+ plan.append((MediaType.ALBUM, 10))
+ if MediaType.PLAYLIST in media_types:
+ plan.append((MediaType.PLAYLIST, 1000))
+ return plan
+
+ async def _search_single(
+ self, search_query: str, *, type_code: int, limit: int
+ ) -> dict[str, Any]:
+ """Run one NCM search request by type code."""
+ return await self._client.get(
+ "/search",
+ params={"keywords": search_query, "type": type_code, "limit": limit},
+ cookie=self._cookie,
+ )
+
+ def _parse_search_tracks(self, search_result: dict[str, Any], limit: int) -> list[Track]:
+ """Parse track search result items."""
+ tracks: list[Track] = []
+ songs = search_result.get("songs")
+ if not isinstance(songs, list):
+ return tracks
+ for song in songs[:limit]:
+ if not isinstance(song, dict):
+ continue
+ with suppress(InvalidDataError):
+ tracks.append(self._parse_track(song))
+ return tracks
+
+ def _parse_search_artists(self, search_result: dict[str, Any], limit: int) -> list[Artist]:
+ """Parse artist search result items."""
+ artists_result: list[Artist] = []
+ artists = search_result.get("artists")
+ if not isinstance(artists, list):
+ return artists_result
+ for artist in artists[:limit]:
+ if not isinstance(artist, dict):
+ continue
+ with suppress(InvalidDataError):
+ artists_result.append(self._parse_artist(artist))
+ return artists_result
+
+ def _parse_search_albums(self, search_result: dict[str, Any], limit: int) -> list[Album]:
+ """Parse album search result items."""
+ albums_result: list[Album] = []
+ albums = search_result.get("albums")
+ if not isinstance(albums, list):
+ return albums_result
+ for album in albums[:limit]:
+ if not isinstance(album, dict):
+ continue
+ with suppress(InvalidDataError):
+ albums_result.append(self._parse_album(album))
+ return albums_result
+
+ def _parse_search_playlists(self, search_result: dict[str, Any], limit: int) -> list[Playlist]:
+ """Parse playlist search result items."""
+ playlists_result: list[Playlist] = []
+ playlists = search_result.get("playlists")
+ if not isinstance(playlists, list):
+ return playlists_result
+ for playlist in playlists[:limit]:
+ if not isinstance(playlist, dict):
+ continue
+ with suppress(InvalidDataError):
+ playlists_result.append(self._parse_playlist(playlist))
+ return playlists_result
+
+ async def search(
self,
search_query: str,
media_types: list[MediaType],
@@ -1062,25 +1128,12 @@ async def search( # noqa: PLR0915
artist_results: list[Artist] = []
album_results: list[Album] = []
playlist_results: list[Playlist] = []
- search_plan: list[tuple[MediaType, int]] = []
- if MediaType.TRACK in media_types:
- search_plan.append((MediaType.TRACK, 1))
- if MediaType.ARTIST in media_types:
- search_plan.append((MediaType.ARTIST, 100))
- if MediaType.ALBUM in media_types:
- search_plan.append((MediaType.ALBUM, 10))
- if MediaType.PLAYLIST in media_types:
- search_plan.append((MediaType.PLAYLIST, 1000))
-
- async def _search_single(type_code: int) -> dict[str, Any]:
- return await self._client.get(
- "/search",
- params={"keywords": search_query, "type": type_code, "limit": limit},
- cookie=self._cookie,
- )
-
+ search_plan = self._search_plan(media_types)
responses = await asyncio.gather(
- *[_search_single(type_code) for _, type_code in search_plan],
+ *[
+ self._search_single(search_query, type_code=type_code, limit=limit)
+ for _, type_code in search_plan
+ ],
return_exceptions=True,
)
for idx, (media_type, _) in enumerate(search_plan):
@@ -1093,37 +1146,13 @@ async def _search_single(type_code: int) -> dict[str, Any]:
if not isinstance(search_result, dict):
continue
if media_type == MediaType.TRACK:
- songs = search_result.get("songs")
- if isinstance(songs, list):
- for song in songs[:limit]:
- if not isinstance(song, dict):
- continue
- with suppress(InvalidDataError):
- track_results.append(self._parse_track(song))
+ track_results.extend(self._parse_search_tracks(search_result, limit))
elif media_type == MediaType.ARTIST:
- artists = search_result.get("artists")
- if isinstance(artists, list):
- for artist in artists[:limit]:
- if not isinstance(artist, dict):
- continue
- with suppress(InvalidDataError):
- artist_results.append(self._parse_artist(artist))
+ artist_results.extend(self._parse_search_artists(search_result, limit))
elif media_type == MediaType.ALBUM:
- albums = search_result.get("albums")
- if isinstance(albums, list):
- for album in albums[:limit]:
- if not isinstance(album, dict):
- continue
- with suppress(InvalidDataError):
- album_results.append(self._parse_album(album))
+ album_results.extend(self._parse_search_albums(search_result, limit))
elif media_type == MediaType.PLAYLIST:
- playlists = search_result.get("playlists")
- if isinstance(playlists, list):
- for playlist in playlists[:limit]:
- if not isinstance(playlist, dict):
- continue
- with suppress(InvalidDataError):
- playlist_results.append(self._parse_playlist(playlist))
+ playlist_results.extend(self._parse_search_playlists(search_result, limit))
result.tracks = track_results
await self._enrich_tracks_with_cover(track_results)
result.artists = artist_results
@@ -1239,12 +1268,12 @@ async def get_track(self, prov_track_id: str) -> Track:
if not songs:
raise MediaNotFoundError(f"Track {prov_track_id} not found")
song_obj = songs[0]
- with suppress(Exception):
+ with suppress(InvalidDataError, ResourceTemporarilyUnavailable):
quality_obj = await self._get_song_music_detail(prov_track_id)
if isinstance(quality_obj, dict):
song_obj = self._merge_quality_objects(song_obj, quality_obj)
track = self._parse_track(song_obj)
- with suppress(Exception):
+ with suppress(InvalidDataError, ResourceTemporarilyUnavailable):
lyric_payload = await self._client.get(
"/lyric/new",
params={"id": prov_track_id},
@@ -1294,7 +1323,7 @@ async def get_playlist(self, prov_playlist_id: str) -> Playlist:
if prov_playlist_id == _PLAYLIST_HEART_MODE_PREFIX:
if playlist := await self._build_heart_mode_dynamic_playlist():
return playlist
- raise MediaNotFoundError("心动模式不可用, 请稍后重试")
+ raise MediaNotFoundError("Heart mode is currently unavailable, please try again later")
payload = await self._client.get(
"/playlist/detail",
@@ -1318,7 +1347,6 @@ async def get_playlist_tracks(
return []
tracks = await self._pick_personal_fm_tracks(fresh=True, target_count=12)
for idx, track in enumerate(tracks, start=1):
- self._mark_track_source(track, "私人FM")
track.position = idx
return tracks
if heart_parts := self._parse_heart_mode_playlist_id(prov_playlist_id):
@@ -1331,7 +1359,6 @@ async def get_playlist_tracks(
count=20,
)
for idx, track in enumerate(tracks, start=1):
- self._mark_track_source(track, "心动模式")
track.position = idx
return tracks
if prov_playlist_id == _PLAYLIST_HEART_MODE_PREFIX:
@@ -1347,7 +1374,6 @@ async def get_playlist_tracks(
count=20,
)
for idx, track in enumerate(tracks, start=1):
- self._mark_track_source(track, "心动模式")
track.position = idx
return tracks
return []
@@ -1451,14 +1477,27 @@ async def _get_recommend_payload_cached(
path: str,
params: dict[str, Any] | None = None,
) -> dict[str, Any]:
- """Return recommendation payload from in-memory TTL cache or fetch fresh."""
- if cached := self._recommend_payload_cache.get(key):
- timestamp, payload = cached
- if (time.time() - timestamp) < ttl:
+ """Return recommendation payload from MA cache or fetch fresh."""
+ params_key = json.dumps(params or {}, sort_keys=True, separators=(",", ":"))
+ cache_key = f"{key}:{params_key}"
+ cached = await self.mass.cache.get(
+ key=cache_key,
+ provider=self.instance_id,
+ category=CACHE_CATEGORY_RECOMMENDATIONS,
+ default=None,
+ )
+ if cached is not None:
+ if isinstance(cached, dict):
self.logger.debug("NCM recommendations %s payload cache hit", key)
- return payload
+ return cached
payload = await self._client.get(path, params=params, cookie=self._cookie)
- self._recommend_payload_cache[key] = (time.time(), payload)
+ await self.mass.cache.set(
+ key=cache_key,
+ provider=self.instance_id,
+ category=CACHE_CATEGORY_RECOMMENDATIONS,
+ data=payload,
+ expiration=ttl,
+ )
return payload
async def _pick_personal_fm_tracks(
@@ -1527,11 +1566,6 @@ async def _pick_personal_fm_tracks(
break
return result
- async def _pick_personal_fm_track(self) -> Track | None:
- """Pick one track from personal FM payload."""
- tracks = await self._pick_personal_fm_tracks(target_count=1)
- return tracks[0] if tracks else None
-
async def _get_heart_mode_seed(self) -> tuple[str, str] | None:
"""Resolve heart mode seed ids as (seed_song_id, playlist_id)."""
daily_payload = await self._get_recommend_payload_cached(
@@ -1820,7 +1854,7 @@ async def _get_track_stream_details(
"""Resolve stream details for a concrete track id."""
detail_obj: dict[str, Any] | None = None
track_duration_ms: int | None = None
- with suppress(Exception):
+ with suppress(InvalidDataError, ResourceTemporarilyUnavailable):
detail_rows = await self._get_song_detail(track_id)
if detail_rows:
detail_obj = detail_rows[0]
@@ -1877,7 +1911,7 @@ async def _get_track_stream_details(
):
audio_format.sample_rate = stream_sr
expiration = 3600
- with suppress(Exception):
+ with suppress(TypeError, ValueError):
parsed = parse_qs(urlparse(stream_url).query)
if expire_raw := parsed.get("expire", [None])[0]:
expiration = max(60, int(expire_raw) - int(time.time()))
@@ -1913,6 +1947,7 @@ async def _get_track_stream_details(
return details
if preview_fallback is None:
preview_fallback = details
+ # If account/song entitlement only allows trial playback, return preview stream.
if preview_fallback is not None:
return preview_fallback
raise UnplayableMediaError(f"No playable stream URL returned for track {track_id}")
diff --git a/music_assistant/providers/neteasecloudmusic/manifest.json b/music_assistant/providers/neteasecloudmusic/manifest.json
index 17ef029511..8b72e677fc 100644
--- a/music_assistant/providers/neteasecloudmusic/manifest.json
+++ b/music_assistant/providers/neteasecloudmusic/manifest.json
@@ -6,5 +6,6 @@
"description": "Stream NetEase Cloud Music catalog, playlists, lyrics, and recommendations.",
"codeowners": ["@xiasi0"],
"requirements": [],
+ "documentation": "https://music-assistant.io/music-providers/netease-cloud-music",
"multi_instance": true
}
From 12a6463c3a71407f6ac77d9378d5cfea808b7408 Mon Sep 17 00:00:00 2001
From: xiasi0 <493355621@qq.com>
Date: Mon, 13 Apr 2026 20:13:56 +0800
Subject: [PATCH 05/11] fix(neteasecloudmusic): address review feedback
---
.../providers/neteasecloudmusic/__init__.py | 173 +++++++++++-------
1 file changed, 111 insertions(+), 62 deletions(-)
diff --git a/music_assistant/providers/neteasecloudmusic/__init__.py b/music_assistant/providers/neteasecloudmusic/__init__.py
index 77fcfa48f9..309a9881fb 100644
--- a/music_assistant/providers/neteasecloudmusic/__init__.py
+++ b/music_assistant/providers/neteasecloudmusic/__init__.py
@@ -1019,14 +1019,37 @@ async def _enrich_tracks_with_cover(self, tracks: list[Track]) -> None:
if row_id:
details_by_id[row_id] = row
+ async def _fetch_quality(track_id: str) -> tuple[str, dict[str, Any] | None]:
+ try:
+ quality_obj = await self._get_song_music_detail(track_id)
+ except (InvalidDataError, ResourceTemporarilyUnavailable):
+ return track_id, None
+ return track_id, quality_obj if isinstance(quality_obj, dict) else None
+
+ semaphore = asyncio.Semaphore(8)
+
+ async def _bounded_fetch(track_id: str) -> tuple[str, dict[str, Any] | None]:
+ async with semaphore:
+ return await _fetch_quality(track_id)
+
+ quality_tasks = [
+ _bounded_fetch(track.item_id)
+ for track in tracks
+ if track.item_id and track.item_id in details_by_id
+ ]
+ quality_by_id = {
+ track_id: quality_obj
+ for track_id, quality_obj in (await asyncio.gather(*quality_tasks))
+ if isinstance(quality_obj, dict)
+ }
+
for track in tracks:
detail_obj = details_by_id.get(track.item_id)
if not isinstance(detail_obj, dict):
continue
- with suppress(InvalidDataError, ResourceTemporarilyUnavailable):
- quality_obj = await self._get_song_music_detail(track.item_id)
- if isinstance(quality_obj, dict):
- detail_obj = self._merge_quality_objects(detail_obj, quality_obj)
+ quality_obj = quality_by_id.get(track.item_id)
+ if isinstance(quality_obj, dict):
+ detail_obj = self._merge_quality_objects(detail_obj, quality_obj)
if not track.metadata.images:
album_raw = detail_obj.get("al") or detail_obj.get("album")
if isinstance(album_raw, dict):
@@ -1174,6 +1197,7 @@ async def get_artist(self, prov_artist_id: str) -> Artist:
raise MediaNotFoundError(f"Artist {prov_artist_id} not found")
return self._parse_artist(artist_obj)
+ @use_cache(3600 * 24)
async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
"""Get all albums for an artist."""
limit = 100
@@ -1208,6 +1232,7 @@ async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
break
return albums
+ @use_cache(3600 * 24)
async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
"""Get top tracks for given artist."""
payload = await self._client.get(
@@ -1241,7 +1266,7 @@ async def get_album(self, prov_album_id: str) -> Album:
raise MediaNotFoundError(f"Album {prov_album_id} not found")
return self._parse_album(album_obj)
- @use_cache(3600 * 24)
+ @use_cache()
async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get album tracks for album id."""
payload = await self._client.get(
@@ -1261,7 +1286,7 @@ async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
tracks.append(self._parse_track(song_obj))
return tracks
- @use_cache(3600 * 24)
+ @use_cache()
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
songs = await self._get_song_detail(prov_track_id)
@@ -1313,12 +1338,12 @@ async def get_track(self, prov_track_id: str) -> Track:
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
if prov_playlist_id == _PLAYLIST_PERSONAL_FM_ID:
- return self._build_dynamic_playlist(_PLAYLIST_PERSONAL_FM_ID, "私人FM")
+ return self._build_dynamic_playlist(_PLAYLIST_PERSONAL_FM_ID, "Personal FM")
if heart_parts := self._parse_heart_mode_playlist_id(prov_playlist_id):
seed_song_id, source_playlist_id = heart_parts
return self._build_dynamic_playlist(
f"{_PLAYLIST_HEART_MODE_PREFIX}:{seed_song_id}:{source_playlist_id}",
- "心动模式",
+ "Heart Mode",
)
if prov_playlist_id == _PLAYLIST_HEART_MODE_PREFIX:
if playlist := await self._build_heart_mode_dynamic_playlist():
@@ -1336,6 +1361,34 @@ async def get_playlist(self, prov_playlist_id: str) -> Playlist:
raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found")
return self._parse_playlist(playlist_obj)
+ @use_cache(3600 * 3)
+ async def _get_playlist_tracks_cached(
+ self,
+ prov_playlist_id: str,
+ page: int = 0,
+ ) -> Sequence[Track]:
+ """Get playlist tracks for static playlists (cached)."""
+ limit = 500
+ offset = page * limit
+ payload = await self._client.get(
+ "/playlist/track/all",
+ params={"id": prov_playlist_id, "limit": limit, "offset": offset},
+ cookie=self._cookie,
+ )
+ data = _extract_data(payload)
+ songs = data.get("songs")
+ if not isinstance(songs, list):
+ return []
+ result: list[Track] = []
+ for idx, song_obj in enumerate(songs, start=1):
+ if not isinstance(song_obj, dict):
+ continue
+ with suppress(InvalidDataError):
+ track = self._parse_track(song_obj)
+ track.position = offset + idx
+ result.append(track)
+ return result
+
async def get_playlist_tracks(
self,
prov_playlist_id: str,
@@ -1378,60 +1431,55 @@ async def get_playlist_tracks(
return tracks
return []
- limit = 500
- offset = page * limit
- payload = await self._client.get(
- "/playlist/track/all",
- params={"id": prov_playlist_id, "limit": limit, "offset": offset},
- cookie=self._cookie,
- )
- data = _extract_data(payload)
- songs = data.get("songs")
- if not isinstance(songs, list):
- return []
- result: list[Track] = []
- for idx, song_obj in enumerate(songs, start=1):
- if not isinstance(song_obj, dict):
- continue
- with suppress(InvalidDataError):
- track = self._parse_track(song_obj)
- track.position = offset + idx
- result.append(track)
- return result
+ return await self._get_playlist_tracks_cached(prov_playlist_id, page)
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Retrieve favorite artists from NCM."""
- payload = await self._client.get(
- "/artist/sublist",
- params={"limit": 2000, "offset": 0},
- cookie=self._cookie,
- )
- data = _extract_data(payload)
- artists = data.get("data") or data.get("artists")
- if not isinstance(artists, list):
- artists = []
- for artist_obj in artists:
- if not isinstance(artist_obj, dict):
- continue
- with suppress(InvalidDataError):
- yield self._parse_artist(artist_obj)
+ limit = 200
+ offset = 0
+ for _ in range(100):
+ payload = await self._client.get(
+ "/artist/sublist",
+ params={"limit": limit, "offset": offset},
+ cookie=self._cookie,
+ )
+ data = _extract_data(payload)
+ artists = data.get("data") or data.get("artists")
+ if not isinstance(artists, list) or not artists:
+ break
+ for artist_obj in artists:
+ if not isinstance(artist_obj, dict):
+ continue
+ with suppress(InvalidDataError):
+ yield self._parse_artist(artist_obj)
+ has_more = bool(data.get("more") or data.get("hasMore"))
+ offset += limit
+ if not has_more and len(artists) < limit:
+ break
async def get_library_albums(self) -> AsyncGenerator[Album, None]:
"""Retrieve favorite albums from NCM."""
- payload = await self._client.get(
- "/album/sublist",
- params={"limit": 2000, "offset": 0},
- cookie=self._cookie,
- )
- data = _extract_data(payload)
- albums = data.get("data") or data.get("albums")
- if not isinstance(albums, list):
- albums = []
- for album_obj in albums:
- if not isinstance(album_obj, dict):
- continue
- with suppress(InvalidDataError):
- yield self._parse_album(album_obj)
+ limit = 200
+ offset = 0
+ for _ in range(100):
+ payload = await self._client.get(
+ "/album/sublist",
+ params={"limit": limit, "offset": offset},
+ cookie=self._cookie,
+ )
+ data = _extract_data(payload)
+ albums = data.get("data") or data.get("albums")
+ if not isinstance(albums, list) or not albums:
+ break
+ for album_obj in albums:
+ if not isinstance(album_obj, dict):
+ continue
+ with suppress(InvalidDataError):
+ yield self._parse_album(album_obj)
+ has_more = bool(data.get("more") or data.get("hasMore"))
+ offset += limit
+ if not has_more and len(albums) < limit:
+ break
async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve liked tracks from NCM."""
@@ -1614,7 +1662,7 @@ async def _build_heart_mode_dynamic_playlist(self) -> Playlist | None:
seed_song_id, playlist_id = heart_parts
return self._build_dynamic_playlist(
f"{_PLAYLIST_HEART_MODE_PREFIX}:{seed_song_id}:{playlist_id}",
- "心动模式",
+ "Heart Mode",
)
async def _pick_heart_mode_tracks(
@@ -1657,14 +1705,15 @@ async def _build_dynamic_radio_folder(self) -> RecommendationFolder | None:
folder = RecommendationFolder(
item_id="recommended_radios",
provider=self.instance_id,
- name="个性电台",
+ name="Personal Radio",
icon="mdi:radio",
)
- folder.items.append(self._build_dynamic_playlist(_PLAYLIST_PERSONAL_FM_ID, "私人FM"))
+ folder.items.append(self._build_dynamic_playlist(_PLAYLIST_PERSONAL_FM_ID, "Personal FM"))
if heart_playlist := await self._build_heart_mode_dynamic_playlist():
folder.items.append(heart_playlist)
return folder if folder.items else None
+ @use_cache(3600)
async def recommendations(self) -> list[RecommendationFolder]:
"""Get recommendation folders."""
folders: list[RecommendationFolder] = []
@@ -1681,7 +1730,7 @@ async def recommendations(self) -> list[RecommendationFolder]:
folder = RecommendationFolder(
item_id="daily_songs",
provider=self.instance_id,
- name="每日推荐",
+ name="Daily Picks",
icon="mdi:star",
)
for song_obj in daily_songs:
@@ -1704,7 +1753,7 @@ async def recommendations(self) -> list[RecommendationFolder]:
folder = RecommendationFolder(
item_id="recommended_new_songs",
provider=self.instance_id,
- name="推荐新歌",
+ name="New Songs",
icon="mdi:music-note",
)
for item in raw_new_songs:
@@ -1730,7 +1779,7 @@ async def recommendations(self) -> list[RecommendationFolder]:
folder = RecommendationFolder(
item_id="recommended_playlists",
provider=self.instance_id,
- name="推荐歌单",
+ name="Recommended Playlists",
icon="mdi:playlist-music",
)
for playlist_obj in raw_playlists:
From 08f533e94b8367073cbb267e1f2c446adbd29341 Mon Sep 17 00:00:00 2001
From: xiasi0 <493355621@qq.com>
Date: Wed, 15 Apr 2026 23:34:45 +0800
Subject: [PATCH 06/11] fix(neteasecloudmusic): revert recommendation labels to
Chinese
---
.../providers/neteasecloudmusic/__init__.py | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/music_assistant/providers/neteasecloudmusic/__init__.py b/music_assistant/providers/neteasecloudmusic/__init__.py
index 309a9881fb..1b2f529553 100644
--- a/music_assistant/providers/neteasecloudmusic/__init__.py
+++ b/music_assistant/providers/neteasecloudmusic/__init__.py
@@ -1338,12 +1338,12 @@ async def get_track(self, prov_track_id: str) -> Track:
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
if prov_playlist_id == _PLAYLIST_PERSONAL_FM_ID:
- return self._build_dynamic_playlist(_PLAYLIST_PERSONAL_FM_ID, "Personal FM")
+ return self._build_dynamic_playlist(_PLAYLIST_PERSONAL_FM_ID, "私人 FM")
if heart_parts := self._parse_heart_mode_playlist_id(prov_playlist_id):
seed_song_id, source_playlist_id = heart_parts
return self._build_dynamic_playlist(
f"{_PLAYLIST_HEART_MODE_PREFIX}:{seed_song_id}:{source_playlist_id}",
- "Heart Mode",
+ "心动模式",
)
if prov_playlist_id == _PLAYLIST_HEART_MODE_PREFIX:
if playlist := await self._build_heart_mode_dynamic_playlist():
@@ -1662,7 +1662,7 @@ async def _build_heart_mode_dynamic_playlist(self) -> Playlist | None:
seed_song_id, playlist_id = heart_parts
return self._build_dynamic_playlist(
f"{_PLAYLIST_HEART_MODE_PREFIX}:{seed_song_id}:{playlist_id}",
- "Heart Mode",
+ "心动模式",
)
async def _pick_heart_mode_tracks(
@@ -1705,10 +1705,10 @@ async def _build_dynamic_radio_folder(self) -> RecommendationFolder | None:
folder = RecommendationFolder(
item_id="recommended_radios",
provider=self.instance_id,
- name="Personal Radio",
+ name="私人电台",
icon="mdi:radio",
)
- folder.items.append(self._build_dynamic_playlist(_PLAYLIST_PERSONAL_FM_ID, "Personal FM"))
+ folder.items.append(self._build_dynamic_playlist(_PLAYLIST_PERSONAL_FM_ID, "私人 FM"))
if heart_playlist := await self._build_heart_mode_dynamic_playlist():
folder.items.append(heart_playlist)
return folder if folder.items else None
@@ -1730,7 +1730,7 @@ async def recommendations(self) -> list[RecommendationFolder]:
folder = RecommendationFolder(
item_id="daily_songs",
provider=self.instance_id,
- name="Daily Picks",
+ name="每日推荐",
icon="mdi:star",
)
for song_obj in daily_songs:
@@ -1753,7 +1753,7 @@ async def recommendations(self) -> list[RecommendationFolder]:
folder = RecommendationFolder(
item_id="recommended_new_songs",
provider=self.instance_id,
- name="New Songs",
+ name="推荐新歌",
icon="mdi:music-note",
)
for item in raw_new_songs:
@@ -1779,7 +1779,7 @@ async def recommendations(self) -> list[RecommendationFolder]:
folder = RecommendationFolder(
item_id="recommended_playlists",
provider=self.instance_id,
- name="Recommended Playlists",
+ name="推荐歌单",
icon="mdi:playlist-music",
)
for playlist_obj in raw_playlists:
From 33a989d71e16816da4419b42699c9c291468cb06 Mon Sep 17 00:00:00 2001
From: xiasi0 <493355621@qq.com>
Date: Sun, 19 Apr 2026 21:51:49 +0800
Subject: [PATCH 07/11] fix(neteasecloudmusic): use
UnsupportedFeaturedException for unsupported media type
---
music_assistant/providers/neteasecloudmusic/__init__.py | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/music_assistant/providers/neteasecloudmusic/__init__.py b/music_assistant/providers/neteasecloudmusic/__init__.py
index 1b2f529553..748b6cf7c3 100644
--- a/music_assistant/providers/neteasecloudmusic/__init__.py
+++ b/music_assistant/providers/neteasecloudmusic/__init__.py
@@ -30,6 +30,7 @@
LoginFailed,
MediaNotFoundError,
ResourceTemporarilyUnavailable,
+ UnsupportedFeaturedException,
UnplayableMediaError,
)
from music_assistant_models.media_items import (
@@ -2010,4 +2011,4 @@ async def get_stream_details(self, item_id: str, media_type: MediaType) -> Strea
media_type=MediaType.TRACK,
allow_seek=True,
)
- raise MediaNotFoundError(f"Unsupported media type {media_type}")
+ raise UnsupportedFeaturedException(f"Unsupported media type {media_type}")
From 134192c803852ce8127c73a5b302ba87c3c647e4 Mon Sep 17 00:00:00 2001
From: xiasi0 <493355621@qq.com>
Date: Sun, 19 Apr 2026 21:59:45 +0800
Subject: [PATCH 08/11] fix(neteasecloudmusic): address duration fallback and
multi-instance QR route isolation
---
.../providers/neteasecloudmusic/__init__.py | 44 ++++++++++++++-----
1 file changed, 33 insertions(+), 11 deletions(-)
diff --git a/music_assistant/providers/neteasecloudmusic/__init__.py b/music_assistant/providers/neteasecloudmusic/__init__.py
index 748b6cf7c3..bfd4c16571 100644
--- a/music_assistant/providers/neteasecloudmusic/__init__.py
+++ b/music_assistant/providers/neteasecloudmusic/__init__.py
@@ -91,7 +91,7 @@
ProviderFeature.LYRICS,
}
-_QR_ROUTE_UNREGISTER: dict[str, Any] = {}
+_QR_ROUTE_UNREGISTER_ATTR = "_ncm_qr_route_unregister"
_HTTP_TIMEOUT = ClientTimeout(total=20)
_LRC_TIMESTAMP_PATTERN = re.compile(r"\[\d{1,2}:\d{2}(?:\.\d{1,3})?\]")
_LRC_META_TAG_PATTERN = re.compile(r"^\[[a-zA-Z]+:.*\]$")
@@ -265,11 +265,22 @@ def _with_pc_os_cookie(cookie: str) -> str:
return "; ".join(kept)
-def _clear_qr_route(route_path: str | None) -> None:
+def _get_qr_unregister_map(mass: MusicAssistant) -> dict[str, Any]:
+ """Return per-MA-process QR unregister map for this provider."""
+ route_map = getattr(mass, _QR_ROUTE_UNREGISTER_ATTR, None)
+ if isinstance(route_map, dict):
+ return route_map
+ route_map = {}
+ setattr(mass, _QR_ROUTE_UNREGISTER_ATTR, route_map)
+ return route_map
+
+
+def _clear_qr_route(mass: MusicAssistant, route_path: str | None) -> None:
"""Unregister one temporary QR route if present."""
if not route_path:
return
- if unregister := _QR_ROUTE_UNREGISTER.pop(route_path, None):
+ route_map = _get_qr_unregister_map(mass)
+ if unregister := route_map.pop(route_path, None):
if callable(unregister):
with suppress(RuntimeError):
unregister()
@@ -302,7 +313,7 @@ def _register_qr_auth_page(
return f"data:{mime_type};base64,{b64}"
route_path = f"/auth/neteasecloudmusic/qr/{session_id}"
- _clear_qr_route(route_path)
+ _clear_qr_route(mass, route_path)
async def _serve_qr(_: web.Request) -> web.Response:
return web.Response(
@@ -312,7 +323,7 @@ async def _serve_qr(_: web.Request) -> web.Response:
)
unregister = mass.webserver.register_dynamic_route(route_path, _serve_qr, "GET")
- _QR_ROUTE_UNREGISTER[route_path] = unregister
+ _get_qr_unregister_map(mass)[route_path] = unregister
return f"{route_path.lstrip('/')}?ts={int(time.time())}"
@@ -330,14 +341,14 @@ def _decode_qr_data_url(data_url: str) -> tuple[bytes, str] | None:
return None
-def _clear_auth(values: dict[str, ConfigValueType]) -> None:
+def _clear_auth(mass: MusicAssistant, values: dict[str, ConfigValueType]) -> None:
"""Clear stored authentication fields."""
route_path = _get_qr_route_path(values)
values[CONF_COOKIE] = None
values[CONF_UID] = None
values[CONF_QR_KEY] = None
values[CONF_QR_PAGE_URL] = None
- _clear_qr_route(route_path)
+ _clear_qr_route(mass, route_path)
def _has_qr_pending(values: dict[str, ConfigValueType]) -> bool:
@@ -365,7 +376,7 @@ async def _resolve_uid(client: NcmApiClient, cookie: str) -> str:
async def _start_qr_auth(mass: MusicAssistant, values: dict[str, ConfigValueType]) -> None:
"""Start QR login flow and auto-poll once for quick setup."""
- _clear_qr_route(_get_qr_route_path(values))
+ _clear_qr_route(mass, _get_qr_route_path(values))
base_url = str(values.get(CONF_API_BASE_URL) or DEFAULT_API_BASE_URL).strip()
client = NcmApiClient(mass.http_session, base_url)
@@ -440,16 +451,20 @@ async def _check_qr_auth(
)
code = _extract_code(payload)
if code == 803:
+ route_path = _get_qr_route_path(values)
cookie = _extract_cookie(payload)
if not cookie:
raise LoginFailed("QR login succeeded but API response did not include cookie")
uid = await _resolve_uid(client, cookie)
+ _clear_qr_route(mass, route_path)
values[CONF_COOKIE] = cookie
values[CONF_UID] = uid
values[CONF_QR_KEY] = None
values[CONF_QR_PAGE_URL] = None
return
if code == 800:
+ route_path = _get_qr_route_path(values)
+ _clear_qr_route(mass, route_path)
values[CONF_QR_KEY] = None
values[CONF_QR_PAGE_URL] = None
raise InvalidDataError("QR code expired, please generate a new one")
@@ -584,7 +599,7 @@ async def get_config_entries(
values = {}
try:
if action == CONF_ACTION_CLEAR_AUTH:
- _clear_auth(values)
+ _clear_auth(mass, values)
elif action == CONF_ACTION_START_QR_AUTH:
await _start_qr_auth(mass, values)
elif action == CONF_ACTION_CHECK_QR_AUTH:
@@ -620,7 +635,7 @@ async def unload(self, is_removed: bool = False) -> None:
route_path = _get_qr_route_path(
{CONF_QR_PAGE_URL: str(self.config.get_value(CONF_QR_PAGE_URL) or "")}
)
- _clear_qr_route(route_path)
+ _clear_qr_route(self.mass, route_path)
await super().unload(is_removed)
def _get_item_mapping(self, media_type: MediaType, item_id: str, name: str) -> ItemMapping:
@@ -1250,7 +1265,14 @@ async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
if not isinstance(song_obj, dict):
continue
with suppress(InvalidDataError):
- tracks.append(self._parse_track(song_obj))
+ track = self._parse_track(song_obj)
+ if track.duration <= 0:
+ # Album payload duration fields are authoritative for this endpoint;
+ # keep an explicit fallback here to avoid zero-length tracks.
+ duration_ms = _to_positive_int(song_obj.get("dt") or song_obj.get("duration"))
+ if duration_ms > 0:
+ track.duration = int(duration_ms / 1000)
+ tracks.append(track)
return tracks
@use_cache(3600 * 24)
From 6490f89e22ed59056a2ccc9e1c3344d8da68268a Mon Sep 17 00:00:00 2001
From: xiasi0 <493355621@qq.com>
Date: Sun, 19 Apr 2026 22:38:10 +0800
Subject: [PATCH 09/11] fix(neteasecloudmusic): sort imports for lint
---
music_assistant/providers/neteasecloudmusic/__init__.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/music_assistant/providers/neteasecloudmusic/__init__.py b/music_assistant/providers/neteasecloudmusic/__init__.py
index bfd4c16571..648b80672e 100644
--- a/music_assistant/providers/neteasecloudmusic/__init__.py
+++ b/music_assistant/providers/neteasecloudmusic/__init__.py
@@ -30,8 +30,8 @@
LoginFailed,
MediaNotFoundError,
ResourceTemporarilyUnavailable,
- UnsupportedFeaturedException,
UnplayableMediaError,
+ UnsupportedFeaturedException,
)
from music_assistant_models.media_items import (
Album,
From d50ef92bf578f290454110978109a6a53dd4c712 Mon Sep 17 00:00:00 2001
From: xiasi0 <493355621@qq.com>
Date: Mon, 20 Apr 2026 20:38:21 +0800
Subject: [PATCH 10/11] fix(neteasecloudmusic): address review on qr route
storage and cookie handling
---
.../providers/neteasecloudmusic/__init__.py | 36 +++++++------------
1 file changed, 12 insertions(+), 24 deletions(-)
diff --git a/music_assistant/providers/neteasecloudmusic/__init__.py b/music_assistant/providers/neteasecloudmusic/__init__.py
index 648b80672e..ee4d752944 100644
--- a/music_assistant/providers/neteasecloudmusic/__init__.py
+++ b/music_assistant/providers/neteasecloudmusic/__init__.py
@@ -91,7 +91,7 @@
ProviderFeature.LYRICS,
}
-_QR_ROUTE_UNREGISTER_ATTR = "_ncm_qr_route_unregister"
+_QR_ROUTE_UNREGISTER: dict[str, Any] = {}
_HTTP_TIMEOUT = ClientTimeout(total=20)
_LRC_TIMESTAMP_PATTERN = re.compile(r"\[\d{1,2}:\d{2}(?:\.\d{1,3})?\]")
_LRC_META_TAG_PATTERN = re.compile(r"^\[[a-zA-Z]+:.*\]$")
@@ -168,7 +168,6 @@ async def get(
)
}
if cookie:
- req_params["cookie"] = cookie
headers["Cookie"] = cookie
url = f"{self._base_url}/{path.lstrip('/')}"
try:
@@ -265,22 +264,11 @@ def _with_pc_os_cookie(cookie: str) -> str:
return "; ".join(kept)
-def _get_qr_unregister_map(mass: MusicAssistant) -> dict[str, Any]:
- """Return per-MA-process QR unregister map for this provider."""
- route_map = getattr(mass, _QR_ROUTE_UNREGISTER_ATTR, None)
- if isinstance(route_map, dict):
- return route_map
- route_map = {}
- setattr(mass, _QR_ROUTE_UNREGISTER_ATTR, route_map)
- return route_map
-
-
-def _clear_qr_route(mass: MusicAssistant, route_path: str | None) -> None:
+def _clear_qr_route(route_path: str | None) -> None:
"""Unregister one temporary QR route if present."""
if not route_path:
return
- route_map = _get_qr_unregister_map(mass)
- if unregister := route_map.pop(route_path, None):
+ if unregister := _QR_ROUTE_UNREGISTER.pop(route_path, None):
if callable(unregister):
with suppress(RuntimeError):
unregister()
@@ -313,7 +301,7 @@ def _register_qr_auth_page(
return f"data:{mime_type};base64,{b64}"
route_path = f"/auth/neteasecloudmusic/qr/{session_id}"
- _clear_qr_route(mass, route_path)
+ _clear_qr_route(route_path)
async def _serve_qr(_: web.Request) -> web.Response:
return web.Response(
@@ -323,7 +311,7 @@ async def _serve_qr(_: web.Request) -> web.Response:
)
unregister = mass.webserver.register_dynamic_route(route_path, _serve_qr, "GET")
- _get_qr_unregister_map(mass)[route_path] = unregister
+ _QR_ROUTE_UNREGISTER[route_path] = unregister
return f"{route_path.lstrip('/')}?ts={int(time.time())}"
@@ -341,14 +329,14 @@ def _decode_qr_data_url(data_url: str) -> tuple[bytes, str] | None:
return None
-def _clear_auth(mass: MusicAssistant, values: dict[str, ConfigValueType]) -> None:
+def _clear_auth(values: dict[str, ConfigValueType]) -> None:
"""Clear stored authentication fields."""
route_path = _get_qr_route_path(values)
values[CONF_COOKIE] = None
values[CONF_UID] = None
values[CONF_QR_KEY] = None
values[CONF_QR_PAGE_URL] = None
- _clear_qr_route(mass, route_path)
+ _clear_qr_route(route_path)
def _has_qr_pending(values: dict[str, ConfigValueType]) -> bool:
@@ -376,7 +364,7 @@ async def _resolve_uid(client: NcmApiClient, cookie: str) -> str:
async def _start_qr_auth(mass: MusicAssistant, values: dict[str, ConfigValueType]) -> None:
"""Start QR login flow and auto-poll once for quick setup."""
- _clear_qr_route(mass, _get_qr_route_path(values))
+ _clear_qr_route(_get_qr_route_path(values))
base_url = str(values.get(CONF_API_BASE_URL) or DEFAULT_API_BASE_URL).strip()
client = NcmApiClient(mass.http_session, base_url)
@@ -456,7 +444,7 @@ async def _check_qr_auth(
if not cookie:
raise LoginFailed("QR login succeeded but API response did not include cookie")
uid = await _resolve_uid(client, cookie)
- _clear_qr_route(mass, route_path)
+ _clear_qr_route(route_path)
values[CONF_COOKIE] = cookie
values[CONF_UID] = uid
values[CONF_QR_KEY] = None
@@ -464,7 +452,7 @@ async def _check_qr_auth(
return
if code == 800:
route_path = _get_qr_route_path(values)
- _clear_qr_route(mass, route_path)
+ _clear_qr_route(route_path)
values[CONF_QR_KEY] = None
values[CONF_QR_PAGE_URL] = None
raise InvalidDataError("QR code expired, please generate a new one")
@@ -599,7 +587,7 @@ async def get_config_entries(
values = {}
try:
if action == CONF_ACTION_CLEAR_AUTH:
- _clear_auth(mass, values)
+ _clear_auth(values)
elif action == CONF_ACTION_START_QR_AUTH:
await _start_qr_auth(mass, values)
elif action == CONF_ACTION_CHECK_QR_AUTH:
@@ -635,7 +623,7 @@ async def unload(self, is_removed: bool = False) -> None:
route_path = _get_qr_route_path(
{CONF_QR_PAGE_URL: str(self.config.get_value(CONF_QR_PAGE_URL) or "")}
)
- _clear_qr_route(self.mass, route_path)
+ _clear_qr_route(route_path)
await super().unload(is_removed)
def _get_item_mapping(self, media_type: MediaType, item_id: str, name: str) -> ItemMapping:
From c59824039c2839ea8941073ebbb1a177bc3c6243 Mon Sep 17 00:00:00 2001
From: xiasi0 <493355621@qq.com>
Date: Mon, 20 Apr 2026 21:28:51 +0800
Subject: [PATCH 11/11] chore(neteasecloudmusic): switch recommendation labels
to english
---
.../providers/neteasecloudmusic/__init__.py | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/music_assistant/providers/neteasecloudmusic/__init__.py b/music_assistant/providers/neteasecloudmusic/__init__.py
index ee4d752944..80587a27b2 100644
--- a/music_assistant/providers/neteasecloudmusic/__init__.py
+++ b/music_assistant/providers/neteasecloudmusic/__init__.py
@@ -1349,12 +1349,12 @@ async def get_track(self, prov_track_id: str) -> Track:
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
if prov_playlist_id == _PLAYLIST_PERSONAL_FM_ID:
- return self._build_dynamic_playlist(_PLAYLIST_PERSONAL_FM_ID, "私人 FM")
+ return self._build_dynamic_playlist(_PLAYLIST_PERSONAL_FM_ID, "Personal FM")
if heart_parts := self._parse_heart_mode_playlist_id(prov_playlist_id):
seed_song_id, source_playlist_id = heart_parts
return self._build_dynamic_playlist(
f"{_PLAYLIST_HEART_MODE_PREFIX}:{seed_song_id}:{source_playlist_id}",
- "心动模式",
+ "Heart Mode",
)
if prov_playlist_id == _PLAYLIST_HEART_MODE_PREFIX:
if playlist := await self._build_heart_mode_dynamic_playlist():
@@ -1673,7 +1673,7 @@ async def _build_heart_mode_dynamic_playlist(self) -> Playlist | None:
seed_song_id, playlist_id = heart_parts
return self._build_dynamic_playlist(
f"{_PLAYLIST_HEART_MODE_PREFIX}:{seed_song_id}:{playlist_id}",
- "心动模式",
+ "Heart Mode",
)
async def _pick_heart_mode_tracks(
@@ -1716,10 +1716,10 @@ async def _build_dynamic_radio_folder(self) -> RecommendationFolder | None:
folder = RecommendationFolder(
item_id="recommended_radios",
provider=self.instance_id,
- name="私人电台",
+ name="Personal Radio",
icon="mdi:radio",
)
- folder.items.append(self._build_dynamic_playlist(_PLAYLIST_PERSONAL_FM_ID, "私人 FM"))
+ folder.items.append(self._build_dynamic_playlist(_PLAYLIST_PERSONAL_FM_ID, "Personal FM"))
if heart_playlist := await self._build_heart_mode_dynamic_playlist():
folder.items.append(heart_playlist)
return folder if folder.items else None
@@ -1741,7 +1741,7 @@ async def recommendations(self) -> list[RecommendationFolder]:
folder = RecommendationFolder(
item_id="daily_songs",
provider=self.instance_id,
- name="每日推荐",
+ name="Daily Songs",
icon="mdi:star",
)
for song_obj in daily_songs:
@@ -1764,7 +1764,7 @@ async def recommendations(self) -> list[RecommendationFolder]:
folder = RecommendationFolder(
item_id="recommended_new_songs",
provider=self.instance_id,
- name="推荐新歌",
+ name="New Songs",
icon="mdi:music-note",
)
for item in raw_new_songs:
@@ -1790,7 +1790,7 @@ async def recommendations(self) -> list[RecommendationFolder]:
folder = RecommendationFolder(
item_id="recommended_playlists",
provider=self.instance_id,
- name="推荐歌单",
+ name="Recommended Playlists",
icon="mdi:playlist-music",
)
for playlist_obj in raw_playlists: