diff --git a/crates/factor-outbound-http/src/wasi.rs b/crates/factor-outbound-http/src/wasi.rs index 908aa825d..9b80e1df2 100644 --- a/crates/factor-outbound-http/src/wasi.rs +++ b/crates/factor-outbound-http/src/wasi.rs @@ -10,7 +10,7 @@ use std::{ time::Duration, }; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use futures::channel::oneshot; use http::{ HeaderMap, Uri, @@ -138,6 +138,7 @@ impl p3::WasiHttpHooks for InstanceHttpHooks { url.full = Empty, http.request.method = %request.method(), otel.name = %request.method(), + http.response.body.size = Empty, http.response.status_code = Empty, server.address = Empty, server.port = Empty, @@ -216,6 +217,8 @@ impl p3::WasiHttpHooks for InstanceHttpHooks { body, sleep: None, timeout: between_bytes_timeout, + byte_count: 0, + span: Some(Span::current()), } .boxed_unsync() }), @@ -246,6 +249,8 @@ pin_project_lite::pin_project! { #[pin] sleep: Option, timeout: Duration, + byte_count: u64, + span: Option, } } @@ -258,9 +263,33 @@ impl> Body for BetweenBytesTimeoutBody { cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { let mut me = self.project(); - match me.body.poll_frame(cx) { + match me.body.as_mut().poll_frame(cx) { Poll::Ready(value) => { me.sleep.as_mut().set(None); + + let mut record_body_size_once = |body_size: u64| { + if let Some(span) = me.span.take() { + span.record("http.response.body.size", body_size); + } + }; + + match &value { + Some(Ok(frame)) => { + if let Some(data) = frame.data_ref() { + *me.byte_count += data.remaining() as u64; + } + if me.body.as_ref().is_end_stream() { + record_body_size_once(*me.byte_count); + } + } + None => { + record_body_size_once(*me.byte_count); + } + Some(Err(e)) => { + tracing::warn!("error reading response body: {e:?}"); + } + } + Poll::Ready(value.map(|v| v.map_err(p2_to_p3_error_code))) } Poll::Pending => {