Testing the lock-free queue

Having discussed a test app for exercising the lock-free stack from multiple threads, it's now time to do the same for the lock-free queue.

First of all, I decided to play around with the lock-free queue more than I had done with the lock-free stack. The queue works by enqueuing at the tail and dequeuing at the head of an internal linked list. An enqueue requires two updates to that list (adding the new node, and moving the tail pointer to point to it), and they can't both be done in a single atomic step. So Michael and Scott, the inventors of this particular technique, decided that updating the tail pointer wasn't ultra-important straightaway: if it happened, great, but, if not, they deferred it to the next thread performing an enqueue or a dequeue. In fact, the enqueue operation requires the tail to be pointing to the last node before the new node can be linked in, so the tail can never become adrift by more than one position from the end: if a particular enqueue doesn't update the tail pointer, the very next one will force it to happen before doing anything else. And since the queue implementation keeps a dummy node holding no data, the tail can never point 'outside' the linked list; it's not like we can get some weird ABA problem where the tail is pointing at an old node that's no longer part of the list.

Michael and Scott went even further: they made the dequeue operation update the tail position if it was required. This, frankly, made the dequeue code harder to understand and harder to test. And, in reality, it’s just not required. Again: the tail can’t become adrift by more than one node since the next enqueue will update it before the new node can be added (in particular, the tail’s next pointer must be null before the new node is added).

So I decided to remove the tail update code from the Dequeue() method altogether. That, plus some more tidying up advised by CodeRush’s code issues feature, led to this latest code for the lock-free queue:

using System;
using JmBucknall.Threading;

namespace JmBucknall.Structures {

  public class LockFreeQueue<T> {
    SingleLinkNode<T> head;   // always points at a dummy node holding no data
    SingleLinkNode<T> tail;   // points at the last node, or at most one node behind it

    public LockFreeQueue() {
      head = new SingleLinkNode<T>();
      tail = head;
    }

    public void Enqueue(T item) {
      SingleLinkNode<T> newNode = new SingleLinkNode<T> { Item = item };

      while (true) {
        // take snapshots of the tail and of the node following it
        SingleLinkNode<T> oldTail = tail;
        SingleLinkNode<T> oldTailNext = oldTail.Next;

        // only proceed if the tail hasn't moved since we read it
        if (tail == oldTail) {
          if (oldTailNext != null) {
            // the tail has fallen behind: try to advance it, then loop round
            SyncMethods.CAS<SingleLinkNode<T>>(ref tail, oldTail, oldTailNext);
          }
          else {
            // the tail is at the end: try to link the new node after it
            if (SyncMethods.CAS<SingleLinkNode<T>>(ref tail.Next, null, newNode)) {
              // linked: try to swing the tail onto the new node
              SyncMethods.CAS<SingleLinkNode<T>>(ref tail, oldTail, newNode);
              return;
            }
          }
        }
      }
    }

    public bool Dequeue(out T item) {
      while (true) {
        // take snapshots of the dummy head and of the first real node
        SingleLinkNode<T> oldHead = head;
        SingleLinkNode<T> oldHeadNext = oldHead.Next;

        // only proceed if the head hasn't moved since we read it
        if (oldHead == head) {
          if (oldHeadNext == null) {
            // nothing after the dummy node: the queue is empty
            item = default(T);
            return false;
          }
          // try to advance the head; the old first node becomes the new dummy
          if (SyncMethods.CAS<SingleLinkNode<T>>(ref head, oldHead, oldHeadNext)) {
            item = oldHeadNext.Item;
            return true;
          }
        }
      }
    }

    public T Dequeue() {
      T result;
      Dequeue(out result);
      return result;
    }
  }
}
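
A quick aside: SingleLinkNode<T> comes from my structures library and isn't reproduced in this post. Judging purely from how it's used above (an object initializer setting Item, and Next being passed by ref to the CAS calls, so it has to be a field), it's essentially just a value plus a link, something like this sketch:

namespace JmBucknall.Structures {

  public class SingleLinkNode<T> {
    // Next is a field rather than a property so that it can be passed
    // by ref to an interlocked compare-and-swap
    public SingleLinkNode<T> Next;
    public T Item;
  }
}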

For the Enqueue() method, a new node is allocated and initialized. We then enter a loop (it's exited once we've successfully added the new node at the end of the linked list) and take copies of the tail and the tail's next node. If the tail hasn't changed and its next node is not null, the tail has fallen behind, so we (try to) advance it and loop round again. If the next node is null, the tail really is at the end, so we try to link the new node after it; that link will only succeed if the tail's next pointer is still null. If it does succeed, we (try to) swing the tail onto the new node and return.
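
All of those "try to update" steps go through SyncMethods.CAS from my threading library. I'm not showing that helper here, but for the purposes of this post you can assume it's a thin generic wrapper around Interlocked.CompareExchange that reports whether the swap actually took place; a minimal sketch would look like this:

using System.Threading;

namespace JmBucknall.Threading {

  public static class SyncMethods {
    // Atomically replaces location with newValue, but only if location still
    // equals comparand; returns true if the swap happened.
    public static bool CAS<T>(ref T location, T comparand, T newValue) where T : class {
      return comparand ==
        Interlocked.CompareExchange<T>(ref location, newValue, comparand);
    }
  }
}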

Dequeue() is much simpler than before. No longer do we worry about the tail; Enqueue() deals with that. We take copies of the head and its next node. If the head hasn't changed and the next node is null, we return the default value and false (to indicate the queue was empty at that point in time). If the head hasn't changed and the next node is not null, we try to advance the head to that next node. If we're successful, the old head's next node becomes the new dummy head node, and we can return its item with impunity, plus true to indicate a successful dequeue.
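
Just to make those semantics concrete before throwing multiple threads at the queue, here's a trivial single-threaded smoke test (not part of the actual test app):

using System;
using JmBucknall.Structures;

class QueueSmokeTest {
  static void Main() {
    var queue = new LockFreeQueue<string>();
    queue.Enqueue("first");
    queue.Enqueue("second");

    string item;
    while (queue.Dequeue(out item)) {   // prints "first" then "second"
      Console.WriteLine(item);
    }

    // the queue is now empty: the bool overload returns false, and the
    // convenience Dequeue() overload just returns default(T), null here
    Console.WriteLine(queue.Dequeue() == null);   // True
  }
}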

Now we can look at the test program. It works in exactly the same way as in the stack case, so I’ll leave the description to that post.

using System;
using System.Threading;
using JmBucknall.Structures;

namespace MultithreadedQueueTester {

  static class Globals {
    public const int TopValue = 10000000;
    public const int EnqueuerCount = 2;
    public const int DequeuerCount = 5;
    public static int EnqueueValue;   // shared counter providing the values to enqueue
    public static bool NoMoreData;    // set by Main once all the enqueuers have finished
  }

  public class ThreadData {
    public LockFreeQueue<int> queue;
    public ManualResetEvent completion;
    public int[] results;
    public ThreadData(LockFreeQueue<int> queue, ManualResetEvent completion, int[] results) {
      this.queue = queue;
      this.completion = completion;
      this.results = results;
    }
  }

  static class EnqueueEngine {
    static public void Execute(Object stateInfo) {
      ThreadData data = stateInfo as ThreadData;
      int enqueueCount = 0;

      int nextValue = Interlocked.Increment(ref Globals.EnqueueValue);
      while (nextValue <= Globals.TopValue) {
        data.queue.Enqueue(nextValue);
        enqueueCount++;
        nextValue = Interlocked.Increment(ref Globals.EnqueueValue);
      }

      Console.WriteLine(String.Format("Enqueue count: {0}", enqueueCount));
      data.completion.Set();
    }
  }

  static class DequeueEngine {
    static public void Execute(Object stateInfo) {
      ThreadData data = stateInfo as ThreadData;

      int dequeueCount = 0;
      while (true) {
        int value = data.queue.Dequeue();
        if (value == 0) {
          // zero is default(int), so the queue was empty (real values start at 1)
          if (Globals.NoMoreData)
            break;
          Thread.Sleep(1); // minor wait when queue is empty
        }
        else {
          dequeueCount++;
          int oldValue = Interlocked.CompareExchange(ref data.results[value], 1, 0);
          if (oldValue != 0) {
            Console.WriteLine(String.Format("Error: already dequeued {0}", value));
          }
        }
      }

      Console.WriteLine(String.Format("Dequeue count: {0}", dequeueCount));
      data.completion.Set();
    }
  }

  class Program {
    private static ManualResetEvent[] QueueWorkItems(int count, LockFreeQueue<int> queue, int[] results, WaitCallback callback) {
      ManualResetEvent[] events = new ManualResetEvent[count];
      for (int i = 0; i < count; i++) {
        events[i] = new ManualResetEvent(false);
        ThreadPool.QueueUserWorkItem(callback, new ThreadData(queue, events[i], results));
      }
      return events;
    }

    static void Main(string[] args) {
      DateTime start = DateTime.Now;

      Console.WriteLine("create the shared queue");
      LockFreeQueue<int> queue = new LockFreeQueue<int>();

      Console.WriteLine("create the shared results array");
      int[] results = new int[Globals.TopValue + 1];

      Console.WriteLine("create the completion events");
      var enqueuersDone = QueueWorkItems(Globals.EnqueuerCount, queue, results, EnqueueEngine.Execute);
      var dequeuersDone = QueueWorkItems(Globals.DequeuerCount, queue, results, DequeueEngine.Execute);

      Console.WriteLine("wait for the enqueuers to be done");
      ManualResetEvent.WaitAll(enqueuersDone);

      Console.WriteLine("signal the dequeuers to stop, and wait");
      Globals.NoMoreData = true;
      ManualResetEvent.WaitAll(dequeuersDone);

      Console.WriteLine("check that all values were dequeued");
      for (int i = 1; i <= Globals.TopValue; i++) {
        if (results[i] != 1)
          Console.WriteLine(String.Format("Error: {0} was never dequeued.", i));
      }

      DateTime end = DateTime.Now;
      Console.WriteLine("Done");
      Console.WriteLine(end - start);

      Console.ReadLine();
    }
  }
}

Running this test app on a four-core machine shows the lock-free queue behaving correctly in a multithreaded environment: every value is dequeued exactly once, with no errors reported.

References

Maged M. Michael and Michael L. Scott, "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms", PODC '96.

Now playing:
Propellerheads - History Repeating
(from Decksandrumsandrockandroll)


4 Responses

#1 Dew Drop – March 29, 2010 | Alvin Ashcraft's Morning Dew said...
30-Mar-10 5:20 AM

Pingback from Dew Drop – March 29, 2010 | Alvin Ashcraft's Morning Dew

#2 Nova said...
25-Jun-12 9:45 PM

Hi, I'm interested in the lock-free queue implementation's performance. I notice that your implementation uses "SyncMethods.CAS" underneath, so would you mind testing your implementation against mine? I just need to compare the enqueue/dequeue speed, thanks in advance.

Here is the simple sync queue:

class SyncQueue<T>
{
  private Queue<T> queue = new Queue<T>();
  private object syncLock = new Object();
  public void Enqueue(T obj)
  {
    lock(syncLock)
    {
      queue.Enqueue(obj);
    }
  }
  public T Dequeue()
  {
    if (queue.Count > 0)
    {
      lock(syncLock)
      {
        if (queue.Count > 0)
        {
          return queue.Dequeue();
        }
      }
    }
    return default(T);
  }
  public int Count
  {
    get { return queue.Count; }
  }
}
#3 julian m bucknall said...
28-Jun-12 2:13 PM

Nova: Er, what? You're interested, but not so interested that you'd do the work yourself? I'll pass, thanks.

(More remarks: how many cores? hyperthreading enabled? amount of RAM? speed of bus? My guess is that if you limit the number of threads to the number of cores or less, the simple locking version will be faster especially since the CLR team have optimized the locking code to use a spinlock -- a different kind of lockfree algorithm -- before upgrading to a full lock if it cannot be acquired.)

Cheers, Julian

#4 Nova said...
28-Jun-12 7:10 PM

Thanks for your reply; the "spinlock" hint is a big help to me, thanks again.

I will definitely do the test myself if you could show me the "SyncMethods.CAS" implementation, thanks in advance.

I've tested the version in "Concurrent Programming on Windows", and it seems that my full-lock version is at least 2 times faster than the lock-free one (enqueuing 8 million objects, with 4 threads on a 2-core Win7 PC and with 32 threads on a 16-core Win2003 server). If I add more threads, the full-lock version is even faster; I can't see why...

Maybe my test code has something completely wrong that I haven't noticed:

// .Net 2.0, VS2008
using System;
using System.Threading;

class Program
{
  static DateTime begin;
  static SyncQueue<ulong> queue = new SyncQueue<ulong>();
  static void threadFunc()
  {
    int id = Thread.CurrentThread.ManagedThreadId;
    ulong count = 0;
    while (true)
    {
      queue.Enqueue(++count);
      if (queue.Count >= 8000000)
      {
        TimeSpan ts = DateTime.Now - begin;
         Console.WriteLine("Thread " + id.ToString() + " - " + ts.TotalMilliseconds.ToString());
        break;
      }
    }
  }
  static void Main(string[] args)
  {
    begin = DateTime.Now;
    for (int i = 0; i < 32; ++i)
    {
      Thread p = new Thread(new ThreadStart(threadFunc));
      p.IsBackground = true;
      p.Start();
    }
    Console.ReadLine();
  }
}

Sorry for my poor English and thanks a lot!
