Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

+tck #284 support "demand when all downstreams demand" Processor in TCK #287

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.reactivestreams.tck.support.Function;
import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules;
import org.reactivestreams.tck.support.PublisherVerificationRules;
import org.reactivestreams.tck.support.TestException;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -387,23 +388,29 @@ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANo
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
@Override
public TestSetup apply(Long aLong) throws Throwable {
return new TestSetup(env, processorBufferSize) {{
return new TestSetup(env, processorBufferSize, false) {{
final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
env.subscribe(processor, sub1);

final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);

// connect upstream
env.subscribe(this, processor);
// connect downstreams
env.subscribe(processor, sub1);
sub1.request(2);
env.subscribe(processor, sub2);
sub2.request(1);

sub1.request(1);
// request bubbles up to upstream publisher:
expectRequest();
final T x = sendNextTFromUpstream();
expectNextElement(sub1, x);
sub1.request(1);

// sub1 has received one element, and has one demand pending
// sub2 has not yet requested anything
// sub2 has received one element, and no more pending demand
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change is that:

  • we did not request from sub2, which made it impossible for "lazy, wait for both" processors to start requesting from upstream
  • now we request at least one element from each downstream, causing one element to be signalled down
  • we now check that the error signal overtakes the outstanding elements, which was the goal of the test


final Exception ex = new RuntimeException("Test exception");
// if upstream fails, both should get the error signal
final Exception ex = new TestException();
sendError(ex);
sub1.expectError(ex);
sub2.expectError(ex);
Expand Down Expand Up @@ -472,11 +479,11 @@ public void onError(Throwable cause) {
// must immediately pass on `onError` events received from its upstream to its downstream
@Test
public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception {
new TestSetup(env, processorBufferSize) {{
new TestSetup(env, processorBufferSize, true) {{
final ManualSubscriberWithErrorCollection<T> sub = new ManualSubscriberWithErrorCollection<T>(env);
env.subscribe(processor, sub);

final Exception ex = new RuntimeException("Test exception");
final Exception ex = new TestException();
sendError(ex);
sub.expectError(ex); // "immediately", i.e. without a preceding request

Expand Down Expand Up @@ -629,13 +636,13 @@ public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLong
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
@Override
public TestSetup apply(Long subscribers) throws Throwable {
return new TestSetup(env, processorBufferSize) {{
ManualSubscriber<T> sub1 = newSubscriber();
return new TestSetup(env, processorBufferSize, false) {{
final ManualSubscriber<T> sub1 = newSubscriber();
sub1.request(20);

long totalRequests = expectRequest();
final T x = sendNextTFromUpstream();
expectNextElement(sub1, x);
expectNextElement(sub1, x); // correct, this is not valid in case of "wait for slowest"

if (totalRequests == 1) {
totalRequests += expectRequest();
Expand Down Expand Up @@ -700,11 +707,11 @@ public abstract class TestSetup extends ManualPublisher<T> {

final Processor<T, T> processor;

public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException {
public TestSetup(TestEnvironment env, int testBufferSize, boolean subscribeProcessorToEnvPublisher) throws InterruptedException {
super(env);
tees = env.newManualSubscriber(createHelperPublisher(Long.MAX_VALUE));
processor = createIdentityProcessor(testBufferSize);
subscribe(processor);
if (subscribeProcessorToEnvPublisher) subscribe(processor);
}

public ManualSubscriber<T> newSubscriber() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ public <T> T flopAndFail(String msg) {
}


public <T> void subscribe(Publisher<T> pub, Subscriber<T> sub) throws InterruptedException {
pub.subscribe(sub);
verifyNoAsyncErrorsNoDelay();
}

public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException {
subscribe(pub, sub, defaultTimeoutMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.support.NonFatal;
import org.reactivestreams.tck.support.TCKVerificationSupport;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**
* Validates that the TCK's {@link IdentityProcessorVerification} fails with nice human readable errors.
Expand All @@ -27,25 +28,36 @@ public class IdentityProcessorVerificationTest extends TCKVerificationSupport {
@Test
public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldBeIgnored() throws Throwable {
requireTestSkip(new ThrowingRunnable() {
@Override public void run() throws Throwable {
new IdentityProcessorVerification<Integer>(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS){
@Override public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
@Override
public void run() throws Throwable {
new IdentityProcessorVerification<Integer>(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS) {
@Override
public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
return new NoopProcessor();
}

@Override public ExecutorService publisherExecutorService() { return ex; }
@Override
public ExecutorService publisherExecutorService() {
return ex;
}

@Override public Integer createElement(int element) { return element; }
@Override
public Integer createElement(int element) {
return element;
}

@Override public Publisher<Integer> createHelperPublisher(long elements) {
@Override
public Publisher<Integer> createHelperPublisher(long elements) {
return SKIP;
}

@Override public Publisher<Integer> createFailedPublisher() {
@Override
public Publisher<Integer> createFailedPublisher() {
return SKIP;
}

@Override public long maxSupportedSubscribers() {
@Override
public long maxSupportedSubscribers() {
return 1; // can only support 1 subscribe => unable to run this test
}
}.required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError();
Expand Down Expand Up @@ -115,6 +127,145 @@ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANo
}, "Did not receive expected error on downstream within " + DEFAULT_TIMEOUT_MILLIS);
}

@Test
public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldAllowSignalingElementAfterBothDownstreamsDemand() throws Throwable {
final TestEnvironment env = newTestEnvironment();
new IdentityProcessorVerification<Integer>(env, DEFAULT_TIMEOUT_MILLIS) {
@Override
public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) { // knowingly ignoring buffer size, acting as-if 0
return new Processor<Integer, Integer>() {

private volatile Subscription upstreamSubscription;

private final CopyOnWriteArrayList<MySubscription> subs = new CopyOnWriteArrayList<MySubscription>();
private final CopyOnWriteArrayList<Subscriber<? super Integer>> subscribers = new CopyOnWriteArrayList<Subscriber<? super Integer>>();
private final AtomicLong demand1 = new AtomicLong();
private final AtomicLong demand2 = new AtomicLong();
private final CountDownLatch awaitLatch = new CountDownLatch(2); // to know when both subscribers have signalled demand

@Override
public void subscribe(final Subscriber<? super Integer> s) {
int subscriberCount = subs.size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't work if subscribe is called from different threads concurrently.

switch (subscriberCount) {
case 0:
s.onSubscribe(createSubscription(awaitLatch, s, demand1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with this is that the Subscriber will become visible to the onNext before onSubscribe is called and may run in arbitrary order in respect to each other or even concurrently with each other.

break;
case 1:
s.onSubscribe(createSubscription(awaitLatch, s, demand2));
break;
default:
throw new RuntimeException(String.format("This for-test-purposes-processor supports only 2 subscribers, yet got %s!", subscriberCount));
}
}

@Override
public void onSubscribe(Subscription s) {
this.upstreamSubscription = s;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the Subscribers come first and issue a request, how will this Subscription be notified about it?

}

@Override
public void onNext(Integer elem) {
for (Subscriber<? super Integer> subscriber : subscribers) {
try {
subscriber.onNext(elem);
} catch (Throwable t) {
env.flop(t, String.format("Calling onNext on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber));
}
}
}

@Override
public void onError(Throwable t) {
for (Subscriber<? super Integer> subscriber : subscribers) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will retain all subscribers after the terminal event indefinitely.

try {
subscriber.onError(t);
} catch (Exception ex) {
env.flop(ex, String.format("Calling onError on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber));
}
}
}

@Override
public void onComplete() {
for (Subscriber<? super Integer> subscriber : subscribers) {
try {
subscriber.onComplete();
} catch (Exception ex) {
env.flop(ex, String.format("Calling onComplete on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber));
}
}
}

private Subscription createSubscription(CountDownLatch awaitLatch, final Subscriber<? super Integer> s, final AtomicLong demand) {
final MySubscription sub = new MySubscription(awaitLatch, s, demand);
subs.add(sub);
subscribers.add(s);
return sub;
}

final class MySubscription implements Subscription {
private final CountDownLatch awaitLatch;
private final Subscriber<? super Integer> s;
private final AtomicLong demand;

public MySubscription(CountDownLatch awaitTwoLatch, Subscriber<? super Integer> s, AtomicLong demand) {
this.awaitLatch = awaitTwoLatch;
this.s = s;
this.demand = demand;
}

@Override
public void request(final long n) {
ex.execute(new Runnable() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is ex guaranteed to execute these in-order?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order in respect to what? As I've said a couple of times, one can't expect request to be in sync with any other calls and every non-trivial producer has to be conservative and thus thread- and reentrant-safe.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imagine a multithreaded ExecutorService. 2 request is issued, 2 Runnables are enqueued, Pool Thread A pulls out the first Runnable but gets preempted. Pool Thread B pulls out the second Runnable and starts executing it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Certainly. And they will call upstreamSubscription.request(d) concurrently on line 228. This is not a problem.

The problem is that I don't see any request coordination. For example, if Subscriber S1 requests 5 and Subscriber requests 10, the upstream source will emit 15 elements to both S1 and S2, violating the contract.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akarnokd In any case, allowing concurrency within the Processor is of little utility and only serves to make the implementation harder to reason about. Had I more time then I'd implement an example Processor that could be reused for the TCK (as with the example publishers and subscribers). I have the utmost respect for your competency here so if you think you'd be able to contribute something like that, then I'd love to review it. Let me know if this is something you'd want to do.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been trying to incorporate the TCK into RxJava 2.0 but I could only test a couple of our components. The problem with the TCK I see is that it expects a certain shape of Publishers and Processors whereas the spec, in my opinion, allows much more variety.

One indication to this that we have a just Publisher which only returns a single value. Testing it with the TCK passes half the tests and ignores the other because just will never emit an onError. The opposite, error Publisher will never emit a value but only an onError so I'm not certain the TCK test would skip the first half of it.

In addition, there is also the question if a Processor should do fine-grained request coordination or not. We have a controversial implementation in RxJava 2.0 where the processor requests Long.MAX_VALUE, tracks the request amount of its Subscribers and emits ˙onError to a Subscriber that can't keep up. This kind of Processor is untestable with the TCK as, even if the Subscriber it uses can keep up, expects the Processor to replay values and not just drop them if there are no Subscribers.

I'm also guessing that having RxJava 2.0 as the dependency to support the TCK test is out of question.

@Override
public void run() {
if (demand.get() >= 0) {
demand.addAndGet(n);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a race-condition here between addAndGet and cancel

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may work but can overflow demand into a negative value.

awaitLatch.countDown();
try {
awaitLatch.await(env.defaultTimeoutMillis(), TimeUnit.MILLISECONDS);
final long d = demand.getAndSet(0);
if (d > 0) upstreamSubscription.request(d);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point, upstreamSubscriber may be null if the Processor's Subscriber end hasn't been subscribed to something.

} catch (InterruptedException e) {
env.flop(e, "Interrupted while awaiting for all downstreams to signal some demand.");
} catch (Throwable t) {
env.flop(t, "Subscription#request has thrown an exception, which is illegal!");
}
} // else cancel was called, do nothing
}
});
}

@Override
public void cancel() {
demand.set(-1); // marks subscription as cancelled
}

@Override
public String toString() {
return String.format("IdentityProcessorVerificationTest:MySubscription(%s, demand = %s)", s, demand);
}
}
};
}

@Override
public ExecutorService publisherExecutorService() {
return ex;
}

@Override
public Integer createElement(int element) {
return element;
}

@Override
public Publisher<Integer> createFailedPublisher() {
return SKIP;
}
}.required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError();
}

// FAILING IMPLEMENTATIONS //

final Publisher<Integer> SKIP = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public SyncTriggeredDemandSubscriberTest() {
super(new TestEnvironment());
}

@SuppressWarnings("unchecked")
@Override public void triggerRequest(final Subscriber<? super Integer> subscriber) {
((SyncTriggeredDemandSubscriber<? super Integer>)subscriber).triggerDemand(1);
}
Expand Down