From cdede4928cbbe38e043f3a784020b2ed40c5470a Mon Sep 17 00:00:00 2001 From: Colin Davidson Date: Sat, 13 Jul 2024 23:16:22 -0700 Subject: move to a growing queue --- src/thread_pool.cpp | 112 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 73 insertions(+), 39 deletions(-) (limited to 'src/thread_pool.cpp') diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 5dbbe37c4..2b176db1c 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -16,7 +16,6 @@ struct ThreadPool { std::atomic running; Futex tasks_available; - Futex tasks_left; }; @@ -46,7 +45,7 @@ gb_internal void thread_pool_destroy(ThreadPool *pool) { for_array_off(i, 1, pool->threads) { Thread *t = &pool->threads[i]; - pool->tasks_available.fetch_add(1, std::memory_order_relaxed); + pool->tasks_available.fetch_add(1, std::memory_order_acquire); futex_broadcast(&pool->tasks_available); thread_join_and_destroy(t); } @@ -54,51 +53,86 @@ gb_internal void thread_pool_destroy(ThreadPool *pool) { gb_free(pool->threads_allocator, pool->threads.data); } -void thread_pool_queue_push(Thread *thread, WorkerTask task) { - u64 capture; - u64 new_capture; - do { - capture = thread->head_and_tail.load(); - - u64 mask = thread->capacity - 1; - u64 head = (capture >> 32) & mask; - u64 tail = ((u32)capture) & mask; +TaskRingBuffer *taskring_grow(TaskRingBuffer *ring, ssize_t bottom, ssize_t top) { + TaskRingBuffer *new_ring = taskring_init(ring->size * 2); + for (ssize_t i = top; i < bottom; i++) { + new_ring->buffer[i % new_ring->size] = ring->buffer[i % ring->size]; + } + return new_ring; +} - u64 new_head = (head + 1) & mask; - GB_ASSERT_MSG(new_head != tail, "Thread Queue Full!"); +void thread_pool_queue_push(Thread *thread, WorkerTask task) { + ssize_t bot = thread->queue.bottom.load(std::memory_order_relaxed); + ssize_t top = thread->queue.top.load(std::memory_order_acquire); + TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_relaxed); + + ssize_t size = bot - top; + if (size > (cur_ring->size - 1)) { + // Queue is full + thread->queue.ring = taskring_grow(thread->queue.ring, bot, top); + cur_ring = thread->queue.ring.load(std::memory_order_relaxed); + } - // This *must* be done in here, to avoid a potential race condition where we no longer own the slot by the time we're assigning - thread->queue[head] = task; - new_capture = (new_head << 32) | tail; - } while (!thread->head_and_tail.compare_exchange_weak(capture, new_capture)); + cur_ring->buffer[bot % cur_ring->size] = task; + std::atomic_thread_fence(std::memory_order_release); + thread->queue.bottom.store(bot + 1, std::memory_order_relaxed); thread->pool->tasks_left.fetch_add(1, std::memory_order_release); thread->pool->tasks_available.fetch_add(1, std::memory_order_relaxed); futex_broadcast(&thread->pool->tasks_available); } -bool thread_pool_queue_pop(Thread *thread, WorkerTask *task) { - u64 capture; - u64 new_capture; - do { - capture = thread->head_and_tail.load(std::memory_order_acquire); - - u64 mask = thread->capacity - 1; - u64 head = (capture >> 32) & mask; - u64 tail = ((u32)capture) & mask; +bool thread_pool_queue_take(Thread *thread, WorkerTask *task) { + ssize_t bot = thread->queue.bottom.load(std::memory_order_relaxed) - 1; + TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_relaxed); + thread->queue.bottom.store(bot, std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_seq_cst); + + ssize_t top = thread->queue.top.load(std::memory_order_relaxed); + if (top <= bot) { + + // Queue is not empty + *task = cur_ring->buffer[bot % cur_ring->size]; + if (top == bot) { + // Only one entry left in queue + if (!thread->queue.top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) { + // Race failed + thread->queue.bottom.store(bot + 1, std::memory_order_relaxed); + return false; + } - u64 new_tail = (tail + 1) & mask; - if (tail == head) { - return false; + thread->queue.bottom.store(bot + 1, std::memory_order_relaxed); + return true; } - // Making a copy of the task before we increment the tail, avoiding the same potential race condition as above - *task = thread->queue[tail]; - - new_capture = (head << 32) | new_tail; - } while (!thread->head_and_tail.compare_exchange_weak(capture, new_capture, std::memory_order_release)); + // We got a task without hitting a race + return true; + } else { + // Queue is empty + thread->queue.bottom.store(bot + 1, std::memory_order_relaxed); + return false; + } +} - return true; +bool thread_pool_queue_steal(Thread *thread, WorkerTask *task) { + ssize_t top = thread->queue.top.load(std::memory_order_acquire); + std::atomic_thread_fence(std::memory_order_seq_cst); + ssize_t bot = thread->queue.bottom.load(std::memory_order_acquire); + + bool ret = false; + if (top < bot) { + // Queue is not empty + TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_consume); + *task = cur_ring->buffer[top % cur_ring->size]; + + if (!thread->queue.top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) { + // Race failed + ret = false; + } else { + ret = true; + } + } + return ret; } gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { @@ -115,12 +149,11 @@ gb_internal void thread_pool_wait(ThreadPool *pool) { while (pool->tasks_left.load(std::memory_order_acquire)) { // if we've got tasks on our queue, run them - while (thread_pool_queue_pop(current_thread, &task)) { + while (thread_pool_queue_take(current_thread, &task)) { task.do_work(task.data); pool->tasks_left.fetch_sub(1, std::memory_order_release); } - // is this mem-barriered enough? // This *must* be executed in this order, so the futex wakes immediately // if rem_tasks has changed since we checked last, otherwise the program @@ -145,7 +178,7 @@ gb_internal THREAD_PROC(thread_pool_thread_proc) { usize finished_tasks = 0; i32 state; - while (thread_pool_queue_pop(current_thread, &task)) { + while (thread_pool_queue_take(current_thread, &task)) { task.do_work(task.data); pool->tasks_left.fetch_sub(1, std::memory_order_release); @@ -167,7 +200,7 @@ gb_internal THREAD_PROC(thread_pool_thread_proc) { Thread *thread = &pool->threads.data[idx]; WorkerTask task; - if (thread_pool_queue_pop(thread, &task)) { + if (thread_pool_queue_steal(thread, &task)) { task.do_work(task.data); pool->tasks_left.fetch_sub(1, std::memory_order_release); @@ -182,6 +215,7 @@ gb_internal THREAD_PROC(thread_pool_thread_proc) { // if we've done all our work, and there's nothing to steal, go to sleep state = pool->tasks_available.load(std::memory_order_acquire); + if (!pool->running) { break; } futex_wait(&pool->tasks_available, state); main_loop_continue:; -- cgit v1.2.3 From 4420128dc1d15775d1f56d47b858d8ffe75e4b9f Mon Sep 17 00:00:00 2001 From: Colin Davidson Date: Sun, 14 Jul 2024 00:29:58 -0700 Subject: handle steal-fail vs steal-empty --- src/thread_pool.cpp | 36 ++++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) (limited to 'src/thread_pool.cpp') diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 2b176db1c..da7e724a8 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -10,6 +10,12 @@ gb_internal void thread_pool_destroy(ThreadPool *pool); gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data); gb_internal void thread_pool_wait(ThreadPool *pool); +enum GrabState { + GrabSuccess = 0, + GrabEmpty = 1, + GrabFailed = 2, +}; + struct ThreadPool { gbAllocator threads_allocator; Slice threads; @@ -82,7 +88,7 @@ void thread_pool_queue_push(Thread *thread, WorkerTask task) { futex_broadcast(&thread->pool->tasks_available); } -bool thread_pool_queue_take(Thread *thread, WorkerTask *task) { +GrabState thread_pool_queue_take(Thread *thread, WorkerTask *task) { ssize_t bot = thread->queue.bottom.load(std::memory_order_relaxed) - 1; TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_relaxed); thread->queue.bottom.store(bot, std::memory_order_relaxed); @@ -98,28 +104,28 @@ bool thread_pool_queue_take(Thread *thread, WorkerTask *task) { if (!thread->queue.top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) { // Race failed thread->queue.bottom.store(bot + 1, std::memory_order_relaxed); - return false; + return GrabEmpty; } thread->queue.bottom.store(bot + 1, std::memory_order_relaxed); - return true; + return GrabSuccess; } // We got a task without hitting a race - return true; + return GrabSuccess; } else { // Queue is empty thread->queue.bottom.store(bot + 1, std::memory_order_relaxed); - return false; + return GrabEmpty; } } -bool thread_pool_queue_steal(Thread *thread, WorkerTask *task) { +GrabState thread_pool_queue_steal(Thread *thread, WorkerTask *task) { ssize_t top = thread->queue.top.load(std::memory_order_acquire); std::atomic_thread_fence(std::memory_order_seq_cst); ssize_t bot = thread->queue.bottom.load(std::memory_order_acquire); - bool ret = false; + GrabState ret = GrabEmpty; if (top < bot) { // Queue is not empty TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_consume); @@ -127,9 +133,9 @@ bool thread_pool_queue_steal(Thread *thread, WorkerTask *task) { if (!thread->queue.top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) { // Race failed - ret = false; + ret = GrabFailed; } else { - ret = true; + ret = GrabSuccess; } } return ret; @@ -149,7 +155,7 @@ gb_internal void thread_pool_wait(ThreadPool *pool) { while (pool->tasks_left.load(std::memory_order_acquire)) { // if we've got tasks on our queue, run them - while (thread_pool_queue_take(current_thread, &task)) { + while (!thread_pool_queue_take(current_thread, &task)) { task.do_work(task.data); pool->tasks_left.fetch_sub(1, std::memory_order_release); } @@ -178,7 +184,7 @@ gb_internal THREAD_PROC(thread_pool_thread_proc) { usize finished_tasks = 0; i32 state; - while (thread_pool_queue_take(current_thread, &task)) { + while (!thread_pool_queue_take(current_thread, &task)) { task.do_work(task.data); pool->tasks_left.fetch_sub(1, std::memory_order_release); @@ -200,7 +206,13 @@ gb_internal THREAD_PROC(thread_pool_thread_proc) { Thread *thread = &pool->threads.data[idx]; WorkerTask task; - if (thread_pool_queue_steal(thread, &task)) { + + GrabState ret = thread_pool_queue_steal(thread, &task); + if (ret == GrabFailed) { + goto main_loop_continue; + } else if (ret == GrabEmpty) { + continue; + } else if (ret == GrabSuccess) { task.do_work(task.data); pool->tasks_left.fetch_sub(1, std::memory_order_release); -- cgit v1.2.3 From 64feb7599e8ec01c2ec7c8d709df1cc70651c06b Mon Sep 17 00:00:00 2001 From: Colin Davidson Date: Sun, 14 Jul 2024 00:33:40 -0700 Subject: move to isize --- src/thread_pool.cpp | 18 +++++++++--------- src/threading.cpp | 8 ++++---- 2 files changed, 13 insertions(+), 13 deletions(-) (limited to 'src/thread_pool.cpp') diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index da7e724a8..bf953ddd0 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -59,20 +59,20 @@ gb_internal void thread_pool_destroy(ThreadPool *pool) { gb_free(pool->threads_allocator, pool->threads.data); } -TaskRingBuffer *taskring_grow(TaskRingBuffer *ring, ssize_t bottom, ssize_t top) { +TaskRingBuffer *taskring_grow(TaskRingBuffer *ring, isize bottom, isize top) { TaskRingBuffer *new_ring = taskring_init(ring->size * 2); - for (ssize_t i = top; i < bottom; i++) { + for (isize i = top; i < bottom; i++) { new_ring->buffer[i % new_ring->size] = ring->buffer[i % ring->size]; } return new_ring; } void thread_pool_queue_push(Thread *thread, WorkerTask task) { - ssize_t bot = thread->queue.bottom.load(std::memory_order_relaxed); - ssize_t top = thread->queue.top.load(std::memory_order_acquire); + isize bot = thread->queue.bottom.load(std::memory_order_relaxed); + isize top = thread->queue.top.load(std::memory_order_acquire); TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_relaxed); - ssize_t size = bot - top; + isize size = bot - top; if (size > (cur_ring->size - 1)) { // Queue is full thread->queue.ring = taskring_grow(thread->queue.ring, bot, top); @@ -89,12 +89,12 @@ void thread_pool_queue_push(Thread *thread, WorkerTask task) { } GrabState thread_pool_queue_take(Thread *thread, WorkerTask *task) { - ssize_t bot = thread->queue.bottom.load(std::memory_order_relaxed) - 1; + isize bot = thread->queue.bottom.load(std::memory_order_relaxed) - 1; TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_relaxed); thread->queue.bottom.store(bot, std::memory_order_relaxed); std::atomic_thread_fence(std::memory_order_seq_cst); - ssize_t top = thread->queue.top.load(std::memory_order_relaxed); + isize top = thread->queue.top.load(std::memory_order_relaxed); if (top <= bot) { // Queue is not empty @@ -121,9 +121,9 @@ GrabState thread_pool_queue_take(Thread *thread, WorkerTask *task) { } GrabState thread_pool_queue_steal(Thread *thread, WorkerTask *task) { - ssize_t top = thread->queue.top.load(std::memory_order_acquire); + isize top = thread->queue.top.load(std::memory_order_acquire); std::atomic_thread_fence(std::memory_order_seq_cst); - ssize_t bot = thread->queue.bottom.load(std::memory_order_acquire); + isize bot = thread->queue.bottom.load(std::memory_order_acquire); GrabState ret = GrabEmpty; if (top < bot) { diff --git a/src/threading.cpp b/src/threading.cpp index dda98631b..ac79efb05 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -47,13 +47,13 @@ typedef struct WorkerTask { } WorkerTask; typedef struct TaskRingBuffer { - std::atomic size; + std::atomic size; std::atomic buffer; } TaskRingBuffer; typedef struct TaskQueue { - std::atomic top; - std::atomic bottom; + std::atomic top; + std::atomic bottom; std::atomic ring; } TaskQueue; @@ -560,7 +560,7 @@ gb_internal void *internal_thread_proc(void *arg) { } #endif -TaskRingBuffer *taskring_init(ssize_t size) { +TaskRingBuffer *taskring_init(isize size) { TaskRingBuffer *ring = (TaskRingBuffer *)gb_alloc(heap_allocator(), sizeof(TaskRingBuffer)); ring->size = size; ring->buffer = (WorkerTask *)gb_alloc_array(heap_allocator(), WorkerTask, ring->size); -- cgit v1.2.3 From d1450e3d880ec70b306dc735f7f694f265348ef1 Mon Sep 17 00:00:00 2001 From: gingerBill Date: Sun, 14 Jul 2024 14:44:22 +0100 Subject: Fix styling issues --- src/thread_pool.cpp | 39 ++++++++++++++++++++------------------- src/threading.cpp | 12 ++++++------ 2 files changed, 26 insertions(+), 25 deletions(-) (limited to 'src/thread_pool.cpp') diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index bf953ddd0..62cca6de6 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -11,14 +11,14 @@ gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, vo gb_internal void thread_pool_wait(ThreadPool *pool); enum GrabState { - GrabSuccess = 0, - GrabEmpty = 1, - GrabFailed = 2, + Grab_Success = 0, + Grab_Empty = 1, + Grab_Failed = 2, }; struct ThreadPool { - gbAllocator threads_allocator; - Slice threads; + gbAllocator threads_allocator; + Slice threads; std::atomic running; Futex tasks_available; @@ -59,8 +59,8 @@ gb_internal void thread_pool_destroy(ThreadPool *pool) { gb_free(pool->threads_allocator, pool->threads.data); } -TaskRingBuffer *taskring_grow(TaskRingBuffer *ring, isize bottom, isize top) { - TaskRingBuffer *new_ring = taskring_init(ring->size * 2); +TaskRingBuffer *task_ring_grow(TaskRingBuffer *ring, isize bottom, isize top) { + TaskRingBuffer *new_ring = task_ring_init(ring->size * 2); for (isize i = top; i < bottom; i++) { new_ring->buffer[i % new_ring->size] = ring->buffer[i % ring->size]; } @@ -75,7 +75,7 @@ void thread_pool_queue_push(Thread *thread, WorkerTask task) { isize size = bot - top; if (size > (cur_ring->size - 1)) { // Queue is full - thread->queue.ring = taskring_grow(thread->queue.ring, bot, top); + thread->queue.ring = task_ring_grow(thread->queue.ring, bot, top); cur_ring = thread->queue.ring.load(std::memory_order_relaxed); } @@ -104,19 +104,19 @@ GrabState thread_pool_queue_take(Thread *thread, WorkerTask *task) { if (!thread->queue.top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) { // Race failed thread->queue.bottom.store(bot + 1, std::memory_order_relaxed); - return GrabEmpty; + return Grab_Empty; } thread->queue.bottom.store(bot + 1, std::memory_order_relaxed); - return GrabSuccess; + return Grab_Success; } // We got a task without hitting a race - return GrabSuccess; + return Grab_Success; } else { // Queue is empty thread->queue.bottom.store(bot + 1, std::memory_order_relaxed); - return GrabEmpty; + return Grab_Empty; } } @@ -125,7 +125,7 @@ GrabState thread_pool_queue_steal(Thread *thread, WorkerTask *task) { std::atomic_thread_fence(std::memory_order_seq_cst); isize bot = thread->queue.bottom.load(std::memory_order_acquire); - GrabState ret = GrabEmpty; + GrabState ret = Grab_Empty; if (top < bot) { // Queue is not empty TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_consume); @@ -133,9 +133,9 @@ GrabState thread_pool_queue_steal(Thread *thread, WorkerTask *task) { if (!thread->queue.top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) { // Race failed - ret = GrabFailed; + ret = Grab_Failed; } else { - ret = GrabSuccess; + ret = Grab_Success; } } return ret; @@ -208,11 +208,10 @@ gb_internal THREAD_PROC(thread_pool_thread_proc) { WorkerTask task; GrabState ret = thread_pool_queue_steal(thread, &task); - if (ret == GrabFailed) { - goto main_loop_continue; - } else if (ret == GrabEmpty) { + switch (ret) { + case Grab_Empty: continue; - } else if (ret == GrabSuccess) { + case Grab_Success: task.do_work(task.data); pool->tasks_left.fetch_sub(1, std::memory_order_release); @@ -220,6 +219,8 @@ gb_internal THREAD_PROC(thread_pool_thread_proc) { futex_signal(&pool->tasks_left); } + /*fallthrough*/ + case Grab_Failed: goto main_loop_continue; } } diff --git a/src/threading.cpp b/src/threading.cpp index ac79efb05..ff0fdfcde 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -66,9 +66,9 @@ struct Thread { #endif isize idx; - isize stack_size; + isize stack_size; - struct TaskQueue queue; + struct TaskQueue queue; struct ThreadPool *pool; }; @@ -560,10 +560,10 @@ gb_internal void *internal_thread_proc(void *arg) { } #endif -TaskRingBuffer *taskring_init(isize size) { - TaskRingBuffer *ring = (TaskRingBuffer *)gb_alloc(heap_allocator(), sizeof(TaskRingBuffer)); +TaskRingBuffer *task_ring_init(isize size) { + TaskRingBuffer *ring = gb_alloc_item(heap_allocator(), TaskRingBuffer); ring->size = size; - ring->buffer = (WorkerTask *)gb_alloc_array(heap_allocator(), WorkerTask, ring->size); + ring->buffer = gb_alloc_array(heap_allocator(), WorkerTask, ring->size); return ring; } @@ -581,7 +581,7 @@ gb_internal void thread_init(ThreadPool *pool, Thread *t, isize idx) { #endif // Size must be a power of 2 - t->queue.ring = taskring_init(1 << 14); + t->queue.ring = task_ring_init(1 << 14); t->pool = pool; t->idx = idx; } -- cgit v1.2.3