@@ -32,9 +32,40 @@ public async Task PostCompletionAction()
3232 pipeline . Complete ( ) ;
3333 await pipeline . Completion . WithTimeout ( 1000 ) ;
3434 Console . WriteLine ( String . Join ( ", " , timestamps ) ) ;
35+ Assert . IsTrue ( timestamps . All ( x => x > 0L ) ) ;
3536 Assert . IsTrue ( timestamps . Zip ( timestamps . Skip ( 1 ) ) . All ( e => e . First <= e . Second ) ) ;
3637 }
3738
39+ [ TestMethod ]
40+ public void FailedPostCompletionAction ( )
41+ {
42+ var block1 = new TransformBlock < int , int > ( x => x ,
43+ new ExecutionDataflowBlockOptions ( ) { BoundedCapacity = 1 } ) ;
44+ var block2 = new ActionBlock < int > ( x => Thread . Sleep ( 20 ) ) ;
45+
46+ var pipeline = PipelineBuilder
47+ . BeginWith ( block1 )
48+ . WithPostCompletionAction ( t => throw new ApplicationException ( t . Status . ToString ( ) ) )
49+ . LinkTo ( block2 )
50+ . WithPostCompletionAction ( t => throw new ApplicationException ( t . Status . ToString ( ) ) )
51+ . ToPipeline ( ) ;
52+
53+ pipeline . Post ( 13 ) ;
54+ pipeline . Post ( 42 ) ;
55+ pipeline . Complete ( ) ;
56+
57+ var aex = Assert . ThrowsException < AggregateException > (
58+ ( ) => pipeline . Completion . Wait ( 100 ) ) ;
59+ Console . WriteLine ( String . Join ( ", " , aex . InnerExceptions . Select ( ex => ex . Message ) ) ) ;
60+ Assert . IsTrue ( aex . InnerExceptions . Count == 2 ) ;
61+ Assert . IsTrue ( aex . InnerExceptions . All ( ex => ex is ApplicationException ) ) ;
62+ Assert . IsTrue ( aex . InnerExceptions . Select ( ex => ex . Message ) . SequenceEqual ( new [ ] { "RanToCompletion" , "Faulted" } ) ) ;
63+ Assert . IsTrue ( block1 . Completion . IsCompletedSuccessfully ( ) ) ;
64+ Assert . IsTrue ( block2 . Completion . IsFaulted ) ;
65+ Assert . IsTrue ( block2 . Completion . Exception . InnerExceptions . Count == 1 ) ;
66+ Assert . IsTrue ( block2 . Completion . Exception . InnerException is PipelineException ) ;
67+ }
68+
3869 [ TestMethod ]
3970 public async Task UnlinkedBlocks ( )
4071 {
@@ -56,5 +87,39 @@ public async Task UnlinkedBlocks()
5687 var list = await ( ( IReceivableSourceBlock < int > ) pipeline ) . ToListAsync ( new CancellationTokenSource ( 1000 ) . Token ) ;
5788 Assert . IsTrue ( list . SequenceEqual ( source . Append ( 100 ) . Append ( 200 ) ) ) ;
5889 }
90+
91+ [ TestMethod ]
92+ public async Task FailedUnlinkedBlock ( )
93+ {
94+ var block2 = new ActionBlock < int > ( async x => { await Task . Delay ( 20 ) ; throw new ApplicationException ( x . ToString ( ) ) ; } ,
95+ new ExecutionDataflowBlockOptions ( ) { MaxDegreeOfParallelism = 2 , BoundedCapacity = 2 } ) ;
96+ var block1 = new ActionBlock < int > ( async x => await block2 . SendAsync ( x ) ,
97+ new ExecutionDataflowBlockOptions ( ) { BoundedCapacity = 2 } ) ;
98+
99+ var pipeline = PipelineBuilder
100+ . BeginWith ( block1 )
101+ . AddUnlinked ( block2 )
102+ . ToPipeline ( ) ;
103+
104+ var source = Enumerable . Range ( 1 , 10 ) ;
105+ await Task . Run ( async ( ) =>
106+ {
107+ foreach ( var item in source ) await pipeline . SendAsync ( item ) ;
108+ } ) . WithTimeout ( 500 ) ;
109+
110+ pipeline . Complete ( ) ;
111+ var aex = Assert . ThrowsException < AggregateException > (
112+ ( ) => pipeline . Completion . Wait ( 100 ) ) ;
113+ Console . WriteLine ( String . Join ( ", " , aex . InnerExceptions . Select ( ex => ex . Message ) ) ) ;
114+ Assert . IsTrue ( aex . InnerExceptions . Count == 2 ) ;
115+ Assert . IsTrue ( aex . InnerExceptions . All ( ex => ex is ApplicationException ) ) ;
116+ Assert . IsTrue ( aex . InnerExceptions . Select ( ex => ex . Message ) . OrderBy ( x => x ) . SequenceEqual ( new [ ] { "1" , "2" } ) ) ;
117+ Assert . IsTrue ( block1 . Completion . IsFaulted ) ;
118+ Assert . IsTrue ( block1 . Completion . Exception . InnerExceptions . Count == 1 ) ;
119+ Assert . IsTrue ( block1 . Completion . Exception . InnerException is PipelineException ) ;
120+ Assert . IsTrue ( block2 . Completion . IsFaulted ) ;
121+ Assert . IsTrue ( block2 . Completion . Exception . InnerExceptions . Count == 2 ) ;
122+ Assert . IsTrue ( block2 . Completion . Exception . InnerExceptions . All ( ex => ex is ApplicationException ) ) ;
123+ }
59124 }
60125}
0 commit comments