| author | gingerBill <bill@gingerbill.org> | 2021-07-10 21:29:49 +0100 |
|---|---|---|
| committer | gingerBill <bill@gingerbill.org> | 2021-07-10 21:29:49 +0100 |
| commit | ed8a6f872dbcd8b195940dec40a0d86d59f11eaa (patch) | |
| tree | bbf4d7fc301a432583f8f2121742a83c1d4cc6af /src/thread_pool.cpp | |
| parent | 0a61d4bf2b2d6e8c8d0c92410f6dcfd2b6046f86 (diff) | |
Move things around for sanity checking for multithread preparation
Diffstat (limited to 'src/thread_pool.cpp')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | src/thread_pool.cpp | 40 |
1 file changed, 10 insertions, 30 deletions

    diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp
    index 73118321b..9e178b833 100644
    --- a/src/thread_pool.cpp
    +++ b/src/thread_pool.cpp
    @@ -18,10 +18,7 @@ struct ThreadPool {
         gbAllocator allocator;
    -    WorkerTask *tasks;
    -    isize volatile task_head;
    -    isize volatile task_tail;
    -    isize volatile task_capacity;
    +    MPMCQueue<WorkerTask> tasks;
         gbThread *threads;
         isize thread_count;
    @@ -39,10 +36,7 @@ GB_THREAD_PROC(worker_thread_internal);
     void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix) {
         pool->allocator = a;
    -    pool->task_head = 0;
    -    pool->task_tail = 0;
    -    pool->task_capacity = 1024;
    -    pool->tasks = gb_alloc_array(a, WorkerTask, pool->task_capacity);
    +    mpmc_init(&pool->tasks, a, 1024);
         pool->thread_count = gb_max(thread_count, 0);
         pool->threads = gb_alloc_array(a, gbThread, pool->thread_count);
         gb_mutex_init(&pool->mutex);
    @@ -100,41 +94,27 @@ void thread_pool_destroy(ThreadPool *pool) {
         gb_mutex_destroy(&pool->mutex);
         gb_free(pool->allocator, pool->threads);
         pool->thread_count = 0;
    -    gb_free(pool->allocator, pool->tasks);
    -    pool->task_head = 0;
    -    pool->task_tail = 0;
    -    pool->task_capacity = 0;
    +    mpmc_destroy(&pool->tasks);
     }
     
     void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
         gb_mutex_lock(&pool->mutex);
    -    if (pool->task_tail == pool->task_capacity) {
    -        isize new_cap = 2*pool->task_capacity + 8;
    -        WorkerTask *new_tasks = gb_alloc_array(pool->allocator, WorkerTask, new_cap);
    -        gb_memmove(new_tasks, pool->tasks, (pool->task_tail)*gb_size_of(WorkerTask));
    -        pool->tasks = new_tasks;
    -        pool->task_capacity = new_cap;
    -    }
         WorkerTask task = {};
         task.do_work = proc;
         task.data = data;
    -    pool->tasks[pool->task_tail++] = task;
    +    mpmc_enqueue(&pool->tasks, task);
         gb_semaphore_post(&pool->sem_available, 1);
         gb_mutex_unlock(&pool->mutex);
     }
     
     bool thread_pool_try_and_pop_task(ThreadPool *pool, WorkerTask *task) {
         bool got_task = false;
    -    if (gb_mutex_try_lock(&pool->mutex)) {
    -        if (pool->task_tail > pool->task_head) {
    -            gb_atomic32_fetch_add(&pool->processing_work_count, +1);
    -            *task = pool->tasks[pool->task_head++];
    -            got_task = true;
    -        }
    -        gb_mutex_unlock(&pool->mutex);
    +    if (mpmc_dequeue(&pool->tasks, task)) {
    +        gb_atomic32_fetch_add(&pool->processing_work_count, +1);
    +        got_task = true;
         }
         return got_task;
     }
    @@ -144,16 +124,16 @@ void thread_pool_do_work(ThreadPool *pool, WorkerTask *task) {
     }
     
     void thread_pool_wait_to_process(ThreadPool *pool) {
    -    while (pool->task_tail > pool->task_head || gb_atomic32_load(&pool->processing_work_count) != 0) {
    +    while (pool->tasks.count.load(std::memory_order_relaxed) > 0 || gb_atomic32_load(&pool->processing_work_count) != 0) {
             WorkerTask task = {};
             if (thread_pool_try_and_pop_task(pool, &task)) {
                 thread_pool_do_work(pool, &task);
             }
     
             // Safety-kick
    -        if (pool->task_tail > pool->task_head && gb_atomic32_load(&pool->processing_work_count) == 0) {
    +        while (pool->tasks.count.load(std::memory_order_relaxed) > 0 && gb_atomic32_load(&pool->processing_work_count) == 0) {
                 gb_mutex_lock(&pool->mutex);
    -            gb_semaphore_post(&pool->sem_available, cast(i32)(pool->task_tail-pool->task_head));
    +            gb_semaphore_post(&pool->sem_available, cast(i32)pool->tasks.count.load(std::memory_order_relaxed));
                 gb_mutex_unlock(&pool->mutex);
             }
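The new code calls into an `MPMCQueue<WorkerTask>` (multi-producer, multi-consumer queue) through `mpmc_init`, `mpmc_enqueue`, `mpmc_dequeue`, and `mpmc_destroy`, and reads its atomic `count` member; that type is defined elsewhere in the tree and is not part of this diff. As a rough mental model only, here is a minimal mutex-based sketch with the same call shape. The name `MPMCQueueSketch`, the use of `std::deque`, and the dropped `gbAllocator` parameter are assumptions for illustration; the real queue is presumably a proper concurrent structure rather than one big lock.

```cpp
#include <atomic>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical stand-in for the MPMCQueue<T> used by thread_pool.cpp.
// Only the call surface (init/enqueue/dequeue/destroy plus an atomic
// `count`) mirrors what the diff relies on; the implementation does not.
template <typename T>
struct MPMCQueueSketch {
    std::mutex               mutex;     // the real queue is expected to avoid a single global lock
    std::deque<T>            items;
    std::atomic<std::size_t> count{0};  // what thread_pool_wait_to_process polls
};

template <typename T>
void mpmc_init(MPMCQueueSketch<T> *q, std::size_t /*initial_capacity*/) {
    // The diff passes an allocator and a capacity hint of 1024; this sketch
    // uses std::deque's own allocation, so there is nothing to pre-allocate.
    q->count.store(0, std::memory_order_relaxed);
}

template <typename T>
void mpmc_destroy(MPMCQueueSketch<T> *q) {
    std::lock_guard<std::mutex> lock(q->mutex);
    q->items.clear();
    q->count.store(0, std::memory_order_relaxed);
}

template <typename T>
void mpmc_enqueue(MPMCQueueSketch<T> *q, T const &value) {
    std::lock_guard<std::mutex> lock(q->mutex);
    q->items.push_back(value);
    q->count.fetch_add(1, std::memory_order_relaxed);
}

// Returns true and writes *out if an item was available, false otherwise,
// matching how thread_pool_try_and_pop_task uses mpmc_dequeue.
template <typename T>
bool mpmc_dequeue(MPMCQueueSketch<T> *q, T *out) {
    std::lock_guard<std::mutex> lock(q->mutex);
    if (q->items.empty()) {
        return false;
    }
    *out = q->items.front();
    q->items.pop_front();
    q->count.fetch_sub(1, std::memory_order_relaxed);
    return true;
}
```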
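To connect that interface back to the changed functions: `thread_pool_add_task` is now a producer (`mpmc_enqueue`), `thread_pool_try_and_pop_task` is a consumer (`mpmc_dequeue`), and `thread_pool_wait_to_process` polls the atomic `count` to decide whether work is still pending. The driver below is a hypothetical exercise of the sketch above, not of the real pool; it busy-waits and uses fixed item counts purely for brevity.

```cpp
#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

int main() {
    MPMCQueueSketch<int> q;
    mpmc_init(&q, static_cast<std::size_t>(1024));

    std::atomic<int> consumed{0};
    std::vector<std::thread> threads;

    // Two producers, standing in for threads calling thread_pool_add_task.
    for (int p = 0; p < 2; p++) {
        threads.emplace_back([&q, p] {
            for (int i = 0; i < 1000; i++) {
                mpmc_enqueue(&q, p*1000 + i);
            }
        });
    }

    // Two consumers, standing in for workers calling thread_pool_try_and_pop_task.
    for (int c = 0; c < 2; c++) {
        threads.emplace_back([&q, &consumed] {
            int value;
            while (consumed.load(std::memory_order_relaxed) < 2000) {
                if (mpmc_dequeue(&q, &value)) {
                    consumed.fetch_add(1, std::memory_order_relaxed);
                }
            }
        });
    }

    for (auto &t : threads) t.join();

    // Like thread_pool_wait_to_process, a caller can poll the atomic count
    // to see whether any tasks remain queued.
    std::printf("consumed=%d, remaining=%zu\n",
                consumed.load(), q.count.load(std::memory_order_relaxed));
    mpmc_destroy(&q);
    return 0;
}
```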