-
Notifications
You must be signed in to change notification settings - Fork 560
fix(v1,serve): evict rollout trajectories, discard delivered intercepts, trim worker arenas #1611
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -248,9 +248,18 @@ async def send_error_response(error: str) -> None: | |
|
|
||
| async def stats_loop(self, interval: float = 10.0) -> None: | ||
| """Loop to push worker stats to the router.""" | ||
| try: | ||
| import ctypes | ||
|
|
||
| libc = ctypes.CDLL("libc.so.6") | ||
| except OSError: | ||
| libc = None | ||
| while True: | ||
| await asyncio.sleep(interval) | ||
|
|
||
| if libc is not None: | ||
| libc.malloc_trim(0) | ||
|
Comment on lines
+260
to
+261
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
When workers are serving active rollouts, this synchronous Useful? React with 👍 / 👎. |
||
|
|
||
| stats = EnvWorkerStats( | ||
| worker_id=self.worker_id, | ||
| timestamp=time.time(), | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1052,6 +1052,7 @@ async def cleanup_rollout(self, task: Task, state: State) -> None: | |
| key = str(state["trajectory_id"]) | ||
| self._model_request_locks.pop(key, None) | ||
| self._inflight_visible_model_requests.pop(key, None) | ||
| self.trajectories.pop(key, None) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
When a grouped eval reaches group scoring, each Useful? React with 👍 / 👎. |
||
| self.release_tool_handles(state) | ||
|
|
||
| async def cleanup_group(self, tasks: list[Task], states: list[State]) -> None: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -240,6 +240,10 @@ def rollout_queue(self, rollout_key: str) -> asyncio.Queue[str]: | |
| def get_request(self, request_id: str) -> EndpointInterceptData: | ||
| return cast(EndpointInterceptData, self.server.intercepts[request_id]) | ||
|
|
||
| def discard_request(self, request_id: str) -> None: | ||
| """Drop a delivered intercept from the server's per-request store.""" | ||
| self.server.intercepts.pop(request_id, None) | ||
|
|
||
| def request_context( | ||
| self, request_id: str, request: EndpointInterceptData | ||
| ) -> ModelRequestContext: | ||
|
|
@@ -513,14 +517,17 @@ async def forward_request( | |
| state._set_error(e) | ||
| raise | ||
| finally: | ||
| if bool(request.get("stream")): | ||
| if request.get("protocol") != "openai_chat_completions": | ||
| raise NotImplementedError( | ||
| "Streaming interception is currently supported for OpenAI Chat Completions." | ||
| ) | ||
| await synthesize_stream(request, response, error) | ||
| else: | ||
| deliver_response(request, response, error) | ||
| try: | ||
| if bool(request.get("stream")): | ||
| if request.get("protocol") != "openai_chat_completions": | ||
| raise NotImplementedError( | ||
| "Streaming interception is currently supported for OpenAI Chat Completions." | ||
| ) | ||
| await synthesize_stream(request, response, error) | ||
| else: | ||
| deliver_response(request, response, error) | ||
| finally: | ||
| endpoint.discard_request(request_id) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
For intercepted streaming requests whose protocol is not Useful? React with 👍 / 👎. |
||
|
|
||
|
|
||
| def normalize_endpoint_prompt(request: EndpointInterceptData) -> Messages: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡 Medium
`server/env_worker.py:252`

On musl-based systems like Alpine Linux, `libc.so.6` can exist but `malloc_trim` is a glibc-specific function that musl does not export. Calling `libc.malloc_trim(0)` throws `AttributeError` when the symbol is missing, which terminates `stats_loop` and stops all stats collection for the rest of the worker's lifetime. Consider catching `AttributeError` alongside `OSError` when loading the symbol, or guard the call with a try/except.

```diff
 try:
     import ctypes
     libc = ctypes.CDLL("libc.so.6")
+    libc.malloc_trim(0)  # verify symbol exists
+    libc.malloc_trim.argtypes = [ctypes.c_int]
 except (OSError, AttributeError):
     libc = None
 while True:
     await asyncio.sleep(interval)
     if libc is not None:
-        libc.malloc_trim(0)
+        try:
+            libc.malloc_trim(0)
+        except Exception:
+            pass
```

🚀 Reply "fix it for me" or copy this AI Prompt for your agent: