aboutsummaryrefslogtreecommitdiff
path: root/src/thread_pool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/thread_pool.cpp')
-rw-r--r--src/thread_pool.cpp36
1 files changed, 24 insertions, 12 deletions
diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp
index 2b176db1c..da7e724a8 100644
--- a/src/thread_pool.cpp
+++ b/src/thread_pool.cpp
@@ -10,6 +10,12 @@ gb_internal void thread_pool_destroy(ThreadPool *pool);
gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data);
gb_internal void thread_pool_wait(ThreadPool *pool);
+enum GrabState {
+ GrabSuccess = 0,
+ GrabEmpty = 1,
+ GrabFailed = 2,
+};
+
struct ThreadPool {
gbAllocator threads_allocator;
Slice<Thread> threads;
@@ -82,7 +88,7 @@ void thread_pool_queue_push(Thread *thread, WorkerTask task) {
futex_broadcast(&thread->pool->tasks_available);
}
-bool thread_pool_queue_take(Thread *thread, WorkerTask *task) {
+GrabState thread_pool_queue_take(Thread *thread, WorkerTask *task) {
ssize_t bot = thread->queue.bottom.load(std::memory_order_relaxed) - 1;
TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_relaxed);
thread->queue.bottom.store(bot, std::memory_order_relaxed);
@@ -98,28 +104,28 @@ bool thread_pool_queue_take(Thread *thread, WorkerTask *task) {
if (!thread->queue.top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
// Race failed
thread->queue.bottom.store(bot + 1, std::memory_order_relaxed);
- return false;
+ return GrabEmpty;
}
thread->queue.bottom.store(bot + 1, std::memory_order_relaxed);
- return true;
+ return GrabSuccess;
}
// We got a task without hitting a race
- return true;
+ return GrabSuccess;
} else {
// Queue is empty
thread->queue.bottom.store(bot + 1, std::memory_order_relaxed);
- return false;
+ return GrabEmpty;
}
}
-bool thread_pool_queue_steal(Thread *thread, WorkerTask *task) {
+GrabState thread_pool_queue_steal(Thread *thread, WorkerTask *task) {
ssize_t top = thread->queue.top.load(std::memory_order_acquire);
std::atomic_thread_fence(std::memory_order_seq_cst);
ssize_t bot = thread->queue.bottom.load(std::memory_order_acquire);
- bool ret = false;
+ GrabState ret = GrabEmpty;
if (top < bot) {
// Queue is not empty
TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_consume);
@@ -127,9 +133,9 @@ bool thread_pool_queue_steal(Thread *thread, WorkerTask *task) {
if (!thread->queue.top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
// Race failed
- ret = false;
+ ret = GrabFailed;
} else {
- ret = true;
+ ret = GrabSuccess;
}
}
return ret;
@@ -149,7 +155,7 @@ gb_internal void thread_pool_wait(ThreadPool *pool) {
while (pool->tasks_left.load(std::memory_order_acquire)) {
// if we've got tasks on our queue, run them
- while (thread_pool_queue_take(current_thread, &task)) {
+ while (!thread_pool_queue_take(current_thread, &task)) {
task.do_work(task.data);
pool->tasks_left.fetch_sub(1, std::memory_order_release);
}
@@ -178,7 +184,7 @@ gb_internal THREAD_PROC(thread_pool_thread_proc) {
usize finished_tasks = 0;
i32 state;
- while (thread_pool_queue_take(current_thread, &task)) {
+ while (!thread_pool_queue_take(current_thread, &task)) {
task.do_work(task.data);
pool->tasks_left.fetch_sub(1, std::memory_order_release);
@@ -200,7 +206,13 @@ gb_internal THREAD_PROC(thread_pool_thread_proc) {
Thread *thread = &pool->threads.data[idx];
WorkerTask task;
- if (thread_pool_queue_steal(thread, &task)) {
+
+ GrabState ret = thread_pool_queue_steal(thread, &task);
+ if (ret == GrabFailed) {
+ goto main_loop_continue;
+ } else if (ret == GrabEmpty) {
+ continue;
+ } else if (ret == GrabSuccess) {
task.do_work(task.data);
pool->tasks_left.fetch_sub(1, std::memory_order_release);