From 1963c4c3d1d74c42423cea076282c9720a06c024 Mon Sep 17 00:00:00 2001 From: Anatosun Date: Mon, 30 Mar 2026 15:17:17 +0200 Subject: [PATCH 1/8] Fix infinite loop in plex_connect when handling refreshPlayQueue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit handle_refresh_play_queue was missing the _updating_from_plex guard that all other command handlers use. Without it, any queue refresh from Plex would modify the MA queue, triggering _handle_queue_items_updated, which would recreate the Plex PlayQueue, causing another refreshPlayQueue — an infinite loop. The stale PlayQueue 404 errors were also a symptom of this. --- music_assistant/providers/plex_connect/player_remote.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/music_assistant/providers/plex_connect/player_remote.py b/music_assistant/providers/plex_connect/player_remote.py index a88a456372..0e32cd8ffe 100644 --- a/music_assistant/providers/plex_connect/player_remote.py +++ b/music_assistant/providers/plex_connect/player_remote.py @@ -857,6 +857,8 @@ async def handle_refresh_play_queue(self, request: web.Request) -> web.Response: This is called when the play queue is modified (items added, removed, reordered). We need to sync the entire updated queue state to MA while preserving playback. """ + # Set flag to prevent circular updates + self._updating_from_plex = True try: play_queue_id = request.query.get("playQueueID") @@ -955,6 +957,8 @@ def fetch_queue() -> PlayQueue: except Exception as e: LOGGER.exception(f"Error handling refreshPlayQueue: {e}") return web.Response(status=500, text=str(e)) + finally: + self._updating_from_plex = False async def handle_create_play_queue(self, request: web.Request) -> web.Response: """ From 1ce49edf7861a86f3dfb9cf124bcc0cf53f9ff0b Mon Sep 17 00:00:00 2001 From: Anatosun Date: Mon, 30 Mar 2026 16:24:54 +0200 Subject: [PATCH 2/8] Refactor plex_connect for maintainability and contributions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses style fixes and better segregation of functionalities to make the plugin easier to maintain and for others to contribute to. The monolithic player_remote.py is split into focused modules: - server.py — core HTTP server, routing, lifecycle - timeline.py — timeline XML building and broadcasting - playback.py — playback control command handlers - queue_commands.py — play queue HTTP command handlers - queue_sync.py — background queue loading and MA↔Plex sync logic Also promotes the plugin stage from alpha to beta, and fixes several bugs discovered during the refactor: a missing _updating_from_plex guard in handle_create_play_queue, a race condition in background queue loading, and shuffle changes not being synced back to the Plex play queue. --- .../providers/plex_connect/__init__.py | 2 +- .../providers/plex_connect/manifest.json | 2 +- .../providers/plex_connect/playback.py | 290 +++ .../providers/plex_connect/player_remote.py | 1885 ----------------- .../providers/plex_connect/queue_commands.py | 448 ++++ .../providers/plex_connect/queue_sync.py | 394 ++++ .../providers/plex_connect/server.py | 371 ++++ .../providers/plex_connect/timeline.py | 331 +++ 8 files changed, 1836 insertions(+), 1887 deletions(-) create mode 100644 music_assistant/providers/plex_connect/playback.py delete mode 100644 music_assistant/providers/plex_connect/player_remote.py create mode 100644 music_assistant/providers/plex_connect/queue_commands.py create mode 100644 music_assistant/providers/plex_connect/queue_sync.py create mode 100644 music_assistant/providers/plex_connect/server.py create mode 100644 music_assistant/providers/plex_connect/timeline.py diff --git a/music_assistant/providers/plex_connect/__init__.py b/music_assistant/providers/plex_connect/__init__.py index aa7af4d89d..acc091f7e9 100644 --- a/music_assistant/providers/plex_connect/__init__.py +++ b/music_assistant/providers/plex_connect/__init__.py @@ -20,7 +20,7 @@ from music_assistant.models.plugin import PluginProvider -from .player_remote import PlayerRemoteInstance +from .server import PlayerRemoteInstance if TYPE_CHECKING: from music_assistant_models.config_entries import ConfigValueType, ProviderConfig diff --git a/music_assistant/providers/plex_connect/manifest.json b/music_assistant/providers/plex_connect/manifest.json index 91e3e7da12..95d293101c 100644 --- a/music_assistant/providers/plex_connect/manifest.json +++ b/music_assistant/providers/plex_connect/manifest.json @@ -1,6 +1,6 @@ { "type": "plugin", - "stage": "alpha", + "stage": "beta", "domain": "plex_connect", "name": "Plex Connect Plugin", "description": "Makes a Music Assistant player appear as a device in the official Plex apps.", diff --git a/music_assistant/providers/plex_connect/playback.py b/music_assistant/providers/plex_connect/playback.py new file mode 100644 index 0000000000..e495cd2add --- /dev/null +++ b/music_assistant/providers/plex_connect/playback.py @@ -0,0 +1,290 @@ +"""Playback control command handlers for Plex remote control.""" + +from __future__ import annotations + +import asyncio +import logging +from typing import TYPE_CHECKING + +from aiohttp import web +from music_assistant_models.enums import PlayerFeature, PlayerType, RepeatMode + +if TYPE_CHECKING: + from music_assistant.providers.plex import PlexProvider + +LOGGER = logging.getLogger(__name__) + + +class PlaybackMixin: + """Mixin providing playback control command handlers.""" + + if TYPE_CHECKING: + provider: PlexProvider + _ma_player_id: str | None + _updating_from_plex: bool + play_queue_id: str | None + play_queue_item_ids: dict[int, int] + _last_synced_ma_queue_length: int + _last_synced_ma_queue_keys: list[str] + + async def _broadcast_timeline(self) -> None: ... + + async def _create_plex_playqueue_from_ma(self) -> None: ... + + def _collect_synced_keys(self, player_id: str) -> list[str]: ... + + async def _ungroup_player_if_needed(self, player_id: str) -> None: + """Ungroup player before playback if it's part of a group/sync. + + :param player_id: The player ID to potentially ungroup. + """ + player = self.provider.mass.players.get_player(player_id) + if not player or player.type == PlayerType.GROUP: + return + + if not (player.state.synced_to or player.state.group_members or player.state.active_group): + return + + LOGGER.debug("Ungrouping player %s before starting playback from Plex", player.display_name) + if ( + player.state.active_group + and (group := self.provider.mass.players.get_player(player.state.active_group)) + and group.supports_feature(PlayerFeature.SET_MEMBERS) + ): + await group.set_members(player_ids_to_remove=[player_id]) + elif ( + player.state.synced_to + and (sync_leader := self.provider.mass.players.get_player(player.state.synced_to)) + and sync_leader.supports_feature(PlayerFeature.SET_MEMBERS) + ): + await sync_leader.set_members(player_ids_to_remove=[player_id]) + elif player.state.group_members and player.supports_feature(PlayerFeature.SET_MEMBERS): + await player.set_members(player_ids_to_remove=player.group_members) + + async def _seek_to_offset_after_playback(self, player_id: str, offset: int) -> None: + """Seek to the specified offset after playback starts. + + :param player_id: The player ID to seek on. + :param offset: The offset in milliseconds. + """ + for _ in range(10): # Try up to 10 times (5 seconds total) + await asyncio.sleep(0.5) + queue = self.provider.mass.player_queues.get(player_id) + if queue and queue.current_item: + try: + await self.provider.mass.players.cmd_seek(player_id, offset // 1000) + await asyncio.sleep(0.1) + break + except Exception as e: + LOGGER.debug(f"Could not seek to offset {offset}ms: {e}") + break + else: + LOGGER.warning("Queue not ready for seeking after timeout") + + async def handle_pause(self, request: web.Request) -> web.Response: + """Handle pause command.""" + self._updating_from_plex = True + try: + if self._ma_player_id: + await self.provider.mass.players.cmd_pause(self._ma_player_id) + await self._broadcast_timeline() + return web.Response(status=200) + finally: + self._updating_from_plex = False + + async def handle_play(self, request: web.Request) -> web.Response: + """Handle play/resume command.""" + self._updating_from_plex = True + try: + if self._ma_player_id: + await self._ungroup_player_if_needed(self._ma_player_id) + await self.provider.mass.players.cmd_play(self._ma_player_id) + await self._broadcast_timeline() + return web.Response(status=200) + finally: + self._updating_from_plex = False + + async def handle_stop(self, request: web.Request) -> web.Response: + """Handle stop command - stops playback and clears the queue.""" + self._updating_from_plex = True + try: + if self._ma_player_id: + self.provider.mass.player_queues.clear(self._ma_player_id) + self.play_queue_id = None + self.play_queue_item_ids = {} + await self._broadcast_timeline() + return web.Response(status=200) + finally: + self._updating_from_plex = False + + async def handle_skip_next(self, request: web.Request) -> web.Response: + """Handle skip next command.""" + self._updating_from_plex = True + try: + if self._ma_player_id: + await self.provider.mass.player_queues.next(self._ma_player_id) + await self._broadcast_timeline() + return web.Response(status=200) + finally: + self._updating_from_plex = False + + async def handle_skip_previous(self, request: web.Request) -> web.Response: + """Handle skip previous command.""" + self._updating_from_plex = True + try: + if self._ma_player_id: + await self.provider.mass.player_queues.previous(self._ma_player_id) + await self._broadcast_timeline() + return web.Response(status=200) + finally: + self._updating_from_plex = False + + async def handle_step_forward(self, request: web.Request) -> web.Response: + """Handle step forward command (small skip forward, 30 seconds).""" + self._updating_from_plex = True + try: + if self._ma_player_id: + queue = self.provider.mass.player_queues.get(self._ma_player_id) + if queue: + new_position = queue.corrected_elapsed_time + 30 + if queue.current_item and queue.current_item.media_item: + max_duration = queue.current_item.media_item.duration or new_position + new_position = min(new_position, max_duration) + await self.provider.mass.players.cmd_seek(self._ma_player_id, int(new_position)) + await asyncio.sleep(0.1) + await self._broadcast_timeline() + return web.Response(status=200) + finally: + self._updating_from_plex = False + + async def handle_step_back(self, request: web.Request) -> web.Response: + """Handle step back command (small skip backward, 10 seconds).""" + self._updating_from_plex = True + try: + if self._ma_player_id: + queue = self.provider.mass.player_queues.get(self._ma_player_id) + if queue: + new_position = max(0, queue.corrected_elapsed_time - 10) + await self.provider.mass.players.cmd_seek(self._ma_player_id, int(new_position)) + await asyncio.sleep(0.1) + await self._broadcast_timeline() + return web.Response(status=200) + finally: + self._updating_from_plex = False + + async def handle_seek_to(self, request: web.Request) -> web.Response: + """Handle seek command.""" + self._updating_from_plex = True + try: + offset_ms = int(request.query.get("offset", 0)) + if self._ma_player_id: + await self.provider.mass.players.cmd_seek(self._ma_player_id, int(offset_ms / 1000)) + await asyncio.sleep(0.1) + await self._broadcast_timeline() + return web.Response(status=200) + finally: + self._updating_from_plex = False + + async def handle_skip_to(self, request: web.Request) -> web.Response: + """Handle skip to specific queue item.""" + key = request.query.get("key") + if not self._ma_player_id or not key: + return web.Response(status=400, text="Missing player ID or key") + + self._updating_from_plex = True + try: + ma_index = None + + if key.isdigit(): + # Key is a play queue item ID — look up MA queue index + play_queue_item_id = int(key) + for idx, pq_item_id in self.play_queue_item_ids.items(): + if pq_item_id == play_queue_item_id: + ma_index = idx + break + + if ma_index is None: + LOGGER.warning( + f"Could not find MA queue index for play queue item ID: " + f"{play_queue_item_id}" + ) + return web.Response(status=404, text="Queue item not found") + + LOGGER.info( + f"Skipping to queue index {ma_index} (play queue item ID: {play_queue_item_id})" + ) + else: + # Key is a library path — find track in MA queue by Plex key + queue_items = self.provider.mass.player_queues.items(self._ma_player_id) + if not queue_items: + return web.Response(status=404, text="Queue is empty") + + for idx, item in enumerate(queue_items): + if not item.media_item: + continue + for mapping in item.media_item.provider_mappings: + if ( + mapping.provider_instance == self.provider.instance_id + and mapping.item_id == key + ): + ma_index = idx + break + if ma_index is not None: + break + + if ma_index is None: + LOGGER.warning(f"Could not find track with key {key} in MA queue") + return web.Response(status=404, text="Track not found in queue") + + LOGGER.info(f"Skipping to queue index {ma_index} (track key: {key})") + + await self.provider.mass.player_queues.play_index(self._ma_player_id, ma_index) + await self._broadcast_timeline() + return web.Response(status=200) + + except Exception as e: + LOGGER.exception(f"Error handling skipTo: {e}") + return web.Response(status=500, text=str(e)) + finally: + self._updating_from_plex = False + + async def handle_set_parameters(self, request: web.Request) -> web.Response: + """Handle parameter changes (volume, shuffle, repeat).""" + if not self._ma_player_id: + return web.Response(status=200) + + self._updating_from_plex = True + shuffle_changed = False + try: + if "volume" in request.query: + volume = int(request.query["volume"]) + await self.provider.mass.players.cmd_volume_set(self._ma_player_id, volume) + + if "shuffle" in request.query: + shuffle = request.query["shuffle"] == "1" + await self.provider.mass.player_queues.set_shuffle(self._ma_player_id, shuffle) + shuffle_changed = True + + if "repeat" in request.query: + repeat_value = int(request.query["repeat"]) + if repeat_value == 0: + self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.OFF) + elif repeat_value == 1: + self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.ONE) + elif repeat_value == 2: + self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.ALL) + + await self._broadcast_timeline() + finally: + self._updating_from_plex = False + + if shuffle_changed: + # MA shuffles the queue asynchronously; give it a moment then sync the + # new order back to Plex so Plexamp sees the updated play queue. + await asyncio.sleep(0.2) + await self._create_plex_playqueue_from_ma() + synced_keys = self._collect_synced_keys(self._ma_player_id) + self._last_synced_ma_queue_length = len(synced_keys) + self._last_synced_ma_queue_keys = synced_keys + + return web.Response(status=200) diff --git a/music_assistant/providers/plex_connect/player_remote.py b/music_assistant/providers/plex_connect/player_remote.py deleted file mode 100644 index 0e32cd8ffe..0000000000 --- a/music_assistant/providers/plex_connect/player_remote.py +++ /dev/null @@ -1,1885 +0,0 @@ -"""Per-player Plex remote control instances.""" - -from __future__ import annotations - -import asyncio -import logging -import platform -import re -import time -import uuid -from collections.abc import Callable -from typing import TYPE_CHECKING, Any -from urllib.parse import urlparse - -from aiohttp import ClientTimeout, web -from music_assistant_models.enums import ( - EventType, - PlayerFeature, - PlayerType, - QueueOption, - RepeatMode, -) -from plexapi.playqueue import PlayQueue - -from .gdm import PlexGDMAdvertiser - -if TYPE_CHECKING: - from music_assistant_models.event import MassEvent - - from music_assistant.providers.plex import PlexProvider - - -LOGGER = logging.getLogger(__name__) - - -class PlayerRemoteInstance: - """Single remote control instance for one MA player.""" - - def __init__( - self, - plex_provider: PlexProvider, - ma_player_id: str, - player_name: str, - port: int, - device_class: str = "speaker", - remote_control: bool = False, - ) -> None: - """Initialize player remote instance. - - :param plex_provider: Plex provider instance. - :param ma_player_id: Music Assistant player ID. - :param player_name: Display name for the player. - :param port: Port for the remote control server. - :param device_class: Device class (speaker, phone, tablet, stb, tv, pc, cloud). - :param remote_control: Whether to enable remote control. - """ - self.plex_provider = plex_provider - self.plex_server = plex_provider._plex_server - self.ma_player_id = ma_player_id - self.player_name = player_name - self.port = port - self.device_class = device_class - self.remote_control = remote_control - - self.client_id = str( - uuid.uuid5( - uuid.NAMESPACE_DNS, - f"music-assistant-plex-{plex_provider.instance_id}-{ma_player_id}", - ) - ) - - if self.remote_control: - # Remote control server - self.server: PlexRemoteControlServer | None = None - # GDM advertiser - self.gdm: PlexGDMAdvertiser | None = None - - async def start(self) -> None: - """Start this player's remote control.""" - if self.remote_control: - # Create player-specific PlexServer instance with unique client identification - LOGGER.info( - f"Created PlexServer for '{self.player_name}' with client ID: {self.client_id}" - ) - - self.server = PlexRemoteControlServer( - plex_provider=self.plex_provider, - port=self.port, - client_id=self.client_id, - ma_player_id=self.ma_player_id, - device_class=self.device_class, - ) - LOGGER.info( - f"Remote control server for '{self.player_name}' bound to MA player: " - f"{self.ma_player_id}" - ) - - await self.server.start() - - # Step 4: Start GDM broadcasting - self.gdm = PlexGDMAdvertiser( - instance_id=self.client_id, - port=self.port, - publish_ip=str(self.plex_provider.mass.streams.publish_ip), - name=self.player_name, - product="Music Assistant", - version=self.plex_provider.mass.version - if self.plex_provider.mass.version != "0.0.0" - else "1.0.0", - ) - self.gdm.start() - - LOGGER.info(f"Player '{self.player_name}' is now discoverable on port {self.port}") - - async def stop(self) -> None: - """Stop this player's remote control.""" - if self.remote_control: - if self.gdm: - await self.gdm.stop() - - if self.server: - await self.server.stop() - - LOGGER.info(f"Stopped remote control for player '{self.player_name}'") - - -class PlexRemoteControlServer: - """HTTP server to receive Plex remote control commands.""" - - def __init__( - self, - plex_provider: PlexProvider, - port: int = 32500, - client_id: str | None = None, - ma_player_id: str | None = None, - device_class: str = "speaker", - ) -> None: - """Initialize remote control server. - - :param plex_provider: Plex provider instance. - :param port: Port for the HTTP server. - :param client_id: Unique client identifier. - :param ma_player_id: Music Assistant player ID. - :param device_class: Device class (speaker, phone, tablet, stb, tv, pc, cloud). - """ - self.provider = plex_provider - self.plex_server = plex_provider._plex_server - self.port = port - self.client_id = client_id or plex_provider.instance_id - self.device_class = device_class - self.app = web.Application() - self.subscriptions: dict[str, dict[str, object]] = {} - self.runner: web.AppRunner | None = None - self.http_site: web.TCPSite | None = None - - # Play queue tracking (Plex-specific state that doesn't exist in MA) - self.play_queue_id: str | None = None - self.play_queue_version: int = 1 - # Map queue index to item ID - self.play_queue_item_ids: dict[int, int] = {} - - # Track MA queue state to detect when we need to sync to Plex - self._last_synced_ma_queue_length: int = 0 - self._last_synced_ma_queue_keys: list[str] = [] - - # Specific MA player this server controls (set by PlayerRemoteInstance) - self._ma_player_id = ma_player_id - - # Store unsubscribe callbacks - self._unsub_callbacks: list[Callable[..., None]] = [] - - # Flag to prevent circular updates when we modify the queue ourselves - self._updating_from_plex = False - - self.player = self.provider.mass.players.get_player(self._ma_player_id) # type: ignore[arg-type] - - self.device_name = f"{self.player.display_name}" if self.player else "Music Assistant" - - self.headers = { - "X-Plex-Device-Name": self.device_name, - "X-Plex-Session-Identifier": self.client_id, - "X-Plex-Client-Identifier": self.client_id, - "X-Plex-Product": "Music Assistant", - "X-Plex-Platform": "Music Assistant", - "X-Plex-Platform-Version": platform.release(), - } - - self._setup_routes() - - def _setup_routes(self) -> None: - """Set up all required endpoints.""" - # Root endpoint - self.app.router.add_get("/", self.handle_root) - - # Subscription management - self.app.router.add_get("/player/timeline/subscribe", self.handle_subscribe) - self.app.router.add_get("/player/timeline/unsubscribe", self.handle_unsubscribe) - self.app.router.add_get("/player/timeline/poll", self.handle_poll) - - # Playback commands - self.app.router.add_get("/player/playback/playMedia", self.handle_play_media) - self.app.router.add_get("/player/playback/refreshPlayQueue", self.handle_refresh_play_queue) - self.app.router.add_get("/player/playback/createPlayQueue", self.handle_create_play_queue) - self.app.router.add_get("/player/playback/pause", self.handle_pause) - self.app.router.add_get("/player/playback/play", self.handle_play) - self.app.router.add_get("/player/playback/stop", self.handle_stop) - self.app.router.add_get("/player/playback/skipNext", self.handle_skip_next) - self.app.router.add_get("/player/playback/skipPrevious", self.handle_skip_previous) - self.app.router.add_get("/player/playback/stepForward", self.handle_step_forward) - self.app.router.add_get("/player/playback/stepBack", self.handle_step_back) - self.app.router.add_get("/player/playback/seekTo", self.handle_seek_to) - self.app.router.add_get("/player/playback/setParameters", self.handle_set_parameters) - self.app.router.add_get("/player/playback/skipTo", self.handle_skip_to) - - # Resources endpoint - self.app.router.add_get("/resources", self.handle_resources) - - # CORS OPTIONS handler (for all routes) - self.app.router.add_route("OPTIONS", "/{tail:.*}", self.handle_options) - - # --- Catch-all fallback for debugging purposes --- - # self.app.router.add_route("*", "/{path_info:.*}", self.handle_unknown) - - async def start(self) -> None: - """Start HTTP server and GDM advertising.""" - self.runner = web.AppRunner(self.app) - await self.runner.setup() - - # Start HTTP server - self.http_site = web.TCPSite(self.runner, "0.0.0.0", self.port) - await self.http_site.start() - LOGGER.info(f"Plex remote control server started on HTTP port {self.port}") - - # Note: GDM advertising is handled by PlexProvider in __init__.py - # to avoid duplicate broadcasts - - # Subscribe to player and queue events for state synchronization - if self._ma_player_id: - self._unsub_callbacks.append( - self.provider.mass.subscribe( - self._handle_player_event, - EventType.PLAYER_UPDATED, - id_filter=self._ma_player_id, - ) - ) - self._unsub_callbacks.append( - self.provider.mass.subscribe( - self._handle_queue_event, - EventType.QUEUE_UPDATED, - id_filter=self._ma_player_id, - ) - ) - self._unsub_callbacks.append( - self.provider.mass.subscribe( - self._handle_queue_event, - EventType.QUEUE_TIME_UPDATED, - id_filter=self._ma_player_id, - ) - ) - self._unsub_callbacks.append( - self.provider.mass.subscribe( - self._handle_queue_items_updated, - EventType.QUEUE_ITEMS_UPDATED, - id_filter=self._ma_player_id, - ) - ) - - async def stop(self) -> None: - """Stop the HTTP server.""" - # Unsubscribe from events - for unsub in self._unsub_callbacks: - unsub() - self._unsub_callbacks.clear() - - # Stop HTTP server - if self.http_site: - await self.http_site.stop() - if self.runner: - await self.runner.cleanup() - LOGGER.info("Plex remote control server stopped") - - async def handle_root(self, request: web.Request) -> web.Response: - """Handle root endpoint - return basic player info.""" - # Get player name - player_name = "Music Assistant" - if self._ma_player_id: - player = self.provider.mass.players.get_player(self._ma_player_id) - if player: - player_name = player.display_name - - xml = f""" - - -""" - return web.Response( - text=xml, content_type="text/xml", headers={"Access-Control-Allow-Origin": "*"} - ) - - async def handle_subscribe(self, request: web.Request) -> web.Response: - """Handle timeline subscription from controller.""" - client_id = request.headers.get("X-Plex-Client-Identifier") - protocol = request.query.get("protocol", "http") - port = request.query.get("port") - command_id = int(request.query.get("commandID", 0)) - - if not client_id or not port: - return web.Response(status=400) - - self.subscriptions[client_id] = { - "url": f"{protocol}://{request.remote}:{port}", - "command_id": command_id, - "last_update": time.time(), - } - - LOGGER.info(f"Controller {client_id} subscribed for timeline updates") - await self._send_timeline(client_id) - return web.Response(status=200) - - async def handle_unsubscribe(self, request: web.Request) -> web.Response: - """Handle unsubscribe request.""" - client_id = request.headers.get("X-Plex-Client-Identifier") - if client_id in self.subscriptions: - del self.subscriptions[client_id] - LOGGER.info(f"Controller {client_id} unsubscribed") - return web.Response(status=200) - - async def handle_poll(self, request: web.Request) -> web.Response: - """Handle timeline poll request.""" - # Extract parameters - include_metadata = request.query.get("includeMetadata", "0") == "1" - command_id = request.query.get("commandID", "0") - - # Update subscription timestamp if this client is subscribed - client_id = request.headers.get("X-Plex-Client-Identifier") - if client_id and client_id in self.subscriptions: - self.subscriptions[client_id]["last_update"] = time.time() - - # Build timeline from current MA player state - timeline_xml = await self._build_timeline_xml( - include_metadata=include_metadata, command_id=command_id - ) - return web.Response( - text=timeline_xml, - content_type="text/xml", - headers={ - "X-Plex-Client-Identifier": self.client_id, - "Access-Control-Expose-Headers": "X-Plex-Client-Identifier", - "Access-Control-Allow-Origin": "*", - }, - ) - - async def _ungroup_player_if_needed(self, player_id: str) -> None: - """Ungroup player before playback if it's part of a group/sync.""" - player = self.provider.mass.players.get_player(player_id) - if not player or player.type == PlayerType.GROUP: - return - - if not (player.state.synced_to or player.state.group_members or player.state.active_group): - return - - LOGGER.debug("Ungrouping player %s before starting playback from Plex", player.display_name) - # Use set_members directly on the group to bypass static member check - if ( - player.state.active_group - and (group := self.provider.mass.players.get_player(player.state.active_group)) - and group.supports_feature(PlayerFeature.SET_MEMBERS) - ): - await group.set_members(player_ids_to_remove=[player_id]) - elif ( - player.state.synced_to - and (sync_leader := self.provider.mass.players.get_player(player.state.synced_to)) - and sync_leader.supports_feature(PlayerFeature.SET_MEMBERS) - ): - await sync_leader.set_members(player_ids_to_remove=[player_id]) - elif player.state.group_members and player.supports_feature(PlayerFeature.SET_MEMBERS): - await player.set_members(player_ids_to_remove=player.group_members) - - async def handle_play_media(self, request: web.Request) -> web.Response: - """ - Handle playMedia command from Plex controller. - - Plexamp sends various parameters: - - key: The item to play (track, album, playlist, etc.) - - containerKey: The container context (play queue) - - offset: Starting position in milliseconds - - shuffle: Whether to shuffle - - repeat: Repeat mode - """ - # Set flag to prevent circular updates - self._updating_from_plex = True - try: - key = request.query.get("key") - container_key = request.query.get("containerKey") - offset = int(request.query.get("offset", 0)) - shuffle = request.query.get("shuffle", "0") == "1" - - if not key: - return web.Response( - status=400, text="Missing required 'key' parameter for playMedia command" - ) - - LOGGER.info( - f"Received playMedia command - key: {key}, " - f"containerKey: {container_key}, offset: {offset}ms" - ) - - # Use the assigned player for this server instance - player_id = self._ma_player_id - if not player_id: - return web.Response(status=500, text="No player assigned to this server") - - # Ungroup player if it's part of a group/sync - # User selected this specific player, so remove from any groups - await self._ungroup_player_if_needed(player_id) - - if container_key and "/playQueues/" in container_key: - # Extract play queue ID from container key - queue_id_match = re.search(r"/playQueues/(\d+)", container_key) - if queue_id_match: - self.play_queue_id = queue_id_match.group(1) - self.play_queue_version = 1 - LOGGER.info(f"Playing from queue: {container_key} starting at {key}") - - await self._play_from_plex_queue(player_id, container_key, key, shuffle, offset) - else: - # Reset queue tracking if no valid queue ID found - self.play_queue_id = None - self.play_queue_item_ids = {} - # Fall back to single track - media = await self._resolve_plex_item(key) - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=media, # type: ignore[arg-type] - option=QueueOption.REPLACE, - ) - elif container_key: - # Playing from a regular container (album, playlist, artist) not a play queue - # Reset queue tracking - self.play_queue_id = None - self.play_queue_item_ids = {} - - # The key is the specific track, containerKey is the collection - media_to_play = await self._resolve_plex_item(container_key) - - # Queue the entire container - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=media_to_play, # type: ignore[arg-type] - option=QueueOption.REPLACE, - ) - - else: - # Playing a single item, reset queue tracking - self.play_queue_id = None - self.play_queue_item_ids = {} - - media = await self._resolve_plex_item(key) - - # Replace the queue with this media - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=media, # type: ignore[arg-type] - option=QueueOption.REPLACE, - ) - - # Set shuffle if requested - if shuffle: - await self.provider.mass.player_queues.set_shuffle(player_id, shuffle) - - # Seek to offset if specified - if offset > 0: - await self._seek_to_offset_after_playback(player_id, offset) - - await self._broadcast_timeline() - return web.Response(status=200) - - except Exception as e: - LOGGER.exception(f"Error handling playMedia: {e}") - return web.Response(status=500, text=str(e)) - finally: - # Clear flag after processing - self._updating_from_plex = False - - def _reorder_tracks_for_playback( - self, tracks: list[Any], start_index: int - ) -> tuple[list[Any], dict[int, int]]: - """Reorder tracks to start from a specific index and update item ID mappings. - - :param tracks: List of tracks to reorder. - :param start_index: Index of the track to start from. - :return: Tuple of (reordered tracks, updated item ID mappings). - """ - if start_index <= 0 or start_index >= len(tracks): - # No reordering needed - return tracks, self.play_queue_item_ids - - # Reorder: [selected track, tracks after it, tracks before it] - reordered_tracks = ( - tracks[start_index:] # From selected to end - + tracks[:start_index] # From start to selected - ) - - # Update play queue item ID mappings to reflect new order - new_item_ids = {} - for new_idx, old_idx in enumerate( - list(range(start_index, len(tracks))) + list(range(start_index)) - ): - if old_idx in self.play_queue_item_ids: - new_item_ids[new_idx] = self.play_queue_item_ids[old_idx] - - LOGGER.info(f"Started playback from offset {start_index} (reordered queue)") - return reordered_tracks, new_item_ids - - async def _seek_to_offset_after_playback(self, player_id: str, offset: int) -> None: - """Seek to the specified offset after playback starts. - - :param player_id: The player ID to seek on. - :param offset: The offset in milliseconds. - """ - # Wait for the queue to have items loaded before seeking - for _ in range(10): # Try up to 10 times (5 seconds total) - await asyncio.sleep(0.5) - queue = self.provider.mass.player_queues.get(player_id) - if queue and queue.current_item: - try: - await self.provider.mass.players.cmd_seek(player_id, offset // 1000) - # Wait briefly for player state to update - await asyncio.sleep(0.1) - break - except Exception as e: - LOGGER.debug(f"Could not seek to offset {offset}ms: {e}") - break - else: - LOGGER.warning("Queue not ready for seeking after timeout") - - async def _play_from_plex_queue( - self, - player_id: str, - container_key: str, - starting_key: str | None, - shuffle: bool, - offset: int, - ) -> None: - """Fetch play queue from Plex and load tracks. - - Starts playback immediately with the first track, - then loads remaining tracks in the background. - """ - try: - LOGGER.info(f"Fetching play queue: {container_key}") - - # Extract queue ID from container_key (e.g., "/playQueues/123" -> "123") - queue_id_match = re.search(r"/playQueues/(\d+)", container_key) - if not queue_id_match: - raise ValueError(f"Invalid container_key format: {container_key}") - - queue_id = queue_id_match.group(1) - - # Use plexapi to fetch the play queue - def fetch_queue() -> PlayQueue: - return PlayQueue.get(self.provider._plex_server, playQueueID=queue_id) - - playqueue = await asyncio.to_thread(fetch_queue) - - if playqueue and playqueue.items: - # Get selected item offset from PlayQueue - this tells us which track to start from - selected_offset = getattr(playqueue, "playQueueSelectedItemOffset", 0) - LOGGER.info(f"PlayQueue selected item offset: {selected_offset}") - - # Track play queue item IDs - self.play_queue_item_ids = {} - - # Fetch the first track to start playback immediately - first_item = ( - playqueue.items[selected_offset] - if selected_offset < len(playqueue.items) - else playqueue.items[0] - ) - first_track_key = first_item.key if hasattr(first_item, "key") else None - first_play_queue_item_id = ( - first_item.playQueueItemID if hasattr(first_item, "playQueueItemID") else None - ) - - if not first_track_key: - LOGGER.error("No valid first track in play queue") - if starting_key: - track = await self.provider.get_track(starting_key) - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=track, - option=QueueOption.REPLACE, - ) - return - - # Fetch and start playing the first track immediately - try: - first_track = await self.provider.get_track(first_track_key) - LOGGER.info(f"Starting playback with first track: {first_track.name}") - - # Store first track's play queue item ID mapping - if first_play_queue_item_id: - self.play_queue_item_ids[0] = first_play_queue_item_id - - # Start playback immediately with just the first track - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=first_track, - option=QueueOption.REPLACE, - ) - - # Seek to offset if specified (do this before loading remaining tracks) - if offset > 0: - await self._seek_to_offset_after_playback(player_id, offset) - - # Broadcast timeline update immediately - await self._broadcast_timeline() - - # Now load the remaining tracks in the background - self.provider.mass.create_task( - self._load_remaining_queue_tracks( - player_id, playqueue, selected_offset, shuffle - ) - ) - - except Exception as e: - LOGGER.exception(f"Error starting playback with first track: {e}") - # Fall back to single track - if starting_key: - track = await self.provider.get_track(starting_key) - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=track, - option=QueueOption.REPLACE, - ) - else: - LOGGER.error("Play queue is empty or could not be fetched") - # Fall back to single track - if starting_key: - track = await self.provider.get_track(starting_key) - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=track, - option=QueueOption.REPLACE, - ) - - except Exception as e: - LOGGER.exception(f"Error playing from queue: {e}") - # Fall back to single track - if starting_key: - track = await self.provider.get_track(starting_key) - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=track, - option=QueueOption.REPLACE, - ) - - async def _load_remaining_queue_tracks( - self, - player_id: str, - playqueue: PlayQueue, - selected_offset: int, - shuffle: bool, - ) -> None: - """Load remaining tracks from play queue in the background. - - :param player_id: The Music Assistant player ID. - :param playqueue: The Plex play queue. - :param selected_offset: The offset of the track that's already playing. - :param shuffle: Whether shuffle is enabled. - """ - try: - # Prepare to fetch all tracks except the first one - remaining_items = [] - - # Get items after selected track - for i in range(selected_offset + 1, len(playqueue.items)): - remaining_items.append((i, playqueue.items[i])) - - # Get items before selected track (these will be added at the end) - for i in range(selected_offset): - remaining_items.append((i, playqueue.items[i])) - - if not remaining_items: - LOGGER.debug("No remaining tracks to load") - return - - # Fetch all remaining tracks concurrently - async def fetch_track( - plex_idx: int, item: Any - ) -> tuple[int, object | None, int | None]: - """Fetch a single track from Plex.""" - track_key = item.key if hasattr(item, "key") else None - play_queue_item_id = ( - item.playQueueItemID if hasattr(item, "playQueueItemID") else None - ) - - if track_key: - try: - track = await self.provider.get_track(track_key) - return plex_idx, track, play_queue_item_id - except Exception as e: - LOGGER.debug(f"Could not fetch track {track_key}: {e}") - - return plex_idx, None, None - - # Fetch all tracks in parallel - fetch_tasks = [fetch_track(idx, item) for idx, item in remaining_items] - results = await asyncio.gather(*fetch_tasks, return_exceptions=True) - - # Process results and build track list - tracks_to_add: list[object] = [] - for result in results: - if isinstance(result, Exception): - LOGGER.debug(f"Error fetching track: {result}") - continue - - # result is guaranteed to be a tuple here after the Exception check - _plex_idx, track, play_queue_item_id = result # type: ignore[misc] - if track: - ma_idx = len(tracks_to_add) + 1 # +1 because first track is already queued - tracks_to_add.append(track) - - # Store play queue item ID mapping - if play_queue_item_id: - self.play_queue_item_ids[ma_idx] = play_queue_item_id - - if tracks_to_add: - LOGGER.info(f"Adding {len(tracks_to_add)} remaining tracks to queue") - - # Add remaining tracks to the queue - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=tracks_to_add, # type: ignore[arg-type] - option=QueueOption.ADD, - ) - - # Update tracked state to prevent sync loop - queue_items = self.provider.mass.player_queues.items(player_id) - synced_keys = [] - for item in queue_items: - if item.media_item: - for mapping in item.media_item.provider_mappings: - if mapping.provider_instance == self.provider.instance_id: - synced_keys.append(mapping.item_id) - break - self._last_synced_ma_queue_length = len(synced_keys) - self._last_synced_ma_queue_keys = synced_keys - - # Apply shuffle if requested (after all tracks are loaded) - if shuffle: - await self.provider.mass.player_queues.set_shuffle(player_id, shuffle) - - LOGGER.info( - f"Successfully loaded {len(tracks_to_add)} remaining tracks " - f"(total queue: {len(synced_keys)} tracks)" - ) - else: - LOGGER.warning("No valid remaining tracks found in play queue") - - except Exception as e: - LOGGER.exception(f"Error loading remaining queue tracks: {e}") - - async def _replace_entire_queue(self, player_id: str, playqueue: PlayQueue) -> None: - """Replace the entire queue when nothing is currently playing. - - :param player_id: The Music Assistant player ID. - :param playqueue: The Plex play queue to load. - """ - all_tracks = [] - self.play_queue_item_ids = {} - - for i, item in enumerate(playqueue.items): - track_key = item.key if hasattr(item, "key") else None - play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None - - if track_key: - try: - track = await self.provider.get_track(track_key) - all_tracks.append(track) - - if play_queue_item_id: - self.play_queue_item_ids[len(all_tracks) - 1] = play_queue_item_id - except Exception as e: - LOGGER.debug(f"Could not fetch track {track_key}: {e}") - continue - - if all_tracks: - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=all_tracks, # type: ignore[arg-type] - option=QueueOption.REPLACE, - ) - LOGGER.info(f"Replaced queue with {len(all_tracks)} tracks") - - async def _replace_remaining_queue( - self, player_id: str, playqueue: PlayQueue, current_index: int - ) -> None: - """Replace only items after the current track. - - :param player_id: The Music Assistant player ID. - :param playqueue: The Plex play queue to load. - :param current_index: The current track index in the MA queue. - """ - # Fetch tracks that come AFTER the current track in the Plex queue - remaining_tracks = [] - new_item_mappings = {} - - # Start from the track after current_index - for i in range(current_index + 1, len(playqueue.items)): - item = playqueue.items[i] - track_key = item.key if hasattr(item, "key") else None - play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None - - if track_key: - try: - track = await self.provider.get_track(track_key) - remaining_tracks.append(track) - - # Map relative to the current position - if play_queue_item_id: - new_item_mappings[current_index + 1 + len(remaining_tracks) - 1] = ( - play_queue_item_id - ) - except Exception as e: - LOGGER.debug(f"Could not fetch track {track_key}: {e}") - continue - - # Replace items after current track - if remaining_tracks: - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=remaining_tracks, # type: ignore[arg-type] - option=QueueOption.REPLACE_NEXT, # Replace everything after current - ) - # Update mappings for the new items - self.play_queue_item_ids.update(new_item_mappings) - - LOGGER.info( - f"Replaced {len(remaining_tracks)} tracks after current track " - f"(index {current_index})" - ) - else: - # No tracks after current - clear remaining queue - LOGGER.debug("No tracks after current track in Plex queue") - - # Rebuild complete item ID mappings from Plex queue - # Keep mappings for tracks from index 0 to current_index unchanged - for i, item in enumerate(playqueue.items): - play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None - if play_queue_item_id: - self.play_queue_item_ids[i] = play_queue_item_id - - async def handle_refresh_play_queue(self, request: web.Request) -> web.Response: - """ - Handle refreshPlayQueue command from Plex controller. - - This is called when the play queue is modified (items added, removed, reordered). - We need to sync the entire updated queue state to MA while preserving playback. - """ - # Set flag to prevent circular updates - self._updating_from_plex = True - try: - play_queue_id = request.query.get("playQueueID") - - if not play_queue_id: - return web.Response(status=400, text="Missing 'playQueueID' parameter") - - # Log all query parameters to understand what Plex sends - LOGGER.info( - f"Received refreshPlayQueue command - playQueueID: {play_queue_id}, " - f"params: {dict(request.query)}" - ) - - # Verify this is our active play queue - if self.play_queue_id != play_queue_id: - LOGGER.warning( - f"Refresh requested for queue {play_queue_id} but active queue is " - f"{self.play_queue_id}" - ) - return web.Response( - status=409, - text=( - f"Requested playQueueID {play_queue_id} does not match " - f"active queue {self.play_queue_id}" - ), - ) - - # Update the play queue version (increments on each refresh) - self.play_queue_version += 1 - - # Use plexapi to fetch the updated play queue - def fetch_queue() -> PlayQueue: - return PlayQueue.get(self.provider._plex_server, playQueueID=play_queue_id) - - playqueue = await asyncio.to_thread(fetch_queue) - - if not playqueue or not playqueue.items: - LOGGER.error("Failed to refresh play queue - queue is empty or not found") - return web.Response(status=404, text="Play queue not found") - - # Get current MA queue state - player_id = self._ma_player_id - if not player_id: - LOGGER.error("No player assigned to this server") - return web.Response(status=500, text="No player assigned") - - # disable shuffle to avoid infinite loop - await self.provider.mass.player_queues.set_shuffle(player_id, False) - ma_queue = self.provider.mass.player_queues.get(player_id) - if not ma_queue: - LOGGER.error(f"MA queue not found for player {player_id}") - return web.Response(status=500, text="MA queue not found") - - # Get current playback state - current_index = ma_queue.current_index - - # Get MA queue item count - ma_queue_items = self.provider.mass.player_queues.items(player_id) - ma_queue_count = len(ma_queue_items) if ma_queue_items else 0 - - LOGGER.debug( - f"Queue refresh: Current index={current_index}, " - f"MA has {ma_queue_count} items, Plex has {len(playqueue.items)} items" - ) - - # If nothing is playing, replace the entire queue - if current_index is None: - LOGGER.debug("No track currently playing, replacing entire queue") - await self._replace_entire_queue(player_id, playqueue) - else: - # Something is playing - update only the remaining queue items - LOGGER.debug( - f"Track at index {current_index} is playing, " - f"replacing only items after current track" - ) - await self._replace_remaining_queue(player_id, playqueue, current_index) - - LOGGER.info( - f"Refreshed play queue {play_queue_id} - now has {len(playqueue.items)} items" - ) - - # Update tracked state to prevent sync loop - # Get what's actually in MA queue after the refresh - queue_items_after = self.provider.mass.player_queues.items(player_id) - synced_keys = [] - for item in queue_items_after: - if item.media_item: - for mapping in item.media_item.provider_mappings: - if mapping.provider_instance == self.provider.instance_id: - synced_keys.append(mapping.item_id) - break - self._last_synced_ma_queue_length = len(synced_keys) - self._last_synced_ma_queue_keys = synced_keys - - return web.Response(status=200) - - except Exception as e: - LOGGER.exception(f"Error handling refreshPlayQueue: {e}") - return web.Response(status=500, text=str(e)) - finally: - self._updating_from_plex = False - - async def handle_create_play_queue(self, request: web.Request) -> web.Response: - """ - Handle createPlayQueue command from Plex controller. - - Creates a new play queue from a URI (album, playlist, artist tracks, etc.) - and optionally applies shuffle. - """ - try: - uri = request.query.get("uri") - shuffle = request.query.get("shuffle", "0") == "1" - continuous = request.query.get("continuous", "0") == "1" - - if not uri: - return web.Response(status=400, text="Missing 'uri' parameter") - - LOGGER.info(f"Received createPlayQueue command - uri: {uri}, shuffle: {shuffle}") - - # Use the assigned player for this server instance - player_id = self._ma_player_id - if not player_id: - return web.Response(status=500, text="No player assigned to this server") - - # Use plexapi to create play queue - def create_queue() -> PlayQueue: - # Fetch the item from URI first - item = self.provider._plex_server.fetchItem(uri) - # Create play queue from the item - return PlayQueue.create( - self.provider._plex_server, - item, - shuffle=1 if shuffle else 0, - continuous=1 if continuous else 0, - ) - - playqueue = await asyncio.to_thread(create_queue) - - if playqueue and playqueue.items: - # Extract play queue ID from response - self.play_queue_id = str(playqueue.playQueueID) - self.play_queue_version = 1 - - LOGGER.info( - f"Created play queue {self.play_queue_id} with {len(playqueue.items)} items" - ) - - # Fetch the first track to start playback immediately - self.play_queue_item_ids = {} - first_item = playqueue.items[0] - first_track_key = first_item.key if hasattr(first_item, "key") else None - first_play_queue_item_id = ( - first_item.playQueueItemID if hasattr(first_item, "playQueueItemID") else None - ) - - if not first_track_key: - LOGGER.error("No valid first track in created play queue") - return web.Response(status=500, text="Failed to load tracks from play queue") - - try: - # Fetch and start playing the first track immediately - first_track = await self.provider.get_track(first_track_key) - LOGGER.info(f"Starting playback with first track: {first_track.name}") - - # Store first track's play queue item ID mapping - if first_play_queue_item_id: - self.play_queue_item_ids[0] = first_play_queue_item_id - - # Start playback immediately with just the first track - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=first_track, - option=QueueOption.REPLACE, - ) - - # Now load the remaining tracks in the background - if len(playqueue.items) > 1: - self.provider.mass.create_task( - self._load_remaining_queue_tracks( - player_id, - playqueue, - 0, # Selected offset is 0 since we started from the first track - shuffle, - ) - ) - - # Broadcast timeline update - await self._broadcast_timeline() - return web.Response(status=200) - - except Exception as e: - LOGGER.exception(f"Error starting playback with first track: {e}") - return web.Response(status=500, text=f"Failed to start playback: {e}") - else: - LOGGER.error("Failed to create play queue or queue is empty") - return web.Response(status=500, text="Failed to create play queue") - - except Exception as e: - LOGGER.exception(f"Error handling createPlayQueue: {e}") - return web.Response(status=500, text=str(e)) - - async def _resolve_plex_item(self, key: str) -> object: - """Resolve a Plex key to a Music Assistant media item.""" - # Determine item type from the key format - if "/library/metadata/" in key: - # Could be track, album, or artist - # Try to fetch as track first - try: - return await self.provider.get_track(key) - except Exception as exc: - LOGGER.debug(f"Failed to resolve Plex item as track for key '{key}': {exc}") - - # Try as album - try: - return await self.provider.get_album(key) - except Exception as exc: - LOGGER.debug(f"Failed to resolve Plex item as album for key '{key}': {exc}") - - # Try as artist - try: - return await self.provider.get_artist(key) - except Exception: - raise ValueError(f"Could not resolve Plex item: {key}") from None - - elif "/playlists/" in key: - return await self.provider.get_playlist(key) - else: - raise ValueError(f"Unknown Plex key format: {key}") - - async def handle_pause(self, request: web.Request) -> web.Response: - """Handle pause command (test-client.py line 98-101).""" - self._updating_from_plex = True - try: - if self._ma_player_id: - await self.provider.mass.players.cmd_pause(self._ma_player_id) - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_play(self, request: web.Request) -> web.Response: - """Handle play/resume command (test-client.py line 103-106).""" - self._updating_from_plex = True - try: - if self._ma_player_id: - # Ungroup player before resuming playback - await self._ungroup_player_if_needed(self._ma_player_id) - await self.provider.mass.players.cmd_play(self._ma_player_id) - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_stop(self, request: web.Request) -> web.Response: - """Handle stop command - stops playback and clears the queue.""" - self._updating_from_plex = True - try: - if self._ma_player_id: - # Clear the queue (which also stops playback) - self.provider.mass.player_queues.clear(self._ma_player_id) - - # Reset play queue tracking since the queue is now cleared - self.play_queue_id = None - self.play_queue_item_ids = {} - - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_skip_next(self, request: web.Request) -> web.Response: - """Handle skip next command.""" - self._updating_from_plex = True - try: - if self._ma_player_id: - await self.provider.mass.player_queues.next(self._ma_player_id) - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_skip_previous(self, request: web.Request) -> web.Response: - """Handle skip previous command.""" - self._updating_from_plex = True - try: - if self._ma_player_id: - await self.provider.mass.player_queues.previous(self._ma_player_id) - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_step_forward(self, request: web.Request) -> web.Response: - """Handle step forward command (small skip forward).""" - self._updating_from_plex = True - try: - if self._ma_player_id: - queue = self.provider.mass.player_queues.get(self._ma_player_id) - if queue: - # Step forward 30 seconds - new_position = queue.corrected_elapsed_time + 30 - if queue.current_item and queue.current_item.media_item: - # Don't seek past the track duration - max_duration = queue.current_item.media_item.duration or new_position - new_position = min(new_position, max_duration) - await self.provider.mass.players.cmd_seek(self._ma_player_id, int(new_position)) - # Wait briefly for player state to update - await asyncio.sleep(0.1) - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_step_back(self, request: web.Request) -> web.Response: - """Handle step back command (small skip backward).""" - self._updating_from_plex = True - try: - if self._ma_player_id: - queue = self.provider.mass.player_queues.get(self._ma_player_id) - if queue: - # Step back 10 seconds - new_position = max(0, queue.corrected_elapsed_time - 10) - await self.provider.mass.players.cmd_seek(self._ma_player_id, int(new_position)) - # Wait briefly for player state to update - await asyncio.sleep(0.1) - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_skip_to(self, request: web.Request) -> web.Response: - """Handle skip to specific queue item.""" - key = request.query.get("key") - if not self._ma_player_id or not key: - return web.Response(status=400, text="Missing player ID or key") - - self._updating_from_plex = True - try: - ma_index = None - - # Check if key is a play queue item ID (numeric) or a library path - if key.isdigit(): - # Key is a play queue item ID - play_queue_item_id = int(key) - - # Find the MA queue index for this play queue item ID - for idx, pq_item_id in self.play_queue_item_ids.items(): - if pq_item_id == play_queue_item_id: - ma_index = idx - break - - if ma_index is None: - LOGGER.warning( - f"Could not find MA queue index for play queue item ID: " - f"{play_queue_item_id}" - ) - return web.Response(status=404, text="Queue item not found") - - LOGGER.info( - f"Skipping to queue index {ma_index} (play queue item ID: {play_queue_item_id})" - ) - else: - # Key is a library path (e.g., "/library/metadata/856761") - # Find the track in the MA queue by matching the Plex key - queue_items = self.provider.mass.player_queues.items(self._ma_player_id) - if not queue_items: - return web.Response(status=404, text="Queue is empty") - - for idx, item in enumerate(queue_items): - if not item.media_item: - continue - - # Find Plex mapping for this track - for mapping in item.media_item.provider_mappings: - if ( - mapping.provider_instance == self.provider.instance_id - and mapping.item_id == key - ): - ma_index = idx - break - - if ma_index is not None: - break - - if ma_index is None: - LOGGER.warning(f"Could not find track with key {key} in MA queue") - return web.Response(status=404, text="Track not found in queue") - - LOGGER.info(f"Skipping to queue index {ma_index} (track key: {key})") - - # Skip to this index in the MA queue - await self.provider.mass.player_queues.play_index(self._ma_player_id, ma_index) - - await self._broadcast_timeline() - return web.Response(status=200) - - except Exception as e: - LOGGER.exception(f"Error handling skipTo: {e}") - return web.Response(status=500, text=str(e)) - finally: - self._updating_from_plex = False - - async def handle_seek_to(self, request: web.Request) -> web.Response: - """Handle seek command.""" - self._updating_from_plex = True - try: - offset_ms = int(request.query.get("offset", 0)) - if self._ma_player_id: - await self.provider.mass.players.cmd_seek(self._ma_player_id, int(offset_ms / 1000)) - # Wait briefly for player state to update - await asyncio.sleep(0.1) - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_set_parameters(self, request: web.Request) -> web.Response: - """Handle parameter changes (volume, shuffle, repeat).""" - if not self._ma_player_id: - return web.Response(status=200) - - self._updating_from_plex = True - try: - if "volume" in request.query: - volume = int(request.query["volume"]) - await self.provider.mass.players.cmd_volume_set(self._ma_player_id, volume) - - if "shuffle" in request.query: - # Plex sends shuffle as "0" or "1" - shuffle = request.query["shuffle"] == "1" - await self.provider.mass.player_queues.set_shuffle(self._ma_player_id, shuffle) - - if "repeat" in request.query: - # Plex repeat: 0=off, 1=repeat one, 2=repeat all - repeat_value = int(request.query["repeat"]) - - # Map Plex repeat to MA repeat mode - if repeat_value == 0: - # Repeat off - self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.OFF) - elif repeat_value == 1: - # Repeat one track - self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.ONE) - elif repeat_value == 2: - # Repeat all - self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.ALL) - - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_options(self, request: web.Request) -> web.Response: - """Handle OPTIONS requests for CORS (like test-client.py).""" - return web.Response( - status=200, - headers={ - "Access-Control-Allow-Origin": "*", - "Access-Control-Allow-Methods": "GET, POST, OPTIONS", - "Access-Control-Allow-Headers": "*", - }, - ) - - async def handle_resources(self, request: web.Request) -> web.Response: - """Return player information (matching test-client.py format exactly).""" - # Get player name - player_name = "Music Assistant" - if self._ma_player_id: - player = self.provider.mass.players.get_player(self._ma_player_id) - if player: - player_name = player.display_name - - # Get player state - state = "stopped" - if self._ma_player_id: - player = self.provider.mass.players.get_player(self._ma_player_id) - if player and player.state: - state_value = ( - player.state.value if hasattr(player.state, "value") else str(player.state) - ) - if state_value in ["playing", "paused"]: - state = state_value - - local_ip = self.provider.mass.streams.publish_ip - version = self.provider.mass.version if self.provider.mass.version != "0.0.0" else "1.0.0" - - # Match test-client.py format exactly - xml = f""" - - - - -""" - return web.Response( - text=xml, content_type="text/xml", headers={"Access-Control-Allow-Origin": "*"} - ) - - def _build_timeline_attributes( - self, - track: Any, - state: str, - duration: int, - time: int, - volume: int, - shuffle: int, - repeat: int, - controllable: str, - queue: Any | None, - ) -> list[str]: - """Build timeline attributes for a playing track. - - :param track: The current track media item. - :param state: Playback state (playing, paused, etc.). - :param duration: Track duration in milliseconds. - :param time: Current playback time in milliseconds. - :param volume: Volume level (0-100). - :param shuffle: Shuffle state (0 or 1). - :param repeat: Repeat mode (0=off, 1=one, 2=all). - :param controllable: Controllable features string. - :param queue: The MA queue object. - :return: List of timeline attribute strings. - """ - # Get Plex key and ratingKey - key = None - rating_key = None - for mapping in track.provider_mappings: - if mapping.provider_instance == self.provider.instance_id: - key = mapping.item_id - rating_key = key.split("/")[-1] - break - - if not key: - return [] - - # Server identification - plex_url = urlparse(self.provider._baseurl) - machine_identifier = self.provider._plex_server.machineIdentifier - address = plex_url.hostname - port = plex_url.port or (443 if plex_url.scheme == "https" else 32400) - protocol = plex_url.scheme - - # Build timeline attributes - attrs = [ - f'state="{state}"', - f'duration="{duration}"', - f'time="{time}"', - f'ratingKey="{rating_key}"', - f'key="{key}"', - ] - - # Add play queue info if available - if self.play_queue_id and queue: - if queue.current_index is not None: - play_queue_item_id = self.play_queue_item_ids.get( - queue.current_index, queue.current_index + 1 - ) - attrs.append(f'playQueueItemID="{play_queue_item_id}"') - attrs.append(f'playQueueID="{self.play_queue_id}"') - attrs.append(f'playQueueVersion="{self.play_queue_version}"') - attrs.append(f'containerKey="/playQueues/{self.play_queue_id}"') - - # Add standard attributes - attrs.extend( - [ - 'type="music"', - f'volume="{volume}"', - f'shuffle="{shuffle}"', - f'repeat="{repeat}"', - f'controllable="{controllable}"', - f'machineIdentifier="{machine_identifier}"', - f'address="{address}"', - f'port="{port}"', - f'protocol="{protocol}"', - ] - ) - - return attrs - - async def _build_timeline_xml( - self, include_metadata: bool = False, command_id: str = "0" - ) -> str: - """Build timeline XML from current Music Assistant player state.""" - player_id = self._ma_player_id - - # Get MA player and queue - player = self.provider.mass.players.get_player(player_id) if player_id else None - queue = self.provider.mass.player_queues.get(player_id) if player_id else None - - # Controllable features for music - controllable = ( - "volume,repeat,skipPrevious,seekTo,stepBack,stepForward,stop,playPause,shuffle,skipNext" - ) - - # Map MA playback state to Plex state (stopped, paused, playing, buffering, error) - state = "stopped" - if player and player.playback_state: - state_value = ( - player.playback_state.value - if hasattr(player.playback_state, "value") - else str(player.playback_state) - ) - - # Map MA states to Plex states - if state_value == "playing": - state = "playing" - elif state_value == "paused": - state = "paused" - elif state_value == "buffering": - state = "buffering" - elif state_value == "idle": - # Idle with a current track = paused, idle without track = stopped - state = ( - "paused" - if queue and queue.current_item and queue.current_item.media_item - else "stopped" - ) - else: - state = "stopped" - - # Get volume (0-100) - use group_volume for groups, volume_level for others - volume = 0 - if player: - volume = ( - int(player.group_volume or 0) - if (player.type == PlayerType.GROUP or player.group_members) - else (int(player.volume_level or 0)) - ) - - # Get shuffle (0/1) and repeat (0=off, 1=one, 2=all) - shuffle = 0 - repeat = 0 - if queue: - shuffle = 1 if queue.shuffle_enabled else 0 - if hasattr(queue, "repeat_mode"): - repeat_mode = queue.repeat_mode - if hasattr(repeat_mode, "value"): - repeat_value = repeat_mode.value - if repeat_value == "one": - repeat = 1 - elif repeat_value == "all": - repeat = 2 - - # Build music timeline - if ( - state in ["playing", "paused"] - and queue - and queue.current_item - and queue.current_item.media_item - ): - track = queue.current_item.media_item - - # Duration in milliseconds - duration = round(track.duration * 1000) if track.duration else 0 - - # Current playback time in milliseconds - time = round(queue.corrected_elapsed_time * 1000) - - # Build timeline attributes - attrs = self._build_timeline_attributes( - track, state, duration, time, volume, shuffle, repeat, controllable, queue - ) - - if attrs: - music_timeline = f"" - else: - # No Plex mapping, send basic timeline with actual state - music_timeline = ( - f'' - ) - else: - # No current track - send stopped state with time=0 - time = 0 - music_timeline = ( - f'' - ) - - # Video and photo timelines (always stopped for music player) - video_timeline = '' - photo_timeline = '' - - # Combine all timelines - return ( - f'' - f"{music_timeline}{video_timeline}{photo_timeline}" - f"" - ) - - async def _handle_player_event(self, event: MassEvent) -> None: - """Handle player state change events.""" - if not self._ma_player_id or event.object_id != self._ma_player_id: - return - - # Skip if we're the ones making the changes - if self._updating_from_plex: - return - - try: - # Send timeline to Plex server (for activity tracking) - await self._send_timeline_to_server() - - # Broadcast timeline to subscribed controllers - # Timeline will be built from current MA player state - await self._broadcast_timeline() - except Exception as e: - LOGGER.debug(f"Error handling player event: {e}") - - async def _handle_queue_event(self, event: MassEvent) -> None: - """Handle queue change events.""" - if not self._ma_player_id or event.object_id != self._ma_player_id: - return - - # Skip if we're the ones making the changes - if self._updating_from_plex: - return - - try: - # Send timeline to Plex server (for activity tracking) - await self._send_timeline_to_server() - - # Broadcast timeline to subscribed controllers - # Timeline will be built from current MA player state - await self._broadcast_timeline() - except Exception as e: - LOGGER.debug(f"Error handling queue event: {e}") - - async def _handle_queue_items_updated(self, event: MassEvent) -> None: - """Handle queue items being added/removed/reordered.""" - if not self._ma_player_id or event.object_id != self._ma_player_id: - return - - # Skip if we're the ones making the changes - if self._updating_from_plex: - return - - # Get current MA queue state - queue_items = self.provider.mass.player_queues.items(self._ma_player_id) - if not queue_items: - return - - current_keys = [] - for item in queue_items: - if not item.media_item: - continue - # Find Plex mapping - for mapping in item.media_item.provider_mappings: - if mapping.provider_instance == self.provider.instance_id: - current_keys.append(mapping.item_id) - break - - # Check if queue actually changed from what we last synced FROM Plex - if ( - len(current_keys) == self._last_synced_ma_queue_length - and current_keys == self._last_synced_ma_queue_keys - ): - # Queue hasn't changed from last sync, skip - LOGGER.debug("MA queue matches last synced state, skipping Plex sync") - return - - LOGGER.info( - f"MA queue changed: {self._last_synced_ma_queue_length} -> {len(current_keys)} items" - ) - - # (Re)create Plex PlayQueue from MA queue - try: - await self._create_plex_playqueue_from_ma() - # Update tracked state - self._last_synced_ma_queue_length = len(current_keys) - self._last_synced_ma_queue_keys = current_keys - except Exception as e: - LOGGER.debug(f"Error creating Plex PlayQueue: {e}") - - # Broadcast timeline update - try: - await self._broadcast_timeline() - except Exception as e: - LOGGER.debug(f"Error broadcasting timeline: {e}") - - async def _create_plex_playqueue_from_ma(self) -> None: - """Create a new Plex PlayQueue from current MA queue.""" - ma_queue = self.provider.mass.player_queues.get(self._ma_player_id) # type: ignore[arg-type] - queue_items = self.provider.mass.player_queues.items(self._ma_player_id) # type: ignore[arg-type] - - if not ma_queue or not queue_items: - return - - # Fetch Plex items for all tracks in MA queue - async def fetch_plex_item(plex_key: str) -> object | None: - """Fetch a single Plex item.""" - try: - - def fetch_item() -> object: - return self.plex_server.fetchItem(plex_key) - - return await asyncio.to_thread(fetch_item) - except Exception as e: - LOGGER.debug(f"Failed to fetch Plex item {plex_key}: {e}") - return None - - # Collect all fetch tasks - fetch_tasks = [] - for item in queue_items: - if not item.media_item: - continue - - # Find Plex mapping - plex_key = None - for mapping in item.media_item.provider_mappings: - if mapping.provider_instance == self.provider.instance_id: - plex_key = mapping.item_id - break - - if plex_key: - fetch_tasks.append(fetch_plex_item(plex_key)) - - # Fetch all items concurrently - plex_items = [] - if fetch_tasks: - fetched_items = await asyncio.gather(*fetch_tasks, return_exceptions=True) - plex_items = [item for item in fetched_items if item is not None] - - if not plex_items: - LOGGER.debug("No Plex tracks in MA queue, skipping PlayQueue creation") - return - - # Determine which track should be selected (currently playing) - start_item = None - if ma_queue.current_index is not None and ma_queue.current_index < len(plex_items): - start_item = plex_items[ma_queue.current_index] - - # Create Plex PlayQueue - don't pass shuffle since MA queue is already in desired order - def create_queue() -> PlayQueue: - return PlayQueue.create( - self.plex_server, - items=plex_items, - startItem=start_item, - shuffle=0, # Don't shuffle, plex_items is already in MA queue order - continuous=1, - ) - - try: - playqueue = await asyncio.to_thread(create_queue) - - if playqueue: - self.play_queue_id = str(playqueue.playQueueID) - self.play_queue_version = playqueue.playQueueVersion - - # Build item ID mappings - self.play_queue_item_ids = {} - for i, item in enumerate(playqueue.items): - if hasattr(item, "playQueueItemID"): - self.play_queue_item_ids[i] = item.playQueueItemID - - LOGGER.info( - f"Created Plex PlayQueue {self.play_queue_id} with {len(plex_items)} tracks" - ) - except Exception as e: - LOGGER.exception(f"Error creating Plex PlayQueue: {e}") - - async def _send_timeline(self, client_id: str) -> None: - """Send timeline update to specific controller.""" - subscription = self.subscriptions.get(client_id) - if not subscription: - return - - timeline_xml = await self._build_timeline_xml() - - try: - await self.provider.mass.http_session.post( - f"{subscription['url']}/:/timeline", - data=timeline_xml, - headers={ - "X-Plex-Client-Identifier": self.client_id, - "Content-Type": "text/xml", - }, - timeout=ClientTimeout(total=5), - ) - # Update last_update timestamp on successful send - subscription["last_update"] = time.time() - except Exception as e: - LOGGER.debug(f"Failed to send timeline to {client_id}: {e}") - - async def _send_timeline_to_server(self) -> None: - """Send timeline update to Plex server for activity tracking.""" - if not self._ma_player_id: - return - - try: - player = self.provider.mass.players.get_player(self._ma_player_id) - queue = self.provider.mass.player_queues.get(self._ma_player_id) - - if ( - not player - or not queue - or not queue.current_item - or not queue.current_item.media_item - ): - return - - track = queue.current_item.media_item - - # Find Plex mapping - plex_key = None - for mapping in track.provider_mappings: - if mapping.provider_instance == self.provider.instance_id: - plex_key = mapping.item_id - break - - if not plex_key: - return - - # Extract rating key from plex_key (e.g., "/library/metadata/12345" -> "12345") - rating_key = plex_key.split("/")[-1] - - # Get playback state - state_value = ( - player.playback_state.value - if hasattr(player.playback_state, "value") - else str(player.playback_state) - ) - - # Map to Plex state - if state_value == "playing": - plex_state = "playing" - elif state_value == "paused": - plex_state = "paused" - else: - plex_state = "stopped" - - # Get position and duration in milliseconds - position_ms = round(queue.corrected_elapsed_time * 1000) - duration_ms = round(track.duration * 1000) if track.duration else 0 - - # Get play queue info if available - container_key = "" - play_queue_item_id = "" - if self.play_queue_id: - container_key = f"/playQueues/{self.play_queue_id}" - if queue.current_index is not None: - play_queue_item_id = str( - self.play_queue_item_ids.get(queue.current_index, queue.current_index + 1) - ) - - # Build timeline params (only Plex timeline data) - params = { - "ratingKey": rating_key, - "key": plex_key, - "state": plex_state, - "time": str(position_ms), - "duration": str(duration_ms), - } - - # Add play queue info if available - if container_key: - params["containerKey"] = container_key - if play_queue_item_id: - params["playQueueItemID"] = play_queue_item_id - - def send_timeline() -> None: - # Pass session headers to identify this specific player instance - self.plex_server.query("/:/timeline", params=params, headers=self.headers) - - await asyncio.to_thread(send_timeline) - - except Exception as e: - LOGGER.debug(f"Failed to send timeline to Plex server: {e}") - - async def _broadcast_timeline(self) -> None: - """Send timeline to all subscribed controllers.""" - current_time = time.time() - stale_clients = [] - for client_id, sub in self.subscriptions.items(): - try: - last_update = float(sub["last_update"]) # type: ignore[arg-type] - if current_time - last_update > 90: - stale_clients.append(client_id) - except (ValueError, TypeError): - # If conversion fails, treat client as stale - LOGGER.debug(f"Invalid last_update for client {client_id}, treating as stale") - stale_clients.append(client_id) - - for client_id in stale_clients: - del self.subscriptions[client_id] - - await asyncio.gather( - *(self._send_timeline(client_id) for client_id in list(self.subscriptions.keys())), - return_exceptions=True, # Don't fail all if one fails - ) - - # for debugging purposes only - # async def handle_unknown(self, request: web.Request) -> web.Response: - # """Catch-all handler for unexpected or unsupported paths.""" - # LOGGER.debug( - # "Unhandled request: %s %s from %s", - # request.method, - # request.path, - # request.remote, - # ) - # - # # You can log query/body if needed (be careful not to leak tokens) - # if request.query: - # LOGGER.debug("Query params for %s: %s", request.path, dict(request.query)) - # try: - # data = await request.text() - # if data: - # LOGGER.debug("Body for %s: %s", request.path, data) - # except Exception as e: - # LOGGER.debug("Could not read request body: %s", e) - # - # return web.Response(status=404, text=f"Unhandled path: {request.path}") diff --git a/music_assistant/providers/plex_connect/queue_commands.py b/music_assistant/providers/plex_connect/queue_commands.py new file mode 100644 index 0000000000..a1abfdac3c --- /dev/null +++ b/music_assistant/providers/plex_connect/queue_commands.py @@ -0,0 +1,448 @@ +"""Play queue command handlers for Plex remote control. + +Handles playMedia, createPlayQueue and refreshPlayQueue HTTP commands. +""" + +from __future__ import annotations + +import asyncio +import logging +import re +from typing import TYPE_CHECKING, Any + +from aiohttp import web +from music_assistant_models.enums import QueueOption +from plexapi.playqueue import PlayQueue + +if TYPE_CHECKING: + from music_assistant.providers.plex import PlexProvider + +LOGGER = logging.getLogger(__name__) + + +class QueueCommandsMixin: + """Mixin providing HTTP handlers for Plex play queue commands.""" + + if TYPE_CHECKING: + provider: PlexProvider + _ma_player_id: str | None + _updating_from_plex: bool + play_queue_id: str | None + play_queue_version: int + play_queue_item_ids: dict[int, int] + _last_synced_ma_queue_length: int + _last_synced_ma_queue_keys: list[str] + + async def _broadcast_timeline(self) -> None: ... + + async def _ungroup_player_if_needed(self, player_id: str) -> None: ... + + async def _seek_to_offset_after_playback(self, player_id: str, offset: int) -> None: ... + + async def _load_remaining_queue_tracks( + self, + player_id: str, + playqueue: PlayQueue, + selected_offset: int, + shuffle: bool, + ) -> None: ... + + async def _replace_entire_queue(self, player_id: str, playqueue: PlayQueue) -> None: ... + + async def _replace_remaining_queue( + self, player_id: str, playqueue: PlayQueue, current_index: int + ) -> None: ... + + def _collect_synced_keys(self, player_id: str) -> list[str]: ... + + async def _resolve_plex_item(self, key: str) -> Any: + """Resolve a Plex key to a Music Assistant media item. + + :param key: The Plex key to resolve. + """ + if "/library/metadata/" in key: + try: + return await self.provider.get_track(key) + except Exception as exc: + LOGGER.debug(f"Failed to resolve Plex item as track for key '{key}': {exc}") + + try: + return await self.provider.get_album(key) + except Exception as exc: + LOGGER.debug(f"Failed to resolve Plex item as album for key '{key}': {exc}") + + try: + return await self.provider.get_artist(key) + except Exception: + raise ValueError(f"Could not resolve Plex item: {key}") from None + + elif "/playlists/" in key: + return await self.provider.get_playlist(key) + else: + raise ValueError(f"Unknown Plex key format: {key}") + + async def _play_from_plex_queue( + self, + player_id: str, + container_key: str, + starting_key: str | None, + shuffle: bool, + offset: int, + ) -> None: + """Fetch a Plex PlayQueue and start playback, loading remaining tracks in the background. + + :param player_id: The Music Assistant player ID. + :param container_key: The Plex container key (e.g. /playQueues/123). + :param starting_key: Fallback track key if queue fetch fails. + :param shuffle: Whether shuffle is enabled. + :param offset: Starting position in milliseconds. + """ + try: + LOGGER.info(f"Fetching play queue: {container_key}") + + queue_id_match = re.search(r"/playQueues/(\d+)", container_key) + if not queue_id_match: + raise ValueError(f"Invalid container_key format: {container_key}") + + queue_id = queue_id_match.group(1) + + def fetch_queue() -> PlayQueue: + return PlayQueue.get(self.provider._plex_server, playQueueID=queue_id) + + playqueue = await asyncio.to_thread(fetch_queue) + + if playqueue and playqueue.items: + selected_offset = getattr(playqueue, "playQueueSelectedItemOffset", 0) + LOGGER.info(f"PlayQueue selected item offset: {selected_offset}") + + self.play_queue_item_ids = {} + + first_item = ( + playqueue.items[selected_offset] + if selected_offset < len(playqueue.items) + else playqueue.items[0] + ) + first_track_key = first_item.key if hasattr(first_item, "key") else None + first_play_queue_item_id = ( + first_item.playQueueItemID if hasattr(first_item, "playQueueItemID") else None + ) + + if not first_track_key: + LOGGER.error("No valid first track in play queue") + if starting_key: + track = await self.provider.get_track(starting_key) + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=track, + option=QueueOption.REPLACE, + ) + return + + try: + first_track = await self.provider.get_track(first_track_key) + LOGGER.info(f"Starting playback with first track: {first_track.name}") + + if first_play_queue_item_id: + self.play_queue_item_ids[0] = first_play_queue_item_id + + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=first_track, + option=QueueOption.REPLACE, + ) + + if offset > 0: + await self._seek_to_offset_after_playback(player_id, offset) + + await self._broadcast_timeline() + + self.provider.mass.create_task( + self._load_remaining_queue_tracks( + player_id, playqueue, selected_offset, shuffle + ) + ) + + except Exception as e: + LOGGER.exception(f"Error starting playback with first track: {e}") + if starting_key: + track = await self.provider.get_track(starting_key) + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=track, + option=QueueOption.REPLACE, + ) + else: + LOGGER.error("Play queue is empty or could not be fetched") + if starting_key: + track = await self.provider.get_track(starting_key) + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=track, + option=QueueOption.REPLACE, + ) + + except Exception as e: + LOGGER.exception(f"Error playing from queue: {e}") + if starting_key: + track = await self.provider.get_track(starting_key) + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=track, + option=QueueOption.REPLACE, + ) + + async def handle_play_media(self, request: web.Request) -> web.Response: + """Handle playMedia command from Plex controller. + + Plexamp sends various parameters: + - key: The item to play (track, album, playlist, etc.) + - containerKey: The container context (play queue) + - offset: Starting position in milliseconds + - shuffle: Whether to shuffle + - repeat: Repeat mode + """ + self._updating_from_plex = True + try: + key = request.query.get("key") + container_key = request.query.get("containerKey") + offset = int(request.query.get("offset", 0)) + shuffle = request.query.get("shuffle", "0") == "1" + + if not key: + return web.Response( + status=400, text="Missing required 'key' parameter for playMedia command" + ) + + LOGGER.info( + f"Received playMedia command - key: {key}, " + f"containerKey: {container_key}, offset: {offset}ms" + ) + + player_id = self._ma_player_id + if not player_id: + return web.Response(status=500, text="No player assigned to this server") + + await self._ungroup_player_if_needed(player_id) + + if container_key and "/playQueues/" in container_key: + queue_id_match = re.search(r"/playQueues/(\d+)", container_key) + if queue_id_match: + self.play_queue_id = queue_id_match.group(1) + self.play_queue_version = 1 + LOGGER.info(f"Playing from queue: {container_key} starting at {key}") + await self._play_from_plex_queue(player_id, container_key, key, shuffle, offset) + else: + self.play_queue_id = None + self.play_queue_item_ids = {} + media = await self._resolve_plex_item(key) + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=media, + option=QueueOption.REPLACE, + ) + elif container_key: + self.play_queue_id = None + self.play_queue_item_ids = {} + media_to_play = await self._resolve_plex_item(container_key) + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=media_to_play, + option=QueueOption.REPLACE, + ) + else: + self.play_queue_id = None + self.play_queue_item_ids = {} + media = await self._resolve_plex_item(key) + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=media, + option=QueueOption.REPLACE, + ) + + if shuffle: + await self.provider.mass.player_queues.set_shuffle(player_id, shuffle) + + if offset > 0: + await self._seek_to_offset_after_playback(player_id, offset) + + await self._broadcast_timeline() + return web.Response(status=200) + + except Exception as e: + LOGGER.exception(f"Error handling playMedia: {e}") + return web.Response(status=500, text=str(e)) + finally: + self._updating_from_plex = False + + async def handle_create_play_queue(self, request: web.Request) -> web.Response: + """Handle createPlayQueue command from Plex controller. + + Creates a new play queue from a URI (album, playlist, artist tracks, etc.) + and optionally applies shuffle. + """ + self._updating_from_plex = True + try: + uri = request.query.get("uri") + shuffle = request.query.get("shuffle", "0") == "1" + continuous = request.query.get("continuous", "0") == "1" + + if not uri: + return web.Response(status=400, text="Missing 'uri' parameter") + + LOGGER.info(f"Received createPlayQueue command - uri: {uri}, shuffle: {shuffle}") + + player_id = self._ma_player_id + if not player_id: + return web.Response(status=500, text="No player assigned to this server") + + def create_queue() -> PlayQueue: + item = self.provider._plex_server.fetchItem(uri) + return PlayQueue.create( + self.provider._plex_server, + item, + shuffle=1 if shuffle else 0, + continuous=1 if continuous else 0, + ) + + playqueue = await asyncio.to_thread(create_queue) + + if playqueue and playqueue.items: + self.play_queue_id = str(playqueue.playQueueID) + self.play_queue_version = 1 + + LOGGER.info( + f"Created play queue {self.play_queue_id} with {len(playqueue.items)} items" + ) + + self.play_queue_item_ids = {} + first_item = playqueue.items[0] + first_track_key = first_item.key if hasattr(first_item, "key") else None + first_play_queue_item_id = ( + first_item.playQueueItemID if hasattr(first_item, "playQueueItemID") else None + ) + + if not first_track_key: + LOGGER.error("No valid first track in created play queue") + return web.Response(status=500, text="Failed to load tracks from play queue") + + try: + first_track = await self.provider.get_track(first_track_key) + LOGGER.info(f"Starting playback with first track: {first_track.name}") + + if first_play_queue_item_id: + self.play_queue_item_ids[0] = first_play_queue_item_id + + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=first_track, + option=QueueOption.REPLACE, + ) + + if len(playqueue.items) > 1: + self.provider.mass.create_task( + self._load_remaining_queue_tracks(player_id, playqueue, 0, shuffle) + ) + + await self._broadcast_timeline() + return web.Response(status=200) + + except Exception as e: + LOGGER.exception(f"Error starting playback with first track: {e}") + return web.Response(status=500, text=f"Failed to start playback: {e}") + else: + LOGGER.error("Failed to create play queue or queue is empty") + return web.Response(status=500, text="Failed to create play queue") + + except Exception as e: + LOGGER.exception(f"Error handling createPlayQueue: {e}") + return web.Response(status=500, text=str(e)) + finally: + self._updating_from_plex = False + + async def handle_refresh_play_queue(self, request: web.Request) -> web.Response: + """Handle refreshPlayQueue command from Plex controller. + + Called when the play queue is modified (items added, removed, reordered). + Syncs the updated queue state to MA while preserving current playback. + """ + self._updating_from_plex = True + try: + play_queue_id = request.query.get("playQueueID") + + if not play_queue_id: + return web.Response(status=400, text="Missing 'playQueueID' parameter") + + LOGGER.info( + f"Received refreshPlayQueue command - playQueueID: {play_queue_id}, " + f"params: {dict(request.query)}" + ) + + if self.play_queue_id != play_queue_id: + LOGGER.warning( + f"Refresh requested for queue {play_queue_id} but active queue is " + f"{self.play_queue_id}" + ) + return web.Response( + status=409, + text=( + f"Requested playQueueID {play_queue_id} does not match " + f"active queue {self.play_queue_id}" + ), + ) + + self.play_queue_version += 1 + + def fetch_queue() -> PlayQueue: + return PlayQueue.get(self.provider._plex_server, playQueueID=play_queue_id) + + playqueue = await asyncio.to_thread(fetch_queue) + + if not playqueue or not playqueue.items: + LOGGER.error("Failed to refresh play queue - queue is empty or not found") + return web.Response(status=404, text="Play queue not found") + + player_id = self._ma_player_id + if not player_id: + LOGGER.error("No player assigned to this server") + return web.Response(status=500, text="No player assigned") + + await self.provider.mass.player_queues.set_shuffle(player_id, False) + ma_queue = self.provider.mass.player_queues.get(player_id) + if not ma_queue: + LOGGER.error(f"MA queue not found for player {player_id}") + return web.Response(status=500, text="MA queue not found") + + current_index = ma_queue.current_index + ma_queue_items = self.provider.mass.player_queues.items(player_id) + ma_queue_count = len(ma_queue_items) if ma_queue_items else 0 + + LOGGER.debug( + f"Queue refresh: Current index={current_index}, " + f"MA has {ma_queue_count} items, Plex has {len(playqueue.items)} items" + ) + + if current_index is None: + LOGGER.debug("No track currently playing, replacing entire queue") + await self._replace_entire_queue(player_id, playqueue) + else: + LOGGER.debug( + f"Track at index {current_index} is playing, " + f"replacing only items after current track" + ) + await self._replace_remaining_queue(player_id, playqueue, current_index) + + LOGGER.info( + f"Refreshed play queue {play_queue_id} - now has {len(playqueue.items)} items" + ) + + synced_keys = self._collect_synced_keys(player_id) + self._last_synced_ma_queue_length = len(synced_keys) + self._last_synced_ma_queue_keys = synced_keys + + return web.Response(status=200) + + except Exception as e: + LOGGER.exception(f"Error handling refreshPlayQueue: {e}") + return web.Response(status=500, text=str(e)) + finally: + self._updating_from_plex = False diff --git a/music_assistant/providers/plex_connect/queue_sync.py b/music_assistant/providers/plex_connect/queue_sync.py new file mode 100644 index 0000000000..265bf00323 --- /dev/null +++ b/music_assistant/providers/plex_connect/queue_sync.py @@ -0,0 +1,394 @@ +"""Queue synchronisation logic for Plex remote control. + +Handles background queue loading, MA→Plex sync, and MA event handlers. +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import TYPE_CHECKING, Any + +from music_assistant_models.enums import QueueOption +from plexapi.playqueue import PlayQueue + +if TYPE_CHECKING: + from music_assistant_models.event import MassEvent + + from music_assistant.providers.plex import PlexProvider + +LOGGER = logging.getLogger(__name__) + + +class QueueSyncMixin: + """Mixin providing queue synchronisation between MA and Plex.""" + + if TYPE_CHECKING: + provider: PlexProvider + plex_server: Any + _ma_player_id: str | None + _updating_from_plex: bool + play_queue_id: str | None + play_queue_version: int + play_queue_item_ids: dict[int, int] + _last_synced_ma_queue_length: int + _last_synced_ma_queue_keys: list[str] + + async def _broadcast_timeline(self) -> None: ... + + async def _send_timeline_to_server(self) -> None: ... + + def _collect_synced_keys(self, player_id: str) -> list[str]: + """Return Plex item_id keys for every track currently in the MA queue. + + :param player_id: The Music Assistant player ID. + :return: Ordered list of Plex item IDs matching the current MA queue. + """ + synced_keys = [] + for item in self.provider.mass.player_queues.items(player_id): + if item.media_item: + for mapping in item.media_item.provider_mappings: + if mapping.provider_instance == self.provider.instance_id: + synced_keys.append(mapping.item_id) + break + return synced_keys + + def _reorder_tracks_for_playback( + self, tracks: list[Any], start_index: int + ) -> tuple[list[Any], dict[int, int]]: + """Reorder tracks to start from a specific index and update item ID mappings. + + :param tracks: List of tracks to reorder. + :param start_index: Index of the track to start from. + :return: Tuple of (reordered tracks, updated item ID mappings). + """ + if start_index <= 0 or start_index >= len(tracks): + return tracks, self.play_queue_item_ids + + reordered_tracks = tracks[start_index:] + tracks[:start_index] + + new_item_ids = {} + for new_idx, old_idx in enumerate( + list(range(start_index, len(tracks))) + list(range(start_index)) + ): + if old_idx in self.play_queue_item_ids: + new_item_ids[new_idx] = self.play_queue_item_ids[old_idx] + + LOGGER.info(f"Started playback from offset {start_index} (reordered queue)") + return reordered_tracks, new_item_ids + + async def _load_remaining_queue_tracks( + self, + player_id: str, + playqueue: PlayQueue, + selected_offset: int, + shuffle: bool, + ) -> None: + """Load remaining tracks from play queue in the background. + + :param player_id: The Music Assistant player ID. + :param playqueue: The Plex play queue. + :param selected_offset: The offset of the track that's already playing. + :param shuffle: Whether shuffle is enabled. + """ + try: + remaining_items = [] + + for i in range(selected_offset + 1, len(playqueue.items)): + remaining_items.append((i, playqueue.items[i])) + + for i in range(selected_offset): + remaining_items.append((i, playqueue.items[i])) + + if not remaining_items: + LOGGER.debug("No remaining tracks to load") + return + + async def fetch_track( + plex_idx: int, item: Any + ) -> tuple[int, object | None, int | None]: + """Fetch a single track from Plex.""" + track_key = item.key if hasattr(item, "key") else None + play_queue_item_id = ( + item.playQueueItemID if hasattr(item, "playQueueItemID") else None + ) + if track_key: + try: + track = await self.provider.get_track(track_key) + return plex_idx, track, play_queue_item_id + except Exception as e: + LOGGER.debug(f"Could not fetch track {track_key}: {e}") + return plex_idx, None, None + + fetch_tasks = [fetch_track(idx, item) for idx, item in remaining_items] + results = await asyncio.gather(*fetch_tasks, return_exceptions=True) + + tracks_to_add: list[object] = [] + for result in results: + if isinstance(result, Exception): + LOGGER.debug(f"Error fetching track: {result}") + continue + _plex_idx, track, play_queue_item_id = result # type: ignore[misc] + if track: + ma_idx = len(tracks_to_add) + 1 # +1 because first track is already queued + tracks_to_add.append(track) + if play_queue_item_id: + self.play_queue_item_ids[ma_idx] = play_queue_item_id + + if tracks_to_add: + LOGGER.info(f"Adding {len(tracks_to_add)} remaining tracks to queue") + + # Guard against the ADD firing QUEUE_ITEMS_UPDATED before we update + # _last_synced_ma_queue_keys, which would trigger a spurious MA→Plex sync. + self._updating_from_plex = True + try: + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=tracks_to_add, # type: ignore[arg-type] + option=QueueOption.ADD, + ) + + synced_keys = self._collect_synced_keys(player_id) + self._last_synced_ma_queue_length = len(synced_keys) + self._last_synced_ma_queue_keys = synced_keys + finally: + self._updating_from_plex = False + + if shuffle: + await self.provider.mass.player_queues.set_shuffle(player_id, shuffle) + + LOGGER.info( + f"Successfully loaded {len(tracks_to_add)} remaining tracks " + f"(total queue: {self._last_synced_ma_queue_length} tracks)" + ) + else: + LOGGER.warning("No valid remaining tracks found in play queue") + + except Exception as e: + LOGGER.exception(f"Error loading remaining queue tracks: {e}") + + async def _replace_entire_queue(self, player_id: str, playqueue: PlayQueue) -> None: + """Replace the entire MA queue from a Plex play queue. + + :param player_id: The Music Assistant player ID. + :param playqueue: The Plex play queue to load. + """ + all_tracks = [] + self.play_queue_item_ids = {} + + for i, item in enumerate(playqueue.items): + track_key = item.key if hasattr(item, "key") else None + play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None + + if track_key: + try: + track = await self.provider.get_track(track_key) + all_tracks.append(track) + if play_queue_item_id: + self.play_queue_item_ids[len(all_tracks) - 1] = play_queue_item_id + except Exception as e: + LOGGER.debug(f"Could not fetch track {track_key}: {e}") + continue + + if all_tracks: + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=all_tracks, # type: ignore[arg-type] + option=QueueOption.REPLACE, + ) + LOGGER.info(f"Replaced queue with {len(all_tracks)} tracks") + + async def _replace_remaining_queue( + self, player_id: str, playqueue: PlayQueue, current_index: int + ) -> None: + """Replace only items after the current track. + + :param player_id: The Music Assistant player ID. + :param playqueue: The Plex play queue to load. + :param current_index: The current track index in the MA queue. + """ + remaining_tracks = [] + new_item_mappings = {} + + for i in range(current_index + 1, len(playqueue.items)): + item = playqueue.items[i] + track_key = item.key if hasattr(item, "key") else None + play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None + + if track_key: + try: + track = await self.provider.get_track(track_key) + remaining_tracks.append(track) + if play_queue_item_id: + new_item_mappings[current_index + 1 + len(remaining_tracks) - 1] = ( + play_queue_item_id + ) + except Exception as e: + LOGGER.debug(f"Could not fetch track {track_key}: {e}") + continue + + if remaining_tracks: + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=remaining_tracks, # type: ignore[arg-type] + option=QueueOption.REPLACE_NEXT, + ) + self.play_queue_item_ids.update(new_item_mappings) + LOGGER.info( + f"Replaced {len(remaining_tracks)} tracks after current track " + f"(index {current_index})" + ) + else: + LOGGER.debug("No tracks after current track in Plex queue") + + for i, item in enumerate(playqueue.items): + play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None + if play_queue_item_id: + self.play_queue_item_ids[i] = play_queue_item_id + + async def _create_plex_playqueue_from_ma(self) -> None: + """Create a new Plex PlayQueue mirroring the current MA queue.""" + ma_queue = self.provider.mass.player_queues.get(self._ma_player_id) # type: ignore[arg-type] + queue_items = self.provider.mass.player_queues.items(self._ma_player_id) # type: ignore[arg-type] + + if not ma_queue or not queue_items: + return + + async def fetch_plex_item(plex_key: str) -> object | None: + """Fetch a single Plex item.""" + try: + plex_server = self.plex_server + + def fetch_item() -> object: + return plex_server.fetchItem(plex_key) + + return await asyncio.to_thread(fetch_item) + except Exception as e: + LOGGER.debug(f"Failed to fetch Plex item {plex_key}: {e}") + return None + + fetch_tasks = [] + for item in queue_items: + if not item.media_item: + continue + plex_key = None + for mapping in item.media_item.provider_mappings: + if mapping.provider_instance == self.provider.instance_id: + plex_key = mapping.item_id + break + if plex_key: + fetch_tasks.append(fetch_plex_item(plex_key)) + + plex_items = [] + if fetch_tasks: + fetched_items = await asyncio.gather(*fetch_tasks, return_exceptions=True) + plex_items = [item for item in fetched_items if item is not None] + + if not plex_items: + LOGGER.debug("No Plex tracks in MA queue, skipping PlayQueue creation") + return + + start_item = None + if ma_queue.current_index is not None and ma_queue.current_index < len(plex_items): + start_item = plex_items[ma_queue.current_index] + + plex_server = self.plex_server + + def create_queue() -> PlayQueue: + return PlayQueue.create( + plex_server, + items=plex_items, + startItem=start_item, + shuffle=0, + continuous=1, + ) + + try: + playqueue = await asyncio.to_thread(create_queue) + + if playqueue: + self.play_queue_id = str(playqueue.playQueueID) + self.play_queue_version = playqueue.playQueueVersion + + self.play_queue_item_ids = {} + for i, item in enumerate(playqueue.items): + if hasattr(item, "playQueueItemID"): + self.play_queue_item_ids[i] = item.playQueueItemID + + LOGGER.info( + f"Created Plex PlayQueue {self.play_queue_id} with {len(plex_items)} tracks" + ) + except Exception as e: + LOGGER.exception(f"Error creating Plex PlayQueue: {e}") + + async def _handle_player_event(self, event: MassEvent) -> None: + """Handle player state change events.""" + if not self._ma_player_id or event.object_id != self._ma_player_id: + return + + if self._updating_from_plex: + return + + try: + await self._send_timeline_to_server() + await self._broadcast_timeline() + except Exception as e: + LOGGER.debug(f"Error handling player event: {e}") + + async def _handle_queue_event(self, event: MassEvent) -> None: + """Handle queue change events.""" + if not self._ma_player_id or event.object_id != self._ma_player_id: + return + + if self._updating_from_plex: + return + + try: + await self._send_timeline_to_server() + await self._broadcast_timeline() + except Exception as e: + LOGGER.debug(f"Error handling queue event: {e}") + + async def _handle_queue_items_updated(self, event: MassEvent) -> None: + """Handle queue items being added/removed/reordered.""" + if not self._ma_player_id or event.object_id != self._ma_player_id: + return + + if self._updating_from_plex: + return + + queue_items = self.provider.mass.player_queues.items(self._ma_player_id) + if not queue_items: + return + + current_keys = [] + for item in queue_items: + if not item.media_item: + continue + for mapping in item.media_item.provider_mappings: + if mapping.provider_instance == self.provider.instance_id: + current_keys.append(mapping.item_id) + break + + if ( + len(current_keys) == self._last_synced_ma_queue_length + and current_keys == self._last_synced_ma_queue_keys + ): + LOGGER.debug("MA queue matches last synced state, skipping Plex sync") + return + + LOGGER.info( + f"MA queue changed: {self._last_synced_ma_queue_length} -> {len(current_keys)} items" + ) + + try: + await self._create_plex_playqueue_from_ma() + self._last_synced_ma_queue_length = len(current_keys) + self._last_synced_ma_queue_keys = current_keys + except Exception as e: + LOGGER.debug(f"Error creating Plex PlayQueue: {e}") + + try: + await self._broadcast_timeline() + except Exception as e: + LOGGER.debug(f"Error broadcasting timeline: {e}") diff --git a/music_assistant/providers/plex_connect/server.py b/music_assistant/providers/plex_connect/server.py new file mode 100644 index 0000000000..ed927210fc --- /dev/null +++ b/music_assistant/providers/plex_connect/server.py @@ -0,0 +1,371 @@ +"""Per-player Plex remote control instances.""" + +from __future__ import annotations + +import logging +import platform +import time +import uuid +from collections.abc import Callable +from typing import TYPE_CHECKING + +from aiohttp import web +from music_assistant_models.enums import EventType + +from .gdm import PlexGDMAdvertiser +from .playback import PlaybackMixin +from .queue_commands import QueueCommandsMixin +from .queue_sync import QueueSyncMixin +from .timeline import TimelineMixin + +if TYPE_CHECKING: + from music_assistant.providers.plex import PlexProvider + +LOGGER = logging.getLogger(__name__) + + +class PlayerRemoteInstance: + """Single remote control instance for one MA player.""" + + def __init__( + self, + plex_provider: PlexProvider, + ma_player_id: str, + player_name: str, + port: int, + device_class: str = "speaker", + remote_control: bool = False, + ) -> None: + """Initialize player remote instance. + + :param plex_provider: Plex provider instance. + :param ma_player_id: Music Assistant player ID. + :param player_name: Display name for the player. + :param port: Port for the remote control server. + :param device_class: Device class (speaker, phone, tablet, stb, tv, pc, cloud). + :param remote_control: Whether to enable remote control. + """ + self.plex_provider = plex_provider + self.plex_server = plex_provider._plex_server + self.ma_player_id = ma_player_id + self.player_name = player_name + self.port = port + self.device_class = device_class + self.remote_control = remote_control + + self.client_id = str( + uuid.uuid5( + uuid.NAMESPACE_DNS, + f"music-assistant-plex-{plex_provider.instance_id}-{ma_player_id}", + ) + ) + + if self.remote_control: + self.server: PlexRemoteControlServer | None = None + self.gdm: PlexGDMAdvertiser | None = None + + async def start(self) -> None: + """Start this player's remote control.""" + if self.remote_control: + LOGGER.info( + f"Created PlexServer for '{self.player_name}' with client ID: {self.client_id}" + ) + + self.server = PlexRemoteControlServer( + plex_provider=self.plex_provider, + port=self.port, + client_id=self.client_id, + ma_player_id=self.ma_player_id, + device_class=self.device_class, + ) + LOGGER.info( + f"Remote control server for '{self.player_name}' bound to MA player: " + f"{self.ma_player_id}" + ) + + await self.server.start() + + self.gdm = PlexGDMAdvertiser( + instance_id=self.client_id, + port=self.port, + publish_ip=str(self.plex_provider.mass.streams.publish_ip), + name=self.player_name, + product="Music Assistant", + version=self.plex_provider.mass.version + if self.plex_provider.mass.version != "0.0.0" + else "1.0.0", + ) + self.gdm.start() + + LOGGER.info(f"Player '{self.player_name}' is now discoverable on port {self.port}") + + async def stop(self) -> None: + """Stop this player's remote control.""" + if self.remote_control: + if self.gdm: + await self.gdm.stop() + + if self.server: + await self.server.stop() + + LOGGER.info(f"Stopped remote control for player '{self.player_name}'") + + +class PlexRemoteControlServer(QueueCommandsMixin, PlaybackMixin, QueueSyncMixin, TimelineMixin): + """HTTP server implementing the Plex remote control protocol for one MA player.""" + + def __init__( + self, + plex_provider: PlexProvider, + port: int = 32500, + client_id: str | None = None, + ma_player_id: str | None = None, + device_class: str = "speaker", + ) -> None: + """Initialize remote control server. + + :param plex_provider: Plex provider instance. + :param port: Port for the HTTP server. + :param client_id: Unique client identifier. + :param ma_player_id: Music Assistant player ID. + :param device_class: Device class (speaker, phone, tablet, stb, tv, pc, cloud). + """ + self.provider = plex_provider + self.plex_server = plex_provider._plex_server + self.port = port + self.client_id = client_id or plex_provider.instance_id + self.device_class = device_class + self.app = web.Application() + self.subscriptions: dict[str, dict[str, object]] = {} + self.runner: web.AppRunner | None = None + self.http_site: web.TCPSite | None = None + + # Play queue tracking (Plex-specific state that doesn't exist in MA) + self.play_queue_id: str | None = None + self.play_queue_version: int = 1 + self.play_queue_item_ids: dict[int, int] = {} + + # Track MA queue state to detect when we need to sync to Plex + self._last_synced_ma_queue_length: int = 0 + self._last_synced_ma_queue_keys: list[str] = [] + + self._ma_player_id = ma_player_id + + self._unsub_callbacks: list[Callable[..., None]] = [] + + # Flag to prevent circular updates when we modify the queue ourselves + self._updating_from_plex = False + + self.player = self.provider.mass.players.get_player(self._ma_player_id) # type: ignore[arg-type] + + self.device_name = f"{self.player.display_name}" if self.player else "Music Assistant" + + self.headers = { + "X-Plex-Device-Name": self.device_name, + "X-Plex-Session-Identifier": self.client_id, + "X-Plex-Client-Identifier": self.client_id, + "X-Plex-Product": "Music Assistant", + "X-Plex-Platform": "Music Assistant", + "X-Plex-Platform-Version": platform.release(), + } + + self._setup_routes() + + def _setup_routes(self) -> None: + """Set up all HTTP endpoints.""" + self.app.router.add_get("/", self.handle_root) + + self.app.router.add_get("/player/timeline/subscribe", self.handle_subscribe) + self.app.router.add_get("/player/timeline/unsubscribe", self.handle_unsubscribe) + self.app.router.add_get("/player/timeline/poll", self.handle_poll) + + self.app.router.add_get("/player/playback/playMedia", self.handle_play_media) + self.app.router.add_get("/player/playback/refreshPlayQueue", self.handle_refresh_play_queue) + self.app.router.add_get("/player/playback/createPlayQueue", self.handle_create_play_queue) + self.app.router.add_get("/player/playback/pause", self.handle_pause) + self.app.router.add_get("/player/playback/play", self.handle_play) + self.app.router.add_get("/player/playback/stop", self.handle_stop) + self.app.router.add_get("/player/playback/skipNext", self.handle_skip_next) + self.app.router.add_get("/player/playback/skipPrevious", self.handle_skip_previous) + self.app.router.add_get("/player/playback/stepForward", self.handle_step_forward) + self.app.router.add_get("/player/playback/stepBack", self.handle_step_back) + self.app.router.add_get("/player/playback/seekTo", self.handle_seek_to) + self.app.router.add_get("/player/playback/setParameters", self.handle_set_parameters) + self.app.router.add_get("/player/playback/skipTo", self.handle_skip_to) + + self.app.router.add_get("/resources", self.handle_resources) + + self.app.router.add_route("OPTIONS", "/{tail:.*}", self.handle_options) + + async def start(self) -> None: + """Start HTTP server and subscribe to MA events.""" + self.runner = web.AppRunner(self.app) + await self.runner.setup() + + self.http_site = web.TCPSite(self.runner, "0.0.0.0", self.port) + await self.http_site.start() + LOGGER.info(f"Plex remote control server started on HTTP port {self.port}") + + if self._ma_player_id: + self._unsub_callbacks.append( + self.provider.mass.subscribe( + self._handle_player_event, + EventType.PLAYER_UPDATED, + id_filter=self._ma_player_id, + ) + ) + self._unsub_callbacks.append( + self.provider.mass.subscribe( + self._handle_queue_event, + EventType.QUEUE_UPDATED, + id_filter=self._ma_player_id, + ) + ) + self._unsub_callbacks.append( + self.provider.mass.subscribe( + self._handle_queue_event, + EventType.QUEUE_TIME_UPDATED, + id_filter=self._ma_player_id, + ) + ) + self._unsub_callbacks.append( + self.provider.mass.subscribe( + self._handle_queue_items_updated, + EventType.QUEUE_ITEMS_UPDATED, + id_filter=self._ma_player_id, + ) + ) + + async def stop(self) -> None: + """Stop the HTTP server and unsubscribe from events.""" + for unsub in self._unsub_callbacks: + unsub() + self._unsub_callbacks.clear() + + if self.http_site: + await self.http_site.stop() + if self.runner: + await self.runner.cleanup() + LOGGER.info("Plex remote control server stopped") + + async def handle_root(self, request: web.Request) -> web.Response: + """Handle root endpoint - return basic player info.""" + player_name = "Music Assistant" + if self._ma_player_id: + player = self.provider.mass.players.get_player(self._ma_player_id) + if player: + player_name = player.display_name + + xml = f""" + + +""" + return web.Response( + text=xml, content_type="text/xml", headers={"Access-Control-Allow-Origin": "*"} + ) + + async def handle_subscribe(self, request: web.Request) -> web.Response: + """Handle timeline subscription from controller.""" + client_id = request.headers.get("X-Plex-Client-Identifier") + protocol = request.query.get("protocol", "http") + port = request.query.get("port") + command_id = int(request.query.get("commandID", 0)) + + if not client_id or not port: + return web.Response(status=400) + + self.subscriptions[client_id] = { + "url": f"{protocol}://{request.remote}:{port}", + "command_id": command_id, + "last_update": time.time(), + } + + LOGGER.info(f"Controller {client_id} subscribed for timeline updates") + await self._send_timeline(client_id) + return web.Response(status=200) + + async def handle_unsubscribe(self, request: web.Request) -> web.Response: + """Handle unsubscribe request.""" + client_id = request.headers.get("X-Plex-Client-Identifier") + if client_id in self.subscriptions: + del self.subscriptions[client_id] + LOGGER.info(f"Controller {client_id} unsubscribed") + return web.Response(status=200) + + async def handle_poll(self, request: web.Request) -> web.Response: + """Handle timeline poll request.""" + include_metadata = request.query.get("includeMetadata", "0") == "1" + command_id = request.query.get("commandID", "0") + + client_id = request.headers.get("X-Plex-Client-Identifier") + if client_id and client_id in self.subscriptions: + self.subscriptions[client_id]["last_update"] = time.time() + + timeline_xml = await self._build_timeline_xml( + include_metadata=include_metadata, command_id=command_id + ) + return web.Response( + text=timeline_xml, + content_type="text/xml", + headers={ + "X-Plex-Client-Identifier": self.client_id, + "Access-Control-Expose-Headers": "X-Plex-Client-Identifier", + "Access-Control-Allow-Origin": "*", + }, + ) + + async def handle_options(self, request: web.Request) -> web.Response: + """Handle OPTIONS requests for CORS.""" + return web.Response( + status=200, + headers={ + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "GET, POST, OPTIONS", + "Access-Control-Allow-Headers": "*", + }, + ) + + async def handle_resources(self, request: web.Request) -> web.Response: + """Return player capabilities and connection information.""" + player_name = "Music Assistant" + if self._ma_player_id: + player = self.provider.mass.players.get_player(self._ma_player_id) + if player: + player_name = player.display_name + + state = "stopped" + if self._ma_player_id: + player = self.provider.mass.players.get_player(self._ma_player_id) + if player and player.state: + state_value = ( + player.state.value if hasattr(player.state, "value") else str(player.state) + ) + if state_value in ["playing", "paused"]: + state = state_value + + local_ip = self.provider.mass.streams.publish_ip + version = self.provider.mass.version if self.provider.mass.version != "0.0.0" else "1.0.0" + + xml = f""" + + + + +""" + return web.Response( + text=xml, content_type="text/xml", headers={"Access-Control-Allow-Origin": "*"} + ) diff --git a/music_assistant/providers/plex_connect/timeline.py b/music_assistant/providers/plex_connect/timeline.py new file mode 100644 index 0000000000..6cbee09e2e --- /dev/null +++ b/music_assistant/providers/plex_connect/timeline.py @@ -0,0 +1,331 @@ +"""Timeline building and broadcasting for Plex remote control.""" + +from __future__ import annotations + +import asyncio +import logging +import time +from typing import TYPE_CHECKING, Any +from urllib.parse import urlparse + +from aiohttp import ClientTimeout +from music_assistant_models.enums import PlayerType + +if TYPE_CHECKING: + from music_assistant.providers.plex import PlexProvider + +LOGGER = logging.getLogger(__name__) + + +class TimelineMixin: + """Mixin providing timeline building and broadcasting.""" + + if TYPE_CHECKING: + provider: PlexProvider + client_id: str + subscriptions: dict[str, dict[str, object]] + play_queue_id: str | None + play_queue_version: int + play_queue_item_ids: dict[int, int] + _ma_player_id: str | None + headers: dict[str, str] + plex_server: Any + + def _build_timeline_attributes( + self, + track: Any, + state: str, + duration: int, + time_ms: int, + volume: int, + shuffle: int, + repeat: int, + controllable: str, + queue: Any | None, + ) -> list[str]: + """Build timeline attributes for a playing track. + + :param track: The current track media item. + :param state: Playback state (playing, paused, etc.). + :param duration: Track duration in milliseconds. + :param time_ms: Current playback time in milliseconds. + :param volume: Volume level (0-100). + :param shuffle: Shuffle state (0 or 1). + :param repeat: Repeat mode (0=off, 1=one, 2=all). + :param controllable: Controllable features string. + :param queue: The MA queue object. + :return: List of timeline attribute strings. + """ + key = None + rating_key = None + for mapping in track.provider_mappings: + if mapping.provider_instance == self.provider.instance_id: + key = mapping.item_id + rating_key = key.split("/")[-1] + break + + if not key: + return [] + + plex_url = urlparse(self.provider._baseurl) + machine_identifier = self.provider._plex_server.machineIdentifier + address = plex_url.hostname + port = plex_url.port or (443 if plex_url.scheme == "https" else 32400) + protocol = plex_url.scheme + + attrs = [ + f'state="{state}"', + f'duration="{duration}"', + f'time="{time_ms}"', + f'ratingKey="{rating_key}"', + f'key="{key}"', + ] + + if self.play_queue_id and queue: + if queue.current_index is not None: + play_queue_item_id = self.play_queue_item_ids.get( + queue.current_index, queue.current_index + 1 + ) + attrs.append(f'playQueueItemID="{play_queue_item_id}"') + attrs.append(f'playQueueID="{self.play_queue_id}"') + attrs.append(f'playQueueVersion="{self.play_queue_version}"') + attrs.append(f'containerKey="/playQueues/{self.play_queue_id}"') + + attrs.extend( + [ + 'type="music"', + f'volume="{volume}"', + f'shuffle="{shuffle}"', + f'repeat="{repeat}"', + f'controllable="{controllable}"', + f'machineIdentifier="{machine_identifier}"', + f'address="{address}"', + f'port="{port}"', + f'protocol="{protocol}"', + ] + ) + + return attrs + + async def _build_timeline_xml( + self, include_metadata: bool = False, command_id: str = "0" + ) -> str: + """Build timeline XML from current Music Assistant player state. + + :param include_metadata: Whether to include metadata in the timeline. + :param command_id: The command ID for the timeline response. + """ + player_id = self._ma_player_id + + player = self.provider.mass.players.get_player(player_id) if player_id else None + queue = self.provider.mass.player_queues.get(player_id) if player_id else None + + controllable = ( + "volume,repeat,skipPrevious,seekTo,stepBack,stepForward,stop,playPause,shuffle,skipNext" + ) + + state = "stopped" + if player and player.playback_state: + state_value = ( + player.playback_state.value + if hasattr(player.playback_state, "value") + else str(player.playback_state) + ) + + if state_value == "playing": + state = "playing" + elif state_value == "paused": + state = "paused" + elif state_value == "buffering": + state = "buffering" + elif state_value == "idle": + state = ( + "paused" + if queue and queue.current_item and queue.current_item.media_item + else "stopped" + ) + else: + state = "stopped" + + volume = 0 + if player: + volume = ( + int(player.group_volume or 0) + if (player.type == PlayerType.GROUP or player.group_members) + else (int(player.volume_level or 0)) + ) + + shuffle = 0 + repeat = 0 + if queue: + shuffle = 1 if queue.shuffle_enabled else 0 + if hasattr(queue, "repeat_mode"): + repeat_mode = queue.repeat_mode + if hasattr(repeat_mode, "value"): + repeat_value = repeat_mode.value + if repeat_value == "one": + repeat = 1 + elif repeat_value == "all": + repeat = 2 + + if ( + state in ["playing", "paused"] + and queue + and queue.current_item + and queue.current_item.media_item + ): + track = queue.current_item.media_item + duration = round(track.duration * 1000) if track.duration else 0 + time_ms = round(queue.corrected_elapsed_time * 1000) + + attrs = self._build_timeline_attributes( + track, state, duration, time_ms, volume, shuffle, repeat, controllable, queue + ) + + if attrs: + music_timeline = f"" + else: + music_timeline = ( + f'' + ) + else: + time_ms = 0 + music_timeline = ( + f'' + ) + + video_timeline = '' + photo_timeline = '' + + return ( + f'' + f"{music_timeline}{video_timeline}{photo_timeline}" + f"" + ) + + async def _send_timeline(self, client_id: str) -> None: + """Send timeline update to a specific subscribed controller. + + :param client_id: The client ID to send the timeline to. + """ + subscription = self.subscriptions.get(client_id) + if not subscription: + return + + timeline_xml = await self._build_timeline_xml() + + try: + await self.provider.mass.http_session.post( + f"{subscription['url']}/:/timeline", + data=timeline_xml, + headers={ + "X-Plex-Client-Identifier": self.client_id, + "Content-Type": "text/xml", + }, + timeout=ClientTimeout(total=5), + ) + subscription["last_update"] = time.time() + except Exception as e: + LOGGER.debug(f"Failed to send timeline to {client_id}: {e}") + + async def _send_timeline_to_server(self) -> None: + """Send timeline update to Plex server for activity tracking.""" + if not self._ma_player_id: + return + + try: + player = self.provider.mass.players.get_player(self._ma_player_id) + queue = self.provider.mass.player_queues.get(self._ma_player_id) + + if ( + not player + or not queue + or not queue.current_item + or not queue.current_item.media_item + ): + return + + track = queue.current_item.media_item + + plex_key = None + for mapping in track.provider_mappings: + if mapping.provider_instance == self.provider.instance_id: + plex_key = mapping.item_id + break + + if not plex_key: + return + + rating_key = plex_key.split("/")[-1] + + state_value = ( + player.playback_state.value + if hasattr(player.playback_state, "value") + else str(player.playback_state) + ) + + if state_value == "playing": + plex_state = "playing" + elif state_value == "paused": + plex_state = "paused" + else: + plex_state = "stopped" + + position_ms = round(queue.corrected_elapsed_time * 1000) + duration_ms = round(track.duration * 1000) if track.duration else 0 + + container_key = "" + play_queue_item_id = "" + if self.play_queue_id: + container_key = f"/playQueues/{self.play_queue_id}" + if queue.current_index is not None: + play_queue_item_id = str( + self.play_queue_item_ids.get(queue.current_index, queue.current_index + 1) + ) + + params: dict[str, str] = { + "ratingKey": rating_key, + "key": plex_key, + "state": plex_state, + "time": str(position_ms), + "duration": str(duration_ms), + } + + if container_key: + params["containerKey"] = container_key + if play_queue_item_id: + params["playQueueItemID"] = play_queue_item_id + + plex_server = self.plex_server + headers = self.headers + + def send_timeline() -> None: + plex_server.query("/:/timeline", params=params, headers=headers) + + await asyncio.to_thread(send_timeline) + + except Exception as e: + LOGGER.debug(f"Failed to send timeline to Plex server: {e}") + + async def _broadcast_timeline(self) -> None: + """Send timeline to all subscribed controllers.""" + current_time = time.time() + stale_clients = [] + for client_id, sub in self.subscriptions.items(): + try: + last_update = float(sub["last_update"]) # type: ignore[arg-type] + if current_time - last_update > 90: + stale_clients.append(client_id) + except (ValueError, TypeError): + LOGGER.debug(f"Invalid last_update for client {client_id}, treating as stale") + stale_clients.append(client_id) + + for client_id in stale_clients: + del self.subscriptions[client_id] + + await asyncio.gather( + *(self._send_timeline(client_id) for client_id in list(self.subscriptions.keys())), + return_exceptions=True, + ) From 1d21046a65b73ecf5b46b7cb14e4289ab893e8be Mon Sep 17 00:00:00 2001 From: Anatosun Date: Mon, 30 Mar 2026 16:29:56 +0200 Subject: [PATCH 3/8] Fix play queue capped at 50 items by increasing fetch window --- music_assistant/providers/plex_connect/queue_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/music_assistant/providers/plex_connect/queue_commands.py b/music_assistant/providers/plex_connect/queue_commands.py index a1abfdac3c..4d7e13dead 100644 --- a/music_assistant/providers/plex_connect/queue_commands.py +++ b/music_assistant/providers/plex_connect/queue_commands.py @@ -107,7 +107,7 @@ async def _play_from_plex_queue( queue_id = queue_id_match.group(1) def fetch_queue() -> PlayQueue: - return PlayQueue.get(self.provider._plex_server, playQueueID=queue_id) + return PlayQueue.get(self.provider._plex_server, playQueueID=queue_id, window=10000) playqueue = await asyncio.to_thread(fetch_queue) From 5fcd5f084b0d8de54265d2d350d096455d645325 Mon Sep 17 00:00:00 2001 From: Anatosun Date: Mon, 30 Mar 2026 16:31:28 +0200 Subject: [PATCH 4/8] Use window=500 for PlayQueue.get instead of 10000 --- music_assistant/providers/plex_connect/queue_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/music_assistant/providers/plex_connect/queue_commands.py b/music_assistant/providers/plex_connect/queue_commands.py index 4d7e13dead..d8a9539a65 100644 --- a/music_assistant/providers/plex_connect/queue_commands.py +++ b/music_assistant/providers/plex_connect/queue_commands.py @@ -107,7 +107,7 @@ async def _play_from_plex_queue( queue_id = queue_id_match.group(1) def fetch_queue() -> PlayQueue: - return PlayQueue.get(self.provider._plex_server, playQueueID=queue_id, window=10000) + return PlayQueue.get(self.provider._plex_server, playQueueID=queue_id, window=500) playqueue = await asyncio.to_thread(fetch_queue) From d22169151a851a7a656b541ba51533c374106179 Mon Sep 17 00:00:00 2001 From: Anatosun Date: Mon, 30 Mar 2026 16:37:28 +0200 Subject: [PATCH 5/8] Fix play queue truncated at server window limit Replace direct PlayQueue.get() calls with a _fetch_full_play_queue helper that paginates past the Plex server's per-request cap (~200 items). The helper claims ownership of the queue (own=True) on the first fetch, then uses center + includeBefore=False to walk forward page by page until playQueueTotalCount items have been collected. --- .../providers/plex_connect/queue_commands.py | 65 ++++++++++++++++--- 1 file changed, 57 insertions(+), 8 deletions(-) diff --git a/music_assistant/providers/plex_connect/queue_commands.py b/music_assistant/providers/plex_connect/queue_commands.py index d8a9539a65..723a08bc3e 100644 --- a/music_assistant/providers/plex_connect/queue_commands.py +++ b/music_assistant/providers/plex_connect/queue_commands.py @@ -55,6 +55,61 @@ async def _replace_remaining_queue( def _collect_synced_keys(self, player_id: str) -> list[str]: ... + async def _fetch_full_play_queue(self, queue_id: str) -> PlayQueue | None: + """Fetch a complete PlayQueue, paginating past the server's per-request window cap. + + The Plex server caps each response to ~200 items regardless of the requested + window size. We use playQueueTotalCount to detect truncation and keep fetching + forward pages until we have every item. + + :param queue_id: The Plex PlayQueue ID to fetch. + :return: A PlayQueue whose items list contains all tracks, or None on failure. + """ + page_size = 200 + plex_server = self.provider._plex_server + + def fetch_initial() -> PlayQueue: + # own=True transfers ownership of this queue to MA so we can control it fully. + return PlayQueue.get(plex_server, playQueueID=queue_id, own=True, window=page_size) + + playqueue = await asyncio.to_thread(fetch_initial) + + if not playqueue or not playqueue.items: + return playqueue + + all_items = list(playqueue.items) + seen_ids = {item.playQueueItemID for item in all_items} + + while len(all_items) < playqueue.playQueueTotalCount: + last_id = all_items[-1].playQueueItemID + + def fetch_next(_last_id: int = last_id) -> PlayQueue: + # own=False — we already claimed ownership on the first fetch. + # includeBefore=False — only items strictly after center are returned. + return PlayQueue.get( + plex_server, + playQueueID=queue_id, + own=False, + center=_last_id, + window=page_size, + includeBefore=False, + ) + + next_page = await asyncio.to_thread(fetch_next) + if not next_page or not next_page.items: + break + + new_items = [i for i in next_page.items if i.playQueueItemID not in seen_ids] + if not new_items: + break + + all_items.extend(new_items) + seen_ids.update(i.playQueueItemID for i in new_items) + + # Patch the cached items property on the PlayQueue object. + playqueue.__dict__["items"] = all_items + return playqueue + async def _resolve_plex_item(self, key: str) -> Any: """Resolve a Plex key to a Music Assistant media item. @@ -106,10 +161,7 @@ async def _play_from_plex_queue( queue_id = queue_id_match.group(1) - def fetch_queue() -> PlayQueue: - return PlayQueue.get(self.provider._plex_server, playQueueID=queue_id, window=500) - - playqueue = await asyncio.to_thread(fetch_queue) + playqueue = await self._fetch_full_play_queue(queue_id) if playqueue and playqueue.items: selected_offset = getattr(playqueue, "playQueueSelectedItemOffset", 0) @@ -392,10 +444,7 @@ async def handle_refresh_play_queue(self, request: web.Request) -> web.Response: self.play_queue_version += 1 - def fetch_queue() -> PlayQueue: - return PlayQueue.get(self.provider._plex_server, playQueueID=play_queue_id) - - playqueue = await asyncio.to_thread(fetch_queue) + playqueue = await self._fetch_full_play_queue(play_queue_id) if not playqueue or not playqueue.items: LOGGER.error("Failed to refresh play queue - queue is empty or not found") From cf9d0571eaae9ceea689e87df500f6f1d86f4564 Mon Sep 17 00:00:00 2001 From: Anatosun Date: Mon, 30 Mar 2026 16:51:40 +0200 Subject: [PATCH 6/8] Sync shuffle state from Plex PlayQueue to MA Remove the unconditional set_shuffle(False) in handle_refresh_play_queue (it was a workaround for the old infinite-loop bug, now fixed) and replace it with a sync of playqueue.playQueueShuffled after the queue is loaded. _play_from_plex_queue also now uses playqueue.playQueueShuffled as the authoritative shuffle state, falling back to the request parameter. --- .../providers/plex_connect/queue_commands.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/music_assistant/providers/plex_connect/queue_commands.py b/music_assistant/providers/plex_connect/queue_commands.py index 723a08bc3e..692ece25f3 100644 --- a/music_assistant/providers/plex_connect/queue_commands.py +++ b/music_assistant/providers/plex_connect/queue_commands.py @@ -208,9 +208,12 @@ async def _play_from_plex_queue( await self._broadcast_timeline() + # Use the queue's own shuffle flag as the authoritative state, + # falling back to the request parameter if not set. + effective_shuffle = playqueue.playQueueShuffled or shuffle self.provider.mass.create_task( self._load_remaining_queue_tracks( - player_id, playqueue, selected_offset, shuffle + player_id, playqueue, selected_offset, effective_shuffle ) ) @@ -455,7 +458,6 @@ async def handle_refresh_play_queue(self, request: web.Request) -> web.Response: LOGGER.error("No player assigned to this server") return web.Response(status=500, text="No player assigned") - await self.provider.mass.player_queues.set_shuffle(player_id, False) ma_queue = self.provider.mass.player_queues.get(player_id) if not ma_queue: LOGGER.error(f"MA queue not found for player {player_id}") @@ -480,6 +482,11 @@ async def handle_refresh_play_queue(self, request: web.Request) -> web.Response: ) await self._replace_remaining_queue(player_id, playqueue, current_index) + # Sync shuffle state from Plex to MA. + await self.provider.mass.player_queues.set_shuffle( + player_id, playqueue.playQueueShuffled + ) + LOGGER.info( f"Refreshed play queue {play_queue_id} - now has {len(playqueue.items)} items" ) From 488530b3761d3af41eff95c5093d7683926f8e62 Mon Sep 17 00:00:00 2001 From: Anatosun Date: Mon, 30 Mar 2026 17:16:05 +0200 Subject: [PATCH 7/8] Improve shuffle handling when loading Plex play queues When a Plex play queue is flagged as shuffled, fetch the original source collection via playQueueSourceURI, load it into MA in its natural order, then apply MA's own shuffle and propagate the resulting order back to Plex via a new PlayQueue. This avoids double-shuffling an already-shuffled list. Also ensure MA shuffle is always explicitly synced to the Plex queue's shuffle state on playMedia, so a leftover enabled shuffle cannot silently reorder a non-shuffled queue. --- .../providers/plex_connect/queue_commands.py | 91 ++++++++++++++++++- 1 file changed, 89 insertions(+), 2 deletions(-) diff --git a/music_assistant/providers/plex_connect/queue_commands.py b/music_assistant/providers/plex_connect/queue_commands.py index 692ece25f3..0dca514a81 100644 --- a/music_assistant/providers/plex_connect/queue_commands.py +++ b/music_assistant/providers/plex_connect/queue_commands.py @@ -9,6 +9,7 @@ import logging import re from typing import TYPE_CHECKING, Any +from urllib.parse import unquote from aiohttp import web from music_assistant_models.enums import QueueOption @@ -55,6 +56,8 @@ async def _replace_remaining_queue( def _collect_synced_keys(self, player_id: str) -> list[str]: ... + async def _create_plex_playqueue_from_ma(self) -> None: ... + async def _fetch_full_play_queue(self, queue_id: str) -> PlayQueue | None: """Fetch a complete PlayQueue, paginating past the server's per-request window cap. @@ -110,6 +113,81 @@ def fetch_next(_last_id: int = last_id) -> PlayQueue: playqueue.__dict__["items"] = all_items return playqueue + def _source_key_from_play_queue_uri(self, source_uri: str) -> str | None: + """Extract a Plex library key from a playQueueSourceURI string. + + Plex encodes the source as ``library:///directory/ENCODED_PATH``. We decode it + to a plain library path that :meth:`_resolve_plex_item` can use. + + :param source_uri: The playQueueSourceURI from a Plex PlayQueue. + :return: A Plex library key (e.g. ``/library/playlists/5329``), or None if unparsable. + """ + prefix = "library:///directory/" + if not source_uri.startswith(prefix): + return None + + path = unquote(source_uri[len(prefix) :]).lstrip("/") + if not path: + return None + + path = "/" + path.split("?")[0] + + for suffix in ("/children", "/allLeaves", "/items"): + if path.endswith(suffix): + path = path[: -len(suffix)] + break + + return path if path and path != "/" else None + + async def _play_from_shuffled_source( + self, + player_id: str, + source_uri: str, + offset: int, + ) -> bool: + """Load a shuffled PlayQueue's source in original order, then defer shuffle to MA. + + When Plex reports a shuffled PlayQueue the items are already in Plex's shuffled + order. Loading them directly would cause MA to shuffle an already-shuffled list. + Instead we parse the source URI, load the source collection unshuffled into MA, + and schedule a deferred task that applies MA's own shuffle and then recreates the + Plex PlayQueue so Plexamp sees the new order. + + :param player_id: The Music Assistant player ID. + :param source_uri: The playQueueSourceURI from the Plex PlayQueue. + :param offset: Starting position in milliseconds. + :return: True if handled, False if the caller should fall back to regular loading. + """ + source_key = self._source_key_from_play_queue_uri(source_uri) + if not source_key: + return False + + try: + LOGGER.info(f"Shuffled queue detected — loading source in original order: {source_key}") + source_media = await self._resolve_plex_item(source_key) + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=source_media, + option=QueueOption.REPLACE, + ) + if offset > 0: + await self._seek_to_offset_after_playback(player_id, offset) + await self._broadcast_timeline() + + async def _apply_shuffle_deferred() -> None: + await self.provider.mass.player_queues.set_shuffle(player_id, True) + await asyncio.sleep(0.2) + await self._create_plex_playqueue_from_ma() + synced_keys = self._collect_synced_keys(player_id) + self._last_synced_ma_queue_length = len(synced_keys) + self._last_synced_ma_queue_keys = synced_keys + + self.provider.mass.create_task(_apply_shuffle_deferred()) + return True + except Exception as e: + LOGGER.debug(f"Could not resolve source for shuffled queue, falling back: {e}") + return False + async def _resolve_plex_item(self, key: str) -> Any: """Resolve a Plex key to a Music Assistant media item. @@ -167,6 +245,14 @@ async def _play_from_plex_queue( selected_offset = getattr(playqueue, "playQueueSelectedItemOffset", 0) LOGGER.info(f"PlayQueue selected item offset: {selected_offset}") + # When Plex reports a shuffled queue, load the original source into MA + # unshuffled and let MA apply its own shuffle, then propagate back to Plex. + if playqueue.playQueueShuffled and getattr(playqueue, "playQueueSourceURI", None): + if await self._play_from_shuffled_source( + player_id, playqueue.playQueueSourceURI, offset + ): + return + self.play_queue_item_ids = {} first_item = ( @@ -314,8 +400,9 @@ async def handle_play_media(self, request: web.Request) -> web.Response: option=QueueOption.REPLACE, ) - if shuffle: - await self.provider.mass.player_queues.set_shuffle(player_id, shuffle) + # Always sync shuffle state so that a previously enabled MA shuffle + # does not reorder an unshuffled Plex queue. + await self.provider.mass.player_queues.set_shuffle(player_id, shuffle) if offset > 0: await self._seek_to_offset_after_playback(player_id, offset) From 4a62854b9999533fcf04559eb241d7aa74e82d31 Mon Sep 17 00:00:00 2001 From: Anatosun Date: Sun, 3 May 2026 11:03:49 +0200 Subject: [PATCH 8/8] removed unnecessary for loop --- .../providers/plex_connect/playback.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/music_assistant/providers/plex_connect/playback.py b/music_assistant/providers/plex_connect/playback.py index e495cd2add..90b5ffa80b 100644 --- a/music_assistant/providers/plex_connect/playback.py +++ b/music_assistant/providers/plex_connect/playback.py @@ -67,17 +67,12 @@ async def _seek_to_offset_after_playback(self, player_id: str, offset: int) -> N :param player_id: The player ID to seek on. :param offset: The offset in milliseconds. """ - for _ in range(10): # Try up to 10 times (5 seconds total) - await asyncio.sleep(0.5) - queue = self.provider.mass.player_queues.get(player_id) - if queue and queue.current_item: - try: - await self.provider.mass.players.cmd_seek(player_id, offset // 1000) - await asyncio.sleep(0.1) - break - except Exception as e: - LOGGER.debug(f"Could not seek to offset {offset}ms: {e}") - break + queue = self.provider.mass.player_queues.get(player_id) + if queue and queue.current_item: + try: + await self.provider.mass.players.cmd_seek(player_id, offset // 1000) + except Exception as e: + LOGGER.debug(f"Could not seek to offset {offset}ms: {e}") else: LOGGER.warning("Queue not ready for seeking after timeout")