Home‎ > ‎In Russian‎ > ‎

phased_thread_team

Простенький вспомогательный компонет, который позволяет натравливать группу потоков на поэтапную задачу. Позволяет распараллеливать очень мелко-гранулярные задачи, т.к. синхронизация сведена к минимуму.
Пример использования:
int main()
{
    std::vector<int> array1 (100);
    std::vector<int> array2 (100);

    phased_thread_team(3, [&](phaser_t& phaser, size_t thread_count, size_t thread_index)
    {
        uint64_t phase = 0;
        for (size_t i = 0; i != 10; i += 1)
        {
            size_t my_begin = array1.size() / thread_count * thread_index + std::min(thread_index, array1.size() % thread_count);
            size_t my_end = array1.size() / thread_count * (thread_index + 1) + std::min(thread_index + 1, array1.size() % thread_count);
            printf("phase %Iu, thread %Iu, processing %Iu-%Iu\n", i, thread_index, my_begin, my_end);
            for (size_t j = my_begin; j != my_end; j += 1)
                array1[j] += array2[j];
            phaser.wait(phase);
        }
    });
}

Вывод:
    phase 0, thread 0, processing 0-34
    phase 0, thread 1, processing 34-67
    phase 0, thread 2, processing 67-100
    phase 1, thread 2, processing 67-100
    phase 1, thread 0, processing 0-34
    phase 1, thread 1, processing 34-67
    phase 2, thread 1, processing 34-67
    phase 2, thread 2, processing 67-100
    phase 2, thread 0, processing 0-34
    phase 3, thread 0, processing 0-34
    phase 3, thread 1, processing 34-67
    phase 3, thread 2, processing 67-100
    phase 4, thread 2, processing 67-100
    phase 4, thread 1, processing 34-67
    phase 4, thread 0, processing 0-34
    phase 5, thread 0, processing 0-34
    phase 5, thread 1, processing 34-67
    phase 5, thread 2, processing 67-100
    phase 6, thread 2, processing 67-100
    phase 6, thread 1, processing 34-67
    phase 6, thread 0, processing 0-34
    phase 7, thread 0, processing 0-34
    phase 7, thread 1, processing 34-67
    phase 7, thread 2, processing 67-100
    phase 8, thread 2, processing 67-100
    phase 8, thread 1, processing 34-67
    phase 8, thread 0, processing 0-34
    phase 9, thread 0, processing 0-34
    phase 9, thread 1, processing 34-67
    phase 9, thread 2, processing 67-100

Реализация:
#define NOMINMAX
#include <windows.h>
#include <process.h>
#include <intrin.h>
#include <stdio.h>
#include <vector>
#include <algorithm>

typedef unsigned __int64 uint64_t;

class phaser_t
{
public:
    phaser_t(uint64_t thread_count)
        : thread_count_(thread_count)
        , phase_()
    {
    }

    void wait(uint64_t& local_phase)
    {
        local_phase += thread_count_;
        uint64_t phase = _InterlockedIncrement64((__int64 volatile*)&phase_);
        while (phase < local_phase)
        {
            _mm_pause();
            phase = phase_;
        }
   }

private:
    uint64_t const              thread_count_;
    char                        pad1_ [64];
    uint64_t volatile           phase_;
    char                        pad2_ [64];

    phaser_t(phaser_t const&);
    void operator = (phaser_t const&);
};

template<typename body_t>
void phased_thread_team(size_t thread_count, body_t const& body)
{
    struct context_t
    {
        HANDLE                      thread;
        phaser_t*                   phaser;
        size_t                      thread_count;
        size_t                      thread_index;
        body_t*                     body;

        static unsigned __stdcall thread_func(void* p)
        {
            context_t& ctx = *static_cast<context_t*>(p);
            (*ctx.body)(*ctx.phaser, ctx.thread_count, ctx.thread_index);
            return 0;
        }
    };

    if (thread_count == 0)
    {
        SYSTEM_INFO info = {};
        GetSystemInfo(&info);
        thread_count = info.dwNumberOfProcessors;
    }

    phaser_t phaser (thread_count);
    std::vector<context_t> threads (thread_count);
    for (size_t i = 0; i != thread_count; i += 1)
    {
        threads[i].phaser = &phaser;
        threads[i].thread_count = thread_count;
        threads[i].thread_index = i;
        threads[i].body = const_cast<body_t*>(&body);
        if (i)
            threads[i].thread = (HANDLE)_beginthreadex(0, 0, &context_t::thread_func, &threads[i], 0, 0);
    }

    context_t::thread_func(&threads[0]);

    for (size_t i = 0; i != thread_count; i += 1)
    {
        if (i)
        {
            WaitForSingleObject(threads[i].thread, INFINITE);
            CloseHandle(threads[i].thread);
        }
    }
}

Comments