diff options
| author | thebirk <pingnor@gmail.com> | 2019-08-26 16:47:41 +0200 |
|---|---|---|
| committer | thebirk <pingnor@gmail.com> | 2019-08-26 16:47:41 +0200 |
| commit | c44d25d14f2a8f800170daae4a1c9d08858978b6 (patch) | |
| tree | 66cfd1e96d0ea2e48cf55283f77eecacc9aa7dd4 /src/parser.cpp | |
| parent | 4908d1ebdd00a8822d9ef59245f2456db4b6dbfc (diff) | |
Fixed parser creating a new thread for each file.
Diffstat (limited to 'src/parser.cpp')
| -rw-r--r-- | src/parser.cpp | 107 |
1 files changed, 97 insertions, 10 deletions
diff --git a/src/parser.cpp b/src/parser.cpp index 9aebc78d6..0a2a3c1fc 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -4679,8 +4679,48 @@ skip: return ParseFile_None; } +#if 1 +struct ParserWorkerThreadData { + Parser *parser; + + gbSemaphore resume_work; //NOTE(thebirk): Use to signal that the worker thead has a new work to do + ParseFileError err; + + gbMutex lock; //NOTE(thebirk): All variables below are locked by this mutex + bool error_available; + bool should_exit; +}; + GB_THREAD_PROC(parse_worker_file_proc) { + GB_ASSERT(thread != nullptr); + + ParserWorkerThreadData* data = cast(ParserWorkerThreadData*) thread->user_data; + + for(;;) { + gb_semaphore_wait(&data->resume_work); + + gb_mutex_lock(&data->lock); + if (data->should_exit) { + gb_mutex_unlock(&data->lock); + return isize(0); + } + + Parser *p = data->parser; + isize index = thread->user_index; + gb_mutex_lock(&p->file_add_mutex); + auto file_to_process = p->files_to_process[index]; + gb_mutex_unlock(&p->file_add_mutex); + data->err = process_imported_file(p, file_to_process); + + data->error_available = true; + gb_mutex_unlock(&data->lock); + } + + //GB_PANIC("A worker thread should not be able to reach the end!!!"); +} +#else +GB_THREAD_PROC(parse_worker_file_proc) { if (thread == nullptr) return 0; auto *p = cast(Parser *)thread->user_data; isize index = thread->user_index; @@ -4690,6 +4730,7 @@ GB_THREAD_PROC(parse_worker_file_proc) { ParseFileError err = process_imported_file(p, file_to_process); return cast(isize)err; } +#endif ParseFileError parse_packages(Parser *p, String init_filename) { GB_ASSERT(init_filename.text[init_filename.len] == 0); @@ -4729,14 +4770,36 @@ ParseFileError parse_packages(Parser *p, String init_filename) { curr_import_index++; } + auto worker_threads_data = array_make<ParserWorkerThreadData>(heap_allocator(), thread_count); + defer (array_free(&worker_threads_data)); + + for_array(i, worker_threads_data) { + ParserWorkerThreadData *data = &worker_threads_data[i]; + gb_mutex_init(&data->lock); + gb_semaphore_init(&data->resume_work); + data->parser = p; + data->err = ParseFile_None; + data->should_exit = false; + data->error_available = false; + } + defer(for_array(i, worker_threads_data) { + ParserWorkerThreadData *data = &worker_threads_data[i]; + gb_mutex_destroy(&data->lock); + gb_semaphore_destroy(&data->resume_work); + }); + auto worker_threads = array_make<gbThread>(heap_allocator(), thread_count); defer (array_free(&worker_threads)); for_array(i, worker_threads) { gbThread *t = &worker_threads[i]; gb_thread_init(t); + char buffer[64]; + gb_snprintf(buffer, 64, "Parser Worker #%lld", i); + gb_thread_set_name(t, buffer); + gb_thread_start(t, parse_worker_file_proc, &worker_threads_data[i]); } - defer (for_array(i, worker_threads) { + defer(for_array(i, worker_threads) { gb_thread_destroy(&worker_threads[i]); }); @@ -4744,27 +4807,51 @@ ParseFileError parse_packages(Parser *p, String init_filename) { for (;;) { bool are_any_alive = false; + for_array(i, worker_threads) { gbThread *t = &worker_threads[i]; - if (gb_thread_is_running(t)) { - are_any_alive = true; - } else if (curr_import_index < p->files_to_process.count) { - auto curr_err = cast(ParseFileError)t->return_value; - if (curr_err != ParseFile_None) { - array_add(&errors, curr_err); - } else { + ParserWorkerThreadData *data = &worker_threads_data[i]; + + if (gb_mutex_try_lock(&data->lock)) { + if (data->error_available) { + auto curr_err = data->err; + if (curr_err != ParseFile_None) { + array_add(&errors, curr_err); + } + + data->error_available = false; + } + + if (curr_import_index < p->files_to_process.count) { t->user_index = curr_import_index; curr_import_index++; - gb_thread_start(t, parse_worker_file_proc, p); are_any_alive = true; + + gb_semaphore_release(&data->resume_work); } + + gb_mutex_unlock(&data->lock); + } else { + //NOTE(thebirk): If we cant lock a thread it must be working + are_any_alive = true; } } - if (!are_any_alive && curr_import_index >= p->files_to_process.count) { + + //NOTE(thebirk): Everything collapses without this, but it really shouldn't! + gb_yield(); + + if ((!are_any_alive) && (curr_import_index >= p->files_to_process.count)) { break; } } + //NOTE(thebirk): Signal all workers to exit + for_array(i, worker_threads_data) { + ParserWorkerThreadData* data = &worker_threads_data[i]; + data->should_exit = true; + gb_semaphore_release(&data->resume_work); + } + if (errors.count > 0) { return errors[errors.count-1]; } |