From 7fbd5653a18fec03411942e7dd5266f649452ae1 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Sun, 12 Jul 2015 20:20:47 +0200 Subject: [PATCH] +tck #284 support "demand when all downstreams demand" Processor in TCK --- .../tck/IdentityProcessorVerification.java | 33 ++-- .../reactivestreams/tck/TestEnvironment.java | 4 + .../IdentityProcessorVerificationTest.java | 171 +++++++++++++++++- .../SyncTriggeredDemandSubscriberTest.java | 1 + 4 files changed, 186 insertions(+), 23 deletions(-) diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 5a534487..43f22251 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -11,6 +11,7 @@ import org.reactivestreams.tck.support.Function; import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules; import org.reactivestreams.tck.support.PublisherVerificationRules; +import org.reactivestreams.tck.support.TestException; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -387,23 +388,29 @@ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANo optionalMultipleSubscribersTest(2, new Function() { @Override public TestSetup apply(Long aLong) throws Throwable { - return new TestSetup(env, processorBufferSize) {{ + return new TestSetup(env, processorBufferSize, false) {{ final ManualSubscriberWithErrorCollection sub1 = new ManualSubscriberWithErrorCollection(env); - env.subscribe(processor, sub1); - final ManualSubscriberWithErrorCollection sub2 = new ManualSubscriberWithErrorCollection(env); + + // connect upstream + env.subscribe(this, processor); + // connect downstreams + env.subscribe(processor, sub1); + sub1.request(2); env.subscribe(processor, sub2); + sub2.request(1); - sub1.request(1); + // request bubbles up to upstream publisher: expectRequest(); final T x = sendNextTFromUpstream(); expectNextElement(sub1, x); sub1.request(1); // sub1 has received one element, and has one demand pending - // sub2 has not yet requested anything + // sub2 has received one element, and no more pending demand - final Exception ex = new RuntimeException("Test exception"); + // if upstream fails, both should get the error signal + final Exception ex = new TestException(); sendError(ex); sub1.expectError(ex); sub2.expectError(ex); @@ -472,11 +479,11 @@ public void onError(Throwable cause) { // must immediately pass on `onError` events received from its upstream to its downstream @Test public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception { - new TestSetup(env, processorBufferSize) {{ + new TestSetup(env, processorBufferSize, true) {{ final ManualSubscriberWithErrorCollection sub = new ManualSubscriberWithErrorCollection(env); env.subscribe(processor, sub); - final Exception ex = new RuntimeException("Test exception"); + final Exception ex = new TestException(); sendError(ex); sub.expectError(ex); // "immediately", i.e. without a preceding request @@ -629,13 +636,13 @@ public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLong optionalMultipleSubscribersTest(2, new Function() { @Override public TestSetup apply(Long subscribers) throws Throwable { - return new TestSetup(env, processorBufferSize) {{ - ManualSubscriber sub1 = newSubscriber(); + return new TestSetup(env, processorBufferSize, false) {{ + final ManualSubscriber sub1 = newSubscriber(); sub1.request(20); long totalRequests = expectRequest(); final T x = sendNextTFromUpstream(); - expectNextElement(sub1, x); + expectNextElement(sub1, x); // correct, this is not valid in case of "wait for slowest" if (totalRequests == 1) { totalRequests += expectRequest(); @@ -700,11 +707,11 @@ public abstract class TestSetup extends ManualPublisher { final Processor processor; - public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException { + public TestSetup(TestEnvironment env, int testBufferSize, boolean subscribeProcessorToEnvPublisher) throws InterruptedException { super(env); tees = env.newManualSubscriber(createHelperPublisher(Long.MAX_VALUE)); processor = createIdentityProcessor(testBufferSize); - subscribe(processor); + if (subscribeProcessorToEnvPublisher) subscribe(processor); } public ManualSubscriber newSubscriber() throws InterruptedException { diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java index 0e32f5b8..db02037e 100644 --- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java +++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java @@ -183,6 +183,10 @@ public T flopAndFail(String msg) { } + public void subscribe(Publisher pub, Subscriber sub) throws InterruptedException { + pub.subscribe(sub); + verifyNoAsyncErrorsNoDelay(); + } public void subscribe(Publisher pub, TestSubscriber sub) throws InterruptedException { subscribe(pub, sub, defaultTimeoutMillis); diff --git a/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java index 2f4c0259..6cdf54c2 100644 --- a/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java @@ -4,13 +4,14 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import org.reactivestreams.tck.support.NonFatal; import org.reactivestreams.tck.support.TCKVerificationSupport; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; /** * Validates that the TCK's {@link IdentityProcessorVerification} fails with nice human readable errors. @@ -27,25 +28,36 @@ public class IdentityProcessorVerificationTest extends TCKVerificationSupport { @Test public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldBeIgnored() throws Throwable { requireTestSkip(new ThrowingRunnable() { - @Override public void run() throws Throwable { - new IdentityProcessorVerification(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS){ - @Override public Processor createIdentityProcessor(int bufferSize) { + @Override + public void run() throws Throwable { + new IdentityProcessorVerification(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS) { + @Override + public Processor createIdentityProcessor(int bufferSize) { return new NoopProcessor(); } - @Override public ExecutorService publisherExecutorService() { return ex; } + @Override + public ExecutorService publisherExecutorService() { + return ex; + } - @Override public Integer createElement(int element) { return element; } + @Override + public Integer createElement(int element) { + return element; + } - @Override public Publisher createHelperPublisher(long elements) { + @Override + public Publisher createHelperPublisher(long elements) { return SKIP; } - @Override public Publisher createFailedPublisher() { + @Override + public Publisher createFailedPublisher() { return SKIP; } - @Override public long maxSupportedSubscribers() { + @Override + public long maxSupportedSubscribers() { return 1; // can only support 1 subscribe => unable to run this test } }.required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError(); @@ -115,6 +127,145 @@ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANo }, "Did not receive expected error on downstream within " + DEFAULT_TIMEOUT_MILLIS); } + @Test + public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldAllowSignalingElementAfterBothDownstreamsDemand() throws Throwable { + final TestEnvironment env = newTestEnvironment(); + new IdentityProcessorVerification(env, DEFAULT_TIMEOUT_MILLIS) { + @Override + public Processor createIdentityProcessor(int bufferSize) { // knowingly ignoring buffer size, acting as-if 0 + return new Processor() { + + private volatile Subscription upstreamSubscription; + + private final CopyOnWriteArrayList subs = new CopyOnWriteArrayList(); + private final CopyOnWriteArrayList> subscribers = new CopyOnWriteArrayList>(); + private final AtomicLong demand1 = new AtomicLong(); + private final AtomicLong demand2 = new AtomicLong(); + private final CountDownLatch awaitLatch = new CountDownLatch(2); // to know when both subscribers have signalled demand + + @Override + public void subscribe(final Subscriber s) { + int subscriberCount = subs.size(); + switch (subscriberCount) { + case 0: + s.onSubscribe(createSubscription(awaitLatch, s, demand1)); + break; + case 1: + s.onSubscribe(createSubscription(awaitLatch, s, demand2)); + break; + default: + throw new RuntimeException(String.format("This for-test-purposes-processor supports only 2 subscribers, yet got %s!", subscriberCount)); + } + } + + @Override + public void onSubscribe(Subscription s) { + this.upstreamSubscription = s; + } + + @Override + public void onNext(Integer elem) { + for (Subscriber subscriber : subscribers) { + try { + subscriber.onNext(elem); + } catch (Throwable t) { + env.flop(t, String.format("Calling onNext on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber)); + } + } + } + + @Override + public void onError(Throwable t) { + for (Subscriber subscriber : subscribers) { + try { + subscriber.onError(t); + } catch (Exception ex) { + env.flop(ex, String.format("Calling onError on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber)); + } + } + } + + @Override + public void onComplete() { + for (Subscriber subscriber : subscribers) { + try { + subscriber.onComplete(); + } catch (Exception ex) { + env.flop(ex, String.format("Calling onComplete on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber)); + } + } + } + + private Subscription createSubscription(CountDownLatch awaitLatch, final Subscriber s, final AtomicLong demand) { + final MySubscription sub = new MySubscription(awaitLatch, s, demand); + subs.add(sub); + subscribers.add(s); + return sub; + } + + final class MySubscription implements Subscription { + private final CountDownLatch awaitLatch; + private final Subscriber s; + private final AtomicLong demand; + + public MySubscription(CountDownLatch awaitTwoLatch, Subscriber s, AtomicLong demand) { + this.awaitLatch = awaitTwoLatch; + this.s = s; + this.demand = demand; + } + + @Override + public void request(final long n) { + ex.execute(new Runnable() { + @Override + public void run() { + if (demand.get() >= 0) { + demand.addAndGet(n); + awaitLatch.countDown(); + try { + awaitLatch.await(env.defaultTimeoutMillis(), TimeUnit.MILLISECONDS); + final long d = demand.getAndSet(0); + if (d > 0) upstreamSubscription.request(d); + } catch (InterruptedException e) { + env.flop(e, "Interrupted while awaiting for all downstreams to signal some demand."); + } catch (Throwable t) { + env.flop(t, "Subscription#request has thrown an exception, which is illegal!"); + } + } // else cancel was called, do nothing + } + }); + } + + @Override + public void cancel() { + demand.set(-1); // marks subscription as cancelled + } + + @Override + public String toString() { + return String.format("IdentityProcessorVerificationTest:MySubscription(%s, demand = %s)", s, demand); + } + } + }; + } + + @Override + public ExecutorService publisherExecutorService() { + return ex; + } + + @Override + public Integer createElement(int element) { + return element; + } + + @Override + public Publisher createFailedPublisher() { + return SKIP; + } + }.required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError(); + } + // FAILING IMPLEMENTATIONS // final Publisher SKIP = null; diff --git a/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberTest.java b/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberTest.java index 0b099fcf..8c588deb 100644 --- a/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberTest.java @@ -24,6 +24,7 @@ public SyncTriggeredDemandSubscriberTest() { super(new TestEnvironment()); } + @SuppressWarnings("unchecked") @Override public void triggerRequest(final Subscriber subscriber) { ((SyncTriggeredDemandSubscriber)subscriber).triggerDemand(1); }