From 18f3532f0cb25a8ec69c8ef65447d3aeb6d3010b Mon Sep 17 00:00:00 2001 From: James Roper Date: Mon, 19 Nov 2018 10:42:34 +1100 Subject: [PATCH 1/2] Wait for demand before signaling onNext Fixes #277. Fixes two whitebox subscriber tests where they weren't waiting for demand before signaling onNext. --- .../reactivestreams/tck/SubscriberWhiteboxVerification.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java index 57970c46..d5dadd56 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberWhiteboxVerification.java @@ -267,6 +267,7 @@ public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCall public void run(WhiteboxTestStage stage) throws InterruptedException { stage.puppet().triggerRequest(1); stage.puppet().signalCancel(); + stage.expectRequest(); stage.signalNext(); stage.puppet().triggerRequest(1); @@ -437,7 +438,12 @@ public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced( @Override public void run(WhiteboxTestStage stage) throws InterruptedException { stage.puppet().triggerRequest(2); + long requestedElements = stage.expectRequest(); stage.probe.expectNext(stage.signalNext()); + // Some subscribers may only request one element at a time. + if (requestedElements < 2) { + stage.expectRequest(); + } stage.probe.expectNext(stage.signalNext()); stage.probe.expectNone(); From 17b22f3de7b79fa5d346cc4bd13f05e9fd6e2588 Mon Sep 17 00:00:00 2001 From: James Roper Date: Tue, 20 Nov 2018 10:52:11 +1100 Subject: [PATCH 2/2] Added unit tests ensuring that requests have been expected --- .../SubscriberWhiteboxVerificationTest.java | 162 +++++++++++++++++- 1 file changed, 159 insertions(+), 3 deletions(-) diff --git a/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java b/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java index bd0737d7..0441e6ed 100644 --- a/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java +++ b/tck/src/test/java/org/reactivestreams/tck/SubscriberWhiteboxVerificationTest.java @@ -23,7 +23,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; /** * Validates that the TCK's {@link SubscriberWhiteboxVerification} fails with nice human readable errors. @@ -31,8 +34,8 @@ */ public class SubscriberWhiteboxVerificationTest extends TCKVerificationSupport { - private ExecutorService ex; - @BeforeClass void before() { ex = Executors.newFixedThreadPool(4); } + private ScheduledExecutorService ex; + @BeforeClass void before() { ex = Executors.newScheduledThreadPool(4); } @AfterClass void after() { if (ex != null) ex.shutdown(); } @Test @@ -217,6 +220,51 @@ public Subscriber apply(WhiteboxSubscriberProbe probe) throws }, "But I thought it's cancelled!"); } + @Test + public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel_shouldWaitForDemandBeforeSignalling() throws Throwable { + customSubscriberVerification(new Function, Subscriber>() { + @Override + public Subscriber apply(WhiteboxSubscriberProbe probe) throws Throwable { + + final AtomicBoolean demandRequested = new AtomicBoolean(false); + + return new SimpleSubscriberWithProbe(probe) { + @Override public void onSubscribe(final Subscription s) { + this.subscription = s; + probe.registerOnSubscribe(new SubscriberPuppet() { + @Override public void triggerRequest(final long elements) { + ex.schedule(new Runnable() { + @Override + public void run() { + demandRequested.set(true); + subscription.request(elements); + } + }, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS); + } + + @Override public void signalCancel() { + // Delay this too to ensure that cancel isn't invoked before request. + ex.schedule(new Runnable() { + @Override + public void run() { + subscription.cancel(); + } + }, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS); + } + }); + } + + @Override public void onNext(Integer element) { + if (!demandRequested.get()) { + throw new RuntimeException("onNext signalled without demand!"); + } + probe.registerOnNext(element); + } + }; + } + }).required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel(); + } + @Test public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall_shouldFail() throws Throwable { requireTestFailure(new ThrowingRunnable() { @@ -299,7 +347,7 @@ public Subscriber apply(WhiteboxSubscriberProbe probe) throws } @Test - public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced_shouldFail() throws Throwable { + public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced_shouldPass() throws Throwable { // sanity checks the "happy path", that triggerRequest() propagates the right demand customSubscriberVerification(new Function, Subscriber>() { @Override @@ -309,6 +357,114 @@ public Subscriber apply(WhiteboxSubscriberProbe probe) throws }).required_spec308_requestMustRegisterGivenNumberElementsToBeProduced(); } + @Test + public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced_shouldWaitForDemandBeforeSignalling() throws Throwable { + customSubscriberVerification(new Function, Subscriber>() { + @Override + public Subscriber apply(WhiteboxSubscriberProbe probe) throws Throwable { + + final AtomicBoolean demandRequested = new AtomicBoolean(false); + return new SimpleSubscriberWithProbe(probe) { + @Override + public void onSubscribe(Subscription s) { + this.subscription = s; + probe.registerOnSubscribe(new SubscriberPuppet() { + @Override + public void triggerRequest(final long elements) { + ex.schedule(new Runnable() { + @Override + public void run() { + demandRequested.set(true); + subscription.request(elements); + } + }, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS); + } + + @Override + public void signalCancel() { + // Delay this too to ensure that cancel isn't invoked before request. + ex.schedule(new Runnable() { + @Override + public void run() { + subscription.cancel(); + } + }, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS); + } + }); + } + + @Override + public void onNext(Integer element) { + if (!demandRequested.get()) { + throw new RuntimeException("onNext signalled without demand!"); + } + probe.registerOnNext(element); + } + }; + } + }).required_spec308_requestMustRegisterGivenNumberElementsToBeProduced(); + } + + @Test + public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced_shouldWaitForDemandTwiceForOneAtATimeSubscribers() throws Throwable { + customSubscriberVerification(new Function, Subscriber>() { + @Override + public Subscriber apply(WhiteboxSubscriberProbe probe) throws Throwable { + + final AtomicLong outstandingRequest = new AtomicLong(0); + final AtomicBoolean demandRequested = new AtomicBoolean(); + return new SimpleSubscriberWithProbe(probe) { + @Override + public void onSubscribe(Subscription s) { + this.subscription = s; + probe.registerOnSubscribe(new SubscriberPuppet() { + @Override + public void triggerRequest(final long elements) { + outstandingRequest.getAndAdd(elements); + ex.schedule(new Runnable() { + @Override + public void run() { + demandRequested.set(true); + subscription.request(1); + } + }, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS); + } + + @Override + public void signalCancel() { + // Delay this too to ensure that cancel isn't invoked before request. + ex.schedule(new Runnable() { + @Override + public void run() { + subscription.cancel(); + } + }, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS); + } + }); + } + + @Override + public void onNext(Integer element) { + if (!demandRequested.getAndSet(false)) { + throw new RuntimeException("onNext signalled without demand!"); + } + if (outstandingRequest.decrementAndGet() > 0) { + ex.schedule(new Runnable() { + @Override + public void run() { + demandRequested.set(true); + subscription.request(1); + } + }, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS); + } + probe.registerOnNext(element); + } + }; + } + }).required_spec308_requestMustRegisterGivenNumberElementsToBeProduced(); + } + + // FAILING IMPLEMENTATIONS // /**