Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/factor-outbound-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ wasmtime-wasi-http = { workspace = true }
spin-common = { path = "../common" }
spin-factor-variables = { path = "../factor-variables" }
spin-factors-test = { path = "../factors-test" }
tracing-subscriber = { version = "0.3", default-features = false, features = ["registry"] }

[features]
default = ["spin-cli"]
Expand Down
69 changes: 37 additions & 32 deletions crates/factor-outbound-http/src/wasi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,40 +197,45 @@ impl p3::WasiHttpHooks for InstanceHttpHooks {
.and_then(|v| v.between_bytes_timeout)
.unwrap_or(DEFAULT_TIMEOUT),
};
Box::new(async {
match request_sender
.send(
request.map(|body| body.map_err(p3_to_p2_error_code).boxed_unsync()),
config,
)
.await
{
Ok(IncomingResponse {
resp,
between_bytes_timeout,
..
}) => Ok((
resp.map(|body| {
BetweenBytesTimeoutBody {
body,
sleep: None,
timeout: between_bytes_timeout,
Box::new(
async {
match request_sender
.send(
request.map(|body| body.map_err(p3_to_p2_error_code).boxed_unsync()),
config,
)
.await
{
Ok(IncomingResponse {
resp,
between_bytes_timeout,
..
}) => Ok((
resp.map(|body| {
BetweenBytesTimeoutBody {
body,
sleep: None,
timeout: between_bytes_timeout,
}
.boxed_unsync()
}),
Box::new(async {
// TODO: Can we plumb connection errors through to here, or
// will `hyper_util::client::legacy::Client` pass them all
// via the response body?
Ok(())
}) as Box<dyn Future<Output = _> + Send>,
)),
Err(http_error) => match http_error.downcast() {
Ok(error_code) => {
Err(TrappableError::from(p2_to_p3_error_code(error_code)))
}
.boxed_unsync()
}),
Box::new(async {
// TODO: Can we plumb connection errors through to here, or
// will `hyper_util::client::legacy::Client` pass them all
// via the response body?
Ok(())
}) as Box<dyn Future<Output = _> + Send>,
)),
Err(http_error) => match http_error.downcast() {
Ok(error_code) => Err(TrappableError::from(p2_to_p3_error_code(error_code))),
Err(trap) => Err(TrappableError::trap(trap)),
},
Err(trap) => Err(TrappableError::trap(trap)),
},
}
}
})
.in_current_span(),
)
}
}

Expand Down
103 changes: 103 additions & 0 deletions crates/factor-outbound-http/tests/factor_test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::sync::{Arc, Mutex};
use std::time::Duration;

use anyhow::bail;
use bytes::Bytes;
use http::{Request, Uri};
use http_body_util::{BodyExt, Empty, combinators::UnsyncBoxBody};
use spin_common::{assert_matches, assert_not_matches};
use spin_factor_outbound_http::{
ErrorCode, HostFutureIncomingResponse, OutboundHttpFactor, SelfRequestOrigin,
Expand All @@ -12,8 +15,19 @@ use spin_factor_variables::VariablesFactor;
use spin_factors::{RuntimeFactors, anyhow};
use spin_factors_test::{TestEnvironment, toml};
use spin_world::async_trait;
use tracing::{
Subscriber,
field::{Field, Visit},
span::{self, Record},
};
use tracing_subscriber::{
Layer,
layer::{Context, SubscriberExt},
registry::LookupSpan,
};
use wasmtime_wasi::p2::Pollable;
use wasmtime_wasi_http::p2::types::OutgoingRequestConfig;
use wasmtime_wasi_http::p3::{RequestOptions, bindings::http::types as p3_types};

#[derive(RuntimeFactors)]
struct TestFactors {
Expand Down Expand Up @@ -176,3 +190,92 @@ fn assert_discard_prefix_error(future_resp: HostFutureIncomingResponse) {
| ErrorCode::DnsError(_)),
);
}

// Regression: deferred `Span::record(...)` calls (e.g. `url.full`) must
// land on the `spin_outbound_http.send_request` span created by
// `#[instrument]`. Uses the current_thread runtime so the thread-local
// subscriber covers all async work.
#[tokio::test(flavor = "current_thread")]
async fn p3_send_request_propagates_span_to_async_work() -> anyhow::Result<()> {
let layer = CaptureLayer::default();
let records = Arc::clone(&layer.records);
let subscriber = tracing_subscriber::registry().with(layer);
let _guard = tracing::subscriber::set_default(subscriber);

let mut state = test_instance_state("https://*", true).await?;
let p3_view = OutboundHttpFactor::get_wasi_p3_http_impl(&mut state).unwrap();
// [100::1] is the IPv6 discard prefix — connection fails fast.
let req = Request::get("https://[100::1]:443").body(empty_p3_body())?;
let result_fut = p3_view
.hooks
.send_request(req, fast_p3_options(), p3_noop_cleanup_fut());
let _ = Box::into_pin(result_fut).await;

let records = records.lock().unwrap();
assert!(
records.iter().any(|(span, field)| {
span == "spin_outbound_http.send_request" && field == "url.full"
}),
"`url.full` missing from `spin_outbound_http.send_request` span — \
async block likely lost its `.in_current_span()` wrapper. \
Recorded: {records:?}"
);
Ok(())
}

type CapturedRecords = Arc<Mutex<Vec<(String, String)>>>;

#[derive(Default)]
struct CaptureLayer {
records: CapturedRecords,
}

impl<S> Layer<S> for CaptureLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_record(&self, id: &span::Id, values: &Record<'_>, ctx: Context<'_, S>) {
let span_name = ctx
.span(id)
.map(|s| s.name().to_string())
.unwrap_or_default();
let mut v = CaptureVisitor {
span_name: &span_name,
records: &self.records,
};
values.record(&mut v);
}
}

struct CaptureVisitor<'a> {
span_name: &'a str,
records: &'a CapturedRecords,
}

impl Visit for CaptureVisitor<'_> {
fn record_debug(&mut self, f: &Field, _v: &dyn std::fmt::Debug) {
self.records
.lock()
.unwrap()
.push((self.span_name.to_string(), f.name().to_string()));
}
}

fn empty_p3_body() -> UnsyncBoxBody<Bytes, p3_types::ErrorCode> {
Empty::<Bytes>::new()
.map_err(|never: std::convert::Infallible| match never {})
.boxed_unsync()
}

fn fast_p3_options() -> Option<RequestOptions> {
Some(RequestOptions {
connect_timeout: Some(Duration::from_millis(10)),
first_byte_timeout: Some(Duration::from_millis(10)),
between_bytes_timeout: Some(Duration::from_millis(10)),
})
}

fn p3_noop_cleanup_fut()
-> Box<dyn std::future::Future<Output = Result<(), p3_types::ErrorCode>> + Send> {
Box::new(async { Ok(()) })
}
Loading