Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,375 changes: 2,375 additions & 0 deletions music_assistant/providers/sonic_similarity/__init__.py

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions music_assistant/providers/sonic_similarity/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"type": "plugin",
"domain": "sonic_similarity",
"name": "Sonic Similarity",
"description": "Find similar tracks in your library using audio signature analysis.",
"codeowners": ["@chrisuthe"],
"requirements": ["usearch"],
"documentation": "",
"depends_on": "sonic_analysis"
}
157 changes: 157 additions & 0 deletions music_assistant/providers/sonic_similarity/similarity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""Pure similarity functions — no MA dependencies.

Centroid blending, union merging, and MMR diversity re-ranking.
All functions operate on plain lists of floats and numpy arrays.
"""

from __future__ import annotations

from collections.abc import Callable

import numpy as np


def combine_seeds_centroid(
    seeds: list[list[float]],
    weights: list[float] | None = None,
) -> list[float]:
    """Compute the weighted average (centroid) of seed signature vectors.

    :param seeds: List of signature vectors (all same dimensionality).
    :param weights: Per-seed weights. If None, equal weighting is used.
    :returns: The centroid as a list of floats.
    :raises ValueError: If seeds is empty, weights length mismatches, or
        weights sum to zero (normalization would divide by zero).
    """
    if not seeds:
        # Fixed message: previously read "from at least one seed", which
        # inverted the meaning of this check.
        msg = "Cannot compute centroid without at least one seed"
        raise ValueError(msg)
    if weights is not None and len(weights) != len(seeds):
        msg = f"weights length ({len(weights)}) must match seeds length ({len(seeds)})"
        raise ValueError(msg)

    arr = np.array(seeds, dtype=np.float64)
    if weights is None:
        centroid = arr.mean(axis=0)
    else:
        w = np.array(weights, dtype=np.float64)
        total = w.sum()
        if total == 0.0:
            # Guard against silent NaN propagation from dividing by zero.
            msg = "weights must not sum to zero"
            raise ValueError(msg)
        w = w / total
        centroid = (arr * w[:, np.newaxis]).sum(axis=0)

    return [float(v) for v in centroid]


def merge_union_results(
    neighborhoods: list[list[tuple[str, float]]],
) -> list[tuple[str, float]]:
    """Merge per-seed ANN results, keeping the best distance per track.

    Each track appears at most once in the output, carrying the smallest
    distance observed across all neighborhoods, sorted ascending by distance.

    :param neighborhoods: List of result lists of (item_id, distance) pairs.
    """
    if not neighborhoods:
        return []

    closest: dict[str, float] = {}
    for results in neighborhoods:
        for track_id, distance in results:
            previous = closest.get(track_id)
            if previous is None or distance < previous:
                closest[track_id] = distance

    return sorted(closest.items(), key=lambda pair: pair[1])


def apply_mmr(
    candidates: list[tuple[str, list[float], float]],
    seed_vec: list[float],
    diversity: float,
    limit: int,
) -> list[tuple[str, float]]:
    """Apply Maximal Marginal Relevance to re-rank candidates for diversity.

    Greedily picks the candidate maximizing
    ``(1 - diversity) * relevance - diversity * redundancy``, where relevance
    is cosine similarity to the seed and redundancy is the highest cosine
    similarity to any already-selected candidate.

    :param candidates: List of (item_id, normalized_features, distance) tuples.
    :param seed_vec: The seed signature vector (normalized).
    :param diversity: MMR lambda, 0.0 = pure relevance, 1.0 = max diversity.
    :param limit: Maximum number of results to return.
    """
    if not candidates:
        return []

    vectors = {item_id: np.array(feats, dtype=np.float64) for item_id, feats, _ in candidates}
    distances = {item_id: dist for item_id, _, dist in candidates}
    seed = np.array(seed_vec, dtype=np.float64)
    seed_len = float(np.linalg.norm(seed))

    def _cos(u: np.ndarray, v: np.ndarray) -> float:
        # Zero-length vectors are treated as dissimilar to everything.
        nu = float(np.linalg.norm(u))
        nv = float(np.linalg.norm(v))
        if nu == 0 or nv == 0:
            return 0.0
        return float(np.dot(u, v) / (nu * nv))

    relevance = {
        item_id: _cos(vectors[item_id], seed) if seed_len > 0 else 0.0
        for item_id in vectors
    }

    picked: list[tuple[str, float]] = []
    pool = {item_id for item_id, _, _ in candidates}

    for _ in range(min(limit, len(candidates))):
        top_id: str | None = None
        top_score = -float("inf")

        for item_id in pool:
            penalty = 0.0
            if picked:
                # Redundancy = worst-case similarity to what we already kept.
                penalty = max(_cos(vectors[item_id], vectors[chosen]) for chosen, _ in picked)
            mmr_score = (1.0 - diversity) * relevance[item_id] - diversity * penalty
            if mmr_score > top_score:
                top_score = mmr_score
                top_id = item_id

        if top_id is None:
            break
        pool.discard(top_id)
        picked.append((top_id, distances[top_id]))

    return picked


def expand_recursive(
    initial_seeds: list[list[float]],
    searcher: Callable[
        [list[list[float]], set[str]],
        list[tuple[str, str, list[float], float]],
    ],
    depth: int,
    branch_factor: int,
) -> list[tuple[str, str, list[float], float, int]]:
    """Expand similarity search across multiple generations.

    Generation 0 searches from ``initial_seeds``; each later generation seeds
    from the closest ``branch_factor`` unseen results of the previous one.
    Expansion stops early when a generation yields nothing new.

    :param initial_seeds: Seed signature vectors for generation 0.
    :param searcher: Callback taking (seed_vectors, seen_ids) and returning
        a list of (item_id, provider, features, distance) tuples.
    :param depth: Number of generations to run.
    :param branch_factor: How many top results from each generation become seeds.
    :returns: All unique results as (item_id, provider, features, distance,
        generation) tuples.
    """
    collected: list[tuple[str, str, list[float], float, int]] = []
    visited: set[str] = set()
    seeds = initial_seeds

    for generation in range(depth):
        fresh: list[tuple[str, str, list[float], float]] = []
        for item_id, provider, features, distance in searcher(seeds, visited):
            # Defensive re-filter: the searcher receives visited ids, but we
            # still drop anything already collected in a prior generation.
            if item_id in visited:
                continue
            visited.add(item_id)
            fresh.append((item_id, provider, features, distance))
            collected.append((item_id, provider, features, distance, generation))

        # Stop when nothing new was found or this was the final generation.
        if not fresh or generation == depth - 1:
            break

        fresh.sort(key=lambda entry: entry[3])
        seeds = [entry[2] for entry in fresh[:branch_factor]]

    return collected
218 changes: 218 additions & 0 deletions music_assistant/providers/sonic_similarity/vectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
"""14-dimensional semantic vector schema for sonic similarity search.

Owns the mapping from AudioAnalysisData fields to a fixed-size float vector
suitable for USearch ANN indexing. The 14 dimensions are:
[0-8] 9 scalar features (bpm, energy, danceability, ...)
[9-11] circular key encoding (sin, cos) + mode
[12] RMS energy variance over time
[13] Spectral centroid variance over time
"""

from __future__ import annotations

import math

import numpy as np

from music_assistant.models.audio_analysis import AudioAnalysisData

PITCH_CLASS_NAMES = ["C", "C#", "D", "D#", "E", "F", "F#", "G", "G#", "A", "A#", "B"]

# Required fields — must be non-None for a valid vector
VECTOR_FIELDS = [
    "bpm",
    "energy",
    "danceability",
    "loudness_integrated",
    "loudness_range",
    "brightness",
    "harmonic_complexity",
    "roughness",
    "rhythmic_regularity",
]

# Optional ML fields — use neutral default (0.5) when absent
OPTIONAL_FIELDS = [
    "instrumentalness",
    "valence",
    "arousal",
    "acousticness",
]
OPTIONAL_DEFAULT = 0.5

# 9 required + 4 optional ML + 2 key encoding + 1 mode + 2 time-series variance = 18
VECTOR_DIMENSIONS = 18

FEATURE_GROUPS = {
    "rhythm": (0, 3),  # bpm, energy, danceability
    "loudness": (3, 5),  # loudness_integrated, loudness_range
    "timbre": (5, 8),  # brightness, harmonic_complexity, roughness
    "regularity": (8, 9),  # rhythmic_regularity
    "mood": (9, 13),  # instrumentalness, valence, arousal, acousticness
    "tonal": (13, 16),  # key_sin, key_cos, mode
    "dynamics": (16, 18),  # rms_variance, centroid_variance
}


def encode_key_mode(key: str, mode: str) -> tuple[float, float, float]:
    """Encode musical key and mode as three floats.

    The pitch class is placed on the unit circle (sin/cos) so that adjacent
    keys are numerically close and the circle wraps B back to C; mode is a
    binary flag.

    :param key: Pitch class name (e.g. "C", "F#"). Unknown keys default to
        pitch class 0 (C).
    :param mode: Tonality string — "major" encodes to 1.0, anything else to 0.0.
    :returns: Tuple of (key_sin, key_cos, mode_float).
    """
    try:
        pitch_class = PITCH_CLASS_NAMES.index(key)
    except ValueError:
        pitch_class = 0  # unknown key names fall back to pitch class C
    theta = 2.0 * math.pi * pitch_class / 12
    is_major = 1.0 if mode == "major" else 0.0
    return math.sin(theta), math.cos(theta), is_major


def assemble_vector(analysis: AudioAnalysisData) -> list[float] | None:
    """Build the 18-dimensional feature vector for one analyzed track.

    Returns None when any required scalar in VECTOR_FIELDS, or the key/mode
    pair, is missing or NaN — NaN inputs must not reach distance
    calculations, where they would surface as null values in JSON responses.
    Optional ML fields (instrumentalness, valence, arousal, acousticness)
    fall back to the neutral OPTIONAL_DEFAULT (0.5) so tracks without ML
    analysis still vectorize without skewing similarity in any direction.

    :param analysis: Source audio analysis data.
    :returns: VECTOR_DIMENSIONS-element (18) list of floats, or None when
        required fields are missing.
    """
    def _finite(value: object) -> bool:
        # Present and not NaN — the only values safe to place in the vector.
        return value is not None and not math.isnan(float(value))

    # Required scalars: reject the whole track if any is missing or NaN.
    required = [getattr(analysis, name) for name in VECTOR_FIELDS]
    if not all(_finite(value) for value in required):
        return None
    if analysis.key is None or analysis.mode is None:
        return None

    vec: list[float] = [float(value) for value in required]

    # 4 optional ML scalars default to neutral 0.5 when absent or NaN.
    for name in OPTIONAL_FIELDS:
        value = getattr(analysis, name, None)
        vec.append(float(value) if _finite(value) else OPTIONAL_DEFAULT)

    # Circular key encoding plus binary mode flag (3 dims).
    vec.extend(encode_key_mode(analysis.key, analysis.mode))

    # Time-series variances (2 dims). np.var over NaN-containing data yields
    # NaN, which is clamped to 0.0 so the vector stays finite.
    for series in (analysis.rms_energy, analysis.spectral_centroid):
        variance = (
            float(np.var(series)) if series is not None and len(series) > 1 else 0.0
        )
        vec.append(variance if not math.isnan(variance) else 0.0)

    return vec


def normalize_features(
raw_features: list[float],
corpus_means: list[float],
corpus_stds: list[float],
) -> list[float]:
"""Apply z-score then L2 normalization to a raw feature vector.

Zero standard deviation for a feature produces 0.0 for that dimension.
If the resulting z-score vector has zero L2 norm, it is returned as-is
without L2 normalization.

:param raw_features: Raw feature vector to normalize.
:param corpus_means: Per-feature means from the corpus.
:param corpus_stds: Per-feature standard deviations from the corpus.
:returns: Normalized feature vector as a list of floats.
"""
# Z-score normalization; zero std → 0.0 for that dimension
z_scored = [
(v - m) / s if s != 0.0 else 0.0
for v, m, s in zip(raw_features, corpus_means, corpus_stds, strict=True)
]

norm = math.sqrt(sum(v * v for v in z_scored))
if norm == 0.0:
return [float(v) for v in z_scored]

return [float(v / norm) for v in z_scored]


def compute_corpus_stats(
    all_features: list[list[float]],
) -> tuple[list[float], list[float]]:
    """Compute per-feature means and standard deviations across a corpus.

    :param all_features: List of feature vectors (all same dimensionality).
    :returns: Tuple of (means, stds) as lists of floats.
    :raises ValueError: If all_features is empty.
    """
    if not all_features:
        msg = "Empty corpus: cannot compute stats from zero feature vectors"
        raise ValueError(msg)

    matrix = np.asarray(all_features, dtype=np.float64)
    # .tolist() yields native Python floats, column-wise over the corpus.
    return matrix.mean(axis=0).tolist(), matrix.std(axis=0).tolist()


def compute_group_distances(
    sig_a: list[float],
    sig_b: list[float],
    weights: dict[str, float],  # noqa: ARG001
) -> dict[str, float]:
    """Compute per-group normalized Euclidean distance between two vectors.

    Each FEATURE_GROUPS slice contributes sqrt(mean squared difference) over
    its dimensions, so groups of different sizes remain comparable. Weights
    are accepted for API symmetry only and do not affect the returned values.

    :param sig_a: First feature vector.
    :param sig_b: Second feature vector.
    :param weights: Accepted for API compatibility, not used.
    """
    vec_a = np.asarray(sig_a, dtype=np.float64)
    vec_b = np.asarray(sig_b, dtype=np.float64)
    distances: dict[str, float] = {}
    for name, (lo, hi) in FEATURE_GROUPS.items():
        delta = vec_a[lo:hi] - vec_b[lo:hi]
        distances[name] = math.sqrt(float(np.dot(delta, delta)) / (hi - lo))
    return distances


def compute_weighted_distance(
    sig_a: list[float],
    sig_b: list[float],
    weights: dict[str, float],
) -> float:
    """Compute the group-weighted Euclidean distance between two vectors.

    Each group's normalized distance is scaled back to a squared-difference
    sum, weighted, and the total re-normalized by the weighted dimension
    count, keeping results comparable across different weight profiles.

    :param sig_a: First feature vector.
    :param sig_b: Second feature vector.
    :param weights: Per-group weight overrides keyed by FEATURE_GROUPS name.
    :returns: Weighted normalized distance as a float.
    """
    per_group = compute_group_distances(sig_a, sig_b, weights)
    numerator = 0.0
    denominator = 0.0
    for name, (lo, hi) in FEATURE_GROUPS.items():
        weight = weights.get(name, 1.0)
        dims = lo and hi - lo or hi - lo  # dims = hi - lo
        numerator += weight * (per_group[name] ** 2) * dims
        denominator += weight * dims
    # All-zero weights would make the normalizer zero; define that as 0.0.
    return math.sqrt(numerator / denominator) if denominator != 0.0 else 0.0
1 change: 1 addition & 0 deletions requirements_all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ torch==2.11.0; sys_platform != 'linux' or platform_machine != 'x86_64'
torchaudio==2.11.0+cpu; sys_platform == 'linux' and platform_machine == 'x86_64'
torchaudio==2.11.0; sys_platform != 'linux' or platform_machine != 'x86_64'
unidecode==1.4.0
usearch==2.25.1
uv>=0.8.0
websocket-client==1.9.0
wiim==0.1.1
Expand Down
Empty file.
Empty file.
Loading