diff options
Diffstat (limited to 'src/thread_pool.cpp')
| -rw-r--r-- | src/thread_pool.cpp | 71 |
1 files changed, 46 insertions, 25 deletions
diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 1e82983c7..b0357996c 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -19,10 +19,15 @@ struct ThreadPool { WorkerTask *task_queue; std::atomic<isize> ready; + std::atomic<bool> stop; + }; +THREAD_PROC(thread_pool_thread_proc); + void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name) { pool->allocator = a; + pool->stop = false; mutex_init(&pool->mutex); condition_init(&pool->task_cond); @@ -31,9 +36,15 @@ void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count Thread *t = &pool->threads[i]; thread_init(t); } + + for_array(i, pool->threads) { + Thread *t = &pool->threads[i]; + thread_start(t, thread_pool_thread_proc, pool); + } } void thread_pool_destroy(ThreadPool *pool) { + pool->stop = true; condition_broadcast(&pool->task_cond); for_array(i, pool->threads) { @@ -41,20 +52,20 @@ void thread_pool_destroy(ThreadPool *pool) { thread_join(t); } - for_array(i, pool->threads) { Thread *t = &pool->threads[i]; thread_destroy(t); } gb_free(pool->allocator, pool->threads.data); - condition_destroy(&pool->task_cond); mutex_destroy(&pool->mutex); + condition_destroy(&pool->task_cond); } bool thread_pool_queue_empty(ThreadPool *pool) { return pool->task_queue == nullptr; } + WorkerTask *thread_pool_queue_pop(ThreadPool *pool) { GB_ASSERT(pool->task_queue != nullptr); WorkerTask *task = pool->task_queue; @@ -80,30 +91,34 @@ bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { task->data = data; thread_pool_queue_push(pool, task); + GB_ASSERT(pool->ready >= 0); pool->ready++; mutex_unlock(&pool->mutex); condition_signal(&pool->task_cond); return true; } -THREAD_PROC(thread_pool_thread_proc) { - ThreadPool *pool = cast(ThreadPool *)thread->user_data; - + +void thread_pool_do_task(WorkerTask *task) { + task->do_work(task->data); +} + +void thread_pool_wait(ThreadPool *pool) { for (;;) { mutex_lock(&pool->mutex); - - while (pool->ready > 0 && thread_pool_queue_empty(pool)) { + + while (!pool->stop && pool->ready > 0 && thread_pool_queue_empty(pool)) { condition_wait(&pool->task_cond, &pool->mutex); } - if (pool->ready == 0 && thread_pool_queue_empty(pool)) { + if ((pool->stop || pool->ready == 0) && thread_pool_queue_empty(pool)) { mutex_unlock(&pool->mutex); - return 0; + return; } - + WorkerTask *task = thread_pool_queue_pop(pool); mutex_unlock(&pool->mutex); - task->do_work(task->data); + thread_pool_do_task(task); if (--pool->ready == 0) { condition_broadcast(&pool->task_cond); } @@ -111,20 +126,26 @@ THREAD_PROC(thread_pool_thread_proc) { } -void thread_pool_wait(ThreadPool *pool) { - for_array(i, pool->threads) { - Thread *t = &pool->threads[i]; - thread_start(t, thread_pool_thread_proc, pool); - } +THREAD_PROC(thread_pool_thread_proc) { + ThreadPool *pool = cast(ThreadPool *)thread->user_data; - Thread dummy = {}; - dummy.proc = thread_pool_thread_proc; - dummy.user_data = pool; - thread_pool_thread_proc(&dummy); + for (;;) { + mutex_lock(&pool->mutex); + + while (!pool->stop && thread_pool_queue_empty(pool)) { + condition_wait(&pool->task_cond, &pool->mutex); + } + if (pool->stop && thread_pool_queue_empty(pool)) { + mutex_unlock(&pool->mutex); + return 0; + } + + WorkerTask *task = thread_pool_queue_pop(pool); + mutex_unlock(&pool->mutex); - for_array(i, pool->threads) { - Thread *t = &pool->threads[i]; - thread_join(t); + thread_pool_do_task(task); + if (--pool->ready == 0) { + condition_broadcast(&pool->task_cond); + } } -} - +}
\ No newline at end of file |