-
-
Notifications
You must be signed in to change notification settings - Fork 384
Twitch music provider #3492
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
Drizzt321
wants to merge
57
commits into
music-assistant:dev
Choose a base branch
from
Drizzt321:twitch-music-provider
base: dev
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Twitch music provider #3492
Changes from 50 commits
Commits
Show all changes
57 commits
Select commit
Hold shift + click to select a range
381048c
Add Twitch music provider scaffold with core streaming tests (Step 2)
Drizzt321 fa33f75
Add OAuth2 authentication with token refresh and revocation (Step 3)
Drizzt321 b25cb71
Add browse, library radios, search, and Twitch API methods (Step 4)
Drizzt321 d6b16d6
Add ad handling with silence injection and passthrough modes (Step 5)
Drizzt321 4cd2db9
Add EventSub WebSocket client and raid following state machine (Step 6)
Drizzt321 9532d74
Polish: resolve TODO tests, verify all linting passes (Step 7)
Drizzt321 95809fd
Fix OAuth redirect URI to use MA callback relay
Drizzt321 0b1941a
Add setup instructions label showing OAuth redirect URL in config
Drizzt321 0fe6f27
Improve Streamlink Auth Token description with extraction instructions
Drizzt321 7e35e59
Add profile images for offline channels in Following browse
Drizzt321 40405dc
Show ad break status in stream title and fix mypy with streamlink
Drizzt321 615f067
Fix ad break title: import module not value for live flag reference
Drizzt321 7dd3b16
Add 37 missing tests from spec: 160 total, all passing
Drizzt321 b106abd
Fix all P0 issues from code review
Drizzt321 130fff5
Fix P1 + P2 issues: get_radio, auth init, ad patch, gather, cleanup
Drizzt321 fcc41b1
Replace string literals with constants for ad modes and browse paths
Drizzt321 1809094
Reuse Streamlink session across stream resolutions
Drizzt321 24beed7
Address test review: 5 missing tests, fix weak assertion, deduplicate
Drizzt321 3a80dec
Implement grace/idle timer state machine for raid following
Drizzt321 bf63c6c
Add test_disconnect_triggers_reconnect — last missing spec test
Drizzt321 72e3975
Add 2 missing tests, fix misleading docstring
Drizzt321 cbcd120
Strengthen Twitch test suite from spec revision findings
Drizzt321 c60aed2
Fix silence.ts sample rate: 44100 Hz → 48000 Hz
Drizzt321 7e149cd
Add debug logging to raid state machine and event dispatch
Drizzt321 23d2c55
Fix silence injection: override should_filter_segment to prevent ad s…
Drizzt321 1fa45d2
Fix silence injection: resume reader after writing silence to buffer
Drizzt321 ee95379
Add tests for silence injection pipeline integration
Drizzt321 768d364
Add silence.ts format validation test
Drizzt321 8a6f284
Fix raid tracking for library:// URIs and add provider mapping resolu…
Drizzt321 50dadb4
Fix ad handling: patch the actual Streamlink plugin class, not the im…
Drizzt321 93458c0
Remove dead init-time ad handling patch
Drizzt321 ed16fcf
Fix raid follow URI: twitch://channel/ → twitch://radio/
Drizzt321 bf808bf
Fix duplicate title after ad break: clear stream_metadata instead of …
Drizzt321 9935afd
Fix duplicate EventSub subscription race between welcome and subscrib…
Drizzt321 ee41bf4
Add debug logging to EventSub stop and idle disconnect
Drizzt321 c879563
Merge branch 'dev' into twitch-music-provider
Drizzt321 9db8f9f
Add recommendations and rename Streamlink token to Twitch Website Token
Drizzt321 037a2da
Rename recommendations folder to Twitch Live Channels
Drizzt321 b63b93e
Merge branch 'dev' into twitch-music-provider
Drizzt321 7bc1efb
Remove silence injection ad handling (Twitch ToS compliance)
Drizzt321 034d56a
Increase Streamlink queue deadline to survive Twitch ad breaks
Drizzt321 c383396
Remove documentation and multi_instance from Twitch manifest
Drizzt321 9b7e363
Rework raid following: ref-counted streams, multi-queue support
Drizzt321 694da74
Fix raid handling during grace period for IDLE queues
Drizzt321 cec7b34
Move lazy imports to top of file
Drizzt321 44a2353
Create Streamlink session per stream, not shared on provider
Drizzt321 d0b1edb
Replace file-wide mypy unreachable suppression with inline ignores
Drizzt321 77ab142
Merge branch 'dev' into twitch-music-provider
Drizzt321 d2ed806
Add comment clarifying stream title reset after ad break
Drizzt321 5dc259a
Add mypy issue references to unreachable type ignore comments
Drizzt321 4e4dbc2
Merge branch 'dev' into twitch-music-provider
Drizzt321 dc11650
Use instance_id in raid play_media URI for multi-instance support
Drizzt321 06a756e
Map 401/403 to LoginFailed in _raise_for_status
Drizzt321 c4ca448
Validate session_id presence in auth action handler
Drizzt321 a6d66b7
Replace source-scanning test with behavioral to_thread assertion
Drizzt321 b3c1503
Replace sleep-based test synchronization with deterministic waits
Drizzt321 5561b9b
Move values null guard to top of _handle_auth_action
Drizzt321 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| """Ad handling for Twitch streams via Streamlink monkey-patching.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| # Module-level flag — GIL makes bool read/write atomic. | ||
| # Set by Streamlink writer (runs in thread), read by provider. | ||
| ad_break_active: bool = False | ||
|
|
||
|
|
||
| def patch_ad_handling(reader_cls: type | None = None) -> None: | ||
| """Patch TwitchHLSStreamReader.__writer__ to pass through ads with logging. | ||
|
|
||
| Args: | ||
| reader_cls: The actual TwitchHLSStreamReader class to patch. If None, | ||
| patches the imported class (which may differ from the class Streamlink's | ||
| plugin system uses at runtime due to fresh module loading). | ||
| Callers should pass the reader class from the resolved stream object | ||
| to ensure the correct class is patched. | ||
|
|
||
| """ | ||
| from streamlink.plugins.twitch import ( # noqa: PLC0415 | ||
| TwitchHLSSegment, | ||
| TwitchHLSStreamReader, | ||
| TwitchHLSStreamWriter, | ||
| ) | ||
|
|
||
| target_reader = reader_cls or TwitchHLSStreamReader | ||
|
|
||
| class PassthroughTwitchWriter(TwitchHLSStreamWriter): | ||
| """Writer that logs ad segments and tracks ad break state.""" | ||
|
|
||
| def should_filter_segment(self, segment: TwitchHLSSegment) -> bool: # type: ignore[override] | ||
| """Never filter — let all segments through.""" | ||
| global ad_break_active # noqa: PLW0603 | ||
| if segment.ad: | ||
| ad_break_active = True | ||
| logger.debug( | ||
| "Ad segment %d (%.1fs): passing through as audio", | ||
| segment.num, | ||
| segment.duration, | ||
| ) | ||
| else: | ||
| if ad_break_active: | ||
| logger.debug( | ||
| "Content segment %d: ad block ended, audio resuming", | ||
| segment.num, | ||
| ) | ||
| ad_break_active = False | ||
| return False | ||
|
|
||
| target_reader.__writer__ = PassthroughTwitchWriter # type: ignore[attr-defined] | ||
| logger.info("Twitch ad handling: passthrough (ads play as audio)") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,268 @@ | ||
| """EventSub WebSocket client for Twitch raid following.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| import contextlib | ||
| import json | ||
| import logging | ||
| from typing import TYPE_CHECKING, Any | ||
|
|
||
| if TYPE_CHECKING: | ||
| from collections.abc import Callable | ||
|
|
||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| EVENTSUB_WS_URL = "wss://eventsub.wss.twitch.tv/ws" | ||
| MAX_BACKOFF = 60.0 | ||
|
|
||
|
|
||
| class EventSubClient: | ||
| """Async EventSub WebSocket client for channel.raid subscriptions.""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| http_session: Any, | ||
| api_headers_fn: Callable[[], dict[str, str]], | ||
| ) -> None: | ||
| """Initialize EventSub client. | ||
|
|
||
| Args: | ||
| http_session: aiohttp ClientSession for WebSocket + API calls | ||
| api_headers_fn: callable returning auth headers for Twitch API | ||
|
|
||
| """ | ||
| self._http_session = http_session | ||
| self._api_headers_fn = api_headers_fn | ||
|
|
||
| self._ws: Any | None = None | ||
| self._session_id: str | None = None | ||
| self._subscriptions: dict[str, str] = {} # broadcaster_user_id -> subscription_id | ||
| self._reconnect_url: str | None = None | ||
|
|
||
| self._ready = asyncio.Event() | ||
| self._stopped = False | ||
| self._backoff = 1.0 | ||
| self._listen_task: asyncio.Task[None] | None = None | ||
| self._on_raid: Callable[[str, str], Any] | None = None | ||
| self._subscribe_pending: set[str] = set() | ||
|
|
||
| @property | ||
| def is_connected(self) -> bool: | ||
| """Return whether the WebSocket is connected.""" | ||
| return self._ws is not None and not self._stopped | ||
|
|
||
| async def start(self, on_raid: Callable[[str, str], Any]) -> None: | ||
| """Start the EventSub WebSocket connection. | ||
|
|
||
| Args: | ||
| on_raid: callback(from_login, to_login) when a raid is received | ||
|
|
||
| """ | ||
| self._on_raid = on_raid | ||
| self._stopped = False | ||
| self._listen_task = asyncio.create_task(self._connect_loop()) | ||
|
|
||
| async def stop(self) -> None: | ||
| """Stop the EventSub WebSocket and clean up.""" | ||
| logger.debug("EventSub: stopping WebSocket and cleaning up") | ||
| self._stopped = True | ||
| self._session_id = None | ||
| self._subscriptions.clear() | ||
| self._ready.clear() | ||
|
|
||
| if self._ws is not None: | ||
| await self._ws.close() | ||
| self._ws = None | ||
|
|
||
| if self._listen_task is not None: | ||
| self._listen_task.cancel() | ||
| with contextlib.suppress(asyncio.CancelledError, Exception): | ||
| await self._listen_task | ||
| self._listen_task = None | ||
|
|
||
| async def subscribe_raids(self, broadcaster_user_id: str) -> None: | ||
| """Subscribe to channel.raid events for a broadcaster. No-op if already subscribed.""" | ||
| if broadcaster_user_id in self._subscriptions: | ||
| return | ||
|
|
||
| # Wait for WebSocket to be ready | ||
| self._subscribe_pending.add(broadcaster_user_id) | ||
| try: | ||
| await asyncio.wait_for(self._ready.wait(), timeout=10.0) | ||
| except TimeoutError: | ||
| logger.warning( | ||
| "EventSub not ready — cannot subscribe to raids for %s", | ||
| broadcaster_user_id, | ||
| ) | ||
| return | ||
| finally: | ||
| self._subscribe_pending.discard(broadcaster_user_id) | ||
|
|
||
| # Check if welcome handler already re-subscribed (reconnect case) | ||
| if broadcaster_user_id in self._subscriptions: | ||
| return | ||
|
|
||
| await self._create_subscription(broadcaster_user_id) | ||
|
|
||
| async def unsubscribe_raids(self, broadcaster_user_id: str) -> None: | ||
| """Unsubscribe from raid events for a specific broadcaster.""" | ||
| sub_id = self._subscriptions.pop(broadcaster_user_id, None) | ||
| if not sub_id: | ||
| return | ||
|
|
||
| try: | ||
| async with self._http_session.delete( | ||
| "https://api.twitch.tv/helix/eventsub/subscriptions", | ||
| headers=self._api_headers_fn(), | ||
| params={"id": sub_id}, | ||
| ): | ||
| pass | ||
| logger.debug( | ||
| "EventSub: unsubscribed %s for broadcaster %s", | ||
| sub_id, | ||
| broadcaster_user_id, | ||
| ) | ||
| except Exception: | ||
| logger.warning("EventSub: failed to unsubscribe %s", sub_id, exc_info=True) | ||
|
|
||
| async def unsubscribe_all(self) -> None: | ||
| """Unsubscribe from all active EventSub subscriptions.""" | ||
| broadcaster_ids = list(self._subscriptions.keys()) | ||
| for broadcaster_id in broadcaster_ids: | ||
| await self.unsubscribe_raids(broadcaster_id) | ||
|
|
||
| async def _create_subscription(self, broadcaster_user_id: str) -> None: | ||
| """Create an EventSub subscription for channel.raid.""" | ||
| body = { | ||
| "type": "channel.raid", | ||
| "version": "1", | ||
| "condition": {"from_broadcaster_user_id": broadcaster_user_id}, | ||
| "transport": {"method": "websocket", "session_id": self._session_id}, | ||
| } | ||
| try: | ||
| async with self._http_session.post( | ||
| "https://api.twitch.tv/helix/eventsub/subscriptions", | ||
| headers={**self._api_headers_fn(), "Content-Type": "application/json"}, | ||
| json=body, | ||
| ) as response: | ||
| if response.status in (200, 202): | ||
| data = await response.json() | ||
| self._subscriptions[broadcaster_user_id] = data["data"][0]["id"] | ||
| logger.debug( | ||
| "EventSub: subscribed to channel.raid for %s (sub=%s)", | ||
| broadcaster_user_id, | ||
| self._subscriptions[broadcaster_user_id], | ||
| ) | ||
| else: | ||
| text = await response.text() | ||
| logger.warning("EventSub: subscribe failed: %s %s", response.status, text) | ||
| except Exception: | ||
| logger.warning("EventSub: failed to create subscription", exc_info=True) | ||
|
|
||
| async def _connect_loop(self) -> None: | ||
| """Run the connection loop — connect, listen, reconnect with backoff.""" | ||
| while not self._stopped: | ||
| url = self._reconnect_url or EVENTSUB_WS_URL | ||
| self._reconnect_url = None # consume after use | ||
|
|
||
| try: | ||
| self._ws = await self._http_session.ws_connect(url) | ||
| async for msg in self._ws: | ||
| if self._stopped: | ||
| # mypy false positive (python/mypy#12784): async for + except | ||
| # CancelledError makes mypy think the loop body is unreachable | ||
| break # type: ignore[unreachable] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You need to fix these rather than ignore them. If you fix the types I mentioned before then somthing like this should work (so look at all the ignores again you will be able to fix them) |
||
| data = getattr(msg, "data", None) | ||
| if not isinstance(data, str): | ||
| continue | ||
| try: | ||
| self._handle_message(json.loads(data)) | ||
| except (json.JSONDecodeError, KeyError, TypeError): | ||
| logger.debug("EventSub: ignoring malformed message") | ||
| except asyncio.CancelledError: | ||
| return | ||
| except Exception: | ||
| logger.debug("EventSub: WebSocket disconnected", exc_info=True) | ||
| finally: | ||
| self._ws = None | ||
| self._ready.clear() | ||
|
|
||
| if self._stopped: | ||
| # mypy false positive (python/mypy#13104): the except Exception | ||
| # path falls through the finally to here, but mypy doesn't track | ||
| # it because except CancelledError returns | ||
| return # type: ignore[unreachable] | ||
|
|
||
| # Backoff before reconnect | ||
| logger.debug("EventSub: reconnecting in %.1fs", self._backoff) | ||
| await asyncio.sleep(self._backoff) | ||
| self._backoff = min(self._backoff * 2, MAX_BACKOFF) | ||
|
|
||
| def _handle_message(self, msg: dict[str, Any]) -> None: | ||
| """Dispatch an EventSub WebSocket message by type.""" | ||
| msg_type = msg.get("metadata", {}).get("message_type", "") | ||
|
|
||
| if msg_type == "session_welcome": | ||
| self._handle_welcome(msg) | ||
| elif msg_type == "session_reconnect": | ||
| self._handle_reconnect(msg) | ||
| elif msg_type == "notification": | ||
| self._handle_notification(msg) | ||
| elif msg_type == "revocation": | ||
| self._handle_revocation(msg) | ||
| # session_keepalive is a no-op | ||
|
|
||
| def _handle_welcome(self, msg: dict[str, Any]) -> None: | ||
| """Handle session_welcome — store session ID, re-subscribe if needed.""" | ||
| self._session_id = msg["payload"]["session"]["id"] | ||
| self._backoff = 1.0 # reset backoff | ||
|
|
||
| # Old subscriptions are invalid on the new session. Keep the broadcaster | ||
| # IDs (we need to re-subscribe) but clear the subscription IDs. | ||
| stale_broadcasters = [ | ||
| bid for bid in self._subscriptions if bid not in self._subscribe_pending | ||
| ] | ||
| self._subscriptions.clear() | ||
|
|
||
| # Re-subscribe for all broadcasters that aren't already being handled | ||
| # by a concurrent subscribe_raids call. | ||
| for broadcaster_id in stale_broadcasters: | ||
| asyncio.create_task(self._create_subscription(broadcaster_id)) | ||
|
|
||
| self._ready.set() | ||
|
|
||
| def _handle_reconnect(self, msg: dict[str, Any]) -> None: | ||
| """Handle session_reconnect — store new URL, close current WS.""" | ||
| self._reconnect_url = msg["payload"]["session"]["reconnect_url"] | ||
| if self._ws is not None: | ||
| asyncio.create_task(self._ws.close()) | ||
|
|
||
| def _handle_notification(self, msg: dict[str, Any]) -> None: | ||
| """Handle notification — fire raid callback if channel.raid.""" | ||
| sub_type = msg.get("metadata", {}).get("subscription_type", "") | ||
| if sub_type != "channel.raid": | ||
| return | ||
|
|
||
| event = msg["payload"]["event"] | ||
| from_login = event["from_broadcaster_user_login"] | ||
| to_login = event["to_broadcaster_user_login"] | ||
|
|
||
| if self._on_raid: | ||
| self._on_raid(from_login, to_login) | ||
|
|
||
| def _handle_revocation(self, msg: dict[str, Any]) -> None: | ||
| """Handle revocation — clear subscription, log warning.""" | ||
| sub = msg.get("payload", {}).get("subscription", {}) | ||
| logger.warning( | ||
| "EventSub: subscription revoked: type=%s status=%s", | ||
| sub.get("type"), | ||
| sub.get("status"), | ||
| ) | ||
| # Remove the revoked subscription by its ID | ||
| revoked_id = sub.get("id") | ||
| if revoked_id: | ||
| self._subscriptions = { | ||
| bid: sid for bid, sid in self._subscriptions.items() if sid != revoked_id | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| { | ||
| "type": "music", | ||
| "domain": "twitch", | ||
| "stage": "beta", | ||
| "name": "Twitch Audio", | ||
| "description": "Audio-only streaming from Twitch channels with raid following.", | ||
| "codeowners": ["@Drizzt321"], | ||
| "requirements": ["streamlink>=8.0,<9"], | ||
| "icon": "twitch" | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| """Tests for the Twitch provider.""" |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typing this as Any is defeating mypy type checking. I think this should be aiohttp.ClientSession and below ws should be aiohttp.ClientWebSocketResponse