15
15
import akka .stream .Attributes ;
16
16
import akka .stream .Materializer ;
17
17
import akka .stream .javadsl .*;
18
+ import org .eclipse .microprofile .reactive .streams .CompletionBuilder ;
19
+ import org .eclipse .microprofile .reactive .streams .ProcessorBuilder ;
20
+ import org .eclipse .microprofile .reactive .streams .PublisherBuilder ;
21
+ import org .eclipse .microprofile .reactive .streams .SubscriberBuilder ;
18
22
import org .eclipse .microprofile .reactive .streams .SubscriberWithResult ;
19
23
import org .eclipse .microprofile .reactive .streams .spi .Graph ;
20
24
import org .eclipse .microprofile .reactive .streams .spi .ReactiveStreamsEngine ;
@@ -45,7 +49,8 @@ public AkkaEngine(Materializer materializer) {
45
49
46
50
@ Override
47
51
public <T > Publisher <T > buildPublisher (Graph graph ) throws UnsupportedStageException {
48
- // Optimization - if it's just a publisher, return it directly
52
+ // Optimization - if it's just a pub
53
+ // lisher, return it directly
49
54
Stage firstStage = graph .getStages ().iterator ().next ();
50
55
if (graph .getStages ().size () == 1 && firstStage instanceof Stage .PublisherStage ) {
51
56
return (Publisher ) ((Stage .PublisherStage ) firstStage ).getRsPublisher ();
@@ -55,13 +60,21 @@ public <T> Publisher<T> buildPublisher(Graph graph) throws UnsupportedStageExcep
55
60
.toMat (Sink .asPublisher (AsPublisher .WITHOUT_FANOUT ), Keep .right ()));
56
61
}
57
62
63
+ /**
64
+ * Convert a publisher builder to a source.
65
+ */
66
+ public <T > Source <T , NotUsed > buildSource (PublisherBuilder <T > publisher ) throws UnsupportedStageException {
67
+ return buildSource (publisher .toGraph ());
68
+ }
69
+
58
70
private <T > Source <T , NotUsed > buildSource (Graph graph ) throws UnsupportedStageException {
59
71
Source source = null ;
60
72
Flow flow = Flow .create ();
61
73
for (Stage stage : graph .getStages ()) {
62
74
if (source == null ) {
63
75
source = toSource (stage );
64
- } else {
76
+ }
77
+ else {
65
78
flow = applyStage (flow , stage );
66
79
}
67
80
}
@@ -71,15 +84,26 @@ private <T> Source<T, NotUsed> buildSource(Graph graph) throws UnsupportedStageE
71
84
72
85
@ Override
73
86
public <T , R > SubscriberWithResult <T , R > buildSubscriber (Graph graph ) throws UnsupportedStageException {
87
+ return (SubscriberWithResult ) materialize (Source .asSubscriber ()
88
+ .toMat (buildSink (graph ), (subscriber , result ) ->
89
+ new SubscriberWithResult ((Subscriber ) subscriber , (CompletionStage ) result )));
90
+ }
91
+
92
+ /**
93
+ * Convert a subscriber builder to a sink.
94
+ */
95
+ public <T , R > Sink <T , CompletionStage <R >> buildSink (SubscriberBuilder <T , R > subscriber ) throws UnsupportedStageException {
96
+ return buildSink (subscriber .toGraph ());
97
+ }
98
+
99
+ private <T , R > Sink <T , CompletionStage <R >> buildSink (Graph graph ) throws UnsupportedStageException {
74
100
Flow flow = Flow .create ();
75
101
for (Stage stage : graph .getStages ()) {
76
102
if (stage .hasOutlet ()) {
77
103
flow = applyStage (flow , stage );
78
- } else {
79
- return (SubscriberWithResult ) materialize (Source .asSubscriber ()
80
- .via (flow )
81
- .toMat (toSink (stage ), (subscriber , result ) ->
82
- new SubscriberWithResult ((Subscriber ) subscriber , (CompletionStage ) result )));
104
+ }
105
+ else {
106
+ return flow .toMat (toSink (stage ), Keep .right ());
83
107
}
84
108
}
85
109
@@ -96,24 +120,48 @@ public <T, R> Processor<T, R> buildProcessor(Graph graph) throws UnsupportedStag
96
120
}
97
121
}
98
122
123
+ return (Processor ) materialize (buildFlow (graph ).toProcessor ());
124
+ }
125
+
126
+ /**
127
+ * Convert a processor builder to a flow.
128
+ */
129
+ public <T , R > Flow <T , R , NotUsed > buildFlow (ProcessorBuilder <T , R > processor ) throws UnsupportedStageException {
130
+ return buildFlow (processor .toGraph ());
131
+ }
132
+
133
+ private <T , R > Flow <T , R , NotUsed > buildFlow (Graph graph ) throws UnsupportedStageException {
99
134
Flow flow = Flow .create ();
100
135
for (Stage stage : graph .getStages ()) {
101
136
flow = applyStage (flow , stage );
102
137
}
103
- return ( Processor ) materialize ( flow . toProcessor ()) ;
138
+ return flow ;
104
139
}
105
140
106
141
@ Override
107
142
public <T > CompletionStage <T > buildCompletion (Graph graph ) throws UnsupportedStageException {
143
+ return materialize (buildRunnableGraph (graph ));
144
+ }
145
+
146
+ /**
147
+ * Convert a completion builder to a runnable graph.
148
+ */
149
+ public <T > RunnableGraph <CompletionStage <T >> buildCompletion (CompletionBuilder <T > completion ) throws UnsupportedStageException {
150
+ return buildRunnableGraph (completion .toGraph ());
151
+ }
152
+
153
+ private <T > RunnableGraph <CompletionStage <T >> buildRunnableGraph (Graph graph ) throws UnsupportedStageException {
108
154
Source source = null ;
109
155
Flow flow = Flow .create ();
110
156
for (Stage stage : graph .getStages ()) {
111
157
if (source == null ) {
112
158
source = toSource (stage );
113
- } else if (stage .hasOutlet ()) {
159
+ }
160
+ else if (stage .hasOutlet ()) {
114
161
flow = applyStage (flow , stage );
115
- } else {
116
- return (CompletionStage ) materialize (source .via (flow ).toMat (toSink (stage ), Keep .right ()));
162
+ }
163
+ else {
164
+ return source .via (flow ).toMat (toSink (stage ), Keep .right ());
117
165
}
118
166
}
119
167
@@ -124,43 +172,53 @@ private Flow applyStage(Flow flow, Stage stage) {
124
172
if (stage instanceof Stage .Map ) {
125
173
Function <Object , Object > mapper = (Function ) ((Stage .Map ) stage ).getMapper ();
126
174
return flow .map (mapper ::apply );
127
- } else if (stage instanceof Stage .Filter ) {
175
+ }
176
+ else if (stage instanceof Stage .Filter ) {
128
177
Predicate <Object > predicate = (Predicate ) (((Stage .Filter ) stage ).getPredicate ()).get ();
129
178
return flow .filter (predicate ::test );
130
- } else if (stage instanceof Stage .FlatMap ) {
179
+ }
180
+ else if (stage instanceof Stage .FlatMap ) {
131
181
Function <Object , Graph > mapper = (Function ) ((Stage .FlatMap ) stage ).getMapper ();
132
182
return flow .flatMapConcat (e -> buildSource (mapper .apply (e )));
133
- } else if (stage instanceof Stage .TakeWhile ) {
183
+ }
184
+ else if (stage instanceof Stage .TakeWhile ) {
134
185
Predicate <Object > predicate = (Predicate ) (((Stage .TakeWhile ) stage ).getPredicate ()).get ();
135
186
boolean inclusive = ((Stage .TakeWhile ) stage ).isInclusive ();
136
187
return flow .takeWhile (predicate ::test , inclusive );
137
- } else if (stage instanceof Stage .FlatMapCompletionStage ) {
188
+ }
189
+ else if (stage instanceof Stage .FlatMapCompletionStage ) {
138
190
Function <Object , CompletionStage <Object >> mapper = (Function ) ((Stage .FlatMapCompletionStage ) stage ).getMapper ();
139
191
return flow .mapAsync (1 , mapper ::apply );
140
- } else if (stage instanceof Stage .FlatMapIterable ) {
192
+ }
193
+ else if (stage instanceof Stage .FlatMapIterable ) {
141
194
Function <Object , Iterable <Object >> mapper = (Function ) ((Stage .FlatMapIterable ) stage ).getMapper ();
142
195
return flow .mapConcat (mapper ::apply );
143
- } else if (stage instanceof Stage .ProcessorStage ) {
196
+ }
197
+ else if (stage instanceof Stage .ProcessorStage ) {
144
198
Processor <Object , Object > processor = (Processor ) (((Stage .ProcessorStage ) stage ).getRsProcessor ());
145
199
Flow processorFlow ;
146
200
try {
147
201
processorFlow = Flow .fromProcessor (() -> processor );
148
- } catch (Exception e ) {
202
+ }
203
+ catch (Exception e ) {
149
204
// Technically can't happen, since the lambda we passed doesn't throw anything.
150
205
throw new RuntimeException ("Unexpected exception thrown" , e );
151
206
}
152
207
return flow .via (processorFlow );
153
- } else if (stage .hasInlet () && stage .hasOutlet ()) {
208
+ }
209
+ else if (stage .hasInlet () && stage .hasOutlet ()) {
154
210
throw new UnsupportedStageException (stage );
155
- } else {
211
+ }
212
+ else {
156
213
throw new IllegalStateException ("Got " + stage + " but needed a stage with an inlet and an outlet." );
157
214
}
158
215
}
159
216
160
217
private Sink toSink (Stage stage ) {
161
218
if (stage == Stage .FindFirst .INSTANCE ) {
162
219
return Sink .headOption ();
163
- } else if (stage instanceof Stage .Collect ) {
220
+ }
221
+ else if (stage instanceof Stage .Collect ) {
164
222
Collector collector = ((Stage .Collect ) stage ).getCollector ();
165
223
BiConsumer accumulator = collector .accumulator ();
166
224
Object firstContainer = collector .supplier ().get ();
@@ -170,31 +228,38 @@ private Sink toSink(Stage stage) {
170
228
Sink <Object , CompletionStage <Object >> sink = Sink .fold (firstContainer , (resultContainer , in ) -> {
171
229
if (resultContainer == NULL ) {
172
230
accumulator .accept (null , in );
173
- } else {
231
+ }
232
+ else {
174
233
accumulator .accept (resultContainer , in );
175
234
}
176
235
return resultContainer ;
177
236
});
178
237
if (collector .characteristics ().contains (Collector .Characteristics .IDENTITY_FINISH ) && firstContainer != NULL ) {
179
238
return sink ;
180
- } else {
239
+ }
240
+ else {
181
241
return sink .mapMaterializedValue (result -> result .thenApply (r -> {
182
242
if (r == NULL ) {
183
243
return collector .finisher ().apply (null );
184
- } else {
244
+ }
245
+ else {
185
246
return collector .finisher ().apply (r );
186
247
}
187
248
}));
188
249
}
189
- } else if (stage instanceof Stage .SubscriberStage ) {
250
+ }
251
+ else if (stage instanceof Stage .SubscriberStage ) {
190
252
return Flow .create ()
191
253
.viaMat (new TerminationWatcher (), Keep .right ())
192
254
.to ((Sink ) Sink .fromSubscriber (((Stage .SubscriberStage ) stage ).getRsSubscriber ()));
193
- } else if (stage == Stage .Cancel .INSTANCE ) {
255
+ }
256
+ else if (stage == Stage .Cancel .INSTANCE ) {
194
257
return Sink .cancelled ().mapMaterializedValue (n -> CompletableFuture .completedFuture (null ));
195
- } else if (stage .hasInlet () && !stage .hasOutlet ()) {
258
+ }
259
+ else if (stage .hasInlet () && !stage .hasOutlet ()) {
196
260
throw new UnsupportedStageException (stage );
197
- } else {
261
+ }
262
+ else {
198
263
throw new IllegalStateException ("Got " + stage + " but needed a stage with an inlet and no outlet." );
199
264
}
200
265
}
@@ -207,22 +272,28 @@ private Source toSource(Stage stage) {
207
272
int size = ((Collection ) elements ).size ();
208
273
if (size == 0 ) {
209
274
return Source .empty ();
210
- } else if (size == 1 ) {
275
+ }
276
+ else if (size == 1 ) {
211
277
return Source .single (elements .iterator ().next ());
212
278
}
213
279
}
214
280
return Source .from (elements );
215
- } else if (stage instanceof Stage .PublisherStage ) {
281
+ }
282
+ else if (stage instanceof Stage .PublisherStage ) {
216
283
return Source .fromPublisher (((Stage .PublisherStage ) stage ).getRsPublisher ());
217
- } else if (stage instanceof Stage .Concat ) {
284
+ }
285
+ else if (stage instanceof Stage .Concat ) {
218
286
Graph first = ((Stage .Concat ) stage ).getFirst ();
219
287
Graph second = ((Stage .Concat ) stage ).getSecond ();
220
288
return buildSource (first ).concat (buildSource (second ));
221
- } else if (stage instanceof Stage .Failed ) {
289
+ }
290
+ else if (stage instanceof Stage .Failed ) {
222
291
return Source .failed (((Stage .Failed ) stage ).getError ());
223
- } else if (stage .hasOutlet () && !stage .hasInlet ()) {
292
+ }
293
+ else if (stage .hasOutlet () && !stage .hasInlet ()) {
224
294
throw new UnsupportedStageException (stage );
225
- } else {
295
+ }
296
+ else {
226
297
throw new IllegalStateException ("Got " + stage + " but needed a stage with an outlet and no inlet." );
227
298
}
228
299
}
@@ -233,7 +304,7 @@ private <T> T materialize(RunnableGraph<T> graph) {
233
304
234
305
/**
235
306
* This attribute does nothing except ensures a reference to this AkkaEngine is kept by the running stream.
236
- *
307
+ * <p>
237
308
* This is to prevent the cleaner used in the AkkaEngineProvider from finding that the AkkaEngine is unreachable
238
309
* while a stream is still running, and shut the engine down. Once all streams stop running, the stream actor will
239
310
* be disposed and the engine will become unreachable (as long as no user code references it), then it can be shut
0 commit comments