Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions relay-dynamic-config/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ pub enum Feature {
/// Stream minidumps to objectstore.
#[serde(rename = "projects:relay-minidump-uploads")]
MinidumpUploads,
/// When converting measurements into attributes, use the name from the measurement
/// definition.
#[serde(rename = "projects:relay-measurements-smart-conversion")]
MeasurementsSmartConversion,

/// Enables OTLP spans to use the Span V2 processing pipeline in Relay.
///
Expand Down
9 changes: 8 additions & 1 deletion relay-server/src/processing/legacy_spans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ impl Forward for LegacySpanOutput {
s: processing::forward::StoreHandle<'_>,
ctx: processing::ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
use relay_dynamic_config::Feature;

let spans = match self {
Self::Serialized(spans) => {
return Err(spans
Expand All @@ -192,9 +194,14 @@ impl Forward for LegacySpanOutput {
}

let retention = ctx.retention(|r| r.span.as_ref());
let use_measurements_attribute = ctx
.project_info
.has_feature(Feature::MeasurementsSmartConversion);

for span in spans.split(|spans| spans.spans.into_iter().map(IndexedOnly)) {
if let Ok(span) = span.try_map(|span, _| store::convert(span.0, retention)) {
if let Ok(span) = span
.try_map(|span, _| store::convert(span.0, retention, use_measurements_attribute))
Comment thread
loewenheim marked this conversation as resolved.
Outdated
{
s.send_to_store(span)
};
}
Expand Down
9 changes: 7 additions & 2 deletions relay-server/src/processing/legacy_spans/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@ macro_rules! required {
}

/// Converts a [`Span`] into a [`StoreSpanV2`] to be sent to Kafka.
pub fn convert(span: Annotated<Span>, retentions: Retention) -> Result<Box<StoreSpanV2>> {
let span = span.map_value(relay_spans::span_v1_to_span_v2);
pub fn convert(
span: Annotated<Span>,
retentions: Retention,
use_measurements_attribute: bool,
) -> Result<Box<StoreSpanV2>> {
let span =
span.map_value(|span| relay_spans::span_v1_to_span_v2(span, use_measurements_attribute));
let span = required!(span);

Ok(Box::new(StoreSpanV2 {
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/processing/spans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl processing::Processor for SpansProcessor {
dynamic_sampling::validate_configs(ctx);
dynamic_sampling::validate_dsc_presence(&spans).reject(&spans)?;

let spans = process::expand(spans)?;
let spans = process::expand(spans, ctx)?;

let mut spans = match dynamic_sampling::run(spans, ctx) {
Ok(spans) => spans,
Expand Down
18 changes: 13 additions & 5 deletions relay-server/src/processing/spans/process.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::BTreeMap;
use std::time::Duration;

use relay_dynamic_config::Feature;
use relay_event_normalization::eap::ClientUserAgentInfo;
use relay_event_normalization::{GeoIpLookup, RequiredMode, SchemaProcessor, eap};
use relay_event_schema::processor::{ProcessingState, ValueType, process_value};
Expand All @@ -19,7 +20,10 @@ use crate::services::outcome::DiscardReason;
/// Parses all serialized spans.
///
/// Individual, invalid spans are discarded.
pub fn expand(spans: Managed<SerializedSpans>) -> Result<Managed<ExpandedSpans>, Rejected<Error>> {
pub fn expand(
spans: Managed<SerializedSpans>,
ctx: Context<'_>,
) -> Result<Managed<ExpandedSpans>, Rejected<Error>> {
spans.try_map(|spans, records| {
let SerializedSpans {
headers,
Expand All @@ -35,7 +39,7 @@ pub fn expand(spans: Managed<SerializedSpans>) -> Result<Managed<ExpandedSpans>,

let (settings, spans) = match items {
SpanItems::Container(item) => expand_span_container(&item)?,
SpanItems::Legacy(items) => expand_legacy_spans(items, records),
SpanItems::Legacy(items) => expand_legacy_spans(items, records, ctx),
SpanItems::Integration(item) => spans::integrations::expand(records, &[item]),
SpanItems::None => (Default::default(), Vec::new()),
};
Expand Down Expand Up @@ -126,10 +130,11 @@ fn expand_span_container(item: &Item) -> Result<(Settings, ContainerItems<SpanV2
fn expand_legacy_spans(
items: Vec<Item>,
records: &mut RecordKeeper<'_>,
ctx: Context<'_>,
) -> (Settings, ContainerItems<SpanV2>) {
let spans = items
.into_iter()
.filter_map(|item| match expand_legacy_span(&item) {
.filter_map(|item| match expand_legacy_span(&item, ctx) {
Ok(span) => Some(span),
Err(err) => {
records.reject_err(err, item);
Expand All @@ -141,15 +146,18 @@ fn expand_legacy_spans(
(Settings::default(), spans)
}

fn expand_legacy_span(item: &Item) -> Result<WithHeader<SpanV2>> {
fn expand_legacy_span(item: &Item, ctx: Context<'_>) -> Result<WithHeader<SpanV2>> {
Comment thread
loewenheim marked this conversation as resolved.
let payload = item.payload();
let use_measurements_smart_conversion = ctx
.project_info
.has_feature(Feature::MeasurementsSmartConversion);

let span = Annotated::<Span>::from_json_bytes(&payload)
.map_err(|err| {
relay_log::debug!("failed to parse span: {err}");
Error::Invalid(DiscardReason::InvalidJson)
})?
.map_value(relay_spans::span_v1_to_span_v2);
.map_value(|span| relay_spans::span_v1_to_span_v2(span, use_measurements_smart_conversion));

Ok(WithHeader::new(span))
}
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/processing/transactions/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ pub fn extract_spans(
if let Some(results) = spans::extract_from_event(
work.headers.dsc(),
&work.event,
ctx.config,
ctx,
server_sample_rate,
EventMetricsExtracted(work.flags.metrics_extracted),
SpansExtracted(work.flags.spans_extracted),
Expand Down
27 changes: 16 additions & 11 deletions relay-server/src/processing/transactions/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::envelope::{ContentType, Item, ItemType};
use crate::processing;
use crate::processing::utils::event::{EventMetricsExtracted, SpansExtracted, event_type};
use relay_base_schema::events::EventType;
use relay_config::Config;
use relay_dynamic_config::Feature;
use relay_event_schema::protocol::{Event, Measurement, Measurements, Span};
use relay_metrics::MetricNamespace;
use relay_metrics::{FractionUnit, MetricUnit};
Expand All @@ -15,7 +15,7 @@ use relay_sampling::DynamicSamplingContext;
pub fn extract_from_event(
dsc: Option<&DynamicSamplingContext>,
event: &Annotated<Event>,
config: &Config,
ctx: processing::Context<'_>,
server_sample_rate: Option<f64>,
event_metrics_extracted: EventMetricsExtracted,
spans_extracted: SpansExtracted,
Expand All @@ -35,7 +35,7 @@ pub fn extract_from_event(

let transaction_span = processing::transactions::extraction::extract_segment_span(
event,
config
ctx.config
.aggregator_config_for(MetricNamespace::Spans)
.max_tag_value_length,
&[],
Expand Down Expand Up @@ -64,7 +64,7 @@ pub fn extract_from_event(

results.push(make_span_item(
new_span,
config,
ctx,
client_sample_rate,
server_sample_rate,
event_metrics_extracted.0,
Expand All @@ -74,7 +74,7 @@ pub fn extract_from_event(

results.push(make_span_item(
transaction_span,
config,
ctx,
client_sample_rate,
server_sample_rate,
event_metrics_extracted.0,
Expand All @@ -85,7 +85,7 @@ pub fn extract_from_event(

fn make_span_item(
mut span: Span,
config: &Config,
ctx: processing::Context<'_>,
client_sample_rate: Option<f64>,
server_sample_rate: Option<f64>,
metrics_extracted: bool,
Expand Down Expand Up @@ -114,7 +114,7 @@ fn make_span_item(
})
.map_err(|_| ())?;

let mut item = create_span_item(span, config)?;
let mut item = create_span_item(span, ctx)?;

// If metrics extraction happened for the event, it also happened for its spans:
item.set_metrics_extracted(metrics_extracted);
Expand Down Expand Up @@ -200,10 +200,15 @@ pub fn validate(span: &mut Annotated<Span>) -> Result<(), ValidationError> {
/// Serializes the given span into an envelope item.
///
/// In processing relays, creates a Span V2 so it can be published via kafka.
pub fn create_span_item(span: Annotated<Span>, config: &Config) -> Result<Item, ()> {
pub fn create_span_item(span: Annotated<Span>, ctx: processing::Context<'_>) -> Result<Item, ()> {
let mut new_item = Item::new(ItemType::Span);
if cfg!(feature = "processing") && config.processing_enabled() {
let span_v2 = span.map_value(relay_spans::span_v1_to_span_v2);
if cfg!(feature = "processing") && ctx.config.processing_enabled() {
let use_measurements_smart_conversion = ctx
.project_info
.has_feature(Feature::MeasurementsSmartConversion);
let span_v2 = span.map_value(|span| {
relay_spans::span_v1_to_span_v2(span, use_measurements_smart_conversion)
});
let payload = match span_v2.to_json() {
Ok(payload) => payload,
Err(err) => {
Expand Down Expand Up @@ -306,7 +311,7 @@ mod tests {
let spans = extract_from_event(
managed_envelope.envelope().dsc(),
&event,
&Default::default(),
processing::Context::for_test(),
Some(0.1),
EventMetricsExtracted(false),
SpansExtracted(false),
Expand Down
17 changes: 12 additions & 5 deletions relay-spans/src/v1_to_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::name::name_for_attributes;
///
/// - `tags`, `sentry_tags`, `measurements` and `data` are transferred to `attributes`.
/// - Nested `data` items are encoded as JSON.
pub fn span_v1_to_span_v2(span_v1: SpanV1) -> SpanV2 {
pub fn span_v1_to_span_v2(span_v1: SpanV1, use_measurements_smart_conversion: bool) -> SpanV2 {
Comment thread
loewenheim marked this conversation as resolved.
let SpanV1 {
timestamp,
start_timestamp,
Expand Down Expand Up @@ -62,9 +62,16 @@ pub fn span_v1_to_span_v2(span_v1: SpanV1) -> SpanV2 {
if let Some(measurements) = measurements.into_value() {
for (key, measurement) in measurements.0 {
let key = match key.as_str() {
// TODO: If these measurements were defined in conventions we could get rid of the match entirely
"client_sample_rate" => SENTRY__CLIENT_SAMPLE_RATE,
"server_sample_rate" => SENTRY__SERVER_SAMPLE_RATE,
other => other,
other => {
if use_measurements_smart_conversion {
relay_conventions::measurement_to_attribute(other).unwrap_or(other)
} else {
other
}
}
};

attributes.insert_if_missing(key, || match measurement {
Expand Down Expand Up @@ -314,7 +321,7 @@ mod tests {
});

let span_v1 = SpanV1::from_value(json.into()).into_value().unwrap();
let span_v2 = span_v1_to_span_v2(span_v1);
let span_v2 = span_v1_to_span_v2(span_v1, false);

let annotated_span_v2: Annotated<SpanV2> = Annotated::new(span_v2);
insta::assert_json_snapshot!(SerializableAnnotated(&annotated_span_v2), @r#"
Expand Down Expand Up @@ -481,7 +488,7 @@ mod tests {
}
"###);

let span_v2 = span_v1_to_span_v2(span_v1);
let span_v2 = span_v1_to_span_v2(span_v1, false);

// The `name` and the `sentry.segment.name` attribute are the same as the transaction.
insta::assert_json_snapshot!(SerializableAnnotated(&Annotated::new(span_v2)), @r###"
Expand Down Expand Up @@ -511,7 +518,7 @@ mod tests {
fn start_timestamp() {
let json = r#"{"timestamp": 123, "end_timestamp": "invalid data"}"#;
let span_v1 = Annotated::<SpanV1>::from_json(json).unwrap();
let span_v2 = span_v1_to_span_v2(span_v1.into_value().unwrap());
let span_v2 = span_v1_to_span_v2(span_v1.into_value().unwrap(), false);

// Parsed version is still fine:
assert_eq!(
Expand Down
39 changes: 32 additions & 7 deletions tests/integration/test_spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,14 +476,20 @@ def envelope_with_transaction_and_spans(start: datetime, end: datetime) -> Envel
return envelope


@pytest.mark.parametrize("measurements_conversion", ["direct", "smart"])
def test_span_ingestion_with_performance_scores(
mini_sentry, relay_with_processing, spans_consumer
mini_sentry, relay_with_processing, spans_consumer, measurements_conversion
):
spans_consumer = spans_consumer()
relay = relay_with_processing()

project_id = 42
project_config = mini_sentry.add_full_project_config(project_id)
if measurements_conversion == "smart":
project_config["config"]["features"] = [
"projects:relay-measurements-smart-conversion"
]

project_config["config"]["performanceScore"] = {
"profiles": [
{
Expand Down Expand Up @@ -589,6 +595,29 @@ def test_span_ingestion_with_performance_scores(
# endpoint might overtake envelope
spans.sort(key=lambda msg: msg["span_id"])

if measurements_conversion == "smart":
measurements1 = {
"browser.web_vital.cls.value": 100.0,
"browser.web_vital.fcp.value": 200.0,
Comment on lines +600 to +601
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to make sure the product can actually deal with these new attributes first. Coalesce etc. must work.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a range of options for how defensive to be here, from "always double write the replacement attribute name and the literal measurement name" to "check if the literal measurement name wants to be backfilled in sentry-conventions.

"fid": 300.0,
"browser.web_vital.lcp.value": 400.0,
"browser.web_vital.ttfb.value": 500.0,
}
measurements2 = {
"browser.web_vital.inp.value": 100.0,
}
else:
measurements1 = {
"cls": 100.0,
"fcp": 200.0,
"fid": 300.0,
"lcp": 400.0,
"ttfb": 500.0,
}
measurements2 = {
"inp": 100.0,
}

expected_scores = [
{
"score.fcp": 0.14999972769539766,
Expand All @@ -606,19 +635,15 @@ def test_span_ingestion_with_performance_scores(
"score.weight.fid": 0.3,
"score.weight.lcp": 0.3,
"score.weight.ttfb": 0.0,
"cls": 100.0,
"fcp": 200.0,
"fid": 300.0,
"lcp": 400.0,
"ttfb": 500.0,
"score.cls": 0.0,
**measurements1,
},
{
"inp": 100.0,
"score.inp": 0.9948129113413748,
"score.ratio.inp": 0.9948129113413748,
"score.total": 0.9948129113413748,
"score.weight.inp": 1.0,
**measurements2,
},
]

Expand Down
Loading
Loading