From ac5f5a33e94054396de66a37043e226349b6c91c Mon Sep 17 00:00:00 2001 From: gingerBill Date: Sun, 18 Dec 2022 21:17:07 +0000 Subject: `gb_internal` a lot --- src/thread_pool.cpp | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) (limited to 'src/thread_pool.cpp') diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 727cdcdda..1b3e74fbe 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -23,9 +23,9 @@ struct ThreadPool { }; -THREAD_PROC(thread_pool_thread_proc); +gb_internal THREAD_PROC(thread_pool_thread_proc); -void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name) { +gb_internal void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name) { pool->allocator = a; pool->stop = false; mutex_init(&pool->mutex); @@ -43,7 +43,7 @@ void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count } } -void thread_pool_destroy(ThreadPool *pool) { +gb_internal void thread_pool_destroy(ThreadPool *pool) { mutex_lock(&pool->mutex); pool->stop = true; condition_broadcast(&pool->task_cond); @@ -64,23 +64,23 @@ void thread_pool_destroy(ThreadPool *pool) { condition_destroy(&pool->task_cond); } -bool thread_pool_queue_empty(ThreadPool *pool) { +gb_internal bool thread_pool_queue_empty(ThreadPool *pool) { return pool->task_queue == nullptr; } -WorkerTask *thread_pool_queue_pop(ThreadPool *pool) { +gb_internal WorkerTask *thread_pool_queue_pop(ThreadPool *pool) { GB_ASSERT(pool->task_queue != nullptr); WorkerTask *task = pool->task_queue; pool->task_queue = task->next; return task; } -void thread_pool_queue_push(ThreadPool *pool, WorkerTask *task) { +gb_internal void thread_pool_queue_push(ThreadPool *pool, WorkerTask *task) { GB_ASSERT(task != nullptr); task->next = pool->task_queue; pool->task_queue = task; } -bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { +gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { GB_ASSERT(proc != nullptr); mutex_lock(&pool->mutex); WorkerTask *task = gb_alloc_item(permanent_allocator(), WorkerTask); @@ -101,11 +101,11 @@ bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { } -void thread_pool_do_task(WorkerTask *task) { +gb_internal void thread_pool_do_task(WorkerTask *task) { task->do_work(task->data); } -void thread_pool_wait(ThreadPool *pool) { +gb_internal void thread_pool_wait(ThreadPool *pool) { if (pool->threads.count == 0) { while (!thread_pool_queue_empty(pool)) { thread_pool_do_task(thread_pool_queue_pop(pool)); @@ -138,7 +138,7 @@ void thread_pool_wait(ThreadPool *pool) { } -THREAD_PROC(thread_pool_thread_proc) { +gb_internal THREAD_PROC(thread_pool_thread_proc) { ThreadPool *pool = cast(ThreadPool *)thread->user_data; for (;;) { -- cgit v1.2.3 From 0edda2bea769303166eaab2965f6cfb8b2bd807c Mon Sep 17 00:00:00 2001 From: gingerBill Date: Tue, 20 Dec 2022 12:46:33 +0000 Subject: Clarify ThreadPool interface; Move `import_mutex` guarding to just the string set --- src/parser.cpp | 14 ++++++-------- src/thread_pool.cpp | 12 ++++++++++++ 2 files changed, 18 insertions(+), 8 deletions(-) (limited to 'src/thread_pool.cpp') diff --git a/src/parser.cpp b/src/parser.cpp index 520f123d8..eb006cb24 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -4966,14 +4966,12 @@ gb_internal void parser_add_foreign_file_to_process(Parser *p, AstPackage *pkg, gb_internal AstPackage *try_add_import_path(Parser *p, String const &path, String const &rel_path, TokenPos pos, PackageKind kind = Package_Normal) { String const FILE_EXT = str_lit(".odin"); - mutex_lock(&p->import_mutex); - defer (mutex_unlock(&p->import_mutex)); - - if (string_set_exists(&p->imported_files, path)) { - return nullptr; + MUTEX_GUARD_BLOCK(&p->import_mutex) { + if (string_set_exists(&p->imported_files, path)) { + return nullptr; + } + string_set_add(&p->imported_files, path); } - string_set_add(&p->imported_files, path); - AstPackage *pkg = gb_alloc_item(permanent_allocator(), AstPackage); pkg->kind = kind; @@ -4991,8 +4989,8 @@ gb_internal AstPackage *try_add_import_path(Parser *p, String const &path, Strin fi.is_dir = false; pkg->is_single_file = true; - parser_add_file_to_process(p, pkg, fi, pos); parser_add_package(p, pkg); + parser_add_file_to_process(p, pkg, fi, pos); return pkg; } diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 1b3e74fbe..eca4d37ab 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -1,14 +1,25 @@ // thread_pool.cpp +struct WorkerTask; +struct ThreadPool; + #define WORKER_TASK_PROC(name) isize name(void *data) typedef WORKER_TASK_PROC(WorkerTaskProc); +gb_internal void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name); +gb_internal void thread_pool_destroy(ThreadPool *pool); +gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data); +gb_internal void thread_pool_wait(ThreadPool *pool); + + struct WorkerTask { WorkerTask * next; WorkerTaskProc *do_work; void * data; + ThreadPool * pool; }; + struct ThreadPool { gbAllocator allocator; BlockingMutex mutex; @@ -89,6 +100,7 @@ gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, vo GB_PANIC("Out of memory"); return false; } + task->pool = pool; task->do_work = proc; task->data = data; -- cgit v1.2.3 From a0e3a99dd165858136dd0d60759b08b22f81cfa2 Mon Sep 17 00:00:00 2001 From: gingerBill Date: Tue, 20 Dec 2022 14:07:14 +0000 Subject: Remove need for `semaphore` in `Thread` --- src/thread_pool.cpp | 7 +------ src/threading.cpp | 41 ++++++++++++++--------------------------- 2 files changed, 15 insertions(+), 33 deletions(-) (limited to 'src/thread_pool.cpp') diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index eca4d37ab..ba150bb6f 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -45,12 +45,7 @@ gb_internal void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize slice_init(&pool->threads, a, thread_count); for_array(i, pool->threads) { Thread *t = &pool->threads[i]; - thread_init(t); - } - - for_array(i, pool->threads) { - Thread *t = &pool->threads[i]; - thread_start(t, thread_pool_thread_proc, pool); + thread_init_and_start(t, thread_pool_thread_proc, pool); } } diff --git a/src/threading.cpp b/src/threading.cpp index b74d087b4..30e9071d8 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -27,7 +27,6 @@ struct Thread { isize user_index; isize volatile return_value; - Semaphore * semaphore; isize stack_size; std::atomic is_running; }; @@ -60,10 +59,9 @@ gb_internal void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32 gb_internal u32 thread_current_id(void); -gb_internal void thread_init (Thread *t); +gb_internal void thread_init_and_start (Thread *t, ThreadProc *proc, void *data); +gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, void *data, isize stack_size); gb_internal void thread_destroy (Thread *t); -gb_internal void thread_start (Thread *t, ThreadProc *proc, void *data); -gb_internal void thread_start_with_stack(Thread *t, ThreadProc *proc, void *data, isize stack_size); gb_internal void thread_join (Thread *t); gb_internal bool thread_is_running (Thread const *t); gb_internal void thread_set_name (Thread *t, char const *name); @@ -328,27 +326,12 @@ gb_internal gb_inline void yield(void) { #endif } - -gb_internal void thread_init(Thread *t) { - gb_zero_item(t); -#if defined(GB_SYSTEM_WINDOWS) - t->win32_handle = INVALID_HANDLE_VALUE; -#else - t->posix_handle = 0; -#endif - t->semaphore = gb_alloc_item(heap_allocator(), Semaphore); - semaphore_init(t->semaphore); -} - gb_internal void thread_destroy(Thread *t) { thread_join(t); - semaphore_destroy(t->semaphore); - gb_free(heap_allocator(), t->semaphore); } -gb_internal void gb__thread_run(Thread *t) { - semaphore_release(t->semaphore); +gb_internal void private__thread_run(Thread *t) { t->return_value = t->proc(t); } @@ -356,7 +339,7 @@ gb_internal void gb__thread_run(Thread *t) { gb_internal DWORD __stdcall internal_thread_proc(void *arg) { Thread *t = cast(Thread *)arg; t->is_running.store(true); - gb__thread_run(t); + private__thread_run(t); return 0; } #else @@ -370,14 +353,20 @@ gb_internal void gb__thread_run(Thread *t) { Thread *t = cast(Thread *)arg; t->is_running.store(true); - gb__thread_run(t); + private__thread_run(t); return NULL; } #endif -gb_internal void thread_start(Thread *t, ThreadProc *proc, void *user_data) { thread_start_with_stack(t, proc, user_data, 0); } +gb_internal void thread_init_and_start(Thread *t, ThreadProc *proc, void *user_data) { thread_init_and_start_with_stack(t, proc, user_data, 0); } -gb_internal void thread_start_with_stack(Thread *t, ThreadProc *proc, void *user_data, isize stack_size) { +gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, void *user_data, isize stack_size) { + gb_zero_item(t); +#if defined(GB_SYSTEM_WINDOWS) + t->win32_handle = INVALID_HANDLE_VALUE; +#else + t->posix_handle = 0; +#endif GB_ASSERT(!t->is_running.load()); GB_ASSERT(proc != NULL); t->proc = proc; @@ -391,16 +380,14 @@ gb_internal void thread_start_with_stack(Thread *t, ThreadProc *proc, void *user { pthread_attr_t attr; pthread_attr_init(&attr); + defer (pthread_attr_destroy(&attr)); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); if (stack_size != 0) { pthread_attr_setstacksize(&attr, stack_size); } pthread_create(&t->posix_handle, &attr, internal_thread_proc, t); - pthread_attr_destroy(&attr); } #endif - - semaphore_wait(t->semaphore); } gb_internal void thread_join(Thread *t) { -- cgit v1.2.3 From 134c7db4d21e80751833ed45fb4ace5d0ae3b7d2 Mon Sep 17 00:00:00 2001 From: gingerBill Date: Tue, 20 Dec 2022 14:08:24 +0000 Subject: Combine join and destroy for threads --- src/thread_pool.cpp | 9 ++------- src/threading.cpp | 10 ++-------- 2 files changed, 4 insertions(+), 15 deletions(-) (limited to 'src/thread_pool.cpp') diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index ba150bb6f..a1ee11523 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -57,14 +57,9 @@ gb_internal void thread_pool_destroy(ThreadPool *pool) { for_array(i, pool->threads) { Thread *t = &pool->threads[i]; - thread_join(t); + thread_join_and_destroy(t); } - - for_array(i, pool->threads) { - Thread *t = &pool->threads[i]; - thread_destroy(t); - } - + gb_free(pool->allocator, pool->threads.data); mutex_destroy(&pool->mutex); condition_destroy(&pool->task_cond); diff --git a/src/threading.cpp b/src/threading.cpp index 30e9071d8..e92ed5e31 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -61,8 +61,7 @@ gb_internal u32 thread_current_id(void); gb_internal void thread_init_and_start (Thread *t, ThreadProc *proc, void *data); gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, void *data, isize stack_size); -gb_internal void thread_destroy (Thread *t); -gb_internal void thread_join (Thread *t); +gb_internal void thread_join_and_destroy(Thread *t); gb_internal bool thread_is_running (Thread const *t); gb_internal void thread_set_name (Thread *t, char const *name); @@ -326,11 +325,6 @@ gb_internal gb_inline void yield(void) { #endif } -gb_internal void thread_destroy(Thread *t) { - thread_join(t); -} - - gb_internal void private__thread_run(Thread *t) { t->return_value = t->proc(t); } @@ -390,7 +384,7 @@ gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, v #endif } -gb_internal void thread_join(Thread *t) { +gb_internal void thread_join_and_destroy(Thread *t) { if (!t->is_running.load()) { return; } -- cgit v1.2.3 From eb0775ad53e9651ca01ad236dd3fe83786f0cecc Mon Sep 17 00:00:00 2001 From: gingerBill Date: Tue, 20 Dec 2022 14:45:01 +0000 Subject: Move `mutex` use around in thread pool --- src/thread_pool.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'src/thread_pool.cpp') diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index a1ee11523..6df991d7d 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -83,10 +83,8 @@ gb_internal void thread_pool_queue_push(ThreadPool *pool, WorkerTask *task) { gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) { GB_ASSERT(proc != nullptr); - mutex_lock(&pool->mutex); WorkerTask *task = gb_alloc_item(permanent_allocator(), WorkerTask); if (task == nullptr) { - mutex_unlock(&pool->mutex); GB_PANIC("Out of memory"); return false; } @@ -94,9 +92,10 @@ gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, vo task->do_work = proc; task->data = data; + mutex_lock(&pool->mutex); thread_pool_queue_push(pool, task); GB_ASSERT(pool->ready >= 0); - pool->ready++; + pool->ready.fetch_add(1); condition_broadcast(&pool->task_cond); mutex_unlock(&pool->mutex); return true; @@ -111,7 +110,7 @@ gb_internal void thread_pool_wait(ThreadPool *pool) { if (pool->threads.count == 0) { while (!thread_pool_queue_empty(pool)) { thread_pool_do_task(thread_pool_queue_pop(pool)); - --pool->ready; + pool->ready.fetch_sub(1); } GB_ASSERT(pool->ready == 0); return; -- cgit v1.2.3