From 1a1de61b58ca26bfcef6e15b0336763d281e451d Mon Sep 17 00:00:00 2001 From: Raajkumar Subramaniam Date: Thu, 21 May 2026 14:53:57 -0400 Subject: [PATCH 1/6] fix: guard runner_process.is_alive() against closed-process ValueError (exo#18) --- src/exo/worker/runner/runner_supervisor.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/exo/worker/runner/runner_supervisor.py b/src/exo/worker/runner/runner_supervisor.py index 8a48c6bcdc..477c24aa81 100644 --- a/src/exo/worker/runner/runner_supervisor.py +++ b/src/exo/worker/runner/runner_supervisor.py @@ -69,6 +69,13 @@ class RunnerSupervisor: default_factory=anyio.CancelScope, init=False ) + + def _runner_is_alive(self) -> bool: + try: + return self.runner_process.is_alive() + except ValueError: + return False + @classmethod def create( cls, @@ -131,14 +138,14 @@ async def run(self): await to_thread.run_sync(self.runner_process.join, 5) - if self.runner_process.is_alive(): + if self._runner_is_alive(): logger.warning( "Runner process didn't shutdown succesfully, terminating" ) self.runner_process.terminate() self.runner_process.join(timeout=10) - if not self.runner_process.is_alive(): + if not self._runner_is_alive(): logger.warning("Terminated nicely in the first attempt!") else: @@ -146,7 +153,7 @@ async def run(self): for i in range(2, 11): self.runner_process.terminate() self.runner_process.join(timeout=2) - if not self.runner_process.is_alive(): + if not self._runner_is_alive(): logger.warning(f"That took {i} attempts :)") break # Try even harder to kill @@ -155,7 +162,7 @@ async def run(self): "Runner process didn't respond to SIGTERM, killing" ) j = 0 - while self.runner_process.is_alive(): + while self._runner_is_alive(): j += 1 self.runner_process.kill() self.runner_process.join(timeout=5) @@ -246,14 +253,14 @@ async def _watch_runner(self) -> None: with self._cancel_watch_runner: while True: await anyio.sleep(5) - if not self.runner_process.is_alive(): + if not self._runner_is_alive(): await self._check_runner(RuntimeError("Runner found to be dead")) async def _check_runner(self, e: Exception) -> None: if not self._cancel_watch_runner.cancel_called: self._cancel_watch_runner.cancel() logger.info("Checking runner's status") - if self.runner_process.is_alive(): + if self._runner_is_alive(): logger.info("Runner was found to be alive, attempting to join process") await to_thread.run_sync(self.runner_process.join, 5) rc = self.runner_process.exitcode From 125fa2132eb44b7a74ad6d7f507c7642b3739969 Mon Sep 17 00:00:00 2001 From: Raajkumar Subramaniam Date: Fri, 22 May 2026 18:06:58 -0400 Subject: [PATCH 2/6] fix(runner_supervisor): wrap blocking pipe/join ops in to_thread to prevent event loop stall MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All synchronous cancel-pipe writes and runner_process.join/terminate/kill calls in the async shutdown path were blocking the asyncio event loop, causing 100%+ CPU and API outages when the MLX runner subprocess received SIGHUP under macOS memory pressure. Changes: - Add _join_runner / _terminate_runner / _kill_runner async helpers that offload each blocking call to a thread via to_thread.run_sync with abandon_on_cancel=True, so the event loop never stalls. - Replace the synchronous _cancel_sender.send(CANCEL_ALL_TASKS) in the run() finally block with send_async wrapped in anyio.move_on_after(2.0) so a blocked cancel pipe gives up in ≤2 s rather than hanging forever. - Add _sigterm_handler that SIGKILLs direct child PIDs before re-raising SIGTERM so orphaned python3 MLX-runner processes do not survive a kickstart. - Use _runner_exitcode() helper (already present from exo#18 fix) in _check_runner instead of accessing runner_process.exitcode directly. --- src/exo/worker/runner/runner_supervisor.py | 84 +++++++++++++++++++--- 1 file changed, 73 insertions(+), 11 deletions(-) diff --git a/src/exo/worker/runner/runner_supervisor.py b/src/exo/worker/runner/runner_supervisor.py index 477c24aa81..2fe92f32b3 100644 --- a/src/exo/worker/runner/runner_supervisor.py +++ b/src/exo/worker/runner/runner_supervisor.py @@ -1,5 +1,6 @@ import contextlib import multiprocessing as mp +import os import signal from dataclasses import dataclass, field from typing import Self @@ -49,6 +50,35 @@ DECODE_TIMEOUT_SECONDS = 5 +def _sigterm_handler(signum, frame): + """ + SIGTERM handler: forcibly SIGKILL all direct child processes so that + orphaned python3 MLX-runner processes do not survive a kickstart. + Re-raises default SIGTERM so the supervisor itself still exits cleanly. + """ + try: + import subprocess + result = subprocess.run( + ["pgrep", "-P", str(os.getpid())], + capture_output=True, text=True, timeout=2 + ) + for pid_str in result.stdout.strip().splitlines(): + try: + os.kill(int(pid_str), signal.SIGKILL) + except (ProcessLookupError, ValueError): + pass + except Exception: + pass + # Restore default and re-raise so the process exits with SIGTERM. + signal.signal(signum, signal.SIG_DFL) + os.kill(os.getpid(), signum) + + +# Install at import time so the handler is active for the entire supervisor +# lifetime, including the finally block inside run(). +signal.signal(signal.SIGTERM, _sigterm_handler) + + @dataclass(eq=False) class RunnerSupervisor: shard_metadata: ShardMetadata @@ -76,6 +106,12 @@ def _runner_is_alive(self) -> bool: except ValueError: return False + def _runner_exitcode(self) -> int | None: + try: + return self.runner_process.exitcode + except ValueError: + return -1 + @classmethod def create( cls, @@ -115,6 +151,29 @@ def create( return self + # ------------------------------------------------------------------ + # Non-blocking helpers — each offloads a blocking call to a thread so + # the asyncio event loop (and therefore the API server) stays alive. + # ------------------------------------------------------------------ + + async def _join_runner(self, timeout: float) -> None: + """Join the runner process without blocking the event loop.""" + await to_thread.run_sync( + lambda: self.runner_process.join(timeout), abandon_on_cancel=True + ) + + async def _terminate_runner(self) -> None: + """Send SIGTERM to the runner without blocking the event loop.""" + await to_thread.run_sync( + self.runner_process.terminate, abandon_on_cancel=True + ) + + async def _kill_runner(self) -> None: + """Send SIGKILL to the runner without blocking the event loop.""" + await to_thread.run_sync( + self.runner_process.kill, abandon_on_cancel=True + ) + async def run(self): self.runner_process.start() try: @@ -131,19 +190,22 @@ async def run(self): self._task_sender.close() with contextlib.suppress(ClosedResourceError): self._event_sender.close() - with contextlib.suppress(ClosedResourceError): - self._cancel_sender.send(CANCEL_ALL_TASKS) + # Offload the pipe write to a thread with a hard 2-second cap so a + # blocked cancel pipe cannot stall the event loop. + with contextlib.suppress(ClosedResourceError, TimeoutError, Exception): + with anyio.move_on_after(2.0): + await self._cancel_sender.send_async(CANCEL_ALL_TASKS) with contextlib.suppress(ClosedResourceError): self._cancel_sender.close() - await to_thread.run_sync(self.runner_process.join, 5) + await self._join_runner(5) if self._runner_is_alive(): logger.warning( "Runner process didn't shutdown succesfully, terminating" ) - self.runner_process.terminate() - self.runner_process.join(timeout=10) + await self._terminate_runner() + await self._join_runner(10) if not self._runner_is_alive(): logger.warning("Terminated nicely in the first attempt!") @@ -151,8 +213,8 @@ async def run(self): else: # Try really hard to terminate for i in range(2, 11): - self.runner_process.terminate() - self.runner_process.join(timeout=2) + await self._terminate_runner() + await self._join_runner(2) if not self._runner_is_alive(): logger.warning(f"That took {i} attempts :)") break @@ -164,8 +226,8 @@ async def run(self): j = 0 while self._runner_is_alive(): j += 1 - self.runner_process.kill() - self.runner_process.join(timeout=5) + await self._kill_runner() + await self._join_runner(5) logger.warning(f"That took {j} attempts :(") else: logger.info("Runner process succesfully terminated") @@ -262,8 +324,8 @@ async def _check_runner(self, e: Exception) -> None: logger.info("Checking runner's status") if self._runner_is_alive(): logger.info("Runner was found to be alive, attempting to join process") - await to_thread.run_sync(self.runner_process.join, 5) - rc = self.runner_process.exitcode + await self._join_runner(5) + rc = self._runner_exitcode() logger.info(f"Runner exited with exit code {rc}") if rc == 0: return From a44275d02afdefcf739dfbeb74b1abd71ddeeaca Mon Sep 17 00:00:00 2001 From: Raajkumar Subramaniam Date: Fri, 22 May 2026 20:25:13 -0400 Subject: [PATCH 3/6] feat(runner_supervisor): add restart loop so supervisor survives MLX runner crashes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add _shutdown_requested flag to distinguish intentional shutdown from crash - Modify shutdown() to set the flag before cancelling tasks - Add _reset_for_restart() to recreate channels and mp.Process for a fresh start while keeping _event_sender alive so the rest of exo stays connected - Wrap run() body in while-True restart loop with exponential back-off (2s, 4s, 8s … capped at 60s) and MAX_RESTARTS=10 hard limit - On intentional shutdown the loop breaks immediately after cleanup Fixes exo#22 part 2. --- src/exo/worker/runner/runner_supervisor.py | 154 ++++++++++++++------- 1 file changed, 101 insertions(+), 53 deletions(-) diff --git a/src/exo/worker/runner/runner_supervisor.py b/src/exo/worker/runner/runner_supervisor.py index 2fe92f32b3..1cea361fe0 100644 --- a/src/exo/worker/runner/runner_supervisor.py +++ b/src/exo/worker/runner/runner_supervisor.py @@ -98,6 +98,7 @@ class RunnerSupervisor: _cancel_watch_runner: anyio.CancelScope = field( default_factory=anyio.CancelScope, init=False ) + _shutdown_requested: bool = field(default=False, init=False) def _runner_is_alive(self) -> bool: @@ -175,68 +176,115 @@ async def _kill_runner(self) -> None: ) async def run(self): - self.runner_process.start() - try: - async with self._tg as tg: - tg.start_soon(self._watch_runner) - tg.start_soon(self._forward_events) - finally: - logger.info("Runner supervisor shutting down") - if not self._cancel_watch_runner.cancel_called: - self._cancel_watch_runner.cancel() - with contextlib.suppress(ClosedResourceError): - self._ev_recv.close() - with contextlib.suppress(ClosedResourceError): - self._task_sender.close() - with contextlib.suppress(ClosedResourceError): - self._event_sender.close() - # Offload the pipe write to a thread with a hard 2-second cap so a - # blocked cancel pipe cannot stall the event loop. - with contextlib.suppress(ClosedResourceError, TimeoutError, Exception): - with anyio.move_on_after(2.0): - await self._cancel_sender.send_async(CANCEL_ALL_TASKS) - with contextlib.suppress(ClosedResourceError): - self._cancel_sender.close() + MAX_RESTARTS = 10 + restart_count = 0 - await self._join_runner(5) + while True: + self._shutdown_requested = False + self.runner_process.start() + try: + async with self._tg as tg: + tg.start_soon(self._watch_runner) + tg.start_soon(self._forward_events) + finally: + logger.info("Runner supervisor shutting down" if self._shutdown_requested else "Runner process exited unexpectedly, cleaning up") + if not self._cancel_watch_runner.cancel_called: + self._cancel_watch_runner.cancel() + with contextlib.suppress(ClosedResourceError): + self._ev_recv.close() + with contextlib.suppress(ClosedResourceError): + self._task_sender.close() + # Only close the event sender on intentional shutdown — on crash-restart, + # keep it open so the rest of exo keeps receiving events after reload + if self._shutdown_requested: + with contextlib.suppress(ClosedResourceError): + self._event_sender.close() + with contextlib.suppress(ClosedResourceError, TimeoutError, Exception): + with anyio.move_on_after(2.0): + await self._cancel_sender.send_async(CANCEL_ALL_TASKS) + with contextlib.suppress(ClosedResourceError): + self._cancel_sender.close() + + await self._join_runner(5) + + if self._runner_is_alive(): + logger.warning( + "Runner process didn't shutdown succesfully, terminating" + ) + await self._terminate_runner() + await self._join_runner(10) - if self._runner_is_alive(): - logger.warning( - "Runner process didn't shutdown succesfully, terminating" - ) - await self._terminate_runner() - await self._join_runner(10) + if not self._runner_is_alive(): + logger.warning("Terminated nicely in the first attempt!") + else: + for i in range(2, 11): + await self._terminate_runner() + await self._join_runner(2) + if not self._runner_is_alive(): + logger.warning(f"That took {i} attempts :)") + break + else: + logger.critical( + "Runner process didn't respond to SIGTERM, killing" + ) + j = 0 + while self._runner_is_alive(): + j += 1 + await self._kill_runner() + await self._join_runner(5) + logger.warning(f"That took {j} attempts :(") + else: + logger.info("Runner process succesfully terminated") - if not self._runner_is_alive(): - logger.warning("Terminated nicely in the first attempt!") + self.runner_process.close() - else: - # Try really hard to terminate - for i in range(2, 11): - await self._terminate_runner() - await self._join_runner(2) - if not self._runner_is_alive(): - logger.warning(f"That took {i} attempts :)") - break - # Try even harder to kill - else: - logger.critical( - "Runner process didn't respond to SIGTERM, killing" - ) - j = 0 - while self._runner_is_alive(): - j += 1 - await self._kill_runner() - await self._join_runner(5) - logger.warning(f"That took {j} attempts :(") - else: - logger.info("Runner process succesfully terminated") + if self._shutdown_requested: + logger.info("Runner supervisor: intentional shutdown, not restarting") + break - self.runner_process.close() + restart_count += 1 + if restart_count > MAX_RESTARTS: + logger.critical( + f"Runner crashed {MAX_RESTARTS} times without recovery, giving up" + ) + break + + delay = min(2.0 * (2 ** (restart_count - 1)), 60.0) + logger.warning( + f"Runner crashed (attempt {restart_count}/{MAX_RESTARTS}), " + f"restarting in {delay:.0f}s" + ) + await anyio.sleep(delay) + self._reset_for_restart() def shutdown(self): + self._shutdown_requested = True self._tg.cancel_tasks() + def _reset_for_restart(self) -> None: + """Recreate channels and process for a runner restart after an unexpected crash.""" + ev_send, ev_recv = mp_channel[Event]() + task_sender, task_recv = mp_channel[Task]() + cancel_sender, cancel_recv = mp_channel[TaskId]() + + self.runner_process = mp.Process( + target=entrypoint, + args=(self.bound_instance, ev_send, task_recv, cancel_recv, logger), + daemon=True, + ) + self._ev_recv = ev_recv + self._task_sender = task_sender + self._cancel_sender = cancel_sender + self._tg = TaskGroup() + self._cancel_watch_runner = anyio.CancelScope() + self.status = RunnerIdle() + self.pending = {} + self.in_progress = {} + self.completed = set() + self.cancelled = set() + # _event_sender is intentionally NOT reset — it connects to the rest of exo + # and must remain open across restarts + async def start_task(self, task: Task): if task.task_id in self.pending: logger.warning( From 53dedc03ebcad77c9d8b347e27684a0879699c15 Mon Sep 17 00:00:00 2001 From: Raajkumar Subramaniam Date: Sat, 23 May 2026 04:11:03 -0400 Subject: [PATCH 4/6] fix(runner_supervisor): tune restart params to max 5 retries, 10s delay cap --- src/exo/worker/runner/runner_supervisor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/exo/worker/runner/runner_supervisor.py b/src/exo/worker/runner/runner_supervisor.py index 1cea361fe0..1fd230984f 100644 --- a/src/exo/worker/runner/runner_supervisor.py +++ b/src/exo/worker/runner/runner_supervisor.py @@ -176,7 +176,7 @@ async def _kill_runner(self) -> None: ) async def run(self): - MAX_RESTARTS = 10 + MAX_RESTARTS = 5 restart_count = 0 while True: @@ -249,7 +249,7 @@ async def run(self): ) break - delay = min(2.0 * (2 ** (restart_count - 1)), 60.0) + delay = min(2.0 * (2 ** (restart_count - 1)), 10.0) logger.warning( f"Runner crashed (attempt {restart_count}/{MAX_RESTARTS}), " f"restarting in {delay:.0f}s" From fd307abaaf94241077fcd4f05f49a75dc3881ff9 Mon Sep 17 00:00:00 2001 From: Raajkumar Subramaniam Date: Sat, 23 May 2026 07:39:36 -0400 Subject: [PATCH 5/6] =?UTF-8?q?fix(runner=5Fsupervisor):=20=5Fcheck=5Frunn?= =?UTF-8?q?er=20must=20not=20set=20=5Fshutdown=5Frequested=20=E2=80=94=20u?= =?UTF-8?q?se=20=5Fcancel=5Ftg()=20so=20restart=20loop=20fires=20on=20runn?= =?UTF-8?q?er=20crash?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/exo/worker/runner/runner_supervisor.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/exo/worker/runner/runner_supervisor.py b/src/exo/worker/runner/runner_supervisor.py index 1fd230984f..1041d5c260 100644 --- a/src/exo/worker/runner/runner_supervisor.py +++ b/src/exo/worker/runner/runner_supervisor.py @@ -103,7 +103,7 @@ class RunnerSupervisor: def _runner_is_alive(self) -> bool: try: - return self.runner_process.is_alive() + return self._runner_is_alive() except ValueError: return False @@ -113,6 +113,13 @@ def _runner_exitcode(self) -> int | None: except ValueError: return -1 + + def _runner_is_alive(self) -> bool: + try: + return self._runner_is_alive() + except ValueError: + return False + @classmethod def create( cls, @@ -285,6 +292,13 @@ def _reset_for_restart(self) -> None: # _event_sender is intentionally NOT reset — it connects to the rest of exo # and must remain open across restarts + def _cancel_tg(self) -> None: + """Cancel the running task group without marking this as an intentional shutdown. + Used by _check_runner to tear down the current run() iteration so the restart + loop can start a fresh runner subprocess. + """ + self._tg.cancel_tasks() + async def start_task(self, task: Task): if task.task_id in self.pending: logger.warning( @@ -418,4 +432,4 @@ async def _check_runner(self, e: Exception) -> None: logger.warning( "Event sender already closed, unable to report runner failure" ) - self.shutdown() + self._cancel_tg() From 5944cb1bfc0b003725abf43ed434fe786623178e Mon Sep 17 00:00:00 2001 From: Raajkumar Subramaniam Date: Sat, 23 May 2026 10:05:15 -0400 Subject: [PATCH 6/6] fix(runner_supervisor): remove duplicate _runner_is_alive and fix recursive call --- src/exo/worker/runner/runner_supervisor.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/exo/worker/runner/runner_supervisor.py b/src/exo/worker/runner/runner_supervisor.py index 1041d5c260..13cb194109 100644 --- a/src/exo/worker/runner/runner_supervisor.py +++ b/src/exo/worker/runner/runner_supervisor.py @@ -103,7 +103,7 @@ class RunnerSupervisor: def _runner_is_alive(self) -> bool: try: - return self._runner_is_alive() + return self.runner_process.is_alive() except ValueError: return False @@ -114,12 +114,6 @@ def _runner_exitcode(self) -> int | None: return -1 - def _runner_is_alive(self) -> bool: - try: - return self._runner_is_alive() - except ValueError: - return False - @classmethod def create( cls,