treelite
thread_pool.h
Go to the documentation of this file.
1 
7 #ifndef TREELITE_THREAD_POOL_THREAD_POOL_H_
8 #define TREELITE_THREAD_POOL_THREAD_POOL_H_
9 
10 #include "spsc_queue.h"
11 #include <treelite/common.h>
12 #ifdef _WIN32
13 #define NOMINMAX
14 #include <windows.h>
15 #else
16 #include <sched.h>
17 #endif
18 
19 namespace treelite {
20 
21 template <typename InputToken, typename OutputToken, typename TaskContext>
22 class ThreadPool {
23  public:
24  using TaskFunc = void(*)(SpscQueue<InputToken>*, SpscQueue<OutputToken>*,
25  const TaskContext*);
26 
27  ThreadPool(int num_worker, const TaskContext* context, TaskFunc task)
28  : num_worker_(num_worker), context_(context), task_(task) {
29  CHECK(num_worker_ > 0
30  && num_worker_ + 1 <= std::thread::hardware_concurrency())
31  << "Number of worker threads must be between 1 and "
32  << std::thread::hardware_concurrency() - 1;
33  LOG(INFO) << "new thread pool with " << num_worker_ << " worker threads";
34  for (int i = 0; i < num_worker_; ++i) {
35  incoming_queue_.emplace_back(common::make_unique<SpscQueue<InputToken>>());
36  outgoing_queue_.emplace_back(common::make_unique<SpscQueue<OutputToken>>());
37  }
38  thread_.resize(num_worker_);
39  for (int i = 0; i < num_worker_; ++i) {
40  thread_[i] = std::thread(task_, incoming_queue_[i].get(),
41  outgoing_queue_[i].get(),
42  context_);
43  }
44  /* bind threads to cores */
45  SetAffinity();
46  }
47  ~ThreadPool() {
48  LOG(INFO) << "delete thread pool";
49  for (int i = 0; i < num_worker_; ++i) {
50  incoming_queue_[i]->SignalForKill();
51  outgoing_queue_[i]->SignalForKill();
52  thread_[i].join();
53  }
54  }
55 
56  void SubmitTask(int tid, InputToken request) {
57  incoming_queue_[tid]->Push(request);
58  }
59 
60  bool WaitForTask(int tid, OutputToken* response) {
61  return outgoing_queue_[tid]->Pop(response);
62  }
63 
64  private:
65  int num_worker_;
66  std::vector<std::thread> thread_;
67  std::vector<std::unique_ptr<SpscQueue<InputToken>>> incoming_queue_;
68  std::vector<std::unique_ptr<SpscQueue<OutputToken>>> outgoing_queue_;
69  TaskFunc task_;
70  const TaskContext* context_;
71 
72  inline void SetAffinity() {
73 #ifdef _WIN32
74  /* Windows */
75  SetThreadAffinityMask(GetCurrentThread(), 0x1);
76  for (int i = 0; i < num_worker_; ++i) {
77  const int core_id = i + 1;
78  SetThreadAffinityMask(thread_[i].native_handle(), (1 << core_id));
79  }
80 #elif defined(__APPLE__) && defined(__MACH__)
81 #include <TargetConditionals.h>
82 #if TARGET_OS_MAC == 1
83  /* Mac OSX */
84  thread_port_t mach_thread = pthread_mach_thread_np(pthread_self());
85  thread_affinity_policy_data_t policy = {0};
86  thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
87  (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
88  for (int i = 0; i < num_worker_; ++i) {
89  const int core_id = i + 1;
90  mach_thread = pthread_mach_thread_np(thread_[i].native_handle());
91  policy = {core_id};
92  thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
93  (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
94  }
95 #else
96  #error "iPhone not supported yet"
97 #endif
98 #else
99  /* Linux and others */
100  cpu_set_t cpuset;
101  CPU_ZERO(&cpuset);
102  CPU_SET(0, &cpuset);
103  pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
104  for (int i = 0; i < num_worker_; ++i) {
105  const int core_id = i + 1;
106  CPU_ZERO(&cpuset);
107  CPU_SET(core_id, &cpuset);
108  pthread_setaffinity_np(thread_[i].native_handle(),
109  sizeof(cpu_set_t), &cpuset);
110  }
111 #endif
112  }
113 };
114 
115 } // namespace treelite
116 
117 #endif // TREELITE_THREAD_POOL_THREAD_POOL_H_
Lock-free single-producer-single-consumer queue.
Some useful utilities.