Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions components/src/dynamo/frontend/vllm_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,14 @@ async def _generator_inner(
nvext_max_thinking_tokens = (request.get("nvext") or {}).get(
"max_thinking_tokens"
)
# routed_experts_prompt_start (RL capture offset) also rides nvext and is
# applied onto SamplingParams worker-side. This native vLLM chat path
# bypasses the Rust OpenAIPreprocessor, so forward it explicitly via
# extra_args.nvext (the shape build_sampling_params reads) — otherwise it
# silently drops and the engine ships the full-sequence routing blob.
nvext_routed_experts_prompt_start = (request.get("nvext") or {}).get(
"routed_experts_prompt_start"
)
logprobs = request_for_sampling.logprobs
top_logprobs = request_for_sampling.top_logprobs
if logprobs is True:
Expand Down Expand Up @@ -460,6 +468,11 @@ async def _generator_inner(
dynamo_preproc["reasoning_ended"] = reasoning_ended
if reasoning_parser_kwargs is not None:
dynamo_preproc["reasoning_parser_kwargs"] = reasoning_parser_kwargs
if nvext_routed_experts_prompt_start is not None:
extra_args = dynamo_preproc.setdefault("extra_args", {})
extra_args.setdefault("nvext", {})[
"routed_experts_prompt_start"
] = nvext_routed_experts_prompt_start

# Extract MM routing metadata and prepare transfer.
cleanup_items: list = []
Expand Down
23 changes: 20 additions & 3 deletions components/src/dynamo/vllm/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -753,12 +753,29 @@ def build_sampling_params(
if value is not None and hasattr(sampling_params, key):
setattr(sampling_params, key, value)

# routed_experts_prompt_start (RL capture offset) rides nvext, not the
# standard sampling_options (Dynamo's chat schema rejects unknown top-level
# fields). Apply it onto SamplingParams so vLLM trims the leading prompt rows
# from the returned routing engine-side (instead of the client trimming the
# full-sequence blob after it crosses the wire).
for source in _iter_nvext_sources(request):
reps_nvext = source.get("routed_experts_prompt_start")
if reps_nvext is not None and hasattr(
sampling_params, "routed_experts_prompt_start"
):
sampling_params.routed_experts_prompt_start = reps_nvext
break

# routed_experts_prompt_start (RL capture offset) must be a non-negative
# int; reject bad client values so the worker emits a sane `start` instead
# of a bogus offset the consumer cannot align (vLLM clamps the upper bound).
# int within the frontend's u32 range; reject bad client values so the worker
# emits a sane `start` instead of a bogus offset the consumer cannot align
# (vLLM clamps the upper bound against the actual sequence length).
reps = getattr(sampling_params, "routed_experts_prompt_start", None)
if reps is not None and (
isinstance(reps, bool) or not isinstance(reps, int) or reps < 0
isinstance(reps, bool)
or not isinstance(reps, int)
or reps < 0
or reps > 0xFFFFFFFF
):
logger.warning(
"Ignoring invalid routed_experts_prompt_start=%r (want non-negative int)",
Expand Down
51 changes: 51 additions & 0 deletions components/src/dynamo/vllm/tests/test_vllm_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from unittest.mock import patch

import pytest
from vllm.sampling_params import SamplingParams

from dynamo.vllm.args import (
_connector_to_kv_transfer_json,
Expand All @@ -27,6 +28,7 @@
update_engine_config_with_dynamo,
)
from dynamo.vllm.constants import DisaggregationMode
from dynamo.vllm.handlers import build_sampling_params
from dynamo.vllm.tests.conftest import make_cli_args_fixture

# Get path relative to this test file
Expand Down Expand Up @@ -900,6 +902,55 @@ def test_build_sampling_params_maps_max_thinking_tokens():
assert sp.thinking_token_budget == 1024


@pytest.mark.parametrize("shape", ["nvext", "extra_args"])
def test_build_sampling_params_applies_nvext_routed_experts_prompt_start(shape):
"""routed_experts_prompt_start rides nvext (not sampling_options) and is
applied onto SamplingParams so vLLM trims routing engine-side. The worker
accepts both the raw OpenAI shape (request["nvext"], used by SGLang/tests)
and the Rust-preprocessor shape (request["extra_args"]["nvext"])."""
if not hasattr(SamplingParams(), "routed_experts_prompt_start"):
pytest.skip("installed vLLM has no routed_experts_prompt_start support")

def make_request(value):
request = {
"token_ids": [1, 2, 3],
"sampling_options": {},
"stop_conditions": {},
"output_options": {},
}
if shape == "nvext":
request["nvext"] = {"routed_experts_prompt_start": value}
else:
request["extra_args"] = {"nvext": {"routed_experts_prompt_start": value}}
return request

sp = build_sampling_params(make_request(4), default_sampling_params={})
assert sp.routed_experts_prompt_start == 4

# negative / non-int / out-of-u32-range values are clamped to 0
for bad in (-1, True, 1.5, "4", 2**32):
sp = build_sampling_params(make_request(bad), default_sampling_params={})
assert sp.routed_experts_prompt_start == 0


def test_build_sampling_params_routed_experts_raw_nvext_takes_precedence():
"""When both nvext shapes carry the offset, the raw request["nvext"]
source wins (matches _iter_nvext_sources priority order)."""
if not hasattr(SamplingParams(), "routed_experts_prompt_start"):
pytest.skip("installed vLLM has no routed_experts_prompt_start support")

request = {
"token_ids": [1, 2, 3],
"sampling_options": {},
"stop_conditions": {},
"output_options": {},
"nvext": {"routed_experts_prompt_start": 7},
"extra_args": {"nvext": {"routed_experts_prompt_start": 9}},
}
sp = build_sampling_params(request, default_sampling_params={})
assert sp.routed_experts_prompt_start == 7


def _make_dynamo_config(**overrides):
"""Build a minimal fake DynamoConfig for update_engine_config_with_dynamo tests."""
defaults = {
Expand Down
47 changes: 45 additions & 2 deletions lib/llm/src/preprocessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,12 @@ impl OpenAIPreprocessor {
if nvext.token_data.is_some() {
nvext_passthrough.insert("token_in".to_string(), serde_json::Value::Bool(true));
}
if let Some(start) = nvext.routed_experts_prompt_start {
nvext_passthrough.insert(
"routed_experts_prompt_start".to_string(),
serde_json::json!(start),
);
}
}

if !nvext_passthrough.contains_key("cache_salt")
Expand Down Expand Up @@ -2845,7 +2851,7 @@ impl
request.inner.stream = Some(true);

// create a response generator
let response_generator = request.response_generator(context.id().to_string());
let mut response_generator = request.response_generator(context.id().to_string());
let tracker = Some(response_generator.tracker());
let preprocess_options = PreprocessRequestOptions {
preserve_omitted_max_tokens: context
Expand All @@ -2859,6 +2865,41 @@ impl
.preprocess_request_with_options(&request, tracker.as_deref(), preprocess_options)
.await?;

// tokenize_only (RL bridge): the chat template has been applied and the
// prompt tokenized into `common_request.token_ids`. Return those IDs in
// `response.nvext.token_ids` and skip engine dispatch entirely — no
// generation runs. Lets an RL client tokenize multi-turn bridge segments
// with the server's own tokenizer + chat template (one source of truth)
// instead of a separate `/tokenize` route or a local HF tokenizer.
if request
.nvext
.as_ref()
.and_then(|n| n.tokenize_only)
.unwrap_or(false)
{
let token_ids = common_request.token_ids.clone();
let mut stream_response = response_generator.create_choice(
0,
None,
Some(dynamo_protocols::types::FinishReason::Stop),
None,
);
let nvext_response = crate::protocols::openai::nvext::NvExtResponse {
worker_id: None,
timing: None,
token_ids: Some(token_ids),
routed_experts: None,
engine_data: None,
stop_reason: None,
completion_token_ids: None,
prompt_logprobs: None,
};
stream_response.nvext = Some(serde_json::to_value(&nvext_response)?);
let ctx = context.context();
let output = stream::iter(vec![Annotated::from_data(stream_response)]);
return Ok(ResponseStream::new(Box::pin(output), ctx));
}

let uses_tool_call_structural_tag = self.apply_tool_choice_guided_decoding(
&request,
&mut common_request,
Expand Down Expand Up @@ -3319,14 +3360,16 @@ mod tests {
"bad_words_token_ids": [[12, 13]],
"nvext": {
"cache_salt": "step_7",
"extra_fields": ["completion_token_ids"]
"extra_fields": ["completion_token_ids"],
"routed_experts_prompt_start": 5
}
}))
.unwrap();

let extra_args = OpenAIPreprocessor::backend_extra_args(&request).unwrap();

assert_eq!(extra_args["nvext"]["cache_salt"], "step_7");
assert_eq!(extra_args["nvext"]["routed_experts_prompt_start"], 5);
assert_eq!(
extra_args["nvext"]["extra_fields"],
serde_json::json!(["completion_token_ids"])
Expand Down
25 changes: 25 additions & 0 deletions lib/llm/src/protocols/openai/nvext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,31 @@ pub struct NvExt {
#[builder(default, setter(strip_option))]
pub cache_salt: Option<String>,

/// RL routed-experts capture offset (MoE expert replay).
///
/// When set, Dynamo forwards this to the backend's
/// `SamplingParams.routed_experts_prompt_start` so the engine trims the
/// leading prompt rows from the returned routing tensor (rather than the
/// client trimming after the full-sequence blob crosses the wire). Backends
/// without routed-experts capture ignore it.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub routed_experts_prompt_start: Option<u32>,

/// Tokenize-only mode (RL bridge tokenization).
///
/// When `true`, the chat-completions request is preprocessed (chat template
/// applied + tokenized) and the resulting prompt token IDs are returned in
/// `response.nvext.token_ids` **without dispatching to the engine** — no
/// generation runs. Lets an RL client tokenize multi-turn bridge segments
/// using the server's own tokenizer + chat template (closing the
/// turn-1-server / bridge-local tokenizer seam) instead of a separate
/// `/tokenize` route. Honors per-request `chat_template_args`
/// (e.g. `add_generation_prompt`) so the dual-tokenize bridge works.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[builder(default, setter(strip_option))]
pub tokenize_only: Option<bool>,

/// Extra fields to be included in the response's nvext
/// This is a list of field names that should be populated in the response
/// Supported fields include "worker_id", "timing", "routed_experts", "engine_data",
Expand Down
Loading