phased_thread_team

Простенький вспомогательный компонент, который позволяет натравливать группу потоков на поэтапную задачу. Позволяет распараллеливать очень мелкогранулярные задачи, т.к. синхронизация сведена к минимуму.

Пример использования:

// Usage example: a team of three threads adds array2 into array1
// element-wise, ten times over, meeting at the phaser after every pass.
int main()
{
    std::vector<int> array1 (100);
    std::vector<int> array2 (100);

    phased_thread_team(3, [&](phaser_t& phaser, size_t thread_count, size_t thread_index)
    {
        // Per-thread phase counter; phaser.wait() advances it on every call.
        uint64_t phase = 0;

        // Even split of array1 across the team: every thread gets `chunk`
        // elements and the first `extra` threads get one more. The array size
        // never changes, so the bounds are the same for every pass.
        size_t const chunk = array1.size() / thread_count;
        size_t const extra = array1.size() % thread_count;
        size_t const my_begin = chunk * thread_index + std::min(thread_index, extra);
        size_t const my_end = chunk * (thread_index + 1) + std::min(thread_index + 1, extra);

        for (size_t step = 0; step < 10; ++step)
        {
            printf("phase %Iu, thread %Iu, processing %Iu-%Iu\n", step, thread_index, my_begin, my_end);

            for (size_t pos = my_begin; pos < my_end; ++pos)
            {
                array1[pos] += array2[pos];
            }

            // Rendezvous with the rest of the team before the next pass.
            phaser.wait(phase);
        }
    });
}

Вывод:

phase 0, thread 0, processing 0-34

phase 0, thread 1, processing 34-67

phase 0, thread 2, processing 67-100

phase 1, thread 2, processing 67-100

phase 1, thread 0, processing 0-34

phase 1, thread 1, processing 34-67

phase 2, thread 1, processing 34-67

phase 2, thread 2, processing 67-100

phase 2, thread 0, processing 0-34

phase 3, thread 0, processing 0-34

phase 3, thread 1, processing 34-67

phase 3, thread 2, processing 67-100

phase 4, thread 2, processing 67-100

phase 4, thread 1, processing 34-67

phase 4, thread 0, processing 0-34

phase 5, thread 0, processing 0-34

phase 5, thread 1, processing 34-67

phase 5, thread 2, processing 67-100

phase 6, thread 2, processing 67-100

phase 6, thread 1, processing 34-67

phase 6, thread 0, processing 0-34

phase 7, thread 0, processing 0-34

phase 7, thread 1, processing 34-67

phase 7, thread 2, processing 67-100

phase 8, thread 2, processing 67-100

phase 8, thread 1, processing 34-67

phase 8, thread 0, processing 0-34

phase 9, thread 0, processing 0-34

phase 9, thread 1, processing 34-67

phase 9, thread 2, processing 67-100

Реализация:

#define NOMINMAX
#include <windows.h>
#include <process.h>
#include <intrin.h>
#include <stdio.h>

#include <algorithm>
#include <atomic>
#include <vector>

typedef unsigned __int64 uint64_t;

/// Spinning barrier for a fixed-size team of threads.
///
/// Each participant owns a local phase counter (initially 0) and passes it to
/// wait() at the end of every phase. The shared counter is bumped once per
/// arrival, so after k completed phases it equals k * thread_count; a caller
/// is released as soon as the shared counter reaches its own updated local
/// value. Waiting is a busy spin, no kernel object is involved.
///
/// The shared counter is a std::atomic rather than a `volatile` integer:
/// `volatile` only orders memory accesses under MSVC's /volatile:ms mode and
/// does not make a 64-bit load atomic on 32-bit targets.
class phaser_t
{
public:
    /// @param thread_count Number of threads that call wait() in every phase.
    phaser_t(uint64_t thread_count)
        : thread_count_(thread_count)
        , phase_(0)
    {
    }

    /// Signals that the calling thread finished the current phase and spins
    /// until all participants have done the same.
    ///
    /// @param local_phase Caller-owned counter; must start at 0 and be passed
    ///        unchanged to every subsequent call. It is advanced by
    ///        thread_count on each call.
    void wait(uint64_t& local_phase)
    {
        // Value the shared counter will have once everyone has arrived.
        local_phase += thread_count_;

        // fetch_add returns the previous value; +1 yields the value after our
        // own arrival. Sequentially consistent, so work done before the
        // barrier is published to the threads that are released by it.
        uint64_t phase = phase_.fetch_add(1) + 1;

        while (phase < local_phase)
        {
            _mm_pause();
            // Acquire pairs with the other participants' increments.
            phase = phase_.load(std::memory_order_acquire);
        }
    }

private:
    uint64_t const thread_count_;

    // Padding on both sides of the contended counter, presumably to keep it
    // on a cache line of its own (64 bytes assumed).
    char pad1_ [64];
    std::atomic<uint64_t> phase_;
    char pad2_ [64];

    // Non-copyable.
    phaser_t(phaser_t const&) = delete;
    void operator = (phaser_t const&) = delete;
};

/// Runs `body` concurrently on a team of threads that share one phaser_t.
///
/// The calling thread takes part as thread index 0; the other team members
/// are started with _beginthreadex. The function returns after the calling
/// thread's invocation of `body` has returned and every worker has exited.
///
/// If some worker threads cannot be created, the team shrinks to the threads
/// that were created (at least the calling thread). `body` is always told the
/// actual team size through its `thread_count` argument, so the phaser never
/// waits for a thread that does not exist.
///
/// Exceptions escaping `body` are not handled here.
///
/// @param thread_count Requested team size; 0 selects the processor count
///        reported by GetSystemInfo.
/// @param body Callable invoked on every team thread as
///        body(phaser_t& phaser, size_t thread_count, size_t thread_index).
///        It is invoked through a non-const pointer, so a non-const
///        operator() is accepted.
template<typename body_t>
void phased_thread_team(size_t thread_count, body_t const& body)
{
    // Per-thread start-up data plus the trampoline handed to _beginthreadex.
    struct context_t
    {
        HANDLE thread;
        phaser_t* phaser;
        size_t thread_count;
        size_t thread_index;
        body_t* body;

        static unsigned __stdcall thread_func(void* p)
        {
            context_t& ctx = *static_cast<context_t*>(p);
            (*ctx.body)(*ctx.phaser, ctx.thread_count, ctx.thread_index);
            return 0;
        }
    };

    if (thread_count == 0)
    {
        SYSTEM_INFO info = {};
        GetSystemInfo(&info);
        thread_count = info.dwNumberOfProcessors;
    }
    if (thread_count == 0)
    {
        // The calling thread always participates.
        thread_count = 1;
    }

    // Never resized below: the workers keep pointers to its elements.
    std::vector<context_t> threads (thread_count);

    // Create the workers suspended so that the final team size is known
    // before any of them touches the phaser. _beginthreadex returns 0 on
    // failure; stop at the first failure and run with what we have.
    size_t team_size = 1;
    while (team_size < thread_count)
    {
        context_t& ctx = threads[team_size];
        ctx.thread = reinterpret_cast<HANDLE>(
            _beginthreadex(0, 0, &context_t::thread_func, &ctx, CREATE_SUSPENDED, 0));
        if (ctx.thread == 0)
            break;
        team_size += 1;
    }

    phaser_t phaser (team_size);

    // Safe to fill in now: the workers have not started running yet.
    for (size_t i = 0; i != team_size; i += 1)
    {
        threads[i].phaser = &phaser;
        threads[i].thread_count = team_size;
        threads[i].thread_index = i;
        threads[i].body = const_cast<body_t*>(&body);
    }

    for (size_t i = 1; i < team_size; i += 1)
        ResumeThread(threads[i].thread);

    // The calling thread is team member 0.
    context_t::thread_func(&threads[0]);

    for (size_t i = 1; i < team_size; i += 1)
    {
        WaitForSingleObject(threads[i].thread, INFINITE);
        CloseHandle(threads[i].thread);
    }
}