7 #ifndef TREELITE_THREAD_POOL_THREAD_POOL_H_ 8 #define TREELITE_THREAD_POOL_THREAD_POOL_H_ 10 #include <treelite/common.h> 23 template <
typename InputToken,
typename OutputToken,
typename TaskContext>
29 ThreadPool(
int num_worker,
const TaskContext* context, TaskFunc task)
30 : num_worker_(num_worker), context_(context), task_(task) {
31 CHECK(num_worker_ >= 0 && num_worker_ < std::thread::hardware_concurrency())
32 <<
"Number of worker threads must be between 0 and " 33 << (std::thread::hardware_concurrency() - 1);
34 for (
int i = 0; i < num_worker_; ++i) {
36 outgoing_queue_.emplace_back(common::make_unique<SpscQueue<OutputToken>>());
38 thread_.resize(num_worker_);
39 for (
int i = 0; i < num_worker_; ++i) {
40 thread_[i] = std::thread(task_, incoming_queue_[i].
get(),
41 outgoing_queue_[i].
get(),
45 const char* bind_flag = getenv(
"TREELITE_BIND_THREADS");
46 if (bind_flag ==
nullptr || std::atoi(bind_flag) == 1) {
// Shutdown sequence: for every worker index, signal both of that worker's
// queues (incoming and outgoing) for kill.
// NOTE(review): the enclosing function header and the remainder of this loop
// are not visible in this excerpt. Presumably this is the pool's destructor
// and the worker threads are joined after being signalled -- confirm.
51 for (
int i = 0; i < num_worker_; ++i) {
52 incoming_queue_[i]->SignalForKill();
53 outgoing_queue_[i]->SignalForKill();
58 void SubmitTask(
int tid, InputToken request) {
59 incoming_queue_[tid]->Push(request);
62 bool WaitForTask(
int tid, OutputToken* response) {
63 return outgoing_queue_[tid]->Pop(response);
// Worker threads; resized to num_worker_ by the constructor, which starts one
// std::thread per slot.
68 std::vector<std::thread> thread_;
// Per-worker request queues: SubmitTask(tid, ...) pushes onto
// incoming_queue_[tid].
69 std::vector<std::unique_ptr<SpscQueue<InputToken>>> incoming_queue_;
// Per-worker response queues: WaitForTask(tid, ...) pops from
// outgoing_queue_[tid].
70 std::vector<std::unique_ptr<SpscQueue<OutputToken>>> outgoing_queue_;
// Context pointer supplied to the constructor and stored as given.
// NOTE(review): nothing visible here frees it, so the pool presumably does
// not own it -- confirm the expected lifetime with callers.
72 const TaskContext* context_;
// Pins threads to CPU cores using platform-specific affinity APIs, selected
// by preprocessor branches (Windows, Apple/Mach, and a pthread/sched branch
// with an Android special case). In every branch worker i is associated with
// core_id = i + 1. The return values of the affinity calls are not checked.
// NOTE(review): several preprocessor lines and statements of this function
// are not visible in this excerpt (e.g. the opening platform test and the
// declaration of `cpuset`); the comments below describe only what is shown.
74 inline void SetAffinity() {
// Windows-API branch: restrict the calling thread to mask 0x1 (bit 0 only).
77 SetThreadAffinityMask(GetCurrentThread(), 0x1);
78 for (
int i = 0; i < num_worker_; ++i) {
79 const int core_id = i + 1;
// NOTE(review): `1 << core_id` is evaluated as int, so it overflows once
// core_id reaches 31, while the mask parameter is pointer-sized (DWORD_PTR).
// Consider shifting a DWORD_PTR value instead.
80 SetThreadAffinityMask(thread_[i].native_handle(), (1 << core_id));
// Apple/Mach branch: apply an affinity policy initialised with {0} to the
// calling thread.
82 #elif defined(__APPLE__) && defined(__MACH__) 83 #include <TargetConditionals.h> 84 #if TARGET_OS_MAC == 1 86 thread_port_t mach_thread = pthread_mach_thread_np(pthread_self());
87 thread_affinity_policy_data_t policy = {0};
88 thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
89 (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
// Then apply a policy to each worker thread's Mach port.
// NOTE(review): core_id is computed below but no update of `policy` is
// visible inside this loop; the assignment is presumably on a line missing
// from this excerpt -- confirm.
90 for (
int i = 0; i < num_worker_; ++i) {
91 const int core_id = i + 1;
92 mach_thread = pthread_mach_thread_np(thread_[i].native_handle());
94 thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
95 (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
// Other Apple targets are rejected at compile time. What follows is the
// cpu_set_t based branch: the calling thread's affinity is set from `cpuset`,
// through sched_setaffinity on Android and pthread_setaffinity_np otherwise.
// NOTE(review): sched_setaffinity() is documented as taking a pid_t, but it is
// given pthread_self() / native_handle() values here -- confirm this is
// correct on Android.
98 #error "iPhone not supported yet" 105 #if defined(__ANDROID__) 106 sched_setaffinity(pthread_self(),
sizeof(cpu_set_t), &cpuset);
108 pthread_setaffinity_np(pthread_self(),
sizeof(cpu_set_t), &cpuset);
// Each worker thread gets core_id added to `cpuset` before its affinity is set.
// NOTE(review): no reset of `cpuset` between iterations is visible in this
// excerpt; if there is none, the set accumulates cores across workers --
// confirm.
110 for (
int i = 0; i < num_worker_; ++i) {
111 const int core_id = i + 1;
113 CPU_SET(core_id, &cpuset);
114 #if defined(__ANDROID__) 115 sched_setaffinity(thread_[i].native_handle(),
116 sizeof(cpu_set_t), &cpuset);
118 pthread_setaffinity_np(thread_[i].native_handle(),
119 sizeof(cpu_set_t), &cpuset);
128 #endif // TREELITE_THREAD_POOL_THREAD_POOL_H_
// Lock-free single-producer-single-consumer queue.