Wait-free Object Storage with Single-word Atomic Operations

By object storage I mean a component that can be used in the following scenario:

object_storage g_s;

// a plurality of "reader" threads periodically acquire and release a

// reference to "a current" version of some global object

while (...)

{

T* obj = g_s.acquire();

// do something with obj

g_s.release(obj);

}

// a "writer" thread periodically (but infrequently) updates the

object

while (...)

{

T* old_obj = g_s.write_lock();

T* new_obj = create_new_object(old_obj);

g_s.write_unlock(new_obj); // installs new version of the object

}

Such a component is useful for managing objects like global

application settings (which can still be episodically updated by an

administrator, but are constantly read everywhere), or the routing table in a

network router (which is updated each X seconds/minutes, but read

during processing of each packet), etc.

And, yes, the component uses MVCC (multi-version concurrency control),

that is, at each moment in time there can be several alive versions of

an object (and it's generally a good idea in a concurrent system).

Of course you can implement it with a mutex + a reference counted

object in the following way:

// Baseline implementation: a mutex protecting a reference-counted object.
// g_mtx guards g_ptr; g_write_mtx serializes writers across the whole
// write_lock()/write_unlock() span.
std::mutex g_mtx;

std::mutex g_write_mtx;

std::shared_ptr<T> g_ptr;

std::shared_ptr<T> read_lock()
{
    std::lock_guard<std::mutex> l (g_mtx);
    return g_ptr; // shared_ptr copy
}

std::shared_ptr<T> write_lock()
{
    g_write_mtx.lock(); // held until write_unlock(); serializes writers
    std::lock_guard<std::mutex> l (g_mtx);
    return g_ptr; // shared_ptr copy
}

void write_unlock(std::shared_ptr<T> const& ptr)
{
    {
        std::lock_guard<std::mutex> l (g_mtx);
        g_ptr = ptr; // install the new version
    }
    g_write_mtx.unlock();
}

Simple. But performance is not all that great. There is at least 1

atomic RMW on a heavily contended memory location for mutex lock/unlock

+ 1 atomic RMW on a heavily contended reference counter. Plus some

possibility of blocking on the mutex.

The wait-free storage algorithm issues only 1 atomic RMW per acquire/

release + contention is somewhat distributed since it uses so called

"differential reference counting" (DRC), that is all "+1" go to one

counter, while all "-1" go to another counter.

I posted a similar algorithm some time ago. But the problem is that general-purpose DRC requires double-word

atomic RMW operations, and they are simply not available on some

architectures.

DRC generally combines pointer to the current version of an object and

"outer" reference counter into a single memory "word" (that can be

manipulated atomically) (while "inner" reference counter resides in

the object). The idea is to limit number of alive versions of an

object (which is once again generally a good idea, for example, you

expect there to be only 1 alive version most of the time, and 2

versions some time after update, now, if number of alive versions

starts to grow unboundedly, something weird is going on and you can just run

out of memory). Now, when we have at most, say, 4 versions of an

object, we can store an index (0..3) instead of a pointer, which frees

all other bits for the counter.

The rest is easy as pie (to the extent possible for lockfree algorithms).

Here is an implementation for MSVC/Win32:

#define OBJECT_COUNT 4 // max simultaneously alive versions; must be a power of 2

#define OBJECT_MASK (OBJECT_COUNT-1) // low bits of lf_store_t::state: index of the current object

#define COUNT_MASK (~OBJECT_MASK) // high bits of lf_store_t::state: the "outer" reference counter

#define COUNT_INC OBJECT_COUNT // outer-counter increment (counter lives above the index bits)

#define PERSISTENT 1 // inner-counter weight of the store's own reference to an object

#define TEMPORAL 2 // inner-counter weight added per reader release

// Header embedded in every stored object; intrusive reference counting.
typedef struct lf_object_t

{

uintptr_t volatile rc; // "inner" counter: PERSISTENT while installed, plus TEMPORAL per released reader; the writer later subtracts the transferred outer count

lf_object_t* volatile* back_ptr; // points back to this object's slot in lf_store_t::objects; cleared when the object dies

void (*dtor)(void* obj); // destructor callback invoked when both counters drain to zero

} lf_object_t;

struct lf_store_t

{

uintptr_t volatile state; // "outer" counter + index to

lf_store_t::objects

lf_object_t* volatile objects [OBJECT_COUNT];

CRITICAL_SECTION write_mtx;

};

// Initializes the store with an initial object version in slot 0.
// Must complete before any reader or writer touches the store.
void lf_store_create (lf_store_t* store, lf_object_t* obj, void(*dtor)(void*))
{
    size_t slot;
    store->state = 0; // outer counter = 0, current index = 0
    InitializeCriticalSection(&store->write_mtx);
    store->objects[0] = obj;
    for (slot = 1; slot != OBJECT_COUNT; slot += 1)
        store->objects[slot] = 0; // remaining slots start out free
    obj->rc = PERSISTENT; // the store itself holds one reference
    obj->back_ptr = &store->objects[0];
    obj->dtor = dtor;
}

// Final reclamation: runs once both counters have drained to zero.
// Frees the slot first so a waiting writer can reuse it, then destroys
// the object via its type-erased destructor.
static void lf_store_release_object (lf_object_t* obj)
{
    assert(obj->rc == 0);
    assert(*obj->back_ptr == obj);
    *obj->back_ptr = 0; // hand the slot back to writers
    obj->dtor(obj);
}

// Tears down the store and destroys the current object version.
// NOTE(review): assumes no concurrent readers or writers at this point --
// 'state' and 'objects' are read non-atomically here.
void lf_store_destroy (lf_store_t*

store)

{

uintptr_t idx;

lf_object_t* obj;

idx = store->state & OBJECT_MASK;

obj = store->objects[idx];

// Revoke the outer count accumulated in 'state' (in COUNT_INC units,
// each worth TEMPORAL on the inner counter) plus the store's own
// PERSISTENT reference, bringing rc to 0 for the release below.
obj->rc -= (store->state & COUNT_MASK) / OBJECT_COUNT * TEMPORAL +

PERSISTENT;

lf_store_release_object(obj);

DeleteCriticalSection(&store->write_mtx);

}

// Acquires a reference to the current object version. Completely
// wait-free: one atomic RMW both increments the outer counter (high
// bits of 'state') and fetches the current object's index (low bits).
// Pair every call with lf_store_read_release().
lf_object_t* lf_store_read_acquire (lf_store_t* store)
{
    uintptr_t prev;
    uintptr_t idx;
    // NOTE(review): the (long volatile*) cast assumes 32-bit uintptr_t
    // (Win32, as stated); a Win64 port needs _InterlockedExchangeAdd64.
    prev = (uintptr_t)_InterlockedExchangeAdd((long volatile*)&store->state, COUNT_INC);
    idx = prev & OBJECT_MASK;
    return store->objects[idx];
}

// Releases a reference obtained via lf_store_read_acquire(). The "-1"
// goes to the object's inner counter (differential counting): add
// TEMPORAL here; the writer subtracts the accumulated outer count when
// it replaces the object, so the sum reaches exactly 0 at the last drop.
void lf_store_read_release (lf_object_t* obj)
{
    uintptr_t cnt; // resulting (new) value of the inner counter
    cnt = (uintptr_t)_InterlockedExchangeAdd((long volatile*)&obj->rc, TEMPORAL) + TEMPORAL;
    if (cnt == 0)
        lf_store_release_object(obj); // we were the last holder
}

// Begins an update: blocks other writers and returns the current
// version (without taking a counted reference -- the index cannot
// change while the writer mutex is held, since only write_unlock
// modifies it).
lf_object_t* lf_store_write_lock (lf_store_t* store)
{
    EnterCriticalSection(&store->write_mtx);
    return store->objects[store->state & OBJECT_MASK];
}

// Publishes 'obj' as the new current version and ends the update begun
// by lf_store_write_lock(). Transfers the old version's outer count to
// its inner counter and reclaims it if no readers remain.
void lf_store_write_unlock (lf_store_t* store, lf_object_t* obj, void(*dtor)(void*))
{
    uintptr_t prev;
    uintptr_t idx;
    uintptr_t old_cnt;
    uintptr_t old_idx;
    uintptr_t cnt_dif;
    uintptr_t cnt_res;
    lf_object_t* old_obj;
    // Find a free object slot. If all OBJECT_COUNT versions are still
    // alive, spin-yield until readers release one (see the deadlock
    // caveat in the accompanying text).
    for (;;)
    {
        for (idx = 0; idx != OBJECT_COUNT; idx += 1)
        {
            if (store->objects[idx] == 0)
                break;
        }
        if (idx != OBJECT_COUNT)
            break;
        SwitchToThread();
    }
    // Prepare the object for publication.
    store->objects[idx] = obj;
    obj->rc = PERSISTENT;
    obj->back_ptr = &store->objects[idx];
    obj->dtor = dtor;
    // Publish the new object and simultaneously grab the previous
    // (outer counter, index) pair; the new outer counter starts at 0.
    prev = (uintptr_t)_InterlockedExchange((long volatile*)&store->state, idx);
    old_cnt = prev & COUNT_MASK;
    old_idx = prev & OBJECT_MASK;
    old_obj = store->objects[old_idx];
    assert(old_idx != idx);
    assert(old_obj->back_ptr == &store->objects[old_idx]);
    // Transfer the accumulated outer count to the inner counter:
    // subtract one TEMPORAL per acquire plus the store's PERSISTENT
    // reference. Only now can the inner counter drop to zero.
    cnt_dif = (uintptr_t)-(intptr_t)(old_cnt / OBJECT_COUNT * TEMPORAL + PERSISTENT);
    cnt_res = (uintptr_t)_InterlockedExchangeAdd((long volatile*)&old_obj->rc, cnt_dif) + cnt_dif;
    LeaveCriticalSection(&store->write_mtx);
    if (cnt_res == 0)
        lf_store_release_object(old_obj); // no readers left on the old version
}

Some additional comments. So when there are already OBJECT_COUNT alive

objects, and a writer wants to install a new version, he waits until

at least one old version will go away (some form of automatic overload

control). However, be careful - that introduces implicit dependency

between threads and can lead to unexpected deadlocks (writer waits for

readers to release an old version, while readers are blocked on some

resource acquired by the writer).

The algorithm can be easily modified to incorporate an object pool, so

that there are always OBJECT_COUNT objects that are reused as

needed (no need to constantly allocate/deallocate objects).

lf_store_t::objects always holds pointers to objects, while separate

parallel array of atomic flags tracks in-use/free indication (or

perhaps lsb of lf_store_t::objects is used as in-use/free flag).

I did not test it with Relacy Race Detector:

http://groups.google.com/group/relacy

so potentially there are some minor bugs, but I believe the algorithm

is working as a whole.

Here is small single-threaded test-case + necessary includes:

#include <windows.h>

#include <intrin.h>

#include <assert.h>

// Live my_object instance count; the test checks it returns to 0 (no leaks).
static long volatile my_object_count;

// Test payload: embeds the lf_object_t header and tracks the number of
// live instances so the test can verify complete reclamation.
struct my_object : lf_object_t
{
    int data;

    my_object(int data) : data(data)
    {
        _InterlockedIncrement(&my_object_count);
    }

    ~my_object()
    {
        _InterlockedDecrement(&my_object_count);
    }

    // Type-erased deleter handed to the store.
    static void dtor(void* obj)
    {
        delete static_cast<my_object*>(obj);
    }
};

int main()

{

lf_store_t store;

lf_store_create(&store, new my_object (1), &my_object::dtor);

{

my_object* obj = (my_object*)lf_store_read_acquire(&store);

assert(obj->data == 1);

lf_store_read_release(obj);

}

{

my_object* obj = (my_object*)lf_store_write_lock(&store);

assert(obj->data == 1);

lf_store_write_unlock(&store, new my_object (2),

&my_object::dtor);

}

{

my_object* obj = (my_object*)lf_store_read_acquire(&store);

assert(obj->data == 2);

my_object* obj2 = (my_object*)lf_store_write_lock(&store);

assert(obj2->data == 2);

lf_store_write_unlock(&store, new my_object (3),

&my_object::dtor);

lf_store_read_release(obj);

}

{

my_object* obj = (my_object*)lf_store_write_lock(&store);

assert(obj->data == 3);

lf_store_write_unlock(&store, new my_object (4),

&my_object::dtor);

}

lf_store_destroy(&store);

assert(my_object_count == 0);

}