diff --git a/environments/dspy_flights/dspy_flights.py b/environments/dspy_flights/dspy_flights.py index 3ec38deb36..715ab2b5c3 100644 --- a/environments/dspy_flights/dspy_flights.py +++ b/environments/dspy_flights/dspy_flights.py @@ -12,9 +12,6 @@ PROGRAM_SANDBOX = { "image": "python:3.11-slim", "network_access": True, - "timeout_minutes": 60, - "command_timeout": 900, - "install_timeout": 900, } diff --git a/environments/hello_parallel_sandbox_v1/hello_parallel_sandbox_v1.py b/environments/hello_parallel_sandbox_v1/hello_parallel_sandbox_v1.py index fe53ecbb62..bda8dc9be3 100644 --- a/environments/hello_parallel_sandbox_v1/hello_parallel_sandbox_v1.py +++ b/environments/hello_parallel_sandbox_v1/hello_parallel_sandbox_v1.py @@ -125,8 +125,6 @@ "image": "python:3.11-slim", "scope": "rollout", "network_access": True, - "timeout_minutes": 20, - "command_timeout": 120, } @@ -154,7 +152,7 @@ class ParallelSandboxHarnessConfig(vf.HarnessConfig): async def bash(command: str, sandbox, state) -> str: """Run a bash command in the active program sandbox.""" - result = await sandbox.execute(command, timeout=120, working_dir="/tmp") + result = await sandbox.execute(command, working_dir="/tmp") output = { "exit_code": int(getattr(result, "exit_code", 0)), "stdout": truncate_text(str(getattr(result, "stdout", "") or "")), diff --git a/environments/hello_self_judge_v1/hello_self_judge_v1.py b/environments/hello_self_judge_v1/hello_self_judge_v1.py index 8ed347957e..a33f5362d1 100644 --- a/environments/hello_self_judge_v1/hello_self_judge_v1.py +++ b/environments/hello_self_judge_v1/hello_self_judge_v1.py @@ -164,7 +164,7 @@ class SelfJudgeHarnessConfig(vf.HarnessConfig): async def bash(command: str, sandbox, state) -> str: """Run a bash command in the rollout sandbox and return stdout/stderr.""" - result = await sandbox.execute(command, timeout=120, working_dir="/tmp") + result = await sandbox.execute(command, working_dir="/tmp") output = { "exit_code": int(getattr(result, "exit_code", 0)), "stdout": truncate_text(str(getattr(result, "stdout", "") or "")), @@ -337,8 +337,6 @@ def load_bash_toolset() -> vf.Toolset: image="python:3.11-slim", scope="rollout", network_access=True, - timeout_minutes=30, - command_timeout=120, ), cleanups=[collect_bash_commands], ) diff --git a/environments/langchain_deep_agents_wikispeedia/langchain_deep_agents_wikispeedia.py b/environments/langchain_deep_agents_wikispeedia/langchain_deep_agents_wikispeedia.py index 51bcfe25f1..a9d425a31e 100644 --- a/environments/langchain_deep_agents_wikispeedia/langchain_deep_agents_wikispeedia.py +++ b/environments/langchain_deep_agents_wikispeedia/langchain_deep_agents_wikispeedia.py @@ -1,4 +1,3 @@ -import asyncio import json from collections.abc import Awaitable, Callable, Iterator, Mapping, Sequence from typing import Protocol, cast @@ -51,6 +50,7 @@ def system_prompt(allow_go_back: bool = True) -> str: SYSTEM_PROMPT = system_prompt() +WIKISPEEDIA_TASK_TIMEOUT_SECONDS = 1200.0 class WikispeediaTasksetConfig(vf.TasksetConfig): @@ -73,7 +73,6 @@ class WikispeediaHarnessConfig(vf.HarnessConfig): fn="run_langchain_deep_agents_wikispeedia_program" ) max_turns: int = 50 - timeout_seconds: float = 1200.0 class WikispeediaTaskset(vf.Taskset[WikispeediaTasksetConfig]): @@ -444,7 +443,6 @@ async def go_back() -> str: def make_langchain_deep_agents_program( max_turns: int, - timeout_seconds: float, ) -> Callable[[vf.Task, vf.State], Awaitable[vf.State]]: async def run_langchain_deep_agents_wikispeedia_program( task: vf.Task, state: vf.State @@ -488,19 +486,14 @@ async def run_langchain_deep_agents_wikispeedia_program( invoke_config = ( {"recursion_limit": recursion_limit} if recursion_limit > 0 else None ) - invoke = agent.ainvoke( - {"messages": [{"role": "user", "content": prompt}]}, - config=invoke_config, - ) try: - result = await asyncio.wait_for(invoke, timeout=timeout_seconds) - except (TimeoutError, GraphRecursionError) as exc: - state["agent_timeout"] = True - state.stop( - "agent_timeout" - if isinstance(exc, TimeoutError) - else "agent_recursion_limit" + result = await agent.ainvoke( + {"messages": [{"role": "user", "content": prompt}]}, + config=invoke_config, ) + except GraphRecursionError: + state["agent_timeout"] = True + state.stop("agent_recursion_limit") state.setdefault("agent_completion", []) return state @@ -520,7 +513,6 @@ async def run_langchain_deep_agents_wikispeedia_program( ) -> vf.State: return await make_langchain_deep_agents_program( max_turns=harness.config.max_turns, - timeout_seconds=harness.config.timeout_seconds, )(task, state) @@ -585,7 +577,9 @@ class WikispeediaEnvConfig(vf.EnvConfig): def load_environment(config: WikispeediaEnvConfig) -> vf.Env: - return vf.Env( + env = vf.Env( taskset=vf.load_taskset(config=config.taskset), harness=vf.load_harness(config=config.harness), ) + env.task_timeout_seconds = WIKISPEEDIA_TASK_TIMEOUT_SECONDS + return env diff --git a/environments/math_python/math_python.py b/environments/math_python/math_python.py index 61d52f7e33..6092be8c58 100644 --- a/environments/math_python/math_python.py +++ b/environments/math_python/math_python.py @@ -25,6 +25,10 @@ def load_environment( unsupported.append("max_startup_wait_seconds") if sandbox_client_max_workers is not None: unsupported.append("sandbox_client_max_workers") + if sandbox_timeout_minutes != 60: + unsupported.append("sandbox_timeout_minutes") + if sandbox_timeout_per_command_seconds != 60: + unsupported.append("sandbox_timeout_per_command_seconds") if unsupported: unexpected = ", ".join(sorted(unsupported)) raise TypeError(f"Unsupported v1 load_environment kwargs: {unexpected}") @@ -52,8 +56,6 @@ def load_environment( sandbox_memory_gb=sandbox_memory_gb, sandbox_disk_size_gb=sandbox_disk_size_gb, sandbox_gpu_count=sandbox_gpu_count, - sandbox_timeout_minutes=sandbox_timeout_minutes, - sandbox_timeout_per_command_seconds=sandbox_timeout_per_command_seconds, ), ) ) diff --git a/environments/math_python/math_python_v1.py b/environments/math_python/math_python_v1.py index e7fa12f9a7..d44bc64afd 100644 --- a/environments/math_python/math_python_v1.py +++ b/environments/math_python/math_python_v1.py @@ -104,8 +104,6 @@ class MathPythonHarnessConfig(vf.HarnessConfig): sandbox_memory_gb: int = 2 sandbox_disk_size_gb: int = 5 sandbox_gpu_count: int = 0 - sandbox_timeout_minutes: int = 60 - sandbox_timeout_per_command_seconds: int = 60 class MathPythonEnvConfig(vf.EnvConfig): @@ -162,8 +160,6 @@ def load_toolset( sandbox_memory_gb: int = 2, sandbox_disk_size_gb: int = 5, sandbox_gpu_count: int = 0, - sandbox_timeout_minutes: int = 60, - sandbox_timeout_per_command_seconds: int = 60, ): packages = pip_install_packages.split() if pip_install_packages.strip() else [] return vf.Toolset( @@ -176,8 +172,6 @@ def load_toolset( memory_gb=sandbox_memory_gb, disk_size_gb=sandbox_disk_size_gb, gpu_count=sandbox_gpu_count, - timeout_minutes=sandbox_timeout_minutes, - command_timeout=sandbox_timeout_per_command_seconds, packages=packages, ), cleanups=[collect_python_commands], @@ -195,10 +189,6 @@ def load_environment(config: MathPythonEnvConfig) -> vf.Env: sandbox_memory_gb=config.harness.sandbox_memory_gb, sandbox_disk_size_gb=config.harness.sandbox_disk_size_gb, sandbox_gpu_count=config.harness.sandbox_gpu_count, - sandbox_timeout_minutes=config.harness.sandbox_timeout_minutes, - sandbox_timeout_per_command_seconds=( - config.harness.sandbox_timeout_per_command_seconds - ), ) } ) diff --git a/environments/rlm_swe_v1/README.md b/environments/rlm_swe_v1/README.md index d7389e4f70..7df27a0915 100644 --- a/environments/rlm_swe_v1/README.md +++ b/environments/rlm_swe_v1/README.md @@ -22,7 +22,7 @@ env = load_environment( harness=RLMConfig( program=RLMProgramConfig( local_checkout="/path/to/checkout", - tools=["bash", "edit"], + rlm_tools=["bash", "edit"], ) ), ) diff --git a/environments/rlm_swe_v1/rlm_swe_v1.py b/environments/rlm_swe_v1/rlm_swe_v1.py index fa0437344a..4753201a13 100644 --- a/environments/rlm_swe_v1/rlm_swe_v1.py +++ b/environments/rlm_swe_v1/rlm_swe_v1.py @@ -27,7 +27,6 @@ class RlmSweTasksetConfig(vf.TasksetConfig): filter_repos: list[str] | None = None ds_num_proc: int | None = None ds_keep_in_memory: bool = True - timeout_minutes: int | None = None hide_tests_from_agent: bool = True env: vf.ConfigData | None = None @@ -59,7 +58,7 @@ async def upload_file( async def upload_bytes(self, remote_path: str, data: bytes, name: str) -> None: ... async def run_background_job( - self, command: str, timeout: int, working_dir: str + self, command: str, working_dir: str ) -> SandboxCommandResult: ... @@ -69,7 +68,6 @@ def load_tasks( filter_repos: list[str] | None = None, ds_num_proc: int | None = None, ds_keep_in_memory: bool = True, - timeout_minutes: int | None = None, env: vf.ConfigData | None = None, ) -> list[vf.JsonData]: dataset_kwargs = dict( @@ -116,7 +114,6 @@ def load_tasks( "sandbox": sandbox_config( info=info, repo_path=repo_path, - timeout_minutes=timeout_minutes, ), "program": {"env": program_env}, } @@ -124,10 +121,8 @@ def load_tasks( return rows -def sandbox_config( - *, info: vf.JsonData, repo_path: str, timeout_minutes: int | None -) -> vf.JsonData: - config: vf.JsonData = { +def sandbox_config(*, info: vf.JsonData, repo_path: str) -> vf.JsonData: + return { "image": f"{REGISTRY_PREFIX}/{info['docker_image']}", "cpu_cores": 4, "memory_gb": 4, @@ -136,9 +131,6 @@ def sandbox_config( "workdir": repo_path, "scope": "rollout", } - if timeout_minutes is not None: - config["timeout_minutes"] = timeout_minutes - return config def env_vars(*, repo_path: str, env: vf.ConfigData) -> dict[str, str]: @@ -162,7 +154,6 @@ def sandbox_config(self, info: vf.JsonData) -> vf.JsonData: return sandbox_config( info=info, repo_path=self.config.repo_path, - timeout_minutes=self.config.timeout_minutes, ) def get_env_vars(self) -> dict[str, str]: @@ -177,7 +168,6 @@ def load_tasks(self, split: vf.TaskSplit = "train") -> vf.Tasks: filter_repos=self.config.filter_repos, ds_num_proc=self.config.ds_num_proc, ds_keep_in_memory=self.config.ds_keep_in_memory, - timeout_minutes=self.config.timeout_minutes, env=dict(self.config.env or {}), ) @@ -187,10 +177,6 @@ async def setup_r2e_sandbox(self, task, state, sandbox=None) -> None: raise RuntimeError("R2E SWE setup requires the active program sandbox.") state["_rlm_swe_sandbox"] = sandbox state["sandbox_id"] = getattr(sandbox, "id", state.get("sandbox_id")) - sandbox_config = task.get("sandbox") - if isinstance(sandbox_config, Mapping): - timeout_minutes = int(sandbox_config.get("timeout_minutes") or 60) - state.setdefault("test_timeout", timeout_minutes * 60) await self.setup_sandbox(sandbox, state) async def setup_sandbox(self, sandbox: R2ESandbox, state: vf.State) -> None: @@ -261,11 +247,7 @@ async def solved(self, task, state) -> float: if sandbox is None: return 0.0 try: - test_output = await self.run_tests( - sandbox, - state, - int(state.get("test_timeout", 900)), - ) + test_output = await self.run_tests(sandbox, state) state["test_output"] = test_output except Exception as exc: logger.warning("Test execution failed: %r", exc) @@ -277,7 +259,6 @@ async def run_tests( self, sandbox: R2ESandbox, state: vf.State, - test_timeout: int, ) -> str: local_archive_path = state.get("r2e_tests_archive_local_path") if local_archive_path and Path(str(local_archive_path)).exists(): @@ -306,7 +287,7 @@ async def run_tests( ) command = f"export {env_str}; /bin/bash run_tests.sh > test_output.txt 2>&1" result = await sandbox.run_background_job( - command, timeout=test_timeout, working_dir=self.config.repo_path + command, working_dir=self.config.repo_path ) if result.exit_code > 1: raise RuntimeError(f"Error running tests: exit_code={result.exit_code}") @@ -362,12 +343,9 @@ async def apply_gold_patch(self, sandbox: R2ESandbox, state: vf.State) -> None: async def validate_instance(self, state: vf.State) -> bool: sandbox = cast(R2ESandbox, state["_rlm_swe_sandbox"]) await self.apply_gold_patch(sandbox, state) - test_timeout = state.get("test_timeout", 900) - assert isinstance(test_timeout, int) test_output = await self.run_tests( sandbox, state, - test_timeout, ) state["test_output"] = test_output info = cast(vf.JsonData, state["info"]) @@ -493,7 +471,7 @@ def load_taskset( class RlmSweProgramConfig(RLMProgramConfig): workdir: str = DEFAULT_REPO_PATH - tools: list[str] = list(DEFAULT_RLM_TOOLS) + rlm_tools: list[str] = list(DEFAULT_RLM_TOOLS) class RlmSweHarnessConfig(RLMConfig): diff --git a/packages/harnesses/harnesses/mini_swe_agent.py b/packages/harnesses/harnesses/mini_swe_agent.py index 29aa084344..739124e1bd 100644 --- a/packages/harnesses/harnesses/mini_swe_agent.py +++ b/packages/harnesses/harnesses/mini_swe_agent.py @@ -20,7 +20,6 @@ MINI_SWE_AGENT_DEFAULT_PACKAGE = MINI_SWE_AGENT_DEFAULT_VERSION MINI_SWE_AGENT_DEFAULT_CONFIG_SPEC = "mini" MINI_SWE_AGENT_DEFAULT_MODEL_CLASS = "litellm" -MINI_SWE_AGENT_DEFAULT_ENVIRONMENT_TIMEOUT = 120 def build_mini_swe_agent_install_script( @@ -62,7 +61,6 @@ class MiniSWEAgentProgramConfig(vf.ProgramConfig): trajectory_path: str = MINI_SWE_AGENT_DEFAULT_TRAJECTORY_PATH config_spec: str = MINI_SWE_AGENT_DEFAULT_CONFIG_SPEC model_class: str = MINI_SWE_AGENT_DEFAULT_MODEL_CLASS - environment_timeout: int = MINI_SWE_AGENT_DEFAULT_ENVIRONMENT_TIMEOUT parallel_tool_calls: bool = True extra_config_specs: list[str] | None = None sandbox: vf.SandboxConfig | None = vf.SandboxConfig() @@ -103,8 +101,6 @@ def resolve( "-c", "agent.cost_limit=0", "-c", - f"environment.timeout={self.environment_timeout}", - "-c", f"model.model_class={shlex.quote(self.model_class)}", "-c", "model.cost_tracking=ignore_errors", @@ -141,7 +137,7 @@ def resolve( CONFIG_ARGS+=(-c "agent.system_template=$(cat {system_prompt_file})") fi cd "$MINI_SWE_AGENT_WORKDIR" -timeout --kill-after=30s "${{AGENT_TIMEOUT_SECONDS:-3600}}" {shlex.quote(DEFAULT_MINI_BINARY)} \\ +{shlex.quote(DEFAULT_MINI_BINARY)} \\ --model "$OPENAI_MODEL" \\ --task "$MINI_SWE_AGENT_TASK" \\ --output {shlex.quote(self.trajectory_path)} \\ diff --git a/packages/harnesses/harnesses/nemo_gym.py b/packages/harnesses/harnesses/nemo_gym.py index 39d35332d4..2dc5da637a 100644 --- a/packages/harnesses/harnesses/nemo_gym.py +++ b/packages/harnesses/harnesses/nemo_gym.py @@ -16,6 +16,7 @@ from pydantic import Field from verifiers.types import AssistantMessage, ToolCall, ToolMessage +from verifiers.utils.async_utils import FrameworkTimeoutError, timeout_after from verifiers.utils.serve_utils import get_free_port logger = logging.getLogger(__name__) @@ -41,7 +42,6 @@ class NeMoGymHarnessConfig(vf.HarnessConfig): config_paths: list[str] = Field(default_factory=list) server_name: str | None = None agent_name: str | None = None - timeout_seconds: float | None = None global_config: ConfigMap = Field(default_factory=dict) @@ -54,7 +54,7 @@ async def run( server_name: str | None, agent_name: str | None, endpoint_config: EndpointConfig, - timeout_seconds: float | None, + task_timeout_seconds: float | None, global_config: ConfigMap, ) -> ConfigMap: ... @@ -196,15 +196,23 @@ async def _run_nemo_gym(self, task: vf.Task, state: vf.State) -> vf.State: raise RuntimeError("NeMo Gym harness runner has not been compiled.") endpoint_config = await nemo_gym_rollout_endpoint_config(state) row = nemo_gym_row_from_task(cast(ConfigMap, task), self.config.agent_name) - result = await runner.run( - row, - config_paths=self._config_paths(), - server_name=self.config.server_name, - agent_name=self.config.agent_name, - endpoint_config=endpoint_config, - timeout_seconds=self.config.timeout_seconds, - global_config=self.config.global_config, - ) + try: + result = await runner.run( + row, + config_paths=self._config_paths(), + server_name=self.config.server_name, + agent_name=self.config.agent_name, + endpoint_config=endpoint_config, + task_timeout_seconds=cast( + float | None, state.runtime_state().get("task_timeout_seconds") + ), + global_config=self.config.global_config, + ) + except FrameworkTimeoutError: + state["timed_out"] = True + state["task_timed_out"] = True + state.stop("timeout_reached") + return state apply_nemo_gym_result(state, result) return state @@ -272,7 +280,7 @@ async def run( server_name: str | None, agent_name: str | None, endpoint_config: EndpointConfig, - timeout_seconds: float | None, + task_timeout_seconds: float | None, global_config: ConfigMap, ) -> ConfigMap: async with self._lifecycle_lock: @@ -287,9 +295,8 @@ async def run( agent_name=agent_name, endpoint_config=endpoint_config, ) - if timeout_seconds is None: + async with timeout_after(task_timeout_seconds): return await run_once - return await asyncio.wait_for(run_once, timeout=timeout_seconds) async def _ensure_started( self, *, config_paths: Sequence[str], global_config: ConfigMap diff --git a/packages/harnesses/harnesses/opencode.py b/packages/harnesses/harnesses/opencode.py index 40031bec7d..98900b54f5 100644 --- a/packages/harnesses/harnesses/opencode.py +++ b/packages/harnesses/harnesses/opencode.py @@ -51,7 +51,6 @@ class OpenCodeProgramConfig(vf.ProgramConfig): allow_git: bool = False disable_compaction: bool = True install_ripgrep: bool = True - provider_timeout_ms: int = 3_600_000 def resolve(self, version: str = OPENCODE_DEFAULT_VERSION) -> vf.ProgramConfig: files: dict[str, vf.ProgramValue] = { @@ -124,7 +123,6 @@ def resolve(self, version: str = OPENCODE_DEFAULT_VERSION) -> vf.ProgramConfig: "options": { "baseURL": "$OPENAI_BASE_URL", "apiKey": "${OPENAI_API_KEY:-intercepted}", - "timeout": self.provider_timeout_ms, }, "models": { "model": { diff --git a/packages/harnesses/harnesses/rlm.py b/packages/harnesses/harnesses/rlm.py index aa728b8ae0..16ce21ece5 100644 --- a/packages/harnesses/harnesses/rlm.py +++ b/packages/harnesses/harnesses/rlm.py @@ -11,8 +11,7 @@ ) RLM_DEFAULT_REPO_URL = "github.com/PrimeIntellect-ai/rlm-harness.git" -RLM_DEFAULT_REF = "main" -RLM_DEFAULT_EXEC_TIMEOUT = 300 +RLM_DEFAULT_REPO_REF = "main" RLM_DEFAULT_MAX_DEPTH = 0 RLM_DEFAULT_INSTRUCTION_PATH = "/rlm/instruction.txt" RLM_DEFAULT_APPEND_TO_SYSTEM_PROMPT_PATH = "/rlm/append_to_system_prompt.txt" @@ -24,15 +23,14 @@ class RLMProgramConfig(vf.ProgramConfig): sandbox: vf.SandboxConfig | None = None workdir: str = RLM_DEFAULT_WORKDIR instruction_path: str = RLM_DEFAULT_INSTRUCTION_PATH - repo_url: str = RLM_DEFAULT_REPO_URL - ref: str = RLM_DEFAULT_REF - exec_timeout: int = RLM_DEFAULT_EXEC_TIMEOUT - max_depth: int = RLM_DEFAULT_MAX_DEPTH + rlm_repo_url: str = RLM_DEFAULT_REPO_URL + rlm_repo_ref: str = RLM_DEFAULT_REPO_REF + rlm_max_depth: int = RLM_DEFAULT_MAX_DEPTH summarize_at_tokens: int | None = None append_to_system_prompt: str = "" local_checkout: str | None = None gh_token_var: str | None = "GH_TOKEN" - tools: list[str] = RLM_DEFAULT_TOOLS + rlm_tools: list[str] = RLM_DEFAULT_TOOLS env_vars: dict[str, str] = {} skills: str | None = None @@ -55,8 +53,8 @@ def resolve(self) -> vf.ProgramConfig: if self.local_checkout else {} ), - "repo_url": self.repo_url, - "ref": self.ref, + "repo_url": self.rlm_repo_url, + "ref": self.rlm_repo_ref, **({"gh_token_var": self.gh_token_var} if self.gh_token_var else {}), } } @@ -71,9 +69,8 @@ def resolve(self) -> vf.ProgramConfig: "PATH": "/root/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", "OPENAI_MODEL": "runtime.model", "RLM_MODEL": "runtime.model", - "RLM_TOOLS": ",".join(self.tools), - "RLM_EXEC_TIMEOUT": str(self.exec_timeout), - "RLM_MAX_DEPTH": str(self.max_depth), + "RLM_TOOLS": ",".join(self.rlm_tools), + "RLM_MAX_DEPTH": str(self.rlm_max_depth), **self.env_vars, } if self.summarize_at_tokens is not None: @@ -90,13 +87,6 @@ def resolve(self) -> vf.ProgramConfig: } } ) - command_timeout = max(self.exec_timeout + 120, 600) - setup_timeout = command_timeout - if self.sandbox is not None and "setup_timeout" in self.sandbox.data( - fill_defaults=False - ): - setup_timeout = self.sandbox.setup_timeout - if self.sandbox is None: sandbox = vf.SandboxConfig( image="python:3.11-slim", @@ -105,17 +95,12 @@ def resolve(self) -> vf.ProgramConfig: memory_gb=2, disk_size_gb=5, network_access=True, - timeout_minutes=60, - command_timeout=command_timeout, - setup_timeout=setup_timeout, ) else: sandbox = vf.SandboxConfig.model_validate( { "workdir": self.workdir, - "command_timeout": command_timeout, **self.sandbox.data(), - "setup_timeout": setup_timeout, } ) @@ -173,7 +158,6 @@ def resolve(self) -> vf.ProgramConfig: env=env, artifacts=artifacts, sandbox=sandbox, - setup_timeout=setup_timeout, ) diff --git a/packages/tasksets/tasksets/harbor.py b/packages/tasksets/tasksets/harbor.py index 62b90b6f8e..a6ce2be869 100644 --- a/packages/tasksets/tasksets/harbor.py +++ b/packages/tasksets/tasksets/harbor.py @@ -4,6 +4,7 @@ import verifiers as vf from verifiers.utils.import_utils import load_toml from verifiers.v1.utils.sandbox_utils import SandboxClient +from verifiers.v1.utils.timeout_utils import test_phase_timeout_seconds from tasksets.utils.harbor_utils import ( TASKS_SUBDIR, @@ -22,9 +23,7 @@ cpu_cores=2.0, memory_gb=4.0, disk_size_gb=10.0, - timeout_minutes=120, workdir="/app", - command_timeout=900, ) @@ -36,7 +35,6 @@ class HarborTasksetConfig(vf.TasksetConfig): cache_dir: str | None = None refresh: bool = False sandbox: vf.SandboxConfig = HARBOR_DEFAULT_SANDBOX - verifier_timeout_seconds: float = 900.0 task_dir: str = "/task" env: dict[str, str] = {} @@ -85,6 +83,9 @@ def load_tasks(self, split: vf.TaskSplit = "train") -> vf.Tasks: raise TypeError(f"{task_toml_path} [agent] must be a mapping.") if not isinstance(verifier_config, dict): raise TypeError(f"{task_toml_path} [verifier] must be a mapping.") + test_timeout_seconds = test_phase_timeout_seconds( + verifier_config.get("timeout_sec") + ) instruction = instruction_path.read_text().strip() task_remote_dir = config.task_dir.rstrip("/") or "/task" sandbox = harbor_sandbox(HARBOR_DEFAULT_SANDBOX, config.sandbox) @@ -98,12 +99,6 @@ def load_tasks(self, split: vf.TaskSplit = "train") -> vf.Tasks: "disk_size_gb": parse_gb( environment.get("storage"), sandbox.disk_size_gb ), - "command_timeout": int( - parse_number( - agent_config.get("timeout_sec"), - sandbox.command_timeout or 900, - ) - ), **( {"network_access": bool(environment["allow_internet"])} if "allow_internet" in environment @@ -141,9 +136,10 @@ def load_tasks(self, split: vf.TaskSplit = "train") -> vf.Tasks: "task_name": task_dir.name, "config": task_config, "docker_image": environment.get("docker_image"), - "test_timeout": parse_number( - verifier_config.get("timeout_sec"), - config.verifier_timeout_seconds, + **( + {"test_timeout_seconds": test_timeout_seconds} + if test_timeout_seconds is not None + else {} ), }, "info": { @@ -166,12 +162,12 @@ async def harbor_reward(self, task: vf.Task, state: vf.State) -> float: harbor = task["harbor"] assert isinstance(harbor, dict) task_dir = Path(str(harbor["task_dir"])) + test_timeout = test_phase_timeout_seconds(harbor.get("test_timeout_seconds")) from prime_sandboxes import AsyncSandboxClient client = cast(SandboxClient, AsyncSandboxClient()) try: await upload_harbor_tests(client, sandbox_id, task_dir) - test_timeout = int(parse_number(harbor.get("test_timeout"), 900)) result = await client.run_background_job( sandbox_id=sandbox_id, command="bash test.sh", diff --git a/packages/tasksets/tasksets/openenv.py b/packages/tasksets/tasksets/openenv.py index 9d0040cbd2..18d1fb2554 100644 --- a/packages/tasksets/tasksets/openenv.py +++ b/packages/tasksets/tasksets/openenv.py @@ -60,11 +60,6 @@ class OpenEnvRuntimeConfig(vf.Config): start_command: str contract: Literal["gym", "mcp"] seed: int - startup_timeout_seconds: int - startup_poll_interval_seconds: float - health_request_timeout_seconds: float - schema_request_timeout_seconds: float - wait_for_creation_max_attempts: int max_retries: int base_delay: float backoff_factor: float @@ -99,11 +94,6 @@ class OpenEnvTasksetConfig(vf.TasksetConfig): num_train_examples: int = 100 num_eval_examples: int = 50 seed: int = 0 - startup_timeout_seconds: int = 30 - startup_poll_interval_seconds: float = 1.0 - health_request_timeout_seconds: float = 2.0 - schema_request_timeout_seconds: float = 5.0 - wait_for_creation_max_attempts: int = 20 max_retries: int = 5 base_delay: float = 0.5 backoff_factor: float = 2.0 @@ -215,11 +205,6 @@ def openenv_tasks( start_command=build.start_command, contract=build.contract, seed=config.seed, - startup_timeout_seconds=config.startup_timeout_seconds, - startup_poll_interval_seconds=config.startup_poll_interval_seconds, - health_request_timeout_seconds=config.health_request_timeout_seconds, - schema_request_timeout_seconds=config.schema_request_timeout_seconds, - wait_for_creation_max_attempts=config.wait_for_creation_max_attempts, max_retries=config.max_retries, base_delay=config.base_delay, backoff_factor=config.backoff_factor, diff --git a/packages/tasksets/tasksets/utils/harbor_utils.py b/packages/tasksets/tasksets/utils/harbor_utils.py index 6525dbae25..52b98ab7ef 100644 --- a/packages/tasksets/tasksets/utils/harbor_utils.py +++ b/packages/tasksets/tasksets/utils/harbor_utils.py @@ -13,6 +13,7 @@ from verifiers.v1.sandbox import SandboxConfig from verifiers.v1.utils.sandbox_utils import SandboxClient +from verifiers.v1.utils.timeout_utils import FILE_TRANSFER_TIMEOUT_SECONDS TASKS_SUBDIR = "tasks" @@ -152,7 +153,7 @@ async def upload_harbor_tests( f"mkdir -p /oracle /tests /logs/verifier && " f"tar -xzf {remote_tar} -C / && rm {remote_tar}" ), - timeout=900, + timeout=FILE_TRANSFER_TIMEOUT_SECONDS, ) finally: tar_path.unlink(missing_ok=True) diff --git a/packages/tasksets/tasksets/utils/openenv_utils.py b/packages/tasksets/tasksets/utils/openenv_utils.py index 2ac576d6ae..f5da08cc0b 100644 --- a/packages/tasksets/tasksets/utils/openenv_utils.py +++ b/packages/tasksets/tasksets/utils/openenv_utils.py @@ -7,6 +7,14 @@ import tenacity as tc from openenv.core.containers.runtime.providers import ContainerProvider from verifiers.v1.types import JsonData +from verifiers.v1.utils.timeout_utils import ( + OPENENV_HEALTH_REQUEST_TIMEOUT_SECONDS, + OPENENV_SCHEMA_REQUEST_TIMEOUT_SECONDS, + OPENENV_STARTUP_POLL_INTERVAL_SECONDS, + OPENENV_STARTUP_TIMEOUT_SECONDS, + OPENENV_WAIT_FOR_CREATION_ATTEMPTS, + SANDBOX_LEASE_TTL_MINUTES, +) T = TypeVar("T") @@ -54,7 +62,7 @@ def start_container( cpu_cores=2, memory_gb=4, disk_size_gb=10, - timeout_minutes=60, + timeout_minutes=SANDBOX_LEASE_TTL_MINUTES, environment_vars={"ENABLE_WEB_INTERFACE": "false", **dict(env_vars or {})}, ) sandbox_id: str | None = None @@ -66,7 +74,7 @@ def start_container( self.sandbox_id = sandbox_id client.wait_for_creation( sandbox_id, - max_attempts=self.spec.wait_for_creation_max_attempts, + max_attempts=OPENENV_WAIT_FOR_CREATION_ATTEMPTS, ) exposure = client.expose( sandbox_id, @@ -129,21 +137,21 @@ def wait_for_ready(self, base_url: str, timeout_s: float = 30.0) -> None: del timeout_s start_time = time.monotonic() last_error = "no attempts" - while time.monotonic() - start_time < self.spec.startup_timeout_seconds: + while time.monotonic() - start_time < OPENENV_STARTUP_TIMEOUT_SECONDS: try: response = requests.get( f"{base_url}/health", - timeout=self.spec.health_request_timeout_seconds, + timeout=OPENENV_HEALTH_REQUEST_TIMEOUT_SECONDS, ) if response.status_code == 200: return last_error = f"HTTP {response.status_code}: {response.text[:200]}" except Exception as exc: last_error = f"{type(exc).__name__}: {exc}" - time.sleep(self.spec.startup_poll_interval_seconds) + time.sleep(OPENENV_STARTUP_POLL_INTERVAL_SECONDS) raise RuntimeError( "OpenEnv server not ready. " - f"Health timeout={self.spec.startup_timeout_seconds}s, " + f"Health timeout={OPENENV_STARTUP_TIMEOUT_SECONDS}s, " f"url={base_url}, last error: {last_error}" ) @@ -151,7 +159,7 @@ def fetch_schema(self) -> JsonData: def request_schema() -> JsonData: response = requests.get( f"{self.base_url}/schema", - timeout=self.spec.schema_request_timeout_seconds, + timeout=OPENENV_SCHEMA_REQUEST_TIMEOUT_SECONDS, ) response.raise_for_status() data = response.json() diff --git a/tests/test_eval_cli.py b/tests/test_eval_cli.py index 399137e690..97ce53aacb 100644 --- a/tests/test_eval_cli.py +++ b/tests/test_eval_cli.py @@ -86,6 +86,9 @@ def _run_cli( "save_to_hf_hub": False, "hf_hub_dataset_name": "", "extra_env_kwargs": {}, + "rollout_timeout_seconds": None, + "task_timeout_seconds": None, + "global_timeout_seconds": None, "env_config_overrides": [], "max_retries": 0, "fullscreen": False, @@ -350,34 +353,10 @@ def test_cli_temperature_not_added_when_none(monkeypatch, run_cli): assert "temperature" not in sa -def test_cli_extra_env_kwargs_support_timeout_seconds(monkeypatch, run_cli): - captured = run_cli( - monkeypatch, - { - "extra_env_kwargs": {"timeout_seconds": 30, "foo": "bar"}, - }, - ) - - assert captured["configs"][0].extra_env_kwargs == { - "timeout_seconds": 30, - "foo": "bar", - } - +def test_parse_args_legacy_timeout_alias_maps_to_rollout_timeout(): + args = vf_eval.parse_args(["dummy-env", "--timeout", "600"]) -def test_cli_timeout_flag_overrides_extra_env_kwargs(monkeypatch, run_cli): - """--timeout wins over timeout_seconds in --extra-env-kwargs.""" - captured = run_cli( - monkeypatch, - { - "extra_env_kwargs": {"timeout_seconds": 30, "foo": "bar"}, - "timeout": 600, - }, - ) - - assert captured["configs"][0].extra_env_kwargs == { - "timeout_seconds": 600, - "foo": "bar", - } + assert args.rollout_timeout_seconds == 600 def test_cli_headers_table_and_list_merge(monkeypatch, run_cli): @@ -1294,22 +1273,22 @@ def test_load_toml_config_global_values_with_per_eval_override(): def test_load_toml_config_with_extra_env_kwargs(): with tempfile.NamedTemporaryFile(suffix=".toml", delete=False, mode="w") as f: f.write( - '[[eval]]\nenv_id = "env1"\n[eval.extra_env_kwargs]\ntimeout_seconds = 600\n' + '[[eval]]\nenv_id = "env1"\n[eval.extra_env_kwargs]\nrollout_timeout_seconds = 600\n' ) f.flush() result = load_toml_config(Path(f.name)) - assert result[0]["extra_env_kwargs"] == {"timeout_seconds": 600} + assert result[0]["extra_env_kwargs"] == {"rollout_timeout_seconds": 600} -def test_load_toml_config_with_top_level_timeout(): - """Top-level `timeout` is a recognized field on [[eval]] tables.""" +def test_load_toml_config_maps_legacy_timeout_to_rollout_timeout(): with tempfile.NamedTemporaryFile(suffix=".toml", delete=False, mode="w") as f: f.write('[[eval]]\nenv_id = "env1"\ntimeout = 600\n') f.flush() result = load_toml_config(Path(f.name)) - assert result[0]["timeout"] == 600 + assert result[0]["rollout_timeout_seconds"] == 600 + assert "timeout" not in result[0] def test_load_toml_config_invalid_global_field(): diff --git a/tests/test_langchain_deep_agents_wikispeedia.py b/tests/test_langchain_deep_agents_wikispeedia.py index 4632a8423e..e8aa39a5fd 100644 --- a/tests/test_langchain_deep_agents_wikispeedia.py +++ b/tests/test_langchain_deep_agents_wikispeedia.py @@ -86,7 +86,6 @@ def test_wikispeedia_env_config_reaches_taskset_and_harness( }, harness={ "max_turns": 8, - "timeout_seconds": 9.0, }, ) ) @@ -101,7 +100,6 @@ def test_wikispeedia_env_config_reaches_taskset_and_harness( assert len(eval_rows) == 1 assert train_rows[0]["max_turns"] == 7 assert env.harness.config.max_turns == 8 - assert env.harness.config.timeout_seconds == 9.0 assert [tool.__name__ for tool in env.taskset.toolsets[0].tools] == ["click_link"] @@ -350,7 +348,6 @@ def fake_create_deep_agent(**kwargs): program = module.make_langchain_deep_agents_program( max_turns=50, - timeout_seconds=30, ) state = FakeState( { diff --git a/tests/test_v1_harbor_cli.py b/tests/test_v1_harbor_cli.py index 348c193c94..8519737525 100644 --- a/tests/test_v1_harbor_cli.py +++ b/tests/test_v1_harbor_cli.py @@ -110,7 +110,6 @@ def test_harbor_taskset_loads_package_tasks_with_program_patch( assert task["sandbox"]["image"] == "ubuntu:24.04" assert task["sandbox"]["memory_gb"] == 2.0 assert task["sandbox"]["disk_size_gb"] == 8.0 - assert task["sandbox"]["command_timeout"] == 600 assert "network_access" not in task["sandbox"] assert ( merge_task_sandbox( @@ -118,7 +117,7 @@ def test_harbor_taskset_loads_package_tasks_with_program_patch( ).network_access is False ) - assert task["harbor"]["test_timeout"] == 300.0 + assert task["harbor"]["test_timeout_seconds"] == 300 assert task["program"]["files"] == { "/task/instruction.md": {"task": "instruction"}, "/task/task.toml": {"task": "task_toml"}, @@ -227,7 +226,8 @@ async def aclose(self) -> None: @pytest.mark.asyncio async def test_harbor_reward_uses_background_job_for_tests( - tmp_path: Path, monkeypatch: pytest.MonkeyPatch + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, ) -> None: task_dir = write_harbor_task(tmp_path) fake_module = cast(Any, types.ModuleType("prime_sandboxes")) @@ -238,15 +238,18 @@ async def test_harbor_reward_uses_background_job_for_tests( taskset = HarborTaskset(config=HarborTasksetConfig(bundle_package=__name__)) reward = await taskset.harbor_reward( vf.Task( - {"prompt": [], "harbor": {"task_dir": str(task_dir), "test_timeout": 120}} + { + "prompt": [], + "harbor": {"task_dir": str(task_dir), "test_timeout_seconds": 300}, + } ).freeze(), vf.State({"sandbox_id": "sbx-1"}), ) client = FakeHarborSandboxClient.instances[0] assert reward == 1.0 - assert client.background_jobs == [("sbx-1", "bash test.sh", 120, "/tests")] - assert ("bash test.sh", 120, "/tests") not in client.execute_commands + assert client.background_jobs == [("sbx-1", "bash test.sh", 300, "/tests")] + assert ("bash test.sh", None, "/tests") not in client.execute_commands def test_packaged_harbor_and_opencode_imports_are_available_from_packages() -> None: @@ -506,15 +509,6 @@ def test_task_program_merges_into_command_program_without_collisions() -> None: } -def test_command_program_patch_preserves_explicit_default_values() -> None: - program = vf.ProgramConfig(setup_timeout=300).resolve_command( - command=["tool"], - setup_timeout=600, - ) - - assert program.data()["setup_timeout"] == 300 - - def test_task_program_rejects_harness_owned_keys() -> None: harness = vf.Harness( config=vf.HarnessConfig( diff --git a/tests/test_v1_rlm_swe.py b/tests/test_v1_rlm_swe.py index b7b536c3c0..8b2ae4178c 100644 --- a/tests/test_v1_rlm_swe.py +++ b/tests/test_v1_rlm_swe.py @@ -81,8 +81,7 @@ def test_rlm_harness_accepts_typed_config_surface(): config=RLMConfig( program=RLMProgramConfig( local_checkout="/tmp/checkout", - tools=["bash", "edit"], - exec_timeout=11, + rlm_tools=["bash", "edit"], env_vars={"CUSTOM": "1"}, ) ) @@ -90,9 +89,8 @@ def test_rlm_harness_accepts_typed_config_surface(): program = as_dict(harness.config.program) program_env = as_dict(program["env"]) - assert harness.config.program.tools == ["bash", "edit"] + assert harness.config.program.rlm_tools == ["bash", "edit"] assert program_env["RLM_TOOLS"] == "bash,edit" - assert program_env["RLM_EXEC_TIMEOUT"] == "11" assert program_env["CUSTOM"] == "1" @@ -111,50 +109,6 @@ def test_rlm_endpoint_hides_nested_depth_requests(): ) -def test_rlm_harness_preserves_program_setup_timeout_override(): - harness = RLM( - config=RLMConfig( - program=RLMProgramConfig( - local_checkout="/tmp/checkout", - setup_timeout=123, - ), - ) - ) - program = as_dict(harness.config.program) - - assert program["setup_timeout"] == 123 - - -def test_rlm_harness_uses_sandbox_setup_timeout_default(): - harness = RLM( - config=RLMConfig( - program=RLMProgramConfig( - local_checkout="/tmp/checkout", - sandbox=vf.SandboxConfig(setup_timeout=777), - ), - ) - ) - program = as_dict(harness.config.program) - - assert program["setup_timeout"] == 777 - - -def test_rlm_harness_keeps_minimum_setup_timeout_for_default_sandbox_config(): - harness = RLM( - config=RLMConfig( - program=RLMProgramConfig( - local_checkout="/tmp/checkout", - sandbox=vf.SandboxConfig(), - ), - ) - ) - program = as_dict(harness.config.program) - sandbox = as_dict(harness.sandbox) - - assert program["setup_timeout"] == 600 - assert sandbox["setup_timeout"] == 600 - - def test_rlm_harness_can_upload_skills(tmp_path: Path): skills = tmp_path / "skills" (skills / "edit").mkdir(parents=True) @@ -611,7 +565,6 @@ def fake_load_dataset(dataset_name: str, **kwargs: object) -> Dataset: taskset=rlm_swe_v1.RlmSweTasksetConfig( dataset_name="fake-r2e", repo_path="/workspace/repo", - timeout_minutes=30, env={"CUSTOM": "1"}, ), harness=rlm_swe_v1.RlmSweHarnessConfig( @@ -640,7 +593,6 @@ def fake_load_dataset(dataset_name: str, **kwargs: object) -> Dataset: f"{rlm_swe_v1.REGISTRY_PREFIX}/r2e/image:latest" ) assert task["sandbox"]["workdir"] == "/workspace/repo" - assert task["sandbox"]["timeout_minutes"] == 30 task_program_env = as_dict(as_dict(task["program"])["env"]) assert task_program_env["AGENT_WORKDIR"] == "/workspace/repo" assert "/workspace/repo/.venv/bin" in task_program_env["AGENT_PATH"] @@ -677,9 +629,7 @@ async def test_rlm_swe_taskset_setup_and_reward(monkeypatch): monkeypatch.setattr( rlm_swe_v1, "load_dataset", lambda *args, **kwargs: fake_r2e_dataset() ) - taskset = rlm_swe_v1.load_taskset( - config=rlm_swe_v1.RlmSweTasksetConfig(timeout_minutes=30) - ) + taskset = rlm_swe_v1.load_taskset(config=rlm_swe_v1.RlmSweTasksetConfig()) task = next(iter(taskset)) state = vf.State.for_task(task) sandbox = FakeSandbox() @@ -692,9 +642,8 @@ async def fake_setup_sandbox(sandbox_arg: object, state_arg: vf.State) -> None: async def fake_run_tests( sandbox_arg: object, state_arg: vf.State, - test_timeout: int, ) -> str: - calls["run_tests"] = (sandbox_arg, state_arg, test_timeout) + calls["run_tests"] = (sandbox_arg, state_arg) return """ =========================== short test summary info ============================ PASSED tests/test_example.py::test_fix @@ -709,9 +658,11 @@ async def fake_run_tests( assert calls["setup_sandbox"] is sandbox assert calls["setup_state"] is state - assert calls["run_tests"] == (sandbox, state, 1800) + assert calls["run_tests"] == ( + sandbox, + state, + ) assert state["sandbox_id"] == "sandbox-1" - assert state["test_timeout"] == 1800 assert reward == 1.0 assert "sandbox_client" not in state assert "_rlm_swe_sandbox" not in state @@ -727,7 +678,7 @@ async def test_rlm_swe_run_tests_quotes_env_values(): ) sandbox = RecordingSandbox() - output = await taskset.run_tests(sandbox, {}, 123) + output = await taskset.run_tests(sandbox, {}) assert output == "test output" assert len(sandbox.background_jobs) == 1 diff --git a/tests/test_v1_runtime_lifecycle.py b/tests/test_v1_runtime_lifecycle.py index 508716c314..ae135025b3 100644 --- a/tests/test_v1_runtime_lifecycle.py +++ b/tests/test_v1_runtime_lifecycle.py @@ -139,8 +139,6 @@ class FakeSandboxClient: requests: list[dict[str, object]] = [] deleted: list[str] = [] commands: list[tuple[str, str]] = [] - command_timeouts: list[int | None] = [] - background_jobs: list[tuple[str, str, int | None, str | None, int]] = [] uploads: list[tuple[str, str, bytes]] = [] wait_attempts: list[tuple[str, int]] = [] closed = 0 @@ -154,8 +152,6 @@ def reset(cls) -> None: cls.requests = [] cls.deleted = [] cls.commands = [] - cls.command_timeouts = [] - cls.background_jobs = [] cls.uploads = [] cls.wait_attempts = [] cls.closed = 0 @@ -179,9 +175,7 @@ async def execute_command( ) -> FakeCommandResult: sandbox_id = str(kwargs.get("sandbox_id") or args[0]) command = str(kwargs.get("command") or args[1]) - timeout = cast(int | None, kwargs.get("timeout")) type(self).commands.append((sandbox_id, command)) - type(self).command_timeouts.append(timeout) return FakeCommandResult() async def run_background_job( @@ -189,13 +183,7 @@ async def run_background_job( ) -> FakeCommandResult: sandbox_id = str(kwargs.get("sandbox_id") or args[0]) command = str(kwargs.get("command") or args[1]) - timeout = cast(int | None, kwargs.get("timeout", 900)) - working_dir = cast(str | None, kwargs.get("working_dir")) - poll_interval = cast(int, kwargs.get("poll_interval", 3)) type(self).commands.append((sandbox_id, command)) - type(self).background_jobs.append( - (sandbox_id, command, timeout, working_dir, poll_interval) - ) return FakeCommandResult() async def upload_bytes(self, *args: object, **kwargs: object) -> None: @@ -711,6 +699,23 @@ async def replay_answer_program(task, state): return state +async def timeout_error_program(task, state): + _ = task, state + raise TimeoutError("client timed out") + + +async def sandbox_error_program(task, state): + _ = task, state + raise vf.SandboxError("sandbox failed") + + +async def slow_program(task, state): + _ = task + await asyncio.sleep(0.05) + state["answer"] = "late" + return state + + @vf.reward async def replay_reward(task, state) -> float: return float(state.get("answer") == task.get("answer")) @@ -738,6 +743,9 @@ async def replay_reward(task, state) -> float: "state_tools_program": state_tools_program, "state_tool_program": state_tool_program, "replay_answer_program": replay_answer_program, + "timeout_error_program": timeout_error_program, + "sandbox_error_program": sandbox_error_program, + "slow_program": slow_program, "replay_reward": replay_reward, }.items(): setattr(ref_module, _name, _value) @@ -788,6 +796,101 @@ async def test_v1_records_default_metrics_usage_and_timing() -> None: assert state["timing"]["model"]["duration"] > 0.0 +@pytest.mark.asyncio +async def test_task_timeout_limits_active_execution() -> None: + harness = make_harness(program={"fn": program_ref("slow_program")}) + task = vf.Task({"prompt": []}).freeze() + state = vf.State.for_task(task) + state.runtime_state()["task_timeout_seconds"] = 0.01 + + state = await harness.run(task, state) + + assert state["timed_out"] is True + assert state["task_timed_out"] is True + assert state["is_completed"] is True + assert state["stop_condition"] == "timeout_reached" + assert "answer" not in state + + +@pytest.mark.asyncio +async def test_user_timeout_error_is_not_task_timeout() -> None: + harness = make_harness(program={"fn": program_ref("timeout_error_program")}) + task = vf.Task({"prompt": []}).freeze() + state = vf.State.for_task(task) + state.runtime_state()["task_timeout_seconds"] = 10 + + with pytest.raises(TimeoutError, match="client timed out"): + await harness.run(task, state) + + assert state.get("timed_out") is None + assert state.get("task_timed_out") is None + + +@pytest.mark.asyncio +async def test_error_rollout_is_scored_and_completed() -> None: + harness = make_harness( + program={"fn": program_ref("sandbox_error_program")}, + rewards=[program_ref("replay_reward")], + ) + task = vf.Task({"prompt": [], "answer": "solved"}).freeze() + + state = await harness.run(task) + + assert state["error"]["error"] == "SandboxError" + assert state["reward"] == 0.0 + assert state["is_completed"] is True + assert state["stop_condition"] == "has_error" + assert state["runtime"] == {} + + +@pytest.mark.asyncio +async def test_group_scoring_shares_rollout_timeout() -> None: + taskset = make_taskset() + + async def init_group(task: vf.Task, n: int): + _ = task + tasks = [ + vf.Task({"prompt": [], "answer": f"solved-{idx}"}).freeze() + for idx in range(n) + ] + states = [vf.State.for_task(task) for task in tasks] + return tasks, states + + async def slow_completed_run(task, state): + _ = task + await asyncio.sleep(0.02) + state._set_completed(True) + state._set_stop_condition("program_completed") + return state + + async def slow_score_group(tasks, states): + _ = tasks + await asyncio.sleep(0.05) + for state in states: + state["group_scored"] = True + + taskset.init_group = init_group + harness = make_harness(program={"fn": program_ref("replay_answer_program")}) + harness.run = slow_completed_run + harness.score_group = slow_score_group + env = vf.Env(taskset=taskset, harness=harness) + env.rollout_timeout_seconds = 0.04 + + start = time.perf_counter() + states = await env._run_group_states( + [{"prompt": [], "answer": "solved"}] * 2, + cast(Client, FakeModelClient([])), + "fake", + {}, + ) + elapsed = time.perf_counter() - start + + assert elapsed < 0.08 + assert all(state["is_completed"] is True for state in states) + assert all(state.get("group_scored") is None for state in states) + assert all(state.get("timed_out") is None for state in states) + + def test_v1_state_does_not_copy_task_answer_to_top_level() -> None: task = vf.Task({"answer": "gold"}).freeze() state = vf.State.for_task(task) @@ -1506,6 +1609,50 @@ async def delete(self, sandbox_id: str) -> None: assert deleted == ["sbx-created-after-cancel"] +@pytest.mark.asyncio +async def test_create_sandbox_timeout_returns_before_provider_finishes( + monkeypatch: pytest.MonkeyPatch, +) -> None: + install_fake_sandboxes(monkeypatch) + disable_sandbox_retry_sleep(monkeypatch) + monkeypatch.setattr(sandbox_utils, "SANDBOX_CREATE_TIMEOUT_SECONDS", 0.05) + started = asyncio.Event() + finish = asyncio.Event() + deleted = asyncio.Event() + + class SlowCreateClient: + async def create(self, request: FakeCreateSandboxRequest) -> FakeSandboxResult: + _ = request + started.set() + await finish.wait() + return FakeSandboxResult("sbx-created-after-timeout") + + async def wait_for_creation( + self, + sandbox_id: str, + *, + max_attempts: int = sandbox_utils.SANDBOX_WAIT_FOR_CREATION_ATTEMPTS, + ) -> None: + _ = sandbox_id, max_attempts + + async def delete(self, sandbox_id: str) -> None: + assert sandbox_id == "sbx-created-after-timeout" + deleted.set() + + start = time.perf_counter() + with pytest.raises(TimeoutError): + await sandbox_utils.create_sandbox( + cast(sandbox_utils.SandboxClient, SlowCreateClient()), + {"image": "python:3.11-slim"}, + ) + elapsed = time.perf_counter() - start + + assert started.is_set() + assert elapsed < 0.2 + finish.set() + await asyncio.wait_for(deleted.wait(), timeout=1) + + @pytest.mark.asyncio async def test_create_sandbox_wait_cancellation_deletes_known_sandbox( monkeypatch: pytest.MonkeyPatch, @@ -1566,7 +1713,6 @@ async def test_create_sandbox_threads_v1_request_fields( "gpu_type": "a10", "vm": True, "network_access": False, - "timeout_minutes": 30, "environment_vars": {"NUMBER": 7}, "secrets": {"TOKEN": "secret"}, "team_id": "team", @@ -1772,57 +1918,6 @@ async def test_rollout_setup_receives_program_sandbox_before_program_setup( assert program_setup_index < lifecycle_setup_index < command_index -@pytest.mark.asyncio -async def test_program_setup_uses_program_setup_timeout( - monkeypatch: pytest.MonkeyPatch, -) -> None: - install_fake_sandboxes(monkeypatch) - install_fake_endpoint_tunnel(monkeypatch) - - harness = make_harness( - program={ - "command": ["true"], - "sandbox": True, - "setup": "echo program-setup", - "setup_timeout": 777, - }, - sandbox={"image": "python:3.11-slim"}, - ) - task = vf.Task({"prompt": [{"role": "user", "content": "hi"}]}).freeze() - - await harness.run(task) - - setup_commands = FakeSandboxClient.commands[ - : len(FakeSandboxClient.command_timeouts) - ] - command_timeouts = dict( - zip( - [command for _, command in setup_commands], - FakeSandboxClient.command_timeouts, - strict=True, - ) - ) - assert command_timeouts["echo program-setup"] == 777 - - -@pytest.mark.asyncio -async def test_sandbox_command_uses_configured_poll_interval( - monkeypatch: pytest.MonkeyPatch, -) -> None: - install_fake_sandboxes(monkeypatch) - install_fake_endpoint_tunnel(monkeypatch) - - harness = make_harness( - program={"command": ["true"], "sandbox": True}, - sandbox={"image": "python:3.11-slim", "poll_interval": 11}, - ) - task = vf.Task({"prompt": [{"role": "user", "content": "hi"}]}).freeze() - - await harness.run(task) - - assert FakeSandboxClient.background_jobs == [("sbx-1", "true", 900, None, 11)] - - @pytest.mark.asyncio async def test_sandbox_handle_forwards_background_job_poll_interval() -> None: class BackgroundJobClient: @@ -1926,29 +2021,6 @@ async def test_sandbox_state_input_upload_runs_after_rollout_setup( assert uploads["/tmp/vf_state_in.json"]["state_input_setup"] is True -@pytest.mark.asyncio -async def test_task_command_uses_background_job( - monkeypatch: pytest.MonkeyPatch, -) -> None: - install_fake_sandboxes(monkeypatch) - install_fake_endpoint_tunnel(monkeypatch) - - harness = make_harness( - program={"command": ["sleep", "120"], "sandbox": True}, - sandbox={"image": "python:3.11-slim", "workdir": "/app"}, - ) - task = vf.Task( - { - "prompt": [{"role": "user", "content": "hi"}], - "sandbox": {"command_timeout": 120}, - } - ).freeze() - - await harness.run(task) - - assert ("sbx-1", "sleep 120", 120, "/app", 3) in FakeSandboxClient.background_jobs - - @pytest.mark.asyncio async def test_program_channels_mcp_setup_accepts_config_ref_mappings( monkeypatch: pytest.MonkeyPatch, @@ -2083,8 +2155,6 @@ async def test_real_sandbox_base_program_calls_host_callable_tool() -> None: "image": "python:3.11-slim", "scope": "group", "network_access": True, - "timeout_minutes": 20, - "command_timeout": 120, }, toolsets=[ vf.Toolset( @@ -2127,8 +2197,6 @@ async def test_real_sandbox_command_program_uses_mcp_tool_proxy() -> None: sandbox={ "image": "python:3.9-slim", "network_access": True, - "timeout_minutes": 20, - "command_timeout": 120, }, toolsets=[vf.Toolset(tools=[echo_tool])], ) diff --git a/verifiers/envs/environment.py b/verifiers/envs/environment.py index 37e8172efb..ada2397913 100644 --- a/verifiers/envs/environment.py +++ b/verifiers/envs/environment.py @@ -1261,6 +1261,12 @@ def set_kwargs(self, **kwargs) -> None: else: setattr(self, key, value) + def set_rollout_timeout_seconds(self, value: float | None) -> None: + timeout = None if value is None else float(value) + self.rollout_timeout_seconds = timeout + if hasattr(self, "timeout_seconds"): + self.timeout_seconds = timeout + def add_rubric(self, rubric: Rubric) -> None: if self.rubric is None: self.rubric = rubric diff --git a/verifiers/scripts/eval.py b/verifiers/scripts/eval.py index 0580f234db..a3cbfc33cc 100644 --- a/verifiers/scripts/eval.py +++ b/verifiers/scripts/eval.py @@ -571,11 +571,33 @@ def build_parser() -> argparse.ArgumentParser: default={}, help='Extra environment as JSON object (e.g., \'{"key": "value", "num": 42}\'). Passed to environment constructor.', ) + parser.add_argument( + "--rollout-timeout", + dest="rollout_timeout_seconds", + type=float, + default=None, + help="Per-rollout wall-clock timeout in seconds.", + ) parser.add_argument( "--timeout", + dest="rollout_timeout_seconds", + type=float, + default=None, + help=argparse.SUPPRESS, + ) + parser.add_argument( + "--task-timeout", + dest="task_timeout_seconds", + type=float, + default=None, + help="Per-task active agent timeout in seconds.", + ) + parser.add_argument( + "--global-timeout", + dest="global_timeout_seconds", type=float, default=None, - help="Per-rollout wall-clock timeout in seconds. Overrides timeout_seconds in --extra-env-kwargs.", + help="Whole evaluation run timeout in seconds.", ) parser.add_argument( "--fullscreen", @@ -940,9 +962,9 @@ def build_eval_config(raw: dict) -> EvalConfig: list(raw.get("env_config_overrides", [])), ) - extra_env_kwargs = dict(raw.get("extra_env_kwargs", {})) - if raw.get("timeout") is not None: - extra_env_kwargs["timeout_seconds"] = raw["timeout"] + rollout_timeout_seconds = raw.get("rollout_timeout_seconds") + if rollout_timeout_seconds is None: + rollout_timeout_seconds = raw.get("timeout") return EvalConfig( env_id=env_id, @@ -950,7 +972,7 @@ def build_eval_config(raw: dict) -> EvalConfig: env_args=env_args, env_dir_path=raw.get("env_dir_path", DEFAULT_ENV_DIR_PATH), output_dir=raw.get("output_dir"), - extra_env_kwargs=extra_env_kwargs, + extra_env_kwargs=dict(raw.get("extra_env_kwargs", {})), endpoint_id=resolved_endpoint_id, model=model, client_config=client_config, @@ -961,6 +983,9 @@ def build_eval_config(raw: dict) -> EvalConfig: max_retries=raw.get("max_retries", 3), num_workers=raw.get("num_workers", "auto"), disable_env_server=raw.get("disable_env_server", False), + global_timeout_seconds=raw.get("global_timeout_seconds"), + rollout_timeout_seconds=rollout_timeout_seconds, + task_timeout_seconds=raw.get("task_timeout_seconds"), verbose=raw.get("verbose", False), disable_tui=raw.get("disable_tui", False), state_columns=raw.get("state_columns", []), diff --git a/verifiers/types.py b/verifiers/types.py index ff4efbf9db..1174aa6571 100644 --- a/verifiers/types.py +++ b/verifiers/types.py @@ -1227,7 +1227,7 @@ class ClientConfig(BaseModel): api_key_var: str = "PRIME_API_KEY" api_base_url: str = "https://api.pinference.ai/api/v1" endpoint_configs: list["EndpointClientConfig"] = Field(default_factory=list) - timeout: float = 3600.0 + timeout: float = 21600.0 connect_timeout: float = 5.0 max_connections: int = 28000 max_keepalive_connections: int = 28000 @@ -1294,7 +1294,7 @@ class EndpointClientConfig(BaseModel): client_idx: int = 0 api_key_var: str = "PRIME_API_KEY" api_base_url: str = "https://api.pinference.ai/api/v1" - timeout: float = 3600.0 + timeout: float = 21600.0 connect_timeout: float = 5.0 max_connections: int = 28000 max_keepalive_connections: int = 28000 @@ -1328,6 +1328,9 @@ class EvalConfig(BaseModel): max_concurrent: int num_workers: int | str = "auto" independent_scoring: bool = False + global_timeout_seconds: float | None = None + rollout_timeout_seconds: float | None = None + task_timeout_seconds: float | None = None extra_env_kwargs: dict = {} max_retries: int = 3 disable_env_server: bool = False diff --git a/verifiers/utils/async_utils.py b/verifiers/utils/async_utils.py index d1ea469036..79a83f18c8 100644 --- a/verifiers/utils/async_utils.py +++ b/verifiers/utils/async_utils.py @@ -4,8 +4,9 @@ from collections import deque from collections.abc import Mapping from collections.abc import Coroutine +from contextlib import asynccontextmanager from time import perf_counter -from typing import Any, AsyncContextManager, Callable, Optional, TypeVar +from typing import Any, AsyncContextManager, AsyncIterator, Callable, Optional, TypeVar import numpy as np import tenacity as tc @@ -21,6 +22,10 @@ T = TypeVar("T") +class FrameworkTimeoutError(TimeoutError): + """Raised only when a Verifiers-managed timeout expires.""" + + async def with_sem(sem: AsyncContextManager, coro: Coroutine[Any, Any, T]) -> T: """Wrap a coroutine with a context manager (typically a semaphore).""" try: @@ -46,6 +51,33 @@ async def maybe_call_with_named_args(func: Callable, **objects): return await maybe_await(func, **allowed) +@asynccontextmanager +async def timeout_after(seconds: float | None) -> AsyncIterator[None]: + if seconds is None: + yield + return + task = asyncio.current_task() + if task is None: + yield + return + timed_out = False + + def cancel_task() -> None: + nonlocal timed_out + timed_out = True + task.cancel() + + handle = asyncio.get_running_loop().call_later(seconds, cancel_task) + try: + yield + except asyncio.CancelledError: + if timed_out: + raise FrameworkTimeoutError from None + raise + finally: + handle.cancel() + + class NullAsyncContext: async def __aenter__(self): return None diff --git a/verifiers/utils/eval_utils.py b/verifiers/utils/eval_utils.py index 510d998565..9ce69eb233 100644 --- a/verifiers/utils/eval_utils.py +++ b/verifiers/utils/eval_utils.py @@ -35,7 +35,7 @@ TokenUsage, _validate_extra_headers_value, ) -from verifiers.utils.async_utils import EventLoopLagMonitor +from verifiers.utils.async_utils import EventLoopLagMonitor, timeout_after from verifiers.utils.env_config_utils import config_table, normalize_env_config_sections from verifiers.utils.import_utils import load_toml from verifiers.utils.logging_utils import ( @@ -57,6 +57,13 @@ CHAT_TEMPLATE_KWARG_FIELDS = ("reasoning_effort", "enable_thinking") +def normalize_timeout_config(config: dict) -> dict: + if "timeout" in config and "rollout_timeout_seconds" not in config: + config["rollout_timeout_seconds"] = config["timeout"] + config.pop("timeout", None) + return config + + def _sum_output_usage(outputs: list[RolloutOutput]) -> TokenUsage | None: input_tokens = 0.0 output_tokens = 0.0 @@ -664,6 +671,9 @@ def load_toml_config( "num_workers", "disable_env_server", "timeout", + "global_timeout_seconds", + "rollout_timeout_seconds", + "task_timeout_seconds", # logging "verbose", "disable_tui", @@ -708,7 +718,9 @@ def load_toml_config( merged.pop("model", None) if "model" in eval_config and "endpoint_id" not in eval_config: merged.pop("endpoint_id", None) - merged_eval_list.append(normalize_env_config_sections(merged)) + merged_eval_list.append( + normalize_timeout_config(normalize_env_config_sections(merged)) + ) # expand [[ablation]] blocks into eval configs for ablation in ablation_list: @@ -738,11 +750,13 @@ def load_toml_config( f"Valid fields are: {sorted(valid_fields)}" ) expanded = [ - normalize_env_config_sections( - normalize_sampling_config( - config, - "expanded [[ablation]] config", - merge_sampling_with_existing=True, + normalize_timeout_config( + normalize_env_config_sections( + normalize_sampling_config( + config, + "expanded [[ablation]] config", + merge_sampling_with_existing=True, + ) ) ) for config in _expand_ablation(ablation, global_defaults) @@ -1038,10 +1052,16 @@ async def run_evaluation( with maybe_suppress_logs: vf_env = vf.load_environment(env_id=config.env_id, **config.env_args) + extra_env_kwargs = dict(config.extra_env_kwargs) + if config.rollout_timeout_seconds is not None: + extra_env_kwargs["rollout_timeout_seconds"] = config.rollout_timeout_seconds + if config.task_timeout_seconds is not None: + extra_env_kwargs["task_timeout_seconds"] = config.task_timeout_seconds + # set extra environment kwargs - if config.extra_env_kwargs: - logger.info(f"Setting extra environment kwargs: {config.extra_env_kwargs}") - vf_env.set_kwargs(**config.extra_env_kwargs) + if extra_env_kwargs: + logger.info(f"Setting extra environment kwargs: {extra_env_kwargs}") + vf_env.set_kwargs(**extra_env_kwargs) results_path = config.resume_path or get_eval_results_path(config) if config.client_config.endpoint_configs: @@ -1056,92 +1076,93 @@ async def run_evaluation( on_progress = _with_eval_metadata(on_progress, model_pricing, config.name) try: - if not config.disable_env_server: - extra_env_kwargs = dict(config.extra_env_kwargs) - # resolve total concurrency - if "concurrency" not in extra_env_kwargs: - if config.max_concurrent <= 0: - concurrency = config.num_examples * config.rollouts_per_example + async with timeout_after(config.global_timeout_seconds): + if not config.disable_env_server: + extra_env_kwargs = dict(extra_env_kwargs) + # resolve total concurrency + if "concurrency" not in extra_env_kwargs: + if config.max_concurrent <= 0: + concurrency = config.num_examples * config.rollouts_per_example + else: + concurrency = config.max_concurrent + logger.info(f"Automatically determined {concurrency=}") else: - concurrency = config.max_concurrent - logger.info(f"Automatically determined {concurrency=}") - else: - concurrency = extra_env_kwargs["concurrency"] + concurrency = extra_env_kwargs["concurrency"] - # resolve num_workers - num_workers = config.num_workers - if num_workers == "auto": - num_workers = max(1, math.ceil(concurrency / 256)) - else: - num_workers = int(num_workers) - if num_workers < 1: - raise ValueError(f"num_workers must be >= 1, got {num_workers}") - - # per-worker concurrency - per_worker = max(1, concurrency // num_workers) - extra_env_kwargs["concurrency"] = per_worker - logger.info( - f"Using {num_workers=} env server worker(s), " - f"per-worker concurrency: {per_worker} (total {concurrency})" - ) + # resolve num_workers + num_workers = config.num_workers + if num_workers == "auto": + num_workers = max(1, math.ceil(concurrency / 256)) + else: + num_workers = int(num_workers) + if num_workers < 1: + raise ValueError(f"num_workers must be >= 1, got {num_workers}") + + # per-worker concurrency + per_worker = max(1, concurrency // num_workers) + extra_env_kwargs["concurrency"] = per_worker + logger.info( + f"Using {num_workers=} env server worker(s), " + f"per-worker concurrency: {per_worker} (total {concurrency})" + ) - log_dir = str(results_path) - results_path.mkdir(parents=True, exist_ok=True) - await vf_env.start_server( - extra_env_kwargs=extra_env_kwargs, - num_workers=num_workers, - log_level=get_log_level(config.verbose), - log_dir=log_dir, - console_logging=config.disable_tui, - ) - if on_log_file is not None: - from verifiers.serve import EnvServer + log_dir = str(results_path) + results_path.mkdir(parents=True, exist_ok=True) + await vf_env.start_server( + extra_env_kwargs=extra_env_kwargs, + num_workers=num_workers, + log_level=get_log_level(config.verbose), + log_dir=log_dir, + console_logging=config.disable_tui, + ) + if on_log_file is not None: + from verifiers.serve import EnvServer - for path in EnvServer.get_all_log_files(log_dir, num_workers): - on_log_file(path) + for path in EnvServer.get_all_log_files(log_dir, num_workers): + on_log_file(path) - logger.debug(f"Starting evaluation with model: {config.model}") - logger.debug( - f"Configuration: num_examples={config.num_examples}, rollouts_per_example={config.rollouts_per_example}, max_concurrent={config.max_concurrent}" - ) - - effective_group_max_concurrent = config.max_concurrent - if ( - not config.independent_scoring - and config.max_concurrent > 0 - and config.rollouts_per_example > 1 - ): - # Grouped scoring applies the semaphore at group level. Convert - # rollout-level concurrency to group-level slots. - effective_group_max_concurrent = math.ceil( - config.max_concurrent / config.rollouts_per_example + logger.debug(f"Starting evaluation with model: {config.model}") + logger.debug( + f"Configuration: num_examples={config.num_examples}, rollouts_per_example={config.rollouts_per_example}, max_concurrent={config.max_concurrent}" ) - if config.num_examples > 0: - effective_group_max_concurrent = min( - effective_group_max_concurrent, config.num_examples - ) - outputs = await vf_env.evaluate( - client=config.client_config, - model=config.model, - sampling_args=config.sampling_args, - num_examples=config.num_examples, - rollouts_per_example=config.rollouts_per_example, - max_concurrent=effective_group_max_concurrent, - results_path=results_path, - state_columns=config.state_columns, - save_results=config.save_results, - push_to_hf_hub=config.save_to_hf_hub, - hf_hub_dataset_name=config.hf_hub_dataset_name, - independent_scoring=config.independent_scoring, - max_retries=config.max_retries, - on_start=on_start, - on_progress=on_progress, - on_log=on_log, - ) + effective_group_max_concurrent = config.max_concurrent + if ( + not config.independent_scoring + and config.max_concurrent > 0 + and config.rollouts_per_example > 1 + ): + # Grouped scoring applies the semaphore at group level. Convert + # rollout-level concurrency to group-level slots. + effective_group_max_concurrent = math.ceil( + config.max_concurrent / config.rollouts_per_example + ) + if config.num_examples > 0: + effective_group_max_concurrent = min( + effective_group_max_concurrent, config.num_examples + ) + + outputs = await vf_env.evaluate( + client=config.client_config, + model=config.model, + sampling_args=config.sampling_args, + num_examples=config.num_examples, + rollouts_per_example=config.rollouts_per_example, + max_concurrent=effective_group_max_concurrent, + results_path=results_path, + state_columns=config.state_columns, + save_results=config.save_results, + push_to_hf_hub=config.save_to_hf_hub, + hf_hub_dataset_name=config.hf_hub_dataset_name, + independent_scoring=config.independent_scoring, + max_retries=config.max_retries, + on_start=on_start, + on_progress=on_progress, + on_log=on_log, + ) finally: if not config.disable_env_server: - await vf_env.stop_server() + await asyncio.wait_for(vf_env.stop_server(), timeout=600) metadata_changed = _attach_metadata_name(outputs["metadata"], config.name) if _attach_metadata_cost(outputs["metadata"], model_pricing, outputs["outputs"]): diff --git a/verifiers/utils/threaded_sandbox_client.py b/verifiers/utils/threaded_sandbox_client.py index 25df03fc04..7ddb01a8dc 100644 --- a/verifiers/utils/threaded_sandbox_client.py +++ b/verifiers/utils/threaded_sandbox_client.py @@ -79,7 +79,7 @@ async def run_background_job( self, sandbox_id: str, command: str, - timeout: int = 900, + timeout: int | None = None, working_dir: str | None = None, env: dict[str, str] | None = None, poll_interval: int = 3, @@ -91,12 +91,13 @@ async def run_background_job( working_dir=working_dir, env=env, ) - deadline = time.monotonic() + timeout - while time.monotonic() < deadline: + deadline = None if timeout is None else time.monotonic() + timeout + while deadline is None or time.monotonic() < deadline: status = await self.get_background_job(sandbox_id, job) if status.completed: return status await asyncio.sleep(poll_interval) + assert timeout is not None raise CommandTimeoutError(sandbox_id, command, timeout) def teardown(self, wait: bool = True) -> None: diff --git a/verifiers/v1/env.py b/verifiers/v1/env.py index 9cb62f7c4b..15bd0283d5 100644 --- a/verifiers/v1/env.py +++ b/verifiers/v1/env.py @@ -6,6 +6,7 @@ from verifiers.clients import Client from verifiers.types import ClientConfig from verifiers.types import RolloutInput, SamplingArgs +from verifiers.utils.async_utils import FrameworkTimeoutError, timeout_after from .config import Config from .harness import Harness, HarnessConfig @@ -75,6 +76,14 @@ def __init__( ) self._empty_dataset_checked = False self._empty_eval_dataset_checked = False + self.rollout_timeout_seconds: float | None = None + self.task_timeout_seconds: float | None = None + + def set_rollout_timeout_seconds(self, value: float | None) -> None: + self.rollout_timeout_seconds = None if value is None else float(value) + + def set_task_timeout_seconds(self, value: float | None) -> None: + self.task_timeout_seconds = None if value is None else float(value) def build_dataset(self) -> "Dataset | None": if self.dataset is not None: @@ -129,6 +138,8 @@ async def rollout( "model": model, "sampling_args": sampling_args or {}, "score_rollout": self.score_rollouts, + "rollout_timeout_seconds": self.rollout_timeout_seconds, + "task_timeout_seconds": self.task_timeout_seconds, }, ) return await self.harness.run(task, state) @@ -167,14 +178,27 @@ async def _run_group_states( "model": model, "sampling_args": sampling_args, "score_rollout": self.score_rollouts, + "rollout_timeout_seconds": self.rollout_timeout_seconds, + "task_timeout_seconds": self.task_timeout_seconds, }, ) - states = await asyncio.gather( - *[self.harness.run(task, state) for task, state in zip(tasks, states)] - ) + try: - if self.score_rollouts: - await self.harness.score_group(tasks, states) + async with timeout_after(self.rollout_timeout_seconds): + states = await asyncio.gather( + *[ + self.harness.run(task, state) + for task, state in zip(tasks, states) + ] + ) + if self.score_rollouts: + await self.harness.score_group(tasks, states) + except FrameworkTimeoutError: + for state in states: + if state.get("is_completed"): + continue + state["timed_out"] = True + state.stop("timeout_reached") finally: await self.harness.cleanup_group(tasks, states) for state in states: diff --git a/verifiers/v1/harness.py b/verifiers/v1/harness.py index 1ebb516ad6..941f9496ea 100644 --- a/verifiers/v1/harness.py +++ b/verifiers/v1/harness.py @@ -14,7 +14,11 @@ SamplingArgs, ToolMessage, ) -from verifiers.utils.async_utils import maybe_call_with_named_args +from verifiers.utils.async_utils import ( + FrameworkTimeoutError, + maybe_call_with_named_args, + timeout_after, +) from verifiers.utils.error_utils import error_info from verifiers.utils.message_utils import normalize_messages from verifiers.utils.response_utils import parse_response_message @@ -251,21 +255,30 @@ async def run(self, task: Task, state: State | None = None) -> State: completed = False try: try: - state = await self.setup_state(task, state) - if not await self.runtime.is_completed(task, state): - state = await self.run_program(task, state) - await self.runtime.is_completed(task, state) - state._set_stop_condition("program_completed") - await self.runtime.collect_artifacts(task, state) - except Error as e: - self.record_error(state, e) - await self.runtime.update_rollout(task, state) - state.record_generation_timing() - timing_recorded = True - if state.runtime_state().get("score_rollout", True): - await self.runtime.score_rollout(task, state) - state._set_completed(True) - completed = True + timeout = cast( + float | None, + state.runtime_state().get("rollout_timeout_seconds"), + ) + async with timeout_after(timeout): + try: + state = await self.setup_state(task, state) + if not await self.runtime.is_completed(task, state): + state = await self.run_program(task, state) + await self.runtime.is_completed(task, state) + state._set_stop_condition("program_completed") + await self.runtime.collect_artifacts(task, state) + except Error as e: + self.record_error(state, e) + await self.runtime.update_rollout(task, state) + state.record_generation_timing() + timing_recorded = True + if state.runtime_state().get("score_rollout", True): + await self.runtime.score_rollout(task, state) + state._set_completed(True) + completed = True + except FrameworkTimeoutError: + state["timed_out"] = True + state.stop("timeout_reached") finally: if not timing_recorded: state.record_generation_timing() @@ -465,9 +478,19 @@ def compile_program(self, program: ProgramConfig) -> ProgramRunner: def local_callable_program(self, fn: Handler) -> ProgramRunner: async def run(task: Task, state: State) -> ProgramResult: await self.runtime.setup_rollout(task, state) - result = await maybe_call_with_named_args( - fn, task=task, state=state, runtime=self.runtime, harness=self + timeout = cast( + float | None, state.runtime_state().get("task_timeout_seconds") ) + try: + async with timeout_after(timeout): + result = await maybe_call_with_named_args( + fn, task=task, state=state, runtime=self.runtime, harness=self + ) + except FrameworkTimeoutError: + state["timed_out"] = True + state["task_timed_out"] = True + state.stop("timeout_reached") + return state if result is None or isinstance(result, State | dict): return cast(ProgramResult, result) raise TypeError("program.fn must return None, State, or a mapping.") @@ -502,60 +525,67 @@ def sync_completion() -> list[JsonData]: turn = 0 max_turns = state.get_max_turns(self.config.max_turns) - while max_turns <= 0 or turn < max_turns: - if await self.runtime.is_completed(task, state): - return state - response = await self.runtime.submit_model_request( - messages, - task, - state, - tool_defs=self.runtime.tool_defs(state), - ) - turn += 1 - messages.extend(await parse_response_message(response)) - rendered_messages = sync_completion() - tool_calls = list(response.message.tool_calls or []) - if not tool_calls: - user_messages = await self.runtime.user_messages( - task, state, transcript=rendered_messages - ) - if user_messages: + timeout = cast(float | None, state.runtime_state().get("task_timeout_seconds")) + try: + async with timeout_after(timeout): + while max_turns <= 0 or turn < max_turns: + if await self.runtime.is_completed(task, state): + return state + response = await self.runtime.submit_model_request( + messages, + task, + state, + tool_defs=self.runtime.tool_defs(state), + ) + turn += 1 + messages.extend(await parse_response_message(response)) + rendered_messages = sync_completion() + tool_calls = list(response.message.tool_calls or []) + if not tool_calls: + user_messages = await self.runtime.user_messages( + task, state, transcript=rendered_messages + ) + if user_messages: + messages.extend( + normalize_messages( + cast(Messages, user_messages), + field_name="user_messages", + ) + ) + sync_completion() + continue + state._set_stop_condition("no_tools") + return state + callable_tools = state.get_tools() + + async def call_tool(tool_call) -> ToolMessage: + content: MessageContent + try: + name = tool_call.name + result = await maybe_call_with_named_args( + callable_tools[name], **json_args(tool_call.arguments) + ) + content = ( + cast(MessageContent, result) + if is_valid_tool_content_parts(result) + else str(result) + ) + except Exception as e: + content = tool_error_content(e) + return ToolMessage(tool_call_id=tool_call.id, content=content) + messages.extend( - normalize_messages( - cast(Messages, user_messages), - field_name="user_messages", + await asyncio.gather( + *(call_tool(tool_call) for tool_call in tool_calls) ) ) sync_completion() - continue - state._set_stop_condition("no_tools") - return state - callable_tools = state.get_tools() - - async def call_tool(tool_call) -> ToolMessage: - content: MessageContent - try: - name = tool_call.name - result = await maybe_call_with_named_args( - callable_tools[name], **json_args(tool_call.arguments) - ) - content = ( - cast(MessageContent, result) - if is_valid_tool_content_parts(result) - else str(result) - ) - except Exception as e: - content = tool_error_content(e) - return ToolMessage(tool_call_id=tool_call.id, content=content) - - messages.extend( - await asyncio.gather( - *(call_tool(tool_call) for tool_call in tool_calls) - ) - ) - sync_completion() - if await self.runtime.is_completed(task, state): - return state + if await self.runtime.is_completed(task, state): + return state + except FrameworkTimeoutError: + state["timed_out"] = True + state["task_timed_out"] = True + state.stop("timeout_reached") return state def command_program( @@ -575,7 +605,17 @@ async def run(task: Task, state: State) -> State: runtime, ) await runtime.setup_rollout(task, state) - return await run_local_command(merged_program, task, state, runtime) + timeout = cast( + float | None, state.runtime_state().get("task_timeout_seconds") + ) + try: + async with timeout_after(timeout): + await run_local_command(merged_program, task, state, runtime) + except FrameworkTimeoutError: + state["timed_out"] = True + state["task_timed_out"] = True + state.stop("timeout_reached") + return state return run diff --git a/verifiers/v1/program.py b/verifiers/v1/program.py index 9ed8b6bbf8..7e3eb21f48 100644 --- a/verifiers/v1/program.py +++ b/verifiers/v1/program.py @@ -31,8 +31,6 @@ "image": "python:3.11-slim", "workdir": "/app", "scope": "rollout", - "timeout_minutes": 120, - "command_timeout": 900, "network_access": True, } COMMAND_PROGRAM_PATCH_KEYS = { @@ -40,7 +38,6 @@ "files", "dirs", "setup", - "setup_timeout", "bindings", "env", "artifacts", @@ -73,7 +70,6 @@ class ProgramConfig(Config): files: ProgramFiles = {} dirs: ProgramDirs = {} setup: ProgramSetup = [] - setup_timeout: int = 300 bindings: BindingsConfig = BindingsConfig() env: ProgramEnv = {} artifacts: ArtifactsConfig = ArtifactsConfig() @@ -149,7 +145,6 @@ def resolve_command( files: ProgramFiles | None = None, dirs: ProgramDirs | None = None, setup: ProgramSetup | None = None, - setup_timeout: int | None = None, bindings: ConfigData | None = None, env: ProgramEnv | None = None, artifacts: ArtifactsConfig | ConfigData | None = None, @@ -180,8 +175,6 @@ def resolve_command( data["dirs"] = dict(dirs) if setup is not None: data["setup"] = setup - if setup_timeout is not None: - data["setup_timeout"] = setup_timeout if bindings is not None: data["bindings"] = dict(bindings) if env is not None: diff --git a/verifiers/v1/runtime.py b/verifiers/v1/runtime.py index 60c812e316..054b878388 100644 --- a/verifiers/v1/runtime.py +++ b/verifiers/v1/runtime.py @@ -69,6 +69,7 @@ from .utils.tool_utils import schema_callable, tool_schema, tool_visible from .utils.tool_utils import toolset_object_scope from .utils.usage_utils import record_response_usage +from .utils.timeout_utils import MODEL_NO_RESPONSE_TIMEOUT_SECONDS from .state import State from .task import Task from .toolset import ( @@ -885,13 +886,16 @@ async def submit_model_request( try: client = self.model_client(state) request_start = time.time() - response = await client.get_response( + request = client.get_response( prompt=prompt, model=self.model(state), tools=tool_defs, sampling_args=self.sampling_args(state), state=state, ) + response = await asyncio.wait_for( + request, timeout=MODEL_NO_RESPONSE_TIMEOUT_SECONDS + ) request_end = time.time() state.record_model_timing(request_start, request_end) record_response_usage(state, response) diff --git a/verifiers/v1/sandbox.py b/verifiers/v1/sandbox.py index cefe606034..1d247a609c 100644 --- a/verifiers/v1/sandbox.py +++ b/verifiers/v1/sandbox.py @@ -20,9 +20,6 @@ class SandboxConfig(Config): gpu_type: str | None = None vm: bool | None = None network_access: bool = True - timeout_minutes: int = 60 - create_timeout: int | None = None - wait_timeout: int | None = None environment_vars: dict[str, str] = {} secrets: dict[str, str] = {} team_id: str | None = None @@ -30,12 +27,8 @@ class SandboxConfig(Config): registry_credentials_id: str | None = None guaranteed: bool = False workdir: str | None = None - command_timeout: int | None = None - poll_interval: int = 3 packages: list[str] = [] - install_timeout: int = 300 setup_commands: list[str] = [] - setup_timeout: int = 300 labels: list[str] = [] scope: Literal["rollout", "group", "global"] = "rollout" prefer: Literal["program"] | None = None diff --git a/verifiers/v1/utils/program_utils.py b/verifiers/v1/utils/program_utils.py index 8ada2fda3d..5f0f0fd697 100644 --- a/verifiers/v1/utils/program_utils.py +++ b/verifiers/v1/utils/program_utils.py @@ -31,14 +31,13 @@ "files", "dirs", "setup", - "setup_timeout", "bindings", "env", "artifacts", "channels", } PROGRAM_KEYS = PROGRAM_KIND_KEYS | PROGRAM_OPTION_KEYS | {"args"} -SANDBOX_ONLY_PROGRAM_KEYS = {"files", "dirs", "setup", "setup_timeout", "artifacts"} +SANDBOX_ONLY_PROGRAM_KEYS = {"files", "dirs", "setup", "artifacts"} TASK_PROGRAM_KEYS = { "files", "dirs", @@ -64,7 +63,16 @@ async def run_local_command( stderr=asyncio.subprocess.PIPE, env=env, ) - stdout, stderr = await proc.communicate() + try: + stdout, stderr = await proc.communicate() + except asyncio.CancelledError: + proc.terminate() + try: + await asyncio.wait_for(proc.wait(), timeout=5) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + raise state["command"] = { "argv": argv, "returncode": proc.returncode, diff --git a/verifiers/v1/utils/sandbox_utils.py b/verifiers/v1/utils/sandbox_utils.py index ea6876ada0..b749099f96 100644 --- a/verifiers/v1/utils/sandbox_utils.py +++ b/verifiers/v1/utils/sandbox_utils.py @@ -16,7 +16,11 @@ from verifiers.decorators import setup as setup_handler from verifiers.errors import Error, SandboxError -from verifiers.utils.async_utils import maybe_call_with_named_args +from verifiers.utils.async_utils import ( + FrameworkTimeoutError, + maybe_call_with_named_args, + timeout_after, +) from .program_utils import command_argv, command_env, float_config, int_config from .program_utils import program_channels @@ -28,6 +32,13 @@ python_package_list, sandbox_python_path_command, ) +from .timeout_utils import ( + BACKGROUND_JOB_POLL_INTERVAL_SECONDS, + SANDBOX_CREATE_TIMEOUT_SECONDS, + SANDBOX_LEASE_TTL_MINUTES, + SANDBOX_READY_TIMEOUT_SECONDS, + SANDBOX_SETUP_STEP_TIMEOUT_SECONDS, +) from ..runtime import Runtime from ..sandbox import SandboxConfig from ..program import ProgramValue @@ -405,7 +416,11 @@ async def create_sandbox_lease( from verifiers.utils.threaded_sandbox_client import ThreadedAsyncSandboxClient client = cast(SandboxClient, ThreadedAsyncSandboxClient()) - sandbox_id = await create_sandbox(client, sandbox_data, owns_client=owns_client) + sandbox_id = await create_sandbox( + client, + sandbox_data, + owns_client=owns_client, + ) lease = SandboxLease( client, sandbox_id, sandbox_config.scope, key, owns_client=owns_client ) @@ -425,7 +440,11 @@ async def create_scoped_sandbox_lease( sandbox = owner.sandbox if not isinstance(sandbox, SandboxConfig): raise TypeError("Sandbox owner must define a sandbox config.") - return await create_sandbox_lease(sandbox, key or sandbox_owner_key(owner), client) + return await create_sandbox_lease( + sandbox, + key or sandbox_owner_key(owner), + client, + ) async def run_sandbox_command( @@ -482,15 +501,38 @@ async def run_sandbox_command( command = shlex.join(argv) if use_sandbox_python_path or "mcp" in program_channels(program): command = sandbox_python_path_command(command) - command_timeout = sandbox_config.command_timeout - try: + + async def active_command() -> State: result = await lease.run_background_job( command, - timeout=command_timeout, working_dir=workdir, env=env, - poll_interval=int_config(sandbox_data, "poll_interval", 3), + poll_interval=BACKGROUND_JOB_POLL_INTERVAL_SECONDS, ) + state["command"] = { + "argv": argv, + "returncode": result.exit_code, + "stdout": result.stdout or "", + "stderr": result.stderr or "", + } + state["completion"] = [ + {"role": "assistant", "content": state["command"]["stdout"].strip()} + ] + if result.exit_code: + raise SandboxError( + f"Sandbox command exited with {result.exit_code}: {result.stderr}" + ) + state._set_stop_condition("command_completed") + return state + + timeout = cast(float | None, state.runtime_state().get("task_timeout_seconds")) + try: + async with timeout_after(timeout): + await active_command() + except FrameworkTimeoutError: + state["timed_out"] = True + state["task_timed_out"] = True + state.stop("timeout_reached") except Error: raise except Exception as exc: @@ -500,20 +542,6 @@ async def run_sandbox_command( f"Sandbox {lease.id} failed during command ({kind}): {exc}" ) from exc raise - state["command"] = { - "argv": argv, - "returncode": result.exit_code, - "stdout": result.stdout or "", - "stderr": result.stderr or "", - } - state["completion"] = [ - {"role": "assistant", "content": state["command"]["stdout"].strip()} - ] - if result.exit_code: - raise SandboxError( - f"Sandbox command exited with {result.exit_code}: {result.stderr}" - ) - state._set_stop_condition("command_completed") return state @@ -665,7 +693,7 @@ async def create_sandbox( else None, vm=bool(vm) if vm is not None else gpu_count > 0, network_access=bool(sandbox_config.get("network_access", True)), - timeout_minutes=int_config(sandbox_config, "timeout_minutes", 60), + timeout_minutes=SANDBOX_LEASE_TTL_MINUTES, labels=[str(label) for label in labels] if isinstance(labels, list) else [], environment_vars={ str(key): str(value) for key, value in environment_vars.items() @@ -691,13 +719,18 @@ async def create_sandbox( ) try: create_waiter = asyncio.shield(create_task) - if sandbox_config.get("create_timeout") is not None: - sandbox = await asyncio.wait_for( - create_waiter, int_config(sandbox_config, "create_timeout", 0) + sandbox = await asyncio.wait_for(create_waiter, SANDBOX_CREATE_TIMEOUT_SECONDS) + except (asyncio.TimeoutError, TimeoutError) as exc: + asyncio.create_task( + delete_late_sandbox_create( + client, + create_task, + owns_client=owns_client, + reason="timed out creation", ) - else: - sandbox = await create_waiter - except (asyncio.CancelledError, TimeoutError): + ) + raise TimeoutError("Sandbox creation timed out.") from exc + except asyncio.CancelledError: try: sandbox = cast(SandboxRecord, await asyncio.shield(create_task)) except BaseException: @@ -723,10 +756,7 @@ async def create_sandbox( sandbox_id, max_attempts=SANDBOX_WAIT_FOR_CREATION_ATTEMPTS, ) - if sandbox_config.get("wait_timeout") is not None: - await asyncio.wait_for(wait, int_config(sandbox_config, "wait_timeout", 0)) - else: - await wait + await asyncio.wait_for(wait, SANDBOX_READY_TIMEOUT_SECONDS) except BaseException: delete_task = asyncio.create_task( delete_sandbox_id( @@ -741,6 +771,38 @@ async def create_sandbox( return sandbox_id +async def delete_late_sandbox_create( + client: SandboxClient, + create_task: asyncio.Task[object], + *, + owns_client: bool, + reason: str, +) -> None: + try: + sandbox = cast( + SandboxRecord, + await asyncio.wait_for( + asyncio.shield(create_task), SANDBOX_CREATE_TIMEOUT_SECONDS + ), + ) + except (asyncio.TimeoutError, TimeoutError): + create_task.cancel() + logger.warning("Timed out waiting for late sandbox create cleanup.") + if owns_client: + await close_sandbox_client(client) + return + except BaseException: + if owns_client: + await close_sandbox_client(client) + return + await delete_sandbox_id( + client, + str(sandbox.id), + close_client=owns_client, + reason=reason, + ) + + async def delete_sandbox_id( client: SandboxClient, sandbox_id: str, @@ -770,7 +832,7 @@ async def setup_sandbox(handle: SandboxLease, sandbox_config: ConfigData) -> Non try: result = await handle.execute( python_package_install_command(package_args), - timeout=int_config(sandbox_config, "install_timeout", 300), + timeout=SANDBOX_SETUP_STEP_TIMEOUT_SECONDS, ) except Error: raise @@ -791,7 +853,7 @@ async def setup_sandbox(handle: SandboxLease, sandbox_config: ConfigData) -> Non try: result = await handle.execute( command, - timeout=int_config(sandbox_config, "setup_timeout", 300), + timeout=SANDBOX_SETUP_STEP_TIMEOUT_SECONDS, ) except Error: raise @@ -1042,7 +1104,6 @@ async def run_program_items( use_sandbox_python_path: bool = False, ) -> None: env = await command_env(program, task, state, runtime, include_base=False) - timeout = int_config(program, "setup_timeout", 300) for command in items: command = await resolve_program_value(command, task, state, runtime, program) command = str(command) @@ -1053,7 +1114,7 @@ async def run_program_items( sandbox_id=sandbox_id, command=command, env=env, - timeout=timeout, + timeout=SANDBOX_SETUP_STEP_TIMEOUT_SECONDS, ) if result.exit_code: raise SandboxError(f"{error_prefix}: {result.stderr}") diff --git a/verifiers/v1/utils/timeout_utils.py b/verifiers/v1/utils/timeout_utils.py new file mode 100644 index 0000000000..97ac2f407b --- /dev/null +++ b/verifiers/v1/utils/timeout_utils.py @@ -0,0 +1,21 @@ +MODEL_NO_RESPONSE_TIMEOUT_SECONDS = 21_600 +SANDBOX_CREATE_TIMEOUT_SECONDS = 3_600 +SANDBOX_READY_TIMEOUT_SECONDS = 3_600 +SANDBOX_SETUP_STEP_TIMEOUT_SECONDS = 900 +FILE_TRANSFER_TIMEOUT_SECONDS = 900 +BACKGROUND_JOB_POLL_INTERVAL_SECONDS = 3 +SANDBOX_LEASE_TTL_MINUTES = 24 * 60 +TEST_PHASE_TIMEOUT_SECONDS: int | None = None +OPENENV_STARTUP_TIMEOUT_SECONDS = 300 +OPENENV_STARTUP_POLL_INTERVAL_SECONDS = 1.0 +OPENENV_HEALTH_REQUEST_TIMEOUT_SECONDS = 10.0 +OPENENV_SCHEMA_REQUEST_TIMEOUT_SECONDS = 30.0 +OPENENV_WAIT_FOR_CREATION_ATTEMPTS = 120 + + +def test_phase_timeout_seconds(value: object | None) -> int | None: + if value is None: + return TEST_PHASE_TIMEOUT_SECONDS + if isinstance(value, bool) or not isinstance(value, int | float | str): + raise TypeError("Test phase timeout must be numeric seconds.") + return int(float(value))