Skip to content

Commit a22872a

Browse files
Add core scale objects informers
1 parent e6433b9 commit a22872a

File tree

4 files changed

+350
-19
lines changed

4 files changed

+350
-19
lines changed

cluster-autoscaler/capacitybuffer/client/client.go

Lines changed: 66 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ import (
5555
// PodTemplateRefIndex is the name of the index for buffers referencing a pod template
5656
const PodTemplateRefIndex = "podTemplateRef"
5757

58+
// ScalableRefIndex is the name of the index for buffers referencing a scalable object
59+
const ScalableRefIndex = "scalableRef"
60+
5861
// CapacityBufferClient represents client for v1 capacitybuffer CRD.
5962
type CapacityBufferClient struct {
6063
buffersClient capacitybuffer.Interface
@@ -71,9 +74,14 @@ type CapacityBufferClient struct {
7174
rqLister corev1listers.ResourceQuotaLister
7275

7376
// Informers
74-
bufferInformer cache.SharedIndexInformer
75-
podTemplateInformer cache.SharedIndexInformer
76-
resourceQuotaInformer cache.SharedIndexInformer
77+
bufferInformer cache.SharedIndexInformer
78+
podTemplateInformer cache.SharedIndexInformer
79+
resourceQuotaInformer cache.SharedIndexInformer
80+
deploymentInformer cache.SharedIndexInformer
81+
replicaSetInformer cache.SharedIndexInformer
82+
statefulSetInformer cache.SharedIndexInformer
83+
jobInformer cache.SharedIndexInformer
84+
replicationControllerInformer cache.SharedIndexInformer
7785
}
7886

7987
// NewCapacityBufferClient returns a capacityBufferClient.
@@ -152,6 +160,16 @@ func NewCapacityBufferClientFromClients(buffersClient capacitybuffer.Interface,
152160
}
153161
return []string{}, nil
154162
},
163+
ScalableRefIndex: func(obj interface{}) ([]string, error) {
164+
buffer, ok := obj.(*v1.CapacityBuffer)
165+
if !ok {
166+
return []string{}, nil
167+
}
168+
if buffer.Spec.ScalableRef != nil {
169+
return []string{buffer.Spec.ScalableRef.Name}, nil
170+
}
171+
return []string{}, nil
172+
},
155173
})
156174
if err != nil {
157175
return nil, fmt.Errorf("failed to add indexers: %v", err)
@@ -168,21 +186,26 @@ func NewCapacityBufferClientFromClients(buffersClient capacitybuffer.Interface,
168186

169187
factory := informers.NewSharedInformerFactory(kubernetesClient, defaultResyncPeriod)
170188
bufferClient := &CapacityBufferClient{
171-
buffersClient: buffersClient,
172-
kubernetesClient: kubernetesClient,
173-
scaleGetter: scaleGetter,
174-
scaleMapper: scaleMapper,
175-
buffersLister: buffersLister,
176-
podTemplateLister: factory.Core().V1().PodTemplates().Lister(),
177-
replicaSetsLister: factory.Apps().V1().ReplicaSets().Lister(),
178-
statefulSetsLister: factory.Apps().V1().StatefulSets().Lister(),
179-
jobsLister: factory.Batch().V1().Jobs().Lister(),
180-
deploymentLister: factory.Apps().V1().Deployments().Lister(),
181-
replicationContLister: factory.Core().V1().ReplicationControllers().Lister(),
182-
rqLister: factory.Core().V1().ResourceQuotas().Lister(),
183-
bufferInformer: bufferInformer,
184-
podTemplateInformer: factory.Core().V1().PodTemplates().Informer(),
185-
resourceQuotaInformer: factory.Core().V1().ResourceQuotas().Informer(),
189+
buffersClient: buffersClient,
190+
kubernetesClient: kubernetesClient,
191+
scaleGetter: scaleGetter,
192+
scaleMapper: scaleMapper,
193+
buffersLister: buffersLister,
194+
podTemplateLister: factory.Core().V1().PodTemplates().Lister(),
195+
replicaSetsLister: factory.Apps().V1().ReplicaSets().Lister(),
196+
statefulSetsLister: factory.Apps().V1().StatefulSets().Lister(),
197+
jobsLister: factory.Batch().V1().Jobs().Lister(),
198+
deploymentLister: factory.Apps().V1().Deployments().Lister(),
199+
replicationContLister: factory.Core().V1().ReplicationControllers().Lister(),
200+
rqLister: factory.Core().V1().ResourceQuotas().Lister(),
201+
bufferInformer: bufferInformer,
202+
podTemplateInformer: factory.Core().V1().PodTemplates().Informer(),
203+
resourceQuotaInformer: factory.Core().V1().ResourceQuotas().Informer(),
204+
deploymentInformer: factory.Apps().V1().Deployments().Informer(),
205+
replicaSetInformer: factory.Apps().V1().ReplicaSets().Informer(),
206+
statefulSetInformer: factory.Apps().V1().StatefulSets().Informer(),
207+
jobInformer: factory.Batch().V1().Jobs().Informer(),
208+
replicationControllerInformer: factory.Core().V1().ReplicationControllers().Informer(),
186209
}
187210
factory.Start(stopChannel)
188211
informersSynced = factory.WaitForCacheSync(stopChannel)
@@ -209,6 +232,31 @@ func (c *CapacityBufferClient) GetResourceQuotaInformer() cache.SharedIndexInfor
209232
return c.resourceQuotaInformer
210233
}
211234

235+
// GetDeploymentInformer returns the informer for Deployment resource.
236+
func (c *CapacityBufferClient) GetDeploymentInformer() cache.SharedIndexInformer {
237+
return c.deploymentInformer
238+
}
239+
240+
// GetReplicaSetInformer returns the informer for ReplicaSet resource.
241+
func (c *CapacityBufferClient) GetReplicaSetInformer() cache.SharedIndexInformer {
242+
return c.replicaSetInformer
243+
}
244+
245+
// GetStatefulSetInformer returns the informer for StatefulSet resource.
246+
func (c *CapacityBufferClient) GetStatefulSetInformer() cache.SharedIndexInformer {
247+
return c.statefulSetInformer
248+
}
249+
250+
// GetJobInformer returns the informer for Job resource.
251+
func (c *CapacityBufferClient) GetJobInformer() cache.SharedIndexInformer {
252+
return c.jobInformer
253+
}
254+
255+
// GetReplicationControllerInformer returns the informer for ReplicationController resource.
256+
func (c *CapacityBufferClient) GetReplicationControllerInformer() cache.SharedIndexInformer {
257+
return c.replicationControllerInformer
258+
}
259+
212260
// ListCapacityBuffers lists all Capacity buffer.
213261
func (c *CapacityBufferClient) ListCapacityBuffers(namespace string) ([]*v1.CapacityBuffer, error) {
214262
if c.buffersLister == nil {

cluster-autoscaler/capacitybuffer/controller/controller.go

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
corev1 "k8s.io/api/core/v1"
2727
"k8s.io/apimachinery/pkg/api/equality"
2828
"k8s.io/apimachinery/pkg/api/meta"
29+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2930
"k8s.io/apimachinery/pkg/util/runtime"
3031
"k8s.io/apimachinery/pkg/util/wait"
3132
"k8s.io/client-go/tools/cache"
@@ -39,6 +40,7 @@ import (
3940
filters "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/filters"
4041
cbmetrics "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/metrics"
4142
translators "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/translators"
43+
scalableobject "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/translators/scalable_objects"
4244
updater "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/updater"
4345
"k8s.io/utils/clock"
4446
)
@@ -205,7 +207,126 @@ func (c *bufferController) configureEventHandlers() {
205207
c.enqueueBuffersReferencingPodTemplate(obj)
206208
},
207209
})
208-
// TODO: scalable objects
210+
211+
// Deployment Informer
212+
c.client.GetDeploymentInformer().AddEventHandler(cache.ResourceEventHandlerFuncs{
213+
AddFunc: func(obj interface{}) {
214+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.DeploymentKind)
215+
},
216+
UpdateFunc: func(oldObj, newObj interface{}) {
217+
oldMeta, err := meta.Accessor(oldObj)
218+
if err != nil {
219+
return
220+
}
221+
newMeta, err := meta.Accessor(newObj)
222+
if err != nil {
223+
return
224+
}
225+
if oldMeta.GetGeneration() == newMeta.GetGeneration() {
226+
return
227+
}
228+
c.enqueueBuffersReferencingScalableObject(newObj, scalableobject.ApiGroupApps, scalableobject.DeploymentKind)
229+
},
230+
DeleteFunc: func(obj interface{}) {
231+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.DeploymentKind)
232+
},
233+
})
234+
235+
// ReplicaSet Informer
236+
c.client.GetReplicaSetInformer().AddEventHandler(cache.ResourceEventHandlerFuncs{
237+
AddFunc: func(obj interface{}) {
238+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.ReplicaSetKind)
239+
},
240+
UpdateFunc: func(oldObj, newObj interface{}) {
241+
oldMeta, err := meta.Accessor(oldObj)
242+
if err != nil {
243+
return
244+
}
245+
newMeta, err := meta.Accessor(newObj)
246+
if err != nil {
247+
return
248+
}
249+
if oldMeta.GetGeneration() == newMeta.GetGeneration() {
250+
return
251+
}
252+
c.enqueueBuffersReferencingScalableObject(newObj, scalableobject.ApiGroupApps, scalableobject.ReplicaSetKind)
253+
},
254+
DeleteFunc: func(obj interface{}) {
255+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.ReplicaSetKind)
256+
},
257+
})
258+
259+
// StatefulSet Informer
260+
c.client.GetStatefulSetInformer().AddEventHandler(cache.ResourceEventHandlerFuncs{
261+
AddFunc: func(obj interface{}) {
262+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.StatefulSetKind)
263+
},
264+
UpdateFunc: func(oldObj, newObj interface{}) {
265+
oldMeta, err := meta.Accessor(oldObj)
266+
if err != nil {
267+
return
268+
}
269+
newMeta, err := meta.Accessor(newObj)
270+
if err != nil {
271+
return
272+
}
273+
if oldMeta.GetGeneration() == newMeta.GetGeneration() {
274+
return
275+
}
276+
c.enqueueBuffersReferencingScalableObject(newObj, scalableobject.ApiGroupApps, scalableobject.StatefulSetKind)
277+
},
278+
DeleteFunc: func(obj interface{}) {
279+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupApps, scalableobject.StatefulSetKind)
280+
},
281+
})
282+
283+
// Job Informer
284+
c.client.GetJobInformer().AddEventHandler(cache.ResourceEventHandlerFuncs{
285+
AddFunc: func(obj interface{}) {
286+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupBatch, scalableobject.JobKind)
287+
},
288+
UpdateFunc: func(oldObj, newObj interface{}) {
289+
oldMeta, err := meta.Accessor(oldObj)
290+
if err != nil {
291+
return
292+
}
293+
newMeta, err := meta.Accessor(newObj)
294+
if err != nil {
295+
return
296+
}
297+
if oldMeta.GetGeneration() == newMeta.GetGeneration() {
298+
return
299+
}
300+
c.enqueueBuffersReferencingScalableObject(newObj, scalableobject.ApiGroupBatch, scalableobject.JobKind)
301+
},
302+
DeleteFunc: func(obj interface{}) {
303+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupBatch, scalableobject.JobKind)
304+
},
305+
})
306+
307+
// ReplicationController Informer
308+
c.client.GetReplicationControllerInformer().AddEventHandler(cache.ResourceEventHandlerFuncs{
309+
AddFunc: func(obj interface{}) {
310+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupCore, scalableobject.ReplicationControllerKind)
311+
},
312+
UpdateFunc: func(oldObj, newObj interface{}) {
313+
oldMeta, err := meta.Accessor(oldObj)
314+
if err != nil {
315+
return
316+
}
317+
newMeta, err := meta.Accessor(newObj)
318+
if err != nil {
319+
return
320+
}
321+
if oldMeta.GetGeneration() == newMeta.GetGeneration() {
322+
return
323+
}
324+
c.enqueueBuffersReferencingScalableObject(newObj, scalableobject.ApiGroupCore, scalableobject.ReplicationControllerKind)
325+
},
326+
DeleteFunc: func(obj interface{}) {
327+
c.enqueueBuffersReferencingScalableObject(obj, scalableobject.ApiGroupCore, scalableobject.ReplicationControllerKind)
328+
},
329+
})
209330
}
210331

211332
func (c *bufferController) enqueueNamespace(obj interface{}) {
@@ -253,6 +374,37 @@ func (c *bufferController) enqueueBuffersReferencingPodTemplate(obj interface{})
253374
}
254375
}
255376

377+
func (c *bufferController) enqueueBuffersReferencingScalableObject(obj interface{}, apiGroup, kind string) {
378+
object, err := meta.Accessor(obj)
379+
if err != nil {
380+
// handle tombstone
381+
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
382+
if cast, ok := tombstone.Obj.(metav1.Object); ok {
383+
object = cast
384+
}
385+
}
386+
}
387+
if object == nil {
388+
return
389+
}
390+
391+
// Use indexer to find buffers referencing this scalable object
392+
buffers, err := c.client.GetBufferInformer().GetIndexer().ByIndex(cbclient.ScalableRefIndex, object.GetName())
393+
if err != nil {
394+
runtime.HandleError(fmt.Errorf("error looking up buffers for scalable object %s/%s: %w", kind, object.GetName(), err))
395+
return
396+
}
397+
398+
for _, obj := range buffers {
399+
buffer := obj.(*v1.CapacityBuffer)
400+
if buffer.Namespace == object.GetNamespace() && buffer.Spec.ScalableRef != nil &&
401+
buffer.Spec.ScalableRef.Kind == kind && buffer.Spec.ScalableRef.APIGroup == apiGroup {
402+
c.queue.Add(buffer.Namespace)
403+
return // we reconcile the whole namespace, so finding one buffer is enough to trigger it.
404+
}
405+
}
406+
}
407+
256408
// Run to run the controller reconcile loop
257409
func (c *bufferController) Run(stopCh <-chan struct{}) {
258410
defer runtime.HandleCrash()

0 commit comments

Comments
 (0)