From c93872cc1371d60863e2dae6c08f556f32dd5a8a Mon Sep 17 00:00:00 2001 From: gingerBill Date: Sun, 1 Sep 2019 22:57:53 +0100 Subject: Thread pool fixes --- src/thread_pool.cpp | 169 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 src/thread_pool.cpp (limited to 'src/thread_pool.cpp') diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp new file mode 100644 index 000000000..67e698e5d --- /dev/null +++ b/src/thread_pool.cpp @@ -0,0 +1,169 @@ +// worker_queue.cpp + +#define WORKER_TASK_PROC(name) isize name(void *data) +typedef WORKER_TASK_PROC(WorkerTaskProc); + +struct WorkerTask { + WorkerTaskProc *do_work; + void *data; +}; + + +struct ThreadPool { + gbMutex task_mutex; + gbMutex mutex; + gbSemaphore semaphore; + gbAtomic32 processing_work_count; + bool is_running; + + Array tasks; + Array threads; + + gbAllocator original_allocator; + + char worker_prefix[10]; + i32 worker_prefix_len; +}; + + +GB_ALLOCATOR_PROC(thread_pool_allocator_proc) { + ThreadPool *pool = cast(ThreadPool *)allocator_data; + return pool->original_allocator.proc(pool->original_allocator.data, type, size, 256, old_memory, old_size, flags); +} + +gbAllocator thread_pool_allocator(ThreadPool *pool) { + gbAllocator a = {thread_pool_allocator_proc, pool}; + return a; +} + +void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix = nullptr); +void thread_pool_destroy(ThreadPool *pool); +void thread_pool_start(ThreadPool *pool); +void thread_pool_join(ThreadPool *pool); +void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data); +void thread_pool_kick(ThreadPool *pool); +void thread_pool_kick_and_wait(ThreadPool *pool); +GB_THREAD_PROC(worker_thread_internal); + +void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix) { + pool->original_allocator = a; + gbAllocator tpa = thread_pool_allocator(pool); + pool->tasks = array_make(tpa, 0, 1024); + pool->threads = array_make(tpa, thread_count); + gb_mutex_init(&pool->task_mutex); + gb_mutex_init(&pool->mutex); + gb_semaphore_init(&pool->semaphore); + pool->is_running = true; + + pool->worker_prefix_len = 0; + if (worker_prefix) { + i32 worker_prefix_len = cast(i32)gb_strlen(worker_prefix); + worker_prefix_len = gb_min(worker_prefix_len, 10); + gb_memmove(pool->worker_prefix, worker_prefix, worker_prefix_len); + pool->worker_prefix_len = worker_prefix_len; + } + + for_array(i, pool->threads) { + gbThread *t = &pool->threads[i]; + gb_thread_init(t); + t->user_index = i; + if (pool->worker_prefix_len > 0) { + char worker_name[16] = {}; + gb_snprintf(worker_name, gb_size_of(worker_name), "%.*s%u", pool->worker_prefix_len, pool->worker_prefix, cast(u16)i); + gb_thread_set_name(t, worker_name); + } + } +} + +void thread_pool_start(ThreadPool *pool) { + for_array(i, pool->threads) { + gbThread *t = &pool->threads[i]; + gb_thread_start(t, worker_thread_internal, pool); + } +} + +void thread_pool_join(ThreadPool *pool) { + pool->is_running = false; + + for_array(i, pool->threads) { + gb_semaphore_release(&pool->semaphore); + } + + for_array(i, pool->threads) { + gbThread *t = &pool->threads[i]; + gb_thread_join(t); + } +} + + +void thread_pool_destroy(ThreadPool *pool) { + thread_pool_join(pool); + + gb_semaphore_destroy(&pool->semaphore); + gb_mutex_destroy(&pool->mutex); + gb_mutex_destroy(&pool->task_mutex); + array_free(&pool->threads); + array_free(&pool->tasks); +} + + +void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { + gb_mutex_lock(&pool->task_mutex); + + WorkerTask task = {}; + task.do_work = proc; + task.data = data; + array_add(&pool->tasks, task); + + gb_mutex_unlock(&pool->task_mutex); + + gb_semaphore_post(&pool->semaphore, 1); +} + +void thread_pool_kick(ThreadPool *pool) { + if (pool->tasks.count > 0) { + isize count = gb_min(pool->tasks.count, pool->threads.count); + for (isize i = 0; i < count; i++) { + gb_semaphore_post(&pool->semaphore, 1); + } + } + +} +void thread_pool_kick_and_wait(ThreadPool *pool) { + thread_pool_kick(pool); + + isize return_value = 0; + while (pool->tasks.count > 0 || gb_atomic32_load(&pool->processing_work_count) != 0) { + gb_yield(); + } + + thread_pool_join(pool); +} + + +GB_THREAD_PROC(worker_thread_internal) { + ThreadPool *pool = cast(ThreadPool *)thread->user_data; + thread->return_value = 0; + while (pool->is_running) { + gb_semaphore_wait(&pool->semaphore); + + WorkerTask task = {}; + bool got_task = false; + + if (gb_mutex_try_lock(&pool->task_mutex)) { + if (pool->tasks.count > 0) { + gb_atomic32_fetch_add(&pool->processing_work_count, +1); + task = array_pop(&pool->tasks); + got_task = true; + } + gb_mutex_unlock(&pool->task_mutex); + } + + if (got_task) { + thread->return_value = task.do_work(task.data); + gb_atomic32_fetch_add(&pool->processing_work_count, -1); + } + } + return thread->return_value; +} + -- cgit v1.2.3 From 723f351a6d36193cb36c74c40b40befa3c4302f1 Mon Sep 17 00:00:00 2001 From: gingerBill Date: Sun, 1 Sep 2019 23:13:29 +0100 Subject: Remove custom allocator for thread pool --- src/thread_pool.cpp | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) (limited to 'src/thread_pool.cpp') diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 67e698e5d..32fe3b82f 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -19,23 +19,10 @@ struct ThreadPool { Array tasks; Array threads; - gbAllocator original_allocator; - char worker_prefix[10]; i32 worker_prefix_len; }; - -GB_ALLOCATOR_PROC(thread_pool_allocator_proc) { - ThreadPool *pool = cast(ThreadPool *)allocator_data; - return pool->original_allocator.proc(pool->original_allocator.data, type, size, 256, old_memory, old_size, flags); -} - -gbAllocator thread_pool_allocator(ThreadPool *pool) { - gbAllocator a = {thread_pool_allocator_proc, pool}; - return a; -} - void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix = nullptr); void thread_pool_destroy(ThreadPool *pool); void thread_pool_start(ThreadPool *pool); @@ -46,10 +33,8 @@ void thread_pool_kick_and_wait(ThreadPool *pool); GB_THREAD_PROC(worker_thread_internal); void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix) { - pool->original_allocator = a; - gbAllocator tpa = thread_pool_allocator(pool); - pool->tasks = array_make(tpa, 0, 1024); - pool->threads = array_make(tpa, thread_count); + pool->tasks = array_make(a, 0, 1024); + pool->threads = array_make(a, thread_count); gb_mutex_init(&pool->task_mutex); gb_mutex_init(&pool->mutex); gb_semaphore_init(&pool->semaphore); -- cgit v1.2.3 From 6d614ef07caf89939b1757bd225ebd0b8c88fa78 Mon Sep 17 00:00:00 2001 From: gingerBill Date: Sun, 1 Sep 2019 23:16:01 +0100 Subject: Remove thread naming on thread pool --- src/thread_pool.cpp | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/thread_pool.cpp') diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 32fe3b82f..83178ea47 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -52,11 +52,13 @@ void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count gbThread *t = &pool->threads[i]; gb_thread_init(t); t->user_index = i; + #if 0 if (pool->worker_prefix_len > 0) { char worker_name[16] = {}; gb_snprintf(worker_name, gb_size_of(worker_name), "%.*s%u", pool->worker_prefix_len, pool->worker_prefix, cast(u16)i); gb_thread_set_name(t, worker_name); } + #endif } } -- cgit v1.2.3 From 1348d8a8cdefcb02be6ad9346ecbf24a4635fe0c Mon Sep 17 00:00:00 2001 From: gingerBill Date: Mon, 2 Sep 2019 18:49:23 +0100 Subject: Improve thread pool (volatile hints, etc) --- src/gb/gb.h | 8 ++++---- src/parser.cpp | 2 +- src/thread_pool.cpp | 57 +++++++++++++++++++++++++++++++++++++---------------- 3 files changed, 45 insertions(+), 22 deletions(-) (limited to 'src/thread_pool.cpp') diff --git a/src/gb/gb.h b/src/gb/gb.h index 65b8b2ff6..1b2bc5188 100644 --- a/src/gb/gb.h +++ b/src/gb/gb.h @@ -978,10 +978,10 @@ typedef struct gbThread { pthread_t posix_handle; #endif - gbThreadProc *proc; - void * user_data; - isize user_index; - isize return_value; + gbThreadProc * proc; + void * user_data; + isize user_index; + isize volatile return_value; gbSemaphore semaphore; isize stack_size; diff --git a/src/parser.cpp b/src/parser.cpp index e4d21e72a..a026e8ecd 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -4822,7 +4822,7 @@ ParseFileError parse_packages(Parser *p, String init_filename) { thread_pool_kick_and_wait(&parser_thread_pool); // NOTE(bill): Get the last error and use that - for (isize i = parser_thread_pool.threads.count-1; i >= 0; i--) { + for (isize i = parser_thread_pool.thread_count-1; i >= 0; i--) { gbThread *t = &parser_thread_pool.threads[i]; ParseFileError err = cast(ParseFileError)t->return_value; if (err != ParseFile_None) { diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 83178ea47..bbe9ccac6 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -16,8 +16,14 @@ struct ThreadPool { gbAtomic32 processing_work_count; bool is_running; - Array tasks; - Array threads; + gbAllocator allocator; + + WorkerTask *tasks; + isize volatile task_count; + isize volatile task_capacity; + + gbThread *threads; + isize thread_count; char worker_prefix[10]; i32 worker_prefix_len; @@ -33,8 +39,12 @@ void thread_pool_kick_and_wait(ThreadPool *pool); GB_THREAD_PROC(worker_thread_internal); void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix) { - pool->tasks = array_make(a, 0, 1024); - pool->threads = array_make(a, thread_count); + pool->allocator = a; + pool->task_count = 0; + pool->task_capacity = 1024; + pool->tasks = gb_alloc_array(a, WorkerTask, pool->task_capacity); + pool->threads = gb_alloc_array(a, gbThread, thread_count); + pool->thread_count = thread_count; gb_mutex_init(&pool->task_mutex); gb_mutex_init(&pool->mutex); gb_semaphore_init(&pool->semaphore); @@ -48,7 +58,7 @@ void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count pool->worker_prefix_len = worker_prefix_len; } - for_array(i, pool->threads) { + for (isize i = 0; i < pool->thread_count; i++) { gbThread *t = &pool->threads[i]; gb_thread_init(t); t->user_index = i; @@ -63,7 +73,7 @@ void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count } void thread_pool_start(ThreadPool *pool) { - for_array(i, pool->threads) { + for (isize i = 0; i < pool->thread_count; i++) { gbThread *t = &pool->threads[i]; gb_thread_start(t, worker_thread_internal, pool); } @@ -72,11 +82,11 @@ void thread_pool_start(ThreadPool *pool) { void thread_pool_join(ThreadPool *pool) { pool->is_running = false; - for_array(i, pool->threads) { + for (isize i = 0; i < pool->thread_count; i++) { gb_semaphore_release(&pool->semaphore); } - for_array(i, pool->threads) { + for (isize i = 0; i < pool->thread_count; i++) { gbThread *t = &pool->threads[i]; gb_thread_join(t); } @@ -89,18 +99,30 @@ void thread_pool_destroy(ThreadPool *pool) { gb_semaphore_destroy(&pool->semaphore); gb_mutex_destroy(&pool->mutex); gb_mutex_destroy(&pool->task_mutex); - array_free(&pool->threads); - array_free(&pool->tasks); + gb_free(pool->allocator, pool->threads); + pool->thread_count = 0; + gb_free(pool->allocator, pool->tasks); + pool->task_count = 0; + pool->task_capacity = 0; + } void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { gb_mutex_lock(&pool->task_mutex); + if (pool->task_count == pool->task_capacity) { + isize new_cap = 2*pool->task_capacity + 8; + WorkerTask *new_tasks = gb_alloc_array(pool->allocator, WorkerTask, new_cap); + gb_memmove(new_tasks, pool->tasks, pool->task_count*gb_size_of(WorkerTask)); + pool->tasks = new_tasks; + pool->task_capacity = new_cap; + } WorkerTask task = {}; task.do_work = proc; task.data = data; - array_add(&pool->tasks, task); + + pool->tasks[pool->task_count++] = task; gb_mutex_unlock(&pool->task_mutex); @@ -108,19 +130,20 @@ void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { } void thread_pool_kick(ThreadPool *pool) { - if (pool->tasks.count > 0) { - isize count = gb_min(pool->tasks.count, pool->threads.count); + gb_mutex_lock(&pool->task_mutex); + if (pool->task_count > 0) { + isize count = gb_max(pool->task_count, pool->thread_count); for (isize i = 0; i < count; i++) { gb_semaphore_post(&pool->semaphore, 1); } } - + gb_mutex_unlock(&pool->task_mutex); } void thread_pool_kick_and_wait(ThreadPool *pool) { thread_pool_kick(pool); isize return_value = 0; - while (pool->tasks.count > 0 || gb_atomic32_load(&pool->processing_work_count) != 0) { + while (pool->task_count > 0 || gb_atomic32_load(&pool->processing_work_count) != 0) { gb_yield(); } @@ -138,9 +161,9 @@ GB_THREAD_PROC(worker_thread_internal) { bool got_task = false; if (gb_mutex_try_lock(&pool->task_mutex)) { - if (pool->tasks.count > 0) { + if (pool->task_count > 0) { gb_atomic32_fetch_add(&pool->processing_work_count, +1); - task = array_pop(&pool->tasks); + task = pool->tasks[--pool->task_count]; got_task = true; } gb_mutex_unlock(&pool->task_mutex); -- cgit v1.2.3 From c92b2e961297b3bf0c37070c27bcba34e27a2b9b Mon Sep 17 00:00:00 2001 From: gingerBill Date: Tue, 3 Sep 2019 21:17:46 +0100 Subject: Fix semaphore posting --- src/thread_pool.cpp | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) (limited to 'src/thread_pool.cpp') diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index bbe9ccac6..8a32b7aca 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -124,15 +124,14 @@ void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { pool->tasks[pool->task_count++] = task; - gb_mutex_unlock(&pool->task_mutex); - gb_semaphore_post(&pool->semaphore, 1); + gb_mutex_unlock(&pool->task_mutex); } void thread_pool_kick(ThreadPool *pool) { gb_mutex_lock(&pool->task_mutex); if (pool->task_count > 0) { - isize count = gb_max(pool->task_count, pool->thread_count); + isize count = gb_min(pool->task_count, pool->thread_count); for (isize i = 0; i < count; i++) { gb_semaphore_post(&pool->semaphore, 1); } @@ -144,6 +143,14 @@ void thread_pool_kick_and_wait(ThreadPool *pool) { isize return_value = 0; while (pool->task_count > 0 || gb_atomic32_load(&pool->processing_work_count) != 0) { + + if (pool->task_count > 0 && gb_atomic32_load(&pool->processing_work_count) == 0) { + gb_mutex_lock(&pool->task_mutex); + for (isize i = 0; i < pool->task_count; i++) { + gb_semaphore_post(&pool->semaphore, 1); + } + gb_mutex_unlock(&pool->task_mutex); + } gb_yield(); } -- cgit v1.2.3 From 772c8779fa4ed38fcc53c1a7a5c4c93e8a13f05a Mon Sep 17 00:00:00 2001 From: gingerBill Date: Tue, 3 Sep 2019 22:11:21 +0100 Subject: Clean up thread pool code --- src/gb/gb.h | 15 ++----- src/parser.cpp | 11 ++--- src/thread_pool.cpp | 114 ++++++++++++++++++++++++++-------------------------- 3 files changed, 65 insertions(+), 75 deletions(-) (limited to 'src/thread_pool.cpp') diff --git a/src/gb/gb.h b/src/gb/gb.h index 1b2bc5188..60303729f 100644 --- a/src/gb/gb.h +++ b/src/gb/gb.h @@ -918,10 +918,7 @@ GB_DEF void gb_lfence (void); #if defined(GB_SYSTEM_WINDOWS) -typedef struct gbSemaphore { - void *win32_handle; - LONG count; -} gbSemaphore; +typedef struct gbSemaphore { void *win32_handle;} gbSemaphore; #elif defined(GB_SYSTEM_OSX) typedef struct gbSemaphore { semaphore_t osx_handle; } gbSemaphore; #elif defined(GB_SYSTEM_UNIX) @@ -4593,21 +4590,15 @@ gb_inline void gb_semaphore_release(gbSemaphore *s) { gb_semaphore_post(s, 1); } #if defined(GB_SYSTEM_WINDOWS) gb_inline void gb_semaphore_init(gbSemaphore *s) { s->win32_handle = CreateSemaphoreA(NULL, 0, I32_MAX, NULL); - s->count = 0; } gb_inline void gb_semaphore_destroy(gbSemaphore *s) { CloseHandle(s->win32_handle); } gb_inline void gb_semaphore_post(gbSemaphore *s, i32 count) { - _InterlockedIncrement(&s->count); - if (ReleaseSemaphore(s->win32_handle, count, NULL) == FALSE) { - _InterlockedDecrement(&s->count); - } + ReleaseSemaphore(s->win32_handle, count, NULL); } gb_inline void gb_semaphore_wait(gbSemaphore *s) { - if (WaitForSingleObjectEx(s->win32_handle, INFINITE, FALSE) == WAIT_OBJECT_0) { - _InterlockedDecrement(&s->count); - } + WaitForSingleObjectEx(s->win32_handle, INFINITE, FALSE); } #elif defined(GB_SYSTEM_OSX) diff --git a/src/parser.cpp b/src/parser.cpp index a026e8ecd..b1d9a457f 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -4798,7 +4798,8 @@ ParseFileError parse_packages(Parser *p, String init_filename) { GB_ASSERT(init_filename.text[init_filename.len] == 0); isize thread_count = gb_max(build_context.thread_count, 1); - thread_pool_init(&parser_thread_pool, heap_allocator(), thread_count, "ParserWork"); + isize worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work + thread_pool_init(&parser_thread_pool, heap_allocator(), worker_count, "ParserWork"); String init_fullpath = path_to_full_path(heap_allocator(), init_filename); if (!path_is_directory(init_fullpath)) { @@ -4819,12 +4820,12 @@ ParseFileError parse_packages(Parser *p, String init_filename) { p->init_fullpath = init_fullpath; thread_pool_start(&parser_thread_pool); - thread_pool_kick_and_wait(&parser_thread_pool); + thread_pool_wait_to_process(&parser_thread_pool); // NOTE(bill): Get the last error and use that - for (isize i = parser_thread_pool.thread_count-1; i >= 0; i--) { - gbThread *t = &parser_thread_pool.threads[i]; - ParseFileError err = cast(ParseFileError)t->return_value; + for (isize i = parser_thread_pool.task_tail-1; i >= 0; i--) { + WorkerTask *task = &parser_thread_pool.tasks[i]; + ParseFileError err = cast(ParseFileError)task->result; if (err != ParseFile_None) { return err; } diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 8a32b7aca..2467ba609 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -6,20 +6,21 @@ typedef WORKER_TASK_PROC(WorkerTaskProc); struct WorkerTask { WorkerTaskProc *do_work; void *data; + isize result; }; struct ThreadPool { - gbMutex task_mutex; gbMutex mutex; - gbSemaphore semaphore; + gbSemaphore sem_available; gbAtomic32 processing_work_count; bool is_running; gbAllocator allocator; WorkerTask *tasks; - isize volatile task_count; + isize volatile task_head; + isize volatile task_tail; isize volatile task_capacity; gbThread *threads; @@ -40,14 +41,14 @@ GB_THREAD_PROC(worker_thread_internal); void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix) { pool->allocator = a; - pool->task_count = 0; + pool->task_head = 0; + pool->task_tail = 0; pool->task_capacity = 1024; pool->tasks = gb_alloc_array(a, WorkerTask, pool->task_capacity); - pool->threads = gb_alloc_array(a, gbThread, thread_count); - pool->thread_count = thread_count; - gb_mutex_init(&pool->task_mutex); + pool->thread_count = gb_max(thread_count, 0); + pool->threads = gb_alloc_array(a, gbThread, pool->thread_count); gb_mutex_init(&pool->mutex); - gb_semaphore_init(&pool->semaphore); + gb_semaphore_init(&pool->sem_available); pool->is_running = true; pool->worker_prefix_len = 0; @@ -63,6 +64,7 @@ void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count gb_thread_init(t); t->user_index = i; #if 0 + // TODO(bill): Fix this on Linux as it causes a seg-fault if (pool->worker_prefix_len > 0) { char worker_name[16] = {}; gb_snprintf(worker_name, gb_size_of(worker_name), "%.*s%u", pool->worker_prefix_len, pool->worker_prefix, cast(u16)i); @@ -82,9 +84,9 @@ void thread_pool_start(ThreadPool *pool) { void thread_pool_join(ThreadPool *pool) { pool->is_running = false; - for (isize i = 0; i < pool->thread_count; i++) { - gb_semaphore_release(&pool->semaphore); - } + gb_semaphore_post(&pool->sem_available, cast(i32)pool->thread_count); + + gb_yield(); for (isize i = 0; i < pool->thread_count; i++) { gbThread *t = &pool->threads[i]; @@ -96,25 +98,24 @@ void thread_pool_join(ThreadPool *pool) { void thread_pool_destroy(ThreadPool *pool) { thread_pool_join(pool); - gb_semaphore_destroy(&pool->semaphore); + gb_semaphore_destroy(&pool->sem_available); gb_mutex_destroy(&pool->mutex); - gb_mutex_destroy(&pool->task_mutex); gb_free(pool->allocator, pool->threads); pool->thread_count = 0; gb_free(pool->allocator, pool->tasks); - pool->task_count = 0; + pool->task_head = 0; + pool->task_tail = 0; pool->task_capacity = 0; - } void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { - gb_mutex_lock(&pool->task_mutex); + gb_mutex_lock(&pool->mutex); - if (pool->task_count == pool->task_capacity) { + if (pool->task_tail == pool->task_capacity) { isize new_cap = 2*pool->task_capacity + 8; WorkerTask *new_tasks = gb_alloc_array(pool->allocator, WorkerTask, new_cap); - gb_memmove(new_tasks, pool->tasks, pool->task_count*gb_size_of(WorkerTask)); + gb_memmove(new_tasks, pool->tasks, (pool->task_tail)*gb_size_of(WorkerTask)); pool->tasks = new_tasks; pool->task_capacity = new_cap; } @@ -122,35 +123,42 @@ void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { task.do_work = proc; task.data = data; - pool->tasks[pool->task_count++] = task; - - gb_semaphore_post(&pool->semaphore, 1); - gb_mutex_unlock(&pool->task_mutex); + pool->tasks[pool->task_tail++] = task; + gb_semaphore_post(&pool->sem_available, 1); + gb_mutex_unlock(&pool->mutex); } -void thread_pool_kick(ThreadPool *pool) { - gb_mutex_lock(&pool->task_mutex); - if (pool->task_count > 0) { - isize count = gb_min(pool->task_count, pool->thread_count); - for (isize i = 0; i < count; i++) { - gb_semaphore_post(&pool->semaphore, 1); +bool thread_pool_try_and_pop_task(ThreadPool *pool, WorkerTask *task) { + bool got_task = false; + if (gb_mutex_try_lock(&pool->mutex)) { + if (pool->task_tail > pool->task_head) { + gb_atomic32_fetch_add(&pool->processing_work_count, +1); + *task = pool->tasks[pool->task_head++]; + got_task = true; } + gb_mutex_unlock(&pool->mutex); } - gb_mutex_unlock(&pool->task_mutex); + return got_task; +} +void thread_pool_do_work(ThreadPool *pool, WorkerTask *task) { + task->result = task->do_work(task->data); + gb_atomic32_fetch_add(&pool->processing_work_count, -1); } -void thread_pool_kick_and_wait(ThreadPool *pool) { - thread_pool_kick(pool); - - isize return_value = 0; - while (pool->task_count > 0 || gb_atomic32_load(&pool->processing_work_count) != 0) { - - if (pool->task_count > 0 && gb_atomic32_load(&pool->processing_work_count) == 0) { - gb_mutex_lock(&pool->task_mutex); - for (isize i = 0; i < pool->task_count; i++) { - gb_semaphore_post(&pool->semaphore, 1); - } - gb_mutex_unlock(&pool->task_mutex); + +void thread_pool_wait_to_process(ThreadPool *pool) { + while (pool->task_tail > pool->task_head || gb_atomic32_load(&pool->processing_work_count) != 0) { + WorkerTask task = {}; + if (thread_pool_try_and_pop_task(pool, &task)) { + thread_pool_do_work(pool, &task); + } + + // Safety-kick + if (pool->task_tail > pool->task_head && gb_atomic32_load(&pool->processing_work_count) == 0) { + gb_mutex_lock(&pool->mutex); + gb_semaphore_post(&pool->sem_available, cast(i32)(pool->task_tail-pool->task_head)); + gb_mutex_unlock(&pool->mutex); } + gb_yield(); } @@ -160,27 +168,17 @@ void thread_pool_kick_and_wait(ThreadPool *pool) { GB_THREAD_PROC(worker_thread_internal) { ThreadPool *pool = cast(ThreadPool *)thread->user_data; - thread->return_value = 0; while (pool->is_running) { - gb_semaphore_wait(&pool->semaphore); + gb_semaphore_wait(&pool->sem_available); WorkerTask task = {}; - bool got_task = false; - - if (gb_mutex_try_lock(&pool->task_mutex)) { - if (pool->task_count > 0) { - gb_atomic32_fetch_add(&pool->processing_work_count, +1); - task = pool->tasks[--pool->task_count]; - got_task = true; - } - gb_mutex_unlock(&pool->task_mutex); - } - - if (got_task) { - thread->return_value = task.do_work(task.data); - gb_atomic32_fetch_add(&pool->processing_work_count, -1); + if (thread_pool_try_and_pop_task(pool, &task)) { + thread_pool_do_work(pool, &task); } } - return thread->return_value; + // Cascade + gb_semaphore_release(&pool->sem_available); + + return 0; } -- cgit v1.2.3