Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
3c9559e
feat(audio_analysis): add post_analysis hook to AudioAnalysisProvider…
chrisuthe Apr 30, 2026
5f85028
feat(audio_analysis): _finalize returns analysis; finalize wrapper fi…
chrisuthe Apr 30, 2026
0c8601a
fix(audio_analysis): add exc_info to post_analysis warning and assert…
chrisuthe Apr 30, 2026
fa3d2a0
docs(audio_analysis): codify provider contract in base class docstring
chrisuthe Apr 30, 2026
fcf2d2a
feat(loudness_analysis): _finalize returns persisted analysis
chrisuthe Apr 30, 2026
f11556e
feat(loudness_analysis): implement post_analysis for ReplayGain tag-w…
chrisuthe Apr 30, 2026
ecadfef
feat(smart_fades): _finalize returns persisted analysis
chrisuthe Apr 30, 2026
fdab5de
refactor(audio_analysis): extract _distribute_chunk for shared chunk …
chrisuthe Apr 30, 2026
c9d09be
feat(audio_analysis): add config and constants for background streaming
chrisuthe Apr 30, 2026
42a731f
feat(audio_analysis): add background streaming runner with per-track …
chrisuthe Apr 30, 2026
6c8c4b5
feat(audio_analysis): rewrite background scan as decode-once-fan-out …
chrisuthe Apr 30, 2026
3e8cfd9
refactor(audio_analysis): remove analyze_file from providers and dead…
chrisuthe Apr 30, 2026
d2cebfe
test(audio_analysis): update controller tests for new exception/evict…
chrisuthe Apr 30, 2026
76a96de
refactor(audio_analysis): remove analyze_file default from base class
chrisuthe Apr 30, 2026
caea261
test(audio_analysis): end-to-end integration test for streaming backg…
chrisuthe Apr 30, 2026
62861b0
fix(audio_analysis): use __getitem__ on sqlite3.Row in candidate-find…
chrisuthe Apr 30, 2026
2a37efa
fix(audio_analysis): output PCM_S16LE from ffmpeg, not source format
chrisuthe Apr 30, 2026
aaa7a71
feat(audio_analysis): configurable scan window with start/end hour an…
chrisuthe Apr 30, 2026
76388d9
Merge branch 'dev' into feat/audio-analysis-streaming-bg-scan
chrisuthe Apr 30, 2026
b2b7657
Merge branch 'dev' into feat/audio-analysis-streaming-bg-scan
chrisuthe May 4, 2026
0f27dce
refactor(audio_analysis): hand bg scan scheduling to TasksController
chrisuthe May 4, 2026
d20e46a
Merge branch 'dev' into feat/audio-analysis-streaming-bg-scan
chrisuthe May 4, 2026
dcb5c1e
chore(audio_analysis): drop orphaned WAV fixture replaced by FLAC
chrisuthe May 4, 2026
c4a7ce6
feat(audio_analysis): bound bg scan with 4h budget, clean cancel + close
chrisuthe May 4, 2026
1563e1b
style(audio_analysis): trim verbose docstrings and follow OHF format
chrisuthe May 4, 2026
1e18985
refactor(audio_analysis): address PR review (#3821) cleanups
chrisuthe May 4, 2026
52d2f8d
refactor(audio_analysis): use get_media_stream for bg scan decode
chrisuthe May 4, 2026
0cfc72d
refactor(audio_analysis): use dataclasses.replace for pcm_format, tri…
chrisuthe May 4, 2026
9158d79
PR Suggestions
chrisuthe May 4, 2026
ba64f20
refactor(audio_analysis): centralize set_audio_analysis in base final…
chrisuthe May 4, 2026
0a5d19b
chore(audio_analysis): group smart fades log-level under audio_analys…
chrisuthe May 4, 2026
74a4753
test(audio_analysis): mock set_audio_analysis as AsyncMock in base-cl…
chrisuthe May 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
405 changes: 271 additions & 134 deletions music_assistant/controllers/streams/audio_analysis.py

Large diffs are not rendered by default.

17 changes: 16 additions & 1 deletion music_assistant/controllers/streams/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@
)
from music_assistant.controllers.players.helpers import AnnounceData
from music_assistant.controllers.streams.audio import StreamsAudio
from music_assistant.controllers.streams.audio_analysis import AudioAnalysisController
from music_assistant.controllers.streams.audio_analysis import (
CONF_BACKGROUND_SCAN_CONCURRENCY,
DEFAULT_BACKGROUND_SCAN_CONCURRENCY,
AudioAnalysisController,
)
from music_assistant.controllers.streams.constants import (
CONF_ALLOW_CROSSFADE_SAME_ALBUM,
CONF_BUFFER_SIZE,
Expand Down Expand Up @@ -252,6 +256,17 @@ async def get_config_entries(
category="generic",
Comment thread
MarvinSchenkel marked this conversation as resolved.
Outdated
advanced=True,
),
ConfigEntry(
key=CONF_BACKGROUND_SCAN_CONCURRENCY,
type=ConfigEntryType.INTEGER,
range=(1, 8),
default_value=DEFAULT_BACKGROUND_SCAN_CONCURRENCY,
label="Background analysis concurrency",
description="Maximum number of tracks analyzed concurrently during the nightly "
"background scan. Default 1 (serial). Increase only if your hardware can handle "
"concurrent torch/ffmpeg work.",
category="audio_analysis",
),
)

async def setup(self, config: CoreConfig) -> None:
Expand Down
92 changes: 70 additions & 22 deletions music_assistant/models/audio_analysis_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,37 @@ class AudioAnalysisProvider(Provider):
These providers receive PCM audio chunks during streaming and produce analysis
results such as beat tracking, key detection, phrase boundaries, etc.

The AudioAnalysisController creates session IDs and passes them to all methods.
Providers implement _start_analysis and _finalize as hooks — the base class
manages session lifecycle, version gating, and cleanup.
Providers implement four hooks (`_start_analysis`, `process_pcm_chunk`,
`_finalize`, optionally `post_analysis`) and the base class manages session
lifecycle, version gating, and cleanup. The same hooks drive both live
playback (PCM from `AudioBuffer`) and background scans (PCM from ffmpeg
decoding a local file). Providers do not need to know which context they
are running in.

Provider contract (binding):

1. `process_pcm_chunk` MUST `await` all work that processes the chunk.
The controller serializes chunks across providers and uses this to
backpressure the audio source. Fire-and-forget per-chunk work breaks
backpressure and can pile up unboundedly at flat-out background rates.

2. Providers MAY spawn background tasks during `process_pcm_chunk` only
when the total task count is bounded by per-track properties (e.g. a
fixed number of CLAP target windows configured per track). All such
tasks MUST be tracked and awaited in `_finalize`.

3. Providers MUST NOT begin work for session N+1 while session N is
still active. Per-session state should be keyed on `session_id`.

4. `_finalize` MUST return the AudioAnalysisData it persisted, or None
if it chose not to persist (e.g. insufficient audio). The base class
uses the return value to drive `post_analysis`.

5. `post_analysis` is optional and called by the base class after
`_finalize` returns a non-None analysis. It is the place for filesystem
side effects such as tag-writing. Implementations MUST self-gate on
whether `streamdetails.path` is a writable filesystem path, because
this hook fires for both live and background scans.
"""

# Version of the analysis algorithm. Providers should increment this when
Expand Down Expand Up @@ -121,44 +149,64 @@ async def process_pcm_chunk(
"""

@abstractmethod
async def _finalize(self, session_id: str) -> None:
"""Finalize analysis and store results.
async def _finalize(self, session_id: str) -> AudioAnalysisData | None:
"""Finalize analysis and return the persisted analysis (or None to skip).

Called when the track has finished streaming. Providers are responsible
for storing their results via mass.streams.audio_analysis.set_audio_analysis().
Return the AudioAnalysisData that was persisted, or None if the provider
chose not to store a result (e.g. insufficient audio data). The base
class uses the returned value to drive the post_analysis hook.

:param session_id: The analysis session ID.
"""

async def finalize(self, session_id: str) -> None:
"""Finalize analysis and clean up session state.
"""Finalize analysis, optionally fire post_analysis, and clean up state.

Calls _finalize, then removes the session from _sessions.
The controller calls this method — providers override _finalize.
Calls _finalize, then post_analysis (when _finalize returned a non-None
analysis), then removes the session from _sessions. Both _finalize and
post_analysis exceptions are caught and logged — neither is allowed to
leave session state behind or to propagate to the controller.

:param session_id: The analysis session ID.
"""
analysis: AudioAnalysisData | None = None
try:
await self._finalize(session_id)
finally:
self._sessions.pop(session_id, None)
analysis = await self._finalize(session_id)
except Exception as err:
self.logger.error("_finalize raised for session %s: %s", session_id, err, exc_info=err)
session = self._sessions.get(session_id)
if analysis is not None and session is not None:
try:
await self.post_analysis(session.streamdetails, analysis)
except Exception as err:
self.logger.warning(
"post_analysis raised for %s: %s", self.domain, err, exc_info=err
)
self._sessions.pop(session_id, None)

async def analyze_file(
async def post_analysis(
self,
streamdetails: StreamDetails,
) -> AudioAnalysisData | None:
"""
Run analysis directly on a local audio file.
analysis: AudioAnalysisData,
) -> None:
"""Run side effects after analysis is finalized and persisted.

Called by the base class `finalize` wrapper after `_finalize` returns
a non-None analysis. Default is a no-op. Providers override this to
perform filesystem side effects such as writing tags back to the
source file. Failures are caught by the base class and logged — they
must not undo the analysis row.

Used by the AudioAnalysisController's background scan. Providers that can
analyze a file without going through live PCM streaming (e.g. by handing
the path to FFmpeg/librosa/etc.) should override this. Default returns
None, meaning the provider does not support file-based analysis.
Implementations must self-gate on whether `streamdetails.path` is a
writable filesystem path, since this hook fires for both live and
background-scan analyses.

:param streamdetails: StreamDetails for the item being analyzed.
Contains the local file path and audio format.
:param streamdetails: The stream details for the analyzed item.
:param analysis: The analysis data that was persisted by `_finalize`.
"""
return None
return

async def cancel(self, session_id: str) -> None:
"""Cancel an in-progress analysis session."""
Expand Down
85 changes: 34 additions & 51 deletions music_assistant/providers/loudness_analysis/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,30 +83,6 @@ async def cancel(self, session_id: str) -> None:
await data.ffmpeg.close()
await super().cancel(session_id)

async def analyze_file(self, streamdetails: StreamDetails) -> AudioAnalysisData | None:
"""Run ebur128 directly on a local audio file and return the measurement."""
if not isinstance(streamdetails.path, str) or not streamdetails.path:
return None
metrics = await _run_ebur128_on_file(streamdetails.path, streamdetails.audio_format)
if metrics is None:
return None
loudness, loudness_range, true_peak = metrics
if loudness is None or loudness <= LOUDNESS_MEASUREMENT_MIN_LUFS:
return None
if self.config.get_value(CONF_WRITE_REPLAYGAIN_TAGS):
# ReplayGain 2.0: track_gain_db = -18 - loudness_lufs
track_gain_db = -18.0 - loudness
ok = await write_replaygain_track_gain(streamdetails.path, track_gain_db)
if ok:
self.logger.debug(
"Background loudness: wrote ReplayGain tag to %s", streamdetails.path
)
return AudioAnalysisData(
loudness_integrated=round(loudness, 2),
loudness_range=round(loudness_range, 2) if loudness_range is not None else None,
true_peak=round(true_peak, 2) if true_peak is not None else None,
)

async def _start_analysis(
self,
session_id: str,
Expand All @@ -131,26 +107,26 @@ async def _start_analysis(
self._data[session_id] = LoudnessSessionData(ffmpeg=ffmpeg)
return True

async def _finalize(self, session_id: str) -> None:
async def _finalize(self, session_id: str) -> AudioAnalysisData | None:
"""Persist the final loudness measurement for the session."""
data = self._data.pop(session_id, None)
if not data:
return
return None

await self._send_eof(data)
try:
await data.ffmpeg.wait()
except Exception as err:
self.logger.debug("Loudness analysis ffmpeg failed: %s", err)
await data.ffmpeg.close()
return
return None

metrics = _parse_ebur128_metrics(data.ffmpeg.log_history)
await data.ffmpeg.close()

session = self._sessions.get(session_id)
if session is None:
return
return None

if data.chunks_received < MIN_DURATION_SECONDS:
self.logger.debug(
Expand All @@ -160,15 +136,15 @@ async def _finalize(self, session_id: str) -> None:
data.chunks_received,
MIN_DURATION_SECONDS,
)
return
return None

loudness, loudness_range, true_peak = metrics
if loudness is None:
self.logger.debug(
"Could not determine loudness of %s from buffer analysis",
session.streamdetails.uri,
)
return
return None

if loudness <= LOUDNESS_MEASUREMENT_MIN_LUFS:
# ebur128 reports ~-70 LUFS on near-silence / cancelled streams,
Expand All @@ -180,7 +156,7 @@ async def _finalize(self, session_id: str) -> None:
loudness,
LOUDNESS_MEASUREMENT_MIN_LUFS,
)
return
return None

analysis = AudioAnalysisData(
loudness_integrated=round(loudness, 2),
Expand All @@ -205,6 +181,33 @@ async def _finalize(self, session_id: str) -> None:
loudness_range,
true_peak,
)
return analysis

async def post_analysis(
self,
streamdetails: StreamDetails,
analysis: AudioAnalysisData,
) -> None:
"""Write the ReplayGain track-gain tag back to the source file when configured.

:param streamdetails: Stream context for the analyzed track.
:param analysis: The persisted analysis result produced by ``_finalize``.
"""
if not isinstance(streamdetails.path, str) or not streamdetails.path:
return
if not self.config.get_value(CONF_WRITE_REPLAYGAIN_TAGS):
return
if analysis.loudness_integrated is None:
return
# ReplayGain 2.0: track_gain_db = -18 - loudness_lufs
track_gain_db = -18.0 - analysis.loudness_integrated
ok = await write_replaygain_track_gain(streamdetails.path, track_gain_db)
if ok:
self.logger.debug(
"Wrote ReplayGain tag to %s (gain=%.2f dB)",
streamdetails.path,
track_gain_db,
)

async def _send_eof(self, data: LoudnessSessionData) -> None:
"""Signal end-of-input to the session's ffmpeg process (idempotent)."""
Expand Down Expand Up @@ -234,23 +237,3 @@ def _match_float(pattern: re.Pattern[str], text: str) -> float | None:
return float(match.group(1))
except ValueError:
return None


async def _run_ebur128_on_file(
file_path: str, audio_format: AudioFormat
) -> tuple[float | None, float | None, float | None] | None:
"""Run ebur128 on a local audio file and return the (I, LRA, TP) tuple."""
try:
async with FFMpeg(
audio_input=file_path,
input_format=audio_format,
output_format=audio_format,
audio_output="NULL",
filter_params=["ebur128=framelog=verbose"],
collect_log_history=True,
loglevel="info",
) as ffmpeg:
await ffmpeg.wait()
return _parse_ebur128_metrics(ffmpeg.log_history)
except Exception:
return None
9 changes: 5 additions & 4 deletions music_assistant/providers/smart_fades/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,11 @@ async def _start_analysis(
self.logger.debug("Started beat tracking session %s", session_id)
return True

async def _finalize(self, session_id: str) -> None:
async def _finalize(self, session_id: str) -> AudioAnalysisData | None:
"""Finalize beat tracking and store results."""
data = self._data.pop(session_id, None)
if not data:
return
return None

# Flush remaining buffered PCM
if data.pcm_samples:
Expand All @@ -205,7 +205,7 @@ async def _finalize(self, session_id: str) -> None:
data.beats_feature_blocks.append(final_feats)

if not data.beats_feature_blocks:
return
return None

feats = np.concatenate(data.beats_feature_blocks, axis=0)
duration = data.total_pcm_samples / ANALYSIS_SAMPLE_RATE
Expand All @@ -220,7 +220,7 @@ async def _finalize(self, session_id: str) -> None:
beats, downbeats = await asyncio.to_thread(self._infer_beat_timings, feats)
if len(beats) < 2:
self.logger.debug("Not enough beats detected, skipping storage")
return
return None
key, mode = await asyncio.to_thread(self._infer_musical_key, all_vqt)

bpm = calculate_overall_bpm(beats)
Expand Down Expand Up @@ -276,6 +276,7 @@ async def _finalize(self, session_id: str) -> None:
len(downbeats),
f"{key} {mode}" if key else "unknown",
)
return analysis

async def _process_block(self, data: SmartFadesData, *, last: bool = False) -> None:
"""Resample accumulated PCM buffer and extract features."""
Expand Down
1 change: 1 addition & 0 deletions tests/controllers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Tests for Music Assistant controllers."""
1 change: 1 addition & 0 deletions tests/controllers/streams/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Tests for Music Assistant stream controllers."""
Loading
Loading