#ifndef TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_
#define TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_

#include <treelite/logging.h>  // TREELITE_CHECK
#include <cstdlib>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#ifndef _WIN32
#include <pthread.h>
#include <sched.h>
#endif
#if defined(__APPLE__) && defined(__MACH__)
#include <mach/mach.h>
#include <mach/mach_init.h>
#include <mach/thread_policy.h>
#endif

#include "spsc_queue.h"  // lock-free single-producer-single-consumer queue (see below)

template <typename InputToken, typename OutputToken, typename TaskContext>
class ThreadPool {
 public:
  // Worker entry point: consumes requests from one queue, emits responses into the other.
  using TaskFunc = void (*)(SpscQueue<InputToken>*, SpscQueue<OutputToken>*,
                            const TaskContext*);

  ThreadPool(int num_worker, const TaskContext* context, TaskFunc task)
      : num_worker_(num_worker), context_(context), task_(task) {
    TREELITE_CHECK(num_worker_ >= 0
        && static_cast<unsigned>(num_worker_) < std::thread::hardware_concurrency())
      << "Number of worker threads must be between 0 and "
      << (std::thread::hardware_concurrency() - 1);
    for (int i = 0; i < num_worker_; ++i) {
      incoming_queue_.emplace_back(new SpscQueue<InputToken>());
      outgoing_queue_.emplace_back(new SpscQueue<OutputToken>());
    }
    thread_.resize(num_worker_);
    for (int i = 0; i < num_worker_; ++i) {
      thread_[i] = std::thread(task_, incoming_queue_[i].get(),
                               outgoing_queue_[i].get(), context_);
    }
    // Bind threads to cores unless TREELITE_BIND_THREADS is set to 0.
    const char* bind_flag = getenv("TREELITE_BIND_THREADS");
    if (bind_flag == nullptr || std::stoi(bind_flag) == 1) {
      SetAffinity();
    }
  }

  ~ThreadPool() {
    for (int i = 0; i < num_worker_; ++i) {
      incoming_queue_[i]->SignalForKill();
      outgoing_queue_[i]->SignalForKill();
      thread_[i].join();
    }
  }

  // Hand a request to worker `tid` via its incoming queue.
  void SubmitTask(int tid, InputToken request) {
    incoming_queue_[tid]->Push(request);
  }

  // Wait for worker `tid` to produce a response; returns false once the
  // queue has been signaled for kill.
  bool WaitForTask(int tid, OutputToken* response) {
    return outgoing_queue_[tid]->Pop(response);
  }

 private:
  int num_worker_;
  std::vector<std::thread> thread_;
  std::vector<std::unique_ptr<SpscQueue<InputToken>>> incoming_queue_;
  std::vector<std::unique_ptr<SpscQueue<OutputToken>>> outgoing_queue_;
  const TaskContext* context_;
  TaskFunc task_;

  // Pin the main thread to core 0 and worker i to core i + 1.
  inline void SetAffinity() {
#ifdef _WIN32
    // (Windows branch omitted in this excerpt)
#elif defined(__APPLE__) && defined(__MACH__)
#include <TargetConditionals.h>
#if TARGET_OS_MAC == 1
    // macOS: use Mach affinity tags
    thread_port_t mach_thread = pthread_mach_thread_np(pthread_self());
    thread_affinity_policy_data_t policy = {0};
    thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
                      (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
    for (int i = 0; i < num_worker_; ++i) {
      const int core_id = i + 1;
      mach_thread = pthread_mach_thread_np(thread_[i].native_handle());
      policy = {core_id};
      thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
                        (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
    }
#else
#error "iPhone not supported yet"
#endif
#else
    // Linux and other POSIX systems
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(0, &cpuset);
#if defined(__ANDROID__)
    sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &cpuset);
#else
    pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
#endif
    for (int i = 0; i < num_worker_; ++i) {
      const int core_id = i + 1;
      CPU_ZERO(&cpuset);
      CPU_SET(core_id, &cpuset);
#if defined(__ANDROID__)
      sched_setaffinity(thread_[i].native_handle(),
                        sizeof(cpu_set_t), &cpuset);
#else
      pthread_setaffinity_np(thread_[i].native_handle(),
                             sizeof(cpu_set_t), &cpuset);
#endif
    }
#endif
  }
};

#endif  // TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_
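For orientation, here is a minimal sketch of how this pool might be driven by a caller. The token and context types, the worker body, and the result handling are illustrative assumptions; only the ThreadPool constructor, SubmitTask, and WaitForTask calls mirror the interface above, and the worker loop assumes (as the destructor's SignalForKill() suggests) that SpscQueue::Pop() blocks for the next token and returns false once the queue is killed.

// Hypothetical types for illustration only.
struct ExampleInput   { int block_id; };
struct ExampleOutput  { int block_id; double value; };
struct ExampleContext { /* e.g. pointer to the loaded model */ };

// Worker with the TaskFunc shape used by the constructor above:
// keep serving requests until Pop() reports that the queue was killed.
inline void ExampleWorker(SpscQueue<ExampleInput>* incoming,
                          SpscQueue<ExampleOutput>* outgoing,
                          const ExampleContext* /* ctx */) {
  ExampleInput request;
  while (incoming->Pop(&request)) {
    ExampleOutput response{request.block_id, 0.0};  // real work would go here
    outgoing->Push(response);
  }
}

inline void ExampleUsage(const ExampleContext* ctx) {
  const int num_worker = 4;  // must stay below std::thread::hardware_concurrency()
  ThreadPool<ExampleInput, ExampleOutput, ExampleContext>
      pool(num_worker, ctx, ExampleWorker);
  for (int tid = 0; tid < num_worker; ++tid) {
    pool.SubmitTask(tid, ExampleInput{tid});
  }
  ExampleOutput response;
  for (int tid = 0; tid < num_worker; ++tid) {
    if (pool.WaitForTask(tid, &response)) {
      // consume response.value
    }
  }
}  // pool's destructor signals the queues and joins the workers here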
Lock-free single-producer-single-consumer queue.
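To make "lock-free SPSC" concrete, below is a conceptual sketch of a bounded single-producer/single-consumer ring buffer that hands items between exactly two threads using only two atomics. It is not Treelite's SpscQueue (which, as the pool above shows, also provides a blocking Pop and a SignalForKill escape hatch); it only illustrates the core idea.

#include <array>
#include <atomic>
#include <cstddef>

// One thread calls Push, a different single thread calls Pop; the
// release/acquire pairs order the hand-off of each slot.
template <typename T, std::size_t kCapacity = 1024>
class SpscRingSketch {
 public:
  bool Push(const T& item) {  // producer thread only
    const std::size_t tail = tail_.load(std::memory_order_relaxed);
    const std::size_t next = (tail + 1) % kCapacity;
    if (next == head_.load(std::memory_order_acquire)) {
      return false;  // full (one slot is deliberately left unused)
    }
    buffer_[tail] = item;
    tail_.store(next, std::memory_order_release);
    return true;
  }

  bool Pop(T* out) {  // consumer thread only
    const std::size_t head = head_.load(std::memory_order_relaxed);
    if (head == tail_.load(std::memory_order_acquire)) {
      return false;  // empty
    }
    *out = buffer_[head];
    head_.store((head + 1) % kCapacity, std::memory_order_release);
    return true;
  }

 private:
  std::array<T, kCapacity> buffer_;
  std::atomic<std::size_t> head_{0};  // next slot to read
  std::atomic<std::size_t> tail_{0};  // next slot to write
};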