14
14
import org .reactivestreams .Publisher ;
15
15
import org .reactivestreams .Subscriber ;
16
16
import org .reactivestreams .Subscription ;
17
- import org .reactivestreams .tck .flow .support .SubscriberBufferOverflowException ;
18
17
import org .reactivestreams .tck .flow .support .Optional ;
18
+ import org .reactivestreams .tck .flow .support .SubscriberBufferOverflowException ;
19
19
20
20
import java .util .Collections ;
21
21
import java .util .LinkedList ;
24
24
import java .util .concurrent .CopyOnWriteArrayList ;
25
25
import java .util .concurrent .CountDownLatch ;
26
26
import java .util .concurrent .TimeUnit ;
27
- import java .util .concurrent .atomic .AtomicBoolean ;
28
27
import java .util .concurrent .atomic .AtomicReference ;
29
28
30
29
import static org .testng .Assert .assertTrue ;
@@ -39,6 +38,7 @@ public class TestEnvironment {
39
38
private static final String DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV = "DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS" ;
40
39
41
40
private final long defaultTimeoutMillis ;
41
+ private final long defaultPollTimeoutMillis ;
42
42
private final long defaultNoSignalsTimeoutMillis ;
43
43
private final boolean printlnDebug ;
44
44
@@ -50,15 +50,47 @@ public class TestEnvironment {
50
50
* the implementation, but can in some cases result in longer time to
51
51
* run the tests.
52
52
* @param defaultTimeoutMillis default timeout to be used in all expect* methods
53
+ * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't
54
+ * preempted by an asynchronous event.
53
55
* @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
54
56
* @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
55
57
*/
56
- public TestEnvironment (long defaultTimeoutMillis , long defaultNoSignalsTimeoutMillis , boolean printlnDebug ) {
58
+ public TestEnvironment (long defaultTimeoutMillis , long defaultNoSignalsTimeoutMillis , long defaultPollTimeoutMillis ,
59
+ boolean printlnDebug ) {
57
60
this .defaultTimeoutMillis = defaultTimeoutMillis ;
61
+ this .defaultPollTimeoutMillis = defaultPollTimeoutMillis ;
58
62
this .defaultNoSignalsTimeoutMillis = defaultNoSignalsTimeoutMillis ;
59
63
this .printlnDebug = printlnDebug ;
60
64
}
61
65
66
+ /**
67
+ * Tests must specify the timeout for expected outcome of asynchronous
68
+ * interactions. Longer timeout does not invalidate the correctness of
69
+ * the implementation, but can in some cases result in longer time to
70
+ * run the tests.
71
+ * @param defaultTimeoutMillis default timeout to be used in all expect* methods
72
+ * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
73
+ * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
74
+ */
75
+ public TestEnvironment (long defaultTimeoutMillis , long defaultNoSignalsTimeoutMillis , boolean printlnDebug ) {
76
+ this (defaultTimeoutMillis , defaultNoSignalsTimeoutMillis , defaultTimeoutMillis , printlnDebug );
77
+ }
78
+
79
+ /**
80
+ * Tests must specify the timeout for expected outcome of asynchronous
81
+ * interactions. Longer timeout does not invalidate the correctness of
82
+ * the implementation, but can in some cases result in longer time to
83
+ * run the tests.
84
+ *
85
+ * @param defaultTimeoutMillis default timeout to be used in all expect* methods
86
+ * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
87
+ * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't
88
+ * preempted by an asynchronous event.
89
+ */
90
+ public TestEnvironment (long defaultTimeoutMillis , long defaultNoSignalsTimeoutMillis , long defaultPollTimeoutMillis ) {
91
+ this (defaultTimeoutMillis , defaultNoSignalsTimeoutMillis , defaultPollTimeoutMillis , false );
92
+ }
93
+
62
94
/**
63
95
* Tests must specify the timeout for expected outcome of asynchronous
64
96
* interactions. Longer timeout does not invalidate the correctness of
@@ -69,7 +101,7 @@ public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMi
69
101
* @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
70
102
*/
71
103
public TestEnvironment (long defaultTimeoutMillis , long defaultNoSignalsTimeoutMillis ) {
72
- this (defaultTimeoutMillis , defaultNoSignalsTimeoutMillis , false );
104
+ this (defaultTimeoutMillis , defaultTimeoutMillis , defaultNoSignalsTimeoutMillis );
73
105
}
74
106
75
107
/**
@@ -81,7 +113,7 @@ public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMi
81
113
* @param defaultTimeoutMillis default timeout to be used in all expect* methods
82
114
*/
83
115
public TestEnvironment (long defaultTimeoutMillis ) {
84
- this (defaultTimeoutMillis , defaultTimeoutMillis , false );
116
+ this (defaultTimeoutMillis , defaultTimeoutMillis , defaultTimeoutMillis );
85
117
}
86
118
87
119
/**
@@ -519,42 +551,57 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru
519
551
}
520
552
521
553
public <E extends Throwable > void expectErrorWithMessage (Class <E > expected , String requiredMessagePart ) throws Exception {
522
- expectErrorWithMessage (expected , requiredMessagePart , env .defaultTimeoutMillis ());
554
+ expectErrorWithMessage (expected , Collections . singletonList ( requiredMessagePart ) , env .defaultTimeoutMillis (), env . defaultPollTimeoutMillis );
523
555
}
524
556
public <E extends Throwable > void expectErrorWithMessage (Class <E > expected , List <String > requiredMessagePartAlternatives ) throws Exception {
525
- expectErrorWithMessage (expected , requiredMessagePartAlternatives , env .defaultTimeoutMillis ());
557
+ expectErrorWithMessage (expected , requiredMessagePartAlternatives , env .defaultTimeoutMillis (), env . defaultPollTimeoutMillis );
526
558
}
527
559
528
560
@ SuppressWarnings ("ThrowableResultOfMethodCallIgnored" )
529
561
public <E extends Throwable > void expectErrorWithMessage (Class <E > expected , String requiredMessagePart , long timeoutMillis ) throws Exception {
530
562
expectErrorWithMessage (expected , Collections .singletonList (requiredMessagePart ), timeoutMillis );
531
563
}
564
+
532
565
public <E extends Throwable > void expectErrorWithMessage (Class <E > expected , List <String > requiredMessagePartAlternatives , long timeoutMillis ) throws Exception {
533
- final E err = expectError (expected , timeoutMillis );
566
+ expectErrorWithMessage (expected , requiredMessagePartAlternatives , timeoutMillis , timeoutMillis );
567
+ }
568
+
569
+ public <E extends Throwable > void expectErrorWithMessage (Class <E > expected , List <String > requiredMessagePartAlternatives ,
570
+ long totalTimeoutMillis , long pollTimeoutMillis ) throws Exception {
571
+ final E err = expectError (expected , totalTimeoutMillis , pollTimeoutMillis );
534
572
final String message = err .getMessage ();
535
-
573
+
536
574
boolean contains = false ;
537
- for (String requiredMessagePart : requiredMessagePartAlternatives )
575
+ for (String requiredMessagePart : requiredMessagePartAlternatives )
538
576
if (message .contains (requiredMessagePart )) contains = true ; // not short-circuting loop, it is expected to
539
577
assertTrue (contains ,
540
- String .format ("Got expected exception [%s] but missing message part [%s], was: %s" ,
541
- err .getClass (), "anyOf: " + requiredMessagePartAlternatives , err .getMessage ()));
578
+ String .format ("Got expected exception [%s] but missing message part [%s], was: %s" ,
579
+ err .getClass (), "anyOf: " + requiredMessagePartAlternatives , err .getMessage ()));
542
580
}
543
581
544
582
public <E extends Throwable > E expectError (Class <E > expected ) throws Exception {
545
583
return expectError (expected , env .defaultTimeoutMillis ());
546
584
}
547
585
548
586
public <E extends Throwable > E expectError (Class <E > expected , long timeoutMillis ) throws Exception {
549
- return expectError (expected , timeoutMillis , String . format ( "Expected onError(%s)" , expected . getName ()) );
587
+ return expectError (expected , timeoutMillis , env . defaultPollTimeoutMillis );
550
588
}
551
589
552
590
public <E extends Throwable > E expectError (Class <E > expected , String errorMsg ) throws Exception {
553
591
return expectError (expected , env .defaultTimeoutMillis (), errorMsg );
554
592
}
555
593
556
594
public <E extends Throwable > E expectError (Class <E > expected , long timeoutMillis , String errorMsg ) throws Exception {
557
- return received .expectError (expected , timeoutMillis , errorMsg );
595
+ return expectError (expected , timeoutMillis , env .defaultPollTimeoutMillis , errorMsg );
596
+ }
597
+
598
+ public <E extends Throwable > E expectError (Class <E > expected , long totalTimeoutMillis , long pollTimeoutMillis ) throws Exception {
599
+ return expectError (expected , totalTimeoutMillis , pollTimeoutMillis , String .format ("Expected onError(%s)" , expected .getName ()));
600
+ }
601
+
602
+ public <E extends Throwable > E expectError (Class <E > expected , long totalTimeoutMillis , long pollTimeoutMillis ,
603
+ String errorMsg ) throws Exception {
604
+ return received .expectError (expected , totalTimeoutMillis , pollTimeoutMillis , errorMsg );
558
605
}
559
606
560
607
public void expectNone () throws InterruptedException {
@@ -1025,22 +1072,44 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru
1025
1072
} // else, ok
1026
1073
}
1027
1074
1028
- @ SuppressWarnings ("unchecked" )
1075
+ /**
1076
+ * @deprecated Deprecated in favor of {@link #expectError(Class, long, long, String)}.
1077
+ */
1078
+ @ Deprecated
1029
1079
public <E extends Throwable > E expectError (Class <E > clazz , long timeoutMillis , String errorMsg ) throws Exception {
1030
- Thread .sleep (timeoutMillis );
1031
-
1032
- if (env .asyncErrors .isEmpty ()) {
1033
- return env .flopAndFail (String .format ("%s within %d ms" , errorMsg , timeoutMillis ));
1034
- } else {
1035
- // ok, there was an expected error
1036
- Throwable thrown = env .asyncErrors .remove (0 );
1080
+ return expectError (clazz , timeoutMillis , timeoutMillis , errorMsg );
1081
+ }
1037
1082
1038
- if (clazz .isInstance (thrown )) {
1039
- return (E ) thrown ;
1083
+ @ SuppressWarnings ("unchecked" )
1084
+ final <E extends Throwable > E expectError (Class <E > clazz , final long totalTimeoutMillis ,
1085
+ long pollTimeoutMillis ,
1086
+ String errorMsg ) throws Exception {
1087
+ long totalTimeoutRemainingMillis = totalTimeoutMillis ;
1088
+ long timeStampA = System .nanoTime ();
1089
+ long timeStampB ;
1090
+
1091
+ for (;;) {
1092
+ Thread .sleep (Math .min (pollTimeoutMillis , totalTimeoutRemainingMillis ));
1093
+
1094
+ if (env .asyncErrors .isEmpty ()) {
1095
+ timeStampB = System .nanoTime ();
1096
+ totalTimeoutRemainingMillis =- timeStampB - timeStampA ;
1097
+ timeStampA = timeStampB ;
1098
+
1099
+ if (totalTimeoutRemainingMillis <= 0 ) {
1100
+ return env .flopAndFail (String .format ("%s within %d ms" , errorMsg , totalTimeoutMillis ));
1101
+ }
1040
1102
} else {
1103
+ // ok, there was an expected error
1104
+ Throwable thrown = env .asyncErrors .remove (0 );
1041
1105
1042
- return env .flopAndFail (String .format ("%s within %d ms; Got %s but expected %s" ,
1043
- errorMsg , timeoutMillis , thrown .getClass ().getCanonicalName (), clazz .getCanonicalName ()));
1106
+ if (clazz .isInstance (thrown )) {
1107
+ return (E ) thrown ;
1108
+ } else {
1109
+
1110
+ return env .flopAndFail (String .format ("%s within %d ms; Got %s but expected %s" ,
1111
+ errorMsg , totalTimeoutMillis , thrown .getClass ().getCanonicalName (), clazz .getCanonicalName ()));
1112
+ }
1044
1113
}
1045
1114
}
1046
1115
}
0 commit comments