-
Notifications
You must be signed in to change notification settings - Fork 736
Expand file tree
/
Copy pathstart.go
More file actions
264 lines (243 loc) · 8.72 KB
/
start.go
File metadata and controls
264 lines (243 loc) · 8.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
package remotebundle
import (
"context"
"fmt"
"os"
"strings"
"sync"
"time"
"github.com/docker/go-units"
"github.com/google/uuid"
"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/debug/debugbundle"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/labels"
)
// startResponse is one row of the per-broker result table printed by the
// start command. The fields are declared in the order they are printed:
//
//   - Broker: the Admin API address the request was sent to.
//   - JobID:  the job ID returned by the broker when the bundle was started.
//   - Error:  a human-readable failure message; empty on success.
type startResponse struct {
	Broker, JobID, Error string
}
// newStartCommand builds the 'start' cobra command. The command asks every
// broker listed in the profile's Admin API addresses to start a debug bundle
// collection under one shared job ID, prints a per-broker result table and,
// when --wait is set, polls the bundle status until every broker has either
// finished or errored (or until --wait-timeout elapses).
//
// fs is the filesystem used to load the rpk configuration and p carries the
// config parameters from which the profile is loaded.
func newStartCommand(fs afero.Fs, p *config.Params) *cobra.Command {
	var (
		noConfirm   bool
		jobID       string
		opts        remoteBundleOptions
		wait        bool
		waitTimeout time.Duration
	)
	cmd := &cobra.Command{
		Use:   "start",
		Short: "Start a remote debug bundle collection in your cluster",
		Long: `Start a remote debug bundle collection in your cluster.
This command starts the debug collection process in a remote cluster that
you configured in flags, environment variables, or your rpk profile.
After starting the debug collection process, you can query the status with
'rpk debug remote-bundle status'. When it completes, you can download it with
'rpk debug remote-bundle download'.
Use the flag '--no-confirm' to avoid the confirmation prompt.
`,
		Args: cobra.NoArgs,
		PreRun: func(_ *cobra.Command, _ []string) {
			// The parsed limits are stored as int32, while FromHumanSize
			// returns an int64. Reject anything that does not fit instead of
			// letting the conversion silently wrap around.
			const maxInt32 = 1<<31 - 1
			parseSizeLimit := func(flag, value string) int32 {
				size, err := units.FromHumanSize(value)
				out.MaybeDie(err, "unable to parse --%s: %v", flag, err)
				if size > maxInt32 {
					out.Die("unable to parse --%s: %q exceeds the maximum supported size of %d bytes", flag, value, maxInt32)
				}
				return int32(size)
			}
			opts.controllerLogsSizeLimitBytes = parseSizeLimit("controller-logs-size-limit", opts.ControllerLogsSizeLimit)
			opts.logsSizeLimitBytes = parseSizeLimit("logs-size-limit", opts.LogsSizeLimit)

			// Gate on the raw flag values: labelSelectorMap is only ever
			// populated here, so checking it would skip parsing forever.
			if len(opts.LabelSelector) > 0 {
				labelsMap, err := labels.ConvertSelectorToLabelsMap(strings.Join(opts.LabelSelector, ","))
				out.MaybeDie(err, "unable to parse label-selector flag: %v", err)
				opts.labelSelectorMap = labelsMap
			}
		},
		Run: func(cmd *cobra.Command, _ []string) {
			p, err := p.LoadVirtualProfile(fs)
			out.MaybeDie(err, "rpk unable to load config: %v", err)
			config.CheckExitCloudAdmin(p)

			if !noConfirm {
				printBrokers(p.AdminAPI.Addresses)
				confirmed, err := out.Confirm("Confirm debug bundle collection from these brokers?")
				out.MaybeDie(err, "unable to confirm collection: %v; if you want to select a single broker, use -X admin.hosts=<brokers,to,collect>", err)
				if !confirmed {
					out.Exit("operation canceled; if you want to select a single broker, use -X admin.hosts=<brokers,to,collect>")
				}
			}

			// Every broker is started with the same job ID; generate one
			// when the user did not supply --job-id.
			if jobID == "" {
				id, err := uuid.NewRandom()
				out.MaybeDie(err, "unable to generate a UUID for the Job ID: %v", err)
				jobID = id.String()
			}

			var (
				response       []startResponse
				wg             sync.WaitGroup
				mu             sync.Mutex
				anyErr, anyOK  bool
				alreadyRunning bool
			)
			// updateStatus records one broker's outcome; the mutex guards
			// the shared flags and the response slice across goroutines.
			updateStatus := func(resp startResponse, err error) {
				mu.Lock()
				defer mu.Unlock()
				anyErr = anyErr || err != nil
				anyOK = anyOK || err == nil
				alreadyRunning = alreadyRunning || isAlreadyStartedErr(err)
				response = append(response, resp)
			}

			// Issue the start request to all brokers concurrently.
			for _, addr := range p.AdminAPI.Addresses {
				wg.Add(1)
				go func(addr string) {
					defer wg.Done()
					resp := startResponse{Broker: addr}
					client, err := adminapi.NewHostClient(fs, p, addr)
					if err != nil {
						resp.Error = fmt.Sprintf("unable to connect: %s", tryMessageFromErr(err))
						updateStatus(resp, err)
						return
					}
					bundleResp, err := client.CreateDebugBundle(cmd.Context(), jobID, opts.toRpadminOptions(p)...)
					if err != nil {
						resp.Error = fmt.Sprintf("unable to start debug bundle: %s", tryMessageFromErr(err))
						updateStatus(resp, err)
						return
					}
					resp.JobID = bundleResp.JobID
					updateStatus(resp, nil)
				}(addr)
			}
			wg.Wait()

			headers := []string{"broker", "job-ID"}
			if anyErr {
				headers = append(headers, "error")
				// Deferred so the table and the hints below are still
				// printed before the process exits with a failure code.
				defer os.Exit(1)
			}
			tw := out.NewTable(headers...)
			for _, row := range response {
				tw.PrintStructFields(row)
			}
			tw.Flush()

			if alreadyRunning {
				fmt.Printf(`
A debug bundle collection is already in process. To cancel it, run:
rpk debug remote-bundle cancel
`)
			}
			if wait {
				fmt.Printf(`
The debug bundle collection process has started with Job-ID %v.
Waiting for collection to complete...
`, jobID)
				ctx, cancel := context.WithTimeout(cmd.Context(), waitTimeout)
				defer cancel()
				statusCR := "\n"
				if !noConfirm {
					// Only use CRs if we've been launched interactively, so we don't break logging redirection for scripts
					statusCR = "\r"
				}
				var done bool
				for !done {
					select {
					case <-ctx.Done():
						// Context canceled or timed out
						out.Die("%s", ctx.Err())
					case <-time.After(10 * time.Second):
						status, _, _, _ := executeBundleStatus(ctx, fs, p)
						ready, errorred := filterCompletedBrokers(status)
						// Print in all cases, even if we're about to follow with a completion, to ensure the last print of the CR'd line is correct
						fmt.Printf("%sJob-ID %v status: %v/%v ready (%v errored)...", statusCR, jobID, len(ready), len(status), len(errorred))
						if len(ready)+len(errorred) == len(status) {
							if len(ready) == 0 {
								out.Die(`the debug bundle collection process with Job-ID %v has completed, but no bundles were successfully created; to check the status, run 'rpk debug remote-bundle status'`, jobID)
							}
							fmt.Printf(`
The debug bundle collection process with Job-ID %v has completed. To download the bundles, run:
rpk debug remote-bundle download
`, jobID)
							done = true
						}
					}
				}
			} else if anyOK {
				fmt.Printf(`
The debug bundle collection process has started with Job-ID %v. To check the
status, run:
rpk debug remote-bundle status
`, jobID)
			}
		},
	}
	f := cmd.Flags()
	f.StringVar(&jobID, "job-id", "", "Custom UUID to assign to the job that generates the debug bundle")
	f.BoolVar(&noConfirm, "no-confirm", false, "Disable confirmation prompt")
	f.BoolVar(&wait, "wait", false, "Wait for completion of remote bundle before returning")
	f.DurationVar(&waitTimeout, "wait-timeout", 300*time.Second, "How long to wait locally for remote-bundle completion if --wait is specified. Collection on cluster will continue regardless.")
	// Debug bundle options:
	opts.InstallFlags(f)
	return cmd
}
// remoteBundleOptions holds the debug bundle flags shared with the local
// bundle command together with the parsed forms of those flags that the
// Admin API options require.
type remoteBundleOptions struct {
// Raw flag values (size limits, label selector, and so on), embedded.
debugbundle.DebugBundleSharedOptions
// ControllerLogsSizeLimit parsed into a byte count by the command's PreRun.
controllerLogsSizeLimitBytes int32
// LogsSizeLimit parsed into a byte count by the command's PreRun.
logsSizeLimitBytes int32
// Key/value pairs parsed from LabelSelector; toRpadminOptions turns each
// entry into a rpadmin.DebugBundleLabelSelector.
labelSelectorMap map[string]string
}
// toRpadminOptions translates the configured bundle options into the
// rpadmin.DebugBundleOption list passed to CreateDebugBundle. An option is
// only emitted when its value is set (positive number, non-empty string or
// non-empty list). Authentication and TLS options are derived from the Kafka
// API section of the given profile p. The result is nil when nothing is set.
func (o *remoteBundleOptions) toRpadminOptions(p *config.RpkProfile) []rpadmin.DebugBundleOption {
	var result []rpadmin.DebugBundleOption
	add := func(opt rpadmin.DebugBundleOption) {
		result = append(result, opt)
	}

	if o.controllerLogsSizeLimitBytes > 0 {
		add(rpadmin.WithControllerLogsSizeLimitBytes(o.controllerLogsSizeLimitBytes))
	}
	// Durations are sent as whole seconds; sub-second values truncate to
	// zero and are therefore omitted.
	cpuWaitSeconds := int32(o.CPUProfilerWait.Seconds())
	if cpuWaitSeconds > 0 {
		add(rpadmin.WithCPUProfilerWaitSeconds(cpuWaitSeconds))
	}
	if o.LogsSince != "" {
		add(rpadmin.WithLogsSince(o.LogsSince))
	}
	if o.LogsUntil != "" {
		add(rpadmin.WithLogsUntil(o.LogsUntil))
	}
	if o.logsSizeLimitBytes > 0 {
		add(rpadmin.WithLogsSizeLimitBytes(o.logsSizeLimitBytes))
	}
	metricsIntervalSeconds := int32(o.MetricsInterval.Seconds())
	if metricsIntervalSeconds > 0 {
		add(rpadmin.WithMetricsIntervalSeconds(metricsIntervalSeconds))
	}
	if o.MetricsSampleCount > 0 {
		add(rpadmin.WithMetricsSamples(int32(o.MetricsSampleCount)))
	}
	if len(o.PartitionFlag) > 0 {
		add(rpadmin.WithPartitions(o.PartitionFlag))
	}
	if o.Namespace != "" {
		add(rpadmin.WithNamespace(o.Namespace))
	}
	if len(o.LabelSelector) > 0 {
		selectors := make([]rpadmin.DebugBundleLabelSelector, 0, len(o.LabelSelector))
		for key, value := range o.labelSelectorMap {
			selectors = append(selectors, rpadmin.DebugBundleLabelSelector{Key: key, Value: value})
		}
		add(rpadmin.WithLabelSelector(selectors))
	}

	// Credentials: OAuth bearer takes precedence over SCRAM.
	if sasl := p.KafkaAPI.SASL; sasl != nil {
		switch {
		case strings.EqualFold(sasl.Mechanism, adminapi.OAuthBearer):
			add(rpadmin.WithOAuthBearerAuthentication(adminapi.OAuthBearerToken(sasl.Password)))
		case p.HasSASLCredentials():
			add(rpadmin.WithSCRAMAuthentication(sasl.User, sasl.Password, sasl.Mechanism))
		}
	}
	if tlsCfg := p.KafkaAPI.TLS; tlsCfg != nil {
		add(rpadmin.WithTLS(true, tlsCfg.InsecureSkipVerify))
	}
	return result
}