Skip to content

Commit 94eb8b2

Browse files
Add core scale objects informers
1 parent 30b4fcc commit 94eb8b2

File tree

4 files changed

+342
-19
lines changed

4 files changed

+342
-19
lines changed

cluster-autoscaler/capacitybuffer/client/client.go

Lines changed: 66 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ import (
5555
// PodTemplateRefIndex is the name of the index for buffers referencing a pod template
5656
const PodTemplateRefIndex = "podTemplateRef"
5757

58+
// ScalableRefIndex is the name of the index for buffers referencing a scalable object
59+
const ScalableRefIndex = "scalableRef"
60+
5861
// CapacityBufferClient represents client for v1 capacitybuffer CRD.
5962
type CapacityBufferClient struct {
6063
buffersClient capacitybuffer.Interface
@@ -71,9 +74,14 @@ type CapacityBufferClient struct {
7174
rqLister corev1listers.ResourceQuotaLister
7275

7376
// Informers
74-
bufferInformer cache.SharedIndexInformer
75-
podTemplateInformer cache.SharedIndexInformer
76-
resourceQuotaInformer cache.SharedIndexInformer
77+
bufferInformer cache.SharedIndexInformer
78+
podTemplateInformer cache.SharedIndexInformer
79+
resourceQuotaInformer cache.SharedIndexInformer
80+
deploymentInformer cache.SharedIndexInformer
81+
replicaSetInformer cache.SharedIndexInformer
82+
statefulSetInformer cache.SharedIndexInformer
83+
jobInformer cache.SharedIndexInformer
84+
replicationControllerInformer cache.SharedIndexInformer
7785
}
7886

7987
// NewCapacityBufferClient returns a capacityBufferClient.
@@ -152,6 +160,16 @@ func NewCapacityBufferClientFromClients(buffersClient capacitybuffer.Interface,
152160
}
153161
return []string{}, nil
154162
},
163+
ScalableRefIndex: func(obj interface{}) ([]string, error) {
164+
buffer, ok := obj.(*v1.CapacityBuffer)
165+
if !ok {
166+
return []string{}, nil
167+
}
168+
if buffer.Spec.ScalableRef != nil {
169+
return []string{buffer.Spec.ScalableRef.Name}, nil
170+
}
171+
return []string{}, nil
172+
},
155173
})
156174
if err != nil {
157175
return nil, fmt.Errorf("failed to add indexers: %v", err)
@@ -168,21 +186,26 @@ func NewCapacityBufferClientFromClients(buffersClient capacitybuffer.Interface,
168186

169187
factory := informers.NewSharedInformerFactory(kubernetesClient, defaultResyncPeriod)
170188
bufferClient := &CapacityBufferClient{
171-
buffersClient: buffersClient,
172-
kubernetesClient: kubernetesClient,
173-
scaleGetter: scaleGetter,
174-
scaleMapper: scaleMapper,
175-
buffersLister: buffersLister,
176-
podTemplateLister: factory.Core().V1().PodTemplates().Lister(),
177-
replicaSetsLister: factory.Apps().V1().ReplicaSets().Lister(),
178-
statefulSetsLister: factory.Apps().V1().StatefulSets().Lister(),
179-
jobsLister: factory.Batch().V1().Jobs().Lister(),
180-
deploymentLister: factory.Apps().V1().Deployments().Lister(),
181-
replicationContLister: factory.Core().V1().ReplicationControllers().Lister(),
182-
rqLister: factory.Core().V1().ResourceQuotas().Lister(),
183-
bufferInformer: bufferInformer,
184-
podTemplateInformer: factory.Core().V1().PodTemplates().Informer(),
185-
resourceQuotaInformer: factory.Core().V1().ResourceQuotas().Informer(),
189+
buffersClient: buffersClient,
190+
kubernetesClient: kubernetesClient,
191+
scaleGetter: scaleGetter,
192+
scaleMapper: scaleMapper,
193+
buffersLister: buffersLister,
194+
podTemplateLister: factory.Core().V1().PodTemplates().Lister(),
195+
replicaSetsLister: factory.Apps().V1().ReplicaSets().Lister(),
196+
statefulSetsLister: factory.Apps().V1().StatefulSets().Lister(),
197+
jobsLister: factory.Batch().V1().Jobs().Lister(),
198+
deploymentLister: factory.Apps().V1().Deployments().Lister(),
199+
replicationContLister: factory.Core().V1().ReplicationControllers().Lister(),
200+
rqLister: factory.Core().V1().ResourceQuotas().Lister(),
201+
bufferInformer: bufferInformer,
202+
podTemplateInformer: factory.Core().V1().PodTemplates().Informer(),
203+
resourceQuotaInformer: factory.Core().V1().ResourceQuotas().Informer(),
204+
deploymentInformer: factory.Apps().V1().Deployments().Informer(),
205+
replicaSetInformer: factory.Apps().V1().ReplicaSets().Informer(),
206+
statefulSetInformer: factory.Apps().V1().StatefulSets().Informer(),
207+
jobInformer: factory.Batch().V1().Jobs().Informer(),
208+
replicationControllerInformer: factory.Core().V1().ReplicationControllers().Informer(),
186209
}
187210
factory.Start(stopChannel)
188211
informersSynced = factory.WaitForCacheSync(stopChannel)
@@ -209,6 +232,31 @@ func (c *CapacityBufferClient) GetResourceQuotaInformer() cache.SharedIndexInfor
209232
return c.resourceQuotaInformer
210233
}
211234

235+
// GetDeploymentInformer returns the informer for Deployment resource.
236+
func (c *CapacityBufferClient) GetDeploymentInformer() cache.SharedIndexInformer {
237+
return c.deploymentInformer
238+
}
239+
240+
// GetReplicaSetInformer returns the informer for ReplicaSet resource.
241+
func (c *CapacityBufferClient) GetReplicaSetInformer() cache.SharedIndexInformer {
242+
return c.replicaSetInformer
243+
}
244+
245+
// GetStatefulSetInformer returns the informer for StatefulSet resource.
246+
func (c *CapacityBufferClient) GetStatefulSetInformer() cache.SharedIndexInformer {
247+
return c.statefulSetInformer
248+
}
249+
250+
// GetJobInformer returns the informer for Job resource.
251+
func (c *CapacityBufferClient) GetJobInformer() cache.SharedIndexInformer {
252+
return c.jobInformer
253+
}
254+
255+
// GetReplicationControllerInformer returns the informer for ReplicationController resource.
256+
func (c *CapacityBufferClient) GetReplicationControllerInformer() cache.SharedIndexInformer {
257+
return c.replicationControllerInformer
258+
}
259+
212260
// ListCapacityBuffers lists all Capacity buffer.
213261
func (c *CapacityBufferClient) ListCapacityBuffers(namespace string) ([]*v1.CapacityBuffer, error) {
214262
if c.buffersLister == nil {

cluster-autoscaler/capacitybuffer/controller/controller.go

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
corev1 "k8s.io/api/core/v1"
2626
"k8s.io/apimachinery/pkg/api/equality"
2727
"k8s.io/apimachinery/pkg/api/meta"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2829
"k8s.io/apimachinery/pkg/util/runtime"
2930
"k8s.io/apimachinery/pkg/util/wait"
3031
"k8s.io/client-go/tools/cache"
@@ -37,6 +38,7 @@ import (
3738
"k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/fakepods"
3839
filters "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/filters"
3940
translators "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/translators"
41+
scalableobject "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/translators/scalable_objects"
4042
updater "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/updater"
4143
)
4244

@@ -175,7 +177,126 @@ func (c *bufferController) configureEventHandlers() {
175177
c.enqueueBuffersReferencingPodTemplate(obj)
176178
},
177179
})
178-
// TODO: scalable objects
180+
181+
// Deployment Informer
182+
c.client.GetDeploymentInformer().AddEventHandler(cache.ResourceEventHandlerFuncs{
183+
AddFunc: func(obj interface{}) {
184+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.DeploymentKind)
185+
},
186+
UpdateFunc: func(oldObj, newObj interface{}) {
187+
oldMeta, err := meta.Accessor(oldObj)
188+
if err != nil {
189+
return
190+
}
191+
newMeta, err := meta.Accessor(newObj)
192+
if err != nil {
193+
return
194+
}
195+
if oldMeta.GetGeneration() == newMeta.GetGeneration() {
196+
return
197+
}
198+
c.enqueueBuffersReferencingScalableObject(newObj, scalableobject.ApiGroupApps, scalableobject.DeploymentKind)
199+
},
200+
DeleteFunc: func(obj interface{}) {
201+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.DeploymentKind)
202+
},
203+
})
204+
205+
// ReplicaSet Informer
206+
c.client.GetReplicaSetInformer().AddEventHandler(cache.ResourceEventHandlerFuncs{
207+
AddFunc: func(obj interface{}) {
208+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.ReplicaSetKind)
209+
},
210+
UpdateFunc: func(oldObj, newObj interface{}) {
211+
oldMeta, err := meta.Accessor(oldObj)
212+
if err != nil {
213+
return
214+
}
215+
newMeta, err := meta.Accessor(newObj)
216+
if err != nil {
217+
return
218+
}
219+
if oldMeta.GetGeneration() == newMeta.GetGeneration() {
220+
return
221+
}
222+
c.enqueueBuffersReferencingScalableObject(newObj, scalableobject.ApiGroupApps, scalableobject.ReplicaSetKind)
223+
},
224+
DeleteFunc: func(obj interface{}) {
225+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.ReplicaSetKind)
226+
},
227+
})
228+
229+
// StatefulSet Informer
230+
c.client.GetStatefulSetInformer().AddEventHandler(cache.ResourceEventHandlerFuncs{
231+
AddFunc: func(obj interface{}) {
232+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.StatefulSetKind)
233+
},
234+
UpdateFunc: func(oldObj, newObj interface{}) {
235+
oldMeta, err := meta.Accessor(oldObj)
236+
if err != nil {
237+
return
238+
}
239+
newMeta, err := meta.Accessor(newObj)
240+
if err != nil {
241+
return
242+
}
243+
if oldMeta.GetGeneration() == newMeta.GetGeneration() {
244+
return
245+
}
246+
c.enqueueBuffersReferencingScalableObject(newObj, scalableobject.ApiGroupApps, scalableobject.StatefulSetKind)
247+
},
248+
DeleteFunc: func(obj interface{}) {
249+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.StatefulSetKind)
250+
},
251+
})
252+
253+
// Job Informer
254+
c.client.GetJobInformer().AddEventHandler(cache.ResourceEventHandlerFuncs{
255+
AddFunc: func(obj interface{}) {
256+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupBatch, scalableobject.JobKind)
257+
},
258+
UpdateFunc: func(oldObj, newObj interface{}) {
259+
oldMeta, err := meta.Accessor(oldObj)
260+
if err != nil {
261+
return
262+
}
263+
newMeta, err := meta.Accessor(newObj)
264+
if err != nil {
265+
return
266+
}
267+
if oldMeta.GetGeneration() == newMeta.GetGeneration() {
268+
return
269+
}
270+
c.enqueueBuffersReferencingScalableObject(newObj, scalableobject.ApiGroupBatch, scalableobject.JobKind)
271+
},
272+
DeleteFunc: func(obj interface{}) {
273+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupBatch, scalableobject.JobKind)
274+
},
275+
})
276+
277+
// ReplicationController Informer
278+
c.client.GetReplicationControllerInformer().AddEventHandler(cache.ResourceEventHandlerFuncs{
279+
AddFunc: func(obj interface{}) {
280+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupCore, scalableobject.ReplicationControllerKind)
281+
},
282+
UpdateFunc: func(oldObj, newObj interface{}) {
283+
oldMeta, err := meta.Accessor(oldObj)
284+
if err != nil {
285+
return
286+
}
287+
newMeta, err := meta.Accessor(newObj)
288+
if err != nil {
289+
return
290+
}
291+
if oldMeta.GetGeneration() == newMeta.GetGeneration() {
292+
return
293+
}
294+
c.enqueueBuffersReferencingScalableObject(newObj, scalableobject.ApiGroupCore, scalableobject.ReplicationControllerKind)
295+
},
296+
DeleteFunc: func(obj interface{}) {
297+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupCore, scalableobject.ReplicationControllerKind)
298+
},
299+
})
179300
}
180301

181302
func (c *bufferController) enqueueNamespace(obj interface{}) {
@@ -223,6 +344,37 @@ func (c *bufferController) enqueueBuffersReferencingPodTemplate(obj interface{})
223344
}
224345
}
225346

347+
func (c *bufferController) enqueueBuffersReferencingScalableObject(obj interface{}, apiGroup, kind string) {
348+
object, err := meta.Accessor(obj)
349+
if err != nil {
350+
// handle tombstone
351+
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
352+
if cast, ok := tombstone.Obj.(metav1.Object); ok {
353+
object = cast
354+
}
355+
}
356+
}
357+
if object == nil {
358+
return
359+
}
360+
361+
// Use indexer to find buffers referencing this scalable object
362+
buffers, err := c.client.GetBufferInformer().GetIndexer().ByIndex(cbclient.ScalableRefIndex, object.GetName())
363+
if err != nil {
364+
runtime.HandleError(fmt.Errorf("error looking up buffers for scalable object %s/%s: %w", kind, object.GetName(), err))
365+
return
366+
}
367+
368+
for _, obj := range buffers {
369+
buffer := obj.(*v1.CapacityBuffer)
370+
if buffer.Namespace == object.GetNamespace() && buffer.Spec.ScalableRef != nil &&
371+
buffer.Spec.ScalableRef.Kind == kind && buffer.Spec.ScalableRef.APIGroup == apiGroup {
372+
c.queue.Add(buffer.Namespace)
373+
return // we reconcile the whole namespace, so finding one buffer is enough to trigger it.
374+
}
375+
}
376+
}
377+
226378
// Run to run the controller reconcile loop
227379
func (c *bufferController) Run(stopCh <-chan struct{}) {
228380
defer runtime.HandleCrash()

0 commit comments

Comments
 (0)