7 #ifndef TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_ 8 #define TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_ 22 template <
typename InputToken,
typename OutputToken,
typename TaskContext>
28 ThreadPool(
int num_worker,
const TaskContext* context, TaskFunc task)
29 : num_worker_(num_worker), context_(context), task_(task) {
30 CHECK(num_worker_ >= 0 && num_worker_ < std::thread::hardware_concurrency())
31 <<
"Number of worker threads must be between 0 and " 32 << (std::thread::hardware_concurrency() - 1);
33 for (
int i = 0; i < num_worker_; ++i) {
35 outgoing_queue_.emplace_back(
new SpscQueue<OutputToken>());
37 thread_.resize(num_worker_);
38 for (
int i = 0; i < num_worker_; ++i) {
39 thread_[i] = std::thread(task_, incoming_queue_[i].
get(),
40 outgoing_queue_[i].
get(),
44 const char* bind_flag = getenv(
"TREELITE_BIND_THREADS");
45 if (bind_flag ==
nullptr || std::atoi(bind_flag) == 1) {
50 for (
int i = 0; i < num_worker_; ++i) {
51 incoming_queue_[i]->SignalForKill();
52 outgoing_queue_[i]->SignalForKill();
57 void SubmitTask(
int tid, InputToken request) {
58 incoming_queue_[tid]->Push(request);
61 bool WaitForTask(
int tid, OutputToken* response) {
62 return outgoing_queue_[tid]->Pop(response);
67 std::vector<std::thread> thread_;
68 std::vector<std::unique_ptr<SpscQueue<InputToken>>> incoming_queue_;
69 std::vector<std::unique_ptr<SpscQueue<OutputToken>>> outgoing_queue_;
71 const TaskContext* context_;
73 inline void SetAffinity() {
76 SetThreadAffinityMask(GetCurrentThread(), 0x1);
77 for (
int i = 0; i < num_worker_; ++i) {
78 const int core_id = i + 1;
79 SetThreadAffinityMask(thread_[i].native_handle(), (1 << core_id));
81 #elif defined(__APPLE__) && defined(__MACH__) 82 #include <TargetConditionals.h> 83 #if TARGET_OS_MAC == 1 85 thread_port_t mach_thread = pthread_mach_thread_np(pthread_self());
86 thread_affinity_policy_data_t policy = {0};
87 thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
88 (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
89 for (
int i = 0; i < num_worker_; ++i) {
90 const int core_id = i + 1;
91 mach_thread = pthread_mach_thread_np(thread_[i].native_handle());
93 thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
94 (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
97 #error "iPhone not supported yet" 104 #if defined(__ANDROID__) 105 sched_setaffinity(pthread_self(),
sizeof(cpu_set_t), &cpuset);
107 pthread_setaffinity_np(pthread_self(),
sizeof(cpu_set_t), &cpuset);
109 for (
int i = 0; i < num_worker_; ++i) {
110 const int core_id = i + 1;
112 CPU_SET(core_id, &cpuset);
113 #if defined(__ANDROID__) 114 sched_setaffinity(thread_[i].native_handle(),
115 sizeof(cpu_set_t), &cpuset);
117 pthread_setaffinity_np(thread_[i].native_handle(),
118 sizeof(cpu_set_t), &cpuset);
127 #endif // TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_
// Lock-free single-producer-single-consumer queue.