diff --git a/lib/core.ex b/lib/core.ex index c85b4b2..5ffb0e0 100644 --- a/lib/core.ex +++ b/lib/core.ex @@ -135,6 +135,9 @@ defmodule TelemetryMetricsPrometheus.Core do Metrics.sum("websocket.connection.count", reporter_options: [prometheus_type: :gauge]) + - `:exemplar_tags` - a subset of metadata keys by which aggregations will be broken down. Defaults to an empty list. + - `:exemplar_tag_values` - a function that receives the metadata and returns a map with the tags as keys and their respective values. Defaults to returning the metadata itself. + ### Missing or Invalid Measurements and Tags If a measurement value is missing or non-numeric, the error is logged at the `debug` level diff --git a/lib/core/aggregator.ex b/lib/core/aggregator.ex index 2fe5ea4..8da840c 100644 --- a/lib/core/aggregator.ex +++ b/lib/core/aggregator.ex @@ -6,8 +6,14 @@ defmodule TelemetryMetricsPrometheus.Core.Aggregator do alias Telemetry.Metrics alias TelemetryMetricsPrometheus.Core - @typep bucket :: {upper_bound :: String.t(), count :: non_neg_integer()} - @typep sample :: {name :: :telemetry.event_name(), {labels :: map(), measurement :: number()}} + @typep bucket :: + {upper_bound :: String.t(), count :: non_neg_integer(), + {measurement :: number(), exemplar_labels :: map(), monotonic_sample_time :: number()} + | nil} + @typep sample :: + {name :: :telemetry.event_name(), + {labels :: map(), exemplar_labels :: map(), monotonic_sample_time :: number(), + measurement :: number()}} @typep key :: {name :: :telemetry.event_name(), map()} @typep aggregation :: {[bucket()], non_neg_integer(), number()} @@ -78,7 +84,9 @@ defmodule TelemetryMetricsPrometheus.Core.Aggregator do defp merge({l_b, l_c, l_s}, {r_b, r_c, r_s}) do buckets = Enum.zip(l_b, r_b) - |> Enum.map(fn {{bucket, a}, {bucket, b}} -> {bucket, a + b} end) + |> Enum.map(fn {{bucket, a, a_exemplar}, {bucket, b, b_exemplar}} -> + {bucket, a + b, select_exemplar(a_exemplar, b_exemplar)} + end) {buckets, l_c + r_c, l_s + r_s} end @@ -98,43 +106,135 @@ defmodule TelemetryMetricsPrometheus.Core.Aggregator do :ets.insert(tid, {key, aggregation}) end - @spec group_samples(samples :: [sample()]) :: map() + @spec group_samples(samples :: [sample()]) :: %{ + :telemetry.event_name() => %{ + map() => [{number(), exemplar_labels :: map(), integer}] + } + } def group_samples(samples) do - Enum.reduce(samples, %{}, fn {name, {labels, measurement}}, acc -> - metric = Map.get(acc, name, %{}) - values = Map.get(metric, labels, []) - new_values = [measurement | values] - new_metric = Map.put(metric, labels, new_values) - Map.put(acc, name, new_metric) - end) + Enum.reduce( + samples, + %{}, + fn {name, {labels, exemplar_labels, monotonic_sample_time, measurement}}, acc -> + metric = Map.get(acc, name, %{}) + values = Map.get(metric, labels, []) + new_values = [{measurement, exemplar_labels, monotonic_sample_time} | values] + new_metric = Map.put(metric, labels, new_values) + Map.put(acc, name, new_metric) + end + ) end - @spec bucket_measurements(measurements :: [number()], buckets :: Core.Distribution.buckets()) :: + @spec bucket_measurements( + measurements :: [{number(), map(), integer()}], + buckets :: Core.Distribution.buckets() + ) :: {[bucket()], non_neg_integer(), number()} def bucket_measurements(measurements, [b | buckets]), - do: bucket(measurements, buckets, b, 0, 0, []) + do: bucket(measurements, buckets, b, nil, 0, 0, []) - defp bucket([], [], _, count, sum, result), do: {Enum.reverse(result), count, sum} + defp bucket([], [], _, _exemplar, count, sum, result), do: {Enum.reverse(result), count, sum} - defp bucket(measurements, [], "+Inf", count, sum, result) do - {new_count, new_sum} = - Enum.reduce(measurements, {count, sum}, fn m, {c, s} -> - {c + 1, s + m} + defp bucket(measurements, [], "+Inf", exemplar, count, sum, result) do + {new_count, new_sum, new_exemplar} = + Enum.reduce(measurements, {count, sum, exemplar}, fn {m, _el, _mst} = sample, {c, s, e} -> + {c + 1, s + m, select_exemplar(sample, e)} end) - bucket([], [], "+Inf", new_count, new_sum, [{"+Inf", new_count} | result]) + bucket([], [], "+Inf", new_exemplar, new_count, new_sum, [ + {"+Inf", new_count, new_exemplar} | result + ]) + end + + defp bucket([], buckets, cur_bucket, nil, count, sum, result) do + rest = + [cur_bucket | buckets] + |> Enum.reverse() + |> Enum.map(fn bucket -> {"#{bucket}", count, nil} end) + + bucket([], [], nil, nil, count, sum, rest ++ result) + end + + defp bucket( + [], + buckets, + cur_bucket, + {exemplar_value, _el, _emst} = exemplar, + count, + sum, + result + ) do + buckets_with_exemplars = + case Enum.split_while([cur_bucket | buckets], fn bucket -> + bucket != "+Inf" and exemplar_value > bucket + end) do + {le_buckets, [first_g_bucket | rest_g_buckets] = g_buckets} -> + Enum.map(le_buckets, &{&1, nil}) ++ + [{first_g_bucket, exemplar} | Enum.map(rest_g_buckets, &{&1, nil})] + end + + rest = + buckets_with_exemplars + |> Enum.reverse() + |> Enum.map(fn {bucket, exemplar} -> {"#{bucket}", count, exemplar} end) + + bucket([], [], nil, exemplar, count, sum, rest ++ result) + end + + defp bucket( + [{measurement, _exemplar_labels, _monotonic_sample_time} = sample | r_m] = measurements, + [b | r_b] = buckets, + cur_bucket, + exemplar, + count, + sum, + result + ) do + cond do + measurement <= cur_bucket -> + bucket( + r_m, + buckets, + cur_bucket, + select_exemplar(sample, exemplar), + count + 1, + sum + measurement, + result + ) + + true -> + bucket(measurements, r_b, b, nil, count, sum, [ + {"#{cur_bucket}", count, exemplar} | result + ]) + end + end + + defp select_exemplar(nil, nil) do + nil + end + + defp select_exemplar( + {_lm, _ll, left_monotonic_sample_time} = left, + nil + ) do + left end - defp bucket([], buckets, cur_bucket, count, sum, result) do - rest = Enum.reverse([cur_bucket | buckets]) |> Enum.map(&{"#{&1}", count}) - bucket([], [], nil, count, sum, rest ++ result) + defp select_exemplar( + nil, + {_rm, _rl, right_monotonic_sample_time} = right + ) do + right end - defp bucket([m | r_m] = measurements, [b | r_b] = buckets, cur_bucket, count, sum, result) do - if m <= cur_bucket do - bucket(r_m, buckets, cur_bucket, count + 1, sum + m, result) + defp select_exemplar( + {_lm, _ll, left_monotonic_sample_time} = left, + {_rm, _rl, right_monotonic_sample_time} = right + ) do + if left_monotonic_sample_time <= right_monotonic_sample_time do + right else - bucket(measurements, r_b, b, count, sum, [{"#{cur_bucket}", count} | result]) + left end end end diff --git a/lib/core/distribution.ex b/lib/core/distribution.ex index 1c6b5d1..2f780cc 100644 --- a/lib/core/distribution.ex +++ b/lib/core/distribution.ex @@ -25,6 +25,8 @@ defmodule TelemetryMetricsPrometheus.Core.Distribution do table: atom(), tags: Metrics.tags(), tag_values_fun: Metrics.tag_values(), + exemplar_tags: Metrics.tags(), + exemplar_tag_values_fun: Metrics.tag_values(), type: :histogram, unit: Metrics.unit() } @@ -48,6 +50,9 @@ defmodule TelemetryMetricsPrometheus.Core.Distribution do table: table_id, tags: metric.tags, tag_values_fun: metric.tag_values, + exemplar_tags: Keyword.get(metric.reporter_options, :exemplar_tags, []), + exemplar_tag_values_fun: + Keyword.get(metric.reporter_options, :exemplar_tag_values, &Function.identity/1), type: :histogram, unit: metric.unit } @@ -71,8 +76,19 @@ defmodule TelemetryMetricsPrometheus.Core.Distribution do EventHandler.get_measurement(measurements, metadata, config.measurement), mapped_values <- config.tag_values_fun.(metadata), :ok <- EventHandler.validate_tags_in_tag_values(config.tags, mapped_values), - labels <- Map.take(mapped_values, config.tags) do - true = :ets.insert(config.table, {config.name, {labels, measurement}}) + labels <- Map.take(mapped_values, config.tags), + mapped_exemplar_values <- config.exemplar_tag_values_fun.(metadata), + :ok <- + EventHandler.validate_tags_in_exemplar_tag_values( + config.exemplar_tags, + mapped_exemplar_values + ), + exemplar_labels <- Map.take(mapped_exemplar_values, config.exemplar_tags) do + true = + :ets.insert( + config.table, + {config.name, {labels, exemplar_labels, System.monotonic_time(), measurement}} + ) :ok else diff --git a/lib/core/event_handler.ex b/lib/core/event_handler.ex index b8f7b33..763ecc0 100644 --- a/lib/core/event_handler.ex +++ b/lib/core/event_handler.ex @@ -37,6 +37,15 @@ defmodule TelemetryMetricsPrometheus.Core.EventHandler do end end + @spec validate_tags_in_exemplar_tag_values(Telemetry.Metrics.tags(), map()) :: + :ok | tags_missing_error() + def validate_tags_in_exemplar_tag_values(tags, tag_values) do + case Enum.reject(tags, &match?(%{^&1 => _}, tag_values)) do + [] -> :ok + missing_tags -> {:exemplar_tags_missing, missing_tags} + end + end + @spec get_measurement( measurements :: :telemetry.event_measurements(), metadata :: :telemetry.event_metadata(), @@ -89,4 +98,10 @@ defmodule TelemetryMetricsPrometheus.Core.EventHandler do "Tags missing from tag_values. metric_name:=#{inspect(config.name)} tags:=#{inspect(Enum.join(tags))}" ) end + + def handle_event_error({:exemplar_tags_missing, tags}, config) do + Logger.debug( + "Tags missing from exemplar_tag_values. metric_name:=#{inspect(config.name)} exemplar_tags:=#{inspect(Enum.join(tags))}" + ) + end end diff --git a/lib/core/exporter.ex b/lib/core/exporter.ex index 6dd07a9..d257bc4 100644 --- a/lib/core/exporter.ex +++ b/lib/core/exporter.ex @@ -54,12 +54,28 @@ defmodule TelemetryMetricsPrometheus.Core.Exporter do has_labels = map_size(labels) > 0 samples = - Enum.map_join(buckets, "\n", fn {upper_bound, count} -> + Enum.map_join(buckets, "\n", fn {upper_bound, count, exemplar} -> + exemplar_text = + case exemplar do + nil -> + "" + + {value, exemplar_labels, monotonic_sample_time} -> + time = + System.convert_time_unit( + monotonic_sample_time + System.time_offset(), + :native, + :millisecond + ) / 1000 + + " # {#{format_labels(exemplar_labels)}} #{value} #{time}" + end + if has_labels do ~s(#{name}_bucket{#{format_labels(labels)},le="#{upper_bound}"} #{count}) else ~s(#{name}_bucket{le="#{upper_bound}"} #{count}) - end + end <> exemplar_text end) summary = diff --git a/test/aggregator_test.exs b/test/aggregator_test.exs index 712fc43..03ada4c 100644 --- a/test/aggregator_test.exs +++ b/test/aggregator_test.exs @@ -24,13 +24,17 @@ defmodule TelemetryMetricsPrometheus.Core.AggregatorTest do describe "bucket_measurements/2" do test "measurements are properly bucketed" do [ - {[1, 2, 3, "+Inf"], [], {[{"1", 0}, {"2", 0}, {"3", 0}, {"+Inf", 0}], 0, 0}, + {[1, 2, 3, "+Inf"], [], + {[{"1", 0, nil}, {"2", 0, nil}, {"3", 0, nil}, {"+Inf", 0, nil}], 0, 0}, "with no measurements"}, - {[1, 2, 3, "+Inf"], [0.1], {[{"1", 1}, {"2", 1}, {"3", 1}, {"+Inf", 1}], 1, 0.1}, + {[1, 2, 3, "+Inf"], [{0.1, %{}, 1}], + {[{"1", 1, {0.1, %{}, 1}}, {"2", 1, nil}, {"3", 1, nil}, {"+Inf", 1, nil}], 1, 0.1}, "with one measurement"}, - {[1, 2, 3, "+Inf"], [2, 3.1], {[{"1", 0}, {"2", 1}, {"3", 1}, {"+Inf", 2}], 2, 5.1}, - "compares measurement to bucket limit correctly"}, - {[1, 2, 3, "+Inf"], [4, 5], {[{"1", 0}, {"2", 0}, {"3", 0}, {"+Inf", 2}], 2, 9}, + {[1, 2, 3, "+Inf"], [{2, %{}, 1}, {3.1, %{}, 2}], + {[{"1", 0, nil}, {"2", 1, {2, %{}, 1}}, {"3", 1, nil}, {"+Inf", 2, {3.1, %{}, 2}}], 2, + 5.1}, "compares measurement to bucket limit correctly"}, + {[1, 2, 3, "+Inf"], [{4, %{}, 1}, {5, %{}, 2}], + {[{"1", 0, nil}, {"2", 0, nil}, {"3", 0, nil}, {"+Inf", 2, {5, %{}, 2}}], 2, 9}, "with measurements over the bucket limits"} ] |> Enum.each(fn {buckets, measurements, expected_buckets, message} -> @@ -111,10 +115,14 @@ defmodule TelemetryMetricsPrometheus.Core.AggregatorTest do conn: %{method: "GET", path_info: ["users", "123"]} }) + middle_time = System.monotonic_time() + :telemetry.execute([:some, :plug, :call, :stop], %{duration: 3_000_000_000}, %{ conn: %{method: "GET", path_info: ["users", "123"]} }) + end_time = System.monotonic_time() + :ok = Aggregator.aggregate([metric], tid, dist_tid) [ @@ -122,18 +130,27 @@ defmodule TelemetryMetricsPrometheus.Core.AggregatorTest do {bucketed, count, sum}} ] = :ets.tab2list(tid) - assert bucketed == [{"1", 0}, {"2", 0}, {"3", 2}, {"+Inf", 2}] + assert [{"1", 0, nil}, {"2", 0, nil}, {"3", 2, {3.0, %{}, mst}}, {"+Inf", 2, nil}] = + bucketed + + assert mst >= middle_time and mst <= end_time assert count == 2 assert sum == 6.0 + beginning_time = System.monotonic_time() + :telemetry.execute([:some, :plug, :call, :stop], %{duration: 1_500_000_000}, %{ conn: %{method: "GET", path_info: ["users", "123"]} }) + middle_time = System.monotonic_time() + :telemetry.execute([:some, :plug, :call, :stop], %{duration: 0_800_000_000}, %{ conn: %{method: "GET", path_info: ["users", "123"]} }) + end_time = System.monotonic_time() + :ok = Aggregator.aggregate([metric], tid, dist_tid) [ @@ -141,7 +158,16 @@ defmodule TelemetryMetricsPrometheus.Core.AggregatorTest do {bucketed_2, count_2, sum_2}} ] = :ets.tab2list(tid) - assert bucketed_2 == [{"1", 1}, {"2", 2}, {"3", 4}, {"+Inf", 4}] + assert [ + {"1", 1, {0.8, %{}, mst1}}, + {"2", 2, {1.5, %{}, mst2}}, + {"3", 4, {3.0, %{}, mst3}}, + {"+Inf", 4, nil} + ] = bucketed_2 + + assert mst1 >= middle_time and mst1 <= end_time + assert mst2 >= beginning_time and mst2 <= middle_time + assert mst3 <= beginning_time assert count_2 == 4 assert sum_2 == 8.3 diff --git a/test/exporter_test.exs b/test/exporter_test.exs index c9a954b..428bda1 100644 --- a/test/exporter_test.exs +++ b/test/exporter_test.exs @@ -252,12 +252,12 @@ defmodule TelemetryMetricsPrometheus.Core.ExporterTest do ) buckets = [ - {"0.05", 24054}, - {"0.1", 33444}, - {"0.2", 100_392}, - {"0.5", 129_389}, - {"1", 133_988}, - {"+Inf", 144_320} + {"0.05", 24054, nil}, + {"0.1", 33444, nil}, + {"0.2", 100_392, nil}, + {"0.5", 129_389, nil}, + {"1", 133_988, nil}, + {"+Inf", 144_320, nil} ] result = @@ -294,12 +294,53 @@ defmodule TelemetryMetricsPrometheus.Core.ExporterTest do ) buckets = [ - {"0.05", 24054}, - {"0.1", 33444}, - {"0.2", 100_392}, - {"0.5", 129_389}, - {"1", 133_988}, - {"+Inf", 144_320} + {"0.05", 24054, nil}, + {"0.1", 33444, nil}, + {"0.2", 100_392, nil}, + {"0.5", 129_389, nil}, + {"1", 133_988, nil}, + {"+Inf", 144_320, nil} + ] + + result = Exporter.format(metric, [{{metric.name, %{}}, {buckets, 144_320, 53423}}]) + + assert result == expected + end + + test "distribution with exemplar" do + fake_trace_id = inspect(make_ref()) + time = System.monotonic_time() + + time_text = + System.convert_time_unit(time + System.time_offset(), :native, :millisecond) / 1000 + + expected = """ + # HELP http_request_duration_seconds A histogram of the request duration. + # TYPE http_request_duration_seconds histogram + http_request_duration_seconds_bucket{le="0.05"} 24054 + http_request_duration_seconds_bucket{le="0.1"} 33444 + http_request_duration_seconds_bucket{le="0.2"} 100392 # {trace_id="#{fake_trace_id}"} 0.15 #{time_text} + http_request_duration_seconds_bucket{le="0.5"} 129389 + http_request_duration_seconds_bucket{le="1"} 133988 + http_request_duration_seconds_bucket{le="+Inf"} 144320 + http_request_duration_seconds_sum 53423 + http_request_duration_seconds_count 144320\ + """ + + metric = + Metrics.distribution("http.request.duration.seconds", + buckets: [0.05, 0.1, 0.2, 0.5, 1], + description: "A histogram of the request duration.", + unit: {:native, :second} + ) + + buckets = [ + {"0.05", 24054, nil}, + {"0.1", 33444, nil}, + {"0.2", 100_392, {0.15, %{trace_id: fake_trace_id}, time}}, + {"0.5", 129_389, nil}, + {"1", 133_988, nil}, + {"+Inf", 144_320, nil} ] result = Exporter.format(metric, [{{metric.name, %{}}, {buckets, 144_320, 53423}}]) diff --git a/test/metrics_test.exs b/test/metrics_test.exs index 6a9922c..50c4dc0 100644 --- a/test/metrics_test.exs +++ b/test/metrics_test.exs @@ -261,6 +261,70 @@ defmodule TelemetryMetricsPrometheus.Core.MetricsTest do cleanup(tid) end + test "records a times series and exemplar for each tag kv pair using a measurement from the metadata", + %{ + dist_tid: tid + } do + buckets = [ + 256, + 512, + 1024, + 2048, + 4096 + ] + + metric = + Metrics.distribution("some.plug.call.resp_payload_size", + buckets: buckets, + description: "Plug call response payload size", + event_name: [:some, :plug, :call, :stop], + measurement: fn _measurements, metadata -> + metadata.conn.resp_size + end, + unit: :kilobyte, + tags: [:method], + tag_values: fn %{conn: conn} -> + %{ + method: conn.method + } + end, + reporter_options: [ + exemplar_tags: [:trace_id], + exemplar_tag_values: fn metadata -> + assert Map.has_key?(metadata, :conn) + %{trace_id: :erlang.term_to_binary(make_ref())} + end + ] + ) + + {:ok, _handler_id} = Distribution.register(metric, tid, self()) + + :telemetry.execute([:some, :plug, :call, :stop], %{duration: 5.6e7}, %{ + conn: %{method: "GET", path_info: ["users", "123"], resp_size: 180} + }) + + :telemetry.execute([:some, :plug, :call, :stop], %{duration: 1.1e8}, %{ + conn: %{method: "POST", path_info: ["products", "238"], resp_size: 4_000} + }) + + :telemetry.execute([:some, :plug, :call, :stop], %{duration: 8.7e7}, %{ + conn: %{method: "GET", path_info: ["users", "123"], resp_size: 2_000} + }) + + # , %{method: "GET", path_root: "users"}} + key_1 = metric.name + samples = :ets.lookup(tid, key_1) + + assert length(samples) == 3 + + assert {[:some, :plug, :call, :resp_payload_size], + {%{method: "GET"}, %{trace_id: trace_id}, _exemplar_time, 180}} = hd(samples) + + assert is_reference(:erlang.binary_to_term(trace_id)) + + cleanup(tid) + end + test "records a times series for each tag kv pair using a measurement from the metadata", %{ dist_tid: tid } do @@ -309,7 +373,8 @@ defmodule TelemetryMetricsPrometheus.Core.MetricsTest do assert length(samples) == 3 - assert hd(samples) == {[:some, :plug, :call, :resp_payload_size], {%{method: "GET"}, 180}} + assert {[:some, :plug, :call, :resp_payload_size], + {%{method: "GET"}, _exemplar_labels, _exemplar_time, 180}} = hd(samples) cleanup(tid) end @@ -374,8 +439,9 @@ defmodule TelemetryMetricsPrometheus.Core.MetricsTest do assert length(samples) == 3 - assert hd(samples) == - {[:some, :plug, :call, :duration], {%{method: "GET", path_root: "users"}, 0.056}} + assert {[:some, :plug, :call, :duration], + {%{method: "GET", path_root: "users"}, _exemplar_labels, _exemplar_time, 0.056}} = + hd(samples) cleanup(tid) end