TPL Dataflow is an in-process actor library on top of the Task Parallel Library enabling more robust concurrent programming. It was first introduced in the async-ctp (the preview for async/await) but was eventually released as a standalone nuget package. It abstracts away most of the hard work needed when building asynchronous and/or parallel processing code but I feel most people who might benefit from it aren’t aware of it.
A basic building block in the Dataflow library is the ActionBlock
. You simply create it, tell it what to do, start posting items into it and wait untill it’s done. It will buffer the items, start a task to process them sequentially (by default) and end the task when the buffer is empty. Here’s a simple example:
// Create a block with a synchronous action
var block = new ActionBlock<Hamster>(_ => _.Feed());
// Add items to the block and start processing
foreach (Hamster hamster in hamsters)
{
block.Post(hamster);
}
block.Complete(); // Tell the block to complete and stop accepting new items
await block.Completion; // Asynchronously wait until all items completed processing
However, production-ready code would need to support async/await, concurrency, cancellation and capping the internal buffer to avoid uncontrolled memory growth. ActionBlock
(and the rest of TPL Dataflow) support all that right out of the box with an options property-bag:
// Create a block with an asynchronous action
var block = new ActionBlock<Hamster>(
async _ => await _.FeedAsync(),
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 10000, // Cap the item count
CancellationToken = cancellationToken, // Enable cancellation
MaxDegreeOfParallelism = Environment.ProcessorCount, // Parallelize on all cores
});
// Add items to the block and asynchronously wait if BoundedCapacity is reached
foreach (Hamster hamster in hamsters)
{
await block.SendAsync(hamster);
}
block.Complete();
await block.Completion;
Notice that we now use SendAsync
as it allows to asynchronously wait when there’s no more room in the block for new items as opposed to Post
which tries to add an item synchronously and returns false
if the block is full. This allows throttling producers when the consumers can’t handle the load. Implementing all these features yourself would take many lines of code, too much time and probably result in a few bugs.
Now, while ActionBlock
is the most useful block and alone cover the vast majority of use-cases, there are other blocks like BufferBlock
, TransformBlock
, BatchBlock
, etc. that can be joined together like LEGO to create processing pipelines (i.e. data flows):
// Create a block that receives IngredientBatches and generates Pizzas
var pizzaMakerBlock = new TransformBlock<IngredientBatch, Pizza>(
_ => Pizza.Make(_.Dough, _.Sauce, _.Toppings));
// Create a block to concurrently and asynchronously deliver the Pizzas
var deliveryBlock = new ActionBlock<Pizza>(
async _ => await _.DeliverAsync(),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 20, // delivery guys
});
// Link the blocks together to build a pipeline
pizzaMakerBlock.LinkTo(
deliveryBlock,
new DataflowLinkOptions { PropagateCompletion = true });
// Add items to the first block and let them flow through the pipeline
foreach (IngredientBatch batch in ingredientBatches)
{
pizzaMakerBlock.Post(batch);
}
pizzaMakerBlock.Complete(); // Completion will propagate to the next block in the pipe
await deliveryBlock.Completion; // It's only necessary to wait for the last block
Dataflow’s simplicity and robustness however don’t seem to lead to popularity and usage. It only has 269 questions on Stack Overflow (>20% of them I answered) and it’s shadowed by Reactive Extensions (which can cover some of the same use-cases). My assumption is that being one of the first parts of .NET to be released as a separate nuget package limited the exposure it would have otherwise received (at least it led the way for present-day .NET Core where all libraries are separate nuget packages). But in my experience when people discover Dataflow and see how it solves their issues they are happy to embrace it.
So, this is me acting as a self-appointed TPL Dataflow evangelist. My entire product (Microsoft ATA) runs inside various ActionBlock
s’ delegates and I encourage you to consider how it fits your code and architecture as well.
If you already use TPL Dataflow I would be happy to hear about how and in what codebase in the comments.