diff options
| author | gingerBill <bill@gingerbill.org> | 2022-03-31 11:55:46 +0100 |
|---|---|---|
| committer | gingerBill <bill@gingerbill.org> | 2022-03-31 11:55:46 +0100 |
| commit | 22b961ea53a1b9dfbc9218059b8473a0e52ae35c (patch) | |
| tree | 0e27bf001db0018e2f7412675d13027e53c08e86 /core/thread/thread_pool.odin | |
| parent | 06e8476efc421a27ea8cd908cd5e6d8b319ebf0f (diff) | |
Update Thread Pool in `core:thread`
Thanks to the work of eisbehr
Diffstat (limited to 'core/thread/thread_pool.odin')
| -rw-r--r-- | core/thread/thread_pool.odin | 224 |
1 files changed, 148 insertions, 76 deletions
diff --git a/core/thread/thread_pool.odin b/core/thread/thread_pool.odin index 97ea2b77d..3f782cf73 100644 --- a/core/thread/thread_pool.odin +++ b/core/thread/thread_pool.odin @@ -1,65 +1,75 @@ package thread +/* + thread.Pool + Copyright 2022 eisbehr + Made available under Odin's BSD-3 license. +*/ + import "core:intrinsics" import "core:sync" import "core:mem" -Task_Status :: enum i32 { - Ready, - Busy, - Waiting, - Term, -} - -Task_Proc :: #type proc(task: ^Task) +Task_Proc :: #type proc(task: Task) Task :: struct { - procedure: Task_Proc, - data: rawptr, + procedure: Task_Proc, + data: rawptr, user_index: int, + allocator: mem.Allocator, } -Task_Id :: distinct i32 -INVALID_TASK_ID :: Task_Id(-1) +// Do not access the pool's members directly while the pool threads are running, +// since they use different kinds of locking and mutual exclusion devices. +// Careless access can and will lead to nasty bugs. Once initialized, the +// pool's memory address is not allowed to change until it is destroyed. +Pool :: struct { + allocator: mem.Allocator, + mutex: sync.Mutex, + sem_available: sync.Sema, + // the following values are atomic + num_waiting: int, + num_in_processing: int, + num_outstanding: int, // num_waiting + num_in_processing + num_done: int, + // end of atomics -Pool :: struct { - allocator: mem.Allocator, - mutex: sync.Mutex, - sem_available: sync.Sema, - processing_task_count: int, // atomic - is_running: bool, + is_running: bool, threads: []^Thread, - tasks: [dynamic]Task, + tasks: [dynamic]Task, + tasks_done: [dynamic]Task, } -pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator) { - worker_thread_internal :: proc(t: ^Thread) { - pool := (^Pool)(t.data) - - for pool.is_running { - sync.sema_wait(&pool.sem_available) - - if task, ok := pool_try_and_pop_task(pool); ok { - pool_do_work(pool, &task) - } - } - - sync.sema_post(&pool.sem_available, 1) - } - - +// Once initialized, the pool's memory address is not allowed to change until +// it is destroyed. If thread_count < 1, thread count 1 will be used. +// +// The thread pool requires an allocator which it either owns, or which is thread safe. +pool_init :: proc(pool: ^Pool, thread_count: int, allocator: mem.Allocator) { context.allocator = allocator pool.allocator = allocator - pool.tasks = make([dynamic]Task) - pool.threads = make([]^Thread, thread_count) + pool.tasks = make([dynamic]Task) + pool.tasks_done = make([dynamic]Task) + pool.threads = make([]^Thread, max(thread_count, 1)) pool.is_running = true for _, i in pool.threads { - t := create(worker_thread_internal) + t := create(proc(t: ^Thread) { + pool := (^Pool)(t.data) + + for intrinsics.atomic_load(&pool.is_running) { + sync.wait(&pool.sem_available) + + if task, ok := pool_pop_waiting(pool); ok { + pool_do_work(pool, task) + } + } + + sync.post(&pool.sem_available, 1) + }) t.user_index = i t.data = pool pool.threads[i] = t @@ -68,9 +78,10 @@ pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator pool_destroy :: proc(pool: ^Pool) { delete(pool.tasks) + delete(pool.tasks_done) - for thread in &pool.threads { - destroy(thread) + for t in &pool.threads { + destroy(t) } delete(pool.threads, pool.allocator) @@ -82,10 +93,12 @@ pool_start :: proc(pool: ^Pool) { } } +// Finish tasks that have already started processing, then shut down all pool +// threads. Might leave over waiting tasks, any memory allocated for the +// user data of those tasks will not be freed. pool_join :: proc(pool: ^Pool) { - pool.is_running = false - - sync.sema_post(&pool.sem_available, len(pool.threads)) + intrinsics.atomic_store(&pool.is_running, false) + sync.post(&pool.sem_available, len(pool.threads)) yield() @@ -94,53 +107,112 @@ pool_join :: proc(pool: ^Pool) { } } -pool_add_task :: proc(pool: ^Pool, procedure: Task_Proc, data: rawptr, user_index: int = 0) { - sync.mutex_lock(&pool.mutex) - defer sync.mutex_unlock(&pool.mutex) +// Add a task to the thread pool. +// +// Tasks can be added from any thread, not just the thread that created +// the thread pool. You can even add tasks from inside other tasks. +// +// Each task also needs an allocator which it either owns, or which is thread +// safe. By default, allocations in the task are disabled by use of the +// nil_allocator. +pool_add_task :: proc(pool: ^Pool, procedure: Task_Proc, data: rawptr, user_index: int = 0, allocator := context.allocator) { + sync.guard(&pool.mutex) + + append(&pool.tasks, Task{ + procedure = procedure, + data = data, + user_index = user_index, + allocator = allocator, + }) + intrinsics.atomic_add(&pool.num_waiting, 1) + intrinsics.atomic_add(&pool.num_outstanding, 1) + sync.post(&pool.sem_available, 1) +} + +// Number of tasks waiting to be processed. Only informational, mostly for +// debugging. Don't rely on this value being consistent with other num_* +// values. +pool_num_waiting :: #force_inline proc(pool: ^Pool) -> int { + return intrinsics.atomic_load(&pool.num_waiting) +} + +// Number of tasks currently being processed. Only informational, mostly for +// debugging. Don't rely on this value being consistent with other num_* +// values. +pool_num_in_processing :: #force_inline proc(pool: ^Pool) -> int { + return intrinsics.atomic_load(&pool.num_in_processing) +} + +// Outstanding tasks are all tasks that are not done, that is, tasks that are +// waiting, as well as tasks that are currently being processed. Only +// informational, mostly for debugging. Don't rely on this value being +// consistent with other num_* values. +pool_num_outstanding :: #force_inline proc(pool: ^Pool) -> int { + return intrinsics.atomic_load(&pool.num_outstanding) +} - task: Task - task.procedure = procedure - task.data = data - task.user_index = user_index +// Number of tasks which are done processing. Only informational, mostly for +// debugging. Don't rely on this value being consistent with other num_* +// values. +pool_num_done :: #force_inline proc(pool: ^Pool) -> int { + return intrinsics.atomic_load(&pool.num_done) +} - append(&pool.tasks, task) - sync.sema_post(&pool.sem_available, 1) +// If tasks are only being added from one thread, and this procedure is being +// called from that same thread, it will reliably tell if the thread pool is +// empty or not. Empty in this case means there are no tasks waiting, being +// processed, or _done_. +pool_is_empty :: #force_inline proc(pool: ^Pool) -> bool { + return pool_num_outstanding(pool) == 0 && pool_num_done(pool) == 0 } -pool_try_and_pop_task :: proc(pool: ^Pool) -> (task: Task, got_task: bool = false) { - if sync.mutex_try_lock(&pool.mutex) { - if len(pool.tasks) != 0 { - intrinsics.atomic_add(&pool.processing_task_count, 1) - task = pop_front(&pool.tasks) - got_task = true - } - sync.mutex_unlock(&pool.mutex) +// Mostly for internal use. +pool_pop_waiting :: proc(pool: ^Pool) -> (task: Task, got_task: bool) { + sync.guard(&pool.mutex) + + if len(pool.tasks) != 0 { + intrinsics.atomic_sub(&pool.num_waiting, 1) + intrinsics.atomic_add(&pool.num_in_processing, 1) + task = pop_front(&pool.tasks) + got_task = true } + return } +// Use this to take out finished tasks. +pool_pop_done :: proc(pool: ^Pool) -> (task: Task, got_task: bool) { + sync.guard(&pool.mutex) + + if len(pool.tasks_done) != 0 { + task = pop_front(&pool.tasks_done) + got_task = true + intrinsics.atomic_sub(&pool.num_done, 1) + } -pool_do_work :: proc(pool: ^Pool, task: ^Task) { - task.procedure(task) - intrinsics.atomic_sub(&pool.processing_task_count, 1) + return } +// Mostly for internal use. +pool_do_work :: proc(pool: ^Pool, task: Task) { + { + context.allocator = task.allocator + task.procedure(task) + } -pool_wait_and_process :: proc(pool: ^Pool) { - for len(pool.tasks) != 0 || intrinsics.atomic_load(&pool.processing_task_count) != 0 { - if task, ok := pool_try_and_pop_task(pool); ok { - pool_do_work(pool, &task) - } + sync.guard(&pool.mutex) - // Safety kick - if len(pool.tasks) != 0 && intrinsics.atomic_load(&pool.processing_task_count) == 0 { - sync.mutex_lock(&pool.mutex) - sync.sema_post(&pool.sem_available, len(pool.tasks)) - sync.mutex_unlock(&pool.mutex) - } + append(&pool.tasks_done, task) + intrinsics.atomic_add(&pool.num_done, 1) + intrinsics.atomic_sub(&pool.num_outstanding, 1) + intrinsics.atomic_sub(&pool.num_in_processing, 1) +} - yield() +// Process the rest of the tasks, also use this thread for processing, then join +// all the pool threads. +pool_finish :: proc(pool: ^Pool) { + for task in pool_pop_waiting(pool) { + pool_do_work(pool, task) } - pool_join(pool) } |