Skip to content

Commit 0d0c4f5

Browse files
committed
Massive overhaul of streams TCK
* Introduced two different types of TCK tests, tests that test the API which don't need an SPI to run (for testing clean room implementations that don't depend on the API artifact that we produce), as well as the existing SPI tests, which test the engines implementation of each stage. * Added a lot more tests to each stage, testing things like cancel propagation, error propagation, etc. * Made a lot more things a lot stricter, for example, there's now a lot of null checking done, the of stage must ensure all errors are carried by the returned completion stage, even if an error is thrown when creating the iterator.
1 parent 6147150 commit 0d0c4f5

File tree

45 files changed

+2991
-645
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2991
-645
lines changed

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/InternalStages.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323

2424
/**
2525
* Internal stages, used to capture the graph while being built, but never passed to a
26-
* {@link org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine}.
26+
* {@link org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine}. These exist for performance reasons,
27+
* allowing the builder to hold the graph as an immutable linked tree where multiple stages can be appended in constant
28+
* time, rather than needing to copy an array each time. However, when it comes to building the graph, it is first
29+
* flattened out to an array, removing any of the internal stages that held nested stages, etc.
2730
*/
2831
class InternalStages {
2932

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/ProcessorBuilder.java

+3-19
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727
import org.reactivestreams.Subscriber;
2828

2929
import java.util.List;
30+
import java.util.Objects;
3031
import java.util.Optional;
3132
import java.util.concurrent.CompletionStage;
3233
import java.util.function.BiConsumer;
33-
import java.util.function.BiFunction;
3434
import java.util.function.BinaryOperator;
3535
import java.util.function.Consumer;
3636
import java.util.function.Function;
@@ -242,6 +242,7 @@ public ProcessorBuilder<T, R> dropWhile(Predicate<? super R> predicate) {
242242
* @return A new subscriber builder.
243243
*/
244244
public SubscriberBuilder<T, Void> forEach(Consumer<? super R> action) {
245+
Objects.requireNonNull(action, "Action must not be null");
245246
return collect(Collector.<R, Void, Void>of(
246247
() -> null,
247248
(n, r) -> action.accept(r),
@@ -303,24 +304,6 @@ public SubscriberBuilder<T, Optional<R>> reduce(BinaryOperator<R> accumulator) {
303304
return addTerminalStage(new Stage.Collect(Reductions.reduce(accumulator)));
304305
}
305306

306-
/**
307-
* Perform a reduction on the elements of this stream, using the provided identity value, accumulation function and
308-
* combiner function.
309-
* <p>
310-
* The result of the reduction is returned in the {@link CompletionSubscriber}.
311-
*
312-
* @param identity The identity value.
313-
* @param accumulator The accumulator function.
314-
* @param combiner The combiner function.
315-
* @return A new subscriber builder.
316-
*/
317-
public <S> SubscriberBuilder<T, S> reduce(S identity,
318-
BiFunction<S, ? super R, S> accumulator,
319-
BinaryOperator<S> combiner) {
320-
321-
return addTerminalStage(new Stage.Collect(Reductions.reduce(identity, accumulator, combiner)));
322-
}
323-
324307
/**
325308
* Collect the elements emitted by this processor builder using the given {@link Collector}.
326309
* <p>
@@ -536,6 +519,7 @@ public Processor<T, R> buildRs() {
536519
* @return A {@link Processor} that will run this stream.
537520
*/
538521
public Processor<T, R> buildRs(ReactiveStreamsEngine engine) {
522+
Objects.requireNonNull(engine, "Engine must not be null");
539523
return engine.buildProcessor(toGraph());
540524
}
541525

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/PublisherBuilder.java

+4-19
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727
import org.reactivestreams.Subscriber;
2828

2929
import java.util.List;
30+
import java.util.Objects;
3031
import java.util.Optional;
3132
import java.util.concurrent.CompletionStage;
3233
import java.util.function.BiConsumer;
33-
import java.util.function.BiFunction;
3434
import java.util.function.BinaryOperator;
3535
import java.util.function.Consumer;
3636
import java.util.function.Function;
@@ -241,6 +241,7 @@ public PublisherBuilder<T> dropWhile(Predicate<? super T> predicate) {
241241
* @return A new completion builder.
242242
*/
243243
public CompletionRunner<Void> forEach(Consumer<? super T> action) {
244+
Objects.requireNonNull(action, "Action must not be null");
244245
return collect(Collector.<T, Void, Void>of(
245246
() -> null,
246247
(n, t) -> action.accept(t),
@@ -301,24 +302,6 @@ public CompletionRunner<Optional<T>> reduce(BinaryOperator<T> accumulator) {
301302
return addTerminalStage(new Stage.Collect(Reductions.reduce(accumulator)));
302303
}
303304

304-
/**
305-
* Perform a reduction on the elements of this stream, using the provided identity value, accumulation function and
306-
* combiner function.
307-
* <p>
308-
* The result of the reduction is returned in the {@link CompletionStage}.
309-
*
310-
* @param identity The identity value.
311-
* @param accumulator The accumulator function.
312-
* @param combiner The combiner function.
313-
* @return A new completion builder.
314-
*/
315-
public <S> CompletionRunner<S> reduce(S identity,
316-
BiFunction<S, ? super T, S> accumulator,
317-
BinaryOperator<S> combiner) {
318-
319-
return addTerminalStage(new Stage.Collect(Reductions.reduce(identity, accumulator, combiner)));
320-
}
321-
322305
/**
323306
* Find the first element emitted by the {@link Publisher}, and return it in a
324307
* {@link CompletionStage}.
@@ -390,6 +373,7 @@ public CompletionRunner<Void> to(Subscriber<T> subscriber) {
390373
* @return A {@link CompletionRunner} that completes when the stream completes.
391374
*/
392375
public <R> CompletionRunner<R> to(SubscriberBuilder<T, R> subscriber) {
376+
Objects.requireNonNull(subscriber, "Subscriber must not be null");
393377
return addTerminalStage(new InternalStages.Nested(subscriber.getGraphBuilder()));
394378
}
395379

@@ -536,6 +520,7 @@ public Publisher<T> buildRs() {
536520
* @return A {@link Publisher} that will run this stream.
537521
*/
538522
public Publisher<T> buildRs(ReactiveStreamsEngine engine) {
523+
Objects.requireNonNull(engine, "Engine must not be null");
539524
return engine.buildPublisher(toGraph());
540525
}
541526

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/ReactiveStreams.java

+3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import java.util.Arrays;
2828
import java.util.Collections;
29+
import java.util.Objects;
2930
import java.util.function.Supplier;
3031
import java.util.function.UnaryOperator;
3132
import java.util.stream.Stream;
@@ -163,6 +164,7 @@ public static <T> SubscriberBuilder<T, Void> fromSubscriber(Subscriber<? extends
163164
* @return A publisher builder.
164165
*/
165166
public static <T> PublisherBuilder<T> iterate(T seed, UnaryOperator<T> f) {
167+
Objects.requireNonNull(f, "Operator must not be null");
166168
return fromIterable(() -> Stream.iterate(seed, f).iterator());
167169
}
168170

@@ -174,6 +176,7 @@ public static <T> PublisherBuilder<T> iterate(T seed, UnaryOperator<T> f) {
174176
* @return A publisher builder.
175177
*/
176178
public static <T> PublisherBuilder<T> generate(Supplier<? extends T> s) {
179+
Objects.requireNonNull(s, "Supplier must not be null");
177180
return fromIterable(() -> Stream.<T>generate((Supplier) s).iterator());
178181
}
179182

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/Reductions.java

+3-14
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
package org.eclipse.microprofile.reactive.streams;
2121

22+
import java.util.Objects;
2223
import java.util.Optional;
23-
import java.util.function.BiFunction;
2424
import java.util.function.BinaryOperator;
2525
import java.util.stream.Collector;
2626

@@ -33,7 +33,7 @@ private Reductions() {
3333
}
3434

3535
static <T> Collector<T, ?, Optional<T>> reduce(BinaryOperator<T> reducer) {
36-
36+
Objects.requireNonNull(reducer, "Reduction function must not be null");
3737
return Collector.of(Reduction<T>::new,
3838
(r, t) -> {
3939
if (r.value == null) {
@@ -59,25 +59,14 @@ else if (s.value != null) {
5959
}
6060

6161
static <T> Collector<T, ?, T> reduce(T identity, BinaryOperator<T> reducer) {
62-
62+
Objects.requireNonNull(reducer, "Reduction function must not be null");
6363
return Collector.of(() -> new Reduction<>(identity),
6464
(r, t) -> r.value = reducer.apply(r.value, t),
6565
(r, s) -> r.replace(reducer.apply(r.value, s.value)),
6666
r -> r.value
6767
);
6868
}
6969

70-
static <T, S> Collector<T, ?, S> reduce(S identity,
71-
BiFunction<S, ? super T, S> accumulator,
72-
BinaryOperator<S> combiner) {
73-
74-
return Collector.of(() -> new Reduction<>(identity),
75-
(r, t) -> r.value = accumulator.apply(r.value, t),
76-
(r, s) -> r.replace(combiner.apply(r.value, s.value)),
77-
r -> r.value
78-
);
79-
}
80-
8170
private static class Reduction<T> {
8271
private T value;
8372

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/SubscriberBuilder.java

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine;
2424
import org.eclipse.microprofile.reactive.streams.spi.Stage;
2525

26+
import java.util.Objects;
27+
2628
/**
2729
* A builder for a {@link org.reactivestreams.Subscriber} and its result.
2830
* <p>
@@ -63,6 +65,7 @@ public CompletionSubscriber<T, R> build() {
6365
* @return A {@link CompletionSubscriber} that will run this stream.
6466
*/
6567
public CompletionSubscriber<T, R> build(ReactiveStreamsEngine engine) {
68+
Objects.requireNonNull(engine, "Engine must not be null");
6669
return engine.buildSubscriber(toGraph());
6770
}
6871

0 commit comments

Comments
 (0)