99import org .testng .annotations .BeforeClass ;
1010import org .testng .annotations .Test ;
1111
12- import java .util .concurrent .ExecutorService ;
13- import java .util .concurrent .Executors ;
12+ import java .util .concurrent .* ;
13+ import java .util .concurrent .atomic . AtomicLong ;
1414
1515/**
1616* Validates that the TCK's {@link IdentityProcessorVerification} fails with nice human readable errors.
@@ -27,25 +27,36 @@ public class IdentityProcessorVerificationTest extends TCKVerificationSupport {
2727 @ Test
2828 public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldBeIgnored () throws Throwable {
2929 requireTestSkip (new ThrowingRunnable () {
30- @ Override public void run () throws Throwable {
31- new IdentityProcessorVerification <Integer >(newTestEnvironment (), DEFAULT_TIMEOUT_MILLIS ){
32- @ Override public Processor <Integer , Integer > createIdentityProcessor (int bufferSize ) {
30+ @ Override
31+ public void run () throws Throwable {
32+ new IdentityProcessorVerification <Integer >(newTestEnvironment (), DEFAULT_TIMEOUT_MILLIS ) {
33+ @ Override
34+ public Processor <Integer , Integer > createIdentityProcessor (int bufferSize ) {
3335 return new NoopProcessor ();
3436 }
3537
36- @ Override public ExecutorService publisherExecutorService () { return ex ; }
38+ @ Override
39+ public ExecutorService publisherExecutorService () {
40+ return ex ;
41+ }
3742
38- @ Override public Integer createElement (int element ) { return element ; }
43+ @ Override
44+ public Integer createElement (int element ) {
45+ return element ;
46+ }
3947
40- @ Override public Publisher <Integer > createHelperPublisher (long elements ) {
48+ @ Override
49+ public Publisher <Integer > createHelperPublisher (long elements ) {
4150 return SKIP ;
4251 }
4352
44- @ Override public Publisher <Integer > createFailedPublisher () {
53+ @ Override
54+ public Publisher <Integer > createFailedPublisher () {
4555 return SKIP ;
4656 }
4757
48- @ Override public long maxSupportedSubscribers () {
58+ @ Override
59+ public long maxSupportedSubscribers () {
4960 return 1 ; // can only support 1 subscribe => unable to run this test
5061 }
5162 }.required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError ();
@@ -115,6 +126,155 @@ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANo
115126 }, "Did not receive expected error on downstream within " + DEFAULT_TIMEOUT_MILLIS );
116127 }
117128
129+ @ Test
130+ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldAllowSignalingElementAfterBothDownstreamsDemand () throws Throwable {
131+ final TestEnvironment env = newTestEnvironment ();
132+ new IdentityProcessorVerification <Integer >(env , DEFAULT_TIMEOUT_MILLIS ) {
133+ @ Override
134+ public Processor <Integer , Integer > createIdentityProcessor (int bufferSize ) { // knowingly ignoring buffer size, acting as-if 0
135+ return new Processor <Integer , Integer >() {
136+
137+ private volatile Subscription upstreamSubscription ;
138+
139+ private final CopyOnWriteArrayList <MySubscription > subs = new CopyOnWriteArrayList <MySubscription >();
140+ private final CopyOnWriteArrayList <Subscriber <? super Integer >> subscribers = new CopyOnWriteArrayList <Subscriber <? super Integer >>();
141+ private final AtomicLong demand1 = new AtomicLong ();
142+ private final AtomicLong demand2 = new AtomicLong ();
143+ private final CountDownLatch awaitLatch = new CountDownLatch (2 ); // to know when both subscribers have signalled demand
144+
145+ @ Override
146+ public void subscribe (final Subscriber <? super Integer > s ) {
147+ int subscriberCount = subs .size ();
148+ if (subscriberCount == 0 ) s .onSubscribe (createSubscription (awaitLatch , s , demand1 ));
149+ else if (subscriberCount == 1 ) s .onSubscribe (createSubscription (awaitLatch , s , demand2 ));
150+ else throw new RuntimeException (String .format ("This for-test-purposes-processor supports only 2 subscribers, yet got %s!" , subscriberCount ));
151+ }
152+
153+ @ Override
154+ public void onSubscribe (Subscription s ) {
155+ this .upstreamSubscription = s ;
156+ }
157+
158+ @ Override
159+ public void onNext (Integer elem ) {
160+ for (Subscriber <? super Integer > subscriber : subscribers ) {
161+ try {
162+ subscriber .onNext (elem );
163+ } catch (Exception ex ) {
164+ env .flop (ex , String .format ("Calling onNext on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13" , subscriber ));
165+ }
166+ }
167+ }
168+
169+ @ Override
170+ public void onError (Throwable t ) {
171+ for (Subscriber <? super Integer > subscriber : subscribers ) {
172+ try {
173+ subscriber .onError (t );
174+ } catch (Exception ex ) {
175+ env .flop (ex , String .format ("Calling onError on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13" , subscriber ));
176+ }
177+ }
178+ }
179+
180+ @ Override
181+ public void onComplete () {
182+ for (Subscriber <? super Integer > subscriber : subscribers ) {
183+ try {
184+ subscriber .onComplete ();
185+ } catch (Exception ex ) {
186+ env .flop (ex , String .format ("Calling onComplete on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13" , subscriber ));
187+ }
188+ }
189+ }
190+
191+ private Subscription createSubscription (CountDownLatch awaitLatch , final Subscriber <? super Integer > s , final AtomicLong demand ) {
192+ final MySubscription sub = new MySubscription (awaitLatch , s , demand );
193+ subs .add (sub );
194+ subscribers .add (s );
195+ return sub ;
196+ }
197+
198+ final class MySubscription implements Subscription {
199+ private final CountDownLatch awaitLatch ;
200+ private final Subscriber <? super Integer > s ;
201+ private final AtomicLong demand ;
202+
203+ public MySubscription (CountDownLatch awaitTwoLatch , Subscriber <? super Integer > s , AtomicLong demand ) {
204+ this .awaitLatch = awaitTwoLatch ;
205+ this .s = s ;
206+ this .demand = demand ;
207+ }
208+
209+ @ Override
210+ public void request (final long n ) {
211+ ex .execute (new Runnable () {
212+ @ Override
213+ public void run () {
214+ demand .addAndGet (n ); // naive, but good enough for the test
215+ awaitLatch .countDown ();
216+ try {
217+ awaitLatch .await (env .defaultTimeoutMillis (), TimeUnit .MILLISECONDS );
218+ while (demand .getAndDecrement () > 0 ) {
219+ upstreamSubscription .request (1 );
220+ }
221+ } catch (InterruptedException e ) {
222+ env .flop (e , "Interrupted while awaiting for all downstreams to signal some demand." );
223+ }
224+
225+ }
226+ });
227+ }
228+
229+ @ Override
230+ public void cancel () {
231+ demand .set (Long .MIN_VALUE ); // naive but OK for this test
232+ }
233+
234+ @ Override
235+ public String toString () {
236+ return String .format ("IdentityProcessorVerificationTest:MySubscription(%s, demand = %s)" , s , demand );
237+ }
238+ }
239+ };
240+ }
241+
242+ @ Override
243+ public ExecutorService publisherExecutorService () {
244+ return ex ;
245+ }
246+
247+ @ Override
248+ public Integer createElement (int element ) {
249+ return element ;
250+ }
251+
252+ @ Override
253+ public Publisher <Integer > createHelperPublisher (long elements ) {
254+ return new Publisher <Integer >() {
255+ @ Override
256+ public void subscribe (final Subscriber <? super Integer > s ) {
257+ s .onSubscribe (new NoopSubscription () {
258+ private int depth = 0 ; // simple unbounded recursion between onNext and request avoidance
259+ @ Override public void request (long n ) {
260+ if (depth == 0 ) {
261+ depth += 1 ;
262+ s .onNext (0 );
263+ depth -= 1 ;
264+ }
265+ }
266+ });
267+ }
268+ };
269+ }
270+
271+ @ Override
272+ public Publisher <Integer > createFailedPublisher () {
273+ return SKIP ;
274+ }
275+ }.required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError ();
276+ }
277+
118278 // FAILING IMPLEMENTATIONS //
119279
120280 final Publisher <Integer > SKIP = null ;
0 commit comments