From fc46df7b1c8bbf8ecfe273dbdd881096e35cc40e Mon Sep 17 00:00:00 2001
From: Marius Kleidl
Date: Tue, 17 Sep 2024 22:35:33 +0200
Subject: [PATCH] s3store: Allow bucket customization via pre-create hook

---
 pkg/s3store/s3store.go      | 88 +++++++++++++++++++++++++------------
 pkg/s3store/s3store_test.go | 39 +++++++++-------
 2 files changed, 83 insertions(+), 44 deletions(-)

diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go
index c4c6ab6f..d2d2c7e7 100644
--- a/pkg/s3store/s3store.go
+++ b/pkg/s3store/s3store.go
@@ -284,6 +284,9 @@ type s3Upload struct {
 	// objectKey is the object key under which we save the final file. Might be customized by pre-create hook.
 	// Its value already includes ObjectPrefix, if that's set.
 	objectKey string
+	// objectBucket is the bucket in which the final file is saved. Might be customized by pre-create hook.
+	// The .info and .part files are always stored in `store.Bucket`.
+	objectBucket string
 
 	// multipartId is the ID given by S3 to us for the multipart upload.
 	multipartId string
@@ -328,6 +331,15 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand
 	}
 	objectKey = store.keyWithPrefix(objectKey)
 
+	// The destination bucket can also be customized by the pre-create hook. It's used for the multipart
+	// upload itself, while the info and part files always go into the bucket configured in the store.
+	var objectBucket string
+	if info.Storage != nil && info.Storage["Bucket"] != "" {
+		objectBucket = info.Storage["Bucket"]
+	} else {
+		objectBucket = store.Bucket
+	}
+
 	// Convert meta data into a map of pointers for AWS Go SDK, sigh.
 	metadata := make(map[string]string, len(info.MetaData))
 	for key, value := range info.MetaData {
@@ -337,7 +349,7 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand
 	// Create the actual multipart upload
 	t := time.Now()
 	res, err := store.Service.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
-		Bucket:   aws.String(store.Bucket),
+		Bucket:   aws.String(objectBucket),
 		Key:      aws.String(objectKey),
 		Metadata: metadata,
 	})
@@ -349,12 +361,12 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand
 	multipartId := *res.UploadId
 	info.Storage = map[string]string{
 		"Type":            "s3store",
-		"Bucket":          store.Bucket,
+		"Bucket":          objectBucket,
 		"Key":             objectKey,
 		"MultipartUpload": multipartId,
 	}
 
-	upload := &s3Upload{uploadId, objectKey, multipartId, &store, info, []*s3Part{}, 0}
+	upload := &s3Upload{uploadId, objectKey, objectBucket, multipartId, &store, info, []*s3Part{}, 0}
 	err = upload.writeInfo(ctx, info)
 	if err != nil {
 		return nil, fmt.Errorf("s3store: unable to create info file:\n%s", err)
 	}
@@ -364,7 +376,7 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand
 	return upload, nil
 }
 
 func (store S3Store) GetUpload(ctx context.Context, uploadId string) (handler.Upload, error) {
-	objectKey, multipartId, info, parts, incompletePartSize, err := store.fetchInfo(ctx, uploadId, nil)
+	objectKey, objectBucket, multipartId, info, parts, incompletePartSize, err := store.fetchInfo(ctx, uploadId, nil)
 	if err != nil {
 		// Currently, s3store stores the multipart ID in the info object. However, in the past the
 		// multipart ID was part of the upload ID, which consisted of the object ID and multipart ID
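The Storage["Bucket"]-or-default fallback above appears twice in this patch: here in NewUpload and again in fetchInfo further down. A minimal sketch of a helper capturing that rule (bucketForUpload is a hypothetical name, not part of the patch):

    // bucketForUpload returns the bucket for the final object: the one chosen
    // by the pre-create hook via info.Storage["Bucket"], if any, otherwise the
    // bucket configured on the store.
    func bucketForUpload(storage map[string]string, defaultBucket string) string {
        if storage != nil && storage["Bucket"] != "" {
            return storage["Bucket"]
        }
        return defaultBucket
    }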
@@ -376,7 +388,7 @@ func (store S3Store) GetUpload(ctx context.Context, uploadId string) (handler.Up
 		if errors.Is(err, handler.ErrNotFound) && lastPlusIndex != -1 {
 			fallbackMultipartId := uploadId[lastPlusIndex+1:]
 			uploadId = uploadId[:lastPlusIndex]
-			objectKey, multipartId, info, parts, incompletePartSize, err = store.fetchInfo(ctx, uploadId, &fallbackMultipartId)
+			objectKey, objectBucket, multipartId, info, parts, incompletePartSize, err = store.fetchInfo(ctx, uploadId, &fallbackMultipartId)
 			if err != nil {
 				return nil, err
 			}
@@ -385,7 +397,7 @@ func (store S3Store) GetUpload(ctx context.Context, uploadId string) (handler.Up
 		}
 	}
 
-	return &s3Upload{uploadId, objectKey, multipartId, &store, info, parts, incompletePartSize}, nil
+	return &s3Upload{uploadId, objectKey, objectBucket, multipartId, &store, info, parts, incompletePartSize}, nil
 }
 
 func (store S3Store) AsTerminatableUpload(upload handler.Upload) handler.TerminatableUpload {
@@ -522,7 +534,7 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
 
 			t := time.Now()
 			uploadPartInput := &s3.UploadPartInput{
-				Bucket:     aws.String(store.Bucket),
+				Bucket:     aws.String(upload.objectBucket),
 				Key:        aws.String(upload.objectKey),
 				UploadId:   aws.String(upload.multipartId),
 				PartNumber: aws.Int32(part.number),
@@ -629,7 +641,7 @@ func (upload *s3Upload) GetInfo(ctx context.Context) (info handler.FileInfo, err
 	return upload.info, nil
 }
 
-func (store S3Store) fetchInfo(ctx context.Context, uploadId string, fallbackMultipartId *string) (objectKey string, multipartId string, info handler.FileInfo, parts []*s3Part, incompletePartSize int64, err error) {
+func (store S3Store) fetchInfo(ctx context.Context, uploadId string, fallbackMultipartId *string) (objectKey string, objectBucket string, multipartId string, info handler.FileInfo, parts []*s3Part, incompletePartSize int64, err error) {
 	// Start by fetching the file info stored in a separate object.
 	t := time.Now()
 	res, infoErr := store.Service.GetObject(ctx, &s3.GetObjectInput{
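Note that fetchInfo always reads the .info object from the store's own bucket; only the multipart operations on the final object use the bucket recovered from Storage["Bucket"]. An illustrative sketch of that lookup, assuming the metadata key convention visible in this patch and its tests (metadataKeyWithPrefix plus a ".info" suffix):

    // Resuming an upload: the .info object lives in the store's bucket,
    // regardless of where the final object is being assembled.
    res, err := store.Service.GetObject(ctx, &s3.GetObjectInput{
        Bucket: aws.String(store.Bucket), // never the customized bucket
        Key:    aws.String(store.metadataKeyWithPrefix(uploadId + ".info")),
    })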
@@ -663,6 +675,12 @@ func (store S3Store) fetchInfo(ctx context.Context, uploadId string, fallbackMul
 		objectKey = store.keyWithPrefix(uploadId)
 	}
 
+	if info.Storage != nil && info.Storage["Bucket"] != "" {
+		objectBucket = info.Storage["Bucket"]
+	} else {
+		objectBucket = store.Bucket
+	}
+
 	if info.Storage != nil && info.Storage["MultipartUpload"] != "" {
 		multipartId = info.Storage["MultipartUpload"]
 	} else if fallbackMultipartId != nil {
@@ -685,7 +703,7 @@ func (store S3Store) fetchInfo(ctx context.Context, uploadId string, fallbackMul
 		defer wg.Done()
 
 		// Get uploaded parts and their offset
-		parts, partsErr = store.listAllParts(ctx, objectKey, multipartId)
+		parts, partsErr = store.listAllParts(ctx, objectKey, objectBucket, multipartId)
 	}()
 
 	go func() {
@@ -746,7 +764,7 @@ func (upload s3Upload) GetReader(ctx context.Context) (io.ReadCloser, error) {
 	store := upload.store
 
 	res, err := store.Service.GetObject(ctx, &s3.GetObjectInput{
-		Bucket: aws.String(store.Bucket),
+		Bucket: aws.String(upload.objectBucket),
 		Key:    aws.String(upload.objectKey),
 	})
 	if err != nil {
@@ -762,7 +780,7 @@ func (upload s3Upload) Terminate(ctx context.Context) error {
 	store := upload.store
 	var wg sync.WaitGroup
 
-	wg.Add(2)
+	wg.Add(3)
 	errs := make([]error, 0, 4)
 
 	go func() {
@@ -770,7 +788,7 @@ func (upload s3Upload) Terminate(ctx context.Context) error {
 
 		// Abort the multipart upload
 		_, err := store.Service.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
-			Bucket:   aws.String(store.Bucket),
+			Bucket:   aws.String(upload.objectBucket),
 			Key:      aws.String(upload.objectKey),
 			UploadId: aws.String(upload.multipartId),
 		})
@@ -782,14 +800,29 @@ func (upload s3Upload) Terminate(ctx context.Context) error {
 	go func() {
 		defer wg.Done()
 
-		// Delete the info and content files
+		// Delete the object itself
+		_, err := store.Service.DeleteObject(ctx, &s3.DeleteObjectInput{
+			Bucket: aws.String(upload.objectBucket),
+			Key:    aws.String(upload.objectKey),
+		})
+
+		if isAwsError[*types.NoSuchKey](err) || isAwsErrorCode(err, "NoSuchKey") {
+			err = nil
+		}
+
+		if err != nil {
+			errs = append(errs, err)
+		}
+	}()
+
+	go func() {
+		defer wg.Done()
+
+		// Delete the info and part files
 		res, err := store.Service.DeleteObjects(ctx, &s3.DeleteObjectsInput{
 			Bucket: aws.String(store.Bucket),
 			Delete: &types.Delete{
 				Objects: []types.ObjectIdentifier{
-					{
-						Key: aws.String(upload.objectKey),
-					},
 					{
 						Key: store.metadataKeyWithPrefix(upload.uploadId + ".part"),
 					},
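Terminate now fans out three deletions in parallel: aborting the multipart upload, deleting the final object from its (possibly customized) bucket, and batch-deleting the metadata objects from the store's bucket, tolerating an already-missing final object. The isAwsError call above matches a typed SDK error; a sketch of such a generic matcher, assuming it is implemented with the standard library's errors.As (the real helper lives elsewhere in s3store.go):

    // isAwsError reports whether err is (or wraps) the AWS error type T,
    // e.g. isAwsError[*types.NoSuchKey](err).
    func isAwsError[T error](err error) bool {
        var target T
        return errors.As(err, &target)
    }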
@@ -830,7 +863,7 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error {
 		// upload. So if the tus upload has a size of 0, we create an empty part
 		// and use that for completing the multipart upload.
 		res, err := store.Service.UploadPart(ctx, &s3.UploadPartInput{
-			Bucket:     aws.String(store.Bucket),
+			Bucket:     aws.String(upload.objectBucket),
 			Key:        aws.String(upload.objectKey),
 			UploadId:   aws.String(upload.multipartId),
 			PartNumber: aws.Int32(1),
@@ -863,7 +896,7 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error {
 
 	t := time.Now()
 	_, err := store.Service.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
-		Bucket:   aws.String(store.Bucket),
+		Bucket:   aws.String(upload.objectBucket),
 		Key:      aws.String(upload.objectKey),
 		UploadId: aws.String(upload.multipartId),
 		MultipartUpload: &types.CompletedMultipartUpload{
@@ -914,7 +947,7 @@ func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads
 		partialS3Upload := partialUpload.(*s3Upload)
 
 		res, err := store.Service.GetObject(ctx, &s3.GetObjectInput{
-			Bucket: aws.String(store.Bucket),
+			Bucket: aws.String(partialS3Upload.objectBucket),
 			Key:    aws.String(partialS3Upload.objectKey),
 		})
 		if err != nil {
@@ -932,7 +965,7 @@ func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads
 
 	// Upload the entire file to S3
 	_, err = store.Service.PutObject(ctx, &s3.PutObjectInput{
-		Bucket: aws.String(store.Bucket),
+		Bucket: aws.String(upload.objectBucket),
 		Key:    aws.String(upload.objectKey),
 		Body:   file,
 	})
@@ -946,7 +979,7 @@ func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads
 	// the request.
 	go func() {
 		store.Service.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
-			Bucket:   aws.String(store.Bucket),
+			Bucket:   aws.String(upload.objectBucket),
 			Key:      aws.String(upload.objectKey),
 			UploadId: aws.String(upload.multipartId),
 		})
@@ -976,15 +1009,16 @@ func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads
 			etag:   "",
 		})
 
-		go func(partNumber int32, sourceObject string) {
+		source := partialS3Upload.objectBucket + "/" + partialS3Upload.objectKey
+		go func(partNumber int32, source string) {
 			defer wg.Done()
 
 			res, err := store.Service.UploadPartCopy(ctx, &s3.UploadPartCopyInput{
-				Bucket:     aws.String(store.Bucket),
+				Bucket:     aws.String(upload.objectBucket),
 				Key:        aws.String(upload.objectKey),
 				UploadId:   aws.String(upload.multipartId),
 				PartNumber: aws.Int32(partNumber),
-				CopySource: aws.String(store.Bucket + "/" + sourceObject),
+				CopySource: aws.String(source),
 			})
 			if err != nil {
 				errs = append(errs, err)
@@ -992,7 +1026,7 @@ func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads
 			}
 
 			upload.parts[partNumber-1].etag = *res.CopyPartResult.ETag
-		}(partNumber, partialS3Upload.objectKey)
+		}(partNumber, source)
 	}
 
 	wg.Wait()
@@ -1015,14 +1049,14 @@ func (upload *s3Upload) DeclareLength(ctx context.Context, length int64) error {
 	return upload.writeInfo(ctx, info)
 }
 
-func (store S3Store) listAllParts(ctx context.Context, objectKey string, multipartId string) (parts []*s3Part, err error) {
+func (store S3Store) listAllParts(ctx context.Context, objectKey string, objectBucket string, multipartId string) (parts []*s3Part, err error) {
 	var partMarker *string
 	for {
 		t := time.Now()
 
 		// Get uploaded parts
 		listPtr, err := store.Service.ListParts(ctx, &s3.ListPartsInput{
-			Bucket:           aws.String(store.Bucket),
+			Bucket:           aws.String(objectBucket),
 			Key:              aws.String(objectKey),
 			UploadId:         aws.String(multipartId),
 			PartNumberMarker: partMarker,
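Because partial uploads may now live in different buckets than the final upload, concatUsingMultipart builds the UploadPartCopy source as "bucket/key" from each partial upload's own bucket instead of hardcoding store.Bucket. A standalone sketch of the same cross-bucket copy (client, multipartId, and the bucket/key names are placeholders):

    // Copy one part of a concatenated upload from a partial upload that
    // may reside in a different bucket than the destination.
    _, err := client.UploadPartCopy(ctx, &s3.UploadPartCopyInput{
        Bucket:     aws.String("custom-bucket"),              // destination bucket
        Key:        aws.String("final/key"),                  // destination object
        UploadId:   aws.String(multipartId),
        PartNumber: aws.Int32(1),
        CopySource: aws.String("partial-bucket/partial/key"), // "bucket/key" form
    })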
diff --git a/pkg/s3store/s3store_test.go b/pkg/s3store/s3store_test.go
index 6b8ace85..76a97a8a 100644
--- a/pkg/s3store/s3store_test.go
+++ b/pkg/s3store/s3store_test.go
@@ -986,13 +986,15 @@ func TestTerminate(t *testing.T) {
 		UploadId: aws.String("multipartId"),
 	}).Return(nil, nil)
 
+	s3obj.EXPECT().DeleteObject(context.Background(), &s3.DeleteObjectInput{
+		Bucket: aws.String("bucket"),
+		Key:    aws.String("uploadId"),
+	}).Return(&s3.DeleteObjectOutput{}, nil)
+
 	s3obj.EXPECT().DeleteObjects(context.Background(), &s3.DeleteObjectsInput{
 		Bucket: aws.String("bucket"),
 		Delete: &types.Delete{
 			Objects: []types.ObjectIdentifier{
-				{
-					Key: aws.String("uploadId"),
-				},
 				{
 					Key: aws.String("uploadId.part"),
 				},
@@ -1057,13 +1059,15 @@ func TestTerminateWithErrors(t *testing.T) {
 		UploadId: aws.String("multipartId"),
 	}).Return(nil, &types.NoSuchUpload{})
 
+	s3obj.EXPECT().DeleteObject(context.Background(), &s3.DeleteObjectInput{
+		Bucket: aws.String("bucket"),
+		Key:    aws.String("uploadId"),
+	}).Return(nil, &types.NoSuchKey{})
+
 	s3obj.EXPECT().DeleteObjects(context.Background(), &s3.DeleteObjectsInput{
 		Bucket: aws.String("bucket"),
 		Delete: &types.Delete{
 			Objects: []types.ObjectIdentifier{
-				{
-					Key: aws.String("uploadId"),
-				},
 				{
 					Key: aws.String("uploadId.part"),
 				},
@@ -1694,9 +1698,9 @@ func TestMetadataObjectPrefix(t *testing.T) {
 	assert.Nil(err)
 }
 
-// TestCustomKey asserts an entire upload flow when ObjectPrefix
+// TestCustomKeyAndBucket asserts an entire upload flow when ObjectPrefix
 // and MetadataObjectPrefix are set, including creating, resuming and finishing an upload.
-func TestCustomKey(t *testing.T) {
+func TestCustomKeyAndBucket(t *testing.T) {
 	mockCtrl := gomock.NewController(t)
 	defer mockCtrl.Finish()
 	assert := assert.New(t)
@@ -1711,7 +1715,7 @@ func TestCustomKey(t *testing.T) {
 
 	// For NewUpload
 	s3obj.EXPECT().CreateMultipartUpload(context.Background(), &s3.CreateMultipartUploadInput{
-		Bucket:   aws.String("bucket"),
+		Bucket:   aws.String("custom-bucket"),
 		Key:      aws.String("my/uploaded/files/custom/key"),
 		Metadata: map[string]string{},
 	}).Return(&s3.CreateMultipartUploadOutput{
@@ -1720,13 +1724,13 @@ func TestCustomKey(t *testing.T) {
 	s3obj.EXPECT().PutObject(context.Background(), &s3.PutObjectInput{
 		Bucket:        aws.String("bucket"),
 		Key:           aws.String("my/uploaded/files/uploadId.info"),
-		Body:          bytes.NewReader([]byte(`{"ID":"uploadId","Size":11,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/custom/key","MultipartUpload":"multipartId","Type":"s3store"}}`)),
-		ContentLength: aws.Int64(247),
+		Body:          bytes.NewReader([]byte(`{"ID":"uploadId","Size":11,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"my/uploaded/files/custom/key","MultipartUpload":"multipartId","Type":"s3store"}}`)),
+		ContentLength: aws.Int64(254),
 	})
 
 	// For WriteChunk
 	s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{
-		Bucket:     aws.String("bucket"),
+		Bucket:     aws.String("custom-bucket"),
 		Key:        aws.String("my/uploaded/files/custom/key"),
 		UploadId:   aws.String("multipartId"),
 		PartNumber: aws.Int32(1),
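The ContentLength change (247 to 254) simply reflects the longer bucket name in the serialized .info JSON: "custom-bucket" is seven characters longer than "bucket". The Storage section the test expects to be persisted, written out as a Go map mirroring the JSON above:

    info.Storage = map[string]string{
        "Type":            "s3store",
        "Bucket":          "custom-bucket", // chosen by the pre-create hook
        "Key":             "my/uploaded/files/custom/key",
        "MultipartUpload": "multipartId",
    }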
@@ -1740,10 +1744,10 @@ func TestCustomKey(t *testing.T) {
 		Bucket: aws.String("bucket"),
 		Key:    aws.String("my/uploaded/files/uploadId.info"),
 	}).Return(&s3.GetObjectOutput{
-		Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":11,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/custom/key","MultipartUpload":"multipartId","Type":"s3store"}}`))),
+		Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":11,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"my/uploaded/files/custom/key","MultipartUpload":"multipartId","Type":"s3store"}}`))),
 	}, nil)
 	s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{
-		Bucket:           aws.String("bucket"),
+		Bucket:           aws.String("custom-bucket"),
 		Key:              aws.String("my/uploaded/files/custom/key"),
 		UploadId:         aws.String("multipartId"),
 		PartNumberMarker: nil,
@@ -1764,7 +1768,7 @@ func TestCustomKey(t *testing.T) {
 
 	// For WriteChunk
 	s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{
-		Bucket:     aws.String("bucket"),
+		Bucket:     aws.String("custom-bucket"),
 		Key:        aws.String("my/uploaded/files/custom/key"),
 		UploadId:   aws.String("multipartId"),
 		PartNumber: aws.Int32(2),
@@ -1775,7 +1779,7 @@ func TestCustomKey(t *testing.T) {
 
 	// For FinishUpload
 	s3obj.EXPECT().CompleteMultipartUpload(context.Background(), &s3.CompleteMultipartUploadInput{
-		Bucket:   aws.String("bucket"),
+		Bucket:   aws.String("custom-bucket"),
 		Key:      aws.String("my/uploaded/files/custom/key"),
 		UploadId: aws.String("multipartId"),
 		MultipartUpload: &types.CompletedMultipartUpload{
@@ -1797,7 +1801,8 @@ func TestCustomKey(t *testing.T) {
 		Size:     11,
 		MetaData: map[string]string{},
 		Storage: map[string]string{
-			"Key": "custom/key",
+			"Key":    "custom/key",
+			"Bucket": "custom-bucket",
 		},
 	}
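With this patch applied, a pre-create hook can route an individual upload's final object into another bucket by setting Storage["Bucket"]. A minimal sketch for embedding tusd as a library, assuming tusd v2's handler API in which a pre-create callback returns FileInfoChanges (the bucket name and composer variable are placeholders):

    config := handler.Config{
        StoreComposer: composer,
        PreUploadCreateCallback: func(hook handler.HookEvent) (handler.HTTPResponse, handler.FileInfoChanges, error) {
            // Route this upload's final object into another bucket. The .info
            // and .part objects still go to the bucket configured on S3Store.
            changes := handler.FileInfoChanges{
                Storage: map[string]string{"Bucket": "custom-bucket"},
            }
            return handler.HTTPResponse{}, changes, nil
        },
    }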