Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a1a4bd2
rpk: add OAUTHBEARER SASL mechanism support
david-yu Apr 15, 2026
4b3f789
rpk: add unit tests for OAUTHBEARER SASL support
david-yu Apr 15, 2026
8d87e83
rpk: include PLAIN in profile doc mechanism list
david-yu Apr 15, 2026
03d7388
rpk: update -X help text to list all SASL mechanisms
david-yu Apr 15, 2026
2d5f797
rpk: add NewFranzClient SASL error path tests
david-yu Apr 15, 2026
d8ac846
rpk: add copyright headers to new test files
david-yu Apr 15, 2026
319275c
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 16, 2026
bcf5806
ci: empty commit to retrigger build
david-yu Apr 16, 2026
eeab01c
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 16, 2026
7dd8eb0
rpk: address OAUTHBEARER PR review feedback
david-yu Apr 17, 2026
33b24f1
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 17, 2026
c4e9f70
rpk: reject OAUTHBEARER up front in remote debug bundle
david-yu Apr 17, 2026
f47f4c1
rpk: add OAUTHBEARER tests and ordering comment for SR client
david-yu Apr 17, 2026
2fa9579
rpk: reference follow-up issue in remote debug bundle guard
david-yu Apr 17, 2026
2e13ac5
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 20, 2026
4dcd842
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 20, 2026
ce7880a
rpk: regenerate BUILD files for OAUTHBEARER test additions
david-yu Apr 21, 2026
7a003d6
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 21, 2026
9a73bbb
rpk: fix $HOME-unset failures in OAUTHBEARER tests
david-yu Apr 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/go/rpk/pkg/adminapi/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,18 @@ const (
ScramSha256 = "SCRAM-SHA-256"
ScramSha512 = "SCRAM-SHA-512"
CloudOIDC = "CLOUD-OIDC"
OAuthBearer = "OAUTHBEARER"
)

// GetAuth gets the rpadmin.Auth from the rpk profile.
func GetAuth(p *config.RpkProfile) (rpadmin.Auth, error) {
switch {
case p.KafkaAPI.SASL != nil && strings.EqualFold(p.KafkaAPI.SASL.Mechanism, OAuthBearer):
token := oauthBearerToken(p.KafkaAPI.SASL.Password)
if token == "" {
return nil, errors.New("OAUTHBEARER requires a token passed via --sasl-password")
Comment thread
david-yu marked this conversation as resolved.
Outdated
}
return &rpadmin.BearerToken{Token: token}, nil
case p.KafkaAPI.SASL != nil && p.KafkaAPI.SASL.Mechanism != CloudOIDC:
return &rpadmin.BasicAuth{Username: p.KafkaAPI.SASL.User, Password: p.KafkaAPI.SASL.Password}, nil
case p.KafkaAPI.SASL != nil && p.KafkaAPI.SASL.Mechanism == CloudOIDC:
Expand Down Expand Up @@ -72,6 +79,15 @@ func GetAuth(p *config.RpkProfile) (rpadmin.Auth, error) {
}
}

// oauthBearerToken extracts the bearer token from the SASL password field.
// It accepts both "token:<TOKEN>" format and a raw token string.
func oauthBearerToken(password string) string {
if t, ok := strings.CutPrefix(password, "token:"); ok {
return t
}
return password
}
Comment thread
david-yu marked this conversation as resolved.

// NewClient returns an rpadmin.AdminAPI client that talks to each of the
// addresses in the rpk.admin_api section of the config.
func NewClient(ctx context.Context, fs afero.Fs, p *config.RpkProfile, opts ...rpadmin.Opt) (*rpadmin.AdminAPI, error) {
Expand Down
143 changes: 143 additions & 0 deletions src/go/rpk/pkg/adminapi/auth_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package adminapi

import (
"testing"

"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/stretchr/testify/require"
)

func TestGetAuth(t *testing.T) {
tests := []struct {
name string
profile *config.RpkProfile
wantTyp rpadmin.Auth
wantErr string
}{
{
name: "no SASL returns NopAuth",
profile: &config.RpkProfile{},
wantTyp: &rpadmin.NopAuth{},
},
{
name: "SCRAM-SHA-256 returns BasicAuth",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
User: "admin",
Password: "secret",
Mechanism: "SCRAM-SHA-256",
},
},
},
wantTyp: &rpadmin.BasicAuth{Username: "admin", Password: "secret"},
},
{
name: "OAUTHBEARER with token prefix returns BearerToken",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Password: "token:my-jwt-token",
Mechanism: "OAUTHBEARER",
},
},
},
wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"},
},
{
name: "OAUTHBEARER with raw token returns BearerToken",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Password: "my-jwt-token",
Mechanism: "OAUTHBEARER",
},
},
},
wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"},
},
{
name: "OAUTHBEARER case-insensitive",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Password: "my-jwt-token",
Mechanism: "oauthbearer",
},
},
},
wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"},
},
{
name: "OAUTHBEARER with empty password errors",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Mechanism: "OAUTHBEARER",
},
},
},
wantErr: "OAUTHBEARER requires a token",
},
{
name: "OAUTHBEARER with token: prefix only errors",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Password: "token:",
Mechanism: "OAUTHBEARER",
},
},
},
wantErr: "OAUTHBEARER requires a token",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := GetAuth(tt.profile)
if tt.wantErr != "" {
require.ErrorContains(t, err, tt.wantErr)
return
}
require.NoError(t, err)
require.IsType(t, tt.wantTyp, got)
require.Equal(t, tt.wantTyp, got)
})
}
}

func Test_oauthBearerToken(t *testing.T) {
tests := []struct {
name string
password string
want string
}{
{
name: "token prefix stripped",
password: "token:eyJhbGciOiJSUzI1NiJ9.payload.sig",
want: "eyJhbGciOiJSUzI1NiJ9.payload.sig",
},
{
name: "raw token returned as-is",
password: "eyJhbGciOiJSUzI1NiJ9.payload.sig",
want: "eyJhbGciOiJSUzI1NiJ9.payload.sig",
},
{
name: "empty password returns empty",
password: "",
want: "",
},
{
name: "token prefix only returns empty",
password: "token:",
want: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := oauthBearerToken(tt.password)
require.Equal(t, tt.want, got)
})
}
}
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/config/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ func (p *Params) InstallSASLFlags(cmd *cobra.Command) {

pf.StringVar(&p.user, FlagSASLUser, "", "SASL user to be used for authentication")
pf.StringVar(&p.password, "password", "", "SASL password to be used for authentication")
pf.StringVar(&p.saslMechanism, "sasl-mechanism", "", "The authentication mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512)")
pf.StringVar(&p.saslMechanism, "sasl-mechanism", "", "The authentication mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER)")

pf.MarkHidden(FlagSASLUser)
pf.MarkHidden("password")
Expand Down
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/config/profile_doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var profileFieldDocs = map[string]string{
"kafka_api.sasl": "SASL authentication configuration",
"kafka_api.sasl.user": "username for authentication",
"kafka_api.sasl.password": "password for authentication",
"kafka_api.sasl.mechanism": "SCRAM-SHA-256 or SCRAM-SHA-512",
"kafka_api.sasl.mechanism": "SCRAM-SHA-256, SCRAM-SHA-512, or OAUTHBEARER",
Comment thread
david-yu marked this conversation as resolved.
Outdated

"admin_api": "Admin API connection configuration",
"admin_api.addresses": "Comma-separated list of Admin API addresses (host:port)",
Expand Down
25 changes: 20 additions & 5 deletions src/go/rpk/pkg/kafka/client_franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,28 @@ func NewFranzClient(fs afero.Fs, p *config.RpkProfile, extraOpts ...kgo.Opt) (*k
Token: a.AuthToken,
}).AsMechanism()))
} else {
a := scram.Auth{
User: k.SASL.User,
Pass: k.SASL.Password,
}
switch name := strings.ToUpper(k.SASL.Mechanism); name {
case "SCRAM-SHA-256", "": // we default to SCRAM-SHA-256 -- people commonly specify user & pass without --sasl-mechanism
a := scram.Auth{User: k.SASL.User, Pass: k.SASL.Password}
opts = append(opts, kgo.SASL(a.AsSha256Mechanism()))
case "SCRAM-SHA-512":
a := scram.Auth{User: k.SASL.User, Pass: k.SASL.Password}
opts = append(opts, kgo.SASL(a.AsSha512Mechanism()))
case "PLAIN":
opts = append(opts, kgo.SASL((&plain.Auth{
User: k.SASL.User,
Pass: k.SASL.Password,
}).AsMechanism()))
case "OAUTHBEARER":
token := oauthBearerToken(k.SASL.Password)
if token == "" {
return nil, fmt.Errorf("OAUTHBEARER requires a token passed via --sasl-password")
Comment thread
david-yu marked this conversation as resolved.
Outdated
}
opts = append(opts, kgo.SASL((koauth.Auth{
Token: token,
}).AsMechanism()))
default:
return nil, fmt.Errorf("unknown SASL mechanism %q, supported: [SCRAM-SHA-256, SCRAM-SHA-512, PLAIN]", name)
return nil, fmt.Errorf("unknown SASL mechanism %q, supported: [SCRAM-SHA-256, SCRAM-SHA-512, PLAIN, OAUTHBEARER]", name)
}
}
}
Expand All @@ -154,6 +160,15 @@ func NewFranzClient(fs afero.Fs, p *config.RpkProfile, extraOpts ...kgo.Opt) (*k
return kgo.NewClient(opts...)
}

// oauthBearerToken extracts the bearer token from the SASL password field.
// It accepts both "token:<TOKEN>" format and a raw token string.
func oauthBearerToken(password string) string {
if t, ok := strings.CutPrefix(password, "token:"); ok {
return t
}
return password
}

// NewAdmin returns a franz-go admin client.
func NewAdmin(fs afero.Fs, p *config.RpkProfile, extraOpts ...kgo.Opt) (*kadm.Client, error) {
cl, err := NewFranzClient(fs, p, extraOpts...)
Expand Down
45 changes: 45 additions & 0 deletions src/go/rpk/pkg/kafka/client_franz_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package kafka

import "testing"

func Test_oauthBearerToken(t *testing.T) {
Comment thread
david-yu marked this conversation as resolved.
Outdated
tests := []struct {
name string
password string
want string
}{
{
name: "token prefix stripped",
password: "token:eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.test",
want: "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.test",
},
{
name: "raw token returned as-is",
password: "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.test",
want: "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.test",
},
{
name: "empty password returns empty",
password: "",
want: "",
},
{
name: "token prefix only returns empty",
password: "token:",
want: "",
},
{
name: "token prefix is case-sensitive",
password: "Token:abc",
want: "Token:abc",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := oauthBearerToken(tt.password)
if got != tt.want {
t.Errorf("oauthBearerToken(%q) = %q, want %q", tt.password, got, tt.want)
}
})
}
}
15 changes: 15 additions & 0 deletions src/go/rpk/pkg/schemaregistry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ func NewClient(fs afero.Fs, p *config.RpkProfile) (*rpsr.Client, error) {
}

switch {
case p.KafkaAPI.SASL != nil && strings.EqualFold(p.KafkaAPI.SASL.Mechanism, adminapi.OAuthBearer):
token := oauthBearerToken(p.KafkaAPI.SASL.Password)
if token == "" {
return nil, errors.New("OAUTHBEARER requires a token passed via --sasl-password")
Comment thread
david-yu marked this conversation as resolved.
Outdated
}
opts = append(opts, sr.BearerToken(token))
case p.HasSASLCredentials() && p.KafkaAPI.SASL.Mechanism != adminapi.CloudOIDC:
opts = append(opts, sr.BasicAuth(p.KafkaAPI.SASL.User, p.KafkaAPI.SASL.Password))
case p.KafkaAPI.SASL != nil && p.KafkaAPI.SASL.Mechanism == adminapi.CloudOIDC:
Expand Down Expand Up @@ -93,6 +99,15 @@ func NewClient(fs afero.Fs, p *config.RpkProfile) (*rpsr.Client, error) {
return rpsr.NewClient(srCl)
}

// oauthBearerToken extracts the bearer token from the SASL password field.
// It accepts both "token:<TOKEN>" format and a raw token string.
func oauthBearerToken(password string) string {
if t, ok := strings.CutPrefix(password, "token:"); ok {
return t
}
return password
}

// IsSoftDeleteError checks whether the error is a SoftDeleteError. This error
// occurs when attempting to soft-delete a schema that was already marked as
// soft deleted.
Expand Down
Loading