/*!
 * \file spsc_queue.h
 * \brief Lock-free single-producer-single-consumer queue for each thread.
 * NOTE(review): the original copyright/license header (file lines 1-6) was
 * lost in the documentation extraction; restore it from the upstream source.
 */
#ifndef TREELITE_PREDICTOR_THREAD_POOL_SPSC_QUEUE_H_
#define TREELITE_PREDICTOR_THREAD_POOL_SPSC_QUEUE_H_

#include <treelite/logging.h>

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <thread>

namespace treelite {
namespace predictor {

// Assumed size of one L1 cache line, used to pad the queue's atomic state
// so producer- and consumer-side variables do not share a cache line.
// (constexpr already implies const; the redundant qualifier was dropped.)
constexpr int kL1CacheBytes = 64;

23 template <typename T>
24 class SpscQueue {
25  public:
26  SpscQueue() :
27  buffer_(new T[kRingSize]),
28  head_(0),
29  tail_(0) {
30  }
31 
32  ~SpscQueue() {
33  delete[] buffer_;
34  }
35 
36  void Push(const T& input) {
37  while (!Enqueue(input)) {
38  std::this_thread::yield();
39  }
40  if (pending_.fetch_add(1) == -1) {
41  std::unique_lock<std::mutex> lock(mutex_);
42  cv_.notify_one();
43  }
44  }
45 
46  bool Pop(T* output, uint32_t spin_count = 300000) {
47  // Busy wait a bit when the queue is empty.
48  // If a new element comes to the queue quickly, this wait avoid the worker
49  // from sleeping.
50  // The default spin count is set by following the typical omp convention
51  for (uint32_t i = 0; i < spin_count && pending_.load() == 0; ++i) {
52  std::this_thread::yield();
53  }
54  if (pending_.fetch_sub(1) == 0) {
55  std::unique_lock<std::mutex> lock(mutex_);
56  cv_.wait(lock, [this] {
57  return pending_.load() >= 0 || exit_now_.load();
58  });
59  }
60  if (exit_now_.load(std::memory_order_relaxed)) {
61  return false;
62  }
63  const uint32_t head = head_.load(std::memory_order_relaxed);
64  // sanity check if the queue is empty
65  TREELITE_CHECK(tail_.load(std::memory_order_acquire) != head);
66  *output = buffer_[head];
67  head_.store((head + 1) % kRingSize, std::memory_order_release);
68  return true;
69  }
70 
74  void SignalForKill() {
75  std::lock_guard<std::mutex> lock(mutex_);
76  exit_now_.store(true);
77  cv_.notify_all();
78  }
79 
80  protected:
81  bool Enqueue(const T& input) {
82  const uint32_t tail = tail_.load(std::memory_order_relaxed);
83 
84  if ((tail + 1) % kRingSize != (head_.load(std::memory_order_acquire))) {
85  buffer_[tail] = input;
86  tail_.store((tail + 1) % kRingSize, std::memory_order_release);
87  return true;
88  }
89  return false;
90  }
91 
92  // the cache line paddings are used for avoid false sharing between atomic variables
93  typedef char cache_line_pad_t[kL1CacheBytes];
94  cache_line_pad_t pad0_;
95  // size of the queue, the queue can host size_ - 1 items at most
96  // define it as a constant for better compiler optimization
97  static constexpr const int kRingSize = 2;
98  // pointer to access the item
99  T* const buffer_;
100 
101  cache_line_pad_t pad1_;
102  // queue head, where one gets an element from the queue
103  std::atomic<uint32_t> head_;
104 
105  cache_line_pad_t pad2_;
106  // queue tail, when one puts an element to the queue
107  std::atomic<uint32_t> tail_;
108 
109  cache_line_pad_t pad3_;
110  // pending elements in the queue
111  std::atomic<int8_t> pending_{0};
112 
113  cache_line_pad_t pad4_;
114  // signal for exit now
115  std::atomic<bool> exit_now_{false};
116 
117  // internal mutex
118  std::mutex mutex_;
119  // cv for consumer
120  std::condition_variable cv_;
121 };

}  // namespace predictor
}  // namespace treelite

#endif  // TREELITE_PREDICTOR_THREAD_POOL_SPSC_QUEUE_H_
// --- Doxygen index residue, repaired into comments ---
// SpscQueue: lock-free single-producer-single-consumer queue for each thread.
//   Definition: spsc_queue.h:24
// SpscQueue::SignalForKill(): signal to terminate the worker.
//   Definition: spsc_queue.h:74
// treelite/logging.h: logging facility for Treelite.