Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions music_assistant/controllers/media/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
)

from music_assistant.constants import (
DB_TABLE_AUDIO_ANALYSIS,
DB_TABLE_GENRE_MEDIA_ITEM_EXCLUSION,
DB_TABLE_GENRE_MEDIA_ITEM_MAPPING,
DB_TABLE_LOUDNESS_MEASUREMENTS,
DB_TABLE_PLAYLOG,
DB_TABLE_PROVIDER_MAPPINGS,
MASS_LOGGER_NAME,
Expand Down Expand Up @@ -244,10 +244,10 @@ async def remove_item_from_library(self, item_id: str | int, recursive: bool = T
"provider": prov_mapping.provider_instance,
},
)
# cleanup loudness measurements for this provider mapping
# cleanup audio analysis rows for this provider mapping
for prov_key in (prov_mapping.provider_domain, prov_mapping.provider_instance):
await self.mass.music.database.delete(
DB_TABLE_LOUDNESS_MEASUREMENTS,
DB_TABLE_AUDIO_ANALYSIS,
{
"media_type": self.media_type.value,
"item_id": prov_mapping.item_id,
Expand Down
96 changes: 20 additions & 76 deletions music_assistant/controllers/music.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from copy import deepcopy
from datetime import datetime
from itertools import zip_longest
from math import inf
from typing import TYPE_CHECKING, Any, Final, cast

from music_assistant_models.background_task import BackgroundTask, TaskMetadata, TaskSchedule
Expand Down Expand Up @@ -110,7 +109,7 @@
DEFAULT_SYNC_INTERVAL = 12 * 60 # default sync interval in minutes
CONF_SYNC_INTERVAL = "sync_interval"
CONF_DELETED_PROVIDERS = "deleted_providers"
DB_SCHEMA_VERSION: Final[int] = 38
DB_SCHEMA_VERSION: Final[int] = 39

CACHE_CATEGORY_SEARCH_RESULTS: Final[int] = 10
DATABASE_CLEANUP_TASK_ID: Final[str] = "music_database_cleanup"
Expand Down Expand Up @@ -1092,64 +1091,6 @@ async def refresh_item( # noqa: PLR0915
await self.mass.metadata.update_metadata(library_item, force_refresh=True)
return library_item

async def set_loudness(
    self,
    item_id: str,
    provider_instance_id_or_domain: str,
    loudness: float,
    album_loudness: float | None = None,
    media_type: MediaType = MediaType.TRACK,
) -> None:
    """Store (EBU-R128) Integrated Loudness Measurement for a mediaitem in db."""
    provider = self.mass.get_provider(provider_instance_id_or_domain)
    if not provider:
        # unknown provider: nothing to store the measurement against
        return
    # reject missing/infinite readings as well as values at or below the
    # reliability floor (ebur128 reports -70 LUFS on near-silence)
    if loudness in (None, inf, -inf) or loudness <= LOUDNESS_MEASUREMENT_MIN_LUFS:
        return
    # prefer domain for streaming providers as the catalog is the same across instances
    if provider.is_streaming_provider:
        prov_key = provider.domain
    else:
        prov_key = provider.instance_id
    values = {
        "item_id": item_id,
        "media_type": media_type.value,
        "provider": prov_key,
        "loudness": loudness,
    }
    # album loudness rides along only when it passes the same validity checks
    album_is_valid = (
        album_loudness not in (None, inf, -inf)
        and album_loudness > LOUDNESS_MEASUREMENT_MIN_LUFS
    )
    if album_is_valid:
        values["loudness_album"] = album_loudness
    await self.database.insert_or_replace(DB_TABLE_LOUDNESS_MEASUREMENTS, values)

async def get_loudness(
    self,
    item_id: str,
    provider_instance_id_or_domain: str,
    media_type: MediaType = MediaType.TRACK,
) -> tuple[float, float | None] | None:
    """Get (EBU-R128) Integrated Loudness Measurement for a mediaitem in db.

    Returns a ``(loudness, album_loudness)`` tuple with both values rounded
    to 2 decimals (``album_loudness`` may be None), or None when the provider
    is unknown or no valid measurement is stored.
    """
    if not (provider := self.mass.get_provider(provider_instance_id_or_domain)):
        return None
    # prefer domain for streaming providers as the catalog is the same across instances
    prov_key = provider.domain if provider.is_streaming_provider else provider.instance_id
    db_row = await self.database.get_row(
        DB_TABLE_LOUDNESS_MEASUREMENTS,
        {
            "item_id": item_id,
            "media_type": media_type.value,
            "provider": prov_key,
        },
    )
    if not db_row:
        return None
    loudness = db_row["loudness"]
    # the loudness column is nullable: guard against None as well as the
    # infinite values older rows may contain, so round() cannot raise
    if loudness in (None, inf, -inf):
        return None
    loudness_album = db_row["loudness_album"]
    loudness_album = (
        None if loudness_album in (None, inf, -inf) else round(loudness_album, 2)
    )
    return (round(loudness, 2), loudness_album)

@api_command("music/mark_played")
async def mark_item_played(
self,
Expand Down Expand Up @@ -2706,6 +2647,25 @@ async def _get_or_create_genre(
f"WHERE loudness_album <= {LOUDNESS_MEASUREMENT_MIN_LUFS}"
)

if prev_version <= 38:
# migrate loudness measurements to the unified audio_analysis table
# under the new builtin loudness_analysis provider, then drop the
# legacy table. album loudness rides along when present.
await self._database.execute(
f"INSERT OR IGNORE INTO {DB_TABLE_AUDIO_ANALYSIS} "
f"(media_type, item_id, provider, aa_provider_domain, "
f" analysis_data, analysis_version) "
f"SELECT media_type, item_id, provider, 'loudness_analysis', "
f" json_object("
f" 'loudness_integrated', loudness, "
f" 'loudness_album', loudness_album"
f" ), 1 "
f"FROM {DB_TABLE_LOUDNESS_MEASUREMENTS} "
f"WHERE loudness IS NOT NULL "
f" AND loudness > {LOUDNESS_MEASUREMENT_MIN_LUFS}"
)
await self._database.execute(f"DROP TABLE IF EXISTS {DB_TABLE_LOUDNESS_MEASUREMENTS}")

# save changes
await self._database.commit()

Expand Down Expand Up @@ -2974,17 +2934,6 @@ async def __create_database_tables(self) -> None:
);"""
)

await self.database.execute(
f"""CREATE TABLE IF NOT EXISTS {DB_TABLE_LOUDNESS_MEASUREMENTS}(
[id] INTEGER PRIMARY KEY AUTOINCREMENT,
[media_type] TEXT NOT NULL,
[item_id] TEXT NOT NULL,
[provider] TEXT NOT NULL,
[loudness] REAL,
[loudness_album] REAL,
UNIQUE(media_type,item_id,provider));"""
)

await self.database.execute(
f"""CREATE TABLE IF NOT EXISTS {DB_TABLE_AUDIO_ANALYSIS}(
[id] INTEGER PRIMARY KEY AUTOINCREMENT,
Expand Down Expand Up @@ -3099,11 +3048,6 @@ async def __create_database_indexes(self) -> None:
f"CREATE INDEX IF NOT EXISTS {DB_TABLE_ALBUM_ARTISTS}_artist_id_idx "
f"on {DB_TABLE_ALBUM_ARTISTS}(artist_id);"
)
# index on loudness measurements table
await self.database.execute(
f"CREATE INDEX IF NOT EXISTS {DB_TABLE_LOUDNESS_MEASUREMENTS}_idx "
f"on {DB_TABLE_LOUDNESS_MEASUREMENTS}(media_type,item_id,provider);"
)
# indexes on genre_media_item_mapping table
await self.database.execute(
f"CREATE INDEX IF NOT EXISTS {DB_TABLE_GENRE_MEDIA_ITEM_MAPPING}_media_idx "
Expand Down
152 changes: 15 additions & 137 deletions music_assistant/controllers/streams/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
CONF_VOLUME_NORMALIZATION_TARGET,
INTERNAL_PCM_FORMAT,
LOUDNESS_MEASUREMENT_MIN_LUFS,
MASS_LOGGER_NAME,
VERBOSE_LOG_LEVEL,
)
Expand Down Expand Up @@ -256,12 +255,6 @@ async def get_stream_details(
self._update_hls_radio_metadata
)
streamdetails.stream_metadata_update_interval = 5
# handle volume normalization details
if result := await mass.music.get_loudness(
streamdetails.item_id, streamdetails.provider, media_type=queue_item.media_type
):
streamdetails.loudness = result[0]
streamdetails.loudness_album = result[1]

if streamdetails.duration is None:
if queue_item.media_item and queue_item.media_item.duration:
Expand Down Expand Up @@ -863,136 +856,6 @@ async def get_multi_file_stream(
finally:
await remove_file(temp_file)

def attach_loudness_analyzer(
    self,
    audio_buffer: AudioBuffer,
    streamdetails: StreamDetails,
    max_duration_seconds: int = 600,
    min_duration_seconds: int = 10,
) -> None:
    """
    Attach a loudness measurement job to an AudioBuffer.

    Registers a chunk callback that feeds raw PCM into an FFmpeg ebur128 process.
    After max_duration_seconds of audio (or EOF), the measurement is finalized
    and stored via mass.music.set_loudness.

    :param audio_buffer: The AudioBuffer to observe.
    :param streamdetails: Stream details for the track being buffered.
    :param max_duration_seconds: Maximum seconds of audio to analyze.
    :param min_duration_seconds: Minimum seconds of audio required to persist the result.
    """
    # capture identifying details up-front so the closures below do not
    # depend on streamdetails staying unchanged while the job runs
    item_id = streamdetails.item_id
    provider = streamdetails.provider
    media_type = streamdetails.media_type
    pcm_format = audio_buffer.pcm_format

    # bounded hand-off queue between the buffer callback (producer) and the
    # ffmpeg feeder (consumer); a None item is the end-of-input sentinel
    chunk_queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=10)
    chunks_received = 0

    async def _on_chunk(position_seconds: int, pcm_data: bytes, is_last_chunk: bool) -> None:  # noqa: ARG001
        # NOTE(review): chunks_received is compared against second-based limits
        # and logged as "seconds analyzed" below, so this assumes the buffer
        # delivers ~1 second of PCM per chunk — TODO confirm against AudioBuffer
        nonlocal chunks_received
        if chunks_received >= max_duration_seconds:
            # already collected enough audio; ignore further chunks
            return
        if is_last_chunk:
            # EOF
            await chunk_queue.put(None)
            return
        chunks_received += 1
        await chunk_queue.put(pcm_data)
        if chunks_received >= max_duration_seconds:
            # signal we have enough data
            await chunk_queue.put(None)

    async def _chunk_generator() -> AsyncGenerator[bytes, None]:
        """Yield chunks from the queue until None (EOF/done)."""
        while True:
            chunk = await chunk_queue.get()
            if chunk is None:
                break
            yield chunk

    async def _run_analysis() -> None:
        """Run the ebur128 measurement in a background FFmpeg process."""
        try:
            # feed the buffered PCM through ffmpeg's ebur128 filter; audio
            # output is discarded, we only want the loudness log lines
            async with FFMpeg(
                audio_input=_chunk_generator(),
                input_format=pcm_format,
                output_format=pcm_format,
                audio_output="NULL",
                filter_params=["ebur128=framelog=verbose"],
                collect_log_history=True,
                loglevel="info",
            ) as ffmpeg_proc:
                await ffmpeg_proc.wait()

            if chunks_received < min_duration_seconds:
                # too little audio analyzed for a trustworthy measurement
                self.logger.debug(
                    "Loudness analysis for %s skipped: "
                    "insufficient audio data (%s/%s seconds analyzed)",
                    streamdetails.uri,
                    chunks_received,
                    min_duration_seconds,
                )
                return

            # scrape the Integrated loudness "I: <value> LUFS" figure out of
            # the collected ffmpeg log output
            log_lines_str = "\n".join(ffmpeg_proc.log_history)
            try:
                loudness_str = (
                    log_lines_str.split("Integrated loudness")[1]
                    .split("I:")[1]
                    .split("LUFS")[0]
                )
                loudness = float(loudness_str.strip())
            except (IndexError, ValueError, AttributeError):
                # log format not as expected / no measurement section present
                self.logger.debug(
                    "Could not determine loudness of %s from buffer analysis",
                    streamdetails.uri,
                )
                return

            if loudness <= LOUDNESS_MEASUREMENT_MIN_LUFS:
                # near-silence readings are unreliable; do not persist them
                self.logger.debug(
                    "Loudness measurement for %s discarded: "
                    "%s LUFS is below the reliability threshold (%s LUFS)",
                    streamdetails.uri,
                    loudness,
                    LOUDNESS_MEASUREMENT_MIN_LUFS,
                )
                return

            await self.mass.music.set_loudness(
                item_id, provider, loudness, media_type=media_type
            )
            # update the in-memory streamdetails so subsequent seeks
            # can use measurement-based normalization instead of dynamic
            streamdetails.loudness = loudness
            self.logger.debug(
                "Loudness measurement for %s: %s LUFS",
                streamdetails.uri,
                loudness,
            )
        except Exception as err:
            # analysis is strictly best-effort: never let it disturb playback
            self.logger.debug("Loudness analysis from buffer failed: %s", err)

    async def _attach() -> None:
        # check if loudness already exists
        if await self.mass.music.get_loudness(item_id, provider, media_type=media_type):
            return
        self.logger.debug("Attached loudness analyzer to buffer for %s", streamdetails.uri)
        audio_buffer.register_chunk_callback(_on_chunk)

        def _on_cancel() -> None:
            # buffer was cancelled: make room if needed, then push the
            # sentinel so the feeder/ffmpeg process shuts down promptly
            if chunk_queue.full():
                chunk_queue.get_nowait()
            chunk_queue.put_nowait(None)

        audio_buffer.register_cancel_callback(_on_cancel)
        # start the FFmpeg analysis process
        await _run_analysis()

    self.mass.create_task(_attach)

def get_player_dsp_details(
self, player: Player, group_preventing_dsp: bool = False
) -> DSPDetails:
Expand Down Expand Up @@ -1283,6 +1146,21 @@ async def get_queue_item_stream(

logger = self.logger.getChild("queue_item_stream")

# hydrate loudness from audio analysis (just-in-time, so that a measurement
# completed during a previous play is picked up here). A live analyzer run
# may have already populated streamdetails.loudness in memory — don't clobber
# that, and don't clobber a value set upstream by the music provider.
if streamdetails.loudness is None:
if analysis := await self.mass.streams.audio_analysis.get_audio_analysis(
streamdetails.item_id,
streamdetails.provider,
media_type=streamdetails.media_type,
):
if analysis.loudness_integrated is not None:
streamdetails.loudness = round(analysis.loudness_integrated, 2)
if analysis.loudness_album is not None and streamdetails.loudness_album is None:
streamdetails.loudness_album = round(analysis.loudness_album, 2)

# re-evaluate normalization mode: the background loudness analyzer may have
# updated streamdetails.loudness since get_stream_details was called
if streamdetails.queue_id:
Expand Down
Loading
Loading