diff options
| author | gingerBill <bill@gingerbill.org> | 2020-01-02 15:07:12 +0000 |
|---|---|---|
| committer | gingerBill <bill@gingerbill.org> | 2020-01-02 15:07:12 +0000 |
| commit | 3bd00fd6b7bea465ab9882e6b0f1b8e0842a4fa1 (patch) | |
| tree | 0a606061b785be4090fce211f2ca3d396771e174 /core/thread/thread_pool.odin | |
| parent | 16a7c55334fafa48d5fd84e86071263f43a3724f (diff) | |
Add `thread.Pool` with example in demo.odin; Update linalg to support handness changes for projection matrices
Diffstat (limited to 'core/thread/thread_pool.odin')
| -rw-r--r-- | core/thread/thread_pool.odin | 147 |
1 files changed, 147 insertions, 0 deletions
diff --git a/core/thread/thread_pool.odin b/core/thread/thread_pool.odin new file mode 100644 index 000000000..61d326f0c --- /dev/null +++ b/core/thread/thread_pool.odin @@ -0,0 +1,147 @@ +package thread + +import "intrinsics" +import "core:sync" +import "core:mem" + +Task_Status :: enum i32 { + Ready, + Busy, + Waiting, + Term, +} + +Task_Proc :: #type proc(task: ^Task); + +Task :: struct { + procedure: Task_Proc, + data: rawptr, + user_index: int, +} + +Task_Id :: distinct i32; +INVALID_TASK_ID :: Task_Id(-1); + + +Pool :: struct { + allocator: mem.Allocator, + mutex: sync.Mutex, + sem_available: sync.Semaphore, + processing_task_count: int, // atomic + is_running: bool, + + threads: []^Thread, + + tasks: [dynamic]Task, +} + +pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator) { + worker_thread_internal :: proc(t: ^Thread) { + pool := (^Pool)(t.data); + + for pool.is_running { + sync.semaphore_wait_for(&pool.sem_available); + + if task, ok := pool_try_and_pop_task(pool); ok { + pool_do_work(pool, &task); + } + } + + sync.semaphore_post(&pool.sem_available, 1); + } + + + context.allocator = allocator; + pool.allocator = allocator; + pool.tasks = make([dynamic]Task); + pool.threads = make([]^Thread, thread_count); + + sync.mutex_init(&pool.mutex); + sync.semaphore_init(&pool.sem_available); + pool.is_running = true; + + for _, i in pool.threads { + t := create(worker_thread_internal); + t.user_index = i; + t.data = pool; + pool.threads[i] = t; + } +} + +pool_destroy :: proc(pool: ^Pool) { + delete(pool.tasks); + delete(pool.threads, pool.allocator); + + sync.mutex_destroy(&pool.mutex); + sync.semaphore_destroy(&pool.sem_available); +} + +pool_start :: proc(pool: ^Pool) { + for t in pool.threads { + start(t); + } +} + +pool_join :: proc(pool: ^Pool) { + pool.is_running = false; + + sync.semaphore_post(&pool.sem_available, len(pool.threads)); + + yield(); + + for t in pool.threads { + join(t); + } +} + +pool_add_task :: proc(pool: ^Pool, procedure: Task_Proc, data: rawptr, user_index: int = 0) { + sync.mutex_lock(&pool.mutex); + defer sync.mutex_unlock(&pool.mutex); + + task: Task; + task.procedure = procedure; + task.data = data; + task.user_index = user_index; + + append(&pool.tasks, task); + sync.semaphore_post(&pool.sem_available, 1); +} + +pool_try_and_pop_task :: proc(pool: ^Pool) -> (task: Task, got_task: bool = false) { + if sync.mutex_try_lock(&pool.mutex) { + if len(pool.tasks) != 0 { + intrinsics.atomic_add(&pool.processing_task_count, 1); + task = pool.tasks[0]; + got_task = true; + ordered_remove(&pool.tasks, 0); + } + sync.mutex_unlock(&pool.mutex); + } + return; +} + + +pool_do_work :: proc(pool: ^Pool, task: ^Task) { + task.procedure(task); + intrinsics.atomic_sub(&pool.processing_task_count, 1); +} + + +pool_wait_and_process :: proc(pool: ^Pool) { + for len(pool.tasks) != 0 || intrinsics.atomic_load(&pool.processing_task_count) != 0 { + if task, ok := pool_try_and_pop_task(pool); ok { + pool_do_work(pool, &task); + } + + // Safety kick + if len(pool.tasks) != 0 && intrinsics.atomic_load(&pool.processing_task_count) == 0 { + sync.mutex_lock(&pool.mutex); + sync.semaphore_post(&pool.sem_available, len(pool.tasks)); + sync.mutex_unlock(&pool.mutex); + } + + yield(); + } + + pool_join(pool); +} |