Intrusive MPSC node-based queue
Advantages:
+ Intrusive. No need for additional internal nodes.
+ Wait-free and fast producers. A single XCHG per push is about the least one can expect from a multi-producer, non-distributed queue.
+ Extremely fast consumer. The fast path uses no atomic operations; an XCHG is executed only once per batch of nodes, in order to grab the 'last item'.
+ No need to reverse the node order, so the pop operation is always O(1).
+ ABA-free.
+ No need for PDR. That is, one can use this algorithm out of the box: no thread registration/deregistration, periodic activity, deferred garbage, etc.
Disadvantages:
- The push function is blocking with respect to the consumer: if a producer is blocked at (*) in mpscq_push, the consumer is blocked too. Fortunately, the 'window of inconsistency' is extremely small; the producer must be blocked exactly at (*). This is a disadvantage only in comparison with a totally lock-free algorithm; it is still much better than a lock-based algorithm.
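struct mpscq_node_t
{
    mpscq_node_t* volatile next;
};
struct mpscq_t
{
    mpscq_node_t* volatile head; // most recently pushed node; shared by producers
    mpscq_node_t* tail;          // next node to pop; touched by the consumer only
    mpscq_node_t stub;           // dummy node that keeps the list non-empty
};
#define MPSCQ_STATIC_INIT(self) {&(self).stub, &(self).stub, {0}}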
void mpscq_create(mpscq_t* self)
{
    self->head = &self->stub;
    self->tail = &self->stub;
    self->stub.next = 0;
}
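As a small illustration of static initialization, here is a sketch that assumes plain C. It adds typedefs that the listing leaves implicit (the structs as written rely on C++-style struct names), and the names g_queue and mpscq_is_fresh are hypothetical. Static initialization works because the address of the queue's own stub is an address constant.

```c
#include <assert.h>
#include <stddef.h>

/* typedefs added so the declarations compile as C (an assumption; the
   original listing uses the struct names directly, as C++ allows). */
typedef struct mpscq_node_t { struct mpscq_node_t* volatile next; } mpscq_node_t;
typedef struct mpscq_t
{
    mpscq_node_t* volatile head;
    mpscq_node_t* tail;
    mpscq_node_t stub;
} mpscq_t;

#define MPSCQ_STATIC_INIT(self) {&(self).stub, &(self).stub, {0}}

/* Hypothetical global queue, usable without calling mpscq_create(). */
static mpscq_t g_queue = MPSCQ_STATIC_INIT(g_queue);

/* Returns 1 if q is in the empty state that mpscq_create() also sets up. */
static int mpscq_is_fresh(mpscq_t* q)
{
    return q->head == &q->stub && q->tail == &q->stub && q->stub.next == NULL;
}
```

A statically initialized queue and one set up by mpscq_create start in the same state: head and tail both point at the stub, and the stub has no successor.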
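void mpscq_push(mpscq_t* self, mpscq_node_t* n)
{
    n->next = 0;
    // XCHG: atomic exchange; stores n into head and returns the previous head
    mpscq_node_t* prev = XCHG(&self->head, n);
    //(*) n is the new head, but is not yet reachable from the tail
    prev->next = n;
}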
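mpscq_node_t* mpscq_pop(mpscq_t* self)
{
    mpscq_node_t* tail = self->tail;
    mpscq_node_t* next = tail->next;
    if (tail == &self->stub)
    {
        // skip the stub; if nothing is linked after it, report empty
        if (0 == next)
            return 0;
        self->tail = next;
        tail = next;
        next = next->next;
    }
    if (next)
    {
        // fast path: tail has a successor, so tail can be handed out
        self->tail = next;
        return tail;
    }
    // tail has no successor; if it is not the head, a push is in progress
    mpscq_node_t* head = self->head;
    if (tail != head)
        return 0;
    // tail is the only node: push the stub after it so it gets a successor
    mpscq_push(self, &self->stub);
    next = tail->next;
    if (next)
    {
        self->tail = next;
        return tail;
    }
    return 0;
}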
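For readers without an XCHG primitive at hand, here is a minimal sketch of the same algorithm on C11 <stdatomic.h>. It is an illustration under stated assumptions, not the original code: the names (queue_t, q_push_begin, q_push_end, msg_t, MSG_OF, mpscq_demo) are hypothetical, the memory orderings are an assumption, and push is split into two halves only so that the demo can pause a producer at (*) from a single thread. It also shows the intrusive usage: the node is embedded in the message itself.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

typedef struct node { _Atomic(struct node*) next; } node_t;

typedef struct {
    _Atomic(node_t*) head; /* producers exchange this */
    node_t* tail;          /* consumer only */
    node_t stub;
} queue_t;

static void q_init(queue_t* q) {
    atomic_init(&q->stub.next, NULL);
    atomic_init(&q->head, &q->stub);
    q->tail = &q->stub;
}

/* First half of push: make n the new head, return the previous head. */
static node_t* q_push_begin(queue_t* q, node_t* n) {
    atomic_store_explicit(&n->next, NULL, memory_order_relaxed);
    return atomic_exchange_explicit(&q->head, n, memory_order_acq_rel);
}

/* Second half of push: link the previous head to n. The gap between the
   two halves corresponds to (*) in the listing. */
static void q_push_end(node_t* prev, node_t* n) {
    atomic_store_explicit(&prev->next, n, memory_order_release);
}

static void q_push(queue_t* q, node_t* n) {
    q_push_end(q_push_begin(q, n), n);
}

static node_t* q_pop(queue_t* q) {
    node_t* tail = q->tail;
    node_t* next = atomic_load_explicit(&tail->next, memory_order_acquire);
    if (tail == &q->stub) {
        if (next == NULL) return NULL;
        q->tail = next;
        tail = next;
        next = atomic_load_explicit(&next->next, memory_order_acquire);
    }
    if (next) { q->tail = next; return tail; }
    if (tail != atomic_load_explicit(&q->head, memory_order_acquire))
        return NULL;
    q_push(q, &q->stub);
    next = atomic_load_explicit(&tail->next, memory_order_acquire);
    if (next) { q->tail = next; return tail; }
    return NULL;
}

/* Intrusive use: the queue node lives inside the payload (hypothetical type). */
typedef struct { node_t link; int value; } msg_t;
#define MSG_OF(n) ((msg_t*)((char*)(n) - offsetof(msg_t, link)))

/* Single-threaded demo: stall producer A at (*), let producer B finish,
   then resume A. Stores popped values in out[] and returns their count,
   or -1 if the consumer saw an item while A was stalled. */
static int mpscq_demo(int out[2]) {
    queue_t q;
    msg_t a = { .value = 1 }, b = { .value = 2 };
    node_t* it;
    int n = 0;
    q_init(&q);

    node_t* prev = q_push_begin(&q, &a.link); /* producer A stops at (*) */
    q_push(&q, &b.link);                      /* producer B completes */
    if (q_pop(&q) != NULL) return -1;         /* nothing is reachable yet */

    q_push_end(prev, &a.link);                /* producer A resumes */
    while ((it = q_pop(&q)) != NULL)
        out[n++] = MSG_OF(it)->value;
    return n;
}
```

While producer A is stalled, the consumer simply sees an empty queue, even though B's push has fully completed; once A writes its link, both items come out in FIFO order (1, then 2).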