aboutsummaryrefslogtreecommitdiff
path: root/src/threading.cpp
diff options
context:
space:
mode:
authorgingerBill <gingerBill@users.noreply.github.com>2023-01-01 13:29:20 +0000
committerGitHub <noreply@github.com>2023-01-01 13:29:20 +0000
commitc08ff891ad05605be56ea6a789e8d6c9a801da1f (patch)
tree27edc49833bfac6d1054bc8e3ea07d2191648f19 /src/threading.cpp
parent28fb35f2f7a6ffd75e76dd95352f4194d79b3166 (diff)
parent168cec1e9d56563719039d766fc6bf776f3cf5ee (diff)
Merge pull request #2287 from odin-lang/compiler-improvements-2022-12
Compiler improvements 2022 12
Diffstat (limited to 'src/threading.cpp')
-rw-r--r--src/threading.cpp271
1 files changed, 220 insertions, 51 deletions
diff --git a/src/threading.cpp b/src/threading.cpp
index e92ed5e31..493e57c91 100644
--- a/src/threading.cpp
+++ b/src/threading.cpp
@@ -11,24 +11,34 @@ struct RecursiveMutex;
struct Semaphore;
struct Condition;
struct Thread;
+struct ThreadPool;
#define THREAD_PROC(name) isize name(struct Thread *thread)
-typedef THREAD_PROC(ThreadProc);
+gb_internal THREAD_PROC(thread_pool_thread_proc);
+
+#define WORKER_TASK_PROC(name) isize name(void *data)
+typedef WORKER_TASK_PROC(WorkerTaskProc);
+
+typedef struct WorkerTask {
+ WorkerTaskProc *do_work;
+ void *data;
+} WorkerTask;
struct Thread {
#if defined(GB_SYSTEM_WINDOWS)
- void * win32_handle;
+ void *win32_handle;
#else
- pthread_t posix_handle;
+ pthread_t posix_handle;
#endif
+
+ isize idx;
- ThreadProc * proc;
- void * user_data;
- isize user_index;
- isize volatile return_value;
+ WorkerTask *queue;
+ size_t capacity;
+ std::atomic<uint64_t> head_and_tail;
- isize stack_size;
- std::atomic<bool> is_running;
+ isize stack_size;
+ struct ThreadPool *pool;
};
@@ -59,10 +69,9 @@ gb_internal void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32
gb_internal u32 thread_current_id(void);
-gb_internal void thread_init_and_start (Thread *t, ThreadProc *proc, void *data);
-gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, void *data, isize stack_size);
+gb_internal void thread_init (ThreadPool *pool, Thread *t, isize idx);
+gb_internal void thread_init_and_start (ThreadPool *pool, Thread *t, isize idx);
gb_internal void thread_join_and_destroy(Thread *t);
-gb_internal bool thread_is_running (Thread const *t);
gb_internal void thread_set_name (Thread *t, char const *name);
gb_internal void yield_thread(void);
@@ -325,47 +334,45 @@ gb_internal gb_inline void yield(void) {
#endif
}
-gb_internal void private__thread_run(Thread *t) {
- t->return_value = t->proc(t);
-}
-
#if defined(GB_SYSTEM_WINDOWS)
- gb_internal DWORD __stdcall internal_thread_proc(void *arg) {
- Thread *t = cast(Thread *)arg;
- t->is_running.store(true);
- private__thread_run(t);
- return 0;
- }
+gb_internal DWORD __stdcall internal_thread_proc(void *arg) {
+ Thread *t = cast(Thread *)arg;
+ thread_pool_thread_proc(t);
+ return 0;
+}
#else
- gb_internal void *internal_thread_proc(void *arg) {
- #if (GB_SYSTEM_LINUX)
- // NOTE: Don't permit any signal delivery to threads on Linux.
- sigset_t mask = {};
- sigfillset(&mask);
- GB_ASSERT_MSG(pthread_sigmask(SIG_BLOCK, &mask, nullptr) == 0, "failed to block signals");
- #endif
-
- Thread *t = cast(Thread *)arg;
- t->is_running.store(true);
- private__thread_run(t);
- return NULL;
- }
+gb_internal void *internal_thread_proc(void *arg) {
+#if (GB_SYSTEM_LINUX)
+ // NOTE: Don't permit any signal delivery to threads on Linux.
+ sigset_t mask = {};
+ sigfillset(&mask);
+ GB_ASSERT_MSG(pthread_sigmask(SIG_BLOCK, &mask, nullptr) == 0, "failed to block signals");
+#endif
+
+ Thread *t = cast(Thread *)arg;
+ thread_pool_thread_proc(t);
+ return NULL;
+}
#endif
-gb_internal void thread_init_and_start(Thread *t, ThreadProc *proc, void *user_data) { thread_init_and_start_with_stack(t, proc, user_data, 0); }
-
-gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, void *user_data, isize stack_size) {
+gb_internal void thread_init(ThreadPool *pool, Thread *t, isize idx) {
gb_zero_item(t);
#if defined(GB_SYSTEM_WINDOWS)
t->win32_handle = INVALID_HANDLE_VALUE;
#else
t->posix_handle = 0;
#endif
- GB_ASSERT(!t->is_running.load());
- GB_ASSERT(proc != NULL);
- t->proc = proc;
- t->user_data = user_data;
- t->stack_size = stack_size;
+
+ t->capacity = 1 << 14; // must be a power of 2
+ t->queue = (WorkerTask *)calloc(sizeof(WorkerTask), t->capacity);
+ t->head_and_tail = 0;
+ t->pool = pool;
+ t->idx = idx;
+}
+
+gb_internal void thread_init_and_start(ThreadPool *pool, Thread *t, isize idx) {
+ thread_init(pool, t, idx);
+ isize stack_size = 0;
#if defined(GB_SYSTEM_WINDOWS)
t->win32_handle = CreateThread(NULL, stack_size, internal_thread_proc, t, 0, NULL);
@@ -385,10 +392,6 @@ gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, v
}
gb_internal void thread_join_and_destroy(Thread *t) {
- if (!t->is_running.load()) {
- return;
- }
-
#if defined(GB_SYSTEM_WINDOWS)
WaitForSingleObject(t->win32_handle, INFINITE);
CloseHandle(t->win32_handle);
@@ -397,11 +400,8 @@ gb_internal void thread_join_and_destroy(Thread *t) {
pthread_join(t->posix_handle, NULL);
t->posix_handle = 0;
#endif
- t->is_running.store(false);
}
-gb_internal bool thread_is_running(Thread const *t) { return t->is_running.load(); }
-
gb_internal void thread_set_name(Thread *t, char const *name) {
#if defined(GB_COMPILER_MSVC)
#pragma pack(push, 8)
@@ -437,7 +437,176 @@ gb_internal void thread_set_name(Thread *t, char const *name) {
#endif
}
+#if defined(GB_SYSTEM_LINUX)
+#include <linux/futex.h>
+#include <sys/syscall.h>
+
+typedef std::atomic<int32_t> Futex;
+typedef volatile int32_t Footex;
+
+gb_internal void tpool_wake_addr(Futex *addr) {
+ for (;;) {
+ int ret = syscall(SYS_futex, addr, FUTEX_WAKE, 1, NULL, NULL, 0);
+ if (ret == -1) {
+ perror("Futex wake");
+ GB_PANIC("Failed in futex wake!\n");
+ } else if (ret > 0) {
+ return;
+ }
+ }
+}
+
+gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
+ for (;;) {
+ int ret = syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL, NULL, 0);
+ if (ret == -1) {
+ if (errno != EAGAIN) {
+ perror("Futex wait");
+ GB_PANIC("Failed in futex wait!\n");
+ } else {
+ return;
+ }
+ } else if (ret == 0) {
+ if (*addr != val) {
+ return;
+ }
+ }
+ }
+}
+
+#elif defined(GB_SYSTEM_FREEBSD)
+
+#include <sys/types.h>
+#include <sys/umtx.h>
+
+typedef std::atomic<int32_t> Futex;
+typedef volatile int32_t Footex;
+
+gb_internal void tpool_wake_addr(Futex *addr) {
+ _umtx_op(addr, UMTX_OP_WAKE, 1, 0, 0);
+}
+
+gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
+ for (;;) {
+ int ret = _umtx_op(addr, UMTX_OP_WAIT_UINT, val, 0, NULL);
+ if (ret == 0) {
+ if (errno == ETIMEDOUT || errno == EINTR) {
+ continue;
+ }
+
+ perror("Futex wait");
+ GB_PANIC("Failed in futex wait!\n");
+ } else if (ret == 0) {
+ if (*addr != val) {
+ return;
+ }
+ }
+ }
+}
+
+#elif defined(GB_SYSTEM_OPENBSD)
+
+#include <sys/futex.h>
+
+typedef std::atomic<int32_t> Futex;
+typedef volatile int32_t Footex;
+
+gb_internal void tpool_wake_addr(Futex *addr) {
+ for (;;) {
+ int ret = futex((volatile uint32_t *)addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, NULL, NULL);
+ if (ret == -1) {
+ if (errno == ETIMEDOUT || errno == EINTR) {
+ continue;
+ }
+
+ perror("Futex wake");
+ GB_PANIC("futex wake fail");
+ } else if (ret == 1) {
+ return;
+ }
+ }
+}
+
+gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
+ for (;;) {
+ int ret = futex((volatile uint32_t *)addr, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, NULL, NULL);
+ if (ret == -1) {
+ if (*addr != val) {
+ return;
+ }
+
+ if (errno == ETIMEDOUT || errno == EINTR) {
+ continue;
+ }
+
+ perror("Futex wait");
+ GB_PANIC("Failed in futex wait!\n");
+ }
+ }
+}
+
+#elif defined(GB_SYSTEM_OSX)
+
+typedef std::atomic<int64_t> Futex;
+typedef volatile int64_t Footex;
+
+#define UL_COMPARE_AND_WAIT 0x00000001
+#define ULF_NO_ERRNO 0x01000000
+
+extern "C" int __ulock_wait(uint32_t operation, void *addr, uint64_t value, uint32_t timeout); /* timeout is specified in microseconds */
+extern "C" int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value);
+
+gb_internal void tpool_wake_addr(Futex *addr) {
+ for (;;) {
+ int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, addr, 0);
+ if (ret >= 0) {
+ return;
+ }
+ if (ret == EINTR || ret == EFAULT) {
+ continue;
+ }
+ if (ret == ENOENT) {
+ return;
+ }
+ GB_PANIC("Failed in futex wake!\n");
+ }
+}
+
+gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
+ for (;;) {
+ int ret = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, addr, val, 0);
+ if (ret >= 0) {
+ if (*addr != val) {
+ return;
+ }
+ continue;
+ }
+ if (ret == EINTR || ret == EFAULT) {
+ continue;
+ }
+ if (ret == ENOENT) {
+ return;
+ }
+
+ GB_PANIC("Failed in futex wait!\n");
+ }
+}
+#elif defined(GB_SYSTEM_WINDOWS)
+typedef std::atomic<int64_t> Futex;
+typedef volatile int64_t Footex;
+
+gb_internal void tpool_wake_addr(Futex *addr) {
+ WakeByAddressSingle((void *)addr);
+}
+
+gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
+ for (;;) {
+ WaitOnAddress(addr, (void *)&val, sizeof(val), INFINITE);
+ if (*addr != val) break;
+ }
+}
+#endif
#if defined(GB_SYSTEM_WINDOWS)
#pragma warning(pop)
-#endif \ No newline at end of file
+#endif