Parallel.For() is not Async Friendly

Goal: A Governor for 10,000 tasks

My goal was to put a little sanity around 10k tasks, each of which pushed load against an HTTP endpoint. Starting all of them at once isn’t realistic or kind or stable. It can cause port exhaustion on the OS, and other unpleasant side-effects.

A little sanity would look like a governor that allowed X number of these to run in parallel at a time.

That sounds like the System.Threading.Tasks.Parallel class in the .NET framework!

Nope, Actually Wrong

One would think that a class in the Task namespace plays well with Tasks.

Nope.

Actually, Parallel.For() will betray you if you pass it a bunch of tasks to run. You would think it would start some subset of them based on the specified or calculated degree of parallelism supported by your machine. Instead, all the tasks are started at essentially the same time, and Parallel.For() doesn’t wait for them to finish before deciding that everything is done.

Here’s a little LinqPad script to demonstrate. Notice that:

  1. The synchronous code works.
  2. The Async lambda does not do the same thing.


// using System.Threading
// using System.Threading.Tasks
void Main()
{
    var options = new ParallelOptions() {
        MaxDegreeOfParallelism = 2
    };
    // Synchronous body: at most 2 iterations run at a time.
    Parallel.For(0, 10, options, i => ExecTaskSync(i));
    // Async body: each iteration returns at its first await, so nothing is throttled or waited on.
    Parallel.For(0, 10, options, async i => await ExecTask(i));
    Console.WriteLine("ALL DONE.");
}
// Define other methods and classes here
async Task<int> ExecTask(int index)
{
    Console.WriteLine($"Task {index} starting.");
    await Task.Delay(1000);
    Console.WriteLine($"Task {index} done.");
    return index;
}
int ExecTaskSync(int index)
{
    Console.WriteLine($"Sync Task {index} starting.");
    Thread.Sleep(1000);
    Console.WriteLine($"Sync Task {index} done.");
    return index;
}
/**********
* OUTPUT *
***********
Sync Task 0 starting.
Sync Task 5 starting.
Sync Task 0 done.
Sync Task 1 starting.
Sync Task 5 done.
Sync Task 6 starting.
Sync Task 6 done.
Sync Task 7 starting.
Sync Task 1 done.
Sync Task 2 starting.
Sync Task 2 done.
Sync Task 3 starting.
Sync Task 7 done.
Sync Task 8 starting.
Sync Task 3 done.
Sync Task 8 done.
Sync Task 9 starting.
Sync Task 4 starting.
Sync Task 4 done.
Sync Task 9 done.
Task 0 starting.
Task 5 starting.
Task 1 starting.
Task 6 starting.
Task 7 starting.
Task 8 starting.
Task 9 starting.
Task 3 starting.
Task 4 starting.
Task 2 starting.
ALL DONE.
Task 2 done.
Task 4 done.
Task 9 done.
Task 0 done.
Task 8 done.
Task 3 done.
Task 7 done.
Task 5 done.
Task 6 done.
Task 1 done.
*/
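
Why does the async version behave this way? Parallel.For() has no overload that takes a Func<int, Task>, so the compiler converts the async lambda to an Action<int>, which makes it "async void". The call returns at the first incomplete await, and there is no Task for Parallel.For() to wait on. Here is a minimal sketch of that effect in isolation. The names AsyncVoidDemo and ReturnsBeforeWorkCompletes are made up for this illustration, and the 200 ms delay is just a stand-in for real IO.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Returns true if the delegate returned before its awaited work finished.
    public static bool ReturnsBeforeWorkCompletes()
    {
        var finished = new ManualResetEventSlim(false);

        // Same shape Parallel.For() sees: an async lambda converted to
        // Action<int>, i.e. "async void". The caller gets nothing to await.
        Action<int> body = async i =>
        {
            // ConfigureAwait(false) keeps the continuation off any captured
            // context, so the blocking Wait() below cannot deadlock.
            await Task.Delay(200).ConfigureAwait(false);
            finished.Set();
        };

        body(0);                               // returns at the first await
        bool returnedEarly = !finished.IsSet;  // the work is still in flight
        finished.Wait();                       // let the background work finish
        return returnedEarly;
    }
}
```

Calling AsyncVoidDemo.ReturnsBeforeWorkCompletes() should give true: the delegate hands control back long before the delay elapses, which is exactly why "ALL DONE." prints before any "Task N done." line.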

Well, You see Nathan…

Joe Dev: “Well, you see Nathan, you need to understand that asynchronous execution is not the same as multi-threading or parallel execution on multiple cores.”

Me: “Understood.”

Joe Dev: “Parallel.For() is about multi-threading. Async doesn’t really even need multiple threads. It’s about interleaving bits of work.”

Me: “Mmm hmm.”

Joe Dev: “So don’t you see how silly your mistake was?”

Me: “How silly of me to pass a Task to the Parallel class in the System.Threading.Tasks namespace. O_o”

Hindsight = Duh?

In retrospect, as with most hindsight: “Of course… that’s totally obvious!”
Except that it wasn’t obvious on the front side of this experience.

From the Microsoft docs page:

The Parallel class provides library-based data parallel replacements for common operations such as for loops, for each loops, and execution of a set of statements.

Notice that little gem of a word “data” in that description? Yeah, me neither. That’s the hint that the implementation is focused on CPU-intensive work. Parallel.For() and its sibling methods are great for spreading work across your CPU cores. That’s wonderful for crunching numbers, processing images, etc.

Async IO is not the same sort of beast.

And Parallel is not Async friendly.

What Works…

There are numerous options that work well. They vary in how they operate, and have some slight trade-offs.

Task.WhenAll() – Variant #1 – Divide and Conquer

This approach divides the work into N buckets, with one Task to govern (exec) each bucket of tasks.


// using System.Linq
// using System.Threading.Tasks
var maxIterations = 10000;
var maxDOP = 10;
// Divide into maxDOP groups, keyed by index modulo maxDOP.
var parallelGroups = Enumerable.Range(0, maxIterations)
    .GroupBy(r => (r % maxDOP));
// One governing task per group; each works through its items one at a time.
var parallelTasks = parallelGroups.Select(group =>
{
    return Task.Run(async () =>
    {
        foreach (var i in group)
        {
            // Do Async Stuff. (like IO)
            // await as you please.
        }
    });
});
await Task.WhenAll(parallelTasks);
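
To check that the bucket approach really caps concurrency, here is a small runnable sketch of the same idea with a counter bolted on. BucketDemo, RunAsync, and the 20 ms Task.Delay (standing in for the HTTP call) are my own illustration, not part of the script above, and it skips Task.Run because the simulated work yields immediately.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class BucketDemo
{
    // Runs `count` simulated IO calls in `maxDop` buckets and returns the
    // highest number of calls that were in flight at the same moment.
    public static async Task<int> RunAsync(int count, int maxDop)
    {
        int inFlight = 0, peak = 0;
        var gate = new object();

        var workers = Enumerable.Range(0, count)
            .GroupBy(i => i % maxDop)
            .Select(async bucket =>
            {
                // Items inside one bucket run strictly one after another.
                foreach (var i in bucket)
                {
                    lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
                    await Task.Delay(20);   // stand-in for the HTTP call
                    lock (gate) { inFlight--; }
                }
            })
            .ToArray();   // materialize so each bucket is started exactly once

        await Task.WhenAll(workers);
        return peak;
    }
}
```

With await BucketDemo.RunAsync(50, 5), the peak never goes above 5, one in-flight call per bucket. The trade-off is that the buckets are fixed up front: if one bucket gets the slow calls, the others finish early and sit idle rather than picking up the slack.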

 

Task.WhenAll() – Variant #2 – SemaphoreSlim

Greg Bair pointed out that the Semaphore is built for almost exactly this purpose. Brilliant. And we now have an async friendly version via SemaphoreSlim.


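// using System.Linq
// using System.Threading
// using System.Threading.Tasks
const int MAX_DEGREE_OF_PARALLELISM = 2;
static SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_DEGREE_OF_PARALLELISM);
// async Task (not async void) so the caller can wait for completion.
async Task Main()
{
    // Every task is created up front, but each one waits on the semaphore before doing work.
    var tasks = Enumerable.Range(0, 10)
        .Select(i => ExecTask(i))
        .ToArray();
    await Task.WhenAll(tasks);
    Console.WriteLine("ALL DONE.");
}
async Task<int> ExecTask(int index)
{
    // Only MAX_DEGREE_OF_PARALLELISM callers get past this line at a time.
    await _semaphore.WaitAsync();
    try
    {
        Console.WriteLine($"Task {index} starting.");
        await Task.Delay(1000);
        Console.WriteLine($"Task {index} done.");
        return index;
    }
    finally
    {
        // Always free the slot, even if the work throws.
        _semaphore.Release();
    }
}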
/**********
* Output *
***********
Task 0 starting.
Task 1 starting.
Task 1 done.
Task 0 done.
Task 3 starting.
Task 2 starting.
Task 2 done.
Task 3 done.
Task 4 starting.
Task 5 starting.
Task 5 done.
Task 4 done.
Task 7 starting.
Task 6 starting.
Task 6 done.
Task 7 done.
Task 8 starting.
Task 9 starting.
Task 8 done.
Task 9 done.
ALL DONE.
*/
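
One more option, if you happen to be on .NET 6 or later: the Parallel class gained Parallel.ForEachAsync(), which takes an async body and honors MaxDegreeOfParallelism. The sketch below assumes that runtime. ForEachAsyncDemo, RunAsync, and the 20 ms delay are placeholders of mine for illustration, not anything from the scripts above.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // Runs `count` simulated IO calls with at most `maxDop` in flight.
    // Returns the observed peak concurrency and the number of completed calls.
    public static async Task<(int Peak, int Completed)> RunAsync(int count, int maxDop)
    {
        int inFlight = 0, peak = 0, completed = 0;
        var gate = new object();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDop };

        // Requires .NET 6+. The body returns a ValueTask, so the loop
        // actually waits for each iteration's awaited work to finish.
        await Parallel.ForEachAsync(Enumerable.Range(0, count), options, async (i, ct) =>
        {
            lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
            await Task.Delay(20, ct);   // stand-in for the HTTP call
            lock (gate) { inFlight--; completed++; }
        });

        return (peak, completed);
    }
}
```

Unlike the Parallel.For() script at the top, the await on ForEachAsync() only completes after every iteration has finished, so a call like await ForEachAsyncDemo.RunAsync(20, 3) should report all 20 completed with a peak no higher than 3.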

Cheers!
