@@ -269,9 +269,6 @@ private Builder buildGraph(Graph graph, Shape shape) {
269
269
}
270
270
}
271
271
272
- ports .addAll (builderPorts );
273
- stages .addAll (builderStages );
274
-
275
272
return this ;
276
273
}
277
274
@@ -284,8 +281,22 @@ private void verifyReady() {
284
281
for (Port port : builderPorts ) {
285
282
port .verifyReady ();
286
283
}
284
+ ports .addAll (builderPorts );
285
+ }
286
+
287
+ /**
288
+ * Start the stages on this listener
289
+ */
290
+ private void startGraph () {
291
+ execute (() -> {
292
+ for (GraphStage stage : builderStages ) {
293
+ stages .add (stage );
294
+ stage .postStart ();
295
+ }
296
+ });
287
297
}
288
298
299
+
289
300
private <T > SubStageInlet <T > inlet () {
290
301
Objects .requireNonNull (lastInlet , "Not an inlet graph" );
291
302
assert result == null ;
@@ -360,18 +371,8 @@ private void addStage(Stage stage, StageInlet inlet, Publisher publisher, StageO
360
371
addStage (new OfStage (BuiltGraph .this , outlet ,
361
372
((Stage .Of ) stage ).getElements ()));
362
373
} else if (stage instanceof Stage .Concat ) {
363
-
364
- // Use this builder to build each of the sub stages that are being concatenated as an inlet graph, and then
365
- // capture the last inlet of each to pass to the concat stage.
366
- buildGraph (((Stage .Concat ) stage ).getFirst (), Shape .INLET );
367
- StageInlet firstInlet = lastInlet ;
368
- lastInlet = null ;
369
-
370
- buildGraph (((Stage .Concat ) stage ).getSecond (), Shape .INLET );
371
- StageInlet secondInlet = lastInlet ;
372
- lastInlet = null ;
373
-
374
- addStage (new ConcatStage (BuiltGraph .this , firstInlet , secondInlet , outlet ));
374
+ Stage .Concat concat = (Stage .Concat ) stage ;
375
+ addStage (new ConcatStage (BuiltGraph .this , buildSubInlet (concat .getFirst ()), buildSubInlet (concat .getSecond ()), outlet ));
375
376
} else if (stage instanceof Stage .PublisherStage ) {
376
377
addStage (new ConnectorStage (BuiltGraph .this , ((Stage .PublisherStage ) stage ).getRsPublisher (), subscriber ));
377
378
} else if (stage instanceof Stage .Failed ) {
@@ -517,17 +518,6 @@ public void execute(Runnable command) {
517
518
});
518
519
}
519
520
520
- /**
521
- * Start the whole graph.
522
- */
523
- private void startGraph () {
524
- execute (() -> {
525
- for (GraphStage stage : stages ) {
526
- stage .postStart ();
527
- }
528
- });
529
- }
530
-
531
521
private void streamFailure (Throwable error ) {
532
522
// todo handle better
533
523
error .printStackTrace ();
@@ -558,8 +548,6 @@ final class SubStageInlet<T> implements StageInlet<T> {
558
548
private final List <GraphStage > subStages ;
559
549
private final List <Port > subStagePorts ;
560
550
561
- private boolean started = false ;
562
-
563
551
private SubStageInlet (StageInlet <T > delegate , List <GraphStage > subStages , List <Port > subStagePorts ) {
564
552
this .delegate = delegate ;
565
553
this .subStages = subStages ;
@@ -568,20 +556,24 @@ private SubStageInlet(StageInlet<T> delegate, List<GraphStage> subStages, List<P
568
556
569
557
void start () {
570
558
subStagePorts .forEach (Port ::verifyReady );
571
- started = true ;
572
- subStages .forEach (GraphStage ::postStart );
559
+ ports .addAll (subStagePorts );
560
+ for (GraphStage stage : subStages ) {
561
+ stages .add (stage );
562
+ stage .postStart ();
563
+ }
573
564
}
574
565
575
566
private void shutdown () {
576
- stages .removeAll (subStages );
577
- ports .removeAll (subStagePorts );
567
+ // Do it in a signal, this ensures that if shutdown happens while something is iterating through
568
+ // the ports, we don't get a concurrent modification exception.
569
+ enqueueSignal (() -> {
570
+ stages .removeAll (subStages );
571
+ ports .removeAll (subStagePorts );
572
+ });
578
573
}
579
574
580
575
@ Override
581
576
public void pull () {
582
- if (!started ) {
583
- throw new IllegalStateException ("Pull before the sub stream has been started." );
584
- }
585
577
delegate .pull ();
586
578
}
587
579
@@ -602,17 +594,11 @@ public boolean isClosed() {
602
594
603
595
@ Override
604
596
public void cancel () {
605
- if (!started ) {
606
- throw new IllegalStateException ("Cancel before the sub stream has been started." );
607
- }
608
597
delegate .cancel ();
609
598
}
610
599
611
600
@ Override
612
601
public T grab () {
613
- if (!started ) {
614
- throw new IllegalStateException ("Grab before the sub stream has been started." );
615
- }
616
602
return delegate .grab ();
617
603
}
618
604
0 commit comments