Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions relay-server/src/endpoints/project_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ async fn inner(

let project_info = match project.state() {
ProjectState::Enabled(info) => info,
ProjectState::DummyAllowed => {
// We have no data for this project.
//
// This really would only happen if someone tried to run a managed relay behind a
// proxy relay, which currently is not a supported setup.
//
// To support this, the proxy Relay would have to act as a pure proxy and not fetch
// the project configuration from its own cache.
continue;
}
ProjectState::Disabled => {
// Don't insert project config. Downstream Relay will consider it disabled.
continue;
Expand Down
4 changes: 0 additions & 4 deletions relay-server/src/services/buffer/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ impl ProjectKeyPair {
}
}

pub fn has_distinct_sampling_key(&self) -> bool {
self.own_key != self.sampling_key
}

pub fn from_envelope(envelope: &Envelope) -> Self {
let own_key = envelope.meta().public_key();
let sampling_key = envelope.sampling_key().unwrap_or(own_key);
Expand Down
220 changes: 139 additions & 81 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

use std::error::Error;
use std::num::NonZeroU8;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize};
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use ahash::RandomState;
use chrono::DateTime;
use chrono::Utc;
use relay_base_schema::project::ProjectKey;
use relay_config::{Config, EnvelopeSpoolPartitioning};
use relay_system::Receiver;
use relay_system::ServiceSpawn;
Expand All @@ -26,7 +27,7 @@ use crate::services::outcome::DiscardReason;
use crate::services::outcome::Outcome;
use crate::services::outcome::TrackOutcome;
use crate::services::processor::{EnvelopeProcessor, ProcessEnvelope};
use crate::services::projects::cache::{ProjectCacheHandle, ProjectChange};
use crate::services::projects::cache::{Project, ProjectCacheHandle, ProjectChange};
use crate::statsd::RelayCounters;

use crate::MemoryChecker;
Expand All @@ -44,7 +45,7 @@ pub use envelope_stack::EnvelopeStack;
// pub for benchmarks
pub use envelope_store::sqlite::SqliteEnvelopeStore;

use crate::services::projects::project::ProjectState;
use crate::services::projects::project::{ProjectInfo, ProjectState};
pub use common::ProjectKeyPair;

mod common;
Expand Down Expand Up @@ -536,91 +537,51 @@ impl EnvelopeBufferService {
buffer: &mut PolymorphicEnvelopeBuffer,
project_key_pair: ProjectKeyPair,
) -> Result<(), EnvelopeBufferError> {
let own_key = project_key_pair.own_key;
let own_project = services.project_cache_handle.get(own_key);
// We try to load the own project state and bail in case it's pending.
let own_project_info = match own_project.state() {
ProjectState::Enabled(info) => Some(info.clone()),
ProjectState::Disabled => None,
ProjectState::Pending => {
buffer.mark_ready(&own_key, false);
macro_rules! pop_envelope {
() => {{
relay_log::trace!("EnvelopeBufferService: popping envelope");
// If we arrived here, know that both projects are available, so we pop the envelope.
//
// Available, doesn't necessarily mean enabled/active.
let envelope = buffer.pop().await?;
let envelope = envelope.expect("Element disappeared despite exclusive excess");
Managed::from_envelope(envelope, services.outcome_aggregator.clone())
}};
}

match resolve_project(&services.project_cache_handle, project_key_pair) {
ResolvedProject::Enabled {
own_project,
own_project_info,
sampling_project_info,
} => {
let mut envelope = pop_envelope!();
if own_project.check_envelope(&mut envelope).await.is_err() || envelope.is_empty() {
// Outcomes are emitted by `check_envelope`.
return Ok(());
};

services.envelope_processor.send(ProcessEnvelope {
envelope: envelope.into(),
project_info: own_project_info,
rate_limits: own_project.rate_limits().current_limits(),
sampling_project_info,
});
}
// If the own project state is disabled, we want to drop the envelope and early return since
// we can't do much about it.
ResolvedProject::Disabled => {
let _ = pop_envelope!().reject_err(Outcome::Invalid(DiscardReason::ProjectId));
}
ResolvedProject::NotReady(key) => {
buffer.mark_ready(&key, false);
relay_statsd::metric!(
counter(RelayCounters::BufferProjectPending) += 1,
partition_id = partition_tag
);

return Ok(());
}
};

let sampling_key = project_key_pair.sampling_key;
// If the projects are different, we load the project key of the sampling project. On the
// other hand, if they are the same, we just reuse the own project.
let sampling_project_info = if project_key_pair.has_distinct_sampling_key() {
// We try to load the sampling project state and bail in case it's pending.
match services.project_cache_handle.get(sampling_key).state() {
ProjectState::Enabled(info) => Some(info.clone()),
ProjectState::Disabled => None,
ProjectState::Pending => {
buffer.mark_ready(&sampling_key, false);
relay_statsd::metric!(
counter(RelayCounters::BufferProjectPending) += 1,
partition_id = partition_tag
);

return Ok(());
}
}
} else {
own_project_info.clone()
};

relay_log::trace!("EnvelopeBufferService: popping envelope");

// If we arrived here, know that both projects are available, so we pop the envelope.
let envelope = buffer
.pop()
.await?
.expect("Element disappeared despite exclusive excess");

// If the own project state is disabled, we want to drop the envelope and early return since
// we can't do much about it.
let Some(own_project_info) = own_project_info else {
let mut managed_envelope =
ManagedEnvelope::new(envelope, services.outcome_aggregator.clone());
managed_envelope.reject(Outcome::Invalid(DiscardReason::ProjectId));

return Ok(());
};

// We only extract the sampling project info if both projects belong to the same org.
let sampling_project_info = sampling_project_info
.filter(|info| info.organization_id == own_project_info.organization_id);

let mut managed_envelope =
Managed::from_envelope(envelope, services.outcome_aggregator.clone());

if own_project
.check_envelope(&mut managed_envelope)
.await
.is_err()
{
// Outcomes are emitted by `check_envelope`.
return Ok(());
};

if managed_envelope.is_empty() {
// Nothing left to process.
return Ok(());
}

services.envelope_processor.send(ProcessEnvelope {
envelope: managed_envelope.into(),
project_info: own_project_info.clone(),
rate_limits: own_project.rate_limits().current_limits(),
sampling_project_info: sampling_project_info.clone(),
});

Ok(())
}

Expand Down Expand Up @@ -747,6 +708,103 @@ impl Service for EnvelopeBufferService {
}
}

/// Resolves the project and project information for an envelope about to be popped from the buffer.
///
/// Resolves the own and sampling project information
fn resolve_project(
project_cache: &ProjectCacheHandle,
ProjectKeyPair {
own_key,
sampling_key,
}: ProjectKeyPair,
) -> ResolvedProject<'_> {
static DUMMY_CONFIG: LazyLock<Arc<ProjectInfo>> = LazyLock::new(|| {
Arc::new(ProjectInfo {
project_id: None,
last_change: None,
rev: Default::default(),
public_keys: Default::default(),
slug: None,
config: Default::default(),
organization_id: None,
upstream: None,
})
});

let own_project = project_cache.get(own_key);
let own_project_info = match own_project.state() {
ProjectState::Enabled(info) => info.clone(),
ProjectState::DummyAllowed => {
return ResolvedProject::Enabled {
own_project,
// Since downstream requires a project config, we re-use this dummy config.
//
// This is how Relay historically always handled its proxy mode.
// It would make sense to instead of passing down this dummy, making the project
// config state here optional or similarly typed to the project state.
own_project_info: Arc::clone(&DUMMY_CONFIG),
sampling_project_info: None,
};
}
ProjectState::Disabled => return ResolvedProject::Disabled,
ProjectState::Pending => return ResolvedProject::NotReady(own_key),
};

// If the projects are different, we load the project key of the sampling project. On the
// other hand, if they are the same, we just reuse the own project.
let sampling_project_info = match own_key == sampling_key {
// For matching keys, we can re-use the existing config.
true => Some(own_project_info.clone()),
// If the sampling project is distinct, we need also fetch that config.
false => {
match project_cache.get(sampling_key).state() {
ProjectState::Enabled(info) => {
// The sampling project key must belong to the same organization as the own project key.
//
// Dynamic sampling does not work across organizations, we also want to have a clear separation
// of data between organizations.
(info.organization_id == own_project_info.organization_id)
.then(|| Arc::clone(info))
}
ProjectState::DummyAllowed => {
// This case should never happen, the own project info would already be dummy allowed.
debug_assert!(false);
None
}
ProjectState::Disabled => None,
ProjectState::Pending => return ResolvedProject::NotReady(sampling_key),
}
}
};

ResolvedProject::Enabled {
own_project,
own_project_info,
sampling_project_info,
}
}

/// State returned from [`resolve_project`].
enum ResolvedProject<'a> {
/// The project is enabled and data for it can be processed.
Enabled {
/// The own project.
own_project: Project<'a>,
/// The own, enabled, project info.
own_project_info: Arc<ProjectInfo>,
/// The sampling project info.
///
/// May be `None` when the sampling project is disabled or from a different organization.
sampling_project_info: Option<Arc<ProjectInfo>>,
},
/// The project is disabled.
Disabled,
/// The project information isn't ready.
///
/// This may be returned for either the own project or the sampling project.
NotReady(ProjectKey),
}

/// The spooler uses internal time based mechanics and to not make the tests actually wait
/// it's good to use `#[tokio::test(start_paused = true)]`. For memory based spooling, this will
/// just work.
Expand Down
11 changes: 11 additions & 0 deletions relay-server/src/services/metrics/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,17 @@ impl AggregatorService {

let project_info = match project.state() {
ProjectState::Enabled(info) => Arc::clone(info),
// The dummy state should never happen, as a proxy Relay must not use the
// metrics aggregator.
ProjectState::DummyAllowed => {
relay_log::error!(
tags.aggregator = self.aggregator.name(),
tags.project_key = project_key.as_str(),
"metrics aggregator requires a project config"
);
// Drop the bucket.
continue;
}
ProjectState::Disabled => continue, // Drop the bucket.
ProjectState::Pending => {
// Return to the aggregator, which will assign a new flush time.
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/services/projects/cache/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl ProjectCacheHandle {
Ok(project) => {
match project.state() {
ProjectState::Enabled(_) => "enabled",
ProjectState::DummyAllowed => "dummy_allowed",
ProjectState::Disabled => "disabled",
ProjectState::Pending => "pending",
}
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/services/projects/cache/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ impl<'a> Project<'a> {
) -> Result<RateLimits, Rejected<DiscardReason>> {
let state = match self.state() {
ProjectState::Enabled(state) => Some(Arc::clone(state)),
ProjectState::DummyAllowed => None,
ProjectState::Disabled => {
// TODO(jjbayer): We should refactor this function to either return a Result or
// handle envelope rejections internally, but not both.
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/projects/cache/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1026,9 +1026,9 @@ mod tests {
// The old cached state is still available and not replaced.
assert_state!(store, project_key, ProjectState::Disabled);

let fetch = fetch.complete(ProjectState::new_allowed().into());
let fetch = fetch.complete(ProjectState::DummyAllowed.into());
assert!(store.complete_fetch(fetch).is_none());
assert_state!(store, project_key, ProjectState::Enabled(_));
assert_state!(store, project_key, ProjectState::DummyAllowed);
}

#[tokio::test(start_paused = true)]
Expand Down
Loading
Loading