Skip to content

Commit 3f8a79b

Browse files
authored
Merge pull request #346 from akarnokd/TCKShouldCancel
Fix missing cancel() in tests that don't consume the entire source
2 parents 241dbda + 0c94670 commit 3f8a79b

File tree

2 files changed

+231
-35
lines changed

2 files changed

+231
-35
lines changed

tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java

+55-35
Original file line numberDiff line numberDiff line change
@@ -211,17 +211,20 @@ public void required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfPr
211211
public void run(Publisher<T> pub) throws InterruptedException {
212212

213213
ManualSubscriber<T> sub = env.newManualSubscriber(pub);
214-
215-
sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub));
216-
sub.request(1);
217-
sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub));
218-
sub.expectNone(String.format("Publisher %s produced unrequested: ", pub));
219-
220-
sub.request(1);
221-
sub.request(2);
222-
sub.nextElements(3, env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub));
223-
224-
sub.expectNone(String.format("Publisher %sproduced unrequested ", pub));
214+
try {
215+
sub.expectNone(String.format("Publisher %s produced value before the first `request`: ", pub));
216+
sub.request(1);
217+
sub.nextElement(String.format("Publisher %s produced no element after first `request`", pub));
218+
sub.expectNone(String.format("Publisher %s produced unrequested: ", pub));
219+
220+
sub.request(1);
221+
sub.request(2);
222+
sub.nextElements(3, env.defaultTimeoutMillis(), String.format("Publisher %s produced less than 3 elements after two respective `request` calls", pub));
223+
224+
sub.expectNone(String.format("Publisher %sproduced unrequested ", pub));
225+
} finally {
226+
sub.cancel();
227+
}
225228
}
226229
});
227230
}
@@ -473,30 +476,39 @@ public void required_spec109_mustIssueOnSubscribeForNonNullSubscriber() throws T
473476
@Override
474477
public void run(Publisher<T> pub) throws Throwable {
475478
final Latch onSubscribeLatch = new Latch(env);
476-
pub.subscribe(new Subscriber<T>() {
477-
@Override
478-
public void onError(Throwable cause) {
479-
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
480-
}
481-
482-
@Override
483-
public void onSubscribe(Subscription subs) {
484-
onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
485-
onSubscribeLatch.close();
486-
}
487-
488-
@Override
489-
public void onNext(T elem) {
490-
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always");
491-
}
492-
493-
@Override
494-
public void onComplete() {
495-
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always");
479+
final AtomicReference<Subscription> cancel = new AtomicReference<Subscription>();
480+
try {
481+
pub.subscribe(new Subscriber<T>() {
482+
@Override
483+
public void onError(Throwable cause) {
484+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onError always");
485+
}
486+
487+
@Override
488+
public void onSubscribe(Subscription subs) {
489+
cancel.set(subs);
490+
onSubscribeLatch.assertOpen("Only one onSubscribe call expected");
491+
onSubscribeLatch.close();
492+
}
493+
494+
@Override
495+
public void onNext(T elem) {
496+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onNext always");
497+
}
498+
499+
@Override
500+
public void onComplete() {
501+
onSubscribeLatch.assertClosed("onSubscribe should be called prior to onComplete always");
502+
}
503+
});
504+
onSubscribeLatch.expectClose("Should have received onSubscribe");
505+
env.verifyNoAsyncErrorsNoDelay();
506+
} finally {
507+
Subscription s = cancel.getAndSet(null);
508+
if (s != null) {
509+
s.cancel();
496510
}
497-
});
498-
onSubscribeLatch.expectClose("Should have received onSubscribe");
499-
env.verifyNoAsyncErrorsNoDelay();
511+
}
500512
}
501513
});
502514
}
@@ -545,7 +557,15 @@ public void run(Publisher<T> pub) throws Throwable {
545557
ManualSubscriber<T> sub1 = env.newManualSubscriber(pub);
546558
ManualSubscriber<T> sub2 = env.newManualSubscriber(pub);
547559

548-
env.verifyNoAsyncErrors();
560+
try {
561+
env.verifyNoAsyncErrors();
562+
} finally {
563+
try {
564+
sub1.cancel();
565+
} finally {
566+
sub2.cancel();
567+
}
568+
}
549569
}
550570
});
551571
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/************************************************************************
2+
* Licensed under Public Domain (CC0) *
3+
* *
4+
* To the extent possible under law, the person who associated CC0 with *
5+
* this code has waived all copyright and related or neighboring *
6+
* rights to this code. *
7+
* *
8+
* You should have received a copy of the CC0 legalcode along with this *
9+
* work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
10+
************************************************************************/
11+
12+
package org.reactivestreams.tck;
13+
14+
import java.util.Map;
15+
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.atomic.*;
17+
18+
import org.reactivestreams.*;
19+
import org.testng.annotations.*;
20+
21+
@Test
22+
public class RangePublisherTest extends PublisherVerification<Integer> {
23+
24+
static final Map<Integer, StackTraceElement[]> stacks = new ConcurrentHashMap<Integer, StackTraceElement[]>();
25+
26+
static final Map<Integer, Boolean> states = new ConcurrentHashMap<Integer, Boolean>();
27+
28+
static final AtomicInteger id = new AtomicInteger();
29+
30+
@AfterClass
31+
public static void afterClass() {
32+
boolean fail = false;
33+
StringBuilder b = new StringBuilder();
34+
for (Map.Entry<Integer, Boolean> t : states.entrySet()) {
35+
if (!t.getValue()) {
36+
b.append("\r\n-------------------------------");
37+
for (Object o : stacks.get(t.getKey())) {
38+
b.append("\r\nat ").append(o);
39+
}
40+
fail = true;
41+
}
42+
}
43+
if (fail) {
44+
throw new AssertionError("Cancellations were missing:" + b);
45+
}
46+
}
47+
48+
public RangePublisherTest() {
49+
super(new TestEnvironment());
50+
}
51+
52+
@Override
53+
public Publisher<Integer> createPublisher(long elements) {
54+
return new RangePublisher(1, elements);
55+
}
56+
57+
@Override
58+
public Publisher<Integer> createFailedPublisher() {
59+
return null;
60+
}
61+
62+
static final class RangePublisher
63+
implements Publisher<Integer> {
64+
65+
final StackTraceElement[] stacktrace;
66+
67+
final long start;
68+
69+
final long count;
70+
71+
RangePublisher(long start, long count) {
72+
this.stacktrace = Thread.currentThread().getStackTrace();
73+
this.start = start;
74+
this.count = count;
75+
}
76+
77+
@Override
78+
public void subscribe(Subscriber<? super Integer> s) {
79+
if (s == null) {
80+
throw new NullPointerException();
81+
}
82+
83+
int ids = id.incrementAndGet();
84+
85+
RangeSubscription parent = new RangeSubscription(s, ids, start, start + count);
86+
stacks.put(ids, stacktrace);
87+
states.put(ids, false);
88+
s.onSubscribe(parent);
89+
}
90+
91+
static final class RangeSubscription extends AtomicLong implements Subscription {
92+
93+
private static final long serialVersionUID = 9066221863682220604L;
94+
95+
final Subscriber<? super Integer> actual;
96+
97+
final int ids;
98+
99+
final long end;
100+
101+
long index;
102+
103+
volatile boolean cancelled;
104+
105+
RangeSubscription(Subscriber<? super Integer> actual, int ids, long start, long end) {
106+
this.actual = actual;
107+
this.ids = ids;
108+
this.index = start;
109+
this.end = end;
110+
}
111+
112+
@Override
113+
public void request(long n) {
114+
if (!cancelled) {
115+
if (n <= 0L) {
116+
cancelled = true;
117+
states.put(ids, true);
118+
actual.onError(new IllegalArgumentException("§3.9 violated"));
119+
return;
120+
}
121+
122+
for (;;) {
123+
long r = get();
124+
long u = r + n;
125+
if (u < 0L) {
126+
u = Long.MAX_VALUE;
127+
}
128+
if (compareAndSet(r, u)) {
129+
if (r == 0) {
130+
break;
131+
}
132+
return;
133+
}
134+
}
135+
136+
long idx = index;
137+
long f = end;
138+
139+
for (;;) {
140+
long e = 0;
141+
while (e != n && idx != f) {
142+
if (cancelled) {
143+
return;
144+
}
145+
146+
actual.onNext((int)idx);
147+
148+
idx++;
149+
e++;
150+
}
151+
152+
if (idx == f) {
153+
if (!cancelled) {
154+
states.put(ids, true);
155+
actual.onComplete();
156+
}
157+
return;
158+
}
159+
160+
index = idx;
161+
n = addAndGet(-n);
162+
if (n == 0) {
163+
break;
164+
}
165+
}
166+
}
167+
}
168+
169+
@Override
170+
public void cancel() {
171+
cancelled = true;
172+
states.put(ids, true);
173+
}
174+
}
175+
}
176+
}

0 commit comments

Comments
 (0)