diff --git a/docs/user_guide/targets/target_discovery/consul_discovery.md b/docs/user_guide/targets/target_discovery/consul_discovery.md index af39479e..5ce36d85 100644 --- a/docs/user_guide/targets/target_discovery/consul_discovery.md +++ b/docs/user_guide/targets/target_discovery/consul_discovery.md @@ -17,6 +17,7 @@ loader: type: consul services: - name: cluster1-gnmi-server + filter: Service.Meta.environment == "production" config: insecure: true username: admin @@ -71,6 +72,8 @@ loader: - name: # a list of strings to further filter the service instances tags: + # optional go-bexpr filter evaluated by Consul in addition to tags + filter: # configuration map to apply to target discovered from this service config: # list of actions to run on target discovery @@ -82,4 +85,19 @@ loader: # path to variable file, the variables defined will be passed to the actions to be run # values in this file will be overwritten by the ones defined in `vars` vars-file: + +### Filtering services + +Each service entry can also define a [`filter`](https://developer.hashicorp.com/consul/api-docs/features/filtering) that Consul evaluates before returning results. Filters use [go-bexpr syntax](https://github.com/HashiCorp/go-bexpr) and are applied in addition to the `tags` list (both conditions must match). + +```yaml +loader: + type: consul + services: + - name: cluster1-gnmi-server + tags: ["gnmic", "network-device"] + filter: Service.Meta.environment == "production" && Node.Datacenter == "dc1" +``` + +Use filters to keep existing tag-based configs working while narrowing the results with metadata such as health status, service meta fields, or node attributes. ``` diff --git a/pkg/loaders/consul_loader/consul_loader.go b/pkg/loaders/consul_loader/consul_loader.go index c8a090b8..7a541698 100644 --- a/pkg/loaders/consul_loader/consul_loader.go +++ b/pkg/loaders/consul_loader/consul_loader.go @@ -76,6 +76,11 @@ type consulLoader struct { numActions int } +type serviceWatchResult struct { + def *serviceDef + entries []*api.ServiceEntry +} + type cfg struct { // Consul server address Address string `mapstructure:"address,omitempty" json:"address,omitempty"` @@ -113,6 +118,7 @@ type cfg struct { type serviceDef struct { Name string `mapstructure:"name,omitempty" json:"name,omitempty"` Tags []string `mapstructure:"tags,omitempty" json:"tags,omitempty"` + Filter string `mapstructure:"filter,omitempty" json:"filter,omitempty"` Config map[string]interface{} `mapstructure:"config,omitempty" json:"config,omitempty"` tags map[string]struct{} @@ -210,25 +216,27 @@ CLIENT: time.Sleep(2 * time.Second) goto CLIENT } - sChan := make(chan []*api.ServiceEntry) + sChan := make(chan *serviceWatchResult) go func() { for { select { case <-ctx.Done(): return - case ses, ok := <-sChan: + case res, ok := <-sChan: if !ok { return } tcs := make(map[string]*types.TargetConfig) - srvName := "" - for _, se := range ses { - srvName = se.Service.Service - tc, err := c.serviceEntryToTargetConfig(se) + srvName := res.def.Name + for _, se := range res.entries { + tc, err := c.serviceEntryToTargetConfig(res.def, se) if err != nil { c.logger.Printf("Failed to convert service entry %+v to a target config: %v", se, err) continue } + if tc == nil { + continue + } tcs[tc.Name] = tc } @@ -238,7 +246,7 @@ CLIENT: }() for _, s := range c.cfg.Services { go func(s *serviceDef) { - err := c.startServicesWatch(ctx, s.Name, s.Tags, sChan, time.Minute) + err := c.startServicesWatch(ctx, s, sChan, time.Minute) if err != nil { c.logger.Printf("service %q watch stopped: %v", s.Name, err) } @@ -252,7 +260,7 @@ func (c *consulLoader) RunOnce(ctx context.Context) (map[string]*types.TargetCon return nil, err } result := make(map[string]*types.TargetConfig) - rsChan := make(chan *api.ServiceEntry) + rsChan := make(chan *serviceWatchResult) wg := new(sync.WaitGroup) // fan-out queries @@ -260,17 +268,19 @@ func (c *consulLoader) RunOnce(ctx context.Context) (map[string]*types.TargetCon wg.Add(1) go func(s *serviceDef) { defer wg.Done() - ses, _, err := c.client.Health().ServiceMultipleTags(s.Name, s.Tags, true, &api.QueryOptions{}) + var qOpts *api.QueryOptions + if s.Filter != "" { + qOpts = &api.QueryOptions{Filter: s.Filter} + } + ses, _, err := c.client.Health().ServiceMultipleTags(s.Name, s.Tags, true, qOpts) if err != nil { c.logger.Printf("failed to get service %q instances: %v", s.Name, err) return } - for _, se := range ses { - select { - case rsChan <- se: - case <-ctx.Done(): - return - } + select { + case rsChan <- &serviceWatchResult{def: s, entries: ses}: + case <-ctx.Done(): + return } }(s) } @@ -283,17 +293,19 @@ func (c *consulLoader) RunOnce(ctx context.Context) (map[string]*types.TargetCon for { select { - case se, ok := <-rsChan: + case res, ok := <-rsChan: if !ok { return result, nil } - tc, err := c.serviceEntryToTargetConfig(se) - if err != nil { - c.logger.Printf("failed to convert service %+v to target config: %v", se, err) - continue - } - if tc != nil { - result[tc.Name] = tc + for _, se := range res.entries { + tc, err := c.serviceEntryToTargetConfig(res.def, se) + if err != nil { + c.logger.Printf("failed to convert service %+v to target config: %v", se, err) + continue + } + if tc != nil { + result[tc.Name] = tc + } } case <-ctx.Done(): return result, ctx.Err() @@ -344,7 +356,7 @@ func (c *consulLoader) setDefaults() error { return nil } -func (c *consulLoader) startServicesWatch(ctx context.Context, serviceName string, tags []string, sChan chan<- []*api.ServiceEntry, watchTimeout time.Duration) error { +func (c *consulLoader) startServicesWatch(ctx context.Context, sd *serviceDef, sChan chan<- *serviceWatchResult, watchTimeout time.Duration) error { if watchTimeout <= 0 { watchTimeout = defaultWatchTimeout } @@ -352,6 +364,7 @@ func (c *consulLoader) startServicesWatch(ctx context.Context, serviceName strin qOpts := &api.QueryOptions{ WaitIndex: index, WaitTime: watchTimeout, + Filter: sd.Filter, } var err error // long blocking watch @@ -361,11 +374,11 @@ func (c *consulLoader) startServicesWatch(ctx context.Context, serviceName strin return ctx.Err() default: if c.cfg.Debug { - c.logger.Printf("(re)starting watch service=%q, index=%d", serviceName, qOpts.WaitIndex) + c.logger.Printf("(re)starting watch service=%q, index=%d", sd.Name, qOpts.WaitIndex) } - index, err = c.watch(qOpts.WithContext(ctx), serviceName, tags, sChan) + index, err = c.watch(qOpts.WithContext(ctx), sd, sChan) if err != nil { - c.logger.Printf("service %q watch failed: %v", serviceName, err) + c.logger.Printf("service %q watch failed: %v", sd.Name, err) } if index == 1 { qOpts.WaitIndex = index @@ -384,90 +397,79 @@ func (c *consulLoader) startServicesWatch(ctx context.Context, serviceName strin } } -func (c *consulLoader) watch(qOpts *api.QueryOptions, serviceName string, tags []string, sChan chan<- []*api.ServiceEntry) (uint64, error) { - se, meta, err := c.client.Health().ServiceMultipleTags(serviceName, tags, true, qOpts) +func (c *consulLoader) watch(qOpts *api.QueryOptions, sd *serviceDef, sChan chan<- *serviceWatchResult) (uint64, error) { + se, meta, err := c.client.Health().ServiceMultipleTags(sd.Name, sd.Tags, true, qOpts) if err != nil { return 0, err } if meta.LastIndex == qOpts.WaitIndex { - c.logger.Printf("service=%q did not change", serviceName) + c.logger.Printf("service=%q did not change", sd.Name) return meta.LastIndex, nil } if len(se) == 0 { return 1, nil } - sChan <- se + sChan <- &serviceWatchResult{def: sd, entries: se} return meta.LastIndex, nil } -func (c *consulLoader) serviceEntryToTargetConfig(se *api.ServiceEntry) (*types.TargetConfig, error) { +func (c *consulLoader) serviceEntryToTargetConfig(sd *serviceDef, se *api.ServiceEntry) (*types.TargetConfig, error) { tc := new(types.TargetConfig) if se.Service == nil { return tc, nil } -SRV: - for _, sd := range c.cfg.Services { - // match service name - if se.Service.Service != sd.Name { - continue - } + if se.Service.Service != sd.Name { + return nil, fmt.Errorf("service entry name %q mismatches definition %q", se.Service.Service, sd.Name) + } - // match service tags - if len(sd.tags) > 0 { - for requiredTag := range sd.tags { - if !slices.Contains(se.Service.Tags, requiredTag) { - goto SRV - } + if len(sd.tags) > 0 { + for requiredTag := range sd.tags { + if !slices.Contains(se.Service.Tags, requiredTag) { + return nil, fmt.Errorf("service entry %q missing required tag %q", se.Service.ID, requiredTag) } } + } - // decode config if present - if sd.Config != nil { - err := mapstructure.Decode(sd.Config, tc) - if err != nil { - return nil, err - } + if sd.Config != nil { + err := mapstructure.Decode(sd.Config, tc) + if err != nil { + return nil, err } + } - tc.Address = se.Service.Address - if tc.Address == "" { - tc.Address = se.Node.Address - } - tc.Address = net.JoinHostPort(tc.Address, strconv.Itoa(se.Service.Port)) + tc.Address = se.Service.Address + if tc.Address == "" { + tc.Address = se.Node.Address + } + tc.Address = net.JoinHostPort(tc.Address, strconv.Itoa(se.Service.Port)) - var buffer bytes.Buffer + var buffer bytes.Buffer - tc.Name = se.Service.ID + tc.Name = se.Service.ID - if sd.targetNameTemplate != nil { - buffer.Reset() - err := sd.targetNameTemplate.Execute(&buffer, se.Service) - if err != nil { - c.logger.Println("Could not execute nameTemplate") - continue - } - tc.Name = buffer.String() + if sd.targetNameTemplate != nil { + buffer.Reset() + err := sd.targetNameTemplate.Execute(&buffer, se.Service) + if err != nil { + return nil, fmt.Errorf("execute name template: %w", err) } + tc.Name = buffer.String() + } - // Create Event tags from Consul via templates - if len(sd.targetTagsTemplate) > 0 { - eventTags := make(map[string]string) - for tagName, tagTemplate := range sd.targetTagsTemplate { - buffer.Reset() - err := tagTemplate.Execute(&buffer, se.Service) - if err != nil { - c.logger.Println("Could not execute tagTemplate:", tagName) - return nil, err - } - eventTags[tagName] = buffer.String() + if len(sd.targetTagsTemplate) > 0 { + eventTags := make(map[string]string) + for tagName, tagTemplate := range sd.targetTagsTemplate { + buffer.Reset() + err := tagTemplate.Execute(&buffer, se.Service) + if err != nil { + return nil, fmt.Errorf("execute tag template %q: %w", tagName, err) } - tc.EventTags = eventTags + eventTags[tagName] = buffer.String() } - return tc, nil + tc.EventTags = eventTags } - - return nil, errors.New("unable to find a match in Consul service(s)") + return tc, nil } func (c *consulLoader) updateTargets(ctx context.Context, srvName string, tcs map[string]*types.TargetConfig, opChan chan *loaders.TargetOperation) { diff --git a/pkg/loaders/consul_loader/consul_loader_test.go b/pkg/loaders/consul_loader/consul_loader_test.go index 872e9b4a..535ec4c3 100644 --- a/pkg/loaders/consul_loader/consul_loader_test.go +++ b/pkg/loaders/consul_loader/consul_loader_test.go @@ -10,8 +10,13 @@ package consul_loader import ( "context" + "fmt" "io" "log" + "net/http" + "net/http/httptest" + "strings" + "sync" "testing" "github.com/hashicorp/consul/api" @@ -59,7 +64,7 @@ func TestIssue706_ServicesWithExtraTagsFiltered(t *testing.T) { }, } - result, err := cl.serviceEntryToTargetConfig(serviceEntry) + result, err := cl.serviceEntryToTargetConfig(cl.cfg.Services[0], serviceEntry) if err != nil { t.Fatalf("Expected service with extra tags to be accepted, but got error: %v", err) @@ -132,3 +137,46 @@ func TestOldBuggyLogicWouldReject(t *testing.T) { t.Logf("✓ Old logic would incorrectly reject: %v", oldLogicWouldReject) t.Logf("✓ New logic correctly accepts: %v", newLogicShouldAccept) } + +func TestRunOnceAppliesServiceFilter(t *testing.T) { + filterExpr := `Service.Meta.profile == "arista"` + var filterChecked bool + hs := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + switch { + case r.URL.Path == "/v1/agent/self": + fmt.Fprint(w, `{"Member":{"Tags":{}}}`) + case strings.HasPrefix(r.URL.Path, "/v1/health/service/gnmi"): + if got := r.URL.Query().Get("filter"); got != filterExpr { + t.Fatalf("expected filter %q, got %q", filterExpr, got) + } + filterChecked = true + fmt.Fprint(w, `[{"Node":{"Address":"10.0.0.1"},"Service":{"ID":"target-1","Service":"gnmi","Address":"10.0.0.1","Port":6030}}]`) + default: + t.Fatalf("unexpected path: %s", r.URL.Path) + } + })) + defer hs.Close() + addr := strings.TrimPrefix(hs.URL, "http://") + cl := &consulLoader{ + cfg: &cfg{ + Address: addr, + Datacenter: "dc1", + Services: []*serviceDef{ + {Name: "gnmi", Filter: filterExpr}, + }, + }, + logger: log.New(io.Discard, loggingPrefix, utils.DefaultLoggingFlags), + m: new(sync.Mutex), + } + res, err := cl.RunOnce(context.Background()) + if err != nil { + t.Fatalf("RunOnce returned error: %v", err) + } + if !filterChecked { + t.Fatalf("expected health query to include filter parameter") + } + if _, ok := res["target-1"]; !ok { + t.Fatalf("expected target-1 in results, got %v", res) + } +}