Skip to content
This repository was archived by the owner on Nov 1, 2024. It is now read-only.

Commit fd82c86

Browse files
authored
Merge pull request #6 from jroper/recover-stages
Added support for the recover stages
2 parents a53a114 + b173970 commit fd82c86

File tree

8 files changed

+168
-22
lines changed

8 files changed

+168
-22
lines changed

Diff for: akka/src/main/java/com/lightbend/microprofile/reactive/streams/akka/AkkaEngine.java

+23-6
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44

55
package com.lightbend.microprofile.reactive.streams.akka;
66

7-
import akka.Done;
87
import akka.NotUsed;
8+
import akka.japi.JavaPartialFunction;
99
import akka.stream.Attributes;
1010
import akka.stream.Materializer;
11+
import akka.stream.SourceShape;
1112
import akka.stream.javadsl.*;
1213
import org.eclipse.microprofile.reactive.streams.CompletionRunner;
1314
import org.eclipse.microprofile.reactive.streams.CompletionSubscriber;
@@ -22,11 +23,7 @@
2223
import org.reactivestreams.Processor;
2324
import org.reactivestreams.Publisher;
2425

25-
import java.util.Collection;
26-
import java.util.HashMap;
27-
import java.util.HashSet;
28-
import java.util.Map;
29-
import java.util.Set;
26+
import java.util.*;
3027
import java.util.concurrent.CompletableFuture;
3128
import java.util.concurrent.CompletionStage;
3229
import java.util.function.BiConsumer;
@@ -260,6 +257,26 @@ else if (size == 1) {
260257
addFlowStage(Stage.OnComplete.class, (flow, stage) -> flow.via(TerminationPeeker.onComplete(stage.getAction())));
261258
addFlowStage(Stage.OnError.class, (flow, stage) -> flow.via(TerminationPeeker.onError(stage.getConsumer())));
262259
addFlowStage(Stage.OnTerminate.class, (flow, stage) -> flow.via(TerminationPeeker.onTerminate(stage.getAction())));
260+
addFlowStage(Stage.OnErrorResume.class, (flow, stage) -> {
261+
Function<Throwable, Object> function = (Function) stage.getFunction();
262+
return flow.recover(new JavaPartialFunction<Throwable, Object>() {
263+
@Override
264+
public Object apply(Throwable x, boolean isCheck) throws Exception {
265+
if (isCheck) return null;
266+
else return function.apply(x);
267+
}
268+
});
269+
});
270+
addFlowStage(Stage.OnErrorResumeWith.class, (flow, stage) -> {
271+
Function<Throwable, Graph> function = (Function) stage.getFunction();
272+
return flow.recoverWithRetries(1, new JavaPartialFunction<Throwable, akka.stream.Graph<SourceShape<Object>, NotUsed>>() {
273+
@Override
274+
public akka.stream.Graph<SourceShape<Object>, NotUsed> apply(Throwable x, boolean isCheck) throws Exception {
275+
if (isCheck) return null;
276+
else return buildSource(function.apply(x));
277+
}
278+
});
279+
});
263280

264281
// Sinks
265282
addSinkStage(Stage.FindFirst.class, stage -> Sink.headOption());

Diff for: bin/buildDeps.sh

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
#!/bin/bash
22

33
# This should be set to the commit hash that is being tracked. Needed even if TRACKING_PR is set.
4-
TRACKING_COMMIT="41f41e2"
4+
TRACKING_COMMIT="886a91d5"
55
# To track a particular pull request, put it's number here, otherwise comment it out.
6-
# TRACKING_PR="64"
6+
# TRACKING_PR="67"
77

88
set -e
99

Diff for: zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/BuiltGraph.java

+4
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,10 @@ private void addStage(Stage stage, StageInlet inlet, Publisher publisher, StageO
416416
addStage(new OnErrorStage<>(BuiltGraph.this, inlet, outlet, ((Stage.OnError) stage).getConsumer()));
417417
} else if (stage instanceof Stage.OnTerminate) {
418418
addStage(new OnTerminateStage<>(BuiltGraph.this, inlet, outlet, ((Stage.OnTerminate) stage).getAction()));
419+
} else if (stage instanceof Stage.OnErrorResume) {
420+
addStage(new OnErrorResumeStage(BuiltGraph.this, inlet, outlet, ((Stage.OnErrorResume) stage).getFunction()));
421+
} else if (stage instanceof Stage.OnErrorResumeWith) {
422+
addStage(new OnErrorResumeWithStage(BuiltGraph.this, inlet, outlet, ((Stage.OnErrorResumeWith) stage).getFunction()));
419423
} else {
420424
throw new UnsupportedStageException(stage);
421425
}

Diff for: zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/OfStage.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,12 @@ public OfStage(BuiltGraph builtGraph, StageOutlet<T> outlet, Iterable<T> element
2323

2424
@Override
2525
protected void postStart() {
26-
if (!elements.hasNext()) {
27-
outlet.complete();
26+
try {
27+
if (!elements.hasNext()) {
28+
outlet.complete();
29+
}
30+
} catch (Exception e) {
31+
outlet.fail(e);
2832
}
2933
}
3034

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
package com.lightbend.microprofile.reactive.streams.zerodep;
5+
6+
import java.util.function.Function;
7+
8+
class OnErrorResumeStage<T> extends GraphStage implements InletListener, OutletListener {
9+
10+
private final StageInlet<T> inlet;
11+
private final StageOutlet<T> outlet;
12+
private final Function<Throwable, T> function;
13+
14+
private T recoveredElement;
15+
16+
public OnErrorResumeStage(BuiltGraph builtGraph, StageInlet<T> inlet, StageOutlet<T> outlet, Function<Throwable, T> function) {
17+
super(builtGraph);
18+
this.inlet = inlet;
19+
this.outlet = outlet;
20+
this.function = function;
21+
22+
inlet.setListener(this);
23+
outlet.setListener(this);
24+
}
25+
26+
@Override
27+
public void onPush() {
28+
outlet.push(inlet.grab());
29+
}
30+
31+
@Override
32+
public void onUpstreamFinish() {
33+
outlet.complete();
34+
}
35+
36+
@Override
37+
public void onUpstreamFailure(Throwable error) {
38+
try {
39+
T element = function.apply(error);
40+
if (outlet.isAvailable()) {
41+
outlet.push(element);
42+
outlet.complete();
43+
} else {
44+
recoveredElement = element;
45+
}
46+
} catch (Exception e) {
47+
outlet.fail(e);
48+
}
49+
}
50+
51+
@Override
52+
public void onPull() {
53+
if (recoveredElement != null) {
54+
outlet.push(recoveredElement);
55+
outlet.complete();
56+
} else {
57+
inlet.pull();
58+
}
59+
}
60+
61+
@Override
62+
public void onDownstreamFinish() {
63+
inlet.cancel();
64+
}
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
package com.lightbend.microprofile.reactive.streams.zerodep;
5+
6+
import org.eclipse.microprofile.reactive.streams.spi.Graph;
7+
8+
import java.util.function.Function;
9+
10+
class OnErrorResumeWithStage<T> extends GraphStage implements InletListener {
11+
12+
private final StageInlet<T> inlet;
13+
private final StageOutlet<T> outlet;
14+
private final Function<Throwable, Graph> function;
15+
16+
public OnErrorResumeWithStage(BuiltGraph builtGraph, StageInlet<T> inlet, StageOutlet<T> outlet, Function<Throwable, Graph> function) {
17+
super(builtGraph);
18+
this.inlet = inlet;
19+
this.outlet = outlet;
20+
this.function = function;
21+
22+
inlet.setListener(this);
23+
outlet.forwardTo(inlet);
24+
}
25+
26+
@Override
27+
public void onPush() {
28+
outlet.push(inlet.grab());
29+
}
30+
31+
@Override
32+
public void onUpstreamFinish() {
33+
outlet.complete();
34+
}
35+
36+
@Override
37+
public void onUpstreamFailure(Throwable error) {
38+
try {
39+
Graph graph = function.apply(error);
40+
41+
BuiltGraph.SubStageInlet<T> newInlet = createSubInlet(graph);
42+
43+
// Wire the new inlet directly to/from the outlet, this stage no longer has any involvement in the stream.
44+
newInlet.forwardTo(outlet);
45+
outlet.forwardTo(newInlet);
46+
47+
newInlet.start();
48+
if (outlet.isAvailable()) {
49+
newInlet.pull();
50+
}
51+
} catch (Exception e) {
52+
outlet.fail(e);
53+
}
54+
}
55+
}

Diff for: zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/Signal.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@
88
* <p>
99
* A signal is used to wrap callbacks to inlet and outlet listeners. Rather than invoking these directly, they are
1010
* enqueued to be invoked in an unrolled fashion, after the current callback is finished. This ensures that stages do
11-
* not have to handle reentrant invocations, when the stage invokes something on a inlet our outlet, it can be sure
12-
* that its state before invocation hasn't changed after the invocation returns. This also solves blowing the stack
13-
* as it ensures there is no unbounded recursion.
11+
* not have to handle reentrant invocations, when the stage invokes something on an inlet or outlet, it can be sure
12+
* that its state before invocation hasn't changed after the invocation returns. This also prevents any unbounded
13+
* recursion due to stages synchronously responding back and forth to each others signals.
1414
* <p>
1515
* Generally speaking, the implementation of inlets and outlets will ensure that a new signal is not allocated each
1616
* time it is needed for pulls and pushes, rather the signal is instantiated once at start up and reused each time.
1717
* This is safe to do since those signals don't carry state.
1818
*/
19-
public interface Signal {
19+
interface Signal {
2020
/**
2121
* Invoke the signal.
2222
*/

Diff for: zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/StageInlet.java

+9-8
Original file line numberDiff line numberDiff line change
@@ -91,18 +91,19 @@ interface InletListener {
9191
/**
9292
* Indicates that an element has been pushed. The element can be received using {@link StageInlet#grab()}.
9393
* <p>
94-
* If this throws an exception, the error will be passed to {@link #onUpstreamFailure(Throwable)}, upstream will
95-
* be cancelled, and the stage will not receive any further signals.
94+
* If this throws an exception, the error will be passed to {@link #onUpstreamFailure(Throwable)}, anything upstream
95+
* from this inlet will be cancelled, and the stage listening will not receive any further signals.
9696
*/
9797
void onPush();
9898

9999
/**
100100
* Indicates that upstream has completed the stream. Unless this throws an exception, no signals may be sent to the
101101
* inlet after this has been invoked.
102102
* <p>
103-
* If this throws an exception, the error will be passed to {@link #onUpstreamFailure(Throwable)}, upstream will
104-
* be cancelled, and the stage will not receive any further signals. Stages should be careful to ensure that if they
105-
* do throw from this method, that they are ready to receive that exception from {@code onUpstreamFailure}.
103+
* If this throws an exception, the error will be passed to {@link #onUpstreamFailure(Throwable)}, anything upstream
104+
* from this inlet will be cancelled, and the stage will not receive any further signals. Stages should be careful
105+
* to ensure that if they do throw from this method, that they are ready to receive that exception from
106+
* {@code onUpstreamFailure}.
106107
*/
107108
void onUpstreamFinish();
108109

@@ -111,9 +112,9 @@ interface InletListener {
111112
* will be sent to this listener.
112113
* <p>
113114
* If this throws an exception, the entire stream will be shut down, since there's no other way to guarantee that
114-
* the failure signal will be propagated downstream. Hence, stages should generally not throw exceptions from this
115-
* method, particularly exceptions from user supplied callbacks, as such errors will not be recoverable (eg, a
116-
* recover stage won't be able to resume the stream).
115+
* the failure signal will be propagated downstream. Hence, stages should not throw exceptions from this method,
116+
* particularly exceptions from user supplied callbacks, as such errors will not be recoverable (eg, a recover stage
117+
* won't be able to resume the stream).
117118
*/
118119
void onUpstreamFailure(Throwable error);
119120
}

0 commit comments

Comments
 (0)