aboutsummaryrefslogtreecommitdiff
path: root/core/thread
diff options
context:
space:
mode:
authorgingerBill <bill@gingerbill.org>2020-01-02 15:07:12 +0000
committergingerBill <bill@gingerbill.org>2020-01-02 15:07:12 +0000
commit3bd00fd6b7bea465ab9882e6b0f1b8e0842a4fa1 (patch)
tree0a606061b785be4090fce211f2ca3d396771e174 /core/thread
parent16a7c55334fafa48d5fd84e86071263f43a3724f (diff)
Add `thread.Pool` with example in demo.odin; Update linalg to support handness changes for projection matrices
Diffstat (limited to 'core/thread')
-rw-r--r--core/thread/thread.odin6
-rw-r--r--core/thread/thread_pool.odin147
-rw-r--r--core/thread/thread_unix.odin5
-rw-r--r--core/thread/thread_windows.odin4
4 files changed, 159 insertions, 3 deletions
diff --git a/core/thread/thread.odin b/core/thread/thread.odin
index c326b30f1..00ee2f4b3 100644
--- a/core/thread/thread.odin
+++ b/core/thread/thread.odin
@@ -1,6 +1,6 @@
-package thread;
+package thread
-import "core:runtime";
+import "core:runtime"
Thread_Proc :: #type proc(^Thread);
@@ -12,4 +12,4 @@ Thread :: struct {
init_context: runtime.Context,
use_init_context: bool,
-} \ No newline at end of file
+}
diff --git a/core/thread/thread_pool.odin b/core/thread/thread_pool.odin
new file mode 100644
index 000000000..61d326f0c
--- /dev/null
+++ b/core/thread/thread_pool.odin
@@ -0,0 +1,147 @@
+package thread
+
+import "intrinsics"
+import "core:sync"
+import "core:mem"
+
+Task_Status :: enum i32 {
+ Ready,
+ Busy,
+ Waiting,
+ Term,
+}
+
+Task_Proc :: #type proc(task: ^Task);
+
+Task :: struct {
+ procedure: Task_Proc,
+ data: rawptr,
+ user_index: int,
+}
+
+Task_Id :: distinct i32;
+INVALID_TASK_ID :: Task_Id(-1);
+
+
+Pool :: struct {
+ allocator: mem.Allocator,
+ mutex: sync.Mutex,
+ sem_available: sync.Semaphore,
+ processing_task_count: int, // atomic
+ is_running: bool,
+
+ threads: []^Thread,
+
+ tasks: [dynamic]Task,
+}
+
+pool_init :: proc(pool: ^Pool, thread_count: int, allocator := context.allocator) {
+ worker_thread_internal :: proc(t: ^Thread) {
+ pool := (^Pool)(t.data);
+
+ for pool.is_running {
+ sync.semaphore_wait_for(&pool.sem_available);
+
+ if task, ok := pool_try_and_pop_task(pool); ok {
+ pool_do_work(pool, &task);
+ }
+ }
+
+ sync.semaphore_post(&pool.sem_available, 1);
+ }
+
+
+ context.allocator = allocator;
+ pool.allocator = allocator;
+ pool.tasks = make([dynamic]Task);
+ pool.threads = make([]^Thread, thread_count);
+
+ sync.mutex_init(&pool.mutex);
+ sync.semaphore_init(&pool.sem_available);
+ pool.is_running = true;
+
+ for _, i in pool.threads {
+ t := create(worker_thread_internal);
+ t.user_index = i;
+ t.data = pool;
+ pool.threads[i] = t;
+ }
+}
+
+pool_destroy :: proc(pool: ^Pool) {
+ delete(pool.tasks);
+ delete(pool.threads, pool.allocator);
+
+ sync.mutex_destroy(&pool.mutex);
+ sync.semaphore_destroy(&pool.sem_available);
+}
+
+pool_start :: proc(pool: ^Pool) {
+ for t in pool.threads {
+ start(t);
+ }
+}
+
+pool_join :: proc(pool: ^Pool) {
+ pool.is_running = false;
+
+ sync.semaphore_post(&pool.sem_available, len(pool.threads));
+
+ yield();
+
+ for t in pool.threads {
+ join(t);
+ }
+}
+
+pool_add_task :: proc(pool: ^Pool, procedure: Task_Proc, data: rawptr, user_index: int = 0) {
+ sync.mutex_lock(&pool.mutex);
+ defer sync.mutex_unlock(&pool.mutex);
+
+ task: Task;
+ task.procedure = procedure;
+ task.data = data;
+ task.user_index = user_index;
+
+ append(&pool.tasks, task);
+ sync.semaphore_post(&pool.sem_available, 1);
+}
+
+pool_try_and_pop_task :: proc(pool: ^Pool) -> (task: Task, got_task: bool = false) {
+ if sync.mutex_try_lock(&pool.mutex) {
+ if len(pool.tasks) != 0 {
+ intrinsics.atomic_add(&pool.processing_task_count, 1);
+ task = pool.tasks[0];
+ got_task = true;
+ ordered_remove(&pool.tasks, 0);
+ }
+ sync.mutex_unlock(&pool.mutex);
+ }
+ return;
+}
+
+
+pool_do_work :: proc(pool: ^Pool, task: ^Task) {
+ task.procedure(task);
+ intrinsics.atomic_sub(&pool.processing_task_count, 1);
+}
+
+
+pool_wait_and_process :: proc(pool: ^Pool) {
+ for len(pool.tasks) != 0 || intrinsics.atomic_load(&pool.processing_task_count) != 0 {
+ if task, ok := pool_try_and_pop_task(pool); ok {
+ pool_do_work(pool, &task);
+ }
+
+ // Safety kick
+ if len(pool.tasks) != 0 && intrinsics.atomic_load(&pool.processing_task_count) == 0 {
+ sync.mutex_lock(&pool.mutex);
+ sync.semaphore_post(&pool.sem_available, len(pool.tasks));
+ sync.mutex_unlock(&pool.mutex);
+ }
+
+ yield();
+ }
+
+ pool_join(pool);
+}
diff --git a/core/thread/thread_unix.odin b/core/thread/thread_unix.odin
index 9a1680043..15f345c7c 100644
--- a/core/thread/thread_unix.odin
+++ b/core/thread/thread_unix.odin
@@ -155,3 +155,8 @@ destroy :: proc(t: ^Thread) {
t.unix_thread = {};
free(t);
}
+
+
+yield :: proc() {
+ unix.pthread_yield()
+}
diff --git a/core/thread/thread_windows.odin b/core/thread/thread_windows.odin
index fb8915057..a558e1b0a 100644
--- a/core/thread/thread_windows.odin
+++ b/core/thread/thread_windows.odin
@@ -92,3 +92,7 @@ destroy :: proc(thread: ^Thread) {
terminate :: proc(using thread : ^Thread, exit_code : u32) {
win32.terminate_thread(win32_thread, exit_code);
}
+
+yield :: proc() {
+ win32.sleep(0);
+}