aboutsummaryrefslogtreecommitdiff
path: root/src/thread_pool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/thread_pool.cpp')
-rw-r--r--src/thread_pool.cpp72
1 files changed, 34 insertions, 38 deletions
diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp
index 522b96d09..768a92645 100644
--- a/src/thread_pool.cpp
+++ b/src/thread_pool.cpp
@@ -16,8 +16,7 @@ struct ThreadPool {
Slice<Thread> threads;
std::atomic<bool> running;
- BlockingMutex task_lock;
- Condition tasks_available;
+ Futex tasks_available;
Futex tasks_left;
};
@@ -43,27 +42,25 @@ gb_internal void thread_pool_destroy(ThreadPool *pool) {
for_array_off(i, 1, pool->threads) {
Thread *t = &pool->threads[i];
- condition_broadcast(&pool->tasks_available);
+ pool->tasks_available.fetch_add(1, std::memory_order_release);
+ futex_broadcast(&pool->tasks_available);
thread_join_and_destroy(t);
}
- for_array(i, pool->threads) {
- free(pool->threads[i].queue);
- }
gb_free(pool->allocator, pool->threads.data);
}
void thread_pool_queue_push(Thread *thread, WorkerTask task) {
- uint64_t capture;
- uint64_t new_capture;
+ u64 capture;
+ u64 new_capture;
do {
capture = thread->head_and_tail.load();
- uint64_t mask = thread->capacity - 1;
- uint64_t head = (capture >> 32) & mask;
- uint64_t tail = ((uint32_t)capture) & mask;
+ u64 mask = thread->capacity - 1;
+ u64 head = (capture >> 32) & mask;
+ u64 tail = ((u32)capture) & mask;
- uint64_t new_head = (head + 1) & mask;
+ u64 new_head = (head + 1) & mask;
if (new_head == tail) {
GB_PANIC("Thread Queue Full!\n");
}
@@ -73,21 +70,22 @@ void thread_pool_queue_push(Thread *thread, WorkerTask task) {
new_capture = (new_head << 32) | tail;
} while (!thread->head_and_tail.compare_exchange_weak(capture, new_capture));
- thread->pool->tasks_left.fetch_add(1);
- condition_broadcast(&thread->pool->tasks_available);
+ thread->pool->tasks_left.fetch_add(1, std::memory_order_release);
+ thread->pool->tasks_available.fetch_add(1, std::memory_order_release);
+ futex_broadcast(&thread->pool->tasks_available);
}
bool thread_pool_queue_pop(Thread *thread, WorkerTask *task) {
- uint64_t capture;
- uint64_t new_capture;
+ u64 capture;
+ u64 new_capture;
do {
capture = thread->head_and_tail.load();
- uint64_t mask = thread->capacity - 1;
- uint64_t head = (capture >> 32) & mask;
- uint64_t tail = ((uint32_t)capture) & mask;
+ u64 mask = thread->capacity - 1;
+ u64 head = (capture >> 32) & mask;
+ u64 tail = ((u32)capture) & mask;
- uint64_t new_tail = (tail + 1) & mask;
+ u64 new_tail = (tail + 1) & mask;
if (tail == head) {
return false;
}
@@ -113,12 +111,11 @@ gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, vo
gb_internal void thread_pool_wait(ThreadPool *pool) {
WorkerTask task;
- while (pool->tasks_left) {
-
+ while (pool->tasks_left.load()) {
// if we've got tasks on our queue, run them
while (thread_pool_queue_pop(current_thread, &task)) {
task.do_work(task.data);
- pool->tasks_left.fetch_sub(1);
+ pool->tasks_left.fetch_sub(1, std::memory_order_release);
}
@@ -127,8 +124,8 @@ gb_internal void thread_pool_wait(ThreadPool *pool) {
// if rem_tasks has changed since we checked last, otherwise the program
// will permanently sleep
Footex rem_tasks = pool->tasks_left.load();
- if (!rem_tasks) {
- break;
+ if (rem_tasks == 0) {
+ return;
}
futex_wait(&pool->tasks_left, rem_tasks);
@@ -147,37 +144,37 @@ work_start:
}
// If we've got tasks to process, work through them
- size_t finished_tasks = 0;
+ usize finished_tasks = 0;
while (thread_pool_queue_pop(current_thread, &task)) {
task.do_work(task.data);
- pool->tasks_left.fetch_sub(1);
+ pool->tasks_left.fetch_sub(1, std::memory_order_release);
finished_tasks += 1;
}
- if (finished_tasks > 0 && !pool->tasks_left) {
+ if (finished_tasks > 0 && pool->tasks_left.load() == 0) {
futex_signal(&pool->tasks_left);
}
// If there's still work somewhere and we don't have it, steal it
- if (pool->tasks_left) {
- isize idx = current_thread->idx;
+ if (pool->tasks_left.load()) {
+ usize idx = cast(usize)current_thread->idx;
for_array(i, pool->threads) {
- if (!pool->tasks_left) {
+ if (pool->tasks_left.load() == 0) {
break;
}
- idx = (idx + 1) % pool->threads.count;
- Thread *thread = &pool->threads[idx];
+ idx = (idx + 1) % cast(usize)pool->threads.count;
+ Thread *thread = &pool->threads.data[idx];
WorkerTask task;
if (!thread_pool_queue_pop(thread, &task)) {
continue;
}
task.do_work(task.data);
- pool->tasks_left.fetch_sub(1);
+ pool->tasks_left.fetch_sub(1, std::memory_order_release);
- if (!pool->tasks_left) {
+ if (pool->tasks_left.load() == 0) {
futex_signal(&pool->tasks_left);
}
@@ -186,9 +183,8 @@ work_start:
}
// if we've done all our work, and there's nothing to steal, go to sleep
- mutex_lock(&pool->task_lock);
- condition_wait(&pool->tasks_available, &pool->task_lock);
- mutex_unlock(&pool->task_lock);
+ i32 state = pool->tasks_available.load();
+ futex_wait(&pool->tasks_available, state);
}
return 0;