aboutsummaryrefslogtreecommitdiff
path: root/core/thread/thread_pool.odin
diff options
context:
space:
mode:
authorgingerBill <bill@gingerbill.org>2022-03-31 11:55:46 +0100
committergingerBill <bill@gingerbill.org>2022-03-31 11:55:46 +0100
commit22b961ea53a1b9dfbc9218059b8473a0e52ae35c (patch)
tree0e27bf001db0018e2f7412675d13027e53c08e86 /core/thread/thread_pool.odin
parent06e8476efc421a27ea8cd908cd5e6d8b319ebf0f (diff)
Update Thread Pool in `core:thread`
Thanks to the work of eisbehr
Diffstat (limited to 'core/thread/thread_pool.odin')
-rw-r--r--core/thread/thread_pool.odin224
1 files changed, 148 insertions, 76 deletions
diff --git a/core/thread/thread_pool.odin b/core/thread/thread_pool.odin
index 97ea2b77d..3f782cf73 100644
--- a/core/thread/thread_pool.odin
+++ b/core/thread/thread_pool.odin
@@ -1,65 +1,75 @@
package thread
+/*
+ thread.Pool
+ Copyright 2022 eisbehr
+ Made available under Odin's BSD-3 license.
+*/
+
import "core:intrinsics"
import "core:sync"
import "core:mem"
-Task_Status :: enum i32 {
- Ready,
- Busy,
- Waiting,
- Term,
-}
-
-Task_Proc :: #type proc(task: ^Task)
+Task_Proc :: #type proc(task: Task)
Task :: struct {
- procedure: Task_Proc,
- data: rawptr,
+ procedure: Task_Proc,
+ data: rawptr,
user_index: int,
+ allocator: mem.Allocator,
}
-Task_Id :: distinct i32
-INVALID_TASK_ID :: Task_Id(-1)
+// Do not access the pool's members directly while the pool threads are running,
+// since they use different kinds of locking and mutual exclusion devices.
+// Careless access can and will lead to nasty bugs. Once initialized, the
+// pool's memory address is not allowed to change until it is destroyed.
+Pool :: struct {
+ allocator: mem.Allocator,
+ mutex: sync.Mutex,
+ sem_available: sync.Sema,
+ // the following values are atomic
+ num_waiting: int,
+ num_in_processing: int,
+ num_outstanding: int, // num_waiting + num_in_processing
+ num_done: int,
+ // end of atomics
-Pool :: struct {
- allocator: mem.Allocator,
- mutex: sync.Mutex,
- sem_available: sync.Sema,
- processing_task_count: int, // atomic
- is_running: bool,
+ is_running: bool,
threads: []^Thread,
- tasks: [dynamic]Task,
+ tasks: [dynamic]Task,
+ tasks_done: [dynamic]Task,
}
-pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator) {
- worker_thread_internal :: proc(t: ^Thread) {
- pool := (^Pool)(t.data)
-
- for pool.is_running {
- sync.sema_wait(&pool.sem_available)
-
- if task, ok := pool_try_and_pop_task(pool); ok {
- pool_do_work(pool, &task)
- }
- }
-
- sync.sema_post(&pool.sem_available, 1)
- }
-
-
+// Once initialized, the pool's memory address is not allowed to change until
+// it is destroyed. If thread_count < 1, thread count 1 will be used.
+//
+// The thread pool requires an allocator which it either owns, or which is thread safe.
+pool_init :: proc(pool: ^Pool, thread_count: int, allocator: mem.Allocator) {
context.allocator = allocator
pool.allocator = allocator
- pool.tasks = make([dynamic]Task)
- pool.threads = make([]^Thread, thread_count)
+ pool.tasks = make([dynamic]Task)
+ pool.tasks_done = make([dynamic]Task)
+ pool.threads = make([]^Thread, max(thread_count, 1))
pool.is_running = true
for _, i in pool.threads {
- t := create(worker_thread_internal)
+ t := create(proc(t: ^Thread) {
+ pool := (^Pool)(t.data)
+
+ for intrinsics.atomic_load(&pool.is_running) {
+ sync.wait(&pool.sem_available)
+
+ if task, ok := pool_pop_waiting(pool); ok {
+ pool_do_work(pool, task)
+ }
+ }
+
+ sync.post(&pool.sem_available, 1)
+ })
t.user_index = i
t.data = pool
pool.threads[i] = t
@@ -68,9 +78,10 @@ pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator
pool_destroy :: proc(pool: ^Pool) {
delete(pool.tasks)
+ delete(pool.tasks_done)
- for thread in &pool.threads {
- destroy(thread)
+ for t in &pool.threads {
+ destroy(t)
}
delete(pool.threads, pool.allocator)
@@ -82,10 +93,12 @@ pool_start :: proc(pool: ^Pool) {
}
}
+// Finish tasks that have already started processing, then shut down all pool
+// threads. Might leave over waiting tasks, any memory allocated for the
+// user data of those tasks will not be freed.
pool_join :: proc(pool: ^Pool) {
- pool.is_running = false
-
- sync.sema_post(&pool.sem_available, len(pool.threads))
+ intrinsics.atomic_store(&pool.is_running, false)
+ sync.post(&pool.sem_available, len(pool.threads))
yield()
@@ -94,53 +107,112 @@ pool_join :: proc(pool: ^Pool) {
}
}
-pool_add_task :: proc(pool: ^Pool, procedure: Task_Proc, data: rawptr, user_index: int = 0) {
- sync.mutex_lock(&pool.mutex)
- defer sync.mutex_unlock(&pool.mutex)
+// Add a task to the thread pool.
+//
+// Tasks can be added from any thread, not just the thread that created
+// the thread pool. You can even add tasks from inside other tasks.
+//
+// Each task also needs an allocator which it either owns, or which is thread
+// safe. By default, allocations in the task are disabled by use of the
+// nil_allocator.
+pool_add_task :: proc(pool: ^Pool, procedure: Task_Proc, data: rawptr, user_index: int = 0, allocator := context.allocator) {
+ sync.guard(&pool.mutex)
+
+ append(&pool.tasks, Task{
+ procedure = procedure,
+ data = data,
+ user_index = user_index,
+ allocator = allocator,
+ })
+ intrinsics.atomic_add(&pool.num_waiting, 1)
+ intrinsics.atomic_add(&pool.num_outstanding, 1)
+ sync.post(&pool.sem_available, 1)
+}
+
+// Number of tasks waiting to be processed. Only informational, mostly for
+// debugging. Don't rely on this value being consistent with other num_*
+// values.
+pool_num_waiting :: #force_inline proc(pool: ^Pool) -> int {
+ return intrinsics.atomic_load(&pool.num_waiting)
+}
+
+// Number of tasks currently being processed. Only informational, mostly for
+// debugging. Don't rely on this value being consistent with other num_*
+// values.
+pool_num_in_processing :: #force_inline proc(pool: ^Pool) -> int {
+ return intrinsics.atomic_load(&pool.num_in_processing)
+}
+
+// Outstanding tasks are all tasks that are not done, that is, tasks that are
+// waiting, as well as tasks that are currently being processed. Only
+// informational, mostly for debugging. Don't rely on this value being
+// consistent with other num_* values.
+pool_num_outstanding :: #force_inline proc(pool: ^Pool) -> int {
+ return intrinsics.atomic_load(&pool.num_outstanding)
+}
- task: Task
- task.procedure = procedure
- task.data = data
- task.user_index = user_index
+// Number of tasks which are done processing. Only informational, mostly for
+// debugging. Don't rely on this value being consistent with other num_*
+// values.
+pool_num_done :: #force_inline proc(pool: ^Pool) -> int {
+ return intrinsics.atomic_load(&pool.num_done)
+}
- append(&pool.tasks, task)
- sync.sema_post(&pool.sem_available, 1)
+// If tasks are only being added from one thread, and this procedure is being
+// called from that same thread, it will reliably tell if the thread pool is
+// empty or not. Empty in this case means there are no tasks waiting, being
+// processed, or _done_.
+pool_is_empty :: #force_inline proc(pool: ^Pool) -> bool {
+ return pool_num_outstanding(pool) == 0 && pool_num_done(pool) == 0
}
-pool_try_and_pop_task :: proc(pool: ^Pool) -> (task: Task, got_task: bool = false) {
- if sync.mutex_try_lock(&pool.mutex) {
- if len(pool.tasks) != 0 {
- intrinsics.atomic_add(&pool.processing_task_count, 1)
- task = pop_front(&pool.tasks)
- got_task = true
- }
- sync.mutex_unlock(&pool.mutex)
+// Mostly for internal use.
+pool_pop_waiting :: proc(pool: ^Pool) -> (task: Task, got_task: bool) {
+ sync.guard(&pool.mutex)
+
+ if len(pool.tasks) != 0 {
+ intrinsics.atomic_sub(&pool.num_waiting, 1)
+ intrinsics.atomic_add(&pool.num_in_processing, 1)
+ task = pop_front(&pool.tasks)
+ got_task = true
}
+
return
}
+// Use this to take out finished tasks.
+pool_pop_done :: proc(pool: ^Pool) -> (task: Task, got_task: bool) {
+ sync.guard(&pool.mutex)
+
+ if len(pool.tasks_done) != 0 {
+ task = pop_front(&pool.tasks_done)
+ got_task = true
+ intrinsics.atomic_sub(&pool.num_done, 1)
+ }
-pool_do_work :: proc(pool: ^Pool, task: ^Task) {
- task.procedure(task)
- intrinsics.atomic_sub(&pool.processing_task_count, 1)
+ return
}
+// Mostly for internal use.
+pool_do_work :: proc(pool: ^Pool, task: Task) {
+ {
+ context.allocator = task.allocator
+ task.procedure(task)
+ }
-pool_wait_and_process :: proc(pool: ^Pool) {
- for len(pool.tasks) != 0 || intrinsics.atomic_load(&pool.processing_task_count) != 0 {
- if task, ok := pool_try_and_pop_task(pool); ok {
- pool_do_work(pool, &task)
- }
+ sync.guard(&pool.mutex)
- // Safety kick
- if len(pool.tasks) != 0 && intrinsics.atomic_load(&pool.processing_task_count) == 0 {
- sync.mutex_lock(&pool.mutex)
- sync.sema_post(&pool.sem_available, len(pool.tasks))
- sync.mutex_unlock(&pool.mutex)
- }
+ append(&pool.tasks_done, task)
+ intrinsics.atomic_add(&pool.num_done, 1)
+ intrinsics.atomic_sub(&pool.num_outstanding, 1)
+ intrinsics.atomic_sub(&pool.num_in_processing, 1)
+}
- yield()
+// Process the rest of the tasks, also use this thread for processing, then join
+// all the pool threads.
+pool_finish :: proc(pool: ^Pool) {
+ for task in pool_pop_waiting(pool) {
+ pool_do_work(pool, task)
}
-
pool_join(pool)
}