Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
381048c
Add Twitch music provider scaffold with core streaming tests (Step 2)
Drizzt321 Mar 22, 2026
fa33f75
Add OAuth2 authentication with token refresh and revocation (Step 3)
Drizzt321 Mar 22, 2026
b25cb71
Add browse, library radios, search, and Twitch API methods (Step 4)
Drizzt321 Mar 22, 2026
d6b16d6
Add ad handling with silence injection and passthrough modes (Step 5)
Drizzt321 Mar 22, 2026
4cd2db9
Add EventSub WebSocket client and raid following state machine (Step 6)
Drizzt321 Mar 22, 2026
9532d74
Polish: resolve TODO tests, verify all linting passes (Step 7)
Drizzt321 Mar 22, 2026
95809fd
Fix OAuth redirect URI to use MA callback relay
Drizzt321 Mar 22, 2026
0b1941a
Add setup instructions label showing OAuth redirect URL in config
Drizzt321 Mar 22, 2026
0fe6f27
Improve Streamlink Auth Token description with extraction instructions
Drizzt321 Mar 22, 2026
7e35e59
Add profile images for offline channels in Following browse
Drizzt321 Mar 22, 2026
40405dc
Show ad break status in stream title and fix mypy with streamlink
Drizzt321 Mar 23, 2026
615f067
Fix ad break title: import module not value for live flag reference
Drizzt321 Mar 23, 2026
7dd3b16
Add 37 missing tests from spec: 160 total, all passing
Drizzt321 Mar 23, 2026
b106abd
Fix all P0 issues from code review
Drizzt321 Mar 23, 2026
130fff5
Fix P1 + P2 issues: get_radio, auth init, ad patch, gather, cleanup
Drizzt321 Mar 23, 2026
fcc41b1
Replace string literals with constants for ad modes and browse paths
Drizzt321 Mar 23, 2026
1809094
Reuse Streamlink session across stream resolutions
Drizzt321 Mar 23, 2026
24beed7
Address test review: 5 missing tests, fix weak assertion, deduplicate
Drizzt321 Mar 23, 2026
3a80dec
Implement grace/idle timer state machine for raid following
Drizzt321 Mar 23, 2026
bf63c6c
Add test_disconnect_triggers_reconnect — last missing spec test
Drizzt321 Mar 23, 2026
72e3975
Add 2 missing tests, fix misleading docstring
Drizzt321 Mar 23, 2026
cbcd120
Strengthen Twitch test suite from spec revision findings
Drizzt321 Mar 23, 2026
c60aed2
Fix silence.ts sample rate: 44100 Hz → 48000 Hz
Drizzt321 Mar 23, 2026
7e149cd
Add debug logging to raid state machine and event dispatch
Drizzt321 Mar 23, 2026
23d2c55
Fix silence injection: override should_filter_segment to prevent ad s…
Drizzt321 Mar 23, 2026
1fa45d2
Fix silence injection: resume reader after writing silence to buffer
Drizzt321 Mar 23, 2026
ee95379
Add tests for silence injection pipeline integration
Drizzt321 Mar 23, 2026
768d364
Add silence.ts format validation test
Drizzt321 Mar 23, 2026
8a6f284
Fix raid tracking for library:// URIs and add provider mapping resolu…
Drizzt321 Mar 23, 2026
50dadb4
Fix ad handling: patch the actual Streamlink plugin class, not the im…
Drizzt321 Mar 23, 2026
93458c0
Remove dead init-time ad handling patch
Drizzt321 Mar 23, 2026
ed16fcf
Fix raid follow URI: twitch://channel/ → twitch://radio/
Drizzt321 Mar 24, 2026
bf808bf
Fix duplicate title after ad break: clear stream_metadata instead of …
Drizzt321 Mar 24, 2026
9935afd
Fix duplicate EventSub subscription race between welcome and subscrib…
Drizzt321 Mar 24, 2026
ee41bf4
Add debug logging to EventSub stop and idle disconnect
Drizzt321 Mar 26, 2026
c879563
Merge branch 'dev' into twitch-music-provider
Drizzt321 Mar 27, 2026
9db8f9f
Add recommendations and rename Streamlink token to Twitch Website Token
Drizzt321 Mar 27, 2026
037a2da
Rename recommendations folder to Twitch Live Channels
Drizzt321 Mar 27, 2026
b63b93e
Merge branch 'dev' into twitch-music-provider
Drizzt321 Mar 27, 2026
7bc1efb
Remove silence injection ad handling (Twitch ToS compliance)
Drizzt321 Mar 27, 2026
034d56a
Increase Streamlink queue deadline to survive Twitch ad breaks
Drizzt321 Mar 27, 2026
c383396
Remove documentation and multi_instance from Twitch manifest
Drizzt321 Apr 3, 2026
9b7e363
Rework raid following: ref-counted streams, multi-queue support
Drizzt321 Apr 3, 2026
694da74
Fix raid handling during grace period for IDLE queues
Drizzt321 Apr 3, 2026
cec7b34
Move lazy imports to top of file
Drizzt321 Apr 3, 2026
44a2353
Create Streamlink session per stream, not shared on provider
Drizzt321 Apr 3, 2026
d0b1edb
Replace file-wide mypy unreachable suppression with inline ignores
Drizzt321 Apr 3, 2026
77ab142
Merge branch 'dev' into twitch-music-provider
Drizzt321 Apr 4, 2026
d2ed806
Add comment clarifying stream title reset after ad break
Drizzt321 Apr 4, 2026
5dc259a
Add mypy issue references to unreachable type ignore comments
Drizzt321 Apr 4, 2026
4e4dbc2
Merge branch 'dev' into twitch-music-provider
Drizzt321 Apr 8, 2026
dc11650
Use instance_id in raid play_media URI for multi-instance support
Drizzt321 Apr 8, 2026
06a756e
Map 401/403 to LoginFailed in _raise_for_status
Drizzt321 Apr 8, 2026
c4ca448
Validate session_id presence in auth action handler
Drizzt321 Apr 8, 2026
a6d66b7
Replace source-scanning test with behavioral to_thread assertion
Drizzt321 Apr 8, 2026
b3c1503
Replace sleep-based test synchronization with deterministic waits
Drizzt321 Apr 8, 2026
5561b9b
Move values null guard to top of _handle_auth_action
Drizzt321 Apr 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
946 changes: 946 additions & 0 deletions music_assistant/providers/twitch/__init__.py

Large diffs are not rendered by default.

56 changes: 56 additions & 0 deletions music_assistant/providers/twitch/ad_handling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""Ad handling for Twitch streams via Streamlink monkey-patching."""

from __future__ import annotations

import logging

logger = logging.getLogger(__name__)

# Module-level flag — GIL makes bool read/write atomic.
# Set by Streamlink writer (runs in thread), read by provider.
ad_break_active: bool = False


def patch_ad_handling(reader_cls: type | None = None) -> None:
"""Patch TwitchHLSStreamReader.__writer__ to pass through ads with logging.

Args:
reader_cls: The actual TwitchHLSStreamReader class to patch. If None,
patches the imported class (which may differ from the class Streamlink's
plugin system uses at runtime due to fresh module loading).
Callers should pass the reader class from the resolved stream object
to ensure the correct class is patched.

"""
from streamlink.plugins.twitch import ( # noqa: PLC0415
TwitchHLSSegment,
TwitchHLSStreamReader,
TwitchHLSStreamWriter,
)

target_reader = reader_cls or TwitchHLSStreamReader

class PassthroughTwitchWriter(TwitchHLSStreamWriter):
"""Writer that logs ad segments and tracks ad break state."""

def should_filter_segment(self, segment: TwitchHLSSegment) -> bool: # type: ignore[override]
"""Never filter — let all segments through."""
global ad_break_active # noqa: PLW0603
if segment.ad:
ad_break_active = True
logger.debug(
"Ad segment %d (%.1fs): passing through as audio",
segment.num,
segment.duration,
)
else:
if ad_break_active:
logger.debug(
"Content segment %d: ad block ended, audio resuming",
segment.num,
)
ad_break_active = False
return False

target_reader.__writer__ = PassthroughTwitchWriter # type: ignore[attr-defined]
logger.info("Twitch ad handling: passthrough (ads play as audio)")
268 changes: 268 additions & 0 deletions music_assistant/providers/twitch/eventsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
"""EventSub WebSocket client for Twitch raid following."""

from __future__ import annotations

import asyncio
import contextlib
import json
import logging
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from collections.abc import Callable


logger = logging.getLogger(__name__)

EVENTSUB_WS_URL = "wss://eventsub.wss.twitch.tv/ws"
MAX_BACKOFF = 60.0


class EventSubClient:
"""Async EventSub WebSocket client for channel.raid subscriptions."""

def __init__(
self,
http_session: Any,
api_headers_fn: Callable[[], dict[str, str]],
) -> None:
"""Initialize EventSub client.

Args:
http_session: aiohttp ClientSession for WebSocket + API calls
api_headers_fn: callable returning auth headers for Twitch API

"""
self._http_session = http_session
self._api_headers_fn = api_headers_fn

self._ws: Any | None = None
self._session_id: str | None = None
self._subscriptions: dict[str, str] = {} # broadcaster_user_id -> subscription_id
self._reconnect_url: str | None = None

self._ready = asyncio.Event()
self._stopped = False
self._backoff = 1.0
self._listen_task: asyncio.Task[None] | None = None
self._on_raid: Callable[[str, str], Any] | None = None
self._subscribe_pending: set[str] = set()

@property
def is_connected(self) -> bool:
"""Return whether the WebSocket is connected."""
return self._ws is not None and not self._stopped

async def start(self, on_raid: Callable[[str, str], Any]) -> None:
"""Start the EventSub WebSocket connection.

Args:
on_raid: callback(from_login, to_login) when a raid is received

"""
self._on_raid = on_raid
self._stopped = False
self._listen_task = asyncio.create_task(self._connect_loop())

async def stop(self) -> None:
"""Stop the EventSub WebSocket and clean up."""
logger.debug("EventSub: stopping WebSocket and cleaning up")
self._stopped = True
self._session_id = None
self._subscriptions.clear()
self._ready.clear()

if self._ws is not None:
await self._ws.close()
self._ws = None

if self._listen_task is not None:
self._listen_task.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception):
await self._listen_task
self._listen_task = None

async def subscribe_raids(self, broadcaster_user_id: str) -> None:
"""Subscribe to channel.raid events for a broadcaster. No-op if already subscribed."""
if broadcaster_user_id in self._subscriptions:
return

# Wait for WebSocket to be ready
self._subscribe_pending.add(broadcaster_user_id)
try:
await asyncio.wait_for(self._ready.wait(), timeout=10.0)
except TimeoutError:
logger.warning(
"EventSub not ready — cannot subscribe to raids for %s",
broadcaster_user_id,
)
return
finally:
self._subscribe_pending.discard(broadcaster_user_id)

# Check if welcome handler already re-subscribed (reconnect case)
if broadcaster_user_id in self._subscriptions:
return

await self._create_subscription(broadcaster_user_id)

async def unsubscribe_raids(self, broadcaster_user_id: str) -> None:
"""Unsubscribe from raid events for a specific broadcaster."""
sub_id = self._subscriptions.pop(broadcaster_user_id, None)
if not sub_id:
return

try:
async with self._http_session.delete(
"https://api.twitch.tv/helix/eventsub/subscriptions",
headers=self._api_headers_fn(),
params={"id": sub_id},
):
pass
logger.debug(
"EventSub: unsubscribed %s for broadcaster %s",
sub_id,
broadcaster_user_id,
)
except Exception:
logger.warning("EventSub: failed to unsubscribe %s", sub_id, exc_info=True)

async def unsubscribe_all(self) -> None:
"""Unsubscribe from all active EventSub subscriptions."""
broadcaster_ids = list(self._subscriptions.keys())
for broadcaster_id in broadcaster_ids:
await self.unsubscribe_raids(broadcaster_id)

async def _create_subscription(self, broadcaster_user_id: str) -> None:
"""Create an EventSub subscription for channel.raid."""
body = {
"type": "channel.raid",
"version": "1",
"condition": {"from_broadcaster_user_id": broadcaster_user_id},
"transport": {"method": "websocket", "session_id": self._session_id},
}
try:
async with self._http_session.post(
"https://api.twitch.tv/helix/eventsub/subscriptions",
headers={**self._api_headers_fn(), "Content-Type": "application/json"},
json=body,
) as response:
if response.status in (200, 202):
data = await response.json()
self._subscriptions[broadcaster_user_id] = data["data"][0]["id"]
logger.debug(
"EventSub: subscribed to channel.raid for %s (sub=%s)",
broadcaster_user_id,
self._subscriptions[broadcaster_user_id],
)
else:
text = await response.text()
logger.warning("EventSub: subscribe failed: %s %s", response.status, text)
except Exception:
logger.warning("EventSub: failed to create subscription", exc_info=True)

async def _connect_loop(self) -> None:
"""Run the connection loop — connect, listen, reconnect with backoff."""
while not self._stopped:
url = self._reconnect_url or EVENTSUB_WS_URL
self._reconnect_url = None # consume after use

try:
self._ws = await self._http_session.ws_connect(url)
async for msg in self._ws:
if self._stopped:
# mypy false positive (python/mypy#12784): async for + except
# CancelledError makes mypy think the loop body is unreachable
break # type: ignore[unreachable]
data = getattr(msg, "data", None)
if not isinstance(data, str):
continue
try:
self._handle_message(json.loads(data))
except (json.JSONDecodeError, KeyError, TypeError):
logger.debug("EventSub: ignoring malformed message")
except asyncio.CancelledError:
return
except Exception:
logger.debug("EventSub: WebSocket disconnected", exc_info=True)
finally:
self._ws = None
self._ready.clear()

if self._stopped:
# mypy false positive (python/mypy#13104): the except Exception
# path falls through the finally to here, but mypy doesn't track
# it because except CancelledError returns
return # type: ignore[unreachable]

# Backoff before reconnect
logger.debug("EventSub: reconnecting in %.1fs", self._backoff)
await asyncio.sleep(self._backoff)
self._backoff = min(self._backoff * 2, MAX_BACKOFF)

def _handle_message(self, msg: dict[str, Any]) -> None:
"""Dispatch an EventSub WebSocket message by type."""
msg_type = msg.get("metadata", {}).get("message_type", "")

if msg_type == "session_welcome":
self._handle_welcome(msg)
elif msg_type == "session_reconnect":
self._handle_reconnect(msg)
elif msg_type == "notification":
self._handle_notification(msg)
elif msg_type == "revocation":
self._handle_revocation(msg)
# session_keepalive is a no-op

def _handle_welcome(self, msg: dict[str, Any]) -> None:
"""Handle session_welcome — store session ID, re-subscribe if needed."""
self._session_id = msg["payload"]["session"]["id"]
self._backoff = 1.0 # reset backoff

# Old subscriptions are invalid on the new session. Keep the broadcaster
# IDs (we need to re-subscribe) but clear the subscription IDs.
stale_broadcasters = [
bid for bid in self._subscriptions if bid not in self._subscribe_pending
]
self._subscriptions.clear()

# Re-subscribe for all broadcasters that aren't already being handled
# by a concurrent subscribe_raids call.
for broadcaster_id in stale_broadcasters:
asyncio.create_task(self._create_subscription(broadcaster_id))

self._ready.set()

def _handle_reconnect(self, msg: dict[str, Any]) -> None:
"""Handle session_reconnect — store new URL, close current WS."""
self._reconnect_url = msg["payload"]["session"]["reconnect_url"]
if self._ws is not None:
asyncio.create_task(self._ws.close())

def _handle_notification(self, msg: dict[str, Any]) -> None:
"""Handle notification — fire raid callback if channel.raid."""
sub_type = msg.get("metadata", {}).get("subscription_type", "")
if sub_type != "channel.raid":
return

event = msg["payload"]["event"]
from_login = event["from_broadcaster_user_login"]
to_login = event["to_broadcaster_user_login"]

if self._on_raid:
self._on_raid(from_login, to_login)

def _handle_revocation(self, msg: dict[str, Any]) -> None:
"""Handle revocation — clear subscription, log warning."""
sub = msg.get("payload", {}).get("subscription", {})
logger.warning(
"EventSub: subscription revoked: type=%s status=%s",
sub.get("type"),
sub.get("status"),
)
# Remove the revoked subscription by its ID
revoked_id = sub.get("id")
if revoked_id:
self._subscriptions = {
bid: sid for bid, sid in self._subscriptions.items() if sid != revoked_id
}
10 changes: 10 additions & 0 deletions music_assistant/providers/twitch/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"type": "music",
"domain": "twitch",
"stage": "beta",
"name": "Twitch Audio",
"description": "Audio-only streaming from Twitch channels with raid following.",
"codeowners": ["@Drizzt321"],
"requirements": ["streamlink>=8.0,<9"],
"icon": "twitch"
}
1 change: 1 addition & 0 deletions requirements_all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ soco==0.30.14
soundcloudpy==0.1.4
sounddevice==0.5.5
srptools>=1.0.0
streamlink>=8.0,<9
sxm==0.2.8
unidecode==1.4.0
uv>=0.8.0
Expand Down
1 change: 1 addition & 0 deletions tests/providers/twitch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Tests for the Twitch provider."""
Loading
Loading