diff --git a/src/go/rpk/pkg/adminapi/BUILD b/src/go/rpk/pkg/adminapi/BUILD index cca2956038480..75f216dcbaf2e 100644 --- a/src/go/rpk/pkg/adminapi/BUILD +++ b/src/go/rpk/pkg/adminapi/BUILD @@ -26,11 +26,15 @@ go_library( go_test( name = "adminapi_test", size = "small", - srcs = ["admin_test.go"], + srcs = [ + "admin_test.go", + "auth_test.go", + ], embed = [":adminapi"], env = {"HOME": "/"}, deps = [ "//src/go/rpk/pkg/config", + "@com_github_redpanda_data_common_go_rpadmin//:rpadmin", "@com_github_spf13_afero//:afero", "@com_github_stretchr_testify//require", ], diff --git a/src/go/rpk/pkg/adminapi/admin.go b/src/go/rpk/pkg/adminapi/admin.go index 63c5f564fc4d7..34e52c5079a4b 100644 --- a/src/go/rpk/pkg/adminapi/admin.go +++ b/src/go/rpk/pkg/adminapi/admin.go @@ -40,11 +40,18 @@ const ( ScramSha256 = "SCRAM-SHA-256" ScramSha512 = "SCRAM-SHA-512" CloudOIDC = "CLOUD-OIDC" + OAuthBearer = "OAUTHBEARER" ) // GetAuth gets the rpadmin.Auth from the rpk profile. func GetAuth(p *config.RpkProfile) (rpadmin.Auth, error) { switch { + case p.KafkaAPI.SASL != nil && strings.EqualFold(p.KafkaAPI.SASL.Mechanism, OAuthBearer): + token := OAuthBearerToken(p.KafkaAPI.SASL.Password) + if token == "" { + return nil, errors.New("OAUTHBEARER requires a token passed via --password (or kafka_api.sasl.password in the profile)") + } + return &rpadmin.BearerToken{Token: token}, nil case p.KafkaAPI.SASL != nil && p.KafkaAPI.SASL.Mechanism != CloudOIDC: return &rpadmin.BasicAuth{Username: p.KafkaAPI.SASL.User, Password: p.KafkaAPI.SASL.Password}, nil case p.KafkaAPI.SASL != nil && p.KafkaAPI.SASL.Mechanism == CloudOIDC: @@ -72,6 +79,15 @@ func GetAuth(p *config.RpkProfile) (rpadmin.Auth, error) { } } +// OAuthBearerToken extracts the bearer token from the SASL password field. +// It accepts both "token:" format and a raw token string. +func OAuthBearerToken(password string) string { + if t, ok := strings.CutPrefix(password, "token:"); ok { + return t + } + return password +} + // NewClient returns an rpadmin.AdminAPI client that talks to each of the // addresses in the rpk.admin_api section of the config. func NewClient(ctx context.Context, fs afero.Fs, p *config.RpkProfile, opts ...rpadmin.Opt) (*rpadmin.AdminAPI, error) { diff --git a/src/go/rpk/pkg/adminapi/auth_test.go b/src/go/rpk/pkg/adminapi/auth_test.go new file mode 100644 index 0000000000000..f86c0b5befae0 --- /dev/null +++ b/src/go/rpk/pkg/adminapi/auth_test.go @@ -0,0 +1,157 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package adminapi + +import ( + "testing" + + "github.com/redpanda-data/common-go/rpadmin" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/stretchr/testify/require" +) + +func TestGetAuth(t *testing.T) { + tests := []struct { + name string + profile *config.RpkProfile + wantTyp rpadmin.Auth + wantErr string + }{ + { + name: "no SASL returns NopAuth", + profile: &config.RpkProfile{}, + wantTyp: &rpadmin.NopAuth{}, + }, + { + name: "SCRAM-SHA-256 returns BasicAuth", + profile: &config.RpkProfile{ + KafkaAPI: config.RpkKafkaAPI{ + SASL: &config.SASL{ + User: "admin", + Password: "secret", + Mechanism: "SCRAM-SHA-256", + }, + }, + }, + wantTyp: &rpadmin.BasicAuth{Username: "admin", Password: "secret"}, + }, + { + name: "OAUTHBEARER with token prefix returns BearerToken", + profile: &config.RpkProfile{ + KafkaAPI: config.RpkKafkaAPI{ + SASL: &config.SASL{ + Password: "token:my-jwt-token", + Mechanism: "OAUTHBEARER", + }, + }, + }, + wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"}, + }, + { + name: "OAUTHBEARER with raw token returns BearerToken", + profile: &config.RpkProfile{ + KafkaAPI: config.RpkKafkaAPI{ + SASL: &config.SASL{ + Password: "my-jwt-token", + Mechanism: "OAUTHBEARER", + }, + }, + }, + wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"}, + }, + { + name: "OAUTHBEARER case-insensitive", + profile: &config.RpkProfile{ + KafkaAPI: config.RpkKafkaAPI{ + SASL: &config.SASL{ + Password: "my-jwt-token", + Mechanism: "oauthbearer", + }, + }, + }, + wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"}, + }, + { + name: "OAUTHBEARER with empty password errors", + profile: &config.RpkProfile{ + KafkaAPI: config.RpkKafkaAPI{ + SASL: &config.SASL{ + Mechanism: "OAUTHBEARER", + }, + }, + }, + wantErr: "OAUTHBEARER requires a token", + }, + { + name: "OAUTHBEARER with token: prefix only errors", + profile: &config.RpkProfile{ + KafkaAPI: config.RpkKafkaAPI{ + SASL: &config.SASL{ + Password: "token:", + Mechanism: "OAUTHBEARER", + }, + }, + }, + wantErr: "OAUTHBEARER requires a token", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := GetAuth(tt.profile) + if tt.wantErr != "" { + require.ErrorContains(t, err, tt.wantErr) + return + } + require.NoError(t, err) + require.IsType(t, tt.wantTyp, got) + require.Equal(t, tt.wantTyp, got) + }) + } +} + +func TestOAuthBearerToken(t *testing.T) { + tests := []struct { + name string + password string + want string + }{ + { + name: "token prefix stripped", + password: "token:eyJhbGciOiJSUzI1NiJ9.payload.sig", + want: "eyJhbGciOiJSUzI1NiJ9.payload.sig", + }, + { + name: "raw token returned as-is", + password: "eyJhbGciOiJSUzI1NiJ9.payload.sig", + want: "eyJhbGciOiJSUzI1NiJ9.payload.sig", + }, + { + name: "empty password returns empty", + password: "", + want: "", + }, + { + name: "token prefix only returns empty", + password: "token:", + want: "", + }, + { + name: "token prefix is case-sensitive", + password: "Token:abc", + want: "Token:abc", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := OAuthBearerToken(tt.password) + require.Equal(t, tt.want, got) + }) + } +} diff --git a/src/go/rpk/pkg/cli/debug/remotebundle/start.go b/src/go/rpk/pkg/cli/debug/remotebundle/start.go index 811cfa74892ed..e093a1a43be5e 100644 --- a/src/go/rpk/pkg/cli/debug/remotebundle/start.go +++ b/src/go/rpk/pkg/cli/debug/remotebundle/start.go @@ -78,6 +78,16 @@ Use the flag '--no-confirm' to avoid the confirmation prompt. out.MaybeDie(err, "rpk unable to load config: %v", err) config.CheckExitCloudAdmin(p) + // Remote debug bundle forwards SASL credentials to the broker so + // rpk running on the broker can authenticate to Kafka. The broker + // API currently only accepts a SCRAM-shaped payload; until that + // gains OAUTHBEARER support (tracked in redpanda#30222), reject + // up front rather than silently sending the request with no auth + // (which then fails confusingly on secured clusters). + if p.KafkaAPI.SASL != nil && strings.EqualFold(p.KafkaAPI.SASL.Mechanism, adminapi.OAuthBearer) { + out.Die("OAUTHBEARER is not yet supported for remote debug bundle collection; use SCRAM-SHA-256 or SCRAM-SHA-512 credentials to run this command") + } + if !noConfirm { printBrokers(p.AdminAPI.Addresses) confirmed, err := out.Confirm("Confirm debug bundle collection from these brokers?") diff --git a/src/go/rpk/pkg/config/params.go b/src/go/rpk/pkg/config/params.go index 44201d7af52bb..0e383b5a74cfa 100644 --- a/src/go/rpk/pkg/config/params.go +++ b/src/go/rpk/pkg/config/params.go @@ -689,10 +689,12 @@ tls.key=/path/to/key.pem API listeners with mTLS. sasl.mechanism=SCRAM-SHA-256 - The SASL mechanism to use for authentication. This can be either SCRAM-SHA-256 - or SCRAM-SHA-512. Note that with Redpanda, the Admin API can be configured to - require basic authentication with your Kafka API SASL credentials. This - defaults to SCRAM-SHA-256 if no mechanism is specified. + The SASL mechanism to use for authentication. This can be SCRAM-SHA-256, + SCRAM-SHA-512, PLAIN, or OAUTHBEARER. For OAUTHBEARER, pass the token via + the pass field (optionally prefixed with "token:"). Note that with Redpanda, + the Admin API can be configured to require basic authentication with your + Kafka API SASL credentials. This defaults to SCRAM-SHA-256 if no mechanism + is specified. user=username The SASL username to use for authentication. This is also used for the admin @@ -815,7 +817,7 @@ tls.insecure_skip_verify=boolean tls.ca=/path/to/ca.pem tls.cert=/path/to/cert.pem tls.key=/path/to/key.pem -sasl.mechanism=SCRAM-SHA-256 or SCRAM-SHA-512 +sasl.mechanism=SCRAM-SHA-256, SCRAM-SHA-512, PLAIN, or OAUTHBEARER user=username pass=password admin.hosts=comma,delimited,host:ports @@ -871,7 +873,7 @@ func (p *Params) InstallSASLFlags(cmd *cobra.Command) { pf.StringVar(&p.user, FlagSASLUser, "", "SASL user to be used for authentication") pf.StringVar(&p.password, "password", "", "SASL password to be used for authentication") - pf.StringVar(&p.saslMechanism, "sasl-mechanism", "", "The authentication mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512)") + pf.StringVar(&p.saslMechanism, "sasl-mechanism", "", "The authentication mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER)") pf.MarkHidden(FlagSASLUser) pf.MarkHidden("password") diff --git a/src/go/rpk/pkg/config/profile_doc.go b/src/go/rpk/pkg/config/profile_doc.go index 9822f395cb5e5..549372412ee12 100644 --- a/src/go/rpk/pkg/config/profile_doc.go +++ b/src/go/rpk/pkg/config/profile_doc.go @@ -35,7 +35,7 @@ var profileFieldDocs = map[string]string{ "kafka_api.sasl": "SASL authentication configuration", "kafka_api.sasl.user": "username for authentication", "kafka_api.sasl.password": "password for authentication", - "kafka_api.sasl.mechanism": "SCRAM-SHA-256 or SCRAM-SHA-512", + "kafka_api.sasl.mechanism": "SCRAM-SHA-256, SCRAM-SHA-512, PLAIN, or OAUTHBEARER", "admin_api": "Admin API connection configuration", "admin_api.addresses": "Comma-separated list of Admin API addresses (host:port)", diff --git a/src/go/rpk/pkg/kafka/BUILD b/src/go/rpk/pkg/kafka/BUILD index 60c854db04564..372274859312c 100644 --- a/src/go/rpk/pkg/kafka/BUILD +++ b/src/go/rpk/pkg/kafka/BUILD @@ -1,4 +1,4 @@ -load("@rules_go//go:def.bzl", "go_library") +load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "kafka", @@ -21,3 +21,15 @@ go_library( "@com_github_twmb_franz_go_plugin_kzap//:kzap", ], ) + +go_test( + name = "kafka_test", + size = "small", + srcs = ["client_franz_test.go"], + embed = [":kafka"], + deps = [ + "//src/go/rpk/pkg/config", + "@com_github_spf13_afero//:afero", + "@com_github_stretchr_testify//require", + ], +) diff --git a/src/go/rpk/pkg/kafka/client_franz.go b/src/go/rpk/pkg/kafka/client_franz.go index 71d0bf2c7e170..103834ee05492 100644 --- a/src/go/rpk/pkg/kafka/client_franz.go +++ b/src/go/rpk/pkg/kafka/client_franz.go @@ -121,22 +121,28 @@ func NewFranzClient(fs afero.Fs, p *config.RpkProfile, extraOpts ...kgo.Opt) (*k Token: a.AuthToken, }).AsMechanism())) } else { - a := scram.Auth{ - User: k.SASL.User, - Pass: k.SASL.Password, - } switch name := strings.ToUpper(k.SASL.Mechanism); name { case "SCRAM-SHA-256", "": // we default to SCRAM-SHA-256 -- people commonly specify user & pass without --sasl-mechanism + a := scram.Auth{User: k.SASL.User, Pass: k.SASL.Password} opts = append(opts, kgo.SASL(a.AsSha256Mechanism())) case "SCRAM-SHA-512": + a := scram.Auth{User: k.SASL.User, Pass: k.SASL.Password} opts = append(opts, kgo.SASL(a.AsSha512Mechanism())) case "PLAIN": opts = append(opts, kgo.SASL((&plain.Auth{ User: k.SASL.User, Pass: k.SASL.Password, }).AsMechanism())) + case "OAUTHBEARER": + token := adminapi.OAuthBearerToken(k.SASL.Password) + if token == "" { + return nil, errors.New("OAUTHBEARER requires a token passed via --password (or kafka_api.sasl.password in the profile)") + } + opts = append(opts, kgo.SASL((koauth.Auth{ + Token: token, + }).AsMechanism())) default: - return nil, fmt.Errorf("unknown SASL mechanism %q, supported: [SCRAM-SHA-256, SCRAM-SHA-512, PLAIN]", name) + return nil, fmt.Errorf("unknown SASL mechanism %q, supported: [SCRAM-SHA-256, SCRAM-SHA-512, PLAIN, OAUTHBEARER]", name) } } } diff --git a/src/go/rpk/pkg/kafka/client_franz_test.go b/src/go/rpk/pkg/kafka/client_franz_test.go new file mode 100644 index 0000000000000..776e3be6835f7 --- /dev/null +++ b/src/go/rpk/pkg/kafka/client_franz_test.go @@ -0,0 +1,90 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package kafka + +import ( + "testing" + + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/spf13/afero" + "github.com/stretchr/testify/require" +) + +// loadProfile writes a profile to an in-memory filesystem and loads it back +// through the config system so internal fields are populated. +func loadProfile(t *testing.T, fs afero.Fs, p *config.RpkProfile) *config.RpkProfile { + t.Helper() + t.Setenv("HOME", "/") + p.Name = "test" + rpkyaml := config.RpkYaml{ + CurrentProfile: "test", + Version: 7, + Profiles: []config.RpkProfile{*p}, + } + err := rpkyaml.Write(fs) + require.NoError(t, err) + y, err := new(config.Params).Load(fs) + require.NoError(t, err) + return y.VirtualProfile() +} + +func TestNewFranzClient_SASLErrors(t *testing.T) { + tests := []struct { + name string + profile *config.RpkProfile + wantErr string + }{ + { + name: "OAUTHBEARER with empty password", + profile: &config.RpkProfile{ + KafkaAPI: config.RpkKafkaAPI{ + Brokers: []string{"localhost:9092"}, + SASL: &config.SASL{ + Mechanism: "OAUTHBEARER", + }, + }, + }, + wantErr: "OAUTHBEARER requires a token", + }, + { + name: "OAUTHBEARER with token: prefix only", + profile: &config.RpkProfile{ + KafkaAPI: config.RpkKafkaAPI{ + Brokers: []string{"localhost:9092"}, + SASL: &config.SASL{ + Password: "token:", + Mechanism: "OAUTHBEARER", + }, + }, + }, + wantErr: "OAUTHBEARER requires a token", + }, + { + name: "unknown SASL mechanism", + profile: &config.RpkProfile{ + KafkaAPI: config.RpkKafkaAPI{ + Brokers: []string{"localhost:9092"}, + SASL: &config.SASL{ + Mechanism: "KERBEROS", + }, + }, + }, + wantErr: `unknown SASL mechanism "KERBEROS"`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fs := afero.NewMemMapFs() + p := loadProfile(t, fs, tt.profile) + _, err := NewFranzClient(fs, p) + require.ErrorContains(t, err, tt.wantErr) + }) + } +} diff --git a/src/go/rpk/pkg/schemaregistry/BUILD b/src/go/rpk/pkg/schemaregistry/BUILD index 57a70190bf832..bb13cfea1f7ef 100644 --- a/src/go/rpk/pkg/schemaregistry/BUILD +++ b/src/go/rpk/pkg/schemaregistry/BUILD @@ -24,7 +24,15 @@ go_library( go_test( name = "schemaregistry_test", - srcs = ["context_test.go"], + size = "small", + srcs = [ + "client_test.go", + "context_test.go", + ], embed = [":schemaregistry"], - deps = ["@com_github_stretchr_testify//require"], + deps = [ + "//src/go/rpk/pkg/config", + "@com_github_spf13_afero//:afero", + "@com_github_stretchr_testify//require", + ], ) diff --git a/src/go/rpk/pkg/schemaregistry/client.go b/src/go/rpk/pkg/schemaregistry/client.go index 6c0af61c4e90f..8a1088cfef292 100644 --- a/src/go/rpk/pkg/schemaregistry/client.go +++ b/src/go/rpk/pkg/schemaregistry/client.go @@ -61,6 +61,17 @@ func NewClient(fs afero.Fs, p *config.RpkProfile) (*rpsr.Client, error) { } switch { + // OAUTHBEARER must be matched before the HasSASLCredentials() case below: + // HasSASLCredentials() returns false for OAUTHBEARER today (no username), + // so the current ordering happens to work, but if that check ever loosens + // or the cases are reordered, OAUTHBEARER tokens would be sent as a + // BasicAuth password. + case p.KafkaAPI.SASL != nil && strings.EqualFold(p.KafkaAPI.SASL.Mechanism, adminapi.OAuthBearer): + token := adminapi.OAuthBearerToken(p.KafkaAPI.SASL.Password) + if token == "" { + return nil, errors.New("OAUTHBEARER requires a token passed via --password (or kafka_api.sasl.password in the profile)") + } + opts = append(opts, sr.BearerToken(token)) case p.HasSASLCredentials() && p.KafkaAPI.SASL.Mechanism != adminapi.CloudOIDC: opts = append(opts, sr.BasicAuth(p.KafkaAPI.SASL.User, p.KafkaAPI.SASL.Password)) case p.KafkaAPI.SASL != nil && p.KafkaAPI.SASL.Mechanism == adminapi.CloudOIDC: diff --git a/src/go/rpk/pkg/schemaregistry/client_test.go b/src/go/rpk/pkg/schemaregistry/client_test.go new file mode 100644 index 0000000000000..857e96e01509f --- /dev/null +++ b/src/go/rpk/pkg/schemaregistry/client_test.go @@ -0,0 +1,129 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package schemaregistry + +import ( + "testing" + + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/spf13/afero" + "github.com/stretchr/testify/require" +) + +// loadProfile writes a profile to an in-memory filesystem and loads it back +// through the config system so internal fields are populated. +func loadProfile(t *testing.T, fs afero.Fs, p *config.RpkProfile) *config.RpkProfile { + t.Helper() + t.Setenv("HOME", "/") + p.Name = "test" + rpkyaml := config.RpkYaml{ + CurrentProfile: "test", + Version: 7, + Profiles: []config.RpkProfile{*p}, + } + err := rpkyaml.Write(fs) + require.NoError(t, err) + y, err := new(config.Params).Load(fs) + require.NoError(t, err) + return y.VirtualProfile() +} + +func TestNewClient_OAUTHBEARER(t *testing.T) { + tests := []struct { + name string + profile *config.RpkProfile + wantErr string + }{ + { + name: "OAUTHBEARER with token prefix succeeds", + profile: &config.RpkProfile{ + SR: config.RpkSchemaRegistryAPI{ + Addresses: []string{"localhost:8081"}, + }, + KafkaAPI: config.RpkKafkaAPI{ + SASL: &config.SASL{ + Password: "token:my-jwt", + Mechanism: "OAUTHBEARER", + }, + }, + }, + }, + { + name: "OAUTHBEARER with raw token succeeds", + profile: &config.RpkProfile{ + SR: config.RpkSchemaRegistryAPI{ + Addresses: []string{"localhost:8081"}, + }, + KafkaAPI: config.RpkKafkaAPI{ + SASL: &config.SASL{ + Password: "my-jwt", + Mechanism: "OAUTHBEARER", + }, + }, + }, + }, + { + name: "OAUTHBEARER case-insensitive", + profile: &config.RpkProfile{ + SR: config.RpkSchemaRegistryAPI{ + Addresses: []string{"localhost:8081"}, + }, + KafkaAPI: config.RpkKafkaAPI{ + SASL: &config.SASL{ + Password: "my-jwt", + Mechanism: "oauthbearer", + }, + }, + }, + }, + { + name: "OAUTHBEARER with empty password errors", + profile: &config.RpkProfile{ + SR: config.RpkSchemaRegistryAPI{ + Addresses: []string{"localhost:8081"}, + }, + KafkaAPI: config.RpkKafkaAPI{ + SASL: &config.SASL{ + Mechanism: "OAUTHBEARER", + }, + }, + }, + wantErr: "OAUTHBEARER requires a token", + }, + { + name: "OAUTHBEARER with token: prefix only errors", + profile: &config.RpkProfile{ + SR: config.RpkSchemaRegistryAPI{ + Addresses: []string{"localhost:8081"}, + }, + KafkaAPI: config.RpkKafkaAPI{ + SASL: &config.SASL{ + Password: "token:", + Mechanism: "OAUTHBEARER", + }, + }, + }, + wantErr: "OAUTHBEARER requires a token", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fs := afero.NewMemMapFs() + p := loadProfile(t, fs, tt.profile) + cl, err := NewClient(fs, p) + if tt.wantErr != "" { + require.ErrorContains(t, err, tt.wantErr) + return + } + require.NoError(t, err) + require.NotNil(t, cl) + }) + } +}