fix(api): eliminate token-drop race in _send_text_generation_with_images#2044
Open
Drifter4242 wants to merge 2 commits into
Open
fix(api): eliminate token-drop race in _send_text_generation_with_images#2044Drifter4242 wants to merge 2 commits into
Drifter4242 wants to merge 2 commits into
Conversation
## Problem
The token queue was not registered until _token_chunk_stream's inner
generator was first iterated by the HTTP consumer. On a fast worker or
loaded event loop, ChunkGenerated events could be dispatched before the
queue existed and were silently dropped.
## Fix
Register the token queue in _send_text_generation_with_images --
synchronously, before the await self._send(command) call that dispatches
the command to workers. This guarantees the queue is live before any
worker can produce tokens.
_send_text_generation_with_images:
self._text_generation_queues[command.command_id], recv = channel[...]()
token_stream = self._token_chunk_stream(command.command_id, recv)
await self._send(command) # queue is already registered
_token_chunk_stream is now a plain async def generator that receives the
pre-registered recv channel as a parameter. The outer sync wrapper and
nested _stream() function are gone -- the diff is straightforward.
All six API endpoints are updated: chat_completions,
bench_chat_completions, claude_messages, openai_responses,
ollama_chat, ollama_generate.
4f4ea70 to
1230141
Compare
Member
|
i think this issue is legitimate but im currently rewriting a significant portion of our networking to avoid this exact issue at a deeper level so im going to hold off on it for now - thanks for the submission though, i've never seen this particular combination of latency before |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
I've been experimenting and testing Kimi K2.6 on 2 M3 Mac Ultras using exo and ran into an issue. Sonnet identified it as a race condition between queue creation and the send function that uses the queue. The send can happen before the queue creation. I agree with Sonnet's assessment and worked with it to refine the code.
The rest is written by Sonnet (reviewed by myself):
The token queue was not registered until _token_chunk_stream's inner generator was first iterated by the HTTP consumer. On a fast worker or loaded event loop, ChunkGenerated events could be dispatched before the queue existed and were silently dropped.
Fix
Register the token queue in _send_text_generation_with_images -- synchronously, before the await self._send(command) call that dispatches the command to workers. This guarantees the queue is live before any worker can produce tokens.
_send_text_generation_with_images:
self._text_generation_queues[command.command_id], recv = channel...
token_stream = self._token_chunk_stream(command.command_id, recv)
await self._send(command) # queue is already registered
_token_chunk_stream is now a plain async def generator that receives the pre-registered recv channel as a parameter. The outer sync wrapper and nested _stream() function are gone -- the diff is straightforward.
All six API endpoints are updated: chat_completions, bench_chat_completions, claude_messages, openai_responses, ollama_chat, ollama_generate.
Motivation
_token_chunk_streampreviously created and registered the token queue insidethe generator body, which only runs when the HTTP response handler begins
iterating it. On fast hardware or a loaded event loop, the worker can dispatch
ChunkGeneratedevents — including the final stop signal — before the HTTPhandler has scheduled its first iteration. Those events are dropped silently,
causing the streaming response to hang indefinitely.
Changes
_send_text_generation_with_images: create the channel and registerself._text_generation_queues[command.command_id]beforeawait self._send(command), so the queue is live before any worker canproduce tokens.
_token_chunk_streamis now a plainasync defgenerator that receives thepre-registered
recvchannel as a parameter. The outer sync wrapper andnested
_stream()function are removed.chat_completions,bench_chat_completions,claude_messages,openai_responses,ollama_chat,ollama_generate.Why It Works
By registering the queue synchronously before
await self._send(command),there is no window between dispatch and registration. Any
ChunkGeneratedevent fired by a worker lands in an already-live queue, regardless of when the
HTTP consumer starts iterating.
Test Plan
Manual Testing
Hardware: 2× Mac Studio M3 Ultra 512 GB, connected via Thunderbolt 5
(direct bridge), running
MlxJacclRDMA tensor-parallel inference(
moonshotai/Kimi-K2.6, 595 GB INT4, 61 layers split across both nodes).chat/completionsrequest via curl; confirmed full SSE tokenstream delivered without hang or truncation.
is the most likely to trigger the race, as workers dispatch tokens faster
than a typical event loop can schedule the HTTP consumer's first iteration.
Automated Testing
All existing tests pass:
pytest src -m "not slow" --import-mode=importlib— 422/422 passed. No new tests added; the fix is a structural refactor of the
registration ordering, not a behaviour change under normal conditions.