How to work with the TPL Dataflow Library in .Net

Take advantage of the TPL Dataflow Library to implement the producer-consumer pattern in .Net


The producer-consumer pattern is one of the most widely used patterns in parallel programming. It decouples the code that produces work from the code that processes that work. A typical implementation has two distinct threads: one that produces the tasks to be executed and adds them to a queue, and another that checks the queue for new tasks and processes each one it finds.

There are various strategies for implementing the producer-consumer pattern. You can leverage the .Net thread pool to queue the work items and then process them on another thread. To queue the work items, you can take advantage of the Task Parallel Library (TPL), one of the most interesting features added to the .Net framework in recent versions; it was first introduced in .Net framework 4.0. The TPL types live in the System.Threading.Tasks namespace.
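As a rough illustration of queuing work items on the thread pool with the TPL, the sketch below hands one small computation per item to Task.Run and waits for all of them with Task.WhenAll. The class and method names (ThreadPoolQueueing, SquareAllAsync) are hypothetical, chosen only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class ThreadPoolQueueing
{
    // Hypothetical helper: queues one work item per value on the .NET thread pool
    // via Task.Run and returns the results once every item has finished.
    public static async Task<int[]> SquareAllAsync(int count)
    {
        var tasks = new List<Task<int>>();
        for (int i = 0; i < count; i++)
        {
            int n = i; // copy the loop variable so each work item sees its own value
            tasks.Add(Task.Run(() => n * n));
        }
        // Task.WhenAll returns the results in the same order as the tasks passed in.
        return await Task.WhenAll(tasks);
    }

    static void Main()
    {
        int[] squares = SquareAllAsync(5).GetAwaiter().GetResult();
        Console.WriteLine(string.Join(", ", squares)); // 0, 1, 4, 9, 16
    }
}
```

Copying `i` into `n` matters: every iteration of a C# `for` loop shares one loop variable, so a lambda that captured `i` directly could observe a later value.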

How does the producer-consumer pattern work?

In the producer-consumer pattern, you have two distinct components that run on two different threads: a producer that produces data and pushes it to a queue, and a consumer that consumes the data stored in that queue. Note that the producer and the consumer share the same buffer, or memory block, that holds the data.

Your implementation of this pattern should address a few considerations. The producer should produce and store data in the queue only as long as the queue is not full. Conversely, the consumer should try to remove data from the queue only as long as the queue is not empty. In other words, the producer should refrain from adding data or work items to a queue that is already full, and the consumer should not try to read data or work items from a queue that is empty.
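To make the full/empty rules concrete, here is a minimal sketch that uses BlockingCollection<T> from System.Collections.Concurrent rather than the TPL Dataflow library. A bounded BlockingCollection blocks the producer's Add call while the queue is full and blocks the consumer while it is empty. The names BoundedQueueDemo and Run, and the capacity of 2 used in Main, are assumptions made for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class BoundedQueueDemo
{
    // Hypothetical demo: one producer task and one consumer task share a bounded queue.
    // Returns the items in the order the consumer received them.
    public static List<int> Run(int itemCount, int capacity)
    {
        // BlockingCollection uses a ConcurrentQueue (FIFO) by default.
        using var queue = new BlockingCollection<int>(boundedCapacity: capacity);
        var received = new List<int>();

        var producer = Task.Run(() =>
        {
            for (int i = 0; i < itemCount; i++)
            {
                queue.Add(i);       // blocks while the queue already holds 'capacity' items
            }
            queue.CompleteAdding(); // tells the consumer that no more items are coming
        });

        var consumer = Task.Run(() =>
        {
            // Blocks while the queue is empty; the loop ends once CompleteAdding
            // has been called and the remaining items have been drained.
            foreach (int item in queue.GetConsumingEnumerable())
            {
                received.Add(item);
            }
        });

        Task.WaitAll(producer, consumer);
        return received;
    }

    static void Main()
    {
        List<int> items = Run(itemCount: 10, capacity: 2);
        Console.WriteLine(string.Join(" ", items)); // 0 1 2 3 4 5 6 7 8 9
    }
}
```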

The TPL Dataflow library

You can take advantage of the TPL Dataflow library to implement the producer-consumer pattern in .Net. The TPL Dataflow library is an abstraction built on top of the Task Parallel Library: it lets you build in-process pipelines out of dataflow blocks that buffer and process messages. Note that the TPL Dataflow library is not available by default as part of the .Net framework; you need to install it before you can use it in your applications. Its types live in the System.Threading.Tasks.Dataflow namespace. You can use the NuGet Package Manager to install the Microsoft.Tpl.Dataflow package (newer versions are published under the name System.Threading.Tasks.Dataflow).
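BufferBlock is only one of the blocks the library provides. As a rough illustration of what "dataflow" means here, the following sketch links a TransformBlock to an ActionBlock so that messages flow from one stage to the next. It assumes the Dataflow package is installed; PipelineDemo and RunAsync are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class PipelineDemo
{
    // Hypothetical two-stage pipeline: a TransformBlock doubles each number
    // and a linked ActionBlock collects the doubled values.
    public static async Task<int[]> RunAsync(int count)
    {
        var results = new ConcurrentQueue<int>();
        var doubler = new TransformBlock<int, int>(n => n * 2);
        var collector = new ActionBlock<int>(n => results.Enqueue(n));

        // PropagateCompletion makes collector complete after doubler completes.
        doubler.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++)
        {
            doubler.Post(i);
        }
        doubler.Complete();          // no more input
        await collector.Completion;  // finishes once every message has been processed
        return results.ToArray();
    }

    static void Main()
    {
        int[] doubled = RunAsync(5).GetAwaiter().GetResult();
        Console.WriteLine(string.Join(", ", doubled)); // 0, 2, 4, 6, 8
    }
}
```

With the default options, each block processes one message at a time and the TransformBlock preserves input order, which is why the output comes back in order.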

The following code snippet illustrates how you can create a BufferBlock that holds integers and then post data to it.

var buffer = new BufferBlock<Int32>();
for (int i = 0; i < 10; i++)
{
    buffer.Post(i);
}
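To see that the posted items are actually stored in first-in, first-out order, the sketch below posts to a BufferBlock and then drains it with TryReceive, which removes the next item without waiting and returns false once the block is empty. BufferBlockReadBack and PostAndDrain are hypothetical names used only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

static class BufferBlockReadBack
{
    // Hypothetical helper: posts 0..count-1 to a BufferBlock, then reads everything back.
    public static List<int> PostAndDrain(int count)
    {
        var buffer = new BufferBlock<int>();
        for (int i = 0; i < count; i++)
        {
            buffer.Post(i); // an unbounded BufferBlock accepts every item
        }

        var items = new List<int>();
        // TryReceive does not wait: it returns false as soon as the block is empty.
        while (buffer.TryReceive(out int item))
        {
            items.Add(item);
        }
        return items;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(" ", PostAndDrain(10))); // 0 1 2 3 4 5 6 7 8 9
    }
}
```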

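To post data asynchronously, you can use the SendAsync method instead. SendAsync returns a Task<bool> that completes once the block has accepted or declined the item, so you can await it from within an async method. The difference from Post matters when the block is bounded: Post returns false immediately if the block has no room, whereas SendAsync waits until room becomes available.

await buffer.SendAsync(i);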
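The difference between Post and SendAsync is easiest to see with a bounded block. The sketch below assumes a BoundedCapacity of 1 (an arbitrary choice for the demo) so that the block fills after the first item; PostVersusSendAsync and RunAsync are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class PostVersusSendAsync
{
    // Hypothetical demo comparing Post and SendAsync on a full, bounded BufferBlock.
    public static async Task<(bool postAccepted, bool sendCompletedEarly, bool sendAccepted)> RunAsync()
    {
        // Assumption for the demo: room for exactly one item.
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

        buffer.Post(1);                      // accepted: the block had room
        bool postAccepted = buffer.Post(2);  // false: Post does not wait for room

        Task<bool> pending = buffer.SendAsync(3);       // postponed until room frees up
        bool sendCompletedEarly = pending.IsCompleted;  // false: the block is still full

        int first = buffer.Receive();        // takes 1, which makes room for 3
        bool sendAccepted = await pending;   // true once the block has taken 3
        int second = buffer.Receive();       // 3
        Console.WriteLine($"{first} {second}");
        return (postAccepted, sendCompletedEarly, sendAccepted);
    }

    static void Main()
    {
        var result = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine(result);
    }
}
```

In a producer, awaiting SendAsync is therefore a simple way to honor the "don't add to a full queue" rule without dropping items.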

Let's now leverage a BufferBlock to implement a simple producer-consumer pattern. BufferBlock<T> is both a target block (ITargetBlock<T>) and a source block (ISourceBlock<T>). The producer uses the BufferBlock as a target to store data, while the consumer uses the same BufferBlock as a source, reading data from it as long as data is available.
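One way to make the two roles explicit is to type the producer against ITargetBlock<T> and the consumer against ISourceBlock<T>, then pass the same BufferBlock to both. This is a sketch under that assumption; TargetAndSourceViews, Produce, SumAsync, and Run are hypothetical names, and the consumer sums the values instead of printing them.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class TargetAndSourceViews
{
    // The producer only sees the target side of the buffer.
    static void Produce(ITargetBlock<int> target, int count)
    {
        for (int i = 0; i < count; i++)
        {
            target.Post(i);
        }
        target.Complete(); // signal that no more data will be posted
    }

    // The consumer only sees the source side of the same buffer.
    static async Task<int> SumAsync(ISourceBlock<int> source)
    {
        int sum = 0;
        while (await source.OutputAvailableAsync())
        {
            sum += await source.ReceiveAsync();
        }
        return sum;
    }

    public static int Run(int count)
    {
        var buffer = new BufferBlock<int>();
        Task<int> consumer = SumAsync(buffer); // BufferBlock<int> is an ISourceBlock<int>...
        Produce(buffer, count);                // ...and an ITargetBlock<int>
        return consumer.GetAwaiter().GetResult();
    }

    static void Main() => Console.WriteLine(Run(10)); // 45
}
```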

The following piece of code illustrates how you can create a BufferBlock, post data to it, and then use a consumer to consume data from the BufferBlock as long as data exists inside it.

var buffer = new BufferBlock<Int32>();
var consumer = ConsumeData(buffer);
for (int i = 0; i < 10; i++)
{
    buffer.Post(i);
}
buffer.Complete();
consumer.Wait();

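Note that the ConsumeData method consumes the data available inside the BufferBlock asynchronously. It loops on OutputAvailableAsync, which returns a task that completes with true when an item is available and with false once the block has been marked complete (by the call to Complete) and has no items left. That is what ends the loop. The following code snippet shows the ConsumeData method.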

static async Task<long> ConsumeData(ISourceBlock<Int32> source)
{
    var watch = System.Diagnostics.Stopwatch.StartNew();
    // Keep reading until the buffer has been completed and drained.
    while (await source.OutputAvailableAsync())
    {
        Int32 data = source.Receive();
        Console.WriteLine(data);
    }
    watch.Stop();
    return watch.ElapsedMilliseconds;
}
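The call to Complete is what lets this loop finish. The sketch below uses the same loop shape, but counts items instead of printing them, and checks the consumer before and after Complete is called. CompletionDemo, CountAsync, and RunAsync are hypothetical names, and the 200 ms delay is an arbitrary pause for illustration only; the consumer cannot finish before Complete regardless of how long you wait.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class CompletionDemo
{
    // Same loop shape as ConsumeData, counting items instead of printing them.
    static async Task<int> CountAsync(ISourceBlock<int> source)
    {
        int count = 0;
        while (await source.OutputAvailableAsync())
        {
            source.Receive();
            count++;
        }
        return count;
    }

    public static async Task<(bool doneBeforeComplete, int count)> RunAsync()
    {
        var buffer = new BufferBlock<int>();
        Task<int> consumer = CountAsync(buffer);
        for (int i = 0; i < 3; i++)
        {
            buffer.Post(i);
        }

        // Give the consumer a moment to drain the buffer. It still does not finish,
        // because OutputAvailableAsync keeps waiting for more data.
        await Task.Delay(200);
        bool doneBeforeComplete = consumer.IsCompleted; // false

        buffer.Complete();          // OutputAvailableAsync now returns false once the buffer is empty
        int count = await consumer; // 3
        return (doneBeforeComplete, count);
    }

    static void Main() => Console.WriteLine(RunAsync().GetAwaiter().GetResult());
}
```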

Here's the complete code listing that shows how this all works.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace ProducerConsumer
{
    class Program
    {
        static async Task<long> ConsumeData(ISourceBlock<Int32> source)
        {
            var watch = System.Diagnostics.Stopwatch.StartNew();
            while (await source.OutputAvailableAsync())
            {
                Int32 data = source.Receive();
                Console.WriteLine(data);
            }
            watch.Stop();
            return watch.ElapsedMilliseconds;
        }

        static void Main(string[] args)
        {
            var buffer = new BufferBlock<Int32>();
            var consumer = ConsumeData(buffer);
            for (int i = 0; i < 10; i++)
            {
                buffer.Post(i);
            }
            buffer.Complete();
            consumer.Wait();
            Console.WriteLine("Time taken: {0} ms.", consumer.Result);
            Console.Read();
        }
    }
}

When you execute the program, the numbers 0 through 9 stored in the BufferBlock are displayed, followed by the time taken to execute the ConsumeData method. I'll discuss the TPL Dataflow Library in more detail in future articles.
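ConsumeData calls Receive right after OutputAvailableAsync, which is safe with a single consumer. With several consumers reading from the same BufferBlock, another consumer may take the item in between, so Receive could block waiting for a further item, or throw an InvalidOperationException if the block completes in the meantime. A common alternative, sketched below, is to call TryReceive after OutputAvailableAsync. MultipleConsumers, ConsumeAsync, and Run are hypothetical names, and the item and consumer counts in Main are arbitrary.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class MultipleConsumers
{
    // Each consumer returns how many items it handled.
    static async Task<int> ConsumeAsync(IReceivableSourceBlock<int> source)
    {
        int handled = 0;
        while (await source.OutputAvailableAsync())
        {
            // TryReceive returns false if another consumer got there first.
            while (source.TryReceive(out int _))
            {
                handled++;
            }
        }
        return handled;
    }

    public static int[] Run(int itemCount, int consumerCount)
    {
        var buffer = new BufferBlock<int>();
        Task<int>[] consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => ConsumeAsync(buffer))
            .ToArray();

        for (int i = 0; i < itemCount; i++)
        {
            buffer.Post(i);
        }
        buffer.Complete();
        return Task.WhenAll(consumers).GetAwaiter().GetResult();
    }

    static void Main()
    {
        int[] perConsumer = Run(itemCount: 100, consumerCount: 3);
        Console.WriteLine($"Per consumer: {string.Join(", ", perConsumer)}; total: {perConsumer.Sum()}");
    }
}
```

How the items are split among the consumers varies from run to run; only the total is fixed.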

This article is published as part of the IDG Contributor Network.
