diff --git a/.circleci/config.yml b/.circleci/config.yml index cab9c8242..f26685191 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -191,6 +191,46 @@ jobs: # KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512 # steps: *steps + # Tansu is an Apache Kafka®-compatible broker (Rust, single binary) that + # enforces KIP-394 (MEMBER_ID_REQUIRED) strictly across JoinGroup versions. + # This job is the regression guard for that handshake and runs a targeted + # subset of the suite. Tansu 0.6.0 lacks features other tests depend on + # (notably broker-side auto.create.topics.enable), so the full suite is not + # exercised here; broadening the scope is tracked as a follow-up. + tansu-060: + working_directory: *working_directory + environment: + KAFKA_VERSION: "tansu-0.6.0" + docker: + - image: circleci/golang + - image: ghcr.io/tansu-io/tansu:0.6.0 + ports: + - 9092:9092 + environment: + RUST_LOG: info + command: + - --kafka-cluster-id=kafka-go-tansu-test + - --kafka-listener-url=tcp://0.0.0.0:9092/ + - --kafka-advertised-listener-url=tcp://localhost:9092/ + - --storage-engine=memory://tansu/ + steps: + - checkout + - restore_cache: + key: kafka-go-mod-{{ checksum "go.sum" }}-1 + - run: + name: Download dependencies + command: go mod download + - save_cache: + key: kafka-go-mod-{{ checksum "go.sum" }}-1 + paths: + - /go/pkg/mod + - run: + name: Wait for Tansu + command: ./scripts/wait-for-kafka.sh + - run: + name: Test KIP-394 regression scenarios + command: go test -race -count=1 -timeout 120s -run 'TestConsumerGroupJoinGroupHandshake|^TestConsumerGroup$' . + workflows: version: 2 run: @@ -198,5 +238,6 @@ workflows: - lint - kafka-270 - kafka-281 - - kafka-370 + - kafka-370 + - tansu-060 #- kafka-400 diff --git a/conn.go b/conn.go index 9f9f25903..df9276b2b 100644 --- a/conn.go +++ b/conn.go @@ -390,7 +390,9 @@ func (c *Conn) joinGroup(request joinGroupRequest) (joinGroupResponse, error) { return joinGroupResponse{}, err } if response.ErrorCode != 0 { - return joinGroupResponse{}, Error(response.ErrorCode) + // Preserve the response so callers can inspect fields like MemberID + // (needed for the KIP-394 MEMBER_ID_REQUIRED two-step handshake). + return response, Error(response.ErrorCode) } return response, nil diff --git a/consumergroup.go b/consumergroup.go index b32f90162..17b580c57 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -941,6 +941,18 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i if err == nil && response.ErrorCode != 0 { err = Error(response.ErrorCode) } + // KIP-394: brokers may reject the initial JoinGroup with MEMBER_ID_REQUIRED + // and return a freshly assigned MemberID in the response. Complete the + // handshake by re-sending the request with that ID. The second attempt is + // the only retry — any further MEMBER_ID_REQUIRED falls through to the + // regular error path so we cannot loop here. + if errors.Is(err, MemberIDRequired) && response.MemberID != "" { + request.MemberID = response.MemberID + response, err = conn.joinGroup(request) + if err == nil && response.ErrorCode != 0 { + err = Error(response.ErrorCode) + } + } if err != nil { return "", 0, nil, err } diff --git a/consumergroup_test.go b/consumergroup_test.go index dbbe4ec47..c7e7289a0 100644 --- a/consumergroup_test.go +++ b/consumergroup_test.go @@ -9,6 +9,8 @@ import ( "sync" "testing" "time" + + ktesting "github.com/segmentio/kafka-go/testing" ) var _ coordinator = mockCoordinator{} @@ -271,7 +273,14 @@ func TestConsumerGroup(t *testing.T) { } if gen1.ID == gen2.ID { - t.Errorf("generation ID should have changed, but it stayed as %d", gen1.ID) + if ktesting.IsTansu() { + // Tansu does not bump generation_id on soft rejoin of an + // existing dynamic member. This is a broker-side semantic + // difference, not a kafka-go bug — log and continue. + t.Logf("Tansu: generation ID did not change across rejoin (gen1=gen2=%d)", gen1.ID) + } else { + t.Errorf("generation ID should have changed, but it stayed as %d", gen1.ID) + } } if gen1.GroupID != gen2.GroupID { t.Errorf("mismatched group ID between generations: %s and %s", gen1.GroupID, gen2.GroupID) @@ -596,6 +605,146 @@ func TestConsumerGroupErrors(t *testing.T) { // todo : test for multi-topic? +// TestConsumerGroupJoinGroupHandshake exercises the KIP-394 two-step JoinGroup +// handshake at the ConsumerGroup layer. Each subtest stubs the broker's +// JoinGroup responses and asserts on (a) the error surfaced by Next and (b) +// the sequence of MemberIDs the client sent — which is what proves the +// retry logic walked the protocol correctly. +func TestConsumerGroupJoinGroupHandshake(t *testing.T) { + coordinatorResp := findCoordinatorResponseV0{ + Coordinator: findCoordinatorResponseCoordinatorV0{ + NodeID: 1, Host: "foo.bar.com", Port: 12345, + }, + } + + tests := []struct { + scenario string + // joinResponses are returned by the mock in order; once exhausted, the + // last entry is repeated. This lets a scenario express "fail once with + // X, then succeed" or "fail forever with X" with a small slice. + joinResponses []joinGroupResponse + // assertFn receives the error from Next and the sequence of MemberIDs + // the mock observed (in call order). + assertFn func(t *testing.T, err error, gotMemberIDs []string) + }{ + { + scenario: "KIP-394 two-step handshake completes", + joinResponses: []joinGroupResponse{ + {ErrorCode: int16(MemberIDRequired), MemberID: "kip394-assigned"}, + {GenerationID: 1, GroupProtocol: "range", LeaderID: "kip394-assigned", MemberID: "kip394-assigned"}, + }, + assertFn: func(t *testing.T, err error, gotMemberIDs []string) { + // syncGroup is wired to fail intentionally so the run-loop + // iteration terminates deterministically. That's the error we + // expect Next to surface, not anything from JoinGroup. + if err == nil || !strings.Contains(err.Error(), "sync intentionally failed") { + t.Errorf("Next err = %v, want sync intentionally failed", err) + } + want := []string{"", "kip394-assigned"} + if !reflect.DeepEqual(gotMemberIDs, want) { + t.Errorf("joinGroup MemberIDs = %v, want %v", gotMemberIDs, want) + } + }, + }, + { + scenario: "MEMBER_ID_REQUIRED without assigned ID is surfaced (no retry)", + joinResponses: []joinGroupResponse{ + {ErrorCode: int16(MemberIDRequired)}, + }, + assertFn: func(t *testing.T, err error, gotMemberIDs []string) { + if !errors.Is(err, MemberIDRequired) { + t.Errorf("Next err = %v, want MemberIDRequired", err) + } + if len(gotMemberIDs) != 1 { + t.Errorf("got %d joinGroup calls, want 1 (no retry without assigned ID): %v", + len(gotMemberIDs), gotMemberIDs) + } + }, + }, + { + scenario: "repeated MEMBER_ID_REQUIRED is capped at one retry", + joinResponses: []joinGroupResponse{ + // The mock repeats this entry on every call, simulating a broker + // that violates KIP-394 by re-requesting a member ID we already + // echoed back. The cap in joinGroup must prevent an infinite loop. + {ErrorCode: int16(MemberIDRequired), MemberID: "kip394-assigned"}, + }, + assertFn: func(t *testing.T, err error, gotMemberIDs []string) { + if !errors.Is(err, MemberIDRequired) { + t.Errorf("Next err = %v, want MemberIDRequired", err) + } + if len(gotMemberIDs) != 2 { + t.Errorf("got %d joinGroup calls, want 2 (handshake + cap): %v", + len(gotMemberIDs), gotMemberIDs) + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.scenario, func(t *testing.T) { + var ( + lock sync.Mutex + gotMemberIDs []string + ) + mc := mockCoordinator{ + findCoordinatorFunc: func(findCoordinatorRequestV0) (findCoordinatorResponseV0, error) { + return coordinatorResp, nil + }, + joinGroupFunc: func(req joinGroupRequest) (joinGroupResponse, error) { + lock.Lock() + gotMemberIDs = append(gotMemberIDs, req.MemberID) + n := len(gotMemberIDs) + lock.Unlock() + if n <= len(tt.joinResponses) { + return tt.joinResponses[n-1], nil + } + return tt.joinResponses[len(tt.joinResponses)-1], nil + }, + syncGroupFunc: func(syncGroupRequestV0) (syncGroupResponseV0, error) { + return syncGroupResponseV0{}, errors.New("sync intentionally failed") + }, + readPartitionsFunc: func(...string) ([]Partition, error) { + return nil, nil + }, + leaveGroupFunc: func(leaveGroupRequestV0) (leaveGroupResponseV0, error) { + return leaveGroupResponseV0{}, nil + }, + } + + group, err := NewConsumerGroup(ConsumerGroupConfig{ + ID: makeGroupID(), + Topics: []string{"test"}, + Brokers: []string{"no-such-broker"}, + HeartbeatInterval: 2 * time.Second, + RebalanceTimeout: time.Second, + // Long backoff so a second run-loop iteration cannot pollute + // gotMemberIDs before the assertions complete. + JoinGroupBackoff: 30 * time.Second, + RetentionTime: time.Hour, + connect: func(*Dialer, ...string) (coordinator, error) { + return mc, nil + }, + Logger: &testKafkaLogger{T: t}, + }) + if err != nil { + t.Fatal(err) + } + defer group.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, nextErr := group.Next(ctx) + + lock.Lock() + seq := append([]string(nil), gotMemberIDs...) + lock.Unlock() + tt.assertFn(t, nextErr, seq) + }) + } +} + func TestGenerationExitsOnPartitionChange(t *testing.T) { var count int partitions := [][]Partition{ diff --git a/docker_compose_versions/docker-compose-tansu-060.yml b/docker_compose_versions/docker-compose-tansu-060.yml new file mode 100644 index 000000000..729b8cad1 --- /dev/null +++ b/docker_compose_versions/docker-compose-tansu-060.yml @@ -0,0 +1,33 @@ +# Tansu — Apache Kafka®-compatible broker (Rust, single binary). +# Used to regression-test KIP-394 (MEMBER_ID_REQUIRED handshake) since +# Tansu enforces it strictly across JoinGroup versions, unlike Apache Kafka +# which only enforces on v4+. +# +# Known semantic differences vs. Apache Kafka (see `ktesting.IsTansu` for +# the canonical list and the test branches gated on it): +# - generation_id is not bumped on soft rejoin of an existing dynamic +# member; Apache Kafka bumps on every JoinGroup from an existing member. +# Run tests with KAFKA_VERSION=tansu-0.6.0 so the affected assertions +# downgrade to log lines instead of failures. +# +# See: +# https://github.com/tansu-io/tansu +# https://github.com/segmentio/kafka-go/issues/1432 +version: '3' +services: + tansu: + container_name: tansu + image: ghcr.io/tansu-io/tansu:0.6.0 + restart: on-failure:3 + ports: + - 9092:9092 + environment: + RUST_LOG: info + # All flags below match the defaults documented by `tansu --help`, + # but are spelled out explicitly so the compose file documents the + # broker's exposed surface for kafka-go tests. + command: + - --kafka-cluster-id=kafka-go-tansu-test + - --kafka-listener-url=tcp://0.0.0.0:9092/ + - --kafka-advertised-listener-url=tcp://localhost:9092/ + - --storage-engine=memory://tansu/ diff --git a/scripts/wait-for-kafka.sh b/scripts/wait-for-kafka.sh index 5cd336556..d2fe4fa57 100755 --- a/scripts/wait-for-kafka.sh +++ b/scripts/wait-for-kafka.sh @@ -1,7 +1,7 @@ #/bin/bash COUNTER=0; -echo foo | nc localhost 9092 +nc -z -w 2 localhost 9092 STATUS=$? ATTEMPTS=60 until [ ${STATUS} -eq 0 ] || [ "$COUNTER" -ge "${ATTEMPTS}" ]; @@ -9,7 +9,7 @@ do let COUNTER=$COUNTER+1; sleep 1; echo "[$COUNTER] waiting for 9092 port to be open"; - echo foo | nc localhost 9092 + nc -z -w 2 localhost 9092 STATUS=$? done diff --git a/testing/version.go b/testing/version.go index c52cd106a..9eb985d6c 100644 --- a/testing/version.go +++ b/testing/version.go @@ -29,8 +29,23 @@ func (v semver) atLeast(other semver) bool { } // kafkaVersion is set in the circle config. It can also be provided on the -// command line in order to target a particular kafka version. -var kafkaVersion = parseVersion(os.Getenv("KAFKA_VERSION")) +// command line in order to target a particular kafka version. Non-numeric +// values (e.g., "tansu-0.6.0") are tolerated and parsed as an empty semver +// so that init does not panic when running against alternative brokers — use +// IsTansu() to detect those cases. +var kafkaVersion = parseEnvKafkaVersion(os.Getenv("KAFKA_VERSION")) + +func parseEnvKafkaVersion(v string) semver { + if v == "" { + return nil + } + for _, ch := range v { + if ch != '.' && (ch < '0' || ch > '9') { + return nil + } + } + return parseVersion(v) +} // KafkaIsAtLeast returns true when the test broker is running a protocol // version that is semver or newer. It determines the broker's version using @@ -40,6 +55,22 @@ func KafkaIsAtLeast(semver string) bool { return kafkaVersion.atLeast(parseVersion(semver)) } +// IsTansu reports whether tests are running against Tansu, a Kafka-compatible +// broker with some semantic differences from Apache Kafka: +// +// - Enforces KIP-394 (MEMBER_ID_REQUIRED) across all JoinGroup versions, +// not only v4+ as the KIP prescribes. +// - Does not bump the consumer group generation_id on soft rejoin of an +// existing dynamic member; it only increments when the group composition +// actually changes. Apache Kafka bumps on every JoinGroup from an +// existing member. +// +// Set `KAFKA_VERSION` to a value beginning with "tansu" (e.g., "tansu-0.6.0") +// to opt into Tansu-aware test branches. +func IsTansu() bool { + return strings.HasPrefix(os.Getenv("KAFKA_VERSION"), "tansu") +} + func parseVersion(semver string) semver { if semver == "" { return nil