From 51c943c39a98479272e415d94f7c6d466c8aeae6 Mon Sep 17 00:00:00 2001 From: Nicolas Hedger Date: Tue, 24 Mar 2026 22:32:39 +0100 Subject: [PATCH 1/3] feat: implement `exo storage move` command --- cmd/storage/storage_move.go | 188 +++++++++++ pkg/storage/sos/acl.go | 41 ++- pkg/storage/sos/move.go | 509 +++++++++++++++++++++++++++++ pkg/storage/sos/move_test.go | 289 ++++++++++++++++ pkg/storage/sos/object.go | 8 +- pkg/storage/sos/s3api.go | 2 + pkg/storage/sos/s3api_mock_test.go | 10 + 7 files changed, 1038 insertions(+), 9 deletions(-) create mode 100644 cmd/storage/storage_move.go create mode 100644 pkg/storage/sos/move.go create mode 100644 pkg/storage/sos/move_test.go diff --git a/cmd/storage/storage_move.go b/cmd/storage/storage_move.go new file mode 100644 index 000000000..9cb2d10fb --- /dev/null +++ b/cmd/storage/storage_move.go @@ -0,0 +1,188 @@ +package storage + +import ( + "fmt" + "strings" + + "github.com/spf13/cobra" + + exocmd "github.com/exoscale/cli/cmd" + "github.com/exoscale/cli/pkg/globalstate" + "github.com/exoscale/cli/pkg/storage/sos" +) + +type storageMoveLocation struct { + Bucket string + Key string +} + +var storageMoveCmd = &cobra.Command{ + Use: "move SOURCE DESTINATION", + Aliases: []string{"mv"}, + Short: "Move objects without local download", + Long: `This command moves objects between buckets without downloading them locally. + +Examples: + + # Move an object within a bucket + exo storage move sos://my-bucket/file-a sos://my-bucket/archive/file-a + + # Move an object to another bucket and keep its basename + exo storage move sos://my-bucket/file-a sos://other-bucket/archive/ + + # Move a prefix recursively + exo storage move -r sos://my-bucket/public/ sos://other-bucket/archive/ + +Notes: + + * If the destination ends with "/", the source basename is preserved. + * Prefix moves preserve relative paths under the destination prefix. + * Existing destination objects are overwritten. 
+`, + + PreRunE: func(cmd *cobra.Command, args []string) error { + if len(args) != 2 { + exocmd.CmdExitOnUsageError(cmd, "invalid arguments") + } + + args[0] = strings.TrimPrefix(args[0], sos.BucketPrefix) + args[1] = strings.TrimPrefix(args[1], sos.BucketPrefix) + + if !strings.Contains(args[0], "/") { + exocmd.CmdExitOnUsageError(cmd, fmt.Sprintf("invalid argument: %q", args[0])) + } + + return nil + }, + + RunE: func(cmd *cobra.Command, args []string) error { + source, err := parseStorageMoveLocation(args[0]) + if err != nil { + return err + } + + destination, err := parseStorageMoveLocation(args[1]) + if err != nil { + return err + } + + recursive, err := cmd.Flags().GetBool("recursive") + if err != nil { + return err + } + + dryRun, err := cmd.Flags().GetBool("dry-run") + if err != nil { + return err + } + + verbose, err := cmd.Flags().GetBool("verbose") + if err != nil { + return err + } + + concurrency, err := cmd.Flags().GetInt("concurrency") + if err != nil { + return err + } + if concurrency < 1 { + return fmt.Errorf("invalid concurrency %d, value must be greater than 0", concurrency) + } + + srcStorage, err := sos.NewStorageClient( + exocmd.GContext, + sos.ClientOptZoneFromBucket(exocmd.GContext, source.Bucket), + ) + if err != nil { + return fmt.Errorf("unable to initialize source storage client: %w", err) + } + + dstStorage, err := sos.NewStorageClient( + exocmd.GContext, + sos.ClientOptZoneFromBucket(exocmd.GContext, destination.Bucket), + ) + if err != nil { + return fmt.Errorf("unable to initialize destination storage client: %w", err) + } + + moved, moveErr := dstStorage.MoveObjects( + exocmd.GContext, + srcStorage, + source.Bucket, + source.Key, + destination.Bucket, + destination.Key, + &sos.StorageMoveConfig{ + Recursive: recursive, + DryRun: dryRun, + MultipartCopyConcurrency: concurrency, + }, + ) + + if dryRun { + fmt.Println("[DRY-RUN]") + } + + if dryRun || verbose { + for _, move := range moved { + fmt.Printf("%s%s/%s -> %s%s/%s\n", + 
sos.BucketPrefix, move.SourceBucket, move.SourceKey, + sos.BucketPrefix, move.DestinationBucket, move.DestinationKey, + ) + } + } + + if moveErr != nil { + if !dryRun && !globalstate.Quiet && len(moved) > 0 && !verbose { + fmt.Printf("Moved %d object(s) before an error occurred\n", len(moved)) + } + return moveErr + } + + if len(moved) == 0 { + if !globalstate.Quiet { + fmt.Printf("no objects exist at %q\n", source.Key) + } + return nil + } + + if dryRun { + return nil + } + + return nil + }, +} + +func init() { + storageMoveCmd.Flags().BoolP("dry-run", "n", false, + "simulate object moves, don't actually do them") + storageMoveCmd.Flags().BoolP("recursive", "r", false, + "move object prefixes recursively") + storageMoveCmd.Flags().IntP("concurrency", "c", 5, + "number of parallel multipart copy workers for large object moves") + storageMoveCmd.Flags().BoolP("verbose", "v", false, + "output moved objects") + storageCmd.AddCommand(storageMoveCmd) +} + +func parseStorageMoveLocation(value string) (storageMoveLocation, error) { + var location storageMoveLocation + + parts := strings.SplitN(value, "/", 2) + location.Bucket = parts[0] + if location.Bucket == "" { + return location, fmt.Errorf("invalid bucket name") + } + + if len(parts) == 1 { + return location, nil + } + + location.Key = parts[1] + if location.Key == "" { + location.Key = "/" + } + + return location, nil +} diff --git a/pkg/storage/sos/acl.go b/pkg/storage/sos/acl.go index 42fc10a86..ca4dd9135 100644 --- a/pkg/storage/sos/acl.go +++ b/pkg/storage/sos/acl.go @@ -250,8 +250,14 @@ func storageACLGranteeToS3(v string) *s3types.Grantee { } } -// storageACLToCopyObject updates the object to be copied with S3 ACL information. 
-func storageACLToCopyObject(acl *s3.GetObjectAclOutput, o *s3.CopyObjectInput) { +type objectACLGrants struct { + GrantRead *string + GrantReadACP *string + GrantWriteACP *string + GrantFullControl *string +} + +func storageACLToGrants(acl *s3.GetObjectAclOutput) objectACLGrants { s3GranteeToString := func(g *s3types.Grantee) *string { if g.Type == s3types.TypeCanonicalUser { return aws.String("id=" + aws.ToString(g.ID)) @@ -268,24 +274,45 @@ func storageACLToCopyObject(acl *s3.GetObjectAclOutput, o *s3.CopyObjectInput) { return nil } - o.GrantFullControl = aws.String("id=" + aws.ToString(acl.Owner.ID)) + grants := objectACLGrants{ + GrantFullControl: aws.String("id=" + aws.ToString(acl.Owner.ID)), + } for _, grant := range acl.Grants { switch grant.Permission { case s3types.PermissionRead: - o.GrantRead = s3GranteeToString(grant.Grantee) + grants.GrantRead = s3GranteeToString(grant.Grantee) // Write permission is not supported on S3 objects: // https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#permissions case s3types.PermissionReadAcp: - o.GrantReadACP = s3GranteeToString(grant.Grantee) + grants.GrantReadACP = s3GranteeToString(grant.Grantee) case s3types.PermissionWriteAcp: - o.GrantWriteACP = s3GranteeToString(grant.Grantee) + grants.GrantWriteACP = s3GranteeToString(grant.Grantee) case s3types.PermissionFullControl: - o.GrantFullControl = s3GranteeToString(grant.Grantee) + grants.GrantFullControl = s3GranteeToString(grant.Grantee) } } + + return grants +} + +// storageACLToCopyObject updates the object to be copied with S3 ACL information. 
+func storageACLToCopyObject(acl *s3.GetObjectAclOutput, o *s3.CopyObjectInput) { + grants := storageACLToGrants(acl) + o.GrantRead = grants.GrantRead + o.GrantReadACP = grants.GrantReadACP + o.GrantWriteACP = grants.GrantWriteACP + o.GrantFullControl = grants.GrantFullControl +} + +func storageACLToCreateMultipartUpload(acl *s3.GetObjectAclOutput, o *s3.CreateMultipartUploadInput) { + grants := storageACLToGrants(acl) + o.GrantRead = grants.GrantRead + o.GrantReadACP = grants.GrantReadACP + o.GrantWriteACP = grants.GrantWriteACP + o.GrantFullControl = grants.GrantFullControl } diff --git a/pkg/storage/sos/move.go b/pkg/storage/sos/move.go new file mode 100644 index 000000000..82611409a --- /dev/null +++ b/pkg/storage/sos/move.go @@ -0,0 +1,509 @@ +package sos + +import ( + "context" + "fmt" + "net/url" + "path" + "strings" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/hashicorp/go-multierror" +) + +const ( + maxSingleCopyObjectSize int64 = 5 * 1024 * 1024 * 1024 + defaultMultipartCopyConcurrency = 5 +) + +type MovedObject struct { + SourceBucket string + SourceKey string + DestinationBucket string + DestinationKey string +} + +type StorageMoveConfig struct { + Recursive bool + DryRun bool + MultipartCopyConcurrency int +} + +type moveSourceObject struct { + CopySource string + ContentLength int64 + Metadata map[string]string + ETag *string + CacheControl *string + ContentDisposition *string + ContentEncoding *string + ContentLanguage *string + ContentType *string + Expires *time.Time + WebsiteRedirectLocation *string + StorageClass s3types.StorageClass + ServerSideEncryption s3types.ServerSideEncryption + SSEKMSKeyID *string + BucketKeyEnabled bool + ACL *s3.GetObjectAclOutput +} + +type multipartCopyPart struct { + Index int + PartNumber int32 + Start int64 + End int64 +} + +func (c *Client) MoveObjects( + ctx context.Context, + srcClient 
*Client, + srcBucket, srcKey, dstBucket, dstKey string, + config *StorageMoveConfig, +) ([]MovedObject, error) { + config = normalizeStorageMoveConfig(config) + + if srcKey == "" { + return nil, fmt.Errorf("source must include an object or prefix") + } + + srcIsPrefix := srcKey == "/" || strings.HasSuffix(srcKey, "/") + if !srcIsPrefix && config.Recursive { + return nil, fmt.Errorf("source %q is an object, remove flag `-r` or suffix the source with `/` to move a prefix", srcKey) + } + + if srcIsPrefix { + if dstKey != "" && dstKey != "/" && !strings.HasSuffix(dstKey, "/") { + return nil, fmt.Errorf("moving a prefix requires the destination to end with `/`") + } + + sourceKeys, err := srcClient.listMoveSourceKeys(ctx, srcBucket, srcKey, config.Recursive) + if err != nil { + return nil, err + } + + moved := make([]MovedObject, 0, len(sourceKeys)) + var errs *multierror.Error + + for _, sourceKey := range sourceKeys { + destinationKey := resolveMovePrefixDestinationKey(srcKey, sourceKey, dstKey) + mapping := MovedObject{ + SourceBucket: srcBucket, + SourceKey: sourceKey, + DestinationBucket: dstBucket, + DestinationKey: destinationKey, + } + + if moveLocationsEqual(mapping) { + errs = multierror.Append(errs, fmt.Errorf("source and destination are identical: %s/%s", srcBucket, sourceKey)) + continue + } + + if !config.DryRun { + if err := c.moveObject(ctx, srcClient, mapping, config.MultipartCopyConcurrency); err != nil { + errs = multierror.Append(errs, err) + continue + } + } + + moved = append(moved, mapping) + } + + return moved, errs.ErrorOrNil() + } + + destinationKey := resolveMoveObjectDestinationKey(srcKey, dstKey) + mapping := MovedObject{ + SourceBucket: srcBucket, + SourceKey: srcKey, + DestinationBucket: dstBucket, + DestinationKey: destinationKey, + } + + if moveLocationsEqual(mapping) { + return nil, fmt.Errorf("source and destination are identical: %s/%s", srcBucket, srcKey) + } + + if config.DryRun { + if _, err := 
srcClient.describeMoveSourceObject(ctx, srcBucket, srcKey); err != nil { + return nil, err + } + return []MovedObject{mapping}, nil + } + + if err := c.moveObject(ctx, srcClient, mapping, config.MultipartCopyConcurrency); err != nil { + return nil, err + } + + return []MovedObject{mapping}, nil +} + +func (c *Client) listMoveSourceKeys(ctx context.Context, bucket, prefix string, recursive bool) ([]string, error) { + keys := make([]string, 0) + if err := c.ForEachObject(ctx, bucket, prefix, recursive, func(o *s3types.Object) error { + keys = append(keys, aws.ToString(o.Key)) + return nil + }); err != nil { + return nil, fmt.Errorf("error listing objects to move: %w", err) + } + + return keys, nil +} + +func (c *Client) moveObject(ctx context.Context, srcClient *Client, move MovedObject, multipartCopyConcurrency int) error { + sourceObject, err := srcClient.describeMoveSourceObject(ctx, move.SourceBucket, move.SourceKey) + if err != nil { + return err + } + + if sourceObject.ContentLength <= maxSingleCopyObjectSize { + if err := c.copyObjectToDestination(ctx, move, sourceObject); err != nil { + return err + } + } else { + if err := c.copyMultipartObjectToDestination(ctx, move, sourceObject, multipartCopyConcurrency); err != nil { + return err + } + } + + if err := srcClient.deleteMovedObject(ctx, move.SourceBucket, move.SourceKey); err != nil { + return fmt.Errorf("delete source object %s/%s: %w", move.SourceBucket, move.SourceKey, err) + } + + return nil +} + +func (c *Client) describeMoveSourceObject(ctx context.Context, bucket, key string) (*moveSourceObject, error) { + head, err := c.S3Client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + return nil, fmt.Errorf("unable to retrieve object information: %w", err) + } + + acl, err := c.S3Client.GetObjectAcl(ctx, &s3.GetObjectAclInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + return nil, fmt.Errorf("unable to 
retrieve object ACL: %w", err) + } + + return &moveSourceObject{ + CopySource: encodeCopySource(bucket, key, head.VersionId), + ContentLength: head.ContentLength, + Metadata: head.Metadata, + ETag: head.ETag, + CacheControl: head.CacheControl, + ContentDisposition: head.ContentDisposition, + ContentEncoding: head.ContentEncoding, + ContentLanguage: head.ContentLanguage, + ContentType: head.ContentType, + Expires: head.Expires, + WebsiteRedirectLocation: head.WebsiteRedirectLocation, + StorageClass: head.StorageClass, + ServerSideEncryption: head.ServerSideEncryption, + SSEKMSKeyID: head.SSEKMSKeyId, + BucketKeyEnabled: head.BucketKeyEnabled, + ACL: acl, + }, nil +} + +func (c *Client) copyObjectToDestination(ctx context.Context, move MovedObject, source *moveSourceObject) error { + copyInput := &s3.CopyObjectInput{ + Bucket: aws.String(move.DestinationBucket), + Key: aws.String(move.DestinationKey), + CopySource: aws.String(source.CopySource), + Metadata: source.Metadata, + MetadataDirective: s3types.MetadataDirectiveReplace, + CacheControl: source.CacheControl, + ContentDisposition: source.ContentDisposition, + ContentEncoding: source.ContentEncoding, + ContentLanguage: source.ContentLanguage, + ContentType: source.ContentType, + Expires: source.Expires, + WebsiteRedirectLocation: source.WebsiteRedirectLocation, + BucketKeyEnabled: source.BucketKeyEnabled, + SSEKMSKeyId: source.SSEKMSKeyID, + ServerSideEncryption: source.ServerSideEncryption, + } + + if source.ETag != nil { + copyInput.CopySourceIfMatch = source.ETag + } + if source.StorageClass != "" { + copyInput.StorageClass = source.StorageClass + } + + storageACLToCopyObject(source.ACL, copyInput) + + if _, err := c.S3Client.CopyObject(ctx, copyInput); err != nil { + return fmt.Errorf("copy object to %s/%s: %w", move.DestinationBucket, move.DestinationKey, err) + } + + return nil +} + +func (c *Client) copyMultipartObjectToDestination(ctx context.Context, move MovedObject, source *moveSourceObject, 
multipartCopyConcurrency int) error { + createInput := &s3.CreateMultipartUploadInput{ + Bucket: aws.String(move.DestinationBucket), + Key: aws.String(move.DestinationKey), + Metadata: source.Metadata, + CacheControl: source.CacheControl, + ContentDisposition: source.ContentDisposition, + ContentEncoding: source.ContentEncoding, + ContentLanguage: source.ContentLanguage, + ContentType: source.ContentType, + Expires: source.Expires, + WebsiteRedirectLocation: source.WebsiteRedirectLocation, + BucketKeyEnabled: source.BucketKeyEnabled, + SSEKMSKeyId: source.SSEKMSKeyID, + ServerSideEncryption: source.ServerSideEncryption, + } + + if source.StorageClass != "" { + createInput.StorageClass = source.StorageClass + } + + storageACLToCreateMultipartUpload(source.ACL, createInput) + + createOutput, err := c.S3Client.CreateMultipartUpload(ctx, createInput) + if err != nil { + return fmt.Errorf("create multipart upload for %s/%s: %w", move.DestinationBucket, move.DestinationKey, err) + } + + uploadID := createOutput.UploadId + partSize := estimateMultipartCopyPartSize(source.ContentLength) + parts := buildMultipartCopyParts(source.ContentLength, partSize) + completedParts := make([]s3types.CompletedPart, len(parts)) + copyCtx, cancel := context.WithCancel(ctx) + defer cancel() + + jobs := make(chan multipartCopyPart) + var ( + completedPartsMu sync.Mutex + copyErrMu sync.Mutex + firstCopyErr error + wg sync.WaitGroup + ) + + workerCount := multipartCopyConcurrency + if workerCount > len(parts) { + workerCount = len(parts) + } + + for i := 0; i < workerCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + for { + select { + case <-copyCtx.Done(): + return + + case part, ok := <-jobs: + if !ok { + return + } + + uploadPartInput := &s3.UploadPartCopyInput{ + Bucket: aws.String(move.DestinationBucket), + Key: aws.String(move.DestinationKey), + UploadId: uploadID, + PartNumber: part.PartNumber, + CopySource: aws.String(source.CopySource), + CopySourceRange: 
aws.String(fmt.Sprintf("bytes=%d-%d", part.Start, part.End)), + } + + if source.ETag != nil { + uploadPartInput.CopySourceIfMatch = source.ETag + } + + uploadPartOutput, err := c.S3Client.UploadPartCopy(copyCtx, uploadPartInput) + if err != nil { + setMultipartCopyError(©ErrMu, &firstCopyErr, fmt.Errorf("copy object part %d to %s/%s: %w", part.PartNumber, move.DestinationBucket, move.DestinationKey, err)) + cancel() + return + } + if uploadPartOutput.CopyPartResult == nil { + setMultipartCopyError(©ErrMu, &firstCopyErr, fmt.Errorf("copy object part %d to %s/%s: empty copy result", part.PartNumber, move.DestinationBucket, move.DestinationKey)) + cancel() + return + } + + completedPart := s3types.CompletedPart{ + ETag: uploadPartOutput.CopyPartResult.ETag, + PartNumber: part.PartNumber, + } + + completedPartsMu.Lock() + completedParts[part.Index] = completedPart + completedPartsMu.Unlock() + } + } + }() + } + + go func() { + defer close(jobs) + + for _, part := range parts { + select { + case jobs <- part: + case <-copyCtx.Done(): + return + } + } + }() + + wg.Wait() + if firstCopyErr != nil { + return c.abortMultipartCopy(ctx, move, uploadID, firstCopyErr) + } + + _, err = c.S3Client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(move.DestinationBucket), + Key: aws.String(move.DestinationKey), + UploadId: uploadID, + MultipartUpload: &s3types.CompletedMultipartUpload{ + Parts: completedParts, + }, + }) + if err != nil { + return c.abortMultipartCopy(ctx, move, uploadID, fmt.Errorf("complete multipart upload for %s/%s: %w", move.DestinationBucket, move.DestinationKey, err)) + } + + return nil +} + +func (c *Client) abortMultipartCopy(ctx context.Context, move MovedObject, uploadID *string, copyErr error) error { + _, abortErr := c.S3Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ + Bucket: aws.String(move.DestinationBucket), + Key: aws.String(move.DestinationKey), + UploadId: uploadID, + }) + if abortErr != nil { + 
return fmt.Errorf("%w (abort multipart upload: %v)", copyErr, abortErr) + } + + return copyErr +} + +func (c *Client) deleteMovedObject(ctx context.Context, bucket, key string) error { + res, err := c.S3Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{ + Bucket: aws.String(bucket), + Delete: &s3types.Delete{Objects: []s3types.ObjectIdentifier{{Key: aws.String(key)}}}, + }) + if err != nil { + return err + } + if len(res.Errors) > 0 { + deleteErr := res.Errors[0] + if deleteErr.Message != nil { + return fmt.Errorf("%s", aws.ToString(deleteErr.Message)) + } + if deleteErr.Code != nil { + return fmt.Errorf("%s", aws.ToString(deleteErr.Code)) + } + return fmt.Errorf("delete failed") + } + + return nil +} + +func encodeCopySource(bucket, key string, versionID *string) string { + u := url.URL{Path: bucket + "/" + key} + if versionID != nil && aws.ToString(versionID) != "" && aws.ToString(versionID) != "null" { + query := url.Values{} + query.Set("versionId", aws.ToString(versionID)) + u.RawQuery = query.Encode() + } + + return u.String() +} + +func estimateMultipartCopyPartSize(size int64) int64 { + return estimateMultipartPartSize(size) +} + +func buildMultipartCopyParts(size, partSize int64) []multipartCopyPart { + parts := make([]multipartCopyPart, 0, ((size-1)/partSize)+1) + + for start, index, partNumber := int64(0), 0, int32(1); start < size; start, index, partNumber = start+partSize, index+1, partNumber+1 { + end := start + partSize - 1 + if end >= size { + end = size - 1 + } + + parts = append(parts, multipartCopyPart{ + Index: index, + PartNumber: partNumber, + Start: start, + End: end, + }) + } + + return parts +} + +func setMultipartCopyError(mu *sync.Mutex, firstErr *error, err error) { + mu.Lock() + defer mu.Unlock() + + if *firstErr == nil { + *firstErr = err + } +} + +func normalizeStorageMoveConfig(config *StorageMoveConfig) *StorageMoveConfig { + if config == nil { + config = &StorageMoveConfig{} + } + + if config.MultipartCopyConcurrency <= 0 { + 
config.MultipartCopyConcurrency = defaultMultipartCopyConcurrency + } + + return config +} + +func resolveMoveObjectDestinationKey(srcKey, dstKey string) string { + if dstKey == "" || dstKey == "/" { + return path.Base(srcKey) + } + + if strings.HasSuffix(dstKey, "/") { + return path.Join(strings.TrimSuffix(normalizeMovePrefix(dstKey), "/"), path.Base(srcKey)) + } + + return dstKey +} + +func resolveMovePrefixDestinationKey(srcPrefix, srcKey, dstPrefix string) string { + sourceBase := normalizeMovePrefix(srcPrefix) + relativeKey := strings.TrimPrefix(srcKey, sourceBase) + destinationBase := normalizeMovePrefix(dstPrefix) + if destinationBase == "" { + return relativeKey + } + + return path.Join(strings.TrimSuffix(destinationBase, "/"), relativeKey) +} + +func normalizeMovePrefix(prefix string) string { + if prefix == "/" { + return "" + } + + return prefix +} + +func moveLocationsEqual(move MovedObject) bool { + return move.SourceBucket == move.DestinationBucket && move.SourceKey == move.DestinationKey +} diff --git a/pkg/storage/sos/move_test.go b/pkg/storage/sos/move_test.go new file mode 100644 index 000000000..34c0b3e14 --- /dev/null +++ b/pkg/storage/sos/move_test.go @@ -0,0 +1,289 @@ +package sos_test + +import ( + "context" + "errors" + "fmt" + "sync" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/stretchr/testify/assert" + + "github.com/exoscale/cli/pkg/storage/sos" +) + +func TestMoveObject(t *testing.T) { + ctx := context.Background() + sourceBucket := "source-bucket" + destinationBucket := "destination-bucket" + sourceKey := "folder/source file.txt" + destinationKey := "archive/source file.txt" + + copyCalls := 0 + deleteCalls := 0 + + srcClient := &sos.Client{S3Client: &MockS3API{ + mockHeadObject: func(ctx context.Context, input *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + assert.Equal(t, 
sourceBucket, aws.ToString(input.Bucket)) + assert.Equal(t, sourceKey, aws.ToString(input.Key)) + return &s3.HeadObjectOutput{ + ContentLength: 1024, + ContentType: aws.String("text/plain"), + Metadata: map[string]string{"key": "value"}, + ETag: aws.String("\"etag\""), + }, nil + }, + mockGetObjectAcl: func(ctx context.Context, input *s3.GetObjectAclInput, optFns ...func(*s3.Options)) (*s3.GetObjectAclOutput, error) { + assert.Equal(t, sourceBucket, aws.ToString(input.Bucket)) + assert.Equal(t, sourceKey, aws.ToString(input.Key)) + return &s3.GetObjectAclOutput{ + Owner: &types.Owner{ID: aws.String("owner-id")}, + }, nil + }, + mockDeleteObjects: func(ctx context.Context, params *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) { + deleteCalls++ + assert.Equal(t, sourceBucket, aws.ToString(params.Bucket)) + assert.Len(t, params.Delete.Objects, 1) + assert.Equal(t, sourceKey, aws.ToString(params.Delete.Objects[0].Key)) + return &s3.DeleteObjectsOutput{ + Deleted: []types.DeletedObject{{Key: aws.String(sourceKey)}}, + }, nil + }, + }} + + dstClient := &sos.Client{S3Client: &MockS3API{ + mockCopyObject: func(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) { + copyCalls++ + assert.Equal(t, destinationBucket, aws.ToString(params.Bucket)) + assert.Equal(t, destinationKey, aws.ToString(params.Key)) + assert.Equal(t, "source-bucket/folder/source%20file.txt", aws.ToString(params.CopySource)) + assert.Equal(t, types.MetadataDirectiveReplace, params.MetadataDirective) + assert.Equal(t, map[string]string{"key": "value"}, params.Metadata) + assert.Equal(t, "text/plain", aws.ToString(params.ContentType)) + assert.Equal(t, "\"etag\"", aws.ToString(params.CopySourceIfMatch)) + assert.Equal(t, "id=owner-id", aws.ToString(params.GrantFullControl)) + return &s3.CopyObjectOutput{}, nil + }, + }} + + moved, err := dstClient.MoveObjects(ctx, srcClient, sourceBucket, sourceKey, 
destinationBucket, destinationKey, nil) + assert.NoError(t, err) + assert.Equal(t, []sos.MovedObject{{ + SourceBucket: sourceBucket, + SourceKey: sourceKey, + DestinationBucket: destinationBucket, + DestinationKey: destinationKey, + }}, moved) + assert.Equal(t, 1, copyCalls) + assert.Equal(t, 1, deleteCalls) +} + +func TestMovePrefixRecursively(t *testing.T) { + ctx := context.Background() + sourceBucket := "source-bucket" + destinationBucket := "destination-bucket" + sourcePrefix := "public/" + destinationPrefix := "archive/" + sourceKeys := []string{"public/a.txt", "public/sub/b.txt"} + expectedDestinationKeys := []string{"archive/a.txt", "archive/sub/b.txt"} + + headCalls := 0 + deleteCalls := 0 + copyCalls := 0 + deletedKeys := make([]string, 0, len(sourceKeys)) + copiedKeys := make([]string, 0, len(sourceKeys)) + + srcClient := &sos.Client{S3Client: &MockS3API{ + mockListObjectsV2: func(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { + assert.Equal(t, sourceBucket, aws.ToString(params.Bucket)) + assert.Equal(t, sourcePrefix, aws.ToString(params.Prefix)) + return &s3.ListObjectsV2Output{ + Contents: []types.Object{ + {Key: aws.String(sourceKeys[0])}, + {Key: aws.String(sourceKeys[1])}, + }, + IsTruncated: false, + }, nil + }, + mockHeadObject: func(ctx context.Context, input *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + headCalls++ + assert.Contains(t, sourceKeys, aws.ToString(input.Key)) + return &s3.HeadObjectOutput{ContentLength: 1024, ETag: aws.String("\"etag\"")}, nil + }, + mockGetObjectAcl: func(ctx context.Context, input *s3.GetObjectAclInput, optFns ...func(*s3.Options)) (*s3.GetObjectAclOutput, error) { + return &s3.GetObjectAclOutput{Owner: &types.Owner{ID: aws.String("owner-id")}}, nil + }, + mockDeleteObjects: func(ctx context.Context, params *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) { + deleteCalls++ 
+ deletedKey := aws.ToString(params.Delete.Objects[0].Key) + deletedKeys = append(deletedKeys, deletedKey) + return &s3.DeleteObjectsOutput{Deleted: []types.DeletedObject{{Key: aws.String(deletedKey)}}}, nil + }, + }} + + dstClient := &sos.Client{S3Client: &MockS3API{ + mockCopyObject: func(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) { + copyCalls++ + copiedKeys = append(copiedKeys, aws.ToString(params.Key)) + return &s3.CopyObjectOutput{}, nil + }, + }} + + moved, err := dstClient.MoveObjects(ctx, srcClient, sourceBucket, sourcePrefix, destinationBucket, destinationPrefix, &sos.StorageMoveConfig{Recursive: true}) + assert.NoError(t, err) + assert.Len(t, moved, 2) + assert.Equal(t, sourceKeys, deletedKeys) + assert.Equal(t, expectedDestinationKeys, copiedKeys) + assert.Equal(t, len(sourceKeys), headCalls) + assert.Equal(t, len(sourceKeys), deleteCalls) + assert.Equal(t, len(sourceKeys), copyCalls) +} + +func TestMoveObjectMultipartCopy(t *testing.T) { + testMoveObjectMultipartCopy(t, nil, 5) +} + +func TestMoveObjectMultipartCopyCustomConcurrency(t *testing.T) { + testMoveObjectMultipartCopy(t, &sos.StorageMoveConfig{MultipartCopyConcurrency: 2}, 2) +} + +func testMoveObjectMultipartCopy(t *testing.T, config *sos.StorageMoveConfig, expectedConcurrency int) { + t.Helper() + + ctx := context.Background() + sourceBucket := "source-bucket" + destinationBucket := "destination-bucket" + sourceKey := "large-object.bin" + destinationKey := "archive/large-object.bin" + size := int64(5)*1024*1024*1024 + 1 + + partCalls := 0 + deleteCalls := 0 + completeCalls := 0 + maxInFlight := 0 + inFlight := 0 + partRanges := make(map[int32]string) + var partMu sync.Mutex + + srcClient := &sos.Client{S3Client: &MockS3API{ + mockHeadObject: func(ctx context.Context, input *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + return &s3.HeadObjectOutput{ + ContentLength: size, + ContentType: 
aws.String("application/octet-stream"), + ETag: aws.String("\"etag\""), + }, nil + }, + mockGetObjectAcl: func(ctx context.Context, input *s3.GetObjectAclInput, optFns ...func(*s3.Options)) (*s3.GetObjectAclOutput, error) { + return &s3.GetObjectAclOutput{Owner: &types.Owner{ID: aws.String("owner-id")}}, nil + }, + mockDeleteObjects: func(ctx context.Context, params *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) { + deleteCalls++ + return &s3.DeleteObjectsOutput{Deleted: []types.DeletedObject{{Key: aws.String(sourceKey)}}}, nil + }, + }} + + dstClient := &sos.Client{S3Client: &MockS3API{ + mockCreateMultipartUpload: func(ctx context.Context, params *s3.CreateMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) { + assert.Equal(t, destinationBucket, aws.ToString(params.Bucket)) + assert.Equal(t, destinationKey, aws.ToString(params.Key)) + assert.Equal(t, "application/octet-stream", aws.ToString(params.ContentType)) + return &s3.CreateMultipartUploadOutput{UploadId: aws.String("upload-id")}, nil + }, + mockUploadPartCopy: func(ctx context.Context, params *s3.UploadPartCopyInput, optFns ...func(*s3.Options)) (*s3.UploadPartCopyOutput, error) { + partMu.Lock() + partCalls++ + inFlight++ + if inFlight > maxInFlight { + maxInFlight = inFlight + } + partRanges[params.PartNumber] = aws.ToString(params.CopySourceRange) + partMu.Unlock() + + time.Sleep(2 * time.Millisecond) + + partMu.Lock() + inFlight-- + partMu.Unlock() + + assert.Equal(t, "source-bucket/large-object.bin", aws.ToString(params.CopySource)) + assert.Equal(t, "\"etag\"", aws.ToString(params.CopySourceIfMatch)) + return &s3.UploadPartCopyOutput{ + CopyPartResult: &types.CopyPartResult{ETag: aws.String(fmt.Sprintf("etag-%d", params.PartNumber))}, + }, nil + }, + mockCompleteMultipartUpload: func(ctx context.Context, params *s3.CompleteMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) { + 
completeCalls++ + assert.Equal(t, 1025, len(params.MultipartUpload.Parts)) + return &s3.CompleteMultipartUploadOutput{}, nil + }, + mockAbortMultipartUpload: func(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) { + assert.Fail(t, "abort should not be called on successful multipart copy") + return nil, nil + }, + }} + + moved, err := dstClient.MoveObjects(ctx, srcClient, sourceBucket, sourceKey, destinationBucket, destinationKey, config) + assert.NoError(t, err) + assert.Len(t, moved, 1) + assert.Equal(t, 1025, partCalls) + assert.Equal(t, 1, completeCalls) + assert.Equal(t, 1, deleteCalls) + assert.Equal(t, "bytes=0-5242879", partRanges[1]) + assert.Equal(t, "bytes=5368709120-5368709120", partRanges[1025]) + assert.Equal(t, expectedConcurrency, maxInFlight) +} + +func TestMoveObjectMultipartCopyAbortOnFailure(t *testing.T) { /* TestMoveObjectMultipartCopyAbortOnFailure checks that when every UploadPartCopy call fails, MoveObjects returns an error and a nil result, aborts the multipart upload exactly once, never completes the upload and never deletes the source object. */ + ctx := context.Background() + sourceBucket := "source-bucket" + destinationBucket := "destination-bucket" + sourceKey := "large-object.bin" + destinationKey := "archive/large-object.bin" + size := int64(5)*1024*1024*1024 + 1 /* 5 GiB plus one byte, reported as ContentLength by the HeadObject mock. NOTE(review): presumably chosen to exceed the single-request copy limit so that MoveObjects takes the multipart path; confirm against move.go. */ + + abortCalls := 0 /* abortCalls, deleteCalls and completeCalls count how often the corresponding mock hooks below are invoked. */ + deleteCalls := 0 + completeCalls := 0 + + srcClient := &sos.Client{S3Client: &MockS3API{ + mockHeadObject: func(ctx context.Context, input *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + return &s3.HeadObjectOutput{ContentLength: size, ETag: aws.String("\"etag\"")}, nil + }, + mockGetObjectAcl: func(ctx context.Context, input *s3.GetObjectAclInput, optFns ...func(*s3.Options)) (*s3.GetObjectAclOutput, error) { + return &s3.GetObjectAclOutput{Owner: &types.Owner{ID: aws.String("owner-id")}}, nil + }, + mockDeleteObjects: func(ctx context.Context, params *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) { + deleteCalls++ + return &s3.DeleteObjectsOutput{}, nil + }, + }} + + dstClient := &sos.Client{S3Client: &MockS3API{ + mockCreateMultipartUpload:
func(ctx context.Context, params *s3.CreateMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) { + return &s3.CreateMultipartUploadOutput{UploadId: aws.String("upload-id")}, nil + }, + mockUploadPartCopy: func(ctx context.Context, params *s3.UploadPartCopyInput, optFns ...func(*s3.Options)) (*s3.UploadPartCopyOutput, error) { + return nil, errors.New("copy failed") /* Injected failure: every part copy returns this error. */ + }, + mockAbortMultipartUpload: func(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) { + abortCalls++ + assert.Equal(t, "upload-id", aws.ToString(params.UploadId)) /* The abort must target the upload ID handed out by the CreateMultipartUpload mock. */ + return &s3.AbortMultipartUploadOutput{}, nil + }, + mockCompleteMultipartUpload: func(ctx context.Context, params *s3.CompleteMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) { + completeCalls++ + return &s3.CompleteMultipartUploadOutput{}, nil + }, + }} + + moved, err := dstClient.MoveObjects(ctx, srcClient, sourceBucket, sourceKey, destinationBucket, destinationKey, nil) + assert.Error(t, err) + assert.Nil(t, moved) + assert.Equal(t, 1, abortCalls) + assert.Equal(t, 0, deleteCalls) /* deleteCalls is only incremented by the source client's DeleteObjects mock, so zero means the source object was left in place. */ + assert.Equal(t, 0, completeCalls) +} diff --git a/pkg/storage/sos/object.go b/pkg/storage/sos/object.go index 068888fef..8db19f9bd 100644 --- a/pkg/storage/sos/object.go +++ b/pkg/storage/sos/object.go @@ -626,11 +626,15 @@ func (c *Client) EstimatePartSize(f *os.File) (int64, error) { return 0, err } + return estimateMultipartPartSize(size), nil +} + +func estimateMultipartPartSize(size int64) int64 { /* estimateMultipartPartSize returns the part size to use for a payload of size bytes: s3manager.DefaultUploadPartSize, unless size divided by that default (integer division) is at least s3manager.MaxUploadParts, in which case it returns size divided by MaxUploadParts, plus one. */ if size/int64(s3manager.DefaultUploadPartSize) >= int64(s3manager.MaxUploadParts) { - return (size / int64(s3manager.MaxUploadParts)) + 1, nil + return (size / int64(s3manager.MaxUploadParts)) + 1 } - return s3manager.DefaultUploadPartSize, nil + return s3manager.DefaultUploadPartSize } func computeSeekerLength(s io.Seeker) (int64, error) { diff --git a/pkg/storage/sos/s3api.go
b/pkg/storage/sos/s3api.go index ed5c4580c..a31783a6b 100644 --- a/pkg/storage/sos/s3api.go +++ b/pkg/storage/sos/s3api.go @@ -11,11 +11,13 @@ type S3API interface { s3manager.UploadAPIClient DeleteObjects(ctx context.Context, params *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) + HeadObject(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) GetObjectAcl(ctx context.Context, params *s3.GetObjectAclInput, optFns ...func(*s3.Options)) (*s3.GetObjectAclOutput, error) ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) ListObjectVersions(ctx context.Context, params *s3.ListObjectVersionsInput, optFns ...func(*s3.Options)) (*s3.ListObjectVersionsOutput, error) CopyObject(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) + UploadPartCopy(ctx context.Context, params *s3.UploadPartCopyInput, optFns ...func(*s3.Options)) (*s3.UploadPartCopyOutput, error) DeleteBucketCors(ctx context.Context, params *s3.DeleteBucketCorsInput, optFns ...func(*s3.Options)) (*s3.DeleteBucketCorsOutput, error) GetBucketCors(ctx context.Context, params *s3.GetBucketCorsInput, optFns ...func(*s3.Options)) (*s3.GetBucketCorsOutput, error) PutBucketCors(ctx context.Context, params *s3.PutBucketCorsInput, optFns ...func(*s3.Options)) (*s3.PutBucketCorsOutput, error) diff --git a/pkg/storage/sos/s3api_mock_test.go b/pkg/storage/sos/s3api_mock_test.go index 23bffcad4..1fa132db6 100644 --- a/pkg/storage/sos/s3api_mock_test.go +++ b/pkg/storage/sos/s3api_mock_test.go @@ -7,12 +7,14 @@ import ( ) type MockS3API struct { + mockHeadObject func(ctx context.Context, input *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) mockGetObject 
func(ctx context.Context, input *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) mockGetObjectAcl func(ctx context.Context, input *s3.GetObjectAclInput, optFns ...func(*s3.Options)) (*s3.GetObjectAclOutput, error) //nolint:stylecheck mockDeleteObjects func(ctx context.Context, params *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) mockListObjectsV2 func(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) mockListObjectVersions func(ctx context.Context, params *s3.ListObjectVersionsInput, optFns ...func(*s3.Options)) (*s3.ListObjectVersionsOutput, error) mockCopyObject func(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) + mockUploadPartCopy func(ctx context.Context, params *s3.UploadPartCopyInput, optFns ...func(*s3.Options)) (*s3.UploadPartCopyOutput, error) mockDeleteBucketCors func(ctx context.Context, params *s3.DeleteBucketCorsInput, optFns ...func(*s3.Options)) (*s3.DeleteBucketCorsOutput, error) mockGetBucketCors func(ctx context.Context, params *s3.GetBucketCorsInput, optFns ...func(*s3.Options)) (*s3.GetBucketCorsOutput, error) mockPutBucketCors func(ctx context.Context, params *s3.PutBucketCorsInput, optFns ...func(*s3.Options)) (*s3.PutBucketCorsOutput, error) @@ -38,6 +40,10 @@ type MockS3API struct { mockAbortMultipartUpload func(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) } +func (m *MockS3API) HeadObject(ctx context.Context, input *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { /* HeadObject forwards ctx, input and optFns to the mockHeadObject hook and returns the hook's result unchanged. The hook is called without a nil check, so a test that reaches this method must set mockHeadObject. */ + return m.mockHeadObject(ctx, input, optFns...) +} + func (m *MockS3API) GetObject(ctx context.Context, input *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { return m.mockGetObject(ctx, input, optFns...)
} @@ -63,6 +69,10 @@ func (m *MockS3API) CopyObject(ctx context.Context, params *s3.CopyObjectInput, return m.mockCopyObject(ctx, params, optFns...) } +func (m *MockS3API) UploadPartCopy(ctx context.Context, params *s3.UploadPartCopyInput, optFns ...func(*s3.Options)) (*s3.UploadPartCopyOutput, error) { /* UploadPartCopy forwards ctx, params and optFns to the mockUploadPartCopy hook and returns the hook's result unchanged. The hook is called without a nil check, so a test that reaches this method must set mockUploadPartCopy. */ + return m.mockUploadPartCopy(ctx, params, optFns...) +} + func (m *MockS3API) DeleteBucketCors(ctx context.Context, params *s3.DeleteBucketCorsInput, optFns ...func(*s3.Options)) (*s3.DeleteBucketCorsOutput, error) { return m.mockDeleteBucketCors(ctx, params, optFns...) } From 3184eafd33927382f08cd158d64682166e3bddda Mon Sep 17 00:00:00 2001 From: Nicolas Hedger Date: Tue, 24 Mar 2026 22:49:15 +0100 Subject: [PATCH 2/3] add changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 20bc9e6c2..129a36aeb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - sks: add `rotate-karpenter-credentials` command #797 - sks: add `active-nodepool-templates` command #797 - new command `exo ai deployment instance-type` that allows showing what GPU is usable in which zone #809 +- new command `exo storage move` to move objects within or across buckets without downloading them locally ### Bug fixes From d9e5985f949e573bcb2ad188ae72ec2fc2ecd29d Mon Sep 17 00:00:00 2001 From: Nicolas Hedger Date: Tue, 24 Mar 2026 22:50:05 +0100 Subject: [PATCH 3/3] add PR number --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 129a36aeb..8cd1ef767 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ - sks: add `rotate-karpenter-credentials` command #797 - sks: add `active-nodepool-templates` command #797 - new command `exo ai deployment instance-type` that allows showing what GPU is usable in which zone #809 -- new command `exo storage move` to move objects within or across buckets without downloading them locally +- new command `exo storage move` to move objects
within or across buckets without downloading them locally #814 ### Bug fixes