Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 83 additions & 3 deletions relay-server/src/endpoints/minidump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use axum::routing::{MethodRouter, post};
use bytes::Bytes;
use bzip2::read::BzDecoder;
use flate2::read::GzDecoder;
use futures::{self, Stream};
use liblzma::read::XzDecoder;
use multer::{Field, Multipart};
use relay_config::Config;
use relay_config::{Config, HttpEncoding};
use relay_dynamic_config::Feature;
use relay_event_schema::protocol::EventId;
use relay_quotas::{DataCategory, RateLimits, Scoping};
Expand All @@ -23,7 +24,7 @@ use crate::constants::{ITEM_NAME_BREADCRUMBS1, ITEM_NAME_BREADCRUMBS2, ITEM_NAME
use crate::endpoints::common::{self, BadStoreRequest, TextResponse, upload_to_objectstore};
use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType};
use crate::extractors::{RawContentType, RequestMeta};
use crate::managed::{Managed, ManagedResult};
use crate::managed::{Managed, ManagedResult, Rejected};
use crate::middlewares;
use crate::service::ServiceState;
use crate::services::outcome::{DiscardAttachmentType, DiscardItemType, DiscardReason, Outcome};
Expand Down Expand Up @@ -60,9 +61,42 @@ const BZIP2_MAGIC_HEADER: &[u8] = b"\x42\x5A\x68";
/// Magic bytes for zstd compressed minidump containers.
const ZSTD_MAGIC_HEADER: &[u8] = b"\x28\xB5\x2F\xFD";

/// Longest magic header we recognize (XZ is 6 bytes).
const MAGIC_PEEK: usize = 6;

/// Content types by which standalone uploads can be recognized.
const MINIDUMP_RAW_CONTENT_TYPES: &[&str] = &["application/octet-stream", "application/x-dmp"];

/// Error produced while inspecting the first bytes of an upload stream in `reject_if_compressed`.
///
/// `E` is the error type of the underlying stream.
#[derive(Debug, thiserror::Error)]
enum PeekError<E> {
/// The head of the stream started with a gzip, xz, bzip2 or zstd magic header.
#[error("compressed minidump payloads are not supported for streaming upload")]
Compressed,
/// The underlying stream failed while its head was being read; the error is forwarded as-is.
#[error(transparent)]
Source(#[from] E),
}

/// Peeks at the first bytes of `stream` and rejects the stream if they look compressed
/// (gzip/xz/bzip2/zstd). Otherwise returns a stream yielding the original contents.
async fn reject_if_compressed<S, E>(
stream: S,
) -> Result<impl Stream<Item = Result<Bytes, E>> + Send, PeekError<E>>
where
S: Stream<Item = Result<Bytes, E>> + Send,
E: Send,
{
let (head, stream) = utils::stream::peek_n(stream, MAGIC_PEEK).await?;

if head.starts_with(GZIP_MAGIC_HEADER)
|| head.starts_with(XZ_MAGIC_HEADER)
|| head.starts_with(BZIP2_MAGIC_HEADER)
|| head.starts_with(ZSTD_MAGIC_HEADER)
Comment on lines +89 to +92
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: You could put this in a helper function get_compression(data: &[u8]) -> Option<HttpEncoding> and share the helper with decoder_from.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure we can use it in decoder_from since that logic is slightly more sophisticated:

/// Picks a decompressing reader for `minidump_data` based on the magic bytes at the start of
/// the payload.
///
/// Returns `None` when the payload starts with none of the known compression headers
/// (gzip, xz, bzip2, zstd), or when the zstd decoder cannot be constructed; the latter is
/// logged as an error.
fn decoder_from(minidump_data: Bytes) -> Option<Box<dyn Read>> {
    if minidump_data.starts_with(GZIP_MAGIC_HEADER) {
        Some(Box::new(GzDecoder::new(Cursor::new(minidump_data))))
    } else if minidump_data.starts_with(XZ_MAGIC_HEADER) {
        Some(Box::new(XzDecoder::new(Cursor::new(minidump_data))))
    } else if minidump_data.starts_with(BZIP2_MAGIC_HEADER) {
        Some(Box::new(BzDecoder::new(Cursor::new(minidump_data))))
    } else if minidump_data.starts_with(ZSTD_MAGIC_HEADER) {
        // Unlike the other decoders, constructing the zstd decoder is fallible.
        match ZstdDecoder::new(Cursor::new(minidump_data)) {
            Ok(zstd) => Some(Box::new(zstd)),
            Err(ref err) => {
                relay_log::error!(error = err as &dyn Error, "failed to create ZstdDecoder");
                None
            }
        }
    } else {
        None
    }
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thinking was something like

fn decoder_from(minidump_data: Bytes) -> Option<Box<dyn Read>> {
    match get_compression(minidump_data) {
        HttpEncoding::Gzip => Some(Box::new(GzDecoder::new(Cursor::new(minidump_data))))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline, we're hesitant to add uncommon variants to the HttpEncoding enum so this would require a separate enum -> not worth the effort.

{
Err(PeekError::Compressed)
} else {
Ok(stream)
}
}

fn validate_minidump(data: &[u8]) -> Result<(), BadStoreRequest> {
if !data.starts_with(MINIDUMP_MAGIC_HEADER_LE) && !data.starts_with(MINIDUMP_MAGIC_HEADER_BE) {
relay_log::trace!("invalid minidump file");
Expand Down Expand Up @@ -236,7 +270,7 @@ impl<'a> AttachmentStrategy for MinidumpAttachmentStrategy<'a> {
match upload_context.upload_decision(item.attachment_type()) {
UploadDecision::Upload => {
let content_type = field.content_type().map(ToString::to_string);
Ok(upload_to_objectstore(
Ok(upload_to_objectstore_checked(
field,
content_type,
item,
Expand Down Expand Up @@ -272,6 +306,52 @@ impl<'a> AttachmentStrategy for MinidumpAttachmentStrategy<'a> {
}
}

/// Wrapper around [`upload_to_objectstore`] that enforces that minidumps are not compressed.
pub async fn upload_to_objectstore_checked<S, E>(
stream: S,
content_type: Option<String>,
item: Managed<Item>,
config: &Config,
scoping: Scoping,
upload: &Addr<Upload>,
referrer: &'static str,
) -> Result<Managed<Item>, Rejected<()>>
Comment thread
tobias-wilfert marked this conversation as resolved.
where
S: futures::Stream<Item = Result<Bytes, E>> + Send + 'static,
E: Into<Box<dyn std::error::Error + Send + Sync>> + Send + 'static,
{
if !matches!(item.attachment_type(), Some(AttachmentType::Minidump)) {
return upload_to_objectstore(
stream,
content_type,
item,
config,
scoping,
upload,
referrer,
)
.await;
}

let stream = match reject_if_compressed(stream).await {
Ok(stream) => stream,
Err(_) => {
return Err(item.reject_err(Outcome::Invalid(DiscardReason::InvalidMinidump)));
}
Comment on lines +338 to +340
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also reject the item here if we just fail to read from the stream for any reason, but I think that is ok.

Comment thread
tobias-wilfert marked this conversation as resolved.
};

upload_to_objectstore(
stream,
content_type,
item,
config,
scoping,
upload,
referrer,
)
.await
}

Comment thread
tobias-wilfert marked this conversation as resolved.
async fn multipart_to_envelope(
multipart: Multipart<'static>,
meta: RequestMeta,
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod sizes;
mod sleep_handle;
mod split_off;
mod statsd;
mod stream;
pub mod stream;
mod thread_pool;
pub mod tus;

Expand Down
2 changes: 2 additions & 0 deletions relay-server/src/utils/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod bounded;
mod metered;
mod peek;
mod retryable;

pub use bounded::*;
pub use metered::*;
pub use peek::*;
pub use retryable::*;
26 changes: 26 additions & 0 deletions relay-server/src/utils/stream/peek.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use bytes::{Bytes, BytesMut};
use futures::{Stream, StreamExt, future, stream};

/// Buffers the start of `stream` until at least `n` bytes are available and hands back both the
/// buffered head and a stream that replays the complete original contents.
///
/// Whole chunks are buffered, so the returned head can be longer than `n`; it is shorter than
/// `n` when the stream ends first. The returned stream yields the buffered head as a single
/// leading chunk, followed by whatever remains of `stream`.
///
/// # Errors
///
/// Returns the first error yielded by `stream` while the head is being buffered. Bytes
/// buffered up to that point are dropped.
pub async fn peek_n<S, E>(
    stream: S,
    n: usize,
) -> Result<(Bytes, impl Stream<Item = Result<Bytes, E>> + Send), E>
where
    S: Stream<Item = Result<Bytes, E>> + Send,
    E: Send,
{
    // Pinned on the heap so the remainder can be moved into the returned stream.
    let mut rest = Box::pin(stream);
    let mut buffered = BytesMut::with_capacity(n);

    while buffered.len() < n {
        let Some(chunk) = rest.next().await else {
            // Stream ended before `n` bytes were seen; return what we have.
            break;
        };
        buffered.extend_from_slice(&chunk?);
    }

    let peeked = buffered.freeze();
    let leading = stream::once(future::ready(Ok(peeked.clone())));
    Ok((peeked, leading.chain(rest)))
}
64 changes: 63 additions & 1 deletion tests/integration/test_minidump.py
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ def test_minidump_objectstore_uploads(
):
project_id = 42
minidump_content = b"MDMP content"
log_content = b"Some log file content"
log_content = b"\x1f\x8b Some log file content"
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this to ensure that compressed attachments still work.


project_config = mini_sentry.add_full_project_config(project_id)
if upload_attachments:
Expand Down Expand Up @@ -1338,3 +1338,65 @@ def test_minidump_large_attachment_skipped_when_no_project_fetching(mini_sentry,

assert len(envelope.items) == 1
assert envelope.items[0].payload.bytes == minidump_content


@pytest.mark.parametrize(
    "magic,filename",
    [
        pytest.param(b"\x1f\x8b", "minidump.dmp.gz", id="gzip"),
        pytest.param(b"\xfd7zXZ\x00", "minidump.dmp.xz", id="xz"),
        pytest.param(b"BZh", "minidump.dmp.bz2", id="bzip2"),
        pytest.param(b"\x28\xb5\x2f\xfd", "minidump.dmp.zst", id="zstd"),
    ],
)
def test_minidump_objectstore_uploads_rejects_compressed(
    mini_sentry,
    relay,
    magic,
    filename,
):
    """
    When streaming a minidump to objectstore, a compressed payload should be rejected
    (until objectstore or minidump can handle them).
    """
    project_id = 42

    # Enable the minidump upload feature for this project.
    project_config = mini_sentry.add_full_project_config(project_id)
    features = project_config["config"].setdefault("features", [])
    features.append("projects:relay-minidump-uploads")

    outcome_options = {
        "emit_outcomes": True,
        "batch_size": 1,
        "batch_interval": 1,
    }
    relay = relay(mini_sentry, options={"outcomes": outcome_options})

    # A payload that starts with the compression magic, padded with zero bytes.
    compressed_payload = magic + b"\x00" * 32

    with pytest.raises(HTTPError) as exc_info:
        relay.send_minidump(
            project_id=project_id,
            files=[(MINIDUMP_ATTACHMENT_NAME, filename, compressed_payload)],
        )

    assert exc_info.value.response.status_code == 400

    # One "invalid_minidump" outcome is expected for each of the two data categories.
    expected_outcomes = [
        {
            "category": category,
            "outcome": 3,
            "project_id": 42,
            "reason": "invalid_minidump",
            "quantity": 1,
        }
        for category in (4, 22)
    ]
    assert mini_sentry.get_aggregated_outcomes() == expected_outcomes
Comment thread
tobias-wilfert marked this conversation as resolved.
Loading