From c15db051999e51f5c8ecc45fb084e6fb76c9b5c2 Mon Sep 17 00:00:00 2001 From: gingerBill Date: Thu, 12 Jan 2023 12:41:53 +0000 Subject: Implement `MPSCQueue` --- src/queue.cpp | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) (limited to 'src/queue.cpp') diff --git a/src/queue.cpp b/src/queue.cpp index 8f279bb21..845f87310 100644 --- a/src/queue.cpp +++ b/src/queue.cpp @@ -1,3 +1,85 @@ +template +struct MPSCNode { + std::atomic *> next; + T value; +}; + +// +// Multiple Producer Single Consumer Lockless Queue +// URL: https://www.1024cores.net +// +template +struct MPSCQueue { + std::atomic *> head; + std::atomic *> tail; + std::atomic count; + MPSCNode sentinel; + gbAllocator allocator; +}; + +template gb_internal void mpsc_init (MPSCQueue *q, gbAllocator const &allocator); +template gb_internal void mpsc_destroy(MPSCQueue *q); +template gb_internal isize mpsc_enqueue(MPSCQueue *q, T const &value); +template gb_internal bool mpsc_dequeue(MPSCQueue *q, T *value_); +template gb_internal MPSCNode *mpsc_tail (MPSCQueue *q); + +template +gb_internal void mpsc_init(MPSCQueue *q, gbAllocator const &allocator) { + q->allocator = allocator; + q->count.store(0, std::memory_order_relaxed); + q->head.store(&q->sentinel, std::memory_order_relaxed); + q->tail.store(&q->sentinel, std::memory_order_relaxed); + q->sentinel.next.store(nullptr, std::memory_order_relaxed); +} + +template +gb_internal void mpsc_destroy(MPSCQueue *q) { + while (mpsc_dequeue(q, (T *)nullptr)) {} + // DO NOTHING for the time being +} + + +template +gb_internal MPSCNode *mpsc_alloc_node(MPSCQueue *q, T const &value) { + auto node = gb_alloc_item(q->allocator, MPSCNode); + node->value = value; + return node; +} + +template +gb_internal isize mpsc_enqueue(MPSCQueue *q, MPSCNode *node) { + node->next.store(nullptr, std::memory_order_relaxed); + auto prev = q->head.exchange(node, std::memory_order_acq_rel); + prev->next.store(node, std::memory_order_release); + isize count = 1 + q->count.fetch_add(1, std::memory_order_acq_rel); + return count; +} + +template +gb_internal isize mpsc_enqueue(MPSCQueue *q, T const &value) { + auto node = mpsc_alloc_node(q, value); + return mpsc_enqueue(q, node); +} + + +template +gb_internal bool mpsc_dequeue(MPSCQueue *q, T *value_) { + auto tail = q->tail.load(std::memory_order_relaxed); + auto next = tail->next.load(std::memory_order_relaxed); + if (next) { + q->tail.store(next, std::memory_order_relaxed); + // `tail` is now "dead" and needs to be "freed" + if (*value_) *value_ = next->value; + q->count.fetch_sub(1, std::memory_order_acq_rel); + return true; + } + return false; +} + +//////////////////////////// + + + #define MPMC_CACHE_LINE_SIZE 64 typedef std::atomic MPMCQueueAtomicIdx; -- cgit v1.2.3