diff options
Diffstat (limited to 'src/queue.cpp')
| -rw-r--r-- | src/queue.cpp | 75 |
1 files changed, 71 insertions, 4 deletions
diff --git a/src/queue.cpp b/src/queue.cpp index 92d83a76c..33ba5af28 100644 --- a/src/queue.cpp +++ b/src/queue.cpp @@ -46,7 +46,7 @@ void mpmc_destroy(MPMCQueue<T> *q) { template <typename T> -bool mpmc_enqueue(MPMCQueue<T> *q, T const &data) { +isize mpmc_enqueue(MPMCQueue<T> *q, T const &data) { isize head_idx = q->head_idx.load(std::memory_order_relaxed); for (;;) { @@ -59,8 +59,7 @@ bool mpmc_enqueue(MPMCQueue<T> *q, T const &data) { if (q->head_idx.compare_exchange_weak(head_idx, next_head_idx)) { node->data = data; node->idx.store(next_head_idx, std::memory_order_release); - q->count.fetch_add(1, std::memory_order_release); - return true; + return q->count.fetch_add(1, std::memory_order_release); } } else if (diff < 0) { gb_mutex_lock(&q->mutex); @@ -70,7 +69,7 @@ bool mpmc_enqueue(MPMCQueue<T> *q, T const &data) { if (q->buffer.data == nullptr) { GB_PANIC("Unable to resize enqueue: %td -> %td", old_size, new_size); gb_mutex_unlock(&q->mutex); - return false; + return -1; } for (isize i = old_size; i < new_size; i++) { q->buffer.data[i].idx.store(i, std::memory_order_relaxed); @@ -108,3 +107,71 @@ bool mpmc_dequeue(MPMCQueue<T> *q, T *data_) { } } } + + +template <typename T> +struct MPSCQueueNode { + std::atomic<MPSCQueueNode<T> *> next; + T data; +}; + +template <typename T> +struct MPSCQueue { + gbAllocator allocator; + + std::atomic<isize> count; + std::atomic<MPSCQueueNode<T> *> head; + std::atomic<MPSCQueueNode<T> *> tail; +}; + +template <typename T> +void mpsc_init(MPSCQueue<T> *q, gbAllocator a) { + using Node = MPSCQueueNode<T>; + + q->allocator = a; + Node *front = cast(Node *)gb_alloc_align(q->allocator, gb_size_of(Node), 64); + front->next.store(nullptr, std::memory_order_relaxed); + q->head.store(front, std::memory_order_relaxed); + q->tail.store(front, std::memory_order_relaxed); +} + + +template <typename T> +isize mpsc_enqueue(MPSCQueue<T> *q, T const &value) { + using Node = MPSCQueueNode<T>; + + Node *node = cast(Node *)gb_alloc_align(q->allocator, gb_size_of(Node), 64); + node->data = value; + node->next.store(nullptr, std::memory_order_relaxed); + + auto *prev_head = q->head.exchange(node, std::memory_order_acq_rel); + prev_head->next.store(node, std::memory_order_release); + return q->count.fetch_add(1, std::memory_order_release); +} + +template <typename T> +bool mpsc_dequeue(MPSCQueue<T> *q, T *value_) { + auto *tail = q->tail.load(std::memory_order_relaxed); + auto *next = tail->next.load(std::memory_order_acquire); + if (next == nullptr) { + return false; + } + + if (value_) *value_ = next->data; + q->tail.store(next, std::memory_order_release); + q->count.fetch_sub(1, std::memory_order_release); + gb_free(q->allocator, tail); + return true; +} + + +template <typename T> +void mpsc_destroy(MPSCQueue<T> *q) { + T output = {}; + while (mpsc_dequeue(q, &output)) { + // okay + } + auto *front = q->head.load(std::memory_order_relaxed); + gb_free(q->allocator, front); +} + |