diff --git a/AWSS3/AWSS3TransferUtility.m b/AWSS3/AWSS3TransferUtility.m index 9981ecdd53a..5c69bfd4604 100644 --- a/AWSS3/AWSS3TransferUtility.m +++ b/AWSS3/AWSS3TransferUtility.m @@ -2379,7 +2379,6 @@ - (void)completeTask:(AWSS3TransferUtilityTask *)task removeCompletedTask:(BOOL) [self.completedTaskDictionary removeObjectForKey:task.transferID]; [self unregisterTaskIdentifier:task.taskIdentifier]; } - } - (void)cleanupForMultiPartUploadTask:(AWSS3TransferUtilityMultiPartUploadTask *)task { diff --git a/AWSS3/AWSS3TransferUtilityTasks.m b/AWSS3/AWSS3TransferUtilityTasks.m index 2d095bf83ab..77486c117d1 100644 --- a/AWSS3/AWSS3TransferUtilityTasks.m +++ b/AWSS3/AWSS3TransferUtilityTasks.m @@ -129,12 +129,15 @@ - (instancetype)init { _waitingPartsDictionary = [NSMutableDictionary new]; _inProgressPartsDictionary = [NSMutableDictionary new]; _completedPartsSet = [NSMutableSet new]; + _serialQueue = dispatch_queue_create("com.amazonaws.AWSS3.MultipartUploadTask", DISPATCH_QUEUE_SERIAL); } return self; } - (BOOL)isUnderConcurrencyLimit { - return self.inProgressPartsDictionary.count < [self.transferUtility.transferUtilityConfiguration.multiPartConcurrencyLimit integerValue]; + NSUInteger dynamicLimit = NSProcessInfo.processInfo.activeProcessorCount * 2; + NSUInteger configuredLimit = self.transferUtility.transferUtilityConfiguration.multiPartConcurrencyLimit.integerValue; + return self.inProgressPartsDictionary.count < MAX(dynamicLimit, configuredLimit); } - (BOOL)hasWaitingTasks { @@ -167,13 +170,11 @@ - (AWSS3TransferUtilityMultiPartUploadExpression *)expression { - (void)cancel { self.cancelled = YES; self.status = AWSS3TransferUtilityTransferStatusCancelled; - for (NSNumber *key in [self.inProgressPartsDictionary allKeys]) { - AWSS3TransferUtilityUploadSubTask *subTask = [self.inProgressPartsDictionary objectForKey:key]; + for (AWSS3TransferUtilityUploadSubTask *subTask in self.inProgressTasks) { [subTask.sessionTask cancel]; } - for (NSNumber *key in [self.waitingPartsDictionary allKeys]) { - AWSS3TransferUtilityUploadSubTask *subTask = [self.waitingPartsDictionary objectForKey:key]; + for (AWSS3TransferUtilityUploadSubTask *subTask in self.waitingTasks) { [subTask.sessionTask cancel]; } @@ -190,37 +191,13 @@ - (void)resume { } NSCAssert(self.transferUtility != nil, @"Transfer Utility must be provided."); - -// for (NSNumber *key in [self.inProgressPartsDictionary allKeys]) { -// AWSS3TransferUtilityUploadSubTask *subTask = [self.inProgressPartsDictionary objectForKey:key]; -// subTask.status = AWSS3TransferUtilityTransferStatusInProgress; -// [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:subTask.transferID -// partNumber:subTask.partNumber -// taskIdentifier:subTask.taskIdentifier -// eTag:subTask.eTag -// status:subTask.status -// retry_count:self.retryCount -// databaseQueue:self.databaseQueue]; -// [subTask.sessionTask resume]; -// } -// -// self.status = AWSS3TransferUtilityTransferStatusInProgress; -// //Update the Master Record -// [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:self.transferID -// partNumber:@0 -// taskIdentifier:0 -// eTag:@"" -// status:self.status -// retry_count:self.retryCount -// databaseQueue:self.databaseQueue]; // Change status from paused to waiting for (AWSS3TransferUtilityUploadSubTask * nextSubTask in self.waitingTasks) { nextSubTask.status = AWSS3TransferUtilityTransferStatusWaiting; } - [self moveWaitingTasksToInProgress]; - [self completeIfDone]; + [self moveWaitingTasksToInProgress:YES]; } - (void)suspend { @@ -231,30 +208,6 @@ - (void)suspend { NSCAssert(self.transferUtility != nil, @"Transfer Utility must be provided."); -// for (NSNumber *key in [self.inProgressPartsDictionary allKeys]) { -// // all in progress tasks should be cancelled and a new subtask should replace it which is -// // put in the waiting dictionary and set with that status with a URLSessionTask which -// // has not been started. -// -// // then resuming should start uploading a number of parts up to the concurrency limit. -// -// AWSS3TransferUtilityUploadSubTask *subTask = [self.inProgressPartsDictionary objectForKey:key]; -// if (!subTask) { -// continue; -// } -// [subTask.sessionTask suspend]; -// subTask.status = AWSS3TransferUtilityTransferStatusPaused; -// -// [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:subTask.transferID -// partNumber:subTask.partNumber -// taskIdentifier:subTask.taskIdentifier -// eTag:subTask.eTag -// status:subTask.status -// retry_count:self.retryCount -// databaseQueue:self.databaseQueue]; -// } -// - // Cancel session task for all subtasks which are in progress and set status to paused for (AWSS3TransferUtilityUploadSubTask *inProgressSubTask in self.inProgressTasks) { // Note: This can happen due to lack of thread-safety @@ -280,8 +233,7 @@ - (void)suspend { NSError *error = [self.transferUtility createUploadSubTask:self subTask:subTask - startTransfer:NO - internalDictionaryToAddSubTaskTo:self.waitingPartsDictionary]; + startTransfer:NO]; if (error) { AWSDDLogError(@"Error creating AWSS3TransferUtilityUploadSubTask [%@]", error); @@ -294,7 +246,7 @@ - (void)suspend { [AWSS3TransferUtilityDatabaseHelper insertMultiPartUploadRequestSubTaskInDB:self subTask:subTask databaseQueue:self.databaseQueue]; } } - + self.status = AWSS3TransferUtilityTransferStatusPaused; //Update the Master Record [AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:self.transferID @@ -313,13 +265,14 @@ - (void)addUploadSubTask:(AWSS3TransferUtilityUploadSubTask *)subTask { self.waitingPartsDictionary[@(subTask.taskIdentifier)] = subTask; } else if (subTask.status == AWSS3TransferUtilityTransferStatusInProgress) { self.inProgressPartsDictionary[@(subTask.taskIdentifier)] = subTask; - } else if (subTask.status == AWSS3TransferUtilityTransferStatusCompleted) { [self.completedPartsSet addObject:subTask]; } else { AWSDDLogDebug(@"Sub Task status not supported: %lu", subTask.status); NSCAssert(NO, @"Status not supported"); } + + [self completeIfDone]; } - (void)removeWaitingUploadSubTask:(NSUInteger)taskIdentifier { @@ -344,13 +297,15 @@ - (void)moveWaitingTaskToInProgress:(AWSS3TransferUtilityUploadSubTask *)subTask - (void)moveWaitingTaskToInProgress:(AWSS3TransferUtilityUploadSubTask *)subTask startTransfer:(BOOL)startTransfer { if ([self.waitingTasks containsObject:subTask]) { - //Add to inProgress list + // Add to inProgress list self.inProgressPartsDictionary[@(subTask.taskIdentifier)] = subTask; - //Remove it from the waitingList + // Remove it from the waitingList self.waitingPartsDictionary[@(subTask.taskIdentifier)] = nil; AWSDDLogDebug(@"Moving Task[%@] to progress for Multipart[%@]", @(subTask.taskIdentifier), self.uploadID); if (startTransfer) { + AWSDDLogDebug(@"Starting subTask %@", @(subTask.taskIdentifier)); + NSCAssert(subTask.sessionTask.state == NSURLSessionTaskStateSuspended, @"State should be suspended before resuming."); [subTask.sessionTask resume]; } } @@ -381,20 +336,24 @@ - (void)moveWaitingTasksToInProgress:(BOOL)startTransfer { // move parts from waiting to in progress if under the concurrency limit while (self.isUnderConcurrencyLimit && self.hasWaitingTasks) { //Get a part from the waitingList - AWSS3TransferUtilityUploadSubTask *nextSubTask = [[self.waitingPartsDictionary allValues] objectAtIndex:0]; + AWSS3TransferUtilityUploadSubTask *nextSubTask = [self.waitingTasks objectAtIndex:0]; //Add to inProgress list self.inProgressPartsDictionary[@(nextSubTask.taskIdentifier)] = nextSubTask; + nextSubTask.status = AWSS3TransferUtilityTransferStatusInProgress; //Remove it from the waitingList - [self.waitingPartsDictionary removeObjectForKey:@(nextSubTask.taskIdentifier)]; + self.waitingPartsDictionary[@(nextSubTask.taskIdentifier)] = nil; AWSDDLogDebug(@"Moving Task[%@] to progress for Multipart[%@]", @(nextSubTask.taskIdentifier), self.uploadID); + if (startTransfer) { AWSDDLogDebug(@"Starting subTask %@", @(nextSubTask.taskIdentifier)); + NSCAssert(nextSubTask.sessionTask.state == NSURLSessionTaskStateSuspended, @"State should be suspended before resuming."); [nextSubTask.sessionTask resume]; } - nextSubTask.status = AWSS3TransferUtilityTransferStatusInProgress; } + + [self completeIfDone]; } - (void)completeUploadSubTask:(AWSS3TransferUtilityUploadSubTask *)subTask @@ -421,45 +380,49 @@ - (void)completeUploadSubTask:(AWSS3TransferUtilityUploadSubTask *)subTask } - (void)completeIfDone { - // Complete multipart upload if in progress and waiting tasks are done - if (!self.isDone) { - return; - } + dispatch_async(self.serialQueue, ^{ + // Complete multipart upload if in progress and waiting tasks are done + if (!self.isDone && self.status != AWSS3TransferUtilityTransferStatusCompleted) { + return; + } - //If there are no more inProgress parts, then we are done. + //If there are no more inProgress parts, then we are done. - //Validate that all the content has been uploaded. - int64_t totalBytesSent = 0; - for (AWSS3TransferUtilityUploadSubTask *aSubTask in self.completedPartsSet) { - totalBytesSent += aSubTask.totalBytesExpectedToSend; - } + //Validate that all the content has been uploaded. + int64_t totalBytesSent = 0; + for (AWSS3TransferUtilityUploadSubTask *aSubTask in self.completedTasks) { + totalBytesSent += aSubTask.totalBytesExpectedToSend; + } - if (totalBytesSent != self.contentLength.longLongValue ) { - NSString *errorMessage = [NSString stringWithFormat:@"Expected to send [%@], but sent [%@] and there are no remaining parts. Failing transfer ", - self.contentLength, @(totalBytesSent)]; - AWSDDLogDebug(@"%@", errorMessage); - NSDictionary *userInfo = [NSDictionary dictionaryWithObject:errorMessage - forKey:@"Message"]; + if (totalBytesSent != self.contentLength.longLongValue ) { + NSString *errorMessage = [NSString stringWithFormat:@"Expected to send [%@], but sent [%@] and there are no remaining parts. Failing transfer ", + self.contentLength, @(totalBytesSent)]; + AWSDDLogDebug(@"%@", errorMessage); + NSDictionary *userInfo = [NSDictionary dictionaryWithObject:errorMessage + forKey:@"Message"]; - self.error = [NSError errorWithDomain:AWSS3TransferUtilityErrorDomain - code:AWSS3TransferUtilityErrorClientError - userInfo:userInfo]; + self.error = [NSError errorWithDomain:AWSS3TransferUtilityErrorDomain + code:AWSS3TransferUtilityErrorClientError + userInfo:userInfo]; - //Execute call back if provided. - [self.transferUtility completeTask:self]; + //Execute call back if provided. + [self.transferUtility completeTask:self]; - //Abort the request, so the server can clean up any partials. - [self.transferUtility callAbortMultiPartForUploadTask:self]; + //Abort the request, so the server can clean up any partials. + [self.transferUtility callAbortMultiPartForUploadTask:self]; - //clean up. - [self.transferUtility cleanupForMultiPartUploadTask:self]; - return; - } + //clean up. + [self.transferUtility cleanupForMultiPartUploadTask:self]; + return; + } + + AWSDDLogDebug(@"There are %lu waiting upload parts.", (unsigned long)self.waitingTasks.count); + AWSDDLogDebug(@"There are %lu in progress upload parts.", (unsigned long)self.inProgressTasks.count); + AWSDDLogDebug(@"There are %lu completed upload parts.", (unsigned long)self.completedTasks.count); + [self.transferUtility completeMultiPartForUploadTask:self]; + self.status = AWSS3TransferUtilityTransferStatusCompleted; + }); - AWSDDLogDebug(@"There are %lu waiting upload parts.", (unsigned long)self.waitingPartsDictionary.count); - AWSDDLogDebug(@"There are %lu in progress upload parts.", (unsigned long)self.inProgressPartsDictionary.count); - AWSDDLogDebug(@"There are %lu completed upload parts.", (unsigned long)self.completedPartsSet.count); - [self.transferUtility completeMultiPartForUploadTask:self]; } - (void)setCompletionHandler:(AWSS3TransferUtilityMultiPartUploadCompletionHandlerBlock)completionHandler { @@ -498,7 +461,6 @@ - (void)cancel { } - (void)setCompletionHandler:(AWSS3TransferUtilityDownloadCompletionHandlerBlock)completionHandler { - self.expression.completionHandler = completionHandler; //If the task has already completed successfully //Or the task has completed with error, complete the task diff --git a/AWSS3/AWSS3TransferUtility_private.h b/AWSS3/AWSS3TransferUtility_private.h index 2f73280cb79..a65e83b47b6 100644 --- a/AWSS3/AWSS3TransferUtility_private.h +++ b/AWSS3/AWSS3TransferUtility_private.h @@ -23,13 +23,11 @@ @interface AWSS3TransferUtility () - (NSError *)createUploadSubTask:(AWSS3TransferUtilityMultiPartUploadTask *)transferUtilityMultiPartUploadTask - subTask:(AWSS3TransferUtilityUploadSubTask *)subTask -internalDictionaryToAddSubTaskTo:(NSMutableDictionary *)internalDictionaryToAddSubTaskTo; + subTask:(AWSS3TransferUtilityUploadSubTask *)subTask; - (NSError *)createUploadSubTask:(AWSS3TransferUtilityMultiPartUploadTask *)transferUtilityMultiPartUploadTask subTask:(AWSS3TransferUtilityUploadSubTask *)subTask - startTransfer:(BOOL)startTransfer -internalDictionaryToAddSubTaskTo:(NSMutableDictionary *)internalDictionaryToAddSubTaskTo; + startTransfer:(BOOL)startTransfer; - (void)completeTask:(AWSS3TransferUtilityTask *)task; - (AWSTask *)callAbortMultiPartForUploadTask:(AWSS3TransferUtilityMultiPartUploadTask *)uploadTask; @@ -81,6 +79,7 @@ internalDictionaryToAddSubTaskTo:(NSMutableDictionary *)internalDictionaryToAddS @property (strong, nonatomic) NSMutableDictionary *waitingPartsDictionary; @property (strong, nonatomic) NSMutableDictionary *inProgressPartsDictionary; @property (strong, nonatomic) NSMutableSet *completedPartsSet; +@property (strong, nonatomic) dispatch_queue_t serialQueue; @property int partNumber; @property NSNumber *contentLength;