/*!
 * \file spsc_queue.h
 * \brief Lock-free single-producer-single-consumer queue for each thread.
 */
#ifndef TREELITE_PREDICTOR_THREAD_POOL_SPSC_QUEUE_H_
#define TREELITE_PREDICTOR_THREAD_POOL_SPSC_QUEUE_H_

#include <dmlc/logging.h>
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

/*! \brief Pad size used to place the atomic variables on separate cache lines. */
const constexpr int kL1CacheBytes = 64;
/*! \brief Lock-free single-producer-single-consumer queue for each thread. */
template <typename T>
class SpscQueue {
 public:
  SpscQueue() :
      buffer_(new T[kRingSize]),
      head_(0),
      tail_(0) {}

  ~SpscQueue() {
    delete[] buffer_;
  }
  void Push(const T& input) {
    while (!Enqueue(input)) {
      std::this_thread::yield();
    }
    // pending_ is -1 only while the consumer is blocked in Pop();
    // in that case, take the lock and wake it up
    if (pending_.fetch_add(1) == -1) {
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.notify_one();
    }
  }
  bool Pop(T* output, uint32_t spin_count = 300000) {
    // Busy-wait a bit while the queue is empty: if a new element arrives
    // quickly, the worker avoids going to sleep on the condition variable.
    for (uint32_t i = 0; i < spin_count && pending_.load() == 0; ++i) {
      std::this_thread::yield();
    }
    if (pending_.fetch_sub(1) == 0) {
      // the queue was empty: block until the producer pushes an element
      // or signals shutdown
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.wait(lock, [this] {
        return pending_.load() >= 0 || exit_now_.load();
      });
    }
    if (exit_now_.load(std::memory_order_relaxed)) {
      return false;
    }
    const uint32_t head = head_.load(std::memory_order_relaxed);
    // sanity check: the queue must not be empty at this point
    CHECK(tail_.load(std::memory_order_acquire) != head);
    *output = buffer_[head];
    head_.store((head + 1) % kRingSize, std::memory_order_release);
    return true;
  }
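  // Typical consumer loop (an illustrative sketch; the worker code that
  // actually drives this queue lives elsewhere in the thread pool):
  //
  //   T task;
  //   while (queue.Pop(&task)) {
  //     // ... process task ...
  //   }
  //   // Pop() returns false only after SignalForKill() has been called.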
  /*! \brief Signal to terminate the worker. */
  void SignalForKill() {
    std::lock_guard<std::mutex> lock(mutex_);
    exit_now_.store(true);
    cv_.notify_all();
  }
 private:
  bool Enqueue(const T& input) {
    const uint32_t tail = tail_.load(std::memory_order_relaxed);
    // one slot is left empty to distinguish "full" from "empty",
    // so the ring can hold at most kRingSize - 1 elements
    if ((tail + 1) % kRingSize != head_.load(std::memory_order_acquire)) {
      buffer_[tail] = input;
      tail_.store((tail + 1) % kRingSize, std::memory_order_release);
      return true;
    }
    return false;
  }
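  // Worked trace of the full/empty test with kRingSize = 2
  // (indices are taken mod 2; this is an illustration, not extra logic):
  //
  //   head_ = 0, tail_ = 0 : empty; Enqueue writes slot 0, tail_ -> 1
  //   head_ = 0, tail_ = 1 : full, since (1 + 1) % 2 == head_; Enqueue fails
  //   Pop reads slot 0, head_ -> 1 : the queue is empty again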
  // cache-line padding between the atomic variables to avoid false sharing
  typedef char cache_line_pad_t[kL1CacheBytes];
  cache_line_pad_t pad0_;
  // number of slots in the ring buffer; it can hold kRingSize - 1 elements
  static constexpr const int kRingSize = 2;
  // ring buffer storage
  T* const buffer_;

  cache_line_pad_t pad1_;
  // queue head: the index the consumer reads from next
  std::atomic<uint32_t> head_;

  cache_line_pad_t pad2_;
  // queue tail: the index the producer writes to next
  std::atomic<uint32_t> tail_;

  cache_line_pad_t pad3_;
  // number of pending elements; -1 means the consumer is blocked
  std::atomic<int8_t> pending_{0};

  cache_line_pad_t pad4_;
  // set when the consumer should shut down
  std::atomic<bool> exit_now_{false};

  // protects cv_; the producer takes it only to wake a blocked consumer
  std::mutex mutex_;
  // wakes the consumer when an element arrives or on shutdown
  std::condition_variable cv_;
};
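/*
 * Example usage (an illustrative sketch, not part of the library; the names
 * `queue` and `consumer` below are invented for this example). Exactly one
 * thread may call Push() and exactly one thread may call Pop().
 *
 *   SpscQueue<int> queue;
 *   std::thread consumer([&queue] {
 *     int item;
 *     while (queue.Pop(&item)) {
 *       // ... process item ...
 *     }
 *   });
 *   for (int i = 0; i < 100; ++i) {
 *     queue.Push(i);
 *   }
 *   queue.SignalForKill();  // wake the consumer so it can exit
 *   consumer.join();
 */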
#endif  // TREELITE_PREDICTOR_THREAD_POOL_SPSC_QUEUE_H_