diff --git a/Dockerfile b/Dockerfile index be23905490..45de6204fc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -62,8 +62,6 @@ ENV PATH="$VIRTUAL_ENV/bin:$PATH" # copy the already built /app dir COPY --from=builder /app /app -# the /app contents have correct permissions but for some reason /app itself does not. -# so apply again, but ONLY to the dir (otherwise we increase the size) RUN chmod 777 /app # Set some labels diff --git a/Dockerfile.base b/Dockerfile.base index b9adeac86f..5223bf8f2d 100644 --- a/Dockerfile.base +++ b/Dockerfile.base @@ -126,6 +126,16 @@ RUN PYAV_VERSION=$(python -c "import json; reqs=json.load(open('/tmp/sendspin_ma ################################################################################################## +# Small builder to install pulseaudio-utils and extract binaries/libs +FROM debian:bookworm AS pa-builder + +RUN apt-get update && apt-get install -y --no-install-recommends \ + pulseaudio-utils \ + && rm -rf /var/lib/apt/lists/* + +# Ensure the files we need exist (pactl, pacmd and pulse libs) +# They will be copied into the final base image. + FROM python:3.13-slim-bookworm # Enable non-free and contrib repositories for codec libraries @@ -197,6 +207,7 @@ RUN set -x \ # AirPlay receiver dependencies (shairport-sync) libconfig9 \ libpopt0 \ + pulseaudio-utils \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* @@ -224,6 +235,14 @@ COPY --from=ffmpeg-builder /usr/local/lib/libpostproc.so* /usr/local/lib/ # Copy pre-built PyAV wheel for use by downstream images COPY --from=pyav-builder /wheels/ /usr/local/share/pyav-wheels/ +# Copy pulseaudio binaries and libs from pa-builder +COPY --from=pa-builder /usr/bin/pactl /usr/bin/pactl +COPY --from=pa-builder /usr/bin/pacmd /usr/bin/pacmd +# PulseAudio libraries location on Debian x86_64; copy the whole pulse dir to be safe +COPY --from=pa-builder /usr/lib/x86_64-linux-gnu/pulse /usr/lib/x86_64-linux-gnu/pulse +# Some systems place libs directly under /usr/lib; include these patterns if needed: +COPY --from=pa-builder /usr/lib/x86_64-linux-gnu/libpulse* /usr/lib/x86_64-linux-gnu/ || true + # Update shared library cache and verify FFmpeg RUN ldconfig && ffmpeg -version && ffprobe -version diff --git a/music_assistant/providers/pulse_audio/README.md b/music_assistant/providers/pulse_audio/README.md new file mode 100644 index 0000000000..fb42de9d3d --- /dev/null +++ b/music_assistant/providers/pulse_audio/README.md @@ -0,0 +1,101 @@ +# Pulse Audio Out Provider + +## Overview + +The Pulse Audio Out provider exposes PulseAudio sinks (USB DACs, built-in audio, HDMI, remap sinks, etc.) as players in Music Assistant. It leverages the Sendspin provider for synchronization and timing, registering each sink as an external Sendspin bridge client. + +### Key Features + +- **Automatic Sink Discovery**: Enumerates all PulseAudio output sinks via `pactl`, including virtual sinks such as remap and combined sinks +- **Native Format Negotiation**: Each sink advertises its native sample rate and bit depth (16, 24, or 32-bit) so Music Assistant transcodes to the correct format per sink — no unnecessary resampling +- **Sendspin Integration**: Each sink is registered as a Sendspin bridge client, enabling synchronized multi-room playback +- **Software Volume Control**: Per-sink volume and mute via PCM sample scaling +- **Stable Player IDs**: Uses UUIDv5 derived from the PulseAudio sink name so players persist across restarts +- **Hardware Volume Ceiling**: Configurable per-provider PA sink volume ceiling applied at startup + +## Architecture + +### Component Overview + +``` +┌──────────────────────────────────────────────────────────────┐ +│ LocalPulseAudioProvider │ +│ - Thin provider shell, delegates to bridge manager │ +└──────────────────────────────────────────────────────────────┘ + │ + ┌────────────────▼────────────────┐ + │ LocalPulseAudioBridgeManager │ + │ - Enumerates PA sinks via pactl│ + │ - Creates/stops bridges │ + └────────────────┬────────────────┘ + │ + ┌───────────────────┼───────────────────┐ + │ │ +┌─────────▼──────────┐ ┌─────────────▼──────────┐ +│ SendspinPulseAudio │ │ SendspinPulseAudio │ +│ Bridge (Sink A) │ │ Bridge (Sink B) │ +│ │ │ │ +│ Sendspin Client ──► │ │ Sendspin Client ──► │ +│ BridgePlayerRole │ │ BridgePlayerRole │ +│ pa_simple output │ │ pa_simple output │ +└─────────────────────┘ └────────────────────────┘ +``` + +### Audio Flow + +``` +Sendspin PushStream + │ + ▼ +BridgePlayerRole.on_audio_chunk + │ + ▼ (software volume/mute applied, format conversion for 24-bit) +asyncio.Queue + │ + ▼ +PASimpleStream (libpulse-simple via ctypes) + │ + ▼ +PulseAudio Sink + │ + ▼ +Physical Audio Device +``` + +### Bit Depth Handling + +| Sink Format | MA Delivery | PA Stream Format | Conversion | +|-------------|-----------------|---------------------|-------------------------------------| +| `s16le` | 16-bit PCM | `PA_SAMPLE_S16LE` | None | +| `s24le` | 32-bit container (left-justified) | `PA_SAMPLE_S24LE` | Unpack int32, repack to 3-byte LE | +| `s32le` | 32-bit PCM | `PA_SAMPLE_S32LE` | None | + +### File Structure + +| File | Description | +|------|-------------| +| `__init__.py` | Provider entry point, setup, and config | +| `provider.py` | `LocalPulseAudioProvider` class | +| `sendspin_bridge.py` | Bridge manager and per-sink bridge implementation | +| `player.py` | `LocalPulseAudioPlayer` — MA player model for each sink | +| `pa_simple.py` | Minimal ctypes wrapper around `libpulse-simple` for direct PCM output | +| `helpers.py` | `find_pactl()` and `pactl_env()` utilities | +| `constants.py` | Shared constants (UUID namespace, config keys) | +| `manifest.json` | Provider metadata and dependencies | + +## Dependencies + +- **Sendspin provider** (`depends_on: sendspin`): Required for audio synchronization and player management +- **libpulse / libpulse-simple**: PulseAudio client libraries (must be present on the host); accessed via ctypes — no Python PulseAudio bindings required +- **pactl**: Used at startup for sink enumeration (`pulseaudio-utils` package on Debian/Ubuntu, `pulseaudio` on Alpine) +- **numpy**: Used for PCM volume scaling + +## Notes + +- The bundled `pactl` binary (if present) is `amd64` only. On other architectures the system `pactl` must be available in `PATH` or `PULSE_SERVER` must be set. +- Multi-channel sinks (5.1, 7.1) are supported — the bridge opens a stereo stream and PulseAudio handles channel remapping automatically. +- Virtual sinks created by `module-remap-sink` (stereo pairs split from multi-channel cards) are fully supported and are the recommended way to expose individual speaker pairs as independent MA players. + +## Related Documentation + +- [Sendspin Provider](../sendspin/README.md) diff --git a/music_assistant/providers/pulse_audio/__init__.py b/music_assistant/providers/pulse_audio/__init__.py new file mode 100644 index 0000000000..2305359691 --- /dev/null +++ b/music_assistant/providers/pulse_audio/__init__.py @@ -0,0 +1,60 @@ +"""Local PulseAudio Out player provider for Music Assistant.""" +from __future__ import annotations + +from typing import TYPE_CHECKING + +from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption +from music_assistant_models.enums import ConfigEntryType, ProviderFeature + +from music_assistant.mass import MusicAssistant + +from .constants import ( + CONF_HARDWARE_VOLUME_CEILING, + CONF_VOLUME_CONTROL, + DEFAULT_HARDWARE_VOLUME_CEILING, + VOLUME_CONTROL_DISABLED, + VOLUME_CONTROL_HARDWARE, + VOLUME_CONTROL_SOFTWARE, +) +from .provider import LocalPulseAudioProvider + +if TYPE_CHECKING: + from music_assistant_models.config_entries import ConfigValueType, ProviderConfig + from music_assistant_models.provider import ProviderManifest + from music_assistant.models import ProviderInstanceType + +SUPPORTED_FEATURES = { + ProviderFeature.SYNC_PLAYERS, +} + + +async def get_config_entries( + mass: MusicAssistant, + instance_id: str | None = None, + action: str | None = None, + values: dict[str, ConfigValueType] | None = None, +) -> tuple[ConfigEntry, ...]: + """Return Config entries to setup this provider.""" + # ruff: noqa: ARG001 + return ( + ConfigEntry( + key=CONF_HARDWARE_VOLUME_CEILING, + type=ConfigEntryType.INTEGER, + label="Hardware volume ceiling", + description=( + "Sets the PulseAudio sink volume to this level on every startup. " + "This attenuates the maximum output level of the hardware. " + "Day-to-day volume control uses software scaling within this ceiling. " + "Range: 0-100. Default: 50." + ), + default_value=DEFAULT_HARDWARE_VOLUME_CEILING, + required=False, + ), + ) + + +async def setup( + mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig +) -> ProviderInstanceType: + """Initialize provider(instance) with given configuration.""" + return LocalPulseAudioProvider(mass, manifest, config, SUPPORTED_FEATURES) diff --git a/music_assistant/providers/pulse_audio/bin/lib/libpulsecommon-16.1.so b/music_assistant/providers/pulse_audio/bin/lib/libpulsecommon-16.1.so new file mode 100644 index 0000000000..585f0acd3e Binary files /dev/null and b/music_assistant/providers/pulse_audio/bin/lib/libpulsecommon-16.1.so differ diff --git a/music_assistant/providers/pulse_audio/bin/pactl b/music_assistant/providers/pulse_audio/bin/pactl new file mode 100644 index 0000000000..43476b7ebe Binary files /dev/null and b/music_assistant/providers/pulse_audio/bin/pactl differ diff --git a/music_assistant/providers/pulse_audio/constants.py b/music_assistant/providers/pulse_audio/constants.py new file mode 100644 index 0000000000..02c36ee69f --- /dev/null +++ b/music_assistant/providers/pulse_audio/constants.py @@ -0,0 +1,14 @@ +"""Constants for Local PulseAudio Out provider.""" +from __future__ import annotations + +import uuid + +DEVICE_UUID_NAMESPACE = uuid.UUID("b2c3d4e5-f6a7-8901-bcde-f12345678901") + +CONF_VOLUME_CONTROL = "volume_control" +VOLUME_CONTROL_SOFTWARE = "software" +VOLUME_CONTROL_HARDWARE = "hardware" +VOLUME_CONTROL_DISABLED = "disabled" + +CONF_HARDWARE_VOLUME_CEILING = "hardware_volume_ceiling" +DEFAULT_HARDWARE_VOLUME_CEILING = 50 diff --git a/music_assistant/providers/pulse_audio/helpers.py b/music_assistant/providers/pulse_audio/helpers.py new file mode 100644 index 0000000000..40bda9d249 --- /dev/null +++ b/music_assistant/providers/pulse_audio/helpers.py @@ -0,0 +1,43 @@ +"""Shared helpers for Local PulseAudio Out provider.""" +from __future__ import annotations + +import os +import shutil + +from .pa_simple import PULSE_SERVER + + +def find_pactl() -> str: + """Find the pactl binary, preferring the bundled version.""" + bundled = os.path.join(os.path.dirname(__file__), "bin", "pactl") + if os.path.isfile(bundled): + if not os.access(bundled, os.X_OK): + # Fix permissions if the wheel lost the execute bit + try: + os.chmod(bundled, 0o777) + except OSError: + pass + if os.access(bundled, os.X_OK): + return bundled + if path := shutil.which("pactl"): + return path + for candidate in ("/usr/bin/pactl", "/usr/local/bin/pactl", "/bin/pactl"): + if os.path.isfile(candidate): + return candidate + raise FileNotFoundError( + "pactl not found — bundled binary missing and pulseaudio-utils not installed" + ) + +def pactl_env() -> dict[str, str]: + """Build environment dict for pactl subprocess calls. + + Sets LD_LIBRARY_PATH to include the bundled lib directory so that + libpulsecommon is found, and sets PULSE_SERVER to the detected socket. + """ + lib_dir = os.path.join(os.path.dirname(__file__), "lib") + existing_ld = os.environ.get("LD_LIBRARY_PATH", "") + ld_path = f"{lib_dir}:{existing_ld}" if existing_ld else lib_dir + env = {**os.environ, "LD_LIBRARY_PATH": ld_path} + if PULSE_SERVER: + env["PULSE_SERVER"] = PULSE_SERVER + return env diff --git a/music_assistant/providers/pulse_audio/manifest.json b/music_assistant/providers/pulse_audio/manifest.json new file mode 100644 index 0000000000..9e7506905b --- /dev/null +++ b/music_assistant/providers/pulse_audio/manifest.json @@ -0,0 +1,12 @@ +{ + "type": "player", + "domain": "pulse_audio", + "name": "Pulse Audio Out", + "description": "Play audio through locally attached pulse audio outputs.", + "codeowners": ["@music-assistant"], + "depends_on": "sendspin", + "documentation": "https://music-assistant.io/player-support/pulse-audio/", + "builtin": true, + "allow_disable": true, + "stage": "alpha" +} diff --git a/music_assistant/providers/pulse_audio/pa_simple.py b/music_assistant/providers/pulse_audio/pa_simple.py new file mode 100644 index 0000000000..9d056cb788 --- /dev/null +++ b/music_assistant/providers/pulse_audio/pa_simple.py @@ -0,0 +1,300 @@ +"""Minimal ctypes wrapper around libpulse-simple for direct PA sink PCM streaming.""" +from __future__ import annotations + +import ctypes +import os +import threading +from typing import Any, ClassVar, Final + +PA_STREAM_PLAYBACK: Final = 1 + +PA_SAMPLE_S16LE: Final = 3 +PA_SAMPLE_S32LE: Final = 7 # verified via pa_sample_format_to_string +PA_SAMPLE_S24LE: Final = 9 # packed 3-byte LE — native format of s24le PA sinks + + +def _pa_sample_format(bit_depth: int) -> int: + """Return PA sample format constant for given bit depth.""" + if bit_depth == 32: + return PA_SAMPLE_S32LE + if bit_depth == 24: + # MA delivers in 32-bit containers; _apply_software_volume repacks to + # packed 3-byte before writing, so PA sees s24le here. + return PA_SAMPLE_S24LE + return PA_SAMPLE_S16LE + +class _PASampleSpec(ctypes.Structure): + _fields_: ClassVar = [ + ("format", ctypes.c_int), + ("rate", ctypes.c_uint32), + ("channels", ctypes.c_uint8), + ] + + +def _find_pulse_server() -> str: + """Detect the PulseAudio server socket path.""" + if server := os.environ.get("PULSE_SERVER"): + return server + for path in ( + "/run/audio/pulse.sock", + "/run/pulse/native", + "/var/run/pulse/native", + ): + if os.path.exists(path): + return f"unix:{path}" + return "" + + +PULSE_SERVER: Final = _find_pulse_server() + + +def _load_lib() -> ctypes.CDLL: + lib = ctypes.CDLL("libpulse-simple.so.0") + lib.pa_simple_new.restype = ctypes.c_void_p + lib.pa_simple_new.argtypes = [ + ctypes.c_char_p, + ctypes.c_char_p, + ctypes.c_int, + ctypes.c_char_p, + ctypes.c_char_p, + ctypes.c_void_p, + ctypes.c_void_p, + ctypes.c_void_p, + ctypes.c_void_p, + ] + lib.pa_simple_write.restype = ctypes.c_int + lib.pa_simple_write.argtypes = [ + ctypes.c_void_p, + ctypes.c_void_p, + ctypes.c_size_t, + ctypes.c_void_p, + ] + lib.pa_simple_drain.restype = ctypes.c_int + lib.pa_simple_drain.argtypes = [ctypes.c_void_p, ctypes.c_void_p] + lib.pa_simple_free.restype = None + lib.pa_simple_free.argtypes = [ctypes.c_void_p] + return lib + + +_lib: ctypes.CDLL | None = None + + +def _get_lib() -> ctypes.CDLL: + global _lib # noqa: PLW0603 + if _lib is None: + _lib = _load_lib() + return _lib + + +class PASimpleStream: + """Synchronous PCM playback stream to a named PulseAudio sink. + + All libpulse calls are serialized behind a threading.Lock so that + concurrent executor threads cannot simultaneously write/free the + same pa_simple connection, which causes assertion failures in libpulse. + """ + + def __init__(self, sink_name: str, app_name: str, rate: int, channels: int, bit_depth: int = 16) -> None: + lib = _get_lib() + spec = _PASampleSpec( + format=_pa_sample_format(bit_depth), + rate=rate, + channels=channels, + ) + error = ctypes.c_int(0) + self._lib = lib + self._lock = threading.Lock() + self._conn: int | None = lib.pa_simple_new( + PULSE_SERVER.encode() if PULSE_SERVER else None, + app_name.encode(), + PA_STREAM_PLAYBACK, + sink_name.encode(), + b"playback", + ctypes.byref(spec), + None, + None, + ctypes.byref(error), + ) + if not self._conn: + raise OSError( + f"pa_simple_new failed for sink '{sink_name}' " + f"(pa_error={error.value}, server={PULSE_SERVER!r})" + ) + + def write(self, data: bytes) -> None: + """Write a PCM chunk. Blocks until PA has buffered it.""" + with self._lock: + if not self._conn: + return + error = ctypes.c_int(0) + ret = self._lib.pa_simple_write(self._conn, data, len(data), ctypes.byref(error)) + if ret < 0: + raise OSError(f"pa_simple_write failed (pa_error={error.value})") + + def drain(self) -> None: + """Block until all buffered audio has played out.""" + with self._lock: + if not self._conn: + return + error = ctypes.c_int(0) + self._lib.pa_simple_drain(self._conn, ctypes.byref(error)) + + def close(self) -> None: + """Free the PA stream. + + Acquires the lock before zeroing _conn and calling pa_simple_free, + ensuring no concurrent write() or drain() can touch the pointer + between the None assignment and the free call. + """ + with self._lock: + conn, self._conn = self._conn, None + if conn: + self._lib.pa_simple_free(conn) + + def __enter__(self) -> PASimpleStream: + return self + + def __exit__(self, *_: object) -> None: + self.close() + + +def enumerate_pa_sinks() -> list[dict[str, Any]]: + """Enumerate PulseAudio sinks via libpulse introspection API. + + Uses pa_mainloop + pa_context synchronously — no pactl binary needed. + Returns list of dicts with 'name', 'pa_sink_name', 'max_output_channels'. + """ + lib = ctypes.CDLL("libpulse.so.0") + + # --- function signatures --- + lib.pa_mainloop_new.restype = ctypes.c_void_p + lib.pa_mainloop_new.argtypes = [] + lib.pa_mainloop_get_api.restype = ctypes.c_void_p + lib.pa_mainloop_get_api.argtypes = [ctypes.c_void_p] + lib.pa_mainloop_iterate.restype = ctypes.c_int + lib.pa_mainloop_iterate.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p] + lib.pa_mainloop_free.restype = None + lib.pa_mainloop_free.argtypes = [ctypes.c_void_p] + lib.pa_context_new.restype = ctypes.c_void_p + lib.pa_context_new.argtypes = [ctypes.c_void_p, ctypes.c_char_p] + lib.pa_context_connect.restype = ctypes.c_int + lib.pa_context_connect.argtypes = [ + ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p, + ] + lib.pa_context_get_state.restype = ctypes.c_int + lib.pa_context_get_state.argtypes = [ctypes.c_void_p] + lib.pa_context_get_sink_info_list.restype = ctypes.c_void_p + lib.pa_context_get_sink_info_list.argtypes = [ + ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, + ] + lib.pa_operation_get_state.restype = ctypes.c_int + lib.pa_operation_get_state.argtypes = [ctypes.c_void_p] + lib.pa_operation_unref.restype = None + lib.pa_operation_unref.argtypes = [ctypes.c_void_p] + lib.pa_context_disconnect.restype = None + lib.pa_context_disconnect.argtypes = [ctypes.c_void_p] + lib.pa_context_unref.restype = None + lib.pa_context_unref.argtypes = [ctypes.c_void_p] + + # PA context states + PA_CONTEXT_READY = 4 + PA_CONTEXT_FAILED = 5 + PA_CONTEXT_TERMINATED = 6 + # PA operation states + PA_OPERATION_DONE = 0 + + class _PASampleSpecFull(ctypes.Structure): + _fields_ = [ + ("format", ctypes.c_int), + ("rate", ctypes.c_uint32), + ("channels", ctypes.c_uint8), + ] + + class _PASinkInfo(ctypes.Structure): + _fields_ = [ + ("name", ctypes.c_char_p), + ("index", ctypes.c_uint32), + ("description", ctypes.c_char_p), + ("sample_spec", _PASampleSpecFull), + ] + + sinks: list[dict[str, Any]] = [] + + SINK_CB = ctypes.CFUNCTYPE( + None, + ctypes.c_void_p, + ctypes.POINTER(_PASinkInfo), + ctypes.c_int, + ctypes.c_void_p, + ) + + def _sink_cb( + context: ctypes.c_void_p, + info_ptr: ctypes.POINTER(_PASinkInfo), + eol: int, + userdata: ctypes.c_void_p, + ) -> None: + if eol or not info_ptr: + return + info = info_ptr.contents + name = info.name.decode() if info.name else "" + desc = info.description.decode() if info.description else name + channels = info.sample_spec.channels + if channels >= 2: + sinks.append({ + "name": desc, + "pa_sink_name": name, + "max_output_channels": channels, + }) + + sink_cb = SINK_CB(_sink_cb) + + mainloop = lib.pa_mainloop_new() + if not mainloop: + raise OSError("pa_mainloop_new failed") + + try: + api = lib.pa_mainloop_get_api(mainloop) + ctx = lib.pa_context_new(api, b"music-assistant-enum") + if not ctx: + raise OSError("pa_context_new failed") + + server = PULSE_SERVER.encode() if PULSE_SERVER else None + ret = lib.pa_context_connect(ctx, server, 0, None) + if ret < 0: + lib.pa_context_unref(ctx) + raise OSError(f"pa_context_connect failed (ret={ret})") + + # Wait for context to become ready (max ~2s) + for _ in range(2000): + lib.pa_mainloop_iterate(mainloop, 0, None) + state = lib.pa_context_get_state(ctx) + if state == PA_CONTEXT_READY: + break + if state in (PA_CONTEXT_FAILED, PA_CONTEXT_TERMINATED): + lib.pa_context_unref(ctx) + raise OSError(f"PA context failed to connect (state={state})") + else: + lib.pa_context_unref(ctx) + raise OSError("Timed out waiting for PA context to become ready") + + # Issue get_sink_info_list and pump mainloop until operation completes + op = lib.pa_context_get_sink_info_list(ctx, sink_cb, None) + if not op: + lib.pa_context_disconnect(ctx) + lib.pa_context_unref(ctx) + raise OSError("pa_context_get_sink_info_list failed") + + for _ in range(2000): + lib.pa_mainloop_iterate(mainloop, 0, None) + if lib.pa_operation_get_state(op) == PA_OPERATION_DONE: + break + + lib.pa_operation_unref(op) + lib.pa_context_disconnect(ctx) + lib.pa_context_unref(ctx) + + finally: + lib.pa_mainloop_free(mainloop) + + return sinks diff --git a/music_assistant/providers/pulse_audio/player.py b/music_assistant/providers/pulse_audio/player.py new file mode 100644 index 0000000000..0fb39733ce --- /dev/null +++ b/music_assistant/providers/pulse_audio/player.py @@ -0,0 +1,104 @@ +"""Local PulseAudio Player implementation.""" +from __future__ import annotations + +import uuid +from typing import TYPE_CHECKING + +from music_assistant_models.enums import IdentifierType, PlayerFeature, PlayerType +from music_assistant_models.player import DeviceInfo + +from music_assistant.helpers.process import check_output +from music_assistant.models.player import Player + +from .constants import ( + CONF_HARDWARE_VOLUME_CEILING, + DEFAULT_HARDWARE_VOLUME_CEILING, + DEVICE_UUID_NAMESPACE, + VOLUME_CONTROL_SOFTWARE, +) +from .helpers import find_pactl, pactl_env + +if TYPE_CHECKING: + from .provider import LocalPulseAudioProvider + + +def get_sink_uuid(sink_name: str) -> str: + """Generate a stable UUID for a PulseAudio sink from its internal name.""" + return str(uuid.uuid5(DEVICE_UUID_NAMESPACE, sink_name)) + + +class LocalPulseAudioPlayer(Player): + """Player for a PulseAudio sink (remap sink, combined sink, etc.).""" + + def __init__( + self, + provider: LocalPulseAudioProvider, + player_id: str, + display_name: str, + pa_sink_name: str, + ) -> None: + super().__init__(provider, player_id) + self._attr_type = PlayerType.PLAYER + self._attr_name = display_name + self._attr_available = True + self._attr_supported_features = { + PlayerFeature.VOLUME_SET, + PlayerFeature.VOLUME_MUTE, + } + self._attr_device_info = DeviceInfo( + model=display_name, + manufacturer="PulseAudio", + ) + self._attr_device_info.add_identifier(IdentifierType.UUID, player_id) + self._attr_can_group_with = set() + self._attr_volume_level = 25 + self._attr_volume_muted = False + self._pa_sink_name = pa_sink_name + + @property + def pa_sink_name(self) -> str: + """Return the internal PulseAudio sink name.""" + return self._pa_sink_name + + @property + def volume_control_mode(self) -> str: + """Always use software volume — hardware ceiling is set once on startup.""" + return VOLUME_CONTROL_SOFTWARE + + async def volume_set(self, volume_level: int) -> None: + """Handle VOLUME_SET command.""" + self.logger.debug("volume_set called: %s", volume_level) + self._attr_volume_level = volume_level + self.update_state() + + async def volume_mute(self, muted: bool) -> None: + """Handle VOLUME_MUTE command.""" + self._attr_volume_muted = muted + self.update_state() + + async def apply_hardware_ceiling(self) -> None: + """Set PA sink hardware volume ceiling via pactl. + + Called on every startup to ensure the hardware output level is + capped at the configured ceiling percentage. + """ + provider: LocalPulseAudioProvider = self.provider + ceiling = provider.config.get_value( + CONF_HARDWARE_VOLUME_CEILING, DEFAULT_HARDWARE_VOLUME_CEILING + ) + sink_name = self._pa_sink_name + try: + await check_output( + find_pactl(), + "set-sink-volume", + sink_name, + f"{ceiling}%", + env=pactl_env(), + ) + provider.logger.debug( + "Hardware ceiling set to %d%% for sink %s", ceiling, sink_name + ) + except Exception as err: + provider.logger.warning( + "Failed to set hardware ceiling for sink %s: %s", sink_name, err + ) diff --git a/music_assistant/providers/pulse_audio/provider.py b/music_assistant/providers/pulse_audio/provider.py new file mode 100644 index 0000000000..c3e762a968 --- /dev/null +++ b/music_assistant/providers/pulse_audio/provider.py @@ -0,0 +1,44 @@ +"""Local PulseAudio Out player provider for Music Assistant.""" +from __future__ import annotations + +import sys + +from music_assistant.models.player_provider import PlayerProvider + +from .sendspin_bridge import LocalPulseAudioBridgeManager + + +class LocalPulseAudioProvider(PlayerProvider): + """Player provider that exposes PulseAudio sinks as Sendspin players.""" + + _bridge_manager: LocalPulseAudioBridgeManager + + async def handle_async_init(self) -> None: + """Handle async initialization of the provider.""" + if sys.platform != "linux": + raise RuntimeError( + "Local PulseAudio Out provider is only supported on Linux." + ) + # Verify libpulse-simple is present before we try to do anything + try: + import ctypes + ctypes.CDLL("libpulse-simple.so.0") + except OSError as err: + raise RuntimeError( + "libpulse-simple.so.0 not found — is PulseAudio installed?" + ) from err + + self._bridge_manager = LocalPulseAudioBridgeManager(self) + + async def loaded_in_mass(self) -> None: + """Handle provider fully loaded in Music Assistant.""" + await self._bridge_manager.discover_and_register() + + async def unload(self, is_removed: bool = False) -> None: + """Handle unload/removal of the provider.""" + if bridge_manager := getattr(self, "_bridge_manager", None): + await bridge_manager.stop_all() + + async def discover_players(self) -> None: + """Re-enumerate PulseAudio sinks.""" + await self._bridge_manager.discover_and_register() diff --git a/music_assistant/providers/pulse_audio/sendspin_bridge.py b/music_assistant/providers/pulse_audio/sendspin_bridge.py new file mode 100644 index 0000000000..c6a53ec1f3 --- /dev/null +++ b/music_assistant/providers/pulse_audio/sendspin_bridge.py @@ -0,0 +1,442 @@ +"""Sendspin Bridge for Local PulseAudio Out - streams audio to PA sinks.""" +from __future__ import annotations + +import asyncio +import json +import subprocess +from contextlib import suppress +from typing import TYPE_CHECKING, Any, cast + +import numpy as np +from aiosendspin.models.core import ClientHelloPayload +from aiosendspin.models.core import DeviceInfo as SendspinDeviceInfo +from aiosendspin.models.player import ClientHelloPlayerSupport, SupportedAudioFormat +from aiosendspin.models.types import AudioCodec, PlayerCommand +from music_assistant_models.enums import IdentifierType + +from music_assistant.providers.sendspin.bridge_role import ( + BRIDGE_BIT_DEPTH, + BRIDGE_CHANNELS, + BRIDGE_ROLE_ID, + BRIDGE_SAMPLE_RATE, + BridgePlayerRole, +) +from music_assistant.providers.sendspin.helpers import bridge_client_id_from_uuid + +from .constants import VOLUME_CONTROL_SOFTWARE +from .helpers import find_pactl, pactl_env +from .pa_simple import PASimpleStream +from .player import LocalPulseAudioPlayer, get_sink_uuid + +if TYPE_CHECKING: + from aiosendspin.server import ExternalStreamStartRequest, SendspinClient, SendspinServer + from aiosendspin.server.roles import AudioChunk + + from music_assistant.providers.sendspin.provider import SendspinProvider + + from .provider import LocalPulseAudioProvider + + +class SendspinPulseAudioBridge: + """Manages the Sendspin to PulseAudio sink bridge for a single sink.""" + + def __init__( + self, + provider: LocalPulseAudioProvider, + player: LocalPulseAudioPlayer, + sink_info: dict[str, Any], + sendspin_server: SendspinServer, + ) -> None: + self.provider = provider + self.mass = provider.mass + self.player = player + self.sendspin_server = sendspin_server + self.sink_name: str = sink_info["pa_sink_name"] + self.display_name: str = sink_info["name"] + self.sample_rate: int = sink_info.get("sample_rate", BRIDGE_SAMPLE_RATE) + self.bit_depth: int = sink_info.get("bit_depth", BRIDGE_BIT_DEPTH) + self.sink_info = sink_info + self.logger = provider.logger.getChild(f"bridge.{self.sink_name}") + + self._sendspin_client: SendspinClient | None = None + self._bridge_client_id: str | None = None + self._bridge_role: BridgePlayerRole | None = None + self._is_streaming = False + self._write_queue: asyncio.Queue[bytes | None] = asyncio.Queue() + self._writer_task: asyncio.Task[None] | None = None + self._lock = asyncio.Lock() + + @property + def is_registered(self) -> bool: + """Return whether the bridge is registered with Sendspin.""" + return self._sendspin_client is not None + + async def start(self) -> None: + """Register the PA sink as an external Sendspin client.""" + device_uuid = get_sink_uuid(self.sink_name) + self._bridge_client_id = bridge_client_id_from_uuid(device_uuid) + + if sendspin_prov := self._get_sendspin_provider(): + sendspin_prov.register_bridge_identifiers( + self._bridge_client_id, + {IdentifierType.UUID: device_uuid}, + ) + + # Advertise the sink's actual native format so MA transcodes to the + # correct rate/depth before sending chunks. We list PCM at every + # standard bit-depth the sink's rate supports so MA can pick the + # best match for each track (bit-perfect where possible). + sink_rate = self.sample_rate + sink_depth = self.bit_depth + # Build format list: prefer sink-native depth first, then fall back + # to the bridge default so at least something is always supported. + _depths = sorted({sink_depth, BRIDGE_BIT_DEPTH}, reverse=True) + supported_formats = [ + SupportedAudioFormat( + codec=AudioCodec.PCM, + channels=BRIDGE_CHANNELS, + sample_rate=sink_rate, + bit_depth=d, + ) + for d in _depths + ] + + hello = ClientHelloPayload( + client_id=self._bridge_client_id, + name=self.display_name, + version=1, + supported_roles=[BRIDGE_ROLE_ID], + device_info=SendspinDeviceInfo( + product_name=self.display_name, + manufacturer="PulseAudio", + ), + player_support=ClientHelloPlayerSupport( + supported_formats=supported_formats, + buffer_capacity=1_000, + supported_commands=[PlayerCommand.VOLUME, PlayerCommand.MUTE], + ), + ) + + self.logger.debug( + "Registering Sendspin bridge for sink %s (client_id=%s)", + self.sink_name, + self._bridge_client_id, + ) + + self._sendspin_client = self.sendspin_server.register_external_player( + hello, on_stream_start=self._on_stream_start + ) + + for role in self._sendspin_client.roles_by_family("player"): + self.logger.debug( + "Found player role: %s type=%s", role.role_id, type(role).__name__ + ) + if isinstance(role, BridgePlayerRole): + self._bridge_role = role + break + + if self._bridge_role is None: + self.logger.error("No BridgePlayerRole found for sink %s", self.sink_name) + return + + self._bridge_role.set_callbacks( + on_audio_chunk=self._on_audio_chunk, + on_volume_change=self._on_volume_change, + on_mute_change=self._on_mute_change, + on_stream_start=self._on_bridge_stream_start, + on_stream_end=self._on_bridge_stream_end, + initial_volume=25, + ) + self._bridge_role.setup_audio_requirements( + sample_rate=self.sample_rate, + bit_depth=self.bit_depth, + channels=BRIDGE_CHANNELS, + ) + + self.logger.info( + "Sendspin bridge registered for sink %s (client_id=%s)", + self.sink_name, + self._bridge_client_id, + ) + + def _get_sendspin_provider(self) -> SendspinProvider | None: + return cast("SendspinProvider | None", self.mass.get_provider("sendspin")) + + async def stop(self) -> None: + """Stop and unregister the bridge.""" + async with self._lock: + await self._stop_streaming() + if self._sendspin_client and self._bridge_client_id: + await self.sendspin_server.remove_client(self._bridge_client_id) + self._sendspin_client = None + self._bridge_role = None + self.logger.debug("Sendspin bridge stopped for sink %s", self.sink_name) + + def _on_stream_start(self, request: ExternalStreamStartRequest) -> None: + self.logger.debug( + "Stream start request for sink %s (reason=%s)", + self.sink_name, + request.connection_reason, + ) + self._is_streaming = True + + def _on_bridge_stream_start(self) -> None: + """Start the audio writer task.""" + if self._writer_task is not None and not self._writer_task.done(): + self._writer_task.cancel() + self._is_streaming = True + while not self._write_queue.empty(): + self._write_queue.get_nowait() + self._writer_task = self.mass.create_task(self._audio_writer()) + self.logger.info("Bridge writer started for sink %s", self.sink_name) + + def _on_bridge_stream_end(self) -> None: + self._is_streaming = False + self.mass.create_task(self._stop_streaming_locked()) + + def _on_volume_change(self, volume: int) -> None: + self.logger.debug("Volume change received: %d", volume) + self.mass.create_task(self.player.volume_set(volume)) + + def _on_mute_change(self, muted: bool) -> None: + self.logger.debug("Mute change received: %s", muted) + self.mass.create_task(self.player.volume_mute(muted)) + + def _on_audio_chunk(self, chunk: AudioChunk) -> None: + if not self._is_streaming: + return + if not hasattr(self, '_logged_chunk_fmt'): + self.logger.debug( + "First chunk: len=%d bridge sample_rate=%d bit_depth=%d", + len(chunk.data), self.sample_rate, self.bit_depth, + ) + self._logged_chunk_fmt = True + self._write_queue.put_nowait(chunk.data) + + def _apply_software_volume(self, pcm_data: bytes) -> bytes: + """Apply software volume scaling and format conversion.""" + if self.player.volume_muted: + if self.bit_depth == 24: + # PA expects packed s24le: 3 bytes/sample, not 4 + return b"\x00" * (len(pcm_data) * 3 // 4) + return b"\x00" * len(pcm_data) + volume = self.player.volume_level + self.logger.debug("Applying software volume: level=%s", volume) + scale = volume / 100.0 if (volume is not None and volume < 100) else None + + if self.bit_depth == 32: + if scale is None: + return pcm_data + samples = np.frombuffer(pcm_data, dtype=np.int32).copy() + scaled = np.clip(samples.astype(np.float64) * scale, -2147483648, 2147483647) + return scaled.astype(np.int32).tobytes() + + if self.bit_depth == 24: + # MA delivers 24-bit audio left-justified in 32-bit containers + # (significant bits in the upper 24, low 8 bits zero). + # Always repack to packed s24le (bytes 1-3 of each int32) since + # the PA sink expects packed 3-byte LE regardless of volume. + samples = np.frombuffer(pcm_data, dtype=np.int32).copy() + if scale is not None: + samples = np.clip( + samples.astype(np.float64) * scale, -2147483648, 2147483647 + ).astype(np.int32) + return samples.view(np.uint8).reshape(-1, 4)[:, 1:].tobytes() + + # 16-bit + if scale is None: + return pcm_data + samples = np.frombuffer(pcm_data, dtype=np.int16).copy() + scaled = np.clip(samples.astype(np.float64) * scale, -32768, 32767) + return scaled.astype(np.int16).tobytes() + + async def _audio_writer(self) -> None: + """Write queued audio to the PA sink via pa_simple.""" + loop = asyncio.get_running_loop() + stream: PASimpleStream | None = None + write_future: asyncio.Future | None = None + try: + self.logger.debug( + "Opening PA stream: sink=%s rate=%d channels=%d bit_depth=%d", + self.sink_name, self.sample_rate, BRIDGE_CHANNELS, self.bit_depth + ) + stream = await loop.run_in_executor( + None, + lambda: PASimpleStream( + sink_name=self.sink_name, + app_name="music-assistant", + rate=self.sample_rate, + channels=BRIDGE_CHANNELS, + bit_depth=self.bit_depth, + ), + ) + self.logger.debug("pa_simple stream opened for sink %s", self.sink_name) + + while True: + data = await self._write_queue.get() + if data is None or not self._is_streaming: + break + self.logger.debug("Audio chunk received: len=%d", len(data)) + data = self._apply_software_volume(data) + write_future = loop.run_in_executor(None, stream.write, data) + await write_future + write_future = None + + except asyncio.CancelledError: + pass + except OSError as err: + self.logger.error("pa_simple error for sink %s: %s", self.sink_name, err) + finally: + self._is_streaming = False + if write_future is not None: + with suppress(Exception): + await asyncio.shield(write_future) + if stream is not None: + with suppress(Exception): + await loop.run_in_executor(None, stream.close) + if self._writer_task is asyncio.current_task(): + self._writer_task = None + + async def _stop_streaming_locked(self) -> None: + async with self._lock: + await self._stop_streaming() + + async def _stop_streaming(self) -> None: + """Stop streaming (called with lock held).""" + self._is_streaming = False + if self._writer_task: + self._writer_task.cancel() + with suppress(asyncio.CancelledError, Exception): + await self._writer_task + self._writer_task = None + while not self._write_queue.empty(): + self._write_queue.get_nowait() + + +class LocalPulseAudioBridgeManager: + """Manages Sendspin bridges for all PulseAudio output sinks.""" + + def __init__(self, provider: LocalPulseAudioProvider) -> None: + self.provider = provider + self.mass = provider.mass + self.logger = provider.logger.getChild("bridge_manager") + self._bridges: dict[str, SendspinPulseAudioBridge] = {} + self._lock = asyncio.Lock() + + @property + def sendspin_server(self) -> SendspinServer | None: + if provider := cast("SendspinProvider | None", self.mass.get_provider("sendspin")): + return provider.server_api + return None + + async def discover_and_register(self) -> None: + """Enumerate PA sinks and register players and Sendspin bridges.""" + sendspin_server = self.sendspin_server + if not sendspin_server: + self.logger.debug("Sendspin provider not available, skipping sink enumeration") + return + + loop = asyncio.get_running_loop() + try: + sinks: list[dict[str, Any]] = await loop.run_in_executor( + None, self._enumerate_pa_sinks + ) + except Exception as err: + self.logger.warning("Failed to enumerate PA sinks: %s", err, exc_info=True) + return + + if not sinks: + self.logger.info("No PulseAudio output sinks found") + return + + self.logger.info("Found %d PulseAudio sink(s)", len(sinks)) + + async with self._lock: + for sink in sinks: + pa_sink_name: str = sink["pa_sink_name"] + display_name: str = sink["name"] + device_uuid = get_sink_uuid(pa_sink_name) + client_id = bridge_client_id_from_uuid(device_uuid) + + if client_id in self._bridges: + self.logger.debug("Bridge already exists for sink %s", pa_sink_name) + continue + + player = LocalPulseAudioPlayer( + self.provider, + player_id=device_uuid, + display_name=display_name, + pa_sink_name=pa_sink_name, + ) + await self.mass.players.register_or_update(player) + await player.apply_hardware_ceiling() + + bridge = SendspinPulseAudioBridge( + self.provider, player, sink, sendspin_server + ) + try: + await bridge.start() + except Exception: + self.logger.warning("Failed to start bridge for sink %s", pa_sink_name) + with suppress(Exception): + await bridge.stop() + player._attr_available = False + player.update_state() + continue + + if not bridge.is_registered: + player._attr_available = False + player.update_state() + continue + + self._bridges[client_id] = bridge + self.logger.info( + "Bridge created for sink %s (%s)", pa_sink_name, display_name + ) + + @staticmethod + def _enumerate_pa_sinks() -> list[dict[str, Any]]: + """Enumerate stereo-capable PulseAudio sinks via pactl.""" + sinks: list[dict[str, Any]] = [] + result = subprocess.run( + [find_pactl(), "--format=json", "list", "sinks"], + capture_output=True, + text=True, + timeout=5, + env=pactl_env(), + ) + if result.returncode != 0: + raise RuntimeError( + f"pactl exited {result.returncode}: {result.stderr.strip()}" + ) + for sink in json.loads(result.stdout): + name: str = sink.get("name", "") + desc: str = sink.get("description", name) + spec_str: str = sink.get("sample_specification", "") + try: + parts = spec_str.split() + fmt = parts[0] + channels = int(parts[1].replace("ch", "")) + sample_rate = int(parts[2].replace("Hz", "")) + bit_depth = int("".join(filter(str.isdigit, fmt.split("le")[0].split("be")[0]))) + except (IndexError, ValueError): + continue + if channels < 2: + continue + sinks.append({ + "name": desc, + "pa_sink_name": name, + "max_output_channels": channels, + "sample_rate": sample_rate, + "bit_depth": bit_depth, + }) + return sinks + + async def stop_all(self) -> None: + """Stop all bridges.""" + async with self._lock: + for bridge in list(self._bridges.values()): + with suppress(Exception): + await bridge.stop() + self._bridges.clear() + self.logger.debug("All PulseAudio bridges stopped") diff --git a/music_assistant/providers/sendspin/bridge_role.py b/music_assistant/providers/sendspin/bridge_role.py index 89722c9825..47ab3faa86 100644 --- a/music_assistant/providers/sendspin/bridge_role.py +++ b/music_assistant/providers/sendspin/bridge_role.py @@ -92,15 +92,30 @@ def role_family(self) -> str: """Return role family name.""" return "player" - def setup_audio_requirements(self) -> None: - """Set up audio requirements for bridge PCM format.""" + def setup_audio_requirements( + self, + sample_rate: int = BRIDGE_SAMPLE_RATE, + bit_depth: int = BRIDGE_BIT_DEPTH, + channels: int = BRIDGE_CHANNELS, + ) -> None: + """Set up audio requirements for bridge PCM format. + + Call with the sink's native rate/depth so MA transcodes to the + correct format before delivering chunks. Defaults to the bridge + constants for callers that don't need format negotiation. + """ self._audio_requirements = AudioRequirements( - sample_rate=BRIDGE_SAMPLE_RATE, - bit_depth=BRIDGE_BIT_DEPTH, - channels=BRIDGE_CHANNELS, - transformer=None, # Raw PCM, no encoding + sample_rate=sample_rate, + bit_depth=bit_depth, + channels=channels, + transformer=None, ) + @property + def preferred_format(self) -> AudioRequirements | None: + """Return the audio format declared via setup_audio_requirements.""" + return self._audio_requirements + def get_audio_requirements(self) -> AudioRequirements | None: """Return audio requirements for PushStream.""" return self._audio_requirements diff --git a/music_assistant/providers/sendspin/playback.py b/music_assistant/providers/sendspin/playback.py index 44be905606..ca7f8d1ab2 100644 --- a/music_assistant/providers/sendspin/playback.py +++ b/music_assistant/providers/sendspin/playback.py @@ -1162,6 +1162,14 @@ def _get_member_output_format(self, player_id: str) -> AudioFormat: channels=preferred_fmt.channels, ) elif isinstance(role, BridgePlayerRole): + fmt = role.preferred_format + if fmt is not None: + return AudioFormat( + content_type=ContentType.from_bit_depth(fmt.bit_depth), + sample_rate=fmt.sample_rate, + bit_depth=fmt.bit_depth, + channels=fmt.channels, + ) return AudioFormat( content_type=ContentType.from_bit_depth(BRIDGE_BIT_DEPTH), sample_rate=BRIDGE_SAMPLE_RATE, diff --git a/music_assistant/providers/spotify_connect_go/__init__.py b/music_assistant/providers/spotify_connect_go/__init__.py new file mode 100644 index 0000000000..53f88b5907 --- /dev/null +++ b/music_assistant/providers/spotify_connect_go/__init__.py @@ -0,0 +1,908 @@ +""" +Spotify Connect Go plugin for Music Assistant. + +This plugin uses go-librespot with its web interface for better control capabilities. +We tie a single player to a single Spotify Connect daemon. +The provider has multi instance support, +so multiple players can be linked to multiple Spotify Connect daemons. +""" + +from __future__ import annotations + +import asyncio +import json +import os +import time +from collections.abc import Callable +from contextlib import suppress +from typing import TYPE_CHECKING, cast + +import aiohttp +import yaml +from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption +from music_assistant_models.enums import ( + ConfigEntryType, + ContentType, + EventType, + MediaType, + ProviderFeature, + StreamType, +) +from music_assistant_models.errors import UnsupportedFeaturedException +from music_assistant_models.media_items import AudioFormat +from music_assistant_models.player import PlayerMedia + +from music_assistant.constants import CONF_ENTRY_WARN_PREVIEW +from music_assistant.helpers.process import AsyncProcess, check_output +from music_assistant.models.plugin import PluginProvider, PluginSource + +if TYPE_CHECKING: + from music_assistant_models.config_entries import ConfigValueType, ProviderConfig + from music_assistant_models.event import MassEvent + from music_assistant_models.provider import ProviderManifest + + from music_assistant.mass import MusicAssistant + from music_assistant.models import ProviderInstanceType + +CONF_MASS_PLAYER_ID = "mass_player_id" +CONF_SERVER_PORT = "server_port" +CONNECT_ITEM_ID = "spotify_connect_go" + +# Default server port for go-librespot web interface +DEFAULT_SERVER_PORT = 3678 + +SUPPORTED_FEATURES = {ProviderFeature.AUDIO_SOURCE} + + +async def setup( + mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig +) -> ProviderInstanceType: + """Initialize provider(instance) with given configuration.""" + return SpotifyConnectGoProvider(mass, manifest, config) + + +async def get_config_entries( + mass: MusicAssistant, + instance_id: str | None = None, + action: str | None = None, + values: dict[str, ConfigValueType] | None = None, +) -> tuple[ConfigEntry, ...]: + """Return Config entries to setup this provider.""" + return ( + CONF_ENTRY_WARN_PREVIEW, + ConfigEntry( + key=CONF_MASS_PLAYER_ID, + type=ConfigEntryType.STRING, + label="Connected Music Assistant Player", + description="Select the player for which you want to enable Spotify Connect Go.", + multi_value=False, + options=[ConfigValueOption(x.display_name, x.player_id) for x in mass.players], + required=True, + ), + ConfigEntry( + key=CONF_SERVER_PORT, + type=ConfigEntryType.INTEGER, + label="Web Interface Port", + description="Port for the go-librespot web interface (default: 3678)", + default_value=DEFAULT_SERVER_PORT, + required=False, + ), + ConfigEntry( + key="metadata_delay", + type=ConfigEntryType.FLOAT, + label="Metadata Delay (seconds)", + description="Delay metadata updates to sync with audio playback (0-10 seconds). Adjust based on your buffer chain latency.", + default_value=3.5, + required=False, + range=(0, 10), + ), + ) + + +class SpotifyConnectGoProvider(PluginProvider): + """Implementation of a Spotify Connect Go Plugin.""" + + def __init__( + self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig + ) -> None: + """Initialize MusicProvider.""" + super().__init__(mass, manifest, config) + # super().__init__(mass, manifest, SUPPORTED_FEATURES) + self.mass_player_id = cast("str", self.config.get_value(CONF_MASS_PLAYER_ID)) + self.server_port = cast( + "int", self.config.get_value(CONF_SERVER_PORT) or DEFAULT_SERVER_PORT + ) + self.cache_dir = os.path.join(self.mass.cache_path, self.instance_id) + self.config_dir = os.path.join(self.cache_dir, "config") + # self._go_librespot_bin = "/usr/local/bin/go-librespot" + self._go_librespot_bin = "/media/bin/go-librespot" + self._stop_called: bool = False + self._runner_task: asyncio.Task | None = None + self._websocket_task: asyncio.Task | None = None + self._go_librespot_proc: AsyncProcess | None = None + self._go_librespot_started = asyncio.Event() + self.named_pipe = f"/tmp/{self.instance_id}" # noqa: S108 + self._api_base_url = f"http://localhost:{self.server_port}" + self._ws_url = f"ws://localhost:{self.server_port}/events" + self._ws_session: aiohttp.ClientSession | None = None + self._ws_connection = None + + # Create the source details + self._source_details = PluginSource( + id=self.instance_id, + name=self.manifest.name, + passive=False, + can_play_pause=True, + can_seek=True, + can_next_previous=True, + audio_format=AudioFormat( + content_type=ContentType.PCM_S16LE, + codec_type=ContentType.PCM_S16LE, + sample_rate=44100, + bit_depth=16, + channels=2, + ), + stream_type=StreamType.NAMED_PIPE, + path=self.named_pipe, + on_play=self._on_play_callback, + on_pause=self._on_pause_callback, + on_next=self._on_next_callback, + on_previous=self._on_previous_callback, + on_seek=self._on_seek_callback, + on_volume=self._on_volume_callback, + on_select=self._on_source_selected, + ) + self._on_unload_callbacks: list[Callable[..., None]] = [ + self.mass.subscribe( + self._on_mass_player_event, + (EventType.PLAYER_ADDED, EventType.PLAYER_REMOVED), + id_filter=self.mass_player_id, + ), + ] + self._active_player_id: str | None = None + self._current_track_uri: str | None = None + self._metadata_update_task: asyncio.Task | None = ( + None # ADD THIS for cancelling delayed updates + ) + + @property + def supported_features(self) -> set[ProviderFeature]: + """Return the features supported by this Provider.""" + return {ProviderFeature.AUDIO_SOURCE} + + async def handle_async_init(self) -> None: + """Handle async initialization of the provider.""" + # Check if go-librespot binary exists + if not os.path.exists(self._go_librespot_bin): + raise FileNotFoundError(f"go-librespot binary not found at {self._go_librespot_bin}") + + # Create config directory if it doesn't exist + os.makedirs(self.config_dir, exist_ok=True) + + self.player = self.mass.players.get_player(self.mass_player_id) + if self.player: + self._setup_player_daemon() + + async def unload(self, is_removed: bool = False) -> None: + """Handle close/cleanup of the provider.""" + self._stop_called = True + + # Close WebSocket connection + if self._ws_connection: + await self._ws_connection.close() + if self._ws_session: + await self._ws_session.close() + + # Stop the go-librespot process + if self._go_librespot_proc: + await self._go_librespot_proc.close() + + # Cancel tasks + if self._runner_task and not self._runner_task.done(): + self._runner_task.cancel() + with suppress(asyncio.CancelledError): + await self._runner_task + + if self._websocket_task and not self._websocket_task.done(): + self._websocket_task.cancel() + with suppress(asyncio.CancelledError): + await self._websocket_task + + for callback in self._on_unload_callbacks: + callback() + + def get_source(self) -> PluginSource: + """Get (audio)source details for this plugin.""" + return self._source_details + + async def _on_play_callback(self) -> None: + """Called by MA when play is requested.""" + await self._send_api_command("player/resume", method="POST") + + async def _on_pause_callback(self) -> None: + """Called by MA when pause is requested.""" + await self._send_api_command("player/pause", method="POST") + + async def _on_next_callback(self) -> None: + """Called by MA when next track is requested.""" + await self._send_api_command("player/next", method="POST") + + async def _on_previous_callback(self) -> None: + """Called by MA when previous track is requested.""" + await self._send_api_command("player/prev", method="POST") + + async def _on_seek_callback(self, position: int) -> None: + """Called by MA when seek is requested (position in seconds).""" + position_ms = position * 1000 + await self._send_api_command(f"player/seek?position={position_ms}", method="PUT") + + async def _on_volume_callback(self, volume: int) -> None: + """Volume is handled by MA at the player level, not go-librespot.""" + + async def _on_source_selected(self) -> None: + """Handle callback when this source is selected on a player.""" + new_player_id = self._source_details.in_use_by + if not new_player_id: + return + # If there's already an active player and it's different, stop it + if self._active_player_id and self._active_player_id != new_player_id: + self.logger.info( + "Source selected on player %s, stopping playback on %s", + new_player_id, + self._active_player_id, + ) + try: + await self.mass.players.cmd_stop(self._active_player_id) + except Exception as err: + self.logger.debug( + "Failed to stop previous player %s: %s", self._active_player_id, err + ) + self._active_player_id = new_player_id + self.logger.info("Active player set to: %s", self._active_player_id) + + def _clear_active_player(self) -> None: + """Clear the active player when playback ends.""" + prev_player_id = self._active_player_id + self._active_player_id = None + self._source_details.in_use_by = None + self._current_track_uri = None + if prev_player_id: + self.logger.debug("Playback ended on player %s, clearing active player", prev_player_id) + self.mass.players.trigger_player_update(prev_player_id) + + async def _send_api_command(self, endpoint: str, method: str = "POST") -> None: + """Send a command to the go-librespot API.""" + url = f"{self._api_base_url}/{endpoint}" + self.logger.debug("Sending %s request to %s", method, url) + try: + async with aiohttp.ClientSession() as session: + if method == "POST": + async with session.post(url) as response: + response_text = await response.text() + self.logger.debug( + "API response (%s): %s - %s", response.status, endpoint, response_text + ) + if response.status != 200: + self.logger.error( + "API command failed: %s - Status: %s - Response: %s", + endpoint, + response.status, + response_text, + ) + elif method == "PUT": + async with session.put(url) as response: + response_text = await response.text() + self.logger.debug( + "API response (%s): %s - %s", response.status, endpoint, response_text + ) + if response.status != 200: + self.logger.error( + "API command failed: %s - Status: %s - Response: %s", + endpoint, + response.status, + response_text, + ) + except Exception as e: + self.logger.error("Failed to send API command %s: %s", endpoint, e) + + def _create_config_file(self) -> str: + """Create go-librespot config file and return its path.""" + config_path = os.path.join(self.config_dir, "config.yml") + + config = { + "zeroconf_enabled": True, + "zeroconf_port": 0, + "credentials": {"type": "zeroconf", "zeroconf": {"persist_credentials": True}}, + "server": { + "enabled": True, + "address": "0.0.0.0", + "port": self.server_port, + "allow_origin": "*", + "cert_file": "", + "key_file": "", + }, + "log_level": "info", + "device_id": "", + "device_name": self.name, + "device_type": "computer", + "audio_backend": "pipe", + "audio_device": "", + "audio_output_pipe": self.named_pipe, + "audio_output_pipe_format": "s16le", + "audio_buffer_time": 50000, # 500ms in microseconds + "audio_period_count": 4, + "bitrate": 320, + "volume_steps": 100, + "initial_volume": 100, + "external_volume": True, + "disable_autoplay": False, + } + + with open(config_path, "w") as f: + yaml.dump(config, f, default_flow_style=False) + + return config_path + + async def _go_librespot_runner(self) -> None: + """Run the spotify connect daemon in a background task.""" + self.logger.info("Starting Spotify Connect Go background daemon") + + # Create named pipe for audio + await check_output("rm", "-f", self.named_pipe) + await asyncio.sleep(0.1) + await check_output("mkfifo", self.named_pipe) + await check_output("chmod", "666", self.named_pipe) + await asyncio.sleep(0.1) + + # Verify pipe was created + if os.path.exists(self.named_pipe): + self.logger.info("Named pipe created successfully at %s", self.named_pipe) + else: + self.logger.error("Failed to create named pipe at %s", self.named_pipe) + + # Create config file + config_file = self._create_config_file() + self.logger.debug("Created config file at: %s", config_file) + + # Open pipe for reading permanently so go-librespot can always open + # its write end. MA will take over reading when select_source is called. + self._pipe_fd: int | None = None + try: + self._pipe_fd = os.open(self.named_pipe, os.O_RDONLY | os.O_NONBLOCK) + self.logger.debug("Pipe held open for reading at fd %s", self._pipe_fd) + except OSError as e: + self.logger.error("Failed to open pipe for reading: %s", e) + + try: + args: list[str] = [ + self._go_librespot_bin, + "--config_dir", + self.config_dir, + ] + + self.logger.debug("Starting go-librespot with args: %s", " ".join(args)) + + self._go_librespot_proc = go_librespot = AsyncProcess( + args, stdout=False, stderr=True, name=f"go-librespot[{self.name}]" + ) + await go_librespot.start() + + # Give the server time to start + await asyncio.sleep(3) + + # Check if server is responding + max_retries = 5 + for i in range(max_retries): + try: + async with aiohttp.ClientSession() as session: + async with session.get(f"{self._api_base_url}/status") as response: + if response.status == 200: + self._go_librespot_started.set() + self.logger.info("go-librespot web interface is ready") + break + except Exception as e: + if i < max_retries - 1: + self.logger.debug( + "Waiting for go-librespot to start (attempt %d/%d)", i + 1, max_retries + ) + await asyncio.sleep(1) + else: + self.logger.error("Failed to connect to go-librespot web interface: %s", e) + + # Only start WebSocket listener if the server started successfully + if self._go_librespot_started.is_set(): + self._websocket_task = self.mass.create_task(self._websocket_listener()) + + # Create a task to read stderr + stderr_task = self.mass.create_task(self._read_stderr_output(go_librespot)) + + # Wait for the process to complete + return_code = await go_librespot.wait() + self.logger.info("go-librespot process exited with return code: %s", return_code) + + # Cancel stderr task + stderr_task.cancel() + with suppress(asyncio.CancelledError): + await stderr_task + + except asyncio.CancelledError: + self.logger.info("go-librespot runner cancelled") + except Exception as e: + self.logger.error("Error running go-librespot: %s", e) + finally: + # Close the persistent pipe fd if still open + if self._pipe_fd is not None: + with suppress(OSError): + os.close(self._pipe_fd) + self._pipe_fd = None + if self._go_librespot_proc: + await self._go_librespot_proc.close() + self.logger.info("Spotify Connect Go background daemon stopped for %s", self.name) + await check_output("rm", "-f", self.named_pipe) + + if not self._go_librespot_started.is_set(): + self.unload_with_error("Unable to initialize go-librespot daemon.") + return + + # Auto restart if not stopped manually + if not self._stop_called and self._go_librespot_started.is_set(): + self.logger.warning("go-librespot exited unexpectedly, restarting in 5 seconds...") + await asyncio.sleep(5) + self._setup_player_daemon() + + async def _read_stderr_output(self, process: AsyncProcess) -> None: + """Read stderr output from go-librespot process.""" + try: + async for line in process.iter_stderr(): + if "error" in line.lower(): + self.logger.error("[go-librespot] %s", line) + else: + self.logger.debug("[go-librespot] %s", line) + except asyncio.CancelledError: + pass + + async def _websocket_listener(self) -> None: + """Listen to WebSocket events from go-librespot.""" + retry_count = 0 + max_retries = 10 + + while not self._stop_called and retry_count < max_retries: + try: + self._ws_session = aiohttp.ClientSession() + async with self._ws_session.ws_connect(self._ws_url) as ws: + self._ws_connection = ws + self.logger.info("Connected to go-librespot WebSocket") + retry_count = 0 + + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + await self._handle_websocket_event(json.loads(msg.data)) + self.logger.info("WebSocket message: %s", json.loads(msg.data)) + elif msg.type == aiohttp.WSMsgType.ERROR: + self.logger.error("WebSocket error: %s", ws.exception()) + break + except aiohttp.ClientError as e: + retry_count += 1 + self.logger.warning( + "WebSocket connection failed (attempt %d/%d): %s", retry_count, max_retries, e + ) + if retry_count < max_retries: + await asyncio.sleep(2) + except Exception as e: + self.logger.error("Unexpected WebSocket error: %s", e) + break + finally: + if self._ws_session and not self._ws_session.closed: + await self._ws_session.close() + self._ws_session = None + + async def _handle_websocket_event(self, event_data: dict) -> None: + """Handle WebSocket event from go-librespot.""" + event_type = event_data.get("type") + + # Log all events for debugging + self.logger.info( + "WebSocket event: %s - Data keys: %s", + event_type, + list(event_data.get("data", {}).keys()) if event_data.get("data") else "None", + ) + self.logger.info(event_data) + position = 0 + + # Handle metadata updates - these can come in various event types + if event_type in ("metadata", "track_changed", "new_track"): + self.logger.info("Metadata event received, updating...") + await self._update_metadata(event_data.get("data", {})) + player = self.mass.players.get_player(self.mass_player_id) + await self._on_resume(player) + + elif event_type in ("will_play", "playback_started", "playing", "active"): + self.logger.info("Playback starting event: %s", event_type) + + is_resume = False + if data := event_data.get("data", {}): + is_resume = data.get("resume", False) + + # Check if metadata is in the event + if not is_resume and ( + "track" in data or "metadata" in data or "name" in data or "title" in data + ): + await self._update_metadata(data) + # Check if this is a restart of the same track (no metadata, resume=False) + elif not is_resume and "uri" in data: + track_uri = data.get("uri", "") + if track_uri == self._current_track_uri: + self.logger.info("Track restart detected - resetting position to 0") + # Reset position to 0 for track restart + if self._source_details.metadata: + self._source_details.metadata.elapsed_time = 0 + if self._source_details.in_use_by: + player = self.mass.players.get_player(self._source_details.in_use_by) + if player: + import time + + if hasattr(player, "_attr_elapsed_time"): + player._attr_elapsed_time = 0 + player._attr_elapsed_time_last_updated = time.time() + if hasattr(player, "update_state"): + player.update_state() + + if not self._source_details.in_use_by: + self.logger.info("Selecting source on player %s", self.mass_player_id) + await self.mass.players.select_source(self.mass_player_id, self.instance_id) + + player = self.mass.players.get_player(self.mass_player_id) + await self._on_play(player) + if player: + if is_resume: + self.logger.debug("Resume detected - MA will restart progress tracking") + + player_attrs = [ + attr + for attr in dir(player) + if "next" in attr.lower() or "prev" in attr.lower() or "skip" in attr.lower() + ] + self.logger.info("Player attributes related to skip: %s", player_attrs) + + self.logger.info( + "Player active_source: %s, our instance_id: %s", + player.active_source, + self.instance_id, + ) + self.logger.info("Source in_use_by: %s", self._source_details.in_use_by) + self.logger.info("Source metadata: %s", self._source_details.metadata) + + if hasattr(player, "can_next"): + player.can_next = True + self.logger.debug("Set player.can_next = True") + if hasattr(player, "can_previous"): + player.can_previous = True + self.logger.debug("Set player.can_previous = True") + if hasattr(player, "can_next_previous"): + player.can_next_previous = True + self.logger.debug("Set player.can_next_previous = True") + + self.logger.debug( + "Player %s updated with source %s", self.mass_player_id, self.instance_id + ) + + elif event_type in ("playback_paused", "paused", "inactive"): + self.logger.debug("Playback paused") + + if self._source_details.in_use_by: + player = self.mass.players.get_player(self._source_details.in_use_by) + if player: + import time + + # Update position if provided, or keep current + if data := event_data.get("data", {}): + if "position" in data: + position_sec = data.get("position") / 1000 + if self._source_details.metadata: + self._source_details.metadata.elapsed_time = position_sec + if hasattr(player, "_attr_elapsed_time"): + player._attr_elapsed_time = position_sec + + # Freeze the timestamp to stop progress calculation + if hasattr(player, "_attr_elapsed_time_last_updated"): + player._attr_elapsed_time_last_updated = time.time() + self.logger.debug( + "Froze elapsed_time at %s seconds", player._attr_elapsed_time + ) + + if hasattr(player, "update_state"): + player.update_state() + + elif event_type in ("stopped", "session_disconnected"): + self.logger.info("Playback stopped/disconnected event: %s", event_type) + if event_type == "session_disconnected": + self.logger.info("Session disconnected - clearing everything") + self._source_details.metadata = None + self._clear_active_player() + + elif event_type == "volume_changed": + volume = event_data.get("data", {}).get("value", 0) + self.logger.debug("go-librespot volume event ignored (MA handles volume): %d", volume) + + elif event_type in ("seek", "seeked", "position_correction"): + if data := event_data.get("data", {}): + if "position" in data: + import time + + position_ms = data.get("position") + position_sec = position_ms / 1000 + self.logger.debug("Position update: %s ms", position_ms) + + # Check for track URI in position updates + current_uri = data.get("uri", "") + if current_uri and current_uri != self._current_track_uri: + self.logger.warning( + "Position update has different URI - ignoring stale data" + ) + return + + # Detect if position jumped backwards (track restart) + position_jumped_back = False + if self._source_details.metadata: + old_position = self._source_details.metadata.elapsed_time + # If position jumps back by more than 3 seconds, it's a restart + if old_position > position_sec + 3: + self.logger.info( + "Position jumped backwards from %s to %s - track restarted", + old_position, + position_sec, + ) + position_jumped_back = True + self._source_details.metadata.elapsed_time = position_sec + + if self._source_details.in_use_by: + player = self.mass.players.get_player(self._source_details.in_use_by) + if player: + # Cap position to duration + if ( + self._source_details.metadata + and self._source_details.metadata.duration + ): + position_sec = min( + position_sec, self._source_details.metadata.duration + ) + + if hasattr(player, "_attr_elapsed_time"): + player._attr_elapsed_time = position_sec + player._attr_elapsed_time_last_updated = time.time() + self.logger.debug( + "Set player position to %s at timestamp %s", + position_sec, + time.time(), + ) + + if hasattr(player, "update_state"): + player.update_state() + + self.logger.debug("Updated position to %s seconds", position_sec) + + # ... rest of event handlers remain the same ... + elif event_type == "preload_next": + # go-librespot is preloading the next track + self.logger.debug("Preloading next track") + + elif event_type == "end_of_track": + # Track finished playing + self.logger.debug("Track ended") + # Metadata for next track should arrive soon + + elif event_type in ("session_connected", "device_became_active"): + # Device is now active + self.logger.info("Device became active") + if data := event_data.get("data", {}): + if user_name := data.get("user_name"): + self.logger.info("User connected: %s", user_name) + + elif event_type == "session_client_changed": + # Client controlling the session changed + if data := event_data.get("data", {}): + client_name = data.get("client_name", "Unknown") + self.logger.info("Control client changed to: %s", client_name) + + elif event_type == "loading": + # Track is loading + self.logger.debug("Loading track...") + if data := event_data.get("data", {}): + track_id = data.get("track_id", "") + self.logger.debug("Loading track ID: %s", track_id) + + else: + # Log unknown event types for debugging + self.logger.debug( + "Unhandled WebSocket event type: %s with data: %s", + event_type, + event_data.get("data"), + ) + + async def get_source_metadata(self, source_id: str) -> PlayerMedia | None: + """Get current metadata for the given source.""" + if source_id == self.instance_id: + return self._source_details.metadata + return None + + async def _update_metadata(self, metadata: dict) -> None: + """Update metadata from go-librespot events.""" + self.logger.debug( + "_update_metadata called with metadata keys: %s", + list(metadata.keys()) if metadata else "None", + ) + + if not metadata: + self.logger.warning("_update_metadata: No metadata provided") + return + + track_info = metadata.get("track", metadata) + self.logger.debug("track_info keys: %s", list(track_info.keys())) + + from music_assistant_models.player import PlayerMedia + + # Extract URI first to detect track changes + track_uri = track_info.get("uri", "") + is_new_track = track_uri != self._current_track_uri + + if is_new_track: + self.logger.info("New track detected: %s", track_uri) + self._current_track_uri = track_uri + + # Extract fields using go-librespot's field names + title = track_info.get("name", "Unknown") + + # Handle artist_names + artist = "Unknown" + if artist_names := track_info.get("artist_names"): + if isinstance(artist_names, list) and artist_names: + artist = ( + artist_names[0] if isinstance(artist_names[0], str) else str(artist_names[0]) + ) + elif isinstance(artist_names, str): + artist = artist_names + + album_name = track_info.get("album_name", "Unknown") + image_url = track_info.get("album_cover_url") + duration = track_info.get("duration") + + self.logger.info( + "Creating PlayerMedia: title=%s, artist=%s, album=%s", title, artist, album_name + ) + + media = PlayerMedia( + uri=track_uri.replace("spotify:", "spotifyconnect:"), + title=title, + artist=artist, + album=album_name, + media_type=MediaType.TRACK, + duration=duration, + ) + + if image_url: + media.image_url = image_url + + # Handle duration - go-librespot sends milliseconds + if duration := track_info.get("duration"): + media.duration = duration / 1000 + self.logger.debug("Track duration: %s seconds", media.duration) + + # Force position to 0 for new tracks, but honor position when reconnecting + if is_new_track: + # Check if this looks like a reconnection (position > 5 seconds) or truly new track + reported_position = ( + track_info.get("position", 0) / 1000 if "position" in track_info else 0 + ) + + if reported_position > 5: + # Likely reconnecting to in-progress track, honor the position + media.elapsed_time = reported_position + self.logger.info("Reconnecting to track at position: %s seconds", reported_position) + else: + # New track starting from beginning + media.elapsed_time = 0 + self.logger.info("New track - forcing elapsed_time to 0") + elif "position" in track_info: + position = track_info.get("position") + media.elapsed_time = position / 1000 + self.logger.debug("Same track - position: %s seconds", media.elapsed_time) + else: + media.elapsed_time = 0 + self.logger.debug("No position in metadata, setting elapsed_time to 0") + + # Add extra metadata + if track_number := track_info.get("track_number"): + media.track_number = track_number + if disc_number := track_info.get("disc_number"): + media.disc_number = disc_number + + # Update source metadata + self._source_details.metadata = media + self.logger.info( + "Updated source metadata: %s - %s (uri: %s)", media.title, media.artist, media.uri + ) + + # Update player elapsed time when metadata changes + if self._source_details.in_use_by: + player = self.mass.players.get_player(self._source_details.in_use_by) + if player: + import time + + if hasattr(player, "_attr_elapsed_time"): + # **ADD: Cap elapsed_time to duration to prevent overflow** + safe_elapsed_time = media.elapsed_time + if media.duration and safe_elapsed_time > media.duration: + safe_elapsed_time = media.duration + self.logger.warning( + "Capping elapsed_time from %s to duration %s", + media.elapsed_time, + media.duration, + ) + + player._attr_elapsed_time = safe_elapsed_time + player._attr_elapsed_time_last_updated = time.time() + self.logger.debug( + "Set player elapsed_time to %s at %s", + safe_elapsed_time, + player._attr_elapsed_time_last_updated, + ) + + if hasattr(player, "_attr_current_media"): + player._attr_current_media = media + self.logger.debug("Set player _attr_current_media") + + if hasattr(player, "update_state"): + player.update_state() + + if hasattr(player, "update_state"): + player.update_state() + + async def handle_player_command(self, player_id: str, command: str, **kwargs) -> None: + """Handle player commands.""" + self.logger.info("Received command %s for player %s", command, player_id) + + if player_id != self.mass_player_id: + return + + if command == "next": + await self._send_api_command("player/next", method="POST") + elif command == "previous": + await self._send_api_command("player/prev", method="POST") + elif command == "play": + await self._send_api_command("player/resume", method="POST") + elif command == "pause": + await self._send_api_command("player/pause", method="POST") + + def _setup_player_daemon(self) -> None: + """Handle setup of the spotify connect daemon for a player.""" + self._go_librespot_started.clear() + self._runner_task = self.mass.create_task(self._go_librespot_runner()) + + def _on_mass_player_event(self, event: MassEvent) -> None: + """Handle incoming event from linked player.""" + if event.object_id != self.mass_player_id: + return + if event.event == EventType.PLAYER_REMOVED: + self._stop_called = True + self.mass.create_task(self.unload()) + return + if event.event == EventType.PLAYER_ADDED: + self._setup_player_daemon() + return + + async def _on_play(self, player): + """Starting playback from beginning.""" + + async def _on_resume(self, player): + """Resuming from paused state.""" + # MA handles this automatically + + async def _on_pause(self, player): + """Freeze position.""" + # MA handles this automatically + + async def _on_seek(self, player, new_position_seconds: float): + """Jump to a new position.""" + + async def _on_stop(self, player): + """Reset everything.""" diff --git a/music_assistant/providers/spotify_connect_go/manifest.json b/music_assistant/providers/spotify_connect_go/manifest.json new file mode 100644 index 0000000000..80c6d73625 --- /dev/null +++ b/music_assistant/providers/spotify_connect_go/manifest.json @@ -0,0 +1,11 @@ +{ + "type": "plugin", + "domain": "spotify_connect_go", + "stage": "alpha", + "name": "Spotify Connect Go", + "description": "Add Spotify Connect Go support to ANY Music Assistant player.", + "codeowners": ["@music-assistant"], + "documentation": "https://music-assistant.io/plugins/spotify-connect/", + "multi_instance": true +} + diff --git a/scripts/create_stereo_pairs.sh b/scripts/create_stereo_pairs.sh new file mode 100644 index 0000000000..2e2fd97964 --- /dev/null +++ b/scripts/create_stereo_pairs.sh @@ -0,0 +1,112 @@ +#!/bin/sh +# create_stereo_pairs.sh +# +# Creates PulseAudio stereo pair remap sinks from multi-channel sound cards. +# Run inside the hassio_audio container to enable the Music Assistant +# "Pulse Audio Out" provider to see individual stereo outputs. +# +# Usage (from the HA host): +# docker exec hassio_audio sh -c "$(cat create_stereo_pairs.sh)" +# +# Or copy and run directly: +# docker cp create_stereo_pairs.sh hassio_audio:/tmp/ +# docker exec hassio_audio sh /tmp/create_stereo_pairs.sh +# +# Note: Remap sinks do not survive a PulseAudio restart. Re-run this script +# after hassio_audio restarts, or add a HA automation to run it on startup. + +echo "create_stereo_pairs.sh started" + +# Remove existing remap sinks to start clean +for id in $(pactl list short modules | awk '/module-remap-sink/ {print $1}'); do + echo "Unloading existing remap module $id" + pactl unload-module "$id" +done + +# Disable suspend-on-idle to prevent audio dropouts when MA starts playing +pactl unload-module module-suspend-on-idle 2>/dev/null || true + +sleep 1 + +# Stereo pair definitions: suffix:left_channel,right_channel +PAIRS=" +front_stereo:front-left,front-right +rear_stereo:rear-left,rear-right +side_stereo:side-left,side-right +center_sub:front-center,lfe +" + +# Get full sink list once +sink_list=$(pactl list sinks) + +# Parse sink blocks: extract name, channel map, and alsa.card_name +# Output format: sink_name|channel_map|card_name +echo "$sink_list" | awk ' + /^Sink #/ { + if (sink && chmap) print sink "|" chmap "|" card + sink=""; chmap=""; card="" + } + /^\tName:/ { sink=$2 } + /^\tChannel Map:/ { chmap=$3 } + /alsa\.card_name/ { + match($0, /"[^"]+"/) + card=substr($0, RSTART+1, RLENGTH-2) + gsub(/[ \t-]/, "_", card) + gsub(/[^[:alnum:]_]/, "", card) + } + END { if (sink && chmap) print sink "|" chmap "|" card } +' | while IFS='|' read -r sink chmap card; do + + # Skip sendspin virtual sinks + case "$sink" in + sendspin_*) continue ;; + esac + + # Skip sinks with fewer than 4 channels — already stereo, no pairs needed + chan_count=$(echo "$chmap" | tr ',' '\n' | wc -l) + if [ "$chan_count" -lt 4 ]; then + echo "Skipping $sink ($chan_count channels, already stereo)" + continue + fi + + # Fall back to sanitized sink name if no alsa.card_name found + if [ -z "$card" ]; then + card=$(echo "$sink" \ + | sed 's/^alsa_output\.//' \ + | sed 's/\.[^.]*$//' \ + | tr '.-' '__' \ + | tr -cd '[:alnum:]_') + fi + + echo "Processing $sink (card=$card, channels=$chmap)" + + # Try each stereo pair definition + echo "$PAIRS" | grep -v '^$' | while IFS=: read -r suffix channels; do + left=$(echo "$channels" | cut -d',' -f1) + right=$(echo "$channels" | cut -d',' -f2) + + # Only create the pair if both channels exist on this sink + if echo "$chmap" | grep -q "$left" && echo "$chmap" | grep -q "$right"; then + remap_name="${card}_${suffix}" + echo " Creating $remap_name ($left, $right)" + module_id=$(pactl load-module module-remap-sink \ + sink_name="$remap_name" \ + master="$sink" \ + sink_properties="device.description=$remap_name" \ + channels=2 \ + master_channel_map="${left},${right}" \ + channel_map="${left},${right}" \ + remix=no 2>&1) + if echo "$module_id" | grep -qE '^[0-9]+$'; then + echo " Loaded module $module_id for $remap_name" + else + echo " Failed to create $remap_name: $module_id" + fi + fi + done + +done + +echo "" +echo "Done. Current sinks:" +pactl list sinks short