Goal: A Governor for 10,000 tasks
My goal was to put a little sanity around 10k tasks, each of which pushed load against an HTTP endpoint. Starting all of them at once isn’t realistic or kind or stable. It can cause port exhaustion on the OS, and other unpleasant side-effects.
A little sanity would look like a governor that lets at most X of these run at any one time.
That sounds like the System.Threading.Tasks.Parallel class in the .NET framework!
Nope, Actually Wrong
One would think that a class in the System.Threading.Tasks namespace plays well with Tasks.
Nope.
Actually, Parallel.For() will betray you if you hand it async lambdas. You would think it would start some subset of them based on the specified or calculated degree of parallelism supported by your machine. Instead, all of the tasks are started at nearly the same time, and Parallel.For() doesn't wait for them to finish before deciding that everything is done.
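Here’s a little LINQPad script to demonstrate. Notice that:
- The synchronous code works: two at a time, and everything finishes before "ALL DONE."
- The async lambda does not: all ten tasks start at once, and "ALL DONE." prints before any of them finish.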
using System.Threading.Tasks;

void Main()
{
    var options = new ParallelOptions()
    {
        MaxDegreeOfParallelism = 2
    };

    // Synchronous body: at most 2 run at a time, and Parallel.For blocks until all 10 finish.
    Parallel.For(0, 10, options, i => ExecTaskSync(i));

    // Async lambda: Parallel.For only accepts an Action<int>, so this becomes async void.
    Parallel.For(0, 10, options, async i => await ExecTask(i));

    Console.WriteLine("ALL DONE.");
}

// Define other methods and classes here
async Task<int> ExecTask(int index)
{
    Console.WriteLine($"Task {index} starting.");
    await Task.Delay(1000);
    Console.WriteLine($"Task {index} done.");
    return index;
}

int ExecTaskSync(int index)
{
    Console.WriteLine($"Sync Task {index} starting.");
    Thread.Sleep(1000);
    Console.WriteLine($"Sync Task {index} done.");
    return index;
}

/**********
 * OUTPUT *
 ***********
Sync Task 0 starting.
Sync Task 5 starting.
Sync Task 0 done.
Sync Task 1 starting.
Sync Task 5 done.
Sync Task 6 starting.
Sync Task 6 done.
Sync Task 7 starting.
Sync Task 1 done.
Sync Task 2 starting.
Sync Task 2 done.
Sync Task 3 starting.
Sync Task 7 done.
Sync Task 8 starting.
Sync Task 3 done.
Sync Task 8 done.
Sync Task 9 starting.
Sync Task 4 starting.
Sync Task 4 done.
Sync Task 9 done.
Task 0 starting.
Task 5 starting.
Task 1 starting.
Task 6 starting.
Task 7 starting.
Task 8 starting.
Task 9 starting.
Task 3 starting.
Task 4 starting.
Task 2 starting.
ALL DONE.
Task 2 done.
Task 4 done.
Task 9 done.
Task 0 done.
Task 8 done.
Task 3 done.
Task 7 done.
Task 5 done.
Task 6 done.
Task 1 done.
*/
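Why does the async version return early? Parallel.For's body parameter is an Action<int>, so the async lambda is compiled as an async void delegate: each body returns to Parallel.For at its first await and hands back no Task to wait on. MaxDegreeOfParallelism therefore only throttles the brief synchronous part before that await. The sketch below (top-level statements, .NET 6 or later assumed; the `completed` counter is an illustrative addition) makes this visible by counting how many bodies have finished at the moment Parallel.For returns.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int completed = 0;

// An async lambda assigned to Action<int> compiles to an "async void" delegate:
// it returns to its caller at the first await and gives back no Task.
Action<int> body = async i =>
{
    await Task.Delay(1000);                // stands in for async I/O
    Interlocked.Increment(ref completed);  // runs after the delay, on a pool thread
};

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

// Returns as soon as every body has reached its first await.
Parallel.For(0, 10, options, body);
int completedWhenForReturned = Volatile.Read(ref completed);
Console.WriteLine($"Finished when Parallel.For returned: {completedWhenForReturned}");

// Nothing tracks the orphaned async void bodies, so just wait long enough for them.
await Task.Delay(3000);
int completedLater = Volatile.Read(ref completed);
Console.WriteLine($"Finished after waiting: {completedLater}");
```

On a normal machine the first count should come out below 10 (typically 0), and the second should be 10 once the delays have elapsed.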
Well, You see Nathan…
Joe Dev: “Well, you see Nathan, you need to understand that asynchronous execution is not the same as multi-threading or parallel execution on multiple cores.”
Me: “Understood.”
Joe Dev: “Parallel.For() is about multi-threading. Async doesn’t really even need multiple threads. It’s about interleaving bits of work.”
Me: “Mmm hmm.”
Joe Dev: “So don’t you see how silly your mistake was?”
Me: “How silly of me to pass a Task to the Parallel class in the System.Threading.Tasks namespace. O_o”
Hindsight = Duh?
In retrospect, as with most hindsight… “Of course… that’s totally obvious!”
Except that it wasn’t obvious on the front side of this experience.
From the Microsoft docs page:
The Parallel class provides library-based data parallel replacements for common operations such as for loops, for each loops, and execution of a set of statements.
Notice that little gem of a word “data” in that description? Yeah, me neither. That’s the hint that the implementation is focused on CPU-intensive work. Parallel.For() (and its sibling methods) are great for spreading work across your CPU cores. That’s wonderful for crunching numbers, processing images, etc.
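For contrast, this is the kind of work Parallel.For is designed for: independent, CPU-bound iterations over a range. The sketch below (top-level statements, .NET 6 or later assumed; the sum-of-squares workload is just an illustrative stand-in) uses the Parallel.For overload with thread-local state, so each worker accumulates a private subtotal and merges it into the shared total once.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

const int N = 1_000_000;
long total = 0;

// Parallel.For<TLocal>: each worker starts its own subtotal (localInit), adds to it
// without locking (body), and merges it into the shared total once (localFinally).
Parallel.For<long>(
    0, N,
    () => 0L,
    (i, loopState, subtotal) => subtotal + (long)i * i,
    subtotal => Interlocked.Add(ref total, subtotal));

// Closed form for 0^2 + 1^2 + ... + (N-1)^2, used to check the parallel result.
long expected = (long)(N - 1) * N * (2L * N - 1) / 6;
Console.WriteLine($"parallel={total}, expected={expected}");
```

Every iteration is pure computation with no awaiting, which is exactly the situation where spreading the range across cores pays off.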
Async IO is not the same sort of beast.
And Parallel is not Async friendly.
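Parallel.For and Parallel.ForEach are still not async-aware, but .NET 6 added Parallel.ForEachAsync, whose body returns a ValueTask that the loop actually awaits. With it, MaxDegreeOfParallelism really does bound the async work in flight. A minimal sketch, assuming .NET 6 or later, with Task.Delay standing in for the HTTP call and counters (not part of the original) to record peak concurrency:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var gate = new object();
int running = 0, peak = 0, done = 0;

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

// The body is Func<int, CancellationToken, ValueTask>, so ForEachAsync can await
// each one and keep at most MaxDegreeOfParallelism bodies in flight.
await Parallel.ForEachAsync(Enumerable.Range(0, 10), options, async (i, ct) =>
{
    lock (gate) { running++; peak = Math.Max(peak, running); }
    await Task.Delay(200, ct);   // stands in for the HTTP call for item i
    lock (gate) { running--; done++; }
});

// Unlike Parallel.For with an async lambda, this line runs only after all 10 finish.
Console.WriteLine($"done={done}, peak concurrency={peak}");
```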
What Works…
There are numerous options that work well. They vary in how they operate and have some slight trade-offs.
Task.WhenAll() – Variant #1 – Divide and Conquer
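This approach divides the work into maxDOP buckets up front, with one Task to govern (execute) each bucket; inside a bucket, the items are awaited one after another. The trade-off is that the split is fixed: a bucket that happens to hold slow items keeps running after the others have finished and gone idle.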
// using System.Threading.Tasks
var maxIterations = 10000;
var maxDOP = 10;

// Divide into maxDOP groups (item r lands in group r % maxDOP).
var parallelGroups = Enumerable.Range(0, maxIterations)
    .GroupBy(r => (r % maxDOP));

// One Task per group; each Task works through its own group sequentially.
var parallelTasks = parallelGroups.Select(group =>
{
    return Task.Run(async () =>
    {
        foreach (var i in group)
        {
            // Do Async Stuff. (like IO)
            // await as you please.
        }
    });
});

await Task.WhenAll(parallelTasks);
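To check that this shape really caps concurrency at maxDOP, here is a self-contained sketch (top-level statements, .NET 6 or later assumed). It swaps the placeholder comment for a Task.Delay standing in for the HTTP call and records how many items are in flight at once; the counters and the smaller maxIterations/maxDOP values are illustrative additions, not part of the original.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

const int maxIterations = 100;
const int maxDOP = 4;

var gate = new object();
int running = 0, peak = 0, processed = 0;

// Same shape as the snippet above: maxDOP groups, one Task per group,
// items within a group awaited one after another.
var parallelTasks = Enumerable.Range(0, maxIterations)
    .GroupBy(r => r % maxDOP)
    .Select(group => Task.Run(async () =>
    {
        foreach (var i in group)
        {
            lock (gate) { running++; peak = Math.Max(peak, running); }
            await Task.Delay(10);   // stands in for async I/O on item i
            lock (gate) { running--; processed++; }
        }
    }))
    .ToList();   // materialize so each group's Task is created exactly once

await Task.WhenAll(parallelTasks);
Console.WriteLine($"processed={processed}, peak concurrency={peak}");
```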
Task.WhenAll() – Variant #2 – SemaphoreSlim
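Greg Bair pointed out that the Semaphore is built for almost exactly this purpose. Brilliant. And we now have an async-friendly version via SemaphoreSlim and its WaitAsync() method. Every task still starts right away, but each one awaits the semaphore before doing its work and releases it when done, so only MAX_DEGREE_OF_PARALLELISM of them are past that gate at any moment.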
// using System.Threading.Tasks
const int MAX_DEGREE_OF_PARALLELISM = 2;
static SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_DEGREE_OF_PARALLELISM);

async Task Main()
{
    var tasks = Enumerable.Range(0, 10)
        .Select(i => ExecTask(i))
        .ToArray();
    await Task.WhenAll(tasks);
    Console.WriteLine("ALL DONE.");
}

async Task<int> ExecTask(int index)
{
    // Wait (asynchronously) for one of the MAX_DEGREE_OF_PARALLELISM slots.
    await _semaphore.WaitAsync();
    try
    {
        Console.WriteLine($"Task {index} starting.");
        await Task.Delay(1000);
        Console.WriteLine($"Task {index} done.");
        return index;
    }
    finally
    {
        // Always hand the slot back, even if the work throws.
        _semaphore.Release();
    }
}

/**********
 * Output *
 ***********
Task 0 starting.
Task 1 starting.
Task 1 done.
Task 0 done.
Task 3 starting.
Task 2 starting.
Task 2 done.
Task 3 done.
Task 4 starting.
Task 5 starting.
Task 5 done.
Task 4 done.
Task 7 starting.
Task 6 starting.
Task 6 done.
Task 7 done.
Task 8 starting.
Task 9 starting.
Task 8 done.
Task 9 done.
ALL DONE.
*/
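The same pattern can be wrapped into a reusable helper. ForEachThrottledAsync below is a hypothetical name, not a framework API, and this is a sketch assuming .NET 6 or later top-level statements. Like the example above, it creates a task for every item up front and parks the extras at WaitAsync, so for 10,000 items you would hold 10,000 pending tasks; the semaphore only limits how many are doing work at once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of .NET): runs body for every item while allowing
// at most maxDegreeOfParallelism bodies past the semaphore at any moment.
static async Task ForEachThrottledAsync<T>(
    IEnumerable<T> items, int maxDegreeOfParallelism, Func<T, Task> body)
{
    using var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
    var tasks = items.Select(async item =>
    {
        await semaphore.WaitAsync();   // wait for a free slot
        try
        {
            await body(item);
        }
        finally
        {
            semaphore.Release();       // free the slot even if body throws
        }
    }).ToList();
    await Task.WhenAll(tasks);         // all tasks finish before the semaphore is disposed
}

// Usage: 20 items, at most 3 in flight, with counters to observe the cap.
var gate = new object();
int running = 0, peak = 0, done = 0;

await ForEachThrottledAsync(Enumerable.Range(0, 20), 3, async i =>
{
    lock (gate) { running++; peak = Math.Max(peak, running); }
    await Task.Delay(50);   // stands in for an HTTP call for item i
    lock (gate) { running--; done++; }
});

Console.WriteLine($"done={done}, peak concurrency={peak}");
```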
Cheers!