diff --git a/Cargo.lock b/Cargo.lock index 6490aa1f1e..69b8a710fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8821,6 +8821,7 @@ dependencies = [ "tower-service", "tracing", "tracing-opentelemetry", + "tracing-subscriber", "wasmtime", "wasmtime-wasi", "wasmtime-wasi-http", diff --git a/crates/factor-outbound-http/Cargo.toml b/crates/factor-outbound-http/Cargo.toml index 395a904e92..51046dc150 100644 --- a/crates/factor-outbound-http/Cargo.toml +++ b/crates/factor-outbound-http/Cargo.toml @@ -35,6 +35,7 @@ wasmtime-wasi-http = { workspace = true } spin-common = { path = "../common" } spin-factor-variables = { path = "../factor-variables" } spin-factors-test = { path = "../factors-test" } +tracing-subscriber = { version = "0.3", default-features = false, features = ["registry"] } [features] default = ["spin-cli"] diff --git a/crates/factor-outbound-http/src/wasi.rs b/crates/factor-outbound-http/src/wasi.rs index df0fa6f8d9..908aa825d6 100644 --- a/crates/factor-outbound-http/src/wasi.rs +++ b/crates/factor-outbound-http/src/wasi.rs @@ -197,40 +197,45 @@ impl p3::WasiHttpHooks for InstanceHttpHooks { .and_then(|v| v.between_bytes_timeout) .unwrap_or(DEFAULT_TIMEOUT), }; - Box::new(async { - match request_sender - .send( - request.map(|body| body.map_err(p3_to_p2_error_code).boxed_unsync()), - config, - ) - .await - { - Ok(IncomingResponse { - resp, - between_bytes_timeout, - .. - }) => Ok(( - resp.map(|body| { - BetweenBytesTimeoutBody { - body, - sleep: None, - timeout: between_bytes_timeout, + Box::new( + async { + match request_sender + .send( + request.map(|body| body.map_err(p3_to_p2_error_code).boxed_unsync()), + config, + ) + .await + { + Ok(IncomingResponse { + resp, + between_bytes_timeout, + .. + }) => Ok(( + resp.map(|body| { + BetweenBytesTimeoutBody { + body, + sleep: None, + timeout: between_bytes_timeout, + } + .boxed_unsync() + }), + Box::new(async { + // TODO: Can we plumb connection errors through to here, or + // will `hyper_util::client::legacy::Client` pass them all + // via the response body? + Ok(()) + }) as Box + Send>, + )), + Err(http_error) => match http_error.downcast() { + Ok(error_code) => { + Err(TrappableError::from(p2_to_p3_error_code(error_code))) } - .boxed_unsync() - }), - Box::new(async { - // TODO: Can we plumb connection errors through to here, or - // will `hyper_util::client::legacy::Client` pass them all - // via the response body? - Ok(()) - }) as Box + Send>, - )), - Err(http_error) => match http_error.downcast() { - Ok(error_code) => Err(TrappableError::from(p2_to_p3_error_code(error_code))), - Err(trap) => Err(TrappableError::trap(trap)), - }, + Err(trap) => Err(TrappableError::trap(trap)), + }, + } } - }) + .in_current_span(), + ) } } diff --git a/crates/factor-outbound-http/tests/factor_test.rs b/crates/factor-outbound-http/tests/factor_test.rs index 002615b21e..74b402e1ef 100644 --- a/crates/factor-outbound-http/tests/factor_test.rs +++ b/crates/factor-outbound-http/tests/factor_test.rs @@ -1,7 +1,10 @@ +use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::bail; +use bytes::Bytes; use http::{Request, Uri}; +use http_body_util::{BodyExt, Empty, combinators::UnsyncBoxBody}; use spin_common::{assert_matches, assert_not_matches}; use spin_factor_outbound_http::{ ErrorCode, HostFutureIncomingResponse, OutboundHttpFactor, SelfRequestOrigin, @@ -12,8 +15,19 @@ use spin_factor_variables::VariablesFactor; use spin_factors::{RuntimeFactors, anyhow}; use spin_factors_test::{TestEnvironment, toml}; use spin_world::async_trait; +use tracing::{ + Subscriber, + field::{Field, Visit}, + span::{self, Record}, +}; +use tracing_subscriber::{ + Layer, + layer::{Context, SubscriberExt}, + registry::LookupSpan, +}; use wasmtime_wasi::p2::Pollable; use wasmtime_wasi_http::p2::types::OutgoingRequestConfig; +use wasmtime_wasi_http::p3::{RequestOptions, bindings::http::types as p3_types}; #[derive(RuntimeFactors)] struct TestFactors { @@ -176,3 +190,92 @@ fn assert_discard_prefix_error(future_resp: HostFutureIncomingResponse) { | ErrorCode::DnsError(_)), ); } + +// Regression: deferred `Span::record(...)` calls (e.g. `url.full`) must +// land on the `spin_outbound_http.send_request` span created by +// `#[instrument]`. Uses the current_thread runtime so the thread-local +// subscriber covers all async work. +#[tokio::test(flavor = "current_thread")] +async fn p3_send_request_propagates_span_to_async_work() -> anyhow::Result<()> { + let layer = CaptureLayer::default(); + let records = Arc::clone(&layer.records); + let subscriber = tracing_subscriber::registry().with(layer); + let _guard = tracing::subscriber::set_default(subscriber); + + let mut state = test_instance_state("https://*", true).await?; + let p3_view = OutboundHttpFactor::get_wasi_p3_http_impl(&mut state).unwrap(); + // [100::1] is the IPv6 discard prefix — connection fails fast. + let req = Request::get("https://[100::1]:443").body(empty_p3_body())?; + let result_fut = p3_view + .hooks + .send_request(req, fast_p3_options(), p3_noop_cleanup_fut()); + let _ = Box::into_pin(result_fut).await; + + let records = records.lock().unwrap(); + assert!( + records.iter().any(|(span, field)| { + span == "spin_outbound_http.send_request" && field == "url.full" + }), + "`url.full` missing from `spin_outbound_http.send_request` span — \ + async block likely lost its `.in_current_span()` wrapper. \ + Recorded: {records:?}" + ); + Ok(()) +} + +type CapturedRecords = Arc>>; + +#[derive(Default)] +struct CaptureLayer { + records: CapturedRecords, +} + +impl Layer for CaptureLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_record(&self, id: &span::Id, values: &Record<'_>, ctx: Context<'_, S>) { + let span_name = ctx + .span(id) + .map(|s| s.name().to_string()) + .unwrap_or_default(); + let mut v = CaptureVisitor { + span_name: &span_name, + records: &self.records, + }; + values.record(&mut v); + } +} + +struct CaptureVisitor<'a> { + span_name: &'a str, + records: &'a CapturedRecords, +} + +impl Visit for CaptureVisitor<'_> { + fn record_debug(&mut self, f: &Field, _v: &dyn std::fmt::Debug) { + self.records + .lock() + .unwrap() + .push((self.span_name.to_string(), f.name().to_string())); + } +} + +fn empty_p3_body() -> UnsyncBoxBody { + Empty::::new() + .map_err(|never: std::convert::Infallible| match never {}) + .boxed_unsync() +} + +fn fast_p3_options() -> Option { + Some(RequestOptions { + connect_timeout: Some(Duration::from_millis(10)), + first_byte_timeout: Some(Duration::from_millis(10)), + between_bytes_timeout: Some(Duration::from_millis(10)), + }) +} + +fn p3_noop_cleanup_fut() +-> Box> + Send> { + Box::new(async { Ok(()) }) +}