#ifndef VIGRA_THREADPOOL_HXX
#define VIGRA_THREADPOOL_HXX

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <future>
#include <iterator>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>
#include "mathutil.hxx"
#include "counting_iterator.hxx"

// Option base class for parallel algorithms.
class ParallelOptions
{
  public:
    enum {
        Auto      = -1,  // determine the number of threads automatically
                         // (from std::thread::hardware_concurrency())
        Nice      = -2,  // use half as many threads as Auto would
        NoThreads =  0   // switch off multi-threading (execute tasks sequentially)
    };

    ParallelOptions()
    : numThreads_(actualNumThreads(Auto))
    {}

    // get the desired number of threads
    int getNumThreads() const { return numThreads_; }

    // get the actual number of threads (at least 1)
    int getActualNumThreads() const { return std::max(1, numThreads_); }

    // set the number of threads or one of the constants Auto, Nice and NoThreads
    ParallelOptions & numThreads(const int n)
    {
        numThreads_ = actualNumThreads(n);
        return *this;
    }

  private:
    // translate Auto / Nice / NoThreads or an explicit count into a thread count
    static size_t actualNumThreads(const int userNThreads)
    {
    #ifdef VIGRA_SINGLE_THREADED
        return 0;
    #else
        return userNThreads >= 0
                 ? userNThreads
                 : userNThreads == Nice
                       ? std::thread::hardware_concurrency() / 2
                       : std::thread::hardware_concurrency();
    #endif
    }

    int numThreads_;
};
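For orientation, here is a small usage sketch of these options (the variable
names are illustrative, not part of the header); the resulting object can be
passed to the ThreadPool constructor shown below:

    // request half of the hardware threads via the Nice constant
    ParallelOptions opts;
    opts.numThreads(ParallelOptions::Nice);     // or Auto, NoThreads, or an explicit count
    int nWorkers = opts.getActualNumThreads();  // always >= 1, even for NoThreads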
// Thread pool class to manage a set of parallel workers.
class ThreadPool
{
  public:
    // create a pool with the number of workers given by options / by n
    ThreadPool(const ParallelOptions & options);
    ThreadPool(const int n);

    // stop the pool and join all worker threads
    ~ThreadPool();

    // enqueue a task whose result is reported via the returned std::future;
    // the functor is called with the index of the executing worker thread
    template<class F>
    std::future<typename std::result_of<F(int)>::type> enqueueReturning(F&& f);

    // enqueue a task without return value
    template<class F>
    std::future<void> enqueue(F&& f);

    // block until the task queue is empty and no worker is busy
    void waitFinished()
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        finish_condition.wait(lock, [this](){ return tasks.empty() && (busy == 0); });
    }

    // number of worker threads in the pool
    size_t nThreads() const { return workers.size(); }

  private:
    void init(const ParallelOptions & options);   // spawn the worker threads

    std::vector<std::thread> workers;
    std::queue<std::function<void(int)> > tasks;
    std::mutex queue_mutex;
    std::condition_variable worker_condition;
    std::condition_variable finish_condition;
    bool stop;
    std::atomic<unsigned int> busy, processed;
};
// worker start-up, called from the constructors: each worker loops forever,
// popping tasks from the queue and running them with its thread index
inline void ThreadPool::init(const ParallelOptions & options)
{
    const size_t actualNThreads = options.getNumThreads();
    for(size_t ti = 0; ti<actualNThreads; ++ti)
    {
        workers.emplace_back([ti, this]
        {
            for(;;)
            {
                std::function<void(int)> task;
                std::unique_lock<std::mutex> lock(this->queue_mutex);
                // sleep until a task is queued or the pool is being destroyed
                this->worker_condition.wait(lock,
                    [this]{ return this->stop || !this->tasks.empty(); });
                if(!this->tasks.empty())
                {
                    ++busy;
                    task = std::move(this->tasks.front());
                    this->tasks.pop();
                    lock.unlock();
                    task(ti);           // run the task with the worker index
                    ++processed;
                    --busy;
                    finish_condition.notify_one();
                }
                else if(this->stop)
                {
                    return;
                }
            }
        });
    }
}
// the destructor stops the pool: wake all workers and join them
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    worker_condition.notify_all();
    for(std::thread & worker : workers)
        worker.join();
}
template<class F>
inline std::future<typename std::result_of<F(int)>::type>
ThreadPool::enqueueReturning(F&& f)
{
    typedef typename std::result_of<F(int)>::type result_type;
    typedef std::packaged_task<result_type(int)> PackageType;

    auto task = std::make_shared<PackageType>(f);
    auto res = task->get_future();

    if(workers.size()>0)
    {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);

            // don't allow enqueueing after stopping the pool
            if(stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");

            tasks.emplace([task](int tid){ (*task)(tid); });
        }
        worker_condition.notify_one();
    }
    else
    {
        (*task)(0);   // no workers: execute synchronously in the calling thread
    }
    return res;
}
template<class F>
inline std::future<void>
ThreadPool::enqueue(F&& f)
{
    typedef std::packaged_task<void(int)> PackageType;

    auto task = std::make_shared<PackageType>(f);
    auto res = task->get_future();
    if(workers.size()>0)
    {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if(stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");
            tasks.emplace([task](int tid){ (*task)(tid); });
        }
        worker_condition.notify_one();
    }
    else
    {
        (*task)(0);
    }
    return res;
}
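To illustrate how the two enqueue variants and waitFinished() work together,
here is a usage sketch; the loop, the results vector, and the task bodies are
illustrative assumptions, not code from the header:

    ThreadPool pool(4);                          // four worker threads

    // fire-and-forget tasks; each functor receives the executing worker's index
    std::vector<int> results(8);
    for(int k = 0; k < 8; ++k)
        pool.enqueue([k, &results](int workerIndex)
        {
            results[k] = workerIndex;            // record which worker handled item k
        });

    // a task whose result is retrieved through the returned future
    auto answer = pool.enqueueReturning([](int /* workerIndex */) { return 6 * 7; });

    pool.waitFinished();        // block until the queue is empty and no worker is busy
    int value = answer.get();   // 42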
template<class ITER, class F>
inline void parallel_foreach_impl(
    ThreadPool & pool,
    const std::ptrdiff_t nItems,
    ITER iter,
    ITER end,
    F && f,
    std::random_access_iterator_tag)
{
    std::ptrdiff_t workload = std::distance(iter, end);
    vigra_precondition(workload == nItems || nItems == 0,
        "parallel_foreach(): Mismatch between num items and begin/end.");

    // split the range into chunks of roughly a third of the per-thread share
    const float workPerThread = float(workload)/pool.nThreads();
    const std::ptrdiff_t chunkedWorkPerThread = std::max<std::ptrdiff_t>(
        roundi(workPerThread/3.0), 1);

    std::vector<std::future<void> > futures;
    for( ; iter<end; iter+=chunkedWorkPerThread)
    {
        const size_t lc = std::min(workload, chunkedWorkPerThread);
        workload -= lc;
        futures.emplace_back(
            pool.enqueue(
                [&f, iter, lc](int id)
                {
                    for(size_t i=0; i<lc; ++i)
                        f(id, iter[i]);
                }
            )
        );
    }
    for (auto & fut : futures)
        fut.get();
}
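To make the chunking concrete: with 10000 items and a pool of 4 threads,
workPerThread is 2500 and chunkedWorkPerThread becomes roundi(2500/3.0) = 833,
so roughly twelve chunks are queued instead of four. The division by 3 is
presumably there so that workers which finish their chunks early can pick up
the remaining ones instead of idling (a simple form of load balancing).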
template<class ITER, class F>
inline void parallel_foreach_impl(
    ThreadPool & pool,
    std::ptrdiff_t nItems,          // 0 means: determine the count from begin/end
    ITER iter,
    ITER end,
    F && f,
    std::forward_iterator_tag)
{
    if (nItems == 0)
        nItems = std::distance(iter, end);

    std::ptrdiff_t workload = nItems;
    const float workPerThread = float(workload)/pool.nThreads();
    const std::ptrdiff_t chunkedWorkPerThread = std::max<std::ptrdiff_t>(
        roundi(workPerThread/3.0), 1);

    std::vector<std::future<void> > futures;
    for(;;)
    {
        const size_t lc = std::min(chunkedWorkPerThread, workload);
        workload -= lc;
        futures.emplace_back(
            pool.enqueue(
                [&f, iter, lc](int id)
                {
                    auto iterCopy = iter;
                    for(size_t i=0; i<lc; ++i){
                        f(id, *iterCopy);
                        ++iterCopy;
                    }
                }
            )
        );
        // advance the chunk start; stop when the end of the range is reached
        for (size_t i = 0; i < lc; ++i)
        {
            ++iter;
            if (iter == end)
            {
                vigra_postcondition(workload == 0,
                    "parallel_foreach(): Mismatch between num items and begin/end.");
                break;
            }
        }
        if(workload == 0)
            break;
    }
    for (auto & fut : futures)
        fut.get();
}
template<class ITER, class F>
inline void parallel_foreach_impl(
    ThreadPool & pool,
    const std::ptrdiff_t nItems,
    ITER iter,
    ITER end,
    F && f,
    std::input_iterator_tag)
{
    // input iterators can only be traversed once, so items are dispatched
    // to the pool one at a time
    size_t num_items = 0;
    std::vector<std::future<void> > futures;
    for (; iter != end; ++iter)
    {
        auto item = *iter;
        futures.emplace_back(
            pool.enqueue(
                [&f, item](int id)
                {
                    f(id, item);
                }
            )
        );
        ++num_items;
    }
    vigra_postcondition(num_items == nItems || nItems == 0,
        "parallel_foreach(): Mismatch between num items and begin/end.");
    for (auto & fut : futures)
        fut.get();
}
template<class ITER, class F>
inline void parallel_foreach_single_thread(
    ITER begin,
    ITER end,
    F && f,
    const std::ptrdiff_t nItems = 0)
{
    size_t n = 0;
    for (; begin != end; ++begin)
    {
        f(0, *begin);     // the single "worker" always gets index 0
        ++n;
    }
    vigra_postcondition(n == nItems || nItems == 0,
        "parallel_foreach(): Mismatch between num items and begin/end.");
}
// Apply a functor to all items in a range in parallel. The functor is called
// as f(workerIndex, item); nItems == 0 means "derive the count from begin/end".
template<class ITER, class F>
inline void parallel_foreach(
    ThreadPool & pool,
    ITER begin,
    ITER end,
    F && f,
    const std::ptrdiff_t nItems = 0)
{
    if(pool.nThreads() > 1)
        parallel_foreach_impl(pool, nItems, begin, end, f,
            typename std::iterator_traits<ITER>::iterator_category());
    else
        parallel_foreach_single_thread(begin, end, f, nItems);
}
// convenience overload: create a temporary pool with nThreads workers
// (nThreads may also be ParallelOptions::Auto, Nice or NoThreads)
template<class ITER, class F>
inline void parallel_foreach(
    int64_t nThreads,
    ITER begin,
    ITER end,
    F && f,
    const std::ptrdiff_t nItems = 0)
{
    ThreadPool pool(nThreads);
    parallel_foreach(pool, begin, end, f, nItems);
}

// overloads that iterate over the integers 0 .. nItems-1
// (range() is provided by counting_iterator.hxx)
template<class F>
inline void parallel_foreach(
    int64_t nThreads,
    std::ptrdiff_t nItems,
    F && f)
{
    auto iter = range(nItems);
    parallel_foreach(nThreads, iter.begin(), iter.end(), f, nItems);
}

template<class F>
inline void parallel_foreach(
    ThreadPool & threadpool,
    std::ptrdiff_t nItems,
    F && f)
{
    auto iter = range(nItems);
    parallel_foreach(threadpool, iter.begin(), iter.end(), f, nItems);
}
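Finally, a usage sketch of the public parallel_foreach interface; the
container, the thread count of 4, and the lambdas are illustrative:

    std::vector<double> data(10000, 2.0);

    // iterator interface: the functor gets the worker index and a reference to the item
    parallel_foreach(4, data.begin(), data.end(),
        [](int /* workerIndex */, double & item)
        {
            item = item * item;
        });

    // index interface: the "items" are the integers 0 .. nItems-1
    parallel_foreach(4, std::ptrdiff_t(data.size()),
        [&data](int /* workerIndex */, std::ptrdiff_t i)
        {
            data[i] += 1.0;
        });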
#endif // VIGRA_THREADPOOL_HXX