7 #ifndef TREELITE_THREAD_POOL_THREAD_POOL_H_ 8 #define TREELITE_THREAD_POOL_THREAD_POOL_H_ 21 template <
// Template parameters of the thread pool:
//   InputToken  - element type of the per-worker incoming queues.
//   OutputToken - element type of the per-worker outgoing queues.
//   TaskContext - type of the object the pool keeps a const pointer to.
typename InputToken,
typename OutputToken,
typename TaskContext>
// Constructs the pool: records the worker count, the context pointer and the
// task callable, then sets up per-worker queues and launches the threads.
//   num_worker - number of worker threads to launch.
//   context    - pointer stored in context_; the pointee is not copied here.
//   task       - callable passed to every std::thread as its entry point.
27 ThreadPool(
int num_worker,
const TaskContext* context, TaskFunc task)
28 : num_worker_(num_worker), context_(context), task_(task) {
// Range check on the worker count: num_worker_ + 1 must not exceed
// std::thread::hardware_concurrency(). NOTE(review): the beginning of this
// check is not visible in this excerpt; the message implies a lower bound of
// 1 -- confirm. The "+ 1" presumably reserves a core for the calling thread.
// NOTE(review): hardware_concurrency() is unsigned and may return 0 when the
// value cannot be determined, in which case this condition cannot hold.
30 && num_worker_ + 1 <= std::thread::hardware_concurrency())
31 <<
"Number of worker threads must be between 1 and " 32 << std::thread::hardware_concurrency() - 1;
33 LOG(INFO) <<
"new thread pool with " << num_worker_ <<
" worker threads";
// One outgoing queue is created per worker. NOTE(review): the matching
// incoming_queue_ setup is not visible in this excerpt, although
// incoming_queue_[i] is indexed below -- confirm it is populated here.
34 for (
int i = 0; i < num_worker_; ++i) {
36 outgoing_queue_.emplace_back(common::make_unique<SpscQueue<OutputToken>>());
// Size the thread vector first, then move-assign a running thread into each slot.
38 thread_.resize(num_worker_);
39 for (
int i = 0; i < num_worker_; ++i) {
// Worker i receives raw pointers to its own incoming and outgoing queues;
// the remaining thread arguments are cut off in this excerpt.
40 thread_[i] = std::thread(task_, incoming_queue_[i].
get(),
41 outgoing_queue_[i].
get(),
// Teardown path: logs the deletion, then calls SignalForKill() on both the
// incoming and the outgoing queue of every worker.
// NOTE(review): the enclosing function signature is not visible in this
// excerpt (presumably the destructor), and neither is any joining of the
// worker threads -- confirm. SignalForKill() presumably unblocks waiters.
48 LOG(INFO) <<
"delete thread pool";
49 for (
int i = 0; i < num_worker_; ++i) {
50 incoming_queue_[i]->SignalForKill();
51 outgoing_queue_[i]->SignalForKill();
// Hands a request to worker `tid` by pushing it onto that worker's incoming queue.
//   tid     - index into incoming_queue_; not bounds-checked here.
//   request - token received by value and passed on to Push().
56 void SubmitTask(
int tid, InputToken request) {
57 incoming_queue_[tid]->Push(request);
// Fetches a result from worker `tid` by popping that worker's outgoing queue.
//   tid      - index into outgoing_queue_; not bounds-checked here.
//   response - destination pointer handed straight to Pop().
// Returns whatever Pop() returns. NOTE(review): presumably false once the
// queue has been signalled for kill -- confirm against SpscQueue.
60 bool WaitForTask(
int tid, OutputToken* response) {
61 return outgoing_queue_[tid]->Pop(response);
// Worker threads; resized to num_worker_ and filled in by the constructor.
66 std::vector<std::thread> thread_;
// Per-worker queues of InputToken values; SubmitTask() pushes onto these.
67 std::vector<std::unique_ptr<SpscQueue<InputToken>>> incoming_queue_;
// Per-worker queues of OutputToken values; WaitForTask() pops from these.
68 std::vector<std::unique_ptr<SpscQueue<OutputToken>>> outgoing_queue_;
// Context pointer supplied at construction; the pointee is const through it.
70 const TaskContext* context_;
// Applies CPU affinity settings through the platform's native API: the
// calling thread is handled first, then worker i is given core_id = i + 1.
// NOTE(review): several preprocessor guards and statements are missing from
// this excerpt; the comments below describe only what is visible.
72 inline void SetAffinity() {
// Win32 calls (the guarding #if is not visible here): mask 0x1 selects
// logical processor 0 for the calling thread.
75 SetThreadAffinityMask(GetCurrentThread(), 0x1);
76 for (
int i = 0; i < num_worker_; ++i) {
77 const int core_id = i + 1;
// Bit core_id of the mask selects that processor. NOTE(review): the literal 1
// is an int, so the shift is only well-defined for small core_id -- confirm
// the upper bound on num_worker_ or widen the operand.
78 SetThreadAffinityMask(thread_[i].native_handle(), (1 << core_id));
// Apple/Mach branch (further gated on TARGET_OS_MAC == 1): a zero-initialised
// thread affinity policy is set on the calling thread.
80 #elif defined(__APPLE__) && defined(__MACH__) 81 #include <TargetConditionals.h> 82 #if TARGET_OS_MAC == 1 84 thread_port_t mach_thread = pthread_mach_thread_np(pthread_self());
85 thread_affinity_policy_data_t policy = {0};
86 thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
87 (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
88 for (
int i = 0; i < num_worker_; ++i) {
89 const int core_id = i + 1;
90 mach_thread = pthread_mach_thread_np(thread_[i].native_handle());
// NOTE(review): the statement that stores core_id into `policy` is not
// visible in this excerpt -- confirm it exists before this call.
92 thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY,
93 (thread_policy_t)&policy, THREAD_AFFINITY_POLICY_COUNT);
// pthread branch: the declaration and initialisation of `cpuset` for the
// calling thread are not visible in this excerpt.
96 #error "iPhone not supported yet" 103 pthread_setaffinity_np(pthread_self(),
sizeof(cpu_set_t), &cpuset);
104 for (
int i = 0; i < num_worker_; ++i) {
105 const int core_id = i + 1;
// Add core_id to the set and apply it to worker i. NOTE(review): whether
// cpuset is cleared at the top of each iteration is not visible -- confirm.
107 CPU_SET(core_id, &cpuset);
108 pthread_setaffinity_np(thread_[i].native_handle(),
109 sizeof(cpu_set_t), &cpuset);
117 #endif // TREELITE_THREAD_POOL_THREAD_POOL_H_
Lock-free single-producer-single-consumer queue.