Skip to content

Commit

Permalink
s3store: Allow bucket customization via pre-create hook
Browse files Browse the repository at this point in the history
  • Loading branch information
Acconut committed Sep 17, 2024
1 parent b7c4a9b commit fc46df7
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 44 deletions.
88 changes: 61 additions & 27 deletions pkg/s3store/s3store.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ type s3Upload struct {
// objectKey is the object key under which we save the final file. Might be customized by pre-create hook.
// Its value already includes ObjectPrefix, if that's set.
objectKey string
// objectBucket is the bucket in which the final file is saved. Might be customized by pre-create hook.
// The .info and .part files are always stored in `store.Bucket`
objectBucket string
// multipartId is the ID given by S3 to us for the multipart upload.
multipartId string

Expand Down Expand Up @@ -328,6 +331,15 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand
}
objectKey = store.keyWithPrefix(objectKey)

// Destination bucket can also be customized by pre-create. It's used for the multipart
// upload itself, while the info and part files always go into the bucket configured in the store.
var objectBucket string
if info.Storage != nil && info.Storage["Bucket"] != "" {
objectBucket = info.Storage["Bucket"]
} else {
objectBucket = store.Bucket
}

// Convert meta data into a map of pointers for AWS Go SDK, sigh.
metadata := make(map[string]string, len(info.MetaData))
for key, value := range info.MetaData {
Expand All @@ -337,7 +349,7 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand
// Create the actual multipart upload
t := time.Now()
res, err := store.Service.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
Bucket: aws.String(store.Bucket),
Bucket: aws.String(objectBucket),
Key: aws.String(objectKey),
Metadata: metadata,
})
Expand All @@ -349,12 +361,12 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand
multipartId := *res.UploadId
info.Storage = map[string]string{
"Type": "s3store",
"Bucket": store.Bucket,
"Bucket": objectBucket,
"Key": objectKey,
"MultipartUpload": multipartId,
}

upload := &s3Upload{uploadId, objectKey, multipartId, &store, info, []*s3Part{}, 0}
upload := &s3Upload{uploadId, objectKey, objectBucket, multipartId, &store, info, []*s3Part{}, 0}
err = upload.writeInfo(ctx, info)
if err != nil {
return nil, fmt.Errorf("s3store: unable to create info file:\n%s", err)
Expand All @@ -364,7 +376,7 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand
}

func (store S3Store) GetUpload(ctx context.Context, uploadId string) (handler.Upload, error) {
objectKey, multipartId, info, parts, incompletePartSize, err := store.fetchInfo(ctx, uploadId, nil)
objectKey, objectBucket, multipartId, info, parts, incompletePartSize, err := store.fetchInfo(ctx, uploadId, nil)
if err != nil {
// Currently, s3store stores the multipart ID in the info object. However, in the past the
// multipart ID was part of the upload ID, which consisted of the object ID and multipart ID
Expand All @@ -376,7 +388,7 @@ func (store S3Store) GetUpload(ctx context.Context, uploadId string) (handler.Up
if errors.Is(err, handler.ErrNotFound) && lastPlusIndex != -1 {
fallbackMultipartId := uploadId[lastPlusIndex+1:]
uploadId = uploadId[:lastPlusIndex]
objectKey, multipartId, info, parts, incompletePartSize, err = store.fetchInfo(ctx, uploadId, &fallbackMultipartId)
objectKey, objectBucket, multipartId, info, parts, incompletePartSize, err = store.fetchInfo(ctx, uploadId, &fallbackMultipartId)
if err != nil {
return nil, err
}
Expand All @@ -385,7 +397,7 @@ func (store S3Store) GetUpload(ctx context.Context, uploadId string) (handler.Up
}
}

return &s3Upload{uploadId, objectKey, multipartId, &store, info, parts, incompletePartSize}, nil
return &s3Upload{uploadId, objectKey, objectBucket, multipartId, &store, info, parts, incompletePartSize}, nil
}

func (store S3Store) AsTerminatableUpload(upload handler.Upload) handler.TerminatableUpload {
Expand Down Expand Up @@ -522,7 +534,7 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re

t := time.Now()
uploadPartInput := &s3.UploadPartInput{
Bucket: aws.String(store.Bucket),
Bucket: aws.String(upload.objectBucket),
Key: aws.String(upload.objectKey),
UploadId: aws.String(upload.multipartId),
PartNumber: aws.Int32(part.number),
Expand Down Expand Up @@ -629,7 +641,7 @@ func (upload *s3Upload) GetInfo(ctx context.Context) (info handler.FileInfo, err
return upload.info, nil
}

func (store S3Store) fetchInfo(ctx context.Context, uploadId string, fallbackMultipartId *string) (objectKey string, multipartId string, info handler.FileInfo, parts []*s3Part, incompletePartSize int64, err error) {
func (store S3Store) fetchInfo(ctx context.Context, uploadId string, fallbackMultipartId *string) (objectKey string, objectBucket string, multipartId string, info handler.FileInfo, parts []*s3Part, incompletePartSize int64, err error) {
// Start by fetching the file info stored in a separate object.
t := time.Now()
res, infoErr := store.Service.GetObject(ctx, &s3.GetObjectInput{
Expand Down Expand Up @@ -663,6 +675,12 @@ func (store S3Store) fetchInfo(ctx context.Context, uploadId string, fallbackMul
objectKey = store.keyWithPrefix(uploadId)
}

if info.Storage != nil && info.Storage["Bucket"] != "" {
objectBucket = info.Storage["Bucket"]
} else {
objectBucket = store.Bucket
}

if info.Storage != nil && info.Storage["MultipartUpload"] != "" {
multipartId = info.Storage["MultipartUpload"]
} else if fallbackMultipartId != nil {
Expand All @@ -685,7 +703,7 @@ func (store S3Store) fetchInfo(ctx context.Context, uploadId string, fallbackMul
defer wg.Done()

// Get uploaded parts and their offset
parts, partsErr = store.listAllParts(ctx, objectKey, multipartId)
parts, partsErr = store.listAllParts(ctx, objectKey, objectBucket, multipartId)
}()

go func() {
Expand Down Expand Up @@ -746,7 +764,7 @@ func (upload s3Upload) GetReader(ctx context.Context) (io.ReadCloser, error) {

store := upload.store
res, err := store.Service.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(store.Bucket),
Bucket: aws.String(upload.objectBucket),
Key: aws.String(upload.objectKey),
})
if err != nil {
Expand All @@ -762,15 +780,15 @@ func (upload s3Upload) Terminate(ctx context.Context) error {
store := upload.store

var wg sync.WaitGroup
wg.Add(2)
wg.Add(3)
errs := make([]error, 0, 4)

go func() {
defer wg.Done()

// Abort the multipart upload
_, err := store.Service.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
Bucket: aws.String(store.Bucket),
Bucket: aws.String(upload.objectBucket),
Key: aws.String(upload.objectKey),
UploadId: aws.String(upload.multipartId),
})
Expand All @@ -782,14 +800,29 @@ func (upload s3Upload) Terminate(ctx context.Context) error {
go func() {
defer wg.Done()

// Delete the info and content files
// Delete the object itself
_, err := store.Service.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(upload.objectBucket),
Key: aws.String(upload.objectKey),
})

if isAwsError[*types.NoSuchKey](err) || isAwsErrorCode(err, "NoSuchKey") {
err = nil
}

if err != nil {
errs = append(errs, err)
}
}()

go func() {
defer wg.Done()

// Delete the info and part files
res, err := store.Service.DeleteObjects(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(store.Bucket),
Delete: &types.Delete{
Objects: []types.ObjectIdentifier{
{
Key: aws.String(upload.objectKey),
},
{
Key: store.metadataKeyWithPrefix(upload.uploadId + ".part"),
},
Expand Down Expand Up @@ -830,7 +863,7 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error {
// upload. So if the tus upload has a size of 0, we create an empty part
// and use that for completing the multipart upload.
res, err := store.Service.UploadPart(ctx, &s3.UploadPartInput{
Bucket: aws.String(store.Bucket),
Bucket: aws.String(upload.objectBucket),
Key: aws.String(upload.objectKey),
UploadId: aws.String(upload.multipartId),
PartNumber: aws.Int32(1),
Expand Down Expand Up @@ -863,7 +896,7 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error {

t := time.Now()
_, err := store.Service.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(store.Bucket),
Bucket: aws.String(upload.objectBucket),
Key: aws.String(upload.objectKey),
UploadId: aws.String(upload.multipartId),
MultipartUpload: &types.CompletedMultipartUpload{
Expand Down Expand Up @@ -914,7 +947,7 @@ func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads
partialS3Upload := partialUpload.(*s3Upload)

res, err := store.Service.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(store.Bucket),
Bucket: aws.String(partialS3Upload.objectBucket),
Key: aws.String(partialS3Upload.objectKey),
})
if err != nil {
Expand All @@ -932,7 +965,7 @@ func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads

// Upload the entire file to S3
_, err = store.Service.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(store.Bucket),
Bucket: aws.String(upload.objectBucket),
Key: aws.String(upload.objectKey),
Body: file,
})
Expand All @@ -946,7 +979,7 @@ func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads
// the request.
go func() {
store.Service.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
Bucket: aws.String(store.Bucket),
Bucket: aws.String(upload.objectBucket),
Key: aws.String(upload.objectKey),
UploadId: aws.String(upload.multipartId),
})
Expand Down Expand Up @@ -976,23 +1009,24 @@ func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads
etag: "",
})

go func(partNumber int32, sourceObject string) {
source := partialS3Upload.objectBucket + "/" + partialS3Upload.objectKey
go func(partNumber int32, source string) {
defer wg.Done()

res, err := store.Service.UploadPartCopy(ctx, &s3.UploadPartCopyInput{
Bucket: aws.String(store.Bucket),
Bucket: aws.String(upload.objectBucket),
Key: aws.String(upload.objectKey),
UploadId: aws.String(upload.multipartId),
PartNumber: aws.Int32(partNumber),
CopySource: aws.String(store.Bucket + "/" + sourceObject),
CopySource: aws.String(source),
})
if err != nil {
errs = append(errs, err)
return
}

upload.parts[partNumber-1].etag = *res.CopyPartResult.ETag
}(partNumber, partialS3Upload.objectKey)
}(partNumber, source)
}

wg.Wait()
Expand All @@ -1015,14 +1049,14 @@ func (upload *s3Upload) DeclareLength(ctx context.Context, length int64) error {
return upload.writeInfo(ctx, info)
}

func (store S3Store) listAllParts(ctx context.Context, objectKey string, multipartId string) (parts []*s3Part, err error) {
func (store S3Store) listAllParts(ctx context.Context, objectKey string, objectBucket string, multipartId string) (parts []*s3Part, err error) {
var partMarker *string
for {
t := time.Now()

// Get uploaded parts
listPtr, err := store.Service.ListParts(ctx, &s3.ListPartsInput{
Bucket: aws.String(store.Bucket),
Bucket: aws.String(objectBucket),
Key: aws.String(objectKey),
UploadId: aws.String(multipartId),
PartNumberMarker: partMarker,
Expand Down
39 changes: 22 additions & 17 deletions pkg/s3store/s3store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -986,13 +986,15 @@ func TestTerminate(t *testing.T) {
UploadId: aws.String("multipartId"),
}).Return(nil, nil)

s3obj.EXPECT().DeleteObject(context.Background(), &s3.DeleteObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
}).Return(&s3.DeleteObjectOutput{}, nil)

s3obj.EXPECT().DeleteObjects(context.Background(), &s3.DeleteObjectsInput{
Bucket: aws.String("bucket"),
Delete: &types.Delete{
Objects: []types.ObjectIdentifier{
{
Key: aws.String("uploadId"),
},
{
Key: aws.String("uploadId.part"),
},
Expand Down Expand Up @@ -1057,13 +1059,15 @@ func TestTerminateWithErrors(t *testing.T) {
UploadId: aws.String("multipartId"),
}).Return(nil, &types.NoSuchUpload{})

s3obj.EXPECT().DeleteObject(context.Background(), &s3.DeleteObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
}).Return(nil, &types.NoSuchKey{})

s3obj.EXPECT().DeleteObjects(context.Background(), &s3.DeleteObjectsInput{
Bucket: aws.String("bucket"),
Delete: &types.Delete{
Objects: []types.ObjectIdentifier{
{
Key: aws.String("uploadId"),
},
{
Key: aws.String("uploadId.part"),
},
Expand Down Expand Up @@ -1694,9 +1698,9 @@ func TestMetadataObjectPrefix(t *testing.T) {
assert.Nil(err)
}

// TestCustomKey asserts an entire upload flow when ObjectPrefix
// TestCustomKeyAndBucket asserts an entire upload flow when ObjectPrefix
// and MetadataObjectPrefix are set, including creating, resuming and finishing an upload.
func TestCustomKey(t *testing.T) {
func TestCustomKeyAndBucket(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
assert := assert.New(t)
Expand All @@ -1711,7 +1715,7 @@ func TestCustomKey(t *testing.T) {

// For NewUpload
s3obj.EXPECT().CreateMultipartUpload(context.Background(), &s3.CreateMultipartUploadInput{
Bucket: aws.String("bucket"),
Bucket: aws.String("custom-bucket"),
Key: aws.String("my/uploaded/files/custom/key"),
Metadata: map[string]string{},
}).Return(&s3.CreateMultipartUploadOutput{
Expand All @@ -1720,13 +1724,13 @@ func TestCustomKey(t *testing.T) {
s3obj.EXPECT().PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("my/uploaded/files/uploadId.info"),
Body: bytes.NewReader([]byte(`{"ID":"uploadId","Size":11,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/custom/key","MultipartUpload":"multipartId","Type":"s3store"}}`)),
ContentLength: aws.Int64(247),
Body: bytes.NewReader([]byte(`{"ID":"uploadId","Size":11,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"my/uploaded/files/custom/key","MultipartUpload":"multipartId","Type":"s3store"}}`)),
ContentLength: aws.Int64(254),
})

// For WriteChunk
s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{
Bucket: aws.String("bucket"),
Bucket: aws.String("custom-bucket"),
Key: aws.String("my/uploaded/files/custom/key"),
UploadId: aws.String("multipartId"),
PartNumber: aws.Int32(1),
Expand All @@ -1740,10 +1744,10 @@ func TestCustomKey(t *testing.T) {
Bucket: aws.String("bucket"),
Key: aws.String("my/uploaded/files/uploadId.info"),
}).Return(&s3.GetObjectOutput{
Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":11,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/custom/key","MultipartUpload":"multipartId","Type":"s3store"}}`))),
Body: io.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":11,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"custom-bucket","Key":"my/uploaded/files/custom/key","MultipartUpload":"multipartId","Type":"s3store"}}`))),
}, nil)
s3obj.EXPECT().ListParts(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Bucket: aws.String("custom-bucket"),
Key: aws.String("my/uploaded/files/custom/key"),
UploadId: aws.String("multipartId"),
PartNumberMarker: nil,
Expand All @@ -1764,7 +1768,7 @@ func TestCustomKey(t *testing.T) {

// For WriteChunk
s3obj.EXPECT().UploadPart(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{
Bucket: aws.String("bucket"),
Bucket: aws.String("custom-bucket"),
Key: aws.String("my/uploaded/files/custom/key"),
UploadId: aws.String("multipartId"),
PartNumber: aws.Int32(2),
Expand All @@ -1775,7 +1779,7 @@ func TestCustomKey(t *testing.T) {

// For FinishUpload
s3obj.EXPECT().CompleteMultipartUpload(context.Background(), &s3.CompleteMultipartUploadInput{
Bucket: aws.String("bucket"),
Bucket: aws.String("custom-bucket"),
Key: aws.String("my/uploaded/files/custom/key"),
UploadId: aws.String("multipartId"),
MultipartUpload: &types.CompletedMultipartUpload{
Expand All @@ -1797,7 +1801,8 @@ func TestCustomKey(t *testing.T) {
Size: 11,
MetaData: map[string]string{},
Storage: map[string]string{
"Key": "custom/key",
"Key": "custom/key",
"Bucket": "custom-bucket",
},
}

Expand Down

0 comments on commit fc46df7

Please sign in to comment.