Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions codex-rs/core-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ pub use codex_core::resolve_installation_id;
pub use codex_core::skills::SkillsManager;
pub use codex_core::thread_store_from_config;
pub use codex_exec_server::EnvironmentManager;
pub use codex_exec_server::ExecServerError;
pub use codex_exec_server::ExecServerRuntimePaths;
pub use codex_exec_server::NoiseChannelIdentity;
pub use codex_exec_server::NoiseChannelPublicKey;
pub use codex_exec_server::NoiseRendezvousConnectArgs;
pub use codex_exec_server::NoiseRendezvousConnectBundle;
pub use codex_exec_server::NoiseRendezvousConnectProvider;
pub use codex_exec_server::SharedNoiseRendezvousConnectProvider;
pub use codex_extension_api::empty_extension_registry;
pub use codex_features::Feature;
pub use codex_features::Features;
Expand Down
1 change: 1 addition & 0 deletions codex-rs/exec-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ tokio = { workspace = true, features = [
tokio-util = { workspace = true, features = ["rt"] }
tokio-tungstenite = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
uuid = { workspace = true, features = ["v4"] }

[dev-dependencies]
Expand Down
32 changes: 32 additions & 0 deletions codex-rs/exec-server/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use crate::ProcessId;
use crate::client_api::ExecServerClientConnectOptions;
use crate::client_api::ExecServerTransportParams;
use crate::client_api::HttpClient;
use crate::client_api::NoiseRendezvousConnectArgs;
use crate::client_api::NoiseRendezvousConnectBundle;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::client_api::StdioExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
Expand Down Expand Up @@ -115,6 +117,16 @@ impl From<RemoteExecServerConnectArgs> for ExecServerClientConnectOptions {
}
}

impl From<NoiseRendezvousConnectArgs> for ExecServerClientConnectOptions {
fn from(value: NoiseRendezvousConnectArgs) -> Self {
Self {
client_name: value.client_name,
initialize_timeout: value.initialize_timeout,
resume_session_id: value.resume_session_id,
}
}
}

impl From<StdioExecServerConnectArgs> for ExecServerClientConnectOptions {
fn from(value: StdioExecServerConnectArgs) -> Self {
Self {
Expand All @@ -137,6 +149,23 @@ impl RemoteExecServerConnectArgs {
}
}

impl NoiseRendezvousConnectArgs {
pub fn new(
bundle: NoiseRendezvousConnectBundle,
harness_identity: crate::NoiseChannelIdentity,
client_name: String,
) -> Self {
Self {
bundle,
harness_identity,
client_name,
connect_timeout: CONNECT_TIMEOUT,
initialize_timeout: INITIALIZE_TIMEOUT,
resume_session_id: None,
}
}
}

pub(crate) struct SessionState {
wake_tx: watch::Sender<u64>,
events: ExecProcessEventLog,
Expand Down Expand Up @@ -231,6 +260,7 @@ impl LazyRemoteExecServerClient {
if matches!(
&self.transport_params,
ExecServerTransportParams::WebSocketUrl { .. }
| ExecServerTransportParams::NoiseRendezvous { .. }
) =>
{
ExecServerClient::connect_for_transport(self.transport_params.clone()).await?
Expand Down Expand Up @@ -317,6 +347,8 @@ pub enum ExecServerError {
EnvironmentRegistryAuth(String),
#[error("environment registry request failed: {0}")]
EnvironmentRegistryRequest(#[from] reqwest::Error),
#[error(transparent)]
NoiseChannel(#[from] crate::NoiseChannelError),
}

impl ExecServerClient {
Expand Down
109 changes: 108 additions & 1 deletion codex-rs/exec-server/src/client_api.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use futures::future::BoxFuture;
Expand All @@ -8,6 +9,8 @@ use crate::ExecServerError;
use crate::HttpRequestParams;
use crate::HttpRequestResponse;
use crate::HttpResponseBodyStream;
use crate::NoiseChannelIdentity;
use crate::NoiseChannelPublicKey;

pub(crate) const DEFAULT_REMOTE_EXEC_SERVER_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
pub(crate) const DEFAULT_REMOTE_EXEC_SERVER_INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10);
Expand All @@ -30,6 +33,67 @@ pub struct RemoteExecServerConnectArgs {
pub resume_session_id: Option<String>,
}

/// Registry-authorized material for one Noise rendezvous connection attempt.
pub struct NoiseRendezvousConnectBundle {
pub websocket_url: String,
pub environment_id: String,
pub executor_registration_id: String,
pub executor_public_key: NoiseChannelPublicKey,
pub harness_key_authorization: String,
}

impl std::fmt::Debug for NoiseRendezvousConnectBundle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NoiseRendezvousConnectBundle")
.field(
"websocket_url",
&redacted_websocket_url(&self.websocket_url),
)
.field("environment_id", &self.environment_id)
.field("executor_registration_id", &self.executor_registration_id)
.field("executor_public_key", &self.executor_public_key)
.field("harness_key_authorization", &"<redacted>")
.finish()
}
}

/// Connection arguments for an authenticated Noise rendezvous exec-server.
pub struct NoiseRendezvousConnectArgs {
pub bundle: NoiseRendezvousConnectBundle,
pub harness_identity: NoiseChannelIdentity,
pub client_name: String,
pub connect_timeout: Duration,
pub initialize_timeout: Duration,
pub resume_session_id: Option<String>,
}

impl std::fmt::Debug for NoiseRendezvousConnectArgs {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NoiseRendezvousConnectArgs")
.field("bundle", &self.bundle)
.field("harness_identity", &"<redacted>")
.field("client_name", &self.client_name)
.field("connect_timeout", &self.connect_timeout)
.field("initialize_timeout", &self.initialize_timeout)
.field("resume_session_id", &self.resume_session_id)
.finish()
}
}

/// Supplies fresh registry-authorized material for Noise rendezvous connections.
///
/// Implementations must preserve one endpoint-local harness identity while
/// refreshing short-lived registry material for every physical connection attempt.
pub trait NoiseRendezvousConnectProvider: Send + Sync {
/// Environment ID this provider is authorized to connect to.
fn environment_id(&self) -> &str;

/// Returns a fresh atomic bundle for one physical connection attempt.
fn connect_args(&self) -> BoxFuture<'_, Result<NoiseRendezvousConnectArgs, ExecServerError>>;
}

pub type SharedNoiseRendezvousConnectProvider = Arc<dyn NoiseRendezvousConnectProvider>;

/// Stdio connection arguments for a command-backed exec-server.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct StdioExecServerConnectArgs {
Expand All @@ -49,20 +113,52 @@ pub(crate) struct StdioExecServerCommand {
}

/// Parameters used to connect to a remote exec-server environment.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Clone)]
pub(crate) enum ExecServerTransportParams {
WebSocketUrl {
websocket_url: String,
connect_timeout: Duration,
initialize_timeout: Duration,
},
NoiseRendezvous {
provider: SharedNoiseRendezvousConnectProvider,
},
#[allow(dead_code)]
StdioCommand {
command: StdioExecServerCommand,
initialize_timeout: Duration,
},
}

impl std::fmt::Debug for ExecServerTransportParams {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::WebSocketUrl {
websocket_url,
connect_timeout,
initialize_timeout,
} => f
.debug_struct("WebSocketUrl")
.field("websocket_url", websocket_url)
.field("connect_timeout", connect_timeout)
.field("initialize_timeout", initialize_timeout)
.finish(),
Self::NoiseRendezvous { provider } => f
.debug_struct("NoiseRendezvous")
.field("environment_id", &provider.environment_id())
.finish(),
Self::StdioCommand {
command,
initialize_timeout,
} => f
.debug_struct("StdioCommand")
.field("command", command)
.field("initialize_timeout", initialize_timeout)
.finish(),
}
}
}

impl ExecServerTransportParams {
pub(crate) fn websocket_url(websocket_url: String) -> Self {
Self::WebSocketUrl {
Expand All @@ -73,6 +169,17 @@ impl ExecServerTransportParams {
}
}

pub(crate) fn redacted_websocket_url(websocket_url: &str) -> String {
match url::Url::parse(websocket_url) {
Ok(mut url) => {
url.set_query(None);
url.set_fragment(None);
url.to_string()
}
Err(_) => "<redacted websocket url>".to_string(),
}
}

/// Sends HTTP requests through a runtime-selected transport.
///
/// This is the HTTP capability counterpart to [`crate::ExecBackend`]. Callers
Expand Down
78 changes: 78 additions & 0 deletions codex-rs/exec-server/src/client_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@ use tokio::io::BufReader;
use tokio::process::Command;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::connect_async_with_config;
use tracing::debug;
use tracing::warn;

use codex_utils_rustls_provider::ensure_rustls_crypto_provider;

use crate::ExecServerClient;
use crate::ExecServerError;
use crate::client_api::NoiseRendezvousConnectArgs;
use crate::client_api::NoiseRendezvousConnectBundle;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::client_api::StdioExecServerCommand;
use crate::client_api::StdioExecServerConnectArgs;
use crate::client_api::redacted_websocket_url;
use crate::connection::JsonRpcConnection;
use crate::noise_relay::noise_harness_connection_from_websocket;
use crate::noise_relay::noise_relay_websocket_config;
use crate::relay::harness_connection_from_websocket;

const ENVIRONMENT_CLIENT_NAME: &str = "codex-environment";
Expand All @@ -38,6 +44,15 @@ impl ExecServerClient {
})
.await
}
crate::client_api::ExecServerTransportParams::NoiseRendezvous { provider } => {
let args = provider.connect_args().await?;
if args.bundle.environment_id != provider.environment_id() {
return Err(ExecServerError::Protocol(
"Noise rendezvous provider returned a different environment id".to_string(),
));
}
Self::connect_noise_rendezvous(args).await
}
crate::client_api::ExecServerTransportParams::StdioCommand {
command,
initialize_timeout,
Expand Down Expand Up @@ -79,6 +94,69 @@ impl ExecServerClient {
Self::connect(connection, args.into()).await
}

pub async fn connect_noise_rendezvous(
args: NoiseRendezvousConnectArgs,
) -> Result<Self, ExecServerError> {
ensure_rustls_crypto_provider();
// This connect call owns the complete registry-issued bundle. Move each
// sensitive value into the transport task exactly once rather than
// leaving extra copies of the harness authorization or endpoint identity
// alive in `args` after the handshake starts.
let NoiseRendezvousConnectArgs {
bundle,
harness_identity,
client_name,
connect_timeout,
initialize_timeout,
resume_session_id,
} = args;
let NoiseRendezvousConnectBundle {
websocket_url,
environment_id,
executor_registration_id,
executor_public_key,
harness_key_authorization,
} = bundle;
let diagnostic_url = redacted_websocket_url(&websocket_url);
let (stream, _) = timeout(
connect_timeout,
connect_async_with_config(
websocket_url.as_str(),
Some(noise_relay_websocket_config()),
/*disable_nagle*/ false,
),
)
.await
.map_err(|_| ExecServerError::WebSocketConnectTimeout {
url: diagnostic_url.clone(),
timeout: connect_timeout,
})?
.map_err(|source| ExecServerError::WebSocketConnect {
url: diagnostic_url.clone(),
source,
})?;

let connection_label = format!("Noise exec-server rendezvous websocket {diagnostic_url}");
let connection = noise_harness_connection_from_websocket(
stream,
connection_label,
environment_id,
executor_registration_id,
harness_identity,
executor_public_key,
harness_key_authorization,
);
Self::connect(
connection,
crate::client_api::ExecServerClientConnectOptions {
client_name,
initialize_timeout,
resume_session_id,
},
)
.await
}

pub(crate) async fn connect_stdio_command(
args: StdioExecServerConnectArgs,
) -> Result<Self, ExecServerError> {
Expand Down
11 changes: 7 additions & 4 deletions codex-rs/exec-server/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ pub(crate) enum JsonRpcConnectionEvent {
}

#[derive(Clone)]
/// Describes resources owned by a JSON-RPC connection, not the protection of
/// its byte stream. `External` covers websocket, relay, and Noise connections
/// whose transport task owns no child process for Codex to terminate.
pub(crate) enum JsonRpcTransport {
Plain,
External,
Stdio { transport: StdioTransport },
}

Expand All @@ -57,7 +60,7 @@ impl JsonRpcTransport {

pub(crate) fn terminate(&self) {
match self {
Self::Plain => {}
Self::External => {}
Self::Stdio { transport } => transport.terminate(),
}
}
Expand Down Expand Up @@ -315,7 +318,7 @@ impl JsonRpcConnection {
incoming_rx,
disconnected_rx,
task_handles: vec![reader_task, writer_task],
transport: JsonRpcTransport::Plain,
transport: JsonRpcTransport::External,
}
}

Expand Down Expand Up @@ -452,7 +455,7 @@ impl JsonRpcConnection {
incoming_rx,
disconnected_rx,
task_handles: vec![websocket_task],
transport: JsonRpcTransport::Plain,
transport: JsonRpcTransport::External,
}
}

Expand Down
Loading
Loading