Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a1a4bd2
rpk: add OAUTHBEARER SASL mechanism support
david-yu Apr 15, 2026
4b3f789
rpk: add unit tests for OAUTHBEARER SASL support
david-yu Apr 15, 2026
8d87e83
rpk: include PLAIN in profile doc mechanism list
david-yu Apr 15, 2026
03d7388
rpk: update -X help text to list all SASL mechanisms
david-yu Apr 15, 2026
2d5f797
rpk: add NewFranzClient SASL error path tests
david-yu Apr 15, 2026
d8ac846
rpk: add copyright headers to new test files
david-yu Apr 15, 2026
319275c
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 16, 2026
bcf5806
ci: empty commit to retrigger build
david-yu Apr 16, 2026
eeab01c
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 16, 2026
7dd8eb0
rpk: address OAUTHBEARER PR review feedback
david-yu Apr 17, 2026
33b24f1
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 17, 2026
c4e9f70
rpk: reject OAUTHBEARER up front in remote debug bundle
david-yu Apr 17, 2026
f47f4c1
rpk: add OAUTHBEARER tests and ordering comment for SR client
david-yu Apr 17, 2026
2fa9579
rpk: reference follow-up issue in remote debug bundle guard
david-yu Apr 17, 2026
2e13ac5
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 20, 2026
4dcd842
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 20, 2026
ce7880a
rpk: regenerate BUILD files for OAUTHBEARER test additions
david-yu Apr 21, 2026
7a003d6
Merge branch 'dev' into rpk-oauthbearer-support
david-yu Apr 21, 2026
9a73bbb
rpk: fix $HOME-unset failures in OAUTHBEARER tests
david-yu Apr 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/go/rpk/pkg/adminapi/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ go_library(
go_test(
name = "adminapi_test",
size = "small",
srcs = ["admin_test.go"],
srcs = [
"admin_test.go",
"auth_test.go",
],
embed = [":adminapi"],
env = {"HOME": "/"},
deps = [
"//src/go/rpk/pkg/config",
"@com_github_redpanda_data_common_go_rpadmin//:rpadmin",
"@com_github_spf13_afero//:afero",
"@com_github_stretchr_testify//require",
],
Expand Down
16 changes: 16 additions & 0 deletions src/go/rpk/pkg/adminapi/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,18 @@ const (
ScramSha256 = "SCRAM-SHA-256"
ScramSha512 = "SCRAM-SHA-512"
CloudOIDC = "CLOUD-OIDC"
OAuthBearer = "OAUTHBEARER"
)

// GetAuth gets the rpadmin.Auth from the rpk profile.
func GetAuth(p *config.RpkProfile) (rpadmin.Auth, error) {
switch {
case p.KafkaAPI.SASL != nil && strings.EqualFold(p.KafkaAPI.SASL.Mechanism, OAuthBearer):
token := OAuthBearerToken(p.KafkaAPI.SASL.Password)
if token == "" {
return nil, errors.New("OAUTHBEARER requires a token passed via --password (or kafka_api.sasl.password in the profile)")
}
return &rpadmin.BearerToken{Token: token}, nil
case p.KafkaAPI.SASL != nil && p.KafkaAPI.SASL.Mechanism != CloudOIDC:
return &rpadmin.BasicAuth{Username: p.KafkaAPI.SASL.User, Password: p.KafkaAPI.SASL.Password}, nil
case p.KafkaAPI.SASL != nil && p.KafkaAPI.SASL.Mechanism == CloudOIDC:
Expand Down Expand Up @@ -72,6 +79,15 @@ func GetAuth(p *config.RpkProfile) (rpadmin.Auth, error) {
}
}

// OAuthBearerToken extracts the bearer token from the SASL password field.
// It accepts both "token:<TOKEN>" format and a raw token string.
func OAuthBearerToken(password string) string {
if t, ok := strings.CutPrefix(password, "token:"); ok {
return t
}
return password
}
Comment thread
david-yu marked this conversation as resolved.

// NewClient returns an rpadmin.AdminAPI client that talks to each of the
// addresses in the rpk.admin_api section of the config.
func NewClient(ctx context.Context, fs afero.Fs, p *config.RpkProfile, opts ...rpadmin.Opt) (*rpadmin.AdminAPI, error) {
Expand Down
157 changes: 157 additions & 0 deletions src/go/rpk/pkg/adminapi/auth_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2026 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package adminapi

import (
"testing"

"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/stretchr/testify/require"
)

func TestGetAuth(t *testing.T) {
tests := []struct {
name string
profile *config.RpkProfile
wantTyp rpadmin.Auth
wantErr string
}{
{
name: "no SASL returns NopAuth",
profile: &config.RpkProfile{},
wantTyp: &rpadmin.NopAuth{},
},
{
name: "SCRAM-SHA-256 returns BasicAuth",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
User: "admin",
Password: "secret",
Mechanism: "SCRAM-SHA-256",
},
},
},
wantTyp: &rpadmin.BasicAuth{Username: "admin", Password: "secret"},
},
{
name: "OAUTHBEARER with token prefix returns BearerToken",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Password: "token:my-jwt-token",
Mechanism: "OAUTHBEARER",
},
},
},
wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"},
},
{
name: "OAUTHBEARER with raw token returns BearerToken",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Password: "my-jwt-token",
Mechanism: "OAUTHBEARER",
},
},
},
wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"},
},
{
name: "OAUTHBEARER case-insensitive",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Password: "my-jwt-token",
Mechanism: "oauthbearer",
},
},
},
wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"},
},
{
name: "OAUTHBEARER with empty password errors",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Mechanism: "OAUTHBEARER",
},
},
},
wantErr: "OAUTHBEARER requires a token",
},
{
name: "OAUTHBEARER with token: prefix only errors",
profile: &config.RpkProfile{
KafkaAPI: config.RpkKafkaAPI{
SASL: &config.SASL{
Password: "token:",
Mechanism: "OAUTHBEARER",
},
},
},
wantErr: "OAUTHBEARER requires a token",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := GetAuth(tt.profile)
if tt.wantErr != "" {
require.ErrorContains(t, err, tt.wantErr)
return
}
require.NoError(t, err)
require.IsType(t, tt.wantTyp, got)
require.Equal(t, tt.wantTyp, got)
})
}
}

func TestOAuthBearerToken(t *testing.T) {
tests := []struct {
name string
password string
want string
}{
{
name: "token prefix stripped",
password: "token:eyJhbGciOiJSUzI1NiJ9.payload.sig",
want: "eyJhbGciOiJSUzI1NiJ9.payload.sig",
},
{
name: "raw token returned as-is",
password: "eyJhbGciOiJSUzI1NiJ9.payload.sig",
want: "eyJhbGciOiJSUzI1NiJ9.payload.sig",
},
{
name: "empty password returns empty",
password: "",
want: "",
},
{
name: "token prefix only returns empty",
password: "token:",
want: "",
},
{
name: "token prefix is case-sensitive",
password: "Token:abc",
want: "Token:abc",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := OAuthBearerToken(tt.password)
require.Equal(t, tt.want, got)
})
}
}
10 changes: 10 additions & 0 deletions src/go/rpk/pkg/cli/debug/remotebundle/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ Use the flag '--no-confirm' to avoid the confirmation prompt.
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(p)

// Remote debug bundle forwards SASL credentials to the broker so
// rpk running on the broker can authenticate to Kafka. The broker
// API currently only accepts a SCRAM-shaped payload; until that
// gains OAUTHBEARER support (tracked in redpanda#30222), reject
// up front rather than silently sending the request with no auth
// (which then fails confusingly on secured clusters).
if p.KafkaAPI.SASL != nil && strings.EqualFold(p.KafkaAPI.SASL.Mechanism, adminapi.OAuthBearer) {
out.Die("OAUTHBEARER is not yet supported for remote debug bundle collection; use SCRAM-SHA-256 or SCRAM-SHA-512 credentials to run this command")
}

if !noConfirm {
printBrokers(p.AdminAPI.Addresses)
confirmed, err := out.Confirm("Confirm debug bundle collection from these brokers?")
Expand Down
14 changes: 8 additions & 6 deletions src/go/rpk/pkg/config/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,10 +689,12 @@ tls.key=/path/to/key.pem
API listeners with mTLS.

sasl.mechanism=SCRAM-SHA-256
The SASL mechanism to use for authentication. This can be either SCRAM-SHA-256
or SCRAM-SHA-512. Note that with Redpanda, the Admin API can be configured to
require basic authentication with your Kafka API SASL credentials. This
defaults to SCRAM-SHA-256 if no mechanism is specified.
The SASL mechanism to use for authentication. This can be SCRAM-SHA-256,
SCRAM-SHA-512, PLAIN, or OAUTHBEARER. For OAUTHBEARER, pass the token via
the pass field (optionally prefixed with "token:"). Note that with Redpanda,
the Admin API can be configured to require basic authentication with your
Kafka API SASL credentials. This defaults to SCRAM-SHA-256 if no mechanism
is specified.

user=username
The SASL username to use for authentication. This is also used for the admin
Expand Down Expand Up @@ -815,7 +817,7 @@ tls.insecure_skip_verify=boolean
tls.ca=/path/to/ca.pem
tls.cert=/path/to/cert.pem
tls.key=/path/to/key.pem
sasl.mechanism=SCRAM-SHA-256 or SCRAM-SHA-512
sasl.mechanism=SCRAM-SHA-256, SCRAM-SHA-512, PLAIN, or OAUTHBEARER
user=username
pass=password
admin.hosts=comma,delimited,host:ports
Expand Down Expand Up @@ -871,7 +873,7 @@ func (p *Params) InstallSASLFlags(cmd *cobra.Command) {

pf.StringVar(&p.user, FlagSASLUser, "", "SASL user to be used for authentication")
pf.StringVar(&p.password, "password", "", "SASL password to be used for authentication")
pf.StringVar(&p.saslMechanism, "sasl-mechanism", "", "The authentication mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512)")
pf.StringVar(&p.saslMechanism, "sasl-mechanism", "", "The authentication mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER)")

pf.MarkHidden(FlagSASLUser)
pf.MarkHidden("password")
Expand Down
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/config/profile_doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var profileFieldDocs = map[string]string{
"kafka_api.sasl": "SASL authentication configuration",
"kafka_api.sasl.user": "username for authentication",
"kafka_api.sasl.password": "password for authentication",
"kafka_api.sasl.mechanism": "SCRAM-SHA-256 or SCRAM-SHA-512",
"kafka_api.sasl.mechanism": "SCRAM-SHA-256, SCRAM-SHA-512, PLAIN, or OAUTHBEARER",

"admin_api": "Admin API connection configuration",
"admin_api.addresses": "Comma-separated list of Admin API addresses (host:port)",
Expand Down
14 changes: 13 additions & 1 deletion src/go/rpk/pkg/kafka/BUILD
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@rules_go//go:def.bzl", "go_library")
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "kafka",
Expand All @@ -21,3 +21,15 @@ go_library(
"@com_github_twmb_franz_go_plugin_kzap//:kzap",
],
)

go_test(
name = "kafka_test",
size = "small",
srcs = ["client_franz_test.go"],
embed = [":kafka"],
deps = [
"//src/go/rpk/pkg/config",
"@com_github_spf13_afero//:afero",
"@com_github_stretchr_testify//require",
],
)
16 changes: 11 additions & 5 deletions src/go/rpk/pkg/kafka/client_franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,28 @@ func NewFranzClient(fs afero.Fs, p *config.RpkProfile, extraOpts ...kgo.Opt) (*k
Token: a.AuthToken,
}).AsMechanism()))
} else {
a := scram.Auth{
User: k.SASL.User,
Pass: k.SASL.Password,
}
switch name := strings.ToUpper(k.SASL.Mechanism); name {
case "SCRAM-SHA-256", "": // we default to SCRAM-SHA-256 -- people commonly specify user & pass without --sasl-mechanism
a := scram.Auth{User: k.SASL.User, Pass: k.SASL.Password}
opts = append(opts, kgo.SASL(a.AsSha256Mechanism()))
case "SCRAM-SHA-512":
a := scram.Auth{User: k.SASL.User, Pass: k.SASL.Password}
opts = append(opts, kgo.SASL(a.AsSha512Mechanism()))
case "PLAIN":
opts = append(opts, kgo.SASL((&plain.Auth{
User: k.SASL.User,
Pass: k.SASL.Password,
}).AsMechanism()))
case "OAUTHBEARER":
token := adminapi.OAuthBearerToken(k.SASL.Password)
if token == "" {
return nil, errors.New("OAUTHBEARER requires a token passed via --password (or kafka_api.sasl.password in the profile)")
}
opts = append(opts, kgo.SASL((koauth.Auth{
Token: token,
}).AsMechanism()))
default:
return nil, fmt.Errorf("unknown SASL mechanism %q, supported: [SCRAM-SHA-256, SCRAM-SHA-512, PLAIN]", name)
return nil, fmt.Errorf("unknown SASL mechanism %q, supported: [SCRAM-SHA-256, SCRAM-SHA-512, PLAIN, OAUTHBEARER]", name)
}
}
}
Expand Down
Loading
Loading