Unbounded SPSC Queue

Unbounded single-producer/single-consumer node-based queue. Internal non-reducible cache of nodes is used. Dequeue operation is always wait-free. Enqueue operation is wait-free in common case (when there is available node in the cache), otherwise enqueue operation calls ::operator new(), so probably not wait-free. No atomic RMW operations nor heavy memory fences are used, i.e. enqueue and dequeue operations issue just several plain loads, several plain stores and one conditional branching. Cache-conscious data layout is used, so producer and consumer can work simultaneously causing no cache-coherence traffic.

Single-producer/single-consumer queue can be used for communication with thread which services hardware device (wait-free property is required), or when there are naturally only one producer and one consumer. Also N single-producer/single-consumer queues can be used to construct multi-producer/single-consumer queue, or N^2 queues can be used to construct fully-connected system of N threads (other partially-connected topologies are also possible).

Hardware platform: x86-32/64 
Compiler: Intel C++ Compiler 

// load with 'consume' (data-dependent) memory ordering 
template<typename T> 
T load_consume(T const* addr) 
    // hardware fence is implicit on x86 
    T v = *const_cast<T const volatile*>(addr); 
    __memory_barrier(); // compiler fence 
    return v; 

// store with 'release' memory ordering 
template<typename T> 
void store_release(T* addr, T v) 
    // hardware fence is implicit on x86 
    __memory_barrier(); // compiler fence 
    *const_cast<T volatile*>(addr) = v; 

// cache line size on modern x86 processors (in bytes) 
size_t const cache_line_size = 64; 
// single-producer/single-consumer queue 
template<typename T> 
class spsc_queue 
public: 
  spsc_queue() 
  { 
      node* n = new node; 
      n->next_ = 0; 
      tail_ = head_ = first_= tail_copy_ = n; 
  } 

  ~spsc_queue() 
  { 
      node* n = first_; 
      do 
      { 
          node* next = n->next_; 
          delete n; 
          n = next; 
      } 
      while (n); 
  } 

  void enqueue(T v) 
  { 
      node* n = alloc_node(); 
      n->next_ = 0; 
      n->value_ = v; 
      store_release(&head_->next_, n); 
      head_ = n; 
  } 

  // returns 'false' if queue is empty 
  bool dequeue(T& v) 
  { 
      if (load_consume(&tail_->next_)) 
      { 
          v = tail_->next_->value_; 
          store_release(&tail_, tail_->next_); 
          return true; 
      } 
      else 
      { 
          return false; 
      } 
  } 

private: 
  // internal node structure 
  struct node 
  { 
      node* next_; 
      T value_; 
  }; 

  // consumer part 
  // accessed mainly by consumer, infrequently be producer 
  node* tail_; // tail of the queue 

  // delimiter between consumer part and producer part, 
  // so that they situated on different cache lines 
  char cache_line_pad_ [cache_line_size]; 

  // producer part 
  // accessed only by producer 
  node* head_; // head of the queue 
  node* first_; // last unused node (tail of node cache) 
  node* tail_copy_; // helper (points somewhere between first_ and 
tail_) 

  node* alloc_node() 
  { 
      // first tries to allocate node from internal node cache, 
      // if attempt fails, allocates node via ::operator new() 

      if (first_ != tail_copy_) 
      { 
          node* n = first_; 
          first_ = first_->next_; 
          return n; 
      } 
      tail_copy_ = load_consume(&tail_); 
      if (first_ != tail_copy_) 
      { 
          node* n = first_; 
          first_ = first_->next_; 
          return n; 
      } 
      node* n = new node; 
      return n; 
  } 

  spsc_queue(spsc_queue const&); 
  spsc_queue& operator = (spsc_queue const&); 

}; 

// usage example 
int main() 
  spsc_queue<int> q; 
  q.enqueue(1); 
  q.enqueue(2); 
  int v; 
  bool b = q.dequeue(v); 
  b = q.dequeue(v); 
  q.enqueue(3); 
  q.enqueue(4); 
  b = q.dequeue(v); 
  b = q.dequeue(v); 
  b = q.dequeue(v); 
}


Comments