Saturday, January 28, 2012

Fast, generic circular queue (array) in .NET


I had a simple problem: I needed a generic queue to hold my temporary results until a worker thread could pick them up. The catch is that the default .NET generic Queue<T> implementation (which internally uses a nice cyclic array) is dynamic, meaning that if you keep adding items, it keeps growing.

Usually this is a great feature, but if you enqueue items faster than you consume them, you will eventually run out of memory (not to mention the fierce garbage collection that occurs whenever the internal array is resized).

This is how my cyclic queue came to life.

So I needed a simple key-value cyclic queue that holds a fixed maximum number of generic items and can be accessed by two threads: a producer and a consumer.

The first attempt used one of .NET’s own synchronization primitives, AutoResetEvent, which worked well but was really slow. After several approaches I found a very simple method: just compare the read and write counters (pointers) to make sure the consumer never passes the producer and the producer never laps the consumer.
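
For comparison, a blocking bounded queue built on AutoResetEvent could look roughly like the sketch below. It is a hypothetical illustration, not the original first attempt: the class name BlockingBoundedQueue and all of its members are made up here, and it assumes exactly one producer and one consumer.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch of an event-based bounded queue (single producer, single consumer).
public class BlockingBoundedQueue<T>
{
    private readonly Queue<T> items = new Queue<T>();
    private readonly object gate = new object();
    private readonly int capacity;

    // Signalled by the producer after an item was added.
    private readonly AutoResetEvent itemAdded = new AutoResetEvent(false);
    // Signalled by the consumer after an item was removed.
    private readonly AutoResetEvent itemRemoved = new AutoResetEvent(false);

    public BlockingBoundedQueue(int capacity)
    {
        this.capacity = capacity;
    }

    public void Enqueue(T item)
    {
        while (true)
        {
            lock (gate)
            {
                if (items.Count < capacity)
                {
                    items.Enqueue(item);
                    break;
                }
            }
            // Queue is full: block until the consumer reports a removal, then re-check.
            itemRemoved.WaitOne();
        }
        itemAdded.Set();
    }

    public T Dequeue()
    {
        T item;
        while (true)
        {
            lock (gate)
            {
                if (items.Count > 0)
                {
                    item = items.Dequeue();
                    break;
                }
            }
            // Queue is empty: block until the producer reports an addition, then re-check.
            itemAdded.WaitOne();
        }
        itemRemoved.Set();
        return item;
    }
}
```

Every call here takes a lock and sets a wait handle, and a blocked call waits on one, which is a plausible reason for a design like this to be slower than simply comparing two counters.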

If the producer is too fast (too many Push calls), it hits Thread.Sleep, giving the other thread a chance to remove items. If the reader (Pop) is faster, Pop simply returns false, indicating that there is nothing to read. This approach is simple, very fast and customizable.

The reason it stores keys and values separately is that my data is key-value based, and if you pack each pair into a KeyValuePair, copying to and from it is fairly slow. Again, this had to be really fast!

So, if you need a cyclic queue, please feel free to use the one below or get inspiration from it.


using System;
using System.Threading;

namespace MapReduce.NET.Collections
{
    public class CircularBuffer<K,V> 
    {
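        const uint buffSize = 10 * 1000;
        K[] buffKey = new K[buffSize];
        V[] buffValue = new V[buffSize];

        // Ever-increasing counters; the slot index is counter % buffSize.
        // volatile: each counter is written by one thread and read by the other.
        // Note: the comparisons below assume the counters never wrap past uint.MaxValue.
        volatile uint ptrWrite;
        volatile uint ptrRead;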

        internal int DelayedWriteCount { get; set; }
 
        // volatile: set by one thread and polled by the other.
        private volatile bool mapInProgress = true;

        internal bool MapInProgress
        {
            get { return mapInProgress; }
            set { mapInProgress = value; }
        }

        internal bool HasNext
        {
            get
            {
                return ptrWrite > ptrRead;
            }
        }
        
        public void Push(K key, V value)
        {
            if (key == null)
                throw new ArgumentNullException("key");

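            uint ptrWriteNext = ptrWrite + 1;

            // Buffer full (one slot always stays unused): yield until the consumer frees a slot.
            while (ptrWriteNext == ptrRead + buffSize)
            {
                Thread.Sleep(0);
                DelayedWriteCount++;
            }

            uint buffptr = ptrWrite % buffSize;

            // Fill the slot first, then publish it by advancing the write counter.
            buffKey[buffptr] = key;
            buffValue[buffptr] = value;

            ptrWrite = ptrWriteNext;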
        }

        internal bool Pop(out K key, out V value)
        {
            uint ptrReadNext = ptrRead + 1;

            key = default(K);
            value = default(V);

            // Nothing to read: the consumer has caught up with the producer.
            if (ptrReadNext > ptrWrite)
            {
                return false;
            }

            uint buffptr = ptrRead % buffSize;

            key = buffKey[buffptr];
            value = buffValue[buffptr];

            ptrRead = ptrReadNext;


            return true;
        }
    }
}
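
A minimal usage sketch, assuming one producer thread and one consumer thread as described above. To stay self-contained it uses a condensed copy of the buffer (MiniCircularBuffer, with a deliberately tiny size of 16 and a public Pop). MiniCircularBuffer, Demo and Run are hypothetical names made up for this example, and the consumer is assumed to know how many items to expect.

```csharp
using System;
using System.Threading;

// Condensed copy of the buffer so that the example compiles on its own.
public class MiniCircularBuffer<K, V>
{
    const uint Size = 16;                  // tiny on purpose, so the producer has to wait
    readonly K[] keys = new K[Size];
    readonly V[] values = new V[Size];
    volatile uint ptrWrite;                // written by the producer only
    volatile uint ptrRead;                 // written by the consumer only

    public void Push(K key, V value)
    {
        uint next = ptrWrite + 1;
        while (next == ptrRead + Size)     // full: yield until the consumer frees a slot
            Thread.Sleep(0);

        uint slot = ptrWrite % Size;
        keys[slot] = key;
        values[slot] = value;
        ptrWrite = next;                   // publish only after the slot is filled
    }

    public bool Pop(out K key, out V value)
    {
        key = default(K);
        value = default(V);
        if (ptrRead == ptrWrite)           // empty: nothing to read
            return false;

        uint slot = ptrRead % Size;
        key = keys[slot];
        value = values[slot];
        ptrRead = ptrRead + 1;
        return true;
    }
}

public static class Demo
{
    // Pushes the pairs ("k1", 1) .. ("k<count>", count) from a producer thread
    // and returns the sum of the values popped on the calling (consumer) thread.
    public static long Run(int count)
    {
        var buffer = new MiniCircularBuffer<string, int>();

        var producer = new Thread(() =>
        {
            for (int i = 1; i <= count; i++)
                buffer.Push("k" + i, i);
        });
        producer.Start();

        long sum = 0;
        int received = 0;
        while (received < count)
        {
            string key;
            int value;
            if (buffer.Pop(out key, out value))
            {
                sum += value;
                received++;
            }
            else
            {
                Thread.Sleep(0);           // nothing to read yet: let the producer run
            }
        }

        producer.Join();
        return sum;
    }
}
```

Calling Demo.Run(1000) pushes the values 1 to 1000 through the 16-slot buffer and adds them up on the consumer side. In the full class above, the MapInProgress flag and HasNext can play the role that the known item count plays here.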

3 comments:

  1. Why not use a better implementation of modulo (as opposed to C#'s %) that provides proper modular integer add and subtract behavior, mapping all integer values into the range [0...n-1], and use that to index a fixed-length buffer, Buf[n]?

    For example, instead of i = a%b, use:

    i = (a < 0) ? ((a+1)%b + b-1) : (a%b)

    http://stackoverflow.com/questions/2691025/mathematical-modulus-in-c-sharp

    Replies
    1. Not too sure what you mean:

      1. There is no better implementation of modulo than the operator itself :)
      2. The code you sent simply uses the built-in modulo for non-negative numbers

      All your code would do is add an extra branch for negative numbers, but as the code works with non-negative numbers (uint), you'd probably get a compiler warning (or if not, the compiler would just remove that branch).

  2. Hi Adam
    just came across your great blog. Really like it.

    I am the first to admit that I am a medium skilled amateur programmer
    (like everyone was once) and while I would like to try this circular queue, I have a hard time implementing it.
    Can you provide a mini implementation example?

    Thanks a lot
