Skip to content

Commit 5aad94a

Browse files
OK
1 parent 4e65bfc commit 5aad94a

File tree

2 files changed

+27
-20
lines changed

2 files changed

+27
-20
lines changed

README.md

+13-7
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
This library helps at building simple [TPL Dataflow](https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library) pipelines,
66
that enforce the following guarantees:
77

8-
1. In case any constituent block fails, all other blocks will complete as soon as possible.
8+
1. In case any constituent dataflow block fails, all the other blocks will complete
9+
as soon as possible.
910
2. When a pipeline as a whole completes either successfully or with an error, all of its
10-
constituent blocks will be also completed.
11+
constituent dataflow blocks will be also completed.
1112
3. The [`Completion`](https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.idataflowblock.completion)
1213
of the pipeline propagates all errors that may have occurred in all blocks,
1314
accumulated inside a flat [`AggregateException`](https://docs.microsoft.com/en-us/dotnet/api/system.aggregateexception).
@@ -19,22 +20,25 @@ provides a deeper insight about why this library exists.
1920
The problem with building pipelines using the traditional [`LinkTo`](https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblock.linkto) method,
2021
configured with the [`PropagateCompletion`](https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowlinkoptions.propagatecompletion) option,
2122
is that it allows the possibility of deadlocks and leaked
22-
active fire-and-forget blocks:
23+
fire-and-forget dataflow blocks:
2324

2425
1. A deadlock can occur in case a producer is blocked, waiting
25-
for empty space in the input buffer of the first block of a bounded pipeline, and any other
26+
for empty space in the input buffer of the first dataflow block of a bounded pipeline, and any other
2627
block except from the first one fails. In this case the producer will never be unblocked,
2728
because the first block will postpone all incoming messages ad infinitum, never accepting
2829
or declining any offered message.
2930

3031
2. A leaked fire-and-forget block can occur under similar
31-
circumstances. When any but the first block fails, the error will be propagated
32+
circumstances. When any but the first dataflow block fails, the error will be propagated
3233
downstream but not upstream. So the pipeline will soon signal its completion, while
3334
some blocks near the top may still be in a running state. These blocks will be leaked as
3435
fire-and-forget blocks, consuming resources and potentialy modifying the state of the
3536
application in unpredictable ways. Or they can just get stuck and become the source of a
3637
deadlock, as described previously.
3738

39+
3. The standard approach for propagating errors, the `PropagateCompletion` option,
40+
results to deeply nested [`AggregateException`](https://docs.microsoft.com/en-us/dotnet/api/system.aggregateexception)s.
41+
3842
This library attempts to fix these problems.
3943

4044
## How to make a pipeline
@@ -80,7 +84,7 @@ Whether it can emit messages depends on the type of the last block added in the
8084

8185
When the pipeline is created, all the blocks are linked automatically with the built-in [`LinkTo`](https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblock.linkto) method,
8286
configured with the [`PropagateCompletion`](https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowlinkoptions.propagatecompletion) option set to `false`.
83-
Then a continuation is attached to the completion of each block, that takes an appropriate
87+
Then a continuation is attached to the `Completion` of each block, that takes an appropriate
8488
action depending on how the block was completed. If the block was completed successfully or
8589
it was canceled, the completion is propagated to the next block by invoking the next block's
8690
[`Complete`](https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.idataflowblock.complete) method.
@@ -92,7 +96,9 @@ passing a special `PipelineException` as argument.
9296
Faulting the blocks is required in order to empty their input and output buffers,
9397
so that the pipeline can complete ASAP. This special exception is not propagated
9498
through the `Completion` of the generated pipeline, but it can be observed by querying
95-
the `Completion` property of the individual blocks.
99+
the `Completion` property of the individual blocks. Observing this exception just means that
100+
this block was not the first that failed. It is possible that the `PipelineException`
101+
may coexist with other exceptions in the same dataflow block.
96102

97103
## Discussion
98104

src/SimpleTplDataflowPipelines/PipelineBuilder.cs

+14-13
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ internal PipelineBuilder(ITargetBlock<TInput> target, IDataflowBlock lastBlock,
7171
/// <remarks>
7272
/// After calling this method, the blocks have been linked and are now owned by
7373
/// the pipeline for the rest of their existence.
74-
/// The pipeline represents the completion of all blocks, and propagates all of
75-
/// their errors. The pipeline completes when all the blocks have completed.
74+
/// The pipeline represents the completion of its constituent blocks, and
75+
/// propagates all of their errors. The pipeline completes when all the blocks
76+
/// have completed.
7677
/// If any block fails, the whole pipeline fails, and all non-completed blocks
7778
/// are forcefully completed and their output is discarded.
7879
/// </remarks>
@@ -143,8 +144,9 @@ public PipelineBuilder<TInput> LinkTo(ITargetBlock<TOutput> block)
143144
/// <remarks>
144145
/// After calling this method, the blocks have been linked and are now owned by
145146
/// the pipeline for the rest of their existence.
146-
/// The pipeline represents the completion of all blocks, and propagates all of
147-
/// their errors. The pipeline completes when all the blocks have completed.
147+
/// The pipeline represents the completion of its constituent blocks, and
148+
/// propagates all of their errors. The pipeline completes when all the blocks
149+
/// have completed.
148150
/// If any block fails, the whole pipeline fails, and all non-completed blocks
149151
/// are forcefully completed and their output is discarded.
150152
/// </remarks>
@@ -170,6 +172,7 @@ internal static Task CreatePipeline<TInput>(ITargetBlock<TInput> target,
170172
{
171173
Debug.Assert(target != null);
172174
Debug.Assert(linkDelegates != null);
175+
173176
var completions = new List<Task>();
174177
var failureActions = new List<Action>();
175178
Action onError = () =>
@@ -183,19 +186,18 @@ internal static Task CreatePipeline<TInput>(ITargetBlock<TInput> target,
183186
foreach (var failureAction in failureActionsLocal) failureAction();
184187
};
185188

186-
// Invoking the linkDelegates links all the blocks together, and populates the
187-
// completions and failureActions lists.
189+
// Invoking the linkDelegates populates the completions and failureActions lists.
188190
var finalActions = new List<Action>();
189191
foreach (var linkDelegate in linkDelegates)
190192
{
191-
var finalAction = linkDelegate(completions, failureActions, onError);
192-
finalActions.Add(finalAction);
193+
finalActions.Add(linkDelegate(completions, failureActions, onError));
193194
}
194195

195-
// Invoking the finalActions attaches a continuation to all blocks
196+
// Invoking the finalActions links all the blocks together, and attaches a
197+
// continuation to each block.
196198
foreach (var finalAction in finalActions) finalAction();
197199

198-
// Combine the completions of all blocks, excluding the sentinel exceptions
200+
// Combine the completions of all blocks, excluding the sentinel exceptions.
199201
return Task.WhenAll(completions).ContinueWith(t =>
200202
{
201203
if (!t.IsFaulted) return t;
@@ -227,7 +229,7 @@ internal static Action LinkTo<TOutput>(IDataflowBlock block,
227229

228230
completions.Add(block.Completion);
229231

230-
Action failureAction = () =>
232+
failureActions.Add(() =>
231233
{
232234
if (block.Completion.IsCompleted) return;
233235
block.Fault(new PipelineException());
@@ -236,8 +238,7 @@ internal static Action LinkTo<TOutput>(IDataflowBlock block,
236238
if (blockAsSource != null)
237239
_ = blockAsSource.LinkTo(
238240
DataflowBlock.NullTarget<TOutput>(), _nullTargetLinkOptions);
239-
};
240-
lock (failureActions) failureActions.Add(failureAction);
241+
});
241242

242243
// Propagating the completion of the blocks follows the same pattern implemented
243244
// internally by the TPL Dataflow library. The ContinueWith method is used for

0 commit comments

Comments
 (0)