@@ -14,6 +14,7 @@ import (
1414 "time"
1515
1616 "github.com/cenkalti/backoff/v4"
17+ "github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
1718 "go.uber.org/multierr"
1819 "k8s.io/apimachinery/pkg/runtime"
1920 "k8s.io/client-go/tools/pager"
@@ -115,26 +116,15 @@ var _ adapters.Client = (*Client)(nil)
115116
116117func (c * Client ) Start (ctx context.Context ) error {
117118 logger .L ().Info ("starting incluster client" , helpers .String ("resource" , c .res .Resource ))
118- watchOpts := metav1.ListOptions {}
119- // for our storage, we need to list all resources and get them one by one
120- // as list returns objects with empty spec
121- // and watch does not return existing objects
122- if c .res .Group == kubescapeCustomResourceGroup {
123- if err := backoff .RetryNotify (func () error {
124- var err error
125- watchOpts .ResourceVersion , err = c .getExistingStorageObjects (ctx )
126- return err
127- }, utils .NewBackOff (true ), func (err error , d time.Duration ) {
128- logger .L ().Ctx (ctx ).Warning ("get existing storage objects" , helpers .Error (err ),
129- helpers .String ("resource" , c .res .Resource ),
130- helpers .String ("retry in" , d .String ()))
131- }); err != nil {
132- return fmt .Errorf ("giving up get existing storage objects: %w" , err )
133- }
134- }
135119 // begin watch
136120 eventQueue := utils .NewCooldownQueue ()
137- go c .watchRetry (ctx , watchOpts , eventQueue )
121+ if c .res .Group == kubescapeCustomResourceGroup {
122+ // our custom resources no longer support watch, use periodic listing
123+ go c .periodicList (ctx , eventQueue , 1 * time .Minute )
124+ } else {
125+ watchOpts := metav1.ListOptions {}
126+ go c .watchRetry (ctx , watchOpts , eventQueue )
127+ }
138128 // process events
139129 for event := range eventQueue .ResultChan {
140130 // skip non-objects
@@ -203,7 +193,7 @@ func (c *Client) Stop(_ context.Context) error {
203193func (c * Client ) watchRetry (ctx context.Context , watchOpts metav1.ListOptions , eventQueue * utils.CooldownQueue ) {
204194 exitFatal := true
205195 if err := backoff .RetryNotify (func () error {
206- watcher , err := c .chooseWatcher ( watchOpts )
196+ watcher , err := c .dynamicClient . Resource ( c . res ). Namespace ( "" ). Watch ( context . Background (), watchOpts )
207197 if err != nil {
208198 if k8sErrors .ReasonForError (err ) == metav1 .StatusReasonNotFound {
209199 exitFatal = false
@@ -534,37 +524,40 @@ func (c *Client) verifyObject(id domain.KindName, newChecksum string) ([]byte, e
534524 return object , nil
535525}
536526
537- func (c * Client ) getExistingStorageObjects (ctx context.Context ) (string , error ) {
538- logger .L ().Debug ("getting existing objects from storage" , helpers .String ("resource" , c .res .Resource ))
539- var resourceVersion string
540- if err := pager .New (func (ctx context.Context , opts metav1.ListOptions ) (runtime.Object , error ) {
541- return c .chooseLister (opts )
542- }).EachListItem (context .Background (), metav1.ListOptions {}, func (run runtime.Object ) error {
543- d := run .(metav1.Object )
544- resourceVersion = d .GetResourceVersion ()
545- // no need for skip ns since these are our CRDs
546- id := domain.KindName {
547- Kind : c .kind ,
548- Name : d .GetName (),
549- Namespace : d .GetNamespace (),
550- ResourceVersion : domain .ToResourceVersion (d .GetResourceVersion ()),
551- }
552- // get checksum
553- checksum , err := c .getChecksum (d )
554- if err != nil {
555- logger .L ().Ctx (ctx ).Error ("cannot get checksums" , helpers .Error (err ), helpers .String ("id" , id .String ()))
556- return nil
557- }
558- err = c .callbacks .VerifyObject (ctx , id , checksum )
559- if err != nil {
560- logger .L ().Ctx (ctx ).Error ("cannot handle added resource" , helpers .Error (err ), helpers .String ("id" , id .String ()))
527+ func (c * Client ) periodicList (ctx context.Context , queue * utils.CooldownQueue , duration time.Duration ) {
528+ ticker := time .NewTicker (duration )
529+ defer ticker .Stop ()
530+ var since string
531+ for {
532+ select {
533+ case <- ctx .Done ():
534+ return
535+ case <- ticker .C :
536+ var continueToken string
537+ for {
538+ logger .L ().Debug ("periodicList - listing resources" , helpers .String ("continueToken" , continueToken ), helpers .String ("since" , since ))
539+ items , nextToken , lastUpdated , err := c .listFunc (metav1.ListOptions {
540+ Limit : int64 (100 ),
541+ Continue : continueToken ,
542+ ResourceVersion : since , // ensure we only get changes since the last check
543+ })
544+ if err != nil {
545+ logger .L ().Ctx (ctx ).Error ("GenericResourceWatch - error in listFunc" , helpers .Error (err ))
546+ break
547+ }
548+ for _ , obj := range items {
549+ // added and modified events are treated the same, so we enqueue a Modified event for both
550+ // deleted events are not possible with listing, so we rely on the reconciliation batch to detect deletions
551+ queue .Enqueue (watch.Event {Type : watch .Modified , Object : obj })
552+ }
553+ since = lastUpdated
554+ if nextToken == "" {
555+ break
556+ }
557+ continueToken = nextToken
558+ }
561559 }
562- return nil
563- }); err != nil {
564- return "" , fmt .Errorf ("list resources: %w" , err )
565560 }
566- // set resource version to watch from
567- return resourceVersion , nil
568561}
569562
570563func (c * Client ) filterAndMarshal (d metav1.Object ) ([]byte , error ) {
@@ -800,22 +793,46 @@ func (c *Client) chooseLister(opts metav1.ListOptions) (runtime.Object, error) {
800793 return c .dynamicClient .Resource (c .res ).Namespace ("" ).List (context .Background (), opts )
801794}
802795
803- func (c * Client ) chooseWatcher (opts metav1.ListOptions ) (watch. Interface , error ) {
796+ func (c * Client ) listFunc (opts metav1.ListOptions ) ([]runtime. Object , string , string , error ) {
804797 if c .storageClient != nil {
805- switch c .res .Resource {
806- case "applicationprofiles" :
807- return c .storageClient .ApplicationProfiles ("" ).Watch (context .Background (), opts )
808- case "networkneighborhoods" :
809- return c .storageClient .NetworkNeighborhoods ("" ).Watch (context .Background (), opts )
810- case "sbomsyfts" :
811- return c .storageClient .SBOMSyfts ("" ).Watch (context .Background (), opts )
812- case "seccompprofiles" :
813- return c .storageClient .SeccompProfiles ("" ).Watch (context .Background (), opts )
814- case "vulnerabilitymanifests" :
815- return c .storageClient .VulnerabilityManifests ("" ).Watch (context .Background (), opts )
798+ list , err := c .chooseLister (opts )
799+ if err != nil {
800+ return nil , "" , "" , err
801+ }
802+ switch l := list .(type ) {
803+ case * v1beta1.ApplicationProfileList :
804+ items := make ([]runtime.Object , len (l .Items ))
805+ for i := range l .Items {
806+ items [i ] = & l .Items [i ]
807+ }
808+ return items , l .Continue , l .ResourceVersion , nil
809+ case * v1beta1.NetworkNeighborhoodList :
810+ items := make ([]runtime.Object , len (l .Items ))
811+ for i := range l .Items {
812+ items [i ] = & l .Items [i ]
813+ }
814+ return items , l .Continue , l .ResourceVersion , nil
815+ case * v1beta1.SBOMSyftList :
816+ items := make ([]runtime.Object , len (l .Items ))
817+ for i := range l .Items {
818+ items [i ] = & l .Items [i ]
819+ }
820+ return items , l .Continue , l .ResourceVersion , nil
821+ case * v1beta1.SeccompProfileList :
822+ items := make ([]runtime.Object , len (l .Items ))
823+ for i := range l .Items {
824+ items [i ] = & l .Items [i ]
825+ }
826+ return items , l .Continue , l .ResourceVersion , nil
827+ case * v1beta1.VulnerabilityManifestList :
828+ items := make ([]runtime.Object , len (l .Items ))
829+ for i := range l .Items {
830+ items [i ] = & l .Items [i ]
831+ }
832+ return items , l .Continue , l .ResourceVersion , nil
816833 }
817834 }
818- return c . dynamicClient . Resource ( c . res ). Namespace ( "" ). Watch ( context . Background (), opts )
835+ return nil , "" , "" , fmt . Errorf ( "list function not implemented for resource %s" , c . res . Resource )
819836}
820837
821838func (c * Client ) getResource (namespace string , name string ) (metav1.Object , error ) {
0 commit comments