| author    | gingerBill <gingerBill@users.noreply.github.com> | 2021-08-26 21:44:02 +0100 |
|-----------|--------------------------------------------------|---------------------------|
| committer | GitHub <noreply@github.com>                      | 2021-08-26 21:44:02 +0100 |
| commit    | cdd35607023a333ae096f5a7e1437082a437ff04         |                           |
| tree      | f62394981d7554acdddac3d3e68c60daa7af23a8         |                           |
| parent    | e45aa68c14149cc4ebcc352e3fe4631c642f33f3         |                           |
| parent    | 6d49df1d87fb1d74f00977dee3a3ce42c46c1eee         |                           |
Merge pull request #1103 from odin-lang/new-thread-pool
Improved Thread Pool implementation for the Compiler
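This merge retires the per-subsystem pools (`parser_thread_pool`, `lb_thread_pool`) in favour of a single `global_thread_pool` owned by `main.cpp`, and task procedures move from `THREAD_PROC` (which received a `Thread *` and unpacked `thread->user_data`) to `WORKER_TASK_PROC` (which receives its payload pointer directly). A minimal sketch of the new calling convention, in the compiler's own idiom: `SumTask` and `sum_range` are illustrative names, not part of the commit, and the `WORKER_TASK_PROC` expansion shown is an assumption inferred from how the pool invokes `task->do_work(task->data)` in the diff below.

```cpp
// Assumed expansion; the real macro is defined outside this diff.
#define WORKER_TASK_PROC(name) isize name(void *data)
typedef WORKER_TASK_PROC(WorkerTaskProc);

struct SumTask { // hypothetical payload, for illustration only
	isize lo, hi;
	isize result;
};

WORKER_TASK_PROC(sum_range) {
	SumTask *t = cast(SumTask *)data; // payload arrives directly; no Thread* indirection
	isize s = 0;
	for (isize i = t->lo; i < t->hi; i++) {
		s += i;
	}
	t->result = s;
	return 0;
}

void sum_all(SumTask *tasks, isize n) {
	for (isize i = 0; i < n; i++) {
		global_thread_pool_add_task(sum_range, tasks+i); // any subsystem may enqueue
	}
	global_thread_pool_wait(); // blocks, but also executes queued tasks itself
}
```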
| -rw-r--r-- | src/checker.cpp              | 140 |
| -rw-r--r-- | src/common.cpp               |   4 |
| -rw-r--r-- | src/llvm_backend.cpp         |   7 |
| -rw-r--r-- | src/llvm_backend_general.cpp |   2 |
| -rw-r--r-- | src/main.cpp                 |  17 |
| -rw-r--r-- | src/parser.cpp               |  69 |
| -rw-r--r-- | src/parser.hpp               |   4 |
| -rw-r--r-- | src/thread_pool.cpp          | 191 |
| -rw-r--r-- | src/threading.cpp            | 100 |
9 files changed, 346 insertions, 188 deletions
```diff
diff --git a/src/checker.cpp b/src/checker.cpp
index 7e9b2c672..e3af570f8 100644
--- a/src/checker.cpp
+++ b/src/checker.cpp
@@ -4140,23 +4140,20 @@ struct ThreadProcCheckerSection {
 };
 
-void check_with_workers(Checker *c, ThreadProc *proc, isize total_count) {
+void check_with_workers(Checker *c, WorkerTaskProc *proc, isize total_count) {
 	isize thread_count = gb_max(build_context.thread_count, 1);
 	isize worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work
 	if (!build_context.threaded_checker) {
 		worker_count = 0;
 	}
-	
+
 	semaphore_post(&c->info.collect_semaphore, cast(i32)thread_count);
-
 	if (worker_count == 0) {
 		ThreadProcCheckerSection section_all = {};
 		section_all.checker = c;
 		section_all.offset = 0;
 		section_all.count = total_count;
-		Thread dummy_main_thread = {};
-		dummy_main_thread.user_data = &section_all;
-		proc(&dummy_main_thread);
+		proc(&section_all);
 		return;
 	}
@@ -4173,33 +4170,18 @@ void check_with_workers(Checker *c, ThreadProc *proc, isize total_count) {
 	}
 	GB_ASSERT(remaining_count <= 0);
 
-	Thread *threads = gb_alloc_array(permanent_allocator(), Thread, worker_count);
-	for (isize i = 0; i < worker_count; i++) {
-		thread_init(threads+i);
-	}
-
-	for (isize i = 0; i < worker_count; i++) {
-		thread_start(threads+i, proc, thread_data+i);
+
+	for (isize i = 0; i < thread_count; i++) {
+		global_thread_pool_add_task(proc, thread_data+i);
 	}
 
-	Thread dummy_main_thread = {};
-	dummy_main_thread.user_data = thread_data+worker_count;
-	proc(&dummy_main_thread);
-
+	global_thread_pool_wait();
 	semaphore_wait(&c->info.collect_semaphore);
-
-	for (isize i = 0; i < worker_count; i++) {
-		thread_join(threads+i);
-	}
-
-	for (isize i = 0; i < worker_count; i++) {
-		thread_destroy(threads+i);
-	}
 }
 
-THREAD_PROC(thread_proc_collect_entities) {
-	auto *data = cast(ThreadProcCheckerSection *)thread->user_data;
-	Checker *c = data->checker;
+WORKER_TASK_PROC(thread_proc_collect_entities) {
+	auto *cs = cast(ThreadProcCheckerSection *)data;
+	Checker *c = cs->checker;
 	CheckerContext collect_entity_ctx = make_checker_context(c);
 	defer (destroy_checker_context(&collect_entity_ctx));
 
@@ -4208,8 +4190,8 @@ THREAD_PROC(thread_proc_collect_entities) {
 	UntypedExprInfoMap untyped = {};
 	map_init(&untyped, heap_allocator());
 
-	isize offset = data->offset;
-	isize file_end = gb_min(offset+data->count, c->info.files.entries.count);
+	isize offset = cs->offset;
+	isize file_end = gb_min(offset+cs->count, c->info.files.entries.count);
 
 	for (isize i = offset; i < file_end; i++) {
 		AstFile *f = c->info.files.entries[i].value;
@@ -4246,9 +4228,9 @@ void check_export_entities_in_pkg(CheckerContext *ctx, AstPackage *pkg, UntypedE
 	}
 }
 
-THREAD_PROC(thread_proc_check_export_entities) {
-	auto data = cast(ThreadProcCheckerSection *)thread->user_data;
-	Checker *c = data->checker;
+WORKER_TASK_PROC(thread_proc_check_export_entities) {
+	auto cs = cast(ThreadProcCheckerSection *)data;
+	Checker *c = cs->checker;
 	CheckerContext ctx = make_checker_context(c);
 	defer (destroy_checker_context(&ctx));
 
@@ -4256,8 +4238,8 @@ THREAD_PROC(thread_proc_check_export_entities) {
 	UntypedExprInfoMap untyped = {};
 	map_init(&untyped, heap_allocator());
 
-	isize end = gb_min(data->offset + data->count, c->info.packages.entries.count);
-	for (isize i = data->offset; i < end; i++) {
+	isize end = gb_min(cs->offset + cs->count, c->info.packages.entries.count);
+	for (isize i = cs->offset; i < end; i++) {
 		AstPackage *pkg = c->info.packages.entries[i].value;
 		check_export_entities_in_pkg(&ctx, pkg, &untyped);
 	}
@@ -4575,15 +4557,19 @@ void calculate_global_init_order(Checker *c) {
 }
 
-void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, ProcBodyQueue *procs_to_check_queue) {
+bool check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, ProcBodyQueue *procs_to_check_queue) {
 	if (pi == nullptr) {
-		return;
+		return false;
 	}
 	if (pi->type == nullptr) {
-		return;
+		return false;
 	}
+	Entity *e = pi->decl->entity;
 	if (pi->decl->proc_checked) {
-		return;
+		if (e != nullptr) {
+			GB_ASSERT(e->flags & EntityFlag_ProcBodyChecked);
+		}
+		return true;
 	}
 
 	CheckerContext ctx = make_checker_context(c);
@@ -4601,14 +4587,13 @@ void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, Proc
 			token = ast_token(pi->poly_def_node);
 		}
 		error(token, "Unspecialized polymorphic procedure '%.*s'", LIT(name));
-		return;
+		return false;
 	}
 
 	if (pt->is_polymorphic && pt->is_poly_specialized) {
-		Entity *e = pi->decl->entity;
 		if ((e->flags & EntityFlag_Used) == 0) {
 			// NOTE(bill, 2019-08-31): It was never used, don't check
-			return;
+			return false;
 		}
 	}
@@ -4622,16 +4607,17 @@ void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, Proc
 		ctx.state_flags |= StateFlag_no_bounds_check;
 		ctx.state_flags &= ~StateFlag_bounds_check;
 	}
-	if (pi->body != nullptr && pi->decl->entity != nullptr) {
-		GB_ASSERT((pi->decl->entity->flags & EntityFlag_ProcBodyChecked) == 0);
+	if (pi->body != nullptr && e != nullptr) {
+		GB_ASSERT((e->flags & EntityFlag_ProcBodyChecked) == 0);
 	}
 	check_proc_body(&ctx, pi->token, pi->decl, pi->type, pi->body);
-	if (pi->body != nullptr && pi->decl->entity != nullptr) {
-		pi->decl->entity->flags |= EntityFlag_ProcBodyChecked;
+	if (e != nullptr) {
+		e->flags |= EntityFlag_ProcBodyChecked;
 	}
 	pi->decl->proc_checked = true;
 	add_untyped_expressions(&c->info, ctx.untyped);
+	return true;
 }
 
 GB_STATIC_ASSERT(sizeof(isize) == sizeof(void *));
@@ -4681,9 +4667,10 @@ void check_unchecked_bodies(Checker *c) {
 		ProcInfo *pi = nullptr;
 		while (mpmc_dequeue(q, &pi)) {
 			Entity *e = pi->decl->entity;
-			consume_proc_info_queue(c, pi, q, &untyped);
-			add_dependency_to_set(c, e);
-			GB_ASSERT(e->flags & EntityFlag_ProcBodyChecked);
+			if (consume_proc_info_queue(c, pi, q, &untyped)) {
+				add_dependency_to_set(c, e);
+				GB_ASSERT(e->flags & EntityFlag_ProcBodyChecked);
+			}
 		}
 	}
@@ -4728,15 +4715,15 @@ bool consume_proc_info_queue(Checker *c, ProcInfo *pi, ProcBodyQueue *q, Untyped
 		// NOTE(bill): In single threaded mode, this should never happen
 		if (parent->kind == Entity_Procedure && (parent->flags & EntityFlag_ProcBodyChecked) == 0) {
 			mpmc_enqueue(q, pi);
-			return true;
+			return false;
 		}
 	}
 	if (untyped) {
 		map_clear(untyped);
 	}
-	check_proc_info(c, pi, untyped, q);
+	bool ok = check_proc_info(c, pi, untyped, q);
 	total_bodies_checked.fetch_add(1, std::memory_order_relaxed);
-	return false;
+	return ok;
 }
 
 struct ThreadProcBodyData {
 	Checker *checker;
 	ProcBodyQueue *queue;
 	u32 thread_index;
 	u32 thread_count;
 	ThreadProcBodyData *all_data;
 };
 
-THREAD_PROC(thread_proc_body) {
-	ThreadProcBodyData *data = cast(ThreadProcBodyData *)thread->user_data;
-	Checker *c = data->checker;
+WORKER_TASK_PROC(thread_proc_body) {
+	ThreadProcBodyData *bd = cast(ThreadProcBodyData *)data;
+	Checker *c = bd->checker;
 	GB_ASSERT(c != nullptr);
-	ProcBodyQueue *this_queue = data->queue;
+	ProcBodyQueue *this_queue = bd->queue;
 	UntypedExprInfoMap untyped = {};
 	map_init(&untyped, heap_allocator());
@@ -4774,12 +4761,18 @@ void check_procedure_bodies(Checker *c) {
 	u32 worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work
 	if (!build_context.threaded_checker) {
 		worker_count = 0;
-
-		auto *q = &c->procs_to_check_queue;
-		ProcInfo *pi = nullptr;
-		while (mpmc_dequeue(q, &pi)) {
-			consume_proc_info_queue(c, pi, q, nullptr);
+	}
+	if (worker_count == 0) {
+		auto *this_queue = &c->procs_to_check_queue;
+
+		UntypedExprInfoMap untyped = {};
+		map_init(&untyped, heap_allocator());
+
+		for (ProcInfo *pi = nullptr; mpmc_dequeue(this_queue, &pi); /**/) {
+			consume_proc_info_queue(c, pi, this_queue, &untyped);
 		}
+
+		map_destroy(&untyped);
 
 		debugf("Total Procedure Bodies Checked: %td\n", total_bodies_checked.load(std::memory_order_relaxed));
 		return;
@@ -4821,31 +4814,14 @@ void check_procedure_bodies(Checker *c) {
 	}
 	GB_ASSERT(total_queued == original_queue_count);
-
 	semaphore_post(&c->procs_to_check_semaphore, cast(i32)thread_count);
-
-	Thread *threads = gb_alloc_array(permanent_allocator(), Thread, worker_count);
-	for (isize i = 0; i < worker_count; i++) {
-		thread_init(threads+i);
-	}
-
-	for (isize i = 0; i < worker_count; i++) {
-		thread_start(threads+i, thread_proc_body, thread_data+i);
+
+	for (isize i = 0; i < thread_count; i++) {
+		global_thread_pool_add_task(thread_proc_body, thread_data+i);
 	}
 
-	Thread dummy_main_thread = {};
-	dummy_main_thread.user_data = thread_data+worker_count;
-	thread_proc_body(&dummy_main_thread);
-
+	global_thread_pool_wait();
 	semaphore_wait(&c->procs_to_check_semaphore);
 
-	for (isize i = 0; i < worker_count; i++) {
-		thread_join(threads+i);
-	}
-
-	for (isize i = 0; i < worker_count; i++) {
-		thread_destroy(threads+i);
-	}
-
 	isize global_remaining = c->procs_to_check_queue.count.load(std::memory_order_relaxed);
 	GB_ASSERT(global_remaining == 0);
diff --git a/src/common.cpp b/src/common.cpp
index af0a195a8..b132c26b0 100644
--- a/src/common.cpp
+++ b/src/common.cpp
@@ -44,11 +44,9 @@ void debugf(char const *fmt, ...);
 #include "queue.cpp"
 #include "common_memory.cpp"
 #include "string.cpp"
-
-
-
 #include "range_cache.cpp"
+
 u32 fnv32a(void const *data, isize len) {
 	u8 const *bytes = cast(u8 const *)data;
 	u32 h = 0x811c9dc5;
diff --git a/src/llvm_backend.cpp b/src/llvm_backend.cpp
index d00883bd9..034814bf1 100644
--- a/src/llvm_backend.cpp
+++ b/src/llvm_backend.cpp
@@ -1104,9 +1104,6 @@ void lb_generate_code(lbGenerator *gen) {
 	LLVMBool do_threading = (LLVMIsMultithreaded() && USE_SEPARATE_MODULES && MULTITHREAD_OBJECT_GENERATION && worker_count > 0);
 
-	thread_pool_init(&lb_thread_pool, heap_allocator(), worker_count, "LLVMBackend");
-	defer (thread_pool_destroy(&lb_thread_pool));
-
 	lbModule *default_module = &gen->default_module;
 	CheckerInfo *info = gen->info;
 
@@ -1691,10 +1688,10 @@ void lb_generate_code(lbGenerator *gen) {
 			wd->code_gen_file_type = code_gen_file_type;
 			wd->filepath_obj = filepath_obj;
 			wd->m = m;
-			thread_pool_add_task(&lb_thread_pool, lb_llvm_emit_worker_proc, wd);
+			global_thread_pool_add_task(lb_llvm_emit_worker_proc, wd);
 		}
 
-		thread_pool_wait(&lb_thread_pool);
+		thread_pool_wait(&global_thread_pool);
 	} else {
 		for_array(j, gen->modules.entries) {
 			lbModule *m = gen->modules.entries[j].value;
diff --git a/src/llvm_backend_general.cpp b/src/llvm_backend_general.cpp
index 94fe1e562..2348ac523 100644
--- a/src/llvm_backend_general.cpp
+++ b/src/llvm_backend_general.cpp
@@ -1,7 +1,5 @@
 void lb_add_debug_local_variable(lbProcedure *p, LLVMValueRef ptr, Type *type, Token const &token);
 
-gb_global ThreadPool lb_thread_pool = {};
-
 gb_global Entity *lb_global_type_info_data_entity = {};
 gb_global lbAddr lb_global_type_info_member_types = {};
 gb_global lbAddr lb_global_type_info_member_names = {};
```
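The `check_with_workers` hunk above shows the distribution pattern now used throughout the checker: the work is cut into `thread_count` sections, every section (including the share the main thread used to run via a dummy `Thread`) goes through `global_thread_pool_add_task`, and `global_thread_pool_wait` doubles as the main thread's work loop. A sketch of the slicing arithmetic under an assumed ceiling division; `Section` and `make_sections` are illustrative names, not the commit's:

```cpp
#include <cstddef>

using isize = ptrdiff_t; // stand-in for the compiler's own isize typedef

struct Section {
	isize offset;
	isize count;
};

// Give every section an equal share, rounded up. Tail sections may overshoot
// the end of the array; that is why the worker procs above clamp with
// gb_min(offset+count, entries.count) and the caller asserts
// remaining_count <= 0 rather than == 0.
void make_sections(Section *sections, isize thread_count, isize total_count) {
	isize per_section = (total_count + thread_count - 1) / thread_count;
	for (isize i = 0; i < thread_count; i++) {
		sections[i].offset = i * per_section;
		sections[i].count  = per_section;
	}
}
```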
```diff
diff --git a/src/main.cpp b/src/main.cpp
index dab1257bd..f94b6bd4f 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -7,6 +7,20 @@
 #include "exact_value.cpp"
 #include "build_settings.cpp"
 
+gb_global ThreadPool global_thread_pool;
+void init_global_thread_pool(void) {
+	isize thread_count = gb_max(build_context.thread_count, 1);
+	isize worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work
+	thread_pool_init(&global_thread_pool, permanent_allocator(), worker_count, "ThreadPoolWorker");
+}
+bool global_thread_pool_add_task(WorkerTaskProc *proc, void *data) {
+	return thread_pool_add_task(&global_thread_pool, proc, data);
+}
+void global_thread_pool_wait(void) {
+	thread_pool_wait(&global_thread_pool);
+}
+
+
 void debugf(char const *fmt, ...) {
 	if (build_context.show_debug_messages) {
 		gb_printf_err("[DEBUG] ");
@@ -2160,6 +2174,9 @@ int main(int arg_count, char const **arg_ptr) {
 	// 	return 1;
 	// }
 
+	init_global_thread_pool();
+	defer (thread_pool_destroy(&global_thread_pool));
+
 	init_universal();
 
 	// TODO(bill): prevent compiling without a linker
diff --git a/src/parser.cpp b/src/parser.cpp
index a98259454..d8f3a9e33 100644
--- a/src/parser.cpp
+++ b/src/parser.cpp
@@ -4810,6 +4810,7 @@ bool init_parser(Parser *p) {
 	string_set_init(&p->imported_files, heap_allocator());
 	array_init(&p->packages, heap_allocator());
 	array_init(&p->package_imports, heap_allocator());
+	mutex_init(&p->wait_mutex);
 	mutex_init(&p->import_mutex);
 	mutex_init(&p->file_add_mutex);
 	mutex_init(&p->file_decl_mutex);
@@ -4837,6 +4838,7 @@ void destroy_parser(Parser *p) {
 	array_free(&p->packages);
 	array_free(&p->package_imports);
 	string_set_destroy(&p->imported_files);
+	mutex_destroy(&p->wait_mutex);
 	mutex_destroy(&p->import_mutex);
 	mutex_destroy(&p->file_add_mutex);
 	mutex_destroy(&p->file_decl_mutex);
@@ -4870,7 +4872,7 @@ void parser_add_file_to_process(Parser *p, AstPackage *pkg, FileInfo fi, TokenPo
 	auto wd = gb_alloc_item(heap_allocator(), ParserWorkerData);
 	wd->parser = p;
 	wd->imported_file = f;
-	thread_pool_add_task(&parser_thread_pool, parser_worker_proc, wd);
+	global_thread_pool_add_task(parser_worker_proc, wd);
 }
 
 WORKER_TASK_PROC(foreign_file_worker_proc) {
@@ -4909,7 +4911,7 @@ void parser_add_foreign_file_to_process(Parser *p, AstPackage *pkg, AstForeignFi
 	wd->parser = p;
 	wd->imported_file = f;
 	wd->foreign_kind = kind;
-	thread_pool_add_task(&parser_thread_pool, foreign_file_worker_proc, wd);
+	global_thread_pool_add_task(foreign_file_worker_proc, wd);
 }
@@ -5619,10 +5621,6 @@ ParseFileError process_imported_file(Parser *p, ImportedFile imported_file) {
 
 ParseFileError parse_packages(Parser *p, String init_filename) {
 	GB_ASSERT(init_filename.text[init_filename.len] == 0);
-	isize thread_count = gb_max(build_context.thread_count, 1);
-	isize worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work
-	thread_pool_init(&parser_thread_pool, heap_allocator(), worker_count, "ParserWork");
-
 	String init_fullpath = path_to_full_path(heap_allocator(), init_filename);
 	if (!path_is_directory(init_fullpath)) {
 		String const ext = str_lit(".odin");
@@ -5631,38 +5629,45 @@ ParseFileError parse_packages(Parser *p, String init_filename) {
 			return ParseFile_WrongExtension;
 		}
 	}
+
-	TokenPos init_pos = {};
-	{
-		String s = get_fullpath_core(heap_allocator(), str_lit("runtime"));
-		try_add_import_path(p, s, s, init_pos, Package_Runtime);
-	}
+	{ // Add these packages serially and then process them parallel
+		mutex_lock(&p->wait_mutex);
+		defer (mutex_unlock(&p->wait_mutex));
+
+		TokenPos init_pos = {};
+		{
+			String s = get_fullpath_core(heap_allocator(), str_lit("runtime"));
+			try_add_import_path(p, s, s, init_pos, Package_Runtime);
+		}
 
-	try_add_import_path(p, init_fullpath, init_fullpath, init_pos, Package_Init);
-	p->init_fullpath = init_fullpath;
+		try_add_import_path(p, init_fullpath, init_fullpath, init_pos, Package_Init);
+		p->init_fullpath = init_fullpath;
 
-	if (build_context.command_kind == Command_test) {
-		String s = get_fullpath_core(heap_allocator(), str_lit("testing"));
-		try_add_import_path(p, s, s, init_pos, Package_Normal);
-	}
-
-	for_array(i, build_context.extra_packages) {
-		String path = build_context.extra_packages[i];
-		String fullpath = path_to_full_path(heap_allocator(), path); // LEAK?
-		if (!path_is_directory(fullpath)) {
-			String const ext = str_lit(".odin");
-			if (!string_ends_with(fullpath, ext)) {
-				error_line("Expected either a directory or a .odin file, got '%.*s'\n", LIT(fullpath));
-				return ParseFile_WrongExtension;
-			}
+		if (build_context.command_kind == Command_test) {
+			String s = get_fullpath_core(heap_allocator(), str_lit("testing"));
+			try_add_import_path(p, s, s, init_pos, Package_Normal);
 		}
-		AstPackage *pkg = try_add_import_path(p, fullpath, fullpath, init_pos, Package_Normal);
-		if (pkg) {
-			pkg->is_extra = true;
+
+
+		for_array(i, build_context.extra_packages) {
+			String path = build_context.extra_packages[i];
+			String fullpath = path_to_full_path(heap_allocator(), path); // LEAK?
+			if (!path_is_directory(fullpath)) {
+				String const ext = str_lit(".odin");
+				if (!string_ends_with(fullpath, ext)) {
+					error_line("Expected either a directory or a .odin file, got '%.*s'\n", LIT(fullpath));
+					return ParseFile_WrongExtension;
+				}
+			}
+			AstPackage *pkg = try_add_import_path(p, fullpath, fullpath, init_pos, Package_Normal);
+			if (pkg) {
+				pkg->is_extra = true;
+			}
 		}
 	}
-
-	thread_pool_wait(&parser_thread_pool);
+
+	global_thread_pool_wait();
 
 	for (ParseFileError err = ParseFile_None; mpmc_dequeue(&p->file_error_queue, &err); /**/) {
 		if (err != ParseFile_None) {
diff --git a/src/parser.hpp b/src/parser.hpp
index 082415d16..ddcb27322 100644
--- a/src/parser.hpp
+++ b/src/parser.hpp
@@ -191,6 +191,7 @@ struct Parser {
 	isize file_to_process_count;
 	isize total_token_count;
 	isize total_line_count;
+	BlockingMutex wait_mutex;
 	BlockingMutex import_mutex;
 	BlockingMutex file_add_mutex;
 	BlockingMutex file_decl_mutex;
@@ -198,9 +199,6 @@ struct Parser {
 	MPMCQueue<ParseFileError> file_error_queue;
 };
 
-
-gb_global ThreadPool parser_thread_pool = {};
-
 struct ParserWorkerData {
 	Parser *parser;
 	ImportedFile imported_file;
diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp
index b29fcc5d9..a8bc327e5 100644
--- a/src/thread_pool.cpp
+++ b/src/thread_pool.cpp
@@ -4,85 +4,156 @@ typedef WORKER_TASK_PROC(WorkerTaskProc);
 
 struct WorkerTask {
-	WorkerTask *next_task;
+	WorkerTask *    next;
 	WorkerTaskProc *do_work;
-	void *data;
+	void *          data;
 };
 
 struct ThreadPool {
-	std::atomic<isize> outstanding_task_count;
-	WorkerTask *volatile next_task;
-	BlockingMutex task_list_mutex;
-	isize thread_count;
+	gbAllocator   allocator;
+	BlockingMutex mutex;
+	Condition     task_cond;
+
+	Slice<Thread> threads;
+
+	WorkerTask *task_queue;
+
+	std::atomic<isize> ready;
+	std::atomic<bool>  stop;
+
 };
 
-void thread_pool_thread_entry(ThreadPool *pool) {
-	while (pool->outstanding_task_count) {
-		if (!pool->next_task) {
-			yield(); // No need to grab the mutex.
-		} else {
-			mutex_lock(&pool->task_list_mutex);
-
-			if (pool->next_task) {
-				WorkerTask *task = pool->next_task;
-				pool->next_task = task->next_task;
-				mutex_unlock(&pool->task_list_mutex);
-				task->do_work(task->data);
-				pool->outstanding_task_count.fetch_sub(1);
-				gb_free(heap_allocator(), task);
-			} else {
-				mutex_unlock(&pool->task_list_mutex);
-			}
-		}
+THREAD_PROC(thread_pool_thread_proc);
+
+void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name) {
+	pool->allocator = a;
+	pool->stop = false;
+	mutex_init(&pool->mutex);
+	condition_init(&pool->task_cond);
+
+	slice_init(&pool->threads, a, thread_count);
+	for_array(i, pool->threads) {
+		Thread *t = &pool->threads[i];
+		thread_init(t);
+	}
+
+	for_array(i, pool->threads) {
+		Thread *t = &pool->threads[i];
+		thread_start(t, thread_pool_thread_proc, pool);
 	}
 }
 
-#if defined(GB_SYSTEM_WINDOWS)
-	DWORD __stdcall thread_pool_thread_entry_platform(void *arg) {
-		thread_pool_thread_entry((ThreadPool *) arg);
-		return 0;
-	}
+void thread_pool_destroy(ThreadPool *pool) {
+	pool->stop = true;
+	condition_broadcast(&pool->task_cond);
 
-	void thread_pool_start_thread(ThreadPool *pool) {
-		CloseHandle(CreateThread(NULL, 0, thread_pool_thread_entry_platform, pool, 0, NULL));
+	for_array(i, pool->threads) {
+		Thread *t = &pool->threads[i];
+		thread_join(t);
 	}
-#else
-	void *thread_pool_thread_entry_platform(void *arg) {
-		thread_pool_thread_entry((ThreadPool *) arg);
-		return NULL;
+
+	for_array(i, pool->threads) {
+		Thread *t = &pool->threads[i];
+		thread_destroy(t);
 	}
+
+	gb_free(pool->allocator, pool->threads.data);
+	mutex_destroy(&pool->mutex);
+	condition_destroy(&pool->task_cond);
+}
 
-	void thread_pool_start_thread(ThreadPool *pool) {
-		pthread_t handle;
-		pthread_create(&handle, NULL, thread_pool_thread_entry_platform, pool);
-		pthread_detach(handle);
-	}
-#endif
+bool thread_pool_queue_empty(ThreadPool *pool) {
+	return pool->task_queue == nullptr;
+}
 
-void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_prefix) {
-	memset(pool, 0, sizeof(ThreadPool));
-	mutex_init(&pool->task_list_mutex);
-	pool->thread_count = thread_count;
+WorkerTask *thread_pool_queue_pop(ThreadPool *pool) {
+	GB_ASSERT(pool->task_queue != nullptr);
+	WorkerTask *task = pool->task_queue;
+	pool->task_queue = task->next;
+	return task;
+}
+void thread_pool_queue_push(ThreadPool *pool, WorkerTask *task) {
+	GB_ASSERT(task != nullptr);
+	task->next = pool->task_queue;
+	pool->task_queue = task;
 }
 
-void thread_pool_destroy(ThreadPool *pool) {
-	mutex_destroy(&pool->task_list_mutex);
+bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
+	GB_ASSERT(proc != nullptr);
+	mutex_lock(&pool->mutex);
+	WorkerTask *task = gb_alloc_item(permanent_allocator(), WorkerTask);
+	if (task == nullptr) {
+		mutex_unlock(&pool->mutex);
+		GB_PANIC("Out of memory");
+		return false;
+	}
+	task->do_work = proc;
+	task->data = data;
+
+	thread_pool_queue_push(pool, task);
+	GB_ASSERT(pool->ready >= 0);
+	pool->ready++;
+	mutex_unlock(&pool->mutex);
+	condition_signal(&pool->task_cond);
+	return true;
+}
+
+
+void thread_pool_do_task(WorkerTask *task) {
+	task->do_work(task->data);
 }
 
 void thread_pool_wait(ThreadPool *pool) {
-	for (int i = 0; i < pool->thread_count; i++) {
-		thread_pool_start_thread(pool);
+	if (pool->threads.count == 0) {
+		while (!thread_pool_queue_empty(pool)) {
+			thread_pool_do_task(thread_pool_queue_pop(pool));
+			--pool->ready;
+		}
+		GB_ASSERT(pool->ready == 0);
+		return;
 	}
-	thread_pool_thread_entry(pool);
-}
+	for (;;) {
+		mutex_lock(&pool->mutex);
 
-void thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
-	WorkerTask *task = gb_alloc_item(heap_allocator(), WorkerTask);
-	task->do_work = proc;
-	task->data = data;
-	mutex_lock(&pool->task_list_mutex);
-	task->next_task = pool->next_task;
-	pool->next_task = task;
-	pool->outstanding_task_count.fetch_add(1);
-	mutex_unlock(&pool->task_list_mutex);
+		while (!pool->stop && pool->ready > 0 && thread_pool_queue_empty(pool)) {
+			condition_wait(&pool->task_cond, &pool->mutex);
+		}
+		if ((pool->stop || pool->ready == 0) && thread_pool_queue_empty(pool)) {
+			mutex_unlock(&pool->mutex);
+			return;
+		}
+
+		WorkerTask *task = thread_pool_queue_pop(pool);
+		mutex_unlock(&pool->mutex);
+
+		thread_pool_do_task(task);
+		if (--pool->ready == 0) {
+			condition_broadcast(&pool->task_cond);
+		}
+	}
 }
+
+
+THREAD_PROC(thread_pool_thread_proc) {
+	ThreadPool *pool = cast(ThreadPool *)thread->user_data;
+
+	for (;;) {
+		mutex_lock(&pool->mutex);
+
+		while (!pool->stop && thread_pool_queue_empty(pool)) {
+			condition_wait(&pool->task_cond, &pool->mutex);
+		}
+		if (pool->stop && thread_pool_queue_empty(pool)) {
+			mutex_unlock(&pool->mutex);
+			return 0;
+		}
+
+		WorkerTask *task = thread_pool_queue_pop(pool);
+		mutex_unlock(&pool->mutex);
+
+		thread_pool_do_task(task);
+		if (--pool->ready == 0) {
+			condition_broadcast(&pool->task_cond);
+		}
+	}
+}
\ No newline at end of file
```
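The rewritten `thread_pool.cpp` above swaps detached, `yield()`-spinning threads for joinable workers parked on a condition variable, feeding off an intrusive LIFO task list. Two details are easy to miss: `ready` counts tasks that have been queued but not yet finished, and `thread_pool_wait` does not merely block — the calling thread pops and runs tasks itself, sleeping only while the queue is empty but work is still in flight, and whichever thread finishes the last task broadcasts so every sleeper can re-check its exit condition. A behavioral restatement of that wait/help protocol with std:: primitives (not the commit's code; the commit keeps `ready` as a `std::atomic<isize>` decremented outside the lock):

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

struct MiniPool {
	std::mutex mutex;
	std::condition_variable task_cond;
	std::queue<std::function<void()>> task_queue;
	int ready = 0; // tasks enqueued but not yet finished

	void add_task(std::function<void()> fn) {
		{
			std::lock_guard<std::mutex> lock(mutex);
			task_queue.push(std::move(fn));
			ready++;
		}
		task_cond.notify_one(); // wake one parked worker/waiter
	}

	// Mirrors thread_pool_wait(): help drain the queue, park only while
	// other threads still hold tasks, return once everything has finished.
	void wait() {
		std::unique_lock<std::mutex> lock(mutex);
		for (;;) {
			while (ready > 0 && task_queue.empty()) {
				task_cond.wait(lock);
			}
			if (ready == 0 && task_queue.empty()) {
				return;
			}
			auto task = std::move(task_queue.front());
			task_queue.pop();
			lock.unlock();
			task(); // run outside the lock, like thread_pool_do_task
			lock.lock();
			if (--ready == 0) {
				task_cond.notify_all(); // last task done: release all sleepers
			}
		}
	}
};
```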
```diff
diff --git a/src/threading.cpp b/src/threading.cpp
index 349f234f4..61f9df2db 100644
--- a/src/threading.cpp
+++ b/src/threading.cpp
@@ -1,6 +1,7 @@
 struct BlockingMutex;
 struct RecursiveMutex;
 struct Semaphore;
+struct Condition;
 struct Thread;
 
 #define THREAD_PROC(name) isize name(struct Thread *thread)
@@ -41,6 +42,14 @@ void semaphore_post   (Semaphore *s, i32 count);
 void semaphore_wait   (Semaphore *s);
 void semaphore_release(Semaphore *s) { semaphore_post(s, 1); }
 
+
+void condition_init(Condition *c);
+void condition_destroy(Condition *c);
+void condition_broadcast(Condition *c);
+void condition_signal(Condition *c);
+void condition_wait(Condition *c, BlockingMutex *m);
+void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32 timeout_in_ms);
+
 u32 thread_current_id(void);
 
 void thread_init   (Thread *t);
@@ -108,6 +117,27 @@ void yield_process(void);
 	void semaphore_wait(Semaphore *s) {
 		WaitForSingleObjectEx(s->win32_handle, INFINITE, FALSE);
 	}
+
+	struct Condition {
+		CONDITION_VARIABLE cond;
+	};
+
+	void condition_init(Condition *c) {
+	}
+	void condition_destroy(Condition *c) {
+	}
+	void condition_broadcast(Condition *c) {
+		WakeAllConditionVariable(&c->cond);
+	}
+	void condition_signal(Condition *c) {
+		WakeConditionVariable(&c->cond);
+	}
+	void condition_wait(Condition *c, BlockingMutex *m) {
+		SleepConditionVariableSRW(&c->cond, &m->srwlock, INFINITE, 0);
+	}
+	void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32 timeout_in_ms) {
+		SleepConditionVariableSRW(&c->cond, &m->srwlock, timeout_in_ms, 0);
+	}
 #else
 	struct BlockingMutex {
@@ -170,9 +200,77 @@ void yield_process(void);
 		void semaphore_post (Semaphore *s, i32 count) { while (count --> 0) sem_post(&s->unix_handle); }
 		void semaphore_wait (Semaphore *s) { int i; do { i = sem_wait(&s->unix_handle); } while (i == -1 && errno == EINTR); }
 	#else
-		#error
+		#error Implement Semaphore for this platform
 	#endif
+
+
+	struct Condition {
+		pthread_cond_t pthread_cond;
+	};
+
+	void condition_init(Condition *c) {
+		pthread_cond_init(&c->pthread_cond, NULL);
+	}
+	void condition_destroy(Condition *c) {
+		pthread_cond_destroy(&c->pthread_cond);
+	}
+	void condition_broadcast(Condition *c) {
+		pthread_cond_broadcast(&c->pthread_cond);
+	}
+	void condition_signal(Condition *c) {
+		pthread_cond_signal(&c->pthread_cond);
+	}
+	void condition_wait(Condition *c, BlockingMutex *m) {
+		pthread_cond_wait(&c->pthread_cond, &m->pthread_mutex);
+	}
+	void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32 timeout_in_ms) {
+		struct timespec abstime = {};
+		abstime.tv_sec = timeout_in_ms/1000;
+		abstime.tv_nsec = cast(long)(timeout_in_ms%1000)*1e6;
+		pthread_cond_timedwait(&c->pthread_cond, &m->pthread_mutex, &abstime);
+
+	}
 #endif
+
+
+struct Barrier {
+	BlockingMutex mutex;
+	Condition cond;
+	isize index;
+	isize generation_id;
+	isize thread_count;
+};
+
+void barrier_init(Barrier *b, isize thread_count) {
+	mutex_init(&b->mutex);
+	condition_init(&b->cond);
+	b->index = 0;
+	b->generation_id = 0;
+	b->thread_count = 0;
+}
+
+void barrier_destroy(Barrier *b) {
+	condition_destroy(&b->cond);
+	mutex_destroy(&b->mutex);
+}
+
+// Returns true if it is the leader
+bool barrier_wait(Barrier *b) {
+	mutex_lock(&b->mutex);
+	defer (mutex_unlock(&b->mutex));
+	isize local_gen = b->generation_id;
+	b->index += 1;
+	if (b->index < b->thread_count) {
+		while (local_gen == b->generation_id && b->index < b->thread_count) {
+			condition_wait(&b->cond, &b->mutex);
+		}
+		return false;
+	}
+	b->index = 0;
+	b->generation_id += 1;
+	condition_broadcast(&b->cond);
+	return true;
+}
```
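The diff closes with a generation-counting barrier: every arrival increments `index`; the last arrival becomes the leader, resets `index`, advances `generation_id`, and broadcasts, while earlier arrivals sleep until the generation changes (the generation check also guards against spurious wakeups). One caveat worth noting: `barrier_init` as committed stores `0` into `b->thread_count` rather than its `thread_count` argument, so a caller would have to set that field itself before the barrier can gate anything. A standalone restatement of the same scheme with std:: primitives and illustrative names (`MiniBarrier`), taking the count in the constructor:

```cpp
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

struct MiniBarrier {
	std::mutex mutex;
	std::condition_variable cond;
	int index = 0;          // threads arrived in the current generation
	long generation_id = 0; // bumped each time the barrier opens
	int thread_count;

	explicit MiniBarrier(int n) : thread_count(n) {}

	bool wait() { // returns true for the leader, as barrier_wait does
		std::unique_lock<std::mutex> lock(mutex);
		long local_gen = generation_id;
		if (++index < thread_count) {
			// sleep until the generation advances; the predicate absorbs
			// spurious wakeups exactly like the while-loop in barrier_wait
			cond.wait(lock, [&] { return local_gen != generation_id; });
			return false;
		}
		index = 0;        // leader: reset for the next generation
		generation_id++;
		cond.notify_all();
		return true;
	}
};

int main() {
	MiniBarrier barrier(4);
	std::vector<std::thread> threads;
	for (int i = 0; i < 4; i++) {
		threads.emplace_back([&, i] {
			// ... phase 1 work ...
			if (barrier.wait()) {
				std::printf("thread %d leads phase 2\n", i);
			}
			// ... phase 2 work ...
		});
	}
	for (auto &t : threads) t.join();
}
```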