diff --git a/codex-rs/cli/Cargo.toml b/codex-rs/cli/Cargo.toml index 6a4d78fbc74..d66273b9b22 100644 --- a/codex-rs/cli/Cargo.toml +++ b/codex-rs/cli/Cargo.toml @@ -19,7 +19,7 @@ workspace = true [dependencies] anyhow = { workspace = true } -clap = { workspace = true, features = ["derive"] } +clap = { workspace = true, features = ["derive", "env"] } clap_complete = { workspace = true } codex-app-server = { workspace = true } codex-app-server-daemon = { workspace = true } diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs index 2a5ce171b7a..848d3e16bb1 100644 --- a/codex-rs/cli/src/main.rs +++ b/codex-rs/cli/src/main.rs @@ -542,6 +542,14 @@ struct ExecServerCommand { /// Use Agent Identity auth from CODEX_ACCESS_TOKEN for remote registration. #[arg(long = "use-agent-identity-auth", requires = "remote")] use_agent_identity_auth: bool, + + /// Opt into Noise-encrypted remote relay communication. + #[arg( + long = "enable-noise", + env = "CODEX_EXEC_SERVER_ENABLE_NOISE", + requires = "remote" + )] + enable_noise: bool, } #[derive(Debug, clap::Subcommand)] @@ -1611,6 +1619,9 @@ async fn run_exec_server_command( if let Some(name) = cmd.name { remote_config.name = name; } + if cmd.enable_noise { + remote_config.relay_protocol = codex_exec_server::RemoteRelayProtocol::Noise; + } codex_exec_server::run_remote_environment(remote_config, runtime_paths).await?; Ok(()) } else { @@ -3222,6 +3233,30 @@ mod tests { .expect("exec-server should support root --strict-config"); } + #[test] + fn exec_server_enable_noise_is_explicit_remote_opt_in() { + let cli = MultitoolCli::try_parse_from([ + "codex", + "exec-server", + "--remote", + "https://registry.example", + "--environment-id", + "environment-1", + "--enable-noise", + ]) + .expect("parse"); + + assert_matches!( + cli.subcommand, + Some(Subcommand::ExecServer(ExecServerCommand { + enable_noise: true, + .. + })) + ); + MultitoolCli::try_parse_from(["codex", "exec-server", "--enable-noise"]) + .expect_err("--enable-noise should require --remote"); + } + #[test] fn root_strict_config_is_rejected_for_unsupported_subcommands() { let cli = MultitoolCli::try_parse_from(["codex", "--strict-config", "mcp", "list"]) diff --git a/codex-rs/exec-server/README.md b/codex-rs/exec-server/README.md index 8fa1a9eb75c..11178852839 100644 --- a/codex-rs/exec-server/README.md +++ b/codex-rs/exec-server/README.md @@ -22,10 +22,15 @@ the wire. The CLI entrypoint supports: - `ws://IP:PORT` (default) -- `--remote URL --environment-id ID [--name NAME]` +- `--remote URL --environment-id ID [--name NAME] [--enable-noise]` Remote mode registers the local exec-server with the environment registry, then reconnects to the service-provided rendezvous websocket as the environment. +It uses the legacy relay contract by default for compatibility with existing +registries and harnesses. `--enable-noise` explicitly opts into Noise-encrypted +relay registration and requires both the registry and harness to support the +Noise relay contract. Container entrypoints can make the same explicit choice +by setting `CODEX_EXEC_SERVER_ENABLE_NOISE=true`. It uses the standard Codex ChatGPT sign-in state; run `codex login` first when remote registration needs authentication. Containerized callers that receive an Agent Identity JWT in `CODEX_ACCESS_TOKEN` can opt into that auth path with @@ -45,7 +50,8 @@ codex exec-server \ Wire framing: - local websocket: one JSON-RPC message per websocket frame -- remote websocket: binary protobuf relay frames carrying JSON-RPC payloads +- legacy remote websocket: binary protobuf relay frames carrying JSON-RPC payloads +- Noise remote websocket: binary protobuf relay frames carrying encrypted payloads ## Remote Relay Message Format diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 19906c1fc61..93df5cc10c8 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -110,6 +110,7 @@ pub use protocol::WriteParams; pub use protocol::WriteResponse; pub use protocol::WriteStatus; pub use remote::RemoteEnvironmentConfig; +pub use remote::RemoteRelayProtocol; pub use remote::run_remote_environment; pub use runtime_paths::ExecServerRuntimePaths; pub use server::DEFAULT_LISTEN_URL; diff --git a/codex-rs/exec-server/src/remote.rs b/codex-rs/exec-server/src/remote.rs index ae7242377f4..2dc663974ad 100644 --- a/codex-rs/exec-server/src/remote.rs +++ b/codex-rs/exec-server/src/remote.rs @@ -3,18 +3,29 @@ use std::time::Duration; use codex_api::SharedAuthProvider; use reqwest::StatusCode; use serde::Deserialize; +use serde::Serialize; use tokio::time::sleep; -use tokio_tungstenite::connect_async; +use tokio::time::timeout; +use tokio_tungstenite::connect_async_with_config; +use tracing::info; use tracing::warn; use codex_utils_rustls_provider::ensure_rustls_crypto_provider; use crate::ExecServerError; use crate::ExecServerRuntimePaths; +use crate::NoiseChannelIdentity; +use crate::NoiseChannelPublicKey; +use crate::noise_relay::HarnessKeyValidator; +use crate::noise_relay::noise_relay_websocket_config; +use crate::noise_relay::run_noise_multiplexed_environment; use crate::relay::run_multiplexed_environment; use crate::server::ConnectionProcessor; const ERROR_BODY_PREVIEW_BYTES: usize = 4096; +const ENVIRONMENT_REGISTRY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); +const MAX_REMOTE_ENVIRONMENT_ID_LEN: usize = 256; +const REMOTE_RENDEZVOUS_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); #[derive(Clone)] struct EnvironmentRegistryClient { @@ -40,11 +51,16 @@ impl EnvironmentRegistryClient { auth_provider, http: reqwest::Client::builder() .redirect(reqwest::redirect::Policy::none()) + .timeout(ENVIRONMENT_REGISTRY_REQUEST_TIMEOUT) .build()?, }) } - async fn register_environment( + /// Register using the original body-less registry contract. + /// + /// This remains the default so a new exec-server can still connect through + /// registries and harnesses that have not rolled out Noise relay support. + async fn register_legacy_environment( &self, environment_id: &str, ) -> Result { @@ -60,6 +76,64 @@ impl EnvironmentRegistryClient { self.parse_json_response(response).await } + /// Register the exec-server's static Noise identity with a Noise-aware registry. + /// + /// Supplying this request body is the protocol-level opt in. Registries can + /// therefore distinguish Noise registrations from the legacy body-less + /// contract without guessing based on binary version or rollout state. + async fn register_noise_environment( + &self, + environment_id: &str, + executor_public_key: &NoiseChannelPublicKey, + ) -> Result { + let response = self + .http + .post(endpoint_url( + &self.base_url, + &format!("/cloud/environment/{environment_id}/register"), + )) + .headers(self.auth_provider.to_auth_headers()) + .json(&EnvironmentRegistryRegistrationRequest { + security_profile: NOISE_RELAY_SECURITY_PROFILE, + executor_public_key, + }) + .send() + .await?; + self.parse_json_response(response).await + } + + async fn validate_harness_key( + &self, + environment_id: &str, + executor_registration_id: &str, + harness_public_key: &NoiseChannelPublicKey, + harness_key_authorization: &str, + ) -> Result<(), ExecServerError> { + let response = self + .http + .post(endpoint_url( + &self.base_url, + &format!("/cloud/environment/{environment_id}/validate"), + )) + .headers(self.auth_provider.to_auth_headers()) + .json(&EnvironmentRegistryHarnessKeyValidationRequest { + executor_registration_id, + harness_public_key, + harness_key_authorization, + }) + .send() + .await?; + let response = self + .parse_json_response::(response) + .await?; + if !response.valid { + return Err(ExecServerError::Protocol( + "environment registry rejected Noise relay harness key".to_string(), + )); + } + Ok(()) + } + async fn parse_json_response( &self, response: reqwest::Response, @@ -81,18 +155,112 @@ impl EnvironmentRegistryClient { } } +const NOISE_RELAY_SECURITY_PROFILE: &str = "noise_hybrid_ik_v1"; + +#[derive(Serialize)] +struct EnvironmentRegistryRegistrationRequest<'a> { + security_profile: &'static str, + executor_public_key: &'a NoiseChannelPublicKey, +} + #[derive(Debug, Clone, Eq, PartialEq, Deserialize)] struct EnvironmentRegistryRegistrationResponse { environment_id: String, url: String, } +#[derive(Debug, Clone, Eq, PartialEq, Deserialize)] +struct EnvironmentRegistryNoiseRegistrationResponse { + environment_id: String, + url: String, + security_profile: String, + executor_registration_id: String, +} + +#[derive(Serialize)] +struct EnvironmentRegistryHarnessKeyValidationRequest<'a> { + executor_registration_id: &'a str, + harness_public_key: &'a NoiseChannelPublicKey, + harness_key_authorization: &'a str, +} + +#[derive(Deserialize)] +struct EnvironmentRegistryHarnessKeyValidationResponse { + valid: bool, +} + +#[derive(Clone)] +struct RegistryHarnessKeyValidator { + client: EnvironmentRegistryClient, + environment_id: String, + executor_registration_id: String, +} + +impl HarnessKeyValidator for RegistryHarnessKeyValidator { + async fn validate_harness_key( + &self, + harness_public_key: &NoiseChannelPublicKey, + authorization: &str, + ) -> Result<(), ExecServerError> { + self.client + .validate_harness_key( + &self.environment_id, + &self.executor_registration_id, + harness_public_key, + authorization, + ) + .await + } +} + +/// Protocol used for an exec-server's registered remote relay. +/// +/// Legacy is intentionally the default during rollout. Noise must be selected +/// explicitly so mixed-version deployments keep the original registry and relay +/// contract until both endpoints are ready for Noise. +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub enum RemoteRelayProtocol { + #[default] + Legacy, + Noise, +} + +enum RemoteRelayProtocolState { + Legacy, + Noise(NoiseChannelIdentity), +} + +enum RegisteredRemoteEnvironment { + Legacy(EnvironmentRegistryRegistrationResponse), + Noise { + response: EnvironmentRegistryNoiseRegistrationResponse, + identity: NoiseChannelIdentity, + }, +} + +impl RegisteredRemoteEnvironment { + fn environment_id(&self) -> &str { + match self { + Self::Legacy(response) => &response.environment_id, + Self::Noise { response, .. } => &response.environment_id, + } + } + + fn websocket_url(&self) -> &str { + match self { + Self::Legacy(response) => &response.url, + Self::Noise { response, .. } => &response.url, + } + } +} + /// Configuration for registering an exec-server for remote use. #[derive(Clone)] pub struct RemoteEnvironmentConfig { pub base_url: String, pub environment_id: String, pub name: String, + pub relay_protocol: RemoteRelayProtocol, auth_provider: SharedAuthProvider, } @@ -102,6 +270,7 @@ impl std::fmt::Debug for RemoteEnvironmentConfig { .field("base_url", &self.base_url) .field("environment_id", &self.environment_id) .field("name", &self.name) + .field("relay_protocol", &self.relay_protocol) .field("auth_provider", &"") .finish() } @@ -118,6 +287,7 @@ impl RemoteEnvironmentConfig { base_url, environment_id, name: "codex-exec-server".to_string(), + relay_protocol: RemoteRelayProtocol::Legacy, auth_provider, }) } @@ -133,23 +303,88 @@ pub async fn run_remote_environment( let client = EnvironmentRegistryClient::new(config.base_url.clone(), config.auth_provider.clone())?; let processor = ConnectionProcessor::new(runtime_paths); + let protocol_state = match config.relay_protocol { + RemoteRelayProtocol::Legacy => RemoteRelayProtocolState::Legacy, + RemoteRelayProtocol::Noise => { + RemoteRelayProtocolState::Noise(NoiseChannelIdentity::generate()?) + } + }; let mut backoff = Duration::from_secs(1); loop { - let response = client.register_environment(&config.environment_id).await?; - eprintln!( - "codex exec-server remote environment registered with environment_id {}", - response.environment_id + let registration = match &protocol_state { + RemoteRelayProtocolState::Legacy => RegisteredRemoteEnvironment::Legacy( + client + .register_legacy_environment(&config.environment_id) + .await?, + ), + RemoteRelayProtocolState::Noise(identity) => { + let response = client + .register_noise_environment(&config.environment_id, &identity.public_key()) + .await?; + if response.security_profile != NOISE_RELAY_SECURITY_PROFILE { + return Err(ExecServerError::Protocol(format!( + "environment registry returned unsupported security profile `{}`", + response.security_profile + ))); + } + RegisteredRemoteEnvironment::Noise { + response, + identity: identity.clone(), + } + } + }; + if registration.environment_id() != config.environment_id { + return Err(ExecServerError::Protocol( + "environment registry returned a different environment id".to_string(), + )); + } + let environment_id = registration.environment_id(); + info!( + "codex exec-server remote environment registered with environment_id {environment_id}" ); - - match connect_async(response.url.as_str()).await { - Ok((websocket, _)) => { + let websocket_config = match ®istration { + RegisteredRemoteEnvironment::Legacy(_) => None, + RegisteredRemoteEnvironment::Noise { .. } => Some(noise_relay_websocket_config()), + }; + + match timeout( + REMOTE_RENDEZVOUS_CONNECT_TIMEOUT, + connect_async_with_config( + registration.websocket_url(), + websocket_config, + /*disable_nagle*/ false, + ), + ) + .await + { + Ok(Ok((websocket, _))) => { backoff = Duration::from_secs(1); - run_multiplexed_environment(websocket, processor.clone()).await; + match registration { + RegisteredRemoteEnvironment::Legacy(_) => { + run_multiplexed_environment(websocket, processor.clone()).await; + } + RegisteredRemoteEnvironment::Noise { response, identity } => { + run_noise_multiplexed_environment( + websocket, + processor.clone(), + response.environment_id, + response.executor_registration_id.clone(), + identity, + RegistryHarnessKeyValidator { + client: client.clone(), + environment_id: config.environment_id.clone(), + executor_registration_id: response.executor_registration_id, + }, + ) + .await; + } + } } - Err(err) => { + Ok(Err(err)) => { warn!("failed to connect remote exec-server websocket: {err}"); } + Err(_) => warn!("timed out connecting remote exec-server websocket"), } sleep(backoff).await; @@ -158,12 +393,32 @@ pub async fn run_remote_environment( } fn normalize_environment_id(environment_id: String) -> Result { - let environment_id = environment_id.trim().to_string(); if environment_id.is_empty() { return Err(ExecServerError::EnvironmentRegistryConfig( "environment id is required for remote exec-server registration".to_string(), )); } + if environment_id.trim() != environment_id { + return Err(ExecServerError::EnvironmentRegistryConfig( + "environment id must not contain surrounding whitespace".to_string(), + )); + } + if environment_id.len() > MAX_REMOTE_ENVIRONMENT_ID_LEN { + return Err(ExecServerError::EnvironmentRegistryConfig(format!( + "environment id cannot be longer than {MAX_REMOTE_ENVIRONMENT_ID_LEN} characters" + ))); + } + // The ID is interpolated into authenticated registry request paths below. + // Keep it to one URL path component so a caller cannot use a delimiter to + // redirect the exec-server's registration credential to another endpoint. + if !environment_id + .chars() + .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_') + { + return Err(ExecServerError::EnvironmentRegistryConfig( + "environment id must contain only ASCII letters, numbers, '-' or '_'".to_string(), + )); + } Ok(environment_id) } @@ -242,125 +497,5 @@ fn preview_error_body(body: &str) -> Option { } #[cfg(test)] -mod tests { - use std::sync::Arc; - - use codex_api::AuthProvider; - use http::HeaderMap; - use http::HeaderValue; - use pretty_assertions::assert_eq; - use wiremock::Mock; - use wiremock::MockServer; - use wiremock::ResponseTemplate; - use wiremock::matchers::header; - use wiremock::matchers::method; - use wiremock::matchers::path; - - use super::*; - - #[derive(Debug)] - struct StaticRegistryAuthProvider; - - impl AuthProvider for StaticRegistryAuthProvider { - fn add_auth_headers(&self, headers: &mut HeaderMap) { - let _ = headers.insert( - http::header::AUTHORIZATION, - HeaderValue::from_static("Bearer registry-token"), - ); - let _ = headers.insert( - "ChatGPT-Account-ID", - HeaderValue::from_static("workspace-123"), - ); - } - } - - fn static_registry_auth_provider() -> SharedAuthProvider { - Arc::new(StaticRegistryAuthProvider) - } - - #[tokio::test] - async fn register_environment_posts_with_auth_provider_headers() { - let server = MockServer::start().await; - let config = RemoteEnvironmentConfig::new( - server.uri(), - "environment-requested".to_string(), - static_registry_auth_provider(), - ) - .expect("config"); - Mock::given(method("POST")) - .and(path("/cloud/environment/environment-requested/register")) - .and(header("authorization", "Bearer registry-token")) - .and(header("chatgpt-account-id", "workspace-123")) - .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ - "environment_id": "env-1", - "url": "wss://rendezvous.test/cloud-agent/default/ws/environment/env-1?role=environment&sig=abc" - }))) - .mount(&server) - .await; - let client = EnvironmentRegistryClient::new(server.uri(), static_registry_auth_provider()) - .expect("client"); - - let response = client - .register_environment(&config.environment_id) - .await - .expect("register environment"); - - assert_eq!( - response, - EnvironmentRegistryRegistrationResponse { - environment_id: "env-1".to_string(), - url: "wss://rendezvous.test/cloud-agent/default/ws/environment/env-1?role=environment&sig=abc".to_string(), - } - ); - } - - #[tokio::test] - async fn register_environment_does_not_follow_redirects_with_auth_headers() { - let server = MockServer::start().await; - Mock::given(method("POST")) - .and(path("/cloud/environment/environment-requested/register")) - .and(header("authorization", "Bearer registry-token")) - .respond_with( - ResponseTemplate::new(302) - .insert_header("location", format!("{}/redirect-target", server.uri())), - ) - .mount(&server) - .await; - Mock::given(path("/redirect-target")) - .and(header("authorization", "Bearer registry-token")) - .respond_with(ResponseTemplate::new(200)) - .expect(0) - .mount(&server) - .await; - let client = EnvironmentRegistryClient::new(server.uri(), static_registry_auth_provider()) - .expect("client"); - - let error = client - .register_environment("environment-requested") - .await - .expect_err("redirect response should not be followed"); - - assert!(matches!( - error, - ExecServerError::EnvironmentRegistryHttp { - status: StatusCode::FOUND, - .. - } - )); - } - - #[test] - fn debug_output_redacts_auth_provider() { - let config = RemoteEnvironmentConfig::new( - "https://registry.example".to_string(), - "env-1".to_string(), - static_registry_auth_provider(), - ) - .expect("config"); - - let debug = format!("{config:?}"); - - assert!(debug.contains("")); - assert!(!debug.contains("workspace-123")); - } -} +#[path = "remote_tests.rs"] +mod tests; diff --git a/codex-rs/exec-server/src/remote_tests.rs b/codex-rs/exec-server/src/remote_tests.rs new file mode 100644 index 00000000000..022e1e5dfcf --- /dev/null +++ b/codex-rs/exec-server/src/remote_tests.rs @@ -0,0 +1,232 @@ +use std::sync::Arc; + +use codex_api::AuthProvider; +use http::HeaderMap; +use http::HeaderValue; +use pretty_assertions::assert_eq; +use reqwest::StatusCode; +use wiremock::Mock; +use wiremock::MockServer; +use wiremock::ResponseTemplate; +use wiremock::matchers::body_bytes; +use wiremock::matchers::body_partial_json; +use wiremock::matchers::header; +use wiremock::matchers::method; +use wiremock::matchers::path; + +use super::*; + +#[derive(Debug)] +struct StaticRegistryAuthProvider; + +impl AuthProvider for StaticRegistryAuthProvider { + fn add_auth_headers(&self, headers: &mut HeaderMap) { + let _ = headers.insert( + http::header::AUTHORIZATION, + HeaderValue::from_static("Bearer registry-token"), + ); + let _ = headers.insert( + "ChatGPT-Account-ID", + HeaderValue::from_static("workspace-123"), + ); + } +} + +fn static_registry_auth_provider() -> SharedAuthProvider { + Arc::new(StaticRegistryAuthProvider) +} + +#[tokio::test] +async fn register_environment_posts_with_auth_provider_headers() { + let server = MockServer::start().await; + let config = RemoteEnvironmentConfig::new( + server.uri(), + "environment-requested".to_string(), + static_registry_auth_provider(), + ) + .expect("config"); + Mock::given(method("POST")) + .and(path("/cloud/environment/environment-requested/register")) + .and(header("authorization", "Bearer registry-token")) + .and(header("chatgpt-account-id", "workspace-123")) + .and(body_bytes(Vec::::new())) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "environment_id": "env-1", + "url": "wss://rendezvous.test/cloud-agent/default/ws/environment/env-1?role=environment&sig=abc", + }))) + .mount(&server) + .await; + let client = EnvironmentRegistryClient::new(server.uri(), static_registry_auth_provider()) + .expect("client"); + let response = client + .register_legacy_environment(&config.environment_id) + .await + .expect("register environment"); + + assert_eq!( + response, + EnvironmentRegistryRegistrationResponse { + environment_id: "env-1".to_string(), + url: "wss://rendezvous.test/cloud-agent/default/ws/environment/env-1?role=environment&sig=abc".to_string(), + } + ); +} + +#[tokio::test] +async fn register_noise_environment_posts_security_profile_and_public_key() { + let server = MockServer::start().await; + let executor_public_key = NoiseChannelIdentity::generate() + .expect("identity") + .public_key(); + Mock::given(method("POST")) + .and(path("/cloud/environment/environment-requested/register")) + .and(header("authorization", "Bearer registry-token")) + .and(body_partial_json(serde_json::json!({ + "security_profile": NOISE_RELAY_SECURITY_PROFILE, + "executor_public_key": executor_public_key.clone(), + }))) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "environment_id": "environment-requested", + "url": "wss://rendezvous.test/noise", + "security_profile": NOISE_RELAY_SECURITY_PROFILE, + "executor_registration_id": "registration-1", + }))) + .mount(&server) + .await; + let client = EnvironmentRegistryClient::new(server.uri(), static_registry_auth_provider()) + .expect("client"); + + let response = client + .register_noise_environment("environment-requested", &executor_public_key) + .await + .expect("register Noise environment"); + + assert_eq!( + response, + EnvironmentRegistryNoiseRegistrationResponse { + environment_id: "environment-requested".to_string(), + url: "wss://rendezvous.test/noise".to_string(), + security_profile: NOISE_RELAY_SECURITY_PROFILE.to_string(), + executor_registration_id: "registration-1".to_string(), + } + ); +} + +#[tokio::test] +async fn validate_harness_key_requires_explicit_valid_response() { + let server = MockServer::start().await; + let harness_public_key = NoiseChannelIdentity::generate() + .expect("identity") + .public_key(); + Mock::given(method("POST")) + .and(path("/cloud/environment/environment-requested/validate")) + .and(header("authorization", "Bearer registry-token")) + .and(body_partial_json(serde_json::json!({ + "executor_registration_id": "registration-1", + "harness_public_key": harness_public_key.clone(), + "harness_key_authorization": "authorization-1", + }))) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "valid": false, + }))) + .mount(&server) + .await; + let client = EnvironmentRegistryClient::new(server.uri(), static_registry_auth_provider()) + .expect("client"); + + let error = client + .validate_harness_key( + "environment-requested", + "registration-1", + &harness_public_key, + "authorization-1", + ) + .await + .expect_err("a false validation response must fail closed"); + + assert!(matches!( + error, + ExecServerError::Protocol(message) + if message == "environment registry rejected Noise relay harness key" + )); +} + +#[tokio::test] +async fn register_legacy_environment_does_not_follow_redirects_with_auth_headers() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/cloud/environment/environment-requested/register")) + .and(header("authorization", "Bearer registry-token")) + .respond_with( + ResponseTemplate::new(302) + .insert_header("location", format!("{}/redirect-target", server.uri())), + ) + .mount(&server) + .await; + Mock::given(path("/redirect-target")) + .and(header("authorization", "Bearer registry-token")) + .respond_with(ResponseTemplate::new(200)) + .expect(0) + .mount(&server) + .await; + let client = EnvironmentRegistryClient::new(server.uri(), static_registry_auth_provider()) + .expect("client"); + let error = client + .register_legacy_environment("environment-requested") + .await + .expect_err("redirect response should not be followed"); + + assert!(matches!( + error, + ExecServerError::EnvironmentRegistryHttp { + status: StatusCode::FOUND, + .. + } + )); +} + +#[test] +fn debug_output_redacts_auth_provider() { + let config = RemoteEnvironmentConfig::new( + "https://registry.example".to_string(), + "env-1".to_string(), + static_registry_auth_provider(), + ) + .expect("config"); + + let debug = format!("{config:?}"); + + assert!(debug.contains("")); + assert!(!debug.contains("workspace-123")); + assert!(debug.contains("Legacy")); +} + +#[test] +fn remote_environment_config_accepts_cloud_environment_id() { + let environment_id = "ccarenv_b64_Y2Fhcy1zdGFnaW5nLWV4ZWN1dG9yLWVudmlyb25tZW50LTE".to_string(); + + let config = RemoteEnvironmentConfig::new( + "https://registry.example".to_string(), + environment_id.clone(), + static_registry_auth_provider(), + ) + .expect("config"); + + assert_eq!(config.environment_id, environment_id); + assert_eq!(config.relay_protocol, RemoteRelayProtocol::Legacy); +} + +#[test] +fn remote_environment_config_rejects_registry_path_injection() { + let error = RemoteEnvironmentConfig::new( + "https://registry.example".to_string(), + "ccarenv_b64_valid/../../status".to_string(), + static_registry_auth_provider(), + ) + .expect_err("path delimiter must not reach an authenticated registry request"); + + assert!(matches!( + error, + ExecServerError::EnvironmentRegistryConfig(message) if message.contains("ASCII letters") + )); +} diff --git a/codex-rs/exec-server/tests/relay.rs b/codex-rs/exec-server/tests/relay.rs index ff41dd525ed..4c63bf9e5a0 100644 --- a/codex-rs/exec-server/tests/relay.rs +++ b/codex-rs/exec-server/tests/relay.rs @@ -4,40 +4,51 @@ mod common; mod relay_proto; use std::collections::HashMap; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use std::time::Duration; use anyhow::Context; use anyhow::Result; -use anyhow::anyhow; -use anyhow::bail; +use base64::Engine as _; +use base64::engine::general_purpose::STANDARD; use codex_api::AuthProvider; -use codex_app_server_protocol::JSONRPCError; -use codex_app_server_protocol::JSONRPCMessage; -use codex_app_server_protocol::JSONRPCNotification; -use codex_app_server_protocol::JSONRPCRequest; -use codex_app_server_protocol::JSONRPCResponse; -use codex_app_server_protocol::RequestId; +use codex_exec_server::EnvironmentManager; +use codex_exec_server::ExecParams; +use codex_exec_server::ExecResponse; +use codex_exec_server::ExecServerClient; +use codex_exec_server::ExecServerError; use codex_exec_server::ExecServerRuntimePaths; -use codex_exec_server::InitializeParams; -use codex_exec_server::InitializeResponse; +use codex_exec_server::FsReadFileParams; +use codex_exec_server::NoiseChannelIdentity; +use codex_exec_server::NoiseChannelPublicKey; +use codex_exec_server::NoiseRendezvousConnectArgs; +use codex_exec_server::NoiseRendezvousConnectBundle; +use codex_exec_server::NoiseRendezvousConnectProvider; +use codex_exec_server::ProcessId; use codex_exec_server::RemoteEnvironmentConfig; +use codex_exec_server::RemoteRelayProtocol; +use codex_utils_absolute_path::AbsolutePathBuf; +use futures::FutureExt; use futures::SinkExt; use futures::StreamExt; +use futures::future::BoxFuture; use http::HeaderMap; use http::HeaderValue; use pretty_assertions::assert_eq; use prost::Message as ProstMessage; use relay_proto::RelayData; use relay_proto::RelayMessageFrame; -use relay_proto::RelayReset; use relay_proto::relay_message_frame; -use std::sync::Arc; +use tempfile::TempDir; use tokio::net::TcpListener; +use tokio::net::TcpStream; use tokio::time::timeout; use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::accept_async; use tokio_tungstenite::tungstenite::Message; -use uuid::Uuid; use wiremock::Mock; use wiremock::MockServer; use wiremock::ResponseTemplate; @@ -45,10 +56,11 @@ use wiremock::matchers::header; use wiremock::matchers::method; use wiremock::matchers::path; -const ENVIRONMENT_ID: &str = "env-mux-test"; +const ENVIRONMENT_ID: &str = "env-noise-relay-test"; +const EXECUTOR_REGISTRATION_ID: &str = "registration-1"; +const HARNESS_KEY_AUTHORIZATION: &str = "harness-key-authorization"; const REGISTRY_TOKEN: &str = "registry-token"; -const RELAY_MESSAGE_FRAME_VERSION: u32 = 1; -const TEST_TIMEOUT: Duration = Duration::from_secs(5); +const TEST_TIMEOUT: Duration = Duration::from_secs(10); #[derive(Debug)] struct StaticRegistryAuthProvider; @@ -62,12 +74,60 @@ impl AuthProvider for StaticRegistryAuthProvider { } } +struct FailingNoiseConnectProvider { + attempts: Arc, +} + +impl NoiseRendezvousConnectProvider for FailingNoiseConnectProvider { + fn environment_id(&self) -> &str { + ENVIRONMENT_ID + } + + fn connect_args(&self) -> BoxFuture<'_, Result> { + self.attempts.fetch_add(1, Ordering::SeqCst); + async { + Err(ExecServerError::Protocol( + "test registry connect failure".to_string(), + )) + } + .boxed() + } +} + +struct WrongEnvironmentNoiseConnectProvider { + harness_identity: NoiseChannelIdentity, + executor_public_key: NoiseChannelPublicKey, +} + +impl NoiseRendezvousConnectProvider for WrongEnvironmentNoiseConnectProvider { + fn environment_id(&self) -> &str { + ENVIRONMENT_ID + } + + fn connect_args(&self) -> BoxFuture<'_, Result> { + async move { + Ok(NoiseRendezvousConnectArgs::new( + NoiseRendezvousConnectBundle { + websocket_url: "ws://127.0.0.1:1".to_string(), + environment_id: "wrong-environment".to_string(), + executor_registration_id: EXECUTOR_REGISTRATION_ID.to_string(), + executor_public_key: self.executor_public_key.clone(), + harness_key_authorization: HARNESS_KEY_AUTHORIZATION.to_string(), + }, + self.harness_identity.clone(), + "noise-relay-test".to_string(), + )) + } + .boxed() + } +} + fn static_registry_auth_provider() -> codex_api::SharedAuthProvider { Arc::new(StaticRegistryAuthProvider) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn multiplexed_remote_environment_routes_independent_virtual_streams() -> Result<()> { +async fn remote_environment_uses_legacy_relay_by_default() -> Result<()> { let listener = TcpListener::bind("127.0.0.1:0").await?; let rendezvous_url = format!("ws://{}", listener.local_addr()?); let registry = MockServer::start().await; @@ -90,286 +150,331 @@ async fn multiplexed_remote_environment_routes_independent_virtual_streams() -> ENVIRONMENT_ID.to_string(), static_registry_auth_provider(), )?; + assert_eq!(config.relay_protocol, RemoteRelayProtocol::Legacy); let remote_environment = tokio::spawn(codex_exec_server::run_remote_environment( config, runtime_paths, )); - let (socket, _peer_addr) = timeout(TEST_TIMEOUT, listener.accept()) - .await - .context("remote environment should connect to fake rendezvous")??; - let mut websocket = timeout(TEST_TIMEOUT, accept_async(socket)) - .await - .context("fake rendezvous should accept environment websocket")??; - - let stream_a = "stream-a"; - let stream_b = "stream-b"; - send_relay_message( - &mut websocket, - stream_a, - /*seq*/ 0, - initialize_request(/*id*/ 1, "relay-test-a")?, - ) - .await?; - send_relay_message( - &mut websocket, - stream_b, - /*seq*/ 0, - initialize_request(/*id*/ 1, "relay-test-b")?, - ) - .await?; - - let initialize_responses = read_relay_messages_by_stream(&mut websocket, /*count*/ 2).await?; - let session_a = - assert_initialize_response(initialize_responses.get(stream_a), stream_a, /*id*/ 1)?; - let session_b = - assert_initialize_response(initialize_responses.get(stream_b), stream_b, /*id*/ 1)?; - assert_ne!(session_a, session_b); - - send_relay_message( - &mut websocket, - stream_a, - /*seq*/ 1, - notification("initialized", serde_json::json!({})), - ) - .await?; - send_relay_message( - &mut websocket, - stream_b, - /*seq*/ 1, - notification("initialized", serde_json::json!({})), - ) - .await?; - - send_relay_message( - &mut websocket, - stream_a, - /*seq*/ 2, - request(/*id*/ 2, "test/unknown-a", serde_json::json!({})), - ) - .await?; - send_relay_message( - &mut websocket, - stream_b, - /*seq*/ 2, - request(/*id*/ 2, "test/unknown-b", serde_json::json!({})), - ) - .await?; - - let unknown_method_responses = - read_relay_messages_by_stream(&mut websocket, /*count*/ 2).await?; - assert_error_response( - unknown_method_responses.get(stream_a), - stream_a, - /*id*/ 2, - "test/unknown-a", - )?; - assert_error_response( - unknown_method_responses.get(stream_b), - stream_b, - /*id*/ 2, - "test/unknown-b", - )?; + let mut websocket = accept_websocket(&listener, "legacy environment").await?; + let initialize = serde_json::json!({ + "id": 1, + "method": "initialize", + "params": { + "clientName": "legacy-default-test", + "resumeSessionId": null, + }, + }); + websocket + .send(Message::Binary( + RelayMessageFrame { + version: 1, + stream_id: "legacy-stream".to_string(), + ack: 0, + ack_bits: 0, + body: Some(relay_message_frame::Body::Data(RelayData { + seq: 0, + segment_index: 0, + segment_count: 1, + payload: serde_json::to_vec(&initialize)?, + })), + } + .encode_to_vec() + .into(), + )) + .await?; - send_relay_reset(&mut websocket, stream_a, "test_reset").await?; - send_relay_message( - &mut websocket, - stream_b, - /*seq*/ 3, - request( - /*id*/ 3, - "test/unknown-b-after-reset", - serde_json::json!({}), - ), - ) - .await?; - - let (stream_id, message) = read_relay_message(&mut websocket).await?; - assert_eq!(stream_id, stream_b); - assert_error_response( - Some(&message), - stream_b, - /*id*/ 3, - "test/unknown-b-after-reset", - )?; + let response = timeout(TEST_TIMEOUT, websocket.next()) + .await + .context("legacy relay should return initialize response")? + .context("legacy relay websocket should remain open")??; + let Message::Binary(response) = response else { + anyhow::bail!("legacy relay should return a binary protobuf frame"); + }; + let response = RelayMessageFrame::decode(response.as_ref())?; + let Some(relay_message_frame::Body::Data(response)) = response.body else { + anyhow::bail!("legacy relay should return a data frame"); + }; + let response: serde_json::Value = serde_json::from_slice(&response.payload)?; + assert_eq!(response["id"], 1); + assert!(response["result"].is_object()); - websocket.close(None).await?; remote_environment.abort(); let _ = remote_environment.await; Ok(()) } -async fn send_relay_message( - websocket: &mut WebSocketStream, - stream_id: &str, - seq: u32, - message: JSONRPCMessage, -) -> Result<()> -where - S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, -{ - let payload = serde_json::to_vec(&message)?; - let frame = RelayMessageFrame { - version: RELAY_MESSAGE_FRAME_VERSION, - stream_id: stream_id.to_string(), - ack: 0, - ack_bits: 0, - body: Some(relay_message_frame::Body::Data(RelayData { - seq, - segment_index: 0, - segment_count: 1, - payload, - })), - }; - send_relay_frame(websocket, frame).await +#[tokio::test] +async fn noise_environment_refreshes_bundle_for_each_connection_attempt() -> Result<()> { + let attempts = Arc::new(AtomicUsize::new(0)); + let manager = EnvironmentManager::without_environments(); + manager.upsert_noise_environment( + ENVIRONMENT_ID.to_string(), + Arc::new(FailingNoiseConnectProvider { + attempts: Arc::clone(&attempts), + }), + )?; + let backend = manager + .get_environment(ENVIRONMENT_ID) + .context("Noise environment should be materialized")? + .get_exec_backend(); + + for attempt in 1..=2 { + let result = backend + .start(ExecParams { + process_id: ProcessId::new(format!("proc-{attempt}")), + argv: vec!["true".to_string()], + cwd: std::env::current_dir()?, + env_policy: None, + env: HashMap::new(), + tty: false, + pipe_stdin: false, + arg0: None, + }) + .await; + assert!(matches!( + result, + Err(ExecServerError::Protocol(ref message)) + if message == "test registry connect failure" + )); + } + + assert_eq!(attempts.load(Ordering::SeqCst), 2); + Ok(()) } -async fn send_relay_reset( - websocket: &mut WebSocketStream, - stream_id: &str, - reason: &str, -) -> Result<()> -where - S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, -{ - send_relay_frame( - websocket, - RelayMessageFrame { - version: RELAY_MESSAGE_FRAME_VERSION, - stream_id: stream_id.to_string(), - ack: 0, - ack_bits: 0, - body: Some(relay_message_frame::Body::Reset(RelayReset { - reason: reason.to_string(), - })), - }, - ) - .await +#[tokio::test] +async fn noise_environment_rejects_provider_bundle_for_another_environment() -> Result<()> { + let manager = EnvironmentManager::without_environments(); + manager.upsert_noise_environment( + ENVIRONMENT_ID.to_string(), + Arc::new(WrongEnvironmentNoiseConnectProvider { + harness_identity: NoiseChannelIdentity::generate()?, + executor_public_key: NoiseChannelIdentity::generate()?.public_key(), + }), + )?; + let backend = manager + .get_environment(ENVIRONMENT_ID) + .context("Noise environment should be materialized")? + .get_exec_backend(); + + let result = backend + .start(ExecParams { + process_id: ProcessId::from("proc-wrong-environment"), + argv: vec!["true".to_string()], + cwd: std::env::current_dir()?, + env_policy: None, + env: HashMap::new(), + tty: false, + pipe_stdin: false, + arg0: None, + }) + .await; + + assert!(matches!( + result, + Err(ExecServerError::Protocol(ref message)) + if message == "Noise rendezvous provider returned a different environment id" + )); + Ok(()) } -async fn send_relay_frame( - websocket: &mut WebSocketStream, - frame: RelayMessageFrame, -) -> Result<()> -where - S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, -{ - websocket - .send(Message::Binary(frame.encode_to_vec().into())) +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn remote_environment_routes_encrypted_exec_server_rpc() -> Result<()> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let rendezvous_url = format!("ws://{}", listener.local_addr()?); + let registry = MockServer::start().await; + Mock::given(method("POST")) + .and(path(format!( + "/cloud/environment/{ENVIRONMENT_ID}/register" + ))) + .and(header("authorization", format!("Bearer {REGISTRY_TOKEN}"))) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "environment_id": ENVIRONMENT_ID, + "url": format!("{rendezvous_url}/relay?role=environment"), + "security_profile": "noise_hybrid_ik_v1", + "executor_registration_id": EXECUTOR_REGISTRATION_ID, + }))) + .mount(®istry) + .await; + Mock::given(method("POST")) + .and(path(format!( + "/cloud/environment/{ENVIRONMENT_ID}/validate" + ))) + .and(header("authorization", format!("Bearer {REGISTRY_TOKEN}"))) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "valid": true, + }))) + .mount(®istry) + .await; + + let (codex_exe, codex_linux_sandbox_exe) = common::current_test_binary_helper_paths()?; + let runtime_paths = ExecServerRuntimePaths::new(codex_exe, codex_linux_sandbox_exe)?; + let mut config = RemoteEnvironmentConfig::new( + registry.uri(), + ENVIRONMENT_ID.to_string(), + static_registry_auth_provider(), + )?; + config.relay_protocol = RemoteRelayProtocol::Noise; + let remote_environment = tokio::spawn(codex_exec_server::run_remote_environment( + config, + runtime_paths, + )); + + let environment_websocket = accept_websocket(&listener, "environment").await?; + let executor_public_key = registered_executor_public_key(®istry).await?; + let harness_identity = NoiseChannelIdentity::generate()?; + let client_args = NoiseRendezvousConnectArgs::new( + NoiseRendezvousConnectBundle { + websocket_url: format!("{rendezvous_url}/relay?role=harness"), + environment_id: ENVIRONMENT_ID.to_string(), + executor_registration_id: EXECUTOR_REGISTRATION_ID.to_string(), + executor_public_key, + harness_key_authorization: HARNESS_KEY_AUTHORIZATION.to_string(), + }, + harness_identity, + "noise-relay-test".to_string(), + ); + let client_task = + tokio::spawn(async move { ExecServerClient::connect_noise_rendezvous(client_args).await }); + let harness_websocket = accept_websocket(&listener, "harness").await?; + let captured_frames = Arc::new(Mutex::new(Vec::new())); + let relay_task = tokio::spawn(proxy_relay_frames( + environment_websocket, + harness_websocket, + Arc::clone(&captured_frames), + )); + let client = timeout(TEST_TIMEOUT, client_task) + .await + .context("Noise harness client should connect")???; + + let response = client + .exec(ExecParams { + process_id: ProcessId::from("proc-1"), + argv: vec!["true".to_string()], + cwd: std::env::current_dir()?, + env_policy: None, + env: HashMap::new(), + tty: false, + pipe_stdin: false, + arg0: None, + }) + .await?; + assert_eq!( + response, + ExecResponse { + process_id: ProcessId::from("proc-1") + } + ); + + let temp_dir = TempDir::new()?; + let large_file_path = temp_dir.path().join("large-response.bin"); + let large_file_contents = vec![0x5a; 128 * 1024]; + std::fs::write(&large_file_path, &large_file_contents)?; + let read_response = client + .fs_read_file(FsReadFileParams { + path: AbsolutePathBuf::try_from(large_file_path)?, + sandbox: None, + }) .await?; + assert_eq!( + STANDARD.decode(read_response.data_base64)?, + large_file_contents + ); + + assert_relay_data_is_encrypted(&captured_frames)?; + + drop(client); + relay_task.abort(); + remote_environment.abort(); + let _ = relay_task.await; + let _ = remote_environment.await; Ok(()) } -async fn read_relay_messages_by_stream( - websocket: &mut WebSocketStream, - count: usize, -) -> Result> -where - S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, -{ - let mut messages = HashMap::new(); - for _ in 0..count { - let (stream_id, message) = read_relay_message(websocket).await?; - if messages.insert(stream_id.clone(), message).is_some() { - bail!("received duplicate response for stream {stream_id}"); - } - } - Ok(messages) +async fn accept_websocket( + listener: &TcpListener, + role: &str, +) -> Result> { + let (socket, _peer_addr) = timeout(TEST_TIMEOUT, listener.accept()) + .await + .with_context(|| format!("remote {role} should connect to fake rendezvous"))??; + timeout(TEST_TIMEOUT, accept_async(socket)) + .await + .with_context(|| format!("fake rendezvous should accept {role} websocket"))? + .map_err(Into::into) } -async fn read_relay_message( - websocket: &mut WebSocketStream, -) -> Result<(String, JSONRPCMessage)> -where - S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, -{ +async fn registered_executor_public_key(registry: &MockServer) -> Result { + let requests = registry + .received_requests() + .await + .context("wiremock should retain requests")?; + let request = requests + .iter() + .find(|request| request.url.path().ends_with("/register")) + .context("exec-server should register before connecting")?; + let body: serde_json::Value = serde_json::from_slice(&request.body)?; + let key = serde_json::from_value(body["executor_public_key"].clone())?; + Ok(key) +} + +async fn proxy_relay_frames( + mut environment: WebSocketStream, + mut harness: WebSocketStream, + captured_frames: Arc>>>, +) -> Result<()> { loop { - let frame = timeout(TEST_TIMEOUT, websocket.next()) - .await - .context("timed out waiting for relay frame")? - .ok_or_else(|| anyhow!("environment websocket closed"))??; - match frame { - Message::Binary(bytes) => { - let frame = RelayMessageFrame::decode(bytes.as_ref())?; - let stream_id = frame.stream_id; - let Some(relay_message_frame::Body::Data(data)) = frame.body else { - continue; + tokio::select! { + message = environment.next() => { + let Some(message) = message else { + break; }; - let message = serde_json::from_slice(&data.payload)?; - return Ok((stream_id, message)); + let message = message?; + capture_binary_frame(&captured_frames, &message); + harness.send(message).await?; + } + message = harness.next() => { + let Some(message) = message else { + break; + }; + let message = message?; + capture_binary_frame(&captured_frames, &message); + environment.send(message).await?; } - Message::Ping(_) | Message::Pong(_) => {} - Message::Close(_) => bail!("environment websocket closed"), - Message::Text(_) => bail!("environment sent text frame on relay websocket"), - Message::Frame(_) => {} } } + Ok(()) } -fn initialize_request(id: i64, client_name: &str) -> Result { - Ok(request( - id, - "initialize", - serde_json::to_value(InitializeParams { - client_name: client_name.to_string(), - resume_session_id: None, - })?, - )) -} - -fn request(id: i64, method: &str, params: serde_json::Value) -> JSONRPCMessage { - JSONRPCMessage::Request(JSONRPCRequest { - id: RequestId::Integer(id), - method: method.to_string(), - params: Some(params), - trace: None, - }) -} - -fn notification(method: &str, params: serde_json::Value) -> JSONRPCMessage { - JSONRPCMessage::Notification(JSONRPCNotification { - method: method.to_string(), - params: Some(params), - }) -} - -fn assert_initialize_response( - message: Option<&JSONRPCMessage>, - stream_id: &str, - id: i64, -) -> Result { - let message = message.ok_or_else(|| anyhow!("missing initialize response for {stream_id}"))?; - let JSONRPCMessage::Response(JSONRPCResponse { - id: response_id, - result, - }) = message - else { - bail!("expected initialize response for {stream_id}, got {message:?}"); - }; - assert_eq!(response_id, &RequestId::Integer(id)); - let response: InitializeResponse = serde_json::from_value(result.clone())?; - Ok(Uuid::parse_str(&response.session_id)?) +fn capture_binary_frame(captured_frames: &Mutex>>, message: &Message) { + if let Message::Binary(bytes) = message { + captured_frames + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .push(bytes.to_vec()); + } } -fn assert_error_response( - message: Option<&JSONRPCMessage>, - stream_id: &str, - id: i64, - expected_method: &str, -) -> Result<()> { - let message = message.ok_or_else(|| anyhow!("missing error response for {stream_id}"))?; - let JSONRPCMessage::Error(JSONRPCError { - id: response_id, - error, - }) = message - else { - bail!("expected error response for {stream_id}, got {message:?}"); - }; - assert_eq!(response_id, &RequestId::Integer(id)); +fn assert_relay_data_is_encrypted(captured_frames: &Mutex>>) -> Result<()> { + let captured_frames = captured_frames + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let mut data_frames = 0; + for encoded in captured_frames.iter() { + let frame = RelayMessageFrame::decode(encoded.as_slice())?; + let Some(relay_message_frame::Body::Data(data)) = frame.body else { + continue; + }; + data_frames += 1; + let payload = String::from_utf8_lossy(&data.payload); + assert!(!payload.contains("initialize")); + assert!(!payload.contains("process/start")); + assert!(!payload.contains("noise-relay-test")); + } assert!( - error.message.contains(expected_method), - "expected error for {stream_id} to mention {expected_method}, got {}", - error.message + data_frames >= 4, + "expected encrypted request and response frames" ); Ok(()) }