Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions e2e/config/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"slices"
"strconv"
"strings"
"sync"
"time"

"github.com/Azure/agentbaker/e2e/toolkit"
Expand Down Expand Up @@ -559,7 +560,49 @@ func (a *AzureClient) LatestSIGImageVersionByTag(ctx context.Context, image *Ima
return VHDResourceID(*latestVersion.ID), nil
}

// versionReplicationLocks serializes concurrent replications of the same gallery image version
// (keyed by version resource ID). Adding a target region is a read-modify-write of the version's
// TargetRegions, so concurrent updates from scenarios in different regions would otherwise clobber
// each other and leave the image unavailable in a dropped region (GalleryImageNotFound).
var versionReplicationLocks sync.Map

func versionReplicationLock(versionID string) *sync.Mutex {
mu, _ := versionReplicationLocks.LoadOrStore(versionID, &sync.Mutex{})
return mu.(*sync.Mutex)
}

// refreshImageVersion re-reads the live gallery image version and updates version in place, so
// callers act on the current TargetRegions/ProvisioningState rather than a stale snapshot.
func (a *AzureClient) refreshImageVersion(ctx context.Context, image *Image, version *armcompute.GalleryImageVersion) error {
client, err := armcompute.NewGalleryImageVersionsClient(image.Gallery.SubscriptionID, a.Credential, a.ArmOptions)
if err != nil {
return fmt.Errorf("create image version client: %w", err)
}
resp, err := client.Get(ctx, image.Gallery.ResourceGroupName, image.Gallery.Name, image.Name, *version.Name, nil)
if err != nil {
return fmt.Errorf("get image version: %w", err)
}
*version = resp.GalleryImageVersion
return nil
}

func (a *AzureClient) ensureReplication(ctx context.Context, image *Image, version *armcompute.GalleryImageVersion, location string) error {
// Serialize replication of a given image version across concurrently-running scenarios.
// Adding a region is a read-modify-write PUT of the version's TargetRegions; scenarios in
// different regions resolve the same freshly-built version in parallel, so without
// serialization a stale PUT drops a region another goroutine just added and the image
// becomes unavailable there (GalleryImageNotFound). Lock per version ID and re-read the live
// TargetRegions inside the lock so every update builds on the current region set.
mu := versionReplicationLock(*version.ID)
Comment on lines +592 to +596
mu.Lock()
defer mu.Unlock()

// Re-read the live version under the lock: the snapshot passed in was captured (by List/Get)
// before the lock was acquired and may be missing regions added by another goroutine.
if err := a.refreshImageVersion(ctx, image, version); err != nil {
return fmt.Errorf("refreshing image version before replication: %w", err)
}

// Wait for any ongoing update operations to complete first
if err := a.waitForVersionOperationCompletion(ctx, image, version); err != nil {
return fmt.Errorf("waiting for version operation completion: %w", err)
Expand Down
Loading