Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/go/rpk/pkg/adminapi/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ go_library(
go_test(
name = "adminapi_test",
size = "small",
srcs = ["admin_test.go"],
srcs = [
"admin_test.go",
"auth_test.go",
],
embed = [":adminapi"],
env = {"HOME": "/"},
deps = [
"//src/go/rpk/pkg/config",
"@com_github_redpanda_data_common_go_rpadmin//:rpadmin",
"@com_github_spf13_afero//:afero",
"@com_github_stretchr_testify//require",
],
Expand Down
16 changes: 16 additions & 0 deletions src/go/rpk/pkg/adminapi/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,18 @@ const (
ScramSha256 = "SCRAM-SHA-256"
ScramSha512 = "SCRAM-SHA-512"
CloudOIDC = "CLOUD-OIDC"
OAuthBearer = "OAUTHBEARER"
)

// GetAuth gets the rpadmin.Auth from the rpk profile.
func GetAuth(p *config.RpkProfile) (rpadmin.Auth, error) {
switch {
case p.KafkaAPI.SASL != nil && strings.EqualFold(p.KafkaAPI.SASL.Mechanism, OAuthBearer):
token := OAuthBearerToken(p.KafkaAPI.SASL.Password)
if token == "" {
return nil, errors.New("OAUTHBEARER requires a token passed via --password (or kafka_api.sasl.password in the profile)")
}
return &rpadmin.BearerToken{Token: token}, nil
case p.KafkaAPI.SASL != nil && p.KafkaAPI.SASL.Mechanism != CloudOIDC:
return &rpadmin.BasicAuth{Username: p.KafkaAPI.SASL.User, Password: p.KafkaAPI.SASL.Password}, nil
case p.KafkaAPI.SASL != nil && p.KafkaAPI.SASL.Mechanism == CloudOIDC:
Expand Down Expand Up @@ -72,6 +79,15 @@ func GetAuth(p *config.RpkProfile) (rpadmin.Auth, error) {
}
}

// OAuthBearerToken extracts the bearer token from the SASL password field.
// It accepts both "token:<TOKEN>" format and a raw token string.
func OAuthBearerToken(password string) string {
if t, ok := strings.CutPrefix(password, "token:"); ok {
return t
}
return password
}

// NewClient returns an rpadmin.AdminAPI client that talks to each of the
// addresses in the rpk.admin_api section of the config.
func NewClient(ctx context.Context, fs afero.Fs, p *config.RpkProfile, opts ...rpadmin.Opt) (*rpadmin.AdminAPI, error) {
Expand Down
157 changes: 157 additions & 0 deletions src/go/rpk/pkg/adminapi/auth_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2026 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package adminapi

import (
"testing"

"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/stretchr/testify/require"
)

func TestGetAuth(t *testing.T) {
tests := []struct {
name string
profile *config.RpkProfile
wantTyp rpadmin.Auth
wantErr string
}{
{
name: "no SASL returns NopAuth",
profile: &config.RpkProfile{},
wantTyp: &rpadmin.NopAuth{},
},
{
name: "SCRAM-SHA-256 returns BasicAuth",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
User: "admin",
Password: "secret",
Mechanism: "SCRAM-SHA-256",
},
},
},
wantTyp: &rpadmin.BasicAuth{Username: "admin", Password: "secret"},
},
{
name: "OAUTHBEARER with token prefix returns BearerToken",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Password: "token:my-jwt-token",
Mechanism: "OAUTHBEARER",
},
},
},
wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"},
},
{
name: "OAUTHBEARER with raw token returns BearerToken",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Password: "my-jwt-token",
Mechanism: "OAUTHBEARER",
},
},
},
wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"},
},
{
name: "OAUTHBEARER case-insensitive",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Password: "my-jwt-token",
Mechanism: "oauthbearer",
},
},
},
wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"},
},
{
name: "OAUTHBEARER with empty password errors",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Mechanism: "OAUTHBEARER",
},
},
},
wantErr: "OAUTHBEARER requires a token",
},
{
name: "OAUTHBEARER with token: prefix only errors",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Password: "token:",
Mechanism: "OAUTHBEARER",
},
},
},
wantErr: "OAUTHBEARER requires a token",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := GetAuth(tt.profile)
if tt.wantErr != "" {
require.ErrorContains(t, err, tt.wantErr)
return
}
require.NoError(t, err)
require.IsType(t, tt.wantTyp, got)
require.Equal(t, tt.wantTyp, got)
})
}
}

func TestOAuthBearerToken(t *testing.T) {
tests := []struct {
name string
password string
want string
}{
{
name: "token prefix stripped",
password: "token:eyJhbGciOiJSUzI1NiJ9.payload.sig",
want: "eyJhbGciOiJSUzI1NiJ9.payload.sig",
},
{
name: "raw token returned as-is",
password: "eyJhbGciOiJSUzI1NiJ9.payload.sig",
want: "eyJhbGciOiJSUzI1NiJ9.payload.sig",
},
{
name: "empty password returns empty",
password: "",
want: "",
},
{
name: "token prefix only returns empty",
password: "token:",
want: "",
},
{
name: "token prefix is case-sensitive",
password: "Token:abc",
want: "Token:abc",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := OAuthBearerToken(tt.password)
require.Equal(t, tt.want, got)
})
}
}
10 changes: 10 additions & 0 deletions src/go/rpk/pkg/cli/debug/remotebundle/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ Use the flag '--no-confirm' to avoid the confirmation prompt.
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(p)

// Remote debug bundle forwards SASL credentials to the broker so
// rpk running on the broker can authenticate to Kafka. The broker
// API currently only accepts a SCRAM-shaped payload; until that
// gains OAUTHBEARER support (tracked in redpanda#30222), reject
// up front rather than silently sending the request with no auth
// (which then fails confusingly on secured clusters).
if p.KafkaAPI.SASL != nil && strings.EqualFold(p.KafkaAPI.SASL.Mechanism, adminapi.OAuthBearer) {
out.Die("OAUTHBEARER is not yet supported for remote debug bundle collection; use SCRAM-SHA-256 or SCRAM-SHA-512 credentials to run this command")
}

if !noConfirm {
printBrokers(p.AdminAPI.Addresses)
confirmed, err := out.Confirm("Confirm debug bundle collection from these brokers?")
Expand Down
14 changes: 8 additions & 6 deletions src/go/rpk/pkg/config/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,10 +675,12 @@ tls.key=/path/to/key.pem
API listeners with mTLS.

sasl.mechanism=SCRAM-SHA-256
The SASL mechanism to use for authentication. This can be either SCRAM-SHA-256
or SCRAM-SHA-512. Note that with Redpanda, the Admin API can be configured to
require basic authentication with your Kafka API SASL credentials. This
defaults to SCRAM-SHA-256 if no mechanism is specified.
The SASL mechanism to use for authentication. This can be SCRAM-SHA-256,
SCRAM-SHA-512, PLAIN, or OAUTHBEARER. For OAUTHBEARER, pass the token via
the pass field (optionally prefixed with "token:"). Note that with Redpanda,
the Admin API can be configured to require basic authentication with your
Kafka API SASL credentials. This defaults to SCRAM-SHA-256 if no mechanism
is specified.

user=username
The SASL username to use for authentication. This is also used for the admin
Expand Down Expand Up @@ -801,7 +803,7 @@ tls.insecure_skip_verify=boolean
tls.ca=/path/to/ca.pem
tls.cert=/path/to/cert.pem
tls.key=/path/to/key.pem
sasl.mechanism=SCRAM-SHA-256 or SCRAM-SHA-512
sasl.mechanism=SCRAM-SHA-256, SCRAM-SHA-512, PLAIN, or OAUTHBEARER
user=username
pass=password
admin.hosts=comma,delimited,host:ports
Expand Down Expand Up @@ -857,7 +859,7 @@ func (p *Params) InstallSASLFlags(cmd *cobra.Command) {

pf.StringVar(&p.user, FlagSASLUser, "", "SASL user to be used for authentication")
pf.StringVar(&p.password, "password", "", "SASL password to be used for authentication")
pf.StringVar(&p.saslMechanism, "sasl-mechanism", "", "The authentication mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512)")
pf.StringVar(&p.saslMechanism, "sasl-mechanism", "", "The authentication mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER)")

pf.MarkHidden(FlagSASLUser)
pf.MarkHidden("password")
Expand Down
14 changes: 13 additions & 1 deletion src/go/rpk/pkg/kafka/BUILD
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@rules_go//go:def.bzl", "go_library")
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "kafka",
Expand All @@ -21,3 +21,15 @@ go_library(
"@com_github_twmb_franz_go_plugin_kzap//:kzap",
],
)

go_test(
name = "kafka_test",
size = "small",
srcs = ["client_franz_test.go"],
embed = [":kafka"],
deps = [
"//src/go/rpk/pkg/config",
"@com_github_spf13_afero//:afero",
"@com_github_stretchr_testify//require",
],
)
16 changes: 11 additions & 5 deletions src/go/rpk/pkg/kafka/client_franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,28 @@ func NewFranzClient(fs afero.Fs, p *config.RpkProfile, extraOpts ...kgo.Opt) (*k
Token: a.AuthToken,
}).AsMechanism()))
} else {
a := scram.Auth{
User: k.SASL.User,
Pass: k.SASL.Password,
}
switch name := strings.ToUpper(k.SASL.Mechanism); name {
case "SCRAM-SHA-256", "": // we default to SCRAM-SHA-256 -- people commonly specify user & pass without --sasl-mechanism
a := scram.Auth{User: k.SASL.User, Pass: k.SASL.Password}
opts = append(opts, kgo.SASL(a.AsSha256Mechanism()))
case "SCRAM-SHA-512":
a := scram.Auth{User: k.SASL.User, Pass: k.SASL.Password}
opts = append(opts, kgo.SASL(a.AsSha512Mechanism()))
case "PLAIN":
opts = append(opts, kgo.SASL((&plain.Auth{
User: k.SASL.User,
Pass: k.SASL.Password,
}).AsMechanism()))
case "OAUTHBEARER":
token := adminapi.OAuthBearerToken(k.SASL.Password)
if token == "" {
return nil, errors.New("OAUTHBEARER requires a token passed via --password (or kafka_api.sasl.password in the profile)")
}
opts = append(opts, kgo.SASL((koauth.Auth{
Token: token,
}).AsMechanism()))
default:
return nil, fmt.Errorf("unknown SASL mechanism %q, supported: [SCRAM-SHA-256, SCRAM-SHA-512, PLAIN]", name)
return nil, fmt.Errorf("unknown SASL mechanism %q, supported: [SCRAM-SHA-256, SCRAM-SHA-512, PLAIN, OAUTHBEARER]", name)
}
}
}
Expand Down
Loading
Loading