diff --git a/CHANGELOG.md b/CHANGELOG.md index a59603037..2a49d9eb5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +### Features + +- storage: add `move` command for moving objects within and across buckets + ## 1.95.0 diff --git a/cmd/storage/storage_move.go b/cmd/storage/storage_move.go new file mode 100644 index 000000000..9fa73962c --- /dev/null +++ b/cmd/storage/storage_move.go @@ -0,0 +1,215 @@ +package storage + +import ( + "fmt" + "os" + "strings" + + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/spf13/cobra" + + exocmd "github.com/exoscale/cli/cmd" + "github.com/exoscale/cli/pkg/globalstate" + "github.com/exoscale/cli/pkg/storage/sos" + "github.com/exoscale/cli/utils" +) + +var storageMoveCmd = &cobra.Command{ + Use: "move sos://BUCKET/[OBJECT|PREFIX/] sos://BUCKET/[OBJECT|PREFIX/]", + Short: "Move objects within a bucket or across buckets", + Long: `Move objects within a bucket or across buckets. + +This command moves objects by performing a server-side copy followed by +a delete of the source object. Object metadata, headers, and ACLs are +preserved. + +Warning: move is implemented as server-side copy followed by delete. +If the delete step fails after a successful copy, the object will +remain in both locations. There is no automatic rollback. + +Multi-object prefix moves are processed serially. A trailing slash on the +source selects prefix mode; -r controls recursion into subdirectories. + +Examples: + + exo storage move sos://my-bucket/file-a sos://my-bucket/folder/ + + exo storage move sos://my-bucket/file-a sos://other-bucket/file-a + + exo storage move -r sos://my-bucket/prefix/ sos://other-bucket/prefix/ + + exo storage move -n sos://my-bucket/file-a sos://other-bucket/ +`, + PreRunE: func(cmd *cobra.Command, args []string) error { + if len(args) != 2 { + exocmd.CmdExitOnUsageError(cmd, "invalid arguments") + } + return validateMoveArgs(args) + }, + + RunE: func(cmd *cobra.Command, args []string) error { + recursive, err := cmd.Flags().GetBool("recursive") + if err != nil { + return err + } + force, err := cmd.Flags().GetBool("force") + if err != nil { + return err + } + multipartConcurrency, err := cmd.Flags().GetInt("multipart-concurrency") + if err != nil { + return err + } + verbose, err := cmd.Flags().GetBool("verbose") + if err != nil { + return err + } + dryRun, err := cmd.Flags().GetBool("dry-run") + if err != nil { + return err + } + + srcBucket, srcKey := parseBucketKey(args[0]) + dstBucket, dstKey := parseBucketKey(args[1]) + + storage, err := sos.NewStorageClient( + exocmd.GContext, + sos.ClientOptZoneFromBucket(exocmd.GContext, srcBucket), + ) + if err != nil { + return fmt.Errorf("unable to initialize storage client: %w", err) + } + + isPrefix := strings.HasSuffix(srcKey, "/") || recursive + + if !force && !dryRun && isPrefix { + if !utils.AskQuestion(exocmd.GContext, fmt.Sprintf( + "Are you sure you want to move all objects from %s%s/%s to %s%s/%s?", + sos.BucketPrefix, srcBucket, srcKey, sos.BucketPrefix, dstBucket, dstKey)) { + return nil + } + } + + if dryRun { + fmt.Println("[DRY-RUN]") + } + + if !isPrefix { + return runSingleObjectMove(storage, srcBucket, srcKey, dstBucket, dstKey, multipartConcurrency, verbose, dryRun) + } + + return runPrefixMove(storage, srcBucket, srcKey, dstBucket, dstKey, multipartConcurrency, recursive, verbose, dryRun) + }, +} + +func init() { + storageMoveCmd.Flags().BoolP("dry-run", "n", false, "simulate the move operation") + storageMoveCmd.Flags().BoolP("force", "f", false, "skip confirmation prompt") + storageMoveCmd.Flags().BoolP("recursive", "r", false, "move objects recursively") + storageMoveCmd.Flags().BoolP("verbose", "v", false, "output moved objects") + storageMoveCmd.Flags().Int("multipart-concurrency", 4, "number of concurrent parts for multipart moves") + storageCmd.AddCommand(storageMoveCmd) +} + +func validateMoveArgs(args []string) error { + srcBucket, srcKey := parseBucketKey(args[0]) + dstBucket, dstKey := parseBucketKey(args[1]) + + if srcBucket == "" { + return fmt.Errorf("source must include a bucket name: %s", args[0]) + } + if dstBucket == "" { + return fmt.Errorf("destination must include a bucket name: %s", args[1]) + } + if srcKey == "" && dstKey == "" { + return fmt.Errorf("at least one of source/destination must include an object key or prefix") + } + if srcKey != "" && dstKey == "" { + return fmt.Errorf("destination must include an object key when source is a single object: %s", args[1]) + } + + return nil +} + +func parseBucketKey(url string) (bucket, key string) { + url = strings.TrimPrefix(url, sos.BucketPrefix) + parts := strings.SplitN(url, "/", 2) + bucket = parts[0] + if len(parts) > 1 { + key = parts[1] + } + return +} + +func runSingleObjectMove(storage *sos.Client, srcBucket, srcKey, dstBucket, dstKey string, multipartConcurrency int, verbose, dryRun bool) error { + if srcKey == "" { + return fmt.Errorf("source must be an object key, not just a bucket: use a trailing slash for prefix moves") + } + + if dryRun { + fmt.Printf("move %s%s/%s -> %s%s/%s\n", sos.BucketPrefix, srcBucket, srcKey, sos.BucketPrefix, dstBucket, dstKey) + return nil + } + + if err := storage.MoveObject(exocmd.GContext, srcBucket, srcKey, dstBucket, dstKey, multipartConcurrency, verbose); err != nil { + return fmt.Errorf("move failed: %w", err) + } + + if verbose { + showObj, err := storage.ShowObject(exocmd.GContext, dstBucket, dstKey) + if err == nil { + fmt.Printf("moved: %s -> %s (%d bytes, %s)\n", srcKey, showObj.URL, showObj.Size, showObj.LastModified) + } + } + + return nil +} + +func runPrefixMove(storage *sos.Client, srcBucket, srcKey, dstBucket, dstKey string, multipartConcurrency int, recursive, verbose, dryRun bool) error { + var moved, failed int + err := storage.ForEachObject(exocmd.GContext, srcBucket, srcKey, recursive, func(o *types.Object) error { + if o.Key == nil { + return nil + } + + srcObjectKey := *o.Key + srcObjectKeyTrimmed := strings.TrimPrefix(srcObjectKey, srcKey) + dstObjectKey := dstKey + srcObjectKeyTrimmed + + if dryRun { + fmt.Printf("move %s%s/%s -> %s%s/%s\n", sos.BucketPrefix, srcBucket, srcObjectKey, sos.BucketPrefix, dstBucket, dstObjectKey) + return nil + } + + if err := storage.MoveObject(exocmd.GContext, srcBucket, srcObjectKey, dstBucket, dstObjectKey, multipartConcurrency, verbose); err != nil { + fmt.Fprintf(os.Stderr, "move failed for %s: %v\n", srcObjectKey, err) + failed++ + return nil + } + + moved++ + if verbose && !globalstate.Quiet { + fmt.Printf("moved: %s%s/%s -> %s%s/%s\n", sos.BucketPrefix, srcBucket, srcObjectKey, sos.BucketPrefix, dstBucket, dstObjectKey) + } + + return nil + }) + + if err != nil { + return fmt.Errorf("move failed: %w", err) + } + + if failed > 0 { + return fmt.Errorf("%d object(s) failed to move", failed) + } + + if moved == 0 && !dryRun && !globalstate.Quiet { + fmt.Printf("no objects exist at %q\n", srcKey) + } + + if verbose && !globalstate.Quiet && moved > 0 { + fmt.Printf("moved %d objects\n", moved) + } + + return nil +} diff --git a/go.mod b/go.mod index bd99f2e61..c8204d0be 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/exoscale/cli -go 1.26 +go 1.26.4 require ( github.com/aws/aws-sdk-go-v2 v1.2.0 diff --git a/pkg/storage/sos/move.go b/pkg/storage/sos/move.go new file mode 100644 index 000000000..0847621c4 --- /dev/null +++ b/pkg/storage/sos/move.go @@ -0,0 +1,133 @@ +package sos + +import ( + "context" + "fmt" + "net/url" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" +) + +const ( + moveLargeObjectThreshold = 5 * 1024 * 1024 * 1024 // 5 GiB + moveDefaultPartSize = 100 * 1024 * 1024 // 100 MiB + moveMaxConcurrency = 10 +) + +func (c *Client) MoveObject(ctx context.Context, srcBucket, srcKey, dstBucket, dstKey string, multipartConcurrency int, verbose bool) error { + if multipartConcurrency <= 0 { + multipartConcurrency = 1 + } + if multipartConcurrency > moveMaxConcurrency { + multipartConcurrency = moveMaxConcurrency + } + + headRes, err := c.S3Client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(srcBucket), + Key: aws.String(srcKey), + }) + if err != nil { + return fmt.Errorf("unable to retrieve source object info: %w", err) + } + + size := headRes.ContentLength + + if size > moveLargeObjectThreshold { + return c.moveLargeObject(ctx, srcBucket, srcKey, dstBucket, dstKey, headRes, multipartConcurrency, verbose) + } + + return c.moveObject(ctx, srcBucket, srcKey, dstBucket, dstKey, headRes, verbose) +} + +func (c *Client) moveObject(ctx context.Context, srcBucket, srcKey, dstBucket, dstKey string, headRes *s3.HeadObjectOutput, verbose bool) error { + srcURL := fmt.Sprintf("sos://%s/%s", srcBucket, srcKey) + dstURL := fmt.Sprintf("sos://%s/%s", dstBucket, dstKey) + + if verbose { + fmt.Printf("copying: %s -> %s\n", srcURL, dstURL) + } + + acl, err := c.S3Client.GetObjectAcl(ctx, &s3.GetObjectAclInput{ + Bucket: aws.String(srcBucket), + Key: aws.String(srcKey), + }) + if err != nil { + return fmt.Errorf("unable to retrieve object ACL: %w", err) + } + + copyInput := &s3.CopyObjectInput{ + Bucket: aws.String(dstBucket), + Key: aws.String(dstKey), + CopySource: aws.String(copySource(srcBucket, srcKey)), + Metadata: headRes.Metadata, + MetadataDirective: s3types.MetadataDirectiveReplace, + ACL: getACLFromGrants(acl.Grants), + } + + if headRes.CacheControl != nil { + copyInput.CacheControl = headRes.CacheControl + } + if headRes.ContentDisposition != nil { + copyInput.ContentDisposition = headRes.ContentDisposition + } + if headRes.ContentEncoding != nil { + copyInput.ContentEncoding = headRes.ContentEncoding + } + if headRes.ContentLanguage != nil { + copyInput.ContentLanguage = headRes.ContentLanguage + } + if headRes.ContentType != nil { + copyInput.ContentType = headRes.ContentType + } + if headRes.Expires != nil { + copyInput.Expires = headRes.Expires + } + + if _, err := c.S3Client.CopyObject(ctx, copyInput); err != nil { + return fmt.Errorf("copy: %w", err) + } + + if verbose { + fmt.Printf("deleting: %s\n", srcURL) + } + + if err := c.DeleteObject(ctx, srcBucket, srcKey); err != nil { + return fmt.Errorf("delete source: %w", err) + } + + return nil +} + +func (c *Client) DeleteObject(ctx context.Context, bucket, key string) error { + _, err := c.S3Client.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + return err +} + +func copySource(bucket, key string) string { + return bucket + "/" + url.PathEscape(key) +} + +// getACLFromGrants maps S3 object grants to a canned ACL. Note: complex +// grant sets (e.g. per-user CanonicalUser grants) are not preserved; they +// fall back to private. Only the common public/authenticated-read group +// grants are mapped. +func getACLFromGrants(grants []s3types.Grant) s3types.ObjectCannedACL { + for _, grant := range grants { + if grant.Grantee.Type != s3types.TypeGroup { + continue + } + uri := aws.ToString(grant.Grantee.URI) + if uri == "http://acs.amazonaws.com/groups/global/AllUsers" { + return s3types.ObjectCannedACLPublicRead + } + if uri == "http://acs.amazonaws.com/groups/global/AuthenticatedUsers" { + return s3types.ObjectCannedACLAuthenticatedRead + } + } + return s3types.ObjectCannedACLPrivate +} diff --git a/pkg/storage/sos/move_multipart.go b/pkg/storage/sos/move_multipart.go new file mode 100644 index 000000000..222aa8735 --- /dev/null +++ b/pkg/storage/sos/move_multipart.go @@ -0,0 +1,171 @@ +package sos + +import ( + "context" + "fmt" + "sort" + "sync" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/hashicorp/go-multierror" +) + +func (c *Client) moveLargeObject(ctx context.Context, srcBucket, srcKey, dstBucket, dstKey string, headRes *s3.HeadObjectOutput, concurrency int, verbose bool) error { + srcURL := fmt.Sprintf("sos://%s/%s", srcBucket, srcKey) + dstURL := fmt.Sprintf("sos://%s/%s", dstBucket, dstKey) + + if verbose { + fmt.Printf("copying: %s -> %s\n", srcURL, dstURL) + } + + acl, err := c.S3Client.GetObjectAcl(ctx, &s3.GetObjectAclInput{ + Bucket: aws.String(srcBucket), + Key: aws.String(srcKey), + }) + if err != nil { + return fmt.Errorf("unable to retrieve object ACL: %w", err) + } + + createMPInput := &s3.CreateMultipartUploadInput{ + Bucket: aws.String(dstBucket), + Key: aws.String(dstKey), + Metadata: headRes.Metadata, + ACL: getACLFromGrants(acl.Grants), + } + + if headRes.CacheControl != nil { + createMPInput.CacheControl = headRes.CacheControl + } + if headRes.ContentDisposition != nil { + createMPInput.ContentDisposition = headRes.ContentDisposition + } + if headRes.ContentEncoding != nil { + createMPInput.ContentEncoding = headRes.ContentEncoding + } + if headRes.ContentLanguage != nil { + createMPInput.ContentLanguage = headRes.ContentLanguage + } + if headRes.ContentType != nil { + createMPInput.ContentType = headRes.ContentType + } + if headRes.Expires != nil { + createMPInput.Expires = headRes.Expires + } + + createRes, err := c.S3Client.CreateMultipartUpload(ctx, createMPInput) + if err != nil { + return fmt.Errorf("create multipart upload: %w", err) + } + if createRes.UploadId == nil { + return fmt.Errorf("no upload id returned") + } + + size := headRes.ContentLength + completedParts, err := c.uploadParts(ctx, srcBucket, srcKey, dstBucket, dstKey, aws.ToString(createRes.UploadId), size, concurrency) + if err != nil { + _, abortErr := c.S3Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ + Bucket: aws.String(dstBucket), + Key: aws.String(dstKey), + UploadId: createRes.UploadId, + }) + if abortErr != nil { + return fmt.Errorf("upload failed: %w, abort failed: %v", err, abortErr) + } + return fmt.Errorf("upload failed: %w", err) + } + + sort.Slice(completedParts, func(i, j int) bool { + return completedParts[i].PartNumber < completedParts[j].PartNumber + }) + + _, err = c.S3Client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(dstBucket), + Key: aws.String(dstKey), + UploadId: createRes.UploadId, + MultipartUpload: &s3types.CompletedMultipartUpload{ + Parts: completedParts, + }, + }) + if err != nil { + return fmt.Errorf("complete multipart upload: %w", err) + } + + if verbose { + fmt.Printf("deleting: %s\n", srcURL) + } + + if err := c.DeleteObject(ctx, srcBucket, srcKey); err != nil { + return fmt.Errorf("delete source: %w", err) + } + + return nil +} + +func (c *Client) uploadParts(ctx context.Context, srcBucket, srcKey, dstBucket, dstKey, uploadID string, size int64, concurrency int) ([]s3types.CompletedPart, error) { + partSize := int64(moveDefaultPartSize) + if partSize > size { + partSize = size + } + + numParts := int((size + partSize - 1) / partSize) + + sem := make(chan struct{}, concurrency) + var wg sync.WaitGroup + var mu sync.Mutex + var completedParts []s3types.CompletedPart + var errs *multierror.Error + + for i := 0; i < numParts; i++ { + sem <- struct{}{} + wg.Add(1) + + go func(partNum int) { + defer wg.Done() + defer func() { <-sem }() + + start := int64(partNum) * partSize + end := start + partSize + if end > size { + end = size + } + + part, err := c.uploadPartCopy(ctx, srcBucket, srcKey, dstBucket, dstKey, uploadID, int32(partNum+1), start, end) + mu.Lock() + defer mu.Unlock() + if err != nil { + errs = multierror.Append(errs, err) + } else if part != nil { + completedParts = append(completedParts, *part) + } + }(i) + } + + wg.Wait() + + if errs != nil { + return nil, errs.ErrorOrNil() + } + + return completedParts, nil +} + +func (c *Client) uploadPartCopy(ctx context.Context, srcBucket, srcKey, dstBucket, dstKey, uploadID string, partNumber int32, start, end int64) (*s3types.CompletedPart, error) { + res, err := c.S3Client.UploadPartCopy(ctx, &s3.UploadPartCopyInput{ + Bucket: aws.String(dstBucket), + Key: aws.String(dstKey), + UploadId: aws.String(uploadID), + PartNumber: partNumber, + CopySource: aws.String(copySource(srcBucket, srcKey)), + CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", start, end-1)), + }) + if err != nil { + return nil, fmt.Errorf("upload part copy: %w", err) + } + + return &s3types.CompletedPart{ + ETag: res.CopyPartResult.ETag, + PartNumber: partNumber, + }, nil +} diff --git a/pkg/storage/sos/move_test.go b/pkg/storage/sos/move_test.go new file mode 100644 index 000000000..a4d1b0203 --- /dev/null +++ b/pkg/storage/sos/move_test.go @@ -0,0 +1,208 @@ +package sos_test + +import ( + "context" + "errors" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/stretchr/testify/assert" + + "github.com/exoscale/cli/pkg/storage/sos" +) + +func TestMoveObject_SingleObject(t *testing.T) { + tests := []struct { + name string + srcBucket string + srcKey string + dstBucket string + dstKey string + setupMocks func(*MockS3API) + expectError bool + }{ + { + name: "successful move within same bucket", + srcBucket: "test-bucket", + srcKey: "source-key", + dstBucket: "test-bucket", + dstKey: "dest-key", + setupMocks: func(m *MockS3API) { + m.mockHeadObject = func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + return &s3.HeadObjectOutput{ + ContentLength: 1024, + Metadata: map[string]string{"key": "value"}, + ContentType: aws.String("text/plain"), + }, nil + } + m.mockGetObjectAcl = func(ctx context.Context, params *s3.GetObjectAclInput, optFns ...func(*s3.Options)) (*s3.GetObjectAclOutput, error) { + return &s3.GetObjectAclOutput{}, nil + } + m.mockCopyObject = func(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) { + assert.Equal(t, "test-bucket", *params.Bucket) + assert.Equal(t, "dest-key", *params.Key) + assert.Equal(t, "test-bucket/source-key", *params.CopySource) + return &s3.CopyObjectOutput{}, nil + } + m.mockDeleteObject = func(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { + assert.Equal(t, "test-bucket", *params.Bucket) + assert.Equal(t, "source-key", *params.Key) + return &s3.DeleteObjectOutput{}, nil + } + }, + expectError: false, + }, + { + name: "move fails when copy fails", + srcBucket: "test-bucket", + srcKey: "source-key", + dstBucket: "test-bucket", + dstKey: "dest-key", + setupMocks: func(m *MockS3API) { + m.mockHeadObject = func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + return &s3.HeadObjectOutput{ContentLength: 1024}, nil + } + m.mockGetObjectAcl = func(ctx context.Context, params *s3.GetObjectAclInput, optFns ...func(*s3.Options)) (*s3.GetObjectAclOutput, error) { + return &s3.GetObjectAclOutput{}, nil + } + m.mockCopyObject = func(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) { + return nil, errors.New("copy failed") + } + }, + expectError: true, + }, + { + name: "move fails when delete fails after copy", + srcBucket: "test-bucket", + srcKey: "source-key", + dstBucket: "test-bucket", + dstKey: "dest-key", + setupMocks: func(m *MockS3API) { + m.mockHeadObject = func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + return &s3.HeadObjectOutput{ContentLength: 1024}, nil + } + m.mockGetObjectAcl = func(ctx context.Context, params *s3.GetObjectAclInput, optFns ...func(*s3.Options)) (*s3.GetObjectAclOutput, error) { + return &s3.GetObjectAclOutput{}, nil + } + m.mockCopyObject = func(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) { + return &s3.CopyObjectOutput{}, nil + } + m.mockDeleteObject = func(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { + return nil, errors.New("delete failed") + } + }, + expectError: true, + }, + { + name: "head object fails", + srcBucket: "test-bucket", + srcKey: "source-key", + dstBucket: "test-bucket", + dstKey: "dest-key", + setupMocks: func(m *MockS3API) { + m.mockHeadObject = func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + return nil, errors.New("head object failed") + } + }, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockS3API := &MockS3API{} + tt.setupMocks(mockS3API) + + client := &sos.Client{ + S3Client: mockS3API, + Zone: "test-zone", + } + + err := client.MoveObject(context.Background(), tt.srcBucket, tt.srcKey, tt.dstBucket, tt.dstKey, 1, false) + if tt.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestMoveObject_Multipart(t *testing.T) { + t.Run("successful multipart move", func(t *testing.T) { + mockS3API := &MockS3API{ + mockHeadObject: func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + return &s3.HeadObjectOutput{ + ContentLength: 5*1024*1024*1024 + 1, + Metadata: map[string]string{"key": "value"}, + }, nil + }, + mockGetObjectAcl: func(ctx context.Context, params *s3.GetObjectAclInput, optFns ...func(*s3.Options)) (*s3.GetObjectAclOutput, error) { + return &s3.GetObjectAclOutput{}, nil + }, + mockCreateMultipartUpload: func(ctx context.Context, params *s3.CreateMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) { + assert.Equal(t, "value", params.Metadata["key"], "metadata must be preserved on multipart uploads") + return &s3.CreateMultipartUploadOutput{UploadId: aws.String("test-upload-id")}, nil + }, + mockUploadPartCopy: func(ctx context.Context, params *s3.UploadPartCopyInput, optFns ...func(*s3.Options)) (*s3.UploadPartCopyOutput, error) { + return &s3.UploadPartCopyOutput{ + CopyPartResult: &types.CopyPartResult{ + ETag: aws.String("test-etag"), + }, + }, nil + }, + mockCompleteMultipartUpload: func(ctx context.Context, params *s3.CompleteMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) { + for i := 1; i < len(params.MultipartUpload.Parts); i++ { + assert.Less(t, params.MultipartUpload.Parts[i-1].PartNumber, params.MultipartUpload.Parts[i].PartNumber, + "parts must be sorted by PartNumber") + } + return &s3.CompleteMultipartUploadOutput{}, nil + }, + mockDeleteObject: func(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { + return &s3.DeleteObjectOutput{}, nil + }, + } + + client := &sos.Client{ + S3Client: mockS3API, + Zone: "test-zone", + } + + err := client.MoveObject(context.Background(), "src-bucket", "large-file", "dst-bucket", "large-file", 2, false) + assert.NoError(t, err) + }) + + t.Run("multipart abort on part copy failure", func(t *testing.T) { + abortCalled := false + mockS3API := &MockS3API{ + mockHeadObject: func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + return &s3.HeadObjectOutput{ContentLength: 5*1024*1024*1024 + 1}, nil + }, + mockGetObjectAcl: func(ctx context.Context, params *s3.GetObjectAclInput, optFns ...func(*s3.Options)) (*s3.GetObjectAclOutput, error) { + return &s3.GetObjectAclOutput{}, nil + }, + mockCreateMultipartUpload: func(ctx context.Context, params *s3.CreateMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) { + return &s3.CreateMultipartUploadOutput{UploadId: aws.String("test-upload-id")}, nil + }, + mockUploadPartCopy: func(ctx context.Context, params *s3.UploadPartCopyInput, optFns ...func(*s3.Options)) (*s3.UploadPartCopyOutput, error) { + return nil, errors.New("part copy failed") + }, + mockAbortMultipartUpload: func(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) { + abortCalled = true + return &s3.AbortMultipartUploadOutput{}, nil + }, + } + + client := &sos.Client{ + S3Client: mockS3API, + Zone: "test-zone", + } + + err := client.MoveObject(context.Background(), "src-bucket", "large-file", "dst-bucket", "large-file", 1, false) + assert.Error(t, err) + assert.True(t, abortCalled) + assert.Contains(t, err.Error(), "upload failed") + }) +} diff --git a/pkg/storage/sos/s3api.go b/pkg/storage/sos/s3api.go index ed5c4580c..14eb237b5 100644 --- a/pkg/storage/sos/s3api.go +++ b/pkg/storage/sos/s3api.go @@ -11,6 +11,7 @@ type S3API interface { s3manager.UploadAPIClient DeleteObjects(ctx context.Context, params *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) + DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) GetObjectAcl(ctx context.Context, params *s3.GetObjectAclInput, optFns ...func(*s3.Options)) (*s3.GetObjectAclOutput, error) ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) @@ -33,4 +34,6 @@ type S3API interface { GetBucketReplication(ctx context.Context, params *s3.GetBucketReplicationInput, optFns ...func(*s3.Options)) (*s3.GetBucketReplicationOutput, error) PutBucketReplication(ctx context.Context, params *s3.PutBucketReplicationInput, optFns ...func(*s3.Options)) (*s3.PutBucketReplicationOutput, error) DeleteBucketReplication(ctx context.Context, params *s3.DeleteBucketReplicationInput, optFns ...func(*s3.Options)) (*s3.DeleteBucketReplicationOutput, error) + HeadObject(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) + UploadPartCopy(ctx context.Context, params *s3.UploadPartCopyInput, optFns ...func(*s3.Options)) (*s3.UploadPartCopyOutput, error) } diff --git a/pkg/storage/sos/s3api_mock_test.go b/pkg/storage/sos/s3api_mock_test.go index 23bffcad4..0e66e5a16 100644 --- a/pkg/storage/sos/s3api_mock_test.go +++ b/pkg/storage/sos/s3api_mock_test.go @@ -9,6 +9,7 @@ import ( type MockS3API struct { mockGetObject func(ctx context.Context, input *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) mockGetObjectAcl func(ctx context.Context, input *s3.GetObjectAclInput, optFns ...func(*s3.Options)) (*s3.GetObjectAclOutput, error) //nolint:stylecheck + mockDeleteObject func(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) mockDeleteObjects func(ctx context.Context, params *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) mockListObjectsV2 func(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) mockListObjectVersions func(ctx context.Context, params *s3.ListObjectVersionsInput, optFns ...func(*s3.Options)) (*s3.ListObjectVersionsOutput, error) @@ -29,6 +30,8 @@ type MockS3API struct { mockGetBucketReplication func(ctx context.Context, params *s3.GetBucketReplicationInput, optFns ...func(*s3.Options)) (*s3.GetBucketReplicationOutput, error) mockPutBucketReplication func(ctx context.Context, params *s3.PutBucketReplicationInput, optFns ...func(*s3.Options)) (*s3.PutBucketReplicationOutput, error) mockDeleteBucketReplication func(ctx context.Context, params *s3.DeleteBucketReplicationInput, optFns ...func(*s3.Options)) (*s3.DeleteBucketReplicationOutput, error) + mockHeadObject func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) + mockUploadPartCopy func(ctx context.Context, params *s3.UploadPartCopyInput, optFns ...func(*s3.Options)) (*s3.UploadPartCopyOutput, error) // s3manager.UploadAPIClient mockPutObject func(context.Context, *s3.PutObjectInput, ...func(*s3.Options)) (*s3.PutObjectOutput, error) @@ -48,7 +51,18 @@ func (m *MockS3API) GetObjectAcl(ctx context.Context, input *s3.GetObjectAclInpu func (m *MockS3API) DeleteObjects(ctx context.Context, params *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) { return m.mockDeleteObjects(ctx, params, optFns...) +} + +func (m *MockS3API) DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { + return m.mockDeleteObject(ctx, params, optFns...) +} + +func (m *MockS3API) HeadObject(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + return m.mockHeadObject(ctx, params, optFns...) +} +func (m *MockS3API) UploadPartCopy(ctx context.Context, params *s3.UploadPartCopyInput, optFns ...func(*s3.Options)) (*s3.UploadPartCopyOutput, error) { + return m.mockUploadPartCopy(ctx, params, optFns...) } func (m *MockS3API) ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) {