Skip to content

Commit bc239d4

Browse files
committed
remove watcher and watch verb (forked apiserver)
Signed-off-by: Matthias Bertschy <matthias.bertschy@gmail.com>
1 parent 896644b commit bc239d4

File tree

12 files changed

+54
-647
lines changed

12 files changed

+54
-647
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,3 +221,5 @@ require (
221221
sigs.k8s.io/randfill v1.0.0 // indirect
222222
sigs.k8s.io/yaml v1.4.0 // indirect
223223
)
224+
225+
replace k8s.io/apiserver => github.com/matthyx/apiserver v0.0.0-20251003105411-dd9f2bda2b69

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,8 @@ github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPK
495495
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
496496
github.com/mailru/easyjson v0.9.0 h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4=
497497
github.com/mailru/easyjson v0.9.0/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU=
498+
github.com/matthyx/apiserver v0.0.0-20251003105411-dd9f2bda2b69 h1:+/KDXV2mYNpUBO6EozBz09JOQgtLTyG967jOe7Z80i0=
499+
github.com/matthyx/apiserver v0.0.0-20251003105411-dd9f2bda2b69/go.mod h1:VMbE4ArWYLO01omz+k8hFjAdYfc3GVAYPrhP2tTKccs=
498500
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
499501
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
500502
github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
@@ -1337,8 +1339,6 @@ k8s.io/apiextensions-apiserver v0.33.1 h1:N7ccbSlRN6I2QBcXevB73PixX2dQNIW0ZRuguE
13371339
k8s.io/apiextensions-apiserver v0.33.1/go.mod h1:uNQ52z1A1Gu75QSa+pFK5bcXc4hq7lpOXbweZgi4dqA=
13381340
k8s.io/apimachinery v0.33.1 h1:mzqXWV8tW9Rw4VeW9rEkqvnxj59k1ezDUl20tFK/oM4=
13391341
k8s.io/apimachinery v0.33.1/go.mod h1:BHW0YOu7n22fFv/JkYOEfkUYNRN0fj0BlvMFWA7b+SM=
1340-
k8s.io/apiserver v0.33.1 h1:yLgLUPDVC6tHbNcw5uE9mo1T6ELhJj7B0geifra3Qdo=
1341-
k8s.io/apiserver v0.33.1/go.mod h1:VMbE4ArWYLO01omz+k8hFjAdYfc3GVAYPrhP2tTKccs=
13421342
k8s.io/client-go v0.33.1 h1:ZZV/Ks2g92cyxWkRRnfUDsnhNn28eFpt26aGc8KbXF4=
13431343
k8s.io/client-go v0.33.1/go.mod h1:JAsUrl1ArO7uRVFWfcj6kOomSlCv+JpvIsp6usAGefA=
13441344
k8s.io/code-generator v0.33.1 h1:ZLzIRdMsh3Myfnx9BaooX6iQry29UJjVfVG+BuS+UMw=

main.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,6 @@ func main() {
9292
osFs := afero.NewOsFs()
9393
pool := file.NewPool(filepath.Join(file.DefaultStorageRoot, "metadata.sq3"), 0) // If less than 1, a reasonable default is used.
9494

95-
// setup watcher
96-
watchDispatcher := file.NewWatchDispatcher()
97-
9895
// cleanup task
9996
client, err := file.NewKubernetesClient()
10097
kubernetesAPI := file.NewKubernetesAPI(cfg, client)
@@ -104,11 +101,11 @@ func main() {
104101

105102
relevancyEnabled := clusterData.RelevantImageVulnerabilitiesEnabled != nil && *clusterData.RelevantImageVulnerabilitiesEnabled
106103

107-
cleanupHandler := file.NewResourcesCleanupHandler(osFs, file.DefaultStorageRoot, pool, watchDispatcher, cfg.CleanupInterval, cfg.DefaultNamespace, kubernetesAPI, relevancyEnabled)
104+
cleanupHandler := file.NewResourcesCleanupHandler(osFs, file.DefaultStorageRoot, pool, cfg.CleanupInterval, cfg.DefaultNamespace, kubernetesAPI, relevancyEnabled)
108105
go cleanupHandler.RunCleanupTask(ctx)
109106

110107
// start the server
111-
options := server.NewWardleServerOptions(os.Stdout, os.Stderr, osFs, pool, cfg, watchDispatcher, cleanupHandler)
108+
options := server.NewWardleServerOptions(os.Stdout, os.Stderr, osFs, pool, cfg, cleanupHandler)
112109
cmd := server.NewCommandStartWardleServer(ctx, options, false)
113110
logger.L().Info("APIServer starting")
114111
code := cli.Run(cmd)

pkg/apiserver/apiserver.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,10 @@ func init() {
8282

8383
// ExtraConfig holds custom apiserver config
8484
type ExtraConfig struct {
85-
CleanupHandler *file.ResourcesCleanupHandler
86-
OsFs afero.Fs
87-
Pool *sqlitemigration.Pool
88-
StorageConfig config.Config
89-
WatchDispatcher *file.WatchDispatcher
85+
CleanupHandler *file.ResourcesCleanupHandler
86+
OsFs afero.Fs
87+
Pool *sqlitemigration.Pool
88+
StorageConfig config.Config
9089
}
9190

9291
// Config defines the config for the apiserver
@@ -140,11 +139,11 @@ func (c completedConfig) New() (*WardleServer, error) {
140139
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(softwarecomposition.GroupName, Scheme, metav1.ParameterCodec, Codecs)
141140

142141
var (
143-
storageImpl = file.NewStorageImpl(c.ExtraConfig.OsFs, file.DefaultStorageRoot, c.ExtraConfig.Pool, c.ExtraConfig.WatchDispatcher, Scheme)
142+
storageImpl = file.NewStorageImpl(c.ExtraConfig.OsFs, file.DefaultStorageRoot, c.ExtraConfig.Pool, Scheme)
144143

145-
applicationProfileStorageImpl = file.NewApplicationProfileStorage(file.NewStorageImplWithCollector(c.ExtraConfig.OsFs, file.DefaultStorageRoot, c.ExtraConfig.Pool, c.ExtraConfig.WatchDispatcher, Scheme, file.NewApplicationProfileProcessor(c.ExtraConfig.StorageConfig)))
146-
containerProfileStorageImpl = file.NewStorageImplWithCollector(c.ExtraConfig.OsFs, file.DefaultStorageRoot, c.ExtraConfig.Pool, c.ExtraConfig.WatchDispatcher, Scheme, file.NewContainerProfileProcessor(c.ExtraConfig.StorageConfig, c.ExtraConfig.Pool, c.ExtraConfig.CleanupHandler))
147-
networkNeighborhoodStorageImpl = file.NewNetworkNeighborhoodStorage(file.NewStorageImplWithCollector(c.ExtraConfig.OsFs, file.DefaultStorageRoot, c.ExtraConfig.Pool, c.ExtraConfig.WatchDispatcher, Scheme, file.NewNetworkNeighborhoodProcessor(c.ExtraConfig.StorageConfig)))
144+
applicationProfileStorageImpl = file.NewApplicationProfileStorage(file.NewStorageImplWithCollector(c.ExtraConfig.OsFs, file.DefaultStorageRoot, c.ExtraConfig.Pool, Scheme, file.NewApplicationProfileProcessor(c.ExtraConfig.StorageConfig)))
145+
containerProfileStorageImpl = file.NewStorageImplWithCollector(c.ExtraConfig.OsFs, file.DefaultStorageRoot, c.ExtraConfig.Pool, Scheme, file.NewContainerProfileProcessor(c.ExtraConfig.StorageConfig, c.ExtraConfig.Pool, c.ExtraConfig.CleanupHandler))
146+
networkNeighborhoodStorageImpl = file.NewNetworkNeighborhoodStorage(file.NewStorageImplWithCollector(c.ExtraConfig.OsFs, file.DefaultStorageRoot, c.ExtraConfig.Pool, Scheme, file.NewNetworkNeighborhoodProcessor(c.ExtraConfig.StorageConfig)))
148147
configScanStorageImpl = file.NewConfigurationScanSummaryStorage(storageImpl)
149148
vulnerabilitySummaryStorage = file.NewVulnerabilitySummaryStorage(storageImpl)
150149
generatedNetworkPolicyStorage = file.NewGeneratedNetworkPolicyStorage(storageImpl, networkNeighborhoodStorageImpl)

pkg/cmd/server/start.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,10 @@ type WardleServerOptions struct {
7171

7272
AlternateDNS []string
7373

74-
CleanupHandler *file.ResourcesCleanupHandler
75-
OsFs afero.Fs
76-
Pool *sqlitemigration.Pool
77-
StorageConfig config.Config
78-
WatchDispatcher *file.WatchDispatcher
74+
CleanupHandler *file.ResourcesCleanupHandler
75+
OsFs afero.Fs
76+
Pool *sqlitemigration.Pool
77+
StorageConfig config.Config
7978
}
8079

8180
func WardleVersionToKubeVersion(ver *version.Version) *version.Version {
@@ -93,7 +92,7 @@ func WardleVersionToKubeVersion(ver *version.Version) *version.Version {
9392
}
9493

9594
// NewWardleServerOptions returns a new WardleServerOptions
96-
func NewWardleServerOptions(out, errOut io.Writer, osFs afero.Fs, pool *sqlitemigration.Pool, cfg config.Config, watchDispatcher *file.WatchDispatcher, cleanupHandler *file.ResourcesCleanupHandler) *WardleServerOptions {
95+
func NewWardleServerOptions(out, errOut io.Writer, osFs afero.Fs, pool *sqlitemigration.Pool, cfg config.Config, cleanupHandler *file.ResourcesCleanupHandler) *WardleServerOptions {
9796
o := &WardleServerOptions{
9897
RecommendedOptions: genericoptions.NewRecommendedOptions(
9998
defaultEtcdPathPrefix,
@@ -104,11 +103,10 @@ func NewWardleServerOptions(out, errOut io.Writer, osFs afero.Fs, pool *sqlitemi
104103
StdOut: out,
105104
StdErr: errOut,
106105

107-
CleanupHandler: cleanupHandler,
108-
OsFs: osFs,
109-
Pool: pool,
110-
StorageConfig: cfg,
111-
WatchDispatcher: watchDispatcher,
106+
CleanupHandler: cleanupHandler,
107+
OsFs: osFs,
108+
Pool: pool,
109+
StorageConfig: cfg,
112110
}
113111
o.RecommendedOptions.Admission = nil
114112
o.RecommendedOptions.Etcd = nil
@@ -274,11 +272,10 @@ func (o *WardleServerOptions) Config() (*apiserver.Config, error) {
274272
c := &apiserver.Config{
275273
GenericConfig: serverConfig,
276274
ExtraConfig: apiserver.ExtraConfig{
277-
CleanupHandler: o.CleanupHandler,
278-
OsFs: o.OsFs,
279-
Pool: o.Pool,
280-
StorageConfig: o.StorageConfig,
281-
WatchDispatcher: o.WatchDispatcher,
275+
CleanupHandler: o.CleanupHandler,
276+
OsFs: o.OsFs,
277+
Pool: o.Pool,
278+
StorageConfig: o.StorageConfig,
282279
},
283280
}
284281
return c, nil

pkg/registry/file/applicationprofile_storage.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ func (a ApplicationProfileStorage) Delete(ctx context.Context, key string, out r
3838
return a.realStore.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject, opts)
3939
}
4040

41-
func (a ApplicationProfileStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
42-
return a.realStore.Watch(ctx, key, opts)
41+
func (a ApplicationProfileStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
42+
return nil, nil // watch disabled
4343
}
4444

4545
func (a ApplicationProfileStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {

pkg/registry/file/cleanup.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ type ResourcesCleanupHandler struct {
3838
fetcher ResourcesFetcher
3939
deleteFunc TypeDeleteFunc
4040
resourceToKindHandler map[string][]TypeCleanupHandlerFunc
41-
watchDispatcher *WatchDispatcher
4241
}
4342

4443
func initResourceToKindHandler(relevancyEnabled bool) map[string][]TypeCleanupHandlerFunc {
@@ -77,7 +76,7 @@ func initResourceToKindHandler(relevancyEnabled bool) map[string][]TypeCleanupHa
7776
return resourceKindToHandler
7877
}
7978

80-
func NewResourcesCleanupHandler(appFs afero.Fs, root string, pool *sqlitemigration.Pool, watchDispatcher *WatchDispatcher, interval time.Duration, defaultNamespace string, fetcher ResourcesFetcher, relevancyEnabled bool) *ResourcesCleanupHandler {
79+
func NewResourcesCleanupHandler(appFs afero.Fs, root string, pool *sqlitemigration.Pool, interval time.Duration, defaultNamespace string, fetcher ResourcesFetcher, relevancyEnabled bool) *ResourcesCleanupHandler {
8180

8281
return &ResourcesCleanupHandler{
8382
appFs: appFs,
@@ -88,7 +87,6 @@ func NewResourcesCleanupHandler(appFs afero.Fs, root string, pool *sqlitemigrati
8887
fetcher: fetcher,
8988
deleteFunc: deleteFile,
9089
resourceToKindHandler: initResourceToKindHandler(relevancyEnabled),
91-
watchDispatcher: watchDispatcher,
9290
}
9391
}
9492

@@ -197,11 +195,7 @@ func (h *ResourcesCleanupHandler) cleanupNamespace(ctx context.Context, ns strin
197195
logger.L().Debug("deleting", helpers.String("kind", resourceKind), helpers.String("namespace", metadata.Namespace), helpers.String("name", metadata.Name))
198196
h.deleteFunc(h.appFs, path)
199197

200-
metaOut := h.deleteMetadata(conn, path)
201-
if h.watchDispatcher != nil {
202-
key := path[len(h.root) : len(path)-len(GobExt)]
203-
h.watchDispatcher.Deleted(key, metaOut)
204-
}
198+
_ = h.deleteMetadata(conn, path)
205199
}
206200
return nil
207201
})

pkg/registry/file/networkneighborhood_storage.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ func (a NetworkNeighborhoodStorage) Delete(ctx context.Context, key string, out
3939
return a.realStore.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject, opts)
4040
}
4141

42-
func (a NetworkNeighborhoodStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
43-
return a.realStore.Watch(ctx, key, opts)
42+
func (a NetworkNeighborhoodStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
43+
return nil, nil // watch disabled
4444
}
4545

4646
func (a NetworkNeighborhoodStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {

pkg/registry/file/storage.go

Lines changed: 20 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,13 @@ type objState struct {
6060
// StorageImpl offers a common interface for object marshaling/unmarshaling operations and
6161
// hides all the storage-related operations behind it.
6262
type StorageImpl struct {
63-
appFs afero.Fs
64-
pool *sqlitemigration.Pool
65-
locks utils.MapMutex[string]
66-
processor Processor
67-
root string
68-
scheme *runtime.Scheme
69-
versioner storage.Versioner
70-
watchDispatcher *WatchDispatcher
63+
appFs afero.Fs
64+
pool *sqlitemigration.Pool
65+
locks utils.MapMutex[string]
66+
processor Processor
67+
root string
68+
scheme *runtime.Scheme
69+
versioner storage.Versioner
7170
}
7271

7372
// StorageQuerier wraps the storage.Interface and adds some extra methods which are used by the storage implementation.
@@ -82,23 +81,19 @@ var _ storage.Interface = &StorageImpl{}
8281

8382
var _ StorageQuerier = &StorageImpl{}
8483

85-
func NewStorageImpl(appFs afero.Fs, root string, pool *sqlitemigration.Pool, watchDispatcher *WatchDispatcher, scheme *runtime.Scheme) StorageQuerier {
86-
return NewStorageImplWithCollector(appFs, root, pool, watchDispatcher, scheme, DefaultProcessor{})
84+
func NewStorageImpl(appFs afero.Fs, root string, pool *sqlitemigration.Pool, scheme *runtime.Scheme) StorageQuerier {
85+
return NewStorageImplWithCollector(appFs, root, pool, scheme, DefaultProcessor{})
8786
}
8887

89-
func NewStorageImplWithCollector(appFs afero.Fs, root string, conn *sqlitemigration.Pool, watchDispatcher *WatchDispatcher, scheme *runtime.Scheme, processor Processor) StorageQuerier {
90-
if watchDispatcher == nil {
91-
watchDispatcher = NewWatchDispatcher()
92-
}
88+
func NewStorageImplWithCollector(appFs afero.Fs, root string, conn *sqlitemigration.Pool, scheme *runtime.Scheme, processor Processor) StorageQuerier {
9389
storageImpl := &StorageImpl{
94-
appFs: appFs,
95-
pool: conn,
96-
locks: utils.NewMapMutex[string](),
97-
processor: processor,
98-
root: root,
99-
scheme: scheme,
100-
versioner: storage.APIObjectVersioner{},
101-
watchDispatcher: watchDispatcher,
90+
appFs: appFs,
91+
pool: conn,
92+
locks: utils.NewMapMutex[string](),
93+
processor: processor,
94+
root: root,
95+
scheme: scheme,
96+
versioner: storage.APIObjectVersioner{},
10297
}
10398
processor.SetStorage(storageImpl)
10499
return storageImpl
@@ -260,8 +255,6 @@ func (s *StorageImpl) CreateWithConn(ctx context.Context, conn *sqlite.Conn, key
260255
if err := s.processor.AfterCreate(ctx, conn, obj); err != nil {
261256
return fmt.Errorf("processor.AfterCreate: %w", err)
262257
}
263-
// publish event to watchers
264-
s.watchDispatcher.Added(key, metaOut, obj)
265258
return nil
266259
}
267260

@@ -309,8 +302,6 @@ func (s *StorageImpl) delete(ctx context.Context, conn *sqlite.Conn, key string,
309302
if err := s.appFs.Remove(makePayloadPath(p)); err != nil {
310303
logger.L().Ctx(ctx).Error("Delete - remove json file failed", helpers.Error(err), helpers.String("key", key))
311304
}
312-
// publish event to watchers
313-
s.watchDispatcher.Deleted(key, metaOut)
314305
return nil
315306
}
316307

@@ -321,20 +312,8 @@ func (s *StorageImpl) delete(ctx context.Context, conn *sqlite.Conn, key string,
321312
// (e.g. reconnecting without missing any updates).
322313
// If resource version is "0", this interface will get current object at given key
323314
// and send it in an "ADDED" event, before watch starts.
324-
func (s *StorageImpl) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
325-
_, span := otel.Tracer("").Start(ctx, "StorageImpl.Watch")
326-
span.SetAttributes(attribute.String("key", key))
327-
defer span.End()
328-
_, _, _, namespace, _ := pathToKeys(key)
329-
if namespace != "" {
330-
// FIXME find an alternative to fix NS deletion
331-
logger.L().Debug("rejecting Watch called with namespace", helpers.String("key", key), helpers.String("namespace", namespace))
332-
return watch.NewEmptyWatch(), nil
333-
}
334-
// TODO(ttimonen) Should we do ctx.WithoutCancel; or does the parent ctx lifetime match with expectations?
335-
nw := newWatcher(ctx, opts.ResourceVersion == softwarecomposition.ResourceVersionFullSpec)
336-
s.watchDispatcher.Register(key, nw)
337-
return nw, nil
315+
func (s *StorageImpl) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
316+
return nil, nil // watch disabled
338317
}
339318

340319
// Get unmarshals object found at key into objPtr. On a not found error, will either
@@ -776,8 +755,6 @@ func (s *StorageImpl) GuaranteedUpdateWithConn(
776755
logger.L().Ctx(ctx).Error("GuaranteedUpdate - save object failed", helpers.Error(err), helpers.String("key", key))
777756
return err
778757
}
779-
// Only successful updates should produce modification events
780-
s.watchDispatcher.Modified(key, metaOut, ret)
781758
return nil
782759
}
783760
}
@@ -910,7 +887,7 @@ func (immutableStorage) Delete(_ context.Context, key string, _ runtime.Object,
910887

911888
// Watch is not supported for immutable objects. Objects are generated on the fly and not stored.
912889
func (immutableStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
913-
return watch.NewEmptyWatch(), nil
890+
return nil, nil // watch disabled
914891
}
915892

916893
// GuaranteedUpdate is not supported for immutable objects. Objects are generated on the fly and not stored.

pkg/registry/file/vulnerabilitysummarystorage_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestVulnSummaryStorageImpl_Create(t *testing.T) {
4141
wantErr: true,
4242
},
4343
}
44-
realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", nil, nil, nil)
44+
realStorage := NewStorageImpl(afero.NewMemMapFs(), "/", nil, nil)
4545

4646
for _, tt := range tests {
4747
t.Run(tt.name, func(t *testing.T) {

0 commit comments

Comments
 (0)