diff --git a/adapters/memstreamer/memstreamer.go b/adapters/memstreamer/memstreamer.go index cdeaf97..e20105a 100644 --- a/adapters/memstreamer/memstreamer.go +++ b/adapters/memstreamer/memstreamer.go @@ -22,11 +22,13 @@ func New(opts ...Option) *StreamConstructor { option(&opt) } + mu := &sync.Mutex{} return &StreamConstructor{ opts: &opt, stream: &Stream{ - mu: &sync.Mutex{}, - log: &log, + mu: mu, + cond: sync.NewCond(mu), + log: &log, }, cursorStore: newCursorStore(), } @@ -56,6 +58,7 @@ func (s StreamConstructor) NewSender(ctx context.Context, topic string) (workflo return &Stream{ mu: s.stream.mu, + cond: s.stream.cond, log: s.stream.log, topic: topic, clock: s.opts.clock, @@ -85,6 +88,7 @@ func (s StreamConstructor) NewReceiver( return &Stream{ mu: s.stream.mu, + cond: s.stream.cond, log: s.stream.log, cursorStore: s.cursorStore, topic: topic, @@ -98,6 +102,7 @@ var _ workflow.EventStreamer = (*StreamConstructor)(nil) type Stream struct { mu *sync.Mutex + cond *sync.Cond log *[]*workflow.Event cursorStore *cursorStore topic string @@ -119,35 +124,65 @@ func (s *Stream) Send(ctx context.Context, foreignID string, statusType int, hea CreatedAt: s.clock.Now(), }) + // Wake any receivers parked on cond.Wait so they can pick up the new event. + s.cond.Broadcast() + return nil } func (s *Stream) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, error) { - for ctx.Err() == nil { - s.mu.Lock() - log := *s.log - s.mu.Unlock() + // sync.Cond doesn't natively respect ctx. Spawn a watcher that + // broadcasts when the ctx is done so Recv can wake up and return + // ctx.Err() instead of leaking the goroutine forever. The stop channel + // is closed on return so the watcher exits when Recv exits (no leak per + // call). + stop := make(chan struct{}) + defer close(stop) + go func() { + select { + case <-ctx.Done(): + s.mu.Lock() + s.cond.Broadcast() + s.mu.Unlock() + case <-stop: + } + }() + + s.mu.Lock() + defer s.mu.Unlock() + for { + if err := ctx.Err(); err != nil { + return nil, nil, err + } + + log := *s.log cursorOffset := s.cursorStore.Get(s.name) + if len(log)-1 < cursorOffset { + // Nothing to deliver — park until Send or ctx cancel signals us. + s.cond.Wait() continue } e := log[cursorOffset] - // Skip events that are not related to this topic + // Skip events that are not related to this topic. Advance the cursor + // past the skipped event so we don't see it again. if s.topic != e.Headers[workflow.HeaderTopic] { s.cursorStore.Set(s.name, cursorOffset+1) continue } + // The ack closure captures cursorOffset so a double-call is a no-op + // (it would set the cursor to the same value). The cursorStore's + // own mutex protects the cursor map; that's separate from s.mu so + // we don't need to hold s.mu here. return e, func() error { s.cursorStore.Set(s.name, cursorOffset+1) return nil }, nil } - - return nil, nil, ctx.Err() } func (s *Stream) Close() error { diff --git a/adapters/memstreamer/memstreamer_test.go b/adapters/memstreamer/memstreamer_test.go index d242d74..fcca50f 100644 --- a/adapters/memstreamer/memstreamer_test.go +++ b/adapters/memstreamer/memstreamer_test.go @@ -1,13 +1,19 @@ package memstreamer_test import ( + "context" + "runtime" + "strings" "testing" + "time" "github.com/luno/workflow" "github.com/luno/workflow/adapters/adaptertest" "github.com/luno/workflow/adapters/memstreamer" ) +const testTopic = "test-topic" + func TestStreamer(t *testing.T) { adaptertest.RunEventStreamerTest(t, func() workflow.EventStreamer { return memstreamer.New() @@ -19,3 +25,165 @@ func TestConnector(t *testing.T) { return memstreamer.NewConnector(seedEvents) }) } + +// TestRecv_DoesNotBusySpin verifies the headline fix: with several receivers +// parked and no senders, the goroutine count stays flat AND the receivers +// are sleeping on sync.Cond rather than spinning runnable. The busy-loop +// implementation kept goroutine count stable too, so the test also inspects +// goroutine stacks to confirm they are parked on cond.Wait (semaphore wait) +// rather than in the middle of the Recv for-loop. +func TestRecv_DoesNotBusySpin(t *testing.T) { + s := memstreamer.New() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const consumers = 8 + for i := 0; i < consumers; i++ { + name := "consumer-" + string(rune('a'+i)) + rec, err := s.NewReceiver(ctx, testTopic, name) + if err != nil { + t.Fatalf("NewReceiver: %v", err) + } + go func() { + for { + _, _, err := rec.Recv(ctx) + if err != nil { + return + } + } + }() + } + + // Let consumers settle into cond.Wait. + time.Sleep(100 * time.Millisecond) + + before := runtime.NumGoroutine() + time.Sleep(200 * time.Millisecond) + after := runtime.NumGoroutine() + + // Each Recv call internally spawns a ctx-watcher goroutine that lives + // for the duration of the Recv call. With no Sends arriving the + // receivers stay parked and goroutine count should be stable. + if after > before+2 { // +2 wiggle for the test runtime itself + t.Errorf("goroutine count grew while idle: before=%d after=%d", before, after) + } + + // Verify receivers are parked, not busy-spinning. Dump all goroutine + // stacks and assert that there are at least `consumers` goroutines + // suspended on memstreamer.(*Stream).Recv via sync.Cond.Wait / + // semaphore wait. Under the old busy-loop the Recv goroutines would + // be in a runnable for-loop and would NOT show up as waiting in a + // sync primitive inside Recv. + buf := make([]byte, 1<<20) + n := runtime.Stack(buf, true) + stacks := string(buf[:n]) + parked := strings.Count(stacks, "memstreamer.(*Stream).Recv") + // Each Recv goroutine appears once; we expect at least `consumers` + // stacks pointing into Recv, all in a wait state (sync.runtime_*). + if parked < consumers { + t.Errorf("expected %d goroutines parked in Recv, found %d in stacks", consumers, parked) + } + if !strings.Contains(stacks, "sync.runtime_notifyListWait") && + !strings.Contains(stacks, "sync.(*Cond).Wait") { + t.Errorf("no goroutines parked on sync.Cond.Wait — Recv may be busy-spinning. Stacks:\n%s", stacks) + } +} + +// TestRecv_WakesOnSend asserts that a parked Recv returns promptly after a +// Send arrives. With the old busy-loop implementation the test still passed +// (because the loop polled the log), but it would also burn CPU for the +// 50ms park-window. With cond.Broadcast the wake is event-driven. +func TestRecv_WakesOnSend(t *testing.T) { + s := memstreamer.New() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + rec, err := s.NewReceiver(ctx, testTopic, "wake-on-send") + if err != nil { + t.Fatalf("NewReceiver: %v", err) + } + t.Cleanup(func() { _ = rec.Close() }) + + got := make(chan *workflow.Event, 1) + go func() { + e, ack, err := rec.Recv(ctx) + if err != nil { + return + } + _ = ack() + got <- e + }() + + // Give the Recv goroutine time to enter cond.Wait. + time.Sleep(50 * time.Millisecond) + + send, err := s.NewSender(ctx, testTopic) + if err != nil { + t.Fatalf("NewSender: %v", err) + } + t.Cleanup(func() { _ = send.Close() }) + + sendAt := time.Now() + if err := send.Send(ctx, "fid-1", 42, map[workflow.Header]string{ + workflow.HeaderTopic: testTopic, + }); err != nil { + t.Fatalf("Send: %v", err) + } + + select { + case e := <-got: + if elapsed := time.Since(sendAt); elapsed > 100*time.Millisecond { + t.Errorf("Recv took %v to wake after Send (want <100ms)", elapsed) + } + if e.ForeignID != "fid-1" { + t.Errorf("ForeignID: want fid-1, got %q", e.ForeignID) + } + case <-time.After(time.Second): + t.Fatalf("Recv did not unblock within 1s of Send") + } +} + +// TestRecv_WakesOnCtxCancel verifies that ctx cancellation wakes a parked +// Recv promptly and that the ctx-watcher goroutine does not leak per call. +func TestRecv_WakesOnCtxCancel(t *testing.T) { + s := memstreamer.New() + ctx, cancel := context.WithCancel(context.Background()) + + rec, err := s.NewReceiver(ctx, testTopic, "wake-on-cancel") + if err != nil { + t.Fatalf("NewReceiver: %v", err) + } + t.Cleanup(func() { _ = rec.Close() }) + + done := make(chan error, 1) + go func() { + _, _, err := rec.Recv(ctx) + done <- err + }() + + // Let it park. + time.Sleep(20 * time.Millisecond) + + before := runtime.NumGoroutine() + cancelAt := time.Now() + cancel() + + select { + case err := <-done: + if elapsed := time.Since(cancelAt); elapsed > 100*time.Millisecond { + t.Errorf("Recv took %v to wake after cancel (want <100ms)", elapsed) + } + if err == nil { + t.Errorf("Recv should return ctx.Err(), got nil") + } + case <-time.After(500 * time.Millisecond): + t.Fatalf("Recv did not unblock on ctx cancel within 500ms") + } + + // Give the watcher goroutine a moment to exit after Recv returns. + time.Sleep(50 * time.Millisecond) + after := runtime.NumGoroutine() + if after > before { + t.Errorf("goroutine leak after ctx cancel: before=%d after=%d", before, after) + } +}