#ifndef TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_
#define TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_

#include <treelite/logging.h>
#include <cstdlib>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#ifdef _WIN32
#define NOMINMAX
#include <windows.h>
#else
#include <sched.h>
#endif
#if defined(__APPLE__) && defined(__MACH__)
#include <mach/mach.h>
#include <mach/mach_init.h>
#include <mach/thread_policy.h>
#endif

#include "spsc_queue.h"

template <typename InputToken, typename OutputToken, typename TaskContext>
class ThreadPool {
 public:
  /* Worker entry point: consumes InputToken objects from its incoming queue
     and pushes OutputToken objects to its outgoing queue. */
  using TaskFunc = void (*)(SpscQueue<InputToken>*, SpscQueue<OutputToken>*,
                            const TaskContext*);
  ThreadPool(int num_worker, const TaskContext* context, TaskFunc task)
      : num_worker_(num_worker), context_(context), task_(task) {
    TREELITE_CHECK(num_worker_ >= 0
        && static_cast<unsigned>(num_worker_) < std::thread::hardware_concurrency())
        << "Number of worker threads must be between 0 and "
        << (std::thread::hardware_concurrency() - 1);
    for (int i = 0; i < num_worker_; ++i) {
      incoming_queue_.emplace_back(new SpscQueue<InputToken>());
      outgoing_queue_.emplace_back(new SpscQueue<OutputToken>());
    }
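    // One queue pair per worker: the master thread is the sole producer on
    // incoming_queue_[i] and the sole consumer on outgoing_queue_[i], which
    // preserves the single-producer-single-consumer invariant of SpscQueue.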
    thread_.resize(num_worker_);
    for (int i = 0; i < num_worker_; ++i) {
      thread_[i] = std::thread(task_, incoming_queue_[i].get(),
                               outgoing_queue_[i].get(), context_);
    }
    /* Bind threads to cores, unless TREELITE_BIND_THREADS is set to 0 */
    const char* bind_flag = getenv("TREELITE_BIND_THREADS");
    if (bind_flag == nullptr || std::stoi(bind_flag) == 1) {
      SetAffinity();
    }
  }
  ~ThreadPool() {
    for (int i = 0; i < num_worker_; ++i) {
      incoming_queue_[i]->SignalForKill();
      outgoing_queue_[i]->SignalForKill();
      thread_[i].join();
    }
  }
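  // Shutdown order matters: SignalForKill() wakes a worker blocked inside
  // Pop() and makes it return false, so the subsequent join() cannot
  // deadlock on a sleeping thread.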
  void SubmitTask(int tid, InputToken request) {
    incoming_queue_[tid]->Push(request);
  }
  bool WaitForTask(int tid, OutputToken* response) {
    return outgoing_queue_[tid]->Pop(response);
  }
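  /* Usage sketch (hypothetical, not part of this header): a conforming
     TaskFunc loops on Pop() until SignalForKill() makes it return false,
     since the destructor signals both queues before joining each worker.

       static void Task(SpscQueue<int>* in, SpscQueue<int>* out,
                        const int* ctx) {
         int req;
         while (in->Pop(&req)) {     // false once SignalForKill() is called
           out->Push(req * (*ctx));
         }
       }

       int ctx = 2;
       ThreadPool<int, int, int> pool(4, &ctx, Task);
       pool.SubmitTask(0, 21);       // hand a request to worker 0
       int resp;
       if (pool.WaitForTask(0, &resp)) { / * resp == 42 * / }
  */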
 private:
  int num_worker_;
  std::vector<std::thread> thread_;
  std::vector<std::unique_ptr<SpscQueue<InputToken>>> incoming_queue_;
  std::vector<std::unique_ptr<SpscQueue<OutputToken>>> outgoing_queue_;
  const TaskContext* context_;
  TaskFunc task_;
  inline void SetAffinity() {
#ifdef _WIN32
    /* Windows */
    SetThreadAffinityMask(GetCurrentThread(), 0x1);
    for (int i = 0; i < num_worker_; ++i) {
      const int core_id = i + 1;
      SetThreadAffinityMask(thread_[i].native_handle(), (1ULL << core_id));
    }
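    // Affinity masks carry one bit per logical core: 0x1 pins the master
    // thread to core 0; (1ULL << core_id) pins worker i to core i + 1,
    // e.g. worker 0 gets mask 0x2 (core 1).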
#elif defined(__APPLE__) && defined(__MACH__)
#include <TargetConditionals.h>
#if TARGET_OS_MAC == 1
    /* macOS */
    thread_port_t mach_thread = pthread_mach_thread_np(pthread_self());
    thread_affinity_policy_data_t policy = {0};
    thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
                      (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
    for (int i = 0; i < num_worker_; ++i) {
      const int core_id = i + 1;
      mach_thread = pthread_mach_thread_np(thread_[i].native_handle());
      policy = {core_id};
      thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
                        (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
    }
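    // Mach exposes affinity only as scheduling hints: tag 0
    // (THREAD_AFFINITY_TAG_NULL) clears affinity on the master thread,
    // while giving each worker a distinct nonzero tag asks the kernel to
    // keep them apart rather than pinning them to specific cores.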
#else
#error "iPhone not supported yet"
#endif
#else
    /* Linux and others */
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(0, &cpuset);
#if defined(__ANDROID__)
    sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &cpuset);
#else
    pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
#endif
    for (int i = 0; i < num_worker_; ++i) {
      const int core_id = i + 1;
      CPU_ZERO(&cpuset);
      CPU_SET(core_id, &cpuset);
#if defined(__ANDROID__)
      sched_setaffinity(thread_[i].native_handle(),
                        sizeof(cpu_set_t), &cpuset);
#else
      pthread_setaffinity_np(thread_[i].native_handle(),
                             sizeof(cpu_set_t), &cpuset);
#endif
    }
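    // std::thread::native_handle() exposes the underlying pthread_t here,
    // since std::thread itself offers no portable affinity API.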
#endif
  }
};

#endif  // TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_
Lock-free single-producer-single-consumer queue.