diff --git a/queue.go b/queue.go index 610f349eb..e1c9c6ff5 100644 --- a/queue.go +++ b/queue.go @@ -251,12 +251,6 @@ func (q *TransmitLimitedQueue) deleteItem(cur *limitedBroadcast) { if cur.name != "" { delete(q.tm, cur.name) } - - if q.tq.Len() == 0 { - // At idle there's no reason to let the id generator keep going - // indefinitely. - q.idGen = 0 - } } // addItem adds the given item into the overall datastructure. You must already diff --git a/queue_test.go b/queue_test.go index b49f647a1..821866c21 100644 --- a/queue_test.go +++ b/queue_test.go @@ -4,6 +4,7 @@ package memberlist import ( + "bytes" "testing" "github.com/google/btree" @@ -142,13 +143,13 @@ func TestTransmitLimited_GetBroadcasts_Limit(t *testing.T) { partial3 := q.GetBroadcasts(3, 80) require.Equal(t, 2, len(partial3), "missing messages: %v", prettyPrintMessages(partial3)) - require.Equal(t, int64(0), q.idGen, "id generator resets on empty") + require.Equal(t, int64(4), q.idGen, "id generator resets on empty") // Should get nothing partial5 := q.GetBroadcasts(3, 80) require.Equal(t, 0, len(partial5), "missing messages: %v", prettyPrintMessages(partial5)) - require.Equal(t, int64(0), q.idGen, "id generator resets on empty") + require.Equal(t, int64(4), q.idGen, "id generator resets on empty") } func prettyPrintMessages(msgs [][]byte) []string { @@ -229,3 +230,44 @@ func TestTransmitLimited_ordering(t *testing.T) { t.Fatalf("bad val %v, %d", dump[4].b.(*memberlistBroadcast).node, dump[4].transmits) } } + +func TestTransmitLimited_retransmitRace(t *testing.T) { + // RetransmitMult is set high enough to ensure retransmits happen. + q := &TransmitLimitedQueue{RetransmitMult: 4, NumNodes: func() int { return 10 }} + + msgA := []byte{'a'} + q.QueueBroadcast(&memberlistBroadcast{"nodeA", msgA, nil}) + require.Equal(t, 1, q.NumQueued(), "should have one item queued") + + broadcasts := q.GetBroadcasts(0, 1) + require.Equal(t, msgA, broadcasts[0], "should get one broadcast out") + + // At this point, 'nodeA' has been removed from the queue and is pending + // re-insertion at the end of GetBroadcasts. + + // Queue a new, different item. + msgB := []byte{'b'} + q.QueueBroadcast(&memberlistBroadcast{"nodeB", msgB, nil}) + broadcasts = q.GetBroadcasts(0, 1) + require.Equal(t, msgB, broadcasts[0], "should get one broadcast out") + + // Verify the final state of the queue. + // With the buggy code, an ID collision occurs. The btree may drop one of + // the items or behave unpredictably. The test will fail. + // With the fixed code, both items have unique IDs and coexist peacefully. + require.Equal(t, 2, q.NumQueued(), "queue should contain two distinct items") + + // Further verification: ensure both items are actually present. + finalBroadcasts := q.GetBroadcasts(0, 2) + require.Equal(t, 2, len(finalBroadcasts), "should be able to retrieve two distinct items") + + // Check that both message payloads are present. + foundA := false + foundB := false + for _, msg := range finalBroadcasts { + foundA = foundA || bytes.Equal(msg, msgA) + foundB = foundB || bytes.Equal(msg, msgB) + } + require.True(t, foundA, "message 'a' should be present in the final queue") + require.True(t, foundB, "message 'b' should be present in the final queue") +}