Skip to content

Commit 8ef1b88

Browse files
committed
replace watches with periodic lists for spdx CRDs
Signed-off-by: Matthias Bertschy <matthias.bertschy@gmail.com>
1 parent cf2ff3c commit 8ef1b88

File tree

1 file changed

+86
-61
lines changed

1 file changed

+86
-61
lines changed

adapters/incluster/v1/client.go

Lines changed: 86 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"time"
1515

1616
"github.com/cenkalti/backoff/v4"
17+
"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
1718
"go.uber.org/multierr"
1819
"k8s.io/apimachinery/pkg/runtime"
1920
"k8s.io/client-go/tools/pager"
@@ -115,26 +116,15 @@ var _ adapters.Client = (*Client)(nil)
115116

116117
func (c *Client) Start(ctx context.Context) error {
117118
logger.L().Info("starting incluster client", helpers.String("resource", c.res.Resource))
118-
watchOpts := metav1.ListOptions{}
119-
// for our storage, we need to list all resources and get them one by one
120-
// as list returns objects with empty spec
121-
// and watch does not return existing objects
122-
if c.res.Group == kubescapeCustomResourceGroup {
123-
if err := backoff.RetryNotify(func() error {
124-
var err error
125-
watchOpts.ResourceVersion, err = c.getExistingStorageObjects(ctx)
126-
return err
127-
}, utils.NewBackOff(true), func(err error, d time.Duration) {
128-
logger.L().Ctx(ctx).Warning("get existing storage objects", helpers.Error(err),
129-
helpers.String("resource", c.res.Resource),
130-
helpers.String("retry in", d.String()))
131-
}); err != nil {
132-
return fmt.Errorf("giving up get existing storage objects: %w", err)
133-
}
134-
}
135119
// begin watch
136120
eventQueue := utils.NewCooldownQueue()
137-
go c.watchRetry(ctx, watchOpts, eventQueue)
121+
if c.res.Group == kubescapeCustomResourceGroup {
122+
// our custom resources no longer support watch, use periodic listing
123+
go c.periodicList(ctx, eventQueue, 1*time.Minute)
124+
} else {
125+
watchOpts := metav1.ListOptions{}
126+
go c.watchRetry(ctx, watchOpts, eventQueue)
127+
}
138128
// process events
139129
for event := range eventQueue.ResultChan {
140130
// skip non-objects
@@ -203,7 +193,7 @@ func (c *Client) Stop(_ context.Context) error {
203193
func (c *Client) watchRetry(ctx context.Context, watchOpts metav1.ListOptions, eventQueue *utils.CooldownQueue) {
204194
exitFatal := true
205195
if err := backoff.RetryNotify(func() error {
206-
watcher, err := c.chooseWatcher(watchOpts)
196+
watcher, err := c.dynamicClient.Resource(c.res).Namespace("").Watch(context.Background(), watchOpts)
207197
if err != nil {
208198
if k8sErrors.ReasonForError(err) == metav1.StatusReasonNotFound {
209199
exitFatal = false
@@ -534,37 +524,40 @@ func (c *Client) verifyObject(id domain.KindName, newChecksum string) ([]byte, e
534524
return object, nil
535525
}
536526

537-
func (c *Client) getExistingStorageObjects(ctx context.Context) (string, error) {
538-
logger.L().Debug("getting existing objects from storage", helpers.String("resource", c.res.Resource))
539-
var resourceVersion string
540-
if err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
541-
return c.chooseLister(opts)
542-
}).EachListItem(context.Background(), metav1.ListOptions{}, func(run runtime.Object) error {
543-
d := run.(metav1.Object)
544-
resourceVersion = d.GetResourceVersion()
545-
// no need for skip ns since these are our CRDs
546-
id := domain.KindName{
547-
Kind: c.kind,
548-
Name: d.GetName(),
549-
Namespace: d.GetNamespace(),
550-
ResourceVersion: domain.ToResourceVersion(d.GetResourceVersion()),
551-
}
552-
// get checksum
553-
checksum, err := c.getChecksum(d)
554-
if err != nil {
555-
logger.L().Ctx(ctx).Error("cannot get checksums", helpers.Error(err), helpers.String("id", id.String()))
556-
return nil
557-
}
558-
err = c.callbacks.VerifyObject(ctx, id, checksum)
559-
if err != nil {
560-
logger.L().Ctx(ctx).Error("cannot handle added resource", helpers.Error(err), helpers.String("id", id.String()))
527+
func (c *Client) periodicList(ctx context.Context, queue *utils.CooldownQueue, duration time.Duration) {
528+
ticker := time.NewTicker(duration)
529+
defer ticker.Stop()
530+
var since string
531+
for {
532+
select {
533+
case <-ctx.Done():
534+
return
535+
case <-ticker.C:
536+
var continueToken string
537+
for {
538+
logger.L().Debug("periodicList - listing resources", helpers.String("continueToken", continueToken), helpers.String("since", since))
539+
items, nextToken, lastUpdated, err := c.listFunc(metav1.ListOptions{
540+
Limit: int64(100),
541+
Continue: continueToken,
542+
ResourceVersion: since, // ensure we only get changes since the last check
543+
})
544+
if err != nil {
545+
logger.L().Ctx(ctx).Error("GenericResourceWatch - error in listFunc", helpers.Error(err))
546+
break
547+
}
548+
for _, obj := range items {
549+
// added and modified events are treated the same, so we enqueue a Modified event for both
550+
// deleted events are not possible with listing, so we rely on the reconciliation batch to detect deletions
551+
queue.Enqueue(watch.Event{Type: watch.Modified, Object: obj})
552+
}
553+
since = lastUpdated
554+
if nextToken == "" {
555+
break
556+
}
557+
continueToken = nextToken
558+
}
561559
}
562-
return nil
563-
}); err != nil {
564-
return "", fmt.Errorf("list resources: %w", err)
565560
}
566-
// set resource version to watch from
567-
return resourceVersion, nil
568561
}
569562

570563
func (c *Client) filterAndMarshal(d metav1.Object) ([]byte, error) {
@@ -787,6 +780,8 @@ func (c *Client) chooseLister(opts metav1.ListOptions) (runtime.Object, error) {
787780
switch c.res.Resource {
788781
case "applicationprofiles":
789782
return c.storageClient.ApplicationProfiles("").List(context.Background(), opts)
783+
case "knownservers":
784+
return c.storageClient.KnownServers("").List(context.Background(), opts)
790785
case "networkneighborhoods":
791786
return c.storageClient.NetworkNeighborhoods("").List(context.Background(), opts)
792787
case "sbomsyfts":
@@ -800,22 +795,52 @@ func (c *Client) chooseLister(opts metav1.ListOptions) (runtime.Object, error) {
800795
return c.dynamicClient.Resource(c.res).Namespace("").List(context.Background(), opts)
801796
}
802797

803-
func (c *Client) chooseWatcher(opts metav1.ListOptions) (watch.Interface, error) {
798+
func (c *Client) listFunc(opts metav1.ListOptions) ([]runtime.Object, string, string, error) {
804799
if c.storageClient != nil {
805-
switch c.res.Resource {
806-
case "applicationprofiles":
807-
return c.storageClient.ApplicationProfiles("").Watch(context.Background(), opts)
808-
case "networkneighborhoods":
809-
return c.storageClient.NetworkNeighborhoods("").Watch(context.Background(), opts)
810-
case "sbomsyfts":
811-
return c.storageClient.SBOMSyfts("").Watch(context.Background(), opts)
812-
case "seccompprofiles":
813-
return c.storageClient.SeccompProfiles("").Watch(context.Background(), opts)
814-
case "vulnerabilitymanifests":
815-
return c.storageClient.VulnerabilityManifests("").Watch(context.Background(), opts)
800+
list, err := c.chooseLister(opts)
801+
if err != nil {
802+
return nil, "", "", err
803+
}
804+
switch l := list.(type) {
805+
case *v1beta1.ApplicationProfileList:
806+
items := make([]runtime.Object, len(l.Items))
807+
for i := range l.Items {
808+
items[i] = &l.Items[i]
809+
}
810+
return items, l.Continue, l.ResourceVersion, nil
811+
case *v1beta1.KnownServerList:
812+
items := make([]runtime.Object, len(l.Items))
813+
for i := range l.Items {
814+
items[i] = &l.Items[i]
815+
}
816+
return items, l.Continue, l.ResourceVersion, nil
817+
case *v1beta1.NetworkNeighborhoodList:
818+
items := make([]runtime.Object, len(l.Items))
819+
for i := range l.Items {
820+
items[i] = &l.Items[i]
821+
}
822+
return items, l.Continue, l.ResourceVersion, nil
823+
case *v1beta1.SBOMSyftList:
824+
items := make([]runtime.Object, len(l.Items))
825+
for i := range l.Items {
826+
items[i] = &l.Items[i]
827+
}
828+
return items, l.Continue, l.ResourceVersion, nil
829+
case *v1beta1.SeccompProfileList:
830+
items := make([]runtime.Object, len(l.Items))
831+
for i := range l.Items {
832+
items[i] = &l.Items[i]
833+
}
834+
return items, l.Continue, l.ResourceVersion, nil
835+
case *v1beta1.VulnerabilityManifestList:
836+
items := make([]runtime.Object, len(l.Items))
837+
for i := range l.Items {
838+
items[i] = &l.Items[i]
839+
}
840+
return items, l.Continue, l.ResourceVersion, nil
816841
}
817842
}
818-
return c.dynamicClient.Resource(c.res).Namespace("").Watch(context.Background(), opts)
843+
return nil, "", "", fmt.Errorf("list function not implemented for resource %s", c.res.Resource)
819844
}
820845

821846
func (c *Client) getResource(namespace string, name string) (metav1.Object, error) {

0 commit comments

Comments
 (0)