treelite
thread_pool.h
Go to the documentation of this file.
1 
7 #ifndef TREELITE_THREAD_POOL_THREAD_POOL_H_
8 #define TREELITE_THREAD_POOL_THREAD_POOL_H_
9 
10 #include <treelite/common.h>
11 #include <vector>
12 #include <cstdlib>
13 #ifdef _WIN32
14 #define NOMINMAX
15 #include <windows.h>
16 #else
17 #include <sched.h>
18 #endif
19 #include "spsc_queue.h"
20 
21 namespace treelite {
22 
23 template <typename InputToken, typename OutputToken, typename TaskContext>
24 class ThreadPool {
25  public:
26  using TaskFunc = void(*)(SpscQueue<InputToken>*, SpscQueue<OutputToken>*,
27  const TaskContext*);
28 
29  ThreadPool(int num_worker, const TaskContext* context, TaskFunc task)
30  : num_worker_(num_worker), context_(context), task_(task) {
31  CHECK(num_worker_ >= 0 && num_worker_ < std::thread::hardware_concurrency())
32  << "Number of worker threads must be between 0 and "
33  << (std::thread::hardware_concurrency() - 1);
34  for (int i = 0; i < num_worker_; ++i) {
35  incoming_queue_.emplace_back(common::make_unique<SpscQueue<InputToken>>());
36  outgoing_queue_.emplace_back(common::make_unique<SpscQueue<OutputToken>>());
37  }
38  thread_.resize(num_worker_);
39  for (int i = 0; i < num_worker_; ++i) {
40  thread_[i] = std::thread(task_, incoming_queue_[i].get(),
41  outgoing_queue_[i].get(),
42  context_);
43  }
44  /* bind threads to cores */
45  const char* bind_flag = getenv("TREELITE_BIND_THREADS");
46  if (bind_flag == nullptr || std::atoi(bind_flag) == 1) {
47  SetAffinity();
48  }
49  }
50  ~ThreadPool() {
51  for (int i = 0; i < num_worker_; ++i) {
52  incoming_queue_[i]->SignalForKill();
53  outgoing_queue_[i]->SignalForKill();
54  thread_[i].join();
55  }
56  }
57 
58  void SubmitTask(int tid, InputToken request) {
59  incoming_queue_[tid]->Push(request);
60  }
61 
62  bool WaitForTask(int tid, OutputToken* response) {
63  return outgoing_queue_[tid]->Pop(response);
64  }
65 
66  private:
67  int num_worker_;
68  std::vector<std::thread> thread_;
69  std::vector<std::unique_ptr<SpscQueue<InputToken>>> incoming_queue_;
70  std::vector<std::unique_ptr<SpscQueue<OutputToken>>> outgoing_queue_;
71  TaskFunc task_;
72  const TaskContext* context_;
73 
74  inline void SetAffinity() {
75 #ifdef _WIN32
76  /* Windows */
77  SetThreadAffinityMask(GetCurrentThread(), 0x1);
78  for (int i = 0; i < num_worker_; ++i) {
79  const int core_id = i + 1;
80  SetThreadAffinityMask(thread_[i].native_handle(), (1 << core_id));
81  }
82 #elif defined(__APPLE__) && defined(__MACH__)
83 #include <TargetConditionals.h>
84 #if TARGET_OS_MAC == 1
85  /* Mac OSX */
86  thread_port_t mach_thread = pthread_mach_thread_np(pthread_self());
87  thread_affinity_policy_data_t policy = {0};
88  thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
89  (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
90  for (int i = 0; i < num_worker_; ++i) {
91  const int core_id = i + 1;
92  mach_thread = pthread_mach_thread_np(thread_[i].native_handle());
93  policy = {core_id};
94  thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
95  (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
96  }
97 #else
98  #error "iPhone not supported yet"
99 #endif
100 #else
101  /* Linux and others */
102  cpu_set_t cpuset;
103  CPU_ZERO(&cpuset);
104  CPU_SET(0, &cpuset);
105 #if defined(__ANDROID__)
106  sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &cpuset);
107 #else
108  pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
109 #endif
110  for (int i = 0; i < num_worker_; ++i) {
111  const int core_id = i + 1;
112  CPU_ZERO(&cpuset);
113  CPU_SET(core_id, &cpuset);
114 #if defined(__ANDROID__)
115  sched_setaffinity(thread_[i].native_handle(),
116  sizeof(cpu_set_t), &cpuset);
117 #else
118  pthread_setaffinity_np(thread_[i].native_handle(),
119  sizeof(cpu_set_t), &cpuset);
120 #endif
121  }
122 #endif
123  }
124 };
125 
126 } // namespace treelite
127 
128 #endif // TREELITE_THREAD_POOL_THREAD_POOL_H_
Lock-free single-producer-single-consumer queue.