Skip to content

Commit 6222e35

Browse files
committed
replace watches with periodic lists for spdx CRDs
Signed-off-by: Matthias Bertschy <matthias.bertschy@gmail.com>
1 parent cf2ff3c commit 6222e35

File tree

5 files changed

+140
-101
lines changed

5 files changed

+140
-101
lines changed

adapters/incluster/v1/client.go

Lines changed: 104 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"time"
1515

1616
"github.com/cenkalti/backoff/v4"
17+
"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
1718
"go.uber.org/multierr"
1819
"k8s.io/apimachinery/pkg/runtime"
1920
"k8s.io/client-go/tools/pager"
@@ -64,19 +65,20 @@ type resourceVersionGetter interface {
6465
}
6566

6667
type Client struct {
67-
dynamicClient dynamic.Interface
68-
storageClient spdxv1beta1.SpdxV1beta1Interface
6968
account string
69+
batchProcessingFunc map[domain.BatchType]BatchProcessingFunc
70+
callbacks domain.Callbacks
7071
cluster string
72+
dynamicClient dynamic.Interface
7173
excludeNamespaces []string
7274
includeNamespaces []string
73-
operatorNamespace string // the namespace where the kubescape operator is running
7475
kind *domain.Kind
75-
callbacks domain.Callbacks
76+
listPeriod time.Duration
77+
operatorNamespace string // the namespace where the kubescape operator is running
7678
res schema.GroupVersionResource
79+
storageClient spdxv1beta1.SpdxV1beta1Interface
7780
ShadowObjects map[string][]byte
7881
Strategy domain.Strategy
79-
batchProcessingFunc map[domain.BatchType]BatchProcessingFunc
8082
}
8183

8284
var errWatchClosed = errors.New("watch channel closed")
@@ -89,52 +91,42 @@ func NewClient(dynamicClient dynamic.Interface, storageClient spdxv1beta1.SpdxV1
8991
logger.L().Warning("event multiplier config detected, but it is deprecated", helpers.String("resource", res.String()), helpers.Int("multiplier", multiplier))
9092
}
9193
return &Client{
92-
account: cfg.Account,
94+
account: cfg.Account,
95+
batchProcessingFunc: map[domain.BatchType]BatchProcessingFunc{
96+
domain.DefaultBatch: defaultBatchProcessingFunc, // regular processing, when batch type is not set
97+
domain.ReconciliationBatch: reconcileBatchProcessingFunc,
98+
},
99+
cluster: cfg.ClusterName,
93100
dynamicClient: dynamicClient,
94-
storageClient: storageClient,
95101
excludeNamespaces: cfg.ExcludeNamespaces,
96102
includeNamespaces: cfg.IncludeNamespaces,
97-
operatorNamespace: cfg.Namespace,
98-
cluster: cfg.ClusterName,
99103
kind: &domain.Kind{
100104
Group: res.Group,
101105
Version: res.Version,
102106
Resource: res.Resource,
103107
},
104-
res: res,
105-
ShadowObjects: map[string][]byte{},
106-
Strategy: r.Strategy,
107-
batchProcessingFunc: map[domain.BatchType]BatchProcessingFunc{
108-
domain.DefaultBatch: defaultBatchProcessingFunc, // regular processing, when batch type is not set
109-
domain.ReconciliationBatch: reconcileBatchProcessingFunc,
110-
},
108+
listPeriod: cfg.ListPeriod,
109+
operatorNamespace: cfg.Namespace,
110+
res: res,
111+
storageClient: storageClient,
112+
ShadowObjects: map[string][]byte{},
113+
Strategy: r.Strategy,
111114
}
112115
}
113116

114117
var _ adapters.Client = (*Client)(nil)
115118

116119
func (c *Client) Start(ctx context.Context) error {
117120
logger.L().Info("starting incluster client", helpers.String("resource", c.res.Resource))
118-
watchOpts := metav1.ListOptions{}
119-
// for our storage, we need to list all resources and get them one by one
120-
// as list returns objects with empty spec
121-
// and watch does not return existing objects
122-
if c.res.Group == kubescapeCustomResourceGroup {
123-
if err := backoff.RetryNotify(func() error {
124-
var err error
125-
watchOpts.ResourceVersion, err = c.getExistingStorageObjects(ctx)
126-
return err
127-
}, utils.NewBackOff(true), func(err error, d time.Duration) {
128-
logger.L().Ctx(ctx).Warning("get existing storage objects", helpers.Error(err),
129-
helpers.String("resource", c.res.Resource),
130-
helpers.String("retry in", d.String()))
131-
}); err != nil {
132-
return fmt.Errorf("giving up get existing storage objects: %w", err)
133-
}
134-
}
135121
// begin watch
136122
eventQueue := utils.NewCooldownQueue()
137-
go c.watchRetry(ctx, watchOpts, eventQueue)
123+
if c.res.Group == kubescapeCustomResourceGroup {
124+
// our custom resources no longer support watch, use periodic listing
125+
go c.periodicList(ctx, eventQueue, c.listPeriod)
126+
} else {
127+
watchOpts := metav1.ListOptions{}
128+
go c.watchRetry(ctx, watchOpts, eventQueue)
129+
}
138130
// process events
139131
for event := range eventQueue.ResultChan {
140132
// skip non-objects
@@ -203,7 +195,7 @@ func (c *Client) Stop(_ context.Context) error {
203195
func (c *Client) watchRetry(ctx context.Context, watchOpts metav1.ListOptions, eventQueue *utils.CooldownQueue) {
204196
exitFatal := true
205197
if err := backoff.RetryNotify(func() error {
206-
watcher, err := c.chooseWatcher(watchOpts)
198+
watcher, err := c.dynamicClient.Resource(c.res).Namespace("").Watch(context.Background(), watchOpts)
207199
if err != nil {
208200
if k8sErrors.ReasonForError(err) == metav1.StatusReasonNotFound {
209201
exitFatal = false
@@ -534,37 +526,40 @@ func (c *Client) verifyObject(id domain.KindName, newChecksum string) ([]byte, e
534526
return object, nil
535527
}
536528

537-
func (c *Client) getExistingStorageObjects(ctx context.Context) (string, error) {
538-
logger.L().Debug("getting existing objects from storage", helpers.String("resource", c.res.Resource))
539-
var resourceVersion string
540-
if err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
541-
return c.chooseLister(opts)
542-
}).EachListItem(context.Background(), metav1.ListOptions{}, func(run runtime.Object) error {
543-
d := run.(metav1.Object)
544-
resourceVersion = d.GetResourceVersion()
545-
// no need for skip ns since these are our CRDs
546-
id := domain.KindName{
547-
Kind: c.kind,
548-
Name: d.GetName(),
549-
Namespace: d.GetNamespace(),
550-
ResourceVersion: domain.ToResourceVersion(d.GetResourceVersion()),
551-
}
552-
// get checksum
553-
checksum, err := c.getChecksum(d)
554-
if err != nil {
555-
logger.L().Ctx(ctx).Error("cannot get checksums", helpers.Error(err), helpers.String("id", id.String()))
556-
return nil
557-
}
558-
err = c.callbacks.VerifyObject(ctx, id, checksum)
559-
if err != nil {
560-
logger.L().Ctx(ctx).Error("cannot handle added resource", helpers.Error(err), helpers.String("id", id.String()))
529+
func (c *Client) periodicList(ctx context.Context, queue *utils.CooldownQueue, duration time.Duration) {
530+
ticker := time.NewTicker(duration)
531+
defer ticker.Stop()
532+
var since string
533+
for {
534+
select {
535+
case <-ctx.Done():
536+
return
537+
case <-ticker.C:
538+
var continueToken string
539+
for {
540+
logger.L().Debug("periodicList - listing resources", helpers.String("resource", c.res.Resource), helpers.String("continueToken", continueToken), helpers.String("since", since))
541+
items, nextToken, lastUpdated, err := c.listFunc(metav1.ListOptions{
542+
Limit: int64(100),
543+
Continue: continueToken,
544+
ResourceVersion: since, // ensure we only get changes since the last check
545+
})
546+
if err != nil {
547+
logger.L().Ctx(ctx).Error("periodicList - error in listFunc", helpers.Error(err))
548+
break
549+
}
550+
for _, obj := range items {
551+
// added and modified events are treated the same, so we enqueue a Modified event for both
552+
// deleted events are not possible with listing, so we rely on the reconciliation batch to detect deletions
553+
queue.Enqueue(watch.Event{Type: watch.Modified, Object: obj})
554+
}
555+
since = lastUpdated
556+
if nextToken == "" {
557+
break
558+
}
559+
continueToken = nextToken
560+
}
561561
}
562-
return nil
563-
}); err != nil {
564-
return "", fmt.Errorf("list resources: %w", err)
565562
}
566-
// set resource version to watch from
567-
return resourceVersion, nil
568563
}
569564

570565
func (c *Client) filterAndMarshal(d metav1.Object) ([]byte, error) {
@@ -787,6 +782,8 @@ func (c *Client) chooseLister(opts metav1.ListOptions) (runtime.Object, error) {
787782
switch c.res.Resource {
788783
case "applicationprofiles":
789784
return c.storageClient.ApplicationProfiles("").List(context.Background(), opts)
785+
case "knownservers":
786+
return c.storageClient.KnownServers("").List(context.Background(), opts)
790787
case "networkneighborhoods":
791788
return c.storageClient.NetworkNeighborhoods("").List(context.Background(), opts)
792789
case "sbomsyfts":
@@ -800,22 +797,52 @@ func (c *Client) chooseLister(opts metav1.ListOptions) (runtime.Object, error) {
800797
return c.dynamicClient.Resource(c.res).Namespace("").List(context.Background(), opts)
801798
}
802799

803-
func (c *Client) chooseWatcher(opts metav1.ListOptions) (watch.Interface, error) {
800+
func (c *Client) listFunc(opts metav1.ListOptions) ([]runtime.Object, string, string, error) {
804801
if c.storageClient != nil {
805-
switch c.res.Resource {
806-
case "applicationprofiles":
807-
return c.storageClient.ApplicationProfiles("").Watch(context.Background(), opts)
808-
case "networkneighborhoods":
809-
return c.storageClient.NetworkNeighborhoods("").Watch(context.Background(), opts)
810-
case "sbomsyfts":
811-
return c.storageClient.SBOMSyfts("").Watch(context.Background(), opts)
812-
case "seccompprofiles":
813-
return c.storageClient.SeccompProfiles("").Watch(context.Background(), opts)
814-
case "vulnerabilitymanifests":
815-
return c.storageClient.VulnerabilityManifests("").Watch(context.Background(), opts)
802+
list, err := c.chooseLister(opts)
803+
if err != nil {
804+
return nil, "", "", err
805+
}
806+
switch l := list.(type) {
807+
case *v1beta1.ApplicationProfileList:
808+
items := make([]runtime.Object, len(l.Items))
809+
for i := range l.Items {
810+
items[i] = &l.Items[i]
811+
}
812+
return items, l.Continue, l.ResourceVersion, nil
813+
case *v1beta1.KnownServerList:
814+
items := make([]runtime.Object, len(l.Items))
815+
for i := range l.Items {
816+
items[i] = &l.Items[i]
817+
}
818+
return items, l.Continue, l.ResourceVersion, nil
819+
case *v1beta1.NetworkNeighborhoodList:
820+
items := make([]runtime.Object, len(l.Items))
821+
for i := range l.Items {
822+
items[i] = &l.Items[i]
823+
}
824+
return items, l.Continue, l.ResourceVersion, nil
825+
case *v1beta1.SBOMSyftList:
826+
items := make([]runtime.Object, len(l.Items))
827+
for i := range l.Items {
828+
items[i] = &l.Items[i]
829+
}
830+
return items, l.Continue, l.ResourceVersion, nil
831+
case *v1beta1.SeccompProfileList:
832+
items := make([]runtime.Object, len(l.Items))
833+
for i := range l.Items {
834+
items[i] = &l.Items[i]
835+
}
836+
return items, l.Continue, l.ResourceVersion, nil
837+
case *v1beta1.VulnerabilityManifestList:
838+
items := make([]runtime.Object, len(l.Items))
839+
for i := range l.Items {
840+
items[i] = &l.Items[i]
841+
}
842+
return items, l.Continue, l.ResourceVersion, nil
816843
}
817844
}
818-
return c.dynamicClient.Resource(c.res).Namespace("").Watch(context.Background(), opts)
845+
return nil, "", "", fmt.Errorf("list function not implemented for resource %s", c.res.Resource)
819846
}
820847

821848
func (c *Client) getResource(namespace string, name string) (metav1.Object, error) {

config/config.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"os"
66
"strings"
7+
"time"
78

89
"github.com/armosec/utils-k8s-go/armometadata"
910
"github.com/kubescape/backend/pkg/servicediscovery"
@@ -36,14 +37,15 @@ type Backend struct {
3637
}
3738

3839
type InCluster struct {
39-
ServerUrl string `mapstructure:"serverUrl"`
40-
Namespace string `mapstructure:"namespace"`
41-
ClusterName string `mapstructure:"clusterName"`
42-
ExcludeNamespaces []string `mapstructure:"excludeNamespaces"`
43-
IncludeNamespaces []string `mapstructure:"includeNamespaces"`
44-
Account string `mapstructure:"account"`
45-
AccessKey string `mapstructure:"accessKey"`
46-
Resources []Resource `mapstructure:"resources"`
40+
AccessKey string `mapstructure:"accessKey"`
41+
Account string `mapstructure:"account"`
42+
ClusterName string `mapstructure:"clusterName"`
43+
ExcludeNamespaces []string `mapstructure:"excludeNamespaces"`
44+
IncludeNamespaces []string `mapstructure:"includeNamespaces"`
45+
ListPeriod time.Duration `mapstructure:"listPeriod"`
46+
Namespace string `mapstructure:"namespace"`
47+
Resources []Resource `mapstructure:"resources"`
48+
ServerUrl string `mapstructure:"serverUrl"`
4749
}
4850

4951
type HTTPEndpoint struct {
@@ -95,6 +97,8 @@ func LoadConfig(path string) (Config, error) {
9597
v.SetConfigName("config")
9698
v.SetConfigType("json")
9799

100+
v.SetDefault("inCluster.listPeriod", time.Minute)
101+
98102
v.AutomaticEnv()
99103

100104
err := v.ReadInConfig()

config/config_test.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,16 +65,18 @@ func TestLoadConfig(t *testing.T) {
6565
path: "../configuration/client",
6666
want: Config{
6767
InCluster: InCluster{
68-
ServerUrl: "ws://127.0.0.1:8080/",
69-
Namespace: "kubescape",
68+
AccessKey: "xxxxxxxx-1111-1111-1111-xxxxxxxx",
69+
Account: "11111111-2222-3333-4444-11111111",
7070
ClusterName: "cluster-1",
7171
ExcludeNamespaces: []string{"kube-system", "kubescape"},
7272
IncludeNamespaces: []string{},
73-
Account: "11111111-2222-3333-4444-11111111",
74-
AccessKey: "xxxxxxxx-1111-1111-1111-xxxxxxxx",
73+
ListPeriod: 60000000000,
74+
Namespace: "kubescape",
75+
ServerUrl: "ws://127.0.0.1:8080/",
7576
Resources: []Resource{
7677
{Group: "", Version: "v1", Resource: "pods", Strategy: "patch"},
7778
{Group: "", Version: "v1", Resource: "nodes", Strategy: "patch"},
79+
{Group: "", Version: "v1", Resource: "configmaps", Strategy: "copy"},
7880
{Group: "apps", Version: "v1", Resource: "deployments", Strategy: "patch"},
7981
{Group: "apps", Version: "v1", Resource: "statefulsets", Strategy: "patch"},
8082
{Group: "spdx.softwarecomposition.kubescape.io", Version: "v1beta1", Resource: "applicationprofiles", Strategy: "patch"},
@@ -108,6 +110,9 @@ func TestLoadConfig(t *testing.T) {
108110
ConsumerTopic: "synchronizer",
109111
SkipAlertsFrom: []string{"foo", "bar"},
110112
},
113+
InCluster: InCluster{
114+
ListPeriod: 60000000000,
115+
},
111116
},
112117
},
113118
}

configuration/client/config.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@
2020
"resource": "nodes",
2121
"strategy": "patch"
2222
},
23+
{
24+
"group": "",
25+
"version": "v1",
26+
"resource": "configmaps",
27+
"strategy": "copy"
28+
},
2329
{
2430
"group": "apps",
2531
"version": "v1",

tests/synchronizer_integration_test.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -509,19 +509,14 @@ func createAndStartSynchronizerClient(t *testing.T, cluster *TestKubernetesClust
509509
require.NoError(t, err)
510510

511511
// set cluster config
512-
clientCfg.InCluster.Namespace = kubescapeNamespace
512+
clientCfg.InCluster.Account = cluster.account
513513
clientCfg.InCluster.ClusterName = cluster.cluster
514514
clientCfg.InCluster.ExcludeNamespaces = []string{"kube-system", "kubescape"}
515515
clientCfg.InCluster.IncludeNamespaces = []string{}
516-
clientCfg.InCluster.Account = cluster.account
516+
clientCfg.InCluster.ListPeriod = 5 * time.Second
517+
clientCfg.InCluster.Namespace = kubescapeNamespace
517518
clientCfg.InCluster.ServerUrl = syncServer.serverUrl
518519
if watchDefaults {
519-
clientCfg.InCluster.Resources = append(clientCfg.InCluster.Resources, config.Resource{
520-
Group: "",
521-
Version: "v1",
522-
Resource: "configmaps",
523-
Strategy: "copy",
524-
})
525520
clientCfg.InCluster.Resources = append(clientCfg.InCluster.Resources, config.Resource{
526521
Group: "",
527522
Version: "v1",
@@ -917,21 +912,23 @@ func TestSynchronizer_TC03(t *testing.T) {
917912
}
918913

919914
// TestSynchronizer_TC04_InCluster: Deletion of a single entity
915+
// we use configmap here because our CRDs no longer propagate deletion because of the periodic listings
916+
// (we now rely on the reconciliation batch to detect deletions)
920917
func TestSynchronizer_TC04_InCluster(t *testing.T) {
921918
td := initIntegrationTest(t)
922919
// add configmap to k8s
923-
_, err := td.clusters[0].storageclient.ApplicationProfiles(namespace).Create(context.TODO(), td.clusters[0].applicationprofile, metav1.CreateOptions{})
920+
_, err := td.clusters[0].k8sclient.CoreV1().ConfigMaps(namespace).Create(context.TODO(), td.clusters[0].cm, metav1.CreateOptions{})
924921
require.NoError(t, err)
925922
time.Sleep(10 * time.Second)
926923
// check object in postgres
927-
objMetadata := waitForObjectInPostgres(t, td, td.clusters[0].account, td.clusters[0].cluster, "spdx.softwarecomposition.kubescape.io/v1beta1/applicationprofiles", namespace, name)
924+
objMetadata := waitForObjectInPostgres(t, td, td.clusters[0].account, td.clusters[0].cluster, "/v1/configmaps", namespace, name)
928925
assert.NotNil(t, objMetadata)
929926
// delete configmap from k8s
930-
err = td.clusters[0].storageclient.ApplicationProfiles(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
927+
err = td.clusters[0].k8sclient.CoreV1().ConfigMaps(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
931928
require.NoError(t, err)
932929

933930
// check object not in postgres
934-
waitForObjectToBeDeletedInPostgres(t, td, td.clusters[0].account, td.clusters[0].cluster, "spdx.softwarecomposition.kubescape.io/v1beta1/applicationprofiles", namespace, name)
931+
waitForObjectToBeDeletedInPostgres(t, td, td.clusters[0].account, td.clusters[0].cluster, "/v1/configmaps", namespace, name)
935932

936933
// tear down
937934
tearDown(td)

0 commit comments

Comments
 (0)