@@ -450,6 +450,8 @@ public TestSetup apply(Long aLong) throws Throwable {
450
450
sub1 .request (1 );
451
451
sub2 .request (1 );
452
452
453
+ expectRequest ();
454
+
453
455
final T x = sendNextTFromUpstream ();
454
456
455
457
expectNextElement (sub1 , x );
@@ -459,9 +461,18 @@ public TestSetup apply(Long aLong) throws Throwable {
459
461
sub2 .request (1 );
460
462
} else {
461
463
sub1 .request (1 );
462
- expectRequest ();
464
+
465
+ expectRequest (env .defaultTimeoutMillis (),
466
+ "If the Processor coordinates requests/emissions when having multiple Subscribers"
467
+ + " at once, please override doesCoordinatedEmission() in this "
468
+ + "IdentityProcessorVerification to allow this test to pass." );
469
+
463
470
final T x = sendNextTFromUpstream ();
464
- expectNextElement (sub1 , x );
471
+ expectNextElement (sub1 , x ,
472
+ "If the Processor coordinates requests/emissions when having multiple Subscribers"
473
+ + " at once, please override doesCoordinatedEmission() in this "
474
+ + "IdentityProcessorVerification to allow this test to pass." );
475
+
465
476
sub1 .request (1 );
466
477
467
478
// sub1 has received one element, and has one demand pending
@@ -744,7 +755,10 @@ public TestSetup apply(Long subscribers) throws Throwable {
744
755
expectNextElement (sub2 , z );
745
756
} else {
746
757
final T z = sendNextTFromUpstream ();
747
- expectNextElement (sub1 , z );
758
+ expectNextElement (sub1 , z ,
759
+ "If the Processor coordinates requests/emissions when having multiple Subscribers"
760
+ + " at once, please override doesCoordinatedEmission() in this "
761
+ + "IdentityProcessorVerification to allow this test to pass." );
748
762
sub2 .expectNone (); // since sub2 hasn't requested anything yet
749
763
750
764
sub2 .request (1 );
@@ -818,6 +832,13 @@ public void expectNextElement(ManualSubscriber<T> sub, T expected) throws Interr
818
832
}
819
833
}
820
834
835
+ public void expectNextElement (ManualSubscriber <T > sub , T expected , String errorMessageAddendum ) throws InterruptedException {
836
+ final T elem = sub .nextElement (String .format ("timeout while awaiting %s. %s" , expected , errorMessageAddendum ));
837
+ if (!elem .equals (expected )) {
838
+ env .flop (String .format ("Received `onNext(%s)` on downstream but expected `onNext(%s)`" , elem , expected ));
839
+ }
840
+ }
841
+
821
842
public T sendNextTFromUpstream () throws InterruptedException {
822
843
final T x = nextT ();
823
844
sendNext (x );
0 commit comments