Unbounded SPSC Queue

An unbounded single-producer/single-consumer node-based queue. It uses an internal, non-reducible cache of nodes. The dequeue operation is always wait-free. The enqueue operation is wait-free in the common case (when a node is available in the cache); otherwise it calls ::operator new(), so it is probably not wait-free. No atomic RMW operations or heavy memory fences are used, i.e. the enqueue and dequeue operations issue just several plain loads, several plain stores and one conditional branch. A cache-conscious data layout is used, so the producer and the consumer can work simultaneously without causing cache-coherence traffic.

A single-producer/single-consumer queue can be used for communication with a thread that services a hardware device (where the wait-free property is required), or when there is naturally only one producer and one consumer. Also, N single-producer/single-consumer queues can be used to construct a multi-producer/single-consumer queue, or N^2 queues can be used to construct a fully connected system of N threads (other, partially connected topologies are also possible).

Hardware platform: x86-32/64

Compiler: Intel C++ Compiler

// Load with 'consume' (data-dependent) memory ordering.
//
// addr: location to read from.
// Returns the value read from *addr.
//
// The read is done through a volatile-qualified pointer, and a compiler-only
// fence follows it so the compiler does not move later memory accesses in
// front of the load. No hardware fence instruction is issued: the ordering
// provided by x86 itself is relied upon, so this helper is x86-specific.
template<typename T>
T load_consume(T const* addr)
{
    // hardware fence is implicit on x86
    T v = *const_cast<T const volatile*>(addr);

    // compiler fence (selected per compiler; __INTEL_COMPILER is tested first
    // because the Intel compiler may also define __GNUC__)
#if defined(__INTEL_COMPILER)
    __memory_barrier();
#elif defined(__GNUC__) || defined(__clang__)
    __asm__ __volatile__("" : : : "memory");
#else
#error "load_consume: no compiler fence is known for this compiler"
#endif

    return v;
}

// Store with 'release' memory ordering.
//
// addr: location to write to.
// v:    value to store into *addr.
//
// A compiler-only fence precedes the write so the compiler does not move
// earlier memory accesses past the store, and the write itself goes through
// a volatile-qualified pointer. No hardware fence instruction is issued: the
// ordering provided by x86 itself is relied upon, so this helper is
// x86-specific.
template<typename T>
void store_release(T* addr, T v)
{
    // hardware fence is implicit on x86

    // compiler fence (selected per compiler; __INTEL_COMPILER is tested first
    // because the Intel compiler may also define __GNUC__)
#if defined(__INTEL_COMPILER)
    __memory_barrier();
#elif defined(__GNUC__) || defined(__clang__)
    __asm__ __volatile__("" : : : "memory");
#else
#error "store_release: no compiler fence is known for this compiler"
#endif

    *const_cast<T volatile*>(addr) = v;
}

// Number of bytes in one cache line on modern x86 processors.
const size_t cache_line_size = 64;

// Single-producer/single-consumer queue.
//
// FIFO queue stored as a singly linked list of nodes. The list always starts
// at first_ and ends at head_; tail_ lies somewhere in between:
//
//   first_ -> ... -> tail_ -> ... -> head_
//   [ reusable nodes ][ dummy ][ queued values ]
//
// The node tail_ points at acts as a dummy: the queued values live in the
// nodes after it. Nodes in front of tail_ have already been consumed and form
// the node cache that enqueue() reuses before allocating.
//
// Intended use: enqueue() is called by one producer thread and dequeue() by
// one consumer thread. The class does not enforce this itself.
//
// T must be default-constructible and copy-assignable (nodes are created
// with 'new node' and values are assigned into and out of them).
template<typename T>
class spsc_queue
{
public:
    // Creates an empty queue consisting of a single dummy node.
    spsc_queue()
    {
        node* n = new node;
        n->next_ = 0;
        tail_ = head_ = first_ = tail_copy_ = n;
    }

    // Frees every node, cached or still queued, by walking the list from
    // first_ until the null next_ pointer of the last node.
    ~spsc_queue()
    {
        node* n = first_;
        do
        {
            node* next = n->next_;
            delete n;
            n = next;
        }
        while (n);
    }

    // Appends a copy of 'v' to the queue (producer side).
    // May allocate via 'new' when the node cache is empty.
    void enqueue(T v)
    {
        node* n = alloc_node();
        n->next_ = 0;
        n->value_ = v;
        // publish the fully initialized node to the consumer
        store_release(&head_->next_, n);
        head_ = n;
    }

    // Removes the oldest value and copies it into 'v' (consumer side).
    // returns 'false' if queue is empty, in which case 'v' is left untouched
    bool dequeue(T& v)
    {
        // read the link once and work with that value from here on
        node* next = load_consume(&tail_->next_);
        if (!next)
        {
            return false;
        }
        v = next->value_;
        // 'next' becomes the new dummy; the old dummy is now available to
        // the producer's node cache
        store_release(&tail_, next);
        return true;
    }

private:
    // internal node structure
    struct node
    {
        node* next_;
        T value_;
    };

    // consumer part
    // accessed mainly by consumer, infrequently by producer
    node* tail_; // tail of the queue

    // delimiter between consumer part and producer part,
    // so that they are situated on different cache lines
    char cache_line_pad_ [cache_line_size];

    // producer part
    // accessed only by producer
    node* head_; // head of the queue
    node* first_; // last unused node (tail of node cache)
    node* tail_copy_; // helper (points somewhere between first_ and tail_)

    // Returns a node for enqueue() to fill in.
    // first tries to take a node from the internal node cache,
    // if that fails, allocates a node via ::operator new()
    node* alloc_node()
    {
        if (first_ == tail_copy_)
        {
            // the producer's snapshot of tail_ shows no reusable node;
            // refresh it from the consumer's tail_ and look again
            tail_copy_ = load_consume(&tail_);
            if (first_ == tail_copy_)
            {
                return new node;
            }
        }
        node* n = first_;
        first_ = first_->next_;
        return n;
    }

    // copying is disabled: declared private and not defined here
    spsc_queue(spsc_queue const&);
    spsc_queue& operator = (spsc_queue const&);
};

// usage example: drives one queue of ints from a single thread
int main()
{
    spsc_queue<int> q;

    // put two items into the queue
    q.enqueue(1);
    q.enqueue(2);

    // take them out again in FIFO order
    int item;
    bool ok = q.dequeue(item); // item receives 1
    ok = q.dequeue(item);      // item receives 2

    // the queue is empty now; put two more items in
    q.enqueue(3);
    q.enqueue(4);

    ok = q.dequeue(item); // item receives 3
    ok = q.dequeue(item); // item receives 4

    // nothing is left: dequeue() returns false here
    ok = q.dequeue(item);
}