Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/burrow.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ min-distance=1

[notifier.default]
class-name="http"
cluster="local"
url-open="http://someservice.example.com:1467/v1/event"
interval=60
timeout=5
Expand Down
52 changes: 52 additions & 0 deletions config/default-dingtalk-post.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
{"msgtype": "markdown","markdown": {"title":"Kafka LagChecker", "text": "
{{- $StatusURL := "https://pkg.go.dev/github.com/linkedin/Burrow/core/protocol#StatusConstant"}}
{{- $FormatString := "2006-01-02 15:04:05"}}
# Kafka: {{.Cluster}}
ConsumerGroup: {{.Group}}{{- with .Result.Status}}
{{- if eq . 0}}NotFound{{end}}
{{- if eq . 1}}normal{{end}}
{{- if eq . 2}}lagging{{end}}
{{- if eq . 3}}abnormal{{end}}
{{- end}}
**Status:** Total(Partitions={{.Result.TotalPartitions}},Lag={{.Result.TotalLag}})[{{- with .Result.Status}}
{{- if eq . 0}}NotFound{{end}}
{{- if eq . 1}}{{.}}{{end}}
{{- if eq . 2}}{{.}}{{end}}
{{- if eq . 3}}{{.}}{{end}}
{{- end}}]({{$StatusURL}}){{printf "%.2f" .Result.Complete}}
{{- if .Result.Maxlag|maxlag}}
**MaxLagDetails:**
{{- with .Result.Maxlag}}
{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
\tCurrentLag={{.CurrentLag}}
\tPartition={{.Partition}}
{{- end}}
{{- end}}
{{- $TotalErrors := len .Result.Partitions}}
{{- if $TotalErrors}}
### {{$TotalErrors}} partitions have problems
>**CountPartitions:**
{{- range $k,$v := .Result.Partitions|partitioncounts}}
{{- if ne $v 0}}\n\t{{$k}}={{$v}}{{end}}
{{- end}}
**TopicsByStatus:**
{{- range $k,$v := .Result.Partitions|topicsbystatus}}
\t{{$k}}={{$v}}
{{- end}}
**PartitionDetails:**
{{- range .Result.Partitions}}
{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
\tCurrentLag={{.CurrentLag}}
\tPartition={{.Partition}}
\tStart={{formattimestamp .Start.Timestamp $FormatString}}
\t\tOffset={{.Start.Offset}}\tLag={{.Start.Lag.Value}}
\tEnd={{formattimestamp .End.Timestamp $FormatString}}
\t\tOffset={{.End.Offset}}\tLag={{.End.Lag.Value}}
{{- end}}
{{- end}}
"
}}
52 changes: 52 additions & 0 deletions config/default-wecom-post.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
{"msgtype": "markdown","markdown": {"content": "
{{- $StatusURL := "https://pkg.go.dev/github.com/linkedin/Burrow/core/protocol#StatusConstant"}}
{{- $FormatString := "2006-01-02 15:04:05"}}
# Kafka: {{.Cluster}}
ConsumerGroup: {{.Group}}{{- with .Result.Status}}
{{- if eq . 0}}NotFound{{end}}
{{- if eq . 1}}<font color=\"info\">normal</font>{{end}}
{{- if eq . 2}}<font color=\"warning\">lagging</font>{{end}}
{{- if eq . 3}}<font color=\"comment\">abnormal</font>{{end}}
{{- end}}
**Status:** Total(Partitions={{.Result.TotalPartitions}},Lag={{.Result.TotalLag}})[{{- with .Result.Status}}
{{- if eq . 0}}NotFound{{end}}
{{- if eq . 1}}<font color=\"info\">{{.}}</font>{{end}}
{{- if eq . 2}}<font color=\"warning\">{{.}}</font>{{end}}
{{- if eq . 3}}<font color=\"comment\">{{.}}</font>{{end}}
{{- end}}]({{$StatusURL}}){{printf "%.2f" .Result.Complete}}
{{- if .Result.Maxlag|maxlag}}
**MaxLagDetails:**
{{- with .Result.Maxlag}}
{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
\tCurrentLag={{.CurrentLag}}
\tPartition={{.Partition}}
{{- end}}
{{- end}}
{{- $TotalErrors := len .Result.Partitions}}
{{- if $TotalErrors}}
### <font color=\"comment\">{{$TotalErrors}} partitions have problems</font>
>**CountPartitions:**
{{- range $k,$v := .Result.Partitions|partitioncounts}}
{{- if ne $v 0}}\n\t{{$k}}={{$v}}{{end}}
{{- end}}
**TopicsByStatus:**
{{- range $k,$v := .Result.Partitions|topicsbystatus}}
\t{{$k}}={{$v}}
{{- end}}
**PartitionDetails:**
{{- range .Result.Partitions}}
{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
\tCurrentLag={{.CurrentLag}}
\tPartition={{.Partition}}
\tStart={{formattimestamp .Start.Timestamp $FormatString}}
\t\tOffset={{.Start.Offset}}\tLag={{.Start.Lag.Value}}
\tEnd={{formattimestamp .End.Timestamp $FormatString}}
\t\tOffset={{.End.Offset}}\tLag={{.End.Lag.Value}}
{{- end}}
{{- end}}
"
}}
6 changes: 6 additions & 0 deletions core/internal/helpers/coordinators.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ func (m *MockModule) GetName() string {
return args.String(0)
}

// GetCluster mocks the notifier.Module GetCluster func
func (m *MockModule) GetCluster() string {
args := m.Called()
return args.String(0)
}

// GetGroupAllowlist mocks the notifier.Module GetGroupAllowlist func
func (m *MockModule) GetGroupAllowlist() *regexp.Regexp {
args := m.Called()
Expand Down
2 changes: 2 additions & 0 deletions core/internal/httpserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func (hc *Coordinator) configNotifierHTTP(w http.ResponseWriter, r *http.Request
SendClose: viper.GetBool(configRoot + ".send-close"),
ExtraCa: viper.GetString(configRoot + ".extra-ca"),
NoVerify: viper.GetString(configRoot + ".noverify"),
Cluster: viper.GetString(configRoot + ".cluster"),
},
Request: requestInfo,
})
Expand Down Expand Up @@ -265,6 +266,7 @@ func (hc *Coordinator) configNotifierEmail(w http.ResponseWriter, r *http.Reques
To: viper.GetString(configRoot + ".to"),
ExtraCa: viper.GetString(configRoot + ".extra-ca"),
NoVerify: viper.GetString(configRoot + ".noverify"),
Cluster: viper.GetString(configRoot + ".cluster"),
},
Request: requestInfo,
})
Expand Down
2 changes: 2 additions & 0 deletions core/internal/httpserver/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ type httpResponseConfigModuleNotifierHTTP struct {
SendClose bool `json:"send-close"`
ExtraCa string `json:"extra-ca"`
NoVerify string `json:"noverify"`
Cluster string `json:"cluster"`
}

type httpResponseConfigModuleNotifierSlack struct {
Expand Down Expand Up @@ -238,6 +239,7 @@ type httpResponseConfigModuleNotifierEmail struct {
To string `json:"to"`
ExtraCa string `json:"extra-ca"`
NoVerify string `json:"noverify"`
Cluster string `json:"cluster"`
}

type httpResponseConfigModuleNotifierNull struct {
Expand Down
13 changes: 11 additions & 2 deletions core/internal/notifier/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
type Module interface {
protocol.Module
GetName() string
GetCluster() string
GetGroupAllowlist() *regexp.Regexp
GetGroupDenylist() *regexp.Regexp
GetLogger() *zap.Logger
Expand Down Expand Up @@ -95,7 +96,7 @@ type Coordinator struct {

// getModuleForClass returns the correct module based on the passed className. As part of the Configure steps, if there
// is any error, it will panic with an appropriate message describing the problem.
func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupAllowlist, groupDenylist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template) protocol.Module {
func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupAllowlist, groupDenylist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template, cluster string) protocol.Module {
logger := app.Logger.With(
zap.String("type", "module"),
zap.String("coordinator", "notifier"),
Expand All @@ -113,6 +114,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
cluster: cluster,
}
case "email":
return &EmailNotifier{
Expand All @@ -123,6 +125,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
cluster: cluster,
}
case "null":
return &NullNotifier{
Expand All @@ -133,6 +136,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
cluster: cluster,
}
default:
panic("Unknown notifier className provided: " + className)
Expand Down Expand Up @@ -194,6 +198,8 @@ func (nc *Coordinator) Configure() {
groupAllowlist = re
}

cluster := viper.GetString(configRoot + ".cluster")

// Compile the denylist for the consumer groups to not notify for
var groupDenylist *regexp.Regexp
denylist := viper.GetString(configRoot + ".group-denylist")
Expand Down Expand Up @@ -227,7 +233,7 @@ func (nc *Coordinator) Configure() {
templateClose = tmpl.Templates()[0]
}

module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupAllowlist, groupDenylist, extras, templateOpen, templateClose)
module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupAllowlist, groupDenylist, extras, templateOpen, templateClose, cluster)
module.Configure(name, configRoot)
nc.modules[name] = module
interval := viper.GetInt64(configRoot + ".interval")
Expand Down Expand Up @@ -436,6 +442,9 @@ func (nc *Coordinator) checkAndSendResponseToModules(response *protocol.Consumer
for _, genericModule := range nc.modules {
module := genericModule.(Module)

if module.GetCluster() != "" && response.Cluster != module.GetCluster() {
continue
}
// No allowlist means everything passes
groupAllowlist := module.GetGroupAllowlist()
groupDenylist := module.GetGroupDenylist()
Expand Down
52 changes: 31 additions & 21 deletions core/internal/notifier/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,34 +472,39 @@ var notifyModuleTests = []struct {
ExpectClose bool
ExpectID bool
SendOnce bool
Cluster string
}{
// {1, 0, false, false, false, false, false},
// {2, 0, false, false, false, false, false},
// {1, 0, true, false, false, false, false},
// {1, 0, false, true, false, false, false},
// {1, 0, true, true, false, false, false},

{1, 1, false, false, true, false, false, false},
{1, 1, false, true, true, false, false, false},
{1, 1, true, false, true, false, false, false},
{1, 1, true, true, true, true, false, true},

{1, 2, false, false, true, false, true, false},
{1, 2, false, true, true, false, true, false},
{1, 2, true, false, true, false, true, false},
{1, 2, true, true, true, false, true, false},
{1, 2, true, true, false, false, true, true},
{1, 2, false, true, true, false, true, true},

{3, 2, false, false, false, false, true, false},
{3, 2, false, true, false, false, true, false},
{3, 2, true, false, false, false, true, false},
{3, 2, true, true, false, false, true, false},

{2, 1, false, false, false, false, false, false},
{2, 1, false, true, false, false, false, false},
{2, 1, true, false, false, false, false, false},
{2, 1, true, true, true, true, false, false},
{1, 1, false, false, true, false, false, false, ""},
{1, 1, false, true, true, false, false, false, "testcluster"},
{1, 1, true, false, true, false, false, false, "unmatchedCluster"},
{1, 1, true, true, true, true, false, true, ""},

{1, 2, false, false, true, false, true, false, ""},
{1, 2, false, true, true, false, true, false, ""},
{1, 2, true, false, true, false, true, false, ""},
{1, 2, true, true, true, false, true, false, ""},
{1, 2, true, true, false, false, true, true, ""},
{1, 2, false, true, true, false, true, true, ""},

{3, 2, false, false, false, false, true, false, ""},
{3, 2, false, true, false, false, true, false, ""},
{3, 2, true, false, false, false, true, false, ""},
{3, 2, true, true, false, false, true, false, ""},

{2, 1, false, false, false, false, false, false, ""},
{2, 1, false, true, false, false, false, false, ""},
{2, 1, true, false, false, false, false, false, ""},
{2, 1, true, true, true, true, false, false, ""},
}

func checkNotifierClusterMatch(cluster string) bool {
return cluster == "" || cluster == "testcluster"
}

func TestCoordinator_checkAndSendResponseToModules(t *testing.T) {
Expand Down Expand Up @@ -558,10 +563,15 @@ func TestCoordinator_checkAndSendResponseToModules(t *testing.T) {
// Set up the mock module and expected calls
mockModule := &helpers.MockModule{}
coordinator.modules["test"] = mockModule
mockModule.On("GetCluster").Return(testSet.Cluster)

if checkNotifierClusterMatch(testSet.Cluster) {
mockModule.On("GetName").Return("test")
mockModule.On("GetGroupAllowlist").Return((*regexp.Regexp)(nil))
mockModule.On("GetGroupDenylist").Return((*regexp.Regexp)(nil))
mockModule.On("AcceptConsumerGroup", response).Return(true)
}

if testSet.ExpectSend {
mockModule.On("Notify", response, mock.MatchedBy(func(s string) bool { return true }), mock.MatchedBy(func(t time.Time) bool { return true }), testSet.ExpectClose).Return()
}
Expand Down
6 changes: 6 additions & 0 deletions core/internal/notifier/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type EmailNotifier struct {
Log *zap.Logger

name string
cluster string
groupAllowlist *regexp.Regexp
groupDenylist *regexp.Regexp
extras map[string]string
Expand Down Expand Up @@ -139,6 +140,11 @@ func (module *EmailNotifier) GetName() string {
return module.name
}

// GetCluster returns the configured name of this module
func (module *EmailNotifier) GetCluster() string {
return module.cluster
}

// GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one)
func (module *EmailNotifier) GetGroupAllowlist() *regexp.Regexp {
return module.groupAllowlist
Expand Down
5 changes: 5 additions & 0 deletions core/internal/notifier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type HTTPNotifier struct {
Log *zap.Logger

name string
cluster string
groupAllowlist *regexp.Regexp
groupDenylist *regexp.Regexp
extras map[string]string
Expand Down Expand Up @@ -124,6 +125,10 @@ func (module *HTTPNotifier) GetName() string {
return module.name
}

func (module *HTTPNotifier) GetCluster() string {
return module.cluster
}

// GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one)
func (module *HTTPNotifier) GetGroupAllowlist() *regexp.Regexp {
return module.groupAllowlist
Expand Down
6 changes: 6 additions & 0 deletions core/internal/notifier/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type NullNotifier struct {
Log *zap.Logger

name string
cluster string
groupAllowlist *regexp.Regexp
groupDenylist *regexp.Regexp
extras map[string]string
Expand Down Expand Up @@ -75,6 +76,11 @@ func (module *NullNotifier) GetName() string {
return module.name
}

// GetCluster returns the configured name of this module
func (module *NullNotifier) GetCluster() string {
return module.cluster
}

// GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one)
func (module *NullNotifier) GetGroupAllowlist() *regexp.Regexp {
return module.groupAllowlist
Expand Down