Skip to content

Commit d1e0462

Browse files
committed
TestEnvironment: Fix timeout arithmetic
Motivation: The timeout calculation decrementing `totalTimeoutRemainingNs` is incorrect which results in premature timeouts and test failures. Signed-off-by: Scott Mitchell <[email protected]>
1 parent 944163a commit d1e0462

File tree

2 files changed

+205
-1
lines changed

2 files changed

+205
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
package org.reactivestreams.example.unicast;
2+
3+
import org.reactivestreams.Publisher;
4+
import org.reactivestreams.Subscriber;
5+
import org.reactivestreams.Subscription;
6+
import org.reactivestreams.tck.PublisherVerification;
7+
import org.reactivestreams.tck.TestEnvironment;
8+
import org.testng.annotations.AfterClass;
9+
import org.testng.annotations.BeforeClass;
10+
import org.testng.annotations.Test;
11+
12+
import java.util.concurrent.BlockingQueue;
13+
import java.util.concurrent.Executor;
14+
import java.util.concurrent.ExecutorService;
15+
import java.util.concurrent.Executors;
16+
import java.util.concurrent.LinkedBlockingQueue;
17+
18+
@Test // Must be here for TestNG to find and run this, do not remove
19+
public class AsyncRangePublisherTest extends PublisherVerification<Integer> {
20+
private static final int TERMINAL_DELAY_MS = 20;
21+
private static final int DEFAULT_TIMEOUT_MS = 5000;
22+
private static final int DEFAULT_POLL_INTERVAL_MS = TERMINAL_DELAY_MS / 2;
23+
private ExecutorService e;
24+
@BeforeClass
25+
void before() { e = Executors.newCachedThreadPool(); }
26+
@AfterClass
27+
void after() { if (e != null) e.shutdown(); }
28+
29+
public AsyncRangePublisherTest() {
30+
super(new TestEnvironment(DEFAULT_TIMEOUT_MS, 50, DEFAULT_POLL_INTERVAL_MS));
31+
}
32+
33+
@Override
34+
public Publisher<Integer> createPublisher(long elements) {
35+
return new AsyncPublisher<Integer>(new RangePublisher(1, (int)elements), e);
36+
}
37+
38+
@Override
39+
public Publisher<Integer> createFailedPublisher() {
40+
return null;
41+
}
42+
43+
private static final class AsyncPublisher<T> implements Publisher<T> {
44+
private final Publisher<T> original;
45+
private final Executor executor;
46+
47+
private AsyncPublisher(Publisher<T> original, Executor executor) {
48+
this.original = requireNonNull(original);
49+
this.executor = requireNonNull(executor);
50+
}
51+
52+
@Override
53+
public void subscribe(Subscriber<? super T> s) {
54+
AsyncSubscriber.wrapAndSubscribe(original, requireNonNull(s), executor);
55+
}
56+
57+
private static final class AsyncSubscriber<T> implements Subscriber<T> {
58+
private final BlockingQueue<Object> signalQueue = new LinkedBlockingQueue<Object>();
59+
60+
static <T> void wrapAndSubscribe(final Publisher<T> publisher,
61+
final Subscriber<? super T> targetSubscriber, final Executor executor) {
62+
final AsyncSubscriber<T> asyncSubscriber = new AsyncSubscriber<T>();
63+
try {
64+
executor.execute(new Runnable() {
65+
private Subscription subscription;
66+
private boolean terminated;
67+
@Override
68+
public void run() {
69+
try {
70+
for (; ; ) {
71+
final Object signal = asyncSubscriber.signalQueue.take();
72+
if (signal instanceof Cancelled) {
73+
return;
74+
} else if (signal instanceof TerminalSignal) {
75+
// sleep intentional to verify TestEnvironment.expectError behavior.
76+
Thread.sleep(TERMINAL_DELAY_MS);
77+
78+
TerminalSignal terminalSignal = (TerminalSignal) signal;
79+
terminated = true;
80+
if (terminalSignal.cause == null) {
81+
targetSubscriber.onComplete();
82+
} else {
83+
targetSubscriber.onError(terminalSignal.cause);
84+
}
85+
return;
86+
} else if (signal instanceof OnSubscribeSignal) {
87+
subscription = ((OnSubscribeSignal) signal).subscription;
88+
targetSubscriber.onSubscribe(subscription);
89+
} else {
90+
@SuppressWarnings("unchecked") final T onNextSignal = ((OnNextSignal<T>) signal).onNext;
91+
targetSubscriber.onNext(onNextSignal);
92+
}
93+
}
94+
} catch (Throwable cause) {
95+
if (!terminated) {
96+
try {
97+
if (subscription == null) {
98+
targetSubscriber.onSubscribe(noopSubscription());
99+
} else {
100+
subscription.cancel();
101+
}
102+
} finally {
103+
terminated = true;
104+
targetSubscriber.onError(new IllegalStateException("run loop interrupted", cause));
105+
}
106+
}
107+
}
108+
}
109+
});
110+
} catch (Throwable cause) {
111+
try {
112+
targetSubscriber.onSubscribe(noopSubscription());
113+
} finally {
114+
targetSubscriber.onError(new IllegalStateException("Executor rejected", cause));
115+
}
116+
// Publisher rejected the target subscriber and terminated it, don't continue to subscribe to avoid
117+
// duplicate termination.
118+
return;
119+
}
120+
publisher.subscribe(asyncSubscriber);
121+
}
122+
123+
@Override
124+
public void onSubscribe(final Subscription s) {
125+
signalQueue.add(new OnSubscribeSignal(new Subscription() {
126+
@Override
127+
public void request(long n) {
128+
s.request(n);
129+
}
130+
131+
@Override
132+
public void cancel() {
133+
try {
134+
s.cancel();
135+
} finally {
136+
signalQueue.add(new Cancelled());
137+
}
138+
}
139+
}));
140+
}
141+
142+
@Override
143+
public void onNext(T t) {
144+
signalQueue.add(new OnNextSignal<T>(t));
145+
}
146+
147+
@Override
148+
public void onError(Throwable t) {
149+
signalQueue.add(new TerminalSignal(requireNonNull(t)));
150+
}
151+
152+
@Override
153+
public void onComplete() {
154+
signalQueue.add(new TerminalSignal(null));
155+
}
156+
}
157+
158+
private static Subscription noopSubscription() {
159+
return new Subscription() {
160+
@Override
161+
public void request(long n) {
162+
}
163+
164+
@Override
165+
public void cancel() {
166+
}
167+
};
168+
}
169+
170+
private static final class TerminalSignal {
171+
private final Throwable cause;
172+
173+
private TerminalSignal(Throwable cause) {
174+
this.cause = cause;
175+
}
176+
}
177+
178+
private static final class OnSubscribeSignal {
179+
private final Subscription subscription;
180+
181+
private OnSubscribeSignal(Subscription subscription) {
182+
this.subscription = subscription;
183+
}
184+
}
185+
186+
private static final class OnNextSignal<T> {
187+
private final T onNext;
188+
189+
private OnNextSignal(T onNext) {
190+
this.onNext = onNext;
191+
}
192+
}
193+
194+
private static final class Cancelled {
195+
}
196+
197+
private static <T> T requireNonNull(T o) {
198+
if (o == null) {
199+
throw new NullPointerException();
200+
}
201+
return o;
202+
}
203+
}
204+
}

tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1112,7 +1112,7 @@ final <E extends Throwable> E expectError(Class<E> clazz, final long totalTimeou
11121112

11131113
if (env.asyncErrors.isEmpty()) {
11141114
timeStampBNs = System.nanoTime();
1115-
totalTimeoutRemainingNs =- timeStampBNs - timeStampANs;
1115+
totalTimeoutRemainingNs -= (timeStampBNs - timeStampANs);
11161116
timeStampANs = timeStampBNs;
11171117

11181118
if (totalTimeoutRemainingNs <= 0) {

0 commit comments

Comments
 (0)