Skip to content

Commit f606fa1

Browse files
committed
TestEnvironment: Fix timeout arithmetic
Motivation: The timeout calculation decrementing `totalTimeoutRemainingNs` is incorrect which results in premature timeouts and test failures. Signed-off-by: Scott Mitchell <[email protected]>
1 parent 944163a commit f606fa1

File tree

2 files changed

+213
-1
lines changed

2 files changed

+213
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
package org.reactivestreams.example.unicast;
2+
3+
import org.reactivestreams.Publisher;
4+
import org.reactivestreams.Subscriber;
5+
import org.reactivestreams.Subscription;
6+
import org.reactivestreams.tck.PublisherVerification;
7+
import org.reactivestreams.tck.TestEnvironment;
8+
import org.testng.annotations.AfterClass;
9+
import org.testng.annotations.BeforeClass;
10+
import org.testng.annotations.Test;
11+
12+
import java.util.concurrent.BlockingQueue;
13+
import java.util.concurrent.Executor;
14+
import java.util.concurrent.ExecutorService;
15+
import java.util.concurrent.Executors;
16+
import java.util.concurrent.LinkedBlockingQueue;
17+
18+
@Test // Must be here for TestNG to find and run this, do not remove
19+
public class AsyncRangePublisherTest extends PublisherVerification<Integer> {
20+
private static final int TERMINAL_DELAY_MS = 20;
21+
private static final int DEFAULT_TIMEOUT_MS = 5000;
22+
private static final int DEFAULT_POLL_INTERVAL_MS = TERMINAL_DELAY_MS / 2;
23+
private ExecutorService e;
24+
@BeforeClass
25+
void before() { e = Executors.newCachedThreadPool(); }
26+
@AfterClass
27+
void after() { if (e != null) e.shutdown(); }
28+
29+
public AsyncRangePublisherTest() {
30+
super(new TestEnvironment(DEFAULT_TIMEOUT_MS, 50, DEFAULT_POLL_INTERVAL_MS));
31+
}
32+
33+
@Override
34+
public Publisher<Integer> createPublisher(long elements) {
35+
return new AsyncPublisher<Integer>(new RangePublisher(1, (int)elements), e);
36+
}
37+
38+
@Override
39+
public Publisher<Integer> createFailedPublisher() {
40+
return null;
41+
}
42+
43+
private static final class AsyncPublisher<T> implements Publisher<T> {
44+
private final Publisher<T> original;
45+
private final Executor executor;
46+
47+
private AsyncPublisher(Publisher<T> original, Executor executor) {
48+
this.original = requireNonNull(original);
49+
this.executor = requireNonNull(executor);
50+
}
51+
52+
@Override
53+
public void subscribe(Subscriber<? super T> s) {
54+
AsyncSubscriber.wrapAndSubscribe(original, requireNonNull(s), executor);
55+
}
56+
57+
private static final class AsyncSubscriber<T> implements Subscriber<T> {
58+
private final BlockingQueue<Object> signalQueue = new LinkedBlockingQueue<Object>();
59+
60+
static <T> void wrapAndSubscribe(final Publisher<T> publisher,
61+
final Subscriber<? super T> targetSubscriber, final Executor executor) {
62+
final AsyncSubscriber<T> asyncSubscriber = new AsyncSubscriber<T>();
63+
try {
64+
executor.execute(new Runnable() {
65+
private Subscription subscription;
66+
private boolean terminated;
67+
@Override
68+
public void run() {
69+
try {
70+
for (; ; ) {
71+
final Object signal = asyncSubscriber.signalQueue.take();
72+
if (signal instanceof Cancelled) {
73+
return;
74+
} else if (signal instanceof TerminalSignal) {
75+
// sleep intentional to verify TestEnvironment.expectError behavior.
76+
Thread.sleep(TERMINAL_DELAY_MS);
77+
78+
TerminalSignal terminalSignal = (TerminalSignal) signal;
79+
terminated = true;
80+
if (terminalSignal.cause == null) {
81+
targetSubscriber.onComplete();
82+
} else {
83+
targetSubscriber.onError(terminalSignal.cause);
84+
}
85+
return;
86+
} else if (signal instanceof OnSubscribeSignal) {
87+
// We distribute the subscription downstream and may also call cancel on this
88+
// thread if an exception is thrown. Since there is no concurrency allowed, make
89+
// the subscription safe for concurrency.
90+
subscription = concurrentSafe(((OnSubscribeSignal) signal).subscription);
91+
targetSubscriber.onSubscribe(subscription);
92+
} else {
93+
@SuppressWarnings("unchecked") final T onNextSignal = ((OnNextSignal<T>) signal).onNext;
94+
targetSubscriber.onNext(onNextSignal);
95+
}
96+
}
97+
} catch (Throwable cause) {
98+
if (!terminated) {
99+
try {
100+
if (subscription == null) {
101+
targetSubscriber.onSubscribe(noopSubscription());
102+
} else {
103+
subscription.cancel();
104+
}
105+
} finally {
106+
terminated = true;
107+
targetSubscriber.onError(new IllegalStateException("run loop interrupted", cause));
108+
}
109+
}
110+
}
111+
}
112+
});
113+
} catch (Throwable cause) {
114+
try {
115+
targetSubscriber.onSubscribe(noopSubscription());
116+
} finally {
117+
targetSubscriber.onError(new IllegalStateException("Executor rejected", cause));
118+
}
119+
// Publisher rejected the target subscriber and terminated it, don't continue to subscribe to avoid
120+
// duplicate termination.
121+
return;
122+
}
123+
publisher.subscribe(asyncSubscriber);
124+
}
125+
126+
@Override
127+
public void onSubscribe(final Subscription s) {
128+
signalQueue.add(new OnSubscribeSignal(new Subscription() {
129+
@Override
130+
public void request(long n) {
131+
s.request(n);
132+
}
133+
134+
@Override
135+
public void cancel() {
136+
try {
137+
s.cancel();
138+
} finally {
139+
signalQueue.add(new Cancelled());
140+
}
141+
}
142+
}));
143+
}
144+
145+
@Override
146+
public void onNext(T t) {
147+
signalQueue.add(new OnNextSignal<T>(t));
148+
}
149+
150+
@Override
151+
public void onError(Throwable t) {
152+
signalQueue.add(new TerminalSignal(requireNonNull(t)));
153+
}
154+
155+
@Override
156+
public void onComplete() {
157+
signalQueue.add(new TerminalSignal(null));
158+
}
159+
}
160+
161+
private static Subscription concurrentSafe(final Subscription subscription) {
162+
// TODO: make concurrent safe. TCK interacts from the subscription concurrently so lock isn't sufficient.
163+
return subscription;
164+
}
165+
166+
private static Subscription noopSubscription() {
167+
return new Subscription() {
168+
@Override
169+
public void request(long n) {
170+
}
171+
172+
@Override
173+
public void cancel() {
174+
}
175+
};
176+
}
177+
178+
private static final class TerminalSignal {
179+
private final Throwable cause;
180+
181+
private TerminalSignal(Throwable cause) {
182+
this.cause = cause;
183+
}
184+
}
185+
186+
private static final class OnSubscribeSignal {
187+
private final Subscription subscription;
188+
189+
private OnSubscribeSignal(Subscription subscription) {
190+
this.subscription = subscription;
191+
}
192+
}
193+
194+
private static final class OnNextSignal<T> {
195+
private final T onNext;
196+
197+
private OnNextSignal(T onNext) {
198+
this.onNext = onNext;
199+
}
200+
}
201+
202+
private static final class Cancelled {
203+
}
204+
205+
private static <T> T requireNonNull(T o) {
206+
if (o == null) {
207+
throw new NullPointerException();
208+
}
209+
return o;
210+
}
211+
}
212+
}

tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1112,7 +1112,7 @@ final <E extends Throwable> E expectError(Class<E> clazz, final long totalTimeou
11121112

11131113
if (env.asyncErrors.isEmpty()) {
11141114
timeStampBNs = System.nanoTime();
1115-
totalTimeoutRemainingNs =- timeStampBNs - timeStampANs;
1115+
totalTimeoutRemainingNs -= (timeStampBNs - timeStampANs);
11161116
timeStampANs = timeStampBNs;
11171117

11181118
if (totalTimeoutRemainingNs <= 0) {

0 commit comments

Comments
 (0)