Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/core.ex
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ defmodule TelemetryMetricsPrometheus.Core do

Metrics.sum("websocket.connection.count", reporter_options: [prometheus_type: :gauge])

- `:exemplar_tags` - a subset of metadata keys whose values are attached as labels to the exemplars recorded for distribution buckets. They do not affect how aggregations are broken down. Defaults to an empty list.
- `:exemplar_tag_values` - a function that receives the metadata and returns a map with the exemplar tags as keys and their respective values. Defaults to returning the metadata itself.

### Missing or Invalid Measurements and Tags

If a measurement value is missing or non-numeric, the error is logged at the `debug` level
Expand Down
152 changes: 126 additions & 26 deletions lib/core/aggregator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,14 @@ defmodule TelemetryMetricsPrometheus.Core.Aggregator do
alias Telemetry.Metrics
alias TelemetryMetricsPrometheus.Core

@typep bucket :: {upper_bound :: String.t(), count :: non_neg_integer()}
@typep sample :: {name :: :telemetry.event_name(), {labels :: map(), measurement :: number()}}
@typep bucket ::
{upper_bound :: String.t(), count :: non_neg_integer(),
{measurement :: number(), exemplar_labels :: map(), monotonic_sample_time :: number()}
| nil}
@typep sample ::
{name :: :telemetry.event_name(),
{labels :: map(), exemplar_labels :: map(), monotonic_sample_time :: number(),
measurement :: number()}}
@typep key :: {name :: :telemetry.event_name(), map()}
@typep aggregation :: {[bucket()], non_neg_integer(), number()}

Expand Down Expand Up @@ -78,7 +84,9 @@ defmodule TelemetryMetricsPrometheus.Core.Aggregator do
defp merge({l_b, l_c, l_s}, {r_b, r_c, r_s}) do
buckets =
Enum.zip(l_b, r_b)
|> Enum.map(fn {{bucket, a}, {bucket, b}} -> {bucket, a + b} end)
|> Enum.map(fn {{bucket, a, a_exemplar}, {bucket, b, b_exemplar}} ->
{bucket, a + b, select_exemplar(a_exemplar, b_exemplar)}
end)

{buckets, l_c + r_c, l_s + r_s}
end
Expand All @@ -98,43 +106,135 @@ defmodule TelemetryMetricsPrometheus.Core.Aggregator do
:ets.insert(tid, {key, aggregation})
end

@spec group_samples(samples :: [sample()]) :: map()
@spec group_samples(samples :: [sample()]) :: %{
:telemetry.event_name() => %{
map() => [{number(), exemplar_labels :: map(), integer}]
}
}
def group_samples(samples) do
Enum.reduce(samples, %{}, fn {name, {labels, measurement}}, acc ->
metric = Map.get(acc, name, %{})
values = Map.get(metric, labels, [])
new_values = [measurement | values]
new_metric = Map.put(metric, labels, new_values)
Map.put(acc, name, new_metric)
end)
Enum.reduce(
samples,
%{},
fn {name, {labels, exemplar_labels, monotonic_sample_time, measurement}}, acc ->
metric = Map.get(acc, name, %{})
values = Map.get(metric, labels, [])
new_values = [{measurement, exemplar_labels, monotonic_sample_time} | values]
new_metric = Map.put(metric, labels, new_values)
Map.put(acc, name, new_metric)
end
)
end

@spec bucket_measurements(measurements :: [number()], buckets :: Core.Distribution.buckets()) ::
@spec bucket_measurements(
measurements :: [{number(), map(), integer()}],
buckets :: Core.Distribution.buckets()
) ::
{[bucket()], non_neg_integer(), number()}
def bucket_measurements(measurements, [b | buckets]),
do: bucket(measurements, buckets, b, 0, 0, [])
do: bucket(measurements, buckets, b, nil, 0, 0, [])

defp bucket([], [], _, count, sum, result), do: {Enum.reverse(result), count, sum}
defp bucket([], [], _, _exemplar, count, sum, result), do: {Enum.reverse(result), count, sum}

defp bucket(measurements, [], "+Inf", count, sum, result) do
{new_count, new_sum} =
Enum.reduce(measurements, {count, sum}, fn m, {c, s} ->
{c + 1, s + m}
defp bucket(measurements, [], "+Inf", exemplar, count, sum, result) do
{new_count, new_sum, new_exemplar} =
Enum.reduce(measurements, {count, sum, exemplar}, fn {m, _el, _mst} = sample, {c, s, e} ->
{c + 1, s + m, select_exemplar(sample, e)}
end)

bucket([], [], "+Inf", new_count, new_sum, [{"+Inf", new_count} | result])
bucket([], [], "+Inf", new_exemplar, new_count, new_sum, [
{"+Inf", new_count, new_exemplar} | result
])
end

# Measurements are exhausted and the bucket in progress carries no exemplar.
# Every bound that has not been emitted yet (the current one included) is
# written out with the running cumulative `count` and a `nil` exemplar, then
# the call is handed to the terminal clause with empty inputs.
defp bucket([], buckets, cur_bucket, nil, count, sum, result) do
  # `result` is accumulated in reverse order, so the pending bounds are
  # reversed before being placed in front of it.
  remaining =
    for bound <- Enum.reverse([cur_bucket | buckets]) do
      {"#{bound}", count, nil}
    end

  bucket([], [], nil, nil, count, sum, remaining ++ result)
end

# Measurements are exhausted while the bucket in progress still holds an
# exemplar. Every bound that has not been emitted yet (the current one
# included) is written out with the running cumulative `count`; the exemplar is
# attached only to the first bound that is not smaller than the exemplar's
# value (or to "+Inf"), and every other pending bound gets `nil`.
#
# NOTE(review): the match below assumes the pending bound list always ends in
# "+Inf" (as the test fixtures do), so the split can never leave the right-hand
# side empty — confirm against `Core.Distribution.buckets()`.
defp bucket([], buckets, cur_bucket, {exemplar_value, _, _} = exemplar, count, sum, result) do
  {below, [home | above]} =
    Enum.split_while([cur_bucket | buckets], fn bound ->
      bound != "+Inf" and exemplar_value > bound
    end)

  without_exemplar = fn bounds -> Enum.map(bounds, &{&1, nil}) end
  pending = without_exemplar.(below) ++ [{home, exemplar} | without_exemplar.(above)]

  # `result` is accumulated in reverse order, so the pending bounds are
  # reversed before being placed in front of it.
  rest =
    pending
    |> Enum.reverse()
    |> Enum.map(fn {bound, bound_exemplar} -> {"#{bound}", count, bound_exemplar} end)

  bucket([], [], nil, exemplar, count, sum, rest ++ result)
end

# Main recursive step: there is at least one measurement left and at least one
# bound after the current one.
#
# * If the head measurement fits under `cur_bucket`, it is consumed: the count
#   and sum grow and the sample competes (via `select_exemplar/2`) to become
#   the exemplar of the bucket in progress.
# * Otherwise the bucket in progress is closed with the cumulative count and
#   whatever exemplar it collected, and the same measurement is retried against
#   the next bound with a fresh (`nil`) exemplar.
#
# NOTE(review): this only yields correct cumulative counts if measurements
# arrive in ascending order — confirm at the call site.
defp bucket(
       [{value, _labels, _sampled_at} = sample | later] = measurements,
       [next_bound | later_bounds] = buckets,
       cur_bucket,
       exemplar,
       count,
       sum,
       result
     ) do
  if value <= cur_bucket do
    kept = select_exemplar(sample, exemplar)
    bucket(later, buckets, cur_bucket, kept, count + 1, sum + value, result)
  else
    closed = {"#{cur_bucket}", count, exemplar}
    bucket(measurements, later_bounds, next_bound, nil, count, sum, [closed | result])
  end
end

# Chooses which of two optional exemplars to keep. An exemplar is either `nil`
# or a 3-tuple `{measurement, exemplar_labels, monotonic_sample_time}`.
#
# With no exemplar on either side there is nothing to keep.
defp select_exemplar(nil, nil), do: nil

# Only the left side is present: keep it. The tuple pattern is retained so a
# malformed exemplar still fails to match, but its fields are not bound because
# this clause never reads them.
defp select_exemplar({_measurement, _labels, _sample_time} = left, nil), do: left

defp bucket([], buckets, cur_bucket, count, sum, result) do
rest = Enum.reverse([cur_bucket | buckets]) |> Enum.map(&{"#{&1}", count})
bucket([], [], nil, count, sum, rest ++ result)
# Only the right side is present: keep it. The tuple pattern is retained so a
# malformed exemplar still fails to match, but its fields are not bound because
# this clause never reads them.
defp select_exemplar(nil, {_measurement, _labels, _sample_time} = right), do: right

defp bucket([m | r_m] = measurements, [b | r_b] = buckets, cur_bucket, count, sum, result) do
if m <= cur_bucket do
bucket(r_m, buckets, cur_bucket, count + 1, sum + m, result)
defp select_exemplar(
{_lm, _ll, left_monotonic_sample_time} = left,
{_rm, _rl, right_monotonic_sample_time} = right
) do
if left_monotonic_sample_time <= right_monotonic_sample_time do
right
else
bucket(measurements, r_b, b, count, sum, [{"#{cur_bucket}", count} | result])
left
end
end
end
20 changes: 18 additions & 2 deletions lib/core/distribution.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ defmodule TelemetryMetricsPrometheus.Core.Distribution do
table: atom(),
tags: Metrics.tags(),
tag_values_fun: Metrics.tag_values(),
exemplar_tags: Metrics.tags(),
exemplar_tag_values_fun: Metrics.tag_values(),
type: :histogram,
unit: Metrics.unit()
}
Expand All @@ -48,6 +50,9 @@ defmodule TelemetryMetricsPrometheus.Core.Distribution do
table: table_id,
tags: metric.tags,
tag_values_fun: metric.tag_values,
exemplar_tags: Keyword.get(metric.reporter_options, :exemplar_tags, []),
exemplar_tag_values_fun:
Keyword.get(metric.reporter_options, :exemplar_tag_values, &Function.identity/1),
type: :histogram,
unit: metric.unit
}
Expand All @@ -71,8 +76,19 @@ defmodule TelemetryMetricsPrometheus.Core.Distribution do
EventHandler.get_measurement(measurements, metadata, config.measurement),
mapped_values <- config.tag_values_fun.(metadata),
:ok <- EventHandler.validate_tags_in_tag_values(config.tags, mapped_values),
labels <- Map.take(mapped_values, config.tags) do
true = :ets.insert(config.table, {config.name, {labels, measurement}})
labels <- Map.take(mapped_values, config.tags),
mapped_exemplar_values <- config.exemplar_tag_values_fun.(metadata),
:ok <-
EventHandler.validate_tags_in_exemplar_tag_values(
config.exemplar_tags,
mapped_exemplar_values
),
exemplar_labels <- Map.take(mapped_exemplar_values, config.exemplar_tags) do
true =
:ets.insert(
config.table,
{config.name, {labels, exemplar_labels, System.monotonic_time(), measurement}}
)

:ok
else
Expand Down
15 changes: 15 additions & 0 deletions lib/core/event_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ defmodule TelemetryMetricsPrometheus.Core.EventHandler do
end
end

# Checks that every configured exemplar tag is present as a key in the map
# produced by the exemplar tag-values function.
#
# Returns `:ok` when no tag is missing, otherwise
# `{:exemplar_tags_missing, missing_tags}` with the absent tags in their
# original order. The error tuple is spelled out in the spec because it carries
# its own `:exemplar_tags_missing` tag.
@spec validate_tags_in_exemplar_tag_values(Telemetry.Metrics.tags(), map()) ::
        :ok | {:exemplar_tags_missing, Telemetry.Metrics.tags()}
def validate_tags_in_exemplar_tag_values(tags, tag_values) do
  case Enum.reject(tags, &match?(%{^&1 => _}, tag_values)) do
    [] -> :ok
    missing_tags -> {:exemplar_tags_missing, missing_tags}
  end
end

@spec get_measurement(
measurements :: :telemetry.event_measurements(),
metadata :: :telemetry.event_metadata(),
Expand Down Expand Up @@ -89,4 +98,10 @@ defmodule TelemetryMetricsPrometheus.Core.EventHandler do
"Tags missing from tag_values. metric_name:=#{inspect(config.name)} tags:=#{inspect(Enum.join(tags))}"
)
end

# Logs, at `debug` level, that one or more configured exemplar tags were absent
# from the map returned by the exemplar tag-values function.
#
# The missing tags are rendered with `inspect/1` directly: joining them without
# a separator would run the names together (`[:a, :b]` -> "ab") and would raise
# for tags that do not implement `String.Chars`.
def handle_event_error({:exemplar_tags_missing, tags}, config) do
  Logger.debug(
    "Tags missing from exemplar_tag_values. metric_name:=#{inspect(config.name)} exemplar_tags:=#{inspect(tags)}"
  )
end
end
20 changes: 18 additions & 2 deletions lib/core/exporter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,28 @@ defmodule TelemetryMetricsPrometheus.Core.Exporter do
has_labels = map_size(labels) > 0

samples =
Enum.map_join(buckets, "\n", fn {upper_bound, count} ->
Enum.map_join(buckets, "\n", fn {upper_bound, count, exemplar} ->
exemplar_text =
case exemplar do
nil ->
""

{value, exemplar_labels, monotonic_sample_time} ->
time =
System.convert_time_unit(
monotonic_sample_time + System.time_offset(),
:native,
:millisecond
) / 1000

" # {#{format_labels(exemplar_labels)}} #{value} #{time}"
end

if has_labels do
~s(#{name}_bucket{#{format_labels(labels)},le="#{upper_bound}"} #{count})
else
~s(#{name}_bucket{le="#{upper_bound}"} #{count})
end
end <> exemplar_text
end)

summary =
Expand Down
40 changes: 33 additions & 7 deletions test/aggregator_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ defmodule TelemetryMetricsPrometheus.Core.AggregatorTest do
describe "bucket_measurements/2" do
test "measurements are properly bucketed" do
[
{[1, 2, 3, "+Inf"], [], {[{"1", 0}, {"2", 0}, {"3", 0}, {"+Inf", 0}], 0, 0},
{[1, 2, 3, "+Inf"], [],
{[{"1", 0, nil}, {"2", 0, nil}, {"3", 0, nil}, {"+Inf", 0, nil}], 0, 0},
"with no measurements"},
{[1, 2, 3, "+Inf"], [0.1], {[{"1", 1}, {"2", 1}, {"3", 1}, {"+Inf", 1}], 1, 0.1},
{[1, 2, 3, "+Inf"], [{0.1, %{}, 1}],
{[{"1", 1, {0.1, %{}, 1}}, {"2", 1, nil}, {"3", 1, nil}, {"+Inf", 1, nil}], 1, 0.1},
"with one measurement"},
{[1, 2, 3, "+Inf"], [2, 3.1], {[{"1", 0}, {"2", 1}, {"3", 1}, {"+Inf", 2}], 2, 5.1},
"compares measurement to bucket limit correctly"},
{[1, 2, 3, "+Inf"], [4, 5], {[{"1", 0}, {"2", 0}, {"3", 0}, {"+Inf", 2}], 2, 9},
{[1, 2, 3, "+Inf"], [{2, %{}, 1}, {3.1, %{}, 2}],
{[{"1", 0, nil}, {"2", 1, {2, %{}, 1}}, {"3", 1, nil}, {"+Inf", 2, {3.1, %{}, 2}}], 2,
5.1}, "compares measurement to bucket limit correctly"},
{[1, 2, 3, "+Inf"], [{4, %{}, 1}, {5, %{}, 2}],
{[{"1", 0, nil}, {"2", 0, nil}, {"3", 0, nil}, {"+Inf", 2, {5, %{}, 2}}], 2, 9},
"with measurements over the bucket limits"}
]
|> Enum.each(fn {buckets, measurements, expected_buckets, message} ->
Expand Down Expand Up @@ -111,37 +115,59 @@ defmodule TelemetryMetricsPrometheus.Core.AggregatorTest do
conn: %{method: "GET", path_info: ["users", "123"]}
})

middle_time = System.monotonic_time()

:telemetry.execute([:some, :plug, :call, :stop], %{duration: 3_000_000_000}, %{
conn: %{method: "GET", path_info: ["users", "123"]}
})

end_time = System.monotonic_time()

:ok = Aggregator.aggregate([metric], tid, dist_tid)

[
{{[:some, :plug, :call, :duration], %{method: "GET", path_root: "users"}},
{bucketed, count, sum}}
] = :ets.tab2list(tid)

assert bucketed == [{"1", 0}, {"2", 0}, {"3", 2}, {"+Inf", 2}]
assert [{"1", 0, nil}, {"2", 0, nil}, {"3", 2, {3.0, %{}, mst}}, {"+Inf", 2, nil}] =
bucketed

assert mst >= middle_time and mst <= end_time
assert count == 2
assert sum == 6.0

beginning_time = System.monotonic_time()

:telemetry.execute([:some, :plug, :call, :stop], %{duration: 1_500_000_000}, %{
conn: %{method: "GET", path_info: ["users", "123"]}
})

middle_time = System.monotonic_time()

:telemetry.execute([:some, :plug, :call, :stop], %{duration: 0_800_000_000}, %{
conn: %{method: "GET", path_info: ["users", "123"]}
})

end_time = System.monotonic_time()

:ok = Aggregator.aggregate([metric], tid, dist_tid)

[
{{[:some, :plug, :call, :duration], %{method: "GET", path_root: "users"}},
{bucketed_2, count_2, sum_2}}
] = :ets.tab2list(tid)

assert bucketed_2 == [{"1", 1}, {"2", 2}, {"3", 4}, {"+Inf", 4}]
assert [
{"1", 1, {0.8, %{}, mst1}},
{"2", 2, {1.5, %{}, mst2}},
{"3", 4, {3.0, %{}, mst3}},
{"+Inf", 4, nil}
] = bucketed_2

assert mst1 >= middle_time and mst1 <= end_time
assert mst2 >= beginning_time and mst2 <= middle_time
assert mst3 <= beginning_time
assert count_2 == 4
assert sum_2 == 8.3

Expand Down
Loading