.NET 4 introduces various parallel-programming primitives that abstract away some of the messy details that developers have to deal with when implementing parallel programs from scratch.
Introduction
.NET Framework 4.0 introduces parallel programming support that makes it easier to write programs that target multi-core machines.
With the parallel programming primitives available in .NET Framework 4.0, code becomes more readable, performs better and is less error prone.
We can use Tasks instead of thread pool work items. Tasks provide options such as waiting on a task, cancelling a task, scheduling continuation tasks and handling exceptions.
1. Task Parallel Library - System.Threading.Tasks
- System.Threading.Tasks.Task - asynchronous operations (parallel tasks)
- System.Threading.Tasks.Parallel - class that includes parallel forms of the For and ForEach loops
2. PLINQ - parallel implementation of LINQ to Objects.
Parallel Tasks
Tasks represent small units of work to be executed asynchronously. Parallel tasks help in running expensive work items asynchronously and provide features such as waiting on tasks, cancelling a task and scheduling tasks to run as continuations of other tasks.
The simplest way to run work items in parallel is to call the Parallel.Invoke method and pass it one Action delegate per work item. Each delegate can either call a method or contain inline code.
Ex: Parallel.Invoke(() => DoSomeWork(), () => DoSomeOtherWork());
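As a minimal sketch of Parallel.Invoke, the program below passes one delegate that calls a method and one that contains inline code. The class InvokeDemo and the helper SumTo are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class InvokeDemo
{
    // Runs two independent work items; each writes to its own array slot,
    // so no locking is needed.
    public static int[] Run()
    {
        var results = new int[2];
        Parallel.Invoke(
            () => results[0] = SumTo(1000),   // delegate that calls a method
            () => { results[1] = 6 * 7; });   // delegate with inline code
        // Parallel.Invoke returns only after both delegates have finished.
        return results;
    }

    // Hypothetical stand-in for an expensive computation.
    private static int SumTo(int n)
    {
        int sum = 0;
        for (int i = 1; i <= n; i++) sum += i;
        return sum;
    }

    public static void Main()
    {
        int[] r = Run();
        Console.WriteLine("{0} {1}", r[0], r[1]);   // prints "500500 42"
    }
}
```

Because Parallel.Invoke blocks until every delegate has completed, the results can be read safely right after the call.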
A task is represented by the class System.Threading.Tasks.Task, and a task that returns a value is represented by the class System.Threading.Tasks.Task<TResult>, which inherits from Task. Task<TResult> has a Result property that gives the return value, and Task has a Status property that tells us whether the task has started, run to completion, been cancelled and so on.
// Create a task and supply a user delegate by using a lambda expression.
var taskA = new Task(() => Console.WriteLine("Hello from taskA."));
// Start the task.
taskA.Start();
The StartNew method creates and starts the task in one operation.
Example
Task<int> a = Task.Factory.StartNew(() => { return Compute(0); });
Task<int> b = Task.Factory.StartNew(() => { return Compute(1); });
Task<int> c = Task.Factory.StartNew(() => { return Compute(2); });
// Reading Result blocks until the corresponding task has finished.
int value = a.Result + b.Result + c.Result;
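Continuation Task
A continuation runs after the task it is attached to has finished. Its delegate receives that finished task as an argument.
task.ContinueWith(t => { Console.WriteLine("Computation completed"); });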
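A continuation can also use the result of the task it follows. The sketch below assumes a trivial computation (21 * 2) in place of real work. ContinuationDemo is a hypothetical name.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    public static string Run()
    {
        // Antecedent task: produces a value (a stand-in for real work).
        Task<int> compute = Task.Factory.StartNew(() => 21 * 2);

        // The continuation receives the finished antecedent task as its argument
        // and can read its Result without blocking.
        Task<string> report = compute.ContinueWith(
            t => "Computation completed: " + t.Result);

        // Reading Result here blocks until the continuation has run.
        return report.Result;
    }

    public static void Main()
    {
        Console.WriteLine(Run());   // prints "Computation completed: 42"
    }
}
```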
Task Wait example
Task[] tasks = new Task[3]
{
Task.Factory.StartNew(() => MethodA()),
Task.Factory.StartNew(() => MethodB()),
Task.Factory.StartNew(() => MethodC())
};
//Block until all tasks complete.
Task.WaitAll(tasks);
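The introduction lists exception handling among the benefits of tasks. When a task throws, Task.WaitAll rethrows the failures wrapped in an AggregateException after all tasks have finished. A minimal sketch, where Succeed and Fail are hypothetical stand-ins for methods such as MethodA and MethodB:

```csharp
using System;
using System.Threading.Tasks;

public static class WaitAllDemo
{
    // Hypothetical work items: one succeeds, one throws.
    private static void Succeed() { }
    private static void Fail() { throw new InvalidOperationException("work item failed"); }

    // Returns the number of tasks that ended with an exception.
    public static int CountFailures()
    {
        Task[] tasks = new Task[3]
        {
            Task.Factory.StartNew(() => Succeed()),
            Task.Factory.StartNew(() => Fail()),
            Task.Factory.StartNew(() => Succeed())
        };
        try
        {
            // Blocks until all three tasks have finished, then throws
            // if any of them faulted.
            Task.WaitAll(tasks);
            return 0;
        }
        catch (AggregateException ex)
        {
            // One inner exception per faulted task.
            return ex.InnerExceptions.Count;
        }
    }

    public static void Main()
    {
        Console.WriteLine("Failed tasks: {0}", CountFailures());   // prints "Failed tasks: 1"
    }
}
```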
Task Cancel example
To cancel a task that has not started yet, pass a CancellationToken to the StartNew method of the task factory, then call the Cancel method of the CancellationTokenSource that issued the token.
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
Task task1 = Task.Factory.StartNew(() => {... }, token);
tokenSource.Cancel();
To cancel a task that is already running, the task body itself must poll regularly by calling token.ThrowIfCancellationRequested(), which ends the task as cancelled once Cancel has been called, even if the task started before the cancellation was requested.
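The polling pattern can be sketched as follows. The endless loop and the SpinWait call are assumptions standing in for real work, and the ManualResetEventSlim is there only to make sure the task is already running when Cancel is called. CancelDemo is a hypothetical name.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancelDemo
{
    public static TaskStatus Run()
    {
        var tokenSource = new CancellationTokenSource();
        CancellationToken token = tokenSource.Token;
        var started = new ManualResetEventSlim(false);

        Action body = () =>
        {
            started.Set();                              // signal that the body is running
            while (true)
            {
                token.ThrowIfCancellationRequested();   // poll for cancellation
                Thread.SpinWait(1000);                  // stand-in for a slice of real work
            }
        };
        Task worker = Task.Factory.StartNew(body, token);

        started.Wait();          // the task has definitely started
        tokenSource.Cancel();    // request cancellation of the running task

        try
        {
            worker.Wait();
        }
        catch (AggregateException ex)
        {
            // Cancellation surfaces as an OperationCanceledException
            // (TaskCanceledException) inside the AggregateException.
            ex.Handle(e => e is OperationCanceledException);
        }
        return worker.Status;
    }

    public static void Main()
    {
        Console.WriteLine(Run());   // prints "Canceled"
    }
}
```

Passing the same token to StartNew matters here: it lets the task end in the Canceled state rather than Faulted when the exception is thrown.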
Creating Nested Tasks
Child tasks can be created inside the delegate of the parent task and attached to it with TaskCreationOptions.AttachedToParent, which makes the parent task wait for all of its attached child tasks to complete.
var parent = Task.Factory.StartNew(() =>
{
Console.WriteLine("Parent task beginning.");
var child = Task.Factory.StartNew(() =>
{
Thread.SpinWait(5000000);
Console.WriteLine("Attached child completed.");
}, TaskCreationOptions.AttachedToParent);
});
parent.Wait();
Console.WriteLine("Parent task completed.");
Important points
1. Always copy a loop iteration variable into a local variable before using it in the task body. Otherwise all the tasks share the same variable, and by the time they run it often holds a later value than the one intended.
for (int i = 0; i < 5; i++)
{
int iLocal = i;
Task.Factory.StartNew(()=> Console.WriteLine(iLocal));
}
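To check that the local copy gives each task its own value, the sketch below collects what the tasks see instead of printing it. The ConcurrentBag is an assumption made so the values can be gathered safely from several threads. CaptureDemo is a hypothetical name.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class CaptureDemo
{
    public static int[] Run()
    {
        var seen = new ConcurrentBag<int>();
        var tasks = new Task[5];
        for (int i = 0; i < 5; i++)
        {
            int iLocal = i;   // a fresh variable for every iteration
            tasks[i] = Task.Factory.StartNew(() => seen.Add(iLocal));
        }
        Task.WaitAll(tasks);
        // Tasks may run in any order, so sort before comparing.
        return seen.OrderBy(x => x).ToArray();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));   // prints "0, 1, 2, 3, 4"
    }
}
```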
2. AVOID waiting on tasks while holding a lock. Waiting on a task while holding a lock can lead to a deadlock if the task itself attempts to take the same lock.
Never write code like the following:
lock (syncObject)
{
Task task = Task.Factory.StartNew(() => { return Compute(1); });
task.Wait();
}
3. CONSIDER wrapping asynchronous method calls with tasks by using the FromAsync method of the TaskFactory class. FromAsync integrates the Task Parallel Library with the existing asynchronous programming model that is based on IAsyncResult.
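As a minimal sketch of FromAsync, the code below wraps the BeginRead/EndRead pair of a stream in a Task<int>. A MemoryStream with five bytes is an assumption chosen to keep the example self-contained. The same call shape applies to a FileStream. FromAsyncDemo is a hypothetical name.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class FromAsyncDemo
{
    // Returns the number of bytes read through the wrapped Begin/End pair.
    public static int Run()
    {
        byte[] data = { 1, 2, 3, 4, 5 };
        using (var stream = new MemoryStream(data))
        {
            var buffer = new byte[16];
            // The arguments after EndRead are passed on to BeginRead;
            // the final null is the state object.
            Task<int> read = Task<int>.Factory.FromAsync(
                stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);
            return read.Result;   // blocks until the read has completed
        }
    }

    public static void Main()
    {
        Console.WriteLine("Bytes read: {0}", Run());   // prints "Bytes read: 5"
    }
}
```

Once the operation is a task, it can be combined with continuations, WaitAll and the other features described earlier.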
Parallel.For and Parallel.ForEach
The System.Threading.Tasks.Parallel class provides several overloads of the For and ForEach methods. In these operations the source collection is partitioned so that different cores of a multi-core processor can execute different segments concurrently, which can improve performance considerably.
Parallel.For partitions the given data dynamically and uses tasks under the covers.
The implementation looks similar to the sequential loop.
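//Sequential for loop
for (int i = 0; i < n; i++) { ProcessItem(i); }
//Parallel equivalent (n is the number of items to process)
Parallel.For(0, n, i => ProcessItem(i));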
//Sequential foreach
foreach (var item in sourceCollection)
{
Process(item);
}
//Parallel equivalent
Parallel.ForEach(sourceCollection, item => Process(item));
We cannot use the break (C#) or Exit (Visual Basic) statements in Parallel.For or Parallel.ForEach as we can in a sequential loop, because those language constructs are valid only inside loops, and a parallel "loop" is actually a method call, not a loop. Instead, we use the Stop or Break method of ParallelLoopState.
Some of the overloads of Parallel.For accept an Action<int, ParallelLoopState> delegate as an input parameter. Here the int is the iteration index, and the ParallelLoopState object, which the Parallel class creates and passes in, has Break and Stop methods that let us end the loop early based on a condition.
Example
private static void StopLoop()
{
Console.WriteLine("Stoploop...");
double[] source = MakeDemoSource(1000, 1);
ConcurrentStack<double> results = new ConcurrentStack<double>();
Parallel.For(0, source.Length, (i,loopState) => {
//Take the first 100 values that are retrieved
// from anywhere in the source.
if (i < 100)
{
// Accessing shared object on each iteration is not efficient. See remarks.
double d = Compute(source[i]);
results.Push(d);
}
else
{
loopState.Stop();
return;
}
}); // Close Parallel.For
Console.WriteLine("Results contains {0} elements", results.Count());
}
static void BreakAtThreshold()
{
double[] source = MakeDemoSource(10000, 1.0002);
ConcurrentStack<double> results = new ConcurrentStack<double>();
Parallel.For(0, source.Length, (i,loopState) =>
{
double d = Compute(source[i]);
results.Push(d);
if (d > .2)
{
// Might be called more than once!
loopState.Break();
Console.WriteLine("Break called at iteration {0}. d = {1} ", i, d);
Thread.Sleep(1000);
}
});
Console.WriteLine("Results contains {0} elements", results.Count());
}
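Stop asks the loop to end as soon as possible. Break still guarantees that every iteration below the breaking index runs. The sketch below illustrates the Break guarantee through ParallelLoopResult.LowestBreakIteration. The source array and the threshold of 100 are assumptions made for the example, and BreakDemo is a hypothetical name.

```csharp
using System;
using System.Threading.Tasks;

public static class BreakDemo
{
    public static long? Run()
    {
        int[] source = new int[1000];
        for (int i = 0; i < source.Length; i++) source[i] = i;

        ParallelLoopResult result = Parallel.For(0, source.Length, (i, loopState) =>
        {
            if (source[i] >= 100)
            {
                // May be called by several iterations; the lowest index wins.
                loopState.Break();
            }
        });

        // Every iteration below the lowest breaking index is still executed,
        // so iteration 100 always runs and always calls Break.
        return result.LowestBreakIteration;
    }

    public static void Main()
    {
        Console.WriteLine("Lowest break iteration: {0}", Run());   // prints "Lowest break iteration: 100"
    }
}
```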
Important: To Store and Retrieve Thread-Local Data
Consider a scenario where you build a collection of your own custom type by looping through some list.
Simple example
// delimited string containing emp details
String extractedstring = "ID\tName\nID\tName";
string[] emps = extractedstring.Split('\n');
List<Employee> objList = new List<Employee>();
// Sequential version: one Employee per line of input
foreach (string emp in emps)
{
string[] details = emp.Split('\t');
objList.Add(new Employee { ID = details[0], Name = details[1] });
}
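To implement the above example with a parallel ForEach, we use a different overload of the ForEach method, one that takes thread-local state:
Parallel.ForEach<TSource, TLocal>(source, localInit, body, localFinally);
Here localInit is a Func<TLocal> that creates the local state once per task, body is a Func<TSource, ParallelLoopState, TLocal, TLocal> that runs for each element, and localFinally is an Action<TLocal> that runs once per task when it has finished.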
// delimited string containing emp details
String extractedstring = "ID\tName\nID\tName";
string[] emps = extractedstring.Split('\n');
List<Employee> objList = new List<Employee>();
// Each task collects into its own list, so the shared list is locked only once per task.
Parallel.ForEach(emps, () => new List<Employee>(), (emp, loop, localList) =>
{
string[] details = emp.Split('\t');
localList.Add(new Employee { ID = details[0], Name = details[1] });
return localList;
}, (localList) => { lock (objList) { objList.AddRange(localList); } });
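The same thread-local pattern is available on Parallel.For. As a minimal sketch, the code below sums an array: each task accumulates its own subtotal without any synchronization, and the subtotals are combined once per task with Interlocked.Add. The array contents (1 to 100000) are an assumption made for the example, and LocalSumDemo is a hypothetical name.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LocalSumDemo
{
    public static long Run()
    {
        int[] numbers = new int[100000];
        for (int i = 0; i < numbers.Length; i++) numbers[i] = i + 1;

        long total = 0;
        Parallel.For<long>(0, numbers.Length,
            () => 0L,                                             // localInit: subtotal per task
            (i, loopState, subtotal) => subtotal + numbers[i],    // body: no shared state touched
            subtotal => Interlocked.Add(ref total, subtotal));    // localFinally: once per task
        return total;
    }

    public static void Main()
    {
        Console.WriteLine(Run());   // prints "5000050000"
    }
}
```

The shared variable is touched only in the final step, so contention stays low however many elements are processed.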
Important points
1. DO use the parallel loops Parallel.For and Parallel.ForEach to speed up operations where an expensive, independent operation needs to be performed for each input in a sequence.
2. DO make sure that the loop body delegate is thread-safe, since it will be called from multiple threads concurrently.
3. DO verify that the loop body delegate does not make assumptions about the order in which loop iterations will execute. For example, there is no guarantee that a thread will process its partition of input elements in the order in which they appear in the input, even though in the current version it will.
4. CONSIDER increasing the work done by each iteration of a parallel loop if it is very low. The body of a parallel loop is a delegate, and invoking it incurs some overhead. If the work done by the loop body is very small, the delegate invocation overhead may dominate the running time.
Also Read
- Parallel Computing in .NET 4.0 - Parallel LINQ
- Parallel Computing in .NET 4.0 - Concurrent Collections