Skip to content

Commit faa2dde

Browse files
committed
Added method for accessing the graph directly
1 parent a26cbad commit faa2dde

File tree

4 files changed

+96
-3
lines changed

4 files changed

+96
-3
lines changed

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/CompletionRunner.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.eclipse.microprofile.reactive.streams;
2121

22+
import org.eclipse.microprofile.reactive.streams.spi.Graph;
2223
import org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine;
2324

2425
import java.util.concurrent.CompletionStage;
@@ -58,6 +59,10 @@ public CompletionStage<T> run() {
5859
* @return A completion stage that will be redeemed with the result of the stream, or an error if the stream fails.
5960
*/
6061
public CompletionStage<T> run(ReactiveStreamsEngine engine) {
61-
return engine.buildCompletion(graphBuilder.build(false, false));
62+
return engine.buildCompletion(toGraph());
63+
}
64+
65+
Graph toGraph() {
66+
return graphBuilder.build(false, false);
6267
}
6368
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2018 Contributors to the Eclipse Foundation
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* You may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
******************************************************************************/
19+
20+
package org.eclipse.microprofile.reactive.streams;
21+
22+
import org.eclipse.microprofile.reactive.streams.spi.Graph;
23+
24+
/**
25+
* Exists to allow access to the {@code toGraph} methods on each builder, so that these methods don't have to be
26+
* exposed publicly in the API.
27+
*
28+
* This is intended only for use by implementations of the API, to get direct access to the graphs without having to
29+
* build a publisher, processor or subscriber. This is particularly useful for cases where an implementation would
30+
* like to do additional manipulation using its own API to the stream, but have those manipulations fused to the
31+
* graph being manipulated.
32+
*/
33+
public class GraphAccessor {
34+
35+
private GraphAccessor() {
36+
}
37+
38+
/**
39+
* Build the graph for the given {@link PublisherBuilder}.
40+
*
41+
* @param publisherBuilder The builder to build the graph for.
42+
* @return The built graph.
43+
*/
44+
public static Graph buildGraphFor(PublisherBuilder<?> publisherBuilder) {
45+
return publisherBuilder.toGraph();
46+
}
47+
48+
/**
49+
* Build the graph for the given {@link ProcessorBuilder}.
50+
*
51+
* @param processorBuilder The builder to build the graph for.
52+
* @return The built graph.
53+
*/
54+
public static Graph buildGraphFor(ProcessorBuilder<?, ?> processorBuilder) {
55+
return processorBuilder.toGraph();
56+
}
57+
58+
/**
59+
* Build the graph for the given {@link SubscriberBuilder}.
60+
*
61+
* @param subscriberBuilder The builder to build the graph for.
62+
* @return The built graph.
63+
*/
64+
public static Graph buildGraphFor(SubscriberBuilder<?, ?> subscriberBuilder) {
65+
return subscriberBuilder.toGraph();
66+
}
67+
68+
/**
69+
* Build the graph for the given {@link CompletionRunner}.
70+
*
71+
* @param completionRunner The runner to build the graph for.
72+
* @return The built graph.
73+
*/
74+
public static Graph buildGraphFor(CompletionRunner<?> completionRunner) {
75+
return completionRunner.toGraph();
76+
}
77+
78+
}

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/ProcessorBuilder.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.eclipse.microprofile.reactive.streams;
2121

22+
import org.eclipse.microprofile.reactive.streams.spi.Graph;
2223
import org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine;
2324
import org.eclipse.microprofile.reactive.streams.spi.Stage;
2425
import org.reactivestreams.Processor;
@@ -432,7 +433,11 @@ public Processor<T, R> buildRs() {
432433
* @return A {@link Processor} that will run this stream.
433434
*/
434435
public Processor<T, R> buildRs(ReactiveStreamsEngine engine) {
435-
return engine.buildProcessor(graphBuilder.build(true, true));
436+
return engine.buildProcessor(toGraph());
437+
}
438+
439+
Graph toGraph() {
440+
return graphBuilder.build(true, true);
436441
}
437442

438443
private <S> ProcessorBuilder<T, S> addStage(Stage stage) {

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/SubscriberBuilder.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.eclipse.microprofile.reactive.streams;
2121

22+
import org.eclipse.microprofile.reactive.streams.spi.Graph;
2223
import org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine;
2324
import org.eclipse.microprofile.reactive.streams.spi.Stage;
2425

@@ -62,7 +63,11 @@ public CompletionSubscriber<T, R> build() {
6263
* @return A {@link CompletionSubscriber} that will run this stream.
6364
*/
6465
public CompletionSubscriber<T, R> build(ReactiveStreamsEngine engine) {
65-
return engine.buildSubscriber(graphBuilder.build(true, false));
66+
return engine.buildSubscriber(toGraph());
67+
}
68+
69+
Graph toGraph() {
70+
return graphBuilder.build(true, false);
6671
}
6772

6873
ReactiveStreamsGraphBuilder getGraphBuilder() {

0 commit comments

Comments
 (0)