From 6222e358f9cb5f790496ba19a36c011a073aa185 Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Mon, 6 Oct 2025 20:41:00 +0200 Subject: [PATCH] replace watches with periodic lists for spdx CRDs Signed-off-by: Matthias Bertschy --- adapters/incluster/v1/client.go | 181 ++++++++++++++----------- config/config.go | 20 +-- config/config_test.go | 13 +- configuration/client/config.json | 6 + tests/synchronizer_integration_test.go | 21 ++- 5 files changed, 140 insertions(+), 101 deletions(-) diff --git a/adapters/incluster/v1/client.go b/adapters/incluster/v1/client.go index 4dd6147..ca20ee6 100644 --- a/adapters/incluster/v1/client.go +++ b/adapters/incluster/v1/client.go @@ -14,6 +14,7 @@ import ( "time" "github.com/cenkalti/backoff/v4" + "github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1" "go.uber.org/multierr" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/pager" @@ -64,19 +65,20 @@ type resourceVersionGetter interface { } type Client struct { - dynamicClient dynamic.Interface - storageClient spdxv1beta1.SpdxV1beta1Interface account string + batchProcessingFunc map[domain.BatchType]BatchProcessingFunc + callbacks domain.Callbacks cluster string + dynamicClient dynamic.Interface excludeNamespaces []string includeNamespaces []string - operatorNamespace string // the namespace where the kubescape operator is running kind *domain.Kind - callbacks domain.Callbacks + listPeriod time.Duration + operatorNamespace string // the namespace where the kubescape operator is running res schema.GroupVersionResource + storageClient spdxv1beta1.SpdxV1beta1Interface ShadowObjects map[string][]byte Strategy domain.Strategy - batchProcessingFunc map[domain.BatchType]BatchProcessingFunc } var errWatchClosed = errors.New("watch channel closed") @@ -89,25 +91,26 @@ func NewClient(dynamicClient dynamic.Interface, storageClient spdxv1beta1.SpdxV1 logger.L().Warning("event multiplier config detected, but it is deprecated", helpers.String("resource", res.String()), helpers.Int("multiplier", multiplier)) } return &Client{ - account: cfg.Account, + account: cfg.Account, + batchProcessingFunc: map[domain.BatchType]BatchProcessingFunc{ + domain.DefaultBatch: defaultBatchProcessingFunc, // regular processing, when batch type is not set + domain.ReconciliationBatch: reconcileBatchProcessingFunc, + }, + cluster: cfg.ClusterName, dynamicClient: dynamicClient, - storageClient: storageClient, excludeNamespaces: cfg.ExcludeNamespaces, includeNamespaces: cfg.IncludeNamespaces, - operatorNamespace: cfg.Namespace, - cluster: cfg.ClusterName, kind: &domain.Kind{ Group: res.Group, Version: res.Version, Resource: res.Resource, }, - res: res, - ShadowObjects: map[string][]byte{}, - Strategy: r.Strategy, - batchProcessingFunc: map[domain.BatchType]BatchProcessingFunc{ - domain.DefaultBatch: defaultBatchProcessingFunc, // regular processing, when batch type is not set - domain.ReconciliationBatch: reconcileBatchProcessingFunc, - }, + listPeriod: cfg.ListPeriod, + operatorNamespace: cfg.Namespace, + res: res, + storageClient: storageClient, + ShadowObjects: map[string][]byte{}, + Strategy: r.Strategy, } } @@ -115,26 +118,15 @@ var _ adapters.Client = (*Client)(nil) func (c *Client) Start(ctx context.Context) error { logger.L().Info("starting incluster client", helpers.String("resource", c.res.Resource)) - watchOpts := metav1.ListOptions{} - // for our storage, we need to list all resources and get them one by one - // as list returns objects with empty spec - // and watch does not return existing objects - if c.res.Group == kubescapeCustomResourceGroup { - if err := backoff.RetryNotify(func() error { - var err error - watchOpts.ResourceVersion, err = c.getExistingStorageObjects(ctx) - return err - }, utils.NewBackOff(true), func(err error, d time.Duration) { - logger.L().Ctx(ctx).Warning("get existing storage objects", helpers.Error(err), - helpers.String("resource", c.res.Resource), - helpers.String("retry in", d.String())) - }); err != nil { - return fmt.Errorf("giving up get existing storage objects: %w", err) - } - } // begin watch eventQueue := utils.NewCooldownQueue() - go c.watchRetry(ctx, watchOpts, eventQueue) + if c.res.Group == kubescapeCustomResourceGroup { + // our custom resources no longer support watch, use periodic listing + go c.periodicList(ctx, eventQueue, c.listPeriod) + } else { + watchOpts := metav1.ListOptions{} + go c.watchRetry(ctx, watchOpts, eventQueue) + } // process events for event := range eventQueue.ResultChan { // skip non-objects @@ -203,7 +195,7 @@ func (c *Client) Stop(_ context.Context) error { func (c *Client) watchRetry(ctx context.Context, watchOpts metav1.ListOptions, eventQueue *utils.CooldownQueue) { exitFatal := true if err := backoff.RetryNotify(func() error { - watcher, err := c.chooseWatcher(watchOpts) + watcher, err := c.dynamicClient.Resource(c.res).Namespace("").Watch(context.Background(), watchOpts) if err != nil { if k8sErrors.ReasonForError(err) == metav1.StatusReasonNotFound { exitFatal = false @@ -534,37 +526,40 @@ func (c *Client) verifyObject(id domain.KindName, newChecksum string) ([]byte, e return object, nil } -func (c *Client) getExistingStorageObjects(ctx context.Context) (string, error) { - logger.L().Debug("getting existing objects from storage", helpers.String("resource", c.res.Resource)) - var resourceVersion string - if err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return c.chooseLister(opts) - }).EachListItem(context.Background(), metav1.ListOptions{}, func(run runtime.Object) error { - d := run.(metav1.Object) - resourceVersion = d.GetResourceVersion() - // no need for skip ns since these are our CRDs - id := domain.KindName{ - Kind: c.kind, - Name: d.GetName(), - Namespace: d.GetNamespace(), - ResourceVersion: domain.ToResourceVersion(d.GetResourceVersion()), - } - // get checksum - checksum, err := c.getChecksum(d) - if err != nil { - logger.L().Ctx(ctx).Error("cannot get checksums", helpers.Error(err), helpers.String("id", id.String())) - return nil - } - err = c.callbacks.VerifyObject(ctx, id, checksum) - if err != nil { - logger.L().Ctx(ctx).Error("cannot handle added resource", helpers.Error(err), helpers.String("id", id.String())) +func (c *Client) periodicList(ctx context.Context, queue *utils.CooldownQueue, duration time.Duration) { + ticker := time.NewTicker(duration) + defer ticker.Stop() + var since string + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + var continueToken string + for { + logger.L().Debug("periodicList - listing resources", helpers.String("resource", c.res.Resource), helpers.String("continueToken", continueToken), helpers.String("since", since)) + items, nextToken, lastUpdated, err := c.listFunc(metav1.ListOptions{ + Limit: int64(100), + Continue: continueToken, + ResourceVersion: since, // ensure we only get changes since the last check + }) + if err != nil { + logger.L().Ctx(ctx).Error("periodicList - error in listFunc", helpers.Error(err)) + break + } + for _, obj := range items { + // added and modified events are treated the same, so we enqueue a Modified event for both + // deleted events are not possible with listing, so we rely on the reconciliation batch to detect deletions + queue.Enqueue(watch.Event{Type: watch.Modified, Object: obj}) + } + since = lastUpdated + if nextToken == "" { + break + } + continueToken = nextToken + } } - return nil - }); err != nil { - return "", fmt.Errorf("list resources: %w", err) } - // set resource version to watch from - return resourceVersion, nil } func (c *Client) filterAndMarshal(d metav1.Object) ([]byte, error) { @@ -787,6 +782,8 @@ func (c *Client) chooseLister(opts metav1.ListOptions) (runtime.Object, error) { switch c.res.Resource { case "applicationprofiles": return c.storageClient.ApplicationProfiles("").List(context.Background(), opts) + case "knownservers": + return c.storageClient.KnownServers("").List(context.Background(), opts) case "networkneighborhoods": return c.storageClient.NetworkNeighborhoods("").List(context.Background(), opts) case "sbomsyfts": @@ -800,22 +797,52 @@ func (c *Client) chooseLister(opts metav1.ListOptions) (runtime.Object, error) { return c.dynamicClient.Resource(c.res).Namespace("").List(context.Background(), opts) } -func (c *Client) chooseWatcher(opts metav1.ListOptions) (watch.Interface, error) { +func (c *Client) listFunc(opts metav1.ListOptions) ([]runtime.Object, string, string, error) { if c.storageClient != nil { - switch c.res.Resource { - case "applicationprofiles": - return c.storageClient.ApplicationProfiles("").Watch(context.Background(), opts) - case "networkneighborhoods": - return c.storageClient.NetworkNeighborhoods("").Watch(context.Background(), opts) - case "sbomsyfts": - return c.storageClient.SBOMSyfts("").Watch(context.Background(), opts) - case "seccompprofiles": - return c.storageClient.SeccompProfiles("").Watch(context.Background(), opts) - case "vulnerabilitymanifests": - return c.storageClient.VulnerabilityManifests("").Watch(context.Background(), opts) + list, err := c.chooseLister(opts) + if err != nil { + return nil, "", "", err + } + switch l := list.(type) { + case *v1beta1.ApplicationProfileList: + items := make([]runtime.Object, len(l.Items)) + for i := range l.Items { + items[i] = &l.Items[i] + } + return items, l.Continue, l.ResourceVersion, nil + case *v1beta1.KnownServerList: + items := make([]runtime.Object, len(l.Items)) + for i := range l.Items { + items[i] = &l.Items[i] + } + return items, l.Continue, l.ResourceVersion, nil + case *v1beta1.NetworkNeighborhoodList: + items := make([]runtime.Object, len(l.Items)) + for i := range l.Items { + items[i] = &l.Items[i] + } + return items, l.Continue, l.ResourceVersion, nil + case *v1beta1.SBOMSyftList: + items := make([]runtime.Object, len(l.Items)) + for i := range l.Items { + items[i] = &l.Items[i] + } + return items, l.Continue, l.ResourceVersion, nil + case *v1beta1.SeccompProfileList: + items := make([]runtime.Object, len(l.Items)) + for i := range l.Items { + items[i] = &l.Items[i] + } + return items, l.Continue, l.ResourceVersion, nil + case *v1beta1.VulnerabilityManifestList: + items := make([]runtime.Object, len(l.Items)) + for i := range l.Items { + items[i] = &l.Items[i] + } + return items, l.Continue, l.ResourceVersion, nil } } - return c.dynamicClient.Resource(c.res).Namespace("").Watch(context.Background(), opts) + return nil, "", "", fmt.Errorf("list function not implemented for resource %s", c.res.Resource) } func (c *Client) getResource(namespace string, name string) (metav1.Object, error) { diff --git a/config/config.go b/config/config.go index 1dd361c..b613668 100644 --- a/config/config.go +++ b/config/config.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "strings" + "time" "github.com/armosec/utils-k8s-go/armometadata" "github.com/kubescape/backend/pkg/servicediscovery" @@ -36,14 +37,15 @@ type Backend struct { } type InCluster struct { - ServerUrl string `mapstructure:"serverUrl"` - Namespace string `mapstructure:"namespace"` - ClusterName string `mapstructure:"clusterName"` - ExcludeNamespaces []string `mapstructure:"excludeNamespaces"` - IncludeNamespaces []string `mapstructure:"includeNamespaces"` - Account string `mapstructure:"account"` - AccessKey string `mapstructure:"accessKey"` - Resources []Resource `mapstructure:"resources"` + AccessKey string `mapstructure:"accessKey"` + Account string `mapstructure:"account"` + ClusterName string `mapstructure:"clusterName"` + ExcludeNamespaces []string `mapstructure:"excludeNamespaces"` + IncludeNamespaces []string `mapstructure:"includeNamespaces"` + ListPeriod time.Duration `mapstructure:"listPeriod"` + Namespace string `mapstructure:"namespace"` + Resources []Resource `mapstructure:"resources"` + ServerUrl string `mapstructure:"serverUrl"` } type HTTPEndpoint struct { @@ -95,6 +97,8 @@ func LoadConfig(path string) (Config, error) { v.SetConfigName("config") v.SetConfigType("json") + v.SetDefault("inCluster.listPeriod", time.Minute) + v.AutomaticEnv() err := v.ReadInConfig() diff --git a/config/config_test.go b/config/config_test.go index 66b77e6..852b9a7 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -65,16 +65,18 @@ func TestLoadConfig(t *testing.T) { path: "../configuration/client", want: Config{ InCluster: InCluster{ - ServerUrl: "ws://127.0.0.1:8080/", - Namespace: "kubescape", + AccessKey: "xxxxxxxx-1111-1111-1111-xxxxxxxx", + Account: "11111111-2222-3333-4444-11111111", ClusterName: "cluster-1", ExcludeNamespaces: []string{"kube-system", "kubescape"}, IncludeNamespaces: []string{}, - Account: "11111111-2222-3333-4444-11111111", - AccessKey: "xxxxxxxx-1111-1111-1111-xxxxxxxx", + ListPeriod: 60000000000, + Namespace: "kubescape", + ServerUrl: "ws://127.0.0.1:8080/", Resources: []Resource{ {Group: "", Version: "v1", Resource: "pods", Strategy: "patch"}, {Group: "", Version: "v1", Resource: "nodes", Strategy: "patch"}, + {Group: "", Version: "v1", Resource: "configmaps", Strategy: "copy"}, {Group: "apps", Version: "v1", Resource: "deployments", Strategy: "patch"}, {Group: "apps", Version: "v1", Resource: "statefulsets", Strategy: "patch"}, {Group: "spdx.softwarecomposition.kubescape.io", Version: "v1beta1", Resource: "applicationprofiles", Strategy: "patch"}, @@ -108,6 +110,9 @@ func TestLoadConfig(t *testing.T) { ConsumerTopic: "synchronizer", SkipAlertsFrom: []string{"foo", "bar"}, }, + InCluster: InCluster{ + ListPeriod: 60000000000, + }, }, }, } diff --git a/configuration/client/config.json b/configuration/client/config.json index e13a090..34a3a45 100644 --- a/configuration/client/config.json +++ b/configuration/client/config.json @@ -20,6 +20,12 @@ "resource": "nodes", "strategy": "patch" }, + { + "group": "", + "version": "v1", + "resource": "configmaps", + "strategy": "copy" + }, { "group": "apps", "version": "v1", diff --git a/tests/synchronizer_integration_test.go b/tests/synchronizer_integration_test.go index 667b407..c323409 100644 --- a/tests/synchronizer_integration_test.go +++ b/tests/synchronizer_integration_test.go @@ -509,19 +509,14 @@ func createAndStartSynchronizerClient(t *testing.T, cluster *TestKubernetesClust require.NoError(t, err) // set cluster config - clientCfg.InCluster.Namespace = kubescapeNamespace + clientCfg.InCluster.Account = cluster.account clientCfg.InCluster.ClusterName = cluster.cluster clientCfg.InCluster.ExcludeNamespaces = []string{"kube-system", "kubescape"} clientCfg.InCluster.IncludeNamespaces = []string{} - clientCfg.InCluster.Account = cluster.account + clientCfg.InCluster.ListPeriod = 5 * time.Second + clientCfg.InCluster.Namespace = kubescapeNamespace clientCfg.InCluster.ServerUrl = syncServer.serverUrl if watchDefaults { - clientCfg.InCluster.Resources = append(clientCfg.InCluster.Resources, config.Resource{ - Group: "", - Version: "v1", - Resource: "configmaps", - Strategy: "copy", - }) clientCfg.InCluster.Resources = append(clientCfg.InCluster.Resources, config.Resource{ Group: "", Version: "v1", @@ -917,21 +912,23 @@ func TestSynchronizer_TC03(t *testing.T) { } // TestSynchronizer_TC04_InCluster: Deletion of a single entity +// we use configmap here because our CRDs no longer propagates deletion because of the periodic listings +// (we now rely on the reconciliation batch to detect deletions) func TestSynchronizer_TC04_InCluster(t *testing.T) { td := initIntegrationTest(t) // add applicationprofile to k8s - _, err := td.clusters[0].storageclient.ApplicationProfiles(namespace).Create(context.TODO(), td.clusters[0].applicationprofile, metav1.CreateOptions{}) + _, err := td.clusters[0].k8sclient.CoreV1().ConfigMaps(namespace).Create(context.TODO(), td.clusters[0].cm, metav1.CreateOptions{}) require.NoError(t, err) time.Sleep(10 * time.Second) // check object in postgres - objMetadata := waitForObjectInPostgres(t, td, td.clusters[0].account, td.clusters[0].cluster, "spdx.softwarecomposition.kubescape.io/v1beta1/applicationprofiles", namespace, name) + objMetadata := waitForObjectInPostgres(t, td, td.clusters[0].account, td.clusters[0].cluster, "/v1/configmaps", namespace, name) assert.NotNil(t, objMetadata) // delete applicationprofile from k8s - err = td.clusters[0].storageclient.ApplicationProfiles(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) + err = td.clusters[0].k8sclient.CoreV1().ConfigMaps(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) require.NoError(t, err) // check object not in postgres - waitForObjectToBeDeletedInPostgres(t, td, td.clusters[0].account, td.clusters[0].cluster, "spdx.softwarecomposition.kubescape.io/v1beta1/applicationprofiles", namespace, name) + waitForObjectToBeDeletedInPostgres(t, td, td.clusters[0].account, td.clusters[0].cluster, "/v1/configmaps", namespace, name) // tear down tearDown(td)