From 639f0bf3878a3bc44afdbdf088d52ab0d571f0b6 Mon Sep 17 00:00:00 2001 From: muicoder Date: Sat, 26 Mar 2022 00:40:57 +0800 Subject: [PATCH 1/7] Adding association between a Kafka cluster and a notifier https://github.com/linkedin/Burrow/pull/611 Signed-off-by: muicoder --- core/internal/helpers/coordinators.go | 6 ++++++ core/internal/httpserver/config.go | 2 ++ core/internal/httpserver/structs.go | 2 ++ core/internal/notifier/coordinator.go | 13 +++++++++++-- core/internal/notifier/email.go | 6 ++++++ core/internal/notifier/http.go | 5 +++++ core/internal/notifier/null.go | 6 ++++++ 7 files changed, 38 insertions(+), 2 deletions(-) diff --git a/core/internal/helpers/coordinators.go b/core/internal/helpers/coordinators.go index 0742d6ea..6eb7fc25 100644 --- a/core/internal/helpers/coordinators.go +++ b/core/internal/helpers/coordinators.go @@ -75,6 +75,12 @@ func (m *MockModule) GetName() string { return args.String(0) } +// GetCluster mocks the notifier.Module GetCluster func +func (m *MockModule) GetCluster() string { + args := m.Called() + return args.String(0) +} + // GetGroupAllowlist mocks the notifier.Module GetGroupAllowlist func func (m *MockModule) GetGroupAllowlist() *regexp.Regexp { args := m.Called() diff --git a/core/internal/httpserver/config.go b/core/internal/httpserver/config.go index 61a22130..eba7e260 100644 --- a/core/internal/httpserver/config.go +++ b/core/internal/httpserver/config.go @@ -214,6 +214,7 @@ func (hc *Coordinator) configNotifierHTTP(w http.ResponseWriter, r *http.Request SendClose: viper.GetBool(configRoot + ".send-close"), ExtraCa: viper.GetString(configRoot + ".extra-ca"), NoVerify: viper.GetString(configRoot + ".noverify"), + Cluster: viper.GetString(configRoot + ".cluster"), }, Request: requestInfo, }) @@ -266,6 +267,7 @@ func (hc *Coordinator) configNotifierEmail(w http.ResponseWriter, r *http.Reques To: viper.GetString(configRoot + ".to"), ExtraCa: viper.GetString(configRoot + ".extra-ca"), NoVerify: viper.GetString(configRoot + ".noverify"), + Cluster: viper.GetString(configRoot + ".cluster"), }, Request: requestInfo, }) diff --git a/core/internal/httpserver/structs.go b/core/internal/httpserver/structs.go index 6b634468..0116a43a 100644 --- a/core/internal/httpserver/structs.go +++ b/core/internal/httpserver/structs.go @@ -203,6 +203,7 @@ type httpResponseConfigModuleNotifierHTTP struct { SendClose bool `json:"send-close"` ExtraCa string `json:"extra-ca"` NoVerify string `json:"noverify"` + Cluster string `json:"cluster"` } type httpResponseConfigModuleNotifierSlack struct { @@ -239,6 +240,7 @@ type httpResponseConfigModuleNotifierEmail struct { To string `json:"to"` ExtraCa string `json:"extra-ca"` NoVerify string `json:"noverify"` + Cluster string `json:"cluster"` } type httpResponseConfigModuleNotifierNull struct { diff --git a/core/internal/notifier/coordinator.go b/core/internal/notifier/coordinator.go index eaab203d..d20a6200 100644 --- a/core/internal/notifier/coordinator.go +++ b/core/internal/notifier/coordinator.go @@ -49,6 +49,7 @@ import ( type Module interface { protocol.Module GetName() string + GetCluster() string GetGroupAllowlist() *regexp.Regexp GetGroupDenylist() *regexp.Regexp GetLogger() *zap.Logger @@ -96,7 +97,7 @@ type Coordinator struct { // getModuleForClass returns the correct module based on the passed className. As part of the Configure steps, if there // is any error, it will panic with an appropriate message describing the problem. -func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupAllowlist, groupDenylist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template) protocol.Module { +func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupAllowlist, groupDenylist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template, cluster string) protocol.Module { logger := app.Logger.With( zap.String("type", "module"), zap.String("coordinator", "notifier"), @@ -114,6 +115,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s extras: extras, templateOpen: templateOpen, templateClose: templateClose, + cluster: cluster, } case "email": return &EmailNotifier{ @@ -124,6 +126,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s extras: extras, templateOpen: templateOpen, templateClose: templateClose, + cluster: cluster, } case "null": return &NullNotifier{ @@ -134,6 +137,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s extras: extras, templateOpen: templateOpen, templateClose: templateClose, + cluster: cluster, } default: panic("Unknown notifier className provided: " + className) @@ -195,6 +199,8 @@ func (nc *Coordinator) Configure() { groupAllowlist = re } + cluster := viper.GetString(configRoot + ".cluster") + // Compile the denylist for the consumer groups to not notify for var groupDenylist *regexp.Regexp denylist := viper.GetString(configRoot + ".group-denylist") @@ -228,7 +234,7 @@ func (nc *Coordinator) Configure() { templateClose = tmpl.Templates()[0] } - module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupAllowlist, groupDenylist, extras, templateOpen, templateClose) + module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupAllowlist, groupDenylist, extras, templateOpen, templateClose, cluster) module.Configure(name, configRoot) nc.modules[name] = module interval := viper.GetInt64(configRoot + ".interval") @@ -437,6 +443,9 @@ func (nc *Coordinator) checkAndSendResponseToModules(response *protocol.Consumer for _, genericModule := range nc.modules { module := genericModule.(Module) + if module.GetCluster() != "" && response.Cluster != module.GetCluster() { + continue + } // No allowlist means everything passes groupAllowlist := module.GetGroupAllowlist() groupDenylist := module.GetGroupDenylist() diff --git a/core/internal/notifier/email.go b/core/internal/notifier/email.go index bdc184ed..bdeaad66 100644 --- a/core/internal/notifier/email.go +++ b/core/internal/notifier/email.go @@ -39,6 +39,7 @@ type EmailNotifier struct { Log *zap.Logger name string + cluster string groupAllowlist *regexp.Regexp groupDenylist *regexp.Regexp extras map[string]string @@ -140,6 +141,11 @@ func (module *EmailNotifier) GetName() string { return module.name } +// GetCluster returns the configured name of this module +func (module *EmailNotifier) GetCluster() string { + return module.cluster +} + // GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one) func (module *EmailNotifier) GetGroupAllowlist() *regexp.Regexp { return module.groupAllowlist diff --git a/core/internal/notifier/http.go b/core/internal/notifier/http.go index 4e17dd6b..334029d1 100644 --- a/core/internal/notifier/http.go +++ b/core/internal/notifier/http.go @@ -40,6 +40,7 @@ type HTTPNotifier struct { Log *zap.Logger name string + cluster string groupAllowlist *regexp.Regexp groupDenylist *regexp.Regexp extras map[string]string @@ -125,6 +126,10 @@ func (module *HTTPNotifier) GetName() string { return module.name } +func (module *HTTPNotifier) GetCluster() string { + return module.cluster +} + // GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one) func (module *HTTPNotifier) GetGroupAllowlist() *regexp.Regexp { return module.groupAllowlist diff --git a/core/internal/notifier/null.go b/core/internal/notifier/null.go index fdf9c964..b4ab5ae2 100644 --- a/core/internal/notifier/null.go +++ b/core/internal/notifier/null.go @@ -31,6 +31,7 @@ type NullNotifier struct { Log *zap.Logger name string + cluster string groupAllowlist *regexp.Regexp groupDenylist *regexp.Regexp extras map[string]string @@ -76,6 +77,11 @@ func (module *NullNotifier) GetName() string { return module.name } +// GetCluster returns the configured name of this module +func (module *NullNotifier) GetCluster() string { + return module.cluster +} + // GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one) func (module *NullNotifier) GetGroupAllowlist() *regexp.Regexp { return module.groupAllowlist From 7471eecbc2be6e740ced3161a4b9711c7270da0d Mon Sep 17 00:00:00 2001 From: muicoder Date: Sat, 26 Mar 2022 00:42:12 +0800 Subject: [PATCH 2/7] Update sarama to 1.32.0 Signed-off-by: muicoder --- go.mod | 10 +++++----- go.sum | 31 +++++++++++++++++-------------- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index 766e539a..d45b14fd 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.17 require ( github.com/OneOfOne/xxhash v1.2.8 - github.com/Shopify/sarama v1.31.1 + github.com/Shopify/sarama v1.32.0 github.com/julienschmidt/httprouter v1.3.0 github.com/karrick/goswarm v1.10.0 github.com/pborman/uuid v1.2.1 @@ -12,9 +12,9 @@ require ( github.com/prometheus/client_golang v1.12.1 github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414 github.com/spf13/viper v1.10.1 - github.com/stretchr/testify v1.7.0 + github.com/stretchr/testify v1.7.1 github.com/xdg/scram v1.0.5 - go.uber.org/zap v1.20.0 + go.uber.org/zap v1.21.0 gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) @@ -37,7 +37,7 @@ require ( github.com/jcmturner/gofork v1.0.0 // indirect github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.14.2 // indirect + github.com/klauspost/compress v1.14.4 // indirect github.com/magiconair/properties v1.8.5 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mitchellh/mapstructure v1.4.3 // indirect @@ -57,7 +57,7 @@ require ( github.com/xdg/stringprep v1.0.3 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.7.0 // indirect - golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect + golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 // indirect golang.org/x/text v0.3.7 // indirect diff --git a/go.sum b/go.sum index be32d200..0054ff00 100644 --- a/go.sum +++ b/go.sum @@ -55,8 +55,8 @@ github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= -github.com/Shopify/sarama v1.31.1 h1:uxwJ+p4isb52RyV83MCJD8v2wJ/HBxEGMmG/8+sEzG0= -github.com/Shopify/sarama v1.31.1/go.mod h1:99E1xQ1Ql2bYcuJfwdXY3cE17W8+549Ty8PG/11BDqY= +github.com/Shopify/sarama v1.32.0 h1:P+RUjEaRU0GMMbYexGMDyrMkLhbbBVUVISDywi+IlFU= +github.com/Shopify/sarama v1.32.0/go.mod h1:+EmJJKZWVT/faR9RcOxJerP+LId4iWdQPBGLy1Y1Njs= github.com/Shopify/toxiproxy/v2 v2.3.0 h1:62YkpiP4bzdhKMH+6uC5E95y608k3zDwdzuBMsnn3uQ= github.com/Shopify/toxiproxy/v2 v2.3.0/go.mod h1:KvQTtB6RjCJY4zqNJn7C7JDFgsG5uoHYDirfUfpIm0c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -129,8 +129,8 @@ github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss= -github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= +github.com/frankban/quicktest v1.14.2 h1:SPb1KFFmM+ybpEjPUhCCkZOM5xlovT5UbrMvWnXyBns= +github.com/frankban/quicktest v1.14.2/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -195,8 +195,9 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -291,14 +292,15 @@ github.com/karrick/goswarm v1.10.0 h1:hGUt7r6O3bR02whvkW/E4rU6Ei7MekGGlTD5zqAYSH github.com/karrick/goswarm v1.10.0/go.mod h1:wqange6Y/RHXs23gBc4nRXPent8RaiFyfl2+otwXj8U= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw= -github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4= +github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -419,8 +421,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= @@ -457,8 +460,8 @@ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9i go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= -go.uber.org/zap v1.20.0 h1:N4oPlghZwYG55MlU6LXk/Zp00FVNE9X9wrYO8CEs4lc= -go.uber.org/zap v1.20.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= +go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8= +go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -471,9 +474,8 @@ golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220128200615-198e4374d7ed/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE= -golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220214200702-86341886e292 h1:f+lwQ+GtmgoY+A2YaQxlSOnDjXcQ7ZRLWOHbC6HtRqE= +golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -882,8 +884,9 @@ gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc h1:2gG gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc/go.mod h1:m7x9LTH6d71AHyAX77c9yqWCCa3UKHcVEj9y7hAtKDk= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df h1:n7WqCuqOuCbNr617RXOY0AWRXxgwEyPp2z+p0+hgMuE= gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df/go.mod h1:LRQQ+SO6ZHR7tOkpBDuZnXENFzX8qRjMDMyPD6BRkCw= From 272fce4695c2bf1287c03a3d30f52252bacf4ed0 Mon Sep 17 00:00:00 2001 From: muicoder Date: Sat, 26 Mar 2022 00:55:00 +0800 Subject: [PATCH 3/7] Update the sample configuration file Signed-off-by: muicoder --- config/burrow.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/config/burrow.toml b/config/burrow.toml index ea2dc4fc..de0cf7bd 100644 --- a/config/burrow.toml +++ b/config/burrow.toml @@ -58,6 +58,7 @@ min-distance=1 [notifier.default] class-name="http" +cluster="local" url-open="http://someservice.example.com:1467/v1/event" interval=60 timeout=5 From 8ca42cb8739c3583911796f0acc581ac464fd5d0 Mon Sep 17 00:00:00 2001 From: muicoder Date: Sat, 26 Mar 2022 01:13:14 +0800 Subject: [PATCH 4/7] Add test data for notifier --- core/internal/notifier/coordinator_test.go | 52 +++++++++++++--------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/core/internal/notifier/coordinator_test.go b/core/internal/notifier/coordinator_test.go index 69f099f6..4b37bec7 100644 --- a/core/internal/notifier/coordinator_test.go +++ b/core/internal/notifier/coordinator_test.go @@ -473,6 +473,7 @@ var notifyModuleTests = []struct { ExpectClose bool ExpectID bool SendOnce bool + Cluster string }{ /*{1, 0, false, false, false, false, false}, {2, 0, false, false, false, false, false}, @@ -480,27 +481,31 @@ var notifyModuleTests = []struct { {1, 0, false, true, false, false, false}, {1, 0, true, true, false, false, false}, */ - {1, 1, false, false, true, false, false, false}, - {1, 1, false, true, true, false, false, false}, - {1, 1, true, false, true, false, false, false}, - {1, 1, true, true, true, true, false, true}, - - {1, 2, false, false, true, false, true, false}, - {1, 2, false, true, true, false, true, false}, - {1, 2, true, false, true, false, true, false}, - {1, 2, true, true, true, false, true, false}, - {1, 2, true, true, false, false, true, true}, - {1, 2, false, true, true, false, true, true}, - - {3, 2, false, false, false, false, true, false}, - {3, 2, false, true, false, false, true, false}, - {3, 2, true, false, false, false, true, false}, - {3, 2, true, true, false, false, true, false}, - - {2, 1, false, false, false, false, false, false}, - {2, 1, false, true, false, false, false, false}, - {2, 1, true, false, false, false, false, false}, - {2, 1, true, true, true, true, false, false}, + {1, 1, false, false, true, false, false, false, ""}, + {1, 1, false, true, true, false, false, false, "testcluster"}, + {1, 1, true, false, true, false, false, false, "unmatchedCluster"}, + {1, 1, true, true, true, true, false, true, ""}, + + {1, 2, false, false, true, false, true, false, ""}, + {1, 2, false, true, true, false, true, false, ""}, + {1, 2, true, false, true, false, true, false, ""}, + {1, 2, true, true, true, false, true, false, ""}, + {1, 2, true, true, false, false, true, true, ""}, + {1, 2, false, true, true, false, true, true, ""}, + + {3, 2, false, false, false, false, true, false, ""}, + {3, 2, false, true, false, false, true, false, ""}, + {3, 2, true, false, false, false, true, false, ""}, + {3, 2, true, true, false, false, true, false, ""}, + + {2, 1, false, false, false, false, false, false, ""}, + {2, 1, false, true, false, false, false, false, ""}, + {2, 1, true, false, false, false, false, false, ""}, + {2, 1, true, true, true, true, false, false, ""}, +} + +func checkNotifierClusterMatch(cluster string) bool { + return cluster == "" || cluster == "testcluster" } func TestCoordinator_checkAndSendResponseToModules(t *testing.T) { @@ -559,10 +564,15 @@ func TestCoordinator_checkAndSendResponseToModules(t *testing.T) { // Set up the mock module and expected calls mockModule := &helpers.MockModule{} coordinator.modules["test"] = mockModule + mockModule.On("GetCluster").Return(testSet.Cluster) + + if checkNotifierClusterMatch(testSet.Cluster) { mockModule.On("GetName").Return("test") mockModule.On("GetGroupAllowlist").Return((*regexp.Regexp)(nil)) mockModule.On("GetGroupDenylist").Return((*regexp.Regexp)(nil)) mockModule.On("AcceptConsumerGroup", response).Return(true) + } + if testSet.ExpectSend { mockModule.On("Notify", response, mock.MatchedBy(func(s string) bool { return true }), mock.MatchedBy(func(t time.Time) bool { return true }), testSet.ExpectClose).Return() } From 75787cfa89d9ef3ed23df5d3aab0f69bfc0439bd Mon Sep 17 00:00:00 2001 From: muicoder Date: Sat, 26 Mar 2022 01:28:42 +0800 Subject: [PATCH 5/7] Add WeCom robot markdown template --- config/default-wecom-post.tmpl | 43 ++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 config/default-wecom-post.tmpl diff --git a/config/default-wecom-post.tmpl b/config/default-wecom-post.tmpl new file mode 100644 index 00000000..371aec43 --- /dev/null +++ b/config/default-wecom-post.tmpl @@ -0,0 +1,43 @@ +{"msgtype": "markdown","markdown": {"content": " +{{- $StatusURL := "https://pkg.go.dev/github.com/linkedin/Burrow/core/protocol#StatusConstant"}} +{{- $FormatString := "2006-01-02 15:04:05"}} +# Kafka: {{.Cluster}} +ConsumerGroup: {{.Group}}{{- with .Result.Status}} +{{- if eq . 0}}NotFound{{end}} +{{- if eq . 1}}normal{{end}} +{{- if eq . 2}}lagging{{end}} +{{- if eq . 3}}abnormal{{end}} +{{- end}} +**Status**: Total(Partitions={{.Result.TotalPartitions}},Lag={{.Result.TotalLag}})[{{- with .Result.Status}} +{{- if eq . 0}}NotFound{{end}} +{{- if eq . 1}}{{.}}{{end}} +{{- if eq . 2}}{{.}}{{end}} +{{- if eq . 3}}{{.}}{{end}} +{{- end}}]({{$StatusURL}}){{printf "%.2f" .Result.Complete}} +{{printf "StartTime: %s" (.Start.Format $FormatString)}} +{{- $TotalErrors := len .Result.Partitions}} +{{- if $TotalErrors}} +### {{$TotalErrors}} partitions have problems +>**CountPartitions:** +{{- range $k,$v := .Result.Partitions|partitioncounts}} +{{- if ne $v 0}}\n\t{{$k}}={{$v}}{{end}} +{{- end}} +**TopicsByStatus:** +{{- range $k,$v := .Result.Partitions|topicsbystatus}} +\t{{$k}}={{$v}} +{{- end}} +**PartitionDetails:** +{{- range .Result.Partitions}} +{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}} +{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}} +{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}} +\tMaxLag={{. | maxlag}} +\tPartitionID={{.Partition}} +\tStart={{formattimestamp .Start.Timestamp "01-02 15:04:05"}} +\t\tOffset={{.Start.Offset}}\tLag={{.Start.Lag.Value}} +\tEnd={{formattimestamp .End.Timestamp "01-02 15:04:05"}} +\t\tOffset={{.End.Offset}}\tLag={{.End.Lag.Value}} +{{- end}} +{{- end}} +" +}} From cd3bb2065d6ab9b9f03d3348019c8dbf35a95cec Mon Sep 17 00:00:00 2001 From: muicoder Date: Sun, 27 Mar 2022 13:13:53 +0800 Subject: [PATCH 6/7] Support for getting the current time --- core/internal/notifier/helpers.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/internal/notifier/helpers.go b/core/internal/notifier/helpers.go index 2ba2d364..628236d5 100644 --- a/core/internal/notifier/helpers.go +++ b/core/internal/notifier/helpers.go @@ -164,5 +164,9 @@ func maxLagHelper(a *protocol.PartitionStatus) uint64 { } func formatTimestamp(timestamp int64, formatString string) string { - return time.Unix(0, timestamp*int64(time.Millisecond)).Format(formatString) + if timestamp > 0 { + return time.Unix(0, timestamp*int64(time.Millisecond)).Format(formatString) + } else { + return time.Now().Format(formatString) + } } From 002b6446c322fa7bb4c846348f081dd1487bed4e Mon Sep 17 00:00:00 2001 From: muicoder Date: Sun, 27 Mar 2022 13:32:41 +0800 Subject: [PATCH 7/7] add MaxLag PartitionDetails for WeCom template --- config/default-wecom-post.tmpl | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/config/default-wecom-post.tmpl b/config/default-wecom-post.tmpl index 371aec43..259d1a26 100644 --- a/config/default-wecom-post.tmpl +++ b/config/default-wecom-post.tmpl @@ -14,7 +14,14 @@ ConsumerGroup: {{.Group}}{{- with .Result.Status}} {{- if eq . 2}}{{.}}{{end}} {{- if eq . 3}}{{.}}{{end}} {{- end}}]({{$StatusURL}}){{printf "%.2f" .Result.Complete}} -{{printf "StartTime: %s" (.Start.Format $FormatString)}} +{{printf "Time: %s" (formattimestamp 0 $FormatString)}} +**MaxLagDetails:**\tMaxLag={{.Result.Maxlag|maxlag}}{{- with .Result.Maxlag}} +{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}} +{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}} +{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}} +\tCurrentLag={{.CurrentLag}} +\tPartition={{.Partition}} +{{- end}} {{- $TotalErrors := len .Result.Partitions}} {{- if $TotalErrors}} ### {{$TotalErrors}} partitions have problems @@ -31,11 +38,11 @@ ConsumerGroup: {{.Group}}{{- with .Result.Status}} {{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}} {{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}} {{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}} -\tMaxLag={{. | maxlag}} -\tPartitionID={{.Partition}} -\tStart={{formattimestamp .Start.Timestamp "01-02 15:04:05"}} +\tCurrentLag={{.CurrentLag}} +\tPartition={{.Partition}} +\tStart={{formattimestamp .Start.Timestamp $FormatString}} \t\tOffset={{.Start.Offset}}\tLag={{.Start.Lag.Value}} -\tEnd={{formattimestamp .End.Timestamp "01-02 15:04:05"}} +\tEnd={{formattimestamp .End.Timestamp $FormatString}} \t\tOffset={{.End.Offset}}\tLag={{.End.Lag.Value}} {{- end}} {{- end}}