Skip to content
This repository was archived by the owner on Nov 1, 2024. It is now read-only.

Commit a001d8e

Browse files
authored
Merge pull request #4 from jroper/rework-zerodep
Significant rework of zerodep implementation
2 parents 1e6fee3 + ae1611f commit a001d8e

File tree

24 files changed

+576
-242
lines changed

24 files changed

+576
-242
lines changed

akka/src/main/java/com/lightbend/microprofile/reactive/streams/akka/AkkaEngine.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package com.lightbend.microprofile.reactive.streams.akka;
66

7+
import akka.Done;
78
import akka.NotUsed;
89
import akka.stream.Attributes;
910
import akka.stream.Materializer;
@@ -256,6 +257,9 @@ else if (size == 1) {
256257
Predicate<Object> predicate = (Predicate) stage.getPredicate();
257258
return flow.dropWhile(predicate::test);
258259
});
260+
addFlowStage(Stage.OnComplete.class, (flow, stage) -> flow.via(TerminationPeeker.onComplete(stage.getAction())));
261+
addFlowStage(Stage.OnError.class, (flow, stage) -> flow.via(TerminationPeeker.onError(stage.getConsumer())));
262+
addFlowStage(Stage.OnTerminate.class, (flow, stage) -> flow.via(TerminationPeeker.onTerminate(stage.getAction())));
259263

260264
// Sinks
261265
addSinkStage(Stage.FindFirst.class, stage -> Sink.headOption());
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package com.lightbend.microprofile.reactive.streams.akka;
6+
7+
import akka.stream.Attributes;
8+
import akka.stream.FlowShape;
9+
import akka.stream.Inlet;
10+
import akka.stream.Outlet;
11+
import akka.stream.stage.AbstractInHandler;
12+
import akka.stream.stage.AbstractOutHandler;
13+
import akka.stream.stage.GraphStage;
14+
import akka.stream.stage.GraphStageLogic;
15+
import akka.stream.stage.GraphStageWithMaterializedValue;
16+
import scala.Tuple2;
17+
18+
import java.util.concurrent.CancellationException;
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.CompletionStage;
21+
import java.util.function.Consumer;
22+
23+
/**
24+
* Differs from Flow.watchTermination in that termination callbacks are run in the stream, so if the callback experiences an error,
25+
* the stream fails.
26+
*/
27+
class TerminationPeeker<T> extends GraphStage<FlowShape<T, T>> {
28+
private final Inlet<T> in = Inlet.create("TerminationPeeker.in");
29+
private final Outlet<T> out = Outlet.create("TerminationPeeker.out");
30+
31+
private final FlowShape<T, T> shape = FlowShape.of(in, out);
32+
33+
private final Runnable onCompleteCallback;
34+
private final Consumer<Throwable> onErrorCallback;
35+
36+
private TerminationPeeker(Runnable onCompleteCallback, Consumer<Throwable> onErrorCallback) {
37+
this.onCompleteCallback = onCompleteCallback;
38+
this.onErrorCallback = onErrorCallback;
39+
}
40+
41+
static <T> GraphStage<FlowShape<T, T>> onComplete(Runnable action) {
42+
return new TerminationPeeker<>(action, t -> {});
43+
}
44+
45+
static <T> GraphStage<FlowShape<T, T>> onTerminate(Runnable action) {
46+
return new TerminationPeeker<>(action, t -> action.run());
47+
}
48+
49+
static <T> GraphStage<FlowShape<T, T>> onError(Consumer<Throwable> consumer) {
50+
return new TerminationPeeker<>(() -> {}, consumer);
51+
}
52+
53+
@Override
54+
public FlowShape<T, T> shape() {
55+
return shape;
56+
}
57+
58+
@Override
59+
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
60+
return new GraphStageLogic(shape()) {
61+
{
62+
setHandler(in, new AbstractInHandler() {
63+
@Override
64+
public void onPush() throws Exception {
65+
push(out, grab(in));
66+
}
67+
68+
@Override
69+
public void onUpstreamFinish() throws Exception {
70+
onCompleteCallback.run();
71+
complete(out);
72+
}
73+
74+
@Override
75+
public void onUpstreamFailure(Throwable ex) throws Exception {
76+
onErrorCallback.accept(ex);
77+
fail(out, ex);
78+
}
79+
});
80+
setHandler(out, new AbstractOutHandler() {
81+
@Override
82+
public void onPull() throws Exception {
83+
pull(in);
84+
}
85+
86+
@Override
87+
public void onDownstreamFinish() throws Exception {
88+
onCompleteCallback.run();
89+
cancel(in);
90+
}
91+
});
92+
}
93+
94+
};
95+
}
96+
}

akka/src/main/java/com/lightbend/microprofile/reactive/streams/akka/TerminationWatcher.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
import java.util.concurrent.CompletableFuture;
1313
import java.util.concurrent.CompletionStage;
1414

15+
/**
16+
* Differs from Flow.watchTermination in that cancellation results in the completion stage failing with a cancellation exception.
17+
*/
1518
class TerminationWatcher<T> extends GraphStageWithMaterializedValue<FlowShape<T, T>, CompletionStage<Void>> {
1619
private final Inlet<T> in = Inlet.create("TerminationWatcher.in");
1720
private final Outlet<T> out = Outlet.create("TerminationWatcher.out");

bin/buildDeps.sh

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
#!/bin/bash
22

33
# This should be set to the commit hash that is being tracked. Needed even if TRACKING_PR is set.
4-
TRACKING_COMMIT="60f67ef"
4+
TRACKING_COMMIT="41f41e2"
55
# To track a particular pull request, put it's number here, otherwise comment it out.
6-
TRACKING_PR="64"
6+
# TRACKING_PR="64"
77

88
set -e
99

@@ -25,6 +25,4 @@ fi
2525

2626
git checkout "${TRACKING_COMMIT}"
2727

28-
cd streams
29-
30-
mvn clean install -Dmaven.test.skip -Drat.skip=true -Dcheckstyle.skip=true -Dmaven.javadoc.skip=true -Dasciidoctor.skip=true
28+
mvn -am -pl streams/api,streams/tck clean install -Dmaven.test.skip -Drat.skip=true -Dcheckstyle.skip=true -Dmaven.javadoc.skip=true -Dasciidoctor.skip=true

zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/BuiltGraph.java

Lines changed: 24 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class BuiltGraph implements Executor {
4343
private static final int DEFAULT_BUFFER_LOW_WATERMARK = 4;
4444

4545
private final Executor mutex;
46-
private final Deque<UnrolledSignal> unrolledSignals = new ArrayDeque<>();
46+
private final Deque<Signal> signals = new ArrayDeque<>();
4747
private final Set<Port> ports = new LinkedHashSet<>();
4848
private final Set<GraphStage> stages = new LinkedHashSet<>();
4949

@@ -197,10 +197,10 @@ private Builder buildGraph(Graph graph, Shape shape) {
197197
if (isSubscriber(stage)) {
198198
// We're connecting a publisher to a subscriber, don't create any port, just record what the current
199199
// publisher is.
200-
if (previousStage instanceof Stage.PublisherStage) {
201-
currentPublisher = ((Stage.PublisherStage) previousStage).getRsPublisher();
200+
if (stage instanceof Stage.SubscriberStage) {
201+
currentSubscriber = ((Stage.SubscriberStage) stage).getRsSubscriber();
202202
} else {
203-
currentPublisher = ((Stage.ProcessorStage) previousStage).getRsProcessor();
203+
currentSubscriber = ((Stage.ProcessorStage) stage).getRsProcessor();
204204
}
205205
} else {
206206
// We're connecting a publisher to an inlet, create a subscriber inlet for that.
@@ -373,7 +373,7 @@ private void addStage(Stage stage, StageInlet inlet, Publisher publisher, StageO
373373

374374
addStage(new ConcatStage(BuiltGraph.this, firstInlet, secondInlet, outlet));
375375
} else if (stage instanceof Stage.PublisherStage) {
376-
addStage(new ConnectorStage<>(BuiltGraph.this, ((Stage.PublisherStage) stage).getRsPublisher(), subscriber));
376+
addStage(new ConnectorStage(BuiltGraph.this, ((Stage.PublisherStage) stage).getRsPublisher(), subscriber));
377377
} else if (stage instanceof Stage.Failed) {
378378
addStage(new FailedStage(BuiltGraph.this, outlet, ((Stage.Failed) stage).getError()));
379379
} else {
@@ -410,6 +410,12 @@ private void addStage(Stage stage, StageInlet inlet, Publisher publisher, StageO
410410
addStage(new DropWhileStage(BuiltGraph.this, inlet, outlet, ((Stage.DropWhile) stage).getPredicate()));
411411
} else if (stage instanceof Stage.Peek) {
412412
addStage(new PeekStage<>(BuiltGraph.this, inlet, outlet, ((Stage.Peek) stage).getConsumer()));
413+
} else if (stage instanceof Stage.OnComplete) {
414+
addStage(new OnCompleteStage<>(BuiltGraph.this, inlet, outlet, ((Stage.OnComplete) stage).getAction()));
415+
} else if (stage instanceof Stage.OnError) {
416+
addStage(new OnErrorStage<>(BuiltGraph.this, inlet, outlet, ((Stage.OnError) stage).getConsumer()));
417+
} else if (stage instanceof Stage.OnTerminate) {
418+
addStage(new OnTerminateStage<>(BuiltGraph.this, inlet, outlet, ((Stage.OnTerminate) stage).getAction()));
413419
} else {
414420
throw new UnsupportedStageException(stage);
415421
}
@@ -429,8 +435,6 @@ private void addStage(Stage stage, StageInlet inlet, Publisher publisher, StageO
429435
SubscriberInlet subscriberInlet = addPort(createSubscriberInlet());
430436
if (publisher != null) {
431437
addStage(new ConnectorStage(BuiltGraph.this, publisher, subscriberInlet));
432-
} else {
433-
firstSubscriber = subscriberInlet;
434438
}
435439
inlet = subscriberInlet;
436440
}
@@ -473,7 +477,7 @@ private boolean isPublisher(Stage stage) {
473477
* from all other signals on this graph. Any exceptions thrown by the command will cause the graph to be terminated
474478
* with a failure.
475479
* <p>
476-
* Commands are also allowed to (synchronously) emit unrolled signals, by adding them to the unrolledSignals queue.
480+
* Commands are also allowed to (synchronously) emit unrolled signals, by adding them to the signals queue.
477481
* Unrolled signals are used for breaking infinite recursion scenarios. This method will drain all unrolled signals
478482
* (including subsequent signals emitted by the unrolled signals themselves) after invocation of the command.
479483
*
@@ -488,14 +492,19 @@ public void execute(Runnable command) {
488492

489493
// Now drain a maximum of 32 signals from the queue
490494
int signalsDrained = 0;
491-
while (!unrolledSignals.isEmpty() && signalsDrained < 32) {
495+
while (!signals.isEmpty() && signalsDrained < 32) {
492496
signalsDrained++;
493-
unrolledSignals.poll().signal();
497+
Signal signal = signals.removeFirst();
498+
try {
499+
signal.signal();
500+
} catch (RuntimeException e) {
501+
signal.signalFailed(e);
502+
}
494503
}
495504

496505
// If there were more than 32 unrolled signals, we resubmit
497506
// to the executor to allow us to receive external signals
498-
if (!unrolledSignals.isEmpty()) {
507+
if (!signals.isEmpty()) {
499508
execute(() -> {
500509
});
501510
}
@@ -504,7 +513,7 @@ public void execute(Runnable command) {
504513
// shut down the stream
505514
streamFailure(e);
506515
// Clear remaining signals
507-
unrolledSignals.clear();
516+
signals.clear();
508517
}
509518
});
510519
}
@@ -524,20 +533,16 @@ private void streamFailure(Throwable error) {
524533
// todo handle better
525534
error.printStackTrace();
526535
for (Port port : ports) {
527-
try {
528-
port.onStreamFailure(error);
529-
} catch (RuntimeException e) {
530-
// Ignore
531-
}
536+
port.onStreamFailure(error);
532537
}
533538
ports.clear();
534539
}
535540

536541
/**
537542
* Enqueue a signal to be executed serially after the current signal processing finishes.
538543
*/
539-
void enqueueSignal(UnrolledSignal signal) {
540-
unrolledSignals.add(signal);
544+
void enqueueSignal(Signal signal) {
545+
signals.add(signal);
541546
}
542547

543548
/**
@@ -633,32 +638,3 @@ public void onUpstreamFailure(Throwable error) {
633638

634639
}
635640

636-
637-
/**
638-
* An unrolled signal.
639-
* <p>
640-
* It is possible for stages to get into an infinite recursion, doing push/pulls between each other. This interface
641-
* allows them to unroll the recursion, by adding the signal to the unrolledSignals queue in this class, which then
642-
* gets executed after the first callback is executed.
643-
*/
644-
interface UnrolledSignal {
645-
void signal();
646-
}
647-
648-
/**
649-
* A port, which may sit between two stages of this graph.
650-
*/
651-
interface Port {
652-
/**
653-
* If an exception is thrown by the graph, or otherwise encountered, each port will be shut down in the order they
654-
* were created, by invoking this. This method should implement any clean up associated with a port, if the port
655-
* isn't already shut down.
656-
*/
657-
void onStreamFailure(Throwable reason);
658-
659-
/**
660-
* Verify that this port is ready to start receiving signals.
661-
*/
662-
void verifyReady();
663-
}
664-

zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/CancelStage.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@ class CancelStage extends GraphStage implements InletListener {
2323

2424
@Override
2525
protected void postStart() {
26-
if (!inlet.isClosed()) {
27-
inlet.cancel();
28-
}
26+
inlet.cancel();
2927
result.complete(null);
3028
}
3129

zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/CollectStage.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,7 @@ public CollectStage(BuiltGraph builtGraph, StageInlet<T> inlet,
2929

3030
@Override
3131
protected void postStart() {
32-
// It's possible that an earlier stage finished immediately, so check first
33-
if (!inlet.isClosed()) {
34-
inlet.pull();
35-
}
32+
inlet.pull();
3633
}
3734

3835
@Override
@@ -43,7 +40,11 @@ public void onPush() {
4340

4441
@Override
4542
public void onUpstreamFinish() {
46-
result.complete(collector.finisher().apply(container));
43+
try {
44+
result.complete(collector.finisher().apply(container));
45+
} catch (RuntimeException e) {
46+
result.completeExceptionally(e);
47+
}
4748
container = null;
4849
}
4950

zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/ConnectorStage.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import org.reactivestreams.Publisher;
88
import org.reactivestreams.Subscriber;
99

10+
import java.util.Objects;
11+
1012
/**
1113
* Connector stage. Does nothing but connects a publisher to a subscriber when the graph starts.
1214
*/
@@ -16,8 +18,8 @@ public class ConnectorStage<T> extends GraphStage {
1618

1719
public ConnectorStage(BuiltGraph builtGraph, Publisher<T> publisher, Subscriber<T> subscriber) {
1820
super(builtGraph);
19-
this.publisher = publisher;
20-
this.subscriber = subscriber;
21+
this.publisher = Objects.requireNonNull(publisher);
22+
this.subscriber = Objects.requireNonNull(subscriber);
2123
}
2224

2325
@Override

zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/FailedStage.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@ public FailedStage(BuiltGraph builtGraph, StageOutlet<?> outlet, Throwable error
2121

2222
@Override
2323
protected void postStart() {
24-
if (!outlet.isClosed()) {
25-
outlet.fail(error);
26-
}
24+
outlet.fail(error);
2725
}
2826

2927
@Override

zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/FindFirstStage.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@ class FindFirstStage<T> extends GraphStage implements InletListener {
2222

2323
@Override
2424
protected void postStart() {
25-
if (!inlet.isClosed()) {
26-
inlet.pull();
27-
}
25+
inlet.pull();
2826
}
2927

3028
@Override

0 commit comments

Comments
 (0)