Skip to content

Commit 352b397

Browse files
committed
rework tck and examples to follow 1.9 rule
1 parent fc173b2 commit 352b397

File tree

3 files changed

+42
-48
lines changed

3 files changed

+42
-48
lines changed

examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java

+35-35
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
import java.util.Iterator;
1919
import java.util.Collections;
2020
import java.util.concurrent.Executor;
21-
import java.util.concurrent.atomic.AtomicBoolean;
2221
import java.util.concurrent.ConcurrentLinkedQueue;
22+
import java.util.concurrent.atomic.AtomicInteger;
2323

2424
/**
2525
* AsyncIterablePublisher is an implementation of Reactive Streams `Publisher`
@@ -59,7 +59,6 @@ public void subscribe(final Subscriber<? super T> s) {
5959
// These represent the protocol of the `AsyncIterablePublishers` SubscriptionImpls
6060
static interface Signal {};
6161
enum Cancel implements Signal { Instance; };
62-
enum Subscribe implements Signal { Instance; };
6362
enum Send implements Signal { Instance; };
6463
static final class Request implements Signal {
6564
final long n;
@@ -87,7 +86,7 @@ final class SubscriptionImpl implements Subscription, Runnable {
8786

8887
// We are using this `AtomicBoolean` to make sure that this `Subscription` doesn't run concurrently with itself,
8988
// which would violate rule 1.3 among others (no concurrent notifications).
90-
private final AtomicBoolean on = new AtomicBoolean(false);
89+
private final AtomicInteger wip = new AtomicInteger(0);
9190

9291
// This method will register inbound demand from our `Subscriber` and validate it against rule 3.9 and rule 3.17
9392
private void doRequest(final long n) {
@@ -205,46 +204,47 @@ private void signal(final Signal signal) {
205204

206205
// This is the main "event loop" if you so will
207206
@Override public final void run() {
208-
if(on.get()) { // establishes a happens-before relationship with the end of the previous run
209-
try {
210-
final Signal s = inboundSignals.poll(); // We take a signal off the queue
211-
if (!cancelled) { // to make sure that we follow rule 1.8, 3.6 and 3.7
207+
int remainingWork = 1;
208+
for (;;) {
212209

213-
// Below we simply unpack the `Signal`s and invoke the corresponding methods
214-
if (s instanceof Request)
215-
doRequest(((Request)s).n);
216-
else if (s == Send.Instance)
217-
doSend();
218-
else if (s == Cancel.Instance)
219-
doCancel();
220-
else if (s == Subscribe.Instance)
221-
doSubscribe();
210+
Signal s;
211+
while ((s = inboundSignals.poll()) != null) {
212+
if (cancelled) { // to make sure that we follow rule 1.8, 3.6 and 3.7
213+
return;
222214
}
223-
} finally {
224-
on.set(false); // establishes a happens-before relationship with the beginning of the next run
225-
if(!inboundSignals.isEmpty()) // If we still have signals to process
226-
tryScheduleToExecute(); // Then we try to schedule ourselves to execute again
215+
216+
// Below we simply unpack the `Signal`s and invoke the corresponding methods
217+
if (s instanceof Request)
218+
doRequest(((Request) s).n);
219+
else if (s == Send.Instance)
220+
doSend();
221+
else if (s == Cancel.Instance)
222+
doCancel();
223+
}
224+
225+
remainingWork = wip.addAndGet(-remainingWork); // establishes a happens-before relationship with the beginning of the next run
226+
if (remainingWork == 0) {
227+
return;
227228
}
228229
}
229230
}
230231

231232
// This method makes sure that this `Subscription` is only running on one Thread at a time,
232233
// this is important to make sure that we follow rule 1.3
233234
private final void tryScheduleToExecute() {
234-
if(on.compareAndSet(false, true)) {
235-
try {
236-
executor.execute(this);
237-
} catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully
238-
if (!cancelled) {
239-
doCancel(); // First of all, this failure is not recoverable, so we need to follow rule 1.4 and 1.6
240-
try {
241-
terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t));
242-
} finally {
243-
inboundSignals.clear(); // We're not going to need these anymore
244-
// This subscription is cancelled by now, but letting it become schedulable again means
245-
// that we can drain the inboundSignals queue if anything arrives after clearing
246-
on.set(false);
247-
}
235+
if (wip.getAndIncrement() != 0) { // ensure happens-before with already running work
236+
return;
237+
}
238+
239+
try {
240+
executor.execute(this);
241+
} catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully
242+
if (!cancelled) {
243+
doCancel(); // First of all, this failure is not recoverable, so we need to follow rule 1.4 and 1.6
244+
try {
245+
terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t));
246+
} finally {
247+
inboundSignals.clear(); // We're not going to need these anymore
248248
}
249249
}
250250
}
@@ -263,7 +263,7 @@ private final void tryScheduleToExecute() {
263263
// method is only intended to be invoked once, and immediately after the constructor has
264264
// finished.
265265
void init() {
266-
signal(Subscribe.Instance);
266+
doSubscribe();
267267
}
268268
};
269269
}

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -306,9 +306,11 @@ public void onSubscribe(Subscription s) {
306306
concurrentAccessBarrier.enterSignal(signal);
307307

308308
subs = s;
309-
subs.request(1);
310309

311310
concurrentAccessBarrier.leaveSignal(signal);
311+
312+
//request after leave signal since request may be offloaded so it is going to be false positive racing
313+
subs.request(1);
312314
}
313315

314316
@Override
@@ -1099,7 +1101,7 @@ public void onNext(T element) {
10991101
}
11001102
}
11011103
};
1102-
env.subscribe(pub, sub, env.defaultTimeoutMillis());
1104+
env.subscribe(pub, sub);
11031105

11041106
// eventually triggers `onNext`, which will then trigger up to `callsCounter` times `request(Long.MAX_VALUE - 1)`
11051107
// we're pretty sure to overflow from those

tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+3-11
Original file line numberDiff line numberDiff line change
@@ -297,28 +297,20 @@ public <T> T flopAndFail(String msg) {
297297

298298

299299
public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException {
300-
subscribe(pub, sub, defaultTimeoutMillis);
301-
}
302-
303-
public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub, long timeoutMillis) throws InterruptedException {
304300
pub.subscribe(sub);
305-
sub.subscription.expectCompletion(timeoutMillis, String.format("Could not subscribe %s to Publisher %s", sub, pub));
301+
sub.subscription.expectCompletion(0, String.format("Could not subscribe %s to Publisher %s", sub, pub));
306302
verifyNoAsyncErrorsNoDelay();
307303
}
308304

309305
public <T> ManualSubscriber<T> newBlackholeSubscriber(Publisher<T> pub) throws InterruptedException {
310306
ManualSubscriberWithSubscriptionSupport<T> sub = new BlackholeSubscriberWithSubscriptionSupport<T>(this);
311-
subscribe(pub, sub, defaultTimeoutMillis());
307+
subscribe(pub, sub);
312308
return sub;
313309
}
314310

315311
public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub) throws InterruptedException {
316-
return newManualSubscriber(pub, defaultTimeoutMillis());
317-
}
318-
319-
public <T> ManualSubscriber<T> newManualSubscriber(Publisher<T> pub, long timeoutMillis) throws InterruptedException {
320312
ManualSubscriberWithSubscriptionSupport<T> sub = new ManualSubscriberWithSubscriptionSupport<T>(this);
321-
subscribe(pub, sub, timeoutMillis);
313+
subscribe(pub, sub);
322314
return sub;
323315
}
324316

0 commit comments

Comments
 (0)