Skip to content

Commit 0c31068

Browse files
authored
Merge pull request #83 from jroper/streams-tck-massive-overhaul
Massive overhaul of streams TCK
2 parents 6147150 + 0d0c4f5 commit 0c31068

File tree

45 files changed

+2991
-645
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2991
-645
lines changed

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/InternalStages.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323

2424
/**
2525
* Internal stages, used to capture the graph while being built, but never passed to a
26-
* {@link org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine}.
26+
* {@link org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine}. These exist for performance reasons,
27+
* allowing the builder to hold the graph as an immutable linked tree where multiple stages can be appended in constant
28+
* time, rather than needing to copy an array each time. However, when it comes to building the graph, it is first
29+
* flattened out to an array, removing any of the internal stages that held nested stages, etc.
2730
*/
2831
class InternalStages {
2932

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/ProcessorBuilder.java

+3-19
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727
import org.reactivestreams.Subscriber;
2828

2929
import java.util.List;
30+
import java.util.Objects;
3031
import java.util.Optional;
3132
import java.util.concurrent.CompletionStage;
3233
import java.util.function.BiConsumer;
33-
import java.util.function.BiFunction;
3434
import java.util.function.BinaryOperator;
3535
import java.util.function.Consumer;
3636
import java.util.function.Function;
@@ -242,6 +242,7 @@ public ProcessorBuilder<T, R> dropWhile(Predicate<? super R> predicate) {
242242
* @return A new subscriber builder.
243243
*/
244244
public SubscriberBuilder<T, Void> forEach(Consumer<? super R> action) {
245+
Objects.requireNonNull(action, "Action must not be null");
245246
return collect(Collector.<R, Void, Void>of(
246247
() -> null,
247248
(n, r) -> action.accept(r),
@@ -303,24 +304,6 @@ public SubscriberBuilder<T, Optional<R>> reduce(BinaryOperator<R> accumulator) {
303304
return addTerminalStage(new Stage.Collect(Reductions.reduce(accumulator)));
304305
}
305306

306-
/**
307-
* Perform a reduction on the elements of this stream, using the provided identity value, accumulation function and
308-
* combiner function.
309-
* <p>
310-
* The result of the reduction is returned in the {@link CompletionSubscriber}.
311-
*
312-
* @param identity The identity value.
313-
* @param accumulator The accumulator function.
314-
* @param combiner The combiner function.
315-
* @return A new subscriber builder.
316-
*/
317-
public <S> SubscriberBuilder<T, S> reduce(S identity,
318-
BiFunction<S, ? super R, S> accumulator,
319-
BinaryOperator<S> combiner) {
320-
321-
return addTerminalStage(new Stage.Collect(Reductions.reduce(identity, accumulator, combiner)));
322-
}
323-
324307
/**
325308
* Collect the elements emitted by this processor builder using the given {@link Collector}.
326309
* <p>
@@ -536,6 +519,7 @@ public Processor<T, R> buildRs() {
536519
* @return A {@link Processor} that will run this stream.
537520
*/
538521
public Processor<T, R> buildRs(ReactiveStreamsEngine engine) {
522+
Objects.requireNonNull(engine, "Engine must not be null");
539523
return engine.buildProcessor(toGraph());
540524
}
541525

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/PublisherBuilder.java

+4-19
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727
import org.reactivestreams.Subscriber;
2828

2929
import java.util.List;
30+
import java.util.Objects;
3031
import java.util.Optional;
3132
import java.util.concurrent.CompletionStage;
3233
import java.util.function.BiConsumer;
33-
import java.util.function.BiFunction;
3434
import java.util.function.BinaryOperator;
3535
import java.util.function.Consumer;
3636
import java.util.function.Function;
@@ -241,6 +241,7 @@ public PublisherBuilder<T> dropWhile(Predicate<? super T> predicate) {
241241
* @return A new completion builder.
242242
*/
243243
public CompletionRunner<Void> forEach(Consumer<? super T> action) {
244+
Objects.requireNonNull(action, "Action must not be null");
244245
return collect(Collector.<T, Void, Void>of(
245246
() -> null,
246247
(n, t) -> action.accept(t),
@@ -301,24 +302,6 @@ public CompletionRunner<Optional<T>> reduce(BinaryOperator<T> accumulator) {
301302
return addTerminalStage(new Stage.Collect(Reductions.reduce(accumulator)));
302303
}
303304

304-
/**
305-
* Perform a reduction on the elements of this stream, using the provided identity value, accumulation function and
306-
* combiner function.
307-
* <p>
308-
* The result of the reduction is returned in the {@link CompletionStage}.
309-
*
310-
* @param identity The identity value.
311-
* @param accumulator The accumulator function.
312-
* @param combiner The combiner function.
313-
* @return A new completion builder.
314-
*/
315-
public <S> CompletionRunner<S> reduce(S identity,
316-
BiFunction<S, ? super T, S> accumulator,
317-
BinaryOperator<S> combiner) {
318-
319-
return addTerminalStage(new Stage.Collect(Reductions.reduce(identity, accumulator, combiner)));
320-
}
321-
322305
/**
323306
* Find the first element emitted by the {@link Publisher}, and return it in a
324307
* {@link CompletionStage}.
@@ -390,6 +373,7 @@ public CompletionRunner<Void> to(Subscriber<T> subscriber) {
390373
* @return A {@link CompletionRunner} that completes when the stream completes.
391374
*/
392375
public <R> CompletionRunner<R> to(SubscriberBuilder<T, R> subscriber) {
376+
Objects.requireNonNull(subscriber, "Subscriber must not be null");
393377
return addTerminalStage(new InternalStages.Nested(subscriber.getGraphBuilder()));
394378
}
395379

@@ -536,6 +520,7 @@ public Publisher<T> buildRs() {
536520
* @return A {@link Publisher} that will run this stream.
537521
*/
538522
public Publisher<T> buildRs(ReactiveStreamsEngine engine) {
523+
Objects.requireNonNull(engine, "Engine must not be null");
539524
return engine.buildPublisher(toGraph());
540525
}
541526

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/ReactiveStreams.java

+3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import java.util.Arrays;
2828
import java.util.Collections;
29+
import java.util.Objects;
2930
import java.util.function.Supplier;
3031
import java.util.function.UnaryOperator;
3132
import java.util.stream.Stream;
@@ -163,6 +164,7 @@ public static <T> SubscriberBuilder<T, Void> fromSubscriber(Subscriber<? extends
163164
* @return A publisher builder.
164165
*/
165166
public static <T> PublisherBuilder<T> iterate(T seed, UnaryOperator<T> f) {
167+
Objects.requireNonNull(f, "Operator must not be null");
166168
return fromIterable(() -> Stream.iterate(seed, f).iterator());
167169
}
168170

@@ -174,6 +176,7 @@ public static <T> PublisherBuilder<T> iterate(T seed, UnaryOperator<T> f) {
174176
* @return A publisher builder.
175177
*/
176178
public static <T> PublisherBuilder<T> generate(Supplier<? extends T> s) {
179+
Objects.requireNonNull(s, "Supplier must not be null");
177180
return fromIterable(() -> Stream.<T>generate((Supplier) s).iterator());
178181
}
179182

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/Reductions.java

+3-14
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
package org.eclipse.microprofile.reactive.streams;
2121

22+
import java.util.Objects;
2223
import java.util.Optional;
23-
import java.util.function.BiFunction;
2424
import java.util.function.BinaryOperator;
2525
import java.util.stream.Collector;
2626

@@ -33,7 +33,7 @@ private Reductions() {
3333
}
3434

3535
static <T> Collector<T, ?, Optional<T>> reduce(BinaryOperator<T> reducer) {
36-
36+
Objects.requireNonNull(reducer, "Reduction function must not be null");
3737
return Collector.of(Reduction<T>::new,
3838
(r, t) -> {
3939
if (r.value == null) {
@@ -59,25 +59,14 @@ else if (s.value != null) {
5959
}
6060

6161
static <T> Collector<T, ?, T> reduce(T identity, BinaryOperator<T> reducer) {
62-
62+
Objects.requireNonNull(reducer, "Reduction function must not be null");
6363
return Collector.of(() -> new Reduction<>(identity),
6464
(r, t) -> r.value = reducer.apply(r.value, t),
6565
(r, s) -> r.replace(reducer.apply(r.value, s.value)),
6666
r -> r.value
6767
);
6868
}
6969

70-
static <T, S> Collector<T, ?, S> reduce(S identity,
71-
BiFunction<S, ? super T, S> accumulator,
72-
BinaryOperator<S> combiner) {
73-
74-
return Collector.of(() -> new Reduction<>(identity),
75-
(r, t) -> r.value = accumulator.apply(r.value, t),
76-
(r, s) -> r.replace(combiner.apply(r.value, s.value)),
77-
r -> r.value
78-
);
79-
}
80-
8170
private static class Reduction<T> {
8271
private T value;
8372

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/SubscriberBuilder.java

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine;
2424
import org.eclipse.microprofile.reactive.streams.spi.Stage;
2525

26+
import java.util.Objects;
27+
2628
/**
2729
* A builder for a {@link org.reactivestreams.Subscriber} and its result.
2830
* <p>
@@ -63,6 +65,7 @@ public CompletionSubscriber<T, R> build() {
6365
* @return A {@link CompletionSubscriber} that will run this stream.
6466
*/
6567
public CompletionSubscriber<T, R> build(ReactiveStreamsEngine engine) {
68+
Objects.requireNonNull(engine, "Engine must not be null");
6669
return engine.buildSubscriber(toGraph());
6770
}
6871

0 commit comments

Comments
 (0)