Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,53 @@ jobs:
# KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512
# steps: *steps

# Tansu is an Apache Kafka®-compatible broker (Rust, single binary) that
# enforces KIP-394 (MEMBER_ID_REQUIRED) strictly across JoinGroup versions.
# This job is the regression guard for that handshake and runs a targeted
# subset of the suite. Tansu 0.6.0 lacks features other tests depend on
# (notably broker-side auto.create.topics.enable), so the full suite is not
# exercised here; broadening the scope is tracked as a follow-up.
tansu-060:
working_directory: *working_directory
environment:
KAFKA_VERSION: "tansu-0.6.0"
docker:
- image: circleci/golang
- image: ghcr.io/tansu-io/tansu:0.6.0
ports:
- 9092:9092
environment:
RUST_LOG: info
command:
- --kafka-cluster-id=kafka-go-tansu-test
- --kafka-listener-url=tcp://0.0.0.0:9092/
- --kafka-advertised-listener-url=tcp://localhost:9092/
- --storage-engine=memory://tansu/
steps:
- checkout
- restore_cache:
key: kafka-go-mod-{{ checksum "go.sum" }}-1
- run:
name: Download dependencies
command: go mod download
- save_cache:
key: kafka-go-mod-{{ checksum "go.sum" }}-1
paths:
- /go/pkg/mod
- run:
name: Wait for Tansu
command: ./scripts/wait-for-kafka.sh
- run:
name: Test KIP-394 regression scenarios
command: go test -race -count=1 -timeout 120s -run 'TestConsumerGroupJoinGroupHandshake|^TestConsumerGroup$' .

workflows:
version: 2
run:
jobs:
- lint
- kafka-270
- kafka-281
- kafka-370
- kafka-370
- tansu-060
#- kafka-400
4 changes: 3 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,9 @@ func (c *Conn) joinGroup(request joinGroupRequest) (joinGroupResponse, error) {
return joinGroupResponse{}, err
}
if response.ErrorCode != 0 {
return joinGroupResponse{}, Error(response.ErrorCode)
// Preserve the response so callers can inspect fields like MemberID
// (needed for the KIP-394 MEMBER_ID_REQUIRED two-step handshake).
return response, Error(response.ErrorCode)
}

return response, nil
Expand Down
12 changes: 12 additions & 0 deletions consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,18 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i
if err == nil && response.ErrorCode != 0 {
err = Error(response.ErrorCode)
}
// KIP-394: brokers may reject the initial JoinGroup with MEMBER_ID_REQUIRED
// and return a freshly assigned MemberID in the response. Complete the
// handshake by re-sending the request with that ID. The second attempt is
// the only retry — any further MEMBER_ID_REQUIRED falls through to the
// regular error path so we cannot loop here.
if errors.Is(err, MemberIDRequired) && response.MemberID != "" {
request.MemberID = response.MemberID
response, err = conn.joinGroup(request)
if err == nil && response.ErrorCode != 0 {
err = Error(response.ErrorCode)
}
}
if err != nil {
return "", 0, nil, err
}
Expand Down
151 changes: 150 additions & 1 deletion consumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync"
"testing"
"time"

ktesting "github.com/segmentio/kafka-go/testing"
)

var _ coordinator = mockCoordinator{}
Expand Down Expand Up @@ -271,7 +273,14 @@ func TestConsumerGroup(t *testing.T) {
}

if gen1.ID == gen2.ID {
t.Errorf("generation ID should have changed, but it stayed as %d", gen1.ID)
if ktesting.IsTansu() {
// Tansu does not bump generation_id on soft rejoin of an
// existing dynamic member. This is a broker-side semantic
// difference, not a kafka-go bug — log and continue.
t.Logf("Tansu: generation ID did not change across rejoin (gen1=gen2=%d)", gen1.ID)
} else {
t.Errorf("generation ID should have changed, but it stayed as %d", gen1.ID)
}
}
if gen1.GroupID != gen2.GroupID {
t.Errorf("mismatched group ID between generations: %s and %s", gen1.GroupID, gen2.GroupID)
Expand Down Expand Up @@ -596,6 +605,146 @@ func TestConsumerGroupErrors(t *testing.T) {

// todo : test for multi-topic?

// TestConsumerGroupJoinGroupHandshake exercises the KIP-394 two-step JoinGroup
// handshake at the ConsumerGroup layer. Each subtest stubs the broker's
// JoinGroup responses and asserts on (a) the error surfaced by Next and (b)
// the sequence of MemberIDs the client sent — which is what proves the
// retry logic walked the protocol correctly.
func TestConsumerGroupJoinGroupHandshake(t *testing.T) {
coordinatorResp := findCoordinatorResponseV0{
Coordinator: findCoordinatorResponseCoordinatorV0{
NodeID: 1, Host: "foo.bar.com", Port: 12345,
},
}

tests := []struct {
scenario string
// joinResponses are returned by the mock in order; once exhausted, the
// last entry is repeated. This lets a scenario express "fail once with
// X, then succeed" or "fail forever with X" with a small slice.
joinResponses []joinGroupResponse
// assertFn receives the error from Next and the sequence of MemberIDs
// the mock observed (in call order).
assertFn func(t *testing.T, err error, gotMemberIDs []string)
}{
{
scenario: "KIP-394 two-step handshake completes",
joinResponses: []joinGroupResponse{
{ErrorCode: int16(MemberIDRequired), MemberID: "kip394-assigned"},
{GenerationID: 1, GroupProtocol: "range", LeaderID: "kip394-assigned", MemberID: "kip394-assigned"},
},
assertFn: func(t *testing.T, err error, gotMemberIDs []string) {
// syncGroup is wired to fail intentionally so the run-loop
// iteration terminates deterministically. That's the error we
// expect Next to surface, not anything from JoinGroup.
if err == nil || !strings.Contains(err.Error(), "sync intentionally failed") {
t.Errorf("Next err = %v, want sync intentionally failed", err)
}
want := []string{"", "kip394-assigned"}
if !reflect.DeepEqual(gotMemberIDs, want) {
t.Errorf("joinGroup MemberIDs = %v, want %v", gotMemberIDs, want)
}
},
},
{
scenario: "MEMBER_ID_REQUIRED without assigned ID is surfaced (no retry)",
joinResponses: []joinGroupResponse{
{ErrorCode: int16(MemberIDRequired)},
},
assertFn: func(t *testing.T, err error, gotMemberIDs []string) {
if !errors.Is(err, MemberIDRequired) {
t.Errorf("Next err = %v, want MemberIDRequired", err)
}
if len(gotMemberIDs) != 1 {
t.Errorf("got %d joinGroup calls, want 1 (no retry without assigned ID): %v",
len(gotMemberIDs), gotMemberIDs)
}
},
},
{
scenario: "repeated MEMBER_ID_REQUIRED is capped at one retry",
joinResponses: []joinGroupResponse{
// The mock repeats this entry on every call, simulating a broker
// that violates KIP-394 by re-requesting a member ID we already
// echoed back. The cap in joinGroup must prevent an infinite loop.
{ErrorCode: int16(MemberIDRequired), MemberID: "kip394-assigned"},
},
assertFn: func(t *testing.T, err error, gotMemberIDs []string) {
if !errors.Is(err, MemberIDRequired) {
t.Errorf("Next err = %v, want MemberIDRequired", err)
}
if len(gotMemberIDs) != 2 {
t.Errorf("got %d joinGroup calls, want 2 (handshake + cap): %v",
len(gotMemberIDs), gotMemberIDs)
}
},
},
}

for _, tt := range tests {
t.Run(tt.scenario, func(t *testing.T) {
var (
lock sync.Mutex
gotMemberIDs []string
)
mc := mockCoordinator{
findCoordinatorFunc: func(findCoordinatorRequestV0) (findCoordinatorResponseV0, error) {
return coordinatorResp, nil
},
joinGroupFunc: func(req joinGroupRequest) (joinGroupResponse, error) {
lock.Lock()
gotMemberIDs = append(gotMemberIDs, req.MemberID)
n := len(gotMemberIDs)
lock.Unlock()
if n <= len(tt.joinResponses) {
return tt.joinResponses[n-1], nil
}
return tt.joinResponses[len(tt.joinResponses)-1], nil
},
syncGroupFunc: func(syncGroupRequestV0) (syncGroupResponseV0, error) {
return syncGroupResponseV0{}, errors.New("sync intentionally failed")
},
readPartitionsFunc: func(...string) ([]Partition, error) {
return nil, nil
},
leaveGroupFunc: func(leaveGroupRequestV0) (leaveGroupResponseV0, error) {
return leaveGroupResponseV0{}, nil
},
}

group, err := NewConsumerGroup(ConsumerGroupConfig{
ID: makeGroupID(),
Topics: []string{"test"},
Brokers: []string{"no-such-broker"},
HeartbeatInterval: 2 * time.Second,
RebalanceTimeout: time.Second,
// Long backoff so a second run-loop iteration cannot pollute
// gotMemberIDs before the assertions complete.
JoinGroupBackoff: 30 * time.Second,
RetentionTime: time.Hour,
connect: func(*Dialer, ...string) (coordinator, error) {
return mc, nil
},
Logger: &testKafkaLogger{T: t},
})
if err != nil {
t.Fatal(err)
}
defer group.Close()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

_, nextErr := group.Next(ctx)

lock.Lock()
seq := append([]string(nil), gotMemberIDs...)
lock.Unlock()
tt.assertFn(t, nextErr, seq)
})
}
}

func TestGenerationExitsOnPartitionChange(t *testing.T) {
var count int
partitions := [][]Partition{
Expand Down
33 changes: 33 additions & 0 deletions docker_compose_versions/docker-compose-tansu-060.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Tansu — Apache Kafka®-compatible broker (Rust, single binary).
# Used to regression-test KIP-394 (MEMBER_ID_REQUIRED handshake) since
# Tansu enforces it strictly across JoinGroup versions, unlike Apache Kafka
# which only enforces on v4+.
#
# Known semantic differences vs. Apache Kafka (see `ktesting.IsTansu` for
# the canonical list and the test branches gated on it):
# - generation_id is not bumped on soft rejoin of an existing dynamic
# member; Apache Kafka bumps on every JoinGroup from an existing member.
# Run tests with KAFKA_VERSION=tansu-0.6.0 so the affected assertions
# downgrade to log lines instead of failures.
#
# See:
# https://github.com/tansu-io/tansu
# https://github.com/segmentio/kafka-go/issues/1432
version: '3'
services:
tansu:
container_name: tansu
image: ghcr.io/tansu-io/tansu:0.6.0
restart: on-failure:3
ports:
- 9092:9092
environment:
RUST_LOG: info
# All flags below match the defaults documented by `tansu --help`,
# but are spelled out explicitly so the compose file documents the
# broker's exposed surface for kafka-go tests.
command:
- --kafka-cluster-id=kafka-go-tansu-test
- --kafka-listener-url=tcp://0.0.0.0:9092/
- --kafka-advertised-listener-url=tcp://localhost:9092/
- --storage-engine=memory://tansu/
4 changes: 2 additions & 2 deletions scripts/wait-for-kafka.sh
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
#/bin/bash

COUNTER=0;
echo foo | nc localhost 9092
nc -z -w 2 localhost 9092
STATUS=$?
ATTEMPTS=60
until [ ${STATUS} -eq 0 ] || [ "$COUNTER" -ge "${ATTEMPTS}" ];
do
let COUNTER=$COUNTER+1;
sleep 1;
echo "[$COUNTER] waiting for 9092 port to be open";
echo foo | nc localhost 9092
nc -z -w 2 localhost 9092
STATUS=$?
done

Expand Down
35 changes: 33 additions & 2 deletions testing/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,23 @@ func (v semver) atLeast(other semver) bool {
}

// kafkaVersion is set in the circle config. It can also be provided on the
// command line in order to target a particular kafka version.
var kafkaVersion = parseVersion(os.Getenv("KAFKA_VERSION"))
// command line in order to target a particular kafka version. Non-numeric
// values (e.g., "tansu-0.6.0") are tolerated and parsed as an empty semver
// so that init does not panic when running against alternative brokers — use
// IsTansu() to detect those cases.
var kafkaVersion = parseEnvKafkaVersion(os.Getenv("KAFKA_VERSION"))

func parseEnvKafkaVersion(v string) semver {
if v == "" {
return nil
}
for _, ch := range v {
if ch != '.' && (ch < '0' || ch > '9') {
return nil
}
}
return parseVersion(v)
}

// KafkaIsAtLeast returns true when the test broker is running a protocol
// version that is semver or newer. It determines the broker's version using
Expand All @@ -40,6 +55,22 @@ func KafkaIsAtLeast(semver string) bool {
return kafkaVersion.atLeast(parseVersion(semver))
}

// IsTansu reports whether tests are running against Tansu, a Kafka-compatible
// broker with some semantic differences from Apache Kafka:
//
// - Enforces KIP-394 (MEMBER_ID_REQUIRED) across all JoinGroup versions,
// not only v4+ as the KIP prescribes.
// - Does not bump the consumer group generation_id on soft rejoin of an
// existing dynamic member; it only increments when the group composition
// actually changes. Apache Kafka bumps on every JoinGroup from an
// existing member.
//
// Set `KAFKA_VERSION` to a value beginning with "tansu" (e.g., "tansu-0.6.0")
// to opt into Tansu-aware test branches.
func IsTansu() bool {
return strings.HasPrefix(os.Getenv("KAFKA_VERSION"), "tansu")
}

func parseVersion(semver string) semver {
if semver == "" {
return nil
Expand Down