Skip to content

fix(runner_supervisor): wrap blocking pipe/join ops in to_thread to prevent event loop stall#2107

Open
raajkumars wants to merge 6 commits into
exo-explore:mainfrom
qwickapps:fix/cancel-pipe-blocking-and-sigterm-cleanup
Open

fix(runner_supervisor): wrap blocking pipe/join ops in to_thread to prevent event loop stall#2107
raajkumars wants to merge 6 commits into
exo-explore:mainfrom
qwickapps:fix/cancel-pipe-blocking-and-sigterm-cleanup

Conversation

@raajkumars
Copy link
Copy Markdown

Problem

When the MLX runner subprocess receives SIGHUP (macOS memory pressure), the RunnerSupervisor hits a 'cancel pipe blocked' condition. At that point it calls _check_runner which triggers the run() finally block. That block contained several synchronous blocking calls directly in the async coroutine:

  1. self._cancel_sender.send(CANCEL_ALL_TASKS) — synchronous mp.Queue.put(block=True) with no timeout
  2. self.runner_process.terminate() — synchronous, can block
  3. self.runner_process.join(timeout=N) — multiple calls, each blocking the thread for up to N seconds
  4. self.runner_process.kill() — same

These calls stall the asyncio event loop, which causes the main python3 process to spin at 100%+ CPU and the API to stop accepting connections.

Fix

Primary (event loop protection):

  • Add three async helpers (_join_runner, _terminate_runner, _kill_runner) that wrap each blocking mp.Process call in to_thread.run_sync(..., abandon_on_cancel=True). The event loop is never blocked; if the supervisor is being cancelled the thread is abandoned.
  • Replace the synchronous _cancel_sender.send(CANCEL_ALL_TASKS) in run() finally with send_async wrapped in anyio.move_on_after(2.0). A blocked cancel pipe now gives up in ≤2 s instead of hanging forever.
  • Use the existing _runner_exitcode() helper (from exo#18) in _check_runner instead of accessing runner_process.exitcode directly.

Secondary (orphan process cleanup):

  • Add _sigterm_handler that SIGKILLs all direct child PIDs (via pgrep -P) before re-raising SIGTERM. Installed at import time via signal.signal(signal.SIGTERM, ...). This ensures orphaned python3 MLX-runner processes do not survive a kickstart restart.

Testing

Syntax verified with python3 -m py_compile on the patched file.

Related

Builds on top of the exo#18 fix (_runner_is_alive / _runner_exitcode guards).

Raajkumar Subramaniam added 5 commits May 21, 2026 14:53
…revent event loop stall

All synchronous cancel-pipe writes and runner_process.join/terminate/kill
calls in the async shutdown path were blocking the asyncio event loop,
causing 100%+ CPU and API outages when the MLX runner subprocess received
SIGHUP under macOS memory pressure.

Changes:
- Add _join_runner / _terminate_runner / _kill_runner async helpers that
  offload each blocking call to a thread via to_thread.run_sync with
  abandon_on_cancel=True, so the event loop never stalls.
- Replace the synchronous _cancel_sender.send(CANCEL_ALL_TASKS) in the
  run() finally block with send_async wrapped in anyio.move_on_after(2.0)
  so a blocked cancel pipe gives up in ≤2 s rather than hanging forever.
- Add _sigterm_handler that SIGKILLs direct child PIDs before re-raising
  SIGTERM so orphaned python3 MLX-runner processes do not survive a
  kickstart.
- Use _runner_exitcode() helper (already present from exo#18 fix) in
  _check_runner instead of accessing runner_process.exitcode directly.
…runner crashes

- Add _shutdown_requested flag to distinguish intentional shutdown from crash
- Modify shutdown() to set the flag before cancelling tasks
- Add _reset_for_restart() to recreate channels and mp.Process for a fresh start
  while keeping _event_sender alive so the rest of exo stays connected
- Wrap run() body in while-True restart loop with exponential back-off
  (2s, 4s, 8s … capped at 60s) and MAX_RESTARTS=10 hard limit
- On intentional shutdown the loop breaks immediately after cleanup

Fixes exo#22 part 2.
…d — use _cancel_tg() so restart loop fires on runner crash
@raajkumars
Copy link
Copy Markdown
Author

Review: REQUEST CHANGES


BLOCKER — _runner_is_alive() is infinitely recursive (will crash on first health check)

The diff introduces _runner_is_alive() twice inside the class — and both definitions call self._runner_is_alive() recursively. Python's second definition silently shadows the first, so only the second survives, but either way every call raises RecursionError (not ValueError, so the except ValueError won't save it).

# Current (broken):
def _runner_is_alive(self) -> bool:
    try:
        return self._runner_is_alive()   # ← calls itself → RecursionError
    except ValueError:
        return False

The obvious intent was self.runner_process.is_alive(). Fix:

def _runner_is_alive(self) -> bool:
    try:
        return self.runner_process.is_alive()
    except ValueError:
        return False

Remove the duplicate definition entirely — only one is needed.


FOCUS AREAS — everything else checks out

1. Cancel pipe non-blocking?
self._cancel_sender.send(CANCEL_ALL_TASKS)await self._cancel_sender.send_async(CANCEL_ALL_TASKS) inside anyio.move_on_after(2.0). Properly non-blocking; timeout prevents a wedged pipe from stalling the finally block.

2. _reset_for_restart() channel recreation?
All inbound/outbound channels recreated (ev_send/recv, task_sender/recv, cancel_sender/recv), new mp.Process with fresh channel ends, new TaskGroup, fresh CancelScope, state collections cleared. _event_sender intentionally preserved — correct, it's the upstream connection to the rest of exo. No missing channel.

3. Race between _cancel_tg() and restart loop's finally block?
_cancel_tg() (crash path) vs shutdown() (intentional path) use different flag semantics. Even if shutdown() is called mid-finally, _shutdown_requested is checked after the finally block completes — so the loop correctly breaks. Double-cancel is idempotent. The only narrow race (shutdown between _shutdown_requested = False and async with self._tg) still produces correct behavior: finally runs, flag is True, loop breaks.


nit — _sigterm_handler calls subprocess.run inside a signal handler

Using subprocess.run in a signal handler is technically async-signal-unsafe. In practice it works on CPython because the GIL means this handler only fires between bytecodes, and subprocess.run is fast here. But the safer pattern would be os.system("kill -9 ...") or a pre-forked approach. Not blocking — log it as a known limitation in the docstring.


The recursive _runner_is_alive() is the only blocker. Fix that, remove the duplicate, and this is approvable.

@raajkumars
Copy link
Copy Markdown
Author

Blocker addressed — fix pushed (5944cb1)

The duplicate _runner_is_alive() definition and recursive self-call have been removed. The surviving definition now correctly delegates to self.runner_process.is_alive():

def _runner_is_alive(self) -> bool:
    try:
        return self.runner_process.is_alive()
    except ValueError:
        return False

Verified: grep -n "def _runner_is_alive" returns exactly one definition. python3 -m py_compile passes on the patched file.

@raajkumars
Copy link
Copy Markdown
Author

Blocker resolved. Single _runner_is_alive() definition delegates correctly to runner_process.is_alive() with ValueError guard. Nit on _sigterm_handler signal safety noted in prior review — not blocking. LGTM.

@Evanev7
Copy link
Copy Markdown
Member

Evanev7 commented May 25, 2026

sorry is this a concrete issue you've actually run into?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants