parallelcxx11.hh
/* -*- mia-c++ -*-
 *
 * This file is part of MIA - a toolbox for medical image analysis
 * Copyright (c) Leipzig, Madrid 1999-2016 Gert Wollny
 *
 * MIA is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with MIA; if not, see <http://www.gnu.org/licenses/>.
 *
 */

#ifndef mia_core_parallelcxx11_hh
#define mia_core_parallelcxx11_hh

#include <mia/core/defines.hh>

#include <thread>
#include <atomic>
#include <mutex>
#include <cassert>
#include <vector>

NS_MIA_BEGIN

typedef std::mutex CMutex;
typedef std::recursive_mutex CRecursiveMutex;

// Upper bound on the number of worker threads used by pfor() and preduce().
class EXPORT_CORE CMaxTasks {
public:
        static int get_max_tasks();
        static void set_max_tasks(int mt);
private:
        static int max_tasks;
};
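
/* A minimal usage sketch (not part of the original header): CMaxTasks caps
   how many worker threads pfor() and preduce() launch. A typical setup is
   to allow one task per hardware thread.
   \code
   CMaxTasks::set_max_tasks(static_cast<int>(std::thread::hardware_concurrency()));
   int n = CMaxTasks::get_max_tasks();
   \endcode
*/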

#define ATOMIC std::atomic

// RAII lock that can also be released manually before the end of its scope.
template <typename Mutex>
class TScopedLock {
public:
        TScopedLock(Mutex& m): m_mutex(m) {
                m_mutex.lock();
                own_lock = true;
        }

        ~TScopedLock() {
                if (own_lock)
                        m_mutex.unlock();
        }

        void release() {
                if (own_lock) {
                        own_lock = false;
                        m_mutex.unlock();
                }
        }
private:
        Mutex& m_mutex;
        bool own_lock;
};

typedef TScopedLock<CMutex> CScopedLock;
typedef TScopedLock<CRecursiveMutex> CRecursiveScopedLock;
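
/* A minimal usage sketch (not part of the original header): CScopedLock
   locks on construction and unlocks in the destructor; release() drops the
   lock early, so the destructor will not unlock a second time.
   \code
   CMutex m;
   int shared_counter = 0;
   {
           CScopedLock lock(m);  // mutex acquired here
           ++shared_counter;     // protected update
           lock.release();       // unlock early; the destructor is now a no-op
   }
   \endcode
*/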

// A 1D index range [begin, end) that hands out consecutive blocks of indices
// as work packages to competing worker threads.
class C1DParallelRange {
public:
        C1DParallelRange(int begin, int end, int block = 1):
                m_begin(begin),
                m_end(end),
                m_block(block),
                m_current_wp(0)
        {
                assert(begin <= end);
        }

        C1DParallelRange(const C1DParallelRange& orig):
                m_begin(orig.m_begin),
                m_end(orig.m_end),
                m_block(orig.m_block)
        {
                m_current_wp = orig.m_current_wp.load();
        }

        C1DParallelRange get_next_workpackage() {
                // Atomically claim the next block of indices.
                int wp = m_current_wp++;
                int begin = m_begin + wp * m_block;
                int end = begin + m_block;
                if (begin > m_end) {
                        return C1DParallelRange(m_end, m_end, 0);
                }
                if (end > m_end) {
                        return C1DParallelRange(begin, m_end, 1);
                }
                return C1DParallelRange(begin, end, 1);
        }

        bool empty() const {
                return m_begin >= m_end;
        }

        int begin() const {
                return m_begin;
        }

        int end() const {
                return m_end;
        }

private:
        int m_begin;
        int m_end;
        int m_block;
        std::atomic<int> m_current_wp;
};
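
/* A minimal sketch (not part of the original header) of how worker threads
   drain a range: each call to get_next_workpackage() atomically claims the
   next block until the returned range is empty.
   \code
   C1DParallelRange range(0, 10, 4);
   for (auto wp = range.get_next_workpackage(); !wp.empty();
        wp = range.get_next_workpackage()) {
           // yields [0,4), then [4,8), then [8,10)
           for (int i = wp.begin(); i < wp.end(); ++i)
                   do_something(i);  // do_something is a hypothetical per-index task
   }
   \endcode
*/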

template <typename Range, typename Func>
void pfor_callback(Range& range, Func f)
{
        // Keep pulling work packages until the range is exhausted.
        while (true) {
                Range wp = range.get_next_workpackage();
                if (!wp.empty())
                        f(wp);
                else
                        break;
        }
}

// Run f in parallel over range, using at most CMaxTasks::get_max_tasks() threads.
template <typename Range, typename Func>
void pfor(Range range, Func f)
{
        int max_threads = CMaxTasks::get_max_tasks();

        std::vector<std::thread> threads;
        for (int i = 0; i < max_threads; ++i) {
                threads.push_back(std::thread(pfor_callback<Range, Func>, std::ref(range), f));
        }

        for (int i = 0; i < max_threads; ++i) {
                threads[i].join();
        }
}
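
/* A minimal usage sketch (not part of the original header): doubling every
   element of a vector in parallel. The lambda receives one work package at a
   time and only touches the indices inside it, so no locking is needed.
   \code
   std::vector<float> data(1000, 1.0f);
   pfor(C1DParallelRange(0, static_cast<int>(data.size())),
        [&data](const C1DParallelRange& wp) {
                for (int i = wp.begin(); i < wp.end(); ++i)
                        data[i] *= 2.0f;
        });
   \endcode
*/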

// Thread-safe accumulator: each worker folds its partial result into the
// shared value under a mutex.
template <typename V>
class ReduceValue {
public:
        typedef V Value;

        ReduceValue(const Value& i): identity(i), value(i) {
        }

        template <typename Reduce>
        void reduce(const Value& v, Reduce r)
        {
                CScopedLock sl(mutex);
                value = r(v, value);
        }

        const Value& get_identity() const {
                return identity;
        }

        const Value& get_reduced() const {
                return value;
        }
private:
        mutable CMutex mutex;
        Value identity;
        Value value;
};
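
/* A minimal sketch (not part of the original header): folding partial results
   into one ReduceValue, here with max as the combining operation.
   \code
   ReduceValue<int> rv(0);
   rv.reduce(7, [](int a, int b) { return a > b ? a : b; });
   rv.reduce(3, [](int a, int b) { return a > b ? a : b; });
   int m = rv.get_reduced();  // 7
   \endcode
*/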

template <typename Range, typename Value, typename Func, typename Reduce>
void preduce_callback(Range& range, ReduceValue<Value>& v, Func f, Reduce r)
{
        // Fold all work packages this thread claims into a private partial
        // result, then merge it into the shared value exactly once.
        Value value = v.get_identity();
        while (true) {
                Range wp = range.get_next_workpackage();
                if (!wp.empty())
                        value = f(wp, value);
                else
                        break;
        }
        v.reduce(value, r);
}

// Parallel fold: run f over the work packages of range, then combine the
// per-thread partial results with r; identity is the neutral element of r.
template <typename Range, typename Value, typename Func, typename Reduce>
Value preduce(Range range, Value identity, Func f, Reduce r)
{
        int max_threads = CMaxTasks::get_max_tasks();

        ReduceValue<Value> value(identity);

        std::vector<std::thread> threads;
        for (int i = 0; i < max_threads; ++i) {
                threads.push_back(std::thread(preduce_callback<Range, Value, Func, Reduce>,
                                              std::ref(range), std::ref(value), f, r));
        }

        for (int i = 0; i < max_threads; ++i) {
                threads[i].join();
        }
        return value.get_reduced();
}
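
/* A minimal usage sketch (not part of the original header): a parallel sum
   over a vector in blocks of 64 indices. Each work package accumulates into
   the thread-local partial sum, and the final lambda merges the partial sums.
   \code
   std::vector<double> data(1000, 0.5);
   double sum = preduce(C1DParallelRange(0, static_cast<int>(data.size()), 64), 0.0,
                        [&data](const C1DParallelRange& wp, double acc) {
                                for (int i = wp.begin(); i < wp.end(); ++i)
                                        acc += data[i];
                                return acc;
                        },
                        [](double a, double b) { return a + b; });
   \endcode
*/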

NS_MIA_END

#endif