Skip to content

Commit 0fca634

Browse files
authored
Merge pull request #376 from egetman/feature/tck-optional-111-reference-drop-fix
optional_spec111 test addition.
2 parents 940a51f + 27172e0 commit 0fca634

File tree

6 files changed

+138
-2
lines changed

6 files changed

+138
-2
lines changed

CopyrightWaivers.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,4 @@ JakeWharton | Jake Wharton, [email protected]
3636
anthonyvdotbe | Anthony Vanelverdinghe, [email protected]
3737
seratch | Kazuhiro Sera, [email protected], SmartNews, Inc.
3838
akarnokd | David Karnok, [email protected]
39+
egetman | Evgeniy Getman, [email protected]

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,11 @@ public void optional_spec111_maySupportMultiSubscribe() throws Throwable {
311311
publisherVerification.optional_spec111_maySupportMultiSubscribe();
312312
}
313313

314+
@Override @Test
315+
public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() throws Throwable {
316+
publisherVerification.optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals();
317+
}
318+
314319
@Override @Test
315320
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
316321
publisherVerification.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne();

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,33 @@ public void run(Publisher<T> pub) throws Throwable {
572572
});
573573
}
574574

575+
@Override @Test
576+
public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() throws Throwable {
577+
optionalActivePublisherTest(1, false, new PublisherTestRun<T>() {
578+
@Override
579+
public void run(Publisher<T> pub) throws Throwable {
580+
ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
581+
ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
582+
// Since we're testing the case when the Publisher DOES support the optional multi-subscribers scenario,
583+
// and decides if it handles them uni-cast or multi-cast, we don't know which subscriber will receive an
584+
// onNext (and optional onComplete) signal(s) and which just onComplete signal.
585+
// Plus, even if subscription assumed to be unicast, it's implementation choice, which one will be signalled
586+
// with onNext.
587+
sub1.requestNextElementOrEndOfStream();
588+
sub2.requestNextElementOrEndOfStream();
589+
try {
590+
env.verifyNoAsyncErrors();
591+
} finally {
592+
try {
593+
sub1.cancel();
594+
} finally {
595+
sub2.cancel();
596+
}
597+
}
598+
}
599+
});
600+
}
601+
575602
@Override @Test
576603
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable {
577604
optionalActivePublisherTest(5, true, new PublisherTestRun<T>() { // This test is skipped if the publisher is unbounded (never sends onComplete)

tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,10 @@ public T requestNextElement(long timeoutMillis, String errorMsg) throws Interrup
390390
return nextElement(timeoutMillis, errorMsg);
391391
}
392392

393+
public Optional<T> requestNextElementOrEndOfStream() throws InterruptedException {
394+
return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), "Did not receive expected stream completion");
395+
}
396+
393397
public Optional<T> requestNextElementOrEndOfStream(String errorMsg) throws InterruptedException {
394398
return requestNextElementOrEndOfStream(env.defaultTimeoutMillis(), errorMsg);
395399
}

tck/src/main/java/org/reactivestreams/tck/support/PublisherVerificationRules.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,8 @@ public interface PublisherVerificationRules {
285285
*/
286286
void untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() throws Throwable;
287287
/**
288-
* Ask for a single-element {@code Publisher} and subscribes to it twice, without consuming with either {@code Subscriber} instance
288+
* Asks for a single-element {@code Publisher} and subscribes to it twice, without consuming with either
289+
* {@code Subscriber} instance
289290
* (i.e., no requests are issued).
290291
* <p>
291292
* <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#1.11'>1.11</a>
@@ -296,6 +297,19 @@ public interface PublisherVerificationRules {
296297
* means will indicate a skipped test.
297298
*/
298299
void optional_spec111_maySupportMultiSubscribe() throws Throwable;
300+
/**
301+
* Asks for a single-element {@code Publisher} and subscribes to it twice.
302+
* Each {@code Subscriber} requests for 1 element and checks if onNext or onComplete signals was received.
303+
* <p>
304+
* <b>Verifies rule:</b> <a href='https://github.com/reactive-streams/reactive-streams-jvm#1.11'>1.11</a>,
305+
* and depends on valid implementation of rule <a href='https://github.com/reactive-streams/reactive-streams-jvm#1.5'>1.5</a>
306+
* in order to verify this.
307+
* <p>
308+
* The test is not executed if {@link org.reactivestreams.tck.PublisherVerification#maxElementsFromPublisher()} is less than 1.
309+
* <p>
310+
* Any exception thrown through non-regular means will indicate a skipped test.
311+
*/
312+
void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() throws Throwable;
299313
/**
300314
* Asks for a short {@code Publisher} (length 5), subscribes 3 {@code Subscriber}s to it, requests with different
301315
* patterns and checks if all 3 received the same events in the same order.
@@ -310,7 +324,7 @@ public interface PublisherVerificationRules {
310324
* when they subscribe and how they request elements. I.e., a "live" {@code Publisher} emitting the current time would not pass this test.
311325
* <p>
312326
* Note that this test is optional and may appear skipped even if the behavior should be actually supported by the {@code Publisher},
313-
* see the skip message for an indication of this.
327+
* see the skip message for an indication of this.
314328
* <p>
315329
* If this test fails, the following could be checked within the {@code Publisher} implementation:
316330
* <ul>

tck/src/test/java/org/reactivestreams/tck/PublisherVerificationTest.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@
1919
import org.testng.Assert;
2020
import org.testng.annotations.Test;
2121

22+
import java.util.Collection;
2223
import java.util.Random;
24+
import java.util.concurrent.CopyOnWriteArrayList;
2325
import java.util.concurrent.ExecutorService;
2426
import java.util.concurrent.Executors;
2527
import java.util.concurrent.RejectedExecutionException;
2628
import java.util.concurrent.TimeUnit;
2729
import java.util.concurrent.atomic.AtomicBoolean;
2830
import java.util.concurrent.atomic.AtomicInteger;
31+
import java.util.concurrent.atomic.AtomicLong;
2932

3033
/**
3134
* Validates that the TCK's {@link org.reactivestreams.tck.PublisherVerification} fails with nice human readable errors.
@@ -364,6 +367,21 @@ public void optional_spec111_maySupportMultiSubscribe_shouldFailBy_actuallyPass(
364367
noopPublisherVerification().optional_spec111_maySupportMultiSubscribe();
365368
}
366369

370+
@Test
371+
public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals_beSkippedWhenMultipleSubscribersNotSupported() throws Throwable {
372+
requireTestSkip(new ThrowingRunnable() {
373+
@Override
374+
public void run() throws Throwable {
375+
multiSubscribersPublisherVerification(true).optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals();
376+
}
377+
}, "Unexpected additional subscriber");
378+
}
379+
380+
@Test
381+
public void optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals_shouldPass() throws Throwable {
382+
multiSubscribersPublisherVerification(false).optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals();
383+
}
384+
367385
@Test
368386
public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront_shouldFailBy_expectingOnError() throws Throwable {
369387
requireTestFailure(new ThrowingRunnable() {
@@ -716,6 +734,73 @@ final PublisherVerification<Integer> customPublisherVerification(final Publisher
716734
};
717735
}
718736

737+
/**
738+
* Verification using a Publisher that supports multiple subscribers
739+
* @param shouldBlowUp if true {@link RuntimeException} will be thrown during second subscription.
740+
*/
741+
final PublisherVerification<Integer> multiSubscribersPublisherVerification(final boolean shouldBlowUp) {
742+
return new PublisherVerification<Integer>(newTestEnvironment()) {
743+
744+
@Override
745+
public Publisher<Integer> createPublisher(final long elements) {
746+
return new Publisher<Integer>() {
747+
748+
private final Collection<CancelableSubscription> subscriptions = new CopyOnWriteArrayList<CancelableSubscription>();
749+
private final AtomicLong source = new AtomicLong(elements);
750+
751+
@Override
752+
public void subscribe(Subscriber<? super Integer> s) {
753+
// onSubscribe first
754+
CancelableSubscription subscription = new CancelableSubscription(s);
755+
s.onSubscribe(subscription);
756+
if (shouldBlowUp && !subscriptions.isEmpty()) {
757+
s.onError(new RuntimeException("Unexpected additional subscriber"));
758+
} else {
759+
subscriptions.add(subscription);
760+
}
761+
}
762+
763+
class CancelableSubscription implements Subscription {
764+
765+
final AtomicBoolean canceled = new AtomicBoolean();
766+
Subscriber<? super Integer> subscriber;
767+
768+
CancelableSubscription(Subscriber<? super Integer> subscriber) {
769+
this.subscriber = subscriber;
770+
}
771+
772+
@Override
773+
public void request(long n) {
774+
if (!canceled.get()) {
775+
for (long i = 0; i < n; i++) {
776+
if (source.getAndDecrement() < 0) {
777+
canceled.set(true);
778+
subscriber.onComplete();
779+
} else {
780+
subscriber.onNext((int) i);
781+
}
782+
}
783+
}
784+
}
785+
786+
@Override
787+
public void cancel() {
788+
canceled.set(true);
789+
subscriber = null;
790+
subscriptions.remove(this);
791+
}
792+
}
793+
794+
};
795+
}
796+
797+
@Override
798+
public Publisher<Integer> createFailedPublisher() {
799+
return SKIP;
800+
}
801+
};
802+
}
803+
719804
/**
720805
* Verification using a Publisher that publishes elements even with no demand available
721806
*/

0 commit comments

Comments
 (0)