Parallel Computing in .NET 4.0 – Task Parallel Library & PLINQ


.NET 4 introduces various parallel-programming primitives that abstract away some of the messy details that developers have to deal with when implementing parallel programs from scratch.

Introduction

.NET Framework 4.0 introduces parallel-programming support that makes it easier to write programs that target multi-core machines.

With the parallel-programming primitives available in .NET Framework 4.0, code becomes more readable, performs better, and is less error-prone.

We can use tasks instead of thread-pool work items. Tasks add options such as waiting on a task, cancelling a task, scheduling continuation tasks, and handling exceptions. The main building blocks are:

1. Task Parallel Library - System.Threading.Tasks
  • System.Threading.Tasks.Task – asynchronous operations (parallel tasks)
  • System.Threading.Tasks.Parallel – a class that provides parallel forms of the For and ForEach loops
2. PLINQ – a parallel implementation of LINQ to Objects.

Parallel Tasks

A task represents a small unit of work to be executed asynchronously. Parallel tasks help perform expensive work items asynchronously and provide features such as waiting on tasks, cancelling a task, and scheduling a task to run as a continuation of another task.

The simplest way to run work in parallel is to call the Parallel.Invoke method and pass it one Action delegate per work item. Each delegate can either call a method or contain the code inline.

Ex: Parallel.Invoke(() => DoSomeWork(), () => DoSomeOtherWork());
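As a minimal, runnable sketch of the call above: the two inline actions below are hypothetical stand-ins for DoSomeWork and DoSomeOtherWork, and the concurrent queue is used only so the effect can be observed. The point it illustrates is that Parallel.Invoke returns only after every action has completed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class InvokeDemo
{
    // Runs two independent actions and returns how many of them reported completion.
    public static int RunBoth()
    {
        var log = new ConcurrentQueue<string>();

        // Hypothetical work items; each one records that it ran.
        Parallel.Invoke(
            () => log.Enqueue("DoSomeWork finished"),
            () => log.Enqueue("DoSomeOtherWork finished"));

        // Parallel.Invoke has returned, so both actions have already run.
        return log.Count;
    }

    public static void Main()
    {
        Console.WriteLine("Actions completed: {0}", RunBoth());
    }
}
```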

A task is represented by the class System.Threading.Tasks.Task, and a task that returns a value is represented by the class System.Threading.Tasks.Task<TResult>, which inherits from Task. Task<TResult> has a Result property that gives the return value, and every task has a Status property that reports whether the task has started, run to completion, been cancelled, and so on.

// Create a task and supply a user delegate by using a lambda expression.
var taskA = new Task(() => Console.WriteLine("Hello from taskA."));
// Start the task.
taskA.Start();

The StartNew method creates and starts the task in one operation.

Example

Task<int> a = Task.Factory.StartNew(() => { return Compute(0); });
Task<int> b = Task.Factory.StartNew(() => { return Compute(1); });
Task<int> c = Task.Factory.StartNew(() => { return Compute(2); });
// Reading Result blocks until the corresponding task has finished.
int value = a.Result + b.Result + c.Result;
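The Result and Status properties described above can be tried out with a small self-contained program. This is only a sketch: Compute here is a hypothetical stand-in that squares its input, not the article's actual method.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskResultDemo
{
    // Hypothetical stand-in for an expensive computation.
    public static int Compute(int n)
    {
        return n * n;
    }

    // Starts three tasks and adds up their results.
    public static int SumOfResults()
    {
        Task<int> a = Task.Factory.StartNew(() => Compute(1));
        Task<int> b = Task.Factory.StartNew(() => Compute(2));
        Task<int> c = Task.Factory.StartNew(() => Compute(3));

        // Each read of Result blocks until that task has finished.
        return a.Result + b.Result + c.Result;
    }

    // Returns the status of a task after it has been waited on.
    public static TaskStatus StatusAfterWait()
    {
        Task<int> t = Task.Factory.StartNew(() => Compute(4));
        t.Wait();
        return t.Status;
    }

    public static void Main()
    {
        Console.WriteLine("Sum: {0}", SumOfResults());
        Console.WriteLine("Status: {0}", StatusAfterWait());
    }
}
```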

Continuation Task

// 'task' is a previously created Task; the continuation receives it as its argument.
task.ContinueWith(antecedent => {
Console.WriteLine("Computation completed"); });
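A continuation can also return a value of its own. The sketch below, with a made-up computation (6 * 7) as the antecedent, shows a continuation reading the antecedent's Result and producing a string.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    public static string RunWithContinuation()
    {
        // Antecedent task: produces a value.
        Task<int> compute = Task.Factory.StartNew(() => 6 * 7);

        // The continuation starts after 'compute' finishes and receives it as its argument.
        Task<string> report = compute.ContinueWith(
            antecedent => "Computation completed: " + antecedent.Result);

        // Blocks until the continuation itself has finished.
        return report.Result;
    }

    public static void Main()
    {
        Console.WriteLine(RunWithContinuation());
    }
}
```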

Task Wait example

Task[] tasks = new Task[3]
{
 Task.Factory.StartNew(() => MethodA()),
 Task.Factory.StartNew(() => MethodB()),
 Task.Factory.StartNew(() => MethodC())
};
//Block until all tasks complete.
Task.WaitAll(tasks);

Task Cancel example

To cancel a task that has not started yet, pass a CancellationToken to the StartNew method of the task factory, and then call the Cancel method of the CancellationTokenSource that produced the token.

var tokenSource = new CancellationTokenSource(); 
var token = tokenSource.Token; 
Task task1 = Task.Factory.StartNew(() => {... }, token);
tokenSource.Cancel();

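To cancel a task that is already running, the task body itself must poll regularly by calling token.ThrowIfCancellationRequested(). The call throws an OperationCanceledException once cancellation has been requested, which ends the task as cancelled (provided the same token was passed to StartNew) even though the task started before Cancel was called.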
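The polling approach can be sketched as follows. The loop body, the sleep interval, and the ManualResetEventSlim used to make sure the task is really running before Cancel is called are all illustrative assumptions, not part of the original example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    public static TaskStatus CancelRunningTask()
    {
        var tokenSource = new CancellationTokenSource();
        CancellationToken token = tokenSource.Token;
        var started = new ManualResetEventSlim(false);

        Task worker = Task.Factory.StartNew(() =>
        {
            started.Set(); // signal that the task body is executing
            for (int i = 0; i < 1000; i++)
            {
                // Poll the token; throws OperationCanceledException after Cancel.
                token.ThrowIfCancellationRequested();
                Thread.Sleep(10); // placeholder for a slice of real work
            }
        }, token);

        started.Wait();       // make sure the task has started
        tokenSource.Cancel(); // request cancellation while it is running

        try
        {
            worker.Wait();
        }
        catch (AggregateException ex)
        {
            // Wait reports the cancellation wrapped in an AggregateException.
            ex.Handle(inner => inner is OperationCanceledException);
        }

        return worker.Status;
    }

    public static void Main()
    {
        Console.WriteLine("Final status: {0}", CancelRunningTask());
    }
}
```

Because the token passed to StartNew is the same one the body polls, the task should finish in the Canceled state rather than Faulted.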

Creating Nested Tasks

Child tasks can be created inside the delegate of a parent task and attached to it, which makes sure that the parent task waits for all of its child tasks to complete.

var parent = Task.Factory.StartNew(() => 
{
 Console.WriteLine("Parent task beginning.");
 var child = Task.Factory.StartNew(() =>
 {
 Thread.SpinWait(5000000);
 Console.WriteLine("Attached child completed.");
 }, TaskCreationOptions.AttachedToParent);
});

parent.Wait();
Console.WriteLine("Parent task completed.");


Important points

1. Always copy a loop iteration variable into a local variable before using it in a task body. Accessing the loop variable directly often gives unexpected results, because every task captures the same variable and may see a later value.

for (int i = 0; i < 5; i++)
{
int iLocal = i;
Task.Factory.StartNew(()=> Console.WriteLine(iLocal));
}

2. AVOID waiting on tasks while holding a lock. Waiting on a task while holding a lock can lead to a deadlock if the task itself attempts to take the same lock.
Never write code like the following:

lock (syncObject)
{
Task<int> task = Task.Factory.StartNew(() => { return Compute(1); });
// Waiting while holding the lock: deadlocks if the task also needs syncObject.
task.Wait();
}

3. CONSIDER wrapping asynchronous method calls in tasks by using the FromAsync method of the TaskFactory class. FromAsync integrates the Task Parallel Library with the existing asynchronous programming model that is based on IAsyncResult (Begin/End method pairs).
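As an illustration of point 3, the following sketch wraps the BeginRead/EndRead pair of a stream in a task. A MemoryStream is used here only to keep the example self-contained; the same pattern is typically applied to file or network streams.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class FromAsyncDemo
{
    // Reads the whole buffer through the IAsyncResult-based API, exposed as a Task<int>.
    public static int ReadWithFromAsync(byte[] source)
    {
        using (var stream = new MemoryStream(source))
        {
            var buffer = new byte[source.Length];

            // FromAsync calls BeginRead with the three arguments and a null state,
            // and completes the task with the value returned by EndRead.
            Task<int> read = Task<int>.Factory.FromAsync(
                stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);

            return read.Result; // number of bytes read
        }
    }

    public static void Main()
    {
        int bytesRead = ReadWithFromAsync(new byte[] { 1, 2, 3, 4, 5 });
        Console.WriteLine("Bytes read: {0}", bytesRead);
    }
}
```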

Parallel.For and Parallel.ForEach

The System.Threading.Tasks.Parallel class provides several overloads of the For and ForEach methods. In these operations the source collection is partitioned so that different cores of a multi-core processor execute different segments concurrently, which can improve performance considerably.

Parallel.For partitions the given range dynamically and uses tasks under the covers.

The implementation looks similar to the sequential loop:

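//Sequential for loop (n is the number of items to process)
for (int i = 0; i < n; i++) { ProcessItem(i); }

//Parallel for
Parallel.For(0, n, i => ProcessItem(i));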

//Sequential foreach
foreach (var item in sourceCollection)
{
 Process(item);
}

//Parallel foreach
Parallel.ForEach(sourceCollection, item => Process(item));

We cannot use the break statement (Exit in Visual Basic) in Parallel.For or Parallel.ForEach as we can in a sequential loop, because those language constructs are valid only inside loops, and a parallel "loop" is actually a method call, not a loop. Instead, use the Stop or Break method of ParallelLoopState.

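Some overloads of Parallel.For accept an Action<int, ParallelLoopState> delegate as the loop body. The int is the iteration index, and the ParallelLoopState object, which the Parallel class creates and passes in, has Break and Stop methods that let us end the loop early based on a condition. Stop tells the loop to cease all iterations as soon as convenient, whereas Break lets every iteration below the current index still run.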

Example

private static void StopLoop()
{
 Console.WriteLine("Stoploop...");
 double[] source = MakeDemoSource(1000, 1);
 ConcurrentStack<double> results = new ConcurrentStack<double>();

 Parallel.For(0, source.Length, (i,loopState) => { 
 //Take the first 100 values that are retrieved
 // from anywhere in the source.
 if (i < 100)
 {
 // Accessing shared object on each iteration is not efficient. See remarks.
 double d = Compute(source[i]);
 results.Push(d);
 }
 else
 {
 loopState.Stop();
 return;
 }
}); // Close Parallel.For
Console.WriteLine("Results contains {0} elements", results.Count());
} 

static void BreakAtThreshold()
{
 double[] source = MakeDemoSource(10000, 1.0002); 
 ConcurrentStack<double> results = new ConcurrentStack<double>();

 Parallel.For(0, source.Length, (i,loopState) => 
 {
    double d = Compute(source[i]);
    results.Push(d);
    if (d > .2)
    {
     // Might be called more than once! 
     loopState.Break();
     Console.WriteLine("Break called at iteration {0}. d = {1} ", i, d);
     Thread.Sleep(1000);
    }
 });
 Console.WriteLine("Results contains {0} elements", results.Count());
} 
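The effect of Break can also be inspected through the ParallelLoopResult that Parallel.For returns. The sketch below is an illustration under simple assumptions (a fixed break index and a bool array that records which iterations ran); it is not taken from the examples above.

```csharp
using System;
using System.Threading.Tasks;

public static class BreakResultDemo
{
    // Runs a parallel loop that calls Break at one index and records executed iterations.
    public static ParallelLoopResult BreakAt(int breakIndex, bool[] executed)
    {
        return Parallel.For(0, executed.Length, (i, loopState) =>
        {
            executed[i] = true; // each iteration writes only its own slot
            if (i == breakIndex)
            {
                // Iterations below breakIndex still run; higher ones may be skipped.
                loopState.Break();
            }
        });
    }

    public static void Main()
    {
        var executed = new bool[1000];
        ParallelLoopResult result = BreakAt(500, executed);

        Console.WriteLine("IsCompleted: {0}", result.IsCompleted);
        Console.WriteLine("LowestBreakIteration: {0}", result.LowestBreakIteration);
    }
}
```

IsCompleted is false when the loop ended early, and LowestBreakIteration holds the lowest index at which Break was called. After Stop, by contrast, LowestBreakIteration stays null.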

Important: To Store and Retrieve Thread local data

Consider a scenario where you build a collection of a custom type by looping through a list.

Simple example

// delimited string containing emp details
string extractedstring = "ID\tName\nID\tName";
string[] emps = extractedstring.Split('\n');
List<Employee> objList = new List<Employee>();
// Sequential version: one Employee per input line.
foreach (string emp in emps)
{
 string[] details = emp.Split('\t');
 Employee obj = new Employee();
 obj.ID = details[0];
 obj.Name = details[1];
 objList.Add(obj);
}

To implement the above example with a parallel ForEach, we use a different overload of Parallel.ForEach, one that takes a thread-local initializer and a final action:

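Parallel.ForEach<TSource, TLocal>(IEnumerable<TSource> source, Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);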

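// delimited string containing emp details
string extractedstring = "ID\tName\nID\tName";
string[] emps = extractedstring.Split('\n');
List<Employee> objList = new List<Employee>();
Parallel.ForEach(emps,
 () => new List<Employee>(), // thread-local list, created once per task
 (emp, loop, localList) =>
{
 string[] details = emp.Split('\t');
 // Create a new Employee per item; reusing one instance would overwrite earlier items.
 localList.Add(new Employee { ID = details[0], Name = details[1] });
 return localList;
},
 // Runs once per task: merge the local list into the shared list under a lock.
 (localList) => { lock (objList) { objList.AddRange(localList); } });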

Important points

1. DO use parallel loops Parallel.For and Parallel.ForEach to speed up operations where an expensive, independent operation needs to be performed for each input in a sequence.
2. DO make sure that the loop body delegate is thread-safe, since it will be called from multiple threads concurrently.
3. DO verify that the loop body delegate does not make assumptions about the order in which loop iterations will execute. For example, there is no guarantee that a thread will process its partition of input elements in the order in which they appear in the input, even though in the current version it will.
4. CONSIDER increasing the work done by each iteration in a parallel loop if it is very low. The body of a parallel loop is a delegate, and invoking it incurs some overhead. If the work done by the loop body is very small, the delegate invocation overhead may dominate the running time.
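One way to keep the loop body thread-safe without locking on every iteration is the thread-local overload of Parallel.For. The sketch below sums squares (an arbitrary example workload chosen for illustration): each task accumulates into its own subtotal, and the shared total is touched only once per task.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ThreadSafeSumDemo
{
    // Sums i * i for i in [0, count) using per-task subtotals.
    public static long SumOfSquares(int count)
    {
        long total = 0;

        Parallel.For(0, count,
            () => 0L,                                           // initial subtotal for each task
            (i, loopState, subtotal) => subtotal + (long)i * i, // no shared state touched here
            subtotal => Interlocked.Add(ref total, subtotal));  // merge once per task, atomically

        return total;
    }

    public static void Main()
    {
        Console.WriteLine("Sum of squares: {0}", SumOfSquares(1000));
    }
}
```

Writing `total += i * i` directly in the body would be a data race; the per-task subtotal avoids both the race and the cost of synchronizing on every iteration.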

