diff --git a/relay-server/src/endpoints/minidump.rs b/relay-server/src/endpoints/minidump.rs index 91df064822b..9bb67f1ebdc 100644 --- a/relay-server/src/endpoints/minidump.rs +++ b/relay-server/src/endpoints/minidump.rs @@ -5,6 +5,7 @@ use axum::routing::{MethodRouter, post}; use bytes::Bytes; use bzip2::read::BzDecoder; use flate2::read::GzDecoder; +use futures::{self, Stream}; use liblzma::read::XzDecoder; use multer::{Field, Multipart}; use relay_config::Config; @@ -23,7 +24,7 @@ use crate::constants::{ITEM_NAME_BREADCRUMBS1, ITEM_NAME_BREADCRUMBS2, ITEM_NAME use crate::endpoints::common::{self, BadStoreRequest, TextResponse, upload_to_objectstore}; use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType}; use crate::extractors::{RawContentType, RequestMeta}; -use crate::managed::{Managed, ManagedResult}; +use crate::managed::{Managed, ManagedResult, Rejected}; use crate::middlewares; use crate::service::ServiceState; use crate::services::outcome::{DiscardAttachmentType, DiscardItemType, DiscardReason, Outcome}; @@ -60,9 +61,42 @@ const BZIP2_MAGIC_HEADER: &[u8] = b"\x42\x5A\x68"; /// Magic bytes for zstd compressed minidump containers. const ZSTD_MAGIC_HEADER: &[u8] = b"\x28\xB5\x2F\xFD"; +/// Longest magic header we recognize (XZ is 6 bytes). +const MAGIC_PEEK: usize = 6; + /// Content types by which standalone uploads can be recognized. const MINIDUMP_RAW_CONTENT_TYPES: &[&str] = &["application/octet-stream", "application/x-dmp"]; +#[derive(Debug, thiserror::Error)] +enum PeekError { + #[error("compressed minidump payloads are not supported for streaming upload")] + Compressed, + #[error(transparent)] + Source(#[from] E), +} + +/// Peek the first bytes of `stream` and reject if they look compressed (gzip/xz/bzip2/zstd). +/// Returns the original stream contents if not. 
+async fn reject_if_compressed( + stream: S, +) -> Result> + Send, PeekError> +where + S: Stream> + Send, + E: Send, +{ + let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?; + + if head.starts_with(GZIP_MAGIC_HEADER) + || head.starts_with(XZ_MAGIC_HEADER) + || head.starts_with(BZIP2_MAGIC_HEADER) + || head.starts_with(ZSTD_MAGIC_HEADER) + { + Err(PeekError::Compressed) + } else { + Ok(stream) + } +} + fn validate_minidump(data: &[u8]) -> Result<(), BadStoreRequest> { if !data.starts_with(MINIDUMP_MAGIC_HEADER_LE) && !data.starts_with(MINIDUMP_MAGIC_HEADER_BE) { relay_log::trace!("invalid minidump file"); @@ -236,7 +270,7 @@ impl<'a> AttachmentStrategy for MinidumpAttachmentStrategy<'a> { match upload_context.upload_decision(item.attachment_type()) { UploadDecision::Upload => { let content_type = field.content_type().map(ToString::to_string); - Ok(upload_to_objectstore( + Ok(upload_to_objectstore_checked( field, content_type, item, @@ -272,6 +306,52 @@ impl<'a> AttachmentStrategy for MinidumpAttachmentStrategy<'a> { } } +/// Wrapper around [`upload_to_objectstore`] that enforces that minidumps are not compressed. 
+pub async fn upload_to_objectstore_checked( + stream: S, + content_type: Option, + item: Managed, + config: &Config, + scoping: Scoping, + upload: &Addr, + referrer: &'static str, +) -> Result, Rejected<()>> +where + S: futures::Stream> + Send + 'static, + E: Into> + Send + 'static, +{ + if !matches!(item.attachment_type(), Some(AttachmentType::Minidump)) { + return upload_to_objectstore( + stream, + content_type, + item, + config, + scoping, + upload, + referrer, + ) + .await; + } + + let stream = match reject_if_compressed(stream).await { + Ok(stream) => stream, + Err(_) => { + return Err(item.reject_err(Outcome::Invalid(DiscardReason::InvalidMinidump))); + } + }; + + upload_to_objectstore( + stream, + content_type, + item, + config, + scoping, + upload, + referrer, + ) + .await +} + async fn multipart_to_envelope( multipart: Multipart<'static>, meta: RequestMeta, @@ -410,8 +490,12 @@ async fn raw_minidump_to_envelope( if let Some(upload_context) = upload_context && matches!(upload_context.upload_minidumps, UploadDecision::Upload) { + let stream = reject_if_compressed(request.into_body().into_data_stream()) + .await + .map_err(|_| BadStoreRequest::InvalidMinidump)?; + item = upload_to_objectstore( - request.into_body().into_data_stream(), + stream, Some(content_type.to_string()).filter(|s| !s.is_empty()), item, config, diff --git a/relay-server/src/utils/mod.rs b/relay-server/src/utils/mod.rs index febbdcaa33b..4be775f45c1 100644 --- a/relay-server/src/utils/mod.rs +++ b/relay-server/src/utils/mod.rs @@ -14,7 +14,7 @@ mod sizes; mod sleep_handle; mod split_off; mod statsd; -mod stream; +pub mod stream; mod thread_pool; pub mod tus; diff --git a/relay-server/src/utils/stream/mod.rs b/relay-server/src/utils/stream/mod.rs index 07985e7fd4b..c4956e29006 100644 --- a/relay-server/src/utils/stream/mod.rs +++ b/relay-server/src/utils/stream/mod.rs @@ -1,7 +1,9 @@ mod bounded; mod metered; +mod peek; mod retryable; pub use bounded::*; pub use metered::*; +pub use 
peek::*; pub use retryable::*; diff --git a/relay-server/src/utils/stream/peek.rs b/relay-server/src/utils/stream/peek.rs new file mode 100644 index 00000000000..a417b869c40 --- /dev/null +++ b/relay-server/src/utils/stream/peek.rs @@ -0,0 +1,26 @@ +use bytes::{Bytes, BytesMut}; +use futures::{Stream, StreamExt, future, stream}; + +/// Buffers chunks of `stream` until at least `n` bytes are read or it ends, returning the buffered +/// bytes together with a stream replaying all the bytes of the original `stream`. +pub async fn peek_n( + stream: S, + n: usize, +) -> Result<(Bytes, impl Stream> + Send), E> +where + S: Stream> + Send, + E: Send, +{ + let mut stream = Box::pin(stream); + let mut head = BytesMut::with_capacity(n); + while head.len() < n { + match stream.next().await { + Some(Ok(chunk)) => head.extend_from_slice(&chunk), + Some(Err(e)) => return Err(e), + None => break, + } + } + let head = head.freeze(); + let replay = stream::once(future::ready(Ok(head.clone()))).chain(stream); + Ok((head, replay)) +} diff --git a/tests/integration/test_minidump.py b/tests/integration/test_minidump.py index aeb4cb9aa12..a6c82a629a8 100644 --- a/tests/integration/test_minidump.py +++ b/tests/integration/test_minidump.py @@ -920,7 +920,7 @@ def test_minidump_objectstore_uploads( ): project_id = 42 minidump_content = b"MDMP content" - log_content = b"Some log file content" + log_content = b"\x1f\x8b Some log file content" project_config = mini_sentry.add_full_project_config(project_id) if upload_attachments: @@ -1338,3 +1338,65 @@ def test_minidump_large_attachment_skipped_when_no_project_fetching(mini_sentry, assert len(envelope.items) == 1 assert envelope.items[0].payload.bytes == minidump_content + + +@pytest.mark.parametrize( + "magic,filename", + [ + pytest.param(b"\x1f\x8b", "minidump.dmp.gz", id="gzip"), + pytest.param(b"\xfd7zXZ\x00", "minidump.dmp.xz", id="xz"), + pytest.param(b"BZh", "minidump.dmp.bz2", id="bzip2"), + pytest.param(b"\x28\xb5\x2f\xfd", "minidump.dmp.zst", id="zstd"), + ], +) +def 
test_minidump_objectstore_uploads_rejects_compressed( + mini_sentry, + relay, + magic, + filename, +): + """ + When streaming a minidump to objectstore, a compressed payload should be rejected + (until objectstore or minidump can handle them). + """ + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"].setdefault("features", []).append( + "projects:relay-minidump-uploads" + ) + + relay = relay( + mini_sentry, + options={ + "outcomes": { + "emit_outcomes": True, + "batch_size": 1, + "batch_interval": 1, + }, + }, + ) + + with pytest.raises(HTTPError) as exc_info: + relay.send_minidump( + project_id=project_id, + files=[(MINIDUMP_ATTACHMENT_NAME, filename, magic + b"\x00" * 32)], + ) + + assert exc_info.value.response.status_code == 400 + + assert mini_sentry.get_aggregated_outcomes() == [ + { + "category": 4, + "outcome": 3, + "project_id": 42, + "reason": "invalid_minidump", + "quantity": 1, + }, + { + "category": 22, + "outcome": 3, + "project_id": 42, + "reason": "invalid_minidump", + "quantity": 1, + }, + ]