@@ -189,6 +189,16 @@ public long maxSupportedSubscribers() {
189
189
return Long .MAX_VALUE ;
190
190
}
191
191
192
+ /**
193
+ * Override this method and return {@code true} if the {@link Processor} returned by the
194
+ * {@link #createIdentityProcessor(int)} coordinates its {@link Subscriber}s
195
+ * request amounts and only delivers onNext signals if all Subscribers have
196
+ * indicated (via their Subscription#request(long)) they are ready to receive elements.
197
+ */
198
+ public boolean doesCoordinatedEmission () {
199
+ return false ;
200
+ }
201
+
192
202
////////////////////// TEST ENV CLEANUP /////////////////////////////////////
193
203
194
204
@ BeforeMethod
@@ -415,17 +425,33 @@ public TestSetup apply(Long aLong) throws Throwable {
415
425
final ManualSubscriberWithErrorCollection <T > sub2 = new ManualSubscriberWithErrorCollection <T >(env );
416
426
env .subscribe (processor , sub2 );
417
427
418
- sub1 .request (1 );
419
- expectRequest ();
420
- final T x = sendNextTFromUpstream ();
421
- expectNextElement (sub1 , x );
422
- sub1 .request (1 );
428
+ final Exception ex = new RuntimeException ("Test exception" );
423
429
424
- // sub1 has received one element, and has one demand pending
425
- // sub2 has not yet requested anything
430
+ if (doesCoordinatedEmission ()) {
431
+ sub1 .request (1 );
432
+ sub2 .request (1 );
426
433
427
- final Exception ex = new RuntimeException ("Test exception" );
428
- sendError (ex );
434
+ final T x = sendNextTFromUpstream ();
435
+
436
+ expectNextElement (sub1 , x );
437
+ expectNextElement (sub2 , x );
438
+
439
+ sub1 .request (1 );
440
+ sub2 .request (1 );
441
+
442
+ sendError (ex );
443
+ } else {
444
+ sub1 .request (1 );
445
+ expectRequest ();
446
+ final T x = sendNextTFromUpstream ();
447
+ expectNextElement (sub1 , x );
448
+ sub1 .request (1 );
449
+
450
+ // sub1 has received one element, and has one demand pending
451
+ // sub2 has not yet requested anything
452
+
453
+ sendError (ex );
454
+ }
429
455
sub1 .expectError (ex );
430
456
sub2 .expectError (ex );
431
457
@@ -673,15 +699,29 @@ public TestSetup apply(Long subscribers) throws Throwable {
673
699
// sub1 now has 18 pending
674
700
// sub2 has 0 pending
675
701
676
- final T z = sendNextTFromUpstream ();
677
- expectNextElement (sub1 , z );
678
- sub2 .expectNone (); // since sub2 hasn't requested anything yet
702
+ if (doesCoordinatedEmission ()) {
703
+ sub2 .expectNone (); // since sub2 hasn't requested anything yet
679
704
680
- sub2 .request (1 );
681
- expectNextElement (sub2 , z );
705
+ sub2 .request (1 );
682
706
683
- if (totalRequests == 3 ) {
684
- expectRequest ();
707
+ final T z = sendNextTFromUpstream ();
708
+ expectNextElement (sub1 , z );
709
+ expectNextElement (sub2 , z );
710
+
711
+ if (totalRequests == 3 ) {
712
+ expectRequest ();
713
+ }
714
+ } else {
715
+ final T z = sendNextTFromUpstream ();
716
+ expectNextElement (sub1 , z );
717
+ sub2 .expectNone (); // since sub2 hasn't requested anything yet
718
+
719
+ sub2 .request (1 );
720
+ expectNextElement (sub2 , z );
721
+
722
+ if (totalRequests == 3 ) {
723
+ expectRequest ();
724
+ }
685
725
}
686
726
687
727
// to avoid error messages during test harness shutdown
0 commit comments