Fast, generic circular queue (array) in .NET
I had a simple problem: I needed a generic queue to hold
my temporary results until a worker thread could pick them up. The catch is
that the default .NET generic Queue<T> implementation (which internally
uses a nice cyclic array) is dynamic, meaning that if you keep adding items, it
keeps growing.
This is how my cyclic queue came alive.
Dynamic growth is usually something really great, but if you are adding
(queuing) items to the queue faster than you consume them, you will run out of
memory (not to mention the fierce garbage collection that occurs when resizing
takes place).
So, I needed a simple key-value cyclic queue that holds at most
a fixed number of generic items and can be accessed by two threads: a
consumer and a producer.
The first attempt used one of .NET’s own synchronization primitives,
AutoResetEvent, which worked well but was really slow. After several approaches
I found a very simple method: just use the counters (pointers) to check that
the consumer does not pass the producer.
If the producer is faster (too many Push calls), it will hit Thread.Sleep to give
the other thread a chance to remove items. If the reader (Pop) is faster, Pop simply returns false, indicating that there is nothing to read. This approach is simple, very
fast and customizable.
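The counter check described above can be sketched in a few lines. This is a simplified, single-threaded illustration rather than the class from this post: the names CounterRing, TryPush and TryPop are made up for the example, and TryPush returns false where a blocking Push would wait. It also uses every slot of the array, which is a choice made here for clarity.

```csharp
using System;

// Hypothetical illustration of the two-counter check; not the class from this post.
public class CounterRing
{
    private readonly int[] slots;
    private uint written; // total number of items ever pushed
    private uint read;    // total number of items ever popped

    public CounterRing(int capacity)
    {
        slots = new int[capacity];
    }

    public bool TryPush(int item)
    {
        // Full: the writer is a whole buffer length ahead of the reader.
        if (written - read == (uint)slots.Length) return false;
        slots[written % (uint)slots.Length] = item;
        written++;
        return true;
    }

    public bool TryPop(out int item)
    {
        item = 0;
        // Empty: the reader has caught up with the writer.
        if (read == written) return false;
        item = slots[read % (uint)slots.Length];
        read++;
        return true;
    }
}

public static class Program
{
    public static void Main()
    {
        var ring = new CounterRing(2);
        int item;
        Console.WriteLine(ring.TryPush(1));       // True
        Console.WriteLine(ring.TryPush(2));       // True
        Console.WriteLine(ring.TryPush(3));       // False: the buffer is full
        Console.WriteLine(ring.TryPop(out item)); // True, item is 1
        Console.WriteLine(ring.TryPush(3));       // True: a slot was freed
    }
}
```

Because both counters only ever increase, "full" and "empty" are each a single comparison, and the array index is just the counter modulo the array length.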
The reason it takes the key and the value separately is that my data is basically
key-value based, and if you store each pair in a KeyValuePair, copying
to and from it is fairly slow. Again, this had to be really fast!
So, if you need a cyclic queue, please feel free to use the
one below or get inspiration from it.
using System;
using System.Threading;

namespace MapReduce.NET.Collections
{
    public class CircularBuffer<K, V>
    {
        const uint buffSize = 10 * 1000;
        K[] buffKey = new K[buffSize];
        V[] buffValue = new V[buffSize];

        // Running counts of items written and read; the slot is counter % buffSize.
        // volatile so that each thread sees the other's latest position.
        // The counters are never reset, so this assumes fewer than 2^32 items in total.
        volatile uint ptrWrite;
        volatile uint ptrRead;

        // Number of times Push had to yield because the buffer was full.
        internal int DelayedWriteCount { get; set; }

        private volatile bool mapInProgress = true;
        internal bool MapInProgress
        {
            get { return mapInProgress; }
            set { mapInProgress = value; }
        }

        internal bool HasNext
        {
            get { return ptrWrite > ptrRead; }
        }

        // Called by the producer thread only.
        public void Push(K key, V value)
        {
            if (key == null)
                throw new ArgumentNullException("key");

            uint ptrWriteNext = ptrWrite + 1;

            // Buffer full: yield until the consumer has freed a slot.
            while (ptrWriteNext == ptrRead + buffSize)
            {
                Thread.Sleep(0);
                DelayedWriteCount++;
            }

            uint buffptr = ptrWrite % buffSize;
            buffKey[buffptr] = key;
            buffValue[buffptr] = value;
            ptrWrite = ptrWriteNext; // publish the item
        }

        // Called by the consumer thread only. Returns false when there is nothing to read.
        internal bool Pop(out K key, out V value)
        {
            uint ptrReadNext = ptrRead + 1;
            key = default(K);
            value = default(V);

            if (ptrReadNext > ptrWrite)
            {
                return false;
            }

            uint buffptr = ptrRead % buffSize;
            key = buffKey[buffptr];
            value = buffValue[buffptr];
            ptrRead = ptrReadNext; // release the slot
            return true;
        }
    }
}
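A minimal usage sketch with one producer thread and one consumer might look like the following. So that it compiles on its own, it repeats a trimmed copy of the class (no null check, no delay counter) and marks the counters volatile, which is an assumption made for this example. The Program class, the Run method and the choice of int keys with long values are made up for illustration.

```csharp
using System;
using System.Threading;

// Trimmed copy of the class from this post so the example compiles on its own.
// Assumption: the counters are volatile here for cross-thread visibility.
public class CircularBuffer<K, V>
{
    const uint buffSize = 10 * 1000;
    readonly K[] buffKey = new K[buffSize];
    readonly V[] buffValue = new V[buffSize];
    volatile uint ptrWrite;
    volatile uint ptrRead;
    volatile bool mapInProgress = true;

    internal bool MapInProgress
    {
        get { return mapInProgress; }
        set { mapInProgress = value; }
    }

    public void Push(K key, V value)
    {
        uint ptrWriteNext = ptrWrite + 1;
        // Buffer full: yield until the consumer frees a slot.
        while (ptrWriteNext == ptrRead + buffSize) Thread.Sleep(0);
        uint buffptr = ptrWrite % buffSize;
        buffKey[buffptr] = key;
        buffValue[buffptr] = value;
        ptrWrite = ptrWriteNext;
    }

    internal bool Pop(out K key, out V value)
    {
        key = default(K);
        value = default(V);
        if (ptrRead + 1 > ptrWrite) return false; // nothing to read
        uint buffptr = ptrRead % buffSize;
        key = buffKey[buffptr];
        value = buffValue[buffptr];
        ptrRead = ptrRead + 1;
        return true;
    }
}

public static class Program
{
    // Pushes itemCount pairs from a producer thread and returns the sum of
    // the values seen by the consumer (the calling thread).
    public static long Run(int itemCount)
    {
        var buffer = new CircularBuffer<int, long>();
        var producer = new Thread(() =>
        {
            for (int i = 0; i < itemCount; i++) buffer.Push(i, i);
            buffer.MapInProgress = false; // signal that no more items will arrive
        });
        producer.Start();

        long sum = 0;
        int key;
        long value;
        while (true)
        {
            if (buffer.Pop(out key, out value)) { sum += value; continue; }
            if (!buffer.MapInProgress)
            {
                // Producer has finished: drain whatever it pushed last.
                while (buffer.Pop(out key, out value)) sum += value;
                break;
            }
            Thread.Sleep(0); // queue momentarily empty, let the producer run
        }
        producer.Join();
        return sum;
    }

    public static void Main()
    {
        Console.WriteLine(Run(100000)); // 4999950000
    }
}
```

The consumer checks MapInProgress only after a failed Pop and then drains once more, so items pushed just before the producer finished are not lost.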
Why not use a better implementation of modulo (as opposed to C#'s %) that provides proper modular integer add and subtract behavior, mapping all integer values into the range [0...n-1], and use that to index a fixed-length buffer, Buf[n]?
For example, instead of i = a%b, use:
i = (a < 0) ? ((a+1)%b + b-1) : (a%b)
http://stackoverflow.com/questions/2691025/mathematical-modulus-in-c-sharp
Not too sure what you mean:
1. There is no better implementation of modulo than the operator itself :)
2. The code you sent simply uses the built-in modulo for non-negative numbers.
All your code would do is add an extra branch for negative numbers, but as the code works with non-negative numbers (uint), you'd probably get a compiler warning (or, if not, the compiler would just remove that branch).
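As a small illustration of the difference being discussed: in C#, % on a negative int keeps the sign of the dividend, while on a uint the result always lies between 0 and n-1. The ModuloDemo class and the Slot helper below are hypothetical names used only for this sketch.

```csharp
using System;

public static class ModuloDemo
{
    // Hypothetical helper: maps an ever-increasing unsigned counter to a slot index.
    public static uint Slot(uint counter, uint size)
    {
        return counter % size;
    }

    public static void Main()
    {
        int signedIndex = -1;
        Console.WriteLine(signedIndex % 10);        // -1: the remainder keeps the sign of the dividend
        Console.WriteLine(Slot(uint.MaxValue, 10)); // 5: 4294967295 % 10, never negative
        Console.WriteLine(Slot(12, 10));            // 2
    }
}
```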
Hi Adam
I just came across your great blog. Really like it.
I am the first to admit that I am a medium-skilled amateur programmer
(like everyone was once), and while I would like to try this circular queue, I have a hard time implementing it.
Can you provide a mini implementation example?
Thanks a lot
great