Skip to content

Commit dda1ada

Browse files
authored
Merge pull request #67 from cescoffier/define-recover-stages
Define the onErrorResume stage.
2 parents 41f41e2 + a17afd4 commit dda1ada

File tree

5 files changed

+476
-2
lines changed

5 files changed

+476
-2
lines changed

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/ProcessorBuilder.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,71 @@ public ProcessorBuilder<T, R> onError(Consumer<Throwable> errorHandler) {
407407
return addStage(new Stage.OnError(errorHandler));
408408
}
409409

410+
/**
411+
* Returns a stream containing all the elements from this stream. Additionally, in the case of failure, rather than
412+
* invoking {@link #onError(Consumer)}, it invokes the given method and emits the result as final event of the stream.
413+
*
414+
* By default, when a stream encounters an error that prevents it from emitting the expected item to its subscriber,
415+
* the stream (publisher) invokes its subscriber's <code>onError</code> method, and then terminate without invoking
416+
* any more of its subscriber's methods. This operator changes this behavior. If the current stream encounters an
417+
* error, instead of invoking its subscriber's <code>onError</code> method, it will instead emit the return value of
418+
* the passed function. This operator prevents errors from propagating or to supply fallback data should errors be
419+
* encountered.
420+
*
421+
* @param errorHandler the function returning the value that need to be emitting instead of the error.
422+
* The function must not return {@code null}
423+
* @return The new processor
424+
*/
425+
public ProcessorBuilder<T, R> onErrorResume(Function<Throwable, R> errorHandler) {
426+
return addStage(new Stage.OnErrorResume(errorHandler));
427+
}
428+
429+
/**
430+
* Returns a stream containing all the elements from this stream. Additionally, in the case of failure, rather than
431+
* invoking {@link #onError(Consumer)}, it invokes the given method and emits the returned {@link PublisherBuilder}
432+
* instead.
433+
*
434+
* By default, when a stream encounters an error that prevents it from emitting the expected item to its subscriber,
435+
* the stream (publisher) invokes its subscriber's <code>onError</code> method, and then terminate without invoking
436+
* any more of its subscriber's methods. This operator changes this behavior. If the current stream encounters an
437+
* error, instead of invoking its subscriber's <code>onError</code> method, it will instead relinquish control to the
438+
* {@link PublisherBuilder} returned from given function, which invoke the subscriber's <code>onNext</code> method if
439+
* it is able to do so. In such a case, because no publisher necessarily invokes <code>onError</code>, the subscriber
440+
* may never know that an error happened.
441+
*
442+
* @param errorHandler the function returning the stream that need to be emitting instead of the error.
443+
* The function must not return {@code null}
444+
* @return The new processor
445+
*/
446+
public ProcessorBuilder<T, R> onErrorResumeWith(Function<Throwable, PublisherBuilder<R>> errorHandler) {
447+
return addStage(new Stage.OnErrorResumeWith(errorHandler.andThen(PublisherBuilder::toGraph)));
448+
}
449+
450+
/**
451+
* Returns a stream containing all the elements from this stream. Additionally, in the case of failure, rather than
452+
* invoking {@link #onError(Consumer)}, it invokes the given method and emits the returned {@link PublisherBuilder}
453+
* instead.
454+
*
455+
* By default, when a stream encounters an error that prevents it from emitting the expected item to its subscriber,
456+
* the stream (publisher) invokes its subscriber's <code>onError</code> method, and then terminate without invoking
457+
* any more of its subscriber's methods. This operator changes this behavior. If the current stream encounters an
458+
* error, instead of invoking its subscriber's <code>onError</code> method, it will instead relinquish control to the
459+
* {@link PublisherBuilder} returned from given function, which invoke the subscriber's <code>onNext</code> method if
460+
* it is able to do so. In such a case, because no publisher necessarily invokes <code>onError</code>, the subscriber
461+
* may never know that an error happened.
462+
*
463+
* @param errorHandler the function returning the stream that need to be emitting instead of the error.
464+
* The function must not return {@code null}
465+
* @return The new processor
466+
*/
467+
public ProcessorBuilder<T, R> onErrorResumeWithPublisher(Function<Throwable, Publisher<R>> errorHandler) {
468+
return addStage(new Stage.OnErrorResumeWith(
469+
errorHandler
470+
.andThen(ReactiveStreams::fromPublisher)
471+
.andThen(PublisherBuilder::toGraph))
472+
);
473+
}
474+
410475
/**
411476
* Returns a stream containing all the elements from this stream, additionally performing the provided action when this
412477
* stream completes or failed. The given action does not know if the stream failed or completed. If you need to

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/PublisherBuilder.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,71 @@ public PublisherBuilder<T> onError(Consumer<Throwable> errorHandler) {
425425
return addStage(new Stage.OnError(errorHandler));
426426
}
427427

428+
/**
429+
* Returns a stream containing all the elements from this stream. Additionally, in the case of failure, rather than
430+
* invoking {@link #onError(Consumer)}, it invokes the given method and emits the result as final event of the stream.
431+
*
432+
* By default, when a stream encounters an error that prevents it from emitting the expected item to its subscriber,
433+
* the stream (publisher) invokes its subscriber's <code>onError</code> method, and then terminate without invoking
434+
* any more of its subscriber's methods. This operator changes this behavior. If the current stream encounters an
435+
* error, instead of invoking its subscriber's <code>onError</code> method, it will instead emit the return value of
436+
* the passed function. This operator prevents errors from propagating or to supply fallback data should errors be
437+
* encountered.
438+
*
439+
* @param errorHandler the function returning the value that need to be emitting instead of the error.
440+
* The function must not return {@code null}
441+
* @return The new publisher
442+
*/
443+
public PublisherBuilder<T> onErrorResume(Function<Throwable, T> errorHandler) {
444+
return addStage(new Stage.OnErrorResume(errorHandler));
445+
}
446+
447+
/**
448+
* Returns a stream containing all the elements from this stream. Additionally, in the case of failure, rather than
449+
* invoking {@link #onError(Consumer)}, it invokes the given method and emits the returned {@link PublisherBuilder}
450+
* instead.
451+
*
452+
* By default, when a stream encounters an error that prevents it from emitting the expected item to its subscriber,
453+
* the stream (publisher) invokes its subscriber's <code>onError</code> method, and then terminate without invoking
454+
* any more of its subscriber's methods. This operator changes this behavior. If the current stream encounters an
455+
* error, instead of invoking its subscriber's <code>onError</code> method, it will instead relinquish control to the
456+
* {@link PublisherBuilder} returned from given function, which invoke the subscriber's <code>onNext</code> method if
457+
* it is able to do so. In such a case, because no publisher necessarily invokes <code>onError</code>, the subscriber
458+
* may never know that an error happened.
459+
*
460+
* @param errorHandler the function returning the stream that need to be emitting instead of the error.
461+
* The function must not return {@code null}
462+
* @return The new publisher
463+
*/
464+
public PublisherBuilder<T> onErrorResumeWith(Function<Throwable, PublisherBuilder<T>> errorHandler) {
465+
return addStage(new Stage.OnErrorResumeWith(errorHandler.andThen(PublisherBuilder::toGraph)));
466+
}
467+
468+
/**
469+
* Returns a stream containing all the elements from this stream. Additionally, in the case of failure, rather than
470+
* invoking {@link #onError(Consumer)}, it invokes the given method and emits the returned {@link PublisherBuilder}
471+
* instead.
472+
*
473+
* By default, when a stream encounters an error that prevents it from emitting the expected item to its subscriber,
474+
* the stream (publisher) invokes its subscriber's <code>onError</code> method, and then terminate without invoking
475+
* any more of its subscriber's methods. This operator changes this behavior. If the current stream encounters an
476+
* error, instead of invoking its subscriber's <code>onError</code> method, it will instead relinquish control to the
477+
* {@link PublisherBuilder} returned from given function, which invoke the subscriber's <code>onNext</code> method if
478+
* it is able to do so. In such a case, because no publisher necessarily invokes <code>onError</code>, the subscriber
479+
* may never know that an error happened.
480+
*
481+
* @param errorHandler the function returning the stream that need to be emitting instead of the error.
482+
* The function must not return {@code null}
483+
* @return The new publisher
484+
*/
485+
public <S> PublisherBuilder<S> onErrorResumeWithPublisher(Function<Throwable, Publisher<? extends S>> errorHandler) {
486+
return addStage(new Stage.OnErrorResumeWith(
487+
errorHandler
488+
.andThen(ReactiveStreams::fromPublisher)
489+
.andThen(PublisherBuilder::toGraph))
490+
);
491+
}
492+
428493
/**
429494
* Returns a stream containing all the elements from this stream, additionally performing the provided action when this
430495
* stream completes or failed. The given action does not know if the stream failed or completed. If you need to

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/spi/Stage.java

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,73 @@ public Runnable getAction() {
559559
}
560560
}
561561

562+
/**
563+
* A stage to handle error from upstream. It builds a stream containing all the elements from upstream. Additionally,
564+
* in the case of failure, rather than invoking {@link Subscriber#onError(Throwable)}, it invokes a given method and
565+
* emits the result as final event of the stream.
566+
*
567+
* By default, when a stream encounters an error that prevents it from emitting the expected item to its subscriber,
568+
* the stream (publisher) invokes its subscriber's <code>onError</code> method, and then terminate without invoking
569+
* any more of its subscriber's methods. This operator changes this behavior. If the current stream encounters an
570+
* error, instead of invoking its subscriber's <code>onError</code> method, it will instead emit the return value of
571+
* the passed function. This operator prevents errors from propagating or to supply fallback data should errors be
572+
* encountered.
573+
*
574+
* Any {@link RuntimeException} thrown by the function should be propagated down the stream as an error.
575+
*
576+
*/
577+
final class OnErrorResume implements Inlet, Outlet {
578+
private final Function<Throwable, ?> function;
579+
580+
581+
public OnErrorResume(Function<Throwable, ?> function) {
582+
this.function = function;
583+
}
584+
585+
/**
586+
* The error handler.
587+
*
588+
* @return the error handler.
589+
*/
590+
public Function<Throwable, ?> getFunction() {
591+
return function;
592+
}
593+
}
594+
595+
/**
596+
* A stage to handle error from upstream. It builds a stream containing all the elements from upstream. Additionally,
597+
* in the case of failure, rather than invoking {@link Subscriber#onError(Throwable)}, it invokes a given method and
598+
* switch the control to the returned stream.
599+
*
600+
* By default, when a stream encounters an error that prevents it from emitting the expected item to its subscriber,
601+
* the stream (publisher) invokes its subscriber's <code>onError</code> method, and then terminate without invoking
602+
* any more of its subscriber's methods. This operator changes this behavior. If the current stream encounters an
603+
* error, instead of invoking its subscriber's <code>onError</code> method, it will instead relinquish control to the
604+
* {@link org.eclipse.microprofile.reactive.streams.PublisherBuilder} returned from given function, which invoke the
605+
* subscriber's <code>onNext</code> method if it is able to do so. In such a case, because no publisher necessarily
606+
* invokes <code>onError</code>, the subscriber may never know that an error happened.
607+
*
608+
* Any {@link RuntimeException} thrown by the function should be propagated down the stream as an error.
609+
*
610+
*/
611+
final class OnErrorResumeWith implements Inlet, Outlet {
612+
private final Function<Throwable, Graph> function;
613+
614+
615+
public OnErrorResumeWith(Function<Throwable, Graph> function) {
616+
this.function = function;
617+
}
618+
619+
/**
620+
* The error handler.
621+
*
622+
* @return the error handler.
623+
*/
624+
public Function<Throwable, Graph> getFunction() {
625+
return function;
626+
}
627+
}
628+
562629
/**
563630
* A failed publisher.
564631
* <p>
@@ -616,7 +683,7 @@ public Graph getSecond() {
616683
}
617684

618685
final class Cancel implements Inlet {
619-
public final static Cancel INSTANCE = new Cancel();
686+
public static final Cancel INSTANCE = new Cancel();
620687

621688
private Cancel() {
622689
}

0 commit comments

Comments
 (0)