Skip to content

Commit 886a91d

Browse files
authored
Merge pull request #76 from otaviojava/remove_duplicate_class
Remove the ScheduledPublisher as duplicated code
2 parents dda1ada + 951b054 commit 886a91d

File tree

3 files changed

+80
-73
lines changed

3 files changed

+80
-73
lines changed

streams/tck/src/main/java/org/eclipse/microprofile/reactive/streams/tck/FlatMapPublisherStageVerification.java

Lines changed: 1 addition & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.util.concurrent.CompletableFuture;
3333
import java.util.concurrent.CompletionStage;
3434
import java.util.concurrent.TimeUnit;
35-
import java.util.concurrent.atomic.AtomicBoolean;
3635
import java.util.concurrent.atomic.AtomicInteger;
3736

3837
import static org.testng.Assert.assertEquals;
@@ -64,43 +63,9 @@ public void flatMapStageShouldPropagateRuntimeExceptions() {
6463
public void flatMapStageShouldOnlySubscribeToOnePublisherAtATime() throws Exception {
6564
AtomicInteger activePublishers = new AtomicInteger();
6665

67-
// A publisher that publishes one element 100ms after being requested,
68-
// and then completes 100ms later. It also uses activePublishers to ensure
69-
// that it is the only publisher that is subscribed to at any one time.
70-
class ScheduledPublisher implements Publisher<Integer> {
71-
private final int id;
72-
private AtomicBoolean published = new AtomicBoolean(false);
73-
74-
private ScheduledPublisher(int id) {
75-
this.id = id;
76-
}
77-
78-
@Override
79-
public void subscribe(Subscriber<? super Integer> subscriber) {
80-
assertEquals(activePublishers.incrementAndGet(), 1);
81-
subscriber.onSubscribe(new Subscription() {
82-
@Override
83-
public void request(long n) {
84-
if (published.compareAndSet(false, true)) {
85-
getExecutorService().schedule(() -> {
86-
subscriber.onNext(id);
87-
getExecutorService().schedule(() -> {
88-
activePublishers.decrementAndGet();
89-
subscriber.onComplete();
90-
}, 100, TimeUnit.MILLISECONDS);
91-
}, 100, TimeUnit.MILLISECONDS);
92-
}
93-
}
94-
95-
@Override
96-
public void cancel() {
97-
}
98-
});
99-
}
100-
}
10166

10267
CompletionStage<List<Integer>> result = ReactiveStreams.of(1, 2, 3, 4, 5)
103-
.flatMapPublisher(id -> new ScheduledPublisher(id))
68+
.flatMapPublisher(id -> new ScheduledPublisher(id, activePublishers, () -> getExecutorService()))
10469
.toList()
10570
.run(getEngine());
10671

streams/tck/src/main/java/org/eclipse/microprofile/reactive/streams/tck/FlatMapStageVerification.java

Lines changed: 1 addition & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import java.util.concurrent.CompletableFuture;
3232
import java.util.concurrent.CompletionStage;
3333
import java.util.concurrent.TimeUnit;
34-
import java.util.concurrent.atomic.AtomicBoolean;
3534
import java.util.concurrent.atomic.AtomicInteger;
3635
import java.util.function.Function;
3736

@@ -64,43 +63,8 @@ public void flatMapStageShouldPropagateRuntimeExceptions() {
6463
public void flatMapStageShouldOnlySubscribeToOnePublisherAtATime() throws Exception {
6564
AtomicInteger activePublishers = new AtomicInteger();
6665

67-
// A publisher that publishes one element 100ms after being requested,
68-
// and then completes 100ms later. It also uses activePublishers to ensure
69-
// that it is the only publisher that is subscribed to at any one time.
70-
class ScheduledPublisher implements Publisher<Integer> {
71-
private final int id;
72-
private AtomicBoolean published = new AtomicBoolean(false);
73-
74-
private ScheduledPublisher(int id) {
75-
this.id = id;
76-
}
77-
78-
@Override
79-
public void subscribe(Subscriber<? super Integer> subscriber) {
80-
assertEquals(activePublishers.incrementAndGet(), 1);
81-
subscriber.onSubscribe(new Subscription() {
82-
@Override
83-
public void request(long n) {
84-
if (published.compareAndSet(false, true)) {
85-
getExecutorService().schedule(() -> {
86-
subscriber.onNext(id);
87-
getExecutorService().schedule(() -> {
88-
activePublishers.decrementAndGet();
89-
subscriber.onComplete();
90-
}, 100, TimeUnit.MILLISECONDS);
91-
}, 100, TimeUnit.MILLISECONDS);
92-
}
93-
}
94-
95-
@Override
96-
public void cancel() {
97-
}
98-
});
99-
}
100-
}
101-
10266
CompletionStage<List<Integer>> result = ReactiveStreams.of(1, 2, 3, 4, 5)
103-
.flatMap(id -> ReactiveStreams.fromPublisher(new ScheduledPublisher(id)))
67+
.flatMap(id -> ReactiveStreams.fromPublisher(new ScheduledPublisher(id, activePublishers, () -> getExecutorService())))
10468
.toList()
10569
.run(getEngine());
10670

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2018 Contributors to the Eclipse Foundation
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* You may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
******************************************************************************/
19+
20+
package org.eclipse.microprofile.reactive.streams.tck;
21+
22+
23+
import org.reactivestreams.Publisher;
24+
import org.reactivestreams.Subscriber;
25+
import org.reactivestreams.Subscription;
26+
27+
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.concurrent.atomic.AtomicInteger;
31+
import java.util.function.Supplier;
32+
33+
import static org.testng.Assert.assertEquals;
34+
35+
/**
36+
* A publisher that publishes one element 100ms after being requested,
37+
* and then completes 100ms later. It also uses activePublishers to ensure
38+
* that it is the only publisher that is subscribed to at any one time.
39+
*/
40+
class ScheduledPublisher implements Publisher<Integer> {
41+
private final int id;
42+
private AtomicBoolean published = new AtomicBoolean(false);
43+
private final AtomicInteger activePublishers;
44+
private final Supplier<ScheduledExecutorService> supplier;
45+
46+
ScheduledPublisher(int id, AtomicInteger activePublishers, Supplier<ScheduledExecutorService> supplier) {
47+
this.id = id;
48+
this.activePublishers = activePublishers;
49+
this.supplier = supplier;
50+
}
51+
52+
@Override
53+
public void subscribe(Subscriber<? super Integer> subscriber) {
54+
assertEquals(activePublishers.incrementAndGet(), 1);
55+
subscriber.onSubscribe(new Subscription() {
56+
@Override
57+
public void request(long n) {
58+
if (published.compareAndSet(false, true)) {
59+
getExecutorService().schedule(() -> {
60+
subscriber.onNext(id);
61+
getExecutorService().schedule(() -> {
62+
activePublishers.decrementAndGet();
63+
subscriber.onComplete();
64+
}, 100, TimeUnit.MILLISECONDS);
65+
}, 100, TimeUnit.MILLISECONDS);
66+
}
67+
}
68+
69+
@Override
70+
public void cancel() {
71+
}
72+
});
73+
}
74+
75+
private ScheduledExecutorService getExecutorService() {
76+
return supplier.get();
77+
}
78+
}

0 commit comments

Comments
 (0)