Stream PCM to audio analysis providers during background scan #3821
Merged
MarvinSchenkel merged 32 commits into music-assistant:dev on May 4, 2026
…res post_analysis
…fan-out Pull the gather + per-provider eviction logic out of _chunk_worker into a new _distribute_chunk method so the background-scan path (Task 10) can reuse it without duplicating the fan-out and eviction logic. Behavior change vs. old _chunk_worker: a provider that raises an exception in process_pcm_chunk is now evicted (same as a timeout), rather than being silently skipped. A provider that errors mid-session cannot safely continue processing.
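A minimal sketch of the fan-out-with-eviction idea described in this commit, not the PR's actual `_distribute_chunk`. `DemoProvider`, `distribute_chunk`, and the timeout value are hypothetical names and numbers chosen for illustration; only the behaviour (a timeout and a raised exception both evict the provider) comes from the commit message.

```python
import asyncio

CHUNK_PROCESS_TIMEOUT_SECONDS = 0.05  # hypothetical demo value, not the project's


class DemoProvider:
    """Hypothetical stand-in for an audio analysis provider."""

    def __init__(self, name: str, fail: bool = False, delay: float = 0.0) -> None:
        self.name = name
        self.fail = fail
        self.delay = delay
        self.chunks = 0

    async def process_pcm_chunk(self, chunk: bytes) -> None:
        await asyncio.sleep(self.delay)
        if self.fail:
            raise RuntimeError(f"{self.name} failed")
        self.chunks += 1


async def distribute_chunk(providers: list, chunk: bytes) -> list:
    """Send one chunk to every provider and return the providers that survive."""
    results = await asyncio.gather(
        *(
            asyncio.wait_for(p.process_pcm_chunk(chunk), CHUNK_PROCESS_TIMEOUT_SECONDS)
            for p in providers
        ),
        return_exceptions=True,
    )
    # A timeout and a raised exception are treated the same way: eviction.
    return [p for p, r in zip(providers, results) if not isinstance(r, BaseException)]


async def demo() -> list:
    providers = [
        DemoProvider("ok"),
        DemoProvider("raises", fail=True),
        DemoProvider("slow", delay=1.0),
    ]
    survivors = await distribute_chunk(providers, b"\x00\x00" * 4)
    return [p.name for p in survivors]
```

Calling `asyncio.run(demo())` leaves only the well-behaved provider in the session; the raising and the slow provider are both dropped.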
Adds BACKGROUND_PER_TRACK_TIMEOUT_SECONDS, CONF_BACKGROUND_SCAN_CONCURRENCY, and DEFAULT_BACKGROUND_SCAN_CONCURRENCY constants to audio_analysis.py. Registers a StreamsController ConfigEntry for the new concurrency key under the audio_analysis category. Adds _get_scan_concurrency() helper to AudioAnalysisController that reads from streams core config with [1, 8] clamping.
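The [1, 8] clamping can be sketched as below. This is an illustration only: `clamp_scan_concurrency` and the two bound constants are hypothetical names, and falling back to the default for non-numeric input is an assumption rather than something the commit states. The default of 1 is taken from the PR summary.

```python
from typing import Any

DEFAULT_BACKGROUND_SCAN_CONCURRENCY = 1  # default per the PR summary
MIN_SCAN_CONCURRENCY = 1  # hypothetical names for the [1, 8] bounds
MAX_SCAN_CONCURRENCY = 8


def clamp_scan_concurrency(raw: Any) -> int:
    """Coerce a raw config value to an int inside [1, 8]."""
    try:
        value = int(raw)
    except (TypeError, ValueError):
        # Assumption: unusable values fall back to the default.
        return DEFAULT_BACKGROUND_SCAN_CONCURRENCY
    return max(MIN_SCAN_CONCURRENCY, min(MAX_SCAN_CONCURRENCY, value))
```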
…budget Adds _run_background_streaming_for_track (outer, wraps asyncio.wait_for with BACKGROUND_PER_TRACK_TIMEOUT_SECONDS) and _run_background_streaming_inner (inner body: start providers, decode via FFMpeg source-native, fan chunks via _distribute_chunk, finalize on clean EOF). Also imports FFMpeg. On timeout, ffmpeg failure, or any exception _cancel_providers is called and the session is cleaned up cleanly. Path guard added to reject non-str/empty paths before reaching FFMpeg (satisfies mypy and aligns with existing _run_background_scan contract).
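The outer/inner split described here follows a common asyncio pattern: wrap the per-track body in `asyncio.wait_for` and run cleanup on any failure. The sketch below assumes hypothetical callables `inner` and `cancel_providers` and a tiny demo timeout; it is not the PR's code.

```python
import asyncio

BACKGROUND_PER_TRACK_TIMEOUT_SECONDS = 0.05  # hypothetical demo value


async def run_track_with_budget(
    inner, cancel_providers, timeout: float = BACKGROUND_PER_TRACK_TIMEOUT_SECONDS
) -> bool:
    """Run one track under a time budget; clean up and report failure otherwise."""
    try:
        await asyncio.wait_for(inner(), timeout)
    except Exception:  # includes the timeout raised by wait_for
        await cancel_providers()
        return False
    return True


async def demo() -> tuple:
    cancelled = []

    async def too_slow() -> None:
        await asyncio.sleep(1)

    async def quick() -> None:
        await asyncio.sleep(0)

    async def cancel_providers() -> None:
        cancelled.append(True)

    slow_ok = await run_track_with_budget(too_slow, cancel_providers)
    fast_ok = await run_track_with_budget(quick, cancel_providers)
    return slow_ok, fast_ok, len(cancelled)
```

Only the track that exceeds its budget triggers the cleanup callback; the quick track completes normally.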
…streaming Replace the per-provider sequential loop with a union SQL query that finds all filesystem tracks missing analysis from any AA provider in a single pass, then schedules them in parallel under an asyncio.Semaphore for concurrency control.
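Scheduling candidates in parallel under an `asyncio.Semaphore` might look roughly like this. `scan_tracks` and `analyze` are hypothetical names for illustration; the real controller method has more responsibilities (deadlines, failure reporting) that are left out here.

```python
import asyncio


async def scan_tracks(track_ids, analyze, concurrency: int) -> None:
    """Analyze every track, never running more than `concurrency` at once."""
    semaphore = asyncio.Semaphore(concurrency)

    async def run_one(track_id) -> None:
        async with semaphore:
            await analyze(track_id)

    await asyncio.gather(*(run_one(t) for t in track_ids))


async def demo() -> tuple:
    active = 0
    peak = 0
    done = []

    async def analyze(track_id) -> None:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)  # record the highest observed parallelism
        await asyncio.sleep(0.01)
        active -= 1
        done.append(track_id)

    await scan_tracks(range(6), analyze, concurrency=2)
    return peak, sorted(done)
```

All tasks are created up front, but the semaphore keeps the number of tracks being analyzed at any moment at or below the configured limit.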
… helpers Drop LoudnessAnalysisProvider.analyze_file and the module-level _run_ebur128_on_file helper it called (both unreachable now that the streaming background-scan path is in place), and remove the _find_tracks_missing_analysis controller method that was superseded by _find_candidates_missing_analysis in Task 11.
_run_background_streaming_inner was passing streamdetails.audio_format as both ffmpeg input and output format. For compressed sources (FLAC, MP3, etc.) this emits compressed frames instead of PCM, causing numpy.frombuffer to fail with odd-byte frame lengths. Construct a PCM_S16LE output format that preserves the source's sample rate and channel count, pass it to both FFMpeg output_format and _start_analysis_on_providers so providers see the format they actually receive. Update the integration test fixture from WAV to FLAC so this decode path is exercised going forward.
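To illustrate why compressed frames break a fixed-width sample decoder, here is a stdlib-only sketch using `array` in place of `numpy.frombuffer` (an intentional substitution so the example has no third-party dependency). `decode_s16le` is a hypothetical helper: 16-bit PCM needs an even number of bytes, while arbitrary compressed data carries no such guarantee.

```python
import array
import sys


def decode_s16le(chunk: bytes) -> list:
    """Interpret a byte chunk as signed 16-bit little-endian samples."""
    samples = array.array("h")  # signed 16-bit, native byte order
    samples.frombytes(chunk)  # raises ValueError if len(chunk) is not a multiple of 2
    if sys.byteorder == "big":
        samples.byteswap()  # normalize to little-endian interpretation
    return samples.tolist()


def is_decodable(chunk: bytes) -> bool:
    """Return False for chunks whose length does not fit whole 16-bit samples."""
    try:
        decode_s16le(chunk)
    except ValueError:
        return False
    return True
```

A three-byte chunk, for example, cannot be split into whole 16-bit samples and is rejected, which mirrors the odd-byte failure the commit describes.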
…d catch-up
Add CONF_BACKGROUND_SCAN_START_HOUR (default 0) and CONF_BACKGROUND_SCAN_END_HOUR (default 6) to the StreamsController config, allowing operators to confine the background analysis scan to a specific wall-clock window.
- Remove BACKGROUND_SCAN_BATCH_SIZE; deadline now gates work instead of a count cap
- _compute_scan_deadline_monotonic handles same-day, wrap-around (start >= end), and 24-hour (start == end) windows
- _is_in_scan_window checked at setup() so a boot inside the window triggers an immediate catch-up scan; the nightly schedule continues to use start_hour
- _run_one checks the deadline after acquiring the semaphore so in-flight tracks finish while new ones are blocked past end_hour
- 16 new tests covering config clamping, deadline arithmetic, window membership, deadline gating, and setup() catch-up behaviour
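Window membership with wrap-around can be sketched as follows. This is an assumption-laden illustration: `is_in_scan_window` here is a standalone hypothetical function, and treating the end hour as exclusive is a guess rather than something the commit specifies.

```python
def is_in_scan_window(hour: int, start_hour: int, end_hour: int) -> bool:
    """Return True if `hour` (0-23) falls inside the scan window."""
    if start_hour == end_hour:
        return True  # 24-hour window
    if start_hour < end_hour:
        return start_hour <= hour < end_hour  # same-day window
    # Wrap-around window, e.g. 22:00 to 04:00.
    return hour >= start_hour or hour < end_hour
```

With the defaults mentioned above (start 0, end 6), hour 3 is inside the window and hour 6 is outside it under the exclusive-end assumption.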
Drop the custom scan window (start/end hour configs, deadline computation, in-window check, boot catch-up) in favor of the TaskSchedule.daily pattern used by every other recurring task in the codebase. Run time is hardcoded to 00:00 local. Set allow_retry=True and surface per-track timeouts/exceptions via add_task_failure so flaky runs end as PARTIAL_SUCCESS with retryable status and per-track failure messages visible in the Tasks UI. The candidate query is idempotent on missing analysis, so retries naturally pick up only tracks that still lack data.
short_test.wav was the original integration-test fixture but was replaced by short_test.flac in 2a37efa so the FLAC decode path gets exercised. The WAV was never deleted; remove it now.
Three related fixes that give the background scan a robust end mechanism:
- Cap each _run_background_scan invocation at 4 hours wall-clock. Once the budget expires, in-flight tracks finish (still bounded by their per-track timeout) but no new tracks are started. Remaining candidates fall to the next scheduled run.
- Catch asyncio.CancelledError in _run_background_streaming_for_track so an admin cancel or process shutdown still calls _cancel_providers for the in-flight session before propagating. CancelledError inherits from BaseException so the broad except below was silently leaking session state.
- Add AudioAnalysisController.close() to drain _active_sessions and cancel any pending live-playback chunk workers. Wire it into StreamsController.close() so MA shutdown stops cleanly.
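The CancelledError point is easy to miss, so here is a small sketch of the pattern. `run_track`, `inner`, and `cancel_providers` are hypothetical names; the fact that `asyncio.CancelledError` is not an `Exception` subclass (Python 3.8+) is what makes the separate `except` clause necessary.

```python
import asyncio


async def run_track(inner, cancel_providers) -> None:
    """Run one track; always clean up, and re-raise cancellation."""
    try:
        await inner()
    except asyncio.CancelledError:
        # Not caught by `except Exception`, so it needs its own clause.
        await cancel_providers()
        raise
    except Exception:
        await cancel_providers()


async def demo() -> tuple:
    cleaned = []

    async def inner() -> None:
        await asyncio.sleep(10)

    async def cancel_providers() -> None:
        cleaned.append("cancelled")

    task = asyncio.create_task(run_track(inner, cancel_providers))
    await asyncio.sleep(0)  # let the task start and reach its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return cleaned, task.cancelled()
```

The cleanup callback runs before the cancellation propagates, and the task still ends in the cancelled state because the error is re-raised.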
OHF style: multi-line docstrings open with `"""` on its own line, and docstrings should describe caller-relevant behaviour rather than internal mechanics. Sweep the audio-analysis files in this PR to:
- Reformat all multi-line docstrings to start `"""` on a new line.
- Drop bodies that explained internal implementation details where the code itself was self-evident.
- Compress the AudioAnalysisProvider class contract from 38 lines to a short paragraph; the binding bits live alongside the abstract methods they govern.
- Drop two trivial inline comments on PCM input/output format params and one redundant comment on a self-explanatory if-branch.
…eanups
- Move CONF/DEFAULT_BACKGROUND_SCAN_CONCURRENCY to streams/constants.py so the streams controller no longer reaches into audio_analysis.py for shared config keys (PR feedback r3181913597).
- Rewrite _find_candidates_missing_analysis as CROSS JOIN + NOT EXISTS, letting the database compute the missing-domain diff and dropping the Python post-processing loop (PR feedback r3181870282). Scalar values use :name placeholders; the AA-domain list is bound via UNION ALL.
- Fix the PCM format derivation: override only content_type so ffmpeg decodes (the FLAC bug fix from 2a37efa also pinned bit_depth=16, which wasn't required and narrowed the format unnecessarily). Now uses ContentType.from_bit_depth(streamdetails.audio_format.bit_depth) to preserve source bit depth end-to-end, matching the live playback path. chunk_size now uses pcm_format.pcm_sample_size (PR feedback r3181941108).
- Rename CHUNK_PROCESS_TIMEOUT → CHUNK_PROCESS_TIMEOUT_SECONDS for within-file consistency with the other timeout constants.
- Use float division and %.1fh in the run-budget log line so the output is correct for non-multiple-of-3600 budgets.
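The CROSS JOIN + NOT EXISTS shape can be demonstrated against an in-memory SQLite database. Everything about the schema below is hypothetical (table names, columns, domain strings); only the query structure, the named placeholders for scalars, and the UNION ALL binding of the domain list follow the commit description.

```python
import sqlite3


def find_candidates(conn: sqlite3.Connection, provider: str, domains: list) -> list:
    """Return (item_id, domain) pairs for tracks that lack analysis in a domain."""
    if not domains:
        return []
    params = {"provider": provider}
    selects = []
    for index, domain in enumerate(domains):
        params[f"d{index}"] = domain
        selects.append(f"SELECT :d{index} AS domain")
    domain_rows = " UNION ALL ".join(selects)
    query = f"""
        SELECT t.item_id, d.domain
        FROM tracks AS t
        CROSS JOIN ({domain_rows}) AS d
        WHERE t.provider = :provider
          AND NOT EXISTS (
            SELECT 1 FROM audio_analysis AS a
            WHERE a.item_id = t.item_id AND a.domain = d.domain
          )
        ORDER BY t.item_id, d.domain
    """
    return conn.execute(query, params).fetchall()


def build_demo_db() -> sqlite3.Connection:
    """Create a tiny hypothetical schema with three tracks and partial analysis."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE tracks (item_id INTEGER PRIMARY KEY, provider TEXT);
        CREATE TABLE audio_analysis (item_id INTEGER, domain TEXT);
        INSERT INTO tracks VALUES (1, 'filesystem'), (2, 'filesystem'), (3, 'other');
        INSERT INTO audio_analysis VALUES
            (1, 'loudness'), (1, 'smart_fades'), (2, 'loudness');
        """
    )
    return conn
```

In the demo data, track 1 is fully analyzed and track 3 belongs to another provider, so the database returns only the one missing (track, domain) pair without any Python-side diffing.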
Replace the explicit FFMpeg block in _run_background_streaming_inner with mass.streams.audio.get_media_stream(streamdetails, pcm_format) so live and background paths share a single decode pipeline. For LOCAL_FILE sources this is functionally identical (no -re pacing on either side) but kills the duplication and inherits future improvements to the canonical pipeline (codec detection, duration tracking, stream-type generality). Update unit tests to mock mass.streams.audio.get_media_stream (returning an async generator yielding fake chunks) instead of patching FFMpeg. The integration test gets a real-ffmpeg get_media_stream stand-in that mirrors audio.py's wait-then-close pattern so close() doesn't hit the SIGINT path on Windows when the process is still running.
…m comments Use dataclasses.replace(streamdetails.audio_format, content_type=...) so all source fields (codec_type, output_format_str, bit_rate) are preserved instead of silently dropped by the explicit AudioFormat constructor. Trim comments per CLAUDE.md guidance.
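A small sketch of why `dataclasses.replace` is the safer choice here. `DemoAudioFormat` and `pcm_content_type_for` are hypothetical stand-ins (the real `AudioFormat` has more fields, and the real code uses `ContentType.from_bit_depth`); the point is only that `replace` copies every field that is not explicitly overridden.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class DemoAudioFormat:
    """Hypothetical stand-in for the project's AudioFormat."""

    content_type: str
    sample_rate: int
    bit_depth: int
    channels: int
    bit_rate: int = 0


def pcm_content_type_for(bit_depth: int) -> str:
    """Hypothetical helper standing in for ContentType.from_bit_depth."""
    return f"pcm_s{bit_depth}le"


def to_pcm_format(source: DemoAudioFormat) -> DemoAudioFormat:
    """Override only content_type; every other source field is carried over."""
    return replace(source, content_type=pcm_content_type_for(source.bit_depth))
```

Given a 24-bit FLAC source, `to_pcm_format` changes the content type while sample rate, channel count, bit depth, and bit rate stay as they were, which is harder to guarantee when each field is passed to a constructor by hand.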
…ize() Move the persistence call out of each provider's _finalize() and into AudioAnalysisProvider.finalize(). The base class now orchestrates compute -> persist -> post_analysis -> cleanup, with post_analysis skipped if persistence fails. Providers return the AudioAnalysisData (or None to skip) and no longer call set_audio_analysis themselves.
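The compute -> persist -> post_analysis -> cleanup ordering can be sketched as a template method. `DemoProvider`, `_persist`, and `_cleanup` are hypothetical names; the real base class calls `set_audio_analysis` and has its own cleanup logic. The sketch also catches `_finalize` errors, as the PR summary describes.

```python
import asyncio
import logging

LOGGER = logging.getLogger("demo.audio_analysis")


class DemoProvider:
    """Hypothetical provider whose finalize() orchestrates the lifecycle."""

    def __init__(self, result, persist_ok: bool = True) -> None:
        self.result = result
        self.persist_ok = persist_ok
        self.events = []

    async def _finalize(self):
        self.events.append("compute")
        return self.result  # None means "nothing to store"

    async def _persist(self, data) -> None:
        if not self.persist_ok:
            raise RuntimeError("storage unavailable")
        self.events.append("persist")

    async def post_analysis(self, data) -> None:
        self.events.append("post_analysis")

    async def _cleanup(self) -> None:
        self.events.append("cleanup")

    async def finalize(self) -> None:
        try:
            data = await self._finalize()
            if data is not None:
                await self._persist(data)
                # Only reached when persistence succeeded.
                await self.post_analysis(data)
        except Exception:
            LOGGER.exception("finalize failed")
        finally:
            await self._cleanup()


def run(provider: DemoProvider) -> list:
    asyncio.run(provider.finalize())
    return provider.events
```

Cleanup runs on every path, while `post_analysis` is reached only after a successful persist.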
…is category Moves CONF_SMART_FADES_LOG_LEVEL out of the generic category so it shows alongside the other audio-analysis settings in the UI.
…ass tests After centralizing the persistence call in AudioAnalysisProvider.finalize(), the base-class test fixture's MagicMock'd set_audio_analysis raises TypeError when awaited. The try/except in finalize() catches it and skips post_analysis, which broke two tests that assert post_analysis was called. Mock it as AsyncMock to match the loudness/smart_fades fixtures.
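The failure mode is reproducible with the standard library alone. In the sketch below, `call_persist` and `awaiting_fails` are hypothetical helpers; `MagicMock` and `AsyncMock` are the real `unittest.mock` classes. Calling a `MagicMock` attribute returns another `MagicMock`, which cannot be awaited, whereas an `AsyncMock` attribute returns an awaitable.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


async def call_persist(storage) -> None:
    """Await the mocked persistence call, as a finalize() wrapper would."""
    await storage.set_audio_analysis("data")


def awaiting_fails(storage) -> bool:
    """Return True if awaiting the mocked call raises TypeError."""
    try:
        asyncio.run(call_persist(storage))
    except TypeError:
        return True
    return False
```

A wrapper that catches broad exceptions around the persistence call would swallow this TypeError, which is how a fixture problem can surface as a missing `post_analysis` call instead of an obvious crash.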
MarvinSchenkel
approved these changes
May 4, 2026
Awesome job @chrisuthe, thanks 👏
Summary
Shifts the audio-analysis background scan from per-provider analyze_file() (each provider opens its own decoder) to a single decode-once-fan-out streaming PCM path, so providers only need to implement live-streaming hooks. Adds an optional post_analysis lifecycle hook for filesystem side effects (replaces loudness's previous in-analyze_file ReplayGain tag-write), and adds a configurable background_scan_concurrency knob.
- analyze_file() removed from the base class and from loudness_analysis; loudness_analysis now writes ReplayGain tags via the new post_analysis hook (which fires for both background scans AND live filesystem playback)
design decisions
- background_scan_concurrency, default 1
- _finalize return type AudioAnalysisData | None: analysis (or None to skip), used by base class to drive post_analysis
- AudioAnalysisProvider class docstring: "process_pcm_chunk MUST await its work; per-track-bounded background tasks OK; no cross-track work"
changes
Two intentional behavior changes worth flagging for reviewers:
- Provider exceptions in process_pcm_chunk now evict the provider from the session (previously logged but the provider stayed in). Spec rationale: a provider that raises can't keep processing; this also makes the live and background paths consistent.
- _finalize exceptions are now caught and logged ERROR by the base class wrapper instead of propagating. Spec rationale: a misbehaving provider must not leave session state behind or take down the controller. Companion test in tests/core/test_audio_analysis_controller.py updated to match.
- "Storage provider unavailable" no longer aborts the whole scan batch; only that track is skipped now. Slightly more granular than before.
tested locally
- tests/core/test_audio_analysis_controller.py + tests/controllers/streams/ + tests/models/ + tests/providers/loudness_analysis/ + tests/providers/smart_fades/ + tests/integration/
- pre-commit run --all-files clean across the whole worktree
- mypy clean on all touched modules
- loudness_integrated = -14.1 LUFS against a 0.3-amplitude 440 Hz mono sine: plausible
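As a rough sanity check on the loudness figure above, the ITU-R BS.1770 formula for a single channel is loudness = -0.691 + 10·log10(mean square of the K-weighted signal). The sketch below deliberately omits the K-weighting filter and gating (a simplification, labeled as such), so it should land close to, but not exactly on, the reported value. `unweighted_loudness_lufs` is a hypothetical helper name.

```python
import math


def unweighted_loudness_lufs(
    amplitude: float,
    frequency_hz: float = 440.0,
    sample_rate: int = 48000,
    seconds: float = 1.0,
) -> float:
    """Approximate mono loudness of a sine, ignoring K-weighting and gating."""
    n = int(sample_rate * seconds)
    mean_square = (
        sum(
            (amplitude * math.sin(2 * math.pi * frequency_hz * i / sample_rate)) ** 2
            for i in range(n)
        )
        / n
    )
    # BS.1770 offset applied to the (here unweighted) mean square.
    return -0.691 + 10 * math.log10(mean_square)
```

For a 0.3-amplitude sine the mean square is 0.3² / 2 = 0.045, so the unweighted estimate is about -14.16, within a tenth of a dB of the measured -14.1 LUFS.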