Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a1a4bd2
rpk: add OAUTHBEARER SASL mechanism support
david-yu Apr 15, 2026
4b3f789
rpk: add unit tests for OAUTHBEARER SASL support
david-yu Apr 15, 2026
8d87e83
rpk: include PLAIN in profile doc mechanism list
david-yu Apr 15, 2026
03d7388
rpk: update -X help text to list all SASL mechanisms
david-yu Apr 15, 2026
2d5f797
rpk: add NewFranzClient SASL error path tests
david-yu Apr 15, 2026
d8ac846
rpk: add copyright headers to new test files
david-yu Apr 15, 2026
319275c
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 16, 2026
bcf5806
ci: empty commit to retrigger build
david-yu Apr 16, 2026
eeab01c
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 16, 2026
7dd8eb0
rpk: address OAUTHBEARER PR review feedback
david-yu Apr 17, 2026
33b24f1
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 17, 2026
c4e9f70
rpk: reject OAUTHBEARER up front in remote debug bundle
david-yu Apr 17, 2026
f47f4c1
rpk: add OAUTHBEARER tests and ordering comment for SR client
david-yu Apr 17, 2026
2fa9579
rpk: reference follow-up issue in remote debug bundle guard
david-yu Apr 17, 2026
2e13ac5
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 20, 2026
4dcd842
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 20, 2026
ce7880a
rpk: regenerate BUILD files for OAUTHBEARER test additions
david-yu Apr 21, 2026
7a003d6
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 21, 2026
9a73bbb
rpk: fix $HOME-unset failures in OAUTHBEARER tests
david-yu Apr 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/go/rpk/pkg/adminapi/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,18 @@ const (
ScramSha256 = "SCRAM-SHA-256"
ScramSha512 = "SCRAM-SHA-512"
CloudOIDC = "CLOUD-OIDC"
OAuthBearer = "OAUTHBEARER"
)

// GetAuth gets the rpadmin.Auth from the rpk profile.
func GetAuth(p *config.RpkProfile) (rpadmin.Auth, error) {
switch {
case p.KafkaAPI.SASL != nil && strings.EqualFold(p.KafkaAPI.SASL.Mechanism, OAuthBearer):
token := oauthBearerToken(p.KafkaAPI.SASL.Password)
if token == "" {
return nil, errors.New("OAUTHBEARER requires a token passed via --sasl-password")
Comment thread
david-yu marked this conversation as resolved.
Outdated
}
return &rpadmin.BearerToken{Token: token}, nil
case p.KafkaAPI.SASL != nil && p.KafkaAPI.SASL.Mechanism != CloudOIDC:
return &rpadmin.BasicAuth{Username: p.KafkaAPI.SASL.User, Password: p.KafkaAPI.SASL.Password}, nil
case p.KafkaAPI.SASL != nil && p.KafkaAPI.SASL.Mechanism == CloudOIDC:
Expand Down Expand Up @@ -72,6 +79,15 @@ func GetAuth(p *config.RpkProfile) (rpadmin.Auth, error) {
}
}

// oauthBearerToken extracts the bearer token from the SASL password field.
// It accepts both "token:<TOKEN>" format and a raw token string.
func oauthBearerToken(password string) string {
if t, ok := strings.CutPrefix(password, "token:"); ok {
return t
}
return password
}
Comment thread
david-yu marked this conversation as resolved.

// NewClient returns an rpadmin.AdminAPI client that talks to each of the
// addresses in the rpk.admin_api section of the config.
func NewClient(ctx context.Context, fs afero.Fs, p *config.RpkProfile, opts ...rpadmin.Opt) (*rpadmin.AdminAPI, error) {
Expand Down
152 changes: 152 additions & 0 deletions src/go/rpk/pkg/adminapi/auth_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2026 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package adminapi

import (
"testing"

"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/stretchr/testify/require"
)

func TestGetAuth(t *testing.T) {
tests := []struct {
name string
profile *config.RpkProfile
wantTyp rpadmin.Auth
wantErr string
}{
{
name: "no SASL returns NopAuth",
profile: &config.RpkProfile{},
wantTyp: &rpadmin.NopAuth{},
},
{
name: "SCRAM-SHA-256 returns BasicAuth",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
User: "admin",
Password: "secret",
Mechanism: "SCRAM-SHA-256",
},
},
},
wantTyp: &rpadmin.BasicAuth{Username: "admin", Password: "secret"},
},
{
name: "OAUTHBEARER with token prefix returns BearerToken",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Password: "token:my-jwt-token",
Mechanism: "OAUTHBEARER",
},
},
},
wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"},
},
{
name: "OAUTHBEARER with raw token returns BearerToken",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Password: "my-jwt-token",
Mechanism: "OAUTHBEARER",
},
},
},
wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"},
},
{
name: "OAUTHBEARER case-insensitive",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Password: "my-jwt-token",
Mechanism: "oauthbearer",
},
},
},
wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"},
},
{
name: "OAUTHBEARER with empty password errors",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Mechanism: "OAUTHBEARER",
},
},
},
wantErr: "OAUTHBEARER requires a token",
},
{
name: "OAUTHBEARER with token: prefix only errors",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Password: "token:",
Mechanism: "OAUTHBEARER",
},
},
},
wantErr: "OAUTHBEARER requires a token",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := GetAuth(tt.profile)
if tt.wantErr != "" {
require.ErrorContains(t, err, tt.wantErr)
return
}
require.NoError(t, err)
require.IsType(t, tt.wantTyp, got)
require.Equal(t, tt.wantTyp, got)
})
}
}

func Test_oauthBearerToken(t *testing.T) {
tests := []struct {
name string
password string
want string
}{
{
name: "token prefix stripped",
password: "token:eyJhbGciOiJSUzI1NiJ9.payload.sig",
want: "eyJhbGciOiJSUzI1NiJ9.payload.sig",
},
{
name: "raw token returned as-is",
password: "eyJhbGciOiJSUzI1NiJ9.payload.sig",
want: "eyJhbGciOiJSUzI1NiJ9.payload.sig",
},
{
name: "empty password returns empty",
password: "",
want: "",
},
{
name: "token prefix only returns empty",
password: "token:",
want: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := oauthBearerToken(tt.password)
require.Equal(t, tt.want, got)
})
}
}
14 changes: 8 additions & 6 deletions src/go/rpk/pkg/config/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,10 +689,12 @@ tls.key=/path/to/key.pem
API listeners with mTLS.

sasl.mechanism=SCRAM-SHA-256
The SASL mechanism to use for authentication. This can be either SCRAM-SHA-256
or SCRAM-SHA-512. Note that with Redpanda, the Admin API can be configured to
require basic authentication with your Kafka API SASL credentials. This
defaults to SCRAM-SHA-256 if no mechanism is specified.
The SASL mechanism to use for authentication. This can be SCRAM-SHA-256,
SCRAM-SHA-512, PLAIN, or OAUTHBEARER. For OAUTHBEARER, pass the token via
the pass field (optionally prefixed with "token:"). Note that with Redpanda,
the Admin API can be configured to require basic authentication with your
Kafka API SASL credentials. This defaults to SCRAM-SHA-256 if no mechanism
is specified.

user=username
The SASL username to use for authentication. This is also used for the admin
Expand Down Expand Up @@ -815,7 +817,7 @@ tls.insecure_skip_verify=boolean
tls.ca=/path/to/ca.pem
tls.cert=/path/to/cert.pem
tls.key=/path/to/key.pem
sasl.mechanism=SCRAM-SHA-256 or SCRAM-SHA-512
sasl.mechanism=SCRAM-SHA-256, SCRAM-SHA-512, PLAIN, or OAUTHBEARER
user=username
pass=password
admin.hosts=comma,delimited,host:ports
Expand Down Expand Up @@ -871,7 +873,7 @@ func (p *Params) InstallSASLFlags(cmd *cobra.Command) {

pf.StringVar(&p.user, FlagSASLUser, "", "SASL user to be used for authentication")
pf.StringVar(&p.password, "password", "", "SASL password to be used for authentication")
pf.StringVar(&p.saslMechanism, "sasl-mechanism", "", "The authentication mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512)")
pf.StringVar(&p.saslMechanism, "sasl-mechanism", "", "The authentication mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER)")

pf.MarkHidden(FlagSASLUser)
pf.MarkHidden("password")
Expand Down
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/config/profile_doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var profileFieldDocs = map[string]string{
"kafka_api.sasl": "SASL authentication configuration",
"kafka_api.sasl.user": "username for authentication",
"kafka_api.sasl.password": "password for authentication",
"kafka_api.sasl.mechanism": "SCRAM-SHA-256 or SCRAM-SHA-512",
"kafka_api.sasl.mechanism": "SCRAM-SHA-256, SCRAM-SHA-512, PLAIN, or OAUTHBEARER",

"admin_api": "Admin API connection configuration",
"admin_api.addresses": "Comma-separated list of Admin API addresses (host:port)",
Expand Down
25 changes: 20 additions & 5 deletions src/go/rpk/pkg/kafka/client_franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,28 @@ func NewFranzClient(fs afero.Fs, p *config.RpkProfile, extraOpts ...kgo.Opt) (*k
Token: a.AuthToken,
}).AsMechanism()))
} else {
a := scram.Auth{
User: k.SASL.User,
Pass: k.SASL.Password,
}
switch name := strings.ToUpper(k.SASL.Mechanism); name {
case "SCRAM-SHA-256", "": // we default to SCRAM-SHA-256 -- people commonly specify user & pass without --sasl-mechanism
a := scram.Auth{User: k.SASL.User, Pass: k.SASL.Password}
opts = append(opts, kgo.SASL(a.AsSha256Mechanism()))
case "SCRAM-SHA-512":
a := scram.Auth{User: k.SASL.User, Pass: k.SASL.Password}
opts = append(opts, kgo.SASL(a.AsSha512Mechanism()))
case "PLAIN":
opts = append(opts, kgo.SASL((&plain.Auth{
User: k.SASL.User,
Pass: k.SASL.Password,
}).AsMechanism()))
case "OAUTHBEARER":
token := oauthBearerToken(k.SASL.Password)
if token == "" {
return nil, fmt.Errorf("OAUTHBEARER requires a token passed via --sasl-password")
Comment thread
david-yu marked this conversation as resolved.
Outdated
}
opts = append(opts, kgo.SASL((koauth.Auth{
Token: token,
}).AsMechanism()))
default:
return nil, fmt.Errorf("unknown SASL mechanism %q, supported: [SCRAM-SHA-256, SCRAM-SHA-512, PLAIN]", name)
return nil, fmt.Errorf("unknown SASL mechanism %q, supported: [SCRAM-SHA-256, SCRAM-SHA-512, PLAIN, OAUTHBEARER]", name)
}
}
}
Expand All @@ -154,6 +160,15 @@ func NewFranzClient(fs afero.Fs, p *config.RpkProfile, extraOpts ...kgo.Opt) (*k
return kgo.NewClient(opts...)
}

// oauthBearerToken extracts the bearer token from the SASL password field.
// It accepts both "token:<TOKEN>" format and a raw token string.
func oauthBearerToken(password string) string {
if t, ok := strings.CutPrefix(password, "token:"); ok {
return t
}
return password
}

// NewAdmin returns a franz-go admin client.
func NewAdmin(fs afero.Fs, p *config.RpkProfile, extraOpts ...kgo.Opt) (*kadm.Client, error) {
cl, err := NewFranzClient(fs, p, extraOpts...)
Expand Down
Loading
Loading