Home‎ > ‎Parallel Computing‎ > ‎Concurrent Skip List‎ > ‎

Scheduler Algorithm

Here is a slightly simplified version of my scheduler's code (some functions are omitted for brevity):

class worker_thread
{
    // global index of current thread
    size_t                        my_index;
    // local tasks
    deque<task_t>                 tasks;
    // 1-element queue for work requests from other threads
    atomic<size_t>                steal_req;
    // 1-element queue for work request acknowledges from other threads
    atomic<size_t>                steal_ack;

    // main thread loop
    void loop()
    {
        for (;;)
        {
            // while we have local work process it
            while (tasks.size() != 0)
            {
                // pop task in LIFO order
                task_t task = tasks.back();
                tasks.pop_back();

                // user processing
                execute(task);

                // check for steal requests from other threads
                if (steal_req.load(memory_order_relaxed) != no_requests)
                    process_work_request();
            }

            // try to get some work from other threads
            if (false == send_steal_request())
                break;
        }
    }

    void process_work_request()
    {
        // get thief descriptor
        size_t thief_idx = steal_req.load(memory_order_relaxed);
        worker_thread&; thief = get_thread(thief_idx);
        if (tasks.size())
        {
            // pop task in FIFO order
            task_t task = tasks.back();
            tasks.pop_back();

            // synchronous user processing
            steal(task);

            // give it to the thift
            thief.tasks.push_back(task);
        }
        // notify the thief that the operation is completed
        thief.steal_ack.store(1, memory_order_release);
    }

    bool send_steal_request()
    {
        for (;;)
        {
            // choose a victim
            size_t victim_idx = choose_randomly();
            worker_thread&; victim = get_thread(victim_idx);

            // send a request to it (if it's not busy processing another request)
            steal_ack.store(0, memory_order_relaxed);
            size_t cmp = victim.steal_req.load(memory_order_relaxed);
            for (;;)
            {
                if (cmp != (size_t)-1)
                    break;
                if (victim.steal_req.compare_exchange_strong(cmp, my_index, memory_order_acq_rel))
                    break;
            }
            // request is sent?
            if (cmp == no_requests)
            {
                // wait for ack
                while (steal_ack.load(memory_order_acquire) == 0)
                    Sleep(0);
                // check as to whether we got some work or not
                if (tasks.size())
                    return true;
            }
            // termination condition
            if (no_work_in_the_system())
                return false;
        }
    }
};

Move on to Performance Result

Comments