Skip to content

Commit a26cbad

Browse files
authored
Merge pull request #63 from jroper/finer-grained-stages
Extracted more stages out
2 parents a13e9b1 + 3111dcb commit a26cbad

File tree

10 files changed

+443
-194
lines changed

10 files changed

+443
-194
lines changed

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/Predicates.java

Lines changed: 0 additions & 105 deletions
This file was deleted.

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/ProcessorBuilder.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public ProcessorBuilder<T, R> peek(Consumer<? super R> consumer) {
9090
* @return A new processor builder.
9191
*/
9292
public ProcessorBuilder<T, R> filter(Predicate<? super R> predicate) {
93-
return addStage(new Stage.Filter(() -> predicate));
93+
return addStage(new Stage.Filter(predicate));
9494
}
9595

9696
/**
@@ -178,20 +178,14 @@ public <S> ProcessorBuilder<T, S> flatMapIterable(Function<? super R, ? extends
178178
*
179179
* @param maxSize The maximum size of the returned stream.
180180
* @return A new processor builder.
181+
* @throws IllegalArgumentException If {@code maxSize} is less than zero.
181182
*/
182183
public ProcessorBuilder<T, R> limit(long maxSize) {
183184
if (maxSize < 0) {
184185
throw new IllegalArgumentException("Cannot limit a stream to less than zero elements.");
185186
}
186-
else if (maxSize == 0) {
187-
// todo this is perhaps not the desired behaviour - it means an element must be received before the stream will
188-
// be completed. but then again, limiting a stream to have zero size is a strange thing to do, as running the
189-
// stream in theory will then achieve nothing, so this edge case behavior probably isn't important to worry too
190-
// much about.
191-
return takeWhile(e -> false);
192-
}
193187
else {
194-
return addStage(new Stage.TakeWhile(() -> new Predicates.LimitPredicate<>(maxSize), true));
188+
return addStage(new Stage.Limit(maxSize));
195189
}
196190
}
197191

@@ -201,9 +195,13 @@ else if (maxSize == 0) {
201195
*
202196
* @param n The number of elements to discard.
203197
* @return A new processor builder.
198+
* @throws IllegalArgumentException If {@code n} is less than zero.
204199
*/
205200
public ProcessorBuilder<T, R> skip(long n) {
206-
return addStage(new Stage.Filter(() -> new Predicates.SkipPredicate<>(n)));
201+
if (n < 0) {
202+
throw new IllegalArgumentException("Cannot skip less than zero elements");
203+
}
204+
return addStage(new Stage.Skip(n));
207205
}
208206

209207
/**
@@ -215,7 +213,7 @@ public ProcessorBuilder<T, R> skip(long n) {
215213
* @return A new publisher builder.
216214
*/
217215
public ProcessorBuilder<T, R> takeWhile(Predicate<? super R> predicate) {
218-
return addStage(new Stage.TakeWhile(() -> predicate, false));
216+
return addStage(new Stage.TakeWhile(predicate));
219217
}
220218

221219
/**
@@ -229,7 +227,7 @@ public ProcessorBuilder<T, R> takeWhile(Predicate<? super R> predicate) {
229227
* @return A new processor builder.
230228
*/
231229
public ProcessorBuilder<T, R> dropWhile(Predicate<? super R> predicate) {
232-
return addStage(new Stage.Filter(() -> new Predicates.DropWhilePredicate<>(predicate)));
230+
return addStage(new Stage.DropWhile(predicate));
233231
}
234232

235233
/**

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/PublisherBuilder.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public PublisherBuilder<T> peek(Consumer<? super T> consumer) {
9090
* @return A new publisher builder.
9191
*/
9292
public PublisherBuilder<T> filter(Predicate<? super T> predicate) {
93-
return addStage(new Stage.Filter(() -> predicate));
93+
return addStage(new Stage.Filter(predicate));
9494
}
9595

9696
/**
@@ -179,16 +179,14 @@ public <S> PublisherBuilder<S> flatMapIterable(Function<? super T, ? extends Ite
179179
*
180180
* @param maxSize The maximum size of the returned stream.
181181
* @return A new publisher builder.
182+
* @throws IllegalArgumentException If {@code maxSize} is less than zero.
182183
*/
183184
public PublisherBuilder<T> limit(long maxSize) {
184185
if (maxSize < 0) {
185186
throw new IllegalArgumentException("Cannot limit a stream to less than zero elements.");
186187
}
187-
else if (maxSize == 0) {
188-
return takeWhile(e -> false);
189-
}
190188
else {
191-
return addStage(new Stage.TakeWhile(() -> new Predicates.LimitPredicate<>(maxSize), true));
189+
return addStage(new Stage.Limit(maxSize));
192190
}
193191
}
194192

@@ -198,9 +196,13 @@ else if (maxSize == 0) {
198196
*
199197
* @param n The number of elements to discard.
200198
* @return A new publisher builder.
199+
* @throws IllegalArgumentException If {@code n} is less than zero.
201200
*/
202201
public PublisherBuilder<T> skip(long n) {
203-
return addStage(new Stage.Filter(() -> new Predicates.SkipPredicate<>(n)));
202+
if (n < 0) {
203+
throw new IllegalArgumentException("Cannot skip less than zero elements");
204+
}
205+
return addStage(new Stage.Skip(n));
204206
}
205207

206208
/**
@@ -212,7 +214,7 @@ public PublisherBuilder<T> skip(long n) {
212214
* @return A new publisher builder.
213215
*/
214216
public PublisherBuilder<T> takeWhile(Predicate<? super T> predicate) {
215-
return addStage(new Stage.TakeWhile(() -> predicate, false));
217+
return addStage(new Stage.TakeWhile(predicate));
216218
}
217219

218220
/**
@@ -226,7 +228,7 @@ public PublisherBuilder<T> takeWhile(Predicate<? super T> predicate) {
226228
* @return A new publisher builder.
227229
*/
228230
public PublisherBuilder<T> dropWhile(Predicate<? super T> predicate) {
229-
return addStage(new Stage.Filter(() -> new Predicates.DropWhilePredicate<>(predicate)));
231+
return addStage(new Stage.DropWhile(predicate));
230232
}
231233

232234
/**

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/spi/Stage.java

Lines changed: 78 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.function.Consumer;
2929
import java.util.function.Function;
3030
import java.util.function.Predicate;
31-
import java.util.function.Supplier;
3231
import java.util.stream.Collector;
3332

3433
/**
@@ -133,15 +132,40 @@ public Consumer<?> getConsumer() {
133132
/**
134133
* A filter stage.
135134
* <p>
136-
* The given predicate should be supplied when the stream is first run, and then invoked on each element consumed.
137-
* If it returns true, the element should be emitted.
135+
* The given predicate should be invoked on each element consumed. If it returns true, the element should be
136+
* emitted.
138137
* <p>
139138
* Any {@link RuntimeException} thrown by the predicate should be propagated down the stream as an error.
140139
*/
141140
final class Filter implements Inlet, Outlet {
142-
private final Supplier<Predicate<?>> predicate;
141+
private final Predicate<?> predicate;
142+
143+
public Filter(Predicate<?> predicate) {
144+
this.predicate = predicate;
145+
}
146+
147+
/**
148+
* The predicate.
149+
*
150+
* @return The predicate.
151+
*/
152+
public Predicate<?> getPredicate() {
153+
return predicate;
154+
}
155+
}
156+
157+
/**
158+
* A drop while stage.
159+
* <p>
160+
* The given predicate should be invoked on each element consumed, until it returns true. Each element that it
161+
* returns true for should be dropped, once it returns false, then all elements should be emitted.
162+
* <p>
163+
* Any {@link RuntimeException} thrown by the predicate should be propagated down the stream as an error.
164+
*/
165+
final class DropWhile implements Inlet, Outlet {
166+
private final Predicate<?> predicate;
143167

144-
public Filter(Supplier<Predicate<?>> predicate) {
168+
public DropWhile(Predicate<?> predicate) {
145169
this.predicate = predicate;
146170
}
147171

@@ -150,11 +174,55 @@ public Filter(Supplier<Predicate<?>> predicate) {
150174
*
151175
* @return The predicate.
152176
*/
153-
public Supplier<Predicate<?>> getPredicate() {
177+
public Predicate<?> getPredicate() {
154178
return predicate;
155179
}
156180
}
157181

182+
/**
183+
* A skip stage.
184+
* <p>
185+
* The first {@code skip} elements should be skipped, after that all elements should be emitted.
186+
*/
187+
final class Skip implements Inlet, Outlet {
188+
private final long skip;
189+
190+
public Skip(long skip) {
191+
this.skip = skip;
192+
}
193+
194+
/**
195+
* The number of elements to skip.
196+
*
197+
* @return The number of elements to skip.
198+
*/
199+
public long getSkip() {
200+
return skip;
201+
}
202+
}
203+
204+
/**
205+
* A limit stage.
206+
* <p>
207+
* Only {@code limit} elements should be emitted, once that many elements are emitted, the stream should be completed.
208+
*/
209+
final class Limit implements Inlet, Outlet {
210+
private final long limit;
211+
212+
public Limit(long limit) {
213+
this.limit = limit;
214+
}
215+
216+
/**
217+
* The limit.
218+
*
219+
* @return The limit.
220+
*/
221+
public long getLimit() {
222+
return limit;
223+
}
224+
}
225+
158226
/**
159227
* A stage returning a stream consisting of the distinct elements (according to {@link Object#equals(Object)}) of this
160228
* stream.
@@ -173,35 +241,25 @@ private Distinct() {
173241
* A take while stage.
174242
* <p>
175243
* The given predicate should be supplied when the stream is first run, and then invoked on each element consumed.
176-
* When it returns true, the element should be emitted, when it returns false, the element should only be emitted if
177-
* inclusive is true, and then the stream should be completed.
244+
* When it returns true, the element should be emitted, when it returns false the stream should be completed.
178245
* <p>
179246
* Any {@link RuntimeException} thrown by the predicate should be propagated down the stream as an error.
180247
*/
181248
final class TakeWhile implements Inlet, Outlet {
182-
private final Supplier<Predicate<?>> predicate;
183-
private final boolean inclusive;
249+
private final Predicate<?> predicate;
184250

185-
public TakeWhile(Supplier<Predicate<?>> predicate, boolean inclusive) {
251+
public TakeWhile(Predicate<?> predicate) {
186252
this.predicate = predicate;
187-
this.inclusive = inclusive;
188253
}
189254

190255
/**
191256
* The predicate.
192257
*
193258
* @return The predicate.
194259
*/
195-
public Supplier<Predicate<?>> getPredicate() {
260+
public Predicate<?> getPredicate() {
196261
return predicate;
197262
}
198-
199-
/**
200-
* Whether the element that this returns false on should be emitted or not.
201-
*/
202-
public boolean isInclusive() {
203-
return inclusive;
204-
}
205263
}
206264

207265
/**

0 commit comments

Comments
 (0)