Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -1032,8 +1032,18 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup
// assignments for the topic. this matches the behavior of the official
// clients: java, python, and librdkafka.
// a topic watcher can trigger a rebalance when the topic comes into being.
if err != nil && !errors.Is(err, UnknownTopicOrPartition) {
return nil, err
//
// if no partitions are returned for any of the requested topics and
// WatchPartitionChanges is disabled, return the error so the group can
// rejoin with backoff until metadata appears.
if err != nil {
if errors.Is(err, UnknownTopicOrPartition) {
if len(partitions) == 0 && !cg.config.WatchPartitionChanges {
return nil, err
}
} else {
return nil, err
}
}

cg.withLogger(func(l Logger) {
Expand Down
73 changes: 73 additions & 0 deletions consumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,79 @@ func TestReaderAssignTopicPartitions(t *testing.T) {
}
}

func TestAssignTopicPartitionsMissingTopic(t *testing.T) {
conn := &mockCoordinator{
readPartitionsFunc: func(...string) ([]Partition, error) {
return nil, UnknownTopicOrPartition
},
}

newJoinGroupResponse := func(topicsByMemberID map[string][]string) joinGroupResponse {
resp := joinGroupResponse{
v: v1,
GroupProtocol: RoundRobinGroupBalancer{}.ProtocolName(),
}
for memberID, topics := range topicsByMemberID {
resp.Members = append(resp.Members, joinGroupResponseMember{
MemberID: memberID,
MemberMetadata: groupMetadata{
Topics: topics,
}.bytes(),
})
}
return resp
}

tests := []struct {
name string
watchPartitionChanges bool
expectErr bool
}{
{
name: "no watch triggers error on missing topic",
watchPartitionChanges: false,
expectErr: true,
},
{
name: "watch enabled allows empty assignments",
watchPartitionChanges: true,
expectErr: false,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cg := ConsumerGroup{}
cg.config.GroupBalancers = []GroupBalancer{
RangeGroupBalancer{},
RoundRobinGroupBalancer{},
}
cg.config.WatchPartitionChanges = test.watchPartitionChanges

assignments, err := cg.assignTopicPartitions(conn, newJoinGroupResponse(map[string][]string{
"member-1": {"missing-topic"},
}))

if test.expectErr {
if !errors.Is(err, UnknownTopicOrPartition) {
t.Fatalf("expected UnknownTopicOrPartition, got %v", err)
}
return
}

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if assignments == nil {
t.Fatal("expected assignments to be non-nil")
}
if _, ok := assignments["member-1"]; !ok {
t.Fatalf("expected assignments for member-1, got %v", assignments)
}
})
}
}

func TestConsumerGroup(t *testing.T) {
tests := []struct {
scenario string
Expand Down