diff --git a/crates/cli/src/adapters/claude_code.rs b/crates/cli/src/adapters/claude_code.rs index b9bcce9cc..1492b2290 100644 --- a/crates/cli/src/adapters/claude_code.rs +++ b/crates/cli/src/adapters/claude_code.rs @@ -4,7 +4,9 @@ use axum::http::HeaderMap; use serde_json::{Value, json}; -use crate::adapters::{AdapterOutcome, ClassificationRules, classify}; +use crate::adapters::{ + AdapterOutcome, CLAUDE_CODE_PAYLOAD_EXTRACTOR, ClassificationRules, classify, +}; use crate::model::{AgentKind, NormalizedEvent}; /// Normalizes Claude Code hook payloads and returns the hook response Claude expects. @@ -18,6 +20,7 @@ pub(crate) fn adapt(payload: Value, headers: &HeaderMap) -> AdapterOutcome { let events = classify( &payload, headers, + &CLAUDE_CODE_PAYLOAD_EXTRACTOR, &ClassificationRules { kind: AgentKind::ClaudeCode, agent_start: &["SessionStart", "sessionStart", "session_start"], diff --git a/crates/cli/src/adapters/codex.rs b/crates/cli/src/adapters/codex.rs index 58aa7bc4f..4cbde84b4 100644 --- a/crates/cli/src/adapters/codex.rs +++ b/crates/cli/src/adapters/codex.rs @@ -4,7 +4,7 @@ use axum::http::HeaderMap; use serde_json::{Value, json}; -use crate::adapters::{AdapterOutcome, ClassificationRules, classify}; +use crate::adapters::{AdapterOutcome, CODEX_PAYLOAD_EXTRACTOR, ClassificationRules, classify}; use crate::model::AgentKind; /// Normalizes Codex hook payloads while leaving Codex hook control flow untouched. @@ -16,6 +16,7 @@ pub(crate) fn adapt(payload: Value, headers: &HeaderMap) -> AdapterOutcome { let events = classify( &payload, headers, + &CODEX_PAYLOAD_EXTRACTOR, &ClassificationRules { kind: AgentKind::Codex, agent_start: &["sessionStart", "session_start", "agentStarted"], diff --git a/crates/cli/src/adapters/hermes.rs b/crates/cli/src/adapters/hermes.rs index 815b52340..582107c99 100644 --- a/crates/cli/src/adapters/hermes.rs +++ b/crates/cli/src/adapters/hermes.rs @@ -5,9 +5,10 @@ use axum::http::HeaderMap; use serde_json::{Map, Value, json}; use crate::adapters::{ - AdapterOutcome, ClassificationRules, classify, common_session_event, event_name, metadata, - normalize_name, session_id, value_at, + AdapterOutcome, ClassificationRules, HERMES_PAYLOAD_EXTRACTOR, classify, common_session_event, + event_name, metadata, normalize_name, session_id, }; +use crate::json_path::value_at; use crate::model::{AgentKind, LlmEvent, NormalizedEvent}; /// Normalizes Hermes shell hook payloads without emitting control directives. @@ -16,7 +17,7 @@ use crate::model::{AgentKind, LlmEvent, NormalizedEvent}; /// responses minimal and relies on the forwarder fail-open/fail-closed setting to decide whether /// hook delivery problems affect the invoking agent. pub(crate) fn adapt(payload: Value, headers: &HeaderMap) -> AdapterOutcome { - let event_name = event_name(&payload); + let event_name = event_name(&payload, &HERMES_PAYLOAD_EXTRACTOR); let normalized = normalize_name(&event_name); if normalized == "preapirequest" { return AdapterOutcome { @@ -64,6 +65,7 @@ pub(crate) fn adapt(payload: Value, headers: &HeaderMap) -> AdapterOutcome { &payload, headers, AgentKind::Hermes, + &HERMES_PAYLOAD_EXTRACTOR, ))], response: json!({}), }; @@ -72,6 +74,7 @@ pub(crate) fn adapt(payload: Value, headers: &HeaderMap) -> AdapterOutcome { let events = classify( &payload, headers, + &HERMES_PAYLOAD_EXTRACTOR, &ClassificationRules { kind: AgentKind::Hermes, agent_start: &["on_session_start", "sessionStart"], @@ -89,7 +92,7 @@ pub(crate) fn adapt(payload: Value, headers: &HeaderMap) -> AdapterOutcome { } fn hermes_llm_event(payload: &Value, headers: &HeaderMap, event_name: &str) -> LlmEvent { - let session_id = session_id(payload, headers); + let session_id = session_id(payload, headers, &HERMES_PAYLOAD_EXTRACTOR); let api_call_id = hermes_api_call_id(payload, &session_id); let provider = hermes_string_at(payload, "provider") .or_else(|| hermes_string_at(payload, "api_mode")) @@ -97,7 +100,13 @@ fn hermes_llm_event(payload: &Value, headers: &HeaderMap, event_name: &str) -> L let model_name = hermes_string_at(payload, "response_model").or_else(|| hermes_string_at(payload, "model")); let payload_exact = hermes_payload_exact(payload, event_name); - let mut event_metadata = metadata(payload, headers, AgentKind::Hermes, event_name); + let mut event_metadata = metadata( + payload, + headers, + AgentKind::Hermes, + event_name, + &HERMES_PAYLOAD_EXTRACTOR, + ); if let Value::Object(ref mut object) = event_metadata { object.insert("api_call_id".into(), json!(api_call_id.clone())); object.insert("provider_payload_exact".into(), json!(payload_exact)); diff --git a/crates/cli/src/adapters/mod.rs b/crates/cli/src/adapters/mod.rs index 17c77b228..ce60db69d 100644 --- a/crates/cli/src/adapters/mod.rs +++ b/crates/cli/src/adapters/mod.rs @@ -10,13 +10,18 @@ use serde_json::{Map, Value, json}; use uuid::Uuid; use crate::config::header_string; +use crate::json_path::{ + string_at, string_at_any as first_string_at, value_at, value_at_any as first_value_at, +}; use crate::model::{ AgentKind, LlmHintEvent, NormalizedEvent, SessionEvent, SubagentEvent, ToolEvent, }; #[derive(Debug, Clone, PartialEq)] pub(crate) struct AdapterOutcome { + /// Normalized events emitted from one incoming agent hook payload. pub(crate) events: Vec, + /// Hook response body returned to the invoking agent process. pub(crate) response: Value, } @@ -30,57 +35,400 @@ pub(super) struct ClassificationRules<'a> { tool_end: &'a [&'a str], } -// Derives a stable session identifier from gateway headers first, then common agent payload -// fields, and finally a v7 UUID. Header precedence lets gateway and hook-forward callers -// correlate events even when agent payload schemas omit or rename their native session field. -fn session_id(payload: &Value, headers: &HeaderMap) -> String { - header_string(headers, "x-nemo-relay-session-id") - .or_else(|| header_string(headers, "x-claude-code-session-id")) - .or_else(|| session_id_from_payload(payload)) - .unwrap_or_else(|| format!("hook-{}", Uuid::now_v7())) +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub(crate) struct ExtractedLlmHint { + /// Agent-local worker or subagent identifier, when the payload supplies one. + pub(crate) subagent_id: Option, + /// Stable agent identifier from the hook payload, not synthesized. + pub(crate) agent_id: Option, + /// Agent type or role reported by the harness, not synthesized. + pub(crate) agent_type: Option, + /// Provider or harness conversation identifier used for later LLM correlation. + pub(crate) conversation_id: Option, + /// Generation identifier used to pair hook hints with provider responses. + pub(crate) generation_id: Option, + /// Request identifier used to pair hook hints with provider requests. + pub(crate) request_id: Option, + /// Model name reported by the hook payload. + pub(crate) model: Option, } -// Reads the first known session identifier payload path. Keeping the path list in one place makes -// adapter precedence explicit without nesting a long `or_else` chain in `session_id`. -fn session_id_from_payload(payload: &Value) -> Option { - [ - &["session_id"][..], - &["sessionId"], - &["session", "id"], - &["conversation_id"], - &["conversationId"], - &["parent_session_id"], - &["task_id"], - &["extra", "session_id"], - &["extra", "task_id"], - ] - .into_iter() - .find_map(|path| string_at(payload, path)) +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct ExtractedToolCall { + /// Tool-call identifier reported by the hook payload, not synthesized. + pub(crate) tool_call_id: Option, + /// Tool name reported by the hook payload, not synthesized. + pub(crate) tool_name: Option, + /// Agent-local worker or subagent that owns the tool call. + pub(crate) subagent_id: Option, + /// Tool arguments exactly as supplied by the hook payload. + pub(crate) arguments: Option, + /// Tool result exactly as supplied by the hook payload. + pub(crate) result: Option, + /// Tool status reported or conservatively derived from the hook event name. + pub(crate) status: Option, } -// Reads the agent's event name from the known hook fields in order and falls back to `unknown`. -// This deliberately keeps unknown payloads observable instead of rejecting them at the adapter -// boundary, allowing the session layer to emit a generic mark event. -fn event_name(payload: &Value) -> String { - string_at(payload, &["hook_event_name"]) - .or_else(|| string_at(payload, &["event_name"])) - .or_else(|| string_at(payload, &["eventName"])) - .or_else(|| string_at(payload, &["event"])) - .or_else(|| string_at(payload, &["type"])) - .or_else(|| string_at(payload, &["name"])) - .or_else(|| string_at(payload, &["extra", "hook_event_name"])) - .or_else(|| string_at(payload, &["extra", "event_name"])) - .or_else(|| string_at(payload, &["extra", "eventName"])) - .or_else(|| string_at(payload, &["extra", "event"])) - .or_else(|| string_at(payload, &["extra", "type"])) - .or_else(|| string_at(payload, &["extra", "name"])) - .unwrap_or_else(|| "unknown".to_string()) +/// Strategy for extracting normalized facts from agent or harness hook payloads. +/// +/// The trait is organized as a small set of per-harness *deviation hooks* +/// (`session_header_policy`, `session_id_paths`, `event_name_paths`, +/// `subagent_id_paths`, `tool_paths`) plus shared *behavior* methods built on +/// top of them. Every deviation hook has a canonical default, so a harness +/// implementation overrides only the hooks where its hook payloads genuinely +/// differ and the shared behavior is written once. +/// +/// Behavior methods return `None` for missing or untrusted fields. The adapter +/// layer owns compatibility fallbacks such as synthetic session IDs, synthetic +/// tool-call IDs, and `unknown_tool` names so downstream lifecycle behavior +/// remains stable for sparse payloads. +pub(crate) trait AgentPayloadExtractor { + // -- Per-harness deviations (override only what genuinely differs) ------- + + /// Whether this harness also trusts the Claude installed-mode session + /// header (`x-claude-code-session-id`) as explicit session evidence. + fn session_header_policy(&self) -> SessionHeaderPolicy { + SessionHeaderPolicy::RelayAndClaude + } + + /// Candidate payload paths for the native session identifier. + fn session_id_paths(&self) -> &'static [&'static [&'static str]] { + SESSION_ID_PATHS + } + + /// Candidate payload paths for the native hook event name. + fn event_name_paths(&self) -> &'static [&'static [&'static str]] { + EVENT_NAME_PATHS + } + + /// Candidate payload paths for the native subagent or worker identifier. + fn subagent_id_paths(&self) -> &'static [&'static [&'static str]] { + SUBAGENT_ID_PATHS + } + + /// Tool payload paths (call id, name, arguments, result, status). + fn tool_paths(&self) -> &'static ToolPathSet { + TOOL_PATHS + } + + // -- Shared behavior (derived from the deviation hooks above) ------------ + + /// Extract the native session identifier for this agent payload. + /// + /// Returning `None` means the payload did not supply a trustworthy session + /// id; the adapter boundary will apply compatibility fallbacks. + fn session_id(&self, payload: &Value, headers: &HeaderMap) -> Option { + agent_session_id( + headers, + payload, + self.session_header_policy(), + self.session_id_paths(), + ) + } + + /// Extract the native hook event name for this agent payload. + /// + /// Returning `None` keeps unknown events observable by letting the adapter + /// boundary synthesize the generic `unknown` event name. + fn event_name(&self, payload: &Value) -> Option { + first_string_at(payload, self.event_name_paths()) + } + + /// Build stable, low-cardinality metadata shared by normalized events. + /// + /// Implementations must not promote high-cardinality paths or PII into this + /// map; consumers that need full details can read the raw event payload. + fn metadata( + &self, + payload: &Value, + headers: &HeaderMap, + kind: AgentKind, + event_name: &str, + ) -> Value { + agent_metadata(payload, headers, kind, event_name) + } + + /// Extract the native subagent or worker identifier for this agent payload. + /// + /// Returning `None` means the payload did not identify a subagent; callers + /// decide whether to synthesize a compatibility owner. + fn subagent_id(&self, payload: &Value, headers: &HeaderMap) -> Option { + agent_subagent_id(payload, headers, self.subagent_id_paths()) + } + + /// Extract LLM-correlation hints without applying fallback values. + fn llm_hint(&self, payload: &Value, headers: &HeaderMap) -> ExtractedLlmHint { + agent_llm_hint(payload, self.subagent_id(payload, headers)) + } + + /// Extract tool-call facts without applying fallback identifiers or names. + fn tool_call( + &self, + payload: &Value, + headers: &HeaderMap, + event_name: &str, + ) -> ExtractedToolCall { + agent_tool_call( + payload, + self.subagent_id(payload, headers), + event_name, + self.tool_paths(), + ) + } +} + +pub(super) struct ClaudeCodePayloadExtractor; +pub(super) struct CodexPayloadExtractor; +pub(super) struct HermesPayloadExtractor; + +pub(super) static CLAUDE_CODE_PAYLOAD_EXTRACTOR: ClaudeCodePayloadExtractor = + ClaudeCodePayloadExtractor; +pub(super) static CODEX_PAYLOAD_EXTRACTOR: CodexPayloadExtractor = CodexPayloadExtractor; +pub(super) static HERMES_PAYLOAD_EXTRACTOR: HermesPayloadExtractor = HermesPayloadExtractor; + +/// Claude Code reports its native tool identifier as `tool_use_id`, so it uses +/// a tool path set that prefers that key. Every other hook field matches the +/// canonical defaults (including the installed-mode session-header policy). +impl AgentPayloadExtractor for ClaudeCodePayloadExtractor { + fn tool_paths(&self) -> &'static ToolPathSet { + CLAUDE_TOOL_PATHS + } +} + +/// Codex transparent runs forward provider tokens directly, so they must not +/// adopt the Claude installed-mode session header. They also expose a +/// Codex-native subagent nickname and send tool arguments under `arguments`. +impl AgentPayloadExtractor for CodexPayloadExtractor { + fn session_header_policy(&self) -> SessionHeaderPolicy { + SessionHeaderPolicy::RelayOnly + } + + fn subagent_id_paths(&self) -> &'static [&'static [&'static str]] { + CODEX_SUBAGENT_ID_PATHS + } + + fn tool_paths(&self) -> &'static ToolPathSet { + CODEX_TOOL_PATHS + } +} + +/// Hermes always runs nested under another agent, so the `child_subagent_id` +/// signal is the most reliable owner and is preferred over the generic +/// session-scoped subagent id. Session, event, and tool extraction match the +/// canonical defaults. +impl AgentPayloadExtractor for HermesPayloadExtractor { + fn subagent_id_paths(&self) -> &'static [&'static [&'static str]] { + HERMES_SUBAGENT_ID_PATHS + } +} + +pub(crate) struct ToolPathSet { + call_id: &'static [&'static [&'static str]], + name: &'static [&'static [&'static str]], + arguments: &'static [&'static [&'static str]], + result: &'static [&'static [&'static str]], + status: &'static [&'static [&'static str]], +} + +/// Whether an extractor accepts the Claude installed-mode session header. +#[derive(Clone, Copy)] +pub(crate) enum SessionHeaderPolicy { + /// Trust only the NeMo Relay session header. Used by harnesses (Codex + /// transparent runs) that forward provider tokens directly and must not + /// inherit a Claude installed-mode session id. + RelayOnly, + /// Trust the NeMo Relay session header and then the Claude installed-mode + /// `x-claude-code-session-id` header as explicit session evidence. + RelayAndClaude, +} + +/// Canonical session-id precedence. All supported harnesses share this list; +/// only their [`SessionHeaderPolicy`] differs. +const SESSION_ID_PATHS: &[&[&str]] = &[ + &["session_id"], + &["sessionId"], + &["session", "id"], + &["conversation_id"], + &["conversationId"], + &["parent_session_id"], + &["task_id"], + &["extra", "session_id"], + &["extra", "task_id"], +]; + +/// Canonical hook event-name precedence, shared by all supported harnesses. +const EVENT_NAME_PATHS: &[&[&str]] = &[ + &["hook_event_name"], + &["event_name"], + &["eventName"], + &["event"], + &["type"], + &["name"], + &["extra", "hook_event_name"], + &["extra", "event_name"], + &["extra", "eventName"], + &["extra", "event"], + &["extra", "type"], + &["extra", "name"], +]; + +/// Canonical subagent-id precedence for harnesses without a native nested-agent +/// signal of their own (Claude Code). +const SUBAGENT_ID_PATHS: &[&[&str]] = &[ + &["subagent_id"], + &["subagentId"], + &["child_subagent_id"], + &["childSubagentId"], + &["agent_id"], + &["subagent", "id"], + &["agent", "id"], + &["extra", "subagent_id"], + &["extra", "subagentId"], + &["extra", "child_subagent_id"], + &["extra", "childSubagentId"], + &["extra", "agent_id"], + &["extra", "subagent", "id"], + &["extra", "agent", "id"], +]; + +/// Codex deviation: adds the thread-spawn nickname between the flat id keys and +/// the nested `subagent.id`/`agent.id` shapes. +const CODEX_SUBAGENT_ID_PATHS: &[&[&str]] = &[ + &["subagent_id"], + &["subagentId"], + &["child_subagent_id"], + &["childSubagentId"], + &["agent_id"], + &["source", "subagent", "thread_spawn", "agent_nickname"], + &["subagent", "id"], + &["agent", "id"], + &["extra", "subagent_id"], + &["extra", "subagentId"], + &["extra", "child_subagent_id"], + &["extra", "childSubagentId"], + &["extra", "agent_id"], + &["extra", "subagent", "id"], + &["extra", "agent", "id"], +]; + +/// Hermes deviation: prefers the `child_subagent_id` owner signal before the +/// generic session-scoped subagent id. +const HERMES_SUBAGENT_ID_PATHS: &[&[&str]] = &[ + &["child_subagent_id"], + &["childSubagentId"], + &["subagent_id"], + &["subagentId"], + &["agent_id"], + &["subagent", "id"], + &["agent", "id"], + &["extra", "child_subagent_id"], + &["extra", "childSubagentId"], + &["extra", "subagent_id"], + &["extra", "subagentId"], + &["extra", "agent_id"], + &["extra", "subagent", "id"], + &["extra", "agent", "id"], +]; + +/// Claude Code deviation: its native tool identifier is `tool_use_id`, checked +/// before the generic `tool_call_id` shapes. +const CLAUDE_TOOL_CALL_ID_PATHS: &[&[&str]] = &[ + &["tool_use_id"], + &["tool_call_id"], + &["toolCallId"], + &["call_id"], + &["extra", "tool_call_id"], + &["extra", "call_id"], + &["tool", "id"], + &["tool_input", "id"], + &["id"], +]; + +/// Canonical tool-call-id precedence for harnesses that report the generic +/// `tool_call_id` first (Codex and Hermes). +const TOOL_CALL_ID_PATHS: &[&[&str]] = &[ + &["tool_call_id"], + &["toolCallId"], + &["tool_use_id"], + &["call_id"], + &["extra", "tool_call_id"], + &["extra", "call_id"], + &["tool", "id"], + &["tool_input", "id"], + &["id"], +]; + +const TOOL_NAME_PATHS: &[&[&str]] = &[ + &["tool_name"], + &["toolName"], + &["tool", "name"], + &["tool_input", "name"], + &["name"], +]; + +/// Canonical argument precedence for harnesses that nest tool input under +/// `tool_input` first (Claude Code and Hermes). +const TOOL_ARGUMENT_PATHS: &[&[&str]] = &[&["tool_input"], &["input"], &["arguments"], &["args"]]; + +/// Codex deviation: sends tool arguments under `arguments`/`args` first. +const CODEX_TOOL_ARGUMENT_PATHS: &[&[&str]] = + &[&["arguments"], &["args"], &["input"], &["tool_input"]]; +const TOOL_RESULT_PATHS: &[&[&str]] = &[ + &["tool_output"], + &["tool_response"], + &["output"], + &["result"], + &["extra", "tool_output"], + &["extra", "result"], +]; +const TOOL_STATUS_PATHS: &[&[&str]] = &[&["status"], &["decision"], &["permission"]]; + +/// Canonical tool path set used by harnesses that report generic tool shapes +/// (Hermes). Name, result, and status precedence is shared by every harness. +const TOOL_PATHS: &ToolPathSet = &ToolPathSet { + call_id: TOOL_CALL_ID_PATHS, + name: TOOL_NAME_PATHS, + arguments: TOOL_ARGUMENT_PATHS, + result: TOOL_RESULT_PATHS, + status: TOOL_STATUS_PATHS, +}; +const CLAUDE_TOOL_PATHS: &ToolPathSet = &ToolPathSet { + call_id: CLAUDE_TOOL_CALL_ID_PATHS, + name: TOOL_NAME_PATHS, + arguments: TOOL_ARGUMENT_PATHS, + result: TOOL_RESULT_PATHS, + status: TOOL_STATUS_PATHS, +}; +const CODEX_TOOL_PATHS: &ToolPathSet = &ToolPathSet { + call_id: TOOL_CALL_ID_PATHS, + name: TOOL_NAME_PATHS, + arguments: CODEX_TOOL_ARGUMENT_PATHS, + result: TOOL_RESULT_PATHS, + status: TOOL_STATUS_PATHS, +}; + +fn agent_session_id( + headers: &HeaderMap, + payload: &Value, + header_policy: SessionHeaderPolicy, + payload_paths: &'static [&'static [&'static str]], +) -> Option { + header_string(headers, "x-nemo-relay-session-id") + .or_else(|| match header_policy { + SessionHeaderPolicy::RelayOnly => None, + SessionHeaderPolicy::RelayAndClaude => { + header_string(headers, "x-claude-code-session-id") + } + }) + .or_else(|| session_id_from_payload(payload, payload_paths)) } -// Builds shared metadata for every normalized hook event. Only stable, low-cardinality fields and -// gateway configuration hints are lifted out; the full payload remains on the event for consumers -// that need agent-specific detail. -fn metadata(payload: &Value, headers: &HeaderMap, kind: AgentKind, event_name: &str) -> Value { +fn agent_metadata( + payload: &Value, + headers: &HeaderMap, + kind: AgentKind, + event_name: &str, +) -> Value { let mut object = Map::new(); object.insert("agent_kind".into(), json!(kind.as_str())); object.insert("hook_event_name".into(), json!(event_name)); @@ -88,10 +436,6 @@ fn metadata(payload: &Value, headers: &HeaderMap, kind: AgentKind, event_name: & object.insert("gateway_config_profile".into(), json!(profile)); } for (key, value) in [ - ("cwd", string_at(payload, &["cwd"])), - ("transcript_path", string_at(payload, &["transcript_path"])), - ("project_dir", string_at(payload, &["project_dir"])), - ("user_email", string_at(payload, &["user_email"])), ("model", string_at(payload, &["model"])), ("agent_id", string_at(payload, &["agent_id"])), ("agent_type", string_at(payload, &["agent_type"])), @@ -103,51 +447,17 @@ fn metadata(payload: &Value, headers: &HeaderMap, kind: AgentKind, event_name: & Value::Object(object) } -// Creates a root session event using the common session-id and metadata extraction rules so -// lifecycle, marks, notifications, and compaction events all carry identical correlation fields. -pub(crate) fn common_session_event( +fn agent_subagent_id( payload: &Value, headers: &HeaderMap, - kind: AgentKind, -) -> SessionEvent { - let event_name = event_name(payload); - SessionEvent { - session_id: session_id(payload, headers), - agent_kind: kind, - event_name: event_name.clone(), - payload: payload.clone(), - metadata: metadata(payload, headers, kind, &event_name), - } + paths: &'static [&'static [&'static str]], +) -> Option { + first_string_at(payload, paths).or_else(|| header_string(headers, "x-nemo-relay-subagent-id")) } -// Creates a subagent event and tolerates sparse agent payloads by using the gateway subagent -// header and then a synthetic `subagent` id. The fallback keeps unmatched start/end events visible -// rather than dropping them when an integration lacks explicit nested-agent IDs. -fn common_subagent_event(payload: &Value, headers: &HeaderMap, kind: AgentKind) -> SubagentEvent { - let session = common_session_event(payload, headers, kind); - let subagent_id = subagent_id(payload) - .or_else(|| header_string(headers, "x-nemo-relay-subagent-id")) - .unwrap_or_else(|| "subagent".to_string()); - SubagentEvent { - session_id: session.session_id, - agent_kind: kind, - event_name: session.event_name, +fn agent_llm_hint(payload: &Value, subagent_id: Option) -> ExtractedLlmHint { + ExtractedLlmHint { subagent_id, - payload: session.payload, - metadata: session.metadata, - } -} - -// Captures hook payloads that can help correlate nearby gateway LLM calls to the right agent or -// subagent. Multiple naming conventions are accepted because integrations expose conversation, -// generation, request, and model identifiers under different shapes. -fn common_llm_hint_event(payload: &Value, headers: &HeaderMap, kind: AgentKind) -> LlmHintEvent { - let session = common_session_event(payload, headers, kind); - LlmHintEvent { - session_id: session.session_id, - agent_kind: kind, - event_name: session.event_name, - subagent_id: hook_subagent_id(payload, headers), agent_id: first_string_at(payload, &[&["agent_id"][..], &["agent", "id"][..]]), agent_type: first_string_at( payload, @@ -186,115 +496,224 @@ fn common_llm_hint_event(payload: &Value, headers: &HeaderMap, kind: AgentKind) payload, &[&["model"][..], &["model_name"][..], &["modelName"][..]], ), - payload: session.payload, - metadata: session.metadata, } } -// Converts agent tool hooks into the runtime tool event shape while preserving missing fields. -// Tool IDs and names are synthesized when absent, arguments/results are searched across known -// payload shapes, and failure or permission-denied event names are reflected in status metadata. -fn common_tool_event(payload: &Value, headers: &HeaderMap, kind: AgentKind) -> ToolEvent { - let session = common_session_event(payload, headers, kind); - let normalized_event = normalize_name(&session.event_name); - ToolEvent { - session_id: session.session_id, - agent_kind: kind, - event_name: session.event_name, - tool_call_id: tool_call_id(payload), - tool_name: tool_name(payload), - subagent_id: hook_subagent_id(payload, headers), - arguments: tool_arguments(payload), - result: tool_result(payload, &normalized_event), - status: tool_status(payload, &normalized_event), - payload: session.payload, - metadata: session.metadata, +fn agent_tool_call( + payload: &Value, + subagent_id: Option, + event_name: &str, + paths: &ToolPathSet, +) -> ExtractedToolCall { + let normalized_event = normalize_name(event_name); + ExtractedToolCall { + tool_call_id: first_string_at(payload, paths.call_id), + tool_name: first_string_at(payload, paths.name), + subagent_id, + arguments: first_value_at(payload, paths.arguments), + result: first_value_at(payload, paths.result) + .or_else(|| event_detail_result(payload, &normalized_event)), + status: first_string_at(payload, paths.status) + .or_else(|| derived_tool_status(&normalized_event)), } } -// Looks up the first string across a list of payload paths. Keeping this fallback mechanic in one -// helper makes event-specific extraction code read as schema precedence rather than control flow. -fn first_string_at(payload: &Value, paths: &[&[&str]]) -> Option { - paths.iter().find_map(|path| string_at(payload, path)) +/// Derive a stable session identifier from extracted facts and compatibility fallbacks. +/// +/// Header and payload precedence lives in the selected extractor. This boundary +/// applies the final synthetic ID fallback so sparse payloads stay observable. +fn session_id( + payload: &Value, + headers: &HeaderMap, + extractor: &dyn AgentPayloadExtractor, +) -> String { + let fallback_session_id = fallback_session_id(); + session_id_with_fallback(payload, headers, extractor, &fallback_session_id) } -// Resolves a subagent id from payload shape first and the gateway header second. The payload wins -// because it is the agent's native ownership signal; the header exists for gateway correlation and -// sparse hook systems. -fn hook_subagent_id(payload: &Value, headers: &HeaderMap) -> Option { - subagent_id(payload).or_else(|| header_string(headers, "x-nemo-relay-subagent-id")) +fn fallback_session_id() -> String { + format!("hook-{}", Uuid::now_v7()) } -// Resolves a tool call identifier from all known agent payload conventions before synthesizing a -// UUID-backed id. The synthetic id keeps lifecycle events recordable even when hooks omit IDs. -fn tool_call_id(payload: &Value) -> String { - first_string_at( - payload, - &[ - &["tool_call_id"][..], - &["toolCallId"][..], - &["tool_use_id"][..], - &["call_id"][..], - &["extra", "tool_call_id"][..], - &["extra", "call_id"][..], - &["tool", "id"][..], - &["tool_input", "id"][..], - &["id"][..], - ], - ) - .unwrap_or_else(|| format!("tool-{}", Uuid::now_v7())) +fn session_id_with_fallback( + payload: &Value, + headers: &HeaderMap, + extractor: &dyn AgentPayloadExtractor, + fallback_session_id: &str, +) -> String { + extractor + .session_id(payload, headers) + .unwrap_or_else(|| fallback_session_id.to_string()) } -// Resolves a human-readable tool name from the common top-level, nested tool, and tool-input -// shapes. Missing names are kept explicit as `unknown_tool` rather than inheriting event names. -fn tool_name(payload: &Value) -> String { - first_string_at( - payload, - &[ - &["tool_name"][..], - &["toolName"][..], - &["tool", "name"][..], - &["tool_input", "name"][..], - &["name"][..], - ], - ) - .unwrap_or_else(|| "unknown_tool".to_string()) +/// Read the first known session identifier payload path for one agent strategy. +/// +/// Keeping the path list in one place makes adapter precedence explicit without +/// nesting a long `or_else` chain in `session_id`. +fn session_id_from_payload( + payload: &Value, + paths: &'static [&'static [&'static str]], +) -> Option { + first_string_at(payload, paths) } -// Extracts tool input from the agent-specific fields that represent call arguments. A missing -// argument payload remains JSON null so downstream consumers can distinguish it from `{}`. -fn tool_arguments(payload: &Value) -> Value { - value_at(payload, &["tool_input"]) - .or_else(|| value_at(payload, &["input"])) - .or_else(|| value_at(payload, &["arguments"])) - .or_else(|| value_at(payload, &["args"])) - .unwrap_or(Value::Null) +/// Read the agent's event name and fall back to `unknown`. +/// +/// Unknown payloads stay observable instead of being rejected at the adapter +/// boundary, allowing the session layer to emit a generic mark event. +fn event_name(payload: &Value, extractor: &dyn AgentPayloadExtractor) -> String { + extractor + .event_name(payload) + .unwrap_or_else(|| "unknown".to_string()) } -// Extracts tool output from success payloads first and then failure diagnostics. Failure detail -// synthesis is last so an explicit result always wins over gateway-built diagnostic metadata. -fn tool_result(payload: &Value, normalized_event: &str) -> Value { - value_at(payload, &["tool_output"]) - .or_else(|| value_at(payload, &["tool_response"])) - .or_else(|| value_at(payload, &["output"])) - .or_else(|| value_at(payload, &["result"])) - .or_else(|| value_at(payload, &["extra", "tool_output"])) - .or_else(|| value_at(payload, &["extra", "result"])) - .or_else(|| event_detail_result(payload, normalized_event)) - .unwrap_or(Value::Null) +/// Build shared metadata for a normalized hook event. +/// +/// Only stable, low-cardinality fields and gateway configuration hints are +/// lifted out; the full payload remains on the event for consumers that need +/// agent-specific detail. +fn metadata( + payload: &Value, + headers: &HeaderMap, + kind: AgentKind, + event_name: &str, + extractor: &dyn AgentPayloadExtractor, +) -> Value { + extractor.metadata(payload, headers, kind, event_name) } -// Resolves explicit status fields before deriving error/denied status from event names. Derived -// status is intentionally conservative and only covers known failure or permission-denial spellings. -fn tool_status(payload: &Value, normalized_event: &str) -> Option { - first_string_at( - payload, - &[&["status"][..], &["decision"][..], &["permission"][..]], - ) - .or_else(|| { +/// Create a root session event using the common extraction rules. +/// +/// Lifecycle, marks, notifications, and compaction events all carry identical +/// session-id and metadata correlation fields. +pub(crate) fn common_session_event( + payload: &Value, + headers: &HeaderMap, + kind: AgentKind, + extractor: &dyn AgentPayloadExtractor, +) -> SessionEvent { + let fallback_session_id = fallback_session_id(); + common_session_event_with_fallback(payload, headers, kind, extractor, &fallback_session_id) +} + +fn common_session_event_with_fallback( + payload: &Value, + headers: &HeaderMap, + kind: AgentKind, + extractor: &dyn AgentPayloadExtractor, + fallback_session_id: &str, +) -> SessionEvent { + let event_name = event_name(payload, extractor); + SessionEvent { + session_id: session_id_with_fallback(payload, headers, extractor, fallback_session_id), + agent_kind: kind, + event_name: event_name.clone(), + payload: payload.clone(), + metadata: metadata(payload, headers, kind, &event_name, extractor), + } +} + +/// Create a subagent event from an agent hook payload. +/// +/// Sparse payloads fall back through the selected extractor and then to a +/// synthetic `subagent` id, keeping unmatched start/end events visible when an +/// integration lacks explicit nested-agent IDs. +fn common_subagent_event_with_fallback( + payload: &Value, + headers: &HeaderMap, + kind: AgentKind, + extractor: &dyn AgentPayloadExtractor, + fallback_session_id: &str, +) -> SubagentEvent { + let session = + common_session_event_with_fallback(payload, headers, kind, extractor, fallback_session_id); + let subagent_id = extractor + .subagent_id(payload, headers) + .unwrap_or_else(|| "subagent".to_string()); + SubagentEvent { + session_id: session.session_id, + agent_kind: kind, + event_name: session.event_name, + subagent_id, + payload: session.payload, + metadata: session.metadata, + } +} + +/// Capture hook payload hints used to correlate nearby gateway LLM calls. +/// +/// Multiple naming conventions are accepted because integrations expose +/// conversation, generation, request, and model identifiers under different +/// shapes. +fn common_llm_hint_event_with_fallback( + payload: &Value, + headers: &HeaderMap, + kind: AgentKind, + extractor: &dyn AgentPayloadExtractor, + fallback_session_id: &str, +) -> LlmHintEvent { + let session = + common_session_event_with_fallback(payload, headers, kind, extractor, fallback_session_id); + let hint = extractor.llm_hint(payload, headers); + LlmHintEvent { + session_id: session.session_id, + agent_kind: kind, + event_name: session.event_name, + subagent_id: hint.subagent_id, + agent_id: hint.agent_id, + agent_type: hint.agent_type, + conversation_id: hint.conversation_id, + generation_id: hint.generation_id, + request_id: hint.request_id, + model: hint.model, + payload: session.payload, + metadata: session.metadata, + } +} + +/// Convert agent tool hooks into the runtime tool event shape. +/// +/// Tool IDs and names are synthesized when absent, arguments/results are +/// searched across known payload shapes, and failure or permission-denied event +/// names are reflected in status metadata. +fn common_tool_event_with_fallback( + payload: &Value, + headers: &HeaderMap, + kind: AgentKind, + extractor: &dyn AgentPayloadExtractor, + fallback_session_id: &str, +) -> ToolEvent { + let session = + common_session_event_with_fallback(payload, headers, kind, extractor, fallback_session_id); + let tool_call = extractor.tool_call(payload, headers, &session.event_name); + ToolEvent { + session_id: session.session_id, + agent_kind: kind, + event_name: session.event_name, + tool_call_id: tool_call + .tool_call_id + .unwrap_or_else(|| format!("tool-{}", Uuid::now_v7())), + tool_name: tool_call + .tool_name + .unwrap_or_else(|| "unknown_tool".to_string()), + subagent_id: tool_call.subagent_id, + arguments: tool_call.arguments.unwrap_or(Value::Null), + result: tool_call.result.unwrap_or(Value::Null), + status: tool_call.status, + payload: session.payload, + metadata: session.metadata, + } +} + +/// Derive error or denied status from normalized event names. +/// +/// This runs after an extractor has checked explicit status fields and remains +/// conservative by covering only known failure spellings. +fn derived_tool_status(normalized_event: &str) -> Option { + { (normalized_event.contains("failure") || normalized_event.contains("failed")) .then_some("error".to_string()) - }) + } .or_else(|| { normalized_event .contains("permissiondenied") @@ -302,29 +721,11 @@ fn tool_status(payload: &Value, normalized_event: &str) -> Option { }) } -// Finds the most specific nested-agent identifier the gateway knows how to interpret. Agent IDs -// are accepted as subagent IDs because several hook systems use `agent` terminology for spawned -// workers rather than for the top-level coding agent. -fn subagent_id(payload: &Value) -> Option { - string_at(payload, &["subagent_id"]) - .or_else(|| string_at(payload, &["subagentId"])) - .or_else(|| string_at(payload, &["child_subagent_id"])) - .or_else(|| string_at(payload, &["childSubagentId"])) - .or_else(|| string_at(payload, &["agent_id"])) - .or_else(|| string_at(payload, &["subagent", "id"])) - .or_else(|| string_at(payload, &["agent", "id"])) - .or_else(|| string_at(payload, &["extra", "subagent_id"])) - .or_else(|| string_at(payload, &["extra", "subagentId"])) - .or_else(|| string_at(payload, &["extra", "child_subagent_id"])) - .or_else(|| string_at(payload, &["extra", "childSubagentId"])) - .or_else(|| string_at(payload, &["extra", "agent_id"])) - .or_else(|| string_at(payload, &["extra", "subagent", "id"])) - .or_else(|| string_at(payload, &["extra", "agent", "id"])) -} - -// Extracts detail fields as a synthetic tool result only for failure-like hooks. Successful tool -// events without explicit output remain `null` so observers can distinguish "no output supplied" -// from "the gateway assembled diagnostic details". +/// Extract diagnostic detail fields as a synthetic result for failure-like hooks. +/// +/// Successful tool events without explicit output remain `null` so observers can +/// distinguish "no output supplied" from "the gateway assembled diagnostic +/// details". fn event_detail_result(payload: &Value, normalized_event: &str) -> Option { let include_details = normalized_event.contains("failure") || normalized_event.contains("failed") @@ -342,131 +743,189 @@ fn event_detail_result(payload: &Value, normalized_event: &str) -> Option (!object.is_empty()).then_some(Value::Object(object)) } -// Reads a nested value as a string, accepting numbers and booleans for agent schemas that encode -// identifiers or flags without string types. Empty strings are treated as absent to preserve -// fallback ordering. -fn string_at(payload: &Value, path: &[&str]) -> Option { - value_at(payload, path) - .and_then(|value| match value { - Value::String(value) => Some(value), - Value::Number(value) => Some(value.to_string()), - Value::Bool(value) => Some(value.to_string()), - _ => None, - }) - .filter(|value| !value.is_empty()) -} - -// Returns a cloned nested JSON value using exact object-key traversal. Missing intermediate keys -// stop the lookup without error so callers can chain schema fallbacks cheaply. -fn value_at(payload: &Value, path: &[&str]) -> Option { - let mut current = payload; - for key in path { - current = current.get(*key)?; - } - Some(current.clone()) -} - -// Classifies a raw hook event into one or more normalized events. -// -// Most hook events produce a single normalized event from `classify_primary`. The exception is -// `Stop` (Claude/Codex): it emits both the existing `LlmHint` (preserving correlation for -// subsequent LLM calls) AND a `TurnEnded` so the session manager can snapshot ATIF without -// closing the agent scope. Codex 0.129 has no `SessionEnd`-equivalent hook — without this dual -// emission, codex transparent runs would never trigger an ATIF write. -// -// If the primary event is already terminal, the snapshot is skipped to avoid double-writing — -// `flush_observers` already writes ATIF on agent-end, and a follow-up `TurnEnded` on a removed -// session would recreate an empty session and overwrite the freshly-written ATIF. +/// Classify a raw hook event into one or more normalized events. +/// +/// Most hook events produce a single normalized event from `classify_primary`. +/// The exception is `Stop` for Claude Code and Codex: it emits both the +/// existing `LlmHint` and a `TurnEnded` so the session manager can snapshot ATIF +/// without closing the agent scope. +/// +/// If the primary event is already terminal, the snapshot is skipped to avoid +/// double-writing and accidentally recreating an empty session. fn classify( payload: &Value, headers: &HeaderMap, + extractor: &dyn AgentPayloadExtractor, rules: &ClassificationRules<'_>, ) -> Vec { - let normalized = normalize_name(&event_name(payload)); + let fallback_session_id = fallback_session_id(); + let normalized = normalize_name(&event_name(payload, extractor)); if matches!( normalized.as_str(), "beforesubmitprompt" | "promptsubmitted" | "userpromptsubmit" ) { return vec![ - NormalizedEvent::PromptSubmitted(common_session_event(payload, headers, rules.kind)), - NormalizedEvent::LlmHint(common_llm_hint_event(payload, headers, rules.kind)), + NormalizedEvent::PromptSubmitted(common_session_event_with_fallback( + payload, + headers, + rules.kind, + extractor, + &fallback_session_id, + )), + NormalizedEvent::LlmHint(common_llm_hint_event_with_fallback( + payload, + headers, + rules.kind, + extractor, + &fallback_session_id, + )), ]; } - let primary = classify_primary(payload, headers, rules); + let primary = classify_primary(payload, headers, extractor, rules, &fallback_session_id); if normalized == "stop" && !primary.is_terminal() { return vec![ primary, - NormalizedEvent::TurnEnded(common_session_event(payload, headers, rules.kind)), + NormalizedEvent::TurnEnded(common_session_event_with_fallback( + payload, + headers, + rules.kind, + extractor, + &fallback_session_id, + )), ]; } vec![primary] } -// Classifies a raw hook event using adapter-specific lifecycle names first and generic gateway -// names second. Unknown events are intentionally converted to hook marks, not errors, so new agent -// hook types remain observable until first-class normalization rules are added. +/// Classify a raw hook event using adapter-specific names before generic names. +/// +/// Unknown events are intentionally converted to hook marks, not errors, so new +/// agent hook types remain observable until first-class normalization rules are +/// added. fn classify_primary( payload: &Value, headers: &HeaderMap, + extractor: &dyn AgentPayloadExtractor, rules: &ClassificationRules<'_>, + fallback_session_id: &str, ) -> NormalizedEvent { - let event = event_name(payload); + let event = event_name(payload, extractor); let normalized = normalize_name(&event); if rules .agent_start .iter() .any(|name| normalize_name(name) == normalized) { - NormalizedEvent::AgentStarted(common_session_event(payload, headers, rules.kind)) + NormalizedEvent::AgentStarted(common_session_event_with_fallback( + payload, + headers, + rules.kind, + extractor, + fallback_session_id, + )) } else if rules .agent_end .iter() .any(|name| normalize_name(name) == normalized) { - NormalizedEvent::AgentEnded(common_session_event(payload, headers, rules.kind)) + NormalizedEvent::AgentEnded(common_session_event_with_fallback( + payload, + headers, + rules.kind, + extractor, + fallback_session_id, + )) } else if rules .subagent_start .iter() .any(|name| normalize_name(name) == normalized) { - NormalizedEvent::SubagentStarted(common_subagent_event(payload, headers, rules.kind)) + NormalizedEvent::SubagentStarted(common_subagent_event_with_fallback( + payload, + headers, + rules.kind, + extractor, + fallback_session_id, + )) } else if rules .subagent_end .iter() .any(|name| normalize_name(name) == normalized) { - NormalizedEvent::SubagentEnded(common_subagent_event(payload, headers, rules.kind)) + NormalizedEvent::SubagentEnded(common_subagent_event_with_fallback( + payload, + headers, + rules.kind, + extractor, + fallback_session_id, + )) } else if rules .tool_start .iter() .any(|name| normalize_name(name) == normalized) { - NormalizedEvent::ToolStarted(common_tool_event(payload, headers, rules.kind)) + NormalizedEvent::ToolStarted(common_tool_event_with_fallback( + payload, + headers, + rules.kind, + extractor, + fallback_session_id, + )) } else if rules .tool_end .iter() .any(|name| normalize_name(name) == normalized) { - NormalizedEvent::ToolEnded(common_tool_event(payload, headers, rules.kind)) + NormalizedEvent::ToolEnded(common_tool_event_with_fallback( + payload, + headers, + rules.kind, + extractor, + fallback_session_id, + )) } else { match normalized.as_str() { "afteragentresponse" | "agentresponse" | "assistantresponse" | "afteragentthought" | "prellmcall" | "postllmcall" | "stop" => { - NormalizedEvent::LlmHint(common_llm_hint_event(payload, headers, rules.kind)) + NormalizedEvent::LlmHint(common_llm_hint_event_with_fallback( + payload, + headers, + rules.kind, + extractor, + fallback_session_id, + )) } "precompact" | "compaction" => { - NormalizedEvent::Compaction(common_session_event(payload, headers, rules.kind)) - } - "notification" => { - NormalizedEvent::Notification(common_session_event(payload, headers, rules.kind)) + NormalizedEvent::Compaction(common_session_event_with_fallback( + payload, + headers, + rules.kind, + extractor, + fallback_session_id, + )) } - _ => NormalizedEvent::HookMark(common_session_event(payload, headers, rules.kind)), + "notification" => NormalizedEvent::Notification(common_session_event_with_fallback( + payload, + headers, + rules.kind, + extractor, + fallback_session_id, + )), + _ => NormalizedEvent::HookMark(common_session_event_with_fallback( + payload, + headers, + rules.kind, + extractor, + fallback_session_id, + )), } } } -// Removes separators and case differences before comparing hook names. The gateway uses this for -// agent-specific aliases so `PostToolUse`, `post_tool_use`, and `postToolUse` converge. +/// Remove separators and case differences before comparing hook names. +/// +/// The gateway uses this for agent-specific aliases so `PostToolUse`, +/// `post_tool_use`, and `postToolUse` converge. fn normalize_name(name: &str) -> String { name.chars() .filter(|character| character.is_ascii_alphanumeric()) diff --git a/crates/cli/src/alignment/mod.rs b/crates/cli/src/alignment/mod.rs index bf7213c4b..19b6c26d7 100644 --- a/crates/cli/src/alignment/mod.rs +++ b/crates/cli/src/alignment/mod.rs @@ -14,6 +14,7 @@ use nemo_relay::api::llm::LlmRequest; use serde_json::{Map, Value, json}; use crate::config::header_string; +pub(crate) use crate::json_path::{string_at_any as json_string_at, value_at_any as json_value_at}; use crate::model::{AgentKind, LlmEvent, NormalizedEvent, SessionEvent, SubagentEvent, ToolEvent}; pub(crate) mod claude_code; @@ -50,6 +51,30 @@ pub(crate) enum GatewayRouteKind { AnthropicCountTokens, } +impl GatewayRouteKind { + pub(crate) const ALL: [Self; 5] = [ + Self::OpenAiResponses, + Self::OpenAiChatCompletions, + Self::OpenAiModels, + Self::AnthropicMessages, + Self::AnthropicCountTokens, + ]; + + pub(crate) const fn name(self) -> &'static str { + match self { + Self::OpenAiResponses => "openai.responses", + Self::OpenAiChatCompletions => "openai.chat_completions", + Self::OpenAiModels => "openai.models", + Self::AnthropicMessages => "anthropic.messages", + Self::AnthropicCountTokens => "anthropic.count_tokens", + } + } + + pub(crate) fn from_provider_name(provider: &str) -> Option { + Self::ALL.into_iter().find(|route| route.name() == provider) + } +} + #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub(crate) enum GatewayManagementPolicy { Managed, @@ -72,6 +97,114 @@ impl GatewayManagementPolicy { } } +/// Strategy for extracting provider-request facts used by gateway alignment. +/// +/// This stays separate from [`SessionAlignmentState`] because extraction is a +/// stateless read of request JSON, while ownership resolution is stateful and +/// depends on active scopes, hints, aliases, and recent tool activity. +pub(crate) trait ProviderRequestExtractor { + /// Extract the gateway session id using route-specific header/body rules. + fn gateway_session_id(&self, headers: &HeaderMap, body: &Value) -> Option; + + /// Build a stable request-affinity key from user task text, when supported. + fn request_affinity_key(&self, request: &LlmRequest) -> Option; + + /// Build fallback turn input for gateway calls that beat an agent prompt hook. + fn gateway_turn_input(&self, agent_kind: AgentKind, request: &LlmRequest) -> Option; +} + +struct OpenAiResponsesRequestExtractor; +struct OpenAiChatCompletionsRequestExtractor; +struct OpenAiModelsRequestExtractor; +struct AnthropicMessagesRequestExtractor; +struct AnthropicCountTokensRequestExtractor; + +static OPENAI_RESPONSES_REQUEST_EXTRACTOR: OpenAiResponsesRequestExtractor = + OpenAiResponsesRequestExtractor; +static OPENAI_CHAT_COMPLETIONS_REQUEST_EXTRACTOR: OpenAiChatCompletionsRequestExtractor = + OpenAiChatCompletionsRequestExtractor; +static OPENAI_MODELS_REQUEST_EXTRACTOR: OpenAiModelsRequestExtractor = OpenAiModelsRequestExtractor; +static ANTHROPIC_MESSAGES_REQUEST_EXTRACTOR: AnthropicMessagesRequestExtractor = + AnthropicMessagesRequestExtractor; +static ANTHROPIC_COUNT_TOKENS_REQUEST_EXTRACTOR: AnthropicCountTokensRequestExtractor = + AnthropicCountTokensRequestExtractor; + +impl ProviderRequestExtractor for OpenAiResponsesRequestExtractor { + fn gateway_session_id(&self, headers: &HeaderMap, body: &Value) -> Option { + gateway_header_session_id(headers) + .or_else(|| codex::prompt_cache_session_id(body, GatewayRouteKind::OpenAiResponses)) + .or_else(|| openai_body_session_id(body, GatewayRouteKind::OpenAiResponses)) + } + + fn request_affinity_key(&self, request: &LlmRequest) -> Option { + affinity_key_from_task_text(responses_user_task_text(&request.content)?) + } + + fn gateway_turn_input(&self, _agent_kind: AgentKind, _request: &LlmRequest) -> Option { + None + } +} + +impl ProviderRequestExtractor for OpenAiChatCompletionsRequestExtractor { + fn gateway_session_id(&self, headers: &HeaderMap, body: &Value) -> Option { + gateway_header_session_id(headers) + .or_else(|| openai_body_session_id(body, GatewayRouteKind::OpenAiChatCompletions)) + } + + fn request_affinity_key(&self, request: &LlmRequest) -> Option { + affinity_key_from_task_text(messages_user_task_text(&request.content)?) + } + + fn gateway_turn_input(&self, _agent_kind: AgentKind, _request: &LlmRequest) -> Option { + None + } +} + +impl ProviderRequestExtractor for OpenAiModelsRequestExtractor { + fn gateway_session_id(&self, headers: &HeaderMap, _body: &Value) -> Option { + gateway_header_session_id(headers) + } + + fn request_affinity_key(&self, _request: &LlmRequest) -> Option { + None + } + + fn gateway_turn_input(&self, _agent_kind: AgentKind, _request: &LlmRequest) -> Option { + None + } +} + +impl ProviderRequestExtractor for AnthropicMessagesRequestExtractor { + fn gateway_session_id(&self, headers: &HeaderMap, _body: &Value) -> Option { + gateway_header_session_id(headers) + } + + fn request_affinity_key(&self, request: &LlmRequest) -> Option { + affinity_key_from_task_text(messages_user_task_text(&request.content)?) + } + + fn gateway_turn_input(&self, agent_kind: AgentKind, request: &LlmRequest) -> Option { + if agent_kind != AgentKind::ClaudeCode { + return None; + } + messages_user_task_text(&request.content).map(|prompt| json!({ "prompt": prompt })) + } +} + +impl ProviderRequestExtractor for AnthropicCountTokensRequestExtractor { + fn gateway_session_id(&self, headers: &HeaderMap, _body: &Value) -> Option { + gateway_header_session_id(headers) + } + + fn request_affinity_key(&self, _request: &LlmRequest) -> Option { + None + } + + fn gateway_turn_input(&self, _agent_kind: AgentKind, _request: &LlmRequest) -> Option { + None + } +} + // Records that a provider-created child session is really a subagent under another session. The // session manager stores this until the child emits its terminal AgentEnded event, then removes the // alias so future unrelated events cannot be reparented through stale state. @@ -340,18 +473,38 @@ fn prune_task_sessions( // Resolves the session id for a gateway request in precedence order: // explicit NeMo Relay header, agent-native headers, agent-specific body fallbacks, then the -// generic OpenAI-compatible `session_id` body field. Keeping the provider fallbacks behind one -// function makes a new agent integration add one small alignment adapter instead of threading -// bespoke checks through gateway request construction. +/// Extract a gateway session id for the selected provider route. +/// +/// Keeping provider fallbacks behind one function makes a new agent integration +/// add one small alignment adapter instead of threading bespoke checks through +/// gateway request construction. pub(crate) fn gateway_session_id( headers: &HeaderMap, body: &Value, route: GatewayRouteKind, ) -> Option { + provider_request_extractor(route).gateway_session_id(headers, body) +} + +fn provider_request_extractor(route: GatewayRouteKind) -> &'static dyn ProviderRequestExtractor { + match route { + GatewayRouteKind::OpenAiResponses => &OPENAI_RESPONSES_REQUEST_EXTRACTOR, + GatewayRouteKind::OpenAiChatCompletions => &OPENAI_CHAT_COMPLETIONS_REQUEST_EXTRACTOR, + GatewayRouteKind::OpenAiModels => &OPENAI_MODELS_REQUEST_EXTRACTOR, + GatewayRouteKind::AnthropicMessages => &ANTHROPIC_MESSAGES_REQUEST_EXTRACTOR, + GatewayRouteKind::AnthropicCountTokens => &ANTHROPIC_COUNT_TOKENS_REQUEST_EXTRACTOR, + } +} + +fn provider_request_extractor_for_name( + provider: &str, +) -> Option<&'static dyn ProviderRequestExtractor> { + GatewayRouteKind::from_provider_name(provider).map(provider_request_extractor) +} + +fn gateway_header_session_id(headers: &HeaderMap) -> Option { header_string(headers, "x-nemo-relay-session-id") .or_else(|| claude_code::session_id_from_headers(headers)) - .or_else(|| codex::prompt_cache_session_id(body, route)) - .or_else(|| openai_body_session_id(body, route)) } fn openai_body_session_id(body: &Value, route: GatewayRouteKind) -> Option { @@ -368,9 +521,10 @@ fn openai_body_session_id(body: &Value, route: GatewayRouteKind) -> Option Option { header_string(headers, "x-nemo-relay-subagent-id") } -// Resolves a correlation identifier from a dedicated header before trying known JSON body paths. -// Header precedence lets callers disambiguate requests even when provider payloads contain stale -// or differently scoped identifiers. +/// Resolve a correlation identifier from a header or known JSON body paths. +/// +/// Header precedence lets callers disambiguate requests even when provider +/// payloads contain stale or differently scoped identifiers. pub(crate) fn gateway_identifier( headers: &HeaderMap, body: &Value, @@ -415,9 +573,10 @@ pub(crate) fn gateway_identifier( header_string(headers, header_name).or_else(|| json_string_at(body, body_paths)) } -// Infers the owning agent for a session created by a gateway request that beat its SessionStart -// hook. This is the last chance to label the root scope correctly because exporter identities are -// baked when the scope opens. +/// Infer the owning agent for a session opened first by a gateway request. +/// +/// This is the last chance to label the root scope correctly because exporter +/// identities are baked when the scope opens. pub(crate) fn agent_kind_for_gateway_provider(provider: &str) -> AgentKind { if claude_code::owns_gateway_provider(provider) { AgentKind::ClaudeCode @@ -428,6 +587,7 @@ pub(crate) fn agent_kind_for_gateway_provider(provider: &str) -> AgentKind { } } +/// Decide whether a gateway request should enter the managed correlation path. pub(crate) fn gateway_management_policy( agent_kind: AgentKind, provider: &str, @@ -446,17 +606,20 @@ pub(crate) fn gateway_management_policy( } } -// Not every harness has a reliable process/session end signal. Claude Code and Codex sessions can -// outlive a user-visible run, so the CLI represents their work with bounded turn scopes instead of -// exporting a long-lived agent scope that needs synthetic termination. +/// Decide whether this agent kind should emit a long-lived session agent scope. +/// +/// Claude Code and Codex can outlive a user-visible run, so the CLI represents +/// their work with bounded turn scopes instead of exporting a long-lived agent +/// scope that needs synthetic termination. pub(crate) fn should_emit_session_agent_scope(agent_kind: AgentKind) -> bool { !matches!(agent_kind, AgentKind::ClaudeCode | AgentKind::Codex) } -// Detects agent harnesses that report a child session which should become a subagent under another -// session. Codex is the first such harness: it starts child threads with parent-thread metadata. -// Future harness-specific detectors should plug in here so the session manager can stay provider -// neutral. +/// Detect child sessions that should become subagents under another session. +/// +/// Codex starts child threads with parent-thread metadata. Future +/// harness-specific detectors should plug in here so the session manager can +/// stay provider neutral. pub(crate) async fn subagent_session_context( event: &SessionEvent, ) -> Option { @@ -466,8 +629,10 @@ pub(crate) async fn subagent_session_context( .or_else(|| hermes::subagent_context(event).map(SubagentSessionContext::Hermes)) } -// Converts an AgentStarted event into a pending child-session record when a harness explicitly -// reports parentage. The caller still decides whether the child session is empty enough to promote. +/// Convert an agent start into a pending child-session record when possible. +/// +/// The caller still decides whether the child session is empty enough to +/// promote. pub(crate) async fn pending_subagent_start( event: &mut NormalizedEvent, ) -> Option<(String, PendingSubagentStart)> { @@ -490,8 +655,10 @@ pub(crate) async fn pending_subagent_start( )) } -// Lets the owning alignment adapter stamp provider-specific debug fields on a child SessionStart -// before the generic session manager promotes it to a subagent. +/// Stamp provider-specific debug fields onto child-session metadata. +/// +/// This runs before the generic session manager promotes the child session to a +/// subagent. pub(crate) fn augment_subagent_session_metadata( metadata: Value, context: &SubagentSessionContext, @@ -506,9 +673,10 @@ pub(crate) fn augment_subagent_session_metadata( } } -// Converts a child SessionStart into the provider-appropriate SubagentStarted event. The session -// manager only knows that a child session should be promoted; the adapter owns how to preserve the -// provider's original metadata. +/// Convert a child session start into a provider-appropriate subagent start. +/// +/// The session manager only knows that a child session should be promoted; the +/// adapter owns how to preserve provider-specific metadata. pub(crate) fn subagent_start_event( event: &SessionEvent, context: &SubagentSessionContext, @@ -519,8 +687,10 @@ pub(crate) fn subagent_start_event( } } -// Builds the alias used to route later child-session events through the promoted parent/subagent -// pair. The adapter supplies provider-specific metadata explaining why the alias exists. +/// Build the alias used to route later child-session events through a parent. +/// +/// The adapter supplies provider-specific metadata explaining why the alias +/// exists. pub(crate) fn alias_for_child_session( child_session_id: String, context: &SubagentSessionContext, @@ -535,6 +705,7 @@ pub(crate) fn alias_for_child_session( } } +/// Extract an explicit child-session alias from a subagent-start event. pub(crate) fn explicit_subagent_alias( event: &mut NormalizedEvent, ) -> Option<(String, SessionAlias)> { @@ -547,9 +718,10 @@ pub(crate) fn explicit_subagent_alias( Some((explicit.child_session_id, explicit.alias)) } -// Recovers provider-specific metadata from a subagent scope and copies only the fields that should -// follow LLM spans. Codex contributes thread identifiers today; other harnesses can add filters -// here without changing session ownership code. +/// Recover provider-specific metadata that should follow owned LLM spans. +/// +/// Codex contributes thread identifiers today; other harnesses can add filters +/// here without changing session ownership code. pub(crate) fn llm_owner_metadata(scope_metadata: Option<&Value>) -> Value { merge_metadata( codex::llm_owner_metadata(scope_metadata), @@ -557,40 +729,40 @@ pub(crate) fn llm_owner_metadata(scope_metadata: Option<&Value>) -> Value { ) } -// Builds a provider-neutral affinity key from the user task text inside common LLM request -// formats. Coding agents often replay the same task prompt on later provider calls without a -// worker id; this key lets session correlation pair those calls with the subagent that first owned -// the task. The extractor understands Anthropic Messages, OpenAI Chat Completions, and OpenAI -// Responses shapes, and deliberately ignores raw count-token/file payloads. -pub(crate) fn request_affinity_key(request: &LlmRequest) -> Option { - let task_text = request_user_task_text(&request.content)?; - let normalized = normalize_affinity_text(&task_text); - (normalized.chars().count() >= REQUEST_AFFINITY_KEY_MIN_CHARS) - .then(|| truncate_affinity_text(&normalized, REQUEST_AFFINITY_KEY_MAX_CHARS)) +/// Build a route-specific affinity key from provider request user task text. +/// +/// Coding agents often replay the same task prompt on later provider calls +/// without a worker id; this key lets session correlation pair those calls with +/// the subagent that first owned the same task. +pub(crate) fn request_affinity_key(provider: &str, request: &LlmRequest) -> Option { + provider_request_extractor_for_name(provider)?.request_affinity_key(request) } -// Builds a non-null turn input when a direct gateway request arrives before the prompt hook. This -// is intentionally limited to Claude-owned Anthropic Messages requests because Claude installed -// mode is the path where the provider request can race the `UserPromptSubmit` hook. +/// Build fallback turn input when a gateway request arrives before a prompt hook. +/// +/// This is intentionally limited to Claude-owned Anthropic Messages requests +/// because Claude installed mode is the path where provider requests can race +/// the `UserPromptSubmit` hook. pub(crate) fn gateway_turn_input( agent_kind: AgentKind, provider: &str, request: &LlmRequest, ) -> Option { - if agent_kind != AgentKind::ClaudeCode || provider != "anthropic.messages" { - return None; - } - request_user_task_text(&request.content).map(|prompt| json!({ "prompt": prompt })) + provider_request_extractor_for_name(provider)?.gateway_turn_input(agent_kind, request) } -// Detects tool results that imply a subagent completed. Claude Code reports this through the -// `Agent` tool today; keeping the check here avoids leaking that tool shape into session teardown. +/// Detect tool results that imply a subagent completed. +/// +/// Claude Code reports this through the `Agent` tool today; keeping the check +/// here avoids leaking that tool shape into session teardown. pub(crate) fn completed_subagent_from_tool(event: &ToolEvent) -> Option { claude_code::completed_subagent_from_agent_tool(event) } -// Some harnesses route child-session turn boundaries through a parent-owned subagent alias. A -// child turn end should close that subagent, not the parent turn containing all sibling work. +/// Return the aliased subagent id that should own a child turn end. +/// +/// A child turn end should close that subagent, not the parent turn containing +/// all sibling work. pub(crate) fn aliased_turn_subagent_id(event: &SessionEvent) -> Option { json_string_at( &event.metadata, @@ -603,9 +775,11 @@ pub(crate) fn aliased_turn_subagent_id(event: &SessionEvent) -> Option { ) } -// Routes events from an aliased child session through the parent session/subagent pair. The alias -// records why the child is not a top-level agent; this generic router only rewrites ownership and -// preserves the adapter-supplied metadata for filtering/debugging in Phoenix. +/// Route events from an aliased child session through the parent/subagent pair. +/// +/// The alias records why the child is not a top-level agent; this generic router +/// only rewrites ownership and preserves the adapter-supplied metadata for +/// filtering and debugging in Phoenix. pub(crate) fn route_event_through_alias( event: NormalizedEvent, aliases: &HashMap, @@ -869,13 +1043,24 @@ const TASK_SESSION_SCOPE_PATHS: &[&[&str]] = &[ &["extra", "parentSessionId"], ]; -fn request_user_task_text(payload: &Value) -> Option { +fn messages_user_task_text(payload: &Value) -> Option { payload .get("messages") .and_then(Value::as_array) .and_then(|messages| messages.iter().rev().find_map(user_message_task_text)) - .or_else(|| responses_input_task_text(payload.get("input")?)) - .or_else(|| prompt_task_text(payload.get("prompt")?)) +} + +fn responses_user_task_text(payload: &Value) -> Option { + payload + .get("input") + .and_then(responses_input_task_text) + .or_else(|| payload.get("prompt").and_then(prompt_task_text)) +} + +fn affinity_key_from_task_text(task_text: String) -> Option { + let normalized = normalize_affinity_text(&task_text); + (normalized.chars().count() >= REQUEST_AFFINITY_KEY_MIN_CHARS) + .then(|| truncate_affinity_text(&normalized, REQUEST_AFFINITY_KEY_MAX_CHARS)) } fn user_message_task_text(message: &Value) -> Option { @@ -944,33 +1129,10 @@ fn truncate_affinity_text(text: &str, max_chars: usize) -> String { text.chars().take(max_chars).collect() } -// Reads the first string-like value from any candidate JSON path. Scalar numbers and booleans are -// accepted for IDs because provider payloads are not always strict about identifier types. -pub(crate) fn json_string_at(payload: &Value, paths: &[&[&str]]) -> Option { - json_value_at(payload, paths) - .and_then(|value| match value { - Value::String(value) => Some(value), - Value::Number(value) => Some(value.to_string()), - Value::Bool(value) => Some(value.to_string()), - _ => None, - }) - .filter(|value| !value.is_empty()) -} - -// Reads the first JSON value from any candidate path. The clone is intentional because extracted -// correlation data must live independently of the provider payload it was read from. -pub(crate) fn json_value_at(payload: &Value, paths: &[&[&str]]) -> Option { - paths.iter().find_map(|path| { - let mut current = payload; - for key in *path { - current = current.get(*key)?; - } - Some(current.clone()) - }) -} - -// Inserts an optional string value into a JSON object while omitting absent fields entirely. This -// keeps correlation metadata compact and avoids serializing nulls as meaningful observations. +/// Insert an optional string value into a JSON object. +/// +/// Absent fields are omitted entirely to keep correlation metadata compact and +/// avoid serializing nulls as meaningful observations. pub(crate) fn insert_optional(object: &mut Map, key: &str, value: Option<&str>) { if let Some(value) = value { object.insert(key.to_string(), json!(value)); diff --git a/crates/cli/src/gateway.rs b/crates/cli/src/gateway.rs index 2d5f9b797..87e0a34eb 100644 --- a/crates/cli/src/gateway.rs +++ b/crates/cli/src/gateway.rs @@ -955,16 +955,11 @@ impl ProviderRoute { } } - // Returns the provider route name recorded in LLM event metadata. These names split OpenAI API - // variants because their request/response schemas differ even when they share a base URL. + // Returns the provider route name recorded on managed LLM events. These names split OpenAI API + // variants because their request/response schemas differ even when they share a base URL, and + // they double as codec hints for ambiguous provider request shapes. const fn name(self) -> &'static str { - match self { - Self::OpenAiResponses => "openai.responses", - Self::OpenAiChatCompletions => "openai.chat_completions", - Self::OpenAiModels => "openai.models", - Self::AnthropicMessages => "anthropic.messages", - Self::AnthropicCountTokens => "anthropic.count_tokens", - } + self.alignment_route().name() } // Builds the upstream URL by combining the configured provider base with the original path and diff --git a/crates/cli/src/json_path.rs b/crates/cli/src/json_path.rs new file mode 100644 index 000000000..6e8028bfa --- /dev/null +++ b/crates/cli/src/json_path.rs @@ -0,0 +1,50 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Small JSON path helpers shared by CLI adapter and alignment code. + +use serde_json::Value; + +/// Read a nested value using exact object-key traversal. +/// +/// Missing intermediate keys stop the lookup without error so callers can +/// express schema precedence by chaining candidate paths. +pub(crate) fn value_at(payload: &Value, path: &[&str]) -> Option { + let mut current = payload; + for key in path { + current = current.get(*key)?; + } + Some(current.clone()) +} + +/// Read the first JSON value from any candidate path. +/// +/// The clone is intentional because extracted correlation data must live +/// independently of the payload it was read from. +pub(crate) fn value_at_any(payload: &Value, paths: &[&[&str]]) -> Option { + paths.iter().find_map(|path| value_at(payload, path)) +} + +/// Read a nested value as a string-like scalar. +/// +/// Numbers and booleans are accepted because both agent and provider schemas may +/// encode identifiers or flags without string types. Empty strings are treated +/// as absent. +pub(crate) fn string_at(payload: &Value, path: &[&str]) -> Option { + value_at(payload, path) + .and_then(|value| match value { + Value::String(value) => Some(value), + Value::Number(value) => Some(value.to_string()), + Value::Bool(value) => Some(value.to_string()), + _ => None, + }) + .filter(|value| !value.is_empty()) +} + +/// Read the first string-like value from any candidate JSON path. +/// +/// Paths that exist but contain non-scalar or empty values are skipped so a +/// later compatible field can supply the correlation value. +pub(crate) fn string_at_any(payload: &Value, paths: &[&[&str]]) -> Option { + paths.iter().find_map(|path| string_at(payload, path)) +} diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index e78dc538e..cfc96bdf9 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -12,6 +12,7 @@ mod doctor; mod error; mod gateway; mod installer; +mod json_path; mod launcher; mod model; mod model_pricing; diff --git a/crates/cli/src/session.rs b/crates/cli/src/session.rs index b944cd559..c108c0b57 100644 --- a/crates/cli/src/session.rs +++ b/crates/cli/src/session.rs @@ -971,6 +971,7 @@ impl Session { } let owner = self.resolve_llm_owner(&start); self.record_llm_request_affinity( + &start.provider, &start.request, owner.subagent_id.as_deref(), owner.status, @@ -1034,6 +1035,7 @@ impl Session { self.resolve_llm_owner(&start) }; self.record_llm_request_affinity( + &start.provider, &start.request, owner.subagent_id.as_deref(), owner.status, @@ -1848,7 +1850,7 @@ impl Session { // pair unhinted Anthropic Messages, OpenAI Chat Completions, and OpenAI Responses calls with // the subagent that first owned the same coding task. fn request_affinity_owner(&mut self, start: &LlmGatewayStart) -> Option { - let key = alignment::request_affinity_key(&start.request)?; + let key = alignment::request_affinity_key(&start.provider, &start.request)?; let subagent_id = self.llm_request_affinity.get(&key).cloned().flatten()?; let parent = match self.subagents.get(&subagent_id).cloned() { Some(parent) => parent, @@ -2041,6 +2043,7 @@ impl Session { // is meant to correct when multiple coding-agent workers share a root session. fn record_llm_request_affinity( &mut self, + provider: &str, request: &LlmRequest, subagent_id: Option<&str>, status: &str, @@ -2051,7 +2054,7 @@ impl Session { let Some(subagent_id) = subagent_id else { return; }; - let Some(key) = alignment::request_affinity_key(request) else { + let Some(key) = alignment::request_affinity_key(provider, request) else { return; }; match self.llm_request_affinity.get_mut(&key) { diff --git a/crates/cli/tests/coverage/adapters_tests.rs b/crates/cli/tests/coverage/adapters_tests.rs index 8ca0acb8d..f2ab8e64a 100644 --- a/crates/cli/tests/coverage/adapters_tests.rs +++ b/crates/cli/tests/coverage/adapters_tests.rs @@ -28,10 +28,13 @@ fn maps_claude_canonical_tool_payload() { assert_eq!(event.tool_call_id, "toolu-1"); assert_eq!(event.tool_name, "Read"); assert_eq!(event.arguments, json!({ "file_path": "README.md" })); + assert!(event.metadata.get("transcript_path").is_none()); + assert!(event.metadata.get("cwd").is_none()); assert_eq!( - event.metadata["transcript_path"], + event.payload["transcript_path"], json!("/tmp/transcript.jsonl") ); + assert_eq!(event.payload["cwd"], json!("/workspace")); } event => panic!("unexpected event: {event:?}"), } @@ -180,6 +183,24 @@ fn stop_hook_emits_turn_ended_for_codex() { ); } +#[test] +fn multi_event_hooks_reuse_synthetic_session_id() { + let outcome = codex::adapt( + json!({ "hook_event_name": "UserPromptSubmit" }), + &HeaderMap::new(), + ); + assert_eq!(outcome.events.len(), 2); + let prompt_session_id = outcome.events[0].session_id(); + assert!(prompt_session_id.starts_with("hook-")); + assert_eq!(outcome.events[1].session_id(), prompt_session_id); + + let outcome = codex::adapt(json!({ "hook_event_name": "Stop" }), &HeaderMap::new()); + assert_eq!(outcome.events.len(), 2); + let hint_session_id = outcome.events[0].session_id(); + assert!(hint_session_id.starts_with("hook-")); + assert_eq!(outcome.events[1].session_id(), hint_session_id); +} + #[test] fn stop_hook_emits_turn_ended_for_claude() { let outcome = claude_code::adapt( @@ -208,6 +229,278 @@ fn adapter_string_lookup_accepts_scalar_values_only() { assert_eq!(string_at(&payload, &["object"]), None); } +#[test] +fn agent_extractors_keep_fallbacks_at_adapter_boundary() { + let headers = HeaderMap::new(); + let payload = json!({}); + + fn assert_fallbacks( + extractor: &dyn AgentPayloadExtractor, + kind: AgentKind, + payload: &serde_json::Value, + headers: &HeaderMap, + ) { + assert_eq!(extractor.session_id(payload, headers), None); + assert_eq!(extractor.event_name(payload), None); + assert_eq!(extractor.subagent_id(payload, headers), None); + assert_eq!( + extractor.llm_hint(payload, headers), + ExtractedLlmHint::default() + ); + assert_eq!( + extractor.tool_call(payload, headers, "PreToolUse"), + ExtractedToolCall { + tool_call_id: None, + tool_name: None, + subagent_id: None, + arguments: None, + result: None, + status: None, + } + ); + + assert!(session_id(payload, headers, extractor).starts_with("hook-")); + assert_eq!(event_name(payload, extractor), "unknown"); + + let event = common_tool_event_with_fallback(payload, headers, kind, extractor, "hook-test"); + assert!(event.tool_call_id.starts_with("tool-")); + assert_eq!(event.tool_name, "unknown_tool"); + assert_eq!(event.arguments, json!(null)); + assert_eq!(event.result, json!(null)); + } + + assert_fallbacks( + &CLAUDE_CODE_PAYLOAD_EXTRACTOR, + AgentKind::ClaudeCode, + &payload, + &headers, + ); + assert_fallbacks( + &CODEX_PAYLOAD_EXTRACTOR, + AgentKind::Codex, + &payload, + &headers, + ); + assert_fallbacks( + &HERMES_PAYLOAD_EXTRACTOR, + AgentKind::Hermes, + &payload, + &headers, + ); +} + +#[test] +fn codex_extractor_reads_agent_hint_and_tool_call_fields() { + let headers = HeaderMap::new(); + let payload = json!({ + "subagent_id": "worker-1", + "agent": { + "id": "agent-1", + "type": "reviewer" + }, + "conversationId": "conversation-1", + "generation": { "id": "generation-1" }, + "request": { "id": "request-1" }, + "modelName": "gpt-test", + "tool_call_id": "tool-call-1", + "tool": { "name": "search" }, + "arguments": { "query": "needle" }, + "result": { "matches": 2 }, + "status": "success" + }); + + assert_eq!( + CODEX_PAYLOAD_EXTRACTOR.llm_hint(&payload, &headers), + ExtractedLlmHint { + subagent_id: Some("worker-1".into()), + agent_id: Some("agent-1".into()), + agent_type: Some("reviewer".into()), + conversation_id: Some("conversation-1".into()), + generation_id: Some("generation-1".into()), + request_id: Some("request-1".into()), + model: Some("gpt-test".into()), + } + ); + assert_eq!( + CODEX_PAYLOAD_EXTRACTOR.tool_call(&payload, &headers, "PostToolUse"), + ExtractedToolCall { + tool_call_id: Some("tool-call-1".into()), + tool_name: Some("search".into()), + subagent_id: Some("worker-1".into()), + arguments: Some(json!({ "query": "needle" })), + result: Some(json!({ "matches": 2 })), + status: Some("success".into()), + } + ); +} + +#[test] +fn agent_extractors_prefer_extra_call_ids_over_structural_ids() { + let headers = HeaderMap::new(); + let payload = json!({ + "hook_event_name": "PostToolUse", + "tool": { "id": "tool-structural" }, + "tool_input": { "id": "argument-id" }, + "id": "event-id", + "extra": { + "call_id": "extra-call" + } + }); + + for extractor in [ + &CLAUDE_CODE_PAYLOAD_EXTRACTOR as &dyn AgentPayloadExtractor, + &CODEX_PAYLOAD_EXTRACTOR, + &HERMES_PAYLOAD_EXTRACTOR, + ] { + assert_eq!( + extractor + .tool_call(&payload, &headers, "PostToolUse") + .tool_call_id + .as_deref(), + Some("extra-call") + ); + } +} + +#[test] +fn agent_extractors_keep_hook_event_name_precedence() { + let payload = json!({ + "hook_event_name": "hook-winner", + "event_name": "event-name-loser", + "eventName": "event-name-camel-loser", + "event": "event-loser" + }); + + for extractor in [ + &CLAUDE_CODE_PAYLOAD_EXTRACTOR as &dyn AgentPayloadExtractor, + &CODEX_PAYLOAD_EXTRACTOR, + &HERMES_PAYLOAD_EXTRACTOR, + ] { + assert_eq!( + extractor.event_name(&payload).as_deref(), + Some("hook-winner") + ); + } +} + +#[test] +fn claude_extractor_prefers_native_tool_use_id() { + let headers = HeaderMap::new(); + let payload = json!({ + "hook_event_name": "PreToolUse", + "tool_use_id": "claude-toolu", + "tool_call_id": "generic-tool", + "extra": { + "call_id": "extra-call" + } + }); + + assert_eq!( + CLAUDE_CODE_PAYLOAD_EXTRACTOR + .tool_call(&payload, &headers, "PreToolUse") + .tool_call_id + .as_deref(), + Some("claude-toolu") + ); +} + +#[test] +fn codex_extractor_prefers_codex_specific_fields() { + let headers = HeaderMap::new(); + let payload = json!({ + "source": { + "subagent": { + "thread_spawn": { + "agent_nickname": "codex-reviewer" + } + } + }, + "subagent": { "id": "nested-subagent" }, + "arguments": { "cmd": "cargo test" }, + "tool_input": { "cmd": "ignored", "id": "argument-id" }, + "extra": { "call_id": "extra-call" } + }); + let tool_call = CODEX_PAYLOAD_EXTRACTOR.tool_call(&payload, &headers, "toolEnded"); + + assert_eq!(tool_call.subagent_id.as_deref(), Some("codex-reviewer")); + assert_eq!(tool_call.tool_call_id.as_deref(), Some("extra-call")); + assert_eq!(tool_call.arguments, Some(json!({ "cmd": "cargo test" }))); +} + +#[test] +fn hermes_extractor_prefers_child_subagent_and_claude_session_header() { + let mut headers = HeaderMap::new(); + headers.insert( + "x-claude-code-session-id", + "claude-session".parse().unwrap(), + ); + let payload = json!({ + "subagent_id": "generic-subagent", + "child_subagent_id": "hermes-child" + }); + + assert_eq!( + HERMES_PAYLOAD_EXTRACTOR + .session_id(&payload, &headers) + .as_deref(), + Some("claude-session") + ); + assert_eq!( + HERMES_PAYLOAD_EXTRACTOR + .subagent_id(&payload, &headers) + .as_deref(), + Some("hermes-child") + ); + + let nested_payload = json!({ + "subagent": { "id": "nested-subagent" }, + "extra": { + "subagent_id": "extra-subagent" + } + }); + assert_eq!( + HERMES_PAYLOAD_EXTRACTOR + .subagent_id(&nested_payload, &headers) + .as_deref(), + Some("nested-subagent") + ); +} + +#[test] +fn codex_extractor_ignores_claude_session_header() { + let mut headers = HeaderMap::new(); + headers.insert( + "x-claude-code-session-id", + "claude-session".parse().unwrap(), + ); + + // RelayOnly: unlike Claude Code and Hermes, Codex must not adopt the Claude + // installed-mode session header. With no native session id the extractor + // returns None, and the adapter boundary applies the synthetic fallback. + assert_eq!( + CODEX_PAYLOAD_EXTRACTOR.session_id(&json!({}), &headers), + None + ); + + // The Claude header must not win over the native payload session id either. + let payload = json!({ "session_id": "codex-native" }); + assert_eq!( + CODEX_PAYLOAD_EXTRACTOR + .session_id(&payload, &headers) + .as_deref(), + Some("codex-native") + ); + + // The NeMo Relay session header is still honored and takes precedence. + headers.insert("x-nemo-relay-session-id", "relay-session".parse().unwrap()); + assert_eq!( + CODEX_PAYLOAD_EXTRACTOR + .session_id(&payload, &headers) + .as_deref(), + Some("relay-session") + ); +} + #[test] fn keeps_codex_response_unwrapped() { let headers = HeaderMap::new(); @@ -627,6 +920,59 @@ fn maps_hermes_null_request_as_lossy_summary() { } } +#[test] +fn normalizes_mark_style_events_and_header_session_ids() { + let mut headers = HeaderMap::new(); + headers.insert("x-nemo-relay-session-id", "header-session".parse().unwrap()); + headers.insert("x-nemo-relay-config-profile", "coverage".parse().unwrap()); + + for (event_name, expected) in [ + ("UserPromptSubmit", "prompt"), + ("afterAgentResponse", "response"), + ("PreCompact", "compact"), + ("Notification", "notification"), + ("Unrecognized.Event", "hook"), + ] { + let outcome = codex::adapt( + json!({ + "eventName": event_name, + "model": "model-a", + "cwd": "/repo" + }), + &headers, + ); + let (session_id, metadata, payload) = match &outcome.events[0] { + NormalizedEvent::PromptSubmitted(event) if expected == "prompt" => { + (event.session_id.as_str(), &event.metadata, &event.payload) + } + NormalizedEvent::LlmHint(event) if expected == "response" => { + (event.session_id.as_str(), &event.metadata, &event.payload) + } + NormalizedEvent::Compaction(event) if expected == "compact" => { + (event.session_id.as_str(), &event.metadata, &event.payload) + } + NormalizedEvent::Notification(event) if expected == "notification" => { + (event.session_id.as_str(), &event.metadata, &event.payload) + } + NormalizedEvent::HookMark(event) if expected == "hook" => { + (event.session_id.as_str(), &event.metadata, &event.payload) + } + event => panic!("unexpected event for {event_name}: {event:?}"), + }; + if expected == "prompt" { + assert!( + matches!(outcome.events.get(1), Some(NormalizedEvent::LlmHint(_))), + "prompt hooks should also emit a private LLM hint" + ); + } + assert_eq!(session_id, "header-session"); + assert_eq!(metadata["model"], json!("model-a")); + assert!(metadata.get("cwd").is_none()); + assert_eq!(payload["cwd"], json!("/repo")); + assert_eq!(metadata["gateway_config_profile"], json!("coverage")); + } +} + #[test] fn maps_hermes_llm_hooks_to_private_hints() { let headers = HeaderMap::new(); diff --git a/crates/cli/tests/coverage/alignment_tests.rs b/crates/cli/tests/coverage/alignment_tests.rs index 763f683ee..7a3f22b18 100644 --- a/crates/cli/tests/coverage/alignment_tests.rs +++ b/crates/cli/tests/coverage/alignment_tests.rs @@ -322,7 +322,7 @@ fn request_affinity_key_reads_messages_content_blocks() { }; assert_eq!( - request_affinity_key(&request).as_deref(), + request_affinity_key("anthropic.messages", &request).as_deref(), Some("Analyze the python binding with detail.") ); } @@ -340,7 +340,7 @@ fn request_affinity_key_reads_chat_completion_string_messages() { }; assert_eq!( - request_affinity_key(&request).as_deref(), + request_affinity_key("openai.chat_completions", &request).as_deref(), Some("Review the Rust CLI gateway alignment code.") ); } @@ -360,7 +360,7 @@ fn request_affinity_key_preserves_leading_tagged_context_text() { }; assert_eq!( - request_affinity_key(&request).as_deref(), + request_affinity_key("anthropic.messages", &request).as_deref(), Some( " Trace run 7. Today is 2026-05-19. Review the gateway correlation logic." ) @@ -383,7 +383,7 @@ fn request_affinity_key_keeps_task_after_large_prefix() { }), }; - let key = request_affinity_key(&request).unwrap(); + let key = request_affinity_key("anthropic.messages", &request).unwrap(); assert!(key.starts_with("volatile context")); assert!( key.ends_with(task), @@ -406,7 +406,7 @@ fn request_affinity_key_preserves_fully_tagged_prompt_text() { }; assert_eq!( - request_affinity_key(&request).as_deref(), + request_affinity_key("anthropic.messages", &request).as_deref(), Some("Review the gateway correlation logic.") ); } @@ -434,7 +434,7 @@ fn request_affinity_key_prefers_latest_task_message_over_root_history() { }; assert_eq!( - request_affinity_key(&request).as_deref(), + request_affinity_key("openai.chat_completions", &request).as_deref(), Some("Thoroughly explore the crates/ffi directory.") ); } @@ -465,11 +465,11 @@ fn request_affinity_key_reads_responses_input_items_and_prompt() { }; assert_eq!( - request_affinity_key(&responses_request).as_deref(), + request_affinity_key("openai.responses", &responses_request).as_deref(), Some("Analyze the Node binding architecture.") ); assert_eq!( - request_affinity_key(&prompt_request).as_deref(), + request_affinity_key("openai.responses", &prompt_request).as_deref(), Some("Summarize the Go binding architecture.") ); } @@ -481,7 +481,10 @@ fn request_affinity_key_ignores_count_token_style_payloads() { content: json!("// source text without a chat user task"), }; - assert_eq!(request_affinity_key(&request), None); + assert_eq!( + request_affinity_key("anthropic.count_tokens", &request), + None + ); } #[test] @@ -515,9 +518,18 @@ fn request_affinity_key_ignores_tool_results_and_sidecar_json() { }), }; - assert_eq!(request_affinity_key(&tool_result), None); - assert_eq!(request_affinity_key(&sidecar_json), None); - assert_eq!(request_affinity_key(&sidecar_json_array), None); + assert_eq!( + request_affinity_key("anthropic.messages", &tool_result), + None + ); + assert_eq!( + request_affinity_key("openai.responses", &sidecar_json), + None + ); + assert_eq!( + request_affinity_key("openai.responses", &sidecar_json_array), + None + ); } #[test] @@ -614,6 +626,14 @@ fn json_helpers_and_metadata_merge_cover_edge_shapes() { json_value_at(&payload, &[&["object"][..]]), Some(json!({ "nested": true })) ); + assert_eq!( + json_string_at( + &payload, + &[&["object"][..], &["empty"][..], &["string"][..]] + ) + .as_deref(), + Some("value") + ); let mut inserted = Map::new(); insert_optional(&mut inserted, "present", Some("value")); diff --git a/crates/cli/tests/coverage/gateway_tests.rs b/crates/cli/tests/coverage/gateway_tests.rs index 43b956fa6..35b63bc34 100644 --- a/crates/cli/tests/coverage/gateway_tests.rs +++ b/crates/cli/tests/coverage/gateway_tests.rs @@ -102,6 +102,22 @@ fn selects_provider_routes() { assert_eq!(ProviderRoute::from_path("/unsupported"), None); } +#[test] +fn provider_route_names_round_trip_through_alignment_routes() { + for route in [ + ProviderRoute::OpenAiResponses, + ProviderRoute::OpenAiChatCompletions, + ProviderRoute::OpenAiModels, + ProviderRoute::AnthropicMessages, + ProviderRoute::AnthropicCountTokens, + ] { + assert_eq!( + GatewayRouteKind::from_provider_name(route.name()), + Some(route.alignment_route()) + ); + } +} + #[test] fn provider_routes_preserve_path_query_and_choose_upstream() { let config = GatewayConfig { diff --git a/crates/core/src/api/event.rs b/crates/core/src/api/event.rs index 8d0b1e188..520509ced 100644 --- a/crates/core/src/api/event.rs +++ b/crates/core/src/api/event.rs @@ -39,7 +39,10 @@ impl EventNormalizationExt for Event { return Some(Cow::Borrowed(annotated.as_ref())); } let request: LlmRequest = serde_json::from_value(self.input()?.clone()).ok()?; - resolve::normalize_request(&request).map(Cow::Owned) + // Managed LLM events use the provider route as the event name (for + // example, "anthropic.messages"), which doubles as the codec hint for + // shape-identical request bodies. + resolve::normalize_request_with_hint(&request, Some(self.name())).map(Cow::Owned) } fn normalized_llm_response(&self) -> Option> { diff --git a/crates/core/src/api/llm.rs b/crates/core/src/api/llm.rs index 0af799d04..8e6cfd88b 100644 --- a/crates/core/src/api/llm.rs +++ b/crates/core/src/api/llm.rs @@ -41,6 +41,10 @@ pub struct LlmHandle { #[builder(default = Utc::now())] pub started_at: DateTime, /// Provider or logical call name recorded on lifecycle events. + /// + /// Gateway-managed provider calls use provider route names such as + /// `anthropic.messages`; event normalization may reuse those route names as + /// codec hints when raw request shapes overlap across providers. #[builder(setter(into))] pub name: String, /// Optional application payload stored on the handle. @@ -64,7 +68,8 @@ pub struct LlmHandle { #[derive(Debug, Clone, TypedBuilder)] #[builder(field_defaults(setter(strip_option(ignore_invalid, fallback_suffix = "_opt"))))] pub struct CreateLlmHandleParams<'a> { - /// Logical provider or model family name. + /// Logical provider or model family name. Gateway-managed provider calls + /// should pass the provider route name, for example `anthropic.messages`. pub name: &'a str, /// Optional parent scope UUID. #[builder(default)] diff --git a/crates/core/src/api/runtime/state.rs b/crates/core/src/api/runtime/state.rs index 70276786d..adcfffa0c 100644 --- a/crates/core/src/api/runtime/state.rs +++ b/crates/core/src/api/runtime/state.rs @@ -416,7 +416,9 @@ impl NemoRelayContextState { /// Create a new LLM handle. /// /// # Parameters - /// - `name`: Logical provider or model family name. + /// - `name`: Logical provider or model family name. Gateway-managed LLM + /// calls use provider route names such as `anthropic.messages`, which + /// become the emitted event name. /// - `parent_uuid`: Optional parent scope UUID. /// - `attributes`: LLM attribute bitflags. /// - `data`: Optional application payload stored on the handle. diff --git a/crates/core/src/codec/anthropic.rs b/crates/core/src/codec/anthropic.rs index e3596fe37..3243888c8 100644 --- a/crates/core/src/codec/anthropic.rs +++ b/crates/core/src/codec/anthropic.rs @@ -26,7 +26,7 @@ use super::request::{ AnnotatedLlmRequest, FunctionDefinition, GenerationParams, Message, MessageContent, ToolChoice, ToolChoiceFunction, ToolChoiceFunctionName, ToolDefinition, }; -use super::resolve::{ProviderSurface, SurfaceDescriptor}; +use super::resolve::{ProviderSurface, ProviderSurfaceDescriptor}; use super::response::{ AnnotatedLlmResponse, ApiSpecificResponse, FinishReason, RawUsageCost, ResponseToolCall, Usage, estimate_cost_for_provider, infer_model_provider, provider_reported_cost, @@ -40,16 +40,15 @@ use super::traits::{LlmCodec, LlmResponseCodec}; /// Built-in codec for the Anthropic Messages API. pub struct AnthropicMessagesCodec; -// --------------------------------------------------------------------------- -// Built-in surface descriptor (codec-owned detection, registered in resolve) -// --------------------------------------------------------------------------- - -pub(crate) const SURFACE_DESCRIPTOR: SurfaceDescriptor = SurfaceDescriptor { +pub(crate) const PROVIDER_SURFACE: ProviderSurfaceDescriptor = ProviderSurfaceDescriptor { surface: ProviderSurface::AnthropicMessages, detect_request: |obj, hint| { // A system-less Anthropic request is shape-identical to OpenAI Chat; - // the "anthropic" hint disambiguates it. - obj.contains_key("system") || (hint == Some("anthropic") && obj.contains_key("messages")) + // a recognized Anthropic provider hint disambiguates it. + let hinted_anthropic = hint.is_some_and(|hint_value| { + hint_value == "anthropic" || hint_value == "anthropic.messages" + }); + obj.contains_key("system") || (hinted_anthropic && obj.contains_key("messages")) }, detect_response: |obj| { obj.get("type").and_then(Json::as_str) == Some("message") diff --git a/crates/core/src/codec/openai_chat.rs b/crates/core/src/codec/openai_chat.rs index 560026d7c..ccdc7d60e 100644 --- a/crates/core/src/codec/openai_chat.rs +++ b/crates/core/src/codec/openai_chat.rs @@ -13,7 +13,7 @@ use crate::error::{FlowError, Result}; use crate::json::Json; use super::request::{AnnotatedLlmRequest, GenerationParams, Message, ToolChoice, ToolDefinition}; -use super::resolve::{ProviderSurface, SurfaceDescriptor}; +use super::resolve::{ProviderSurface, ProviderSurfaceDescriptor}; use super::response::{ AnnotatedLlmResponse, ApiSpecificResponse, FinishReason, RawUsageCost, ResponseToolCall, Usage, estimate_cost_for_provider, infer_model_provider, provider_reported_cost, @@ -27,11 +27,7 @@ use super::traits::{LlmCodec, LlmResponseCodec}; /// Built-in codec for the OpenAI Chat Completions API. pub struct OpenAIChatCodec; -// --------------------------------------------------------------------------- -// Built-in surface descriptor (codec-owned detection, registered in resolve) -// --------------------------------------------------------------------------- - -pub(crate) const SURFACE_DESCRIPTOR: SurfaceDescriptor = SurfaceDescriptor { +pub(crate) const PROVIDER_SURFACE: ProviderSurfaceDescriptor = ProviderSurfaceDescriptor { surface: ProviderSurface::OpenAIChat, detect_request: |obj, _| obj.contains_key("messages"), detect_response: |obj| obj.get("choices").is_some_and(Json::is_array), diff --git a/crates/core/src/codec/openai_responses.rs b/crates/core/src/codec/openai_responses.rs index e1dc52560..7f64d28c3 100644 --- a/crates/core/src/codec/openai_responses.rs +++ b/crates/core/src/codec/openai_responses.rs @@ -25,7 +25,7 @@ use super::request::{ AnnotatedLlmRequest, GenerationParams, Message, MessageContent, ToolChoice, ToolChoiceFunction, ToolChoiceFunctionName, ToolDefinition, }; -use super::resolve::{ProviderSurface, SurfaceDescriptor}; +use super::resolve::{ProviderSurface, ProviderSurfaceDescriptor}; use super::response::{ AnnotatedLlmResponse, ApiSpecificResponse, FinishReason, RawUsageCost, ResponseToolCall, Usage, estimate_cost_for_provider, infer_model_provider, provider_reported_cost, @@ -39,11 +39,7 @@ use super::traits::{LlmCodec, LlmResponseCodec}; /// Built-in codec for the OpenAI Responses API. pub struct OpenAIResponsesCodec; -// --------------------------------------------------------------------------- -// Built-in surface descriptor (codec-owned detection, registered in resolve) -// --------------------------------------------------------------------------- - -pub(crate) const SURFACE_DESCRIPTOR: SurfaceDescriptor = SurfaceDescriptor { +pub(crate) const PROVIDER_SURFACE: ProviderSurfaceDescriptor = ProviderSurfaceDescriptor { surface: ProviderSurface::OpenAIResponses, detect_request: |obj, _| obj.contains_key("input") || obj.contains_key("instructions"), detect_response: |obj| { diff --git a/crates/core/src/codec/resolve.rs b/crates/core/src/codec/resolve.rs index fca3755f8..1769ea100 100644 --- a/crates/core/src/codec/resolve.rs +++ b/crates/core/src/codec/resolve.rs @@ -26,24 +26,40 @@ pub enum ProviderSurface { /// Request shape detector; the optional `&str` is a provider hint a codec may use /// to claim an otherwise-ambiguous shape. -type RequestDetector = fn(&serde_json::Map, Option<&str>) -> bool; -type ResponseDetector = fn(&serde_json::Map) -> bool; +type RequestSurfaceDetector = fn(&serde_json::Map, Option<&str>) -> bool; -pub(crate) struct SurfaceDescriptor { +/// Response shape detector; response routing is payload-only because provider +/// responses carry stronger built-in discriminators than request bodies. +type ResponseSurfaceDetector = fn(&serde_json::Map) -> bool; + +/// Built-in provider extraction strategy for one request/response surface. +/// +/// The descriptor keeps surface detection next to the codec that owns the +/// schema-specific decode logic while preserving the existing public +/// [`LlmCodec`](super::traits::LlmCodec) and +/// [`LlmResponseCodec`](super::traits::LlmResponseCodec) traits. +/// `decode_response` is the provider response-extraction interface: built-in +/// codecs populate [`AnnotatedLlmResponse`] with model names, finish reasons, +/// tool calls, usage, cost, provider-specific fields, and replayable response +/// data when the source payload supplies them. +pub(crate) struct ProviderSurfaceDescriptor { pub(crate) surface: ProviderSurface, - pub(crate) detect_request: RequestDetector, - pub(crate) detect_response: ResponseDetector, + pub(crate) detect_request: RequestSurfaceDetector, + pub(crate) detect_response: ResponseSurfaceDetector, pub(crate) decode_request: fn(&LlmRequest) -> Result, pub(crate) decode_response: fn(&Json) -> Result, } -/// Built-in surfaces in request-detection priority order (first match wins): -/// Responses > Anthropic > Chat. The order is authoritative — a hint-aware -/// detector must stay after any stronger-signal surface it could shadow. -static REGISTRY: &[SurfaceDescriptor] = &[ - openai_responses::SURFACE_DESCRIPTOR, - anthropic::SURFACE_DESCRIPTOR, - openai_chat::SURFACE_DESCRIPTOR, +/// Built-in provider surfaces in request-detection priority order. +/// +/// First match wins for requests because some shapes overlap. The order is +/// authoritative: a hint-aware detector must stay after any stronger-signal +/// surface it could shadow. Response detection requires exactly one match +/// before decoding. +pub(crate) static BUILTIN_PROVIDER_SURFACES: &[ProviderSurfaceDescriptor] = &[ + openai_responses::PROVIDER_SURFACE, + anthropic::PROVIDER_SURFACE, + openai_chat::PROVIDER_SURFACE, ]; /// Detect the request surface from a raw request body by top-level key. @@ -60,56 +76,67 @@ pub fn detect_request_surface(body: &Json) -> Option { /// Like [`detect_request_surface`], but a recognized `provider_hint` resolves the /// one ambiguous shape (an Anthropic request without a top-level `system`, -/// otherwise read as OpenAI Chat). Today, `"anthropic"` is the only hint that -/// changes detection; `None` or any other value is ignored and detection stays -/// shape-only. +/// otherwise read as OpenAI Chat). Today, only the exact hints `"anthropic"` +/// and `"anthropic.messages"` change detection; `None` or any other value is +/// ignored and detection stays shape-only. #[must_use] pub fn detect_request_surface_with_hint( body: &Json, provider_hint: Option<&str>, ) -> Option { + request_descriptor(body, provider_hint).map(|descriptor| descriptor.surface) +} + +/// Detect the response surface from a raw provider response, classifying only +/// when exactly one built-in shape matches (the built-in codecs accept minimal +/// objects, so decode success alone is not a reliable classifier). +#[must_use] +pub fn detect_response_surface(raw: &Json) -> Option { + response_descriptor(raw).map(|descriptor| descriptor.surface) +} + +fn request_descriptor( + body: &Json, + provider_hint: Option<&str>, +) -> Option<&'static ProviderSurfaceDescriptor> { let obj = body.as_object()?; - REGISTRY + BUILTIN_PROVIDER_SURFACES .iter() - .find(|d| (d.detect_request)(obj, provider_hint)) - .map(|d| d.surface) + .find(|descriptor| (descriptor.detect_request)(obj, provider_hint)) } -/// Classify a response object to exactly one built-in surface descriptor: the -/// single source of truth shared by [`detect_response_surface`] and -/// [`normalize_response`]. Zero or multiple matches yield `None` (the built-in -/// codecs accept minimal objects, so decode success alone is not a reliable -/// classifier). -fn detect_response_descriptor( - obj: &serde_json::Map, -) -> Option<&'static SurfaceDescriptor> { - let mut matches = REGISTRY.iter().filter(|d| (d.detect_response)(obj)); +fn response_descriptor(raw: &Json) -> Option<&'static ProviderSurfaceDescriptor> { + let obj = raw.as_object()?; + let mut matches = BUILTIN_PROVIDER_SURFACES + .iter() + .filter(|descriptor| (descriptor.detect_response)(obj)); match (matches.next(), matches.next()) { (Some(descriptor), None) => Some(descriptor), _ => None, } } -/// Detect the response surface from a raw provider response, classifying only -/// when exactly one built-in shape matches (the built-in codecs accept minimal -/// objects, so decode success alone is not a reliable classifier). +/// Best-effort decode of a raw request into [`AnnotatedLlmRequest`] (fail-open). #[must_use] -pub fn detect_response_surface(raw: &Json) -> Option { - detect_response_descriptor(raw.as_object()?).map(|d| d.surface) +pub fn normalize_request(request: &LlmRequest) -> Option { + normalize_request_with_hint(request, None) } -/// Best-effort decode of a raw request into [`AnnotatedLlmRequest`] (fail-open). +/// Like [`normalize_request`], but a recognized `provider_hint` can +/// disambiguate provider request shapes that are otherwise identical. #[must_use] -pub fn normalize_request(request: &LlmRequest) -> Option { - let obj = request.content.as_object()?; - let descriptor = REGISTRY.iter().find(|d| (d.detect_request)(obj, None))?; +pub fn normalize_request_with_hint( + request: &LlmRequest, + provider_hint: Option<&str>, +) -> Option { + let descriptor = request_descriptor(&request.content, provider_hint)?; (descriptor.decode_request)(request).ok() } /// Best-effort decode of a raw response into [`AnnotatedLlmResponse`] (fail-open). #[must_use] pub fn normalize_response(raw: &Json) -> Option { - let descriptor = detect_response_descriptor(raw.as_object()?)?; + let descriptor = response_descriptor(raw)?; (descriptor.decode_response)(raw).ok() } diff --git a/crates/core/tests/unit/codec/resolve_tests.rs b/crates/core/tests/unit/codec/resolve_tests.rs index e9c552b7a..c1460cdbe 100644 --- a/crates/core/tests/unit/codec/resolve_tests.rs +++ b/crates/core/tests/unit/codec/resolve_tests.rs @@ -14,6 +14,22 @@ fn req(content: serde_json::Value) -> LlmRequest { } } +#[test] +fn builtin_provider_surface_registry_keeps_request_priority() { + let surfaces: Vec<_> = BUILTIN_PROVIDER_SURFACES + .iter() + .map(|descriptor| descriptor.surface) + .collect(); + assert_eq!( + surfaces, + vec![ + ProviderSurface::OpenAIResponses, + ProviderSurface::AnthropicMessages, + ProviderSurface::OpenAIChat, + ] + ); +} + // --------------------------------------------------------------------------- // detect_request_surface (priority order, hoisted from adaptive) // --------------------------------------------------------------------------- @@ -264,15 +280,75 @@ fn hint_anthropic_upgrades_system_less_messages() { detect_request_surface(&json!({"messages": []})), Some(ProviderSurface::OpenAIChat) ); + for hint in [Some("anthropic"), Some("anthropic.messages")] { + assert_eq!( + detect_request_surface_with_hint(&json!({"messages": []}), hint), + Some(ProviderSurface::AnthropicMessages), + "messages-only with hint {hint:?} should select Anthropic", + ); + } +} + +#[test] +fn hint_anthropic_descriptor_decodes_system_less_messages() { + let request = req(json!({ + "model": "claude-3-5-sonnet", + "messages": [{"role": "user", "content": "hi"}], + "stop_sequences": ["END"] + })); + assert_eq!( - detect_request_surface_with_hint(&json!({"messages": []}), Some("anthropic")), - Some(ProviderSurface::AnthropicMessages) + request_descriptor(&request.content, None).map(|descriptor| descriptor.surface), + Some(ProviderSurface::OpenAIChat) ); + let descriptor = request_descriptor(&request.content, Some("anthropic")) + .expect("anthropic hint should select descriptor"); + assert_eq!(descriptor.surface, ProviderSurface::AnthropicMessages); + + let decoded = (descriptor.decode_request)(&request).expect("anthropic request decodes"); + let stop = decoded + .params + .as_ref() + .and_then(|params| params.stop.as_ref()) + .expect("anthropic stop_sequences are normalized"); + assert_eq!(stop, &vec!["END".to_string()]); + assert!(!decoded.extra.contains_key("stop_sequences")); +} + +#[test] +fn normalize_request_with_hint_decodes_system_less_anthropic() { + let request = req(json!({ + "model": "claude-3-5-sonnet", + "messages": [{"role": "user", "content": "hi"}], + "stop_sequences": ["END"] + })); + + let decoded_without_hint = + normalize_request(&request).expect("messages-only request decodes as chat by default"); + assert!(decoded_without_hint.extra.contains_key("stop_sequences")); + + let decoded = normalize_request_with_hint(&request, Some("anthropic.messages")) + .expect("anthropic-hinted request decodes"); + let stop = decoded + .params + .as_ref() + .and_then(|params| params.stop.as_ref()) + .expect("anthropic stop_sequences are normalized"); + assert_eq!(stop, &vec!["END".to_string()]); + assert!(!decoded.extra.contains_key("stop_sequences")); } #[test] fn hint_other_or_unknown_provider_stays_chat() { - for hint in [Some("openai"), Some("passthrough"), Some("gemini"), None] { + for hint in [ + Some("openai"), + Some("openai.chat"), + Some("anthropic.count_tokens"), + Some("anthropic.preview"), + Some("passthrough"), + Some("gemini"), + None, + ] { assert_eq!( detect_request_surface_with_hint(&json!({"messages": []}), hint), Some(ProviderSurface::OpenAIChat), diff --git a/crates/core/tests/unit/types_tests.rs b/crates/core/tests/unit/types_tests.rs index 0ce2ba22d..7f745935d 100644 --- a/crates/core/tests/unit/types_tests.rs +++ b/crates/core/tests/unit/types_tests.rs @@ -676,6 +676,37 @@ fn normalized_llm_request_decodes_wrapped_request_when_unannotated() { assert!(!normalized.messages.is_empty()); } +#[test] +fn normalized_llm_request_uses_event_name_provider_hint() { + let event = Event::Scope(ScopeEvent::new( + BaseEvent::builder() + .name("anthropic.messages") + .data(json!({ + "headers": {}, + "content": { + "model": "claude-3-5-sonnet", + "messages": [{"role": "user", "content": "hi"}], + "stop_sequences": ["END"] + } + })) + .build(), + ScopeCategory::Start, + llm_attributes_to_strings(LlmAttributes::empty()), + EventCategory::llm(), + Some(CategoryProfile::default()), + )); + let normalized = event + .normalized_llm_request() + .expect("decodes wrapped anthropic request"); + let stop = normalized + .params + .as_ref() + .and_then(|params| params.stop.as_ref()) + .expect("anthropic stop_sequences are normalized"); + assert_eq!(stop, &vec!["END".to_string()]); + assert!(!normalized.extra.contains_key("stop_sequences")); +} + #[test] fn normalized_llm_request_prefers_annotation() { let request = annotated_request("demo-model", "annotated"); diff --git a/docs/about-nemo-relay/concepts/codecs.mdx b/docs/about-nemo-relay/concepts/codecs.mdx index 127bc1abc..311380dec 100644 --- a/docs/about-nemo-relay/concepts/codecs.mdx +++ b/docs/about-nemo-relay/concepts/codecs.mdx @@ -80,6 +80,83 @@ Normalized codec output is applicable to several runtime layers: Codecs do not replace scopes, middleware, subscribers, or plugins. They make those layers easier to apply consistently across heterogeneous inputs. +## Extraction Strategy Boundaries + +NeMo Relay keeps extraction responsibilities separated so refactors can reuse +normalization logic without changing runtime ownership or public binding APIs. + +### Provider Schema Extraction + +Provider schema extraction is codec-owned. Built-in provider surfaces, such as +OpenAI Chat Completions, OpenAI Responses, and Anthropic Messages, each own the +logic that recognizes their request and response shapes and maps them into +`AnnotatedLlmRequest` or `AnnotatedLlmResponse`. + +When a managed LLM event already has an annotation, subscribers and exporters +consume that annotation. When an event has only raw provider JSON, best-effort +normalization may detect a built-in provider surface and decode it. This +fallback is fail-open: unrecognized, ambiguous, missing, sparse, or invalid +payloads remain observable as raw lifecycle data. A recognized provider hint can +disambiguate an otherwise identical request shape, such as an Anthropic Messages +request without a top-level `system` field, but no provider annotation is +invented without either a matching provider surface or a recognized hint. + +Provider extraction covers model names, messages, generation parameters, tool +definitions, tool calls, finish reasons, usage, cost, provider-specific fields, +and replayable request or response JSON when the source payload contains enough +information. Provider codecs should preserve unknown fields and treat request +encoding as a merge over the original provider payload. + +The response-extraction interface is the existing response codec contract: +`LlmResponseCodec::decode_response` returns `AnnotatedLlmResponse`. Built-in +codecs populate that normalized response with model, finish reason, tool-call, +usage, cost, provider-specific, and preserved `extra` fields. Cost parsing and +estimation helpers are codec implementation details behind that interface, not a +separate provider-response API. + +### Provider Request Extraction + +Provider request extraction is gateway-owned. It uses the selected gateway route, +such as OpenAI Responses, OpenAI Chat Completions, OpenAI Models, Anthropic +Messages, or Anthropic Count Tokens, to extract request facts that are not codec +schema annotations. + +Route-specific request extractors resolve gateway session IDs, request-affinity +keys, and fallback turn input for provider calls that arrive before the matching +agent prompt hook. This keeps correlation and ownership hints near gateway +alignment, while provider codecs stay focused on decoding request and response +schemas. + +Provider request extraction can also pass a narrow provider hint into codec +normalization. For example, the recognized `anthropic` and `anthropic.messages` +hints let Anthropic Messages requests without a top-level `system` field decode +through the Anthropic provider surface instead of being treated as shape-only +OpenAI Chat payloads. + +### Agent Payload Extraction + +Agent payload extraction is separate from provider codecs. Coding agents, +harnesses, and framework hooks can expose session IDs, event names, subagent +relationships, tool IDs, tool names, tool arguments, tool results, LLM hints, +and status fields through host-specific payload shapes. These facts help NeMo +Relay attach lifecycle events to the right scope, but they do not decode +provider schemas or build request-affinity keys from provider requests. + +Agent extraction may be partial. Missing identifiers use compatibility +fallbacks at the adapter boundary, such as synthetic session IDs, synthetic tool +call IDs, an explicit `unknown_tool` name, or a generic subagent ID. Lossy, +summary-only, or truncated payloads should keep their original payload and +metadata available for debugging instead of pretending to be reconstruction +grade provider data. + +### Exporter Projection + +Exporter projection is the final step. ATIF, OpenTelemetry, and OpenInference +may project the same normalized facts into different output schemas, but generic +extraction should stay outside exporter-specific formatting where possible. +ATIF-specific trajectory shaping, OpenTelemetry attributes, and OpenInference +semantic attributes remain exporter-local. + ## Codec Decision Limitations Codecs do not decide: