diff --git a/docs/reference.md b/docs/reference.md index a50811f4aa..e4a27f3617 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -215,6 +215,7 @@ class RoutedExpertsPayload(TypedDict): data: Any # actually memoryview; kept opaque so Pydantic skips schema validation shape: list[int] start: int + dtype: NotRequired[Literal["uint8", "uint16", "int16", "int32"]] # optional; absent → uint8 ``` ### TrajectoryStepTokens @@ -1077,6 +1078,8 @@ with `MyConfig.model_validate(...)` or use the typed object directly. class ClientConfig(BaseModel): client_idx: int = 0 client_type: ClientType = "openai_chat_completions" + renderer_transport: RendererTransport = "vllm" + renderer_model_name: str | None = None preserve_all_thinking: bool = False preserve_thinking_between_tool_calls: bool = False api_key_var: str = "PRIME_API_KEY" @@ -1095,6 +1098,10 @@ class ClientConfig(BaseModel): `client_type` selects which `Client` implementation to instantiate (see [Client Classes](#client-classes)). Use `endpoint_configs` for multi-endpoint round-robin. In grouped scoring mode, groups are distributed round-robin across endpoint configs. +`renderer_transport` selects the token-in/token-out wire format used by `client_type == "openai_chat_completions_token"` and `client_type == "renderer"`. The default `"vllm"` uses vLLM's token routes. Set `"dynamo"` for Dynamo backends that accept pre-tokenized prompts in `nvext.token_data` on `/v1/chat/completions` and return token IDs in `nvext.engine_data`. + +`renderer_model_name` overrides the tokenizer/renderer model name used for local bridge tokenization and renderer construction. It is useful when the served API model name is an alias but the tokenizer should be loaded from the underlying Hugging Face model. + `preserve_all_thinking` and `preserve_thinking_between_tool_calls` are forwarded to the underlying renderer when `client_type == "renderer"`. 
They control whether past-assistant `reasoning_content` is re-emitted on subsequent renders — `preserve_all_thinking` keeps every past-assistant turn's thinking, and `preserve_thinking_between_tool_calls` keeps thinking only inside the in-flight assistant→tool→…→assistant block after the most recent user turn (when that block contains at least one tool response). Both default to `False` (template default applies). When `api_key_var` is `"PRIME_API_KEY"` (the default), credentials are loaded with the following precedence: diff --git a/docs/training.md b/docs/training.md index 655d057c99..dcc018fff1 100644 --- a/docs/training.md +++ b/docs/training.md @@ -214,6 +214,8 @@ The rollout client's `client_type` controls how prompt assembly and token state - **`openai_chat_completions_token`** (TITO, *token-in*): server-side templating, but returns prompt and completion token IDs alongside text so the trainer doesn't re-tokenize. Use when you trust the server's chat template to be stable across turns. - **`renderer`** *(experimental)*: client-side tokenization via a per-model renderer in the [`renderers` package](https://github.com/PrimeIntellect-ai/verifiers/tree/main/packages/renderers). Install it with `uv add "verifiers[renderers]"` before using `client_type="renderer"`. The trainer renders messages to token IDs locally and sends those to vLLM's `/v1/generate` endpoint. The renderer's `bridge_to_next_turn` extends prior-turn tokens verbatim across multi-turn boundaries (the *extension property*) and synthesizes the canonical turn-close on mid-completion truncation, so multi-turn rollouts merge into one training sample with one clean loss mask. +`openai_chat_completions_token` defaults to `renderer_transport="vllm"`, which uses vLLM token routes. For Dynamo inference backends, set `renderer_transport="dynamo"` so multi-turn rollouts send the stitched prompt in `nvext.token_data` on `/v1/chat/completions` and read server token IDs from `nvext.engine_data`. 
In `prime-rl`, this is normally selected for you when `client.backend = "dynamo"`. + For production RL training, use `openai_chat_completions_token` — it's the tried-and-tested path with broad model coverage. The `renderer` client is newer and offers stronger token-preservation guarantees in theory, but is experimental: hand-coded renderers exist only for a subset of models, and corner cases are still being shaken out. See [reference § Built-in Clients](reference.md#built-in-client-implementations) for the full list. ### Common Issues diff --git a/tests/test_openai_chat_completions_token_client.py b/tests/test_openai_chat_completions_token_client.py index 923ff118e0..248a2b7d3f 100644 --- a/tests/test_openai_chat_completions_token_client.py +++ b/tests/test_openai_chat_completions_token_client.py @@ -1,3 +1,5 @@ +import base64 +import json from typing import Any, cast import httpx @@ -8,6 +10,7 @@ OpenAIChatCompletionsTokenClient, ) from verifiers.types import State +from verifiers.utils.client_utils import post_chat_completion_with_routed_experts_sidecar class _NoopClient: @@ -46,6 +49,40 @@ async def post( ) +class _DynamoRoutedExpertsClient(_NoopClient): + async def post( + self, path: str, body: dict[str, Any], cast_to: type, **kwargs: Any + ) -> Any: + payload = { + "id": "x", + "object": "chat.completion", + "created": 1, + "model": "test-model", + "choices": [ + { + "index": 0, + "message": {"role": "assistant", "content": "ok"}, + "finish_reason": "stop", + } + ], + "nvext": { + "engine_data": { + "completion_token_ids": [10], + "routed_experts": { + "data": base64.b64encode(b"abc").decode("ascii"), + "shape": [3, 1, 1], + "start": 0, + "dtype": "uint8", + }, + } + }, + } + return httpx.Response( + 200, + content=json.dumps(payload, separators=(",", ":")).encode("utf-8"), + ) + + class _PromptIdTestClient(OpenAIChatCompletionsTokenClient): def __init__(self, full_prompt_ids: list[int]) -> None: super().__init__(_NoopClient()) @@ -293,3 +330,167 @@ async def 
fake_get_prompt_ids( # noqa: ANN001 assert len(recording_client.calls) == 1 assert recording_client.calls[0]["path"] == "/chat/completions/tokens" assert recording_client.calls[0]["body"]["tokens"] == [10, 20] + + +@pytest.mark.asyncio +async def test_post_dynamo_scrubs_vllm_only_and_forwards_sampling(): + """dynamo wire body: vLLM-only keys scrubbed, standard sampling args + forwarded, nvext token_data + passthrough preserved.""" + recording_client = _RecordingClient() + client = OpenAIChatCompletionsTokenClient(recording_client) + + await client._post_dynamo( + prompt=cast(Any, [{"role": "user", "content": ""}]), + prompt_ids=[1, 2, 3], + model="test-model", + tools=None, + sampling_args={ + "temperature": 0.5, + "presence_penalty": 0.2, + "reasoning_effort": "high", # arbitrary key: full parity, not an allowlist + "spaces_between_special_tokens": False, # vLLM-only — must be scrubbed + "extra_body": { + "return_token_ids": True, # vLLM-only — must be scrubbed + "nvext": {"extra_fields": ["engine_data"]}, + "cache_salt": "ckpt-1", + }, + }, + extra_headers=None, + ) + + body = recording_client.calls[0]["body"] + assert "return_token_ids" not in body + assert "spaces_between_special_tokens" not in body + assert body["presence_penalty"] == 0.2 + assert body["temperature"] == 0.5 + assert body["reasoning_effort"] == "high" + assert body["nvext"]["token_data"] == [1, 2, 3] + assert body["nvext"]["extra_fields"] == ["engine_data"] + assert body["cache_salt"] == "ckpt-1" + + +@pytest.mark.asyncio +async def test_post_dynamo_uses_placeholder_messages(): + recording_client = _RecordingClient() + client = OpenAIChatCompletionsTokenClient(recording_client) + + await client._post_dynamo( + prompt=cast(Any, [{"role": "user", "content": "real prompt"}]), + prompt_ids=[1, 2, 3], + model="test-model", + tools=None, + sampling_args={"extra_body": {"nvext": {"extra_fields": ["engine_data"]}}}, + extra_headers=None, + ) + + assert recording_client.calls[0]["body"]["messages"] == [ 
+ {"role": "user", "content": ""} + ] + + +@pytest.mark.asyncio +async def test_sidecar_helper_reattaches_dynamo_engine_routed_experts(): + response = await post_chat_completion_with_routed_experts_sidecar( + _DynamoRoutedExpertsClient(), + "/chat/completions", + body={}, + ) + + routed = response.model_extra["nvext"]["engine_data"]["routed_experts"] + assert isinstance(routed["data"], memoryview) + assert routed["data"].tobytes() == base64.b64encode(b"abc") + + +@pytest.mark.asyncio +async def test_graft_engine_data_synthesizes_logprobs_when_content_less(): + """engine_data.completion_logprobs must be grafted even when the choice + carries a content-less logprobs object (not only when absent).""" + from openai.types.chat import ChatCompletion + + client = OpenAIChatCompletionsClient(_NoopClient()) + native = ChatCompletion.model_validate( + { + "id": "x", + "object": "chat.completion", + "created": 1, + "model": "test-model", + "choices": [ + { + "index": 0, + "message": {"role": "assistant", "content": "ok"}, + "finish_reason": "stop", + "logprobs": {"content": None}, # present but content-less + } + ], + "nvext": { + "engine_data": { + "completion_token_ids": [10, 11], + "prompt_token_ids": [1, 2, 3], + "completion_logprobs": [-0.1, -0.2], + } + }, + } + ) + + vf_response = await client.from_native_response(native) + tokens = vf_response.message.tokens + assert tokens is not None # would be None before the fix (TITO lost) + assert tokens.completion_ids == [10, 11] + assert tokens.prompt_ids == [1, 2, 3] + assert tokens.completion_logprobs == [-0.1, -0.2] + + +@pytest.mark.asyncio +async def test_parse_tokens_reads_dynamo_engine_routed_experts(): + from openai.types.chat import ChatCompletion + + client = OpenAIChatCompletionsClient(_NoopClient()) + native = ChatCompletion.model_validate( + { + "id": "x", + "object": "chat.completion", + "created": 1, + "model": "test-model", + "choices": [ + { + "index": 0, + "message": {"role": "assistant", "content": "ok"}, + 
"finish_reason": "stop", + "logprobs": { + "content": [ + { + "token": "ok", + "logprob": -0.1, + "bytes": [111, 107], + "top_logprobs": [], + } + ] + }, + } + ], + "nvext": { + "engine_data": { + "completion_token_ids": [10], + "prompt_token_ids": [1, 2, 3], + "completion_logprobs": [-0.1], + "routed_experts": { + "data": "QUJD", + "shape": [3, 1, 1], + "start": 0, + "dtype": "uint8", + }, + } + }, + } + ) + + vf_response = await client.from_native_response(native) + tokens = vf_response.message.tokens + + assert tokens is not None + assert tokens.routed_experts == { + "data": "QUJD", + "shape": [3, 1, 1], + "start": 0, + "dtype": "uint8", + } diff --git a/tests/test_trajectory_processing.py b/tests/test_trajectory_processing.py index 386e4fd947..3ebe7cdbb2 100644 --- a/tests/test_trajectory_processing.py +++ b/tests/test_trajectory_processing.py @@ -483,3 +483,32 @@ def test_trajectory_step_mask_combining(): assert token_ids == [1, 2, 3, 4, 5] assert mask == [0, 0, 0, 1, 1] assert logprobs == [0.0, 0.0, 0.0, -0.1, -0.2] + + +def test_strip_routed_experts_data_key_order_robust(): + """The zero-copy stripper must find ``data`` regardless of key order + (``dtype``/``shape``/``start`` may precede it) and no-op when absent.""" + from verifiers.utils.response_utils import strip_routed_experts_data + + # data first (fast path) + raw = b'{"routed_experts":{"data":"QUJD","shape":[3],"start":0,"dtype":"uint8"}}' + stripped, blob = strip_routed_experts_data(raw) + assert blob is not None and blob.tobytes() == b"QUJD" + assert b'"data":""' in stripped + + # dtype/shape/start before data — must still strip the blob + raw2 = b'{"routed_experts":{"dtype":"uint16","shape":[3],"start":0,"data":"WFla"}}' + stripped2, blob2 = strip_routed_experts_data(raw2) + assert blob2 is not None and blob2.tobytes() == b"WFla" + assert b'"data":""' in stripped2 + + # routed_experts object lacks data; an unrelated sibling has data — must + # NOT be mistaken for routed experts (search bounded to 
the object). + raw4 = b'{"routed_experts":{"shape":[3],"start":0},"other":{"data":"UNRELATED"}}' + stripped4, blob4 = strip_routed_experts_data(raw4) + assert blob4 is None and stripped4 == raw4 + + # absent — no-op passthrough + raw3 = b'{"choices":[{"token_ids":[1,2]}]}' + stripped3, blob3 = strip_routed_experts_data(raw3) + assert blob3 is None and stripped3 == raw3 diff --git a/verifiers/clients/openai_chat_completions_client.py b/verifiers/clients/openai_chat_completions_client.py index d7d262f4be..a1f36a401b 100644 --- a/verifiers/clients/openai_chat_completions_client.py +++ b/verifiers/clients/openai_chat_completions_client.py @@ -469,8 +469,100 @@ def parse_finish_reason(response: OpenAIChatResponse) -> FinishReason: case _: return None + def _graft_engine_data(response: OpenAIChatResponse) -> None: + """Graft engine-side token IDs onto top-level response fields. + + Three coexisting wire shapes from dynamo's vLLM/SGLang backends: + + 1. ``response.nvext.engine_data.{completion_token_ids, + completion_logprobs, prompt_token_ids}`` + (opt-in: ``nvext.extra_fields=["engine_data"]``). + 2. ``response.nvext.completion_token_ids`` — top-level shape + (opt-in: + ``nvext.extra_fields=["completion_token_ids"]``). No + logprobs in this shape; logprobs ride the standard + ``choices[0].logprobs.content[*].logprob`` channel. + 3. Older vLLM-native paths set ``response.choices[0].token_ids`` + / ``response.prompt_token_ids`` directly (no grafting needed). + + This helper bridges (1) and (2) onto the top-level fields the + rest of ``parse_tokens`` reads via the standard openai SDK + attribute path. ``engine_data`` wins when both are present (it + carries more — including logprobs + prompt_token_ids). 
+ """ + nvext = getattr(response, "nvext", None) + if nvext is None and hasattr(response, "model_dump"): + nvext = response.model_dump().get("nvext") + if not isinstance(nvext, dict): + return + choice = response.choices[0] + + engine_data = nvext.get("engine_data") + completion_token_ids_top = nvext.get("completion_token_ids") + prompt_token_ids_top = nvext.get("prompt_token_ids") + + # Prefer engine_data over top-level when both arrive: engine_data + # bundles logprobs + prompt_token_ids in one place. + completion_token_ids: list[int] | None = None + prompt_token_ids: list[int] | None = None + completion_logprobs: list[float] | None = None + if isinstance(engine_data, dict): + if engine_data.get("completion_token_ids") is not None: + completion_token_ids = list(engine_data["completion_token_ids"]) + if engine_data.get("prompt_token_ids") is not None: + prompt_token_ids = list(engine_data["prompt_token_ids"]) + if engine_data.get("completion_logprobs") is not None: + completion_logprobs = [ + float(x) for x in engine_data["completion_logprobs"] + ] + if completion_token_ids is None and completion_token_ids_top is not None: + completion_token_ids = list(completion_token_ids_top) + if prompt_token_ids is None and prompt_token_ids_top is not None: + prompt_token_ids = list(prompt_token_ids_top) + + if ( + getattr(choice, "token_ids", None) is None + and completion_token_ids is not None + ): + try: + choice.token_ids = completion_token_ids + except Exception: + object.__setattr__(choice, "token_ids", completion_token_ids) + if ( + getattr(response, "prompt_token_ids", None) is None + and prompt_token_ids is not None + ): + try: + response.prompt_token_ids = prompt_token_ids + except Exception: + object.__setattr__(response, "prompt_token_ids", prompt_token_ids) + # Dynamo returns logprobs only under engine_data, not + # choices[0].logprobs. Synthesize the standard shape so parse_tokens + # (which requires choices[0].logprobs.content) can read them. 
Graft + # whenever the choice has no usable logprobs content — i.e. logprobs + # is missing OR present-but-content-less (empty/None content) — not + # only when it is absent entirely. + existing_lp = getattr(choice, "logprobs", None) + existing_content = ( + existing_lp.get("content") + if isinstance(existing_lp, dict) + else getattr(existing_lp, "content", None) + ) + if ( + completion_logprobs is not None + and completion_token_ids is not None + and len(completion_logprobs) == len(completion_token_ids) + and not existing_content + ): + synthesized = {"content": [{"logprob": lp} for lp in completion_logprobs]} + try: + choice.logprobs = synthesized + except Exception: + object.__setattr__(choice, "logprobs", synthesized) + def parse_tokens(response: OpenAIChatResponse) -> ResponseTokens | None: assert len(response.choices) == 1, "Response should always have one choice" + _graft_engine_data(response) choice = response.choices[0] if not hasattr(choice, "token_ids"): return None @@ -508,14 +600,28 @@ def parse_tokens(response: OpenAIChatResponse) -> ResponseTokens | None: logprobs_content = response.choices[0].logprobs["content"] completion_logprobs = [token["logprob"] for token in logprobs_content] + if len(completion_logprobs) != len(completion_ids): + # Engine returned mismatched logprobs/ids — drop rather than emit + # out-of-sync ResponseTokens. 
+ return None + choice_extra = choice.model_extra or {} + routed_experts = choice_extra.get("routed_experts") + if routed_experts is None: + top_extra = response.model_extra or {} + nvext = top_extra.get("nvext") if isinstance(top_extra, dict) else None + if isinstance(nvext, dict): + routed_experts = nvext.get("routed_experts") + engine_data = nvext.get("engine_data") + if routed_experts is None and isinstance(engine_data, dict): + routed_experts = engine_data.get("routed_experts") return ResponseTokens( prompt_ids=prompt_ids, prompt_mask=prompt_mask, completion_ids=completion_ids, completion_mask=completion_mask, completion_logprobs=completion_logprobs, - routed_experts=choice_extra.get("routed_experts"), + routed_experts=routed_experts, ) response_id = getattr(response, "id", "") diff --git a/verifiers/clients/openai_chat_completions_token_client.py b/verifiers/clients/openai_chat_completions_token_client.py index 2d8cd701cc..44f7ce7ba5 100644 --- a/verifiers/clients/openai_chat_completions_token_client.py +++ b/verifiers/clients/openai_chat_completions_token_client.py @@ -18,11 +18,21 @@ OpenAITool, handle_openai_overlong_prompt, ) -from verifiers.types import SamplingArgs, State +from verifiers.types import RendererTransport, SamplingArgs, State from verifiers.utils.client_utils import ( post_chat_completion_with_routed_experts_sidecar, ) +# Sentinel for the default (legacy vLLM) transport. Lets callers route +# around the legacy /tokenize body shape without changing the signature. +_DEFAULT_TRANSPORT: RendererTransport = "vllm" + +# vLLM/prime-only sampling keys Dynamo's strict validator rejects — scrubbed +# from every dynamo request body (both MITO and TITO paths). +_DYNAMO_DROP_KEYS = frozenset( + {"return_token_ids", "spaces_between_special_tokens", "priority"} +) + def _has_multimodal_content(messages) -> bool: """Check if any message contains multimodal content (images, audio). 
@@ -51,7 +61,25 @@ class TokenizeResponse(BaseModel): class OpenAIChatCompletionsTokenClient(OpenAIChatCompletionsClient): - """Wrapper for custom vLLM route /v1/chat/completions/tokens via AsyncOpenAI client.""" + """Token-in / token-out chat client. + + Two transports share this class, selected via + ``ClientConfig.renderer_transport``: + + * ``vllm`` (default): vLLM's TITO surface. + Posts to ``/v1/chat/completions/tokens`` with ``tokens=prompt_ids`` + and uses the server's ``/tokenize`` endpoint for bridge tokens. + Requires vLLM ``>=0.20``. + + * ``dynamo``: Dynamo's standard ``/v1/chat/completions`` + route with ``nvext.token_data=prompt_ids``. Server-side response + token IDs come back via ``response.nvext.engine_data.*`` + (`OpenAIChatCompletionsClient.from_native_response` grafts them + onto the OpenAI-shaped response). Bridge tokens are computed + locally via the model's HuggingFace fast tokenizer — no + ``/tokenize`` HTTP round-trip — since Dynamo doesn't expose vLLM's + token routes. + """ @property def token_client(self) -> AsyncOpenAI: @@ -61,6 +89,46 @@ def token_client(self) -> AsyncOpenAI: base_url = base_url[:-3] return self.client.with_options(base_url=base_url) + @property + def renderer_transport(self) -> RendererTransport: + """Wire-shape selector. ``ClientConfig.renderer_transport`` if set, + else the default vLLM TITO surface. Mirrors the same field used by + ``RendererClient`` so backend selection stays in one place.""" + return cast( + RendererTransport, + getattr(self._config, "renderer_transport", _DEFAULT_TRANSPORT) + if self._config is not None + else _DEFAULT_TRANSPORT, + ) + + def _get_local_tokenizer(self, model: str): + """Lazy, per-model HF fast tokenizer for the ``dynamo`` + transport. Bridge tokens are stitched locally — no ``/tokenize`` + round-trip. Cached so we pay the ``AutoTokenizer.from_pretrained`` + cost once. 
+ """ + # Honor the explicit tokenizer override (renderer_model_name) so model + # aliases don't break bridge stitching; fall back to the served model. + override = ( + getattr(self._config, "renderer_model_name", None) + if self._config is not None + else None + ) + model = override or model + cache: dict[str, Any] = self.__dict__.setdefault("_tokenizer_cache", {}) + if model in cache: + return cache[model] + try: + from transformers import AutoTokenizer # type: ignore[import-not-found] + except ImportError as exc: # pragma: no cover - dependency surface + raise ImportError( + "OpenAIChatCompletionsTokenClient with " + "renderer_transport='dynamo' requires " + "`transformers`. Install with `pip install transformers`." + ) from exc + cache[model] = AutoTokenizer.from_pretrained(model) + return cache[model] + @handle_openai_overlong_prompt async def get_native_response( self, @@ -75,14 +143,59 @@ def normalize_sampling_args(sampling_args: SamplingArgs): if "max_tokens" in sampling_args: sampling_args["max_completion_tokens"] = sampling_args.pop("max_tokens") sampling_args["logprobs"] = True - extra_body = dict(return_token_ids=True) - if "extra_body" in sampling_args: - sampling_args["extra_body"] = { - **sampling_args["extra_body"], - **extra_body, + + # Transport-specific opt-ins. Both transports get response-side + # token IDs, just via different fields: + # + # * vllm (vLLM): `extra_body.return_token_ids=True` + # tells vLLM to set the non-standard `choices[0].token_ids` and + # `response.prompt_token_ids` fields. `parse_tokens` reads them + # directly. + # + # * dynamo: `nvext.extra_fields=["engine_data"]` + # tells Dynamo's response builder to emit `response.nvext` + # `engine_data.{completion_token_ids, completion_logprobs, + # prompt_token_ids}`. `from_native_response` grafts + # this onto the OpenAI-shaped response so `parse_tokens` + # works unmodified. `return_token_ids` is dropped because + # Dynamo's strict validator rejects it. 
+ if self.renderer_transport == "dynamo": + extra_body: dict[str, Any] = { + "nvext": {"extra_fields": ["engine_data"]} } + else: + extra_body = {"return_token_ids": True} + + if "extra_body" in sampling_args: + merged = {**sampling_args["extra_body"]} + # Merge nvext.extra_fields cumulatively rather than overwriting, + # so caller-provided extra_fields (e.g. "timing", "worker_id") + # coexist with our "engine_data" opt-in. + if "nvext" in merged and "nvext" in extra_body: + base = dict(merged.get("nvext") or {}) + inc = dict(extra_body.get("nvext") or {}) + base_ef = list(base.get("extra_fields") or []) + inc_ef = list(inc.get("extra_fields") or []) + merged_ef = list(dict.fromkeys(base_ef + inc_ef)) + merged_nvext = {**base, **inc, "extra_fields": merged_ef} + merged["nvext"] = merged_nvext + sampling_args["extra_body"] = { + **{k: v for k, v in extra_body.items() if k != "nvext"}, + **merged, + } + else: + sampling_args["extra_body"] = {**merged, **extra_body} else: sampling_args["extra_body"] = extra_body + if self.renderer_transport == "dynamo": + # Drop vLLM/prime-only keys Dynamo rejects from both top-level + # args and extra_body, so MITO + TITO paths send a clean body. 
+ eb = sampling_args.get("extra_body") + if isinstance(eb, dict): + for k in _DYNAMO_DROP_KEYS: + eb.pop(k, None) + for k in _DYNAMO_DROP_KEYS: + sampling_args.pop(k, None) return {k: v for k, v in sampling_args.items() if v is not None} sampling_args = normalize_sampling_args(sampling_args) @@ -126,6 +239,16 @@ def normalize_sampling_args(sampling_args: SamplingArgs): prompt, model, sampling_args, tools, extra_headers=extra_headers ) + if self.renderer_transport == "dynamo": + return await self._post_dynamo( + prompt=prompt, + prompt_ids=prompt_ids, + model=model, + tools=tools, + sampling_args=sampling_args, + extra_headers=extra_headers, + ) + extra_body = sampling_args.pop("extra_body", {}) body = { "model": model, @@ -143,6 +266,66 @@ def normalize_sampling_args(sampling_args: SamplingArgs): extra_headers=extra_headers, ) + async def _post_dynamo( + self, + prompt: OpenAIChatMessages, + prompt_ids: list[int], + model: str, + tools: list[OpenAITool] | None, + sampling_args: dict, + extra_headers: Mapping[str, str] | None, + ) -> OpenAIChatResponse: + """Post stitched ``prompt_ids`` to Dynamo's chat-completions route. + + The engine sees ``nvext.token_data`` and skips its own tokenization, + so the placeholder ``messages`` value stays small regardless of + trajectory length. Response token IDs come back via + ``response.nvext.engine_data.completion_token_ids`` and are grafted + onto ``choices[0].token_ids`` by + ``OpenAIChatCompletionsClient.from_native_response`` so the rest of + the pipeline reads them via the standard openai SDK attribute path. + """ + extra_body = dict(sampling_args.pop("extra_body", {}) or {}) + + # nvext.token_data is the canonical pre-tokenized-prompt channel. + # Merge with caller-provided nvext (extra_fields etc.) rather than + # overwriting it. normalize_sampling_args already injected + # extra_fields=["engine_data"] into extra_body.nvext, so this just + # adds token_data to that same dict. 
+ caller_nvext = dict(extra_body.pop("nvext", None) or {}) + caller_nvext["token_data"] = prompt_ids + nvext = caller_nvext + + body: dict[str, Any] = { + "model": model, + "messages": [{"role": "user", "content": ""}], + "stream": False, + "nvext": nvext, + } + if tools: + body["tools"] = tools + + # Forward the full normalized sampling_args (parity with the vLLM path, + # which spreads all of sampling_args), then remaining extra_body keys — + # minus vLLM-only keys Dynamo's strict validator rejects (return_token_ids). + # Unknown keys ride through the dynamo frontend's PASSTHROUGH_EXTRA_FIELDS. + vllm_only = _DYNAMO_DROP_KEYS + for source in (sampling_args, extra_body): + for key, value in source.items(): + if value is None or key in vllm_only or key in body: + continue + body[key] = value + + # Use the sidecar-aware post (same as the vLLM TITO + MITO paths) so any + # routed_experts blob is streamed, not JSON-parsed. dynamo opts into + # extra_fields=["engine_data"] only, so routed_experts is normally absent. + return await post_chat_completion_with_routed_experts_sidecar( + self.client, + "/chat/completions", + body=body, + extra_headers=extra_headers, + ) + async def get_prompt_ids( self, state: State, @@ -176,6 +359,15 @@ def normalize_for_comparison(value: Any) -> Any: # prefix-match equality is unaffected. if normalized.get("content") == "": normalized["content"] = None + # Drop None-valued keys so model_dump's exhaustive view (which + # carries e.g. thinking_blocks=None on AssistantMessage) is + # equivalent to to_native_prompt's slimmer view (which omits + # the field entirely). Without this, vf.Message-shaped input + # (what MultiTurnEnv produces after maybe_normalize_messages) + # never matches the to_native_prompt-normalized step messages, + # which breaks the prefix match and forces TITO to fall back + # to MITO every turn-2+. 
+ normalized = {k: v for k, v in normalized.items() if v is not None} return normalized if isinstance(value, list): return [normalize_for_comparison(item) for item in value] @@ -369,9 +561,28 @@ async def tokenize( extra_kwargs: dict | None = None, **kwargs, ) -> list[int]: - """Tokenize messages using the vLLM /tokenize API.""" + """Tokenize messages for bridge-token computation. + + Dispatched by ``renderer_transport``: + + * ``vllm`` (default): POST to vLLM's ``/tokenize`` route. + * ``dynamo``: local HF fast-tokenizer call. Dynamo doesn't + expose ``/tokenize``; running locally also saves two HTTP RTTs per + turn (the bridge computes both ``add_generation_prompt=True`` and + ``False`` views). The HF Rust encode releases the GIL so the + ``asyncio.to_thread`` wrap gives the event loop real parallelism. + """ if extra_kwargs is None: extra_kwargs = {} + + if self.renderer_transport == "dynamo": + return await self._local_tokenize( + messages=messages, + tools=tools, + model=model, + extra_kwargs=extra_kwargs, + ) + if isinstance(messages, str): body = dict( model=model, @@ -392,3 +603,47 @@ async def tokenize( "/tokenize", body=body, cast_to=TokenizeResponse ) return tokenize_response.tokens + + async def _local_tokenize( + self, + messages: str | OpenAIChatMessages, + tools: list[OpenAITool] | None, + model: str, + extra_kwargs: dict, + ) -> list[int]: + """Local in-process tokenization for the ``dynamo`` transport. + + Bridge tokenization under TITO calls this twice per turn (once for + ``add_generation_prompt=True`` and once for ``False``). Both runs + execute in a worker thread so the event loop stays free; HF fast + tokenizers release the GIL during the Rust encode pass. 
+ """ + import asyncio + + add_generation_prompt = bool(extra_kwargs.get("add_generation_prompt", True)) + chat_template_kwargs = dict(extra_kwargs.get("chat_template_kwargs") or {}) + + # Load the tokenizer inside the worker thread: a cache miss runs the + # synchronous AutoTokenizer.from_pretrained, which must not block the loop. + if isinstance(messages, str): + def _encode_text() -> list[int]: + tokenizer = self._get_local_tokenizer(model) + return list(tokenizer.encode(messages, add_special_tokens=False)) + return await asyncio.to_thread(_encode_text) + + def _encode_chat() -> list[int]: + tokenizer = self._get_local_tokenizer(model) + ids = tokenizer.apply_chat_template( + messages, + tools=tools, + add_generation_prompt=add_generation_prompt, + tokenize=True, + **chat_template_kwargs, + ) + if hasattr(ids, "input_ids"): + ids = ids.input_ids + if ids and isinstance(ids[0], list): + ids = ids[0] + return [int(t) for t in ids] + + return await asyncio.to_thread(_encode_chat) diff --git a/verifiers/clients/renderer_client.py b/verifiers/clients/renderer_client.py index 64ca4ec89d..af372bf9ff 100644 --- a/verifiers/clients/renderer_client.py +++ b/verifiers/clients/renderer_client.py @@ -603,15 +603,22 @@ async def get_native_response( multi_modal_data = None prompt_attribution = None - # ``renderers.client.generate`` discovers the engine's context-length - # cap on its own (via ``GET /v1/models``, cached) and raises - # ``renderers.OverlongPromptError`` on pre-flight overflow. Rebadge - # that into the verifiers-native ``OverlongPromptError`` so the - # ``MultiTurnEnv.prompt_too_long`` stop condition picks it up via - # the ``vf.Error`` hierarchy. The ``@handle_openai_overlong_prompt`` - # decorator still handles the fallback case (cap unknown → engine - # 4xx → vf.OverlongPromptError) for engines whose ``/v1/models`` - # doesn't expose ``max_model_len``. 
+ # Thread renderer_transport from ClientConfig into generate() so the + # renderer client works against Dynamo's /v1/chat/completions surface + # as well as vLLM's /inference/v1/generate. setup_clients auto-picks + # "dynamo" when client_config.backend == "dynamo". + # ``renderers.client.generate`` raises ``renderers.OverlongPromptError`` + # on pre-flight overflow; rebadge to verifiers-native so MultiTurnEnv stops. + transport = ( + self._config.renderer_transport + if self._config is not None + else "vllm" + ) + # Only pass transport= when non-default: a pinned ``renderers`` may + # predate the kwarg, so the default path must use the upstream signature. + generate_kwargs: dict[str, Any] = {} + if transport != "vllm": + generate_kwargs["transport"] = transport try: return await generate( client=self.client, @@ -627,6 +634,7 @@ async def get_native_response( or sampling_params.pop("cache_salt", None), priority=args.get("priority") or sampling_params.pop("priority", None), extra_headers=extra_headers or None, + **generate_kwargs, ) except RendererOverlongPromptError as exc: raise OverlongPromptError(str(exc)) from exc diff --git a/verifiers/types.py b/verifiers/types.py index 4242f8a86f..484ab3b9ed 100644 --- a/verifiers/types.py +++ b/verifiers/types.py @@ -78,6 +78,23 @@ EndpointClient: TypeAlias = AsyncOpenAI | OpenAI | AsyncAnthropic | Anthropic MessageType = Literal["chat", "completion"] # deprecated +# Wire-shape selector shared between RendererClient and +# OpenAIChatCompletionsTokenClient. Picks which inference-server surface the +# client targets at request-build time. Same flag drives both clients so a +# single `ClientConfig.renderer_transport` setting routes consistently. +# +# - "vllm" (default): vLLM's TITO surface. For RendererClient +# that's POST /inference/v1/generate with the locally rendered token IDs. 
+# For OpenAIChatCompletionsTokenClient that's POST +# /v1/chat/completions/tokens with `tokens=prompt_ids` and bridge +# tokenization via the server's /tokenize route. +# - "dynamo": Dynamo's standard chat-completions route with +# pre-tokenized prompt carried in `nvext.token_data`. Server-side token +# IDs come back via `nvext.engine_data.completion_token_ids` (the +# canonical Dynamo channel). Bridge tokenization runs locally via the +# transformers fast tokenizer; no /tokenize HTTP round-trip. +RendererTransport = Literal["vllm", "dynamo"] + # Provider-agnostic message + response types class CustomBaseModel(BaseModel): @@ -211,6 +228,10 @@ class RoutedExpertsPayload(TypedDict): data: Any shape: list[int] start: int + # Element dtype of the decoded expert-id buffer. NotRequired so payloads + # serialized before this field still validate; a decoder that doesn't see + # it falls back to "uint8" (the historical encoding). + dtype: NotRequired[Literal["uint8", "uint16", "int16", "int32"]] class ResponseTokens(CustomBaseModel): @@ -1269,6 +1290,7 @@ class ClientConfig(BaseModel): Drives the renderer pool when ``client_type == "renderer"``. Defaults to ``None`` so non-renderer clients aren't forced to declare it; the renderer client treats ``None`` as ``AutoRendererConfig()``.""" + renderer_transport: RendererTransport = "vllm" renderer_model_name: str | None = None """Override the tokenizer model name used to instantiate the renderer pool. 
Defaults to the model used in API requests.""" diff --git a/verifiers/utils/client_utils.py b/verifiers/utils/client_utils.py index d1c9b62e2a..c718c9e0ab 100644 --- a/verifiers/utils/client_utils.py +++ b/verifiers/utils/client_utils.py @@ -101,6 +101,27 @@ async def post_chat_completion_with_routed_experts_sidecar( body: dict[str, Any], extra_headers: Mapping[str, str] | None = None, ) -> ChatCompletion: + def _routed_experts_container(response: ChatCompletion) -> dict[str, Any] | None: + """Return the first dict found, in order, at choices[0] extras' routed_experts, nvext.routed_experts, or nvext.engine_data.routed_experts; None if none is a dict.""" + candidates: list[Any] = [] + if response.choices: + choice_extra = response.choices[0].model_extra or {} + if isinstance(choice_extra, dict): + candidates.append(choice_extra.get("routed_experts")) + + top_extra = response.model_extra or {} + nvext = top_extra.get("nvext") if isinstance(top_extra, dict) else None + if isinstance(nvext, dict): + candidates.append(nvext.get("routed_experts")) + engine_data = nvext.get("engine_data") + if isinstance(engine_data, dict): + candidates.append(engine_data.get("routed_experts")) + + for candidate in candidates: + if isinstance(candidate, dict): + return candidate + return None + raw_response = await client.post( path, body=body, @@ -110,9 +131,13 @@ async def post_chat_completion_with_routed_experts_sidecar( stripped, routed_data = strip_routed_experts_data(raw_response.content) response = ChatCompletion.model_validate_json(stripped) if routed_data is not None: - choice_extra = response.choices[0].model_extra - assert choice_extra is not None - choice_extra["routed_experts"]["data"] = routed_data + routed_experts = _routed_experts_container(response) + if routed_experts is None: + raise RuntimeError( + "routed_experts data was stripped from the raw response, but no " + "parsed routed_experts object was found to reattach it."
+ ) + routed_experts["data"] = routed_data return response diff --git a/verifiers/utils/response_utils.py b/verifiers/utils/response_utils.py index 7bc13bc22d..64539bda2a 100644 --- a/verifiers/utils/response_utils.py +++ b/verifiers/utils/response_utils.py @@ -9,15 +9,32 @@ TrajectoryStepTokens, ) -ROUTED_EXPERTS_DATA_PREFIX = b'"routed_experts":{"data":"' +ROUTED_EXPERTS_OBJ_PREFIX = b'"routed_experts":{' +ROUTED_EXPERTS_DATA_KEY = b'"data":"' def strip_routed_experts_data(raw: bytes) -> tuple[bytes, memoryview | None]: - data_start = raw.find(ROUTED_EXPERTS_DATA_PREFIX) - if data_start < 0: + # Zero-copy fast path for the large base64 routed_experts blob: find the + # "data" value inside the routed_experts object regardless of key order + # (shape/start/dtype may precede it), slice it out before JSON parsing. + # No-op fallback (consumer b64-decodes the string) if the shape isn't found. + obj_start = raw.find(ROUTED_EXPERTS_OBJ_PREFIX) + if obj_start < 0: return raw, None - data_start += len(ROUTED_EXPERTS_DATA_PREFIX) + # Bound the search to the routed_experts object so a missing `data` here + # can't match an unrelated sibling's `data` later in the response. The + # object's values (base64 string, int shape/start, dtype) contain no `}`, + # so the first `}` after the prefix closes it. + obj_end = raw.find(b"}", obj_start) + if obj_end < 0: + return raw, None + + data_key = raw.find(ROUTED_EXPERTS_DATA_KEY, obj_start, obj_end) + if data_key < 0: + return raw, None + + data_start = data_key + len(ROUTED_EXPERTS_DATA_KEY) data_end = raw.index(b'"', data_start) routed_data = memoryview(raw)[data_start:data_end] stripped = raw[:data_start] + raw[data_end:]