phased_thread_team
Простенький вспомогательный компонент, который позволяет натравливать группу потоков на поэтапную задачу. Позволяет распараллеливать очень мелкогранулярные задачи, т.к. синхронизация сведена к минимуму.
Пример использования:
// Usage example: a team of three threads adds array2 into array1 ten times over.
// Each thread handles its own contiguous slice and meets the others at the
// phaser at the end of every round.
int main()
{
    std::vector<int> array1 (100);
    std::vector<int> array2 (100);

    phased_thread_team(3, [&](phaser_t& phaser, size_t thread_count, size_t thread_index)
    {
        // Per-thread phase counter; phaser.wait() advances it on every call.
        uint64_t phase = 0;

        for (size_t round = 0; round != 10; ++round)
        {
            // Split the array into thread_count nearly equal slices; the first
            // (size % thread_count) slices are one element longer.
            size_t const chunk = array1.size() / thread_count;
            size_t const remainder = array1.size() % thread_count;
            size_t const first = chunk * thread_index + std::min(thread_index, remainder);
            size_t const last = chunk * (thread_index + 1) + std::min(thread_index + 1, remainder);

            printf("phase %Iu, thread %Iu, processing %Iu-%Iu\n", round, thread_index, first, last);

            for (size_t k = first; k < last; ++k)
                array1[k] += array2[k];

            // Do not start the next round until the whole team has finished this one.
            phaser.wait(phase);
        }
    });
}
Вывод:
phase 0, thread 0, processing 0-34
phase 0, thread 1, processing 34-67
phase 0, thread 2, processing 67-100
phase 1, thread 2, processing 67-100
phase 1, thread 0, processing 0-34
phase 1, thread 1, processing 34-67
phase 2, thread 1, processing 34-67
phase 2, thread 2, processing 67-100
phase 2, thread 0, processing 0-34
phase 3, thread 0, processing 0-34
phase 3, thread 1, processing 34-67
phase 3, thread 2, processing 67-100
phase 4, thread 2, processing 67-100
phase 4, thread 1, processing 34-67
phase 4, thread 0, processing 0-34
phase 5, thread 0, processing 0-34
phase 5, thread 1, processing 34-67
phase 5, thread 2, processing 67-100
phase 6, thread 2, processing 67-100
phase 6, thread 1, processing 34-67
phase 6, thread 0, processing 0-34
phase 7, thread 0, processing 0-34
phase 7, thread 1, processing 34-67
phase 7, thread 2, processing 67-100
phase 8, thread 2, processing 67-100
phase 8, thread 1, processing 34-67
phase 8, thread 0, processing 0-34
phase 9, thread 0, processing 0-34
phase 9, thread 1, processing 34-67
phase 9, thread 2, processing 67-100
Реализация:
#define NOMINMAX
#include <windows.h>
#include <process.h>
#include <intrin.h>
#include <stdio.h>
#include <algorithm>
#include <atomic>
#include <vector>
// 64-bit unsigned integer alias, spelled with the MSVC-specific __int64 keyword.
// Used for the phase counters of phaser_t.
typedef unsigned __int64 uint64_t;
/// Reusable spin barrier for a fixed-size team of threads.
///
/// One shared counter is incremented once per arrival and never reset. Every
/// thread keeps its own running target in a caller-owned variable: each call
/// to wait() raises that target by the team size and then spins until the
/// shared counter has reached it, i.e. until every team member has arrived
/// at the same phase.
class phaser_t
{
public:
    /// @param thread_count Number of threads that must call wait() to
    ///                     complete one phase.
    phaser_t(uint64_t thread_count)
        : thread_count_(thread_count)
        , phase_(0)
    {
    }

    phaser_t(phaser_t const&) = delete;
    void operator = (phaser_t const&) = delete;

    /// Arrives at the barrier and spins until the whole team has arrived.
    ///
    /// @param local_phase Per-thread phase counter owned by the caller. It
    ///                    must start at 0 and be passed unchanged to every
    ///                    subsequent call made by the same thread; it is
    ///                    advanced by the team size on each call.
    void wait(uint64_t& local_phase)
    {
        local_phase += thread_count_;

        // fetch_add returns the previous value; +1 yields the value after our
        // own arrival. The release half publishes this thread's earlier
        // writes; the acquire half (and the acquire loads below) make the
        // writes of every thread that arrived before us visible once the
        // target is observed.
        uint64_t phase = phase_.fetch_add(1, std::memory_order_acq_rel) + 1;

        // '<' rather than '!=': fast threads may already have arrived for the
        // next phase, pushing the counter past our target.
        while (phase < local_phase)
        {
            _mm_pause();
            phase = phase_.load(std::memory_order_acquire);
        }
    }

private:
    uint64_t const thread_count_;

    // The contended counter gets a 64-byte-aligned slot of its own so that
    // spinning on it does not share a cache line with thread_count_ or with
    // neighbouring objects (assumes 64-byte cache lines).
    alignas(64) std::atomic<uint64_t> phase_;
};
template<typename body_t>
void phased_thread_team(size_t thread_count, body_t const& body)
{
struct context_t
{
HANDLE thread;
phaser_t* phaser;
size_t thread_count;
size_t thread_index;
body_t* body;
static unsigned __stdcall thread_func(void* p)
{
context_t& ctx = *static_cast<context_t*>(p);
(*ctx.body)(*ctx.phaser, ctx.thread_count, ctx.thread_index);
return 0;
}
};
if (thread_count == 0)
{
SYSTEM_INFO info = {};
GetSystemInfo(&info);
thread_count = info.dwNumberOfProcessors;
}
phaser_t phaser (thread_count);
std::vector<context_t> threads (thread_count);
for (size_t i = 0; i != thread_count; i += 1)
{
threads[i].phaser = &phaser;
threads[i].thread_count = thread_count;
threads[i].thread_index = i;
threads[i].body = const_cast<body_t*>(&body);
if (i)
threads[i].thread = (HANDLE)_beginthreadex(0, 0, &context_t::thread_func, &threads[i], 0, 0);
}
context_t::thread_func(&threads[0]);
for (size_t i = 0; i != thread_count; i += 1)
{
if (i)
{
WaitForSingleObject(threads[i].thread, INFINITE);
CloseHandle(threads[i].thread);
}
}
}