diff --git a/relay-server/src/endpoints/project_configs.rs b/relay-server/src/endpoints/project_configs.rs index fcf53e4ab61..97df5950cd4 100644 --- a/relay-server/src/endpoints/project_configs.rs +++ b/relay-server/src/endpoints/project_configs.rs @@ -170,6 +170,16 @@ async fn inner( let project_info = match project.state() { ProjectState::Enabled(info) => info, + ProjectState::DummyAllowed => { + // We have no data for this project. + // + // This really would only happen if someone tried to run a managed relay behind a + // proxy relay, which currently is not a supported setup. + // + // To support this, the proxy Relay would have to act as a pure proxy and not fetch + // the project configuration from its own cache. + continue; + } ProjectState::Disabled => { // Don't insert project config. Downstream Relay will consider it disabled. continue; diff --git a/relay-server/src/services/buffer/common.rs b/relay-server/src/services/buffer/common.rs index b213fa40f99..dee680ad51d 100644 --- a/relay-server/src/services/buffer/common.rs +++ b/relay-server/src/services/buffer/common.rs @@ -17,10 +17,6 @@ impl ProjectKeyPair { } } - pub fn has_distinct_sampling_key(&self) -> bool { - self.own_key != self.sampling_key - } - pub fn from_envelope(envelope: &Envelope) -> Self { let own_key = envelope.meta().public_key(); let sampling_key = envelope.sampling_key().unwrap_or(own_key); diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index c79ce7a69c8..7fb50e8f152 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -2,14 +2,15 @@ use std::error::Error; use std::num::NonZeroU8; -use std::sync::Arc; use std::sync::atomic::Ordering; use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize}; +use std::sync::{Arc, LazyLock}; use std::time::Duration; use ahash::RandomState; use chrono::DateTime; use chrono::Utc; +use relay_base_schema::project::ProjectKey; use relay_config::{Config, EnvelopeSpoolPartitioning}; use relay_system::Receiver; use relay_system::ServiceSpawn; @@ -26,7 +27,7 @@ use crate::services::outcome::DiscardReason; use crate::services::outcome::Outcome; use crate::services::outcome::TrackOutcome; use crate::services::processor::{EnvelopeProcessor, ProcessEnvelope}; -use crate::services::projects::cache::{ProjectCacheHandle, ProjectChange}; +use crate::services::projects::cache::{Project, ProjectCacheHandle, ProjectChange}; use crate::statsd::RelayCounters; use crate::MemoryChecker; @@ -44,7 +45,7 @@ pub use envelope_stack::EnvelopeStack; // pub for benchmarks pub use envelope_store::sqlite::SqliteEnvelopeStore; -use crate::services::projects::project::ProjectState; +use crate::services::projects::project::{ProjectInfo, ProjectState}; pub use common::ProjectKeyPair; mod common; @@ -536,91 +537,51 @@ impl EnvelopeBufferService { buffer: &mut PolymorphicEnvelopeBuffer, project_key_pair: ProjectKeyPair, ) -> Result<(), EnvelopeBufferError> { - let own_key = project_key_pair.own_key; - let own_project = services.project_cache_handle.get(own_key); - // We try to load the own project state and bail in case it's pending. - let own_project_info = match own_project.state() { - ProjectState::Enabled(info) => Some(info.clone()), - ProjectState::Disabled => None, - ProjectState::Pending => { - buffer.mark_ready(&own_key, false); + macro_rules! pop_envelope { + () => {{ + relay_log::trace!("EnvelopeBufferService: popping envelope"); + // If we arrived here, know that both projects are available, so we pop the envelope. + // + // Available, doesn't necessarily mean enabled/active. + let envelope = buffer.pop().await?; + let envelope = envelope.expect("Element disappeared despite exclusive excess"); + Managed::from_envelope(envelope, services.outcome_aggregator.clone()) + }}; + } + + match resolve_project(&services.project_cache_handle, project_key_pair) { + ResolvedProject::Enabled { + own_project, + own_project_info, + sampling_project_info, + } => { + let mut envelope = pop_envelope!(); + if own_project.check_envelope(&mut envelope).await.is_err() || envelope.is_empty() { + // Outcomes are emitted by `check_envelope`. + return Ok(()); + }; + + services.envelope_processor.send(ProcessEnvelope { + envelope: envelope.into(), + project_info: own_project_info, + rate_limits: own_project.rate_limits().current_limits(), + sampling_project_info, + }); + } + // If the own project state is disabled, we want to drop the envelope and early return since + // we can't do much about it. + ResolvedProject::Disabled => { + let _ = pop_envelope!().reject_err(Outcome::Invalid(DiscardReason::ProjectId)); + } + ResolvedProject::NotReady(key) => { + buffer.mark_ready(&key, false); relay_statsd::metric!( counter(RelayCounters::BufferProjectPending) += 1, partition_id = partition_tag ); - - return Ok(()); - } - }; - - let sampling_key = project_key_pair.sampling_key; - // If the projects are different, we load the project key of the sampling project. On the - // other hand, if they are the same, we just reuse the own project. - let sampling_project_info = if project_key_pair.has_distinct_sampling_key() { - // We try to load the sampling project state and bail in case it's pending. - match services.project_cache_handle.get(sampling_key).state() { - ProjectState::Enabled(info) => Some(info.clone()), - ProjectState::Disabled => None, - ProjectState::Pending => { - buffer.mark_ready(&sampling_key, false); - relay_statsd::metric!( - counter(RelayCounters::BufferProjectPending) += 1, - partition_id = partition_tag - ); - - return Ok(()); - } } - } else { - own_project_info.clone() - }; - - relay_log::trace!("EnvelopeBufferService: popping envelope"); - - // If we arrived here, know that both projects are available, so we pop the envelope. - let envelope = buffer - .pop() - .await? - .expect("Element disappeared despite exclusive excess"); - - // If the own project state is disabled, we want to drop the envelope and early return since - // we can't do much about it. - let Some(own_project_info) = own_project_info else { - let mut managed_envelope = - ManagedEnvelope::new(envelope, services.outcome_aggregator.clone()); - managed_envelope.reject(Outcome::Invalid(DiscardReason::ProjectId)); - - return Ok(()); - }; - - // We only extract the sampling project info if both projects belong to the same org. - let sampling_project_info = sampling_project_info - .filter(|info| info.organization_id == own_project_info.organization_id); - - let mut managed_envelope = - Managed::from_envelope(envelope, services.outcome_aggregator.clone()); - - if own_project - .check_envelope(&mut managed_envelope) - .await - .is_err() - { - // Outcomes are emitted by `check_envelope`. - return Ok(()); - }; - - if managed_envelope.is_empty() { - // Nothing left to process. - return Ok(()); } - services.envelope_processor.send(ProcessEnvelope { - envelope: managed_envelope.into(), - project_info: own_project_info.clone(), - rate_limits: own_project.rate_limits().current_limits(), - sampling_project_info: sampling_project_info.clone(), - }); - Ok(()) } @@ -747,6 +708,103 @@ impl Service for EnvelopeBufferService { } } +/// Resolves the project and project information for an envelope about to be popped from the buffer. +/// +/// Resolves the own and sampling project information +fn resolve_project( + project_cache: &ProjectCacheHandle, + ProjectKeyPair { + own_key, + sampling_key, + }: ProjectKeyPair, +) -> ResolvedProject<'_> { + static DUMMY_CONFIG: LazyLock> = LazyLock::new(|| { + Arc::new(ProjectInfo { + project_id: None, + last_change: None, + rev: Default::default(), + public_keys: Default::default(), + slug: None, + config: Default::default(), + organization_id: None, + upstream: None, + }) + }); + + let own_project = project_cache.get(own_key); + let own_project_info = match own_project.state() { + ProjectState::Enabled(info) => info.clone(), + ProjectState::DummyAllowed => { + return ResolvedProject::Enabled { + own_project, + // Since downstream requires a project config, we re-use this dummy config. + // + // This is how Relay historically always handled its proxy mode. + // It would make sense to instead of passing down this dummy, making the project + // config state here optional or similarly typed to the project state. + own_project_info: Arc::clone(&DUMMY_CONFIG), + sampling_project_info: None, + }; + } + ProjectState::Disabled => return ResolvedProject::Disabled, + ProjectState::Pending => return ResolvedProject::NotReady(own_key), + }; + + // If the projects are different, we load the project key of the sampling project. On the + // other hand, if they are the same, we just reuse the own project. + let sampling_project_info = match own_key == sampling_key { + // For matching keys, we can re-use the existing config. + true => Some(own_project_info.clone()), + // If the sampling project is distinct, we need also fetch that config. + false => { + match project_cache.get(sampling_key).state() { + ProjectState::Enabled(info) => { + // The sampling project key must belong to the same organization as the own project key. + // + // Dynamic sampling does not work across organizations, we also want to have a clear separation + // of data between organizations. + (info.organization_id == own_project_info.organization_id) + .then(|| Arc::clone(info)) + } + ProjectState::DummyAllowed => { + // This case should never happen, the own project info would already be dummy allowed. + debug_assert!(false); + None + } + ProjectState::Disabled => None, + ProjectState::Pending => return ResolvedProject::NotReady(sampling_key), + } + } + }; + + ResolvedProject::Enabled { + own_project, + own_project_info, + sampling_project_info, + } +} + +/// State returned from [`resolve_project`]. +enum ResolvedProject<'a> { + /// The project is enabled and data for it can be processed. + Enabled { + /// The own project. + own_project: Project<'a>, + /// The own, enabled, project info. + own_project_info: Arc, + /// The sampling project info. + /// + /// May be `None` when the sampling project is disabled or from a different organization. + sampling_project_info: Option>, + }, + /// The project is disabled. + Disabled, + /// The project information isn't ready. + /// + /// This may be returned for either the own project or the sampling project. + NotReady(ProjectKey), +} + /// The spooler uses internal time based mechanics and to not make the tests actually wait /// it's good to use `#[tokio::test(start_paused = true)]`. For memory based spooling, this will /// just work. diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs index 6892f36ae08..f0b43a03eeb 100644 --- a/relay-server/src/services/metrics/aggregator.rs +++ b/relay-server/src/services/metrics/aggregator.rs @@ -164,6 +164,17 @@ impl AggregatorService { let project_info = match project.state() { ProjectState::Enabled(info) => Arc::clone(info), + // The dummy state should never happen, as a proxy Relay must not use the + // metrics aggregator. + ProjectState::DummyAllowed => { + relay_log::error!( + tags.aggregator = self.aggregator.name(), + tags.project_key = project_key.as_str(), + "metrics aggregator requires a project config" + ); + // Drop the bucket. + continue; + } ProjectState::Disabled => continue, // Drop the bucket. ProjectState::Pending => { // Return to the aggregator, which will assign a new flush time. diff --git a/relay-server/src/services/projects/cache/handle.rs b/relay-server/src/services/projects/cache/handle.rs index 8f2bac9d8de..bac0adfe7b0 100644 --- a/relay-server/src/services/projects/cache/handle.rs +++ b/relay-server/src/services/projects/cache/handle.rs @@ -53,6 +53,7 @@ impl ProjectCacheHandle { Ok(project) => { match project.state() { ProjectState::Enabled(_) => "enabled", + ProjectState::DummyAllowed => "dummy_allowed", ProjectState::Disabled => "disabled", ProjectState::Pending => "pending", } diff --git a/relay-server/src/services/projects/cache/project.rs b/relay-server/src/services/projects/cache/project.rs index a09d434ed80..26aaa7c8e8b 100644 --- a/relay-server/src/services/projects/cache/project.rs +++ b/relay-server/src/services/projects/cache/project.rs @@ -47,6 +47,7 @@ impl<'a> Project<'a> { ) -> Result> { let state = match self.state() { ProjectState::Enabled(state) => Some(Arc::clone(state)), + ProjectState::DummyAllowed => None, ProjectState::Disabled => { // TODO(jjbayer): We should refactor this function to either return a Result or // handle envelope rejections internally, but not both. diff --git a/relay-server/src/services/projects/cache/state.rs b/relay-server/src/services/projects/cache/state.rs index d0fb1eac99a..c581e679558 100644 --- a/relay-server/src/services/projects/cache/state.rs +++ b/relay-server/src/services/projects/cache/state.rs @@ -1026,9 +1026,9 @@ mod tests { // The old cached state is still available and not replaced. assert_state!(store, project_key, ProjectState::Disabled); - let fetch = fetch.complete(ProjectState::new_allowed().into()); + let fetch = fetch.complete(ProjectState::DummyAllowed.into()); assert!(store.complete_fetch(fetch).is_none()); - assert_state!(store, project_key, ProjectState::Enabled(_)); + assert_state!(store, project_key, ProjectState::DummyAllowed); } #[tokio::test(start_paused = true)] diff --git a/relay-server/src/services/projects/project/mod.rs b/relay-server/src/services/projects/project/mod.rs index b4be3867b77..b9dd76056fd 100644 --- a/relay-server/src/services/projects/project/mod.rs +++ b/relay-server/src/services/projects/project/mod.rs @@ -15,6 +15,12 @@ pub use self::serialize::*; pub enum ProjectState { /// A valid project that is not disabled. Enabled(Arc), + /// A dummy state for Relay instances which do not synchronize project configs with their upstream. + /// + /// This is used by proxy Relay instances which still communicate with the project cache, + /// but never fetch project configs from upstream. In this configuration data must still be + /// forwarded even without a project config. + DummyAllowed, /// A project that was marked as "gone" by the upstream. This variant does not expose /// any other project information. Disabled, @@ -28,28 +34,13 @@ pub enum ProjectState { } impl ProjectState { - /// Project state for an unknown but allowed project. - /// - /// This state is used for forwarding in Proxy mode. - pub fn new_allowed() -> Self { - Self::Enabled(Arc::new(ProjectInfo { - project_id: None, - last_change: None, - rev: Default::default(), - public_keys: Default::default(), - slug: None, - config: Default::default(), - organization_id: None, - upstream: None, - })) - } - /// Runs a post-deserialization step to normalize the project config (e.g. legacy fields). pub fn sanitized(self, is_processing: bool) -> Self { match self { Self::Enabled(state) => { Self::Enabled(Arc::new(state.as_ref().clone().sanitized(is_processing))) } + Self::DummyAllowed => Self::DummyAllowed, Self::Disabled => Self::Disabled, Self::Pending => Self::Pending, } @@ -64,7 +55,7 @@ impl ProjectState { pub fn enabled(self) -> Option> { match self { Self::Enabled(info) => Some(info), - Self::Disabled | Self::Pending => None, + Self::DummyAllowed | Self::Disabled | Self::Pending => None, } } @@ -74,7 +65,7 @@ impl ProjectState { pub fn revision(&self) -> Revision { match &self { Self::Enabled(info) => info.rev.clone(), - Self::Disabled | Self::Pending => Revision::default(), + Self::DummyAllowed | Self::Disabled | Self::Pending => Revision::default(), } } diff --git a/relay-server/src/services/projects/source/mod.rs b/relay-server/src/services/projects/source/mod.rs index 0d2c02dbfbd..c909885e209 100644 --- a/relay-server/src/services/projects/source/mod.rs +++ b/relay-server/src/services/projects/source/mod.rs @@ -61,7 +61,7 @@ impl ProjectSource { current_revision: Revision, ) -> Result { match self.config.relay_mode() { - RelayMode::Proxy => return Ok(ProjectState::new_allowed().into()), + RelayMode::Proxy => return Ok(ProjectState::DummyAllowed.into()), RelayMode::Managed => (), // Proceed with loading the config from redis or upstream }