Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 44 additions & 9 deletions adapters/memstreamer/memstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ func New(opts ...Option) *StreamConstructor {
option(&opt)
}

mu := &sync.Mutex{}
return &StreamConstructor{
opts: &opt,
stream: &Stream{
mu: &sync.Mutex{},
log: &log,
mu: mu,
cond: sync.NewCond(mu),
log: &log,
},
cursorStore: newCursorStore(),
}
Expand Down Expand Up @@ -56,6 +58,7 @@ func (s StreamConstructor) NewSender(ctx context.Context, topic string) (workflo

return &Stream{
mu: s.stream.mu,
cond: s.stream.cond,
log: s.stream.log,
topic: topic,
clock: s.opts.clock,
Expand Down Expand Up @@ -85,6 +88,7 @@ func (s StreamConstructor) NewReceiver(

return &Stream{
mu: s.stream.mu,
cond: s.stream.cond,
log: s.stream.log,
cursorStore: s.cursorStore,
topic: topic,
Expand All @@ -98,6 +102,7 @@ var _ workflow.EventStreamer = (*StreamConstructor)(nil)

type Stream struct {
mu *sync.Mutex
cond *sync.Cond
log *[]*workflow.Event
cursorStore *cursorStore
topic string
Expand All @@ -119,35 +124,65 @@ func (s *Stream) Send(ctx context.Context, foreignID string, statusType int, hea
CreatedAt: s.clock.Now(),
})

// Wake any receivers parked on cond.Wait so they can pick up the new event.
s.cond.Broadcast()

return nil
}

func (s *Stream) Recv(ctx context.Context) (*workflow.Event, workflow.Ack, error) {
for ctx.Err() == nil {
s.mu.Lock()
log := *s.log
s.mu.Unlock()
// sync.Cond doesn't natively respect ctx. Spawn a watcher that
// broadcasts when the ctx is done so Recv can wake up and return
// ctx.Err() instead of leaking the goroutine forever. The stop channel
// is closed on return so the watcher exits when Recv exits (no leak per
// call).
stop := make(chan struct{})
defer close(stop)
go func() {
select {
case <-ctx.Done():
s.mu.Lock()
s.cond.Broadcast()
s.mu.Unlock()
case <-stop:
}
}()

s.mu.Lock()
defer s.mu.Unlock()

for {
if err := ctx.Err(); err != nil {
return nil, nil, err
}

log := *s.log
cursorOffset := s.cursorStore.Get(s.name)

if len(log)-1 < cursorOffset {
// Nothing to deliver — park until Send or ctx cancel signals us.
s.cond.Wait()
continue
}

e := log[cursorOffset]

// Skip events that are not related to this topic
// Skip events that are not related to this topic. Advance the cursor
// past the skipped event so we don't see it again.
if s.topic != e.Headers[workflow.HeaderTopic] {
s.cursorStore.Set(s.name, cursorOffset+1)
continue
}

// The ack closure captures cursorOffset so a double-call is a no-op
// (it would set the cursor to the same value). The cursorStore's
// own mutex protects the cursor map; that's separate from s.mu so
// we don't need to hold s.mu here.
return e, func() error {
s.cursorStore.Set(s.name, cursorOffset+1)
return nil
}, nil
}

return nil, nil, ctx.Err()
}

func (s *Stream) Close() error {
Expand Down
168 changes: 168 additions & 0 deletions adapters/memstreamer/memstreamer_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package memstreamer_test

import (
"context"
"runtime"
"strings"
"testing"
"time"

"github.com/luno/workflow"
"github.com/luno/workflow/adapters/adaptertest"
"github.com/luno/workflow/adapters/memstreamer"
)

const testTopic = "test-topic"

func TestStreamer(t *testing.T) {
adaptertest.RunEventStreamerTest(t, func() workflow.EventStreamer {
return memstreamer.New()
Expand All @@ -19,3 +25,165 @@ func TestConnector(t *testing.T) {
return memstreamer.NewConnector(seedEvents)
})
}

// TestRecv_DoesNotBusySpin verifies the headline fix: with several receivers
// parked and no senders, the goroutine count stays flat AND the receivers
// are sleeping on sync.Cond rather than spinning runnable. The busy-loop
// implementation kept goroutine count stable too, so the test also inspects
// goroutine stacks to confirm they are parked on cond.Wait (semaphore wait)
// rather than in the middle of the Recv for-loop.
func TestRecv_DoesNotBusySpin(t *testing.T) {
s := memstreamer.New()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const consumers = 8
for i := 0; i < consumers; i++ {
name := "consumer-" + string(rune('a'+i))
rec, err := s.NewReceiver(ctx, testTopic, name)
if err != nil {
t.Fatalf("NewReceiver: %v", err)
}
go func() {
for {
_, _, err := rec.Recv(ctx)
if err != nil {
return
}
}
}()
}
Comment on lines +35 to +55

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟡 Minor | ⚡ Quick win

Wait for the receiver goroutines before the test exits.

This test uses process-wide goroutine counts, but it cancels via defer without joining the goroutines it started. Those goroutines can still be unwinding when the next test samples runtime.NumGoroutine().

Suggested cleanup
 import (
 	"context"
 	"runtime"
 	"strings"
+	"sync"
 	"testing"
 	"time"
@@
 	s := memstreamer.New()
 	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
+	var wg sync.WaitGroup
+	t.Cleanup(func() {
+		cancel()
+		wg.Wait()
+	})
@@
 		if err != nil {
 			t.Fatalf("NewReceiver: %v", err)
 		}
+		wg.Add(1)
 		go func() {
+			defer wg.Done()
 			for {
 				_, _, err := rec.Recv(ctx)
 				if err != nil {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func TestRecv_DoesNotBusySpin(t *testing.T) {
s := memstreamer.New()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
const consumers = 8
for i := 0; i < consumers; i++ {
name := "consumer-" + string(rune('a'+i))
rec, err := s.NewReceiver(ctx, testTopic, name)
if err != nil {
t.Fatalf("NewReceiver: %v", err)
}
go func() {
for {
_, _, err := rec.Recv(ctx)
if err != nil {
return
}
}
}()
}
func TestRecv_DoesNotBusySpin(t *testing.T) {
s := memstreamer.New()
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
t.Cleanup(func() {
cancel()
wg.Wait()
})
const consumers = 8
for i := 0; i < consumers; i++ {
name := "consumer-" + string(rune('a'+i))
rec, err := s.NewReceiver(ctx, testTopic, name)
if err != nil {
t.Fatalf("NewReceiver: %v", err)
}
wg.Add(1)
go func() {
defer wg.Done()
for {
_, _, err := rec.Recv(ctx)
if err != nil {
return
}
}
}()
}
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@adapters/memstreamer/memstreamer_test.go` around lines 35 - 55, Ensure
TestRecv_DoesNotBusySpin waits for all receiver goroutines to finish before
exiting, since the current deferred cancel can leave Recv loops from
memstreamer.NewReceiver still unwinding when later tests inspect
runtime.NumGoroutine(). Add synchronization in the test (for example around the
goroutines launched from the Recv loop) so the test does not return until every
consumer has observed cancellation and exited cleanly.


// Let consumers settle into cond.Wait.
time.Sleep(100 * time.Millisecond)

before := runtime.NumGoroutine()
time.Sleep(200 * time.Millisecond)
after := runtime.NumGoroutine()

// Each Recv call internally spawns a ctx-watcher goroutine that lives
// for the duration of the Recv call. With no Sends arriving the
// receivers stay parked and goroutine count should be stable.
if after > before+2 { // +2 wiggle for the test runtime itself
t.Errorf("goroutine count grew while idle: before=%d after=%d", before, after)
}

// Verify receivers are parked, not busy-spinning. Dump all goroutine
// stacks and assert that there are at least `consumers` goroutines
// suspended on memstreamer.(*Stream).Recv via sync.Cond.Wait /
// semaphore wait. Under the old busy-loop the Recv goroutines would
// be in a runnable for-loop and would NOT show up as waiting in a
// sync primitive inside Recv.
buf := make([]byte, 1<<20)
n := runtime.Stack(buf, true)
stacks := string(buf[:n])
parked := strings.Count(stacks, "memstreamer.(*Stream).Recv")
// Each Recv goroutine appears once; we expect at least `consumers`
// stacks pointing into Recv, all in a wait state (sync.runtime_*).
if parked < consumers {
t.Errorf("expected %d goroutines parked in Recv, found %d in stacks", consumers, parked)
}
if !strings.Contains(stacks, "sync.runtime_notifyListWait") &&
!strings.Contains(stacks, "sync.(*Cond).Wait") {
t.Errorf("no goroutines parked on sync.Cond.Wait — Recv may be busy-spinning. Stacks:\n%s", stacks)
}
}

// TestRecv_WakesOnSend asserts that a Recv call started before any event
// exists returns promptly once a matching Send arrives.
//
// The test starts Recv in a goroutine, sleeps 50ms so it has time to block,
// then sends one event on the same topic. It fails if Recv (or the ack it
// returns) reports an error, if the event arrives more than 100ms after the
// Send started, if the ForeignID does not match, or if nothing is delivered
// within 1s.
func TestRecv_WakesOnSend(t *testing.T) {
	s := memstreamer.New()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	rec, err := s.NewReceiver(ctx, testTopic, "wake-on-send")
	if err != nil {
		t.Fatalf("NewReceiver: %v", err)
	}
	t.Cleanup(func() { _ = rec.Close() })

	// result carries either the delivered event or the error that stopped
	// the receiver, so a Recv/ack failure is reported as such instead of
	// showing up as an unexplained timeout below.
	type result struct {
		event *workflow.Event
		err   error
	}

	// Buffered so the goroutine can always deliver its result and exit,
	// even if the test has already failed and stopped reading.
	got := make(chan result, 1)
	go func() {
		e, ack, err := rec.Recv(ctx)
		if err != nil {
			got <- result{err: err}
			return
		}
		got <- result{event: e, err: ack()}
	}()

	// Give the Recv goroutine time to enter cond.Wait.
	time.Sleep(50 * time.Millisecond)

	send, err := s.NewSender(ctx, testTopic)
	if err != nil {
		t.Fatalf("NewSender: %v", err)
	}
	t.Cleanup(func() { _ = send.Close() })

	sendAt := time.Now()
	if err := send.Send(ctx, "fid-1", 42, map[workflow.Header]string{
		workflow.HeaderTopic: testTopic,
	}); err != nil {
		t.Fatalf("Send: %v", err)
	}

	select {
	case r := <-got:
		if r.err != nil {
			t.Fatalf("Recv/ack returned error: %v", r.err)
		}
		if elapsed := time.Since(sendAt); elapsed > 100*time.Millisecond {
			t.Errorf("Recv took %v to wake after Send (want <100ms)", elapsed)
		}
		if r.event.ForeignID != "fid-1" {
			t.Errorf("ForeignID: want fid-1, got %q", r.event.ForeignID)
		}
	case <-time.After(time.Second):
		t.Fatalf("Recv did not unblock within 1s of Send")
	}
}

// TestRecv_WakesOnCtxCancel verifies that ctx cancellation wakes a parked
// Recv promptly and that the ctx-watcher goroutine does not leak per call.
func TestRecv_WakesOnCtxCancel(t *testing.T) {
s := memstreamer.New()
ctx, cancel := context.WithCancel(context.Background())

rec, err := s.NewReceiver(ctx, testTopic, "wake-on-cancel")
if err != nil {
t.Fatalf("NewReceiver: %v", err)
}
t.Cleanup(func() { _ = rec.Close() })

done := make(chan error, 1)
go func() {
_, _, err := rec.Recv(ctx)
done <- err
}()

// Let it park.
time.Sleep(20 * time.Millisecond)

before := runtime.NumGoroutine()
cancelAt := time.Now()
cancel()

select {
case err := <-done:
if elapsed := time.Since(cancelAt); elapsed > 100*time.Millisecond {
t.Errorf("Recv took %v to wake after cancel (want <100ms)", elapsed)
}
if err == nil {
t.Errorf("Recv should return ctx.Err(), got nil")
}
case <-time.After(500 * time.Millisecond):
t.Fatalf("Recv did not unblock on ctx cancel within 500ms")
}

// Give the watcher goroutine a moment to exit after Recv returns.
time.Sleep(50 * time.Millisecond)
after := runtime.NumGoroutine()
if after > before {
t.Errorf("goroutine leak after ctx cancel: before=%d after=%d", before, after)
Comment on lines +167 to +187

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟡 Minor | ⚡ Quick win

Take the leak baseline before starting Recv.

`before` is captured while both the Recv goroutine and its context watcher are already alive. If the watcher leaks, `after` can still be <= `before` once the Recv goroutine alone has exited, so the check passes and the leak goes undetected.

Suggested assertion fix
+	baseline := runtime.NumGoroutine()
 	done := make(chan error, 1)
 	go func() {
 		_, _, err := rec.Recv(ctx)
 		done <- err
 	}()
@@
-	before := runtime.NumGoroutine()
 	cancelAt := time.Now()
 	cancel()
@@
 	time.Sleep(50 * time.Millisecond)
 	after := runtime.NumGoroutine()
-	if after > before {
-		t.Errorf("goroutine leak after ctx cancel: before=%d after=%d", before, after)
+	if after > baseline {
+		t.Errorf("goroutine leak after ctx cancel: baseline=%d after=%d", baseline, after)
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
before := runtime.NumGoroutine()
cancelAt := time.Now()
cancel()
select {
case err := <-done:
if elapsed := time.Since(cancelAt); elapsed > 100*time.Millisecond {
t.Errorf("Recv took %v to wake after cancel (want <100ms)", elapsed)
}
if err == nil {
t.Errorf("Recv should return ctx.Err(), got nil")
}
case <-time.After(500 * time.Millisecond):
t.Fatalf("Recv did not unblock on ctx cancel within 500ms")
}
// Give the watcher goroutine a moment to exit after Recv returns.
time.Sleep(50 * time.Millisecond)
after := runtime.NumGoroutine()
if after > before {
t.Errorf("goroutine leak after ctx cancel: before=%d after=%d", before, after)
baseline := runtime.NumGoroutine()
done := make(chan error, 1)
go func() {
_, _, err := rec.Recv(ctx)
done <- err
}()
cancelAt := time.Now()
cancel()
select {
case err := <-done:
if elapsed := time.Since(cancelAt); elapsed > 100*time.Millisecond {
t.Errorf("Recv took %v to wake after cancel (want <100ms)", elapsed)
}
if err == nil {
t.Errorf("Recv should return ctx.Err(), got nil")
}
case <-time.After(500 * time.Millisecond):
t.Fatalf("Recv did not unblock on ctx cancel within 500ms")
}
// Give the watcher goroutine a moment to exit after Recv returns.
time.Sleep(50 * time.Millisecond)
after := runtime.NumGoroutine()
if after > baseline {
t.Errorf("goroutine leak after ctx cancel: baseline=%d after=%d", baseline, after)
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@adapters/memstreamer/memstreamer_test.go` around lines 167 - 187, The
goroutine leak assertion in the Recv cancel test is taking the baseline too
late, after the Recv goroutine and its context watcher are already running. Move
the runtime.NumGoroutine baseline capture in memstreamer_test.go so it happens
before starting the goroutine that calls Recv, then keep the existing
cancel/wait/after check around done to ensure a leaked watcher is counted
correctly.

}
}
Loading