Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ type AutoscalingGceClient interface {
// modifying resources
ResizeMig(GceRef, int64) error
DeleteInstances(migRef GceRef, instances []GceRef) error
CreateInstances(GceRef, string, int64, []string) error
CreateInstances(GceRef, string, int64, []string) ([]string, error)

// WaitForOperation can be used to poll GCE operations until completion/timeout using WAIT calls.
// Calling this is normally not needed when interacting with the client, other methods should call it internally.
Expand Down Expand Up @@ -294,24 +294,27 @@ func (client *autoscalingGceClientV1) ResizeMig(migRef GceRef, size int64) error
return client.WaitForOperation(op.Name, op.OperationType, migRef.Project, migRef.Zone)
}

func (client *autoscalingGceClientV1) CreateInstances(migRef GceRef, baseName string, delta int64, existingInstanceProviderIds []string) error {
func (client *autoscalingGceClientV1) CreateInstances(migRef GceRef, baseName string, delta int64, existingInstanceProviderIds []string) ([]string, error) {
registerRequest("instance_group_managers", "create_instances")
ctx, cancel := context.WithTimeout(context.Background(), client.operationPerCallTimeout)
defer cancel()
req := gce.InstanceGroupManagersCreateInstancesRequest{}
instanceNames := instanceIdsToNamesMap(existingInstanceProviderIds)
req.Instances = make([]*gce.PerInstanceConfig, 0, delta)
for i := int64(0); i < delta; i++ {
createdIds := make([]string, delta)
for i := range delta {
newInstanceName := generateInstanceName(baseName, instanceNames)
instanceNames[newInstanceName] = true
req.Instances = append(req.Instances, &gce.PerInstanceConfig{Name: newInstanceName})
ref := GceRef{migRef.Project, migRef.Zone, newInstanceName}
createdIds[i] = ref.ToProviderId()
}

op, err := client.gceService.InstanceGroupManagers.CreateInstances(migRef.Project, migRef.Zone, migRef.Name, &req).Context(ctx).Do()
if err != nil {
return err
return nil, err
}
return client.WaitForOperation(op.Name, op.OperationType, migRef.Project, migRef.Zone)
return createdIds, client.WaitForOperation(op.Name, op.OperationType, migRef.Project, migRef.Zone)
}

func instanceIdsToNamesMap(instanceProviderIds []string) map[string]bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"os"
"regexp"
"strings"
"testing"
"time"

Expand All @@ -34,6 +35,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
gce_api "google.golang.org/api/compute/v1"
)

Expand Down Expand Up @@ -697,7 +699,8 @@ func TestAutoscalingClientTimeouts(t *testing.T) {
}{
"CreateInstances_ContextTimeout": {
clientFunc: func(client *autoscalingGceClientV1) error {
return client.CreateInstances(GceRef{}, "", 0, nil)
_, err := client.CreateInstances(GceRef{}, "", 0, nil)
return err
},
operationPerCallTimeout: &instantTimeout,
},
Expand Down Expand Up @@ -764,7 +767,8 @@ func TestAutoscalingClientTimeouts(t *testing.T) {
},
"CreateInstances_HttpClientTimeout": {
clientFunc: func(client *autoscalingGceClientV1) error {
return client.CreateInstances(GceRef{}, "", 0, nil)
_, err := client.CreateInstances(GceRef{}, "", 0, nil)
return err
},
httpTimeout: instantTimeout,
},
Expand Down Expand Up @@ -896,6 +900,27 @@ func TestAutoscalingClientTimeouts(t *testing.T) {
}
}

func TestCreateInstances(t *testing.T) {
server := test_util.NewHttpServerMock()
defer server.Close()
b, err := json.Marshal(gce_api.Operation{
Name: "operation-2505728466148-216f5197",
})
assert.NoError(t, err)
server.On("handle", "/projects/project1/zones/us-central1-b/instanceGroupManagers/igm1/createInstances").Return(string(b)).Times(1)
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-2505728466148-216f5197/wait").Return(operationDoneResponse).Once()
client := newTestAutoscalingGceClientWithTimeout(t, "project", server.URL, "", time.Second)
migRef := GceRef{Project: "project1", Zone: "us-central1-b", Name: "igm1"}
createdIds, err := client.CreateInstances(migRef, migRef.Name, 10, nil)
assert.NoError(t, err)
assert.Len(t, createdIds, 10, "Expected 10 instance names in result")
for _, id := range createdIds {
createdRef, _ := GceRefFromProviderId(id)
prefixed := strings.HasPrefix(createdRef.Name, migRef.Name+"-")
require.Truef(t, prefixed, "Expected node name \"%v\" to be prefixed with \"%v\"", createdRef.Name, migRef.Name)
}
}

func TestFetchAllInstances(t *testing.T) {
igm1 := "projects/893226960234/zones/zones/instanceGroupManagers/test-igm1-grp"
igm2 := "projects/893226960234/zones/zones/instanceGroupManagers/test-igm2-grp"
Expand Down
23 changes: 20 additions & 3 deletions cluster-autoscaler/cloudprovider/gce/gce_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const (
migAutoDiscovererKeyPrefix = "namePrefix"
migAutoDiscovererKeyMinNodes = "min"
migAutoDiscovererKeyMaxNodes = "max"
createInstancesRequestLimit = 1000
)

var (
Expand Down Expand Up @@ -332,16 +333,32 @@ func (m *gceManagerImpl) CreateInstances(mig Mig, delta int64) error {
if err != nil {
return err
}
instancesNames := make([]string, 0, len(instances))
instanceIds := make([]string, 0, len(instances)+int(delta))
for _, ins := range instances {
instancesNames = append(instancesNames, ins.Id)
instanceIds = append(instanceIds, ins.Id)
}
baseName, err := m.migInfoProvider.GetMigBasename(mig.GceRef())
if err != nil {
return fmt.Errorf("can't upscale %s: failed to collect BaseInstanceName: %w", mig.GceRef(), err)
}
m.cache.InvalidateMigTargetSize(mig.GceRef())
return m.GceService.CreateInstances(mig.GceRef(), baseName, delta, instancesNames)
totalReqs := int((delta + createInstancesRequestLimit - 1) / createInstancesRequestLimit)
remaining := delta
for i := 0; i < totalReqs; i++ {
increment := min(remaining, createInstancesRequestLimit)
if totalReqs > 1 {
klog.Infof("Sending chunked GCE createInstances request. Request: %d/%d RequestSize: %v", i+1, totalReqs, increment)
}
ids, err := m.GceService.CreateInstances(mig.GceRef(), baseName, increment, instanceIds)
if err != nil {
return err
}
remaining -= increment
for _, id := range ids {
instanceIds = append(instanceIds, id)
}
}
return nil
}

func (m *gceManagerImpl) forceRefresh() error {
Expand Down
40 changes: 40 additions & 0 deletions cluster-autoscaler/cloudprovider/gce/gce_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,46 @@ func TestAppendInstances(t *testing.T) {
mock.AssertExpectationsForObjects(t, server)
}

func TestCreateInstancesWithMultipleRequests(t *testing.T) {
server := NewHttpServerMock()
defer server.Close()
g := newTestGceManager(t, server.URL, false)
mig := setupTestDefaultPool(g, true)
server.On("handle", "/projects/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/listManagedInstances").Return(buildListInstanceGroupManagersResponse(
buildListInstanceGroupManagersResponsePart(defaultPoolMigName, zoneB, 3),
)).Once()

tests := []struct {
delta int
wantRequests int
}{
{
delta: 100,
wantRequests: 1,
},
{
delta: 1000,
wantRequests: 1,
},
{
delta: 1001,
wantRequests: 2,
},
{
delta: 3000,
wantRequests: 3,
},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("delta=%v", tt.delta), func(t *testing.T) {
server.On("handle", fmt.Sprintf("/projects/project1/zones/us-central1-b/instanceGroupManagers/%v/createInstances", mig.gceRef.Name)).Return(createInstancesResponse).Times(tt.wantRequests)
server.On("handle", "/projects/project1/zones/us-central1-b/operations/operation-1624366531120-5c55a4e128c15-fc5daa90-e1ef6c32/wait").Return(createInstancesOperationResponse).Times(tt.wantRequests)
err := g.CreateInstances(mig, int64(tt.delta))
assert.NoError(t, err)
})
}
}

func TestGetMigOptions(t *testing.T) {
defaultOptions := &config.NodeGroupAutoscalingOptions{
ScaleDownUtilizationThreshold: 0.1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ func (client *mockAutoscalingGceClient) DeleteInstances(_ GceRef, _ []GceRef) er
return nil
}

func (client *mockAutoscalingGceClient) CreateInstances(_ GceRef, _ string, _ int64, _ []string) error {
return nil
func (client *mockAutoscalingGceClient) CreateInstances(_ GceRef, _ string, _ int64, _ []string) ([]string, error) {
return nil, nil
}

func (client *mockAutoscalingGceClient) WaitForOperation(_, _, _, _ string) error {
Expand Down