7 #ifndef TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_ 8 #define TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_ 24 template <
typename InputToken,
typename OutputToken,
typename TaskContext>
30 ThreadPool(
int num_worker,
const TaskContext* context, TaskFunc task)
31 : num_worker_(num_worker), context_(context), task_(task) {
32 CHECK(num_worker_ >= 0
33 && static_cast<unsigned>(num_worker_) < std::thread::hardware_concurrency())
34 <<
"Number of worker threads must be between 0 and " 35 << (std::thread::hardware_concurrency() - 1);
36 for (
int i = 0; i < num_worker_; ++i) {
38 outgoing_queue_.emplace_back(
new SpscQueue<OutputToken>());
40 thread_.resize(num_worker_);
41 for (
int i = 0; i < num_worker_; ++i) {
42 thread_[i] = std::thread(task_, incoming_queue_[i].
get(),
43 outgoing_queue_[i].
get(),
47 const char* bind_flag = getenv(
"TREELITE_BIND_THREADS");
48 if (bind_flag ==
nullptr || std::stoi(bind_flag) == 1) {
53 for (
int i = 0; i < num_worker_; ++i) {
54 incoming_queue_[i]->SignalForKill();
55 outgoing_queue_[i]->SignalForKill();
60 void SubmitTask(
int tid, InputToken request) {
61 incoming_queue_[tid]->Push(request);
64 bool WaitForTask(
int tid, OutputToken* response) {
65 return outgoing_queue_[tid]->Pop(response);
70 std::vector<std::thread> thread_;
71 std::vector<std::unique_ptr<SpscQueue<InputToken>>> incoming_queue_;
72 std::vector<std::unique_ptr<SpscQueue<OutputToken>>> outgoing_queue_;
73 const TaskContext* context_;
76 inline void SetAffinity() {
79 SetThreadAffinityMask(GetCurrentThread(), 0x1);
80 for (
int i = 0; i < num_worker_; ++i) {
81 const int core_id = i + 1;
82 SetThreadAffinityMask(thread_[i].native_handle(), (1ULL << core_id));
84 #elif defined(__APPLE__) && defined(__MACH__) 85 #include <TargetConditionals.h> 86 #if TARGET_OS_MAC == 1 88 thread_port_t mach_thread = pthread_mach_thread_np(pthread_self());
89 thread_affinity_policy_data_t policy = {0};
90 thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
91 (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
92 for (
int i = 0; i < num_worker_; ++i) {
93 const int core_id = i + 1;
94 mach_thread = pthread_mach_thread_np(thread_[i].native_handle());
96 thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
97 (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
100 #error "iPhone not supported yet" 107 #if defined(__ANDROID__) 108 sched_setaffinity(pthread_self(),
sizeof(cpu_set_t), &cpuset);
110 pthread_setaffinity_np(pthread_self(),
sizeof(cpu_set_t), &cpuset);
112 for (
int i = 0; i < num_worker_; ++i) {
113 const int core_id = i + 1;
115 CPU_SET(core_id, &cpuset);
116 #if defined(__ANDROID__) 117 sched_setaffinity(thread_[i].native_handle(),
118 sizeof(cpu_set_t), &cpuset);
120 pthread_setaffinity_np(thread_[i].native_handle(),
121 sizeof(cpu_set_t), &cpuset);
131 #endif // TREELITE_PREDICTOR_THREAD_POOL_THREAD_POOL_H_
// Lock-free single-producer-single-consumer queue.