diff --git a/music_assistant/providers/plex_connect/__init__.py b/music_assistant/providers/plex_connect/__init__.py index aa7af4d89d..acc091f7e9 100644 --- a/music_assistant/providers/plex_connect/__init__.py +++ b/music_assistant/providers/plex_connect/__init__.py @@ -20,7 +20,7 @@ from music_assistant.models.plugin import PluginProvider -from .player_remote import PlayerRemoteInstance +from .server import PlayerRemoteInstance if TYPE_CHECKING: from music_assistant_models.config_entries import ConfigValueType, ProviderConfig diff --git a/music_assistant/providers/plex_connect/manifest.json b/music_assistant/providers/plex_connect/manifest.json index 91e3e7da12..95d293101c 100644 --- a/music_assistant/providers/plex_connect/manifest.json +++ b/music_assistant/providers/plex_connect/manifest.json @@ -1,6 +1,6 @@ { "type": "plugin", - "stage": "alpha", + "stage": "beta", "domain": "plex_connect", "name": "Plex Connect Plugin", "description": "Makes a Music Assistant player appear as a device in the official Plex apps.", diff --git a/music_assistant/providers/plex_connect/playback.py b/music_assistant/providers/plex_connect/playback.py new file mode 100644 index 0000000000..90b5ffa80b --- /dev/null +++ b/music_assistant/providers/plex_connect/playback.py @@ -0,0 +1,285 @@ +"""Playback control command handlers for Plex remote control.""" + +from __future__ import annotations + +import asyncio +import logging +from typing import TYPE_CHECKING + +from aiohttp import web +from music_assistant_models.enums import PlayerFeature, PlayerType, RepeatMode + +if TYPE_CHECKING: + from music_assistant.providers.plex import PlexProvider + +LOGGER = logging.getLogger(__name__) + + +class PlaybackMixin: + """Mixin providing playback control command handlers.""" + + if TYPE_CHECKING: + provider: PlexProvider + _ma_player_id: str | None + _updating_from_plex: bool + play_queue_id: str | None + play_queue_item_ids: dict[int, int] + _last_synced_ma_queue_length: int + _last_synced_ma_queue_keys: list[str] + + async def _broadcast_timeline(self) -> None: ... + + async def _create_plex_playqueue_from_ma(self) -> None: ... + + def _collect_synced_keys(self, player_id: str) -> list[str]: ... + + async def _ungroup_player_if_needed(self, player_id: str) -> None: + """Ungroup player before playback if it's part of a group/sync. + + :param player_id: The player ID to potentially ungroup. + """ + player = self.provider.mass.players.get_player(player_id) + if not player or player.type == PlayerType.GROUP: + return + + if not (player.state.synced_to or player.state.group_members or player.state.active_group): + return + + LOGGER.debug("Ungrouping player %s before starting playback from Plex", player.display_name) + if ( + player.state.active_group + and (group := self.provider.mass.players.get_player(player.state.active_group)) + and group.supports_feature(PlayerFeature.SET_MEMBERS) + ): + await group.set_members(player_ids_to_remove=[player_id]) + elif ( + player.state.synced_to + and (sync_leader := self.provider.mass.players.get_player(player.state.synced_to)) + and sync_leader.supports_feature(PlayerFeature.SET_MEMBERS) + ): + await sync_leader.set_members(player_ids_to_remove=[player_id]) + elif player.state.group_members and player.supports_feature(PlayerFeature.SET_MEMBERS): + await player.set_members(player_ids_to_remove=player.group_members) + + async def _seek_to_offset_after_playback(self, player_id: str, offset: int) -> None: + """Seek to the specified offset after playback starts. + + :param player_id: The player ID to seek on. + :param offset: The offset in milliseconds. + """ + queue = self.provider.mass.player_queues.get(player_id) + if queue and queue.current_item: + try: + await self.provider.mass.players.cmd_seek(player_id, offset // 1000) + except Exception as e: + LOGGER.debug(f"Could not seek to offset {offset}ms: {e}") + else: + LOGGER.warning("Queue not ready for seeking after timeout") + + async def handle_pause(self, request: web.Request) -> web.Response: + """Handle pause command.""" + self._updating_from_plex = True + try: + if self._ma_player_id: + await self.provider.mass.players.cmd_pause(self._ma_player_id) + await self._broadcast_timeline() + return web.Response(status=200) + finally: + self._updating_from_plex = False + + async def handle_play(self, request: web.Request) -> web.Response: + """Handle play/resume command.""" + self._updating_from_plex = True + try: + if self._ma_player_id: + await self._ungroup_player_if_needed(self._ma_player_id) + await self.provider.mass.players.cmd_play(self._ma_player_id) + await self._broadcast_timeline() + return web.Response(status=200) + finally: + self._updating_from_plex = False + + async def handle_stop(self, request: web.Request) -> web.Response: + """Handle stop command - stops playback and clears the queue.""" + self._updating_from_plex = True + try: + if self._ma_player_id: + self.provider.mass.player_queues.clear(self._ma_player_id) + self.play_queue_id = None + self.play_queue_item_ids = {} + await self._broadcast_timeline() + return web.Response(status=200) + finally: + self._updating_from_plex = False + + async def handle_skip_next(self, request: web.Request) -> web.Response: + """Handle skip next command.""" + self._updating_from_plex = True + try: + if self._ma_player_id: + await self.provider.mass.player_queues.next(self._ma_player_id) + await self._broadcast_timeline() + return web.Response(status=200) + finally: + self._updating_from_plex = False + + async def handle_skip_previous(self, request: web.Request) -> web.Response: + """Handle skip previous command.""" + self._updating_from_plex = True + try: + if self._ma_player_id: + await self.provider.mass.player_queues.previous(self._ma_player_id) + await self._broadcast_timeline() + return web.Response(status=200) + finally: + self._updating_from_plex = False + + async def handle_step_forward(self, request: web.Request) -> web.Response: + """Handle step forward command (small skip forward, 30 seconds).""" + self._updating_from_plex = True + try: + if self._ma_player_id: + queue = self.provider.mass.player_queues.get(self._ma_player_id) + if queue: + new_position = queue.corrected_elapsed_time + 30 + if queue.current_item and queue.current_item.media_item: + max_duration = queue.current_item.media_item.duration or new_position + new_position = min(new_position, max_duration) + await self.provider.mass.players.cmd_seek(self._ma_player_id, int(new_position)) + await asyncio.sleep(0.1) + await self._broadcast_timeline() + return web.Response(status=200) + finally: + self._updating_from_plex = False + + async def handle_step_back(self, request: web.Request) -> web.Response: + """Handle step back command (small skip backward, 10 seconds).""" + self._updating_from_plex = True + try: + if self._ma_player_id: + queue = self.provider.mass.player_queues.get(self._ma_player_id) + if queue: + new_position = max(0, queue.corrected_elapsed_time - 10) + await self.provider.mass.players.cmd_seek(self._ma_player_id, int(new_position)) + await asyncio.sleep(0.1) + await self._broadcast_timeline() + return web.Response(status=200) + finally: + self._updating_from_plex = False + + async def handle_seek_to(self, request: web.Request) -> web.Response: + """Handle seek command.""" + self._updating_from_plex = True + try: + offset_ms = int(request.query.get("offset", 0)) + if self._ma_player_id: + await self.provider.mass.players.cmd_seek(self._ma_player_id, int(offset_ms / 1000)) + await asyncio.sleep(0.1) + await self._broadcast_timeline() + return web.Response(status=200) + finally: + self._updating_from_plex = False + + async def handle_skip_to(self, request: web.Request) -> web.Response: + """Handle skip to specific queue item.""" + key = request.query.get("key") + if not self._ma_player_id or not key: + return web.Response(status=400, text="Missing player ID or key") + + self._updating_from_plex = True + try: + ma_index = None + + if key.isdigit(): + # Key is a play queue item ID — look up MA queue index + play_queue_item_id = int(key) + for idx, pq_item_id in self.play_queue_item_ids.items(): + if pq_item_id == play_queue_item_id: + ma_index = idx + break + + if ma_index is None: + LOGGER.warning( + f"Could not find MA queue index for play queue item ID: " + f"{play_queue_item_id}" + ) + return web.Response(status=404, text="Queue item not found") + + LOGGER.info( + f"Skipping to queue index {ma_index} (play queue item ID: {play_queue_item_id})" + ) + else: + # Key is a library path — find track in MA queue by Plex key + queue_items = self.provider.mass.player_queues.items(self._ma_player_id) + if not queue_items: + return web.Response(status=404, text="Queue is empty") + + for idx, item in enumerate(queue_items): + if not item.media_item: + continue + for mapping in item.media_item.provider_mappings: + if ( + mapping.provider_instance == self.provider.instance_id + and mapping.item_id == key + ): + ma_index = idx + break + if ma_index is not None: + break + + if ma_index is None: + LOGGER.warning(f"Could not find track with key {key} in MA queue") + return web.Response(status=404, text="Track not found in queue") + + LOGGER.info(f"Skipping to queue index {ma_index} (track key: {key})") + + await self.provider.mass.player_queues.play_index(self._ma_player_id, ma_index) + await self._broadcast_timeline() + return web.Response(status=200) + + except Exception as e: + LOGGER.exception(f"Error handling skipTo: {e}") + return web.Response(status=500, text=str(e)) + finally: + self._updating_from_plex = False + + async def handle_set_parameters(self, request: web.Request) -> web.Response: + """Handle parameter changes (volume, shuffle, repeat).""" + if not self._ma_player_id: + return web.Response(status=200) + + self._updating_from_plex = True + shuffle_changed = False + try: + if "volume" in request.query: + volume = int(request.query["volume"]) + await self.provider.mass.players.cmd_volume_set(self._ma_player_id, volume) + + if "shuffle" in request.query: + shuffle = request.query["shuffle"] == "1" + await self.provider.mass.player_queues.set_shuffle(self._ma_player_id, shuffle) + shuffle_changed = True + + if "repeat" in request.query: + repeat_value = int(request.query["repeat"]) + if repeat_value == 0: + self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.OFF) + elif repeat_value == 1: + self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.ONE) + elif repeat_value == 2: + self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.ALL) + + await self._broadcast_timeline() + finally: + self._updating_from_plex = False + + if shuffle_changed: + # MA shuffles the queue asynchronously; give it a moment then sync the + # new order back to Plex so Plexamp sees the updated play queue. + await asyncio.sleep(0.2) + await self._create_plex_playqueue_from_ma() + synced_keys = self._collect_synced_keys(self._ma_player_id) + self._last_synced_ma_queue_length = len(synced_keys) + self._last_synced_ma_queue_keys = synced_keys + + return web.Response(status=200) diff --git a/music_assistant/providers/plex_connect/player_remote.py b/music_assistant/providers/plex_connect/player_remote.py deleted file mode 100644 index a88a456372..0000000000 --- a/music_assistant/providers/plex_connect/player_remote.py +++ /dev/null @@ -1,1881 +0,0 @@ -"""Per-player Plex remote control instances.""" - -from __future__ import annotations - -import asyncio -import logging -import platform -import re -import time -import uuid -from collections.abc import Callable -from typing import TYPE_CHECKING, Any -from urllib.parse import urlparse - -from aiohttp import ClientTimeout, web -from music_assistant_models.enums import ( - EventType, - PlayerFeature, - PlayerType, - QueueOption, - RepeatMode, -) -from plexapi.playqueue import PlayQueue - -from .gdm import PlexGDMAdvertiser - -if TYPE_CHECKING: - from music_assistant_models.event import MassEvent - - from music_assistant.providers.plex import PlexProvider - - -LOGGER = logging.getLogger(__name__) - - -class PlayerRemoteInstance: - """Single remote control instance for one MA player.""" - - def __init__( - self, - plex_provider: PlexProvider, - ma_player_id: str, - player_name: str, - port: int, - device_class: str = "speaker", - remote_control: bool = False, - ) -> None: - """Initialize player remote instance. - - :param plex_provider: Plex provider instance. - :param ma_player_id: Music Assistant player ID. - :param player_name: Display name for the player. - :param port: Port for the remote control server. - :param device_class: Device class (speaker, phone, tablet, stb, tv, pc, cloud). - :param remote_control: Whether to enable remote control. - """ - self.plex_provider = plex_provider - self.plex_server = plex_provider._plex_server - self.ma_player_id = ma_player_id - self.player_name = player_name - self.port = port - self.device_class = device_class - self.remote_control = remote_control - - self.client_id = str( - uuid.uuid5( - uuid.NAMESPACE_DNS, - f"music-assistant-plex-{plex_provider.instance_id}-{ma_player_id}", - ) - ) - - if self.remote_control: - # Remote control server - self.server: PlexRemoteControlServer | None = None - # GDM advertiser - self.gdm: PlexGDMAdvertiser | None = None - - async def start(self) -> None: - """Start this player's remote control.""" - if self.remote_control: - # Create player-specific PlexServer instance with unique client identification - LOGGER.info( - f"Created PlexServer for '{self.player_name}' with client ID: {self.client_id}" - ) - - self.server = PlexRemoteControlServer( - plex_provider=self.plex_provider, - port=self.port, - client_id=self.client_id, - ma_player_id=self.ma_player_id, - device_class=self.device_class, - ) - LOGGER.info( - f"Remote control server for '{self.player_name}' bound to MA player: " - f"{self.ma_player_id}" - ) - - await self.server.start() - - # Step 4: Start GDM broadcasting - self.gdm = PlexGDMAdvertiser( - instance_id=self.client_id, - port=self.port, - publish_ip=str(self.plex_provider.mass.streams.publish_ip), - name=self.player_name, - product="Music Assistant", - version=self.plex_provider.mass.version - if self.plex_provider.mass.version != "0.0.0" - else "1.0.0", - ) - self.gdm.start() - - LOGGER.info(f"Player '{self.player_name}' is now discoverable on port {self.port}") - - async def stop(self) -> None: - """Stop this player's remote control.""" - if self.remote_control: - if self.gdm: - await self.gdm.stop() - - if self.server: - await self.server.stop() - - LOGGER.info(f"Stopped remote control for player '{self.player_name}'") - - -class PlexRemoteControlServer: - """HTTP server to receive Plex remote control commands.""" - - def __init__( - self, - plex_provider: PlexProvider, - port: int = 32500, - client_id: str | None = None, - ma_player_id: str | None = None, - device_class: str = "speaker", - ) -> None: - """Initialize remote control server. - - :param plex_provider: Plex provider instance. - :param port: Port for the HTTP server. - :param client_id: Unique client identifier. - :param ma_player_id: Music Assistant player ID. - :param device_class: Device class (speaker, phone, tablet, stb, tv, pc, cloud). - """ - self.provider = plex_provider - self.plex_server = plex_provider._plex_server - self.port = port - self.client_id = client_id or plex_provider.instance_id - self.device_class = device_class - self.app = web.Application() - self.subscriptions: dict[str, dict[str, object]] = {} - self.runner: web.AppRunner | None = None - self.http_site: web.TCPSite | None = None - - # Play queue tracking (Plex-specific state that doesn't exist in MA) - self.play_queue_id: str | None = None - self.play_queue_version: int = 1 - # Map queue index to item ID - self.play_queue_item_ids: dict[int, int] = {} - - # Track MA queue state to detect when we need to sync to Plex - self._last_synced_ma_queue_length: int = 0 - self._last_synced_ma_queue_keys: list[str] = [] - - # Specific MA player this server controls (set by PlayerRemoteInstance) - self._ma_player_id = ma_player_id - - # Store unsubscribe callbacks - self._unsub_callbacks: list[Callable[..., None]] = [] - - # Flag to prevent circular updates when we modify the queue ourselves - self._updating_from_plex = False - - self.player = self.provider.mass.players.get_player(self._ma_player_id) # type: ignore[arg-type] - - self.device_name = f"{self.player.display_name}" if self.player else "Music Assistant" - - self.headers = { - "X-Plex-Device-Name": self.device_name, - "X-Plex-Session-Identifier": self.client_id, - "X-Plex-Client-Identifier": self.client_id, - "X-Plex-Product": "Music Assistant", - "X-Plex-Platform": "Music Assistant", - "X-Plex-Platform-Version": platform.release(), - } - - self._setup_routes() - - def _setup_routes(self) -> None: - """Set up all required endpoints.""" - # Root endpoint - self.app.router.add_get("/", self.handle_root) - - # Subscription management - self.app.router.add_get("/player/timeline/subscribe", self.handle_subscribe) - self.app.router.add_get("/player/timeline/unsubscribe", self.handle_unsubscribe) - self.app.router.add_get("/player/timeline/poll", self.handle_poll) - - # Playback commands - self.app.router.add_get("/player/playback/playMedia", self.handle_play_media) - self.app.router.add_get("/player/playback/refreshPlayQueue", self.handle_refresh_play_queue) - self.app.router.add_get("/player/playback/createPlayQueue", self.handle_create_play_queue) - self.app.router.add_get("/player/playback/pause", self.handle_pause) - self.app.router.add_get("/player/playback/play", self.handle_play) - self.app.router.add_get("/player/playback/stop", self.handle_stop) - self.app.router.add_get("/player/playback/skipNext", self.handle_skip_next) - self.app.router.add_get("/player/playback/skipPrevious", self.handle_skip_previous) - self.app.router.add_get("/player/playback/stepForward", self.handle_step_forward) - self.app.router.add_get("/player/playback/stepBack", self.handle_step_back) - self.app.router.add_get("/player/playback/seekTo", self.handle_seek_to) - self.app.router.add_get("/player/playback/setParameters", self.handle_set_parameters) - self.app.router.add_get("/player/playback/skipTo", self.handle_skip_to) - - # Resources endpoint - self.app.router.add_get("/resources", self.handle_resources) - - # CORS OPTIONS handler (for all routes) - self.app.router.add_route("OPTIONS", "/{tail:.*}", self.handle_options) - - # --- Catch-all fallback for debugging purposes --- - # self.app.router.add_route("*", "/{path_info:.*}", self.handle_unknown) - - async def start(self) -> None: - """Start HTTP server and GDM advertising.""" - self.runner = web.AppRunner(self.app) - await self.runner.setup() - - # Start HTTP server - self.http_site = web.TCPSite(self.runner, "0.0.0.0", self.port) - await self.http_site.start() - LOGGER.info(f"Plex remote control server started on HTTP port {self.port}") - - # Note: GDM advertising is handled by PlexProvider in __init__.py - # to avoid duplicate broadcasts - - # Subscribe to player and queue events for state synchronization - if self._ma_player_id: - self._unsub_callbacks.append( - self.provider.mass.subscribe( - self._handle_player_event, - EventType.PLAYER_UPDATED, - id_filter=self._ma_player_id, - ) - ) - self._unsub_callbacks.append( - self.provider.mass.subscribe( - self._handle_queue_event, - EventType.QUEUE_UPDATED, - id_filter=self._ma_player_id, - ) - ) - self._unsub_callbacks.append( - self.provider.mass.subscribe( - self._handle_queue_event, - EventType.QUEUE_TIME_UPDATED, - id_filter=self._ma_player_id, - ) - ) - self._unsub_callbacks.append( - self.provider.mass.subscribe( - self._handle_queue_items_updated, - EventType.QUEUE_ITEMS_UPDATED, - id_filter=self._ma_player_id, - ) - ) - - async def stop(self) -> None: - """Stop the HTTP server.""" - # Unsubscribe from events - for unsub in self._unsub_callbacks: - unsub() - self._unsub_callbacks.clear() - - # Stop HTTP server - if self.http_site: - await self.http_site.stop() - if self.runner: - await self.runner.cleanup() - LOGGER.info("Plex remote control server stopped") - - async def handle_root(self, request: web.Request) -> web.Response: - """Handle root endpoint - return basic player info.""" - # Get player name - player_name = "Music Assistant" - if self._ma_player_id: - player = self.provider.mass.players.get_player(self._ma_player_id) - if player: - player_name = player.display_name - - xml = f""" - - -""" - return web.Response( - text=xml, content_type="text/xml", headers={"Access-Control-Allow-Origin": "*"} - ) - - async def handle_subscribe(self, request: web.Request) -> web.Response: - """Handle timeline subscription from controller.""" - client_id = request.headers.get("X-Plex-Client-Identifier") - protocol = request.query.get("protocol", "http") - port = request.query.get("port") - command_id = int(request.query.get("commandID", 0)) - - if not client_id or not port: - return web.Response(status=400) - - self.subscriptions[client_id] = { - "url": f"{protocol}://{request.remote}:{port}", - "command_id": command_id, - "last_update": time.time(), - } - - LOGGER.info(f"Controller {client_id} subscribed for timeline updates") - await self._send_timeline(client_id) - return web.Response(status=200) - - async def handle_unsubscribe(self, request: web.Request) -> web.Response: - """Handle unsubscribe request.""" - client_id = request.headers.get("X-Plex-Client-Identifier") - if client_id in self.subscriptions: - del self.subscriptions[client_id] - LOGGER.info(f"Controller {client_id} unsubscribed") - return web.Response(status=200) - - async def handle_poll(self, request: web.Request) -> web.Response: - """Handle timeline poll request.""" - # Extract parameters - include_metadata = request.query.get("includeMetadata", "0") == "1" - command_id = request.query.get("commandID", "0") - - # Update subscription timestamp if this client is subscribed - client_id = request.headers.get("X-Plex-Client-Identifier") - if client_id and client_id in self.subscriptions: - self.subscriptions[client_id]["last_update"] = time.time() - - # Build timeline from current MA player state - timeline_xml = await self._build_timeline_xml( - include_metadata=include_metadata, command_id=command_id - ) - return web.Response( - text=timeline_xml, - content_type="text/xml", - headers={ - "X-Plex-Client-Identifier": self.client_id, - "Access-Control-Expose-Headers": "X-Plex-Client-Identifier", - "Access-Control-Allow-Origin": "*", - }, - ) - - async def _ungroup_player_if_needed(self, player_id: str) -> None: - """Ungroup player before playback if it's part of a group/sync.""" - player = self.provider.mass.players.get_player(player_id) - if not player or player.type == PlayerType.GROUP: - return - - if not (player.state.synced_to or player.state.group_members or player.state.active_group): - return - - LOGGER.debug("Ungrouping player %s before starting playback from Plex", player.display_name) - # Use set_members directly on the group to bypass static member check - if ( - player.state.active_group - and (group := self.provider.mass.players.get_player(player.state.active_group)) - and group.supports_feature(PlayerFeature.SET_MEMBERS) - ): - await group.set_members(player_ids_to_remove=[player_id]) - elif ( - player.state.synced_to - and (sync_leader := self.provider.mass.players.get_player(player.state.synced_to)) - and sync_leader.supports_feature(PlayerFeature.SET_MEMBERS) - ): - await sync_leader.set_members(player_ids_to_remove=[player_id]) - elif player.state.group_members and player.supports_feature(PlayerFeature.SET_MEMBERS): - await player.set_members(player_ids_to_remove=player.group_members) - - async def handle_play_media(self, request: web.Request) -> web.Response: - """ - Handle playMedia command from Plex controller. - - Plexamp sends various parameters: - - key: The item to play (track, album, playlist, etc.) - - containerKey: The container context (play queue) - - offset: Starting position in milliseconds - - shuffle: Whether to shuffle - - repeat: Repeat mode - """ - # Set flag to prevent circular updates - self._updating_from_plex = True - try: - key = request.query.get("key") - container_key = request.query.get("containerKey") - offset = int(request.query.get("offset", 0)) - shuffle = request.query.get("shuffle", "0") == "1" - - if not key: - return web.Response( - status=400, text="Missing required 'key' parameter for playMedia command" - ) - - LOGGER.info( - f"Received playMedia command - key: {key}, " - f"containerKey: {container_key}, offset: {offset}ms" - ) - - # Use the assigned player for this server instance - player_id = self._ma_player_id - if not player_id: - return web.Response(status=500, text="No player assigned to this server") - - # Ungroup player if it's part of a group/sync - # User selected this specific player, so remove from any groups - await self._ungroup_player_if_needed(player_id) - - if container_key and "/playQueues/" in container_key: - # Extract play queue ID from container key - queue_id_match = re.search(r"/playQueues/(\d+)", container_key) - if queue_id_match: - self.play_queue_id = queue_id_match.group(1) - self.play_queue_version = 1 - LOGGER.info(f"Playing from queue: {container_key} starting at {key}") - - await self._play_from_plex_queue(player_id, container_key, key, shuffle, offset) - else: - # Reset queue tracking if no valid queue ID found - self.play_queue_id = None - self.play_queue_item_ids = {} - # Fall back to single track - media = await self._resolve_plex_item(key) - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=media, # type: ignore[arg-type] - option=QueueOption.REPLACE, - ) - elif container_key: - # Playing from a regular container (album, playlist, artist) not a play queue - # Reset queue tracking - self.play_queue_id = None - self.play_queue_item_ids = {} - - # The key is the specific track, containerKey is the collection - media_to_play = await self._resolve_plex_item(container_key) - - # Queue the entire container - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=media_to_play, # type: ignore[arg-type] - option=QueueOption.REPLACE, - ) - - else: - # Playing a single item, reset queue tracking - self.play_queue_id = None - self.play_queue_item_ids = {} - - media = await self._resolve_plex_item(key) - - # Replace the queue with this media - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=media, # type: ignore[arg-type] - option=QueueOption.REPLACE, - ) - - # Set shuffle if requested - if shuffle: - await self.provider.mass.player_queues.set_shuffle(player_id, shuffle) - - # Seek to offset if specified - if offset > 0: - await self._seek_to_offset_after_playback(player_id, offset) - - await self._broadcast_timeline() - return web.Response(status=200) - - except Exception as e: - LOGGER.exception(f"Error handling playMedia: {e}") - return web.Response(status=500, text=str(e)) - finally: - # Clear flag after processing - self._updating_from_plex = False - - def _reorder_tracks_for_playback( - self, tracks: list[Any], start_index: int - ) -> tuple[list[Any], dict[int, int]]: - """Reorder tracks to start from a specific index and update item ID mappings. - - :param tracks: List of tracks to reorder. - :param start_index: Index of the track to start from. - :return: Tuple of (reordered tracks, updated item ID mappings). - """ - if start_index <= 0 or start_index >= len(tracks): - # No reordering needed - return tracks, self.play_queue_item_ids - - # Reorder: [selected track, tracks after it, tracks before it] - reordered_tracks = ( - tracks[start_index:] # From selected to end - + tracks[:start_index] # From start to selected - ) - - # Update play queue item ID mappings to reflect new order - new_item_ids = {} - for new_idx, old_idx in enumerate( - list(range(start_index, len(tracks))) + list(range(start_index)) - ): - if old_idx in self.play_queue_item_ids: - new_item_ids[new_idx] = self.play_queue_item_ids[old_idx] - - LOGGER.info(f"Started playback from offset {start_index} (reordered queue)") - return reordered_tracks, new_item_ids - - async def _seek_to_offset_after_playback(self, player_id: str, offset: int) -> None: - """Seek to the specified offset after playback starts. - - :param player_id: The player ID to seek on. - :param offset: The offset in milliseconds. - """ - # Wait for the queue to have items loaded before seeking - for _ in range(10): # Try up to 10 times (5 seconds total) - await asyncio.sleep(0.5) - queue = self.provider.mass.player_queues.get(player_id) - if queue and queue.current_item: - try: - await self.provider.mass.players.cmd_seek(player_id, offset // 1000) - # Wait briefly for player state to update - await asyncio.sleep(0.1) - break - except Exception as e: - LOGGER.debug(f"Could not seek to offset {offset}ms: {e}") - break - else: - LOGGER.warning("Queue not ready for seeking after timeout") - - async def _play_from_plex_queue( - self, - player_id: str, - container_key: str, - starting_key: str | None, - shuffle: bool, - offset: int, - ) -> None: - """Fetch play queue from Plex and load tracks. - - Starts playback immediately with the first track, - then loads remaining tracks in the background. - """ - try: - LOGGER.info(f"Fetching play queue: {container_key}") - - # Extract queue ID from container_key (e.g., "/playQueues/123" -> "123") - queue_id_match = re.search(r"/playQueues/(\d+)", container_key) - if not queue_id_match: - raise ValueError(f"Invalid container_key format: {container_key}") - - queue_id = queue_id_match.group(1) - - # Use plexapi to fetch the play queue - def fetch_queue() -> PlayQueue: - return PlayQueue.get(self.provider._plex_server, playQueueID=queue_id) - - playqueue = await asyncio.to_thread(fetch_queue) - - if playqueue and playqueue.items: - # Get selected item offset from PlayQueue - this tells us which track to start from - selected_offset = getattr(playqueue, "playQueueSelectedItemOffset", 0) - LOGGER.info(f"PlayQueue selected item offset: {selected_offset}") - - # Track play queue item IDs - self.play_queue_item_ids = {} - - # Fetch the first track to start playback immediately - first_item = ( - playqueue.items[selected_offset] - if selected_offset < len(playqueue.items) - else playqueue.items[0] - ) - first_track_key = first_item.key if hasattr(first_item, "key") else None - first_play_queue_item_id = ( - first_item.playQueueItemID if hasattr(first_item, "playQueueItemID") else None - ) - - if not first_track_key: - LOGGER.error("No valid first track in play queue") - if starting_key: - track = await self.provider.get_track(starting_key) - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=track, - option=QueueOption.REPLACE, - ) - return - - # Fetch and start playing the first track immediately - try: - first_track = await self.provider.get_track(first_track_key) - LOGGER.info(f"Starting playback with first track: {first_track.name}") - - # Store first track's play queue item ID mapping - if first_play_queue_item_id: - self.play_queue_item_ids[0] = first_play_queue_item_id - - # Start playback immediately with just the first track - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=first_track, - option=QueueOption.REPLACE, - ) - - # Seek to offset if specified (do this before loading remaining tracks) - if offset > 0: - await self._seek_to_offset_after_playback(player_id, offset) - - # Broadcast timeline update immediately - await self._broadcast_timeline() - - # Now load the remaining tracks in the background - self.provider.mass.create_task( - self._load_remaining_queue_tracks( - player_id, playqueue, selected_offset, shuffle - ) - ) - - except Exception as e: - LOGGER.exception(f"Error starting playback with first track: {e}") - # Fall back to single track - if starting_key: - track = await self.provider.get_track(starting_key) - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=track, - option=QueueOption.REPLACE, - ) - else: - LOGGER.error("Play queue is empty or could not be fetched") - # Fall back to single track - if starting_key: - track = await self.provider.get_track(starting_key) - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=track, - option=QueueOption.REPLACE, - ) - - except Exception as e: - LOGGER.exception(f"Error playing from queue: {e}") - # Fall back to single track - if starting_key: - track = await self.provider.get_track(starting_key) - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=track, - option=QueueOption.REPLACE, - ) - - async def _load_remaining_queue_tracks( - self, - player_id: str, - playqueue: PlayQueue, - selected_offset: int, - shuffle: bool, - ) -> None: - """Load remaining tracks from play queue in the background. - - :param player_id: The Music Assistant player ID. - :param playqueue: The Plex play queue. - :param selected_offset: The offset of the track that's already playing. - :param shuffle: Whether shuffle is enabled. - """ - try: - # Prepare to fetch all tracks except the first one - remaining_items = [] - - # Get items after selected track - for i in range(selected_offset + 1, len(playqueue.items)): - remaining_items.append((i, playqueue.items[i])) - - # Get items before selected track (these will be added at the end) - for i in range(selected_offset): - remaining_items.append((i, playqueue.items[i])) - - if not remaining_items: - LOGGER.debug("No remaining tracks to load") - return - - # Fetch all remaining tracks concurrently - async def fetch_track( - plex_idx: int, item: Any - ) -> tuple[int, object | None, int | None]: - """Fetch a single track from Plex.""" - track_key = item.key if hasattr(item, "key") else None - play_queue_item_id = ( - item.playQueueItemID if hasattr(item, "playQueueItemID") else None - ) - - if track_key: - try: - track = await self.provider.get_track(track_key) - return plex_idx, track, play_queue_item_id - except Exception as e: - LOGGER.debug(f"Could not fetch track {track_key}: {e}") - - return plex_idx, None, None - - # Fetch all tracks in parallel - fetch_tasks = [fetch_track(idx, item) for idx, item in remaining_items] - results = await asyncio.gather(*fetch_tasks, return_exceptions=True) - - # Process results and build track list - tracks_to_add: list[object] = [] - for result in results: - if isinstance(result, Exception): - LOGGER.debug(f"Error fetching track: {result}") - continue - - # result is guaranteed to be a tuple here after the Exception check - _plex_idx, track, play_queue_item_id = result # type: ignore[misc] - if track: - ma_idx = len(tracks_to_add) + 1 # +1 because first track is already queued - tracks_to_add.append(track) - - # Store play queue item ID mapping - if play_queue_item_id: - self.play_queue_item_ids[ma_idx] = play_queue_item_id - - if tracks_to_add: - LOGGER.info(f"Adding {len(tracks_to_add)} remaining tracks to queue") - - # Add remaining tracks to the queue - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=tracks_to_add, # type: ignore[arg-type] - option=QueueOption.ADD, - ) - - # Update tracked state to prevent sync loop - queue_items = self.provider.mass.player_queues.items(player_id) - synced_keys = [] - for item in queue_items: - if item.media_item: - for mapping in item.media_item.provider_mappings: - if mapping.provider_instance == self.provider.instance_id: - synced_keys.append(mapping.item_id) - break - self._last_synced_ma_queue_length = len(synced_keys) - self._last_synced_ma_queue_keys = synced_keys - - # Apply shuffle if requested (after all tracks are loaded) - if shuffle: - await self.provider.mass.player_queues.set_shuffle(player_id, shuffle) - - LOGGER.info( - f"Successfully loaded {len(tracks_to_add)} remaining tracks " - f"(total queue: {len(synced_keys)} tracks)" - ) - else: - LOGGER.warning("No valid remaining tracks found in play queue") - - except Exception as e: - LOGGER.exception(f"Error loading remaining queue tracks: {e}") - - async def _replace_entire_queue(self, player_id: str, playqueue: PlayQueue) -> None: - """Replace the entire queue when nothing is currently playing. - - :param player_id: The Music Assistant player ID. - :param playqueue: The Plex play queue to load. - """ - all_tracks = [] - self.play_queue_item_ids = {} - - for i, item in enumerate(playqueue.items): - track_key = item.key if hasattr(item, "key") else None - play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None - - if track_key: - try: - track = await self.provider.get_track(track_key) - all_tracks.append(track) - - if play_queue_item_id: - self.play_queue_item_ids[len(all_tracks) - 1] = play_queue_item_id - except Exception as e: - LOGGER.debug(f"Could not fetch track {track_key}: {e}") - continue - - if all_tracks: - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=all_tracks, # type: ignore[arg-type] - option=QueueOption.REPLACE, - ) - LOGGER.info(f"Replaced queue with {len(all_tracks)} tracks") - - async def _replace_remaining_queue( - self, player_id: str, playqueue: PlayQueue, current_index: int - ) -> None: - """Replace only items after the current track. - - :param player_id: The Music Assistant player ID. - :param playqueue: The Plex play queue to load. - :param current_index: The current track index in the MA queue. - """ - # Fetch tracks that come AFTER the current track in the Plex queue - remaining_tracks = [] - new_item_mappings = {} - - # Start from the track after current_index - for i in range(current_index + 1, len(playqueue.items)): - item = playqueue.items[i] - track_key = item.key if hasattr(item, "key") else None - play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None - - if track_key: - try: - track = await self.provider.get_track(track_key) - remaining_tracks.append(track) - - # Map relative to the current position - if play_queue_item_id: - new_item_mappings[current_index + 1 + len(remaining_tracks) - 1] = ( - play_queue_item_id - ) - except Exception as e: - LOGGER.debug(f"Could not fetch track {track_key}: {e}") - continue - - # Replace items after current track - if remaining_tracks: - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=remaining_tracks, # type: ignore[arg-type] - option=QueueOption.REPLACE_NEXT, # Replace everything after current - ) - # Update mappings for the new items - self.play_queue_item_ids.update(new_item_mappings) - - LOGGER.info( - f"Replaced {len(remaining_tracks)} tracks after current track " - f"(index {current_index})" - ) - else: - # No tracks after current - clear remaining queue - LOGGER.debug("No tracks after current track in Plex queue") - - # Rebuild complete item ID mappings from Plex queue - # Keep mappings for tracks from index 0 to current_index unchanged - for i, item in enumerate(playqueue.items): - play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None - if play_queue_item_id: - self.play_queue_item_ids[i] = play_queue_item_id - - async def handle_refresh_play_queue(self, request: web.Request) -> web.Response: - """ - Handle refreshPlayQueue command from Plex controller. - - This is called when the play queue is modified (items added, removed, reordered). - We need to sync the entire updated queue state to MA while preserving playback. - """ - try: - play_queue_id = request.query.get("playQueueID") - - if not play_queue_id: - return web.Response(status=400, text="Missing 'playQueueID' parameter") - - # Log all query parameters to understand what Plex sends - LOGGER.info( - f"Received refreshPlayQueue command - playQueueID: {play_queue_id}, " - f"params: {dict(request.query)}" - ) - - # Verify this is our active play queue - if self.play_queue_id != play_queue_id: - LOGGER.warning( - f"Refresh requested for queue {play_queue_id} but active queue is " - f"{self.play_queue_id}" - ) - return web.Response( - status=409, - text=( - f"Requested playQueueID {play_queue_id} does not match " - f"active queue {self.play_queue_id}" - ), - ) - - # Update the play queue version (increments on each refresh) - self.play_queue_version += 1 - - # Use plexapi to fetch the updated play queue - def fetch_queue() -> PlayQueue: - return PlayQueue.get(self.provider._plex_server, playQueueID=play_queue_id) - - playqueue = await asyncio.to_thread(fetch_queue) - - if not playqueue or not playqueue.items: - LOGGER.error("Failed to refresh play queue - queue is empty or not found") - return web.Response(status=404, text="Play queue not found") - - # Get current MA queue state - player_id = self._ma_player_id - if not player_id: - LOGGER.error("No player assigned to this server") - return web.Response(status=500, text="No player assigned") - - # disable shuffle to avoid infinite loop - await self.provider.mass.player_queues.set_shuffle(player_id, False) - ma_queue = self.provider.mass.player_queues.get(player_id) - if not ma_queue: - LOGGER.error(f"MA queue not found for player {player_id}") - return web.Response(status=500, text="MA queue not found") - - # Get current playback state - current_index = ma_queue.current_index - - # Get MA queue item count - ma_queue_items = self.provider.mass.player_queues.items(player_id) - ma_queue_count = len(ma_queue_items) if ma_queue_items else 0 - - LOGGER.debug( - f"Queue refresh: Current index={current_index}, " - f"MA has {ma_queue_count} items, Plex has {len(playqueue.items)} items" - ) - - # If nothing is playing, replace the entire queue - if current_index is None: - LOGGER.debug("No track currently playing, replacing entire queue") - await self._replace_entire_queue(player_id, playqueue) - else: - # Something is playing - update only the remaining queue items - LOGGER.debug( - f"Track at index {current_index} is playing, " - f"replacing only items after current track" - ) - await self._replace_remaining_queue(player_id, playqueue, current_index) - - LOGGER.info( - f"Refreshed play queue {play_queue_id} - now has {len(playqueue.items)} items" - ) - - # Update tracked state to prevent sync loop - # Get what's actually in MA queue after the refresh - queue_items_after = self.provider.mass.player_queues.items(player_id) - synced_keys = [] - for item in queue_items_after: - if item.media_item: - for mapping in item.media_item.provider_mappings: - if mapping.provider_instance == self.provider.instance_id: - synced_keys.append(mapping.item_id) - break - self._last_synced_ma_queue_length = len(synced_keys) - self._last_synced_ma_queue_keys = synced_keys - - return web.Response(status=200) - - except Exception as e: - LOGGER.exception(f"Error handling refreshPlayQueue: {e}") - return web.Response(status=500, text=str(e)) - - async def handle_create_play_queue(self, request: web.Request) -> web.Response: - """ - Handle createPlayQueue command from Plex controller. - - Creates a new play queue from a URI (album, playlist, artist tracks, etc.) - and optionally applies shuffle. - """ - try: - uri = request.query.get("uri") - shuffle = request.query.get("shuffle", "0") == "1" - continuous = request.query.get("continuous", "0") == "1" - - if not uri: - return web.Response(status=400, text="Missing 'uri' parameter") - - LOGGER.info(f"Received createPlayQueue command - uri: {uri}, shuffle: {shuffle}") - - # Use the assigned player for this server instance - player_id = self._ma_player_id - if not player_id: - return web.Response(status=500, text="No player assigned to this server") - - # Use plexapi to create play queue - def create_queue() -> PlayQueue: - # Fetch the item from URI first - item = self.provider._plex_server.fetchItem(uri) - # Create play queue from the item - return PlayQueue.create( - self.provider._plex_server, - item, - shuffle=1 if shuffle else 0, - continuous=1 if continuous else 0, - ) - - playqueue = await asyncio.to_thread(create_queue) - - if playqueue and playqueue.items: - # Extract play queue ID from response - self.play_queue_id = str(playqueue.playQueueID) - self.play_queue_version = 1 - - LOGGER.info( - f"Created play queue {self.play_queue_id} with {len(playqueue.items)} items" - ) - - # Fetch the first track to start playback immediately - self.play_queue_item_ids = {} - first_item = playqueue.items[0] - first_track_key = first_item.key if hasattr(first_item, "key") else None - first_play_queue_item_id = ( - first_item.playQueueItemID if hasattr(first_item, "playQueueItemID") else None - ) - - if not first_track_key: - LOGGER.error("No valid first track in created play queue") - return web.Response(status=500, text="Failed to load tracks from play queue") - - try: - # Fetch and start playing the first track immediately - first_track = await self.provider.get_track(first_track_key) - LOGGER.info(f"Starting playback with first track: {first_track.name}") - - # Store first track's play queue item ID mapping - if first_play_queue_item_id: - self.play_queue_item_ids[0] = first_play_queue_item_id - - # Start playback immediately with just the first track - await self.provider.mass.player_queues.play_media( - queue_id=player_id, - media=first_track, - option=QueueOption.REPLACE, - ) - - # Now load the remaining tracks in the background - if len(playqueue.items) > 1: - self.provider.mass.create_task( - self._load_remaining_queue_tracks( - player_id, - playqueue, - 0, # Selected offset is 0 since we started from the first track - shuffle, - ) - ) - - # Broadcast timeline update - await self._broadcast_timeline() - return web.Response(status=200) - - except Exception as e: - LOGGER.exception(f"Error starting playback with first track: {e}") - return web.Response(status=500, text=f"Failed to start playback: {e}") - else: - LOGGER.error("Failed to create play queue or queue is empty") - return web.Response(status=500, text="Failed to create play queue") - - except Exception as e: - LOGGER.exception(f"Error handling createPlayQueue: {e}") - return web.Response(status=500, text=str(e)) - - async def _resolve_plex_item(self, key: str) -> object: - """Resolve a Plex key to a Music Assistant media item.""" - # Determine item type from the key format - if "/library/metadata/" in key: - # Could be track, album, or artist - # Try to fetch as track first - try: - return await self.provider.get_track(key) - except Exception as exc: - LOGGER.debug(f"Failed to resolve Plex item as track for key '{key}': {exc}") - - # Try as album - try: - return await self.provider.get_album(key) - except Exception as exc: - LOGGER.debug(f"Failed to resolve Plex item as album for key '{key}': {exc}") - - # Try as artist - try: - return await self.provider.get_artist(key) - except Exception: - raise ValueError(f"Could not resolve Plex item: {key}") from None - - elif "/playlists/" in key: - return await self.provider.get_playlist(key) - else: - raise ValueError(f"Unknown Plex key format: {key}") - - async def handle_pause(self, request: web.Request) -> web.Response: - """Handle pause command (test-client.py line 98-101).""" - self._updating_from_plex = True - try: - if self._ma_player_id: - await self.provider.mass.players.cmd_pause(self._ma_player_id) - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_play(self, request: web.Request) -> web.Response: - """Handle play/resume command (test-client.py line 103-106).""" - self._updating_from_plex = True - try: - if self._ma_player_id: - # Ungroup player before resuming playback - await self._ungroup_player_if_needed(self._ma_player_id) - await self.provider.mass.players.cmd_play(self._ma_player_id) - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_stop(self, request: web.Request) -> web.Response: - """Handle stop command - stops playback and clears the queue.""" - self._updating_from_plex = True - try: - if self._ma_player_id: - # Clear the queue (which also stops playback) - self.provider.mass.player_queues.clear(self._ma_player_id) - - # Reset play queue tracking since the queue is now cleared - self.play_queue_id = None - self.play_queue_item_ids = {} - - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_skip_next(self, request: web.Request) -> web.Response: - """Handle skip next command.""" - self._updating_from_plex = True - try: - if self._ma_player_id: - await self.provider.mass.player_queues.next(self._ma_player_id) - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_skip_previous(self, request: web.Request) -> web.Response: - """Handle skip previous command.""" - self._updating_from_plex = True - try: - if self._ma_player_id: - await self.provider.mass.player_queues.previous(self._ma_player_id) - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_step_forward(self, request: web.Request) -> web.Response: - """Handle step forward command (small skip forward).""" - self._updating_from_plex = True - try: - if self._ma_player_id: - queue = self.provider.mass.player_queues.get(self._ma_player_id) - if queue: - # Step forward 30 seconds - new_position = queue.corrected_elapsed_time + 30 - if queue.current_item and queue.current_item.media_item: - # Don't seek past the track duration - max_duration = queue.current_item.media_item.duration or new_position - new_position = min(new_position, max_duration) - await self.provider.mass.players.cmd_seek(self._ma_player_id, int(new_position)) - # Wait briefly for player state to update - await asyncio.sleep(0.1) - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_step_back(self, request: web.Request) -> web.Response: - """Handle step back command (small skip backward).""" - self._updating_from_plex = True - try: - if self._ma_player_id: - queue = self.provider.mass.player_queues.get(self._ma_player_id) - if queue: - # Step back 10 seconds - new_position = max(0, queue.corrected_elapsed_time - 10) - await self.provider.mass.players.cmd_seek(self._ma_player_id, int(new_position)) - # Wait briefly for player state to update - await asyncio.sleep(0.1) - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_skip_to(self, request: web.Request) -> web.Response: - """Handle skip to specific queue item.""" - key = request.query.get("key") - if not self._ma_player_id or not key: - return web.Response(status=400, text="Missing player ID or key") - - self._updating_from_plex = True - try: - ma_index = None - - # Check if key is a play queue item ID (numeric) or a library path - if key.isdigit(): - # Key is a play queue item ID - play_queue_item_id = int(key) - - # Find the MA queue index for this play queue item ID - for idx, pq_item_id in self.play_queue_item_ids.items(): - if pq_item_id == play_queue_item_id: - ma_index = idx - break - - if ma_index is None: - LOGGER.warning( - f"Could not find MA queue index for play queue item ID: " - f"{play_queue_item_id}" - ) - return web.Response(status=404, text="Queue item not found") - - LOGGER.info( - f"Skipping to queue index {ma_index} (play queue item ID: {play_queue_item_id})" - ) - else: - # Key is a library path (e.g., "/library/metadata/856761") - # Find the track in the MA queue by matching the Plex key - queue_items = self.provider.mass.player_queues.items(self._ma_player_id) - if not queue_items: - return web.Response(status=404, text="Queue is empty") - - for idx, item in enumerate(queue_items): - if not item.media_item: - continue - - # Find Plex mapping for this track - for mapping in item.media_item.provider_mappings: - if ( - mapping.provider_instance == self.provider.instance_id - and mapping.item_id == key - ): - ma_index = idx - break - - if ma_index is not None: - break - - if ma_index is None: - LOGGER.warning(f"Could not find track with key {key} in MA queue") - return web.Response(status=404, text="Track not found in queue") - - LOGGER.info(f"Skipping to queue index {ma_index} (track key: {key})") - - # Skip to this index in the MA queue - await self.provider.mass.player_queues.play_index(self._ma_player_id, ma_index) - - await self._broadcast_timeline() - return web.Response(status=200) - - except Exception as e: - LOGGER.exception(f"Error handling skipTo: {e}") - return web.Response(status=500, text=str(e)) - finally: - self._updating_from_plex = False - - async def handle_seek_to(self, request: web.Request) -> web.Response: - """Handle seek command.""" - self._updating_from_plex = True - try: - offset_ms = int(request.query.get("offset", 0)) - if self._ma_player_id: - await self.provider.mass.players.cmd_seek(self._ma_player_id, int(offset_ms / 1000)) - # Wait briefly for player state to update - await asyncio.sleep(0.1) - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_set_parameters(self, request: web.Request) -> web.Response: - """Handle parameter changes (volume, shuffle, repeat).""" - if not self._ma_player_id: - return web.Response(status=200) - - self._updating_from_plex = True - try: - if "volume" in request.query: - volume = int(request.query["volume"]) - await self.provider.mass.players.cmd_volume_set(self._ma_player_id, volume) - - if "shuffle" in request.query: - # Plex sends shuffle as "0" or "1" - shuffle = request.query["shuffle"] == "1" - await self.provider.mass.player_queues.set_shuffle(self._ma_player_id, shuffle) - - if "repeat" in request.query: - # Plex repeat: 0=off, 1=repeat one, 2=repeat all - repeat_value = int(request.query["repeat"]) - - # Map Plex repeat to MA repeat mode - if repeat_value == 0: - # Repeat off - self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.OFF) - elif repeat_value == 1: - # Repeat one track - self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.ONE) - elif repeat_value == 2: - # Repeat all - self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.ALL) - - await self._broadcast_timeline() - return web.Response(status=200) - finally: - self._updating_from_plex = False - - async def handle_options(self, request: web.Request) -> web.Response: - """Handle OPTIONS requests for CORS (like test-client.py).""" - return web.Response( - status=200, - headers={ - "Access-Control-Allow-Origin": "*", - "Access-Control-Allow-Methods": "GET, POST, OPTIONS", - "Access-Control-Allow-Headers": "*", - }, - ) - - async def handle_resources(self, request: web.Request) -> web.Response: - """Return player information (matching test-client.py format exactly).""" - # Get player name - player_name = "Music Assistant" - if self._ma_player_id: - player = self.provider.mass.players.get_player(self._ma_player_id) - if player: - player_name = player.display_name - - # Get player state - state = "stopped" - if self._ma_player_id: - player = self.provider.mass.players.get_player(self._ma_player_id) - if player and player.state: - state_value = ( - player.state.value if hasattr(player.state, "value") else str(player.state) - ) - if state_value in ["playing", "paused"]: - state = state_value - - local_ip = self.provider.mass.streams.publish_ip - version = self.provider.mass.version if self.provider.mass.version != "0.0.0" else "1.0.0" - - # Match test-client.py format exactly - xml = f""" - - - - -""" - return web.Response( - text=xml, content_type="text/xml", headers={"Access-Control-Allow-Origin": "*"} - ) - - def _build_timeline_attributes( - self, - track: Any, - state: str, - duration: int, - time: int, - volume: int, - shuffle: int, - repeat: int, - controllable: str, - queue: Any | None, - ) -> list[str]: - """Build timeline attributes for a playing track. - - :param track: The current track media item. - :param state: Playback state (playing, paused, etc.). - :param duration: Track duration in milliseconds. - :param time: Current playback time in milliseconds. - :param volume: Volume level (0-100). - :param shuffle: Shuffle state (0 or 1). - :param repeat: Repeat mode (0=off, 1=one, 2=all). - :param controllable: Controllable features string. - :param queue: The MA queue object. - :return: List of timeline attribute strings. - """ - # Get Plex key and ratingKey - key = None - rating_key = None - for mapping in track.provider_mappings: - if mapping.provider_instance == self.provider.instance_id: - key = mapping.item_id - rating_key = key.split("/")[-1] - break - - if not key: - return [] - - # Server identification - plex_url = urlparse(self.provider._baseurl) - machine_identifier = self.provider._plex_server.machineIdentifier - address = plex_url.hostname - port = plex_url.port or (443 if plex_url.scheme == "https" else 32400) - protocol = plex_url.scheme - - # Build timeline attributes - attrs = [ - f'state="{state}"', - f'duration="{duration}"', - f'time="{time}"', - f'ratingKey="{rating_key}"', - f'key="{key}"', - ] - - # Add play queue info if available - if self.play_queue_id and queue: - if queue.current_index is not None: - play_queue_item_id = self.play_queue_item_ids.get( - queue.current_index, queue.current_index + 1 - ) - attrs.append(f'playQueueItemID="{play_queue_item_id}"') - attrs.append(f'playQueueID="{self.play_queue_id}"') - attrs.append(f'playQueueVersion="{self.play_queue_version}"') - attrs.append(f'containerKey="/playQueues/{self.play_queue_id}"') - - # Add standard attributes - attrs.extend( - [ - 'type="music"', - f'volume="{volume}"', - f'shuffle="{shuffle}"', - f'repeat="{repeat}"', - f'controllable="{controllable}"', - f'machineIdentifier="{machine_identifier}"', - f'address="{address}"', - f'port="{port}"', - f'protocol="{protocol}"', - ] - ) - - return attrs - - async def _build_timeline_xml( - self, include_metadata: bool = False, command_id: str = "0" - ) -> str: - """Build timeline XML from current Music Assistant player state.""" - player_id = self._ma_player_id - - # Get MA player and queue - player = self.provider.mass.players.get_player(player_id) if player_id else None - queue = self.provider.mass.player_queues.get(player_id) if player_id else None - - # Controllable features for music - controllable = ( - "volume,repeat,skipPrevious,seekTo,stepBack,stepForward,stop,playPause,shuffle,skipNext" - ) - - # Map MA playback state to Plex state (stopped, paused, playing, buffering, error) - state = "stopped" - if player and player.playback_state: - state_value = ( - player.playback_state.value - if hasattr(player.playback_state, "value") - else str(player.playback_state) - ) - - # Map MA states to Plex states - if state_value == "playing": - state = "playing" - elif state_value == "paused": - state = "paused" - elif state_value == "buffering": - state = "buffering" - elif state_value == "idle": - # Idle with a current track = paused, idle without track = stopped - state = ( - "paused" - if queue and queue.current_item and queue.current_item.media_item - else "stopped" - ) - else: - state = "stopped" - - # Get volume (0-100) - use group_volume for groups, volume_level for others - volume = 0 - if player: - volume = ( - int(player.group_volume or 0) - if (player.type == PlayerType.GROUP or player.group_members) - else (int(player.volume_level or 0)) - ) - - # Get shuffle (0/1) and repeat (0=off, 1=one, 2=all) - shuffle = 0 - repeat = 0 - if queue: - shuffle = 1 if queue.shuffle_enabled else 0 - if hasattr(queue, "repeat_mode"): - repeat_mode = queue.repeat_mode - if hasattr(repeat_mode, "value"): - repeat_value = repeat_mode.value - if repeat_value == "one": - repeat = 1 - elif repeat_value == "all": - repeat = 2 - - # Build music timeline - if ( - state in ["playing", "paused"] - and queue - and queue.current_item - and queue.current_item.media_item - ): - track = queue.current_item.media_item - - # Duration in milliseconds - duration = round(track.duration * 1000) if track.duration else 0 - - # Current playback time in milliseconds - time = round(queue.corrected_elapsed_time * 1000) - - # Build timeline attributes - attrs = self._build_timeline_attributes( - track, state, duration, time, volume, shuffle, repeat, controllable, queue - ) - - if attrs: - music_timeline = f"" - else: - # No Plex mapping, send basic timeline with actual state - music_timeline = ( - f'' - ) - else: - # No current track - send stopped state with time=0 - time = 0 - music_timeline = ( - f'' - ) - - # Video and photo timelines (always stopped for music player) - video_timeline = '' - photo_timeline = '' - - # Combine all timelines - return ( - f'' - f"{music_timeline}{video_timeline}{photo_timeline}" - f"" - ) - - async def _handle_player_event(self, event: MassEvent) -> None: - """Handle player state change events.""" - if not self._ma_player_id or event.object_id != self._ma_player_id: - return - - # Skip if we're the ones making the changes - if self._updating_from_plex: - return - - try: - # Send timeline to Plex server (for activity tracking) - await self._send_timeline_to_server() - - # Broadcast timeline to subscribed controllers - # Timeline will be built from current MA player state - await self._broadcast_timeline() - except Exception as e: - LOGGER.debug(f"Error handling player event: {e}") - - async def _handle_queue_event(self, event: MassEvent) -> None: - """Handle queue change events.""" - if not self._ma_player_id or event.object_id != self._ma_player_id: - return - - # Skip if we're the ones making the changes - if self._updating_from_plex: - return - - try: - # Send timeline to Plex server (for activity tracking) - await self._send_timeline_to_server() - - # Broadcast timeline to subscribed controllers - # Timeline will be built from current MA player state - await self._broadcast_timeline() - except Exception as e: - LOGGER.debug(f"Error handling queue event: {e}") - - async def _handle_queue_items_updated(self, event: MassEvent) -> None: - """Handle queue items being added/removed/reordered.""" - if not self._ma_player_id or event.object_id != self._ma_player_id: - return - - # Skip if we're the ones making the changes - if self._updating_from_plex: - return - - # Get current MA queue state - queue_items = self.provider.mass.player_queues.items(self._ma_player_id) - if not queue_items: - return - - current_keys = [] - for item in queue_items: - if not item.media_item: - continue - # Find Plex mapping - for mapping in item.media_item.provider_mappings: - if mapping.provider_instance == self.provider.instance_id: - current_keys.append(mapping.item_id) - break - - # Check if queue actually changed from what we last synced FROM Plex - if ( - len(current_keys) == self._last_synced_ma_queue_length - and current_keys == self._last_synced_ma_queue_keys - ): - # Queue hasn't changed from last sync, skip - LOGGER.debug("MA queue matches last synced state, skipping Plex sync") - return - - LOGGER.info( - f"MA queue changed: {self._last_synced_ma_queue_length} -> {len(current_keys)} items" - ) - - # (Re)create Plex PlayQueue from MA queue - try: - await self._create_plex_playqueue_from_ma() - # Update tracked state - self._last_synced_ma_queue_length = len(current_keys) - self._last_synced_ma_queue_keys = current_keys - except Exception as e: - LOGGER.debug(f"Error creating Plex PlayQueue: {e}") - - # Broadcast timeline update - try: - await self._broadcast_timeline() - except Exception as e: - LOGGER.debug(f"Error broadcasting timeline: {e}") - - async def _create_plex_playqueue_from_ma(self) -> None: - """Create a new Plex PlayQueue from current MA queue.""" - ma_queue = self.provider.mass.player_queues.get(self._ma_player_id) # type: ignore[arg-type] - queue_items = self.provider.mass.player_queues.items(self._ma_player_id) # type: ignore[arg-type] - - if not ma_queue or not queue_items: - return - - # Fetch Plex items for all tracks in MA queue - async def fetch_plex_item(plex_key: str) -> object | None: - """Fetch a single Plex item.""" - try: - - def fetch_item() -> object: - return self.plex_server.fetchItem(plex_key) - - return await asyncio.to_thread(fetch_item) - except Exception as e: - LOGGER.debug(f"Failed to fetch Plex item {plex_key}: {e}") - return None - - # Collect all fetch tasks - fetch_tasks = [] - for item in queue_items: - if not item.media_item: - continue - - # Find Plex mapping - plex_key = None - for mapping in item.media_item.provider_mappings: - if mapping.provider_instance == self.provider.instance_id: - plex_key = mapping.item_id - break - - if plex_key: - fetch_tasks.append(fetch_plex_item(plex_key)) - - # Fetch all items concurrently - plex_items = [] - if fetch_tasks: - fetched_items = await asyncio.gather(*fetch_tasks, return_exceptions=True) - plex_items = [item for item in fetched_items if item is not None] - - if not plex_items: - LOGGER.debug("No Plex tracks in MA queue, skipping PlayQueue creation") - return - - # Determine which track should be selected (currently playing) - start_item = None - if ma_queue.current_index is not None and ma_queue.current_index < len(plex_items): - start_item = plex_items[ma_queue.current_index] - - # Create Plex PlayQueue - don't pass shuffle since MA queue is already in desired order - def create_queue() -> PlayQueue: - return PlayQueue.create( - self.plex_server, - items=plex_items, - startItem=start_item, - shuffle=0, # Don't shuffle, plex_items is already in MA queue order - continuous=1, - ) - - try: - playqueue = await asyncio.to_thread(create_queue) - - if playqueue: - self.play_queue_id = str(playqueue.playQueueID) - self.play_queue_version = playqueue.playQueueVersion - - # Build item ID mappings - self.play_queue_item_ids = {} - for i, item in enumerate(playqueue.items): - if hasattr(item, "playQueueItemID"): - self.play_queue_item_ids[i] = item.playQueueItemID - - LOGGER.info( - f"Created Plex PlayQueue {self.play_queue_id} with {len(plex_items)} tracks" - ) - except Exception as e: - LOGGER.exception(f"Error creating Plex PlayQueue: {e}") - - async def _send_timeline(self, client_id: str) -> None: - """Send timeline update to specific controller.""" - subscription = self.subscriptions.get(client_id) - if not subscription: - return - - timeline_xml = await self._build_timeline_xml() - - try: - await self.provider.mass.http_session.post( - f"{subscription['url']}/:/timeline", - data=timeline_xml, - headers={ - "X-Plex-Client-Identifier": self.client_id, - "Content-Type": "text/xml", - }, - timeout=ClientTimeout(total=5), - ) - # Update last_update timestamp on successful send - subscription["last_update"] = time.time() - except Exception as e: - LOGGER.debug(f"Failed to send timeline to {client_id}: {e}") - - async def _send_timeline_to_server(self) -> None: - """Send timeline update to Plex server for activity tracking.""" - if not self._ma_player_id: - return - - try: - player = self.provider.mass.players.get_player(self._ma_player_id) - queue = self.provider.mass.player_queues.get(self._ma_player_id) - - if ( - not player - or not queue - or not queue.current_item - or not queue.current_item.media_item - ): - return - - track = queue.current_item.media_item - - # Find Plex mapping - plex_key = None - for mapping in track.provider_mappings: - if mapping.provider_instance == self.provider.instance_id: - plex_key = mapping.item_id - break - - if not plex_key: - return - - # Extract rating key from plex_key (e.g., "/library/metadata/12345" -> "12345") - rating_key = plex_key.split("/")[-1] - - # Get playback state - state_value = ( - player.playback_state.value - if hasattr(player.playback_state, "value") - else str(player.playback_state) - ) - - # Map to Plex state - if state_value == "playing": - plex_state = "playing" - elif state_value == "paused": - plex_state = "paused" - else: - plex_state = "stopped" - - # Get position and duration in milliseconds - position_ms = round(queue.corrected_elapsed_time * 1000) - duration_ms = round(track.duration * 1000) if track.duration else 0 - - # Get play queue info if available - container_key = "" - play_queue_item_id = "" - if self.play_queue_id: - container_key = f"/playQueues/{self.play_queue_id}" - if queue.current_index is not None: - play_queue_item_id = str( - self.play_queue_item_ids.get(queue.current_index, queue.current_index + 1) - ) - - # Build timeline params (only Plex timeline data) - params = { - "ratingKey": rating_key, - "key": plex_key, - "state": plex_state, - "time": str(position_ms), - "duration": str(duration_ms), - } - - # Add play queue info if available - if container_key: - params["containerKey"] = container_key - if play_queue_item_id: - params["playQueueItemID"] = play_queue_item_id - - def send_timeline() -> None: - # Pass session headers to identify this specific player instance - self.plex_server.query("/:/timeline", params=params, headers=self.headers) - - await asyncio.to_thread(send_timeline) - - except Exception as e: - LOGGER.debug(f"Failed to send timeline to Plex server: {e}") - - async def _broadcast_timeline(self) -> None: - """Send timeline to all subscribed controllers.""" - current_time = time.time() - stale_clients = [] - for client_id, sub in self.subscriptions.items(): - try: - last_update = float(sub["last_update"]) # type: ignore[arg-type] - if current_time - last_update > 90: - stale_clients.append(client_id) - except (ValueError, TypeError): - # If conversion fails, treat client as stale - LOGGER.debug(f"Invalid last_update for client {client_id}, treating as stale") - stale_clients.append(client_id) - - for client_id in stale_clients: - del self.subscriptions[client_id] - - await asyncio.gather( - *(self._send_timeline(client_id) for client_id in list(self.subscriptions.keys())), - return_exceptions=True, # Don't fail all if one fails - ) - - # for debugging purposes only - # async def handle_unknown(self, request: web.Request) -> web.Response: - # """Catch-all handler for unexpected or unsupported paths.""" - # LOGGER.debug( - # "Unhandled request: %s %s from %s", - # request.method, - # request.path, - # request.remote, - # ) - # - # # You can log query/body if needed (be careful not to leak tokens) - # if request.query: - # LOGGER.debug("Query params for %s: %s", request.path, dict(request.query)) - # try: - # data = await request.text() - # if data: - # LOGGER.debug("Body for %s: %s", request.path, data) - # except Exception as e: - # LOGGER.debug("Could not read request body: %s", e) - # - # return web.Response(status=404, text=f"Unhandled path: {request.path}") diff --git a/music_assistant/providers/plex_connect/queue_commands.py b/music_assistant/providers/plex_connect/queue_commands.py new file mode 100644 index 0000000000..0dca514a81 --- /dev/null +++ b/music_assistant/providers/plex_connect/queue_commands.py @@ -0,0 +1,591 @@ +"""Play queue command handlers for Plex remote control. + +Handles playMedia, createPlayQueue and refreshPlayQueue HTTP commands. +""" + +from __future__ import annotations + +import asyncio +import logging +import re +from typing import TYPE_CHECKING, Any +from urllib.parse import unquote + +from aiohttp import web +from music_assistant_models.enums import QueueOption +from plexapi.playqueue import PlayQueue + +if TYPE_CHECKING: + from music_assistant.providers.plex import PlexProvider + +LOGGER = logging.getLogger(__name__) + + +class QueueCommandsMixin: + """Mixin providing HTTP handlers for Plex play queue commands.""" + + if TYPE_CHECKING: + provider: PlexProvider + _ma_player_id: str | None + _updating_from_plex: bool + play_queue_id: str | None + play_queue_version: int + play_queue_item_ids: dict[int, int] + _last_synced_ma_queue_length: int + _last_synced_ma_queue_keys: list[str] + + async def _broadcast_timeline(self) -> None: ... + + async def _ungroup_player_if_needed(self, player_id: str) -> None: ... + + async def _seek_to_offset_after_playback(self, player_id: str, offset: int) -> None: ... + + async def _load_remaining_queue_tracks( + self, + player_id: str, + playqueue: PlayQueue, + selected_offset: int, + shuffle: bool, + ) -> None: ... + + async def _replace_entire_queue(self, player_id: str, playqueue: PlayQueue) -> None: ... + + async def _replace_remaining_queue( + self, player_id: str, playqueue: PlayQueue, current_index: int + ) -> None: ... + + def _collect_synced_keys(self, player_id: str) -> list[str]: ... + + async def _create_plex_playqueue_from_ma(self) -> None: ... + + async def _fetch_full_play_queue(self, queue_id: str) -> PlayQueue | None: + """Fetch a complete PlayQueue, paginating past the server's per-request window cap. + + The Plex server caps each response to ~200 items regardless of the requested + window size. We use playQueueTotalCount to detect truncation and keep fetching + forward pages until we have every item. + + :param queue_id: The Plex PlayQueue ID to fetch. + :return: A PlayQueue whose items list contains all tracks, or None on failure. + """ + page_size = 200 + plex_server = self.provider._plex_server + + def fetch_initial() -> PlayQueue: + # own=True transfers ownership of this queue to MA so we can control it fully. + return PlayQueue.get(plex_server, playQueueID=queue_id, own=True, window=page_size) + + playqueue = await asyncio.to_thread(fetch_initial) + + if not playqueue or not playqueue.items: + return playqueue + + all_items = list(playqueue.items) + seen_ids = {item.playQueueItemID for item in all_items} + + while len(all_items) < playqueue.playQueueTotalCount: + last_id = all_items[-1].playQueueItemID + + def fetch_next(_last_id: int = last_id) -> PlayQueue: + # own=False — we already claimed ownership on the first fetch. + # includeBefore=False — only items strictly after center are returned. + return PlayQueue.get( + plex_server, + playQueueID=queue_id, + own=False, + center=_last_id, + window=page_size, + includeBefore=False, + ) + + next_page = await asyncio.to_thread(fetch_next) + if not next_page or not next_page.items: + break + + new_items = [i for i in next_page.items if i.playQueueItemID not in seen_ids] + if not new_items: + break + + all_items.extend(new_items) + seen_ids.update(i.playQueueItemID for i in new_items) + + # Patch the cached items property on the PlayQueue object. + playqueue.__dict__["items"] = all_items + return playqueue + + def _source_key_from_play_queue_uri(self, source_uri: str) -> str | None: + """Extract a Plex library key from a playQueueSourceURI string. + + Plex encodes the source as ``library:///directory/ENCODED_PATH``. We decode it + to a plain library path that :meth:`_resolve_plex_item` can use. + + :param source_uri: The playQueueSourceURI from a Plex PlayQueue. + :return: A Plex library key (e.g. ``/library/playlists/5329``), or None if unparsable. + """ + prefix = "library:///directory/" + if not source_uri.startswith(prefix): + return None + + path = unquote(source_uri[len(prefix) :]).lstrip("/") + if not path: + return None + + path = "/" + path.split("?")[0] + + for suffix in ("/children", "/allLeaves", "/items"): + if path.endswith(suffix): + path = path[: -len(suffix)] + break + + return path if path and path != "/" else None + + async def _play_from_shuffled_source( + self, + player_id: str, + source_uri: str, + offset: int, + ) -> bool: + """Load a shuffled PlayQueue's source in original order, then defer shuffle to MA. + + When Plex reports a shuffled PlayQueue the items are already in Plex's shuffled + order. Loading them directly would cause MA to shuffle an already-shuffled list. + Instead we parse the source URI, load the source collection unshuffled into MA, + and schedule a deferred task that applies MA's own shuffle and then recreates the + Plex PlayQueue so Plexamp sees the new order. + + :param player_id: The Music Assistant player ID. + :param source_uri: The playQueueSourceURI from the Plex PlayQueue. + :param offset: Starting position in milliseconds. + :return: True if handled, False if the caller should fall back to regular loading. + """ + source_key = self._source_key_from_play_queue_uri(source_uri) + if not source_key: + return False + + try: + LOGGER.info(f"Shuffled queue detected — loading source in original order: {source_key}") + source_media = await self._resolve_plex_item(source_key) + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=source_media, + option=QueueOption.REPLACE, + ) + if offset > 0: + await self._seek_to_offset_after_playback(player_id, offset) + await self._broadcast_timeline() + + async def _apply_shuffle_deferred() -> None: + await self.provider.mass.player_queues.set_shuffle(player_id, True) + await asyncio.sleep(0.2) + await self._create_plex_playqueue_from_ma() + synced_keys = self._collect_synced_keys(player_id) + self._last_synced_ma_queue_length = len(synced_keys) + self._last_synced_ma_queue_keys = synced_keys + + self.provider.mass.create_task(_apply_shuffle_deferred()) + return True + except Exception as e: + LOGGER.debug(f"Could not resolve source for shuffled queue, falling back: {e}") + return False + + async def _resolve_plex_item(self, key: str) -> Any: + """Resolve a Plex key to a Music Assistant media item. + + :param key: The Plex key to resolve. + """ + if "/library/metadata/" in key: + try: + return await self.provider.get_track(key) + except Exception as exc: + LOGGER.debug(f"Failed to resolve Plex item as track for key '{key}': {exc}") + + try: + return await self.provider.get_album(key) + except Exception as exc: + LOGGER.debug(f"Failed to resolve Plex item as album for key '{key}': {exc}") + + try: + return await self.provider.get_artist(key) + except Exception: + raise ValueError(f"Could not resolve Plex item: {key}") from None + + elif "/playlists/" in key: + return await self.provider.get_playlist(key) + else: + raise ValueError(f"Unknown Plex key format: {key}") + + async def _play_from_plex_queue( + self, + player_id: str, + container_key: str, + starting_key: str | None, + shuffle: bool, + offset: int, + ) -> None: + """Fetch a Plex PlayQueue and start playback, loading remaining tracks in the background. + + :param player_id: The Music Assistant player ID. + :param container_key: The Plex container key (e.g. /playQueues/123). + :param starting_key: Fallback track key if queue fetch fails. + :param shuffle: Whether shuffle is enabled. + :param offset: Starting position in milliseconds. + """ + try: + LOGGER.info(f"Fetching play queue: {container_key}") + + queue_id_match = re.search(r"/playQueues/(\d+)", container_key) + if not queue_id_match: + raise ValueError(f"Invalid container_key format: {container_key}") + + queue_id = queue_id_match.group(1) + + playqueue = await self._fetch_full_play_queue(queue_id) + + if playqueue and playqueue.items: + selected_offset = getattr(playqueue, "playQueueSelectedItemOffset", 0) + LOGGER.info(f"PlayQueue selected item offset: {selected_offset}") + + # When Plex reports a shuffled queue, load the original source into MA + # unshuffled and let MA apply its own shuffle, then propagate back to Plex. + if playqueue.playQueueShuffled and getattr(playqueue, "playQueueSourceURI", None): + if await self._play_from_shuffled_source( + player_id, playqueue.playQueueSourceURI, offset + ): + return + + self.play_queue_item_ids = {} + + first_item = ( + playqueue.items[selected_offset] + if selected_offset < len(playqueue.items) + else playqueue.items[0] + ) + first_track_key = first_item.key if hasattr(first_item, "key") else None + first_play_queue_item_id = ( + first_item.playQueueItemID if hasattr(first_item, "playQueueItemID") else None + ) + + if not first_track_key: + LOGGER.error("No valid first track in play queue") + if starting_key: + track = await self.provider.get_track(starting_key) + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=track, + option=QueueOption.REPLACE, + ) + return + + try: + first_track = await self.provider.get_track(first_track_key) + LOGGER.info(f"Starting playback with first track: {first_track.name}") + + if first_play_queue_item_id: + self.play_queue_item_ids[0] = first_play_queue_item_id + + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=first_track, + option=QueueOption.REPLACE, + ) + + if offset > 0: + await self._seek_to_offset_after_playback(player_id, offset) + + await self._broadcast_timeline() + + # Use the queue's own shuffle flag as the authoritative state, + # falling back to the request parameter if not set. + effective_shuffle = playqueue.playQueueShuffled or shuffle + self.provider.mass.create_task( + self._load_remaining_queue_tracks( + player_id, playqueue, selected_offset, effective_shuffle + ) + ) + + except Exception as e: + LOGGER.exception(f"Error starting playback with first track: {e}") + if starting_key: + track = await self.provider.get_track(starting_key) + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=track, + option=QueueOption.REPLACE, + ) + else: + LOGGER.error("Play queue is empty or could not be fetched") + if starting_key: + track = await self.provider.get_track(starting_key) + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=track, + option=QueueOption.REPLACE, + ) + + except Exception as e: + LOGGER.exception(f"Error playing from queue: {e}") + if starting_key: + track = await self.provider.get_track(starting_key) + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=track, + option=QueueOption.REPLACE, + ) + + async def handle_play_media(self, request: web.Request) -> web.Response: + """Handle playMedia command from Plex controller. + + Plexamp sends various parameters: + - key: The item to play (track, album, playlist, etc.) + - containerKey: The container context (play queue) + - offset: Starting position in milliseconds + - shuffle: Whether to shuffle + - repeat: Repeat mode + """ + self._updating_from_plex = True + try: + key = request.query.get("key") + container_key = request.query.get("containerKey") + offset = int(request.query.get("offset", 0)) + shuffle = request.query.get("shuffle", "0") == "1" + + if not key: + return web.Response( + status=400, text="Missing required 'key' parameter for playMedia command" + ) + + LOGGER.info( + f"Received playMedia command - key: {key}, " + f"containerKey: {container_key}, offset: {offset}ms" + ) + + player_id = self._ma_player_id + if not player_id: + return web.Response(status=500, text="No player assigned to this server") + + await self._ungroup_player_if_needed(player_id) + + if container_key and "/playQueues/" in container_key: + queue_id_match = re.search(r"/playQueues/(\d+)", container_key) + if queue_id_match: + self.play_queue_id = queue_id_match.group(1) + self.play_queue_version = 1 + LOGGER.info(f"Playing from queue: {container_key} starting at {key}") + await self._play_from_plex_queue(player_id, container_key, key, shuffle, offset) + else: + self.play_queue_id = None + self.play_queue_item_ids = {} + media = await self._resolve_plex_item(key) + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=media, + option=QueueOption.REPLACE, + ) + elif container_key: + self.play_queue_id = None + self.play_queue_item_ids = {} + media_to_play = await self._resolve_plex_item(container_key) + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=media_to_play, + option=QueueOption.REPLACE, + ) + else: + self.play_queue_id = None + self.play_queue_item_ids = {} + media = await self._resolve_plex_item(key) + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=media, + option=QueueOption.REPLACE, + ) + + # Always sync shuffle state so that a previously enabled MA shuffle + # does not reorder an unshuffled Plex queue. + await self.provider.mass.player_queues.set_shuffle(player_id, shuffle) + + if offset > 0: + await self._seek_to_offset_after_playback(player_id, offset) + + await self._broadcast_timeline() + return web.Response(status=200) + + except Exception as e: + LOGGER.exception(f"Error handling playMedia: {e}") + return web.Response(status=500, text=str(e)) + finally: + self._updating_from_plex = False + + async def handle_create_play_queue(self, request: web.Request) -> web.Response: + """Handle createPlayQueue command from Plex controller. + + Creates a new play queue from a URI (album, playlist, artist tracks, etc.) + and optionally applies shuffle. + """ + self._updating_from_plex = True + try: + uri = request.query.get("uri") + shuffle = request.query.get("shuffle", "0") == "1" + continuous = request.query.get("continuous", "0") == "1" + + if not uri: + return web.Response(status=400, text="Missing 'uri' parameter") + + LOGGER.info(f"Received createPlayQueue command - uri: {uri}, shuffle: {shuffle}") + + player_id = self._ma_player_id + if not player_id: + return web.Response(status=500, text="No player assigned to this server") + + def create_queue() -> PlayQueue: + item = self.provider._plex_server.fetchItem(uri) + return PlayQueue.create( + self.provider._plex_server, + item, + shuffle=1 if shuffle else 0, + continuous=1 if continuous else 0, + ) + + playqueue = await asyncio.to_thread(create_queue) + + if playqueue and playqueue.items: + self.play_queue_id = str(playqueue.playQueueID) + self.play_queue_version = 1 + + LOGGER.info( + f"Created play queue {self.play_queue_id} with {len(playqueue.items)} items" + ) + + self.play_queue_item_ids = {} + first_item = playqueue.items[0] + first_track_key = first_item.key if hasattr(first_item, "key") else None + first_play_queue_item_id = ( + first_item.playQueueItemID if hasattr(first_item, "playQueueItemID") else None + ) + + if not first_track_key: + LOGGER.error("No valid first track in created play queue") + return web.Response(status=500, text="Failed to load tracks from play queue") + + try: + first_track = await self.provider.get_track(first_track_key) + LOGGER.info(f"Starting playback with first track: {first_track.name}") + + if first_play_queue_item_id: + self.play_queue_item_ids[0] = first_play_queue_item_id + + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=first_track, + option=QueueOption.REPLACE, + ) + + if len(playqueue.items) > 1: + self.provider.mass.create_task( + self._load_remaining_queue_tracks(player_id, playqueue, 0, shuffle) + ) + + await self._broadcast_timeline() + return web.Response(status=200) + + except Exception as e: + LOGGER.exception(f"Error starting playback with first track: {e}") + return web.Response(status=500, text=f"Failed to start playback: {e}") + else: + LOGGER.error("Failed to create play queue or queue is empty") + return web.Response(status=500, text="Failed to create play queue") + + except Exception as e: + LOGGER.exception(f"Error handling createPlayQueue: {e}") + return web.Response(status=500, text=str(e)) + finally: + self._updating_from_plex = False + + async def handle_refresh_play_queue(self, request: web.Request) -> web.Response: + """Handle refreshPlayQueue command from Plex controller. + + Called when the play queue is modified (items added, removed, reordered). + Syncs the updated queue state to MA while preserving current playback. + """ + self._updating_from_plex = True + try: + play_queue_id = request.query.get("playQueueID") + + if not play_queue_id: + return web.Response(status=400, text="Missing 'playQueueID' parameter") + + LOGGER.info( + f"Received refreshPlayQueue command - playQueueID: {play_queue_id}, " + f"params: {dict(request.query)}" + ) + + if self.play_queue_id != play_queue_id: + LOGGER.warning( + f"Refresh requested for queue {play_queue_id} but active queue is " + f"{self.play_queue_id}" + ) + return web.Response( + status=409, + text=( + f"Requested playQueueID {play_queue_id} does not match " + f"active queue {self.play_queue_id}" + ), + ) + + self.play_queue_version += 1 + + playqueue = await self._fetch_full_play_queue(play_queue_id) + + if not playqueue or not playqueue.items: + LOGGER.error("Failed to refresh play queue - queue is empty or not found") + return web.Response(status=404, text="Play queue not found") + + player_id = self._ma_player_id + if not player_id: + LOGGER.error("No player assigned to this server") + return web.Response(status=500, text="No player assigned") + + ma_queue = self.provider.mass.player_queues.get(player_id) + if not ma_queue: + LOGGER.error(f"MA queue not found for player {player_id}") + return web.Response(status=500, text="MA queue not found") + + current_index = ma_queue.current_index + ma_queue_items = self.provider.mass.player_queues.items(player_id) + ma_queue_count = len(ma_queue_items) if ma_queue_items else 0 + + LOGGER.debug( + f"Queue refresh: Current index={current_index}, " + f"MA has {ma_queue_count} items, Plex has {len(playqueue.items)} items" + ) + + if current_index is None: + LOGGER.debug("No track currently playing, replacing entire queue") + await self._replace_entire_queue(player_id, playqueue) + else: + LOGGER.debug( + f"Track at index {current_index} is playing, " + f"replacing only items after current track" + ) + await self._replace_remaining_queue(player_id, playqueue, current_index) + + # Sync shuffle state from Plex to MA. + await self.provider.mass.player_queues.set_shuffle( + player_id, playqueue.playQueueShuffled + ) + + LOGGER.info( + f"Refreshed play queue {play_queue_id} - now has {len(playqueue.items)} items" + ) + + synced_keys = self._collect_synced_keys(player_id) + self._last_synced_ma_queue_length = len(synced_keys) + self._last_synced_ma_queue_keys = synced_keys + + return web.Response(status=200) + + except Exception as e: + LOGGER.exception(f"Error handling refreshPlayQueue: {e}") + return web.Response(status=500, text=str(e)) + finally: + self._updating_from_plex = False diff --git a/music_assistant/providers/plex_connect/queue_sync.py b/music_assistant/providers/plex_connect/queue_sync.py new file mode 100644 index 0000000000..265bf00323 --- /dev/null +++ b/music_assistant/providers/plex_connect/queue_sync.py @@ -0,0 +1,394 @@ +"""Queue synchronisation logic for Plex remote control. + +Handles background queue loading, MA→Plex sync, and MA event handlers. +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import TYPE_CHECKING, Any + +from music_assistant_models.enums import QueueOption +from plexapi.playqueue import PlayQueue + +if TYPE_CHECKING: + from music_assistant_models.event import MassEvent + + from music_assistant.providers.plex import PlexProvider + +LOGGER = logging.getLogger(__name__) + + +class QueueSyncMixin: + """Mixin providing queue synchronisation between MA and Plex.""" + + if TYPE_CHECKING: + provider: PlexProvider + plex_server: Any + _ma_player_id: str | None + _updating_from_plex: bool + play_queue_id: str | None + play_queue_version: int + play_queue_item_ids: dict[int, int] + _last_synced_ma_queue_length: int + _last_synced_ma_queue_keys: list[str] + + async def _broadcast_timeline(self) -> None: ... + + async def _send_timeline_to_server(self) -> None: ... + + def _collect_synced_keys(self, player_id: str) -> list[str]: + """Return Plex item_id keys for every track currently in the MA queue. + + :param player_id: The Music Assistant player ID. + :return: Ordered list of Plex item IDs matching the current MA queue. + """ + synced_keys = [] + for item in self.provider.mass.player_queues.items(player_id): + if item.media_item: + for mapping in item.media_item.provider_mappings: + if mapping.provider_instance == self.provider.instance_id: + synced_keys.append(mapping.item_id) + break + return synced_keys + + def _reorder_tracks_for_playback( + self, tracks: list[Any], start_index: int + ) -> tuple[list[Any], dict[int, int]]: + """Reorder tracks to start from a specific index and update item ID mappings. + + :param tracks: List of tracks to reorder. + :param start_index: Index of the track to start from. + :return: Tuple of (reordered tracks, updated item ID mappings). + """ + if start_index <= 0 or start_index >= len(tracks): + return tracks, self.play_queue_item_ids + + reordered_tracks = tracks[start_index:] + tracks[:start_index] + + new_item_ids = {} + for new_idx, old_idx in enumerate( + list(range(start_index, len(tracks))) + list(range(start_index)) + ): + if old_idx in self.play_queue_item_ids: + new_item_ids[new_idx] = self.play_queue_item_ids[old_idx] + + LOGGER.info(f"Started playback from offset {start_index} (reordered queue)") + return reordered_tracks, new_item_ids + + async def _load_remaining_queue_tracks( + self, + player_id: str, + playqueue: PlayQueue, + selected_offset: int, + shuffle: bool, + ) -> None: + """Load remaining tracks from play queue in the background. + + :param player_id: The Music Assistant player ID. + :param playqueue: The Plex play queue. + :param selected_offset: The offset of the track that's already playing. + :param shuffle: Whether shuffle is enabled. + """ + try: + remaining_items = [] + + for i in range(selected_offset + 1, len(playqueue.items)): + remaining_items.append((i, playqueue.items[i])) + + for i in range(selected_offset): + remaining_items.append((i, playqueue.items[i])) + + if not remaining_items: + LOGGER.debug("No remaining tracks to load") + return + + async def fetch_track( + plex_idx: int, item: Any + ) -> tuple[int, object | None, int | None]: + """Fetch a single track from Plex.""" + track_key = item.key if hasattr(item, "key") else None + play_queue_item_id = ( + item.playQueueItemID if hasattr(item, "playQueueItemID") else None + ) + if track_key: + try: + track = await self.provider.get_track(track_key) + return plex_idx, track, play_queue_item_id + except Exception as e: + LOGGER.debug(f"Could not fetch track {track_key}: {e}") + return plex_idx, None, None + + fetch_tasks = [fetch_track(idx, item) for idx, item in remaining_items] + results = await asyncio.gather(*fetch_tasks, return_exceptions=True) + + tracks_to_add: list[object] = [] + for result in results: + if isinstance(result, Exception): + LOGGER.debug(f"Error fetching track: {result}") + continue + _plex_idx, track, play_queue_item_id = result # type: ignore[misc] + if track: + ma_idx = len(tracks_to_add) + 1 # +1 because first track is already queued + tracks_to_add.append(track) + if play_queue_item_id: + self.play_queue_item_ids[ma_idx] = play_queue_item_id + + if tracks_to_add: + LOGGER.info(f"Adding {len(tracks_to_add)} remaining tracks to queue") + + # Guard against the ADD firing QUEUE_ITEMS_UPDATED before we update + # _last_synced_ma_queue_keys, which would trigger a spurious MA→Plex sync. + self._updating_from_plex = True + try: + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=tracks_to_add, # type: ignore[arg-type] + option=QueueOption.ADD, + ) + + synced_keys = self._collect_synced_keys(player_id) + self._last_synced_ma_queue_length = len(synced_keys) + self._last_synced_ma_queue_keys = synced_keys + finally: + self._updating_from_plex = False + + if shuffle: + await self.provider.mass.player_queues.set_shuffle(player_id, shuffle) + + LOGGER.info( + f"Successfully loaded {len(tracks_to_add)} remaining tracks " + f"(total queue: {self._last_synced_ma_queue_length} tracks)" + ) + else: + LOGGER.warning("No valid remaining tracks found in play queue") + + except Exception as e: + LOGGER.exception(f"Error loading remaining queue tracks: {e}") + + async def _replace_entire_queue(self, player_id: str, playqueue: PlayQueue) -> None: + """Replace the entire MA queue from a Plex play queue. + + :param player_id: The Music Assistant player ID. + :param playqueue: The Plex play queue to load. + """ + all_tracks = [] + self.play_queue_item_ids = {} + + for i, item in enumerate(playqueue.items): + track_key = item.key if hasattr(item, "key") else None + play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None + + if track_key: + try: + track = await self.provider.get_track(track_key) + all_tracks.append(track) + if play_queue_item_id: + self.play_queue_item_ids[len(all_tracks) - 1] = play_queue_item_id + except Exception as e: + LOGGER.debug(f"Could not fetch track {track_key}: {e}") + continue + + if all_tracks: + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=all_tracks, # type: ignore[arg-type] + option=QueueOption.REPLACE, + ) + LOGGER.info(f"Replaced queue with {len(all_tracks)} tracks") + + async def _replace_remaining_queue( + self, player_id: str, playqueue: PlayQueue, current_index: int + ) -> None: + """Replace only items after the current track. + + :param player_id: The Music Assistant player ID. + :param playqueue: The Plex play queue to load. + :param current_index: The current track index in the MA queue. + """ + remaining_tracks = [] + new_item_mappings = {} + + for i in range(current_index + 1, len(playqueue.items)): + item = playqueue.items[i] + track_key = item.key if hasattr(item, "key") else None + play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None + + if track_key: + try: + track = await self.provider.get_track(track_key) + remaining_tracks.append(track) + if play_queue_item_id: + new_item_mappings[current_index + 1 + len(remaining_tracks) - 1] = ( + play_queue_item_id + ) + except Exception as e: + LOGGER.debug(f"Could not fetch track {track_key}: {e}") + continue + + if remaining_tracks: + await self.provider.mass.player_queues.play_media( + queue_id=player_id, + media=remaining_tracks, # type: ignore[arg-type] + option=QueueOption.REPLACE_NEXT, + ) + self.play_queue_item_ids.update(new_item_mappings) + LOGGER.info( + f"Replaced {len(remaining_tracks)} tracks after current track " + f"(index {current_index})" + ) + else: + LOGGER.debug("No tracks after current track in Plex queue") + + for i, item in enumerate(playqueue.items): + play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None + if play_queue_item_id: + self.play_queue_item_ids[i] = play_queue_item_id + + async def _create_plex_playqueue_from_ma(self) -> None: + """Create a new Plex PlayQueue mirroring the current MA queue.""" + ma_queue = self.provider.mass.player_queues.get(self._ma_player_id) # type: ignore[arg-type] + queue_items = self.provider.mass.player_queues.items(self._ma_player_id) # type: ignore[arg-type] + + if not ma_queue or not queue_items: + return + + async def fetch_plex_item(plex_key: str) -> object | None: + """Fetch a single Plex item.""" + try: + plex_server = self.plex_server + + def fetch_item() -> object: + return plex_server.fetchItem(plex_key) + + return await asyncio.to_thread(fetch_item) + except Exception as e: + LOGGER.debug(f"Failed to fetch Plex item {plex_key}: {e}") + return None + + fetch_tasks = [] + for item in queue_items: + if not item.media_item: + continue + plex_key = None + for mapping in item.media_item.provider_mappings: + if mapping.provider_instance == self.provider.instance_id: + plex_key = mapping.item_id + break + if plex_key: + fetch_tasks.append(fetch_plex_item(plex_key)) + + plex_items = [] + if fetch_tasks: + fetched_items = await asyncio.gather(*fetch_tasks, return_exceptions=True) + plex_items = [item for item in fetched_items if item is not None] + + if not plex_items: + LOGGER.debug("No Plex tracks in MA queue, skipping PlayQueue creation") + return + + start_item = None + if ma_queue.current_index is not None and ma_queue.current_index < len(plex_items): + start_item = plex_items[ma_queue.current_index] + + plex_server = self.plex_server + + def create_queue() -> PlayQueue: + return PlayQueue.create( + plex_server, + items=plex_items, + startItem=start_item, + shuffle=0, + continuous=1, + ) + + try: + playqueue = await asyncio.to_thread(create_queue) + + if playqueue: + self.play_queue_id = str(playqueue.playQueueID) + self.play_queue_version = playqueue.playQueueVersion + + self.play_queue_item_ids = {} + for i, item in enumerate(playqueue.items): + if hasattr(item, "playQueueItemID"): + self.play_queue_item_ids[i] = item.playQueueItemID + + LOGGER.info( + f"Created Plex PlayQueue {self.play_queue_id} with {len(plex_items)} tracks" + ) + except Exception as e: + LOGGER.exception(f"Error creating Plex PlayQueue: {e}") + + async def _handle_player_event(self, event: MassEvent) -> None: + """Handle player state change events.""" + if not self._ma_player_id or event.object_id != self._ma_player_id: + return + + if self._updating_from_plex: + return + + try: + await self._send_timeline_to_server() + await self._broadcast_timeline() + except Exception as e: + LOGGER.debug(f"Error handling player event: {e}") + + async def _handle_queue_event(self, event: MassEvent) -> None: + """Handle queue change events.""" + if not self._ma_player_id or event.object_id != self._ma_player_id: + return + + if self._updating_from_plex: + return + + try: + await self._send_timeline_to_server() + await self._broadcast_timeline() + except Exception as e: + LOGGER.debug(f"Error handling queue event: {e}") + + async def _handle_queue_items_updated(self, event: MassEvent) -> None: + """Handle queue items being added/removed/reordered.""" + if not self._ma_player_id or event.object_id != self._ma_player_id: + return + + if self._updating_from_plex: + return + + queue_items = self.provider.mass.player_queues.items(self._ma_player_id) + if not queue_items: + return + + current_keys = [] + for item in queue_items: + if not item.media_item: + continue + for mapping in item.media_item.provider_mappings: + if mapping.provider_instance == self.provider.instance_id: + current_keys.append(mapping.item_id) + break + + if ( + len(current_keys) == self._last_synced_ma_queue_length + and current_keys == self._last_synced_ma_queue_keys + ): + LOGGER.debug("MA queue matches last synced state, skipping Plex sync") + return + + LOGGER.info( + f"MA queue changed: {self._last_synced_ma_queue_length} -> {len(current_keys)} items" + ) + + try: + await self._create_plex_playqueue_from_ma() + self._last_synced_ma_queue_length = len(current_keys) + self._last_synced_ma_queue_keys = current_keys + except Exception as e: + LOGGER.debug(f"Error creating Plex PlayQueue: {e}") + + try: + await self._broadcast_timeline() + except Exception as e: + LOGGER.debug(f"Error broadcasting timeline: {e}") diff --git a/music_assistant/providers/plex_connect/server.py b/music_assistant/providers/plex_connect/server.py new file mode 100644 index 0000000000..ed927210fc --- /dev/null +++ b/music_assistant/providers/plex_connect/server.py @@ -0,0 +1,371 @@ +"""Per-player Plex remote control instances.""" + +from __future__ import annotations + +import logging +import platform +import time +import uuid +from collections.abc import Callable +from typing import TYPE_CHECKING + +from aiohttp import web +from music_assistant_models.enums import EventType + +from .gdm import PlexGDMAdvertiser +from .playback import PlaybackMixin +from .queue_commands import QueueCommandsMixin +from .queue_sync import QueueSyncMixin +from .timeline import TimelineMixin + +if TYPE_CHECKING: + from music_assistant.providers.plex import PlexProvider + +LOGGER = logging.getLogger(__name__) + + +class PlayerRemoteInstance: + """Single remote control instance for one MA player.""" + + def __init__( + self, + plex_provider: PlexProvider, + ma_player_id: str, + player_name: str, + port: int, + device_class: str = "speaker", + remote_control: bool = False, + ) -> None: + """Initialize player remote instance. + + :param plex_provider: Plex provider instance. + :param ma_player_id: Music Assistant player ID. + :param player_name: Display name for the player. + :param port: Port for the remote control server. + :param device_class: Device class (speaker, phone, tablet, stb, tv, pc, cloud). + :param remote_control: Whether to enable remote control. + """ + self.plex_provider = plex_provider + self.plex_server = plex_provider._plex_server + self.ma_player_id = ma_player_id + self.player_name = player_name + self.port = port + self.device_class = device_class + self.remote_control = remote_control + + self.client_id = str( + uuid.uuid5( + uuid.NAMESPACE_DNS, + f"music-assistant-plex-{plex_provider.instance_id}-{ma_player_id}", + ) + ) + + if self.remote_control: + self.server: PlexRemoteControlServer | None = None + self.gdm: PlexGDMAdvertiser | None = None + + async def start(self) -> None: + """Start this player's remote control.""" + if self.remote_control: + LOGGER.info( + f"Created PlexServer for '{self.player_name}' with client ID: {self.client_id}" + ) + + self.server = PlexRemoteControlServer( + plex_provider=self.plex_provider, + port=self.port, + client_id=self.client_id, + ma_player_id=self.ma_player_id, + device_class=self.device_class, + ) + LOGGER.info( + f"Remote control server for '{self.player_name}' bound to MA player: " + f"{self.ma_player_id}" + ) + + await self.server.start() + + self.gdm = PlexGDMAdvertiser( + instance_id=self.client_id, + port=self.port, + publish_ip=str(self.plex_provider.mass.streams.publish_ip), + name=self.player_name, + product="Music Assistant", + version=self.plex_provider.mass.version + if self.plex_provider.mass.version != "0.0.0" + else "1.0.0", + ) + self.gdm.start() + + LOGGER.info(f"Player '{self.player_name}' is now discoverable on port {self.port}") + + async def stop(self) -> None: + """Stop this player's remote control.""" + if self.remote_control: + if self.gdm: + await self.gdm.stop() + + if self.server: + await self.server.stop() + + LOGGER.info(f"Stopped remote control for player '{self.player_name}'") + + +class PlexRemoteControlServer(QueueCommandsMixin, PlaybackMixin, QueueSyncMixin, TimelineMixin): + """HTTP server implementing the Plex remote control protocol for one MA player.""" + + def __init__( + self, + plex_provider: PlexProvider, + port: int = 32500, + client_id: str | None = None, + ma_player_id: str | None = None, + device_class: str = "speaker", + ) -> None: + """Initialize remote control server. + + :param plex_provider: Plex provider instance. + :param port: Port for the HTTP server. + :param client_id: Unique client identifier. + :param ma_player_id: Music Assistant player ID. + :param device_class: Device class (speaker, phone, tablet, stb, tv, pc, cloud). + """ + self.provider = plex_provider + self.plex_server = plex_provider._plex_server + self.port = port + self.client_id = client_id or plex_provider.instance_id + self.device_class = device_class + self.app = web.Application() + self.subscriptions: dict[str, dict[str, object]] = {} + self.runner: web.AppRunner | None = None + self.http_site: web.TCPSite | None = None + + # Play queue tracking (Plex-specific state that doesn't exist in MA) + self.play_queue_id: str | None = None + self.play_queue_version: int = 1 + self.play_queue_item_ids: dict[int, int] = {} + + # Track MA queue state to detect when we need to sync to Plex + self._last_synced_ma_queue_length: int = 0 + self._last_synced_ma_queue_keys: list[str] = [] + + self._ma_player_id = ma_player_id + + self._unsub_callbacks: list[Callable[..., None]] = [] + + # Flag to prevent circular updates when we modify the queue ourselves + self._updating_from_plex = False + + self.player = self.provider.mass.players.get_player(self._ma_player_id) # type: ignore[arg-type] + + self.device_name = f"{self.player.display_name}" if self.player else "Music Assistant" + + self.headers = { + "X-Plex-Device-Name": self.device_name, + "X-Plex-Session-Identifier": self.client_id, + "X-Plex-Client-Identifier": self.client_id, + "X-Plex-Product": "Music Assistant", + "X-Plex-Platform": "Music Assistant", + "X-Plex-Platform-Version": platform.release(), + } + + self._setup_routes() + + def _setup_routes(self) -> None: + """Set up all HTTP endpoints.""" + self.app.router.add_get("/", self.handle_root) + + self.app.router.add_get("/player/timeline/subscribe", self.handle_subscribe) + self.app.router.add_get("/player/timeline/unsubscribe", self.handle_unsubscribe) + self.app.router.add_get("/player/timeline/poll", self.handle_poll) + + self.app.router.add_get("/player/playback/playMedia", self.handle_play_media) + self.app.router.add_get("/player/playback/refreshPlayQueue", self.handle_refresh_play_queue) + self.app.router.add_get("/player/playback/createPlayQueue", self.handle_create_play_queue) + self.app.router.add_get("/player/playback/pause", self.handle_pause) + self.app.router.add_get("/player/playback/play", self.handle_play) + self.app.router.add_get("/player/playback/stop", self.handle_stop) + self.app.router.add_get("/player/playback/skipNext", self.handle_skip_next) + self.app.router.add_get("/player/playback/skipPrevious", self.handle_skip_previous) + self.app.router.add_get("/player/playback/stepForward", self.handle_step_forward) + self.app.router.add_get("/player/playback/stepBack", self.handle_step_back) + self.app.router.add_get("/player/playback/seekTo", self.handle_seek_to) + self.app.router.add_get("/player/playback/setParameters", self.handle_set_parameters) + self.app.router.add_get("/player/playback/skipTo", self.handle_skip_to) + + self.app.router.add_get("/resources", self.handle_resources) + + self.app.router.add_route("OPTIONS", "/{tail:.*}", self.handle_options) + + async def start(self) -> None: + """Start HTTP server and subscribe to MA events.""" + self.runner = web.AppRunner(self.app) + await self.runner.setup() + + self.http_site = web.TCPSite(self.runner, "0.0.0.0", self.port) + await self.http_site.start() + LOGGER.info(f"Plex remote control server started on HTTP port {self.port}") + + if self._ma_player_id: + self._unsub_callbacks.append( + self.provider.mass.subscribe( + self._handle_player_event, + EventType.PLAYER_UPDATED, + id_filter=self._ma_player_id, + ) + ) + self._unsub_callbacks.append( + self.provider.mass.subscribe( + self._handle_queue_event, + EventType.QUEUE_UPDATED, + id_filter=self._ma_player_id, + ) + ) + self._unsub_callbacks.append( + self.provider.mass.subscribe( + self._handle_queue_event, + EventType.QUEUE_TIME_UPDATED, + id_filter=self._ma_player_id, + ) + ) + self._unsub_callbacks.append( + self.provider.mass.subscribe( + self._handle_queue_items_updated, + EventType.QUEUE_ITEMS_UPDATED, + id_filter=self._ma_player_id, + ) + ) + + async def stop(self) -> None: + """Stop the HTTP server and unsubscribe from events.""" + for unsub in self._unsub_callbacks: + unsub() + self._unsub_callbacks.clear() + + if self.http_site: + await self.http_site.stop() + if self.runner: + await self.runner.cleanup() + LOGGER.info("Plex remote control server stopped") + + async def handle_root(self, request: web.Request) -> web.Response: + """Handle root endpoint - return basic player info.""" + player_name = "Music Assistant" + if self._ma_player_id: + player = self.provider.mass.players.get_player(self._ma_player_id) + if player: + player_name = player.display_name + + xml = f""" + + +""" + return web.Response( + text=xml, content_type="text/xml", headers={"Access-Control-Allow-Origin": "*"} + ) + + async def handle_subscribe(self, request: web.Request) -> web.Response: + """Handle timeline subscription from controller.""" + client_id = request.headers.get("X-Plex-Client-Identifier") + protocol = request.query.get("protocol", "http") + port = request.query.get("port") + command_id = int(request.query.get("commandID", 0)) + + if not client_id or not port: + return web.Response(status=400) + + self.subscriptions[client_id] = { + "url": f"{protocol}://{request.remote}:{port}", + "command_id": command_id, + "last_update": time.time(), + } + + LOGGER.info(f"Controller {client_id} subscribed for timeline updates") + await self._send_timeline(client_id) + return web.Response(status=200) + + async def handle_unsubscribe(self, request: web.Request) -> web.Response: + """Handle unsubscribe request.""" + client_id = request.headers.get("X-Plex-Client-Identifier") + if client_id in self.subscriptions: + del self.subscriptions[client_id] + LOGGER.info(f"Controller {client_id} unsubscribed") + return web.Response(status=200) + + async def handle_poll(self, request: web.Request) -> web.Response: + """Handle timeline poll request.""" + include_metadata = request.query.get("includeMetadata", "0") == "1" + command_id = request.query.get("commandID", "0") + + client_id = request.headers.get("X-Plex-Client-Identifier") + if client_id and client_id in self.subscriptions: + self.subscriptions[client_id]["last_update"] = time.time() + + timeline_xml = await self._build_timeline_xml( + include_metadata=include_metadata, command_id=command_id + ) + return web.Response( + text=timeline_xml, + content_type="text/xml", + headers={ + "X-Plex-Client-Identifier": self.client_id, + "Access-Control-Expose-Headers": "X-Plex-Client-Identifier", + "Access-Control-Allow-Origin": "*", + }, + ) + + async def handle_options(self, request: web.Request) -> web.Response: + """Handle OPTIONS requests for CORS.""" + return web.Response( + status=200, + headers={ + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "GET, POST, OPTIONS", + "Access-Control-Allow-Headers": "*", + }, + ) + + async def handle_resources(self, request: web.Request) -> web.Response: + """Return player capabilities and connection information.""" + player_name = "Music Assistant" + if self._ma_player_id: + player = self.provider.mass.players.get_player(self._ma_player_id) + if player: + player_name = player.display_name + + state = "stopped" + if self._ma_player_id: + player = self.provider.mass.players.get_player(self._ma_player_id) + if player and player.state: + state_value = ( + player.state.value if hasattr(player.state, "value") else str(player.state) + ) + if state_value in ["playing", "paused"]: + state = state_value + + local_ip = self.provider.mass.streams.publish_ip + version = self.provider.mass.version if self.provider.mass.version != "0.0.0" else "1.0.0" + + xml = f""" + + + + +""" + return web.Response( + text=xml, content_type="text/xml", headers={"Access-Control-Allow-Origin": "*"} + ) diff --git a/music_assistant/providers/plex_connect/timeline.py b/music_assistant/providers/plex_connect/timeline.py new file mode 100644 index 0000000000..6cbee09e2e --- /dev/null +++ b/music_assistant/providers/plex_connect/timeline.py @@ -0,0 +1,331 @@ +"""Timeline building and broadcasting for Plex remote control.""" + +from __future__ import annotations + +import asyncio +import logging +import time +from typing import TYPE_CHECKING, Any +from urllib.parse import urlparse + +from aiohttp import ClientTimeout +from music_assistant_models.enums import PlayerType + +if TYPE_CHECKING: + from music_assistant.providers.plex import PlexProvider + +LOGGER = logging.getLogger(__name__) + + +class TimelineMixin: + """Mixin providing timeline building and broadcasting.""" + + if TYPE_CHECKING: + provider: PlexProvider + client_id: str + subscriptions: dict[str, dict[str, object]] + play_queue_id: str | None + play_queue_version: int + play_queue_item_ids: dict[int, int] + _ma_player_id: str | None + headers: dict[str, str] + plex_server: Any + + def _build_timeline_attributes( + self, + track: Any, + state: str, + duration: int, + time_ms: int, + volume: int, + shuffle: int, + repeat: int, + controllable: str, + queue: Any | None, + ) -> list[str]: + """Build timeline attributes for a playing track. + + :param track: The current track media item. + :param state: Playback state (playing, paused, etc.). + :param duration: Track duration in milliseconds. + :param time_ms: Current playback time in milliseconds. + :param volume: Volume level (0-100). + :param shuffle: Shuffle state (0 or 1). + :param repeat: Repeat mode (0=off, 1=one, 2=all). + :param controllable: Controllable features string. + :param queue: The MA queue object. + :return: List of timeline attribute strings. + """ + key = None + rating_key = None + for mapping in track.provider_mappings: + if mapping.provider_instance == self.provider.instance_id: + key = mapping.item_id + rating_key = key.split("/")[-1] + break + + if not key: + return [] + + plex_url = urlparse(self.provider._baseurl) + machine_identifier = self.provider._plex_server.machineIdentifier + address = plex_url.hostname + port = plex_url.port or (443 if plex_url.scheme == "https" else 32400) + protocol = plex_url.scheme + + attrs = [ + f'state="{state}"', + f'duration="{duration}"', + f'time="{time_ms}"', + f'ratingKey="{rating_key}"', + f'key="{key}"', + ] + + if self.play_queue_id and queue: + if queue.current_index is not None: + play_queue_item_id = self.play_queue_item_ids.get( + queue.current_index, queue.current_index + 1 + ) + attrs.append(f'playQueueItemID="{play_queue_item_id}"') + attrs.append(f'playQueueID="{self.play_queue_id}"') + attrs.append(f'playQueueVersion="{self.play_queue_version}"') + attrs.append(f'containerKey="/playQueues/{self.play_queue_id}"') + + attrs.extend( + [ + 'type="music"', + f'volume="{volume}"', + f'shuffle="{shuffle}"', + f'repeat="{repeat}"', + f'controllable="{controllable}"', + f'machineIdentifier="{machine_identifier}"', + f'address="{address}"', + f'port="{port}"', + f'protocol="{protocol}"', + ] + ) + + return attrs + + async def _build_timeline_xml( + self, include_metadata: bool = False, command_id: str = "0" + ) -> str: + """Build timeline XML from current Music Assistant player state. + + :param include_metadata: Whether to include metadata in the timeline. + :param command_id: The command ID for the timeline response. + """ + player_id = self._ma_player_id + + player = self.provider.mass.players.get_player(player_id) if player_id else None + queue = self.provider.mass.player_queues.get(player_id) if player_id else None + + controllable = ( + "volume,repeat,skipPrevious,seekTo,stepBack,stepForward,stop,playPause,shuffle,skipNext" + ) + + state = "stopped" + if player and player.playback_state: + state_value = ( + player.playback_state.value + if hasattr(player.playback_state, "value") + else str(player.playback_state) + ) + + if state_value == "playing": + state = "playing" + elif state_value == "paused": + state = "paused" + elif state_value == "buffering": + state = "buffering" + elif state_value == "idle": + state = ( + "paused" + if queue and queue.current_item and queue.current_item.media_item + else "stopped" + ) + else: + state = "stopped" + + volume = 0 + if player: + volume = ( + int(player.group_volume or 0) + if (player.type == PlayerType.GROUP or player.group_members) + else (int(player.volume_level or 0)) + ) + + shuffle = 0 + repeat = 0 + if queue: + shuffle = 1 if queue.shuffle_enabled else 0 + if hasattr(queue, "repeat_mode"): + repeat_mode = queue.repeat_mode + if hasattr(repeat_mode, "value"): + repeat_value = repeat_mode.value + if repeat_value == "one": + repeat = 1 + elif repeat_value == "all": + repeat = 2 + + if ( + state in ["playing", "paused"] + and queue + and queue.current_item + and queue.current_item.media_item + ): + track = queue.current_item.media_item + duration = round(track.duration * 1000) if track.duration else 0 + time_ms = round(queue.corrected_elapsed_time * 1000) + + attrs = self._build_timeline_attributes( + track, state, duration, time_ms, volume, shuffle, repeat, controllable, queue + ) + + if attrs: + music_timeline = f"" + else: + music_timeline = ( + f'' + ) + else: + time_ms = 0 + music_timeline = ( + f'' + ) + + video_timeline = '' + photo_timeline = '' + + return ( + f'' + f"{music_timeline}{video_timeline}{photo_timeline}" + f"" + ) + + async def _send_timeline(self, client_id: str) -> None: + """Send timeline update to a specific subscribed controller. + + :param client_id: The client ID to send the timeline to. + """ + subscription = self.subscriptions.get(client_id) + if not subscription: + return + + timeline_xml = await self._build_timeline_xml() + + try: + await self.provider.mass.http_session.post( + f"{subscription['url']}/:/timeline", + data=timeline_xml, + headers={ + "X-Plex-Client-Identifier": self.client_id, + "Content-Type": "text/xml", + }, + timeout=ClientTimeout(total=5), + ) + subscription["last_update"] = time.time() + except Exception as e: + LOGGER.debug(f"Failed to send timeline to {client_id}: {e}") + + async def _send_timeline_to_server(self) -> None: + """Send timeline update to Plex server for activity tracking.""" + if not self._ma_player_id: + return + + try: + player = self.provider.mass.players.get_player(self._ma_player_id) + queue = self.provider.mass.player_queues.get(self._ma_player_id) + + if ( + not player + or not queue + or not queue.current_item + or not queue.current_item.media_item + ): + return + + track = queue.current_item.media_item + + plex_key = None + for mapping in track.provider_mappings: + if mapping.provider_instance == self.provider.instance_id: + plex_key = mapping.item_id + break + + if not plex_key: + return + + rating_key = plex_key.split("/")[-1] + + state_value = ( + player.playback_state.value + if hasattr(player.playback_state, "value") + else str(player.playback_state) + ) + + if state_value == "playing": + plex_state = "playing" + elif state_value == "paused": + plex_state = "paused" + else: + plex_state = "stopped" + + position_ms = round(queue.corrected_elapsed_time * 1000) + duration_ms = round(track.duration * 1000) if track.duration else 0 + + container_key = "" + play_queue_item_id = "" + if self.play_queue_id: + container_key = f"/playQueues/{self.play_queue_id}" + if queue.current_index is not None: + play_queue_item_id = str( + self.play_queue_item_ids.get(queue.current_index, queue.current_index + 1) + ) + + params: dict[str, str] = { + "ratingKey": rating_key, + "key": plex_key, + "state": plex_state, + "time": str(position_ms), + "duration": str(duration_ms), + } + + if container_key: + params["containerKey"] = container_key + if play_queue_item_id: + params["playQueueItemID"] = play_queue_item_id + + plex_server = self.plex_server + headers = self.headers + + def send_timeline() -> None: + plex_server.query("/:/timeline", params=params, headers=headers) + + await asyncio.to_thread(send_timeline) + + except Exception as e: + LOGGER.debug(f"Failed to send timeline to Plex server: {e}") + + async def _broadcast_timeline(self) -> None: + """Send timeline to all subscribed controllers.""" + current_time = time.time() + stale_clients = [] + for client_id, sub in self.subscriptions.items(): + try: + last_update = float(sub["last_update"]) # type: ignore[arg-type] + if current_time - last_update > 90: + stale_clients.append(client_id) + except (ValueError, TypeError): + LOGGER.debug(f"Invalid last_update for client {client_id}, treating as stale") + stale_clients.append(client_id) + + for client_id in stale_clients: + del self.subscriptions[client_id] + + await asyncio.gather( + *(self._send_timeline(client_id) for client_id in list(self.subscriptions.keys())), + return_exceptions=True, + )