From ad3a3547d61b3576a9b0841510669d0f102f9ccf Mon Sep 17 00:00:00 2001 From: gingerBill Date: Thu, 26 Aug 2021 21:22:30 +0100 Subject: Unify thread pool logic across the rest of the compiler, using a global thread pool --- src/checker.cpp | 117 ++++++++++++++++++++++---------------------------------- 1 file changed, 45 insertions(+), 72 deletions(-) (limited to 'src/checker.cpp') diff --git a/src/checker.cpp b/src/checker.cpp index 7e9b2c672..ebd5956c2 100644 --- a/src/checker.cpp +++ b/src/checker.cpp @@ -4140,7 +4140,7 @@ struct ThreadProcCheckerSection { }; -void check_with_workers(Checker *c, ThreadProc *proc, isize total_count) { +void check_with_workers(Checker *c, WorkerTaskProc *proc, isize total_count) { isize thread_count = gb_max(build_context.thread_count, 1); isize worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work if (!build_context.threaded_checker) { @@ -4173,33 +4173,18 @@ void check_with_workers(Checker *c, ThreadProc *proc, isize total_count) { } GB_ASSERT(remaining_count <= 0); - Thread *threads = gb_alloc_array(permanent_allocator(), Thread, worker_count); - for (isize i = 0; i < worker_count; i++) { - thread_init(threads+i); - } - - for (isize i = 0; i < worker_count; i++) { - thread_start(threads+i, proc, thread_data+i); + + for (isize i = 0; i < thread_count; i++) { + global_thread_pool_add_task(proc, thread_data+i); } - Thread dummy_main_thread = {}; - dummy_main_thread.user_data = thread_data+worker_count; - proc(&dummy_main_thread); - + global_thread_pool_wait(); semaphore_wait(&c->info.collect_semaphore); - - for (isize i = 0; i < worker_count; i++) { - thread_join(threads+i); - } - - for (isize i = 0; i < worker_count; i++) { - thread_destroy(threads+i); - } } -THREAD_PROC(thread_proc_collect_entities) { - auto *data = cast(ThreadProcCheckerSection *)thread->user_data; - Checker *c = data->checker; +WORKER_TASK_PROC(thread_proc_collect_entities) { + auto *cs = cast(ThreadProcCheckerSection *)data; + Checker *c = cs->checker; CheckerContext collect_entity_ctx = make_checker_context(c); defer (destroy_checker_context(&collect_entity_ctx)); @@ -4208,8 +4193,8 @@ THREAD_PROC(thread_proc_collect_entities) { UntypedExprInfoMap untyped = {}; map_init(&untyped, heap_allocator()); - isize offset = data->offset; - isize file_end = gb_min(offset+data->count, c->info.files.entries.count); + isize offset = cs->offset; + isize file_end = gb_min(offset+cs->count, c->info.files.entries.count); for (isize i = offset; i < file_end; i++) { AstFile *f = c->info.files.entries[i].value; @@ -4246,9 +4231,9 @@ void check_export_entities_in_pkg(CheckerContext *ctx, AstPackage *pkg, UntypedE } } -THREAD_PROC(thread_proc_check_export_entities) { - auto data = cast(ThreadProcCheckerSection *)thread->user_data; - Checker *c = data->checker; +WORKER_TASK_PROC(thread_proc_check_export_entities) { + auto cs = cast(ThreadProcCheckerSection *)data; + Checker *c = cs->checker; CheckerContext ctx = make_checker_context(c); defer (destroy_checker_context(&ctx)); @@ -4256,8 +4241,8 @@ THREAD_PROC(thread_proc_check_export_entities) { UntypedExprInfoMap untyped = {}; map_init(&untyped, heap_allocator()); - isize end = gb_min(data->offset + data->count, c->info.packages.entries.count); - for (isize i = data->offset; i < end; i++) { + isize end = gb_min(cs->offset + cs->count, c->info.packages.entries.count); + for (isize i = cs->offset; i < end; i++) { AstPackage *pkg = c->info.packages.entries[i].value; check_export_entities_in_pkg(&ctx, pkg, &untyped); } @@ -4575,15 +4560,19 @@ void calculate_global_init_order(Checker *c) { } -void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, ProcBodyQueue *procs_to_check_queue) { +bool check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, ProcBodyQueue *procs_to_check_queue) { if (pi == nullptr) { - return; + return false; } if (pi->type == nullptr) { - return; + return false; } + Entity *e = pi->decl->entity; if (pi->decl->proc_checked) { - return; + if (e != nullptr) { + GB_ASSERT(e->flags & EntityFlag_ProcBodyChecked); + } + return true; } CheckerContext ctx = make_checker_context(c); @@ -4601,14 +4590,13 @@ void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, Proc token = ast_token(pi->poly_def_node); } error(token, "Unspecialized polymorphic procedure '%.*s'", LIT(name)); - return; + return false; } if (pt->is_polymorphic && pt->is_poly_specialized) { - Entity *e = pi->decl->entity; if ((e->flags & EntityFlag_Used) == 0) { // NOTE(bill, 2019-08-31): It was never used, don't check - return; + return false; } } @@ -4622,16 +4610,17 @@ void check_proc_info(Checker *c, ProcInfo *pi, UntypedExprInfoMap *untyped, Proc ctx.state_flags |= StateFlag_no_bounds_check; ctx.state_flags &= ~StateFlag_bounds_check; } - if (pi->body != nullptr && pi->decl->entity != nullptr) { - GB_ASSERT((pi->decl->entity->flags & EntityFlag_ProcBodyChecked) == 0); + if (pi->body != nullptr && e != nullptr) { + GB_ASSERT((e->flags & EntityFlag_ProcBodyChecked) == 0); } check_proc_body(&ctx, pi->token, pi->decl, pi->type, pi->body); - if (pi->body != nullptr && pi->decl->entity != nullptr) { - pi->decl->entity->flags |= EntityFlag_ProcBodyChecked; + if (e != nullptr) { + e->flags |= EntityFlag_ProcBodyChecked; } pi->decl->proc_checked = true; add_untyped_expressions(&c->info, ctx.untyped); + return true; } GB_STATIC_ASSERT(sizeof(isize) == sizeof(void *)); @@ -4681,9 +4670,10 @@ void check_unchecked_bodies(Checker *c) { ProcInfo *pi = nullptr; while (mpmc_dequeue(q, &pi)) { Entity *e = pi->decl->entity; - consume_proc_info_queue(c, pi, q, &untyped); - add_dependency_to_set(c, e); - GB_ASSERT(e->flags & EntityFlag_ProcBodyChecked); + if (consume_proc_info_queue(c, pi, q, &untyped)) { + add_dependency_to_set(c, e); + GB_ASSERT(e->flags & EntityFlag_ProcBodyChecked); + } } } @@ -4728,15 +4718,15 @@ bool consume_proc_info_queue(Checker *c, ProcInfo *pi, ProcBodyQueue *q, Untyped // NOTE(bill): In single threaded mode, this should never happen if (parent->kind == Entity_Procedure && (parent->flags & EntityFlag_ProcBodyChecked) == 0) { mpmc_enqueue(q, pi); - return true; + return false; } } if (untyped) { map_clear(untyped); } - check_proc_info(c, pi, untyped, q); + bool ok = check_proc_info(c, pi, untyped, q); total_bodies_checked.fetch_add(1, std::memory_order_relaxed); - return false; + return ok; } struct ThreadProcBodyData { @@ -4747,11 +4737,11 @@ struct ThreadProcBodyData { ThreadProcBodyData *all_data; }; -THREAD_PROC(thread_proc_body) { - ThreadProcBodyData *data = cast(ThreadProcBodyData *)thread->user_data; - Checker *c = data->checker; +WORKER_TASK_PROC(thread_proc_body) { + ThreadProcBodyData *bd = cast(ThreadProcBodyData *)data; + Checker *c = bd->checker; GB_ASSERT(c != nullptr); - ProcBodyQueue *this_queue = data->queue; + ProcBodyQueue *this_queue = bd->queue; UntypedExprInfoMap untyped = {}; map_init(&untyped, heap_allocator()); @@ -4821,31 +4811,14 @@ void check_procedure_bodies(Checker *c) { } GB_ASSERT(total_queued == original_queue_count); - semaphore_post(&c->procs_to_check_semaphore, cast(i32)thread_count); - - Thread *threads = gb_alloc_array(permanent_allocator(), Thread, worker_count); - for (isize i = 0; i < worker_count; i++) { - thread_init(threads+i); - } - - for (isize i = 0; i < worker_count; i++) { - thread_start(threads+i, thread_proc_body, thread_data+i); + + for (isize i = 0; i < thread_count; i++) { + global_thread_pool_add_task(thread_proc_body, thread_data+i); } - Thread dummy_main_thread = {}; - dummy_main_thread.user_data = thread_data+worker_count; - thread_proc_body(&dummy_main_thread); - + global_thread_pool_wait(); semaphore_wait(&c->procs_to_check_semaphore); - for (isize i = 0; i < worker_count; i++) { - thread_join(threads+i); - } - - for (isize i = 0; i < worker_count; i++) { - thread_destroy(threads+i); - } - isize global_remaining = c->procs_to_check_queue.count.load(std::memory_order_relaxed); GB_ASSERT(global_remaining == 0); -- cgit v1.2.3 From ac191bd31f2983942e297beba077484ba58922f4 Mon Sep 17 00:00:00 2001 From: gingerBill Date: Thu, 26 Aug 2021 21:30:23 +0100 Subject: Simplify logic for `-thread-count:1` --- src/checker.cpp | 20 -------------------- src/thread_pool.cpp | 8 ++++++++ 2 files changed, 8 insertions(+), 20 deletions(-) (limited to 'src/checker.cpp') diff --git a/src/checker.cpp b/src/checker.cpp index ebd5956c2..c93837321 100644 --- a/src/checker.cpp +++ b/src/checker.cpp @@ -4149,17 +4149,6 @@ void check_with_workers(Checker *c, WorkerTaskProc *proc, isize total_count) { semaphore_post(&c->info.collect_semaphore, cast(i32)thread_count); - if (worker_count == 0) { - ThreadProcCheckerSection section_all = {}; - section_all.checker = c; - section_all.offset = 0; - section_all.count = total_count; - Thread dummy_main_thread = {}; - dummy_main_thread.user_data = §ion_all; - proc(&dummy_main_thread); - return; - } - isize file_load_count = (total_count+thread_count-1)/thread_count; isize remaining_count = total_count; @@ -4764,15 +4753,6 @@ void check_procedure_bodies(Checker *c) { u32 worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work if (!build_context.threaded_checker) { worker_count = 0; - - auto *q = &c->procs_to_check_queue; - ProcInfo *pi = nullptr; - while (mpmc_dequeue(q, &pi)) { - consume_proc_info_queue(c, pi, q, nullptr); - } - - debugf("Total Procedure Bodies Checked: %td\n", total_bodies_checked.load(std::memory_order_relaxed)); - return; } global_procedure_body_in_worker_queue = true; diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index b0357996c..a8bc327e5 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -104,6 +104,14 @@ void thread_pool_do_task(WorkerTask *task) { } void thread_pool_wait(ThreadPool *pool) { + if (pool->threads.count == 0) { + while (!thread_pool_queue_empty(pool)) { + thread_pool_do_task(thread_pool_queue_pop(pool)); + --pool->ready; + } + GB_ASSERT(pool->ready == 0); + return; + } for (;;) { mutex_lock(&pool->mutex); -- cgit v1.2.3 From 6d49df1d87fb1d74f00977dee3a3ce42c46c1eee Mon Sep 17 00:00:00 2001 From: gingerBill Date: Thu, 26 Aug 2021 21:40:54 +0100 Subject: Don't use the thread pool if worker count is 0 --- src/checker.cpp | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) (limited to 'src/checker.cpp') diff --git a/src/checker.cpp b/src/checker.cpp index c93837321..e3af570f8 100644 --- a/src/checker.cpp +++ b/src/checker.cpp @@ -4146,8 +4146,16 @@ void check_with_workers(Checker *c, WorkerTaskProc *proc, isize total_count) { if (!build_context.threaded_checker) { worker_count = 0; } - + semaphore_post(&c->info.collect_semaphore, cast(i32)thread_count); + if (worker_count == 0) { + ThreadProcCheckerSection section_all = {}; + section_all.checker = c; + section_all.offset = 0; + section_all.count = total_count; + proc(§ion_all); + return; + } isize file_load_count = (total_count+thread_count-1)/thread_count; isize remaining_count = total_count; @@ -4754,6 +4762,21 @@ void check_procedure_bodies(Checker *c) { if (!build_context.threaded_checker) { worker_count = 0; } + if (worker_count == 0) { + auto *this_queue = &c->procs_to_check_queue; + + UntypedExprInfoMap untyped = {}; + map_init(&untyped, heap_allocator()); + + for (ProcInfo *pi = nullptr; mpmc_dequeue(this_queue, &pi); /**/) { + consume_proc_info_queue(c, pi, this_queue, &untyped); + } + + map_destroy(&untyped); + + debugf("Total Procedure Bodies Checked: %td\n", total_bodies_checked.load(std::memory_order_relaxed)); + return; + } global_procedure_body_in_worker_queue = true; -- cgit v1.2.3