#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

namespace Eigen {

// Select the thread pool implementation: the non-blocking pool by default,
// or the simple pool when EIGEN_USE_SIMPLE_THREAD_POOL is defined.
#ifndef EIGEN_USE_SIMPLE_THREAD_POOL
template <typename Env> using ThreadPoolTempl = NonBlockingThreadPoolTempl<Env>;
typedef NonBlockingThreadPool ThreadPool;
#else
template <typename Env> using ThreadPoolTempl = SimpleThreadPoolTempl<Env>;
typedef SimpleThreadPool ThreadPool;
#endif

// Barrier lets one or more threads block in Wait() until Notify() has been
// called a specified number of times.
class Barrier {
 public:
  Barrier(unsigned int count) : state_(count << 1), notified_(false) {
    eigen_assert(((count << 1) >> 1) == count);
  }
  ~Barrier() { eigen_assert((state_ >> 1) == 0); }

  void Notify() {
    unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2;
    if (v != 1) {
      // Either the count has not yet dropped to zero, or no thread is waiting.
      eigen_assert(((v + 2) & ~1) != 0);
      return;
    }
    std::unique_lock<std::mutex> l(mu_);
    eigen_assert(!notified_);
    notified_ = true;
    cv_.notify_all();
  }

  void Wait() {
    unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel);
    if ((v >> 1) == 0) return;
    std::unique_lock<std::mutex> l(mu_);
    while (!notified_) cv_.wait(l);
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::atomic<unsigned int> state_;  // low bit is the waiter flag
  bool notified_;
};
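
// Illustrative usage sketch (not part of the original header; pool is a
// hypothetical ThreadPoolInterface*): a Barrier constructed with a count of N
// blocks in Wait() until Notify() has been called N times, typically once per
// outstanding task.
//
//   Barrier done(2);
//   pool->Schedule([&done]() { /* first task  */ done.Notify(); });
//   pool->Schedule([&done]() { /* second task */ done.Notify(); });
//   done.Wait();  // returns once both tasks have notified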

// Notification is a Barrier with a count of one: a caller blocks in Wait()
// until another thread calls Notify() exactly once.
struct Notification : Barrier {
  Notification() : Barrier(1) {}
};

// Runs an arbitrary function and then calls Notify() on the passed-in
// Notification.
template <typename Function, typename... Args>
struct FunctionWrapperWithNotification {
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    if (n) n->Notify();
  }
};

// Same as above, but signals a Barrier instead of a Notification.
template <typename Function, typename... Args>
struct FunctionWrapperWithBarrier {
  static void run(Barrier* b, Function f, Args... args) {
    f(args...);
    if (b) b->Notify();
  }
};

// Blocks until the given synchronization object (Notification or Barrier) has
// been notified; a null pointer is treated as already ready.
template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
  if (n) n->Wait();
}

// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller.
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores)
      : pool_(pool), num_threads_(num_cores) {}
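
  // Illustrative construction sketch (an assumption, not part of the original
  // header): the device is typically built on top of an Eigen::ThreadPool,
  // which implements ThreadPoolInterface.
  //
  //   Eigen::ThreadPool pool(4 /* threads */);
  //   Eigen::ThreadPoolDevice device(&pool, 4 /* cores */);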

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    internal::aligned_free(buffer);
  }

  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
    ::memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }

  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
    ::memset(buffer, c, n);
  }

  EIGEN_STRONG_INLINE int numThreads() const { return num_threads_; }

  EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const { return l1CacheSize(); }

  EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
    // The L3 cache is shared between all the cores.
    return l3CacheSize() / num_threads_;
  }

  // Should return an enum that encodes the ISA supported by the CPU.
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
    return 1;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
    Notification* n = new Notification();
    std::function<void()> func = std::bind(
        &FunctionWrapperWithNotification<Function, Args...>::run, n, f, args...);
    pool_->Schedule(func);
    return n;
  }
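
  // Illustrative usage sketch (an assumption, not part of the original
  // header; device is a ThreadPoolDevice): enqueue() returns a heap-allocated
  // Notification that the caller waits on and is then responsible for deleting.
  //
  //   Notification* n = device.enqueue([]() { /* work */ });
  //   wait_until_ready(n);  // blocks until the task has run
  //   delete n;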

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f,
                                                Args&&... args) const {
    std::function<void()> func = std::bind(
        &FunctionWrapperWithBarrier<Function, Args...>::run, b, f, args...);
    pool_->Schedule(func);
  }
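
  // Illustrative usage sketch (an assumption, not part of the original
  // header): one caller-owned Barrier counts down once per enqueued task.
  //
  //   Barrier barrier(2);
  //   device.enqueue_with_barrier(&barrier, []() { /* task 1 */ });
  //   device.enqueue_with_barrier(&barrier, []() { /* task 2 */ });
  //   barrier.Wait();  // returns once both tasks have completed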

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
    std::function<void()> func = std::bind(f, args...);
    pool_->Schedule(func);
  }

  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
  // called from one of the threads in the pool; returns -1 otherwise.
  EIGEN_STRONG_INLINE int currentThreadId() const {
    return pool_->CurrentThreadId();
  }

  // parallelFor executes f with [0, n) arguments in parallel and waits for
  // completion. f accepts a half-open interval [first, last). The block size
  // is chosen based on the iteration cost and the resulting parallel
  // efficiency; if block_align is not nullptr, it is called to round up the
  // block size.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    typedef TensorCostModel<ThreadPoolDevice> CostModel;
    // Run serially when the range is trivial or parallelism would not pay off.
    if (n <= 1 || numThreads() == 1 ||
        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      return;
    }

    // Choose a block size: large enough to amortize scheduling overhead, small
    // enough to limit tail effects and load imbalance, and yielding a block
    // count that divides evenly across the threads.
    double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    Index block_size = numext::mini(n, numext::maxi<Index>(1, block_size_f));
    const Index max_block_size =
        numext::mini(n, numext::maxi<Index>(1, 2 * block_size_f));
    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }
    Index block_count = divup(n, block_size);

    // Parallel efficiency: fraction of total CPU time spent on computation
    // rather than idling in a partially filled last round of blocks.
    double max_efficiency =
        static_cast<double>(block_count) /
        (divup<int>(block_count, numThreads()) * numThreads());

    // Try to increase the block size up to max_block_size as long as doing so
    // does not decrease the parallel efficiency.
    for (Index prev_block_count = block_count; prev_block_count > 1;) {
      // The next block size that yields fewer blocks than prev_block_count.
      Index coarser_block_size = divup(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached the maximum block size; stop.
      }
      // Recalculate the parallel efficiency for the coarser blocking.
      const Index coarser_block_count = divup(n, coarser_block_size);
      eigen_assert(coarser_block_count < prev_block_count);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency =
          static_cast<double>(coarser_block_count) /
          (divup<int>(coarser_block_count, numThreads()) * numThreads());
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Keep the coarser blocking.
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    // Recursively split [0, n) into halves until a range fits in one block.
    // The split point is rounded to a multiple of block_size, so exactly
    // block_count leaves end up doing the actual computation.
    Barrier barrier(static_cast<unsigned int>(block_count));
    std::function<void(Index, Index)> handleRange;
    handleRange = [=, &handleRange, &barrier, &f](Index first, Index last) {
      if (last - first <= block_size) {
        // Single block or less: execute the work directly.
        f(first, last);
        barrier.Notify();
        return;
      }
      // Split into halves and schedule both halves on the pool.
      Index mid = first + divup((last - first) / 2, block_size) * block_size;
      pool_->Schedule([=, &handleRange]() { handleRange(mid, last); });
      pool_->Schedule([=, &handleRange]() { handleRange(first, mid); });
    };
    handleRange(0, n);
    barrier.Wait();
  }
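
  // Worked example of the efficiency formula above (illustrative numbers, not
  // from the original header): with block_count = 11 and numThreads() = 4, the
  // last round of scheduling runs only 3 blocks on 4 threads, giving
  // efficiency = 11 / (divup(11, 4) * 4) = 11 / 12 ~= 0.92. A coarser blocking
  // is accepted only if it stays within 0.01 of the best efficiency seen so far.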

  // Convenience wrapper that does not align blocks.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }
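
  // Illustrative usage sketch (an assumption, not part of the original header;
  // in and out are hypothetical float arrays of length n): the cost estimate
  // drives how many blocks the range is split into.
  //
  //   Eigen::TensorOpCost cost(/*bytes_loaded=*/sizeof(float),
  //                            /*bytes_stored=*/sizeof(float),
  //                            /*compute_cycles=*/1);
  //   device.parallelFor(n, cost, [&](Index first, Index last) {
  //     for (Index i = first; i < last; ++i) out[i] = 2.0f * in[i];
  //   });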

 private:
  ThreadPoolInterface* pool_;
  int num_threads_;
};

}  // end namespace Eigen

#endif  // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H