NonBlockingThreadPool.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H


namespace Eigen {

template <typename Environment>
class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
 public:
  typedef typename Environment::Task Task;
  typedef RunQueue<Task, 1024> Queue;

  NonBlockingThreadPoolTempl(int num_threads, Environment env = Environment())
      : env_(env),
        threads_(num_threads),
        queues_(num_threads),
        coprimes_(num_threads),
        waiters_(num_threads),
        blocked_(0),
        spinning_(0),
        done_(false),
        ec_(waiters_) {
    // Calculate coprimes of num_threads.
    // Coprimes are used for a random walk over all threads in Steal
    // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
    // a walk starting at thread index t and calculate num_threads - 1
    // subsequent indices as (t + coprime) % num_threads, we will cover all
    // threads without repetition (effectively getting a pseudo-random
    // permutation of thread indices).
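    // For example, with num_threads = 10 the coprimes are {1, 3, 7, 9}; a
    // walk starting at thread 2 with increment 3 visits
    // 2, 5, 8, 1, 4, 7, 0, 3, 6, 9, touching every thread exactly once.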
    for (int i = 1; i <= num_threads; i++) {
      unsigned a = i;
      unsigned b = num_threads;
      // If GCD(a, b) == 1, then a and b are coprime.
      while (b != 0) {
        unsigned tmp = a;
        a = b;
        b = tmp % b;
      }
      if (a == 1) {
        coprimes_.push_back(i);
      }
    }
    for (int i = 0; i < num_threads; i++) {
      queues_.push_back(new Queue());
    }
    for (int i = 0; i < num_threads; i++) {
      threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
    }
  }

  ~NonBlockingThreadPoolTempl() {
    done_ = true;
    // Now if all threads block without work, they will start exiting.
    // But note that threads can continue to work arbitrarily long,
    // block, submit new work, unblock and otherwise live a full life.
    ec_.Notify(true);

    // Join threads explicitly to avoid destruction order issues.
    for (size_t i = 0; i < threads_.size(); i++) delete threads_[i];
    for (size_t i = 0; i < queues_.size(); i++) delete queues_[i];
  }

  void Schedule(std::function<void()> fn) {
    Task t = env_.CreateTask(std::move(fn));
    PerThread* pt = GetPerThread();
    if (pt->pool == this) {
      // Worker thread of this pool, push onto the thread's queue.
      Queue* q = queues_[pt->thread_id];
      t = q->PushFront(std::move(t));
    } else {
      // A free-standing thread (or worker of another pool), push onto a random
      // queue.
      Queue* q = queues_[Rand(&pt->rand) % queues_.size()];
      t = q->PushBack(std::move(t));
    }
    // Note: below we touch this after making the task available to worker
    // threads. Strictly speaking, this can lead to a racy use-after-free.
    // Consider that Schedule is called from a thread that is neither the main
    // thread nor a worker thread of this pool. Then, execution of the task
    // directly or indirectly completes the overall computation, which in turn
    // leads to destruction of this. We expect that such a scenario is
    // prevented by the program, that is, this is kept alive while any thread
    // can potentially be in Schedule.
    if (!t.f)
      ec_.Notify(false);
    else
      env_.ExecuteTask(t);  // Push failed, execute directly.
  }
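
  // Illustrative usage (a sketch, not part of the original header): Schedule
  // may be called from any thread. A call made on a worker of this pool takes
  // the fast PushFront path above; a call from any other thread lands on a
  // random queue via PushBack. The program must keep the pool alive while any
  // thread may still be inside Schedule (see the note above).
  //
  //   Eigen::NonBlockingThreadPool pool(4);
  //   pool.Schedule([]() { /* work item */ });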

  int NumThreads() const final {
    return static_cast<int>(threads_.size());
  }

  int CurrentThreadId() const final {
    const PerThread* pt =
        const_cast<NonBlockingThreadPoolTempl*>(this)->GetPerThread();
    if (pt->pool == this) {
      return pt->thread_id;
    } else {
      return -1;
    }
  }

 private:
  typedef typename Environment::EnvThread Thread;

  struct PerThread {
    constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
    NonBlockingThreadPoolTempl* pool;  // Parent pool, or null for normal threads.
    uint64_t rand;                     // Random generator state.
    int thread_id;                     // Worker thread index in pool.
  };

  Environment env_;
  MaxSizeVector<Thread*> threads_;
  MaxSizeVector<Queue*> queues_;
  MaxSizeVector<unsigned> coprimes_;
  std::vector<EventCount::Waiter> waiters_;
  std::atomic<unsigned> blocked_;
  std::atomic<bool> spinning_;
  std::atomic<bool> done_;
  EventCount ec_;

  // Main worker thread loop.
  void WorkerLoop(int thread_id) {
    PerThread* pt = GetPerThread();
    pt->pool = this;
    pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
    pt->thread_id = thread_id;
    Queue* q = queues_[thread_id];
    EventCount::Waiter* waiter = &waiters_[thread_id];
    for (;;) {
      Task t = q->PopFront();
      if (!t.f) {
        t = Steal();
        if (!t.f) {
          // Leave one thread spinning. This reduces latency.
          // TODO(dvyukov): 1000 iterations is based on a fair dice roll, tune it.
          // Also, the time it takes to attempt to steal work 1000 times depends
          // on the size of the thread pool. However the speed at which the user
          // of the thread pool submits tasks is independent of the size of the
          // pool. Consider a time-based limit instead.
          if (!spinning_ && !spinning_.exchange(true)) {
            for (int i = 0; i < 1000 && !t.f; i++) {
              t = Steal();
            }
            spinning_ = false;
          }
          if (!t.f) {
            if (!WaitForWork(waiter, &t)) {
              return;
            }
          }
        }
      }
      if (t.f) {
        env_.ExecuteTask(t);
      }
    }
  }

  // Steal tries to steal work from other worker threads in a best-effort
  // manner.
  Task Steal() {
    PerThread* pt = GetPerThread();
    const size_t size = queues_.size();
    unsigned r = Rand(&pt->rand);
    unsigned inc = coprimes_[r % coprimes_.size()];
    unsigned victim = r % size;
    for (unsigned i = 0; i < size; i++) {
      Task t = queues_[victim]->PopBack();
      if (t.f) {
        return t;
      }
      victim += inc;
      if (victim >= size) {
        victim -= size;
      }
    }
    return Task();
  }

  // WaitForWork blocks until new work is available (returns true), or if it is
  // time to exit (returns false). It can optionally return a task to execute
  // in t (in which case t.f != nullptr on return).
  bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
    eigen_assert(!t->f);
    // We already did a best-effort emptiness check in Steal, so prepare for
    // blocking.
    ec_.Prewait(waiter);
    // Now do a reliable emptiness check.
    int victim = NonEmptyQueueIndex();
    if (victim != -1) {
      ec_.CancelWait(waiter);
      *t = queues_[victim]->PopBack();
      return true;
    }
    // The number of blocked threads is used as the termination condition.
    // If we are shutting down and all worker threads are blocked without work,
    // we are done.
    blocked_++;
    if (done_ && blocked_ == threads_.size()) {
      ec_.CancelWait(waiter);
      // Almost done, but we need to re-check the queues.
      // Consider that all queues are empty and all worker threads are preempted
      // right after incrementing blocked_ above. Now a free-standing thread
      // submits work and calls the destructor (which sets done_). If we don't
      // re-check the queues, we will exit leaving the work unexecuted.
      if (NonEmptyQueueIndex() != -1) {
        // Note: we must not pop from the queues before we decrement blocked_,
        // otherwise the following scenario is possible. Consider that instead
        // of checking for emptiness we popped the only element from the queues.
        // Now other worker threads can start exiting, which is bad if the
        // work item submits other work. So we just check emptiness here,
        // which ensures that all worker threads exit at the same time.
        blocked_--;
        return true;
      }
      // Reached a stable termination state.
      ec_.Notify(true);
      return false;
    }
    ec_.CommitWait(waiter);
    blocked_--;
    return true;
  }

  int NonEmptyQueueIndex() {
    PerThread* pt = GetPerThread();
    const size_t size = queues_.size();
    unsigned r = Rand(&pt->rand);
    unsigned inc = coprimes_[r % coprimes_.size()];
    unsigned victim = r % size;
    for (unsigned i = 0; i < size; i++) {
      if (!queues_[victim]->Empty()) {
        return victim;
      }
      victim += inc;
      if (victim >= size) {
        victim -= size;
      }
    }
    return -1;
  }

  static EIGEN_STRONG_INLINE PerThread* GetPerThread() {
    EIGEN_THREAD_LOCAL PerThread per_thread_;
    PerThread* pt = &per_thread_;
    return pt;
  }

  static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
    uint64_t current = *state;
    // Update the internal state
    *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
    // Generate the random output (using the PCG-XSH-RS scheme)
    return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
  }
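
  // Illustrative usage (a sketch): WorkerLoop seeds the per-thread state from
  // a hash of the thread id, and each call advances the state in place.
  //
  //   uint64_t state = std::hash<std::thread::id>()(std::this_thread::get_id());
  //   unsigned r1 = Rand(&state);  // advances state
  //   unsigned r2 = Rand(&state);  // a fresh draw from the advanced state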
};

typedef NonBlockingThreadPoolTempl<StlThreadEnvironment> NonBlockingThreadPool;
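
// Example usage (a minimal sketch; assumes this header is included through the
// Eigen ThreadPool module so that StlThreadEnvironment, RunQueue and
// EventCount are available):
//
//   Eigen::NonBlockingThreadPool pool(4);  // 4 worker threads
//   std::atomic<int> sum(0);
//   for (int i = 0; i < 100; i++) {
//     pool.Schedule([&sum]() { sum.fetch_add(1); });
//   }
//   // The destructor sets done_, wakes all workers, and joins them; workers
//   // drain the queues before exiting, so pending tasks are executed.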

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H