diff --git a/components/src/dynamo/frontend/vllm_processor.py b/components/src/dynamo/frontend/vllm_processor.py index 10e92984a80c..4b32b980b26c 100644 --- a/components/src/dynamo/frontend/vllm_processor.py +++ b/components/src/dynamo/frontend/vllm_processor.py @@ -377,6 +377,14 @@ async def _generator_inner( nvext_max_thinking_tokens = (request.get("nvext") or {}).get( "max_thinking_tokens" ) + # routed_experts_prompt_start (RL capture offset) also rides nvext and is + # applied onto SamplingParams worker-side. This native vLLM chat path + # bypasses the Rust OpenAIPreprocessor, so forward it explicitly via + # extra_args.nvext (the shape build_sampling_params reads) — otherwise it + # silently drops and the engine ships the full-sequence routing blob. + nvext_routed_experts_prompt_start = (request.get("nvext") or {}).get( + "routed_experts_prompt_start" + ) logprobs = request_for_sampling.logprobs top_logprobs = request_for_sampling.top_logprobs if logprobs is True: @@ -460,6 +468,11 @@ async def _generator_inner( dynamo_preproc["reasoning_ended"] = reasoning_ended if reasoning_parser_kwargs is not None: dynamo_preproc["reasoning_parser_kwargs"] = reasoning_parser_kwargs + if nvext_routed_experts_prompt_start is not None: + extra_args = dynamo_preproc.setdefault("extra_args", {}) + extra_args.setdefault("nvext", {})[ + "routed_experts_prompt_start" + ] = nvext_routed_experts_prompt_start # Extract MM routing metadata and prepare transfer. 
cleanup_items: list = [] diff --git a/components/src/dynamo/vllm/handlers.py b/components/src/dynamo/vllm/handlers.py index 07cf85eb07a9..d42e987448eb 100644 --- a/components/src/dynamo/vllm/handlers.py +++ b/components/src/dynamo/vllm/handlers.py @@ -753,12 +753,29 @@ def build_sampling_params( if value is not None and hasattr(sampling_params, key): setattr(sampling_params, key, value) + # routed_experts_prompt_start (RL capture offset) rides nvext, not the + # standard sampling_options (Dynamo's chat schema rejects unknown top-level + # fields). Apply it onto SamplingParams so vLLM trims the leading prompt rows + # from the returned routing engine-side (instead of the client trimming the + # full-sequence blob after it crosses the wire). + for source in _iter_nvext_sources(request): + reps_nvext = source.get("routed_experts_prompt_start") + if reps_nvext is not None and hasattr( + sampling_params, "routed_experts_prompt_start" + ): + sampling_params.routed_experts_prompt_start = reps_nvext + break + # routed_experts_prompt_start (RL capture offset) must be a non-negative - # int; reject bad client values so the worker emits a sane `start` instead - # of a bogus offset the consumer cannot align (vLLM clamps the upper bound). + # int within the frontend's u32 range; reject bad client values so the worker + # emits a sane `start` instead of a bogus offset the consumer cannot align + # (vLLM clamps the upper bound against the actual sequence length). 
reps = getattr(sampling_params, "routed_experts_prompt_start", None) if reps is not None and ( - isinstance(reps, bool) or not isinstance(reps, int) or reps < 0 + isinstance(reps, bool) + or not isinstance(reps, int) + or reps < 0 + or reps > 0xFFFFFFFF ): logger.warning( "Ignoring invalid routed_experts_prompt_start=%r (want non-negative int)", diff --git a/components/src/dynamo/vllm/tests/test_vllm_unit.py b/components/src/dynamo/vllm/tests/test_vllm_unit.py index 3fa17c3a06d5..250246567cba 100644 --- a/components/src/dynamo/vllm/tests/test_vllm_unit.py +++ b/components/src/dynamo/vllm/tests/test_vllm_unit.py @@ -14,6 +14,7 @@ from unittest.mock import patch import pytest +from vllm.sampling_params import SamplingParams from dynamo.vllm.args import ( _connector_to_kv_transfer_json, @@ -27,6 +28,7 @@ update_engine_config_with_dynamo, ) from dynamo.vllm.constants import DisaggregationMode +from dynamo.vllm.handlers import build_sampling_params from dynamo.vllm.tests.conftest import make_cli_args_fixture # Get path relative to this test file @@ -900,6 +902,55 @@ def test_build_sampling_params_maps_max_thinking_tokens(): assert sp.thinking_token_budget == 1024 +@pytest.mark.parametrize("shape", ["nvext", "extra_args"]) +def test_build_sampling_params_applies_nvext_routed_experts_prompt_start(shape): + """routed_experts_prompt_start rides nvext (not sampling_options) and is + applied onto SamplingParams so vLLM trims routing engine-side. 
The worker + accepts both the raw OpenAI shape (request["nvext"], used by SGLang/tests) + and the Rust-preprocessor shape (request["extra_args"]["nvext"]).""" + if not hasattr(SamplingParams(), "routed_experts_prompt_start"): + pytest.skip("installed vLLM has no routed_experts_prompt_start support") + + def make_request(value): + request = { + "token_ids": [1, 2, 3], + "sampling_options": {}, + "stop_conditions": {}, + "output_options": {}, + } + if shape == "nvext": + request["nvext"] = {"routed_experts_prompt_start": value} + else: + request["extra_args"] = {"nvext": {"routed_experts_prompt_start": value}} + return request + + sp = build_sampling_params(make_request(4), default_sampling_params={}) + assert sp.routed_experts_prompt_start == 4 + + # negative / non-int / out-of-u32-range values are clamped to 0 + for bad in (-1, True, 1.5, "4", 2**32): + sp = build_sampling_params(make_request(bad), default_sampling_params={}) + assert sp.routed_experts_prompt_start == 0 + + +def test_build_sampling_params_routed_experts_raw_nvext_takes_precedence(): + """When both nvext shapes carry the offset, the raw request["nvext"] + source wins (matches _iter_nvext_sources priority order).""" + if not hasattr(SamplingParams(), "routed_experts_prompt_start"): + pytest.skip("installed vLLM has no routed_experts_prompt_start support") + + request = { + "token_ids": [1, 2, 3], + "sampling_options": {}, + "stop_conditions": {}, + "output_options": {}, + "nvext": {"routed_experts_prompt_start": 7}, + "extra_args": {"nvext": {"routed_experts_prompt_start": 9}}, + } + sp = build_sampling_params(request, default_sampling_params={}) + assert sp.routed_experts_prompt_start == 7 + + def _make_dynamo_config(**overrides): """Build a minimal fake DynamoConfig for update_engine_config_with_dynamo tests.""" defaults = { diff --git a/lib/llm/src/preprocessor.rs b/lib/llm/src/preprocessor.rs index 62b3d1d880bd..913b023e71a8 100644 --- a/lib/llm/src/preprocessor.rs +++ 
b/lib/llm/src/preprocessor.rs @@ -313,6 +313,12 @@ impl OpenAIPreprocessor { if nvext.token_data.is_some() { nvext_passthrough.insert("token_in".to_string(), serde_json::Value::Bool(true)); } + if let Some(start) = nvext.routed_experts_prompt_start { + nvext_passthrough.insert( + "routed_experts_prompt_start".to_string(), + serde_json::json!(start), + ); + } } if !nvext_passthrough.contains_key("cache_salt") @@ -2845,7 +2851,7 @@ impl request.inner.stream = Some(true); // create a response generator - let response_generator = request.response_generator(context.id().to_string()); + let mut response_generator = request.response_generator(context.id().to_string()); let tracker = Some(response_generator.tracker()); let preprocess_options = PreprocessRequestOptions { preserve_omitted_max_tokens: context @@ -2859,6 +2865,41 @@ impl .preprocess_request_with_options(&request, tracker.as_deref(), preprocess_options) .await?; + // tokenize_only (RL bridge): the chat template has been applied and the + // prompt tokenized into `common_request.token_ids`. Return those IDs in + // `response.nvext.token_ids` and skip engine dispatch entirely — no + // generation runs. Lets an RL client tokenize multi-turn bridge segments + // with the server's own tokenizer + chat template (one source of truth) + // instead of a separate `/tokenize` route or a local HF tokenizer. 
+ if request + .nvext + .as_ref() + .and_then(|n| n.tokenize_only) + .unwrap_or(false) + { + let token_ids = common_request.token_ids.clone(); + let mut stream_response = response_generator.create_choice( + 0, + None, + Some(dynamo_protocols::types::FinishReason::Stop), + None, + ); + let nvext_response = crate::protocols::openai::nvext::NvExtResponse { + worker_id: None, + timing: None, + token_ids: Some(token_ids), + routed_experts: None, + engine_data: None, + stop_reason: None, + completion_token_ids: None, + prompt_logprobs: None, + }; + stream_response.nvext = Some(serde_json::to_value(&nvext_response)?); + let ctx = context.context(); + let output = stream::iter(vec![Annotated::from_data(stream_response)]); + return Ok(ResponseStream::new(Box::pin(output), ctx)); + } + let uses_tool_call_structural_tag = self.apply_tool_choice_guided_decoding( &request, &mut common_request, @@ -3319,7 +3360,8 @@ mod tests { "bad_words_token_ids": [[12, 13]], "nvext": { "cache_salt": "step_7", - "extra_fields": ["completion_token_ids"] + "extra_fields": ["completion_token_ids"], + "routed_experts_prompt_start": 5 } })) .unwrap(); @@ -3327,6 +3369,7 @@ mod tests { let extra_args = OpenAIPreprocessor::backend_extra_args(&request).unwrap(); assert_eq!(extra_args["nvext"]["cache_salt"], "step_7"); + assert_eq!(extra_args["nvext"]["routed_experts_prompt_start"], 5); assert_eq!( extra_args["nvext"]["extra_fields"], serde_json::json!(["completion_token_ids"]) diff --git a/lib/llm/src/protocols/openai/nvext.rs b/lib/llm/src/protocols/openai/nvext.rs index ba9532fdae26..d34af30532e6 100644 --- a/lib/llm/src/protocols/openai/nvext.rs +++ b/lib/llm/src/protocols/openai/nvext.rs @@ -448,6 +448,31 @@ pub struct NvExt { #[builder(default, setter(strip_option))] pub cache_salt: Option<String>, + /// RL routed-experts capture offset (MoE expert replay). 
+ /// + /// When set, Dynamo forwards this to the backend's + /// `SamplingParams.routed_experts_prompt_start` so the engine trims the + /// leading prompt rows from the returned routing tensor (rather than the + /// client trimming after the full-sequence blob crosses the wire). Backends + /// without routed-experts capture ignore it. + #[serde(default, skip_serializing_if = "Option::is_none")] + #[builder(default, setter(strip_option))] + pub routed_experts_prompt_start: Option<u32>, + + /// Tokenize-only mode (RL bridge tokenization). + /// + /// When `true`, the chat-completions request is preprocessed (chat template + /// applied + tokenized) and the resulting prompt token IDs are returned in + /// `response.nvext.token_ids` **without dispatching to the engine** — no + /// generation runs. Lets an RL client tokenize multi-turn bridge segments + /// using the server's own tokenizer + chat template (closing the + /// turn-1-server / bridge-local tokenizer seam) instead of a separate + /// `/tokenize` route. Honors per-request `chat_template_args` + /// (e.g. `add_generation_prompt`) so the dual-tokenize bridge works. + #[serde(default, skip_serializing_if = "Option::is_none")] + #[builder(default, setter(strip_option))] + pub tokenize_only: Option<bool>, + /// Extra fields to be included in the response's nvext /// This is a list of field names that should be populated in the response /// Supported fields include "worker_id", "timing", "routed_experts", "engine_data",