Differential Reference Counting: Implementation

First, let's outline the data structures and layouts. We need a base class for reference-counted objects, the layout of a strong pointer, and definitions of the two pointer types:
struct drc_base
{
    // low word is 'strong inner' counter
    // high word is 'basic inner' counter
    atomic_dword state;

    // destruction function
    void (*dtor)(drc_base*);
};

struct strong_ref
{
    // low word is pointer
    // high word is 'outer' counter
    atomic_dword state;
};

typedef drc_base* basic_ptr;
typedef strong_ref* strong_ptr;
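
The atomic_dword primitives used throughout this article are not shown in the text (they come with the attachment). For experimenting with the listings, a rough stand-in could look like the sketch below. It is an assumption, not the attached implementation: the signatures are inferred from the calls in this article, a mutex replaces the double-word CAS (so it is not lock-free), and the two halves are assumed to be added independently, with no carry between them.

```cpp
#include <atomic>
#include <cstdint>
#include <mutex>

// Hypothetical stand-in for atomic_dword: two machine words that are
// always read and modified together. NOT lock-free, for illustration only.
struct atomic_dword
{
    std::mutex guard;
    uintptr_t lo = 0;
    uintptr_t hi = 0;
};

// The memory_order argument only mirrors the call sites in the article;
// the mutex already orders everything, so the argument is ignored.
inline void atomic_dword_store(atomic_dword* a, uintptr_t lo, uintptr_t hi,
                               std::memory_order)
{
    std::lock_guard<std::mutex> lock(a->guard);
    a->lo = lo;
    a->hi = hi;
}

inline void atomic_dword_load(atomic_dword* a, uintptr_t* lo, uintptr_t* hi,
                              std::memory_order)
{
    std::lock_guard<std::mutex> lock(a->guard);
    *lo = a->lo;
    *hi = a->hi;
}

// Adds to each half separately (unsigned wrap-around, no carry between
// halves) and reports the values seen before the addition.
inline void atomic_dword_fetch_add(atomic_dword* a, uintptr_t add_lo, uintptr_t add_hi,
                                   uintptr_t* prev_lo, uintptr_t* prev_hi,
                                   std::memory_order)
{
    std::lock_guard<std::mutex> lock(a->guard);
    *prev_lo = a->lo;
    *prev_hi = a->hi;
    a->lo += add_lo;
    a->hi += add_hi;
}

inline void atomic_dword_exchange(atomic_dword* a, uintptr_t lo, uintptr_t hi,
                                  uintptr_t* prev_lo, uintptr_t* prev_hi,
                                  std::memory_order)
{
    std::lock_guard<std::mutex> lock(a->guard);
    *prev_lo = a->lo;
    *prev_hi = a->hi;
    a->lo = lo;
    a->hi = hi;
}

// On failure the comparands are refreshed with the current values,
// which is what the retry loop in strong_compare_exchange() relies on.
inline bool atomic_dword_compare_exchange(atomic_dword* a,
                                          uintptr_t* cmp_lo, uintptr_t* cmp_hi,
                                          uintptr_t new_lo, uintptr_t new_hi,
                                          std::memory_order)
{
    std::lock_guard<std::mutex> lock(a->guard);
    if (a->lo == *cmp_lo && a->hi == *cmp_hi)
    {
        a->lo = new_lo;
        a->hi = new_hi;
        return true;
    }
    *cmp_lo = a->lo;
    *cmp_hi = a->hi;
    return false;
}
```

With such a stand-in the listings can be compiled and stepped through in a debugger; for real use the lock-based bodies would have to be replaced by genuine double-word atomics.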

The interface we are going to implement is as follows:
/** Initializes the reference-counted object pointed to by 'ptr'
 *  'dtor' will be used to destroy the object
 *  when the number of references drops to 0
 */
void        basic_init              (basic_ptr ptr, void (*dtor)(drc_base*));

/** Plain acquire with basic thread-safety
 *  Can be called only if a thread already owns a reference to the object
 */
void        basic_acquire           (basic_ptr ptr);

/** Releases a reference to the object pointed to by 'ptr'
 */
void        basic_release           (basic_ptr ptr);

/** Initializes strong pointer 'ptr' with initial value of 'obj'
 */
void        strong_init             (strong_ptr ptr, basic_ptr obj);

/** Strongly thread-safe acquire
 */
basic_ptr   strong_acquire          (strong_ptr ptr);

/** Stores object 'obj' to the pointer 'ptr'
 */
void        strong_store            (strong_ptr ptr, basic_ptr obj);

/** Stores object 'obj' to the pointer 'ptr',
 *  and returns previous value
 */
basic_ptr   strong_exchange         (strong_ptr ptr, basic_ptr obj);

/** Stores object 'xchg' to the pointer 'ptr'
 *  if its value is equal to '*cmp';
 *  on failure updates '*cmp' with the current value
 */
bool        strong_compare_exchange (strong_ptr ptr,
                basic_ptr* cmp, basic_ptr xchg);

The invariants and logic we are going to implement are as follows. The inner strong counter is the number of strong pointers that currently point to the object. The inner basic counter is the number of ordinary references to the object. When both counters drop to zero, the object is destroyed. The outer counter temporarily collects increments destined for the inner basic counter; they are transferred to it when the strong pointer is updated to point to another object (before that the object can't be destroyed anyway, because there is still a strong pointer to it). Until that transfer happens, the inner basic counter may lag behind the real number of references (it can even wrap below zero), which is harmless as long as the inner strong counter is non-zero.
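
To make the accounting concrete, here is a small single-threaded model of the three counters. It is only a sketch: it uses plain integers instead of atomics, and all names in it (model_object, model_strong_ptr, adjust, acquire, store, demo_two_readers) are made up for illustration and are not part of the implementation below.

```cpp
// Plain, single-threaded model of the counters. Signed integers stand in
// for the wrapping unsigned words of the real implementation.
struct model_object
{
    long strong = 0;         // inner strong counter
    long basic = 0;          // inner basic counter
    bool destroyed = false;
};

struct model_strong_ptr
{
    model_object* obj = nullptr;
    long outer = 0;          // outer counter
};

// Applies a change to both inner counters; the object dies on (0, 0).
void adjust(model_object* o, long d_strong, long d_basic)
{
    o->strong += d_strong;
    o->basic += d_basic;
    if (o->strong == 0 && o->basic == 0)
        o->destroyed = true;
}

// Reader side: only the outer counter of the strong pointer is touched.
model_object* acquire(model_strong_ptr* p)
{
    p->outer += 1;
    return p->obj;
}

// Writer side: the old object loses one strong reference and receives the
// references collected in the outer counter as basic references.
void store(model_strong_ptr* p, model_object* next)
{
    if (next)
        adjust(next, +1, 0);
    model_object* prev = p->obj;
    long collected = p->outer;
    p->obj = next;
    p->outer = 0;
    if (prev)
        adjust(prev, -1, +collected);
}

// Two readers acquire; one releases before the store, the other after it.
// Returns true if the object survives the store and dies at the last release.
bool demo_two_readers()
{
    model_object a;
    a.basic = 1;                      // as after basic_init: (strong 0, basic 1)
    model_strong_ptr p;
    store(&p, &a);                    // like strong_init: a is (1, 1), outer 0
    adjust(&a, 0, -1);                // creator drops its reference: (1, 0)
    model_object* r1 = acquire(&p);   // outer 1
    model_object* r2 = acquire(&p);   // outer 2
    adjust(r1, 0, -1);                // reader 1 is done: (1, -1), still alive
    store(&p, nullptr);               // transfer: (1 - 1, -1 + 2) = (0, 1)
    bool alive_after_store = !a.destroyed;
    adjust(r2, 0, -1);                // reader 2 is done: (0, 0), destroyed
    return alive_after_store && a.destroyed;
}
```

Note the intermediate state (1, -1): the inner basic counter is temporarily below zero, and only the transfer of the outer counter during the store brings it back in line with the real number of references.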

Let's put it all together. Below is the 'basic' part of the implementation; I hope it's mostly self-explanatory:
void basic_init(basic_ptr ptr, void (*dtor)(drc_base*))
{
    // create it with 0 strong references and 1 basic reference
    atomic_dword_store(&ptr->state, 0, 1, memory_order_relaxed);
    ptr->dtor = dtor;
}

// helper function, used elsewhere
// (defined before the public wrappers so that they can call it)
static void basic_acquire(basic_ptr ptr, uintptr_t strong, uintptr_t basic)
{
    uintptr_t prev_lo, prev_hi; // we are not interested in previous values here
    atomic_dword_fetch_add(&ptr->state, strong, basic, &prev_lo, &prev_hi, memory_order_relaxed);
}

// helper function, used elsewhere
static void basic_release(basic_ptr ptr, uintptr_t strong, uintptr_t basic)
{
    uintptr_t prev_lo, prev_hi;
    // the decrement must not be relaxed: all accesses to the object
    // have to happen before its destruction in another thread
    atomic_dword_fetch_add(&ptr->state, -(intptr_t)strong, -(intptr_t)basic, &prev_lo, &prev_hi, memory_order_acq_rel);
    // if both counters drop to zero, destroy the object
    if (prev_lo - strong == 0 && prev_hi - basic == 0)
        ptr->dtor(ptr);
}

void basic_acquire(basic_ptr ptr)
{
    // add 1 basic reference
    basic_acquire(ptr, 0, 1);
}

void basic_release(basic_ptr ptr)
{
    // remove 1 basic reference
    basic_release(ptr, 0, 1);
}

And here is the 'strong' part; it's trickier, so pay attention to the details (and the comments):
void strong_init(strong_ptr ptr, basic_ptr obj)
{
    // first, we need to announce another strong pointer to the object
    basic_acquire(obj, 1, 0);
    // then setup the strong pointer with 0 'outer' counts
    atomic_dword_store(&ptr->state, (uintptr_t)obj, 0, memory_order_relaxed);
}

basic_ptr strong_acquire(strong_ptr ptr)
{
    uintptr_t prev_lo, prev_hi;
    // atomically increment 'outer' counter and load the current pointer
    atomic_dword_fetch_add(&ptr->state, 0, 1, &prev_lo, &prev_hi, memory_order_acquire);
    return (basic_ptr)prev_lo;
}

void strong_store(strong_ptr ptr, basic_ptr obj)
{
    if (obj)
    {
        // since we are going to store it in a strong pointer,
        // we need to increment the inner strong counter
        basic_acquire(obj, 1, 0);
    }
    uintptr_t prev_obj, prev_count;
    // atomically exchange pointer and counter
    atomic_dword_exchange(&ptr->state, (uintptr_t)obj, 0, &prev_obj, &prev_count, memory_order_acq_rel);
    if (prev_obj)
    {
        // pay attention to what is happening here:
        // we are revoking 1 strong reference, since the object is not stored in the strong pointer anymore;
        // plus, we are transferring the references cached in the outer counter to the inner basic counter
        // (basic_release() subtracts its arguments, so passing a negated count adds it)
        basic_release((basic_ptr)prev_obj, 1, -(intptr_t)prev_count);
    }
}

basic_ptr strong_exchange(strong_ptr ptr, basic_ptr obj)
{
    // the function mostly repeats strong_store()
    // except that we hold back 1 basic reference,
    // because the function returns pointer to an object,
    // and the object must be acquired, otherwise it can be destroyed

    if (obj)
        basic_acquire(obj, 1, 0);
    uintptr_t prev_obj, prev_count;
    atomic_dword_exchange(&ptr->state, (uintptr_t)obj, 0, &prev_obj, &prev_count, memory_order_acq_rel);
    if (prev_obj)
        basic_release((basic_ptr)prev_obj, 1, -(intptr_t)(prev_count + 1));
    return (basic_ptr)prev_obj;
}

bool strong_compare_exchange(strong_ptr ptr, basic_ptr* cmp, basic_ptr xchg)
{
    if (xchg)
    {
        // since we are going to store it in a strong pointer,
        // we need to increment the inner strong counter
        basic_acquire(xchg, 1, 0);
    }
    uintptr_t cur_lo, cur_hi;
    // we need to do initial load in order to obtain the counter value
    atomic_dword_load(&ptr->state, &cur_lo, &cur_hi, memory_order_relaxed);
    for (;;)
    {
        // compare pointer values
        if (*cmp != (basic_ptr)cur_lo)
        {
            // if the pointers do not match,
            // we need to release the comparand,
            // because we are going to overwrite it
            if (*cmp)
                basic_release(*cmp, 0, 1);
            // then, acquire the current value and store it in cmp
            // (strictly speaking, we can acquire another object, not cur_lo,
            // and theoretically it can be equal to cmp, so it's kind of a weak CAS,
            // that is, it can fail spuriously)
            *cmp = strong_acquire(ptr);
            // and finally revert the inner strong counter,
            // since we did not store xchg in the strong pointer
            if (xchg)
                basic_release(xchg, 1, 0);
            return false;
        }
        if (atomic_dword_compare_exchange(&ptr->state, &cur_lo, &cur_hi, (uintptr_t)xchg, 0, memory_order_acq_rel))
        {
            // if the CAS succeeded, update counters as in strong_store()
            if (cur_lo)
                basic_release((basic_ptr)cur_lo, 1, -(intptr_t)cur_hi);
            return true;
        }
        // otherwise, retry the operation
        // (perhaps, it's just the counter that is changed)
    }
}


Note that in 64-bit mode the inner counters can be 32 bits wide and consequently combined into a single word; this can help if a 64-bit architecture does not support double-word atomic operations. Moreover, the x86 architecture (IA-32, Intel64) supports only double-word CAS (but not double-word XADD or XCHG), so the current implementation is only lock-free on it (because of the CAS loops); if the counters are combined into a single word, the implementation becomes wait-free.
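
A minimal sketch of such a single-word layout, assuming a 64-bit target and std::atomic (the names packed_counters, pack, packed_init, packed_acquire and packed_release are hypothetical and do not appear in the attached code). The basic counter is kept in the high half on purpose: it is the one that may wrap below zero, and a wrap in the high half simply falls off the top of the word instead of spilling into the strong counter.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical single-word replacement for drc_base::state:
// low 32 bits = inner strong counter, high 32 bits = inner basic counter.
struct packed_counters
{
    std::atomic<uint64_t> state{0};
};

inline uint64_t pack(uint32_t strong, uint32_t basic)
{
    return (uint64_t)strong | ((uint64_t)basic << 32);
}

// Same starting point as basic_init(): 0 strong and 1 basic reference.
inline void packed_init(packed_counters* c)
{
    c->state.store(pack(0, 1), std::memory_order_relaxed);
}

// Both counters are bumped by one fetch_add, so no CAS loop is needed.
inline void packed_acquire(packed_counters* c, uint32_t strong, uint32_t basic)
{
    c->state.fetch_add(pack(strong, basic), std::memory_order_relaxed);
}

// Subtracts from both counters at once. Returns true when both reached
// zero, i.e. when the caller is expected to run the destructor.
// A "negative" basic amount (passed as a wrapped uint32_t) adds references,
// mirroring the transfer of the outer counter in strong_store().
inline bool packed_release(packed_counters* c, uint32_t strong, uint32_t basic)
{
    uint64_t delta = pack(strong, basic);
    uint64_t prev = c->state.fetch_sub(delta, std::memory_order_acq_rel);
    return prev - delta == 0;
}
```

This relies on the strong counter never being decremented below zero, which holds as long as every strong decrement is paired with an earlier strong increment.
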
Also in 64-bit mode, pointers can frequently be "compressed"; for example, on Intel64/Windows it's possible to compress pointers into 39 bits, so 25 bits remain for the counter. This makes it possible to pack the strong pointer (pointer + counter) into a single word as well.
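
A sketch of what such packing might look like. The article does not say how the 39 bits are obtained; the code assumes, purely for illustration, that user-space addresses fit into 43 bits and that objects are 16-byte aligned, so that the low 4 bits can be dropped. All names (pack_ref, unpack_address, unpack_outer, packed_strong_ref, packed_strong_acquire, packed_strong_swap) are hypothetical.

```cpp
#include <atomic>
#include <cstdint>

static_assert(sizeof(uintptr_t) == 8, "sketch assumes a 64-bit target");

// Assumed layout: bits 0..38 hold the address shifted right by 4,
// bits 39..63 hold the 25-bit outer counter.
constexpr unsigned kPtrBits = 39;
constexpr unsigned kAlignShift = 4;
constexpr uint64_t kPtrMask = (uint64_t(1) << kPtrBits) - 1;
constexpr uint64_t kOuterOne = uint64_t(1) << kPtrBits;

inline uint64_t pack_ref(uintptr_t address, uint32_t outer)
{
    return ((uint64_t)address >> kAlignShift) | ((uint64_t)outer << kPtrBits);
}

inline uintptr_t unpack_address(uint64_t word)
{
    return (uintptr_t)((word & kPtrMask) << kAlignShift);
}

inline uint32_t unpack_outer(uint64_t word)
{
    return (uint32_t)(word >> kPtrBits);
}

struct packed_strong_ref
{
    std::atomic<uint64_t> state{0};
};

// Counterpart of strong_acquire(): a single fetch_add increments the outer
// counter and yields the address that was current at that instant.
inline uintptr_t packed_strong_acquire(packed_strong_ref* r)
{
    uint64_t prev = r->state.fetch_add(kOuterOne, std::memory_order_acquire);
    return unpack_address(prev);
}

// Counterpart of the exchange step in strong_store(): installs a new address
// with a zero outer counter and reports the previous address and counter.
inline void packed_strong_swap(packed_strong_ref* r, uintptr_t address,
                               uintptr_t* prev_address, uint32_t* prev_outer)
{
    uint64_t prev = r->state.exchange(pack_ref(address, 0), std::memory_order_acq_rel);
    *prev_address = unpack_address(prev);
    *prev_outer = unpack_outer(prev);
}
```

One thing to keep in mind with this layout: a 25-bit outer counter wraps after 2^25 acquires without an intervening store, so a real implementation would need to bound or periodically flush the counter.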

Here is an example of how strongly thread-safe reference counting can be applied to an abstract network router; this is more or less the main usage pattern:
struct routing_table : drc_base
{
    // ...
};

strong_ref g_routing_table;

void worker_thread()
{
    for (;;)
    {
        request_t* req = get_request();
        routing_table* table = (routing_table*)strong_acquire(&g_routing_table);
        route(req, table);
        basic_release(table);
    }
}

void updater_thread()
{
    for (;;)
    {
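        // needs <thread> and <chrono>; the 1000 is assumed to mean milliseconds
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));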
        routing_table* current_table = (routing_table*)strong_acquire(&g_routing_table);
        routing_table* new_table = create_new_routing_table(current_table);
        strong_store(&g_routing_table, new_table);
        basic_release(current_table);
        basic_release(new_table);
    }
}

Credits for the algorithm go to Joseph Seigh and his awesome Atomic-Ptr-Plus, Chris Thomasson and his proxy-collector, and finally to Alexander Shuvaev, who figured out how to make the algorithm wait-free.

I've attached the full implementation below, along with a small single-threaded test and an implementation of atomic_dword for IA-32/Intel64.

differential_reference_counting.zip
(6k)
Dmitry Vyukov,
Jan 7, 2011, 8:44 AM