Skip to content

Commit

Permalink
feat: Adds 'x-amz-mp-object-size' request header support for Complete…
Browse files Browse the repository at this point in the history
…MultipartUpload
  • Loading branch information
niksis02 committed Feb 19, 2025
1 parent ff0cf29 commit 64a72a2
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 7 deletions.
13 changes: 6 additions & 7 deletions backend/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"math"
"os"
"path/filepath"
"slices"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -1207,13 +1206,8 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
uncommittedBlocks[int32(ptNumber)] = el
}

slices.SortFunc(blockList.UncommittedBlocks, func(a *blockblob.Block, b *blockblob.Block) int {
ptNumber, _ := decodeBlockId(*a.Name)
nextPtNumber, _ := decodeBlockId(*b.Name)
return ptNumber - nextPtNumber
})

// The initialie values is the lower limit of partNumber: 0
var totalSize int64
var partNumber int32
last := len(blockList.UncommittedBlocks) - 1
for i, part := range input.MultipartUpload.Parts {
Expand Down Expand Up @@ -1241,9 +1235,14 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
if i < last && *block.Size < backend.MinPartSize {
return nil, s3err.GetAPIError(s3err.ErrEntityTooSmall)
}
totalSize += *block.Size
blockIds = append(blockIds, *block.Name)
}

if input.MpuObjectSize != nil && totalSize != *input.MpuObjectSize {
return nil, s3err.GetIncorrectMpObjectSizeErr(totalSize, *input.MpuObjectSize)
}

opts := &blockblob.CommitBlockListOptions{
Metadata: props.Metadata,
Tags: parseAzTags(tags.BlobTagSet),
Expand Down
4 changes: 4 additions & 0 deletions backend/posix/posix.go
Original file line number Diff line number Diff line change
Expand Up @@ -1478,6 +1478,10 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
}
}

if input.MpuObjectSize != nil && totalsize != *input.MpuObjectSize {
return nil, s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize)
}

var hashRdr *utils.HashReader
var compositeChecksumRdr *utils.CompositeChecksumReader
switch checksums.Type {
Expand Down
31 changes: 31 additions & 0 deletions s3api/controllers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -3439,6 +3439,36 @@ func (c S3ApiController) CreateActions(ctx *fiber.Ctx) error {
})
}

var mpuObjectSize *int64
mpuObjSizeHdr := ctx.Get("X-Amz-Mp-Object-Size")
if mpuObjSizeHdr != "" {
val, err := strconv.ParseInt(mpuObjSizeHdr, 10, 64)
//TODO: Not sure if invalid request should be returned
if err != nil {
return SendXMLResponse(ctx, nil,
s3err.GetAPIError(s3err.ErrInvalidRequest),
&MetaOpts{
Logger: c.logger,
MetricsMng: c.mm,
Action: metrics.ActionCompleteMultipartUpload,
BucketOwner: parsedAcl.Owner,
})
}

if val < 0 {
return SendXMLResponse(ctx, nil,
s3err.GetInvalidMpObjectSizeErr(val),
&MetaOpts{
Logger: c.logger,
MetricsMng: c.mm,
Action: metrics.ActionCompleteMultipartUpload,
BucketOwner: parsedAcl.Owner,
})
}

mpuObjectSize = &val
}

err = auth.VerifyAccess(ctx.Context(), c.be,
auth.AccessOptions{
Readonly: c.readonly,
Expand Down Expand Up @@ -3497,6 +3527,7 @@ func (c S3ApiController) CreateActions(ctx *fiber.Ctx) error {
MultipartUpload: &types.CompletedMultipartUpload{
Parts: data.Parts,
},
MpuObjectSize: mpuObjectSize,
ChecksumCRC32: backend.GetPtrFromString(checksums[types.ChecksumAlgorithmCrc32]),
ChecksumCRC32C: backend.GetPtrFromString(checksums[types.ChecksumAlgorithmCrc32c]),
ChecksumSHA1: backend.GetPtrFromString(checksums[types.ChecksumAlgorithmSha1]),
Expand Down
16 changes: 16 additions & 0 deletions s3err/s3err.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,3 +763,19 @@ func GetChecksumTypeMismatchOnMpErr(t types.ChecksumType) APIError {
HTTPStatusCode: http.StatusBadRequest,
}
}

func GetIncorrectMpObjectSizeErr(expected, actual int64) APIError {
return APIError{
Code: "InvalidRequest",
Description: fmt.Sprintf("The provided 'x-amz-mp-object-size' header value %v does not match what was computed: %v", expected, actual),
HTTPStatusCode: http.StatusBadRequest,
}
}

func GetInvalidMpObjectSizeErr(val int64) APIError {
return APIError{
Code: "InvalidRequest",
Description: fmt.Sprintf("Value for x-amz-mp-object-size header is less than zero: '%v'", val),
HTTPStatusCode: http.StatusBadRequest,
}
}
2 changes: 2 additions & 0 deletions tests/integration/group-tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ func TestCompleteMultipartUpload(s *S3Conf) {
CompleteMultipartUpload_small_upload_size(s)
CompleteMultipartUpload_empty_parts(s)
CompleteMultipartUpload_incorrect_parts_order(s)
CompleteMultipartUpload_mpu_object_size(s)
//TODO: remove the condition after implementing checksums in azure
if !s.azureTests {
CompleteMultipartUpload_invalid_checksum_type(s)
Expand Down Expand Up @@ -976,6 +977,7 @@ func GetIntTests() IntTests {
"CompleteMultipartUpload_small_upload_size": CompleteMultipartUpload_small_upload_size,
"CompleteMultipartUpload_empty_parts": CompleteMultipartUpload_empty_parts,
"CompleteMultipartUpload_incorrect_parts_order": CompleteMultipartUpload_incorrect_parts_order,
"CompleteMultipartUpload_mpu_object_size": CompleteMultipartUpload_mpu_object_size,
"CompleteMultipartUpload_invalid_checksum_type": CompleteMultipartUpload_invalid_checksum_type,
"CompleteMultipartUpload_invalid_checksum_part": CompleteMultipartUpload_invalid_checksum_part,
"CompleteMultipartUpload_multiple_checksum_part": CompleteMultipartUpload_multiple_checksum_part,
Expand Down
83 changes: 83 additions & 0 deletions tests/integration/tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -9521,6 +9521,89 @@ func CompleteMultipartUpload_incorrect_parts_order(s *S3Conf) error {
})
}

func CompleteMultipartUpload_mpu_object_size(s *S3Conf) error {
testName := "CompleteMultipartUpload_mpu_object_size"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
obj := "my-obj"
mp, err := createMp(s3client, bucket, obj)
if err != nil {
return err
}

mpuSize := int64(23 * 1024 * 1024) // 23 mib
parts, _, err := uploadParts(s3client, mpuSize, 4, bucket, obj, *mp.UploadId)
if err != nil {
return err
}

compParts := []types.CompletedPart{}
for _, el := range parts {
compParts = append(compParts, types.CompletedPart{
ETag: el.ETag,
PartNumber: el.PartNumber,
})
}

invMpuSize := int64(-1) // invalid MpuObjectSize
// Initially provide invalid MpuObjectSize: -3
input := &s3.CompleteMultipartUploadInput{
Bucket: &bucket,
Key: &obj,
UploadId: mp.UploadId,
MultipartUpload: &types.CompletedMultipartUpload{
Parts: compParts,
},
MpuObjectSize: &invMpuSize,
}

ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
_, err = s3client.CompleteMultipartUpload(ctx, input)
cancel()
if err := checkApiErr(err, s3err.GetInvalidMpObjectSizeErr(invMpuSize)); err != nil {
return err
}

incorMpuSize := int64(213123) // incorrect object size
input.MpuObjectSize = &incorMpuSize

ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
_, err = s3client.CompleteMultipartUpload(ctx, input)
cancel()
if err := checkApiErr(err, s3err.GetIncorrectMpObjectSizeErr(mpuSize, incorMpuSize)); err != nil {
return err
}

// Correct value for MpuObjectSize
input.MpuObjectSize = &mpuSize
ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
_, err = s3client.CompleteMultipartUpload(ctx, input)
cancel()
if err != nil {
return err
}

// Make sure the object has been uploaded with proper size
ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
res, err := s3client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: &bucket,
Key: &obj,
})
cancel()
if err != nil {
return err
}

if res.ContentLength == nil {
return fmt.Errorf("expected non nil Content-Length")
}
if *res.ContentLength != mpuSize {
return fmt.Errorf("expected the uploaded object size to be %v, instead got %v", mpuSize, *res.ContentLength)
}

return nil
})
}

func CompleteMultipartUpload_invalid_part_number(s *S3Conf) error {
testName := "CompleteMultipartUpload_invalid_part_number"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
Expand Down

0 comments on commit 64a72a2

Please sign in to comment.