From cdede4928cbbe38e043f3a784020b2ed40c5470a Mon Sep 17 00:00:00 2001 From: Colin Davidson Date: Sat, 13 Jul 2024 23:16:22 -0700 Subject: move to a growing queue --- src/threading.cpp | 39 +++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) (limited to 'src/threading.cpp') diff --git a/src/threading.cpp b/src/threading.cpp index 717dcb874..dda98631b 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -46,6 +46,18 @@ typedef struct WorkerTask { void *data; } WorkerTask; +typedef struct TaskRingBuffer { + std::atomic size; + std::atomic buffer; +} TaskRingBuffer; + +typedef struct TaskQueue { + std::atomic top; + std::atomic bottom; + + std::atomic ring; +} TaskQueue; + struct Thread { #if defined(GB_SYSTEM_WINDOWS) void *win32_handle; @@ -54,12 +66,9 @@ struct Thread { #endif isize idx; - - WorkerTask *queue; - size_t capacity; - std::atomic head_and_tail; - isize stack_size; + + struct TaskQueue queue; struct ThreadPool *pool; }; @@ -551,6 +560,18 @@ gb_internal void *internal_thread_proc(void *arg) { } #endif +TaskRingBuffer *taskring_init(ssize_t size) { + TaskRingBuffer *ring = (TaskRingBuffer *)gb_alloc(heap_allocator(), sizeof(TaskRingBuffer)); + ring->size = size; + ring->buffer = (WorkerTask *)gb_alloc_array(heap_allocator(), WorkerTask, ring->size); + return ring; +} + +void thread_queue_destroy(TaskQueue *q) { + gb_free(heap_allocator(), (*q->ring).buffer); + gb_free(heap_allocator(), q->ring); +} + gb_internal void thread_init(ThreadPool *pool, Thread *t, isize idx) { gb_zero_item(t); #if defined(GB_SYSTEM_WINDOWS) @@ -559,14 +580,12 @@ gb_internal void thread_init(ThreadPool *pool, Thread *t, isize idx) { t->posix_handle = 0; #endif - t->capacity = 1 << 14; // must be a power of 2 - t->queue = gb_alloc_array(heap_allocator(), WorkerTask, t->capacity); - t->head_and_tail = 0; + // Size must be a power of 2 + t->queue.ring = taskring_init(1 << 14); t->pool = pool; t->idx = idx; } - gb_internal void thread_init_and_start(ThreadPool *pool, Thread *t, isize idx) { thread_init(pool, t, idx); isize stack_size = 0; @@ -598,7 +617,7 @@ gb_internal void thread_join_and_destroy(Thread *t) { t->posix_handle = 0; #endif - gb_free(heap_allocator(), t->queue); + thread_queue_destroy(&t->queue); } gb_internal void thread_set_name(Thread *t, char const *name) { -- cgit v1.2.3 From 64feb7599e8ec01c2ec7c8d709df1cc70651c06b Mon Sep 17 00:00:00 2001 From: Colin Davidson Date: Sun, 14 Jul 2024 00:33:40 -0700 Subject: move to isize --- src/thread_pool.cpp | 18 +++++++++--------- src/threading.cpp | 8 ++++---- 2 files changed, 13 insertions(+), 13 deletions(-) (limited to 'src/threading.cpp') diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index da7e724a8..bf953ddd0 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -59,20 +59,20 @@ gb_internal void thread_pool_destroy(ThreadPool *pool) { gb_free(pool->threads_allocator, pool->threads.data); } -TaskRingBuffer *taskring_grow(TaskRingBuffer *ring, ssize_t bottom, ssize_t top) { +TaskRingBuffer *taskring_grow(TaskRingBuffer *ring, isize bottom, isize top) { TaskRingBuffer *new_ring = taskring_init(ring->size * 2); - for (ssize_t i = top; i < bottom; i++) { + for (isize i = top; i < bottom; i++) { new_ring->buffer[i % new_ring->size] = ring->buffer[i % ring->size]; } return new_ring; } void thread_pool_queue_push(Thread *thread, WorkerTask task) { - ssize_t bot = thread->queue.bottom.load(std::memory_order_relaxed); - ssize_t top = thread->queue.top.load(std::memory_order_acquire); + isize bot = thread->queue.bottom.load(std::memory_order_relaxed); + isize top = thread->queue.top.load(std::memory_order_acquire); TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_relaxed); - ssize_t size = bot - top; + isize size = bot - top; if (size > (cur_ring->size - 1)) { // Queue is full thread->queue.ring = taskring_grow(thread->queue.ring, bot, top); @@ -89,12 +89,12 @@ void thread_pool_queue_push(Thread *thread, WorkerTask task) { } GrabState thread_pool_queue_take(Thread *thread, WorkerTask *task) { - ssize_t bot = thread->queue.bottom.load(std::memory_order_relaxed) - 1; + isize bot = thread->queue.bottom.load(std::memory_order_relaxed) - 1; TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_relaxed); thread->queue.bottom.store(bot, std::memory_order_relaxed); std::atomic_thread_fence(std::memory_order_seq_cst); - ssize_t top = thread->queue.top.load(std::memory_order_relaxed); + isize top = thread->queue.top.load(std::memory_order_relaxed); if (top <= bot) { // Queue is not empty @@ -121,9 +121,9 @@ GrabState thread_pool_queue_take(Thread *thread, WorkerTask *task) { } GrabState thread_pool_queue_steal(Thread *thread, WorkerTask *task) { - ssize_t top = thread->queue.top.load(std::memory_order_acquire); + isize top = thread->queue.top.load(std::memory_order_acquire); std::atomic_thread_fence(std::memory_order_seq_cst); - ssize_t bot = thread->queue.bottom.load(std::memory_order_acquire); + isize bot = thread->queue.bottom.load(std::memory_order_acquire); GrabState ret = GrabEmpty; if (top < bot) { diff --git a/src/threading.cpp b/src/threading.cpp index dda98631b..ac79efb05 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -47,13 +47,13 @@ typedef struct WorkerTask { } WorkerTask; typedef struct TaskRingBuffer { - std::atomic size; + std::atomic size; std::atomic buffer; } TaskRingBuffer; typedef struct TaskQueue { - std::atomic top; - std::atomic bottom; + std::atomic top; + std::atomic bottom; std::atomic ring; } TaskQueue; @@ -560,7 +560,7 @@ gb_internal void *internal_thread_proc(void *arg) { } #endif -TaskRingBuffer *taskring_init(ssize_t size) { +TaskRingBuffer *taskring_init(isize size) { TaskRingBuffer *ring = (TaskRingBuffer *)gb_alloc(heap_allocator(), sizeof(TaskRingBuffer)); ring->size = size; ring->buffer = (WorkerTask *)gb_alloc_array(heap_allocator(), WorkerTask, ring->size); -- cgit v1.2.3