diff --git a/verifiers/utils/threaded_sandbox_client.py b/verifiers/utils/threaded_sandbox_client.py index 25df03fc04..c6453babfa 100644 --- a/verifiers/utils/threaded_sandbox_client.py +++ b/verifiers/utils/threaded_sandbox_client.py @@ -5,7 +5,14 @@ from concurrent.futures import ThreadPoolExecutor from typing import Any, Callable -from prime_sandboxes import AsyncSandboxClient, CommandTimeoutError +from prime_sandboxes import ( + AsyncSandboxClient, + CommandTimeoutError, + SandboxImagePullError, + SandboxNotRunningError, + SandboxOOMError, + SandboxTimeoutError, +) from verifiers.utils.thread_utils import ( get_or_create_thread_attr, @@ -22,6 +29,7 @@ class ThreadedAsyncSandboxClient: """ DEFAULT_MAX_WORKERS = 50 + READINESS_FAILURE_ATTEMPTS = 5 def __init__( self, @@ -75,6 +83,72 @@ def run_in_thread(): return wrapper + async def wait_for_creation_resilient( + self, + sandbox_id: str, + max_attempts: int = 60, + stability_checks: int = 1, + ) -> None: + """Wait for readiness without occupying one worker for the entire poll loop.""" + consecutive_successes = 0 + readiness_failures = 0 + last_readiness_error: Exception | None = None + for attempt in range(max_attempts): + sandbox = await self.get(sandbox_id) + if sandbox.status == "RUNNING": + try: + await self.execute_command( + sandbox_id, + "echo 'sandbox ready'", + timeout=10, + ) + except Exception as exc: + consecutive_successes = 0 + readiness_failures += 1 + last_readiness_error = exc + if readiness_failures >= self.READINESS_FAILURE_ATTEMPTS: + raise SandboxNotRunningError( + sandbox_id, + status="RUNNING", + message=( + f"Sandbox {sandbox_id} stayed unreachable after " + f"{readiness_failures} readiness checks: {exc}" + ), + ) from exc + else: + readiness_failures = 0 + consecutive_successes += 1 + if consecutive_successes >= stability_checks: + return + await asyncio.sleep(0.5) + continue + elif sandbox.status in {"ERROR", "TERMINATED", "TIMEOUT"}: + error_type = sandbox.error_type + error_class = { + "OOM_KILLED": SandboxOOMError, + "TIMEOUT": SandboxTimeoutError, + "IMAGE_PULL_FAILED": SandboxImagePullError, + }.get(error_type, SandboxNotRunningError) + message = ( + f"Sandbox {sandbox_id} failed ({error_type}): " + f"{sandbox.error_message}" + if sandbox.error_message + else None + ) + raise error_class( + sandbox_id, + status=sandbox.status, + error_type=error_type, + message=message, + ) + + await asyncio.sleep(1 if attempt < 5 else 2) + + message = "Timeout during sandbox creation" + if last_readiness_error is not None: + message += f": {last_readiness_error}" + raise SandboxNotRunningError(sandbox_id, status=message) + async def run_background_job( self, sandbox_id: str, diff --git a/verifiers/v1/utils/sandbox_utils.py b/verifiers/v1/utils/sandbox_utils.py index 258d0fa17e..91ac9f1c93 100644 --- a/verifiers/v1/utils/sandbox_utils.py +++ b/verifiers/v1/utils/sandbox_utils.py @@ -725,59 +725,82 @@ async def create_sandbox( else None, guaranteed=bool(sandbox_config.get("guaranteed", False)), ) - create_task = asyncio.create_task( - with_sandbox_retry(lambda: client.create(request)) - ) - try: - create_waiter = asyncio.shield(create_task) - if sandbox_config.get("create_timeout") is not None: - sandbox = await asyncio.wait_for( - create_waiter, int_config(sandbox_config, "create_timeout", 0) - ) - else: - sandbox = await create_waiter - except (asyncio.CancelledError, TimeoutError): + for readiness_attempt in range(SANDBOX_RETRY_ATTEMPTS): + create_task = asyncio.create_task( + with_sandbox_retry(lambda: client.create(request)) + ) try: - sandbox = cast(SandboxRecord, await asyncio.shield(create_task)) + create_waiter = asyncio.shield(create_task) + if sandbox_config.get("create_timeout") is not None: + sandbox = await asyncio.wait_for( + create_waiter, int_config(sandbox_config, "create_timeout", 0) + ) + else: + sandbox = await create_waiter + except (asyncio.CancelledError, TimeoutError): + try: + sandbox = cast(SandboxRecord, await asyncio.shield(create_task)) + except BaseException: + if owns_client: + await close_sandbox_client(client) + raise + await asyncio.shield( + delete_sandbox_id( + client, + str(sandbox.id), + close_client=owns_client, + reason="cancelled creation", + ) + ) + raise except BaseException: if owns_client: await close_sandbox_client(client) raise - await asyncio.shield( - delete_sandbox_id( + sandbox_id = str(sandbox.id) + try: + wait_for_creation = getattr( client, - str(sandbox.id), - close_client=owns_client, - reason="cancelled creation", + "wait_for_creation_resilient", + client.wait_for_creation, ) - ) - raise - except BaseException: - if owns_client: - await close_sandbox_client(client) - raise - sandbox_id = str(sandbox.id) - try: - wait = client.wait_for_creation( - sandbox_id, - max_attempts=SANDBOX_WAIT_FOR_CREATION_ATTEMPTS, - ) - if sandbox_config.get("wait_timeout") is not None: - await asyncio.wait_for(wait, int_config(sandbox_config, "wait_timeout", 0)) - else: - await wait - except BaseException: - delete_task = asyncio.create_task( - delete_sandbox_id( - client, + wait = wait_for_creation( sandbox_id, - close_client=owns_client, - reason="creation failure", + max_attempts=SANDBOX_WAIT_FOR_CREATION_ATTEMPTS, ) - ) - await asyncio.shield(delete_task) - raise - return sandbox_id + if sandbox_config.get("wait_timeout") is not None: + await asyncio.wait_for( + wait, int_config(sandbox_config, "wait_timeout", 0) + ) + else: + await wait + except BaseException as exc: + replace_sandbox = ( + type(exc).__name__ == "SandboxNotRunningError" + and getattr(exc, "status", None) == "RUNNING" + ) + final_attempt = readiness_attempt == SANDBOX_RETRY_ATTEMPTS - 1 + await asyncio.shield( + delete_sandbox_id( + client, + sandbox_id, + close_client=owns_client and (final_attempt or not replace_sandbox), + reason="readiness failure", + ) + ) + if final_attempt or not replace_sandbox: + raise + logger.warning( + "Replacing unreachable sandbox %s after readiness failure: %s " + "(attempt %s/%s)", + sandbox_id, + exc, + readiness_attempt + 1, + SANDBOX_RETRY_ATTEMPTS, + ) + continue + return sandbox_id + raise AssertionError("sandbox readiness retry loop exited without running") async def delete_sandbox_id(