Skip to content

Commit b7c8c83

Browse files
committed
TestEnvironment: Fix timeout arithmetic
Motivation: The timeout calculation decrementing `totalTimeoutRemainingNs` is incorrect which results in premature timeouts and test failures. Signed-off-by: Scott Mitchell <[email protected]>
1 parent 944163a commit b7c8c83

File tree

2 files changed

+182
-1
lines changed

2 files changed

+182
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package org.reactivestreams.example.unicast;
2+
3+
import org.reactivestreams.Publisher;
4+
import org.reactivestreams.Subscriber;
5+
import org.reactivestreams.Subscription;
6+
import org.reactivestreams.tck.PublisherVerification;
7+
import org.reactivestreams.tck.TestEnvironment;
8+
import org.testng.annotations.AfterClass;
9+
import org.testng.annotations.BeforeClass;
10+
import org.testng.annotations.Test;
11+
12+
import java.util.concurrent.BlockingQueue;
13+
import java.util.concurrent.Executor;
14+
import java.util.concurrent.ExecutorService;
15+
import java.util.concurrent.Executors;
16+
import java.util.concurrent.LinkedBlockingQueue;
17+
18+
@Test // Must be here for TestNG to find and run this, do not remove
19+
public class AsyncRangePublisherTest extends PublisherVerification<Integer> {
20+
private static final int TERMINAL_DELAY_MS = 20;
21+
private static final int DEFAULT_TIMEOUT_MS = 5000;
22+
private static final int DEFAULT_POLL_INTERVAL_MS = TERMINAL_DELAY_MS / 2;
23+
private ExecutorService e;
24+
@BeforeClass
25+
void before() { e = Executors.newCachedThreadPool(); }
26+
@AfterClass
27+
void after() { if (e != null) e.shutdown(); }
28+
29+
public AsyncRangePublisherTest() {
30+
super(new TestEnvironment(DEFAULT_TIMEOUT_MS, 50, DEFAULT_POLL_INTERVAL_MS));
31+
}
32+
33+
@Override
34+
public Publisher<Integer> createPublisher(long elements) {
35+
return new AsyncPublisher<Integer>(new RangePublisher(1, (int)elements), e);
36+
}
37+
38+
@Override
39+
public Publisher<Integer> createFailedPublisher() {
40+
return null;
41+
}
42+
43+
private static final class AsyncPublisher<T> implements Publisher<T> {
44+
private final Publisher<T> original;
45+
private final Executor executor;
46+
47+
private AsyncPublisher(Publisher<T> original, Executor executor) {
48+
this.original = requireNonNull(original);
49+
this.executor = requireNonNull(executor);
50+
}
51+
52+
@Override
53+
public void subscribe(Subscriber<? super T> s) {
54+
AsyncSubscriber.wrapAndSubscribe(original, requireNonNull(s), executor);
55+
}
56+
57+
private static final class AsyncSubscriber<T> implements Subscriber<T> {
58+
private final BlockingQueue<Object> signalQueue = new LinkedBlockingQueue<Object>();
59+
60+
static <T> void wrapAndSubscribe(final Publisher<T> publisher,
61+
final Subscriber<? super T> targetSubscriber, final Executor executor) {
62+
final AsyncSubscriber<T> asyncSubscriber = new AsyncSubscriber<T>();
63+
try {
64+
executor.execute(new Runnable() {
65+
@Override
66+
public void run() {
67+
for (; ; ) {
68+
try {
69+
final Object signal = asyncSubscriber.signalQueue.take();
70+
if (signal instanceof Cancelled) {
71+
return;
72+
} else if (signal instanceof TerminalSignal) {
73+
Thread.sleep(TERMINAL_DELAY_MS);
74+
TerminalSignal terminalSignal = (TerminalSignal) signal;
75+
if (terminalSignal.cause == null) {
76+
targetSubscriber.onComplete();
77+
} else {
78+
targetSubscriber.onError(terminalSignal.cause);
79+
}
80+
return;
81+
} else if (signal instanceof OnSubscribeSignal) {
82+
targetSubscriber.onSubscribe(((OnSubscribeSignal) signal).subscription);
83+
} else {
84+
@SuppressWarnings("unchecked")
85+
final T onNextSignal = ((OnNextSignal<T>) signal).onNext;
86+
targetSubscriber.onNext(onNextSignal);
87+
}
88+
} catch (InterruptedException ex) {
89+
throw new RuntimeException(ex);
90+
}
91+
}
92+
}
93+
});
94+
} catch (Throwable cause) {
95+
targetSubscriber.onSubscribe(new Subscription() {
96+
@Override
97+
public void request(long n) {
98+
}
99+
100+
@Override
101+
public void cancel() {
102+
}
103+
});
104+
targetSubscriber.onError(new IllegalStateException("Executor rejected", cause));
105+
// Publisher rejected the target subscriber and terminated it, don't continue to subscribe to avoid
106+
// duplicate termination.
107+
return;
108+
}
109+
publisher.subscribe(asyncSubscriber);
110+
}
111+
112+
@Override
113+
public void onSubscribe(final Subscription s) {
114+
signalQueue.add(new OnSubscribeSignal(new Subscription() {
115+
@Override
116+
public void request(long n) {
117+
s.request(n);
118+
}
119+
120+
@Override
121+
public void cancel() {
122+
try {
123+
s.cancel();
124+
} finally {
125+
signalQueue.add(new Cancelled());
126+
}
127+
}
128+
}));
129+
}
130+
131+
@Override
132+
public void onNext(T t) {
133+
signalQueue.add(new OnNextSignal<T>(t));
134+
}
135+
136+
@Override
137+
public void onError(Throwable t) {
138+
signalQueue.add(new TerminalSignal(requireNonNull(t)));
139+
}
140+
141+
@Override
142+
public void onComplete() {
143+
signalQueue.add(new TerminalSignal(null));
144+
}
145+
}
146+
147+
private static final class TerminalSignal {
148+
private final Throwable cause;
149+
150+
private TerminalSignal(Throwable cause) {
151+
this.cause = cause;
152+
}
153+
}
154+
155+
private static final class OnSubscribeSignal {
156+
private final Subscription subscription;
157+
158+
private OnSubscribeSignal(Subscription subscription) {
159+
this.subscription = subscription;
160+
}
161+
}
162+
163+
private static final class OnNextSignal<T> {
164+
private final T onNext;
165+
166+
private OnNextSignal(T onNext) {
167+
this.onNext = onNext;
168+
}
169+
}
170+
171+
private static final class Cancelled {
172+
}
173+
174+
private static <T> T requireNonNull(T o) {
175+
if (o == null) {
176+
throw new NullPointerException();
177+
}
178+
return o;
179+
}
180+
}
181+
}

tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1112,7 +1112,7 @@ final <E extends Throwable> E expectError(Class<E> clazz, final long totalTimeou
11121112

11131113
if (env.asyncErrors.isEmpty()) {
11141114
timeStampBNs = System.nanoTime();
1115-
totalTimeoutRemainingNs =- timeStampBNs - timeStampANs;
1115+
totalTimeoutRemainingNs -= (timeStampBNs - timeStampANs);
11161116
timeStampANs = timeStampBNs;
11171117

11181118
if (totalTimeoutRemainingNs <= 0) {

0 commit comments

Comments
 (0)