| field | value | date |
|---|---|---|
| author | gingerBill <gingerBill@users.noreply.github.com> | 2023-01-11 22:14:53 +0000 |
| committer | GitHub <noreply@github.com> | 2023-01-11 22:14:53 +0000 |
| commit | 320062157f06d979db926fcbf407bbbdcc3028c1 (patch) | |
| tree | 770bb60802ff24cc66c8e5e5837819969dc84cd6 /src/thread_pool.cpp | |
| parent | 86511d44e46b6271b01df2cd1ebb83b5496e143c (diff) | |
| parent | d7d6608142c8e169a7856c9e5965619809653903 (diff) | |
Merge pull request #2288 from odin-lang/compiler-improvements-2023-01
Multithreading Compiler Improvements 2023-01
Diffstat (limited to 'src/thread_pool.cpp')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | src/thread_pool.cpp | 130 |

1 file changed, 61 insertions, 69 deletions
diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp
index 3565ef25a..2c369eaad 100644
--- a/src/thread_pool.cpp
+++ b/src/thread_pool.cpp
@@ -5,7 +5,7 @@ struct ThreadPool;
 
 gb_thread_local Thread *current_thread;
 
-gb_internal void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name);
+gb_internal void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize worker_count, char const *worker_name);
 gb_internal void thread_pool_destroy(ThreadPool *pool);
 gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data);
 gb_internal void thread_pool_wait(ThreadPool *pool);
@@ -16,18 +16,21 @@ struct ThreadPool {
 	Slice<Thread> threads;
 	std::atomic<bool> running;
 
-	BlockingMutex task_lock;
-	Condition tasks_available;
+	Futex tasks_available;
 	Futex tasks_left;
 };
 
-gb_internal void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name) {
-	mutex_init(&pool->task_lock);
-	condition_init(&pool->tasks_available);
+gb_internal isize current_thread_index(void) {
+	return current_thread ? current_thread->idx : 0;
+}
 
+gb_internal void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize worker_count, char const *worker_name) {
 	pool->allocator = a;
-	slice_init(&pool->threads, a, thread_count + 1);
+	slice_init(&pool->threads, a, worker_count + 1);
+
+	// NOTE: this needs to be initialized before any thread starts
+	pool->running.store(true, std::memory_order_seq_cst);
 
 	// setup the main thread
 	thread_init(pool, &pool->threads[0], 0);
@@ -37,62 +40,55 @@ gb_internal void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize
 		Thread *t = &pool->threads[i];
 		thread_init_and_start(pool, t, i);
 	}
-
-	pool->running = true;
 }
 
 gb_internal void thread_pool_destroy(ThreadPool *pool) {
-	pool->running = false;
+	pool->running.store(false, std::memory_order_seq_cst);
 
 	for_array_off(i, 1, pool->threads) {
 		Thread *t = &pool->threads[i];
-		condition_broadcast(&pool->tasks_available);
+		pool->tasks_available.fetch_add(1, std::memory_order_relaxed);
+		futex_broadcast(&pool->tasks_available);
 		thread_join_and_destroy(t);
 	}
 
-	for_array(i, pool->threads) {
-		free(pool->threads[i].queue);
-	}
 	gb_free(pool->allocator, pool->threads.data);
-
-	mutex_destroy(&pool->task_lock);
-	condition_destroy(&pool->tasks_available);
 }
 
 void thread_pool_queue_push(Thread *thread, WorkerTask task) {
-	uint64_t capture;
-	uint64_t new_capture;
+	u64 capture;
+	u64 new_capture;
 	do {
 		capture = thread->head_and_tail.load();
 
-		uint64_t mask = thread->capacity - 1;
-		uint64_t head = (capture >> 32) & mask;
-		uint64_t tail = ((uint32_t)capture) & mask;
+		u64 mask = thread->capacity - 1;
+		u64 head = (capture >> 32) & mask;
+		u64 tail = ((u32)capture) & mask;
 
-		uint64_t new_head = (head + 1) & mask;
-		if (new_head == tail) {
-			GB_PANIC("Thread Queue Full!\n");
-		}
+		u64 new_head = (head + 1) & mask;
+		GB_ASSERT_MSG(new_head != tail, "Thread Queue Full!");
 
 		// This *must* be done in here, to avoid a potential race condition where we no longer own the slot by the time we're assigning
 		thread->queue[head] = task;
 
 		new_capture = (new_head << 32) | tail;
 	} while (!thread->head_and_tail.compare_exchange_weak(capture, new_capture));
 
-	thread->pool->tasks_left.fetch_add(1);
-	condition_broadcast(&thread->pool->tasks_available);
+	thread->pool->tasks_left.fetch_add(1, std::memory_order_release);
+	thread->pool->tasks_available.fetch_add(1, std::memory_order_relaxed);
+	futex_broadcast(&thread->pool->tasks_available);
 }
 
 bool thread_pool_queue_pop(Thread *thread, WorkerTask *task) {
-	uint64_t capture;
-	uint64_t new_capture;
+	u64 capture;
+	u64 new_capture;
 	do {
-		capture = thread->head_and_tail.load();
+		capture = thread->head_and_tail.load(std::memory_order_acquire);
 
-		uint64_t mask = thread->capacity - 1;
-		uint64_t head = (capture >> 32) & mask;
-		uint64_t tail = ((uint32_t)capture) & mask;
+		u64 mask = thread->capacity - 1;
+		u64 head = (capture >> 32) & mask;
+		u64 tail = ((u32)capture) & mask;
 
-		uint64_t new_tail = (tail + 1) & mask;
+		u64 new_tail = (tail + 1) & mask;
 		if (tail == head) {
 			return false;
 		}
@@ -101,7 +97,7 @@ bool thread_pool_queue_pop(Thread *thread, WorkerTask *task) {
 		*task = thread->queue[tail];
 
 		new_capture = (head << 32) | new_tail;
-	} while (!thread->head_and_tail.compare_exchange_weak(capture, new_capture));
+	} while (!thread->head_and_tail.compare_exchange_weak(capture, new_capture, std::memory_order_release));
 
 	return true;
 }
@@ -118,12 +114,11 @@ gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, vo
 gb_internal void thread_pool_wait(ThreadPool *pool) {
 	WorkerTask task;
 
-	while (pool->tasks_left) {
-
+	while (pool->tasks_left.load(std::memory_order_acquire)) {
 		// if we've got tasks on our queue, run them
 		while (thread_pool_queue_pop(current_thread, &task)) {
 			task.do_work(task.data);
-			pool->tasks_left.fetch_sub(1);
+			pool->tasks_left.fetch_sub(1, std::memory_order_release);
 		}
 
@@ -131,12 +126,12 @@ gb_internal void thread_pool_wait(ThreadPool *pool) {
 		// This *must* be executed in this order, so the futex wakes immediately
 		// if rem_tasks has changed since we checked last, otherwise the program
 		// will permanently sleep
-		Footex rem_tasks = pool->tasks_left.load();
-		if (!rem_tasks) {
-			break;
+		Footex rem_tasks = pool->tasks_left.load(std::memory_order_acquire);
+		if (rem_tasks == 0) {
+			return;
 		}
 
-		tpool_wait_on_addr(&pool->tasks_left, rem_tasks);
+		futex_wait(&pool->tasks_left, rem_tasks);
 	}
 }
 
@@ -144,56 +139,53 @@ gb_internal THREAD_PROC(thread_pool_thread_proc) {
 	WorkerTask task;
 	current_thread = thread;
 	ThreadPool *pool = current_thread->pool;
+	// debugf("worker id: %td\n", current_thread->idx);
 
-	for (;;) {
-work_start:
-		if (!pool->running) {
-			break;
-		}
-
+	while (pool->running.load(std::memory_order_seq_cst)) {
 		// If we've got tasks to process, work through them
-		size_t finished_tasks = 0;
+		usize finished_tasks = 0;
+		i32 state;
+
 		while (thread_pool_queue_pop(current_thread, &task)) {
 			task.do_work(task.data);
-			pool->tasks_left.fetch_sub(1);
+			pool->tasks_left.fetch_sub(1, std::memory_order_release);
 			finished_tasks += 1;
 		}
-		if (finished_tasks > 0 && !pool->tasks_left) {
-			tpool_wake_addr(&pool->tasks_left);
+		if (finished_tasks > 0 && pool->tasks_left.load(std::memory_order_acquire) == 0) {
+			futex_signal(&pool->tasks_left);
 		}
 
 		// If there's still work somewhere and we don't have it, steal it
-		if (pool->tasks_left) {
-			isize idx = current_thread->idx;
+		if (pool->tasks_left.load(std::memory_order_acquire)) {
+			usize idx = cast(usize)current_thread->idx;
 			for_array(i, pool->threads) {
-				if (!pool->tasks_left) {
+				if (pool->tasks_left.load(std::memory_order_acquire) == 0) {
					break;
 				}
 
-				idx = (idx + 1) % pool->threads.count;
-				Thread *thread = &pool->threads[idx];
+				idx = (idx + 1) % cast(usize)pool->threads.count;
+				Thread *thread = &pool->threads.data[idx];
 
 				WorkerTask task;
-				if (!thread_pool_queue_pop(thread, &task)) {
-					continue;
-				}
+				if (thread_pool_queue_pop(thread, &task)) {
+					task.do_work(task.data);
+					pool->tasks_left.fetch_sub(1, std::memory_order_release);
 
-				task.do_work(task.data);
-				pool->tasks_left.fetch_sub(1);
+					if (pool->tasks_left.load(std::memory_order_acquire) == 0) {
+						futex_signal(&pool->tasks_left);
+					}
 
-				if (!pool->tasks_left) {
-					tpool_wake_addr(&pool->tasks_left);
+					goto main_loop_continue;
 				}
-
-				goto work_start;
 			}
 		}
 
 		// if we've done all our work, and there's nothing to steal, go to sleep
-		mutex_lock(&pool->task_lock);
-		condition_wait(&pool->tasks_available, &pool->task_lock);
-		mutex_unlock(&pool->task_lock);
+		state = pool->tasks_available.load(std::memory_order_acquire);
+		futex_wait(&pool->tasks_available, state);
+
+		main_loop_continue:;
 	}
 
 	return 0;
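The per-thread queue that `thread_pool_queue_push` and `thread_pool_queue_pop` operate on packs the head and tail indices into one 64-bit atomic, writes the task slot inside the retry loop, and then publishes both indices with a single `compare_exchange_weak`. The following is a simplified, standalone sketch of that layout; it assumes a power-of-two capacity, and `TaskQueue`, `slots`, and the `int` payload are placeholders for the compiler's `Thread`, `queue`, and `WorkerTask`, not names from the patch.

```cpp
#include <atomic>
#include <cstdint>

struct TaskQueue {
    static constexpr uint64_t CAPACITY = 1 << 10; // must be a power of two
    int slots[CAPACITY];                          // stand-in for the WorkerTask array
    std::atomic<uint64_t> head_and_tail{0};       // high 32 bits: head, low 32 bits: tail

    bool push(int value) {
        uint64_t capture, new_capture;
        do {
            capture = head_and_tail.load(std::memory_order_acquire);
            uint64_t mask = CAPACITY - 1;
            uint64_t head = (capture >> 32) & mask;
            uint64_t tail = (uint32_t)capture & mask;

            uint64_t new_head = (head + 1) & mask;
            if (new_head == tail) {
                return false; // full (the patch asserts instead of failing)
            }
            // Write the slot inside the retry loop: if the CAS below fails,
            // another thread moved the indices and we must claim a new slot.
            slots[head] = value;
            new_capture = (new_head << 32) | tail;
        } while (!head_and_tail.compare_exchange_weak(capture, new_capture,
                                                      std::memory_order_release));
        return true;
    }

    bool pop(int *out) {
        uint64_t capture, new_capture;
        do {
            capture = head_and_tail.load(std::memory_order_acquire);
            uint64_t mask = CAPACITY - 1;
            uint64_t head = (capture >> 32) & mask;
            uint64_t tail = (uint32_t)capture & mask;
            if (tail == head) {
                return false; // empty
            }
            // Read the task before the CAS; the CAS then releases the slot.
            *out = slots[tail];
            new_capture = (head << 32) | ((tail + 1) & mask);
        } while (!head_and_tail.compare_exchange_weak(capture, new_capture,
                                                      std::memory_order_release));
        return true;
    }
};
```

Because head and tail live in the same word, a successful CAS publishes the slot write and the index update together, which is why the patch insists the slot assignment stay inside the loop.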
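The headline change is dropping the `task_lock`/`tasks_available` mutex-and-condition pair in favour of counters that threads sleep on directly. The "must be executed in this order" comment in `thread_pool_wait` is the standard futex idiom: load the counter, re-check it, and only then sleep on the value you observed, so a concurrent decrement makes the wait return immediately instead of sleeping forever on a stale value. Below is a minimal sketch of that idiom using C++20 `std::atomic` wait/notify rather than the compiler's own `Futex`, `futex_wait`, and `futex_signal` wrappers; `tasks_left`, `wait_for_all_tasks`, and `finish_one_task` are illustrative names, not part of the patch.

```cpp
#include <atomic>

// Shared task counter: workers decrement it, the waiter sleeps until it reaches zero.
std::atomic<int> tasks_left{0};

void wait_for_all_tasks() {
    for (;;) {
        // Observe the counter *before* deciding to sleep.
        int remaining = tasks_left.load(std::memory_order_acquire);
        if (remaining == 0) {
            return;
        }
        // Sleep only while the counter still holds the value we observed.
        // If a worker decremented it in the meantime, wait() returns
        // immediately instead of blocking on a stale value.
        tasks_left.wait(remaining, std::memory_order_acquire);
    }
}

void finish_one_task() {
    // The release decrement pairs with the acquire load above.
    if (tasks_left.fetch_sub(1, std::memory_order_release) == 1) {
        tasks_left.notify_all(); // last task just finished: wake the waiter
    }
}
```

The same pattern appears in the worker loop in the diff: the thread snapshots `tasks_available` into `state` and then waits on that snapshot, so a push that bumps the counter between the snapshot and the wait still wakes the worker.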