diff options
Diffstat (limited to 'src/thread_pool.cpp')
| -rw-r--r-- | src/thread_pool.cpp | 30 |
1 files changed, 15 insertions, 15 deletions
diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 35bc5b7d2..be4c3122b 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -11,10 +11,10 @@ struct WorkerTask { struct ThreadPool { - BlockingMutex mutex; - gbSemaphore sem_available; - gbAtomic32 processing_work_count; - bool is_running; + BlockingMutex mutex; + Semaphore sem_available; + std::atomic<i32> processing_work_count; + bool is_running; gbAllocator allocator; @@ -40,7 +40,7 @@ void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count pool->thread_count = gb_max(thread_count, 0); pool->threads = gb_alloc_array(a, gbThread, pool->thread_count); mutex_init(&pool->mutex); - gb_semaphore_init(&pool->sem_available); + semaphore_init(&pool->sem_available); pool->is_running = true; pool->worker_prefix_len = 0; @@ -76,7 +76,7 @@ void thread_pool_start(ThreadPool *pool) { void thread_pool_join(ThreadPool *pool) { pool->is_running = false; - gb_semaphore_post(&pool->sem_available, cast(i32)pool->thread_count); + semaphore_post(&pool->sem_available, cast(i32)pool->thread_count); gb_yield(); @@ -90,7 +90,7 @@ void thread_pool_join(ThreadPool *pool) { void thread_pool_destroy(ThreadPool *pool) { thread_pool_join(pool); - gb_semaphore_destroy(&pool->sem_available); + semaphore_destroy(&pool->sem_available); mutex_destroy(&pool->mutex); gb_free(pool->allocator, pool->threads); pool->thread_count = 0; @@ -106,21 +106,21 @@ void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { task.data = data; mpmc_enqueue(&pool->tasks, task); - gb_semaphore_post(&pool->sem_available, 1); + semaphore_post(&pool->sem_available, 1); mutex_unlock(&pool->mutex); } bool thread_pool_try_and_pop_task(ThreadPool *pool, WorkerTask *task) { bool got_task = false; if (mpmc_dequeue(&pool->tasks, task)) { - gb_atomic32_fetch_add(&pool->processing_work_count, +1); + pool->processing_work_count.fetch_add(1); got_task = true; } return got_task; } void thread_pool_do_work(ThreadPool *pool, WorkerTask *task) { task->result = task->do_work(task->data); - gb_atomic32_fetch_add(&pool->processing_work_count, -1); + pool->processing_work_count.fetch_sub(1); } void thread_pool_wait_to_process(ThreadPool *pool) { @@ -131,16 +131,16 @@ void thread_pool_wait_to_process(ThreadPool *pool) { } return; } - while (pool->tasks.count.load(std::memory_order_relaxed) > 0 || gb_atomic32_load(&pool->processing_work_count) != 0) { + while (pool->tasks.count.load(std::memory_order_relaxed) > 0 || pool->processing_work_count.load() != 0) { WorkerTask task = {}; if (thread_pool_try_and_pop_task(pool, &task)) { thread_pool_do_work(pool, &task); } // Safety-kick - while (pool->tasks.count.load(std::memory_order_relaxed) > 0 && gb_atomic32_load(&pool->processing_work_count) == 0) { + while (pool->tasks.count.load(std::memory_order_relaxed) > 0 && pool->processing_work_count.load() == 0) { mutex_lock(&pool->mutex); - gb_semaphore_post(&pool->sem_available, cast(i32)pool->tasks.count.load(std::memory_order_relaxed)); + semaphore_post(&pool->sem_available, cast(i32)pool->tasks.count.load(std::memory_order_relaxed)); mutex_unlock(&pool->mutex); } @@ -154,7 +154,7 @@ void thread_pool_wait_to_process(ThreadPool *pool) { GB_THREAD_PROC(worker_thread_internal) { ThreadPool *pool = cast(ThreadPool *)thread->user_data; while (pool->is_running) { - gb_semaphore_wait(&pool->sem_available); + semaphore_wait(&pool->sem_available); WorkerTask task = {}; if (thread_pool_try_and_pop_task(pool, &task)) { @@ -162,7 +162,7 @@ GB_THREAD_PROC(worker_thread_internal) { } } // Cascade - gb_semaphore_release(&pool->sem_available); + semaphore_release(&pool->sem_available); return 0; } |