fix(memstreamer): park Recv on sync.Cond instead of busy-spin#196
fix(memstreamer): park Recv on sync.Cond instead of busy-spin#196andrewwormald wants to merge 1 commit into
Conversation
The previous Recv implementation polled the log in a tight for-loop with no backoff: each iteration acquired and released s.mu and cursorStore.mu. With N step consumers per workflow, all N goroutines stayed runnable on those two mutexes even when the log was idle, pinning hardware at high CPU (observed ~380% on Apple Silicon for hours after a single Run had already reached a terminal status). This change adds a sync.Cond keyed off the existing shared mutex. Recv now blocks on cond.Wait when there is nothing to deliver and is woken by cond.Broadcast on Send. A short-lived watcher goroutine per Recv call broadcasts on ctx.Done so cancellation still unblocks parked receivers; it is cleaned up via a stop channel on Recv return, so there is no per-call goroutine leak. Semantics preserved: topic filtering, per-receiver cursor advancement via the existing cursorStore, StreamFromLatest receiver option, and the idempotent ack closure that advances the cursor exactly once. Adds three regression tests: - TestRecv_DoesNotBusySpin (8 parked receivers, asserts stable goroutine count and that stacks show goroutines parked on sync.Cond.Wait rather than spinning in the Recv for-loop) - TestRecv_WakesOnSend (parked Recv returns within 100ms of Send) - TestRecv_WakesOnCtxCancel (parked Recv returns ctx.Err() within 100ms of cancel and the watcher goroutine does not leak) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
📝 WalkthroughWalkthrough
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
adapters/memstreamer/memstreamer.go (1)
139-149: 🚀 Performance & Scalability | 🔵 Trivial | ⚡ Quick winStart the context watcher only when
Recvis about to park.Right now every
Recvcall spawns a goroutine, even when an event is immediately available. Moving watcher creation behind thecond.Wait()path keeps the hot path goroutine-free while preserving cancellation wake-ups.Suggested local refactor
- stop := make(chan struct{}) - defer close(stop) - go func() { - select { - case <-ctx.Done(): - s.mu.Lock() - s.cond.Broadcast() - s.mu.Unlock() - case <-stop: - } - }() + var stop chan struct{} + defer func() { + if stop != nil { + close(stop) + } + }() + startCtxWatcher := func() { + if stop != nil { + return + } + stop = make(chan struct{}) + go func(stop <-chan struct{}) { + select { + case <-ctx.Done(): + s.mu.Lock() + s.cond.Broadcast() + s.mu.Unlock() + case <-stop: + } + }(stop) + }Then call
startCtxWatcher()immediately befores.cond.Wait().🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@adapters/memstreamer/memstreamer.go` around lines 139 - 149, The Recv path is spawning a context-watcher goroutine on every call, even when an item is already available. Move the watcher setup behind the blocking path so it only starts when Recv is about to call cond.Wait(), and keep the cancellation wake-up behavior by extracting that logic into a helper such as startCtxWatcher that is invoked immediately before waiting.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@adapters/memstreamer/memstreamer_test.go`:
- Around line 167-187: The goroutine leak assertion in the Recv cancel test is
taking the baseline too late, after the Recv goroutine and its context watcher
are already running. Move the runtime.NumGoroutine baseline capture in
memstreamer_test.go so it happens before starting the goroutine that calls Recv,
then keep the existing cancel/wait/after check around done to ensure a leaked
watcher is counted correctly.
- Around line 35-55: Ensure TestRecv_DoesNotBusySpin waits for all receiver
goroutines to finish before exiting, since the current deferred cancel can leave
Recv loops from memstreamer.NewReceiver still unwinding when later tests inspect
runtime.NumGoroutine(). Add synchronization in the test (for example around the
goroutines launched from the Recv loop) so the test does not return until every
consumer has observed cancellation and exited cleanly.
---
Nitpick comments:
In `@adapters/memstreamer/memstreamer.go`:
- Around line 139-149: The Recv path is spawning a context-watcher goroutine on
every call, even when an item is already available. Move the watcher setup
behind the blocking path so it only starts when Recv is about to call
cond.Wait(), and keep the cancellation wake-up behavior by extracting that logic
into a helper such as startCtxWatcher that is invoked immediately before
waiting.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 31456de6-c40a-4b56-80a8-9a883a25f46d
📒 Files selected for processing (2)
adapters/memstreamer/memstreamer.goadapters/memstreamer/memstreamer_test.go
| func TestRecv_DoesNotBusySpin(t *testing.T) { | ||
| s := memstreamer.New() | ||
| ctx, cancel := context.WithCancel(context.Background()) | ||
| defer cancel() | ||
|
|
||
| const consumers = 8 | ||
| for i := 0; i < consumers; i++ { | ||
| name := "consumer-" + string(rune('a'+i)) | ||
| rec, err := s.NewReceiver(ctx, testTopic, name) | ||
| if err != nil { | ||
| t.Fatalf("NewReceiver: %v", err) | ||
| } | ||
| go func() { | ||
| for { | ||
| _, _, err := rec.Recv(ctx) | ||
| if err != nil { | ||
| return | ||
| } | ||
| } | ||
| }() | ||
| } |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟡 Minor | ⚡ Quick win
Wait for the receiver goroutines before the test exits.
This test uses process-wide goroutine counts, but it cancels via defer without joining the goroutines it started. Those goroutines can still be unwinding when the next test samples runtime.NumGoroutine().
Suggested cleanup
import (
"context"
"runtime"
"strings"
+ "sync"
"testing"
"time"
@@
s := memstreamer.New()
ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
+ var wg sync.WaitGroup
+ t.Cleanup(func() {
+ cancel()
+ wg.Wait()
+ })
@@
if err != nil {
t.Fatalf("NewReceiver: %v", err)
}
+ wg.Add(1)
go func() {
+ defer wg.Done()
for {
_, _, err := rec.Recv(ctx)
if err != nil {📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func TestRecv_DoesNotBusySpin(t *testing.T) { | |
| s := memstreamer.New() | |
| ctx, cancel := context.WithCancel(context.Background()) | |
| defer cancel() | |
| const consumers = 8 | |
| for i := 0; i < consumers; i++ { | |
| name := "consumer-" + string(rune('a'+i)) | |
| rec, err := s.NewReceiver(ctx, testTopic, name) | |
| if err != nil { | |
| t.Fatalf("NewReceiver: %v", err) | |
| } | |
| go func() { | |
| for { | |
| _, _, err := rec.Recv(ctx) | |
| if err != nil { | |
| return | |
| } | |
| } | |
| }() | |
| } | |
| func TestRecv_DoesNotBusySpin(t *testing.T) { | |
| s := memstreamer.New() | |
| ctx, cancel := context.WithCancel(context.Background()) | |
| var wg sync.WaitGroup | |
| t.Cleanup(func() { | |
| cancel() | |
| wg.Wait() | |
| }) | |
| const consumers = 8 | |
| for i := 0; i < consumers; i++ { | |
| name := "consumer-" + string(rune('a'+i)) | |
| rec, err := s.NewReceiver(ctx, testTopic, name) | |
| if err != nil { | |
| t.Fatalf("NewReceiver: %v", err) | |
| } | |
| wg.Add(1) | |
| go func() { | |
| defer wg.Done() | |
| for { | |
| _, _, err := rec.Recv(ctx) | |
| if err != nil { | |
| return | |
| } | |
| } | |
| }() | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@adapters/memstreamer/memstreamer_test.go` around lines 35 - 55, Ensure
TestRecv_DoesNotBusySpin waits for all receiver goroutines to finish before
exiting, since the current deferred cancel can leave Recv loops from
memstreamer.NewReceiver still unwinding when later tests inspect
runtime.NumGoroutine(). Add synchronization in the test (for example around the
goroutines launched from the Recv loop) so the test does not return until every
consumer has observed cancellation and exited cleanly.
| before := runtime.NumGoroutine() | ||
| cancelAt := time.Now() | ||
| cancel() | ||
|
|
||
| select { | ||
| case err := <-done: | ||
| if elapsed := time.Since(cancelAt); elapsed > 100*time.Millisecond { | ||
| t.Errorf("Recv took %v to wake after cancel (want <100ms)", elapsed) | ||
| } | ||
| if err == nil { | ||
| t.Errorf("Recv should return ctx.Err(), got nil") | ||
| } | ||
| case <-time.After(500 * time.Millisecond): | ||
| t.Fatalf("Recv did not unblock on ctx cancel within 500ms") | ||
| } | ||
|
|
||
| // Give the watcher goroutine a moment to exit after Recv returns. | ||
| time.Sleep(50 * time.Millisecond) | ||
| after := runtime.NumGoroutine() | ||
| if after > before { | ||
| t.Errorf("goroutine leak after ctx cancel: before=%d after=%d", before, after) |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟡 Minor | ⚡ Quick win
Take the leak baseline before starting Recv.
before is captured while both the Recv goroutine and its context watcher are already alive, so a leaked watcher can still leave after <= before once only the Recv goroutine exits.
Suggested assertion fix
+ baseline := runtime.NumGoroutine()
done := make(chan error, 1)
go func() {
_, _, err := rec.Recv(ctx)
done <- err
}()
@@
- before := runtime.NumGoroutine()
cancelAt := time.Now()
cancel()
@@
time.Sleep(50 * time.Millisecond)
after := runtime.NumGoroutine()
- if after > before {
- t.Errorf("goroutine leak after ctx cancel: before=%d after=%d", before, after)
+ if after > baseline {
+ t.Errorf("goroutine leak after ctx cancel: baseline=%d after=%d", baseline, after)
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| before := runtime.NumGoroutine() | |
| cancelAt := time.Now() | |
| cancel() | |
| select { | |
| case err := <-done: | |
| if elapsed := time.Since(cancelAt); elapsed > 100*time.Millisecond { | |
| t.Errorf("Recv took %v to wake after cancel (want <100ms)", elapsed) | |
| } | |
| if err == nil { | |
| t.Errorf("Recv should return ctx.Err(), got nil") | |
| } | |
| case <-time.After(500 * time.Millisecond): | |
| t.Fatalf("Recv did not unblock on ctx cancel within 500ms") | |
| } | |
| // Give the watcher goroutine a moment to exit after Recv returns. | |
| time.Sleep(50 * time.Millisecond) | |
| after := runtime.NumGoroutine() | |
| if after > before { | |
| t.Errorf("goroutine leak after ctx cancel: before=%d after=%d", before, after) | |
| baseline := runtime.NumGoroutine() | |
| done := make(chan error, 1) | |
| go func() { | |
| _, _, err := rec.Recv(ctx) | |
| done <- err | |
| }() | |
| cancelAt := time.Now() | |
| cancel() | |
| select { | |
| case err := <-done: | |
| if elapsed := time.Since(cancelAt); elapsed > 100*time.Millisecond { | |
| t.Errorf("Recv took %v to wake after cancel (want <100ms)", elapsed) | |
| } | |
| if err == nil { | |
| t.Errorf("Recv should return ctx.Err(), got nil") | |
| } | |
| case <-time.After(500 * time.Millisecond): | |
| t.Fatalf("Recv did not unblock on ctx cancel within 500ms") | |
| } | |
| // Give the watcher goroutine a moment to exit after Recv returns. | |
| time.Sleep(50 * time.Millisecond) | |
| after := runtime.NumGoroutine() | |
| if after > baseline { | |
| t.Errorf("goroutine leak after ctx cancel: baseline=%d after=%d", baseline, after) | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@adapters/memstreamer/memstreamer_test.go` around lines 167 - 187, The
goroutine leak assertion in the Recv cancel test is taking the baseline too
late, after the Recv goroutine and its context watcher are already running. Move
the runtime.NumGoroutine baseline capture in memstreamer_test.go so it happens
before starting the goroutine that calls Recv, then keep the existing
cancel/wait/after check around done to ensure a leaked watcher is counted
correctly.
Summary
adapters/memstreamer/(*Stream).Recvpolls the in-memory log in an unconditionalfor { ... continue }loop with no backoff. Each iteration acquires and releasess.muandcursorStore.mu. With N step consumers per workflow, every iteration of the runtime scheduler keeps all N goroutines runnable on those two mutexes — even when there is nothing to deliver.In a downstream daemon (everflow) using
memstreameras the productionEventStreamer(durability already provided by a sqlite-backedRecordStore+ transactional outbox), this pinned an Apple Silicon laptop at ~380% CPU for 15+ hours, even after the single in-flight Run had already reached a terminal status and the event log was no longer growing. A SIGQUIT goroutine dump showed all step consumers inrunnablestate inside(*cursorStore).Get → Mutex.lockSlow.This PR replaces the busy loop with
sync.Condparking:Recvblocks oncond.Waitwhen there's nothing to deliver and is woken bycond.BroadcastonSend.What changed
adapters/memstreamer/memstreamer.gocond *sync.Condto the sharedStreamstruct, constructed once inNew()keyed off the existing shareds.mu. Sender + receiverStreams share the same cond pointer (same wiring pattern asmuandlog).Sendcallss.cond.Broadcast()while still holdings.mu— the natural place after appending.Recvholdss.muacross the for-loop; when the log is exhausted it callss.cond.Wait()instead ofcontinue. Topic-skip still advances the cursor and loops without parking.<-ctx.Done()vs<-stop; on ctx done it lockss.mu, broadcasts, unlocks.defer close(stop)ensures the watcher exits whenRecvreturns — no per-call leak.adapters/memstreamer/memstreamer_test.go— three new regression tests:TestRecv_DoesNotBusySpin— 8 parked receivers; assertsruntime.NumGoroutine()stable over 200 ms AND dumpsruntime.Stackto assert at least 8 stacks point intomemstreamer.(*Stream).Recvand at least one is insync.runtime_notifyListWait/sync.(*Cond).Wait. The stack-shape assertion is what actually distinguishes spin from park.TestRecv_WakesOnSend— parkedRecvreturns within 100 ms of aSend.TestRecv_WakesOnCtxCancel— parkedRecvreturnsctx.Err()within 100 ms of cancel;NumGoroutinedoes not grow after cancel (no watcher leak).Semantics preserved
StreamConstructor.NewSender,NewReceiver,Send,Recv,Close, the ack closure, and theWithClockoption are all identical. No new exported symbols or options.cursorStore,StreamFromLatestoption, and the idempotent ack-closure-advances-cursor-exactly-once semantics all behave the same.Recvtakess.mu(across the wait); the ack closure only touchescursorStore.mu;Sendtakess.mu(briefly) and broadcasts.Verification
go test ./adapters/memstreamer/... -count=1 -race:go test ./... -count=1 -race— all packages green (~23s), including the workflow consumer paths that exercise the adapter.I verified that the new tests fail against the original busy-loop implementation by stashing the fix and re-running:
TestRecv_DoesNotBusySpindumped goroutines stuck ininternal/sync.(*Mutex).lockSlowin therunnablestate insideRecv(matching exactly the SIGQUIT dump described above) before failing.Behavioural notes
cond.Broadcast()wakes all parked receivers; they then race fors.mu. The Go runtime does not guarantee FIFO order onsync.Mutex.Lock. The previous busy-loop had no fairness either (all goroutines hammered the mutex), so this is no worse — but consumers should not depend on a particular wake order. For per-receiver cursor independence (multiplenames, same topic) this is fine because each receiver has its own cursor incursorStore.Recvinvocation now spawns one ephemeral goroutine. Workflow consumers typically callRecvin a tight loop (one event at a time), so this is one short-lived goroutine per delivered event — bounded by event throughput, cleaned up viadefer close(stop). Negligible compared to the CPU saved.s.cond.Wait()releasess.muwhile parked and reacquires on wake.Sendis not blocked by parked receivers.Test plan
go test ./adapters/memstreamer/... -racego test ./... -raceRecvcallers on the same topic🤖 Generated with Claude Code
Summary by CodeRabbit