Простенький вспомогательный компонент, который позволяет натравливать группу потоков на поэтапную задачу. Позволяет распараллеливать очень мелкогранулярные задачи, т.к. синхронизация сведена к минимуму. Пример использования: int main() { std::vector<int> array1 (100); std::vector<int> array2 (100); phased_thread_team(3, [&](phaser_t& phaser, size_t thread_count, size_t thread_index) { uint64_t phase = 0; for (size_t i = 0; i != 10; i += 1) { size_t my_begin = array1.size() / thread_count * thread_index + std::min(thread_index, array1.size() % thread_count); size_t my_end = array1.size() / thread_count * (thread_index + 1) + std::min(thread_index + 1, array1.size() % thread_count); printf("phase %Iu, thread %Iu, processing %Iu-%Iu\n", i, thread_index, my_begin, my_end); for (size_t j = my_begin; j != my_end; j += 1) array1[j] += array2[j]; phaser.wait(phase); } }); } Вывод: phase 0, thread 0, processing 0-34 phase 0, thread 1, processing 34-67 phase 0, thread 2, processing 67-100 phase 1, thread 2, processing 67-100 phase 1, thread 0, processing 0-34 phase 1, thread 1, processing 34-67 phase 2, thread 1, processing 34-67 phase 2, thread 2, processing 67-100 phase 2, thread 0, processing 0-34 phase 3, thread 0, processing 0-34 phase 3, thread 1, processing 34-67 phase 3, thread 2, processing 67-100 phase 4, thread 2, processing 67-100 phase 4, thread 1, processing 34-67 phase 4, thread 0, processing 0-34 phase 5, thread 0, processing 0-34 phase 5, thread 1, processing 34-67 phase 5, thread 2, processing 67-100 phase 6, thread 2, processing 67-100 phase 6, thread 1, processing 34-67 phase 6, thread 0, processing 0-34 phase 7, thread 0, processing 0-34 phase 7, thread 1, processing 34-67 phase 7, thread 2, processing 67-100 phase 8, thread 2, processing 67-100 phase 8, thread 1, processing 34-67 phase 8, thread 0, processing 0-34 phase 9, thread 0, processing 0-34 phase 9, thread 1, processing 34-67 phase 9, thread 2, processing 67-100 Реализация: #define NOMINMAX 
#include <windows.h>
#include <process.h>
#include <intrin.h>
#include <stdio.h>
#include <stdexcept>
#include <vector>
#include <algorithm>

typedef unsigned __int64 uint64_t;

// Spinning barrier for a fixed-size group of threads.
//
// All threads share one monotonically increasing counter. Every call to
// wait() increments it once, so after all `thread_count` threads have
// finished phase K the counter equals (K + 1) * thread_count. Each thread
// tracks its own target value in a caller-owned variable, which means the
// shared counter never has to be reset.
class phaser_t
{
public:
    // thread_count: number of threads that call wait() in every phase.
    phaser_t(uint64_t thread_count)
        : thread_count_(thread_count)
        , phase_()
    {
    }

    // Signals that the calling thread finished the current phase and
    // busy-waits until every other participant has done the same.
    //
    // local_phase: per-thread state owned by the caller. It must start at 0
    //              and the same variable must be passed to every wait() call
    //              made by that thread; it is advanced by thread_count on
    //              each call.
    void wait(uint64_t& local_phase)
    {
        // Value the shared counter reaches once all threads have arrived.
        local_phase += thread_count_;

        // Atomically register this thread's arrival; the intrinsic returns
        // the incremented value.
        uint64_t phase = _InterlockedIncrement64(
            reinterpret_cast<__int64 volatile*>(&phase_));

        // Spin until the last participant arrives.
        // NOTE(review): the re-read below is a plain 64-bit volatile load;
        // on 32-bit targets such a load may not be atomic - confirm if
        // 32-bit builds matter.
        while (phase < local_phase)
        {
            _mm_pause();
            phase = phase_;
        }
    }

private:
    uint64_t const      thread_count_;
    // 64 bytes of padding on both sides of the contended counter,
    // presumably to keep it on a cache line of its own.
    char                pad1_ [64];
    uint64_t volatile   phase_;
    char                pad2_ [64];

    // Non-copyable (declared, never defined).
    phaser_t(phaser_t const&);
    void operator = (phaser_t const&);
};

// Runs `body` concurrently on `thread_count` threads and returns once all of
// them have finished.
//
// thread_count: total number of threads, including the calling thread, which
//               always acts as thread index 0. Passing 0 selects the number
//               of processors reported by GetSystemInfo().
// body:         callable invoked on every thread as
//               body(phaser_t& phaser, size_t thread_count, size_t thread_index).
//               All threads invoke the same object (no copies are made).
//               It should not let exceptions escape: the other threads keep
//               pointers to locals of this function while they run.
//
// Throws std::runtime_error if a worker thread cannot be created. In that
// case `body` is not invoked on any thread.
template<typename body_t>
void phased_thread_team(size_t thread_count, body_t const& body)
{
    struct context_t
    {
        HANDLE      thread;
        phaser_t*   phaser;
        size_t      thread_count;
        size_t      thread_index;
        body_t*     body;
        bool        cancelled;

        static unsigned __stdcall thread_func(void* p)
        {
            context_t& ctx = *static_cast<context_t*>(p);
            // `cancelled` is set only when team creation failed; the worker
            // then exits without touching the body or the phaser.
            if (!ctx.cancelled)
                (*ctx.body)(*ctx.phaser, ctx.thread_count, ctx.thread_index);
            return 0;
        }
    };

    if (thread_count == 0)
    {
        SYSTEM_INFO info = {};
        GetSystemInfo(&info);
        thread_count = info.dwNumberOfProcessors;
    }

    phaser_t phaser (thread_count);
    std::vector<context_t> threads (thread_count);
    for (size_t i = 0; i != thread_count; i += 1)
    {
        threads[i].thread = 0;
        threads[i].phaser = &phaser;
        threads[i].thread_count = thread_count;
        threads[i].thread_index = i;
        // The body is invoked through a non-const pointer so that callables
        // with a non-const operator() can be used.
        threads[i].body = const_cast<body_t*>(&body);
        threads[i].cancelled = false;
    }

    // Create the workers suspended. If one of them cannot be created, no
    // thread has entered the barrier yet, so the team can be torn down
    // cleanly instead of spinning forever waiting for a missing participant.
    size_t created = 1; // slot 0 is the calling thread
    for (; created != thread_count; created += 1)
    {
        uintptr_t const handle = _beginthreadex(
            0, 0, &context_t::thread_func, &threads[created], CREATE_SUSPENDED, 0);
        if (handle == 0)
            break;
        threads[created].thread = reinterpret_cast<HANDLE>(handle);
    }

    bool const all_created = (created == thread_count);
    if (!all_created)
    {
        for (size_t i = 1; i != created; i += 1)
            threads[i].cancelled = true;
    }

    for (size_t i = 1; i != created; i += 1)
        ResumeThread(threads[i].thread);

    // The calling thread participates as thread 0.
    if (all_created)
        context_t::thread_func(&threads[0]);

    for (size_t i = 1; i != created; i += 1)
    {
        WaitForSingleObject(threads[i].thread, INFINITE);
        CloseHandle(threads[i].thread);
    }

    if (!all_created)
        throw std::runtime_error("phased_thread_team: failed to create a worker thread");
}