Treelite
thread_pool.h
Go to the documentation of this file.
1 
7 #ifndef TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_
8 #define TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_
9 
10 #include <cstdlib>
11 #include <memory>
12 #include <string>
13 #include <thread>
14 #include <vector>
15 #ifdef _WIN32
16 #include <windows.h>
17 #else
18 #include <sched.h>
19 #endif
20 #if defined(__APPLE__) && defined(__MACH__)
21 #include <TargetConditionals.h>
22 #endif
23 #include "spsc_queue.h"
20 
21 namespace treelite {
22 namespace predictor {
23 
24 template <typename InputToken, typename OutputToken, typename TaskContext>
25 class ThreadPool {
26  public:
27  using TaskFunc = void(*)(SpscQueue<InputToken>*, SpscQueue<OutputToken>*,
28  const TaskContext*);
29 
30  ThreadPool(int num_worker, const TaskContext* context, TaskFunc task)
31  : num_worker_(num_worker), context_(context), task_(task) {
32  CHECK(num_worker_ >= 0
33  && static_cast<unsigned>(num_worker_) < std::thread::hardware_concurrency())
34  << "Number of worker threads must be between 0 and "
35  << (std::thread::hardware_concurrency() - 1);
36  for (int i = 0; i < num_worker_; ++i) {
37  incoming_queue_.emplace_back(new SpscQueue<InputToken>());
38  outgoing_queue_.emplace_back(new SpscQueue<OutputToken>());
39  }
40  thread_.resize(num_worker_);
41  for (int i = 0; i < num_worker_; ++i) {
42  thread_[i] = std::thread(task_, incoming_queue_[i].get(),
43  outgoing_queue_[i].get(),
44  context_);
45  }
46  /* bind threads to cores */
47  const char* bind_flag = getenv("TREELITE_BIND_THREADS");
48  if (bind_flag == nullptr || std::stoi(bind_flag) == 1) {
49  SetAffinity();
50  }
51  }
52  ~ThreadPool() {
53  for (int i = 0; i < num_worker_; ++i) {
54  incoming_queue_[i]->SignalForKill();
55  outgoing_queue_[i]->SignalForKill();
56  thread_[i].join();
57  }
58  }
59 
60  void SubmitTask(int tid, InputToken request) {
61  incoming_queue_[tid]->Push(request);
62  }
63 
64  bool WaitForTask(int tid, OutputToken* response) {
65  return outgoing_queue_[tid]->Pop(response);
66  }
67 
68  private:
69  int num_worker_;
70  std::vector<std::thread> thread_;
71  std::vector<std::unique_ptr<SpscQueue<InputToken>>> incoming_queue_;
72  std::vector<std::unique_ptr<SpscQueue<OutputToken>>> outgoing_queue_;
73  const TaskContext* context_;
74  TaskFunc task_;
75 
76  inline void SetAffinity() {
77 #ifdef _WIN32
78  /* Windows */
79  SetThreadAffinityMask(GetCurrentThread(), 0x1);
80  for (int i = 0; i < num_worker_; ++i) {
81  const int core_id = i + 1;
82  SetThreadAffinityMask(thread_[i].native_handle(), (1ULL << core_id));
83  }
84 #elif defined(__APPLE__) && defined(__MACH__)
85 #include <TargetConditionals.h>
86 #if TARGET_OS_MAC == 1
87  /* Mac OSX */
88  thread_port_t mach_thread = pthread_mach_thread_np(pthread_self());
89  thread_affinity_policy_data_t policy = {0};
90  thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
91  (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
92  for (int i = 0; i < num_worker_; ++i) {
93  const int core_id = i + 1;
94  mach_thread = pthread_mach_thread_np(thread_[i].native_handle());
95  policy = {core_id};
96  thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
97  (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
98  }
99 #else
100  #error "iPhone not supported yet"
101 #endif
102 #else
103  /* Linux and others */
104  cpu_set_t cpuset;
105  CPU_ZERO(&cpuset);
106  CPU_SET(0, &cpuset);
107 #if defined(__ANDROID__)
108  sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &cpuset);
109 #else
110  pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
111 #endif
112  for (int i = 0; i < num_worker_; ++i) {
113  const int core_id = i + 1;
114  CPU_ZERO(&cpuset);
115  CPU_SET(core_id, &cpuset);
116 #if defined(__ANDROID__)
117  sched_setaffinity(thread_[i].native_handle(),
118  sizeof(cpu_set_t), &cpuset);
119 #else
120  pthread_setaffinity_np(thread_[i].native_handle(),
121  sizeof(cpu_set_t), &cpuset);
122 #endif
123  }
124 #endif
125  }
126 };
127 
128 } // namespace predictor
129 } // namespace treelite
130 
131 #endif // TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_
spsc_queue.h: a lock-free single-producer, single-consumer queue.