Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions pkg/wal/processor/batch/wal_batch_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ func (s *Sender[T]) SendMessage(ctx context.Context, msg *WALMessage[T]) error {
// longer processing), and an error will be returned.
select {
case s.msgChan <- msg:
case sendDoneErr, ok := <-s.sendDone:
// check if a different call has closed the send channel already, to
// prevent blocking when called concurrently.
if ok && sendDoneErr != nil {
s.sendErr = sendDoneErr
}
case <-s.sendDone:
// s.sendErr is set by send() before closing s.sendDone, so it is safe
// to read here from any number of concurrent callers.
s.logger.Error(s.sendErr, "stop processing, sending has stopped")
if s.sendErr == nil {
return errSendStopped
}
return fmt.Errorf("%w: %w", errSendStopped, s.sendErr)
}

Expand Down Expand Up @@ -211,7 +211,9 @@ func (s *Sender[T]) send(ctx context.Context) error {
if err != nil && !errors.Is(err, context.Canceled) {
s.logger.Error(err, "sending stopped")
}
s.sendDone <- err
// publish the send error before signalling shutdown so any goroutines
// waiting in SendMessage can observe it after the channel is closed.
s.sendErr = err
close(s.sendDone)
return err
}
Expand Down
67 changes: 67 additions & 0 deletions pkg/wal/processor/batch/wal_batch_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,3 +464,70 @@ func TestSender(t *testing.T) {
}
}
}

// Regression test for https://github.com/xataio/pgstream/issues/372:
// concurrent SendMessage callers must all observe the underlying send error
// rather than wrapping a nil sendErr (which produced "%!w(<nil>)" messages
// that obscured the real cause of snapshot worker failures).
func TestSender_ConcurrentSendErrorPropagation(t *testing.T) {
t.Parallel()

ctx := context.Background()

errTest := errors.New("oh noes")
testCommitPos := wal.CommitPosition("1")

mockMsg := func(i uint) *mockMessage {
return &mockMessage{id: i}
}
testWALMsg := func(i uint) *WALMessage[*mockMessage] {
return NewWALMessage(mockMsg(i), testCommitPos)
}

doneChan := make(chan struct{}, 1)
defer close(doneChan)

sendFn := func(doneChan chan<- struct{}) sendBatchFn[*mockMessage] {
once := sync.Once{}
return func(ctx context.Context, b *Batch[*mockMessage]) error {
defer once.Do(func() { doneChan <- struct{}{} })
return errTest
}
}

sender, err := NewSender(ctx, &Config{
BatchTimeout: 100 * time.Millisecond,
MaxBatchSize: 1,
}, sendFn(doneChan), log.NewNoopLogger())
require.NoError(t, err)
defer sender.Close()

// prime the sender so the batch send fails
require.NoError(t, sender.SendMessage(ctx, testWALMsg(1)))

select {
case <-doneChan:
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for send to fail")
}
// give time for the error to propagate through sendDone
time.Sleep(100 * time.Millisecond)

const workers = 8
errs := make([]error, workers)
wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func(i int) {
defer wg.Done()
errs[i] = sender.SendMessage(ctx, testWALMsg(uint(i+2)))
}(i)
}
wg.Wait()

for i, err := range errs {
require.ErrorIsf(t, err, errSendStopped, "worker %d: missing errSendStopped", i)
require.ErrorIsf(t, err, errTest, "worker %d: missing underlying send error", i)
require.NotContainsf(t, err.Error(), "%!w(<nil>)", "worker %d: nil error wrapping leaked through", i)
}
}
19 changes: 13 additions & 6 deletions pkg/wal/processor/webhook/notifier/webhook_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,13 @@ func (n *Notifier) ProcessWALEvent(ctx context.Context, walEvent *wal.Event) (er

select {
case n.notifyChan <- msg:
case notifyDoneErr, ok := <-n.notifyDone:
if ok && notifyDoneErr != nil {
n.notifyErr = notifyDoneErr
}
case <-n.notifyDone:
// n.notifyErr is set by Notify before closing n.notifyDone, so it is
// safe to read here from any number of concurrent callers.
n.logger.Error(n.notifyErr, "stop processing, notify has stopped")
if n.notifyErr == nil {
return errNotifyStopped
}
return fmt.Errorf("%w: %w", errNotifyStopped, n.notifyErr)
}

Expand All @@ -147,7 +149,10 @@ func (n *Notifier) Notify(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case msg := <-n.notifyChan:
case msg, ok := <-n.notifyChan:
if !ok {
return nil
}
err := n.notify(ctx, msg)
n.queueBytesSema.Release(int64(msg.size()))
if err != nil {
Expand All @@ -163,7 +168,9 @@ func (n *Notifier) Notify(ctx context.Context) error {
}

err := notifyLoop()
n.notifyDone <- err
// publish the notify error before signalling shutdown so any goroutines
// waiting in ProcessWALEvent can observe it after the channel is closed.
n.notifyErr = err
close(n.notifyDone)
return err
}
Expand Down
84 changes: 84 additions & 0 deletions pkg/wal/processor/webhook/notifier/webhook_notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,3 +376,87 @@ func TestNotifier(t *testing.T) {
}
}
}

// Regression test: Notify must not dereference a nil msg when notifyChan is
// closed via Close(). Prior to the fix, Close()-during-Notify produced a
// "invalid memory address or nil pointer dereference" panic at notify().
func TestNotifier_NotifyAfterClose(t *testing.T) {
t.Parallel()

n := New(&Config{}, &mocks.Store{})

errChan := make(chan error, 1)
go func() {
errChan <- n.Notify(context.Background())
}()

// give Notify a moment to enter its select
time.Sleep(50 * time.Millisecond)
require.NoError(t, n.Close())

select {
case err := <-errChan:
require.NoError(t, err)
case <-time.After(5 * time.Second):
t.Fatal("Notify did not return after Close")
}
}

// Regression test: concurrent ProcessWALEvent callers must all observe the
// underlying Notify error rather than wrapping a nil notifyErr (which would
// produce "%!w(<nil>)" messages, the same pattern fixed for the batch sender
// in issue #372).
func TestNotifier_ConcurrentProcessWALEventErrorPropagation(t *testing.T) {
t.Parallel()

n := New(&Config{}, &mocks.Store{
GetSubscriptionsFn: func(ctx context.Context, action, schema, table string) ([]*subscription.Subscription, error) {
return []*subscription.Subscription{newTestSubscription("url-1", "", "", nil)}, nil
},
})
n.checkpointer = func(ctx context.Context, positions []wal.CommitPosition) error {
return errTest
}

notifyDone := make(chan struct{})
go func() {
defer close(notifyDone)
err := n.Notify(context.Background())
require.ErrorIs(t, err, errTest)
}()

// seed a message that will make Notify exit with errTest
require.NoError(t, n.ProcessWALEvent(context.Background(), &wal.Event{
CommitPosition: wal.CommitPosition("seed"),
Data: &wal.Data{Action: "I"},
}))

select {
case <-notifyDone:
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for Notify to fail")
}
// give time for notifyDone close to propagate
time.Sleep(100 * time.Millisecond)

const workers = 8
errs := make([]error, workers)
wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func(i int) {
defer wg.Done()
errs[i] = n.ProcessWALEvent(context.Background(), &wal.Event{
CommitPosition: wal.CommitPosition(fmt.Sprintf("w-%d", i)),
Data: &wal.Data{Action: "I"},
})
}(i)
}
wg.Wait()

for i, err := range errs {
require.ErrorIsf(t, err, errNotifyStopped, "worker %d: missing errNotifyStopped", i)
require.ErrorIsf(t, err, errTest, "worker %d: missing underlying notify error", i)
require.NotContainsf(t, err.Error(), "%!w(<nil>)", "worker %d: nil error wrapping leaked through", i)
}
}
Loading