Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/controller/workloads/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ const (
FailedScale = "FailedScale"
FailedGetRBGRole = "FailedGetRBGRole"
FailedGetRBGScalingAdapter = "FailedGetRBGScalingAdapter"
SelectorNotReady = "SelectorNotReady"
)
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package workloads

import (
"context"
"errors"
"fmt"
"reflect"
"time"

"github.com/pkg/errors"
pkgerrors "github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -51,6 +52,10 @@ import (
"sigs.k8s.io/rbgs/pkg/utils"
)

// errSelectorNotReady indicates the workload's status selector has not been populated yet.
// This is a transient condition that resolves once the workload controller reconciles.
var errSelectorNotReady = errors.New("selector not ready")

// RoleBasedGroupScalingAdapterReconciler reconciles a RoleBasedGroupScalingAdapter object
type RoleBasedGroupScalingAdapterReconciler struct {
client client.Client
Expand Down Expand Up @@ -103,11 +108,11 @@ func (r *RoleBasedGroupScalingAdapterReconciler) Reconcile(ctx context.Context,
)
rbg, err := r.GetTargetRbgFromAdapter(ctx, rbgScalingAdapter)
if err != nil {
getTargetRoleErr = errors.Wrapf(err, "Failed to get rbg %s:", rbgName)
getTargetRoleErr = pkgerrors.Wrapf(err, "Failed to get rbg %s:", rbgName)
} else {
targetRole, err = rbg.GetRole(targetRoleName)
if err != nil {
getTargetRoleErr = errors.Wrapf(err, "Failed to get role %s in rbg %s:", targetRoleName, rbgName)
getTargetRoleErr = pkgerrors.Wrapf(err, "Failed to get role %s in rbg %s:", targetRoleName, rbgName)
}
}

Expand Down Expand Up @@ -162,8 +167,18 @@ func (r *RoleBasedGroupScalingAdapterReconciler) Reconcile(ctx context.Context,
return ctrl.Result{}, err
}

selector, err := r.extractLabelSelectorDefault(rbg, targetRole)
selector, err := r.extractLabelSelectorDefault(ctx, rbg, targetRole)
if err != nil {
if errors.Is(err, errSelectorNotReady) {
r.recorder.Eventf(
rbgScalingAdapter, corev1.EventTypeWarning, SelectorNotReady,
"Waiting for workload %s selector to be populated, will retry",
rbg.GetWorkloadName(targetRole),
)
logger.Info("Waiting for workload selector to be populated, will retry",
"workload", rbg.GetWorkloadName(targetRole))
return ctrl.Result{RequeueAfter: 2 * time.Second}, nil
}
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -464,6 +479,7 @@ func (r *RoleBasedGroupScalingAdapterReconciler) updateRoleReplicas(

// extractLabelSelectorDefault extracts a LabelSelector string from the given role's scale subresource.
func (r *RoleBasedGroupScalingAdapterReconciler) extractLabelSelectorDefault(
ctx context.Context,
rbg *workloadsv1alpha2.RoleBasedGroup, role *workloadsv1alpha2.RoleSpec,
) (string, error) {
apiVersion, kind := role.Workload.APIVersion, role.Workload.Kind
Expand All @@ -473,27 +489,21 @@ func (r *RoleBasedGroupScalingAdapterReconciler) extractLabelSelectorDefault(
return "", err
}

gvk := schema.GroupVersionKind{
Group: targetGV.Group,
Version: targetGV.Version,
Kind: kind,
}

// Get the scale subresource
scaleObj := &unstructured.Unstructured{}
scaleObj.SetGroupVersionKind(schema.GroupVersionKind{
Group: gvk.Group,
Version: gvk.Version,
Group: targetGV.Group,
Version: targetGV.Version,
Kind: kind,
})
scaleObj.SetNamespace(rbg.Namespace)
scaleObj.SetName(rbg.GetWorkloadName(role))

if err := r.client.Get(
context.TODO(),
ctx,
client.ObjectKey{Namespace: rbg.Namespace, Name: rbg.GetWorkloadName(role)}, scaleObj,
); err != nil {
return "", fmt.Errorf("failed to get workload: %v", err)
return "", fmt.Errorf("failed to get workload: %w", err)
}

// Try to get selector from status
Expand All @@ -505,10 +515,18 @@ func (r *RoleBasedGroupScalingAdapterReconciler) extractLabelSelectorDefault(
}
selectorStr, _, err := unstructured.NestedString(scaleObj.Object, "status", selectorField)
if err != nil {
return "", fmt.Errorf("failed to get selectore field in status: %v", err)
return "", fmt.Errorf("failed to get selector field in status: %w", err)
}

// Only RoleInstanceSet+LWP needs a non-empty selector guard: we must append
// the component-index label, and an empty base would produce a leading comma.
// Other workload types (StatefulSet, Deployment) may legitimately have an empty
// status.labelSelector since it is not part of their upstream API.
if kind == "RoleInstanceSet" && role.IsLeaderWorkerPattern() {
if selectorStr == "" {
return "", fmt.Errorf("workload %s status.%s is empty: %w",
rbg.GetWorkloadName(role), selectorField, errSelectorNotReady)
}
selectorStr += fmt.Sprintf(",%s=0", constants.ComponentIndexLabelKey)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package workloads

import (
"context"
"errors"
"fmt"
"testing"
"time"
Expand All @@ -28,7 +29,9 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
Expand Down Expand Up @@ -962,3 +965,145 @@ func TestRBGRoleStatusPredicate(t *testing.T) {
}))
})
}

func TestExtractLabelSelectorDefault(t *testing.T) {
scheme := runtime.NewScheme()
_ = workloadsv1alpha2.AddToScheme(scheme)

const (
rbgName = "test-rbg"
roleName = "decode"
namespace = "default"
)

risGVK := schema.GroupVersionKind{
Group: "workloads.x-k8s.io",
Version: "v1alpha2",
Kind: "RoleInstanceSet",
}
lwsGVK := schema.GroupVersionKind{
Group: "leaderworkerset.x-k8s.io",
Version: "v1",
Kind: "LeaderWorkerSet",
}

makeWorkload := func(gvk schema.GroupVersionKind, statusField, selectorValue string) *unstructured.Unstructured {
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(gvk)
obj.SetNamespace(namespace)
obj.SetName(fmt.Sprintf("%s-%s", rbgName, roleName))
if statusField != "" {
_ = unstructured.SetNestedField(obj.Object, selectorValue, "status", statusField)
}
return obj
}

// RoleInstanceSet + LeaderWorkerPattern: build LWP role but override workload to RIS
lwpRole := func() *workloadsv1alpha2.RoleSpec {
role := wrappersv2.BuildLeaderWorkerRole(roleName).Obj()
role.Workload = workloadsv1alpha2.WorkloadSpec{
APIVersion: risGVK.Group + "/" + risGVK.Version,
Kind: risGVK.Kind,
}
return &role
}

// BuildStandaloneRole already sets workload to RoleInstanceSet
standaloneRole := func() *workloadsv1alpha2.RoleSpec {
role := wrappersv2.BuildStandaloneRole(roleName).Obj()
return &role
}

// LeaderWorkerSet role uses default workload from BuildLeaderWorkerRole
lwsRole := func() *workloadsv1alpha2.RoleSpec {
role := wrappersv2.BuildLeaderWorkerRole(roleName).Obj()
return &role
}

rbg := wrappersv2.BuildBasicRoleBasedGroup(rbgName, namespace).Obj()

tests := []struct {
name string
role *workloadsv1alpha2.RoleSpec
workload *unstructured.Unstructured
expectError bool
expectSelectorNotReady bool
expectSelector string
}{
{
name: "RoleInstanceSet with LWP and populated selector appends component-index",
role: lwpRole(),
workload: makeWorkload(risGVK, "labelSelector", "app=foo"),
expectSelector: fmt.Sprintf("app=foo,%s=0", constants.ComponentIndexLabelKey),
},
{
name: "RoleInstanceSet with LWP and empty selector returns transient error",
role: lwpRole(),
workload: makeWorkload(risGVK, "labelSelector", ""),
expectError: true,
expectSelectorNotReady: true,
},
{
name: "RoleInstanceSet with LWP and missing status field returns transient error",
role: lwpRole(),
workload: makeWorkload(risGVK, "", ""), // no status field set
expectError: true,
expectSelectorNotReady: true,
},
{
name: "RoleInstanceSet standalone with populated selector returns as-is",
role: standaloneRole(),
workload: makeWorkload(risGVK, "labelSelector", "app=foo"),
expectSelector: "app=foo",
},
{
name: "RoleInstanceSet standalone with empty selector returns empty",
role: standaloneRole(),
workload: makeWorkload(risGVK, "labelSelector", ""),
expectSelector: "",
},
{
name: "LeaderWorkerSet uses hpaPodSelector without component-index",
role: lwsRole(),
workload: makeWorkload(lwsGVK, "hpaPodSelector", "app=bar"),
expectSelector: "app=bar",
},
{
name: "workload not found returns hard error",
role: lwpRole(),
workload: nil, // no workload in client
expectError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
clientBuilder := fake.NewClientBuilder().WithScheme(scheme)
if tt.workload != nil {
clientBuilder = clientBuilder.WithObjects(tt.workload)
}

reconciler := &RoleBasedGroupScalingAdapterReconciler{
client: clientBuilder.Build(),
}

selector, err := reconciler.extractLabelSelectorDefault(
context.Background(), rbg, tt.role,
)

if tt.expectError {
require.Error(t, err)
if tt.expectSelectorNotReady {
assert.True(t, errors.Is(err, errSelectorNotReady),
"expected errSelectorNotReady, got: %v", err)
} else {
assert.False(t, errors.Is(err, errSelectorNotReady),
"expected hard error, not errSelectorNotReady")
}
return
}
require.NoError(t, err)
assert.Equal(t, tt.expectSelector, selector)
})
}
}
Loading