Skip to content

Commit f2a689e

Browse files
authored
Fixed the issue where AWS CRT HTTP client was eagerly buffering data before the underlying CRT component was able to handle it and added error handling for the case where it failed to convert SDK request to CRT request (#6260)
1 parent 9ec6d43 commit f2a689e

File tree

5 files changed

+45
-5
lines changed

5 files changed

+45
-5
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS CRT Async HTTP Client",
4+
"contributor": "",
5+
"description": "Fixed potential connection leak issue when SDK failed to convert the SDK request to CRT request"
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS CRT Async HTTP Client",
4+
"contributor": "",
5+
"description": "Fixed the issue where AWS CRT HTTP client was eagerly buffering data before the underlying CRT component was able to handle it"
6+
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,12 @@ private void executeRequest(CrtAsyncRequestContext executionContext,
8888
CompletableFuture<Void> requestFuture,
8989
HttpClientConnection crtConn,
9090
AsyncExecuteRequest asyncRequest) {
91-
HttpRequest crtRequest = CrtRequestAdapter.toAsyncCrtRequest(executionContext);
92-
HttpStreamResponseHandler crtResponseHandler =
93-
CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, asyncRequest.responseHandler());
94-
9591
// Submit the request on the connection
9692
try {
93+
HttpRequest crtRequest = CrtRequestAdapter.toAsyncCrtRequest(executionContext);
94+
HttpStreamResponseHandler crtResponseHandler =
95+
CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, asyncRequest.responseHandler());
96+
9797
crtConn.makeRequest(crtRequest, crtResponseHandler).activate();
9898
} catch (HttpException e) {
9999
Throwable toThrow = wrapWithIoExceptionIfRetryable(e);

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.http.crt.internal.request;
1717

1818
import java.nio.ByteBuffer;
19+
import java.util.concurrent.atomic.AtomicBoolean;
1920
import software.amazon.awssdk.annotations.SdkInternalApi;
2021
import software.amazon.awssdk.crt.http.HttpRequestBodyStream;
2122
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
@@ -26,15 +27,19 @@
2627
final class CrtRequestBodyAdapter implements HttpRequestBodyStream {
2728
private final SdkHttpContentPublisher requestPublisher;
2829
private final ByteBufferStoringSubscriber requestBodySubscriber;
30+
private final AtomicBoolean subscribed = new AtomicBoolean(false);
2931

3032
CrtRequestBodyAdapter(SdkHttpContentPublisher requestPublisher, long readLimit) {
3133
this.requestPublisher = requestPublisher;
3234
this.requestBodySubscriber = new ByteBufferStoringSubscriber(readLimit);
33-
requestPublisher.subscribe(requestBodySubscriber);
3435
}
3536

3637
@Override
3738
public boolean sendRequestBody(ByteBuffer bodyBytesOut) {
39+
if (subscribed.compareAndSet(false, true)) {
40+
requestPublisher.subscribe(requestBodySubscriber);
41+
}
42+
3843
return requestBodySubscriber.transferTo(bodyBytesOut) == TransferResult.END_OF_STREAM;
3944
}
4045

http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutorTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,29 @@ public void acquireConnectionThrowException_shouldInvokeOnError() {
105105
assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class);
106106
}
107107

108+
@Test
109+
public void invalidRequest_requestConversionThrowError_shouldInvokeOnError() {
110+
CrtAsyncRequestContext context = CrtAsyncRequestContext.builder()
111+
.crtConnPool(connectionManager)
112+
.request(AsyncExecuteRequest.builder()
113+
.responseHandler(responseHandler)
114+
.build())
115+
.build();
116+
CompletableFuture<HttpClientConnection> completableFuture = new CompletableFuture<>();
117+
118+
Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture);
119+
completableFuture.complete(httpClientConnection);
120+
121+
CompletableFuture<Void> executeFuture = requestExecutor.execute(context);
122+
123+
ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(Exception.class);
124+
Mockito.verify(responseHandler).onError(argumentCaptor.capture());
125+
126+
Exception actualException = argumentCaptor.getValue();
127+
assertThat(actualException).isInstanceOf(NullPointerException.class);
128+
assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class);
129+
}
130+
108131
@Test
109132
public void executeAsyncRequest_CrtRuntimeException_shouldInvokeOnError() {
110133
CrtRuntimeException exception = new CrtRuntimeException("");

0 commit comments

Comments
 (0)