Skip to content

Commit a5d1f6d

Browse files
committed
unify the request to just one class
1 parent 4d1dd0b commit a5d1f6d

File tree

2 files changed

+2
-66
lines changed

2 files changed

+2
-66
lines changed

streams/tck/src/main/java/org/eclipse/microprofile/reactive/streams/tck/FlatMapPublisherStageVerification.java

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -64,43 +64,9 @@ public void flatMapStageShouldPropagateRuntimeExceptions() {
6464
public void flatMapStageShouldOnlySubscribeToOnePublisherAtATime() throws Exception {
6565
AtomicInteger activePublishers = new AtomicInteger();
6666

67-
// A publisher that publishes one element 100ms after being requested,
68-
// and then completes 100ms later. It also uses activePublishers to ensure
69-
// that it is the only publisher that is subscribed to at any one time.
70-
class ScheduledPublisher implements Publisher<Integer> {
71-
private final int id;
72-
private AtomicBoolean published = new AtomicBoolean(false);
73-
74-
private ScheduledPublisher(int id) {
75-
this.id = id;
76-
}
77-
78-
@Override
79-
public void subscribe(Subscriber<? super Integer> subscriber) {
80-
assertEquals(activePublishers.incrementAndGet(), 1);
81-
subscriber.onSubscribe(new Subscription() {
82-
@Override
83-
public void request(long n) {
84-
if (published.compareAndSet(false, true)) {
85-
getExecutorService().schedule(() -> {
86-
subscriber.onNext(id);
87-
getExecutorService().schedule(() -> {
88-
activePublishers.decrementAndGet();
89-
subscriber.onComplete();
90-
}, 100, TimeUnit.MILLISECONDS);
91-
}, 100, TimeUnit.MILLISECONDS);
92-
}
93-
}
94-
95-
@Override
96-
public void cancel() {
97-
}
98-
});
99-
}
100-
}
10167

10268
CompletionStage<List<Integer>> result = ReactiveStreams.of(1, 2, 3, 4, 5)
103-
.flatMapPublisher(id -> new ScheduledPublisher(id))
69+
.flatMapPublisher(id -> new ScheduledPublisher(id, activePublishers, () -> getExecutorService()))
10470
.toList()
10571
.run(getEngine());
10672

streams/tck/src/main/java/org/eclipse/microprofile/reactive/streams/tck/FlatMapStageVerification.java

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -67,40 +67,10 @@ public void flatMapStageShouldOnlySubscribeToOnePublisherAtATime() throws Except
6767
// A publisher that publishes one element 100ms after being requested,
6868
// and then completes 100ms later. It also uses activePublishers to ensure
6969
// that it is the only publisher that is subscribed to at any one time.
70-
class ScheduledPublisher implements Publisher<Integer> {
71-
private final int id;
72-
private AtomicBoolean published = new AtomicBoolean(false);
73-
74-
private ScheduledPublisher(int id) {
75-
this.id = id;
76-
}
77-
78-
@Override
79-
public void subscribe(Subscriber<? super Integer> subscriber) {
80-
assertEquals(activePublishers.incrementAndGet(), 1);
81-
subscriber.onSubscribe(new Subscription() {
82-
@Override
83-
public void request(long n) {
84-
if (published.compareAndSet(false, true)) {
85-
getExecutorService().schedule(() -> {
86-
subscriber.onNext(id);
87-
getExecutorService().schedule(() -> {
88-
activePublishers.decrementAndGet();
89-
subscriber.onComplete();
90-
}, 100, TimeUnit.MILLISECONDS);
91-
}, 100, TimeUnit.MILLISECONDS);
92-
}
93-
}
9470

95-
@Override
96-
public void cancel() {
97-
}
98-
});
99-
}
100-
}
10171

10272
CompletionStage<List<Integer>> result = ReactiveStreams.of(1, 2, 3, 4, 5)
103-
.flatMap(id -> ReactiveStreams.fromPublisher(new ScheduledPublisher(id)))
73+
.flatMap(id -> ReactiveStreams.fromPublisher(new ScheduledPublisher(id, activePublishers, () -> getExecutorService())))
10474
.toList()
10575
.run(getEngine());
10676

0 commit comments

Comments
 (0)