Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,6 @@ func (q *TransmitLimitedQueue) deleteItem(cur *limitedBroadcast) {
if cur.name != "" {
delete(q.tm, cur.name)
}

if q.tq.Len() == 0 {
// At idle there's no reason to let the id generator keep going
// indefinitely.
q.idGen = 0
}
}

// addItem adds the given item into the overall datastructure. You must already
Expand Down
46 changes: 44 additions & 2 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package memberlist

import (
"bytes"
"testing"

"github.com/google/btree"
Expand Down Expand Up @@ -142,13 +143,13 @@ func TestTransmitLimited_GetBroadcasts_Limit(t *testing.T) {
partial3 := q.GetBroadcasts(3, 80)
require.Equal(t, 2, len(partial3), "missing messages: %v", prettyPrintMessages(partial3))

require.Equal(t, int64(0), q.idGen, "id generator resets on empty")
require.Equal(t, int64(4), q.idGen, "id generator resets on empty")

// Should get nothing
partial5 := q.GetBroadcasts(3, 80)
require.Equal(t, 0, len(partial5), "missing messages: %v", prettyPrintMessages(partial5))

require.Equal(t, int64(0), q.idGen, "id generator resets on empty")
require.Equal(t, int64(4), q.idGen, "id generator resets on empty")
}

func prettyPrintMessages(msgs [][]byte) []string {
Expand Down Expand Up @@ -229,3 +230,44 @@ func TestTransmitLimited_ordering(t *testing.T) {
t.Fatalf("bad val %v, %d", dump[4].b.(*memberlistBroadcast).node, dump[4].transmits)
}
}

func TestTransmitLimited_retransmitRace(t *testing.T) {
// RetransmitMult is set high enough to ensure retransmits happen.
q := &TransmitLimitedQueue{RetransmitMult: 4, NumNodes: func() int { return 10 }}

msgA := []byte{'a'}
q.QueueBroadcast(&memberlistBroadcast{"nodeA", msgA, nil})
require.Equal(t, 1, q.NumQueued(), "should have one item queued")

broadcasts := q.GetBroadcasts(0, 1)
require.Equal(t, msgA, broadcasts[0], "should get one broadcast out")

// At this point, 'nodeA' has been removed from the queue and is pending
// re-insertion at the end of GetBroadcasts.

// Queue a new, different item.
msgB := []byte{'b'}
q.QueueBroadcast(&memberlistBroadcast{"nodeB", msgB, nil})
broadcasts = q.GetBroadcasts(0, 1)
require.Equal(t, msgB, broadcasts[0], "should get one broadcast out")

// Verify the final state of the queue.
// With the buggy code, an ID collision occurs. The btree may drop one of
// the items or behave unpredictably. The test will fail.
// With the fixed code, both items have unique IDs and coexist peacefully.
require.Equal(t, 2, q.NumQueued(), "queue should contain two distinct items")

// Further verification: ensure both items are actually present.
finalBroadcasts := q.GetBroadcasts(0, 2)
require.Equal(t, 2, len(finalBroadcasts), "should be able to retrieve two distinct items")

// Check that both message payloads are present.
foundA := false
foundB := false
for _, msg := range finalBroadcasts {
foundA = foundA || bytes.Equal(msg, msgA)
foundB = foundB || bytes.Equal(msg, msgB)
}
require.True(t, foundA, "message 'a' should be present in the final queue")
require.True(t, foundB, "message 'b' should be present in the final queue")
}