aboutsummaryrefslogtreecommitdiff
path: root/src/checker.cpp
diff options
context:
space:
mode:
authorgingerBill <bill@gingerbill.org>2021-07-14 01:13:39 +0100
committergingerBill <bill@gingerbill.org>2021-07-14 01:13:39 +0100
commit6d8302825c10de815583057249249a6dc8282316 (patch)
tree26445b95f5f833e0d6cf07d6b95c16590b674eb2 /src/checker.cpp
parentfae8bf96dd45ca06d06b9067ce35b662c50a5234 (diff)
Add Greed Work Stealing and Random Load Balancing for check_procedure_bodies
Diffstat (limited to 'src/checker.cpp')
-rw-r--r--src/checker.cpp44
1 files changed, 35 insertions, 9 deletions
diff --git a/src/checker.cpp b/src/checker.cpp
index ebc82ae2f..37565b2a5 100644
--- a/src/checker.cpp
+++ b/src/checker.cpp
@@ -4554,33 +4554,59 @@ bool consume_proc_info_queue(Checker *c, ProcInfo *pi, ProcBodyQueue *q, Untyped
struct ThreadProcBodyData {
Checker *checker;
ProcBodyQueue *queue;
- isize thread_index;
+ u32 thread_index;
+ u32 thread_count;
ThreadProcBodyData *all_data;
- isize thread_count;
};
GB_THREAD_PROC(thread_proc_body) {
ThreadProcBodyData *data = cast(ThreadProcBodyData *)thread->user_data;
Checker *c = data->checker;
- ProcBodyQueue *q = data->queue;
- ProcInfo *pi = nullptr;
+ ThreadProcBodyData *all_data = data->all_data;
+ ProcBodyQueue *this_queue = data->queue;
UntypedExprInfoMap untyped = {};
map_init(&untyped, heap_allocator());
defer (map_destroy(&untyped));
- while (mpmc_dequeue(q, &pi)) {
+ gbRandom r = {}; gb_random_init(&r);
+
+ for (ProcInfo *pi; mpmc_dequeue(this_queue, &pi); /**/) {
+ // NOTE(bill): Randomize the load balancing of procs
+ // adding more procs to the queues
+ u32 index = gb_random_gen_u32(&r) % data->thread_count;
+ ProcBodyQueue *q = all_data[index].queue;
consume_proc_info_queue(c, pi, q, &untyped);
}
+ // Greedy Work Stealing
+retry:;
+ isize max_count = 0;
+ isize best_index = -1;
+ for (u32 i = 0; i < data->thread_count; i++) {
+ ProcBodyQueue *q = all_data[i].queue;
+ isize count = q->count.load(std::memory_order_relaxed);
+ if (max_count < count) {
+ max_count = count;
+ best_index = i;
+ }
+ }
+ if (best_index >= 0) {
+ ProcBodyQueue *other_queue = all_data[best_index].queue;
+ for (ProcInfo *pi; mpmc_dequeue(other_queue, &pi); /**/) {
+ consume_proc_info_queue(c, pi, this_queue, &untyped);
+ }
+ goto retry;
+ }
+
gb_semaphore_release(&c->procs_to_check_semaphore);
return 0;
}
void check_procedure_bodies(Checker *c) {
- isize thread_count = gb_max(build_context.thread_count, 1);
- isize worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work
+ u32 thread_count = cast(u32)gb_max(build_context.thread_count, 1);
+ u32 worker_count = thread_count-1; // NOTE(bill): The main thread will also be used for work
if (!build_context.threaded_checker) {
worker_count = 0;
@@ -4600,13 +4626,13 @@ void check_procedure_bodies(Checker *c) {
isize load_count = (original_queue_count+thread_count-1)/thread_count;
ThreadProcBodyData *thread_data = gb_alloc_array(permanent_allocator(), ThreadProcBodyData, thread_count);
- for (isize i = 0; i < thread_count; i++) {
+ for (u32 i = 0; i < thread_count; i++) {
ThreadProcBodyData *data = thread_data + i;
data->checker = c;
data->queue = gb_alloc_item(permanent_allocator(), ProcBodyQueue);
data->thread_index = i;
- data->all_data = data;
data->thread_count = thread_count;
+ data->all_data = thread_data;
// NOTE(bill) 2x the amount assumes on average only 1 nested procedure
// TODO(bill): Determine a good heuristic
mpmc_init(data->queue, heap_allocator(), next_pow2_isize(load_count*2));