-
Notifications
You must be signed in to change notification settings - Fork 560
fix(v1): replace unreachable sandboxes #1578
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,7 +5,14 @@ | |
| from concurrent.futures import ThreadPoolExecutor | ||
| from typing import Any, Callable | ||
|
|
||
| from prime_sandboxes import AsyncSandboxClient, CommandTimeoutError | ||
| from prime_sandboxes import ( | ||
| AsyncSandboxClient, | ||
| CommandTimeoutError, | ||
| SandboxImagePullError, | ||
| SandboxNotRunningError, | ||
| SandboxOOMError, | ||
| SandboxTimeoutError, | ||
| ) | ||
|
|
||
| from verifiers.utils.thread_utils import ( | ||
| get_or_create_thread_attr, | ||
|
|
@@ -22,6 +29,7 @@ class ThreadedAsyncSandboxClient: | |
| """ | ||
|
|
||
| DEFAULT_MAX_WORKERS = 50 | ||
| READINESS_FAILURE_ATTEMPTS = 5 | ||
|
|
||
| def __init__( | ||
| self, | ||
|
|
@@ -75,6 +83,72 @@ def run_in_thread(): | |
|
|
||
| return wrapper | ||
|
|
||
| async def wait_for_creation_resilient( | ||
| self, | ||
| sandbox_id: str, | ||
| max_attempts: int = 60, | ||
| stability_checks: int = 1, | ||
| ) -> None: | ||
| """Wait for readiness without occupying one worker for the entire poll loop.""" | ||
| consecutive_successes = 0 | ||
| readiness_failures = 0 | ||
| last_readiness_error: Exception | None = None | ||
| for attempt in range(max_attempts): | ||
| sandbox = await self.get(sandbox_id) | ||
| if sandbox.status == "RUNNING": | ||
| try: | ||
| await self.execute_command( | ||
| sandbox_id, | ||
| "echo 'sandbox ready'", | ||
| timeout=10, | ||
| ) | ||
| except Exception as exc: | ||
| consecutive_successes = 0 | ||
| readiness_failures += 1 | ||
| last_readiness_error = exc | ||
| if readiness_failures >= self.READINESS_FAILURE_ATTEMPTS: | ||
| raise SandboxNotRunningError( | ||
| sandbox_id, | ||
| status="RUNNING", | ||
| message=( | ||
| f"Sandbox {sandbox_id} stayed unreachable after " | ||
| f"{readiness_failures} readiness checks: {exc}" | ||
| ), | ||
| ) from exc | ||
| else: | ||
| readiness_failures = 0 | ||
| consecutive_successes += 1 | ||
| if consecutive_successes >= stability_checks: | ||
| return | ||
| await asyncio.sleep(0.5) | ||
| continue | ||
| elif sandbox.status in {"ERROR", "TERMINATED", "TIMEOUT"}: | ||
| error_type = sandbox.error_type | ||
| error_class = { | ||
| "OOM_KILLED": SandboxOOMError, | ||
| "TIMEOUT": SandboxTimeoutError, | ||
| "IMAGE_PULL_FAILED": SandboxImagePullError, | ||
| }.get(error_type, SandboxNotRunningError) | ||
| message = ( | ||
| f"Sandbox {sandbox_id} failed ({error_type}): " | ||
| f"{sandbox.error_message}" | ||
| if sandbox.error_message | ||
| else None | ||
| ) | ||
| raise error_class( | ||
| sandbox_id, | ||
| status=sandbox.status, | ||
| error_type=error_type, | ||
| message=message, | ||
| ) | ||
|
|
||
| await asyncio.sleep(1 if attempt < 5 else 2) | ||
|
|
||
| message = "Timeout during sandbox creation" | ||
| if last_readiness_error is not None: | ||
| message += f": {last_readiness_error}" | ||
| raise SandboxNotRunningError(sandbox_id, status=message) | ||
|
Comment on lines
+147
to
+150
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟢 Low On line 150, - raise SandboxNotRunningError(sandbox_id, status=message)
+ raise SandboxNotRunningError(sandbox_id, message=message)🚀 Reply "fix it for me" or copy this AI Prompt for your agent: |
||
|
|
||
| async def run_background_job( | ||
| self, | ||
| sandbox_id: str, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -725,59 +725,82 @@ async def create_sandbox( | |
| else None, | ||
| guaranteed=bool(sandbox_config.get("guaranteed", False)), | ||
| ) | ||
| create_task = asyncio.create_task( | ||
| with_sandbox_retry(lambda: client.create(request)) | ||
| ) | ||
| try: | ||
| create_waiter = asyncio.shield(create_task) | ||
| if sandbox_config.get("create_timeout") is not None: | ||
| sandbox = await asyncio.wait_for( | ||
| create_waiter, int_config(sandbox_config, "create_timeout", 0) | ||
| ) | ||
| else: | ||
| sandbox = await create_waiter | ||
| except (asyncio.CancelledError, TimeoutError): | ||
| for readiness_attempt in range(SANDBOX_RETRY_ATTEMPTS): | ||
| create_task = asyncio.create_task( | ||
| with_sandbox_retry(lambda: client.create(request)) | ||
| ) | ||
| try: | ||
| sandbox = cast(SandboxRecord, await asyncio.shield(create_task)) | ||
| create_waiter = asyncio.shield(create_task) | ||
| if sandbox_config.get("create_timeout") is not None: | ||
| sandbox = await asyncio.wait_for( | ||
| create_waiter, int_config(sandbox_config, "create_timeout", 0) | ||
| ) | ||
| else: | ||
| sandbox = await create_waiter | ||
| except (asyncio.CancelledError, TimeoutError): | ||
| try: | ||
| sandbox = cast(SandboxRecord, await asyncio.shield(create_task)) | ||
| except BaseException: | ||
| if owns_client: | ||
| await close_sandbox_client(client) | ||
| raise | ||
| await asyncio.shield( | ||
| delete_sandbox_id( | ||
| client, | ||
| str(sandbox.id), | ||
| close_client=owns_client, | ||
| reason="cancelled creation", | ||
| ) | ||
| ) | ||
| raise | ||
| except BaseException: | ||
| if owns_client: | ||
| await close_sandbox_client(client) | ||
| raise | ||
| await asyncio.shield( | ||
| delete_sandbox_id( | ||
| sandbox_id = str(sandbox.id) | ||
| try: | ||
| wait_for_creation = getattr( | ||
| client, | ||
| str(sandbox.id), | ||
| close_client=owns_client, | ||
| reason="cancelled creation", | ||
| "wait_for_creation_resilient", | ||
| client.wait_for_creation, | ||
| ) | ||
| ) | ||
| raise | ||
| except BaseException: | ||
| if owns_client: | ||
| await close_sandbox_client(client) | ||
| raise | ||
| sandbox_id = str(sandbox.id) | ||
| try: | ||
| wait = client.wait_for_creation( | ||
| sandbox_id, | ||
| max_attempts=SANDBOX_WAIT_FOR_CREATION_ATTEMPTS, | ||
| ) | ||
| if sandbox_config.get("wait_timeout") is not None: | ||
| await asyncio.wait_for(wait, int_config(sandbox_config, "wait_timeout", 0)) | ||
| else: | ||
| await wait | ||
| except BaseException: | ||
| delete_task = asyncio.create_task( | ||
| delete_sandbox_id( | ||
| client, | ||
| wait = wait_for_creation( | ||
| sandbox_id, | ||
| close_client=owns_client, | ||
| reason="creation failure", | ||
| max_attempts=SANDBOX_WAIT_FOR_CREATION_ATTEMPTS, | ||
| ) | ||
| ) | ||
| await asyncio.shield(delete_task) | ||
| raise | ||
| return sandbox_id | ||
| if sandbox_config.get("wait_timeout") is not None: | ||
| await asyncio.wait_for( | ||
| wait, int_config(sandbox_config, "wait_timeout", 0) | ||
| ) | ||
| else: | ||
| await wait | ||
| except BaseException as exc: | ||
| replace_sandbox = ( | ||
| type(exc).__name__ == "SandboxNotRunningError" | ||
| and getattr(exc, "status", None) == "RUNNING" | ||
| ) | ||
| final_attempt = readiness_attempt == SANDBOX_RETRY_ATTEMPTS - 1 | ||
| await asyncio.shield( | ||
| delete_sandbox_id( | ||
| client, | ||
| sandbox_id, | ||
| close_client=owns_client and (final_attempt or not replace_sandbox), | ||
| reason="readiness failure", | ||
| ) | ||
| ) | ||
| if final_attempt or not replace_sandbox: | ||
| raise | ||
| logger.warning( | ||
| "Replacing unreachable sandbox %s after readiness failure: %s " | ||
| "(attempt %s/%s)", | ||
| sandbox_id, | ||
| exc, | ||
| readiness_attempt + 1, | ||
| SANDBOX_RETRY_ATTEMPTS, | ||
| ) | ||
| continue | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replacement retries reuse create requestMedium Severity The readiness retry loop builds one Reviewed by Cursor Bugbot for commit ad7f1fd. Configure here. |
||
| return sandbox_id | ||
| raise AssertionError("sandbox readiness retry loop exited without running") | ||
|
|
||
|
|
||
| async def delete_sandbox_id( | ||
|
|
||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the control plane reports
RUNNINGbefore the exec gateway is accepting commands, this hard cap raises after only five failed readiness probes, regardless of the caller'smax_attempts/wait_timeout. For slow-starting sandboxes or delayed gateway routing, v1 will delete and recreate otherwise healthy sandboxes and can exhaust all six create retries instead of waiting for the original sandbox to become reachable.Useful? React with 👍 / 👎.