#ifndef TREELITE_THREAD_POOL_SPSC_QUEUE_H_
#define TREELITE_THREAD_POOL_SPSC_QUEUE_H_

#include <dmlc/logging.h>
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

const constexpr int kL1CacheBytes = 64;

/*! \brief Lock-free single-producer-single-consumer queue for each thread */
template <typename T>
class SpscQueue {
 public:
  SpscQueue() :
    buffer_(new T[kRingSize]),
    head_(0),
    tail_(0) {}

  ~SpscQueue() {
    delete[] buffer_;
  }
  void Push(const T& input) {
    while (!Enqueue(input)) {
      std::this_thread::yield();
    }
    if (pending_.fetch_add(1) == -1) {
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.notify_one();
    }
  }
  bool Pop(T* output, uint32_t spin_count = 300000) {
    // Busy-wait for a while when the queue is empty: if a new element
    // arrives quickly, this keeps the worker from going to sleep.
    // The default spin count follows the typical OpenMP convention.
    for (uint32_t i = 0; i < spin_count && pending_.load() == 0; ++i) {
      std::this_thread::yield();
    }
    if (pending_.fetch_sub(1) == 0) {
      std::unique_lock<std::mutex> lock(mutex_);
      cv_.wait(lock, [this] {
          return pending_.load() >= 0 || exit_now_.load();
        });
    }
    if (exit_now_.load(std::memory_order_relaxed)) {
      return false;
    }
    const uint32_t head = head_.load(std::memory_order_relaxed);
    // sanity check: the queue must not be empty at this point
    CHECK(tail_.load(std::memory_order_acquire) != head);
    *output = buffer_[head];
    head_.store((head + 1) % kRingSize, std::memory_order_release);
    return true;
  }
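  // Note on the sleep/wake handshake (added commentary): pending_ counts the
  // items in the queue. Pop() decrements it before taking an item; seeing the
  // old value 0 means the queue was empty, so the consumer blocks on cv_ with
  // pending_ at -1. Push() increments it after enqueueing; seeing the old
  // value -1 tells the producer the consumer is asleep and a notify_one() is
  // needed. In every other case both sides stay on the lock-free fast path.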
  /*! \brief Signal to terminate the worker */
  void SignalForKill() {
    std::lock_guard<std::mutex> lock(mutex_);
    exit_now_.store(true);
    cv_.notify_all();
  }
 protected:
  bool Enqueue(const T& input) {
    const uint32_t tail = tail_.load(std::memory_order_relaxed);
    if ((tail + 1) % kRingSize != head_.load(std::memory_order_acquire)) {
      buffer_[tail] = input;
      tail_.store((tail + 1) % kRingSize, std::memory_order_release);
      return true;
    }
    return false;
  }
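  // Note on memory ordering (added commentary): the producer's release store
  // to tail_ in Enqueue() pairs with the consumer's acquire load of tail_ in
  // Pop(), so the write to buffer_[tail] is visible before the consumer sees
  // the new tail. Symmetrically, the consumer's release store to head_ pairs
  // with the producer's acquire load of head_, so a slot is reused only after
  // its previous contents have been read.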
  // cache-line padding to prevent false sharing between the atomic variables
  typedef char cache_line_pad_t[kL1CacheBytes];

  cache_line_pad_t pad0_;
  // size of the ring buffer; the queue holds at most kRingSize - 1 items
  static constexpr const int kRingSize = 2;
  // ring buffer storing the items
  T* buffer_;

  cache_line_pad_t pad1_;
  // queue head: the slot the consumer reads next
  std::atomic<uint32_t> head_;

  cache_line_pad_t pad2_;
  // queue tail: the slot the producer writes next
  std::atomic<uint32_t> tail_;

  cache_line_pad_t pad3_;
  // number of pending items; -1 means the consumer is asleep
  std::atomic<int8_t> pending_{0};

  cache_line_pad_t pad4_;
  // flag telling the consumer to exit immediately
  std::atomic<bool> exit_now_{false};

  // mutex and condition variable backing the sleep/wake path
  std::mutex mutex_;
  std::condition_variable cv_;
};
#endif  // TREELITE_THREAD_POOL_SPSC_QUEUE_H_
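// Usage sketch (added; not part of the original header). It shows one
// producer thread feeding one consumer thread through the queue; everything
// below except the SpscQueue API itself is hypothetical example code.
//
//   #include <iostream>
//   #include <thread>
//
//   int main() {
//     SpscQueue<int> queue;
//     std::thread consumer([&queue] {
//       int item;
//       while (queue.Pop(&item)) {  // returns false once the kill signal fires
//         std::cout << "got " << item << '\n';
//       }
//     });
//     for (int i = 0; i < 10; ++i) {
//       queue.Push(i);  // spins while the single-slot ring is full
//     }
//     // Note: SignalForKill() takes effect immediately, so any item not yet
//     // popped is dropped; the thread pool signals kill only at shutdown.
//     queue.SignalForKill();
//     consumer.join();
//     return 0;
//   }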