Skip to content

Commit

Permalink
Multipart Upload: Remove the redirect to specific Artifactory node (j…
Browse files Browse the repository at this point in the history
  • Loading branch information
idand1741 authored Jul 28, 2024
1 parent 21daf59 commit a0d079c
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 24 deletions.
19 changes: 8 additions & 11 deletions artifactory/services/utils/multipartupload.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ const (
aborted completionStatus = "ABORTED"

// API constants
uploadsApi = "/api/v1/uploads/"
routeToHeader = "X-JFrog-Route-To"
artifactoryNodeId = "X-Artifactory-Node-Id"
uploadsApi = "/api/v1/uploads/"

// Sizes and limits constants
MaxMultipartUploadFileSize = SizeTiB * 5
Expand Down Expand Up @@ -305,18 +303,17 @@ type urlPartResponse struct {
}

func (mu *MultipartUpload) completeAndPollForStatus(logMsgPrefix string, completionAttemptsLeft uint, sha1 string, multipartUploadClient *httputils.HttpClientDetails, progressReader ioutils.Progress) (err error) {
nodeId, err := mu.completeMultipartUpload(logMsgPrefix, sha1, multipartUploadClient)
err = mu.completeMultipartUpload(logMsgPrefix, sha1, multipartUploadClient)
if err != nil {
return
}

err = mu.pollCompletionStatus(logMsgPrefix, completionAttemptsLeft, sha1, nodeId, multipartUploadClient, progressReader)
err = mu.pollCompletionStatus(logMsgPrefix, completionAttemptsLeft, sha1, multipartUploadClient, progressReader)
return
}

func (mu *MultipartUpload) pollCompletionStatus(logMsgPrefix string, completionAttemptsLeft uint, sha1, nodeId string, multipartUploadClient *httputils.HttpClientDetails, progressReader ioutils.Progress) error {
func (mu *MultipartUpload) pollCompletionStatus(logMsgPrefix string, completionAttemptsLeft uint, sha1 string, multipartUploadClient *httputils.HttpClientDetails, progressReader ioutils.Progress) error {
multipartUploadClientWithNodeId := multipartUploadClient.Clone()
multipartUploadClientWithNodeId.Headers = map[string]string{routeToHeader: nodeId}

lastMergeLog := time.Now()
pollingExecutor := &utils.RetryExecutor{
Expand Down Expand Up @@ -360,22 +357,22 @@ func (mu *MultipartUpload) pollCompletionStatus(logMsgPrefix string, completionA
return pollingExecutor.Execute()
}

func (mu *MultipartUpload) completeMultipartUpload(logMsgPrefix, sha1 string, multipartUploadClient *httputils.HttpClientDetails) (string, error) {
func (mu *MultipartUpload) completeMultipartUpload(logMsgPrefix, sha1 string, multipartUploadClient *httputils.HttpClientDetails) error {
url := fmt.Sprintf("%s%scomplete?sha1=%s", mu.artifactoryUrl, uploadsApi, sha1)
resp, body, err := mu.client.GetHttpClient().SendPost(url, []byte{}, *multipartUploadClient, logMsgPrefix)
if err != nil {
return "", err
return err
}
log.Debug("Artifactory response:", string(body), resp.Status)
return resp.Header.Get(artifactoryNodeId), errorutils.CheckResponseStatusWithBody(resp, body, http.StatusAccepted)
return errorutils.CheckResponseStatusWithBody(resp, body, http.StatusAccepted)
}

func (mu *MultipartUpload) status(logMsgPrefix string, multipartUploadClientWithNodeId *httputils.HttpClientDetails) (status statusResponse, err error) {
url := fmt.Sprintf("%s%sstatus", mu.artifactoryUrl, uploadsApi)
resp, body, err := mu.client.GetHttpClient().SendPost(url, []byte{}, *multipartUploadClientWithNodeId, logMsgPrefix)
// If the Artifactory node returns a "Service unavailable" error (status 503), attempt to retry the upload completion process on a different node.
if resp != nil && resp.StatusCode == http.StatusServiceUnavailable {
unavailableNodeErr := fmt.Sprintf(logMsgPrefix + fmt.Sprintf("The Artifactory node ID '%s' is unavailable.", multipartUploadClientWithNodeId.Headers[routeToHeader]))
unavailableNodeErr := fmt.Sprintf(logMsgPrefix + "Artifactory is unavailable.")
return statusResponse{Status: retryableError, Error: unavailableNodeErr}, nil
}
if err != nil {
Expand Down
17 changes: 4 additions & 13 deletions artifactory/services/utils/multipartupload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,6 @@ func TestCompleteMultipartUpload(t *testing.T) {
assert.Equal(t, "/api/v1/uploads/complete", r.URL.Path)
assert.Equal(t, fmt.Sprintf("sha1=%s", sha1), r.URL.RawQuery)

// Add the "X-Artifactory-Node-Id" header to the response
w.Header().Add(artifactoryNodeId, nodeId)

// Send response 202 Accepted
w.WriteHeader(http.StatusAccepted)
})
Expand All @@ -198,9 +195,8 @@ func TestCompleteMultipartUpload(t *testing.T) {
defer cleanUp()

// Execute completeMultipartUpload
actualNodeId, err := multipartUpload.completeMultipartUpload("", sha1, &httputils.HttpClientDetails{})
err := multipartUpload.completeMultipartUpload("", sha1, &httputils.HttpClientDetails{})
assert.NoError(t, err)
assert.Equal(t, nodeId, actualNodeId)
}

func TestStatus(t *testing.T) {
Expand All @@ -211,9 +207,6 @@ func TestStatus(t *testing.T) {
// Check URL
assert.Equal(t, "/api/v1/uploads/status", r.URL.Path)

// Check "X-JFrog-Route-To" header
assert.Equal(t, nodeId, r.Header.Get(routeToHeader))

// Send response 200 OK
w.WriteHeader(http.StatusOK)
response, err := json.Marshal(statusResponse{Status: finished, Progress: utils.Pointer(100)})
Expand All @@ -227,8 +220,7 @@ func TestStatus(t *testing.T) {
defer cleanUp()

// Execute status
clientDetails := &httputils.HttpClientDetails{Headers: map[string]string{routeToHeader: nodeId}}
status, err := multipartUpload.status("", clientDetails)
status, err := multipartUpload.status("", &httputils.HttpClientDetails{})
assert.NoError(t, err)
assert.Equal(t, statusResponse{Status: finished, Progress: utils.Pointer(100)}, status)
}
Expand All @@ -252,10 +244,9 @@ func TestStatusServiceUnavailable(t *testing.T) {
defer cleanUp()

// Execute status
clientDetails := &httputils.HttpClientDetails{Headers: map[string]string{routeToHeader: nodeId}}
status, err := multipartUpload.status("", clientDetails)
status, err := multipartUpload.status("", &httputils.HttpClientDetails{})
assert.NoError(t, err)
assert.Equal(t, statusResponse{Status: retryableError, Error: "The Artifactory node ID 'nodeId' is unavailable."}, status)
assert.Equal(t, statusResponse{Status: retryableError, Error: "Artifactory is unavailable."}, status)
}

func TestAbort(t *testing.T) {
Expand Down

0 comments on commit a0d079c

Please sign in to comment.