diff options
Diffstat (limited to 'src/thread_pool.cpp')
| -rw-r--r-- | src/thread_pool.cpp | 72 |
1 files changed, 34 insertions, 38 deletions
diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 522b96d09..768a92645 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -16,8 +16,7 @@ struct ThreadPool { Slice<Thread> threads; std::atomic<bool> running; - BlockingMutex task_lock; - Condition tasks_available; + Futex tasks_available; Futex tasks_left; }; @@ -43,27 +42,25 @@ gb_internal void thread_pool_destroy(ThreadPool *pool) { for_array_off(i, 1, pool->threads) { Thread *t = &pool->threads[i]; - condition_broadcast(&pool->tasks_available); + pool->tasks_available.fetch_add(1, std::memory_order_release); + futex_broadcast(&pool->tasks_available); thread_join_and_destroy(t); } - for_array(i, pool->threads) { - free(pool->threads[i].queue); - } gb_free(pool->allocator, pool->threads.data); } void thread_pool_queue_push(Thread *thread, WorkerTask task) { - uint64_t capture; - uint64_t new_capture; + u64 capture; + u64 new_capture; do { capture = thread->head_and_tail.load(); - uint64_t mask = thread->capacity - 1; - uint64_t head = (capture >> 32) & mask; - uint64_t tail = ((uint32_t)capture) & mask; + u64 mask = thread->capacity - 1; + u64 head = (capture >> 32) & mask; + u64 tail = ((u32)capture) & mask; - uint64_t new_head = (head + 1) & mask; + u64 new_head = (head + 1) & mask; if (new_head == tail) { GB_PANIC("Thread Queue Full!\n"); } @@ -73,21 +70,22 @@ void thread_pool_queue_push(Thread *thread, WorkerTask task) { new_capture = (new_head << 32) | tail; } while (!thread->head_and_tail.compare_exchange_weak(capture, new_capture)); - thread->pool->tasks_left.fetch_add(1); - condition_broadcast(&thread->pool->tasks_available); + thread->pool->tasks_left.fetch_add(1, std::memory_order_release); + thread->pool->tasks_available.fetch_add(1, std::memory_order_release); + futex_broadcast(&thread->pool->tasks_available); } bool thread_pool_queue_pop(Thread *thread, WorkerTask *task) { - uint64_t capture; - uint64_t new_capture; + u64 capture; + u64 new_capture; do { capture = 
thread->head_and_tail.load(); - uint64_t mask = thread->capacity - 1; - uint64_t head = (capture >> 32) & mask; - uint64_t tail = ((uint32_t)capture) & mask; + u64 mask = thread->capacity - 1; + u64 head = (capture >> 32) & mask; + u64 tail = ((u32)capture) & mask; - uint64_t new_tail = (tail + 1) & mask; + u64 new_tail = (tail + 1) & mask; if (tail == head) { return false; } @@ -113,12 +111,11 @@ gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, vo gb_internal void thread_pool_wait(ThreadPool *pool) { WorkerTask task; - while (pool->tasks_left) { - + while (pool->tasks_left.load()) { // if we've got tasks on our queue, run them while (thread_pool_queue_pop(current_thread, &task)) { task.do_work(task.data); - pool->tasks_left.fetch_sub(1); + pool->tasks_left.fetch_sub(1, std::memory_order_release); } @@ -127,8 +124,8 @@ gb_internal void thread_pool_wait(ThreadPool *pool) { // if rem_tasks has changed since we checked last, otherwise the program // will permanently sleep Futex rem_tasks = pool->tasks_left.load(); - if (!rem_tasks) { - break; + if (rem_tasks == 0) { + return; } futex_wait(&pool->tasks_left, rem_tasks); @@ -147,37 +144,37 @@ work_start: } // If we've got tasks to process, work through them - size_t finished_tasks = 0; + usize finished_tasks = 0; while (thread_pool_queue_pop(current_thread, &task)) { task.do_work(task.data); - pool->tasks_left.fetch_sub(1); + pool->tasks_left.fetch_sub(1, std::memory_order_release); finished_tasks += 1; } - if (finished_tasks > 0 && !pool->tasks_left) { + if (finished_tasks > 0 && pool->tasks_left.load() == 0) { futex_signal(&pool->tasks_left); } // If there's still work somewhere and we don't have it, steal it - if (pool->tasks_left) { - isize idx = current_thread->idx; + if (pool->tasks_left.load()) { + usize idx = cast(usize)current_thread->idx; for_array(i, pool->threads) { - if (!pool->tasks_left) { + if (pool->tasks_left.load() == 0) { break; } - idx = (idx + 1) % pool->threads.count; 
- Thread *thread = &pool->threads[idx]; + idx = (idx + 1) % cast(usize)pool->threads.count; + Thread *thread = &pool->threads.data[idx]; WorkerTask task; if (!thread_pool_queue_pop(thread, &task)) { continue; } task.do_work(task.data); - pool->tasks_left.fetch_sub(1); + pool->tasks_left.fetch_sub(1, std::memory_order_release); - if (!pool->tasks_left) { + if (pool->tasks_left.load() == 0) { futex_signal(&pool->tasks_left); } @@ -186,9 +183,8 @@ work_start: } // if we've done all our work, and there's nothing to steal, go to sleep - mutex_lock(&pool->task_lock); - condition_wait(&pool->tasks_available, &pool->task_lock); - mutex_unlock(&pool->task_lock); + i32 state = pool->tasks_available.load(); + futex_wait(&pool->tasks_available, state); } return 0; |