[ VIGRA Homepage | Function Index | Class Index | Namespaces | File List | Main Page ]

threadpool.hxx VIGRA

1 /************************************************************************/
2 /* */
3 /* Copyright 2014-2015 by Thorsten Beier, Philip Schill */
4 /* and Ullrich Koethe */
5 /* */
6 /* This file is part of the VIGRA computer vision library. */
7 /* The VIGRA Website is */
8 /* http://hci.iwr.uni-heidelberg.de/vigra/ */
9 /* Please direct questions, bug reports, and contributions to */
10 /* ullrich.koethe@iwr.uni-heidelberg.de or */
11 /* vigra@informatik.uni-hamburg.de */
12 /* */
13 /* Permission is hereby granted, free of charge, to any person */
14 /* obtaining a copy of this software and associated documentation */
15 /* files (the "Software"), to deal in the Software without */
16 /* restriction, including without limitation the rights to use, */
17 /* copy, modify, merge, publish, distribute, sublicense, and/or */
18 /* sell copies of the Software, and to permit persons to whom the */
19 /* Software is furnished to do so, subject to the following */
20 /* conditions: */
21 /* */
22 /* The above copyright notice and this permission notice shall be */
23 /* included in all copies or substantial portions of the */
24 /* Software. */
25 /* */
26 /* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND */
27 /* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES */
28 /* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND */
29 /* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT */
30 /* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, */
31 /* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING */
32 /* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR */
33 /* OTHER DEALINGS IN THE SOFTWARE. */
34 /* */
35 /************************************************************************/
36 #ifndef VIGRA_THREADPOOL_HXX
37 #define VIGRA_THREADPOOL_HXX
38 
39 #include <functional>
40 #include <thread>
41 #include <atomic>
42 #include <vector>
43 #include <future>
44 #include <mutex>
45 #include <queue>
46 #include <condition_variable>
47 #include <stdexcept>
48 #include <cmath>
49 #include "mathutil.hxx"
50 #include "counting_iterator.hxx"
51 
52 
53 namespace vigra
54 {
55 
56 /** \addtogroup ParallelProcessing Functions and classes for parallel processing.
57 */
58 
59 //@{
60 
61  /**\brief Option base class for parallel algorithms.
62 
63  <b>\#include</b> <vigra/threadpool.hxx><br>
64  Namespace: vigra
65  */
67 {
68  public:
69 
70  /** Constants for special settings.
71  */
72  enum {
73  Auto = -1, ///< Determine number of threads automatically (from <tt>std::thread::hardware_concurrency()</tt>)
74  Nice = -2, ///< Use half as many threads as <tt>Auto</tt> would.
75  NoThreads = 0 ///< Switch off multi-threading (i.e. execute tasks sequentially)
76  };
77 
79  : numThreads_(actualNumThreads(Auto))
80  {}
81 
82  /** \brief Get desired number of threads.
83 
84  <b>Note:</b> This function may return 0, which means that multi-threading
85  shall be switched off entirely. If an algorithm receives this value,
86  it should revert to a sequential implementation. In contrast, if
87  <tt>numThread() == 1</tt>, the parallel algorithm version shall be
88  executed with a single thread.
89  */
90  int getNumThreads() const
91  {
92  return numThreads_;
93  }
94 
95  /** \brief Get desired number of threads.
96 
97  In contrast to <tt>numThread()</tt>, this will always return a value <tt>>=1</tt>.
98  */
99  int getActualNumThreads() const
100  {
101  return std::max(1,numThreads_);
102  }
103 
104  /** \brief Set the number of threads or one of the constants <tt>Auto</tt>,
105  <tt>Nice</tt> and <tt>NoThreads</tt>.
106 
107  Default: <tt>ParallelOptions::Auto</tt> (use system default)
108 
109  This setting is ignored if the preprocessor flag <tt>VIGRA_SINGLE_THREADED</tt>
110  is defined. Then, the number of threads is set to 0 and all tasks revert to
111  sequential algorithm implementations. The same can be achieved at runtime
112  by passing <tt>n = 0</tt> to this function. In contrast, passing <tt>n = 1</tt>
113  causes the parallel algorithm versions to be executed with a single thread.
114  Both possibilities are mainly useful for debugging.
115  */
117  {
118  numThreads_ = actualNumThreads(n);
119  return *this;
120  }
121 
122 
123  private:
124  // helper function to compute the actual number of threads
125  static size_t actualNumThreads(const int userNThreads)
126  {
127  #ifdef VIGRA_SINGLE_THREADED
128  return 0;
129  #else
130  return userNThreads >= 0
131  ? userNThreads
132  : userNThreads == Nice
133  ? std::thread::hardware_concurrency() / 2
134  : std::thread::hardware_concurrency();
135  #endif
136  }
137 
138  int numThreads_;
139 };
140 
141 /********************************************************/
142 /* */
143 /* ThreadPool */
144 /* */
145 /********************************************************/
146 
147  /**\brief Thread pool class to manage a set of parallel workers.
148 
149  <b>\#include</b> <vigra/threadpool.hxx><br>
150  Namespace: vigra
151  */
153 {
154  public:
155 
156  /** Create a thread pool from ParallelOptions. The constructor just launches
157  the desired number of workers. If the number of threads is zero,
158  no workers are started, and all tasks will be executed in synchronously
159  in the present thread.
160  */
161  ThreadPool(const ParallelOptions & options)
162  : stop(false),
163  busy(0),
164  processed(0)
165  {
166  init(options);
167  }
168 
169  /** Create a thread pool with n threads. The constructor just launches
170  the desired number of workers. If \arg n is <tt>ParallelOptions::Auto</tt>,
171  the number of threads is determined by <tt>std::thread::hardware_concurrency()</tt>.
172  <tt>ParallelOptions::Nice</tt> will create half as many threads.
173  If <tt>n = 0</tt>, no workers are started, and all tasks will be executed
174  synchronously in the present thread. If the preprocessor flag
175  <tt>VIGRA_SINGLE_THREADED</tt> is defined, the number of threads is always set
176  to zero (i.e. synchronous execution), regardless of the value of \arg n. This
177  is useful for debugging.
178  */
179  ThreadPool(const int n)
180  : stop(false),
181  busy(0),
182  processed(0)
183  {
184  init(ParallelOptions().numThreads(n));
185  }
186 
187  /**
188  * The destructor joins all threads.
189  */
190  ~ThreadPool();
191 
192  /**
193  * Enqueue a task that will be executed by the thread pool.
194  * The task result can be obtained using the get() function of the returned future.
195  * If the task throws an exception, it will be raised on the call to get().
196  */
197  template<class F>
198  std::future<typename std::result_of<F(int)>::type> enqueueReturning(F&& f) ;
199 
200  /**
201  * Enqueue function for tasks without return value.
202  * This is a special case of the enqueueReturning template function, but
203  * some compilers fail on <tt>std::result_of<F(int)>::type</tt> for void(int) functions.
204  */
205  template<class F>
206  std::future<void> enqueue(F&& f) ;
207 
208  /**
209  * Block until all tasks are finished.
210  */
212  {
213  std::unique_lock<std::mutex> lock(queue_mutex);
214  finish_condition.wait(lock, [this](){ return tasks.empty() && (busy == 0); });
215  }
216 
217  /**
218  * Return the number of worker threads.
219  */
220  size_t nThreads() const
221  {
222  return workers.size();
223  }
224 
225 private:
226 
227  // helper function to init the thread pool
228  void init(const ParallelOptions & options);
229 
230  // need to keep track of threads so we can join them
231  std::vector<std::thread> workers;
232 
233  // the task queue
234  std::queue<std::function<void(int)> > tasks;
235 
236  // synchronization
237  std::mutex queue_mutex;
238  std::condition_variable worker_condition;
239  std::condition_variable finish_condition;
240  bool stop;
241  std::atomic<unsigned int> busy, processed;
242 };
243 
244 inline void ThreadPool::init(const ParallelOptions & options)
245 {
246  const size_t actualNThreads = options.getNumThreads();
247  for(size_t ti = 0; ti<actualNThreads; ++ti)
248  {
249  workers.emplace_back(
250  [ti,this]
251  {
252  for(;;)
253  {
254  std::function<void(int)> task;
255  {
256  std::unique_lock<std::mutex> lock(this->queue_mutex);
257 
258  // will wait if : stop == false AND queue is empty
259  // if stop == true AND queue is empty thread function will return later
260  //
261  // so the idea of this wait, is : If where are not in the destructor
262  // (which sets stop to true, we wait here for new jobs)
263  this->worker_condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
264  if(!this->tasks.empty())
265  {
266  ++busy;
267  task = std::move(this->tasks.front());
268  this->tasks.pop();
269  lock.unlock();
270  task(ti);
271  ++processed;
272  --busy;
273  finish_condition.notify_one();
274  }
275  else if(stop)
276  {
277  return;
278  }
279  }
280  }
281  }
282  );
283  }
284 }
285 
287 {
288  {
289  std::unique_lock<std::mutex> lock(queue_mutex);
290  stop = true;
291  }
292  worker_condition.notify_all();
293  for(std::thread &worker: workers)
294  worker.join();
295 }
296 
297 template<class F>
298 inline std::future<typename std::result_of<F(int)>::type>
300 {
301  typedef typename std::result_of<F(int)>::type result_type;
302  typedef std::packaged_task<result_type(int)> PackageType;
303 
304  auto task = std::make_shared<PackageType>(f);
305  auto res = task->get_future();
306 
307  if(workers.size()>0){
308  {
309  std::unique_lock<std::mutex> lock(queue_mutex);
310 
311  // don't allow enqueueing after stopping the pool
312  if(stop)
313  throw std::runtime_error("enqueue on stopped ThreadPool");
314 
315  tasks.emplace(
316  [task](int tid)
317  {
318  (*task)(tid);
319  }
320  );
321  }
322  worker_condition.notify_one();
323  }
324  else{
325  (*task)(0);
326  }
327 
328  return res;
329 }
330 
331 template<class F>
332 inline std::future<void>
334 {
335  typedef std::packaged_task<void(int)> PackageType;
336 
337  auto task = std::make_shared<PackageType>(f);
338  auto res = task->get_future();
339  if(workers.size()>0){
340  {
341  std::unique_lock<std::mutex> lock(queue_mutex);
342 
343  // don't allow enqueueing after stopping the pool
344  if(stop)
345  throw std::runtime_error("enqueue on stopped ThreadPool");
346 
347  tasks.emplace(
348  [task](int tid)
349  {
350  (*task)(tid);
351  }
352  );
353  }
354  worker_condition.notify_one();
355  }
356  else{
357  (*task)(0);
358  }
359  return res;
360 }
361 
362 /********************************************************/
363 /* */
364 /* parallel_foreach */
365 /* */
366 /********************************************************/
367 
368 // nItems must be either zero or std::distance(iter, end).
369 template<class ITER, class F>
370 inline void parallel_foreach_impl(
371  ThreadPool & pool,
372  const std::ptrdiff_t nItems,
373  ITER iter,
374  ITER end,
375  F && f,
376  std::random_access_iterator_tag
377 ){
378  std::ptrdiff_t workload = std::distance(iter, end);
379  vigra_precondition(workload == nItems || nItems == 0, "parallel_foreach(): Mismatch between num items and begin/end.");
380  const float workPerThread = float(workload)/pool.nThreads();
381  const std::ptrdiff_t chunkedWorkPerThread = std::max<std::ptrdiff_t>(roundi(workPerThread/3.0), 1);
382 
383  std::vector<std::future<void> > futures;
384  for( ;iter<end; iter+=chunkedWorkPerThread)
385  {
386  const size_t lc = std::min(workload, chunkedWorkPerThread);
387  workload-=lc;
388  futures.emplace_back(
389  pool.enqueue(
390  [&f, iter, lc]
391  (int id)
392  {
393  for(size_t i=0; i<lc; ++i)
394  f(id, iter[i]);
395  }
396  )
397  );
398  }
399  for (auto & fut : futures)
400  {
401  fut.get();
402  }
403 }
404 
405 
406 
407 // nItems must be either zero or std::distance(iter, end).
408 template<class ITER, class F>
409 inline void parallel_foreach_impl(
410  ThreadPool & pool,
411  const std::ptrdiff_t nItems,
412  ITER iter,
413  ITER end,
414  F && f,
415  std::forward_iterator_tag
416 ){
417  if (nItems == 0)
418  nItems = std::distance(iter, end);
419 
420  std::ptrdiff_t workload = nItems;
421  const float workPerThread = float(workload)/pool.nThreads();
422  const std::ptrdiff_t chunkedWorkPerThread = std::max<std::ptrdiff_t>(roundi(workPerThread/3.0), 1);
423 
424  std::vector<std::future<void> > futures;
425  for(;;)
426  {
427  const size_t lc = std::min(chunkedWorkPerThread, workload);
428  workload -= lc;
429  futures.emplace_back(
430  pool.enqueue(
431  [&f, iter, lc]
432  (int id)
433  {
434  auto iterCopy = iter;
435  for(size_t i=0; i<lc; ++i){
436  f(id, *iterCopy);
437  ++iterCopy;
438  }
439  }
440  )
441  );
442  for (size_t i = 0; i < lc; ++i)
443  {
444  ++iter;
445  if (iter == end)
446  {
447  vigra_postcondition(workload == 0, "parallel_foreach(): Mismatch between num items and begin/end.");
448  break;
449  }
450  }
451  if(workload==0)
452  break;
453  }
454  for (auto & fut : futures)
455  fut.get();
456 }
457 
458 
459 
460 // nItems must be either zero or std::distance(iter, end).
461 template<class ITER, class F>
462 inline void parallel_foreach_impl(
463  ThreadPool & pool,
464  const std::ptrdiff_t nItems,
465  ITER iter,
466  ITER end,
467  F && f,
468  std::input_iterator_tag
469 ){
470  size_t num_items = 0;
471  std::vector<std::future<void> > futures;
472  for (; iter != end; ++iter)
473  {
474  auto item = *iter;
475  futures.emplace_back(
476  pool.enqueue(
477  [&f, &item](int id){
478  f(id, item);
479  }
480  )
481  );
482  ++num_items;
483  }
484  vigra_postcondition(num_items == nItems || nItems == 0, "parallel_foreach(): Mismatch between num items and begin/end.");
485  for (auto & fut : futures)
486  fut.get();
487 }
488 
489 // Runs foreach on a single thread.
490 // Used for API compatibility when the numbe of threads is 0.
491 template<class ITER, class F>
492 inline void parallel_foreach_single_thread(
493  ITER begin,
494  ITER end,
495  F && f,
496  const std::ptrdiff_t nItems = 0
497 ){
498  size_t n = 0;
499  for (; begin != end; ++begin)
500  {
501  f(0, *begin);
502  ++n;
503  }
504  vigra_postcondition(n == nItems || nItems == 0, "parallel_foreach(): Mismatch between num items and begin/end.");
505 }
506 
507 /** \brief Apply a functor to all items in a range in parallel.
508 
509  Create a thread pool (or use an existing one) to apply the functor \arg f
510  to all items in the range <tt>[begin, end)</tt> in parallel. \arg f must
511  be callable with two arguments of type <tt>size_t</tt> and <tt>T</tt>, where
512  the first argument is the thread index (starting at 0) and T is convertible
513  from the iterator's <tt>reference_type</tt> (i.e. the result of <tt>*begin</tt>).
514 
515  If the iterators are forward iterators (<tt>std::forward_iterator_tag</tt>), you
 516  can provide the optional argument <tt>nItems</tt> to avoid a
517  <tt>std::distance(begin, end)</tt> call to compute the range's length.
518 
519  Parameter <tt>nThreads</tt> controls the number of threads. <tt>parallel_foreach</tt>
520  will split the work into about three times as many parallel tasks.
521  If <tt>nThreads = ParallelOptions::Auto</tt>, the number of threads is set to
522  the machine default (<tt>std::thread::hardware_concurrency()</tt>).
523 
524  If <tt>nThreads = 0</tt>, the function will not use threads,
525  but will call the functor sequentially. This can also be enforced by setting the
526  preprocessor flag <tt>VIGRA_SINGLE_THREADED</tt>, ignoring the value of
527  <tt>nThreads</tt> (useful for debugging).
528 
529  <b> Declarations:</b>
530 
531  \code
532  namespace vigra {
533  // pass the desired number of threads or ParallelOptions::Auto
534  // (creates an internal thread pool accordingly)
535  template<class ITER, class F>
536  void parallel_foreach(int64_t nThreads,
537  ITER begin, ITER end,
538  F && f,
539  const uint64_t nItems = 0);
540 
541  // use an existing thread pool
542  template<class ITER, class F>
543  void parallel_foreach(ThreadPool & pool,
544  ITER begin, ITER end,
545  F && f,
546  const uint64_t nItems = 0);
547 
548  // pass the integers from 0 ... (nItems-1) to the functor f,
549  // using the given number of threads or ParallelOptions::Auto
550  template<class F>
551  void parallel_foreach(int64_t nThreads,
552  uint64_t nItems,
553  F && f);
554 
555  // likewise with an existing thread pool
556  template<class F>
557  void parallel_foreach(ThreadPool & threadpool,
558  uint64_t nItems,
559  F && f);
560  }
561  \endcode
562 
563  <b>Usage:</b>
564 
565  \code
566  #include <iostream>
567  #include <algorithm>
568  #include <vector>
569  #include <vigra/threadpool.hxx>
570 
571  using namespace std;
572  using namespace vigra;
573 
574  int main()
575  {
576  size_t const n_threads = 4;
577  size_t const n = 2000;
578  vector<int> input(n);
579 
580  auto iter = input.begin(),
581  end = input.end();
582 
583  // fill input with 0, 1, 2, ...
584  iota(iter, end, 0);
585 
586  // compute the sum of the elements in the input vector.
587  // (each thread computes the partial sum of the items it sees
588  // and stores the sum at the appropriate index of 'results')
589  vector<int> results(n_threads, 0);
590  parallel_foreach(n_threads, iter, end,
591  // the functor to be executed, defined as a lambda function
592  // (first argument: thread ID, second argument: result of *iter)
593  [&results](size_t thread_id, int items)
594  {
595  results[thread_id] += items;
596  }
597  );
598 
599  // collect the partial sums of all threads
600  int sum = accumulate(results.begin(), results.end(), 0);
601 
602  cout << "The sum " << sum << " should be equal to " << (n*(n-1))/2 << endl;
603  }
604  \endcode
605  */
607 
608 template<class ITER, class F>
609 inline void parallel_foreach(
610  ThreadPool & pool,
611  ITER begin,
612  ITER end,
613  F && f,
614  const std::ptrdiff_t nItems = 0)
615 {
616  if(pool.nThreads()>1)
617  {
618  parallel_foreach_impl(pool,nItems, begin, end, f,
619  typename std::iterator_traits<ITER>::iterator_category());
620  }
621  else
622  {
623  parallel_foreach_single_thread(begin, end, f, nItems);
624  }
625 }
626 
627 template<class ITER, class F>
628 inline void parallel_foreach(
629  int64_t nThreads,
630  ITER begin,
631  ITER end,
632  F && f,
633  const std::ptrdiff_t nItems = 0)
634 {
635 
636  ThreadPool pool(nThreads);
637  parallel_foreach(pool, begin, end, f, nItems);
638 }
639 
640 template<class F>
641 inline void parallel_foreach(
642  int64_t nThreads,
643  std::ptrdiff_t nItems,
644  F && f)
645 {
646  auto iter = range(nItems);
647  parallel_foreach(nThreads, iter, iter.end(), f, nItems);
648 }
649 
650 
651 template<class F>
652 inline void parallel_foreach(
653  ThreadPool & threadpool,
654  std::ptrdiff_t nItems,
655  F && f)
656 {
657  auto iter = range(nItems);
658  parallel_foreach(threadpool, iter, iter.end(), f, nItems);
659 }
660 
661 //@}
662 
663 } // namespace vigra
664 
665 #endif // VIGRA_THREADPOOL_HXX
Int32 roundi(FixedPoint16< IntBits, OverflowHandling > v)
rounding to the nearest integer.
Definition: fixedpoint.hxx:1775
std::future< void > enqueue(F &&f)
Definition: threadpool.hxx:333
int getNumThreads() const
Get desired number of threads.
Definition: threadpool.hxx:90
ParallelOptions & numThreads(const int n)
Set the number of threads or one of the constants Auto, Nice and NoThreads.
Definition: threadpool.hxx:116
std::future< typename std::result_of< F(int)>::type > enqueueReturning(F &&f)
Definition: threadpool.hxx:299
Determine number of threads automatically (from std::thread::hardware_concurrency()) ...
Definition: threadpool.hxx:73
Definition: accessor.hxx:43
doxygen_overloaded_function(template<... > void separableConvolveBlockwise) template< unsigned int N
Separated convolution on ChunkedArrays.
~ThreadPool()
Definition: threadpool.hxx:286
Thread pool class to manage a set of parallel workers.
Definition: threadpool.hxx:152
size_t nThreads() const
Definition: threadpool.hxx:220
void parallel_foreach(...)
Apply a functor to all items in a range in parallel.
Option base class for parallel algorithms.
Definition: threadpool.hxx:66
void waitFinished()
Definition: threadpool.hxx:211
Use half as many threads as Auto would.
Definition: threadpool.hxx:74
Switch off multi-threading (i.e. execute tasks sequentially)
Definition: threadpool.hxx:75
ThreadPool(const int n)
Definition: threadpool.hxx:179
int getActualNumThreads() const
Get desired number of threads.
Definition: threadpool.hxx:99
ThreadPool(const ParallelOptions &options)
Definition: threadpool.hxx:161

© Ullrich Köthe (ullrich.koethe@iwr.uni-heidelberg.de)
Heidelberg Collaboratory for Image Processing, University of Heidelberg, Germany

html generated using doxygen and Python
vigra 1.10.0