treelite
spsc_queue.h
Go to the documentation of this file.
1 
7 #ifndef TREELITE_THREAD_POOL_SPSC_QUEUE_H_
8 #define TREELITE_THREAD_POOL_SPSC_QUEUE_H_
9 
10 #include <dmlc/logging.h>
11 #include <atomic>
12 #include <thread>
13 #include <mutex>
14 #include <condition_variable>
15 #include <cstdint>
16 
// Assumed L1 cache line size, used to pad atomics against false sharing.
// (constexpr already implies const, so the redundant `const` is dropped.)
constexpr int kL1CacheBytes = 64;
18 
20 template <typename T>
21 class SpscQueue {
22  public:
23  SpscQueue() :
24  buffer_(new T[kRingSize]),
25  head_(0),
26  tail_(0) {
27  }
28 
29  ~SpscQueue() {
30  delete[] buffer_;
31  }
32 
33  void Push(const T& input) {
34  while (!Enqueue(input)) {
35  std::this_thread::yield();
36  }
37  if (pending_.fetch_add(1) == -1) {
38  std::unique_lock<std::mutex> lock(mutex_);
39  cv_.notify_one();
40  }
41  }
42 
43  bool Pop(T* output, uint32_t spin_count = 300000) {
44  // Busy wait a bit when the queue is empty.
45  // If a new element comes to the queue quickly, this wait avoid the worker
46  // from sleeping.
47  // The default spin count is set by following the typical omp convention
48  for (uint32_t i = 0; i < spin_count && pending_.load() == 0; ++i) {
49  std::this_thread::yield();
50  }
51  if (pending_.fetch_sub(1) == 0) {
52  std::unique_lock<std::mutex> lock(mutex_);
53  cv_.wait(lock, [this] {
54  return pending_.load() >= 0 || exit_now_.load();
55  });
56  }
57  if (exit_now_.load(std::memory_order_relaxed)) {
58  return false;
59  }
60  const uint32_t head = head_.load(std::memory_order_relaxed);
61  // sanity check if the queue is empty
62  CHECK(tail_.load(std::memory_order_acquire) != head);
63  *output = buffer_[head];
64  head_.store((head + 1) % kRingSize, std::memory_order_release);
65  return true;
66  }
67 
71  void SignalForKill() {
72  std::lock_guard<std::mutex> lock(mutex_);
73  exit_now_.store(true);
74  cv_.notify_all();
75  }
76 
77  protected:
78  bool Enqueue(const T& input) {
79  const uint32_t tail = tail_.load(std::memory_order_relaxed);
80 
81  if ((tail + 1) % kRingSize != (head_.load(std::memory_order_acquire))) {
82  buffer_[tail] = input;
83  tail_.store((tail + 1) % kRingSize, std::memory_order_release);
84  return true;
85  }
86  return false;
87  }
88 
89  // the cache line paddings are used for avoid false sharing between atomic variables
90  typedef char cache_line_pad_t[kL1CacheBytes];
91  cache_line_pad_t pad0_;
92  // size of the queue, the queue can host size_ - 1 items at most
93  // define it as a constant for better compiler optimization
94  static constexpr const int kRingSize = 2;
95  // pointer to access the item
96  T* const buffer_;
97 
98  cache_line_pad_t pad1_;
99  // queue head, where one gets an element from the queue
100  std::atomic<uint32_t> head_;
101 
102  cache_line_pad_t pad2_;
103  // queue tail, when one puts an element to the queue
104  std::atomic<uint32_t> tail_;
105 
106  cache_line_pad_t pad3_;
107  // pending elements in the queue
108  std::atomic<int8_t> pending_{0};
109 
110  cache_line_pad_t pad4_;
111  // signal for exit now
112  std::atomic<bool> exit_now_{false};
113 
114  // internal mutex
115  std::mutex mutex_;
116  // cv for consumer
117  std::condition_variable cv_;
118 };
119 
120 #endif // TREELITE_THREAD_POOL_SPSC_QUEUE_H_
Lock-free single-producer-single-consumer queue for each thread.
Definition: spsc_queue.h:21
void SignalForKill()
Signal to terminate the worker.
Definition: spsc_queue.h:71