Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 104 additions & 77 deletions adapters/incluster/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
"go.uber.org/multierr"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/pager"
Expand Down Expand Up @@ -64,19 +65,20 @@ type resourceVersionGetter interface {
}

type Client struct {
dynamicClient dynamic.Interface
storageClient spdxv1beta1.SpdxV1beta1Interface
account string
batchProcessingFunc map[domain.BatchType]BatchProcessingFunc
callbacks domain.Callbacks
cluster string
dynamicClient dynamic.Interface
excludeNamespaces []string
includeNamespaces []string
operatorNamespace string // the namespace where the kubescape operator is running
kind *domain.Kind
callbacks domain.Callbacks
listPeriod time.Duration
operatorNamespace string // the namespace where the kubescape operator is running
res schema.GroupVersionResource
storageClient spdxv1beta1.SpdxV1beta1Interface
ShadowObjects map[string][]byte
Strategy domain.Strategy
batchProcessingFunc map[domain.BatchType]BatchProcessingFunc
}

var errWatchClosed = errors.New("watch channel closed")
Expand All @@ -89,52 +91,42 @@ func NewClient(dynamicClient dynamic.Interface, storageClient spdxv1beta1.SpdxV1
logger.L().Warning("event multiplier config detected, but it is deprecated", helpers.String("resource", res.String()), helpers.Int("multiplier", multiplier))
}
return &Client{
account: cfg.Account,
account: cfg.Account,
batchProcessingFunc: map[domain.BatchType]BatchProcessingFunc{
domain.DefaultBatch: defaultBatchProcessingFunc, // regular processing, when batch type is not set
domain.ReconciliationBatch: reconcileBatchProcessingFunc,
},
cluster: cfg.ClusterName,
dynamicClient: dynamicClient,
storageClient: storageClient,
excludeNamespaces: cfg.ExcludeNamespaces,
includeNamespaces: cfg.IncludeNamespaces,
operatorNamespace: cfg.Namespace,
cluster: cfg.ClusterName,
kind: &domain.Kind{
Group: res.Group,
Version: res.Version,
Resource: res.Resource,
},
res: res,
ShadowObjects: map[string][]byte{},
Strategy: r.Strategy,
batchProcessingFunc: map[domain.BatchType]BatchProcessingFunc{
domain.DefaultBatch: defaultBatchProcessingFunc, // regular processing, when batch type is not set
domain.ReconciliationBatch: reconcileBatchProcessingFunc,
},
listPeriod: cfg.ListPeriod,
operatorNamespace: cfg.Namespace,
res: res,
storageClient: storageClient,
ShadowObjects: map[string][]byte{},
Strategy: r.Strategy,
}
}

var _ adapters.Client = (*Client)(nil)

func (c *Client) Start(ctx context.Context) error {
logger.L().Info("starting incluster client", helpers.String("resource", c.res.Resource))
watchOpts := metav1.ListOptions{}
// for our storage, we need to list all resources and get them one by one
// as list returns objects with empty spec
// and watch does not return existing objects
if c.res.Group == kubescapeCustomResourceGroup {
if err := backoff.RetryNotify(func() error {
var err error
watchOpts.ResourceVersion, err = c.getExistingStorageObjects(ctx)
return err
}, utils.NewBackOff(true), func(err error, d time.Duration) {
logger.L().Ctx(ctx).Warning("get existing storage objects", helpers.Error(err),
helpers.String("resource", c.res.Resource),
helpers.String("retry in", d.String()))
}); err != nil {
return fmt.Errorf("giving up get existing storage objects: %w", err)
}
}
// begin watch
eventQueue := utils.NewCooldownQueue()
go c.watchRetry(ctx, watchOpts, eventQueue)
if c.res.Group == kubescapeCustomResourceGroup {
// our custom resources no longer support watch, use periodic listing
go c.periodicList(ctx, eventQueue, c.listPeriod)
} else {
watchOpts := metav1.ListOptions{}
go c.watchRetry(ctx, watchOpts, eventQueue)
}
// process events
for event := range eventQueue.ResultChan {
// skip non-objects
Expand Down Expand Up @@ -203,7 +195,7 @@ func (c *Client) Stop(_ context.Context) error {
func (c *Client) watchRetry(ctx context.Context, watchOpts metav1.ListOptions, eventQueue *utils.CooldownQueue) {
exitFatal := true
if err := backoff.RetryNotify(func() error {
watcher, err := c.chooseWatcher(watchOpts)
watcher, err := c.dynamicClient.Resource(c.res).Namespace("").Watch(context.Background(), watchOpts)
if err != nil {
if k8sErrors.ReasonForError(err) == metav1.StatusReasonNotFound {
exitFatal = false
Expand Down Expand Up @@ -534,37 +526,40 @@ func (c *Client) verifyObject(id domain.KindName, newChecksum string) ([]byte, e
return object, nil
}

func (c *Client) getExistingStorageObjects(ctx context.Context) (string, error) {
logger.L().Debug("getting existing objects from storage", helpers.String("resource", c.res.Resource))
var resourceVersion string
if err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return c.chooseLister(opts)
}).EachListItem(context.Background(), metav1.ListOptions{}, func(run runtime.Object) error {
d := run.(metav1.Object)
resourceVersion = d.GetResourceVersion()
// no need for skip ns since these are our CRDs
id := domain.KindName{
Kind: c.kind,
Name: d.GetName(),
Namespace: d.GetNamespace(),
ResourceVersion: domain.ToResourceVersion(d.GetResourceVersion()),
}
// get checksum
checksum, err := c.getChecksum(d)
if err != nil {
logger.L().Ctx(ctx).Error("cannot get checksums", helpers.Error(err), helpers.String("id", id.String()))
return nil
}
err = c.callbacks.VerifyObject(ctx, id, checksum)
if err != nil {
logger.L().Ctx(ctx).Error("cannot handle added resource", helpers.Error(err), helpers.String("id", id.String()))
func (c *Client) periodicList(ctx context.Context, queue *utils.CooldownQueue, duration time.Duration) {
ticker := time.NewTicker(duration)
defer ticker.Stop()
var since string
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
var continueToken string
for {
logger.L().Debug("periodicList - listing resources", helpers.String("resource", c.res.Resource), helpers.String("continueToken", continueToken), helpers.String("since", since))
items, nextToken, lastUpdated, err := c.listFunc(metav1.ListOptions{
Limit: int64(100),
Continue: continueToken,
ResourceVersion: since, // ensure we only get changes since the last check
})
if err != nil {
logger.L().Ctx(ctx).Error("periodicList - error in listFunc", helpers.Error(err))
break
}
for _, obj := range items {
// added and modified events are treated the same, so we enqueue a Modified event for both
// deleted events are not possible with listing, so we rely on the reconciliation batch to detect deletions
queue.Enqueue(watch.Event{Type: watch.Modified, Object: obj})
}
since = lastUpdated
if nextToken == "" {
break
}
continueToken = nextToken
}
}
return nil
}); err != nil {
return "", fmt.Errorf("list resources: %w", err)
}
// set resource version to watch from
return resourceVersion, nil
}

func (c *Client) filterAndMarshal(d metav1.Object) ([]byte, error) {
Expand Down Expand Up @@ -787,6 +782,8 @@ func (c *Client) chooseLister(opts metav1.ListOptions) (runtime.Object, error) {
switch c.res.Resource {
case "applicationprofiles":
return c.storageClient.ApplicationProfiles("").List(context.Background(), opts)
case "knownservers":
return c.storageClient.KnownServers("").List(context.Background(), opts)
case "networkneighborhoods":
return c.storageClient.NetworkNeighborhoods("").List(context.Background(), opts)
case "sbomsyfts":
Expand All @@ -800,22 +797,52 @@ func (c *Client) chooseLister(opts metav1.ListOptions) (runtime.Object, error) {
return c.dynamicClient.Resource(c.res).Namespace("").List(context.Background(), opts)
}

func (c *Client) chooseWatcher(opts metav1.ListOptions) (watch.Interface, error) {
func (c *Client) listFunc(opts metav1.ListOptions) ([]runtime.Object, string, string, error) {
if c.storageClient != nil {
switch c.res.Resource {
case "applicationprofiles":
return c.storageClient.ApplicationProfiles("").Watch(context.Background(), opts)
case "networkneighborhoods":
return c.storageClient.NetworkNeighborhoods("").Watch(context.Background(), opts)
case "sbomsyfts":
return c.storageClient.SBOMSyfts("").Watch(context.Background(), opts)
case "seccompprofiles":
return c.storageClient.SeccompProfiles("").Watch(context.Background(), opts)
case "vulnerabilitymanifests":
return c.storageClient.VulnerabilityManifests("").Watch(context.Background(), opts)
list, err := c.chooseLister(opts)
if err != nil {
return nil, "", "", err
}
switch l := list.(type) {
case *v1beta1.ApplicationProfileList:
items := make([]runtime.Object, len(l.Items))
for i := range l.Items {
items[i] = &l.Items[i]
}
return items, l.Continue, l.ResourceVersion, nil
case *v1beta1.KnownServerList:
items := make([]runtime.Object, len(l.Items))
for i := range l.Items {
items[i] = &l.Items[i]
}
return items, l.Continue, l.ResourceVersion, nil
case *v1beta1.NetworkNeighborhoodList:
items := make([]runtime.Object, len(l.Items))
for i := range l.Items {
items[i] = &l.Items[i]
}
return items, l.Continue, l.ResourceVersion, nil
case *v1beta1.SBOMSyftList:
items := make([]runtime.Object, len(l.Items))
for i := range l.Items {
items[i] = &l.Items[i]
}
return items, l.Continue, l.ResourceVersion, nil
case *v1beta1.SeccompProfileList:
items := make([]runtime.Object, len(l.Items))
for i := range l.Items {
items[i] = &l.Items[i]
}
return items, l.Continue, l.ResourceVersion, nil
case *v1beta1.VulnerabilityManifestList:
items := make([]runtime.Object, len(l.Items))
for i := range l.Items {
items[i] = &l.Items[i]
}
return items, l.Continue, l.ResourceVersion, nil
}
}
return c.dynamicClient.Resource(c.res).Namespace("").Watch(context.Background(), opts)
return nil, "", "", fmt.Errorf("list function not implemented for resource %s", c.res.Resource)
}

func (c *Client) getResource(namespace string, name string) (metav1.Object, error) {
Expand Down
20 changes: 12 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/armosec/utils-k8s-go/armometadata"
"github.com/kubescape/backend/pkg/servicediscovery"
Expand Down Expand Up @@ -36,14 +37,15 @@ type Backend struct {
}

type InCluster struct {
ServerUrl string `mapstructure:"serverUrl"`
Namespace string `mapstructure:"namespace"`
ClusterName string `mapstructure:"clusterName"`
ExcludeNamespaces []string `mapstructure:"excludeNamespaces"`
IncludeNamespaces []string `mapstructure:"includeNamespaces"`
Account string `mapstructure:"account"`
AccessKey string `mapstructure:"accessKey"`
Resources []Resource `mapstructure:"resources"`
AccessKey string `mapstructure:"accessKey"`
Account string `mapstructure:"account"`
ClusterName string `mapstructure:"clusterName"`
ExcludeNamespaces []string `mapstructure:"excludeNamespaces"`
IncludeNamespaces []string `mapstructure:"includeNamespaces"`
ListPeriod time.Duration `mapstructure:"listPeriod"`
Namespace string `mapstructure:"namespace"`
Resources []Resource `mapstructure:"resources"`
ServerUrl string `mapstructure:"serverUrl"`
}

type HTTPEndpoint struct {
Expand Down Expand Up @@ -95,6 +97,8 @@ func LoadConfig(path string) (Config, error) {
v.SetConfigName("config")
v.SetConfigType("json")

v.SetDefault("inCluster.listPeriod", time.Minute)

v.AutomaticEnv()

err := v.ReadInConfig()
Expand Down
13 changes: 9 additions & 4 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,18 @@ func TestLoadConfig(t *testing.T) {
path: "../configuration/client",
want: Config{
InCluster: InCluster{
ServerUrl: "ws://127.0.0.1:8080/",
Namespace: "kubescape",
AccessKey: "xxxxxxxx-1111-1111-1111-xxxxxxxx",
Account: "11111111-2222-3333-4444-11111111",
ClusterName: "cluster-1",
ExcludeNamespaces: []string{"kube-system", "kubescape"},
IncludeNamespaces: []string{},
Account: "11111111-2222-3333-4444-11111111",
AccessKey: "xxxxxxxx-1111-1111-1111-xxxxxxxx",
ListPeriod: 60000000000,
Namespace: "kubescape",
ServerUrl: "ws://127.0.0.1:8080/",
Resources: []Resource{
{Group: "", Version: "v1", Resource: "pods", Strategy: "patch"},
{Group: "", Version: "v1", Resource: "nodes", Strategy: "patch"},
{Group: "", Version: "v1", Resource: "configmaps", Strategy: "copy"},
{Group: "apps", Version: "v1", Resource: "deployments", Strategy: "patch"},
{Group: "apps", Version: "v1", Resource: "statefulsets", Strategy: "patch"},
{Group: "spdx.softwarecomposition.kubescape.io", Version: "v1beta1", Resource: "applicationprofiles", Strategy: "patch"},
Expand Down Expand Up @@ -108,6 +110,9 @@ func TestLoadConfig(t *testing.T) {
ConsumerTopic: "synchronizer",
SkipAlertsFrom: []string{"foo", "bar"},
},
InCluster: InCluster{
ListPeriod: 60000000000,
},
},
},
}
Expand Down
6 changes: 6 additions & 0 deletions configuration/client/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
"resource": "nodes",
"strategy": "patch"
},
{
"group": "",
"version": "v1",
"resource": "configmaps",
"strategy": "copy"
},
{
"group": "apps",
"version": "v1",
Expand Down
21 changes: 9 additions & 12 deletions tests/synchronizer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,19 +509,14 @@ func createAndStartSynchronizerClient(t *testing.T, cluster *TestKubernetesClust
require.NoError(t, err)

// set cluster config
clientCfg.InCluster.Namespace = kubescapeNamespace
clientCfg.InCluster.Account = cluster.account
clientCfg.InCluster.ClusterName = cluster.cluster
clientCfg.InCluster.ExcludeNamespaces = []string{"kube-system", "kubescape"}
clientCfg.InCluster.IncludeNamespaces = []string{}
clientCfg.InCluster.Account = cluster.account
clientCfg.InCluster.ListPeriod = 5 * time.Second
clientCfg.InCluster.Namespace = kubescapeNamespace
clientCfg.InCluster.ServerUrl = syncServer.serverUrl
if watchDefaults {
clientCfg.InCluster.Resources = append(clientCfg.InCluster.Resources, config.Resource{
Group: "",
Version: "v1",
Resource: "configmaps",
Strategy: "copy",
})
clientCfg.InCluster.Resources = append(clientCfg.InCluster.Resources, config.Resource{
Group: "",
Version: "v1",
Expand Down Expand Up @@ -917,21 +912,23 @@ func TestSynchronizer_TC03(t *testing.T) {
}

// TestSynchronizer_TC04_InCluster: Deletion of a single entity
// we use a configmap here because our CRDs no longer propagate deletions due to the periodic listing
// (we now rely on the reconciliation batch to detect deletions)
func TestSynchronizer_TC04_InCluster(t *testing.T) {
td := initIntegrationTest(t)
// add applicationprofile to k8s
_, err := td.clusters[0].storageclient.ApplicationProfiles(namespace).Create(context.TODO(), td.clusters[0].applicationprofile, metav1.CreateOptions{})
_, err := td.clusters[0].k8sclient.CoreV1().ConfigMaps(namespace).Create(context.TODO(), td.clusters[0].cm, metav1.CreateOptions{})
require.NoError(t, err)
time.Sleep(10 * time.Second)
// check object in postgres
objMetadata := waitForObjectInPostgres(t, td, td.clusters[0].account, td.clusters[0].cluster, "spdx.softwarecomposition.kubescape.io/v1beta1/applicationprofiles", namespace, name)
objMetadata := waitForObjectInPostgres(t, td, td.clusters[0].account, td.clusters[0].cluster, "/v1/configmaps", namespace, name)
assert.NotNil(t, objMetadata)
// delete applicationprofile from k8s
err = td.clusters[0].storageclient.ApplicationProfiles(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
err = td.clusters[0].k8sclient.CoreV1().ConfigMaps(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
require.NoError(t, err)

// check object not in postgres
waitForObjectToBeDeletedInPostgres(t, td, td.clusters[0].account, td.clusters[0].cluster, "spdx.softwarecomposition.kubescape.io/v1beta1/applicationprofiles", namespace, name)
waitForObjectToBeDeletedInPostgres(t, td, td.clusters[0].account, td.clusters[0].cluster, "/v1/configmaps", namespace, name)

// tear down
tearDown(td)
Expand Down
Loading