Skip to content

Commit 89a4dc3

Browse files
committed
+tck #284 support "demand when all downstreams demand" Processor in TCK
1 parent 4264e1d commit 89a4dc3

File tree

3 files changed

+182
-25
lines changed

3 files changed

+182
-25
lines changed

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+20-13
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.reactivestreams.tck.support.Function;
1212
import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules;
1313
import org.reactivestreams.tck.support.PublisherVerificationRules;
14+
import org.reactivestreams.tck.support.TestException;
1415
import org.testng.annotations.BeforeMethod;
1516
import org.testng.annotations.Test;
1617

@@ -387,23 +388,29 @@ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANo
387388
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
388389
@Override
389390
public TestSetup apply(Long aLong) throws Throwable {
390-
return new TestSetup(env, processorBufferSize) {{
391+
return new TestSetup(env, processorBufferSize, false) {{
391392
final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
392-
env.subscribe(processor, sub1);
393-
394393
final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
394+
395+
// connect upstream
396+
env.subscribe(this, processor);
397+
// connect downstreams
398+
env.subscribe(processor, sub1);
399+
sub1.request(2);
395400
env.subscribe(processor, sub2);
401+
sub2.request(1);
396402

397-
sub1.request(1);
403+
// request bubbles up to upstream publisher:
398404
expectRequest();
399405
final T x = sendNextTFromUpstream();
400406
expectNextElement(sub1, x);
401407
sub1.request(1);
402408

403409
// sub1 has received one element, and has one demand pending
404-
// sub2 has not yet requested anything
410+
// sub2 has received one element, and no more pending demand
405411

406-
final Exception ex = new RuntimeException("Test exception");
412+
// if upstream fails, both should get the error signal
413+
final Exception ex = new TestException();
407414
sendError(ex);
408415
sub1.expectError(ex);
409416
sub2.expectError(ex);
@@ -472,11 +479,11 @@ public void onError(Throwable cause) {
472479
// must immediately pass on `onError` events received from its upstream to its downstream
473480
@Test
474481
public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception {
475-
new TestSetup(env, processorBufferSize) {{
482+
new TestSetup(env, processorBufferSize, true) {{
476483
final ManualSubscriberWithErrorCollection<T> sub = new ManualSubscriberWithErrorCollection<T>(env);
477484
env.subscribe(processor, sub);
478485

479-
final Exception ex = new RuntimeException("Test exception");
486+
final Exception ex = new TestException();
480487
sendError(ex);
481488
sub.expectError(ex); // "immediately", i.e. without a preceding request
482489

@@ -629,13 +636,13 @@ public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLong
629636
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
630637
@Override
631638
public TestSetup apply(Long subscribers) throws Throwable {
632-
return new TestSetup(env, processorBufferSize) {{
633-
ManualSubscriber<T> sub1 = newSubscriber();
639+
return new TestSetup(env, processorBufferSize, false) {{
640+
final ManualSubscriber<T> sub1 = newSubscriber();
634641
sub1.request(20);
635642

636643
long totalRequests = expectRequest();
637644
final T x = sendNextTFromUpstream();
638-
expectNextElement(sub1, x);
645+
expectNextElement(sub1, x); // correct, this is not valid in case of "wait for slowest"
639646

640647
if (totalRequests == 1) {
641648
totalRequests += expectRequest();
@@ -700,11 +707,11 @@ public abstract class TestSetup extends ManualPublisher<T> {
700707

701708
final Processor<T, T> processor;
702709

703-
public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException {
710+
public TestSetup(TestEnvironment env, int testBufferSize, boolean subscribeProcessorToEnvPublisher) throws InterruptedException {
704711
super(env);
705712
tees = env.newManualSubscriber(createHelperPublisher(Long.MAX_VALUE));
706713
processor = createIdentityProcessor(testBufferSize);
707-
subscribe(processor);
714+
if (subscribeProcessorToEnvPublisher) subscribe(processor);
708715
}
709716

710717
public ManualSubscriber<T> newSubscriber() throws InterruptedException {

tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+4
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ public <T> T flopAndFail(String msg) {
183183
}
184184

185185

186+
public <T> void subscribe(Publisher<T> pub, Subscriber<T> sub) throws InterruptedException {
187+
pub.subscribe(sub);
188+
verifyNoAsyncErrorsNoDelay();
189+
}
186190

187191
public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException {
188192
subscribe(pub, sub, defaultTimeoutMillis);

tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java

+158-12
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
import org.testng.annotations.BeforeClass;
1010
import org.testng.annotations.Test;
1111

12-
import java.util.concurrent.ExecutorService;
13-
import java.util.concurrent.Executors;
12+
import java.util.concurrent.*;
13+
import java.util.concurrent.atomic.AtomicLong;
1414

1515
/**
1616
* Validates that the TCK's {@link IdentityProcessorVerification} fails with nice human readable errors.
@@ -27,25 +27,24 @@ public class IdentityProcessorVerificationTest extends TCKVerificationSupport {
2727
@Test
2828
public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldBeIgnored() throws Throwable {
2929
requireTestSkip(new ThrowingRunnable() {
30-
@Override public void run() throws Throwable {
31-
new IdentityProcessorVerification<Integer>(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS){
32-
@Override public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
30+
@Override
31+
public void run() throws Throwable {
32+
new IdentityProcessorVerification<Integer>(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS) {
33+
@Override
34+
public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
3335
return new NoopProcessor();
3436
}
3537

3638
@Override public ExecutorService publisherExecutorService() { return ex; }
3739

3840
@Override public Integer createElement(int element) { return element; }
3941

40-
@Override public Publisher<Integer> createHelperPublisher(long elements) {
41-
return SKIP;
42-
}
42+
@Override public Publisher<Integer> createHelperPublisher(long elements) { return SKIP; }
4343

44-
@Override public Publisher<Integer> createFailedPublisher() {
45-
return SKIP;
46-
}
44+
@Override public Publisher<Integer> createFailedPublisher() { return SKIP; }
4745

48-
@Override public long maxSupportedSubscribers() {
46+
@Override
47+
public long maxSupportedSubscribers() {
4948
return 1; // can only support 1 subscribe => unable to run this test
5049
}
5150
}.required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError();
@@ -115,6 +114,153 @@ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANo
115114
}, "Did not receive expected error on downstream within " + DEFAULT_TIMEOUT_MILLIS);
116115
}
117116

117+
@Test
118+
public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldAllowSignalingElementAfterBothDownstreamsDemand() throws Throwable {
119+
final TestEnvironment env = newTestEnvironment();
120+
new IdentityProcessorVerification<Integer>(env, DEFAULT_TIMEOUT_MILLIS) {
121+
@Override
122+
public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) { // knowingly ignoring buffer size, acting as-if 0
123+
return new Processor<Integer, Integer>() {
124+
125+
private volatile Subscription upstreamSubscription;
126+
127+
private final CopyOnWriteArrayList<MySubscription> subs = new CopyOnWriteArrayList<MySubscription>();
128+
private final CopyOnWriteArrayList<Subscriber<? super Integer>> subscribers = new CopyOnWriteArrayList<Subscriber<? super Integer>>();
129+
private final AtomicLong demand1 = new AtomicLong();
130+
private final AtomicLong demand2 = new AtomicLong();
131+
private final CountDownLatch awaitLatch = new CountDownLatch(2); // to know when both subscribers have signalled demand
132+
133+
@Override
134+
public void subscribe(final Subscriber<? super Integer> s) {
135+
int subscriberCount = subs.size();
136+
if (subscriberCount == 0) s.onSubscribe(createSubscription(awaitLatch, s, demand1));
137+
else if (subscriberCount == 1) s.onSubscribe(createSubscription(awaitLatch, s, demand2));
138+
else throw new RuntimeException(String.format("This for-test-purposes-processor supports only 2 subscribers, yet got %s!", subscriberCount));
139+
}
140+
141+
@Override
142+
public void onSubscribe(Subscription s) {
143+
this.upstreamSubscription = s;
144+
}
145+
146+
@Override
147+
public void onNext(Integer elem) {
148+
for (Subscriber<? super Integer> subscriber : subscribers) {
149+
try {
150+
subscriber.onNext(elem);
151+
} catch (Exception ex) {
152+
env.flop(ex, String.format("Calling onNext on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber));
153+
}
154+
}
155+
}
156+
157+
@Override
158+
public void onError(Throwable t) {
159+
for (Subscriber<? super Integer> subscriber : subscribers) {
160+
try {
161+
subscriber.onError(t);
162+
} catch (Exception ex) {
163+
env.flop(ex, String.format("Calling onError on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber));
164+
}
165+
}
166+
}
167+
168+
@Override
169+
public void onComplete() {
170+
for (Subscriber<? super Integer> subscriber : subscribers) {
171+
try {
172+
subscriber.onComplete();
173+
} catch (Exception ex) {
174+
env.flop(ex, String.format("Calling onComplete on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber));
175+
}
176+
}
177+
}
178+
179+
private Subscription createSubscription(CountDownLatch awaitLatch, final Subscriber<? super Integer> s, final AtomicLong demand) {
180+
final MySubscription sub = new MySubscription(awaitLatch, s, demand);
181+
subs.add(sub);
182+
subscribers.add(s);
183+
return sub;
184+
}
185+
186+
final class MySubscription implements Subscription {
187+
private final CountDownLatch awaitLatch;
188+
private final Subscriber<? super Integer> s;
189+
private final AtomicLong demand;
190+
191+
public MySubscription(CountDownLatch awaitTwoLatch, Subscriber<? super Integer> s, AtomicLong demand) {
192+
this.awaitLatch = awaitTwoLatch;
193+
this.s = s;
194+
this.demand = demand;
195+
}
196+
197+
@Override
198+
public void request(final long n) {
199+
new Thread(new Runnable() {
200+
@Override
201+
public void run() {
202+
demand.addAndGet(n); // naive, but good enough for the test
203+
awaitLatch.countDown();
204+
try {
205+
awaitLatch.await(env.defaultTimeoutMillis(), TimeUnit.MILLISECONDS);
206+
while (demand.getAndDecrement() > 0) {
207+
upstreamSubscription.request(1);
208+
}
209+
} catch (InterruptedException e) {
210+
env.flop(e, "Interrupted while awaiting for all downstreams to signal some demand.");
211+
}
212+
213+
}
214+
}).start();
215+
}
216+
217+
@Override
218+
public void cancel() {
219+
demand.set(Long.MIN_VALUE); // naive but OK for this test
220+
}
221+
222+
@Override
223+
public String toString() {
224+
return String.format("IdentityProcessorVerificationTest:MySubscription(%s, demand = %s)", s, demand);
225+
}
226+
}
227+
};
228+
}
229+
230+
@Override
231+
public ExecutorService publisherExecutorService() {
232+
return ex;
233+
}
234+
235+
@Override
236+
public Integer createElement(int element) {
237+
return element;
238+
}
239+
240+
@Override
241+
public Publisher<Integer> createHelperPublisher(long elements) {
242+
return new Publisher<Integer>() {
243+
@Override
244+
public void subscribe(final Subscriber<? super Integer> s) {
245+
s.onSubscribe(new NoopSubscription() {
246+
@Override
247+
public void request(long n) {
248+
for (int i = 0; i < 10; i++) {
249+
s.onNext(i);
250+
}
251+
}
252+
});
253+
}
254+
};
255+
}
256+
257+
@Override
258+
public Publisher<Integer> createFailedPublisher() {
259+
return SKIP;
260+
}
261+
}.required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError();
262+
}
263+
118264
// FAILING IMPLEMENTATIONS //
119265

120266
final Publisher<Integer> SKIP = null;

0 commit comments

Comments
 (0)