Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions music_assistant/controllers/media/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
)

from music_assistant.constants import (
DB_TABLE_AUDIO_ANALYSIS,
DB_TABLE_GENRE_MEDIA_ITEM_EXCLUSION,
DB_TABLE_GENRE_MEDIA_ITEM_MAPPING,
DB_TABLE_LOUDNESS_MEASUREMENTS,
DB_TABLE_PLAYLOG,
DB_TABLE_PROVIDER_MAPPINGS,
MASS_LOGGER_NAME,
Expand Down Expand Up @@ -244,10 +244,10 @@ async def remove_item_from_library(self, item_id: str | int, recursive: bool = T
"provider": prov_mapping.provider_instance,
},
)
# cleanup loudness measurements for this provider mapping
# cleanup audio analysis rows for this provider mapping
for prov_key in (prov_mapping.provider_domain, prov_mapping.provider_instance):
await self.mass.music.database.delete(
DB_TABLE_LOUDNESS_MEASUREMENTS,
DB_TABLE_AUDIO_ANALYSIS,
{
"media_type": self.media_type.value,
"item_id": prov_mapping.item_id,
Expand Down
96 changes: 20 additions & 76 deletions music_assistant/controllers/music.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from copy import deepcopy
from datetime import datetime
from itertools import zip_longest
from math import inf
from typing import TYPE_CHECKING, Any, Final, cast

from music_assistant_models.background_task import BackgroundTask, TaskMetadata, TaskSchedule
Expand Down Expand Up @@ -110,7 +109,7 @@
DEFAULT_SYNC_INTERVAL = 12 * 60 # default sync interval in minutes
CONF_SYNC_INTERVAL = "sync_interval"
CONF_DELETED_PROVIDERS = "deleted_providers"
DB_SCHEMA_VERSION: Final[int] = 38
DB_SCHEMA_VERSION: Final[int] = 39

CACHE_CATEGORY_SEARCH_RESULTS: Final[int] = 10
DATABASE_CLEANUP_TASK_ID: Final[str] = "music_database_cleanup"
Expand Down Expand Up @@ -1092,64 +1091,6 @@ async def refresh_item( # noqa: PLR0915
await self.mass.metadata.update_metadata(library_item, force_refresh=True)
return library_item

async def set_loudness(
self,
item_id: str,
provider_instance_id_or_domain: str,
loudness: float,
album_loudness: float | None = None,
media_type: MediaType = MediaType.TRACK,
) -> None:
"""Store (EBU-R128) Integrated Loudness Measurement for a mediaitem in db."""
if not (provider := self.mass.get_provider(provider_instance_id_or_domain)):
return
if loudness in (None, inf, -inf) or loudness <= LOUDNESS_MEASUREMENT_MIN_LUFS:
# skip invalid or unreliable values (ebur128 reports -70 LUFS on near-silence)
return
# prefer domain for streaming providers as the catalog is the same across instances
prov_key = provider.domain if provider.is_streaming_provider else provider.instance_id
values = {
"item_id": item_id,
"media_type": media_type.value,
"provider": prov_key,
"loudness": loudness,
}
if (
album_loudness not in (None, inf, -inf)
and album_loudness > LOUDNESS_MEASUREMENT_MIN_LUFS
):
values["loudness_album"] = album_loudness
await self.database.insert_or_replace(DB_TABLE_LOUDNESS_MEASUREMENTS, values)

async def get_loudness(
self,
item_id: str,
provider_instance_id_or_domain: str,
media_type: MediaType = MediaType.TRACK,
) -> tuple[float, float | None] | None:
"""Get (EBU-R128) Integrated Loudness Measurement for a mediaitem in db."""
if not (provider := self.mass.get_provider(provider_instance_id_or_domain)):
return None
# prefer domain for streaming providers as the catalog is the same across instances
prov_key = provider.domain if provider.is_streaming_provider else provider.instance_id
db_row = await self.database.get_row(
DB_TABLE_LOUDNESS_MEASUREMENTS,
{
"item_id": item_id,
"media_type": media_type.value,
"provider": prov_key,
},
)
if db_row and db_row["loudness"] != inf and db_row["loudness"] != -inf:
loudness = round(db_row["loudness"], 2)
loudness_album = db_row["loudness_album"]
loudness_album = (
None if loudness_album in (None, inf, -inf) else round(loudness_album, 2)
)
return (loudness, loudness_album)

return None

@api_command("music/mark_played")
async def mark_item_played(
self,
Expand Down Expand Up @@ -2706,6 +2647,25 @@ async def _get_or_create_genre(
f"WHERE loudness_album <= {LOUDNESS_MEASUREMENT_MIN_LUFS}"
)

if prev_version <= 38:
# migrate loudness measurements to the unified audio_analysis table
# under the new builtin loudness_analysis provider, then drop the
# legacy table. album loudness rides along when present.
await self._database.execute(
f"INSERT OR IGNORE INTO {DB_TABLE_AUDIO_ANALYSIS} "
f"(media_type, item_id, provider, aa_provider_domain, "
f" analysis_data, analysis_version) "
f"SELECT media_type, item_id, provider, 'loudness_analysis', "
f" json_object("
f" 'loudness_integrated', loudness, "
f" 'loudness_album', loudness_album"
f" ), 1 "
f"FROM {DB_TABLE_LOUDNESS_MEASUREMENTS} "
f"WHERE loudness IS NOT NULL "
f" AND loudness > {LOUDNESS_MEASUREMENT_MIN_LUFS}"
)
await self._database.execute(f"DROP TABLE IF EXISTS {DB_TABLE_LOUDNESS_MEASUREMENTS}")

# save changes
await self._database.commit()

Expand Down Expand Up @@ -2974,17 +2934,6 @@ async def __create_database_tables(self) -> None:
);"""
)

await self.database.execute(
f"""CREATE TABLE IF NOT EXISTS {DB_TABLE_LOUDNESS_MEASUREMENTS}(
[id] INTEGER PRIMARY KEY AUTOINCREMENT,
[media_type] TEXT NOT NULL,
[item_id] TEXT NOT NULL,
[provider] TEXT NOT NULL,
[loudness] REAL,
[loudness_album] REAL,
UNIQUE(media_type,item_id,provider));"""
)

await self.database.execute(
f"""CREATE TABLE IF NOT EXISTS {DB_TABLE_AUDIO_ANALYSIS}(
[id] INTEGER PRIMARY KEY AUTOINCREMENT,
Expand Down Expand Up @@ -3099,11 +3048,6 @@ async def __create_database_indexes(self) -> None:
f"CREATE INDEX IF NOT EXISTS {DB_TABLE_ALBUM_ARTISTS}_artist_id_idx "
f"on {DB_TABLE_ALBUM_ARTISTS}(artist_id);"
)
# index on loudness measurements table
await self.database.execute(
f"CREATE INDEX IF NOT EXISTS {DB_TABLE_LOUDNESS_MEASUREMENTS}_idx "
f"on {DB_TABLE_LOUDNESS_MEASUREMENTS}(media_type,item_id,provider);"
)
# indexes on genre_media_item_mapping table
await self.database.execute(
f"CREATE INDEX IF NOT EXISTS {DB_TABLE_GENRE_MEDIA_ITEM_MAPPING}_media_idx "
Expand Down
152 changes: 15 additions & 137 deletions music_assistant/controllers/streams/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
CONF_VOLUME_NORMALIZATION_TARGET,
INTERNAL_PCM_FORMAT,
LOUDNESS_MEASUREMENT_MIN_LUFS,
MASS_LOGGER_NAME,
VERBOSE_LOG_LEVEL,
)
Expand Down Expand Up @@ -256,12 +255,6 @@ async def get_stream_details(
self._update_hls_radio_metadata
)
streamdetails.stream_metadata_update_interval = 5
# handle volume normalization details
if result := await mass.music.get_loudness(
streamdetails.item_id, streamdetails.provider, media_type=queue_item.media_type
):
streamdetails.loudness = result[0]
streamdetails.loudness_album = result[1]

if streamdetails.duration is None:
if queue_item.media_item and queue_item.media_item.duration:
Expand Down Expand Up @@ -863,136 +856,6 @@ async def get_multi_file_stream(
finally:
await remove_file(temp_file)

def attach_loudness_analyzer(
self,
audio_buffer: AudioBuffer,
streamdetails: StreamDetails,
max_duration_seconds: int = 600,
min_duration_seconds: int = 10,
) -> None:
"""
Attach a loudness measurement job to an AudioBuffer.

Registers a chunk callback that feeds raw PCM into an FFmpeg ebur128 process.
After max_duration_seconds of audio (or EOF), the measurement is finalized
and stored via mass.music.set_loudness.

:param audio_buffer: The AudioBuffer to observe.
:param streamdetails: Stream details for the track being buffered.
:param max_duration_seconds: Maximum seconds of audio to analyze.
:param min_duration_seconds: Minimum seconds of audio required to persist the result.
"""
item_id = streamdetails.item_id
provider = streamdetails.provider
media_type = streamdetails.media_type
pcm_format = audio_buffer.pcm_format

chunk_queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=10)
chunks_received = 0

async def _on_chunk(position_seconds: int, pcm_data: bytes, is_last_chunk: bool) -> None: # noqa: ARG001
nonlocal chunks_received
if chunks_received >= max_duration_seconds:
return
if is_last_chunk:
# EOF
await chunk_queue.put(None)
return
chunks_received += 1
await chunk_queue.put(pcm_data)
if chunks_received >= max_duration_seconds:
# signal we have enough data
await chunk_queue.put(None)

async def _chunk_generator() -> AsyncGenerator[bytes, None]:
"""Yield chunks from the queue until None (EOF/done)."""
while True:
chunk = await chunk_queue.get()
if chunk is None:
break
yield chunk

async def _run_analysis() -> None:
"""Run the ebur128 measurement in a background FFmpeg process."""
try:
async with FFMpeg(
audio_input=_chunk_generator(),
input_format=pcm_format,
output_format=pcm_format,
audio_output="NULL",
filter_params=["ebur128=framelog=verbose"],
collect_log_history=True,
loglevel="info",
) as ffmpeg_proc:
await ffmpeg_proc.wait()

if chunks_received < min_duration_seconds:
self.logger.debug(
"Loudness analysis for %s skipped: "
"insufficient audio data (%s/%s seconds analyzed)",
streamdetails.uri,
chunks_received,
min_duration_seconds,
)
return

log_lines_str = "\n".join(ffmpeg_proc.log_history)
try:
loudness_str = (
log_lines_str.split("Integrated loudness")[1]
.split("I:")[1]
.split("LUFS")[0]
)
loudness = float(loudness_str.strip())
except (IndexError, ValueError, AttributeError):
self.logger.debug(
"Could not determine loudness of %s from buffer analysis",
streamdetails.uri,
)
return

if loudness <= LOUDNESS_MEASUREMENT_MIN_LUFS:
self.logger.debug(
"Loudness measurement for %s discarded: "
"%s LUFS is below the reliability threshold (%s LUFS)",
streamdetails.uri,
loudness,
LOUDNESS_MEASUREMENT_MIN_LUFS,
)
return

await self.mass.music.set_loudness(
item_id, provider, loudness, media_type=media_type
)
# update the in-memory streamdetails so subsequent seeks
# can use measurement-based normalization instead of dynamic
streamdetails.loudness = loudness
self.logger.debug(
"Loudness measurement for %s: %s LUFS",
streamdetails.uri,
loudness,
)
except Exception as err:
self.logger.debug("Loudness analysis from buffer failed: %s", err)

async def _attach() -> None:
# check if loudness already exists
if await self.mass.music.get_loudness(item_id, provider, media_type=media_type):
return
self.logger.debug("Attached loudness analyzer to buffer for %s", streamdetails.uri)
audio_buffer.register_chunk_callback(_on_chunk)

def _on_cancel() -> None:
if chunk_queue.full():
chunk_queue.get_nowait()
chunk_queue.put_nowait(None)

audio_buffer.register_cancel_callback(_on_cancel)
# start the FFmpeg analysis process
await _run_analysis()

self.mass.create_task(_attach)

def get_player_dsp_details(
self, player: Player, group_preventing_dsp: bool = False
) -> DSPDetails:
Expand Down Expand Up @@ -1283,6 +1146,21 @@ async def get_queue_item_stream(

logger = self.logger.getChild("queue_item_stream")

# hydrate loudness from audio analysis (just-in-time, so that a measurement
# completed during a previous play is picked up here). A live analyzer run
# may have already populated streamdetails.loudness in memory — don't clobber
# that, and don't clobber a value set upstream by the music provider.
if streamdetails.loudness is None:
if analysis := await self.mass.streams.audio_analysis.get_audio_analysis(
streamdetails.item_id,
streamdetails.provider,
media_type=streamdetails.media_type,
):
if analysis.loudness_integrated is not None:
streamdetails.loudness = round(analysis.loudness_integrated, 2)
if analysis.loudness_album is not None and streamdetails.loudness_album is None:
streamdetails.loudness_album = round(analysis.loudness_album, 2)

# re-evaluate normalization mode: the background loudness analyzer may have
# updated streamdetails.loudness since get_stream_details was called
if streamdetails.queue_id:
Expand Down
Loading
Loading