The Inquisitive Coder - Davy Brion’s Blog

The Multithreaded Task Executor

Posted by Davy Brion on May 26th, 2008

Sometimes, you’ve got a bunch of actions that you need to execute in a loop. The problem is that those actions are all performed synchronously, one after the other, so the whole loop can take a while depending on the action. But if the action itself is thread-safe, and the actions don’t depend on the results of previous actions, you might get much better performance by spreading that workload over a few different threads, especially if you have multiple CPU cores.

Wouldn’t it be cool if you could do something like this:

    MultiThreadedTaskExecutor taskExecutor = new MultiThreadedTaskExecutor(numberOfThreadsToUse);

    foreach (Input input in inputs)
    {
        Input newVariable = input;
        taskExecutor.QueueTask(() => ProcessInput(newVariable));
    }

    taskExecutor.RunTasksAndWait();

Note: the new variable inside the loop is there so that the anonymous method doesn’t capture the ‘input’ loop variable itself (the actual variable, not the reference to the object). Without the new variable, every anonymous method would refer to the same ‘input’ loop variable, which by the time the tasks are executed points to the last Input instance in the inputs collection. The result would be that each task is executed on that same Input instance. This is a known issue with variable capturing and anonymous methods.
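The capture problem is easy to reproduce in isolation. What follows is a minimal sketch, not part of the executor: `CaptureDemo` and its method names are made up for illustration. It uses a `for` loop, whose counter is one variable shared by every iteration, so the behaviour is the same on any compiler version. (From C# 5 onward, `foreach` gets a fresh variable per iteration, so there the copy matters only for `for` loops and for older compilers.)

```csharp
using System;
using System.Collections.Generic;

public static class CaptureDemo
{
    // Every lambda captures the single shared loop variable 'i'.
    public static List<int> WithoutCopy()
    {
        var actions = new List<Func<int>>();
        for (int i = 0; i < 3; i++)
        {
            actions.Add(() => i);
        }
        return Invoke(actions);
    }

    // Each lambda captures its own 'copy', declared inside the loop body.
    public static List<int> WithCopy()
    {
        var actions = new List<Func<int>>();
        for (int i = 0; i < 3; i++)
        {
            int copy = i;
            actions.Add(() => copy);
        }
        return Invoke(actions);
    }

    // Runs the lambdas only after the loop has finished, like the executor does.
    private static List<int> Invoke(List<Func<int>> actions)
    {
        var results = new List<int>();
        foreach (Func<int> action in actions)
        {
            results.Add(action());
        }
        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", WithoutCopy())); // 3,3,3
        Console.WriteLine(string.Join(",", WithCopy()));    // 0,1,2
    }
}
```

In the shared-variable version, the lambdas run after the loop has ended, so they all see the final value of the counter.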

Anyways… the code above basically spreads the workload over the given number of threads.

The rough, not-quite-production-ready code of the MultiThreadedTaskExecutor class looks like this:

    public class MultiThreadedTaskExecutor
    {
        private readonly List<Thread> threads = new List<Thread>();
        private readonly List<EventWaitHandle> eventWaitHandles = new List<EventWaitHandle>();
        private readonly List<Type> swallowedExceptionTypes;

        private readonly Queue<Action> taskQueue;
        private readonly object queueMonitor = new object();

        public MultiThreadedTaskExecutor(int numberOfThreads)
        {
            swallowedExceptionTypes = new List<Type>();
            taskQueue = new Queue<Action>();
            CreateThreads(numberOfThreads);
        }

        // Not synchronized: assumes all tasks are queued from a single thread
        // before RunTasksAndWait is called.
        public void QueueTask(Action task)
        {
            taskQueue.Enqueue(task);
        }

        public void AddExceptionTypeToSwallow(Type type)
        {
            swallowedExceptionTypes.Add(type);
        }

        // Starts every worker thread and blocks until each one has signaled its handle.
        public void RunTasksAndWait()
        {
            foreach (Thread thread in threads)
            {
                thread.Start();
            }

            WaitHandle.WaitAll(eventWaitHandles.ToArray());
        }

        // Creates the threads (without starting them) and one wait handle per thread.
        private void CreateThreads(int number)
        {
            for (int i = 0; i < number; i++)
            {
                var eventWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
                eventWaitHandles.Add(eventWaitHandle);
                threads.Add(new Thread(() => ProcessTasks(eventWaitHandle)));
            }
        }

        // Worker loop: keeps taking tasks until the queue is empty, then signals completion.
        private void ProcessTasks(EventWaitHandle eventWaitHandle)
        {
            try
            {
                Action action;

                while ((action = GetTask()) != null)
                {
                    try
                    {
                        action();
                    }
                    catch (Exception e)
                    {
                        if (!swallowedExceptionTypes.Contains(e.GetType())) throw;
                    }
                }
            }
            finally
            {
                eventWaitHandle.Set();
            }
        }

        // Returns the next task, or null when the queue is empty.
        private Action GetTask()
        {
            lock (queueMonitor)
            {
                if (taskQueue.Count == 0) return null;
                return taskQueue.Dequeue();
            }
        }
    }

So how does it work? It uses a queue to hold each task that was added by the consumer of the class. It creates the given number of threads and also creates an EventWaitHandle for each thread. When the user starts the execution with a call to RunTasksAndWait, each thread is started and the call to RunTasksAndWait then blocks until every thread has finished. In the meantime, each thread takes the next task off the queue and executes it. If an exception is thrown within the task, it is caught and is either swallowed or rethrown (the consumer can add exception types that should be swallowed). Each thread keeps doing this until it can’t get a new task off the queue. When that happens, the thread signals its EventWaitHandle and then it dies. When all threads have signaled, RunTasksAndWait stops blocking and control is returned to the caller. All of the tasks have been executed and the workload has been spread over the given number of threads.
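One detail of the swallowing rule is worth spelling out: the check `swallowedExceptionTypes.Contains(e.GetType())` compares the exact runtime type, so registering a base exception type does not cover exceptions derived from it. The sketch below shows the difference. `SwallowCheck` and both method names are hypothetical, and the derived-type variant is only one possible alternative, not something the class above does.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class SwallowCheck
{
    // Same test as the executor: the runtime type must appear in the list exactly.
    public static bool ExactMatch(List<Type> swallowed, Exception e)
    {
        return swallowed.Contains(e.GetType());
    }

    // Hypothetical alternative: also accept exceptions derived from a listed type.
    public static bool MatchIncludingDerived(List<Type> swallowed, Exception e)
    {
        foreach (Type type in swallowed)
        {
            if (type.IsInstanceOfType(e)) return true;
        }
        return false;
    }

    public static void Main()
    {
        var swallowed = new List<Type> { typeof(IOException) };
        var e = new FileNotFoundException(); // derives from IOException

        Console.WriteLine(ExactMatch(swallowed, e));            // False
        Console.WriteLine(MatchIncludingDerived(swallowed, e)); // True
    }
}
```

With the exact-match rule, a consumer who wants FileNotFoundException swallowed has to register that type itself.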

Note: due to the call to WaitHandle.WaitAll, this won’t work when RunTasksAndWait is called from an STA thread, because the implementation of WaitHandle.WaitAll simply throws a NotSupportedException when asked to wait on multiple handles on an STA thread.
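One way around the STA restriction, sketched here as an assumption rather than as part of the class above, is to wait with Thread.Join on each worker instead of calling WaitHandle.WaitAll. Join has no apartment restriction, and it also sidesteps the 64-handle limit of WaitAll. `JoinBasedRunner` and `Run` are hypothetical names, and exception swallowing is left out to keep the sketch short.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class JoinBasedRunner
{
    // Runs all tasks on 'numberOfThreads' workers and blocks until every worker has exited.
    // An exception thrown by a task is not handled here and would end the process.
    public static void Run(IEnumerable<Action> tasks, int numberOfThreads)
    {
        var queue = new Queue<Action>(tasks);
        var queueMonitor = new object();
        var workers = new List<Thread>();

        for (int i = 0; i < numberOfThreads; i++)
        {
            var worker = new Thread(() =>
            {
                while (true)
                {
                    Action task;
                    lock (queueMonitor)
                    {
                        if (queue.Count == 0) return; // no work left, let the thread end
                        task = queue.Dequeue();
                    }
                    task(); // run outside the lock so workers don't block each other
                }
            });
            workers.Add(worker);
            worker.Start();
        }

        // Join takes the place of the per-thread wait handles and WaitHandle.WaitAll.
        foreach (Thread worker in workers)
        {
            worker.Join();
        }
    }

    public static void Main()
    {
        int completed = 0;
        var tasks = new List<Action>();
        for (int i = 0; i < 100; i++)
        {
            tasks.Add(() => Interlocked.Increment(ref completed));
        }

        Run(tasks, 4);
        Console.WriteLine(completed); // 100
    }
}
```

The trade-off is that Join ties completion to thread exit, which fits this design because each worker dies as soon as the queue is empty.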

Keep in mind that this is a rough version of this code… it definitely needs a bit more polish (better exception handling for when the threads are interrupted and stuff like that mostly) but you get the idea :)

But if you know a better way to do this, or if you spot flaws in this implementation, I’d love to hear about it :)

4 Responses to “The Multithreaded Task Executor”

  1. Justin Rudd Says:

    You lock queueMonitor in GetTask, but you don’t lock it in QueueTask so you could still corrupt the queue or have bad reads of the memory. Since you are using an anonymous lambda in the constructor of thread, you should know that Thread now has a constructor which takes an object as a parameter that it can pass as state to your thread procedure.

    Oh yeah… if you haven’t already, check out the Parallel Framework for .NET. It has a nice Task management system complete with Futures. Although it would definitely break your “don’t touch till released” rule :)

  2. Davy Brion Says:

    well, the tasks are queued in a single thread, so the fact that QueueTask doesn’t lock queueMonitor is not that big of a problem, as long as this class is used ‘correctly’. But it definitely would be better if it simply locked in that method as well though, you’re definitely right about that :)

    but what state do you think i should pass to the constructor of the Thread? Do you mean getting rid of the lambda, and passing the WaitHandle as the state object reference?

    And yes, Parallel Framework for .NET is on my TODO list :p
    And yes, i will wait until the final release :p

  3. Justin Rudd Says:

    well, the tasks are queued in a single thread, so the fact that QueueTask doesn’t lock queueMonitor is not that big of a problem, as long as this class is used ‘correctly’

    Famous last words! :)

    The problem (IMHO) is you don’t “know” how Queue is implemented. Sure you could go dig through with reflector but it was probably not written with multiple threads accessing it at the same time. So you lock queueMonitor and call Count, but if the other thread is calling QueueTask, it might allocate memory for the underlying array (assuming an array is being used) BEFORE the call to Count. If the memory gets published BEFORE the actual element is put into the array, then Count returns 1 and Queue.Dequeue could return a null reference as well as reallocate the array that just got allocated. Back in the QueueTask thread, you are now accessing an element in an array that doesn’t exist.

    I’m probably being overly paranoid, but consider the following code…

    for(int i = 0; i < m_RemainingWork; ++i)

    The above code caused a very subtle race condition because some other code was calling Interlocked.Decrement(ref m_RemainingWork). The dev thought that the original for loop would completely execute before any of the background threads reduced m_RemainingWork. And on his Vista box, that is exactly what happened. On my 2008 server box (which treats all threads as equals), it blew up every time.

    And one other observation, your ProcessTasks is going to take up quite a bit of CPU because it is basically a SpinLock waiting on work. There is no niceness there. The threads will continually look for work without ever taking a break. Not sure about your use cases, so maybe there is a steady stream of work coming in and that doesn’t happen.

    And yes, I mean pass the wait handle as the state. The lambda is redundant. You’ll have to cast instead (from object to WaitHandle). I love lambdas (I wrote a whole logging library using them - http://www.codeplex.com/LiveLabsLogging), but I generally don’t use them if I don’t have to. But that’s just my opinion.

  4. Davy Brion Says:

    hehe yea, statements like “as long as this class is used correctly” always blow up in your face :)

    and no, i wouldn’t say you’re overly paranoid… when it comes to threading it’s probably best to be paranoid ;)
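The two suggestions raised in the responses above, locking in QueueTask and passing the wait handle to the thread as its state object, can be sketched together. This is a minimal illustration under assumptions, not a replacement for the class in the post: `LockedTaskQueue` and `LockedTaskQueueDemo` are hypothetical names, and exception swallowing is left out for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class LockedTaskQueue
{
    private readonly Queue<Action> taskQueue = new Queue<Action>();
    private readonly object queueMonitor = new object();

    // Enqueue takes the same lock as dequeue, so both sides are synchronized.
    public void QueueTask(Action task)
    {
        lock (queueMonitor)
        {
            taskQueue.Enqueue(task);
        }
    }

    // Returns the next task, or null when the queue is empty.
    public Action GetTask()
    {
        lock (queueMonitor)
        {
            return taskQueue.Count == 0 ? null : taskQueue.Dequeue();
        }
    }

    // Signature matches ParameterizedThreadStart: the state arrives as 'object'
    // and has to be cast back to the wait handle.
    public void ProcessTasks(object state)
    {
        var eventWaitHandle = (EventWaitHandle)state;
        try
        {
            Action action;
            while ((action = GetTask()) != null)
            {
                action();
            }
        }
        finally
        {
            eventWaitHandle.Set();
        }
    }
}

public static class LockedTaskQueueDemo
{
    // Queues 'taskCount' counting tasks, runs them on one worker, returns how many ran.
    public static int Run(int taskCount)
    {
        var queue = new LockedTaskQueue();
        int completed = 0;
        for (int i = 0; i < taskCount; i++)
        {
            queue.QueueTask(() => Interlocked.Increment(ref completed));
        }

        var handle = new EventWaitHandle(false, EventResetMode.ManualReset);
        var thread = new Thread(new ParameterizedThreadStart(queue.ProcessTasks));
        thread.Start(handle); // the handle is passed as thread state, no lambda needed
        handle.WaitOne();
        return completed;
    }

    public static void Main()
    {
        Console.WriteLine(Run(10)); // 10
    }
}
```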
