Skip to content
This repository was archived by the owner on Nov 1, 2024. It is now read-only.

Commit e15be0d

Browse files
authored
Merge pull request #5 from jroper/zerodep-improvements
Error handling improvements in zerodep impl
2 parents a001d8e + 4fef011 commit e15be0d

File tree

10 files changed

+114
-130
lines changed

10 files changed

+114
-130
lines changed

zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/BuiltGraph.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -494,12 +494,7 @@ public void execute(Runnable command) {
494494
int signalsDrained = 0;
495495
while (!signals.isEmpty() && signalsDrained < 32) {
496496
signalsDrained++;
497-
Signal signal = signals.removeFirst();
498-
try {
499-
signal.signal();
500-
} catch (RuntimeException e) {
501-
signal.signalFailed(e);
502-
}
497+
signals.removeFirst().signal();
503498
}
504499

505500
// If there were more than 32 unrolled signals, we resubmit
@@ -509,9 +504,9 @@ public void execute(Runnable command) {
509504
});
510505
}
511506

512-
} catch (RuntimeException e) {
507+
} catch (Throwable t) {
513508
// shut down the stream
514-
streamFailure(e);
509+
streamFailure(t);
515510
// Clear remaining signals
516511
signals.clear();
517512
}
@@ -533,7 +528,11 @@ private void streamFailure(Throwable error) {
533528
// todo handle better
534529
error.printStackTrace();
535530
for (Port port : ports) {
536-
port.onStreamFailure(error);
531+
try {
532+
port.onStreamFailure(error);
533+
} catch (Exception e) {
534+
// Ignore
535+
}
537536
}
538537
ports.clear();
539538
}

zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/CollectStage.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,7 @@ public void onPush() {
4040

4141
@Override
4242
public void onUpstreamFinish() {
43-
try {
44-
result.complete(collector.finisher().apply(container));
45-
} catch (RuntimeException e) {
46-
result.completeExceptionally(e);
47-
}
43+
result.complete(collector.finisher().apply(container));
4844
container = null;
4945
}
5046

zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/OnCompleteStage.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,8 @@ public void onPush() {
2525

2626
@Override
2727
public void onUpstreamFinish() {
28-
try {
29-
action.run();
30-
outlet.complete();
31-
} catch (RuntimeException e) {
32-
outlet.fail(e);
33-
}
28+
action.run();
29+
outlet.complete();
3430
}
3531

3632
@Override

zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/OnTerminateStage.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,8 @@ public void onPush() {
2525

2626
@Override
2727
public void onUpstreamFinish() {
28-
try {
29-
action.run();
30-
outlet.complete();
31-
} catch (RuntimeException e) {
32-
outlet.fail(e);
33-
}
28+
action.run();
29+
outlet.complete();
3430
}
3531

3632
@Override

zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/PublisherOutlet.java

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ public void onStreamFailure(Throwable reason) {
4141
}
4242
}
4343
if (!downstreamFinished) {
44-
failure = reason;
4544
if (subscriber != null) {
4645
downstreamFinished = true;
4746
subscriber.onError(reason);
47+
} else {
48+
failure = reason;
4849
}
4950
}
5051
}
@@ -91,6 +92,7 @@ public void cancel() {
9192

9293
@Override
9394
public void request(long n) {
95+
// possible optimization: place n in a queue, and dispatch singleton runnable to executor, to save an allocation
9496
builtGraph.execute(() -> {
9597
if (!upstreamFinished) {
9698
if (n <= 0) {
@@ -192,30 +194,21 @@ public void setListener(OutletListener listener) {
192194
this.listener = Objects.requireNonNull(listener, "Listener must not be null");
193195
}
194196

195-
private abstract class AbstractSignal implements Signal {
196-
@Override
197-
public void signalFailed(Throwable error) {
198-
onStreamFailure(error);
199-
}
200-
}
201-
202-
private final Signal onPullSignal = new AbstractSignal() {
203-
@Override
204-
public void signal() {
205-
if (!upstreamFinished) {
206-
pulled = true;
197+
private final Signal onPullSignal = () -> {
198+
if (!upstreamFinished) {
199+
pulled = true;
200+
try {
207201
listener.onPull();
202+
} catch (Exception e) {
203+
onStreamFailure(e);
208204
}
209205
}
210206
};
211207

212-
private final Signal onDownstreamFinishSignal = new AbstractSignal() {
213-
@Override
214-
public void signal() {
215-
if (!upstreamFinished) {
216-
upstreamFinished = true;
217-
listener.onDownstreamFinish();
218-
}
208+
private final Signal onDownstreamFinishSignal = () -> {
209+
if (!upstreamFinished) {
210+
upstreamFinished = true;
211+
listener.onDownstreamFinish();
219212
}
220213
};
221214
}

zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/Signal.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,20 @@
55

66
/**
77
* A signal, used by inlets and outlets.
8-
*
9-
* A signal represents a callback to an inlet or outlet listener. All callbacks should be wrapped in a signal to
10-
* allow signals to be enqueued, so that they can be executed in an unrolled fashion, rather than recursively.
11-
*
12-
* Implementations of signal are only allocated when a stream or sub stream starts. They should carry no state, any
13-
* state associated with the signal should be held by the inlet or outlet. This ensures that no allocations are done
14-
* of signals while running the stream.
8+
* <p>
9+
* A signal is used to wrap callbacks to inlet and outlet listeners. Rather than invoking these directly, they are
10+
* enqueued to be invoked in an unrolled fashion, after the current callback is finished. This ensures that stages do
11+
* not have to handle reentrant invocations, when the stage invokes something on a inlet our outlet, it can be sure
12+
* that its state before invocation hasn't changed after the invocation returns. This also solves blowing the stack
13+
* as it ensures there is no unbounded recursion.
14+
* <p>
15+
* Generally speaking, the implementation of inlets and outlets will ensure that a new signal is not allocated each
16+
* time it is needed for pulls and pushes, rather the signal is instantiated once at start up and reused each time.
17+
* This is safe to do since those signals don't carry state.
1518
*/
1619
public interface Signal {
1720
/**
1821
* Invoke the signal.
1922
*/
2023
void signal();
21-
22-
/**
23-
* This will be invoked if {@link #signal()} throws an exception. Implementations of this should then terminate both
24-
* upstream and downstream. This mechanism means that {@code signal} doesn't have to do its own exception handling,
25-
* but rather exceptions can be handled generically by a stage.
26-
*/
27-
void signalFailed(Throwable t);
2824
}

zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/StageInlet.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -90,23 +90,30 @@ interface InletListener {
9090

9191
/**
9292
* Indicates that an element has been pushed. The element can be received using {@link StageInlet#grab()}.
93+
* <p>
94+
* If this throws an exception, the error will be passed to {@link #onUpstreamFailure(Throwable)}, upstream will
95+
* be cancelled, and the stage will not receive any further signals.
9396
*/
9497
void onPush();
9598

9699
/**
97-
* Indicates that upstream has completed the stream. No signals may be sent to the inlet after this has been invoked.
98-
*
99-
* This must be very careful not to throw an exception. If it does, then the signal to complete will not reach
100-
* downstream.
100+
* Indicates that upstream has completed the stream. Unless this throws an exception, no signals may be sent to the
101+
* inlet after this has been invoked.
102+
* <p>
103+
* If this throws an exception, the error will be passed to {@link #onUpstreamFailure(Throwable)}, upstream will
104+
* be cancelled, and the stage will not receive any further signals. Stages should be careful to ensure that if they
105+
* do throw from this method, that they are ready to receive that exception from {@code onUpstreamFailure}.
101106
*/
102107
void onUpstreamFinish();
103108

104109
/**
105-
* Indicates that upstream has completed the stream with a failure. No signals may be sent to the inlet after this has
106-
* been invoked.
107-
*
108-
* This must be very careful not to throw an exception. If it does, then the signal to complete will not reach
109-
* downstream.
110+
* Indicates that upstream has completed the stream with a failure. Once this has been invoked, no other signals
111+
* will be sent to this listener.
112+
* <p>
113+
* If this throws an exception, the entire stream will be shut down, since there's no other way to guarantee that
114+
* the failure signal will be propagated downstream. Hence, stages should generally not throw exceptions from this
115+
* method, particularly exceptions from user supplied callbacks, as such errors will not be recoverable (eg, a
116+
* recover stage won't be able to resume the stream).
110117
*/
111118
void onUpstreamFailure(Throwable error);
112119
}

zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/StageOutlet.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,20 @@ public void onDownstreamFinish() {
7878
interface OutletListener {
7979
/**
8080
* A pull signal, indicates that downstream is ready to be pushed to.
81+
* <p>
82+
* If this throws an exception, the stage will be cancelled using {@link #onDownstreamFinish()} and the error will
83+
* be propagated downstream.
8184
*/
8285
void onPull();
8386

8487
/**
8588
* A completion signal, indicates that downstream has completed. No further signals may be sent to this outlet after
8689
* this signal is received.
87-
*
88-
* This must be very careful not to throw an exception. If it does, then the signal to cancel will not reach
89-
* upstream.
90+
* <p>
91+
* If this throws an exception, the entire stream will be shut down, since there's no other way to guarantee that
92+
* the cancel signal will be propagated upstream. Hence, stages should generally not throw exceptions from this
93+
* method, particularly exceptions from user supplied callbacks, as such errors will not be recoverable (eg, a
94+
* recover stage won't be able to resume the stream).
9095
*/
9196
void onDownstreamFinish();
9297
}

zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/StageOutletInlet.java

Lines changed: 32 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -166,69 +166,67 @@ public void setListener(InletListener listener) {
166166
}
167167
}
168168

169-
private abstract class AbstractSignal implements Signal {
169+
private abstract class RecoverableSignal implements Signal {
170170
@Override
171-
public void signalFailed(Throwable error) {
172-
if (!outletFinished) {
173-
outletFinished = true;
174-
outletListener.onDownstreamFinish();
175-
}
176-
if (!inletFinished) {
177-
inletFinished = true;
178-
inletListener.onUpstreamFailure(error);
171+
public final void signal() {
172+
try {
173+
doSignal();
174+
} catch (Exception e) {
175+
onStreamFailure(e);
179176
}
180177
}
178+
179+
protected abstract void doSignal();
181180
}
182181

183-
private final Signal onPullSignal = new AbstractSignal() {
182+
private final Signal onPullSignal = new RecoverableSignal() {
184183
@Override
185-
public void signal() {
184+
protected void doSignal() {
186185
if (!outletFinished) {
187186
outletPulled = true;
188187
outletListener.onPull();
189188
}
190189
}
191190
};
192191

193-
private final Signal onDownstreamFinishSignal = new AbstractSignal() {
194-
@Override
195-
public void signal() {
196-
if (!outletFinished) {
197-
outletFinished = true;
198-
outletListener.onDownstreamFinish();
199-
}
192+
private final Signal onDownstreamFinishSignal = () -> {
193+
if (!outletFinished) {
194+
outletFinished = true;
195+
outletListener.onDownstreamFinish();
200196
}
201197
};
202198

203-
private final Signal onPushSignal = new AbstractSignal() {
199+
private final Signal onPushSignal = new RecoverableSignal() {
204200
@Override
205-
public void signal() {
201+
protected void doSignal() {
206202
if (!inletFinished) {
207203
inletPushed = true;
208204
inletListener.onPush();
209205
}
210206
}
211207
};
212208

213-
private final Signal onUpstreamFinishSignal = new AbstractSignal() {
214-
@Override
215-
public void signal() {
216-
if (!inletFinished) {
217-
inletFinished = true;
209+
private final Signal onUpstreamFinishSignal = () -> {
210+
if (!inletFinished) {
211+
inletFinished = true;
212+
try {
218213
inletListener.onUpstreamFinish();
214+
} catch (Exception e) {
215+
inletListener.onUpstreamFailure(e);
216+
if (!outletFinished) {
217+
outletFinished = true;
218+
outletListener.onDownstreamFinish();
219+
}
219220
}
220221
}
221222
};
222223

223-
private final Signal onUpstreamErrorSignal = new AbstractSignal() {
224-
@Override
225-
public void signal() {
226-
if (!inletFinished) {
227-
inletFinished = true;
228-
Throwable theFailure = failure;
229-
failure = null;
230-
inletListener.onUpstreamFailure(theFailure);
231-
}
224+
private final Signal onUpstreamErrorSignal = () -> {
225+
if (!inletFinished) {
226+
inletFinished = true;
227+
Throwable theFailure = failure;
228+
failure = null;
229+
inletListener.onUpstreamFailure(theFailure);
232230
}
233231
};
234232
}

0 commit comments

Comments
 (0)