Case Study: FastFlow Queue

FastFlow is a parallel programming framework for multicore platforms that uses hybrid single-producer/single-consumer queues as the basic underlying component for inter-thread communication. Let's study how the queues are designed and implemented, and see whether we can improve on them.

There are 5 main components:

1. A basic array-based SPSC queue (ff::SWSR_Ptr_Buffer). It's used as a building block for the queue itself, and for various caching as well (source code).

2. A dynamic linked-list-based queue (ff::dynqueue). It's used to link several ff::SWSR_Ptr_Buffer instances together into a linked list; this allows us to build an unbounded queue from bounded ff::SWSR_Ptr_Buffer queues (source code).

3. Node cache for ff::dynqueue. It's an ff::SWSR_Ptr_Buffer used for caching of nodes.

4. ff::SWSR_Ptr_Buffer cache. It's also an ff::SWSR_Ptr_Buffer used for caching of, well, the same queues.

5. A component that binds it all together (ff::uSWSR_Ptr_Buffer). It manages the caches, tracks the current buffer for the producer and for the consumer, and so on (source code).

Let's try to visualize the structure:

Fig 1.

Yeah, it's a bit messy (and note that (3) and (4) are not even expanded - they are based on ff::SWSR_Ptr_Buffer, so they look like (1)). What we see here is a lot of helper components, a lot of padding, and a lot of indirection. Generally, it's not all that bad, and the overall design is quite fast and scalable: all cache-line paddings are in place, and there are no atomic RMW operations nor expensive memory fences on the fast paths. But does it need to be that complex? What I am going to do is transform it into something that looks like this:

Fig 2.

So let's see what we can improve in the original design.

1. First, there is a completely unnecessary indirection uSWSR_Ptr_Buffer->SWSR_Ptr_Buffer->buffer->msg, and we can transform it to uSWSR_Ptr_Buffer->buffer->msg. An indirection can stall a thread, and it forces the thread to load an additional cache line. What do we gain from it? Nothing. In Fig 2 you can see that the queue contains pointers directly into buffers.

2. There is no need for ff::dynqueue at all. We can embed 'next' links directly into the buffers and use them for linking. This way we eliminate ff::dynqueue and the associated node cache. In Fig 2 you can see that the buffers are linked to each other internally.

3. We don't actually need the SWSR_Ptr_Buffer cache either. The main queue already links buffers together in the same way a cache would (FIFO). So in Fig 2 you can see that the producer just holds a pointer 'last' to the last buffer that can potentially be reused - that is all that's required for caching of buffers. The producer simply checks whether the last buffer is no longer used by the consumer; if so, it can reuse that buffer for new messages (a code sketch of this check follows the list below).

4. There is some redundant conditional branching on the fast paths. I count 2 checks of the enqueued message for NULL (strictly speaking, passing NULL into push() is a user's programming error, so it would be nice to eliminate the check completely), 2 checks of the buffer for fullness (only 1 is required), a check for prior queue initialization (if the user did not call init(), that's also a programming error on their side), and a check for overflow during index increment (which is plainly unnecessary). The same goes for the dequeue operation. That is, 6 conditional branches on the fast path of each operation, while only 1 is strictly necessary (whether the current buffer is not full/not empty, or we must fall to the slow path).

5. A consumer checks for the presence of a next message as 'buffer[read_pos] != NULL', so buffers must somehow be initialized to NULLs. FastFlow's approach is to memset buffers to NULL after allocation (which creates an unpleasant burst of memory accesses and brings unnecessary data into the cache), and then the consumer resets cells back to NULL after consumption (which may or may not be a bad thing, depending on how the buffer will be used later). There is a better approach to initialization which does not produce memory-access bursts and does not force the consumer to acquire the buffer's cache lines in a writable state. Basically, the producer initializes the next cell just before setting up the current one:

buffer[write_pos + 1] = 0;
buffer[write_pos] = msg; // store-release

That's it. This makes memory accesses strictly local, and does not force any unnecessary cache line state changes (a producer will write to the next cell soon anyway).

6. Last but not least (actually the most important point): if you need to stream the data, do stream the data (rather than pointers to the data). It (1) eliminates one layer of indirection (buffer->msg), (2) improves locality (the data is laid out contiguously, as opposed to being scattered throughout memory), (3) introduces a kind of natural padding (that is, if the producer and the consumer work close to each other, in a pointer-based queue they will contend over a single cache line, while in a data queue they will be further apart from each other), and (4) eliminates dynamic memory allocation/deallocation overheads.
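To make point 3 concrete, here is a minimal sketch of the reuse check. The helper name and the exact form of the check are mine (not taken from the FastFlow sources); it uses the member names introduced in the data structure sketch below:

bool can_reuse_last_buffer ()
{
    // consumer's current read position; it is published with store-release,
    // so an acquire load guarantees that the consumer's accesses to the old
    // buffer are complete before we touch it
    char* head = (char*)atomic_addr_load_acquire((void* volatile*)&head_pos_);
    // buffers are consumed strictly in FIFO order, so once the consumer's
    // position is outside the oldest buffer, it never comes back to it
    return last_buffer_ != tail_buffer_ // not the buffer we are still filling
        && (head < last_buffer_->data
            || head >= last_buffer_->data + last_buffer_->size);
}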

Now, let's sketch out the data structure definition. We need to define the buffer layout, as well as the consumer's and producer's data:

class ff_queue
{
    struct buffer_t
    {
        // pointer to the next buffer in the queue
        buffer_t*       next;
        // size of the data
        size_t          size;
        // the data
        char            data [0];
    };

    // consumer part:
    // current position for reading
    // (points somewhere into buffer_t::data)
    char* volatile      head_pos_;

    // padding between consumer's and producer's parts
    char                pad_ [CACHE_LINE_SIZE];

    // producer part:
    // current position for writing
    // (points somewhere into buffer_t::data)
    char*               tail_pos_;
    // end of current buffer
    char*               tail_end_;
    // helper variable
    char*               tail_next_;
    // current 'tail' buffer
    buffer_t*           tail_buffer_;
    // buffer cache
    buffer_t*           last_buffer_;
    // default buffer size
    size_t const        buffer_size_;
    // desired number of cached buffers
    size_t const        max_buffer_count_;
    // current number of cached buffers
    size_t              buffer_count_;
    // used as 'empty' marker
    static size_t const eof = 1;

    ...
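The sketch above omits construction. For completeness, here is one way the queue could be initialized - this is my reconstruction, not code from the attached archive; it assumes the producer and the consumer start out in a single freshly allocated buffer whose first cell is marked 'empty':

ff_queue (size_t buffer_size, size_t max_buffer_count)
    : buffer_size_ (buffer_size)
    , max_buffer_count_ (max_buffer_count)
    , buffer_count_ (1)
{
    // allocate the initial buffer shared by the producer and the consumer
    buffer_t* buffer = (buffer_t*)malloc(sizeof(buffer_t) + buffer_size_);
    buffer->next = 0;
    buffer->size = buffer_size_;
    // mark the first cell as 'empty', so the consumer sees an empty queue
    // until the first message is committed
    *(char* volatile*)buffer->data = (char*)eof;
    head_pos_ = buffer->data;
    tail_pos_ = buffer->data;
    tail_end_ = buffer->data + buffer->size;
    tail_next_ = 0;
    tail_buffer_ = buffer;
    last_buffer_ = buffer;
}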

Now, let's sketch out the public interface:

class ff_queue
{
public:
    void*   enqueue_prepare (size_t size);
    void    enqueue_commit ();
    void*   dequeue_prepare ();
    void    dequeue_commit ();
    // ...

Yes, since we want to pass the data (and not pointers to the data), we basically need to combine the queue with a memory manager. So now the producer allocates memory directly from the queue:

void* msg = queue->enqueue_prepare(message_size);
// fill 'msg' with data
queue->enqueue_commit();

And the consumer's code, respectively, looks like:

void* msg = queue->dequeue_prepare();
if (msg)
{
    // consume msg
    queue->dequeue_commit();
}

Now, let's implement the interface. enqueue_prepare() merely ensures that there is enough space in the current buffer for the message, and returns the current position in the buffer. enqueue_commit() merely writes EOF into the next cell, and then writes a pointer to the next cell into the current cell:

void* enqueue_prepare (size_t size)
{
    // round up the message size for proper alignment
    size_t msg_size = ((uintptr_t)(size + sizeof(void*) - 1)
        & ~(sizeof(void*) - 1)) + sizeof(void*);
    // check whether there is enough space
    // in the current buffer or not
    if ((size_t)(tail_end_ - tail_pos_) >= msg_size + sizeof(void*))
    {
        // if yes, remember where the next message starts
        tail_next_ = tail_pos_ + msg_size;
        // and return a pointer into the buffer
        // for the user to fill with data
        return tail_pos_ + sizeof(void*);
    }
    else
    {
        // otherwise, fall to the slow path
        return enqueue_prepare_slow(size);
    }
}

void enqueue_commit ()
{
    // prepare next cell
    char* tail_next = tail_next_;
    *(char* volatile*)tail_next = (char*)eof;
    // update current cell
    // (after this point the message becomes consumable)
    atomic_addr_store_release((void* volatile*)tail_pos_, tail_next);
    tail_pos_ = tail_next;
}

dequeue_prepare() loads and analyzes the value in the current cell. If the EOF flag is not set, then there is a consumable message; otherwise there is either a transfer to the next buffer or the queue is empty:

void* dequeue_prepare ()
{
    // load the value in the current cell
    void* next = atomic_addr_load_acquire((void* volatile*)head_pos_);
    // if the EOF flag is not set,
    // then there is a consumable message
    if (((uintptr_t)next & eof) == 0)
    {
        char* msg = head_pos_ + sizeof(void*);
        return msg;
    }
    // otherwise there is either nothing ...
    else if (((uintptr_t)next & ~eof) == 0)
    {
        return 0;
    }
    // ... or a transfer to the next buffer
    else
    {
        // in this case we just follow the pointer and retry
        atomic_addr_store_release((void* volatile*)&head_pos_,
            (char*)((uintptr_t)next & ~eof));
        return dequeue_prepare();
    }
}

void dequeue_commit ()
{
    // follow the pointer to the next cell
    char* next = *(char* volatile*)head_pos_;
    assert(next != 0);
    atomic_addr_store_release((void* volatile*)&head_pos_, next);
}

The only unimplemented function, enqueue_prepare_slow(), either obtains a buffer from the cache (last_buffer_) or allocates a brand new buffer, then sets up a transfer from the current buffer to the new one (a pointer combined with the EOF flag), and then recursively calls enqueue_prepare() (which must succeed this time). I won't provide the real code here - it's a slow path anyway; refer to the full source code below.
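Still, to make the mechanics clear, here is a rough sketch of what the slow path could look like. This is my reconstruction against the members and helpers above (including the can_reuse_last_buffer() check sketched earlier), not the code from the archive, and it omits the bookkeeping that caps the number of live buffers via buffer_count_ and max_buffer_count_:

void* enqueue_prepare_slow (size_t size)
{
    // space the new cell needs: aligned payload + link word,
    // plus one more link word for the 'eof' of the following cell
    size_t msg_size = ((uintptr_t)(size + sizeof(void*) - 1)
        & ~(sizeof(void*) - 1)) + sizeof(void*);
    size_t buf_size = buffer_size_;
    if (buf_size < msg_size + sizeof(void*))
        buf_size = msg_size + sizeof(void*);
    buffer_t* buffer;
    if (can_reuse_last_buffer() && last_buffer_->size >= buf_size)
    {
        // recycle the oldest buffer: the consumer has already left it
        buffer = last_buffer_;
        last_buffer_ = last_buffer_->next;
    }
    else
    {
        // otherwise allocate a brand new buffer
        buffer = (buffer_t*)malloc(sizeof(buffer_t) + buf_size);
        buffer->size = buf_size;
    }
    buffer->next = 0;
    // the first cell of the new buffer must read as 'empty'
    // before the consumer can possibly reach it
    *(char* volatile*)buffer->data = (char*)eof;
    // link the new buffer into the FIFO chain of buffers
    tail_buffer_->next = buffer;
    // publish the transfer: the current cell of the old buffer now holds
    // a pointer to the new buffer's data, tagged with the EOF flag
    atomic_addr_store_release((void* volatile*)tail_pos_,
        (char*)((uintptr_t)buffer->data | eof));
    // switch the producer to the new buffer and retry the fast path
    tail_buffer_ = buffer;
    tail_pos_ = buffer->data;
    tail_end_ = buffer->data + buffer->size;
    return enqueue_prepare(size);
}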

I know, I know, after reading all that boring stuff you ask - and how much faster is it?

In order to test performance I created the following synthetic benchmark. There is a main thread and N worker threads, all linked into a ring by means of the queues:

The main thread emits 50M messages; the last message is a special END token. Worker threads accept messages, do basic integrity verification, and pass them further along the ring. When a worker thread receives the END token, it terminates. The main thread does the same, except that it does not pass messages further; that is, each message traverses the ring only once. Message sizes vary from 5 to 20 bytes (4-byte length + payload).
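For illustration, a worker thread's inner loop could look roughly like this. It's a sketch against the queue interface above, not the actual benchmark code from the archive, and the END-token encoding via a zero length is hypothetical:

#include <cstdint>
#include <cstring>

// sketch of one worker in the ring: take a message from the input queue,
// copy it into the output queue, and stop once the END token arrives
void worker_thread (ff_queue* in, ff_queue* out)
{
    for (;;)
    {
        void* msg = in->dequeue_prepare();
        if (msg == 0)
            continue; // queue is empty, spin (a real benchmark may back off)
        // the first 4 bytes hold the payload length, as described above
        uint32_t len;
        memcpy(&len, msg, sizeof(len));
        bool is_end = (len == 0); // hypothetical END-token encoding
        // pass the message further along the ring by copying it into
        // the next queue (data embedding, point 6 above)
        void* fwd = out->enqueue_prepare(sizeof(len) + len);
        memcpy(fwd, msg, sizeof(len) + len);
        out->enqueue_commit();
        // only now release the cell in the input queue
        in->dequeue_commit();
        if (is_end)
            break;
    }
}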

I've tested it on a multiprocessor/multicore AMD machine (4 processors x 4 cores = 16 hardware threads total), and here are the results:

As you can see, my queue is not only faster, it also scales better - the slope of the line is steeper. However, the only source of the better scalability is embedding data into the queue (as opposed to passing pointers; that's point 6 in the list above). I've created another queue which is customized for pointer passing (ff_queue2 in the archive); it uses all the optimizations except data embedding, and on the same benchmark it shows only a constant 5-20% speedup:

So, data embedding turns out to be a valuable technique. But, of course, it's not always applicable. For example, if I have a read-only processing stage (like CRC validation), I want to merely hand off a pointer further, rather than copy the data from one queue to another. Another example is parallel computation on large arrays/matrices - one doesn't really want to copy those around. However, even such applications frequently create thin messages/tasks which contain something like a few pointers to arrays/matrices and some indexes into them, and such messages are a perfect target for embedding.

The source code is attached below. The archive contains the queue (ff_queue), the queue customized for pointer passing (ff_queue2), the benchmark code (quite messy), and the required pieces of FastFlow. It can be compiled under Windows/MSVC and Linux/gcc.