diff --git a/music_assistant/providers/sonic_analysis/__init__.py b/music_assistant/providers/sonic_analysis/__init__.py
new file mode 100644
index 0000000000..2b089e99bd
--- /dev/null
+++ b/music_assistant/providers/sonic_analysis/__init__.py
@@ -0,0 +1,338 @@
+"""Sonic Analysis provider for Music Assistant.
+
+Extracts audio features from PCM audio streams during playback and
+stores them as semantic AudioAnalysisData fields.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import time
+from dataclasses import dataclass, field
+from typing import TYPE_CHECKING
+
+import numpy as np
+
+from music_assistant.models.audio_analysis_provider import (
+    AnalysisSessionData,
+    AudioAnalysisProvider,
+)
+
+from .helpers import (
+    BlockFeatures,
+    collapse_to_analysis,
+    extract_block_features,
+    merge_block_features,
+)
+
+if TYPE_CHECKING:
+    from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
+    from music_assistant_models.media_items import AudioFormat
+    from music_assistant_models.provider import ProviderManifest
+    from music_assistant_models.streamdetails import StreamDetails
+
+    from music_assistant.mass import MusicAssistant
+    from music_assistant.models import ProviderInstanceType
+    from music_assistant.models.audio_analysis import AudioAnalysisData
+
+ANALYZE_FILE_SAMPLE_RATE: int = 22050
+# Minimum audio length (1 second) required for meaningful feature extraction.
+ANALYZE_FILE_MIN_SAMPLES: int = 22050
+
+
+BLOCK_SECONDS: int = 10
+OVERLAP_SAMPLES: int = 2048
+
+
+@dataclass
+class SonicSessionData(AnalysisSessionData):
+    """Per-session state: PCM block buffer and accumulated per-block features."""
+
+    pcm_buffer: bytearray = field(default_factory=bytearray)
+    block_samples: int = 0  # block size in BYTES (despite the name); set by _start_analysis
+    accumulated: BlockFeatures = field(default_factory=BlockFeatures)
+    total_samples: int = 0
+    overlap: np.ndarray | None = None
+    start_time: float = 0.0
+    peak_absolute: float = 0.0
+    waveform_peaks: list[float] = field(default_factory=list)
+
+
+async def setup(
+    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+    """Initialize provider instance with given configuration."""
+    return SonicAnalysisProvider(mass, manifest, config)
+
+
+async def get_config_entries(
+    mass: MusicAssistant,  # noqa: ARG001
+    instance_id: str | None = None,  # noqa: ARG001
+    action: str | None = None,  # noqa: ARG001
+    values: dict[str, ConfigValueType] | None = None,  # noqa: ARG001
+) -> tuple[ConfigEntry, ...]:
+    """Return Config entries to setup this provider.
+
+    :param mass: MusicAssistant instance.
+    :param instance_id: id of an existing provider instance (None if new instance setup).
+    :param action: action key called from config entries UI.
+    :param values: the (intermediate) raw values for config entries sent with the action.
+    """
+    return ()
+
+
+def _pcm_bytes_to_audio(
+    pcm_data: bytes,
+    sample_rate: int,
+    bit_depth: int,
+    channels: int,
+) -> np.ndarray:
+    """Convert raw PCM bytes to a mono float32 numpy array.
+
+    A trailing partial frame (fewer bytes than one sample for every channel)
+    is dropped instead of raising.
+
+    :param pcm_data: Raw PCM audio bytes.
+    :param sample_rate: Sample rate in Hz (unused in conversion, kept for API symmetry).
+    :param bit_depth: Bits per sample (16, 24, or 32).
+    :param channels: Number of audio channels.
+    :raises ValueError: If bit_depth is not 16, 24, or 32.
+    """
+    _ = sample_rate
+    # np.frombuffer rejects buffers that are not a multiple of the sample size and
+    # reshape(-1, channels) rejects ragged frames, so trim any incomplete tail
+    # (e.g. a stream that was cut off mid-frame) before decoding.
+    frame_bytes = (bit_depth // 8) * channels
+    remainder = len(pcm_data) % frame_bytes if frame_bytes > 0 else 0
+    if remainder:
+        pcm_data = pcm_data[:-remainder]
+    if bit_depth == 16:
+        samples = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32)
+        samples /= 32768.0
+    elif bit_depth == 24:
+        num_samples = len(pcm_data) // 3
+        raw = np.frombuffer(pcm_data[: num_samples * 3], dtype=np.uint8).reshape(-1, 3)
+        i32 = (
+            raw[:, 0].astype(np.int32)
+            | (raw[:, 1].astype(np.int32) << 8)
+            | (raw[:, 2].astype(np.int32) << 16)
+        )
+        i32[i32 >= 0x800000] -= 0x1000000
+        samples = i32.astype(np.float32) / 8388608.0
+    elif bit_depth == 32:
+        samples = np.frombuffer(pcm_data, dtype=np.int32).astype(np.float32)
+        samples /= 2147483648.0
+    else:
+        msg = f"Unsupported bit depth: {bit_depth}"
+        raise ValueError(msg)
+
+    if channels > 1:
+        samples = samples.reshape(-1, channels).mean(axis=1)
+    return samples
+
+
+class SonicAnalysisProvider(AudioAnalysisProvider):
+    """Provider that extracts sonic features from audio streams."""
+
+    analysis_version: int = 1
+
+    async def loaded_in_mass(self) -> None:
+        """Call after the provider has been loaded."""
+
+    async def _start_analysis(
+        self,
+        session_id: str,
+        streamdetails: StreamDetails,
+        audio_format: AudioFormat,
+    ) -> bool:
+        """Initialize a new sonic analysis session.
+
+        :param session_id: Unique session ID created by the controller.
+        :param streamdetails: Details about the stream being analyzed.
+        :param audio_format: PCM format of the audio stream.
+        """
+        bytes_per_sample = audio_format.bit_depth // 8
+        block_bytes = (
+            audio_format.sample_rate * bytes_per_sample * audio_format.channels * BLOCK_SECONDS
+        )
+        # Reject formats _pcm_bytes_to_audio cannot decode up front, instead of
+        # raising ValueError on every full block during playback.
+        if block_bytes <= 0 or audio_format.bit_depth not in (16, 24, 32):
+            self.logger.warning(
+                "Invalid audio format for session %s (sample_rate=%d, bit_depth=%d, channels=%d)"
+                " — skipping analysis",
+                session_id,
+                audio_format.sample_rate,
+                audio_format.bit_depth,
+                audio_format.channels,
+            )
+            return False
+        base = self._sessions[session_id]
+        self._sessions[session_id] = SonicSessionData(
+            streamdetails=base.streamdetails,
+            audio_format=base.audio_format,
+            block_samples=block_bytes,
+            start_time=time.monotonic(),
+        )
+        self.logger.debug(
+            "Started sonic analysis for %s/%s", streamdetails.provider, streamdetails.item_id
+        )
+        return True
+
+    async def process_pcm_chunk(
+        self,
+        session_id: str,
+        pcm_chunk: bytes,
+    ) -> None:
+        """Accumulate PCM and extract features when a 10-second block is full.
+
+        :param session_id: The analysis session ID.
+        :param pcm_chunk: Raw PCM audio data.
+        """
+        if session_id not in self._sessions:
+            return
+        session = self._sessions[session_id]
+        assert isinstance(session, SonicSessionData)
+        session.pcm_buffer.extend(pcm_chunk)
+        af = session.audio_format
+        while len(session.pcm_buffer) >= session.block_samples:
+            block_bytes = bytes(session.pcm_buffer[: session.block_samples])
+            del session.pcm_buffer[: session.block_samples]
+            audio = _pcm_bytes_to_audio(block_bytes, af.sample_rate, af.bit_depth, af.channels)
+            session.total_samples += len(audio)
+            block_peak = float(np.max(np.abs(audio)))
+            session.peak_absolute = max(session.peak_absolute, block_peak)
+            session.waveform_peaks.append(block_peak)
+            if session.overlap is not None:
+                audio = np.concatenate([session.overlap, audio])
+            session.overlap = audio[-OVERLAP_SAMPLES:].copy()
+            bf = await asyncio.to_thread(extract_block_features, audio, af.sample_rate)
+            if bf is not None:
+                merge_block_features(session.accumulated, bf)
+
+    async def _finalize(self, session_id: str) -> None:
+        """Process remaining PCM, collapse features, and store analysis data.
+
+        :param session_id: The analysis session ID.
+        """
+        if session_id not in self._sessions:
+            return
+        session = self._sessions[session_id]
+        assert isinstance(session, SonicSessionData)
+        sd = session.streamdetails
+        af = session.audio_format
+
+        # Flush any remaining PCM as a final partial block
+        if session.pcm_buffer:
+            audio = _pcm_bytes_to_audio(
+                bytes(session.pcm_buffer), af.sample_rate, af.bit_depth, af.channels
+            )
+            session.pcm_buffer.clear()
+            # The leftover can be shorter than one frame, which decodes to an empty
+            # array; np.max() raises on empty input, so skip the block in that case.
+            if len(audio) > 0:
+                session.total_samples += len(audio)
+                block_peak = float(np.max(np.abs(audio)))
+                session.peak_absolute = max(session.peak_absolute, block_peak)
+                session.waveform_peaks.append(block_peak)
+                if session.overlap is not None:
+                    audio = np.concatenate([session.overlap, audio])
+                bf = await asyncio.to_thread(extract_block_features, audio, af.sample_rate)
+                if bf is not None:
+                    merge_block_features(session.accumulated, bf)
+
+        # rms_frames is appended to for every extracted block, so it doubles as the
+        # "did we analyze anything" marker (BlockFeatures has no mfcc_frames field).
+        if not session.accumulated.rms_frames:
+            self.logger.debug("No feature blocks for session %s, skipping", session_id)
+            return
+
+        analysis = await asyncio.to_thread(
+            collapse_to_analysis, session.accumulated, af.sample_rate
+        )
+
+        # Fill in fields that need session-level state
+        analysis.duration = session.total_samples / af.sample_rate
+        if session.peak_absolute > 0:
+            analysis.true_peak = float(20.0 * np.log10(session.peak_absolute))
+        else:
+            analysis.true_peak = -96.0
+
+        # Build 800-bin waveform from per-block peaks
+        if session.waveform_peaks:
+            peaks = np.array(session.waveform_peaks, dtype=np.float32)
+            if len(peaks) >= 800:
+                bin_edges = np.linspace(0, len(peaks), 801, dtype=int)
+                waveform = np.array(
+                    [peaks[bin_edges[i] : bin_edges[i + 1]].max() for i in range(800)],
+                    dtype=np.float32,
+                )
+            else:
+                indices = np.linspace(0, len(peaks) - 1, 800, dtype=int)
+                waveform = peaks[indices]
+            wf_max = waveform.max()
+            if wf_max > 0:
+                waveform = waveform / wf_max
+            analysis.wave_form = waveform
+
+        await self.mass.streams.audio_analysis.set_audio_analysis(
+            item_id=sd.item_id,
+            provider_instance_id_or_domain=sd.provider,
+            aa_provider_domain=self.domain,
+            analysis=analysis,
+            analysis_version=self.analysis_version,
+            media_type=sd.media_type,
+        )
+        elapsed = time.monotonic() - session.start_time
+        self.logger.debug(
+            "Stored analysis for %s/%s (%.1fs elapsed)",
+            sd.provider,
+            sd.item_id,
+            elapsed,
+        )
+
+    async def analyze_file(
+        self, streamdetails: StreamDetails
+    ) -> AudioAnalysisData | None:
+        """Run librosa analysis directly on a local audio file for background scan.
+
+        :param streamdetails: StreamDetails pointing at a local file path.
+        """
+        if not isinstance(streamdetails.path, str) or not streamdetails.path:
+            return None
+        try:
+            import librosa  # noqa: PLC0415
+        except ImportError:
+            return None
+        try:
+            audio, _sr = await asyncio.to_thread(
+                librosa.load,
+                streamdetails.path,
+                sr=ANALYZE_FILE_SAMPLE_RATE,
+                mono=True,
+            )
+        except Exception as err:
+            self.logger.debug(
+                "analyze_file: load failed for %s/%s: %s",
+                streamdetails.provider,
+                streamdetails.item_id,
+                err,
+            )
+            return None
+        if len(audio) < ANALYZE_FILE_MIN_SAMPLES:
+            return None
+
+        bf = await asyncio.to_thread(
+            extract_block_features, audio, ANALYZE_FILE_SAMPLE_RATE
+        )
+        if bf is None:
+            return None
+        analysis = await asyncio.to_thread(
+            collapse_to_analysis, bf, ANALYZE_FILE_SAMPLE_RATE
+        )
+        analysis.duration = len(audio) / ANALYZE_FILE_SAMPLE_RATE
+        peak = float(np.max(np.abs(audio)))
+        analysis.true_peak = (
+            float(20.0 * np.log10(peak)) if peak > 0 else -96.0
+        )
+        return analysis
diff --git a/music_assistant/providers/sonic_analysis/helpers.py b/music_assistant/providers/sonic_analysis/helpers.py
new file mode 100644
index 0000000000..a397f6b555
--- /dev/null
+++ b/music_assistant/providers/sonic_analysis/helpers.py
@@ -0,0 +1,285 @@
+"""Sonic analysis helper — feature extraction and semantic audio analysis.
+
+Extracts per-block spectral/timbral features from raw PCM audio using librosa,
+then collapses accumulated blocks into a populated AudioAnalysisData with
+semantic descriptors.
+
+Fields NOT computed here are left as None and expected to be supplied by
+overlay providers (see `sonic_similarity.OVERLAY_SOURCES`):
+
+- `bpm`                       ← smart_fades (beat_this CNN)
+- `key`, `mode`               ← smart_fades (S-KEY neural classifier)
+- `danceability`              ← clap_analysis (zero-shot, Platt-calibrated)
+- `valence`, `arousal`,
+  `instrumentalness`,
+  `acousticness`              ← clap_analysis (zero-shot, Platt-calibrated)
+- `loudness_integrated`,
+  `loudness_range`,
+  `true_peak`                 ← loudness_analysis (ebur128) when enabled;
+                                fallback approximations populated here
+"""
+
+from __future__ import annotations
+
+import warnings
+from dataclasses import dataclass, field
+
+import librosa
+import numpy as np
+import numpy.typing as npt
+
+from music_assistant.models.audio_analysis import AudioAnalysisData
+
+# Fixed resolution for time-series fields (rms_energy, spectral_centroid) on
+# AudioAnalysisData — matches the upstream contract shared with other analysis
+# providers. Produces a consistent x-axis resolution regardless of track length.
+_TIME_SERIES_BINS = 1800
+
+# Energy threshold below which spectral centroid becomes noise-dominated; centroid
+# bins with RMS below this are zeroed to keep the signal musically meaningful.
+_SILENCE_THRESHOLD = 0.01
+
+
+@dataclass
+class BlockFeatures:
+    """Per-block feature arrays accumulated across 10-second blocks.
+
+    After all blocks are processed, collapse_to_analysis() aggregates these
+    into a populated AudioAnalysisData.
+
+    Only features actually consumed by the current collapse pipeline are
+    extracted. (MFCC, tonnetz, rolloff, and ZCR were previously extracted
+    but never read; removed to save ~100ms per 10s block.)
+    """
+
+    chroma_frames: list[np.ndarray] = field(default_factory=list)
+    contrast_frames: list[np.ndarray] = field(default_factory=list)
+    centroid_frames: list[np.ndarray] = field(default_factory=list)
+    flatness_frames: list[np.ndarray] = field(default_factory=list)
+    rms_frames: list[np.ndarray] = field(default_factory=list)
+    onset_env_frames: list[np.ndarray] = field(default_factory=list)
+
+
+MIN_BLOCK_SAMPLES: int = 4096
+
+
+def extract_block_features(audio: np.ndarray, sample_rate: int) -> BlockFeatures | None:
+    """Extract per-frame features from a single audio block (~10 seconds).
+
+    Returns None if the audio is too short for STFT processing.
+
+    Computes a single STFT up front and passes it to each spectral feature
+    via the `S=` kwarg. librosa functions used here (chroma_stft,
+    spectral_contrast, spectral_centroid, spectral_flatness) all share the
+    same default n_fft=2048 / hop_length=512, so a single STFT is the
+    correct input for all of them. Output is numerically identical to
+    calling each with raw audio — we just skip 3 redundant STFT passes.
+
+    :param audio: Mono float32 audio samples for this block.
+    :param sample_rate: Sample rate in Hz.
+    """
+    if len(audio) < MIN_BLOCK_SAMPLES:
+        return None
+    bf = BlockFeatures()
+
+    # Suppress librosa's n_fft warnings from internal sub-calls (harmonic/percussive
+    # separation in chroma can produce sub-signals shorter than n_fft)
+    with warnings.catch_warnings():
+        warnings.filterwarnings("ignore", message="n_fft=", category=UserWarning)
+        warnings.filterwarnings("ignore", message="Trying to estimate tuning", category=UserWarning)
+
+        # One STFT, shared across spectral features
+        stft_mag = np.abs(librosa.stft(audio))
+        stft_power = stft_mag**2
+
+        bf.chroma_frames.append(librosa.feature.chroma_stft(S=stft_power, sr=sample_rate))
+        bf.contrast_frames.append(
+            librosa.feature.spectral_contrast(S=stft_mag, sr=sample_rate, n_bands=6)
+        )
+        bf.centroid_frames.append(librosa.feature.spectral_centroid(S=stft_mag, sr=sample_rate))
+        bf.flatness_frames.append(librosa.feature.spectral_flatness(S=stft_mag))
+        # RMS operates in time domain, doesn't benefit from STFT sharing.
+        bf.rms_frames.append(librosa.feature.rms(y=audio))
+        # onset_strength uses a MEL spectrogram internally with different
+        # parameters; not worth sharing the linear STFT here.
+        bf.onset_env_frames.append(librosa.onset.onset_strength(y=audio, sr=sample_rate))
+
+    return bf
+
+
+def merge_block_features(target: BlockFeatures, source: BlockFeatures) -> None:
+    """Merge source block features into target (in place).
+
+    :param target: Accumulator to merge into.
+    :param source: New block features to add.
+    """
+    target.chroma_frames.extend(source.chroma_frames)
+    target.contrast_frames.extend(source.contrast_frames)
+    target.centroid_frames.extend(source.centroid_frames)
+    target.flatness_frames.extend(source.flatness_frames)
+    target.rms_frames.extend(source.rms_frames)
+    target.onset_env_frames.extend(source.onset_env_frames)
+
+
+def collapse_to_analysis(accumulated: BlockFeatures, sample_rate: int) -> AudioAnalysisData:
+    """Collapse accumulated per-block features into a populated AudioAnalysisData.
+
+    Populates measurement-based scalar and time-series fields that librosa is
+    well-suited to compute. Fields owned by overlay providers (bpm/key/mode via
+    smart_fades, soft scalars via clap_analysis, real LUFS via loudness_analysis)
+    are left as None and filled in at vector-assembly time by the similarity
+    plugin's overlay system.
+
+    :param accumulated: All block features accumulated during streaming.
+    :param sample_rate: Sample rate used during extraction.
+    """
+    onset_env = np.concatenate(accumulated.onset_env_frames)
+    chroma = np.concatenate(accumulated.chroma_frames, axis=1)
+    rms = np.concatenate(accumulated.rms_frames, axis=1).squeeze()
+    centroid = np.concatenate(accumulated.centroid_frames, axis=1).squeeze()
+    contrast = np.concatenate(accumulated.contrast_frames, axis=1)
+    flatness = np.concatenate(accumulated.flatness_frames, axis=1).squeeze()
+
+    energy = _derive_energy(rms)
+    loudness_integrated, loudness_range = _derive_loudness(rms)
+    brightness = _derive_brightness(centroid, sample_rate)
+    harmonic_complexity = _derive_harmonic_complexity(chroma)
+    roughness = _derive_roughness(contrast, flatness)
+    rhythmic_regularity = _derive_rhythmic_regularity(onset_env, sample_rate)
+    rms_energy_series = _derive_rms_energy_series(rms)
+    spectral_centroid_series = _derive_spectral_centroid_series(centroid, rms_energy_series)
+
+    return AudioAnalysisData(
+        energy=energy,
+        loudness_integrated=loudness_integrated,
+        loudness_range=loudness_range,
+        brightness=brightness,
+        harmonic_complexity=harmonic_complexity,
+        roughness=roughness,
+        rhythmic_regularity=rhythmic_regularity,
+        rms_energy=rms_energy_series,
+        spectral_centroid=spectral_centroid_series,
+    )
+
+
+def _clamp(value: float) -> float:
+    """Clamp a float to [0.0, 1.0]."""
+    return float(max(0.0, min(1.0, value)))
+
+
+def _derive_energy(rms: np.ndarray) -> float:
+    """Compute normalized mean RMS energy in [0, 1].
+
+    :param rms: Per-frame RMS values (1D after squeeze).
+    """
+    # RMS values are typically in [0, 1] for float32 audio; take mean and clamp
+    return _clamp(float(rms.mean()))
+
+
+def _derive_loudness(rms: np.ndarray) -> tuple[float, float]:
+    """Compute RMS-derived dB approximations for integrated loudness and loudness range.
+
+    Fallback only — real EBU R128 values come from the loudness_analysis
+    provider when enabled; the similarity plugin does not currently overlay
+    those onto primary rows, so these approximations remain the source of
+    truth for loudness fields in the vector until that overlay exists.
+
+    :param rms: Per-frame RMS values (1D after squeeze).
+    """
+    rms_clipped = np.clip(rms, 1e-8, None)
+    rms_db = 20.0 * np.log10(rms_clipped)
+    loudness_integrated = float(rms_db.mean())
+    loudness_range = float(rms_db.std())
+    return loudness_integrated, loudness_range
+
+
+def _derive_brightness(centroid: np.ndarray, sample_rate: int) -> float:
+    """Compute mean spectral centroid normalized against the Nyquist frequency.
+
+    :param centroid: Per-frame spectral centroid values in Hz (1D after squeeze).
+    :param sample_rate: Sample rate in Hz.
+    """
+    nyquist = sample_rate / 2.0
+    return _clamp(float(centroid.mean()) / nyquist)
+
+
+def _derive_harmonic_complexity(chroma: np.ndarray) -> float:
+    """Compute normalized Shannon entropy of the mean chroma vector.
+
+    :param chroma: Concatenated chroma feature matrix (12 x N_frames).
+    """
+    mean_chroma = chroma.mean(axis=1).astype(np.float64)
+    # Normalize to a probability distribution
+    chroma_sum = mean_chroma.sum()
+    if chroma_sum <= 0:
+        return 0.0
+    p = mean_chroma / chroma_sum
+    p = np.clip(p, 1e-10, None)
+    entropy = float(-np.sum(p * np.log(p)))
+    # Max entropy for 12 bins is ln(12)
+    max_entropy = float(np.log(12))
+    return _clamp(entropy / max_entropy)
+
+
+def _derive_roughness(contrast: np.ndarray, flatness: np.ndarray) -> float:
+    """Combine spectral contrast range and spectral flatness into a roughness measure.
+
+    :param contrast: Spectral contrast matrix (7 x N_frames).
+    :param flatness: Per-frame spectral flatness values (1D after squeeze).
+    """
+    # High contrast range → more tonal variation → rougher texture
+    contrast_range = float(contrast.max() - contrast.min())
+    # Normalize against a reasonable max contrast range (~80 dB)
+    contrast_score = _clamp(contrast_range / 80.0)
+
+    # High flatness (noise-like) → rougher; low flatness (tonal) → smoother
+    flatness_score = _clamp(float(flatness.mean()))
+
+    return _clamp(0.6 * contrast_score + 0.4 * flatness_score)
+
+
+def _derive_rhythmic_regularity(onset_env: np.ndarray, sample_rate: int) -> float:
+    """Estimate rhythmic regularity as 1 minus the normalized CV of inter-onset intervals.
+
+    :param onset_env: Concatenated onset strength envelope.
+    :param sample_rate: Sample rate in Hz.
+    """
+    onset_frames = librosa.onset.onset_detect(onset_envelope=onset_env, sr=sample_rate)
+    if len(onset_frames) < 2:
+        return 0.0
+    ioi = np.diff(onset_frames).astype(np.float64)
+    cv = float(ioi.std() / (ioi.mean() + 1e-8))
+    return _clamp(1.0 - cv)
+
+
+def _derive_rms_energy_series(rms: np.ndarray) -> npt.NDArray[np.float32]:
+    """Interpolate per-frame RMS onto fixed 1800 bins and peak-normalize.
+
+    :param rms: Per-frame RMS values (1D after squeeze).
+    """
+    if len(rms) == 0:
+        return np.zeros(_TIME_SERIES_BINS, dtype=np.float32)
+    src_x = np.linspace(0.0, 1.0, num=len(rms))
+    dst_x = np.linspace(0.0, 1.0, num=_TIME_SERIES_BINS)
+    result = np.interp(dst_x, src_x, rms).astype(np.float32)
+    peak = result.max()
+    if peak > 0:
+        result = result / peak
+    return result
+
+
+def _derive_spectral_centroid_series(
+    centroid: np.ndarray, rms_energy: npt.NDArray[np.float32]
+) -> npt.NDArray[np.float32]:
+    """Interpolate per-frame centroid onto fixed 1800 bins, zeroing silent regions.
+
+    :param centroid: Per-frame spectral centroid values in Hz (1D after squeeze).
+    :param rms_energy: Normalized RMS energy series (1800 bins) used to mask silence.
+    """
+    if len(centroid) == 0:
+        return np.zeros(_TIME_SERIES_BINS, dtype=np.float32)
+    src_x = np.linspace(0.0, 1.0, num=len(centroid))
+    dst_x = np.linspace(0.0, 1.0, num=_TIME_SERIES_BINS)
+    result = np.interp(dst_x, src_x, centroid).astype(np.float32)
+    result[rms_energy < _SILENCE_THRESHOLD] = 0.0
+    return result
diff --git a/music_assistant/providers/sonic_analysis/manifest.json b/music_assistant/providers/sonic_analysis/manifest.json
new file mode 100644
index 0000000000..3d5fbef4a2
--- /dev/null
+++ b/music_assistant/providers/sonic_analysis/manifest.json
@@ -0,0 +1,10 @@
+{
+  "type": "audio_analysis",
+  "domain": "sonic_analysis",
+  "name": "Sonic Analysis",
+  "description": "Extracts audio signatures from PCM audio streams during playback.",
+  "codeowners": ["@chrisuthe"],
+  "requirements": [],
+  "documentation": "",
+  "builtin": true
+}
diff --git a/tests/providers/sonic_analysis/__init__.py b/tests/providers/sonic_analysis/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/providers/sonic_analysis/test_helpers.py b/tests/providers/sonic_analysis/test_helpers.py
new file mode 100644
index 0000000000..0fe3379387
--- /dev/null
+++ b/tests/providers/sonic_analysis/test_helpers.py
@@ -0,0 +1,208 @@
+"""Unit tests for sonic analysis helper functions.
+
+sonic_analysis produces only measurement-based scalars and time-series data.
+Fields owned by overlay providers (bpm, key, mode, danceability, valence,
+arousal, instrumentalness, acousticness) are intentionally left None here.
+"""
+
+import math
+
+import numpy as np
+
+from music_assistant.models.audio_analysis import AudioAnalysisData
+from music_assistant.providers.sonic_analysis.helpers import (
+    MIN_BLOCK_SAMPLES,
+    BlockFeatures,
+    collapse_to_analysis,
+    extract_block_features,
+    merge_block_features,
+)
+
+
+def _make_sine(freq: float = 440.0, duration: float = 5.0, sr: int = 22050) -> np.ndarray:
+    """Generate a mono sine wave for testing."""
+    t = np.linspace(0, duration, int(sr * duration), endpoint=False)
+    return np.sin(2 * np.pi * freq * t).astype(np.float32)
+
+
+def _make_noise(duration: float = 5.0, sr: int = 22050) -> np.ndarray:
+    """Generate mono white noise for testing using a fixed RNG seed."""
+    rng = np.random.default_rng(42)
+    return rng.standard_normal(int(sr * duration)).astype(np.float32)
+
+
+# --- extract_block_features ---
+
+
+def test_extract_block_features_returns_block_features() -> None:
+    """Verify extract_block_features returns a BlockFeatures with correct shapes."""
+    audio = _make_sine(440.0, 10.0, 22050)
+    result = extract_block_features(audio, 22050)
+
+    assert isinstance(result, BlockFeatures)
+    assert len(result.chroma_frames) == 1
+    assert result.chroma_frames[0].shape[0] == 12
+
+    assert len(result.contrast_frames) == 1
+    assert result.contrast_frames[0].shape[0] == 7
+
+    assert len(result.centroid_frames) == 1
+    assert len(result.flatness_frames) == 1
+    assert len(result.rms_frames) == 1
+
+    assert len(result.onset_env_frames) == 1
+    assert result.onset_env_frames[0].ndim == 1
+
+
+def test_extract_block_features_too_short_returns_none() -> None:
+    """Verify audio shorter than MIN_BLOCK_SAMPLES returns None."""
+    audio = np.zeros(MIN_BLOCK_SAMPLES - 1, dtype=np.float32)
+    result = extract_block_features(audio, 22050)
+    assert result is None
+
+
+# --- merge_block_features ---
+
+
+def test_merge_block_features() -> None:
+    """Verify merging two BlockFeatures doubles all frame lists."""
+    audio_a = _make_sine(440.0, 5.0, 22050)
+    audio_b = _make_sine(880.0, 5.0, 22050)
+    target = extract_block_features(audio_a, 22050)
+    source = extract_block_features(audio_b, 22050)
+
+    assert target is not None
+    assert source is not None
+    merge_block_features(target, source)
+
+    assert len(target.chroma_frames) == 2
+    assert len(target.contrast_frames) == 2
+    assert len(target.centroid_frames) == 2
+    assert len(target.flatness_frames) == 2
+    assert len(target.rms_frames) == 2
+    assert len(target.onset_env_frames) == 2
+
+
+# --- collapse_to_analysis ---
+
+
+def _make_analysis(
+    audio: np.ndarray | None = None, duration: float = 10.0, sr: int = 22050
+) -> AudioAnalysisData:
+    """Build AudioAnalysisData from a sine wave (or provided audio) via collapse_to_analysis."""
+    if audio is None:
+        audio = _make_sine(440.0, duration, sr)
+    bf = extract_block_features(audio, sr)
+    assert bf is not None
+    return collapse_to_analysis(bf, sr)
+
+
+def test_collapse_to_analysis_returns_audio_analysis_data() -> None:
+    """Verify collapse_to_analysis returns an AudioAnalysisData instance."""
+    result = _make_analysis()
+    assert isinstance(result, AudioAnalysisData)
+
+
+def test_collapse_to_analysis_scalars_in_unit_range() -> None:
+    """All 0-1 scalar fields must be within [0.0, 1.0]."""
+    result = _make_analysis()
+    scalar_fields = [
+        "energy",
+        "brightness",
+        "harmonic_complexity",
+        "roughness",
+        "rhythmic_regularity",
+    ]
+    for field_name in scalar_fields:
+        value = getattr(result, field_name)
+        assert value is not None, f"{field_name} should not be None"
+        assert 0.0 <= value <= 1.0, f"{field_name}={value!r} is outside [0.0, 1.0]"
+
+
+def test_collapse_to_analysis_loudness_values_finite() -> None:
+    """Loudness fields must be finite floats."""
+    result = _make_analysis()
+    assert result.loudness_integrated is not None
+    assert math.isfinite(result.loudness_integrated)
+    assert result.loudness_range is not None
+    assert math.isfinite(result.loudness_range)
+
+
+def test_collapse_to_analysis_time_series_populated() -> None:
+    """Time-series arrays must be populated and non-empty."""
+    result = _make_analysis()
+
+    assert result.rms_energy is not None
+    assert len(result.rms_energy) > 0
+
+    assert result.spectral_centroid is not None
+    assert len(result.spectral_centroid) > 0
+
+
+def test_collapse_to_analysis_deterministic() -> None:
+    """Same input must produce identical output."""
+    audio = _make_sine(440.0, 10.0, 22050)
+    sr = 22050
+
+    bf_a = extract_block_features(audio, sr)
+    assert bf_a is not None
+    result_a = collapse_to_analysis(bf_a, sr)
+
+    bf_b = extract_block_features(audio, sr)
+    assert bf_b is not None
+    result_b = collapse_to_analysis(bf_b, sr)
+
+    assert result_a.energy == result_b.energy
+    assert result_a.brightness == result_b.brightness
+    assert result_a.harmonic_complexity == result_b.harmonic_complexity
+    assert result_a.roughness == result_b.roughness
+    assert result_a.rhythmic_regularity == result_b.rhythmic_regularity
+    assert result_a.loudness_integrated == result_b.loudness_integrated
+    assert result_a.loudness_range == result_b.loudness_range
+    np.testing.assert_array_equal(result_a.rms_energy, result_b.rms_energy)
+    np.testing.assert_array_equal(result_a.spectral_centroid, result_b.spectral_centroid)
+
+
+def test_collapse_to_analysis_noise_vs_sine_differ() -> None:
+    """Noise should produce higher roughness and brightness than a pure sine tone."""
+    sr = 22050
+    duration = 10.0
+
+    sine_result = _make_analysis(audio=_make_sine(440.0, duration, sr), sr=sr)
+    noise_result = _make_analysis(audio=_make_noise(duration, sr), sr=sr)
+
+    assert sine_result.roughness is not None
+    assert noise_result.roughness is not None
+    assert noise_result.roughness > sine_result.roughness, (
+        f"Expected noise roughness ({noise_result.roughness}) > "
+        f"sine roughness ({sine_result.roughness})"
+    )
+
+    assert sine_result.brightness is not None
+    assert noise_result.brightness is not None
+    assert noise_result.brightness > sine_result.brightness, (
+        f"Expected noise brightness ({noise_result.brightness}) > "
+        f"sine brightness ({sine_result.brightness})"
+    )
+
+
+def test_collapse_to_analysis_overlay_owned_fields_are_none() -> None:
+    """Fields owned by overlay providers must be left None by sonic_analysis.
+
+    Overlay providers fill these in at vector-assembly time:
+    - bpm, key, mode             ← smart_fades
+    - danceability, valence,
+      arousal, instrumentalness,
+      acousticness               ← clap_analysis
+    Plus external-only fields (speechiness) that nothing in our stack computes.
+    """
+    result = _make_analysis()
+    assert result.bpm is None
+    assert result.key is None
+    assert result.mode is None
+    assert result.danceability is None
+    assert result.valence is None
+    assert result.arousal is None
+    assert result.instrumentalness is None
+    assert result.acousticness is None
+    assert result.speechiness is None
diff --git a/tests/providers/sonic_analysis/test_provider_units.py b/tests/providers/sonic_analysis/test_provider_units.py
new file mode 100644
index 0000000000..6621d07ff4
--- /dev/null
+++ b/tests/providers/sonic_analysis/test_provider_units.py
@@ -0,0 +1,156 @@
+"""Unit tests for sonic analysis provider functions that don't require a running MA instance."""
+
+import struct
+
+import numpy as np
+import pytest
+
+from music_assistant.providers.sonic_analysis import _pcm_bytes_to_audio
+
+# --------------------------------------------------------------------------- #
+# _pcm_bytes_to_audio                                                         #
+# --------------------------------------------------------------------------- #
+
+
+def _make_pcm_16bit(samples: list[int]) -> bytes:
+    """Build raw 16-bit little-endian PCM bytes from integer sample values."""
+    return struct.pack(f"<{len(samples)}h", *samples)
+
+
+def _make_pcm_32bit(samples: list[int]) -> bytes:
+    """Build raw 32-bit little-endian PCM bytes from integer sample values."""
+    return struct.pack(f"<{len(samples)}i", *samples)
+
+
+def test_pcm_16bit_mono() -> None:
+    """16-bit mono: max positive sample should convert to ~1.0."""
+    pcm = _make_pcm_16bit([0, 16384, -16384, 32767])
+    audio = _pcm_bytes_to_audio(pcm, sample_rate=44100, bit_depth=16, channels=1)
+    assert audio.dtype == np.float32
+    assert len(audio) == 4
+    assert abs(audio[0]) < 1e-6
+    assert abs(audio[1] - 0.5) < 0.001
+    assert abs(audio[2] + 0.5) < 0.001
+    assert abs(audio[3] - 1.0) < 0.001
+
+
+def test_pcm_16bit_stereo_downmix() -> None:
+    """16-bit stereo: two channels should be averaged to mono."""
+    # L=32767 R=0 → mono ≈ 0.5, L=0 R=32767 → mono ≈ 0.5
+    pcm = _make_pcm_16bit([32767, 0, 0, 32767])
+    audio = _pcm_bytes_to_audio(pcm, sample_rate=44100, bit_depth=16, channels=2)
+    assert len(audio) == 2
+    assert abs(audio[0] - 0.5) < 0.001
+    assert abs(audio[1] - 0.5) < 0.001
+
+
+def test_pcm_32bit_mono() -> None:
+    """32-bit mono: max positive sample should convert to ~1.0."""
+    pcm = _make_pcm_32bit([0, 2147483647])
+    audio = _pcm_bytes_to_audio(pcm, sample_rate=44100, bit_depth=32, channels=1)
+    assert audio.dtype == np.float32
+    assert len(audio) == 2
+    assert abs(audio[0]) < 1e-6
+    assert abs(audio[1] - 1.0) < 0.01
+
+
+def test_pcm_24bit_mono() -> None:
+    """24-bit mono: verify positive and negative values convert correctly."""
+    # 24-bit max positive: 0x7FFFFF = 8388607, stored as 3 bytes little-endian
+    pos_max = (0x7FFFFF).to_bytes(3, byteorder="little", signed=False)
+    zero = (0).to_bytes(3, byteorder="little", signed=False)
+    # 24-bit negative: -1 = 0xFFFFFF in 24-bit two's complement
+    neg_one = (0xFFFFFF).to_bytes(3, byteorder="little", signed=False)
+    pcm = zero + pos_max + neg_one
+    audio = _pcm_bytes_to_audio(pcm, sample_rate=44100, bit_depth=24, channels=1)
+    assert len(audio) == 3
+    assert abs(audio[0]) < 1e-6
+    assert abs(audio[1] - 1.0) < 0.001
+    assert abs(audio[2] + (1.0 / 8388608.0)) < 0.001
+
+
+def test_pcm_unsupported_bit_depth() -> None:
+    """Unsupported bit depth should raise ValueError."""
+    with pytest.raises(ValueError, match="Unsupported bit depth"):
+        _pcm_bytes_to_audio(b"\x00\x00", sample_rate=44100, bit_depth=8, channels=1)
+
+
+def test_pcm_sample_rate_unused() -> None:
+    """Sample rate is accepted but doesn't affect conversion."""
+    pcm = _make_pcm_16bit([16384])
+    a1 = _pcm_bytes_to_audio(pcm, sample_rate=22050, bit_depth=16, channels=1)
+    a2 = _pcm_bytes_to_audio(pcm, sample_rate=48000, bit_depth=16, channels=1)
+    assert np.array_equal(a1, a2)
+
+
+def test_pcm_trailing_partial_frame_dropped() -> None:
+    """A trailing partial frame is dropped instead of raising."""
+    # One full stereo frame followed by a single stray byte.
+    pcm = _make_pcm_16bit([16384, 16384]) + b"\x00"
+    audio = _pcm_bytes_to_audio(pcm, sample_rate=44100, bit_depth=16, channels=2)
+    assert len(audio) == 1
+    assert abs(audio[0] - 0.5) < 0.001
+
+
+# --------------------------------------------------------------------------- #
+# _get_or_assign_label (tested via instance state dicts)                      #
+# --------------------------------------------------------------------------- #
+# NOTE(review): nothing in the sonic_analysis provider defines _get_or_assign_label;
+# these tests only exercise the local _FakeLabelMapper stand-in below.
+
+
+class _FakeLabelMapper:
+    """Minimal stand-in that replicates the label mapping logic."""
+
+    def __init__(self) -> None:
+        self._label_map: dict[int, tuple[str, str]] = {}
+        self._reverse_label_map: dict[tuple[str, str], int] = {}
+        self._next_label: int = 1
+
+    def _get_or_assign_label(self, item_id: str, provider: str) -> int:
+        key = (item_id, provider)
+        if key in self._reverse_label_map:
+            return self._reverse_label_map[key]
+        label = self._next_label
+        self._next_label += 1
+        self._label_map[label] = key
+        self._reverse_label_map[key] = label
+        return label
+
+
+def test_label_idempotent() -> None:
+    """Same (item_id, provider) always returns the same label."""
+    m = _FakeLabelMapper()
+    label1 = m._get_or_assign_label("track1", "spotify")
+    label2 = m._get_or_assign_label("track1", "spotify")
+    assert label1 == label2
+
+
+def test_label_unique_per_pair() -> None:
+    """Different (item_id, provider) pairs get different labels."""
+    m = _FakeLabelMapper()
+    a = m._get_or_assign_label("track1", "spotify")
+    b = m._get_or_assign_label("track1", "tidal")
+    c = m._get_or_assign_label("track2", "spotify")
+    assert len({a, b, c}) == 3
+
+
+def test_label_maps_bidirectional() -> None:
+    """Label map and reverse map are consistent."""
+    m = _FakeLabelMapper()
+    label = m._get_or_assign_label("track1", "spotify")
+    assert m._label_map[label] == ("track1", "spotify")
+    assert m._reverse_label_map[("track1", "spotify")] == label
+
+
+def test_label_starts_at_one() -> None:
+    """First assigned label should be 1."""
+    m = _FakeLabelMapper()
+    assert m._get_or_assign_label("a", "b") == 1
+
+
+def test_label_increments() -> None:
+    """Labels should increment sequentially."""
+    m = _FakeLabelMapper()
+    labels = [m._get_or_assign_label(f"t{i}", "p") for i in range(5)]
+    assert labels == [1, 2, 3, 4, 5]