diff options
Diffstat (limited to 'src/thread_pool.cpp')
| -rw-r--r-- | src/thread_pool.cpp | 191 |
1 files changed, 131 insertions, 60 deletions
diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index b29fcc5d9..a8bc327e5 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -4,85 +4,156 @@ typedef WORKER_TASK_PROC(WorkerTaskProc); struct WorkerTask { - WorkerTask *next_task; + WorkerTask * next; WorkerTaskProc *do_work; - void *data; + void * data; }; struct ThreadPool { - std::atomic<isize> outstanding_task_count; - WorkerTask *volatile next_task; - BlockingMutex task_list_mutex; - isize thread_count; + gbAllocator allocator; + BlockingMutex mutex; + Condition task_cond; + + Slice<Thread> threads; + + WorkerTask *task_queue; + + std::atomic<isize> ready; + std::atomic<bool> stop; + }; -void thread_pool_thread_entry(ThreadPool *pool) { - while (pool->outstanding_task_count) { - if (!pool->next_task) { - yield(); // No need to grab the mutex. - } else { - mutex_lock(&pool->task_list_mutex); - - if (pool->next_task) { - WorkerTask *task = pool->next_task; - pool->next_task = task->next_task; - mutex_unlock(&pool->task_list_mutex); - task->do_work(task->data); - pool->outstanding_task_count.fetch_sub(1); - gb_free(heap_allocator(), task); - } else { - mutex_unlock(&pool->task_list_mutex); - } - } +THREAD_PROC(thread_pool_thread_proc); + +void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name) { + pool->allocator = a; + pool->stop = false; + mutex_init(&pool->mutex); + condition_init(&pool->task_cond); + + slice_init(&pool->threads, a, thread_count); + for_array(i, pool->threads) { + Thread *t = &pool->threads[i]; + thread_init(t); + } + + for_array(i, pool->threads) { + Thread *t = &pool->threads[i]; + thread_start(t, thread_pool_thread_proc, pool); } } -#if defined(GB_SYSTEM_WINDOWS) - DWORD __stdcall thread_pool_thread_entry_platform(void *arg) { - thread_pool_thread_entry((ThreadPool *) arg); - return 0; - } +void thread_pool_destroy(ThreadPool *pool) { + pool->stop = true; + condition_broadcast(&pool->task_cond); - void thread_pool_start_thread(ThreadPool *pool) { - CloseHandle(CreateThread(NULL, 0, thread_pool_thread_entry_platform, pool, 0, NULL)); + for_array(i, pool->threads) { + Thread *t = &pool->threads[i]; + thread_join(t); } -#else - void *thread_pool_thread_entry_platform(void *arg) { - thread_pool_thread_entry((ThreadPool *) arg); - return NULL; + + for_array(i, pool->threads) { + Thread *t = &pool->threads[i]; + thread_destroy(t); } + + gb_free(pool->allocator, pool->threads.data); + mutex_destroy(&pool->mutex); + condition_destroy(&pool->task_cond); +} - void thread_pool_start_thread(ThreadPool *pool) { - pthread_t handle; - pthread_create(&handle, NULL, thread_pool_thread_entry_platform, pool); - pthread_detach(handle); - } -#endif +bool thread_pool_queue_empty(ThreadPool *pool) { + return pool->task_queue == nullptr; +} -void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix) { - memset(pool, 0, sizeof(ThreadPool)); - mutex_init(&pool->task_list_mutex); - pool->thread_count = thread_count; +WorkerTask *thread_pool_queue_pop(ThreadPool *pool) { + GB_ASSERT(pool->task_queue != nullptr); + WorkerTask *task = pool->task_queue; + pool->task_queue = task->next; + return task; +} +void thread_pool_queue_push(ThreadPool *pool, WorkerTask *task) { + GB_ASSERT(task != nullptr); + task->next = pool->task_queue; + pool->task_queue = task; } -void thread_pool_destroy(ThreadPool *pool) { - mutex_destroy(&pool->task_list_mutex); +bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { + GB_ASSERT(proc != nullptr); + mutex_lock(&pool->mutex); + WorkerTask *task = gb_alloc_item(permanent_allocator(), WorkerTask); + if (task == nullptr) { + mutex_unlock(&pool->mutex); + GB_PANIC("Out of memory"); + return false; + } + task->do_work = proc; + task->data = data; + + thread_pool_queue_push(pool, task); + GB_ASSERT(pool->ready >= 0); + pool->ready++; + mutex_unlock(&pool->mutex); + condition_signal(&pool->task_cond); + return true; +} + + +void thread_pool_do_task(WorkerTask *task) { + task->do_work(task->data); } void thread_pool_wait(ThreadPool *pool) { - for (int i = 0; i < pool->thread_count; i++) { - thread_pool_start_thread(pool); + if (pool->threads.count == 0) { + while (!thread_pool_queue_empty(pool)) { + thread_pool_do_task(thread_pool_queue_pop(pool)); + --pool->ready; + } + GB_ASSERT(pool->ready == 0); + return; } - thread_pool_thread_entry(pool); -} + for (;;) { + mutex_lock(&pool->mutex); -void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { - WorkerTask *task = gb_alloc_item(heap_allocator(), WorkerTask); - task->do_work = proc; - task->data = data; - mutex_lock(&pool->task_list_mutex); - task->next_task = pool->next_task; - pool->next_task = task; - pool->outstanding_task_count.fetch_add(1); - mutex_unlock(&pool->task_list_mutex); + while (!pool->stop && pool->ready > 0 && thread_pool_queue_empty(pool)) { + condition_wait(&pool->task_cond, &pool->mutex); + } + if ((pool->stop || pool->ready == 0) && thread_pool_queue_empty(pool)) { + mutex_unlock(&pool->mutex); + return; + } + + WorkerTask *task = thread_pool_queue_pop(pool); + mutex_unlock(&pool->mutex); + + thread_pool_do_task(task); + if (--pool->ready == 0) { + condition_broadcast(&pool->task_cond); + } + } } + + +THREAD_PROC(thread_pool_thread_proc) { + ThreadPool *pool = cast(ThreadPool *)thread->user_data; + + for (;;) { + mutex_lock(&pool->mutex); + + while (!pool->stop && thread_pool_queue_empty(pool)) { + condition_wait(&pool->task_cond, &pool->mutex); + } + if (pool->stop && thread_pool_queue_empty(pool)) { + mutex_unlock(&pool->mutex); + return 0; + } + + WorkerTask *task = thread_pool_queue_pop(pool); + mutex_unlock(&pool->mutex); + + thread_pool_do_task(task); + if (--pool->ready == 0) { + condition_broadcast(&pool->task_cond); + } + } +}
\ No newline at end of file |