Skip to content

Commit db40d81

Browse files
authored
Fixed an issue in SequentialSubscriber where future never got completed when the consumer threw error (#5945)
1 parent c6957e0 commit db40d81

File tree

5 files changed

+134
-1
lines changed

5 files changed

+134
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Updated the SDK to handle error thrown from consumer subscribed to paginator publisher, which caused the request to hang for pagination operations"
6+
}

utils/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,11 @@
125125
<artifactId>commons-lang</artifactId>
126126
<scope>test</scope>
127127
</dependency>
128+
<dependency>
129+
<groupId>io.reactivex.rxjava2</groupId>
130+
<artifactId>rxjava</artifactId>
131+
<scope>test</scope>
132+
</dependency>
128133
</dependencies>
129134

130135
<build>

utils/src/main/java/software/amazon/awssdk/utils/async/SequentialSubscriber.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,25 @@ public SequentialSubscriber(Consumer<T> consumer,
4040

4141
@Override
4242
public void onSubscribe(Subscription subscription) {
43+
if (this.subscription != null) {
44+
subscription.cancel();
45+
}
46+
4347
this.subscription = subscription;
4448
subscription.request(1);
4549
}
4650

4751
@Override
4852
public void onNext(T t) {
53+
if (t == null) {
54+
NullPointerException exception = new NullPointerException("onNext(null) is not allowed.");
55+
future.completeExceptionally(exception);
56+
throw exception;
57+
}
4958
try {
5059
consumer.accept(t);
5160
subscription.request(1);
52-
} catch (RuntimeException e) {
61+
} catch (Throwable e) {
5362
// Handle the consumer throwing an exception
5463
subscription.cancel();
5564
future.completeExceptionally(e);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.utils.async;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
import org.reactivestreams.Subscriber;
20+
import org.reactivestreams.Subscription;
21+
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
22+
import org.reactivestreams.tck.TestEnvironment;
23+
24+
public class SequentialSubscriberTckTest extends SubscriberWhiteboxVerification<Integer> {
25+
protected SequentialSubscriberTckTest() {
26+
super(new TestEnvironment());
27+
}
28+
29+
@Override
30+
public Subscriber<Integer> createSubscriber(WhiteboxSubscriberProbe<Integer> probe) {
31+
CompletableFuture<Void> future = new CompletableFuture<>();
32+
return new SequentialSubscriber<Integer>(i -> {}, future) {
33+
@Override
34+
public void onError(Throwable throwable) {
35+
super.onError(throwable);
36+
probe.registerOnError(throwable);
37+
}
38+
39+
@Override
40+
public void onSubscribe(Subscription subscription) {
41+
super.onSubscribe(subscription);
42+
probe.registerOnSubscribe(new SubscriberPuppet() {
43+
@Override
44+
public void triggerRequest(long elements) {
45+
subscription.request(elements);
46+
}
47+
48+
@Override
49+
public void signalCancel() {
50+
subscription.cancel();
51+
}
52+
});
53+
}
54+
55+
@Override
56+
public void onNext(Integer nextItems) {
57+
super.onNext(nextItems);
58+
probe.registerOnNext(nextItems);
59+
}
60+
61+
@Override
62+
public void onComplete() {
63+
super.onComplete();
64+
probe.registerOnComplete();
65+
}
66+
};
67+
}
68+
69+
@Override
70+
public Integer createElement(int i) {
71+
return i;
72+
}
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.utils.async;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
21+
import io.reactivex.Flowable;
22+
import java.util.concurrent.CompletableFuture;
23+
import org.junit.jupiter.api.Test;
24+
25+
public class SequentialSubscriberTest {
26+
27+
@Test
28+
void consumerThrowsException_shouldCompleteFutureExceptionally() {
29+
CompletableFuture<Void> future = new CompletableFuture<>();
30+
AssertionError error = new AssertionError("boom");
31+
SequentialSubscriber<Integer> subscriber =
32+
new SequentialSubscriber<>(i -> {
33+
throw error;
34+
}, future);
35+
36+
Flowable.fromArray(1, 2).subscribe(subscriber);
37+
assertThat(future).isCompletedExceptionally();
38+
assertThatThrownBy(() -> future.join()).hasRootCause(error);
39+
}
40+
}

0 commit comments

Comments
 (0)