Skip to content

Commit b4e4d78

Browse files
committed
rpk: add OAUTHBEARER SASL mechanism support
Add OAUTHBEARER as a supported SASL mechanism in rpk, alongside the existing SCRAM-SHA-256 and SCRAM-SHA-512 mechanisms. - toSASLConfig and NewFranzClient now dispatch on OAUTHBEARER to set up kgo.SASL with the bearer token from the profile's sasl.password - toRpadminOptions similarly calls WithOAuthBearerAuthentication for the admin client - OAUTHBEARER is rejected early in remote debug bundle (follow-up issue referenced in the guard comment) - Update -X help text and profile docs to list all SASL mechanisms including PLAIN and OAUTHBEARER - Add unit tests for the SASL dispatch paths in adminapi, franz client, and schema registry client; fix $HOME-unset failures in those tests - Regenerate BUILD files for new test files (cherry picked from commits in PR #30169)
1 parent 40fbabe commit b4e4d78

11 files changed

Lines changed: 463 additions & 14 deletions

File tree

src/go/rpk/pkg/adminapi/BUILD

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,15 @@ go_library(
2020
go_test(
2121
name = "adminapi_test",
2222
size = "small",
23-
srcs = ["admin_test.go"],
23+
srcs = [
24+
"admin_test.go",
25+
"auth_test.go",
26+
],
2427
embed = [":adminapi"],
2528
env = {"HOME": "/"},
2629
deps = [
2730
"//src/go/rpk/pkg/config",
31+
"@com_github_redpanda_data_common_go_rpadmin//:rpadmin",
2832
"@com_github_spf13_afero//:afero",
2933
"@com_github_stretchr_testify//require",
3034
],

src/go/rpk/pkg/adminapi/admin.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,18 @@ const (
4040
ScramSha256 = "SCRAM-SHA-256"
4141
ScramSha512 = "SCRAM-SHA-512"
4242
CloudOIDC = "CLOUD-OIDC"
43+
OAuthBearer = "OAUTHBEARER"
4344
)
4445

4546
// GetAuth gets the rpadmin.Auth from the rpk profile.
4647
func GetAuth(p *config.RpkProfile) (rpadmin.Auth, error) {
4748
switch {
49+
case p.KafkaAPI.SASL != nil && strings.EqualFold(p.KafkaAPI.SASL.Mechanism, OAuthBearer):
50+
token := OAuthBearerToken(p.KafkaAPI.SASL.Password)
51+
if token == "" {
52+
return nil, errors.New("OAUTHBEARER requires a token passed via --password (or kafka_api.sasl.password in the profile)")
53+
}
54+
return &rpadmin.BearerToken{Token: token}, nil
4855
case p.KafkaAPI.SASL != nil && p.KafkaAPI.SASL.Mechanism != CloudOIDC:
4956
return &rpadmin.BasicAuth{Username: p.KafkaAPI.SASL.User, Password: p.KafkaAPI.SASL.Password}, nil
5057
case p.KafkaAPI.SASL != nil && p.KafkaAPI.SASL.Mechanism == CloudOIDC:
@@ -72,6 +79,15 @@ func GetAuth(p *config.RpkProfile) (rpadmin.Auth, error) {
7279
}
7380
}
7481

82+
// OAuthBearerToken extracts the bearer token from the SASL password field.
83+
// It accepts both "token:<TOKEN>" format and a raw token string.
84+
func OAuthBearerToken(password string) string {
85+
if t, ok := strings.CutPrefix(password, "token:"); ok {
86+
return t
87+
}
88+
return password
89+
}
90+
7591
// NewClient returns an rpadmin.AdminAPI client that talks to each of the
7692
// addresses in the rpk.admin_api section of the config.
7793
func NewClient(ctx context.Context, fs afero.Fs, p *config.RpkProfile, opts ...rpadmin.Opt) (*rpadmin.AdminAPI, error) {
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
// Copyright 2026 Redpanda Data, Inc.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.md
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0
9+
10+
package adminapi
11+
12+
import (
13+
"testing"
14+
15+
"github.com/redpanda-data/common-go/rpadmin"
16+
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
17+
"github.com/stretchr/testify/require"
18+
)
19+
20+
func TestGetAuth(t *testing.T) {
21+
tests := []struct {
22+
name string
23+
profile *config.RpkProfile
24+
wantTyp rpadmin.Auth
25+
wantErr string
26+
}{
27+
{
28+
name: "no SASL returns NopAuth",
29+
profile: &config.RpkProfile{},
30+
wantTyp: &rpadmin.NopAuth{},
31+
},
32+
{
33+
name: "SCRAM-SHA-256 returns BasicAuth",
34+
profile: &config.RpkProfile{
35+
KafkaAPI: config.RpkKafkaAPI{
36+
SASL: &config.SASL{
37+
User: "admin",
38+
Password: "secret",
39+
Mechanism: "SCRAM-SHA-256",
40+
},
41+
},
42+
},
43+
wantTyp: &rpadmin.BasicAuth{Username: "admin", Password: "secret"},
44+
},
45+
{
46+
name: "OAUTHBEARER with token prefix returns BearerToken",
47+
profile: &config.RpkProfile{
48+
KafkaAPI: config.RpkKafkaAPI{
49+
SASL: &config.SASL{
50+
Password: "token:my-jwt-token",
51+
Mechanism: "OAUTHBEARER",
52+
},
53+
},
54+
},
55+
wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"},
56+
},
57+
{
58+
name: "OAUTHBEARER with raw token returns BearerToken",
59+
profile: &config.RpkProfile{
60+
KafkaAPI: config.RpkKafkaAPI{
61+
SASL: &config.SASL{
62+
Password: "my-jwt-token",
63+
Mechanism: "OAUTHBEARER",
64+
},
65+
},
66+
},
67+
wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"},
68+
},
69+
{
70+
name: "OAUTHBEARER case-insensitive",
71+
profile: &config.RpkProfile{
72+
KafkaAPI: config.RpkKafkaAPI{
73+
SASL: &config.SASL{
74+
Password: "my-jwt-token",
75+
Mechanism: "oauthbearer",
76+
},
77+
},
78+
},
79+
wantTyp: &rpadmin.BearerToken{Token: "my-jwt-token"},
80+
},
81+
{
82+
name: "OAUTHBEARER with empty password errors",
83+
profile: &config.RpkProfile{
84+
KafkaAPI: config.RpkKafkaAPI{
85+
SASL: &config.SASL{
86+
Mechanism: "OAUTHBEARER",
87+
},
88+
},
89+
},
90+
wantErr: "OAUTHBEARER requires a token",
91+
},
92+
{
93+
name: "OAUTHBEARER with token: prefix only errors",
94+
profile: &config.RpkProfile{
95+
KafkaAPI: config.RpkKafkaAPI{
96+
SASL: &config.SASL{
97+
Password: "token:",
98+
Mechanism: "OAUTHBEARER",
99+
},
100+
},
101+
},
102+
wantErr: "OAUTHBEARER requires a token",
103+
},
104+
}
105+
for _, tt := range tests {
106+
t.Run(tt.name, func(t *testing.T) {
107+
got, err := GetAuth(tt.profile)
108+
if tt.wantErr != "" {
109+
require.ErrorContains(t, err, tt.wantErr)
110+
return
111+
}
112+
require.NoError(t, err)
113+
require.IsType(t, tt.wantTyp, got)
114+
require.Equal(t, tt.wantTyp, got)
115+
})
116+
}
117+
}
118+
119+
func TestOAuthBearerToken(t *testing.T) {
120+
tests := []struct {
121+
name string
122+
password string
123+
want string
124+
}{
125+
{
126+
name: "token prefix stripped",
127+
password: "token:eyJhbGciOiJSUzI1NiJ9.payload.sig",
128+
want: "eyJhbGciOiJSUzI1NiJ9.payload.sig",
129+
},
130+
{
131+
name: "raw token returned as-is",
132+
password: "eyJhbGciOiJSUzI1NiJ9.payload.sig",
133+
want: "eyJhbGciOiJSUzI1NiJ9.payload.sig",
134+
},
135+
{
136+
name: "empty password returns empty",
137+
password: "",
138+
want: "",
139+
},
140+
{
141+
name: "token prefix only returns empty",
142+
password: "token:",
143+
want: "",
144+
},
145+
{
146+
name: "token prefix is case-sensitive",
147+
password: "Token:abc",
148+
want: "Token:abc",
149+
},
150+
}
151+
for _, tt := range tests {
152+
t.Run(tt.name, func(t *testing.T) {
153+
got := OAuthBearerToken(tt.password)
154+
require.Equal(t, tt.want, got)
155+
})
156+
}
157+
}

src/go/rpk/pkg/cli/debug/remotebundle/start.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,16 @@ Use the flag '--no-confirm' to avoid the confirmation prompt.
7878
out.MaybeDie(err, "rpk unable to load config: %v", err)
7979
config.CheckExitCloudAdmin(p)
8080

81+
// Remote debug bundle forwards SASL credentials to the broker so
82+
// rpk running on the broker can authenticate to Kafka. The broker
83+
// API currently only accepts a SCRAM-shaped payload; until that
84+
// gains OAUTHBEARER support (tracked in redpanda#30222), reject
85+
// up front rather than silently sending the request with no auth
86+
// (which then fails confusingly on secured clusters).
87+
if p.KafkaAPI.SASL != nil && strings.EqualFold(p.KafkaAPI.SASL.Mechanism, adminapi.OAuthBearer) {
88+
out.Die("OAUTHBEARER is not yet supported for remote debug bundle collection; use SCRAM-SHA-256 or SCRAM-SHA-512 credentials to run this command")
89+
}
90+
8191
if !noConfirm {
8292
printBrokers(p.AdminAPI.Addresses)
8393
confirmed, err := out.Confirm("Confirm debug bundle collection from these brokers?")

src/go/rpk/pkg/config/params.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -675,10 +675,12 @@ tls.key=/path/to/key.pem
675675
API listeners with mTLS.
676676
677677
sasl.mechanism=SCRAM-SHA-256
678-
The SASL mechanism to use for authentication. This can be either SCRAM-SHA-256
679-
or SCRAM-SHA-512. Note that with Redpanda, the Admin API can be configured to
680-
require basic authentication with your Kafka API SASL credentials. This
681-
defaults to SCRAM-SHA-256 if no mechanism is specified.
678+
The SASL mechanism to use for authentication. This can be SCRAM-SHA-256,
679+
SCRAM-SHA-512, PLAIN, or OAUTHBEARER. For OAUTHBEARER, pass the token via
680+
the pass field (optionally prefixed with "token:"). Note that with Redpanda,
681+
the Admin API can be configured to require basic authentication with your
682+
Kafka API SASL credentials. This defaults to SCRAM-SHA-256 if no mechanism
683+
is specified.
682684
683685
user=username
684686
The SASL username to use for authentication. This is also used for the admin
@@ -801,7 +803,7 @@ tls.insecure_skip_verify=boolean
801803
tls.ca=/path/to/ca.pem
802804
tls.cert=/path/to/cert.pem
803805
tls.key=/path/to/key.pem
804-
sasl.mechanism=SCRAM-SHA-256 or SCRAM-SHA-512
806+
sasl.mechanism=SCRAM-SHA-256, SCRAM-SHA-512, PLAIN, or OAUTHBEARER
805807
user=username
806808
pass=password
807809
admin.hosts=comma,delimited,host:ports
@@ -857,7 +859,7 @@ func (p *Params) InstallSASLFlags(cmd *cobra.Command) {
857859

858860
pf.StringVar(&p.user, FlagSASLUser, "", "SASL user to be used for authentication")
859861
pf.StringVar(&p.password, "password", "", "SASL password to be used for authentication")
860-
pf.StringVar(&p.saslMechanism, "sasl-mechanism", "", "The authentication mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512)")
862+
pf.StringVar(&p.saslMechanism, "sasl-mechanism", "", "The authentication mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER)")
861863

862864
pf.MarkHidden(FlagSASLUser)
863865
pf.MarkHidden("password")

src/go/rpk/pkg/kafka/BUILD

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
load("@rules_go//go:def.bzl", "go_library")
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "kafka",
@@ -21,3 +21,15 @@ go_library(
2121
"@com_github_twmb_franz_go_plugin_kzap//:kzap",
2222
],
2323
)
24+
25+
go_test(
26+
name = "kafka_test",
27+
size = "small",
28+
srcs = ["client_franz_test.go"],
29+
embed = [":kafka"],
30+
deps = [
31+
"//src/go/rpk/pkg/config",
32+
"@com_github_spf13_afero//:afero",
33+
"@com_github_stretchr_testify//require",
34+
],
35+
)

src/go/rpk/pkg/kafka/client_franz.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,22 +121,28 @@ func NewFranzClient(fs afero.Fs, p *config.RpkProfile, extraOpts ...kgo.Opt) (*k
121121
Token: a.AuthToken,
122122
}).AsMechanism()))
123123
} else {
124-
a := scram.Auth{
125-
User: k.SASL.User,
126-
Pass: k.SASL.Password,
127-
}
128124
switch name := strings.ToUpper(k.SASL.Mechanism); name {
129125
case "SCRAM-SHA-256", "": // we default to SCRAM-SHA-256 -- people commonly specify user & pass without --sasl-mechanism
126+
a := scram.Auth{User: k.SASL.User, Pass: k.SASL.Password}
130127
opts = append(opts, kgo.SASL(a.AsSha256Mechanism()))
131128
case "SCRAM-SHA-512":
129+
a := scram.Auth{User: k.SASL.User, Pass: k.SASL.Password}
132130
opts = append(opts, kgo.SASL(a.AsSha512Mechanism()))
133131
case "PLAIN":
134132
opts = append(opts, kgo.SASL((&plain.Auth{
135133
User: k.SASL.User,
136134
Pass: k.SASL.Password,
137135
}).AsMechanism()))
136+
case "OAUTHBEARER":
137+
token := adminapi.OAuthBearerToken(k.SASL.Password)
138+
if token == "" {
139+
return nil, errors.New("OAUTHBEARER requires a token passed via --password (or kafka_api.sasl.password in the profile)")
140+
}
141+
opts = append(opts, kgo.SASL((koauth.Auth{
142+
Token: token,
143+
}).AsMechanism()))
138144
default:
139-
return nil, fmt.Errorf("unknown SASL mechanism %q, supported: [SCRAM-SHA-256, SCRAM-SHA-512, PLAIN]", name)
145+
return nil, fmt.Errorf("unknown SASL mechanism %q, supported: [SCRAM-SHA-256, SCRAM-SHA-512, PLAIN, OAUTHBEARER]", name)
140146
}
141147
}
142148
}

0 commit comments

Comments
 (0)