Skip to content

Commit 10c6a25

Browse files
committed
TestEnvironment: Fix timeout arithmetic
Motivation: The timeout calculation decrementing `totalTimeoutRemainingNs` is incorrect which results in premature timeouts and test failures. Signed-off-by: Scott Mitchell <[email protected]>
1 parent 944163a commit 10c6a25

File tree

2 files changed

+216
-1
lines changed

2 files changed

+216
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
/***************************************************
2+
* Licensed under MIT No Attribution (SPDX: MIT-0) *
3+
***************************************************/
4+
package org.reactivestreams.example.unicast;
5+
6+
import org.reactivestreams.Publisher;
7+
import org.reactivestreams.Subscriber;
8+
import org.reactivestreams.Subscription;
9+
import org.reactivestreams.tck.PublisherVerification;
10+
import org.reactivestreams.tck.TestEnvironment;
11+
import org.testng.annotations.AfterClass;
12+
import org.testng.annotations.BeforeClass;
13+
import org.testng.annotations.Test;
14+
15+
import java.util.concurrent.BlockingQueue;
16+
import java.util.concurrent.Executor;
17+
import java.util.concurrent.ExecutorService;
18+
import java.util.concurrent.Executors;
19+
import java.util.concurrent.LinkedBlockingQueue;
20+
21+
@Test // Must be here for TestNG to find and run this, do not remove
22+
public class AsyncRangePublisherTest extends PublisherVerification<Integer> {
23+
private static final int TERMINAL_DELAY_MS = 20;
24+
private static final int DEFAULT_TIMEOUT_MS = 5000;
25+
private static final int DEFAULT_POLL_INTERVAL_MS = TERMINAL_DELAY_MS / 2;
26+
private ExecutorService e;
27+
@BeforeClass
28+
void before() { e = Executors.newCachedThreadPool(); }
29+
@AfterClass
30+
void after() { if (e != null) e.shutdown(); }
31+
32+
public AsyncRangePublisherTest() {
33+
super(new TestEnvironment(DEFAULT_TIMEOUT_MS, 50, DEFAULT_POLL_INTERVAL_MS));
34+
}
35+
36+
@Override
37+
public Publisher<Integer> createPublisher(long elements) {
38+
return new AsyncPublisher<Integer>(new RangePublisher(1, (int)elements), e);
39+
}
40+
41+
@Override
42+
public Publisher<Integer> createFailedPublisher() {
43+
return null;
44+
}
45+
46+
private static final class AsyncPublisher<T> implements Publisher<T> {
47+
private final Publisher<T> original;
48+
private final Executor executor;
49+
50+
private AsyncPublisher(Publisher<T> original, Executor executor) {
51+
this.original = requireNonNull(original);
52+
this.executor = requireNonNull(executor);
53+
}
54+
55+
@Override
56+
public void subscribe(Subscriber<? super T> s) {
57+
AsyncSubscriber.wrapAndSubscribe(original, requireNonNull(s), executor);
58+
}
59+
60+
private static final class AsyncSubscriber<T> implements Subscriber<T> {
61+
private final BlockingQueue<Object> signalQueue = new LinkedBlockingQueue<Object>();
62+
63+
static <T> void wrapAndSubscribe(final Publisher<T> publisher,
64+
final Subscriber<? super T> targetSubscriber, final Executor executor) {
65+
final AsyncSubscriber<T> asyncSubscriber = new AsyncSubscriber<T>();
66+
try {
67+
executor.execute(new Runnable() {
68+
private Subscription subscription;
69+
private boolean terminated;
70+
@Override
71+
public void run() {
72+
try {
73+
for (; ; ) {
74+
final Object signal = asyncSubscriber.signalQueue.take();
75+
if (signal instanceof Cancelled) {
76+
return;
77+
} else if (signal instanceof TerminalSignal) {
78+
// sleep intentional to verify TestEnvironment.expectError behavior.
79+
Thread.sleep(TERMINAL_DELAY_MS);
80+
81+
TerminalSignal terminalSignal = (TerminalSignal) signal;
82+
terminated = true;
83+
if (terminalSignal.cause == null) {
84+
targetSubscriber.onComplete();
85+
} else {
86+
targetSubscriber.onError(terminalSignal.cause);
87+
}
88+
return;
89+
} else if (signal instanceof OnSubscribeSignal) {
90+
// We distribute the subscription downstream and may also call cancel on this
91+
// thread if an exception is thrown. Since there is no concurrency allowed, make
92+
// the subscription safe for concurrency.
93+
subscription = concurrentSafe(((OnSubscribeSignal) signal).subscription);
94+
targetSubscriber.onSubscribe(subscription);
95+
} else {
96+
@SuppressWarnings("unchecked") final T onNextSignal = ((OnNextSignal<T>) signal).onNext;
97+
targetSubscriber.onNext(onNextSignal);
98+
}
99+
}
100+
} catch (Throwable cause) {
101+
if (!terminated) {
102+
try {
103+
if (subscription == null) {
104+
targetSubscriber.onSubscribe(noopSubscription());
105+
} else {
106+
subscription.cancel();
107+
}
108+
} finally {
109+
terminated = true;
110+
targetSubscriber.onError(new IllegalStateException("run loop interrupted", cause));
111+
}
112+
}
113+
}
114+
}
115+
});
116+
} catch (Throwable cause) {
117+
try {
118+
targetSubscriber.onSubscribe(noopSubscription());
119+
} finally {
120+
targetSubscriber.onError(new IllegalStateException("Executor rejected", cause));
121+
}
122+
// Publisher rejected the target subscriber and terminated it, don't continue to subscribe to avoid
123+
// duplicate termination.
124+
return;
125+
}
126+
publisher.subscribe(asyncSubscriber);
127+
}
128+
129+
@Override
130+
public void onSubscribe(final Subscription s) {
131+
signalQueue.add(new OnSubscribeSignal(new Subscription() {
132+
@Override
133+
public void request(long n) {
134+
s.request(n);
135+
}
136+
137+
@Override
138+
public void cancel() {
139+
try {
140+
s.cancel();
141+
} finally {
142+
signalQueue.add(new Cancelled());
143+
}
144+
}
145+
}));
146+
}
147+
148+
@Override
149+
public void onNext(T t) {
150+
signalQueue.add(new OnNextSignal<T>(t));
151+
}
152+
153+
@Override
154+
public void onError(Throwable t) {
155+
signalQueue.add(new TerminalSignal(requireNonNull(t)));
156+
}
157+
158+
@Override
159+
public void onComplete() {
160+
signalQueue.add(new TerminalSignal(null));
161+
}
162+
}
163+
164+
private static Subscription concurrentSafe(final Subscription subscription) {
165+
// TODO: make concurrent safe. TCK interacts from the subscription concurrently so lock isn't sufficient.
166+
return subscription;
167+
}
168+
169+
private static Subscription noopSubscription() {
170+
return new Subscription() {
171+
@Override
172+
public void request(long n) {
173+
}
174+
175+
@Override
176+
public void cancel() {
177+
}
178+
};
179+
}
180+
181+
private static final class TerminalSignal {
182+
private final Throwable cause;
183+
184+
private TerminalSignal(Throwable cause) {
185+
this.cause = cause;
186+
}
187+
}
188+
189+
private static final class OnSubscribeSignal {
190+
private final Subscription subscription;
191+
192+
private OnSubscribeSignal(Subscription subscription) {
193+
this.subscription = subscription;
194+
}
195+
}
196+
197+
private static final class OnNextSignal<T> {
198+
private final T onNext;
199+
200+
private OnNextSignal(T onNext) {
201+
this.onNext = onNext;
202+
}
203+
}
204+
205+
private static final class Cancelled {
206+
}
207+
208+
private static <T> T requireNonNull(T o) {
209+
if (o == null) {
210+
throw new NullPointerException();
211+
}
212+
return o;
213+
}
214+
}
215+
}

tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1112,7 +1112,7 @@ final <E extends Throwable> E expectError(Class<E> clazz, final long totalTimeou
11121112

11131113
if (env.asyncErrors.isEmpty()) {
11141114
timeStampBNs = System.nanoTime();
1115-
totalTimeoutRemainingNs =- timeStampBNs - timeStampANs;
1115+
totalTimeoutRemainingNs -= (timeStampBNs - timeStampANs);
11161116
timeStampANs = timeStampBNs;
11171117

11181118
if (totalTimeoutRemainingNs <= 0) {

0 commit comments

Comments
 (0)