The Inquisitive Coder - Davy Brion’s Blog

Thinking outside of the typical .NET box

Archive for the 'Multithreading' Category


The Multithreaded Task Executor

Posted by Davy Brion on 26th May 2008

Sometimes, you’ve got a bunch of actions that you need to execute in a loop. The problem is that those actions are all performed synchronously so this could take some time depending on the action. But, if the action itself is thread-safe, and the actions are not dependent on the results of previous actions, you might get much better performance if you spread that workload over a few different threads. Especially if you have multiple CPU cores.

Wouldn’t it be cool if you could do something like this:

    MultiThreadedTaskExecutor taskExecutor = new MultiThreadedTaskExecutor(numberOfThreadsToUse);

    foreach (Input input in inputs)
    {
        Input newVariable = input;
        taskExecutor.QueueTask(() => ProcessInput(newVariable));
    }

    taskExecutor.RunTasksAndWait();

Note: the new variable inside the loop is there so that the anonymous method doesn’t capture the ‘input’ loop variable itself (the variable, not the object it refers to). Without it, every anonymous method would refer to the same ‘input’ variable, which, by the time the tasks are executed, points to the last Input instance in the inputs collection. The result would be that each task is executed on the same input instance. This is a known issue with variable capturing and anonymous methods. (As of C# 5, foreach gives each iteration its own loop variable, so the copy is only needed with older compilers; for loops still share a single variable.)

Anyways… the code above basically spreads the workload over the given number of threads.
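To see the variable capturing issue from the note above in isolation, here's a minimal sketch. It uses a for loop rather than foreach, because a for loop shares one loop variable across iterations in every C# version. CaptureDemo and its method names are made up for the illustration.

```csharp
using System;
using System.Collections.Generic;

public static class CaptureDemo
{
    // Every lambda captures the same loop variable 'i'.
    public static List<int> CaptureLoopVariable()
    {
        var funcs = new List<Func<int>>();
        for (int i = 0; i < 3; i++)
        {
            funcs.Add(() => i);
        }

        var results = new List<int>();
        foreach (Func<int> func in funcs)
        {
            results.Add(func()); // 'i' is already 3 when the lambdas finally run
        }
        return results; // 3, 3, 3
    }

    // Every lambda captures its own copy, declared inside the loop body.
    public static List<int> CaptureCopy()
    {
        var funcs = new List<Func<int>>();
        for (int i = 0; i < 3; i++)
        {
            int copy = i;
            funcs.Add(() => copy);
        }

        var results = new List<int>();
        foreach (Func<int> func in funcs)
        {
            results.Add(func());
        }
        return results; // 0, 1, 2
    }
}
```

The only difference between the two methods is where the captured variable is declared, which is exactly what the extra variable in the loop above takes care of.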

The rough, not-quite-production-ready code of the MultiThreadedTaskExecutor class looks like this:

    public class MultiThreadedTaskExecutor
    {
        private readonly List<Thread> threads = new List<Thread>();
        private readonly List<EventWaitHandle> eventWaitHandles = new List<EventWaitHandle>();
        private readonly List<Type> swallowedExceptionTypes;

        private readonly Queue<Action> taskQueue;
        private readonly object queueMonitor = new object();

        public MultiThreadedTaskExecutor(int numberOfThreads)
        {
            swallowedExceptionTypes = new List<Type>();
            taskQueue = new Queue<Action>();
            CreateThreads(numberOfThreads);
        }

        // not synchronized: queue all tasks before calling RunTasksAndWait
        public void QueueTask(Action task)
        {
            taskQueue.Enqueue(task);
        }

        public void AddExceptionTypeToSwallow(Type type)
        {
            swallowedExceptionTypes.Add(type);
        }

        public void RunTasksAndWait()
        {
            foreach (Thread thread in threads)
            {
                thread.Start();
            }

            // blocks until every worker thread has signaled its wait handle
            WaitHandle.WaitAll(eventWaitHandles.ToArray());
        }

        private void CreateThreads(int number)
        {
            for (int i = 0; i < number; i++)
            {
                var eventWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
                eventWaitHandles.Add(eventWaitHandle);
                threads.Add(new Thread(() => ProcessTasks(eventWaitHandle)));
            }
        }

        private void ProcessTasks(EventWaitHandle eventWaitHandle)
        {
            try
            {
                Action action;

                // keep taking tasks until the queue is empty
                while ((action = GetTask()) != null)
                {
                    try
                    {
                        action();
                    }
                    catch (Exception e)
                    {
                        // only exact type matches are swallowed; anything else is
                        // rethrown on this worker thread, where it goes unhandled
                        if (!swallowedExceptionTypes.Contains(e.GetType())) throw;
                    }
                }
            }
            finally
            {
                // always signal, so RunTasksAndWait can't block forever
                eventWaitHandle.Set();
            }
        }

        private Action GetTask()
        {
            lock (queueMonitor)
            {
                if (taskQueue.Count == 0) return null;
                return taskQueue.Dequeue();
            }
        }
    }

So how does it work? It uses a queue to hold each task that was added by the consumer of the class. It creates the given number of threads and also creates an EventWaitHandle for each thread. When the user starts the execution with a call to RunTasksAndWait, each thread is started and the call to RunTasksAndWait then waits until every thread is finished. In the meantime, each thread gets the next task off the queue and executes it. If an exception is thrown within the task, it is caught and is either swallowed or rethrown (the consumer can add exception types that can be swallowed). Each thread keeps doing this until it can’t get a new task off the queue. When that happens, the thread signals its EventWaitHandle and then it dies. When all threads are dead, RunTasksAndWait stops blocking and control is returned to the caller. All of the tasks have been executed and the workload has been spread over the given number of threads.

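Note: due to the call to WaitHandle.WaitAll, this won’t work on STA threads, because WaitHandle.WaitAll throws a NotSupportedException when it’s asked to wait on multiple handles from an STA thread. WaitAll also refuses more than 64 handles, so that’s the upper limit on the number of threads here.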

Keep in mind that this is a rough version of this code… it definitely needs a bit more polish (better exception handling for when the threads are interrupted and stuff like that mostly) but you get the idea :)

But if you know a better way to do this, or if you spot flaws in this implementation, I’d love to hear about it :)
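One variation that sidesteps the WaitHandle.WaitAll restrictions is to wait for the worker threads with Thread.Join instead. The following is only a sketch of that idea, not the class above: JoinBasedExecutor and RunAll are hypothetical names, and it leaves out the exception swallowing entirely.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class JoinBasedExecutor
{
    // Runs all tasks on the given number of threads and blocks until they are done.
    public static void RunAll(IEnumerable<Action> tasks, int numberOfThreads)
    {
        var queue = new Queue<Action>(tasks);
        var queueMonitor = new object();
        var threads = new List<Thread>();

        for (int i = 0; i < numberOfThreads; i++)
        {
            var thread = new Thread(() =>
            {
                while (true)
                {
                    Action task;
                    lock (queueMonitor)
                    {
                        if (queue.Count == 0) return; // nothing left, the thread ends
                        task = queue.Dequeue();
                    }
                    task(); // executed outside the lock so tasks run concurrently
                }
            });
            threads.Add(thread);
            thread.Start();
        }

        // Join each worker in turn; no wait handles involved.
        foreach (Thread thread in threads)
        {
            thread.Join();
        }
    }
}
```

Since the caller waits on the threads themselves, there is no wait handle per thread to create or signal.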

Posted in Multithreading, Patterns

Easy non-blocking locking

Posted by Davy Brion on 13th May 2008

I’m currently reading Release It! and in it, the author really stresses the importance of not having any blocking calls in your code. The reason is fairly simple: if something goes wrong in a blocking call, the thread executing it might hang forever. If you’re servicing requests, you surely don’t want your request-handling threads to hang forever, because before you know it, all of the threads will be blocked and no requests will be dealt with.

You may remember my post about thread-safe repositories from a while ago. In the code from that post, i use the lock keyword to make sure instances of the class are thread-safe. The problem with the lock keyword is that it uses the Monitor.Enter method, which blocks indefinitely until it can acquire a lock on the object you pass to it. And since you should avoid blocking calls, you really should use Monitor.TryEnter instead, because it allows you to set a timeout. If the lock can’t be acquired within the timeout period, the method simply returns false and you did not get a lock. This makes it possible to avoid blocking threads and deadlocks.
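For reference, this is roughly what using Monitor.TryEnter by hand looks like. It's a minimal sketch: TryEnterDemo, Gate and TryRun are names invented for the example, and returning false on a timeout is just one way to report the failure.

```csharp
using System;
using System.Threading;

public static class TryEnterDemo
{
    private static readonly object Gate = new object();

    // Runs the action under the lock, or returns false if the lock
    // could not be acquired within the timeout.
    public static bool TryRun(Action action, int timeoutMilliseconds)
    {
        if (!Monitor.TryEnter(Gate, timeoutMilliseconds))
        {
            return false; // no lock acquired, so there is nothing to release
        }

        try
        {
            action();
            return true;
        }
        finally
        {
            // the part the lock keyword would otherwise do for you
            Monitor.Exit(Gate);
        }
    }
}
```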

The problem is that the lock keyword makes everything pretty easy… it guarantees that the lock is released (by calling Monitor.Exit) when leaving the code block, whatever may have gone wrong. So if you want to use Monitor.TryEnter, you basically have to use a try/finally every time. Not only does that make the code uglier, it’s so boring and tedious that it’s easy to screw up once in a while. So I started googling for a different approach, and luckily, Ian Griffiths has two excellent posts on the subject. He basically uses a TimedLock struct in a using block… when you request the lock, you specify a timeout value, and when the lock can’t be acquired within the given timeout period, it throws a LockTimeoutException. Here’s a (simplified) version of the TimedLock struct:

    public struct TimedLock : IDisposable
    {
        private readonly object target;

        private TimedLock(object o)
        {
            target = o;
        }

        // called at the end of the using block: releases the lock
        public void Dispose()
        {
            Monitor.Exit(target);
        }

        public static TimedLock Lock(object o)
        {
            return Lock(o, TimeSpan.FromSeconds(5));
        }

        public static TimedLock Lock(object o, TimeSpan timeout)
        {
            // TotalMilliseconds is the whole duration; the Milliseconds property
            // is only the 0-999 component and would be 0 for exactly 5 seconds
            return Lock(o, (int)timeout.TotalMilliseconds);
        }

        public static TimedLock Lock(object o, int milliSeconds)
        {
            var timedLock = new TimedLock(o);

            if (!Monitor.TryEnter(o, milliSeconds))
            {
                throw new LockTimeoutException();
            }

            return timedLock;
        }
    }

    public class LockTimeoutException : ApplicationException
    {
        public LockTimeoutException() : base("Timeout waiting for lock") {}
    }

The version on his blog also has a way of detecting unreleased locks when you’re in debug mode.
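One detail that's easy to get wrong when converting a TimeSpan timeout into the int that Monitor.TryEnter expects: TimeSpan.Milliseconds is only the milliseconds component of the value, while TimeSpan.TotalMilliseconds is the whole duration. A small sketch of the difference (TimeoutConversionDemo is a made-up helper, purely for illustration):

```csharp
using System;

public static class TimeoutConversionDemo
{
    // Only the 0-999 milliseconds component of the TimeSpan.
    public static int ComponentOnly(TimeSpan timeout)
    {
        return timeout.Milliseconds;
    }

    // The entire duration expressed in milliseconds.
    public static int WholeDuration(TimeSpan timeout)
    {
        return (int)timeout.TotalMilliseconds;
    }
}
```

For TimeSpan.FromSeconds(5), ComponentOnly returns 0 and WholeDuration returns 5000. A timeout of 0 makes Monitor.TryEnter give up immediately if the lock is taken, which is probably not what a 5 second default is meant to do.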

So now you can basically change the following code:

    public virtual T Get(Id id)
    {
        lock(MonitorObject)
        {
            if (!Members.ContainsKey(id))
            {
                return null;
            }

            return Members[id];
        }
    }

To this:

    public virtual T Get(Id id)
    {
        using (TimedLock.Lock(MonitorObject, 250))
        {
            if (!Members.ContainsKey(id))
            {
                return null;
            }

            return Members[id];
        }
    }

So now it tries to acquire a lock on the MonitorObject instance with a timeout of 250 milliseconds. If it can’t acquire the lock, something is wrong and a LockTimeoutException is thrown, which is a lot better than blocking indefinitely :)

Posted in Multithreading, Software Development

Thread-safe repositories

Posted by Davy Brion on 25th March 2008

First of all, this is not about true DDD repositories… i just needed an in-memory data container with basic storing/retrieving functionality so i figured i’d call it a Repository.

These repositories will be accessed by multiple concurrent threads so i needed to make sure that all access to the underlying dictionary is properly synchronized. But, i also want them to be highly usable and i especially wanted a thread-safe way of dynamically executing queries on them.

Here’s what i came up with:

    public class Repository<T> where T : Member
    {
        private readonly object _monitor = new object();
        private readonly Dictionary<Id, T> _members = new Dictionary<Id,T>();

        protected object Monitor
        {
            get { return _monitor; }
        }

        /// <summary>
        /// any derived classes that use this property are responsible for their own locking!
        /// (use the Monitor property for that)
        /// </summary>
        protected Dictionary<Id, T> Members
        {
            get { return _members; }
        }

        public virtual void Store(T member)
        {
            lock (Monitor)
            {
                Put(member);
            }
        }

        public virtual void StoreRange(IEnumerable<T> members)
        {
            lock (Monitor)
            {
                foreach (T member in members)
                {
                    Put(member);
                }
            }
        }

        public virtual void Remove(T member)
        {
            lock (Monitor)
            {
                if (Members.ContainsValue(member))
                {
                    Members.Remove(member.Id);
                }
            }
        }

        public virtual T Get(Id id)
        {
            lock (Monitor)
            {
                if (!Members.ContainsKey(id))
                {
                    return null;
                }

                return Members[id];
            }
        }

        public virtual T FindFirst(Func<T, bool> expression)
        {
            lock (Monitor)
            {
                return Members.Values.FirstOrDefault(expression);
            }
        }

        public virtual IEnumerable<T> FindAll(Func<T, bool> expression)
        {
            lock (Monitor)
            {
                return Members.Values.Where(expression).ExecuteImmediately();
            }
        }

        /// <summary>
        /// this method should ONLY be called when a lock on Monitor has been acquired!
        /// </summary>
        /// <param name="member"></param>
        private void Put(T member)
        {
            // this overwrites existing entries with the same ID… it’s not a mistake ;)
            Members[member.Id] = member;
        }
    }

Each access to the dictionary is properly synchronized (at least, I think so… if anyone knows of a better way to do the locking, please leave a comment) and I can still execute dynamic queries on the repository by passing lambda expressions to the FindFirst and FindAll methods, like this:

    User me = userRepository.FindFirst(u => u.FirstName == "Davy" && u.LastName == "Brion");

    IEnumerable<User> usersWithNoLastName =
        userRepository.FindAll(u => string.IsNullOrEmpty(u.LastName));
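FindAll calls ExecuteImmediately, a helper that isn't shown in the post; presumably it forces the query to run while the lock is still held. Here's a small sketch of why that matters, using ToList as a stand-in for it (that substitution is an assumption, as are the DeferredQueryDemo and Run names). A LINQ Where query is lazy, so returning it as-is would mean the collection only gets enumerated after the lock has been released.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DeferredQueryDemo
{
    // Returns { count seen by the lazy query, count seen by the snapshot }.
    public static int[] Run()
    {
        var gate = new object();
        var values = new List<int> { 1, 2, 3 };

        IEnumerable<int> lazy;
        List<int> snapshot;

        lock (gate)
        {
            lazy = values.Where(v => v > 1);              // nothing is enumerated yet
            snapshot = values.Where(v => v > 1).ToList(); // enumerated under the lock
        }

        // A later modification, done on the same thread to keep the demo deterministic.
        values.Add(4);

        // The lazy query runs now, outside the lock, and sees the added element.
        return new[] { lazy.Count(), snapshot.Count };
    }
}
```

Run returns 3 for the lazy query and 2 for the snapshot. With real concurrent writers, enumerating the lazy query outside the lock could also fail outright if the collection is modified mid-enumeration.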

Posted in Multithreading, Software Development