Skip to content

Wait for demand before signaling onNext #441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCall
public void run(WhiteboxTestStage stage) throws InterruptedException {
stage.puppet().triggerRequest(1);
stage.puppet().signalCancel();
stage.expectRequest();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be there an unit test that failed before this change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this some more, shouldn't the expectRequest be before the signalCancel?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also could it be possible, and legal, that a request followed by a cancel is coalesced into just a cancel?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be there an unit test that failed before this change?

I'll add one.

Thinking about this some more, shouldn't the expectRequest be before the signalCancel?:

I don't think it matters, in fact the reason I put it in this order is that I think this order makes it more likely that the test will test what it's intending to test. Triggering a request, waiting for that request to come, then signalling cancel, then immediately sending the next message, means that there's a high chance this will pass due to signal not being handled yet. However, waiting for the request after signalling cancel increases the likelihood that the cancel signal has been handled before we emit the next element, thereby increasing the likelihood that the test will actually test what it says its testing (that elements emitted after a cancel signal is handled won't cause an error).

Also could it be possible, and legal, that a request followed by a cancel is coalesced into just a cancel?

On a subscription, yes, that's legal and possible, but this isn't a subscription, this is the TCK puppet, and the TCK defines how implementations of the puppet are are to behave. It requires that the request signal does result in a request for demand to the subscription, regardless of what signal comes immediately after. And when these signals get forwarded onto the subscription, we know the subscription won't coalesce the signals because that is a subscription that the TCK has implemented itself in ManualPublisher, and it doesn't coalesce signals, it captures them all for assertions to be run later, assertions like expectRequest.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, waiting for the request after signalling cancel increases the likelihood that the cancel signal has been handled before we emit the next element, thereby increasing the likelihood that the test will actually test what it says its testing (that elements emitted after a cancel signal is handled won't cause an error).

Since it is, as you say, not a full solution, may I suggest that it makes sense to add that as a comment, since it is non-apparent as to its intent?

Copy link

@jrudolph jrudolph May 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a subscription, yes, that's legal and possible, but this isn't a subscription, this is the TCK puppet, and the TCK defines how implementations of the puppet are are to behave. It requires that the request signal does result in a request for demand to the subscription, regardless of what signal comes immediately after. And when these signals get forwarded onto the subscription, we know the subscription won't coalesce the signals because that is a subscription that the TCK has implemented itself in ManualPublisher, and it doesn't coalesce signals, it captures them all for assertions to be run later, assertions like expectRequest.

These tests are also used in the IdentityProcessorVerification tests which implements the SubscriberPuppet as forwarding the triggers as is to the processor under test. So, IIUC the invariants you state would have to be valid for any identity processor implementation you might want to test. That would mean, that you cannot use the TCK to check processor implementations that do coalescing of requests and cancel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which case, perhaps we should change this to expectRequest before we signalCancel, and then insert a new expectCancelling after that, before we signalNext, to ensure that the subscriber did call cancel before we sent the next element - that's I think the original intent of the ordering of operations here, and matches the description of the test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've implemented this here: #462

stage.signalNext();

stage.puppet().triggerRequest(1);
Expand Down Expand Up @@ -437,7 +438,12 @@ public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced(
@Override
public void run(WhiteboxTestStage stage) throws InterruptedException {
stage.puppet().triggerRequest(2);
long requestedElements = stage.expectRequest();
stage.probe.expectNext(stage.signalNext());
// Some subscribers may only request one element at a time.
if (requestedElements < 2) {
stage.expectRequest();
}
stage.probe.expectNext(stage.signalNext());

stage.probe.expectNone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,19 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
* Validates that the TCK's {@link SubscriberWhiteboxVerification} fails with nice human readable errors.
* <b>Important: Please note that all Subscribers implemented in this file are *wrong*!</b>
*/
public class SubscriberWhiteboxVerificationTest extends TCKVerificationSupport {

private ExecutorService ex;
@BeforeClass void before() { ex = Executors.newFixedThreadPool(4); }
private ScheduledExecutorService ex;
@BeforeClass void before() { ex = Executors.newScheduledThreadPool(4); }
@AfterClass void after() { if (ex != null) ex.shutdown(); }

@Test
Expand Down Expand Up @@ -217,6 +220,51 @@ public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws
}, "But I thought it's cancelled!");
}

@Test
public void required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel_shouldWaitForDemandBeforeSignalling() throws Throwable {
customSubscriberVerification(new Function<WhiteboxSubscriberProbe<Integer>, Subscriber<Integer>>() {
@Override
public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws Throwable {

final AtomicBoolean demandRequested = new AtomicBoolean(false);

return new SimpleSubscriberWithProbe(probe) {
@Override public void onSubscribe(final Subscription s) {
this.subscription = s;
probe.registerOnSubscribe(new SubscriberPuppet() {
@Override public void triggerRequest(final long elements) {
ex.schedule(new Runnable() {
@Override
public void run() {
demandRequested.set(true);
subscription.request(elements);
}
}, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
}

@Override public void signalCancel() {
// Delay this too to ensure that cancel isn't invoked before request.
ex.schedule(new Runnable() {
@Override
public void run() {
subscription.cancel();
}
}, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
}
});
}

@Override public void onNext(Integer element) {
if (!demandRequested.get()) {
throw new RuntimeException("onNext signalled without demand!");
}
probe.registerOnNext(element);
}
};
}
}).required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel();
}

@Test
public void required_spec209_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall_shouldFail() throws Throwable {
requireTestFailure(new ThrowingRunnable() {
Expand Down Expand Up @@ -299,7 +347,7 @@ public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws
}

@Test
public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced_shouldFail() throws Throwable {
public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced_shouldPass() throws Throwable {
// sanity checks the "happy path", that triggerRequest() propagates the right demand
customSubscriberVerification(new Function<WhiteboxSubscriberProbe<Integer>, Subscriber<Integer>>() {
@Override
Expand All @@ -309,6 +357,114 @@ public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws
}).required_spec308_requestMustRegisterGivenNumberElementsToBeProduced();
}

@Test
public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced_shouldWaitForDemandBeforeSignalling() throws Throwable {
customSubscriberVerification(new Function<WhiteboxSubscriberProbe<Integer>, Subscriber<Integer>>() {
@Override
public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws Throwable {

final AtomicBoolean demandRequested = new AtomicBoolean(false);
return new SimpleSubscriberWithProbe(probe) {
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
probe.registerOnSubscribe(new SubscriberPuppet() {
@Override
public void triggerRequest(final long elements) {
ex.schedule(new Runnable() {
@Override
public void run() {
demandRequested.set(true);
subscription.request(elements);
}
}, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
}

@Override
public void signalCancel() {
// Delay this too to ensure that cancel isn't invoked before request.
ex.schedule(new Runnable() {
@Override
public void run() {
subscription.cancel();
}
}, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
}
});
}

@Override
public void onNext(Integer element) {
if (!demandRequested.get()) {
throw new RuntimeException("onNext signalled without demand!");
}
probe.registerOnNext(element);
}
};
}
}).required_spec308_requestMustRegisterGivenNumberElementsToBeProduced();
}

@Test
public void required_spec308_requestMustRegisterGivenNumberElementsToBeProduced_shouldWaitForDemandTwiceForOneAtATimeSubscribers() throws Throwable {
customSubscriberVerification(new Function<WhiteboxSubscriberProbe<Integer>, Subscriber<Integer>>() {
@Override
public Subscriber<Integer> apply(WhiteboxSubscriberProbe<Integer> probe) throws Throwable {

final AtomicLong outstandingRequest = new AtomicLong(0);
final AtomicBoolean demandRequested = new AtomicBoolean();
return new SimpleSubscriberWithProbe(probe) {
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
probe.registerOnSubscribe(new SubscriberPuppet() {
@Override
public void triggerRequest(final long elements) {
outstandingRequest.getAndAdd(elements);
ex.schedule(new Runnable() {
@Override
public void run() {
demandRequested.set(true);
subscription.request(1);
}
}, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
}

@Override
public void signalCancel() {
// Delay this too to ensure that cancel isn't invoked before request.
ex.schedule(new Runnable() {
@Override
public void run() {
subscription.cancel();
}
}, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
}
});
}

@Override
public void onNext(Integer element) {
if (!demandRequested.getAndSet(false)) {
throw new RuntimeException("onNext signalled without demand!");
}
if (outstandingRequest.decrementAndGet() > 0) {
ex.schedule(new Runnable() {
@Override
public void run() {
demandRequested.set(true);
subscription.request(1);
}
}, TestEnvironment.envDefaultTimeoutMillis() / 2, TimeUnit.MILLISECONDS);
}
probe.registerOnNext(element);
}
};
}
}).required_spec308_requestMustRegisterGivenNumberElementsToBeProduced();
}


// FAILING IMPLEMENTATIONS //

/**
Expand Down