Treelite
thread_pool.h
Go to the documentation of this file.
1 
7 #ifndef TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_
8 #define TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_
9 
#include <cstdlib>
#include <memory>
#include <thread>
#include <vector>
#ifdef _WIN32
#include <windows.h>
#else
#include <pthread.h>
#include <sched.h>
#endif
#if defined(__APPLE__) && defined(__MACH__)
#include <TargetConditionals.h>
#include <mach/thread_act.h>
#include <mach/thread_policy.h>
#endif
#include "spsc_queue.h"
19 
20 namespace treelite {
21 
22 template <typename InputToken, typename OutputToken, typename TaskContext>
23 class ThreadPool {
24  public:
25  using TaskFunc = void(*)(SpscQueue<InputToken>*, SpscQueue<OutputToken>*,
26  const TaskContext*);
27 
28  ThreadPool(int num_worker, const TaskContext* context, TaskFunc task)
29  : num_worker_(num_worker), context_(context), task_(task) {
30  CHECK(num_worker_ >= 0 && num_worker_ < std::thread::hardware_concurrency())
31  << "Number of worker threads must be between 0 and "
32  << (std::thread::hardware_concurrency() - 1);
33  for (int i = 0; i < num_worker_; ++i) {
34  incoming_queue_.emplace_back(new SpscQueue<InputToken>());
35  outgoing_queue_.emplace_back(new SpscQueue<OutputToken>());
36  }
37  thread_.resize(num_worker_);
38  for (int i = 0; i < num_worker_; ++i) {
39  thread_[i] = std::thread(task_, incoming_queue_[i].get(),
40  outgoing_queue_[i].get(),
41  context_);
42  }
43  /* bind threads to cores */
44  const char* bind_flag = getenv("TREELITE_BIND_THREADS");
45  if (bind_flag == nullptr || std::atoi(bind_flag) == 1) {
46  SetAffinity();
47  }
48  }
49  ~ThreadPool() {
50  for (int i = 0; i < num_worker_; ++i) {
51  incoming_queue_[i]->SignalForKill();
52  outgoing_queue_[i]->SignalForKill();
53  thread_[i].join();
54  }
55  }
56 
57  void SubmitTask(int tid, InputToken request) {
58  incoming_queue_[tid]->Push(request);
59  }
60 
61  bool WaitForTask(int tid, OutputToken* response) {
62  return outgoing_queue_[tid]->Pop(response);
63  }
64 
65  private:
66  int num_worker_;
67  std::vector<std::thread> thread_;
68  std::vector<std::unique_ptr<SpscQueue<InputToken>>> incoming_queue_;
69  std::vector<std::unique_ptr<SpscQueue<OutputToken>>> outgoing_queue_;
70  TaskFunc task_;
71  const TaskContext* context_;
72 
73  inline void SetAffinity() {
74 #ifdef _WIN32
75  /* Windows */
76  SetThreadAffinityMask(GetCurrentThread(), 0x1);
77  for (int i = 0; i < num_worker_; ++i) {
78  const int core_id = i + 1;
79  SetThreadAffinityMask(thread_[i].native_handle(), (1 << core_id));
80  }
81 #elif defined(__APPLE__) && defined(__MACH__)
82 #include <TargetConditionals.h>
83 #if TARGET_OS_MAC == 1
84  /* Mac OSX */
85  thread_port_t mach_thread = pthread_mach_thread_np(pthread_self());
86  thread_affinity_policy_data_t policy = {0};
87  thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
88  (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
89  for (int i = 0; i < num_worker_; ++i) {
90  const int core_id = i + 1;
91  mach_thread = pthread_mach_thread_np(thread_[i].native_handle());
92  policy = {core_id};
93  thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
94  (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
95  }
96 #else
97  #error "iPhone not supported yet"
98 #endif
99 #else
100  /* Linux and others */
101  cpu_set_t cpuset;
102  CPU_ZERO(&cpuset);
103  CPU_SET(0, &cpuset);
104 #if defined(__ANDROID__)
105  sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &cpuset);
106 #else
107  pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
108 #endif
109  for (int i = 0; i < num_worker_; ++i) {
110  const int core_id = i + 1;
111  CPU_ZERO(&cpuset);
112  CPU_SET(core_id, &cpuset);
113 #if defined(__ANDROID__)
114  sched_setaffinity(thread_[i].native_handle(),
115  sizeof(cpu_set_t), &cpuset);
116 #else
117  pthread_setaffinity_np(thread_[i].native_handle(),
118  sizeof(cpu_set_t), &cpuset);
119 #endif
120  }
121 #endif
122  }
123 };
124 
125 } // namespace treelite
126 
127 #endif // TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_
Lock-free single-producer-single-consumer queue.