Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions codex-rs/exec-server/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ pub(crate) enum JsonRpcConnectionEvent {
}

#[derive(Clone)]
/// Describes resources owned by a JSON-RPC connection, not the protection of
/// its byte stream. `External` covers websocket, relay, and Noise connections
/// whose transport task owns no child process for Codex to terminate.
pub(crate) enum JsonRpcTransport {
// NOTE(review): appears to be the pre-change name that `External` replaces
// in this diff — confirm. `terminate` treats it as a no-op.
Plain,
/// Nothing for `terminate` to stop; its match arm is empty.
External,
/// Connection backed by a `StdioTransport`; `terminate` forwards to
/// `transport.terminate()`.
Stdio { transport: StdioTransport },
}

Expand All @@ -57,7 +60,7 @@ impl JsonRpcTransport {

/// Terminates the resource backing this transport, if there is one.
///
/// Only the `Stdio` variant has anything to stop: the call is forwarded to
/// `transport.terminate()`. Every other variant is a deliberate no-op.
pub(crate) fn terminate(&self) {
match self {
Self::Plain => {}
Self::External => {}
Self::Stdio { transport } => transport.terminate(),
}
}
Expand Down Expand Up @@ -315,7 +318,7 @@ impl JsonRpcConnection {
incoming_rx,
disconnected_rx,
task_handles: vec![reader_task, writer_task],
transport: JsonRpcTransport::Plain,
transport: JsonRpcTransport::External,
}
}

Expand Down Expand Up @@ -452,7 +455,7 @@ impl JsonRpcConnection {
incoming_rx,
disconnected_rx,
task_handles: vec![websocket_task],
transport: JsonRpcTransport::Plain,
transport: JsonRpcTransport::External,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ message RelayMessageFrame {
RelayResume resume = 7;
RelayReset reset = 8;
RelayHeartbeat heartbeat = 9;
RelayHandshake handshake = 10;
}
}

Expand All @@ -35,3 +36,7 @@ message RelayReset {
}

// Heartbeat frame body. It declares no fields.
message RelayHeartbeat {}

// Handshake frame body, carried as field 10 of the RelayMessageFrame oneof.
message RelayHandshake {
// Raw handshake bytes. This schema does not describe their layout.
bytes payload = 1;
}
9 changes: 8 additions & 1 deletion codex-rs/exec-server/src/proto/codex.exec_server.relay.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub struct RelayMessageFrame {
pub ack: u32,
#[prost(uint32, tag = "4")]
pub ack_bits: u32,
#[prost(oneof = "relay_message_frame::Body", tags = "5, 6, 7, 8, 9")]
#[prost(oneof = "relay_message_frame::Body", tags = "5, 6, 7, 8, 9, 10")]
pub body: ::core::option::Option<relay_message_frame::Body>,
}
pub mod relay_message_frame {
Expand All @@ -25,6 +25,8 @@ pub mod relay_message_frame {
Reset(super::RelayReset),
#[prost(message, tag = "9")]
Heartbeat(super::RelayHeartbeat),
#[prost(message, tag = "10")]
Handshake(super::RelayHandshake),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -52,3 +54,8 @@ pub struct RelayReset {
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct RelayHeartbeat {}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct RelayHandshake {
#[prost(bytes = "vec", tag = "1")]
pub payload: ::prost::alloc::vec::Vec<u8>,
}
108 changes: 88 additions & 20 deletions codex-rs/exec-server/src/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,21 @@ use crate::relay_proto::relay_message_frame;
use crate::server::ConnectionProcessor;

/// The only frame `version` that `RelayMessageFrame::validate` accepts; the
/// frame constructors in this module stamp it on every frame they build.
const RELAY_MESSAGE_FRAME_VERSION: u32 = 1;
/// Largest reset `reason`, in bytes, that `RelayMessageFrame::validate` accepts.
const MAX_RELAY_RESET_REASON_BYTES: usize = 256;
/// Largest `stream_id`, in bytes, that `RelayMessageFrame::validate` accepts.
const MAX_RELAY_STREAM_ID_BYTES: usize = 128;

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum RelayFrameBodyKind {
pub(crate) enum RelayFrameBodyKind {
Data,
Ack,
Resume,
Reset,
Heartbeat,
Handshake,
}

impl RelayMessageFrame {
fn data(stream_id: String, seq: u32, payload: Vec<u8>) -> Self {
pub(crate) fn data(stream_id: String, seq: u32, payload: Vec<u8>) -> Self {
Self {
version: RELAY_MESSAGE_FRAME_VERSION,
stream_id,
Expand All @@ -54,7 +57,7 @@ impl RelayMessageFrame {
}
}

fn resume(stream_id: String) -> Self {
pub(crate) fn resume(stream_id: String) -> Self {
Self {
version: RELAY_MESSAGE_FRAME_VERSION,
stream_id,
Expand All @@ -66,18 +69,33 @@ impl RelayMessageFrame {
}
}

fn validate(&self) -> Result<RelayFrameBodyKind, ExecServerError> {
/// Validate cleartext routing metadata before a frame reaches either the
/// direct JSON-RPC parser or the Noise relay state machine.
///
/// The encrypted path intentionally leaves this metadata visible to
/// rendezvous, so it must be canonical and tightly bounded everywhere.
pub(crate) fn validate(&self) -> Result<RelayFrameBodyKind, ExecServerError> {
if self.version != RELAY_MESSAGE_FRAME_VERSION {
return Err(ExecServerError::Protocol(format!(
"unsupported relay message frame version {}",
self.version
)));
}
if self.stream_id.trim().is_empty() {
if self.stream_id.is_empty() {
return Err(ExecServerError::Protocol(
"relay message frame is missing stream_id".to_string(),
));
}
if self.stream_id.trim() != self.stream_id {
return Err(ExecServerError::Protocol(
"relay message frame stream_id is not canonical".to_string(),
));
}
if self.stream_id.len() > MAX_RELAY_STREAM_ID_BYTES {
return Err(ExecServerError::Protocol(
"relay message frame stream_id is too long".to_string(),
));
}
match self.body.as_ref() {
Some(relay_message_frame::Body::Data(data)) => {
if data.segment_index != 0 || data.segment_count != 1 || data.payload.is_empty() {
Expand All @@ -90,35 +108,49 @@ impl RelayMessageFrame {
Some(relay_message_frame::Body::AckFrame(_)) => Ok(RelayFrameBodyKind::Ack),
Some(relay_message_frame::Body::Resume(_)) => Ok(RelayFrameBodyKind::Resume),
Some(relay_message_frame::Body::Reset(reset)) => {
if reset.reason.is_empty() {
if reset.reason.is_empty() || reset.reason.len() > MAX_RELAY_RESET_REASON_BYTES {
return Err(ExecServerError::Protocol(
"relay reset message frame is missing reason".to_string(),
"relay reset message frame has invalid reason".to_string(),
));
}
Ok(RelayFrameBodyKind::Reset)
}
Some(relay_message_frame::Body::Heartbeat(_)) => Ok(RelayFrameBodyKind::Heartbeat),
Some(relay_message_frame::Body::Handshake(handshake)) => {
if handshake.payload.is_empty() {
return Err(ExecServerError::Protocol(
"relay handshake message frame is missing payload".to_string(),
));
}
Ok(RelayFrameBodyKind::Handshake)
}
None => Err(ExecServerError::Protocol(
"relay message frame is missing body".to_string(),
)),
}
}

fn into_jsonrpc_message(self) -> Result<JSONRPCMessage, ExecServerError> {
pub(crate) fn into_data(self) -> Result<RelayData, ExecServerError> {
let kind = self.validate()?;
if kind != RelayFrameBodyKind::Data {
return Err(ExecServerError::Protocol(
"expected relay data message frame".to_string(),
));
}
let payload = match self.body {
Some(relay_message_frame::Body::Data(data)) => data.payload,
_ => Vec::new(),
};
match self.body {
Some(relay_message_frame::Body::Data(data)) => Ok(data),
_ => Err(ExecServerError::Protocol(
"expected relay data message frame".to_string(),
)),
}
}

fn into_jsonrpc_message(self) -> Result<JSONRPCMessage, ExecServerError> {
let payload = self.into_data()?.payload;
serde_json::from_slice(&payload).map_err(ExecServerError::Json)
}

fn into_reset_reason(self) -> Option<String> {
pub(crate) fn into_reset_reason(self) -> Option<String> {
match self.body {
Some(relay_message_frame::Body::Reset(reset)) if !reset.reason.is_empty() => {
Some(reset.reason)
Expand All @@ -128,16 +160,18 @@ impl RelayMessageFrame {
}
}

fn encode_relay_message_frame(frame: &RelayMessageFrame) -> Vec<u8> {
pub(crate) fn encode_relay_message_frame(frame: &RelayMessageFrame) -> Vec<u8> {
frame.encode_to_vec()
}

fn decode_relay_message_frame(payload: &[u8]) -> Result<RelayMessageFrame, ExecServerError> {
pub(crate) fn decode_relay_message_frame(
payload: &[u8],
) -> Result<RelayMessageFrame, ExecServerError> {
RelayMessageFrame::decode(payload)
.map_err(|err| ExecServerError::Protocol(format!("invalid relay message frame: {err}")))
}

fn jsonrpc_payload(message: &JSONRPCMessage) -> Result<Vec<u8>, ExecServerError> {
pub(crate) fn jsonrpc_payload(message: &JSONRPCMessage) -> Result<Vec<u8>, ExecServerError> {
serde_json::to_vec(message).map_err(ExecServerError::Json)
}

Expand Down Expand Up @@ -253,7 +287,8 @@ where
}
RelayFrameBodyKind::Ack
| RelayFrameBodyKind::Resume
| RelayFrameBodyKind::Heartbeat => {}
| RelayFrameBodyKind::Heartbeat
| RelayFrameBodyKind::Handshake => {}
}
}
Some(Ok(Message::Close(_))) | None => {
Expand Down Expand Up @@ -294,10 +329,22 @@ where
incoming_rx,
disconnected_rx,
task_handles: vec![websocket_task],
transport: JsonRpcTransport::Plain,
transport: JsonRpcTransport::External,
}
}

/// Runs the backwards-compatible, cleartext multiplexed relay protocol.
///
/// The physical websocket carries protobuf [`RelayMessageFrame`] values. Each
/// frame's `stream_id` selects an independent logical JSON-RPC connection, so a
/// single registered exec-server can serve multiple orchestrator sessions.
/// This runner intentionally preserves the protocol used before Noise relay
/// support was introduced. Callers must select the separate secure runner when
/// Noise protection was explicitly negotiated during registration.
///
/// Relay framing is not an authentication boundary. Every frame is validated
/// before its routing metadata or payload is used, and malformed frames are
/// dropped without affecting the other virtual streams on the connection.
pub(crate) async fn run_multiplexed_environment<S>(
stream: WebSocketStream<S>,
processor: ConnectionProcessor,
Expand All @@ -310,6 +357,9 @@ pub(crate) async fn run_multiplexed_environment<S>(

let mut streams: HashMap<String, VirtualStream> = HashMap::new();
loop {
// Serialize all logical-stream writes through this task so only one
// owner writes to the physical websocket. Reads remain in the same
// select loop, which also keeps disconnect handling deterministic.
let frame = tokio::select! {
maybe_encoded = physical_outgoing_rx.recv() => {
let Some(encoded) = maybe_encoded else {
Expand Down Expand Up @@ -361,6 +411,11 @@ pub(crate) async fn run_multiplexed_environment<S>(
continue;
}
};

// A logical connection is created lazily on its first data
// frame. The connection processor owns the JSON-RPC lifecycle;
// this task only translates between relay and connection
// events.
let stream = streams.entry(stream_id.clone()).or_insert_with(|| {
spawn_virtual_stream(
stream_id.clone(),
Expand All @@ -382,24 +437,34 @@ pub(crate) async fn run_multiplexed_environment<S>(
stream.disconnect(frame.into_reset_reason()).await;
}
}
// These control frames are meaningful to other relay protocol
// variants. The legacy protocol has no resume or handshake state,
// so ignoring them preserves its existing wire behavior.
RelayFrameBodyKind::Ack
| RelayFrameBodyKind::Resume
| RelayFrameBodyKind::Heartbeat => {}
| RelayFrameBodyKind::Heartbeat
| RelayFrameBodyKind::Handshake => {}
}
}

// A physical disconnect ends every virtual connection. Notify each
// processor explicitly so requests do not remain live after the relay
// websocket has disappeared.
for (_stream_id, stream) in streams {
stream.disconnect(/*reason*/ None).await;
}
drop(physical_outgoing_tx);
}

/// The exec-server-facing half of one logical connection on the shared relay.
struct VirtualStream {
/// Channel on which `JsonRpcConnectionEvent`s are sent for this stream.
incoming_tx: mpsc::Sender<JsonRpcConnectionEvent>,
/// Watch flag that `disconnect` sets to `true`.
disconnected_tx: watch::Sender<bool>,
}

impl VirtualStream {
/// Marks the logical connection disconnected and supplies the peer's reset
/// reason, when one was provided.
async fn disconnect(self, reason: Option<String>) {
let _ = self.disconnected_tx.send(true);
let _ = self
Expand All @@ -409,6 +474,8 @@ impl VirtualStream {
}
}

/// Creates a logical JSON-RPC connection and forwards its outgoing messages
/// back to the physical relay writer as legacy cleartext data frames.
fn spawn_virtual_stream(
stream_id: String,
processor: ConnectionProcessor,
Expand Down Expand Up @@ -446,7 +513,7 @@ fn spawn_virtual_stream(
incoming_rx,
disconnected_rx,
task_handles: vec![writer_task],
transport: JsonRpcTransport::Plain,
transport: JsonRpcTransport::External,
};
tokio::spawn(async move {
processor.run_connection(connection).await;
Expand Down Expand Up @@ -476,6 +543,7 @@ mod tests {
use futures::task::AtomicWaker;
use tokio::net::TcpListener;
use tokio::time::timeout;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::accept_async;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
Expand Down
Loading