The Multithreaded Task Executor
Posted by Davy Brion on 26th May 2008
Sometimes you’ve got a bunch of actions that you need to execute in a loop. The problem is that those actions all run synchronously, one after another, so the loop can take a while depending on the action. But if the action itself is thread-safe, and no action depends on the result of a previous one, you might get much better performance by spreading the workload over a few threads, especially if you have multiple CPU cores.
Wouldn’t it be cool if you could do something like this:
    MultiThreadedTaskExecutor taskExecutor = new MultiThreadedTaskExecutor(numberOfThreadsToUse);
    foreach (Input input in inputs)
    {
        Input newVariable = input;
        taskExecutor.QueueTask(() => ProcessInput(newVariable));
    }
    taskExecutor.RunTasksAndWait();
Note: you need the new variable inside the loop so that the anonymous method doesn’t capture the ‘input’ loop variable itself (the variable, not the object it refers to). Without the copy, every anonymous method would refer to the same ‘input’ variable, which by the time the tasks actually run points to the last Input instance in the inputs collection, so each task would end up processing that same instance. This is a known issue with variable capturing and anonymous methods. (From C# 5 onward, foreach gives each iteration its own loop variable, so the copy is only needed with older compilers.)
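To make the capture issue concrete, here is a small sketch. It uses a plain for loop, where every iteration shares one loop variable regardless of compiler version. The class and method names (CaptureDemo, RunShared, RunCopied) are made up for this illustration.

```csharp
using System;
using System.Collections.Generic;

public static class CaptureDemo
{
    // Every lambda captures the same variable 'i', so all of them
    // see its final value once the loop has finished.
    public static List<int> RunShared()
    {
        var funcs = new List<Func<int>>();
        for (int i = 0; i < 3; i++)
        {
            funcs.Add(() => i);
        }
        return Invoke(funcs); // 3, 3, 3
    }

    // Copying to a variable declared inside the loop body gives each
    // lambda its own variable to capture.
    public static List<int> RunCopied()
    {
        var funcs = new List<Func<int>>();
        for (int i = 0; i < 3; i++)
        {
            int copy = i;
            funcs.Add(() => copy);
        }
        return Invoke(funcs); // 0, 1, 2
    }

    private static List<int> Invoke(List<Func<int>> funcs)
    {
        var results = new List<int>();
        foreach (Func<int> func in funcs)
        {
            results.Add(func());
        }
        return results;
    }
}
```

The lambdas only run after the loop is done, which is the same situation the queued tasks are in: they are created in the loop but executed later.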
Anyways… the code above basically spreads the workload over the given number of threads.
The rough, not-quite-production-ready code of the MultiThreadedTaskExecutor class looks like this:
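    using System;
    using System.Collections.Generic;
    using System.Threading;

    public class MultiThreadedTaskExecutor
    {
        private readonly List<Thread> threads = new List<Thread>();
        private readonly List<EventWaitHandle> eventWaitHandles = new List<EventWaitHandle>();
        private readonly List<Type> swallowedExceptionTypes;
        private readonly Queue<Action> taskQueue;
        private readonly object queueMonitor = new object();

        public MultiThreadedTaskExecutor(int numberOfThreads)
        {
            swallowedExceptionTypes = new List<Type>();
            taskQueue = new Queue<Action>();
            CreateThreads(numberOfThreads);
        }

        public void QueueTask(Action task)
        {
            // Take the same lock as GetTask so that queueing stays safe
            // even if a task is added while the worker threads are running.
            lock (queueMonitor)
            {
                taskQueue.Enqueue(task);
            }
        }

        // Register exception types before calling RunTasksAndWait.
        public void AddExceptionTypeToSwallow(Type type)
        {
            swallowedExceptionTypes.Add(type);
        }

        // Starts every worker thread and blocks until all of them have signaled.
        // A thread can only be started once, so this can only be called once per instance.
        public void RunTasksAndWait()
        {
            foreach (Thread thread in threads)
            {
                thread.Start();
            }

            WaitHandle.WaitAll(eventWaitHandles.ToArray());
        }

        private void CreateThreads(int number)
        {
            for (int i = 0; i < number; i++)
            {
                // One manual-reset handle per thread, signaled when that thread is done.
                var eventWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
                eventWaitHandles.Add(eventWaitHandle);
                threads.Add(new Thread(() => ProcessTasks(eventWaitHandle)));
            }
        }

        private void ProcessTasks(EventWaitHandle eventWaitHandle)
        {
            try
            {
                Action action;
                // Keep pulling tasks until the queue is empty.
                while ((action = GetTask()) != null)
                {
                    try
                    {
                        action();
                    }
                    catch (Exception e)
                    {
                        // Only exact type matches are swallowed; anything else is rethrown.
                        if (!swallowedExceptionTypes.Contains(e.GetType())) throw;
                    }
                }
            }
            finally
            {
                // Always signal, even if a task threw, so the caller doesn't block forever.
                eventWaitHandle.Set();
            }
        }

        // Returns the next task, or null when the queue is empty.
        private Action GetTask()
        {
            lock (queueMonitor)
            {
                if (taskQueue.Count == 0) return null;
                return taskQueue.Dequeue();
            }
        }
    }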
So how does it work? It uses a queue to hold each task that was added by the consumer of the class. It creates the given number of threads and also creates an EventWaitHandle for each thread. Then, when the user starts the execution with a call to RunTasksAndWait, each thread is started and the call to RunTasksAndWait blocks until every thread has finished.
In the meantime, each thread gets the next task off the queue and executes it. If an exception is thrown within the task, it is caught and is either swallowed or rethrown (the consumer can add exception types that can be swallowed). Keep in mind that a rethrown exception is unhandled on the worker thread, which on .NET 2.0 and later takes down the whole process. Each thread keeps doing this until it can’t get a new task off the queue. When that happens, the thread signals its EventWaitHandle and then it dies. When all of the handles have been signaled, RunTasksAndWait stops blocking and control is returned to the caller. All of the tasks have been executed and the workload has been spread over the given number of threads.
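One detail of the swallowing logic is worth spelling out: List<Type>.Contains compares against the exact runtime type of the exception, so registering a base type does not cover exceptions derived from it. The sketch below contrasts that with a possible alternative that also matches derived types. The SwallowRules class and both method names are hypothetical and only there for illustration; they are not part of the executor.

```csharp
using System;
using System.Collections.Generic;

public static class SwallowRules
{
    // Exact match: the same check the executor performs.
    public static bool MatchesExactly(List<Type> swallowed, Exception e)
    {
        return swallowed.Contains(e.GetType());
    }

    // Hypothetical alternative: also treat exceptions that derive
    // from a registered type as swallowable.
    public static bool MatchesIncludingDerived(List<Type> swallowed, Exception e)
    {
        foreach (Type type in swallowed)
        {
            if (type.IsInstanceOfType(e)) return true;
        }
        return false;
    }
}
```

With typeof(ArgumentException) registered, an ArgumentNullException is not matched by the exact check but is matched by the derived-type check. Which behavior you want depends on how broadly you intend exceptions to be swallowed.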
Note: because of the call to WaitHandle.WaitAll, RunTasksAndWait can’t be called from an STA thread: WaitHandle.WaitAll simply throws a NotSupportedException there.
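If the STA restriction is a problem, one possible way around it is to wait with Thread.Join instead of wait handles. Join blocks until a thread has ended, so no EventWaitHandle is needed, and it also avoids the limit of 64 handles that WaitHandle.WaitAll imposes. What follows is a stripped-down sketch of that idea, not a drop-in replacement: the class name JoinBasedTaskExecutor is made up, and exception swallowing is left out to keep it short.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class JoinBasedTaskExecutor
{
    private readonly Queue<Action> taskQueue = new Queue<Action>();
    private readonly object queueMonitor = new object();
    private readonly int numberOfThreads;

    public JoinBasedTaskExecutor(int numberOfThreads)
    {
        this.numberOfThreads = numberOfThreads;
    }

    public void QueueTask(Action task)
    {
        lock (queueMonitor)
        {
            taskQueue.Enqueue(task);
        }
    }

    public void RunTasksAndWait()
    {
        // Threads are created per run, so this can be called more than once.
        var threads = new List<Thread>();
        for (int i = 0; i < numberOfThreads; i++)
        {
            var thread = new Thread(ProcessTasks);
            threads.Add(thread);
            thread.Start();
        }

        // Join returns once the thread has ended; no wait handles involved.
        foreach (Thread thread in threads)
        {
            thread.Join();
        }
    }

    private void ProcessTasks()
    {
        while (true)
        {
            Action action;
            lock (queueMonitor)
            {
                if (taskQueue.Count == 0) return;
                action = taskQueue.Dequeue();
            }
            // Run the task outside the lock so other threads can dequeue meanwhile.
            action();
        }
    }
}
```

Usage is the same as before: queue the tasks, then call RunTasksAndWait. An exception thrown by a task is not caught in this sketch, so it would still need the same try/catch treatment as the original class.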
Keep in mind that this is a rough version of the code. It definitely needs a bit more polish (mostly better exception handling for when the threads are interrupted, and stuff like that), but you get the idea.
But if you know a better way to do this, or if you spot flaws in this implementation, I’d love to hear about it.