Skip to content

Commit c85de55

Browse files
authored
Merge pull request #64 from jroper/to-graph
Added method for accessing the graph directly
2 parents 86b4cc4 + a0590fa commit c85de55

File tree

7 files changed

+215
-3
lines changed

7 files changed

+215
-3
lines changed

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/CompletionRunner.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.eclipse.microprofile.reactive.streams;
2121

22+
import org.eclipse.microprofile.reactive.streams.spi.Graph;
2223
import org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine;
2324

2425
import java.util.concurrent.CompletionStage;
@@ -58,6 +59,10 @@ public CompletionStage<T> run() {
5859
* @return A completion stage that will be redeemed with the result of the stream, or an error if the stream fails.
5960
*/
6061
public CompletionStage<T> run(ReactiveStreamsEngine engine) {
61-
return engine.buildCompletion(graphBuilder.build(false, false));
62+
return engine.buildCompletion(toGraph());
63+
}
64+
65+
Graph toGraph() {
66+
return graphBuilder.build(false, false);
6267
}
6368
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2018 Contributors to the Eclipse Foundation
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* You may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
******************************************************************************/
19+
20+
package org.eclipse.microprofile.reactive.streams;
21+
22+
import org.eclipse.microprofile.reactive.streams.spi.Graph;
23+
24+
/**
25+
* Exists to allow access to the {@code toGraph} methods on each builder, so that these methods don't have to be
26+
* exposed publicly in the API.
27+
*
28+
* This is intended only for use by implementations of the API, to get direct access to the graphs without having to
29+
* build a publisher, processor or subscriber. This is particularly useful for cases where an implementation would
30+
* like to do additional manipulation using its own API to the stream, but have those manipulations fused to the
31+
* graph being manipulated.
32+
*/
33+
public class GraphAccessor {
34+
35+
private GraphAccessor() {
36+
}
37+
38+
/**
39+
* Build the graph for the given {@link PublisherBuilder}.
40+
*
41+
* @param publisherBuilder The builder to build the graph for.
42+
* @return The built graph.
43+
*/
44+
public static Graph buildGraphFor(PublisherBuilder<?> publisherBuilder) {
45+
return publisherBuilder.toGraph();
46+
}
47+
48+
/**
49+
* Build the graph for the given {@link ProcessorBuilder}.
50+
*
51+
* @param processorBuilder The builder to build the graph for.
52+
* @return The built graph.
53+
*/
54+
public static Graph buildGraphFor(ProcessorBuilder<?, ?> processorBuilder) {
55+
return processorBuilder.toGraph();
56+
}
57+
58+
/**
59+
* Build the graph for the given {@link SubscriberBuilder}.
60+
*
61+
* @param subscriberBuilder The builder to build the graph for.
62+
* @return The built graph.
63+
*/
64+
public static Graph buildGraphFor(SubscriberBuilder<?, ?> subscriberBuilder) {
65+
return subscriberBuilder.toGraph();
66+
}
67+
68+
/**
69+
* Build the graph for the given {@link CompletionRunner}.
70+
*
71+
* @param completionRunner The runner to build the graph for.
72+
* @return The built graph.
73+
*/
74+
public static Graph buildGraphFor(CompletionRunner<?> completionRunner) {
75+
return completionRunner.toGraph();
76+
}
77+
78+
}

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/ProcessorBuilder.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.eclipse.microprofile.reactive.streams;
2121

22+
import org.eclipse.microprofile.reactive.streams.spi.Graph;
2223
import org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine;
2324
import org.eclipse.microprofile.reactive.streams.spi.Stage;
2425
import org.reactivestreams.Processor;
@@ -470,7 +471,11 @@ public Processor<T, R> buildRs() {
470471
* @return A {@link Processor} that will run this stream.
471472
*/
472473
public Processor<T, R> buildRs(ReactiveStreamsEngine engine) {
473-
return engine.buildProcessor(graphBuilder.build(true, true));
474+
return engine.buildProcessor(toGraph());
475+
}
476+
477+
Graph toGraph() {
478+
return graphBuilder.build(true, true);
474479
}
475480

476481
private <S> ProcessorBuilder<T, S> addStage(Stage stage) {

streams/api/src/main/java/org/eclipse/microprofile/reactive/streams/SubscriberBuilder.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.eclipse.microprofile.reactive.streams;
2121

22+
import org.eclipse.microprofile.reactive.streams.spi.Graph;
2223
import org.eclipse.microprofile.reactive.streams.spi.ReactiveStreamsEngine;
2324
import org.eclipse.microprofile.reactive.streams.spi.Stage;
2425

@@ -62,7 +63,11 @@ public CompletionSubscriber<T, R> build() {
6263
* @return A {@link CompletionSubscriber} that will run this stream.
6364
*/
6465
public CompletionSubscriber<T, R> build(ReactiveStreamsEngine engine) {
65-
return engine.buildSubscriber(graphBuilder.build(true, false));
66+
return engine.buildSubscriber(toGraph());
67+
}
68+
69+
Graph toGraph() {
70+
return graphBuilder.build(true, false);
6671
}
6772

6873
ReactiveStreamsGraphBuilder getGraphBuilder() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2018 Contributors to the Eclipse Foundation
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* You may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
******************************************************************************/
19+
20+
package org.eclipse.microprofile.reactive.streams.tck;
21+
22+
import org.eclipse.microprofile.reactive.streams.GraphAccessor;
23+
import org.eclipse.microprofile.reactive.streams.ReactiveStreams;
24+
import org.eclipse.microprofile.reactive.streams.spi.Graph;
25+
import org.eclipse.microprofile.reactive.streams.spi.Stage;
26+
import org.testng.annotations.Test;
27+
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
31+
import static org.testng.Assert.assertEquals;
32+
import static org.testng.Assert.assertTrue;
33+
34+
/**
35+
* Test for the GraphAccessor class.
36+
*
37+
* This does not need an implementation of the engine to verify it.
38+
*/
39+
public class GraphAccessorVerification {
40+
41+
@Test
42+
public void buildGraphForPublisherShouldProduceTheCorrectGraph() {
43+
Graph graph = GraphAccessor.buildGraphFor(ReactiveStreams.of(1).map(i -> i * 2));
44+
assertEquals(2, graph.getStages().size());
45+
List<Stage> stages = new ArrayList<>(graph.getStages());
46+
assertInstanceOf(stages.get(0), Stage.Of.class);
47+
assertInstanceOf(stages.get(1), Stage.Map.class);
48+
}
49+
50+
@Test
51+
public void buildGraphForProcessorShouldProduceTheCorrectGraph() {
52+
Graph graph = GraphAccessor.buildGraphFor(ReactiveStreams.<Integer>builder().filter(i -> i > 10).takeWhile(i -> i < 20));
53+
assertEquals(2, graph.getStages().size());
54+
List<Stage> stages = new ArrayList<>(graph.getStages());
55+
assertInstanceOf(stages.get(0), Stage.Filter.class);
56+
assertInstanceOf(stages.get(1), Stage.TakeWhile.class);
57+
}
58+
59+
@Test
60+
public void buildGraphForSubscriberShouldProduceTheCorrectGraph() {
61+
Graph graph = GraphAccessor.buildGraphFor(ReactiveStreams.<Integer>builder().limit(10).toList());
62+
assertEquals(2, graph.getStages().size());
63+
List<Stage> stages = new ArrayList<>(graph.getStages());
64+
assertInstanceOf(stages.get(0), Stage.Limit.class);
65+
assertInstanceOf(stages.get(1), Stage.Collect.class);
66+
}
67+
68+
@Test
69+
public void buildGraphForCompletionRunnerShouldProduceTheCorrectGraph() {
70+
Graph graph = GraphAccessor.buildGraphFor(ReactiveStreams.failed(new Exception()).distinct().cancel());
71+
assertEquals(3, graph.getStages().size());
72+
List<Stage> stages = new ArrayList<>(graph.getStages());
73+
assertInstanceOf(stages.get(0), Stage.Failed.class);
74+
assertInstanceOf(stages.get(1), Stage.Distinct.class);
75+
assertEquals(Stage.Cancel.INSTANCE, stages.get(2));
76+
}
77+
78+
private static void assertInstanceOf(Object obj, Class<?> clazz) {
79+
assertTrue(clazz.isInstance(obj), obj + " is not a an instance of " + clazz);
80+
}
81+
}

streams/tck/src/main/java/org/eclipse/microprofile/reactive/streams/tck/ReactiveStreamsTck.java

+3
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ public Object[] allTests() {
114114
allTests.addAll(stageVerification.reactiveStreamsTckVerifiers());
115115
}
116116

117+
// Add tests that aren't dependent on the dependencies.
118+
allTests.add(new GraphAccessorVerification());
119+
117120
return allTests.stream().filter(this::isEnabled).toArray();
118121
}
119122

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2018 Contributors to the Eclipse Foundation
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* You may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
******************************************************************************/
19+
20+
package org.eclipse.microprofile.reactive.streams.tck;
21+
22+
import org.testng.annotations.Factory;
23+
24+
/**
25+
* This runs any tests that don't require an implementation to run.
26+
*/
27+
public class TckTest {
28+
29+
@Factory
30+
public Object[] independentTests() {
31+
return new Object[] {
32+
new GraphAccessorVerification()
33+
};
34+
}
35+
}

0 commit comments

Comments
 (0)