From c44d25d14f2a8f800170daae4a1c9d08858978b6 Mon Sep 17 00:00:00 2001 From: thebirk Date: Mon, 26 Aug 2019 16:47:41 +0200 Subject: Fixed parser creating a new thread for each file. --- src/parser.cpp | 107 +++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 97 insertions(+), 10 deletions(-) (limited to 'src/parser.cpp') diff --git a/src/parser.cpp b/src/parser.cpp index 9aebc78d6..0a2a3c1fc 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -4679,7 +4679,47 @@ skip: return ParseFile_None; } +#if 1 +struct ParserWorkerThreadData { + Parser *parser; + + gbSemaphore resume_work; //NOTE(thebirk): Use to signal that the worker thead has a new work to do + ParseFileError err; + + gbMutex lock; //NOTE(thebirk): All variables below are locked by this mutex + bool error_available; + bool should_exit; +}; + +GB_THREAD_PROC(parse_worker_file_proc) { + GB_ASSERT(thread != nullptr); + + ParserWorkerThreadData* data = cast(ParserWorkerThreadData*) thread->user_data; + + for(;;) { + gb_semaphore_wait(&data->resume_work); + + gb_mutex_lock(&data->lock); + if (data->should_exit) { + gb_mutex_unlock(&data->lock); + return isize(0); + } + + Parser *p = data->parser; + isize index = thread->user_index; + gb_mutex_lock(&p->file_add_mutex); + auto file_to_process = p->files_to_process[index]; + gb_mutex_unlock(&p->file_add_mutex); + data->err = process_imported_file(p, file_to_process); + + data->error_available = true; + gb_mutex_unlock(&data->lock); + } + + //GB_PANIC("A worker thread should not be able to reach the end!!!"); +} +#else GB_THREAD_PROC(parse_worker_file_proc) { if (thread == nullptr) return 0; auto *p = cast(Parser *)thread->user_data; @@ -4690,6 +4730,7 @@ GB_THREAD_PROC(parse_worker_file_proc) { ParseFileError err = process_imported_file(p, file_to_process); return cast(isize)err; } +#endif ParseFileError parse_packages(Parser *p, String init_filename) { GB_ASSERT(init_filename.text[init_filename.len] == 0); @@ -4729,14 +4770,36 @@ ParseFileError parse_packages(Parser *p, String init_filename) { curr_import_index++; } + auto worker_threads_data = array_make(heap_allocator(), thread_count); + defer (array_free(&worker_threads_data)); + + for_array(i, worker_threads_data) { + ParserWorkerThreadData *data = &worker_threads_data[i]; + gb_mutex_init(&data->lock); + gb_semaphore_init(&data->resume_work); + data->parser = p; + data->err = ParseFile_None; + data->should_exit = false; + data->error_available = false; + } + defer(for_array(i, worker_threads_data) { + ParserWorkerThreadData *data = &worker_threads_data[i]; + gb_mutex_destroy(&data->lock); + gb_semaphore_destroy(&data->resume_work); + }); + auto worker_threads = array_make(heap_allocator(), thread_count); defer (array_free(&worker_threads)); for_array(i, worker_threads) { gbThread *t = &worker_threads[i]; gb_thread_init(t); + char buffer[64]; + gb_snprintf(buffer, 64, "Parser Worker #%lld", i); + gb_thread_set_name(t, buffer); + gb_thread_start(t, parse_worker_file_proc, &worker_threads_data[i]); } - defer (for_array(i, worker_threads) { + defer(for_array(i, worker_threads) { gb_thread_destroy(&worker_threads[i]); }); @@ -4744,27 +4807,51 @@ ParseFileError parse_packages(Parser *p, String init_filename) { for (;;) { bool are_any_alive = false; + for_array(i, worker_threads) { gbThread *t = &worker_threads[i]; - if (gb_thread_is_running(t)) { - are_any_alive = true; - } else if (curr_import_index < p->files_to_process.count) { - auto curr_err = cast(ParseFileError)t->return_value; - if (curr_err != ParseFile_None) { - array_add(&errors, curr_err); - } else { + ParserWorkerThreadData *data = &worker_threads_data[i]; + + if (gb_mutex_try_lock(&data->lock)) { + if (data->error_available) { + auto curr_err = data->err; + if (curr_err != ParseFile_None) { + array_add(&errors, curr_err); + } + + data->error_available = false; + } + + if (curr_import_index < p->files_to_process.count) { t->user_index = curr_import_index; curr_import_index++; - gb_thread_start(t, parse_worker_file_proc, p); are_any_alive = true; + + gb_semaphore_release(&data->resume_work); } + + gb_mutex_unlock(&data->lock); + } else { + //NOTE(thebirk): If we cant lock a thread it must be working + are_any_alive = true; } } - if (!are_any_alive && curr_import_index >= p->files_to_process.count) { + + //NOTE(thebirk): Everything collapses without this, but it really shouldn't! + gb_yield(); + + if ((!are_any_alive) && (curr_import_index >= p->files_to_process.count)) { break; } } + //NOTE(thebirk): Signal all workers to exit + for_array(i, worker_threads_data) { + ParserWorkerThreadData* data = &worker_threads_data[i]; + data->should_exit = true; + gb_semaphore_release(&data->resume_work); + } + if (errors.count > 0) { return errors[errors.count-1]; } -- cgit v1.2.3