From 5f27f2dd7f7c4af93435ff2e426cc5882cd8dbe7 Mon Sep 17 00:00:00 2001 From: Colin Davidson Date: Wed, 28 Dec 2022 21:44:17 -0800 Subject: move to work-stealing threadpool --- src/threading.cpp | 199 ++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 148 insertions(+), 51 deletions(-) (limited to 'src/threading.cpp') diff --git a/src/threading.cpp b/src/threading.cpp index e92ed5e31..98b7aa0c2 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -11,24 +11,34 @@ struct RecursiveMutex; struct Semaphore; struct Condition; struct Thread; +struct ThreadPool; #define THREAD_PROC(name) isize name(struct Thread *thread) -typedef THREAD_PROC(ThreadProc); +gb_internal THREAD_PROC(thread_pool_thread_proc); + +#define WORKER_TASK_PROC(name) isize name(void *data) +typedef WORKER_TASK_PROC(WorkerTaskProc); + +typedef struct WorkerTask { + WorkerTaskProc *do_work; + void *data; +} WorkerTask; struct Thread { #if defined(GB_SYSTEM_WINDOWS) - void * win32_handle; + void *win32_handle; #else - pthread_t posix_handle; + pthread_t posix_handle; #endif + + isize idx; - ThreadProc * proc; - void * user_data; - isize user_index; - isize volatile return_value; + WorkerTask *queue; + size_t capacity; + std::atomic head_and_tail; - isize stack_size; - std::atomic is_running; + isize stack_size; + struct ThreadPool *pool; }; @@ -59,10 +69,9 @@ gb_internal void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32 gb_internal u32 thread_current_id(void); -gb_internal void thread_init_and_start (Thread *t, ThreadProc *proc, void *data); -gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, void *data, isize stack_size); +gb_internal void thread_init (ThreadPool *pool, Thread *t, isize idx); +gb_internal void thread_init_and_start (ThreadPool *pool, Thread *t, isize idx); gb_internal void thread_join_and_destroy(Thread *t); -gb_internal bool thread_is_running (Thread const *t); gb_internal void thread_set_name (Thread *t, char const *name); gb_internal void yield_thread(void); @@ -325,47 +334,45 @@ gb_internal gb_inline void yield(void) { #endif } -gb_internal void private__thread_run(Thread *t) { - t->return_value = t->proc(t); -} - #if defined(GB_SYSTEM_WINDOWS) - gb_internal DWORD __stdcall internal_thread_proc(void *arg) { - Thread *t = cast(Thread *)arg; - t->is_running.store(true); - private__thread_run(t); - return 0; - } +gb_internal DWORD __stdcall internal_thread_proc(void *arg) { + Thread *t = cast(Thread *)arg; + thread_pool_thread_proc(t); + return 0; +} #else - gb_internal void *internal_thread_proc(void *arg) { - #if (GB_SYSTEM_LINUX) - // NOTE: Don't permit any signal delivery to threads on Linux. - sigset_t mask = {}; - sigfillset(&mask); - GB_ASSERT_MSG(pthread_sigmask(SIG_BLOCK, &mask, nullptr) == 0, "failed to block signals"); - #endif - - Thread *t = cast(Thread *)arg; - t->is_running.store(true); - private__thread_run(t); - return NULL; - } +gb_internal void *internal_thread_proc(void *arg) { +#if (GB_SYSTEM_LINUX) + // NOTE: Don't permit any signal delivery to threads on Linux. + sigset_t mask = {}; + sigfillset(&mask); + GB_ASSERT_MSG(pthread_sigmask(SIG_BLOCK, &mask, nullptr) == 0, "failed to block signals"); +#endif + + Thread *t = cast(Thread *)arg; + thread_pool_thread_proc(t); + return NULL; +} #endif -gb_internal void thread_init_and_start(Thread *t, ThreadProc *proc, void *user_data) { thread_init_and_start_with_stack(t, proc, user_data, 0); } - -gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, void *user_data, isize stack_size) { +gb_internal void thread_init(ThreadPool *pool, Thread *t, isize idx) { gb_zero_item(t); #if defined(GB_SYSTEM_WINDOWS) t->win32_handle = INVALID_HANDLE_VALUE; #else t->posix_handle = 0; #endif - GB_ASSERT(!t->is_running.load()); - GB_ASSERT(proc != NULL); - t->proc = proc; - t->user_data = user_data; - t->stack_size = stack_size; + + t->capacity = 1 << 14; // must be a power of 2 + t->queue = (WorkerTask *)calloc(sizeof(WorkerTask), t->capacity); + t->head_and_tail = 0; + t->pool = pool; + t->idx = idx; +} + +gb_internal void thread_init_and_start(ThreadPool *pool, Thread *t, isize idx) { + thread_init(pool, t, idx); + isize stack_size = 0; #if defined(GB_SYSTEM_WINDOWS) t->win32_handle = CreateThread(NULL, stack_size, internal_thread_proc, t, 0, NULL); @@ -385,10 +392,6 @@ gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, v } gb_internal void thread_join_and_destroy(Thread *t) { - if (!t->is_running.load()) { - return; - } - #if defined(GB_SYSTEM_WINDOWS) WaitForSingleObject(t->win32_handle, INFINITE); CloseHandle(t->win32_handle); @@ -397,11 +400,8 @@ gb_internal void thread_join_and_destroy(Thread *t) { pthread_join(t->posix_handle, NULL); t->posix_handle = 0; #endif - t->is_running.store(false); } -gb_internal bool thread_is_running(Thread const *t) { return t->is_running.load(); } - gb_internal void thread_set_name(Thread *t, char const *name) { #if defined(GB_COMPILER_MSVC) #pragma pack(push, 8) @@ -437,7 +437,104 @@ gb_internal void thread_set_name(Thread *t, char const *name) { #endif } +#if defined(GB_SYSTEM_LINUX) +#include +#include + +typedef std::atomic Futex; +typedef volatile int32_t Footex; + +gb_internal void tpool_wake_addr(Futex *addr) { + for (;;) { + int ret = syscall(SYS_futex, addr, FUTEX_WAKE, 1, NULL, NULL, 0); + if (ret == -1) { + perror("Futex wake"); + GB_PANIC("Failed in futex wake!\n"); + } else if (ret > 0) { + return; + } + } +} + +gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { + for (;;) { + int ret = syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL, NULL, 0); + if (ret == -1) { + if (errno != EAGAIN) { + perror("Futex wait"); + GB_PANIC("Failed in futex wait!\n"); + } else { + return; + } + } else if (ret == 0) { + if (*addr != val) { + return; + } + } + } +} +#elif defined(GB_SYSTEM_OSX) + +typedef std::atomic Futex; +typedef volatile int64_t Footex; + +#define UL_COMPARE_AND_WAIT 0x00000001 +#define ULF_NO_ERRNO 0x01000000 + +int __ulock_wait(uint32_t operation, void *addr, uint64_t value, uint32_t timeout); /* timeout is specified in microseconds */ +int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value); + +gb_internal void tpool_wake_addr(Futex *addr) { + for (;;) { + int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, addr, 0); + if (ret >= 0) { + return; + } + if (ret == EINTR || ret == EFAULT) { + continue; + } + if (ret == ENOENT) { + return; + } + GB_PANIC("Failed in futex wake!\n"); + } +} + +gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { + for (;;) { + int ret = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, addr, val, 0); + if (ret >= 0) { + if (*addr != val) { + return; + } + continue; + } + if (ret == EINTR || ret == EFAULT) { + continue; + } + if (ret == ENOENT) { + return; + } + + GB_PANIC("Failed in futex wait!\n"); + } +} +#elif defined(GB_SYSTEM_WINDOWS) +typedef std::atomic Futex; +typedef volatile int64_t Footex; + +gb_internal void tpool_wake_addr(Futex *addr) { + WakeByAddressSingle((void *)addr); +} + +gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { + for (;;) { + int ret = WaitOnAddress(addr, (void *)&val, sizeof(val), INFINITE); + if (*addr != val) break; + } +} +#endif #if defined(GB_SYSTEM_WINDOWS) #pragma warning(pop) -#endif \ No newline at end of file +#endif -- cgit v1.2.3 From e019673a18acf25ccb2f1791a68cae4e70dbd638 Mon Sep 17 00:00:00 2001 From: Colin Davidson Date: Wed, 28 Dec 2022 21:52:41 -0800 Subject: fix build --- build.bat | 3 ++- src/threading.cpp | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) (limited to 'src/threading.cpp') diff --git a/build.bat b/build.bat index d7a89fe20..bcb6d4c1a 100644 --- a/build.bat +++ b/build.bat @@ -69,6 +69,7 @@ set compiler_includes= ^ /Isrc\ set libs= ^ kernel32.lib ^ + Synchronization.lib ^ bin\llvm\windows\LLVM-C.lib set linker_flags= -incremental:no -opt:ref -subsystem:console @@ -95,4 +96,4 @@ if %release_mode% EQU 0 odin run examples/demo del *.obj > NUL 2> NUL -:end_of_build \ No newline at end of file +:end_of_build diff --git a/src/threading.cpp b/src/threading.cpp index 98b7aa0c2..f0a9faf19 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -529,7 +529,7 @@ gb_internal void tpool_wake_addr(Futex *addr) { gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { for (;;) { - int ret = WaitOnAddress(addr, (void *)&val, sizeof(val), INFINITE); + WaitOnAddress(addr, (void *)&val, sizeof(val), INFINITE); if (*addr != val) break; } } -- cgit v1.2.3 From ef9e31cb31ce6c87687b7932ecd0ace3a33b494f Mon Sep 17 00:00:00 2001 From: Colin Davidson Date: Wed, 28 Dec 2022 22:08:39 -0800 Subject: fix ulock/uwait imports --- src/threading.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/threading.cpp') diff --git a/src/threading.cpp b/src/threading.cpp index f0a9faf19..70a7072e7 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -481,8 +481,8 @@ typedef volatile int64_t Footex; #define UL_COMPARE_AND_WAIT 0x00000001 #define ULF_NO_ERRNO 0x01000000 -int __ulock_wait(uint32_t operation, void *addr, uint64_t value, uint32_t timeout); /* timeout is specified in microseconds */ -int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value); +extern "C" int __ulock_wait(uint32_t operation, void *addr, uint64_t value, uint32_t timeout); /* timeout is specified in microseconds */ +extern "C" int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value); gb_internal void tpool_wake_addr(Futex *addr) { for (;;) { -- cgit v1.2.3 From 04a4dbcdafe9a1e31c4bce215dfd5222c3e7275e Mon Sep 17 00:00:00 2001 From: Colin Davidson Date: Thu, 29 Dec 2022 11:05:31 -0800 Subject: add freebsd support --- src/threading.cpp | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) (limited to 'src/threading.cpp') diff --git a/src/threading.cpp b/src/threading.cpp index 70a7072e7..1de277259 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -473,6 +473,37 @@ gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { } } } + +#if defined(GB_SYSTEM_FREEBSD) + +#include +#include + +typedef std::atomic Futex; +typedef volatile int32_t Footex; + +gb_internal void tpool_wake_addr(Futex *addr) { + _umtx_op(addr, UMTX_OP_WAKE, 1, 0, 0); +} + +gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { + for (;;) { + int ret = _umtx_op(addr, UMTX_OP_WAIT_UINT, val, 0, NULL); + if (ret == 0) { + if (errno == ETIMEDOUT || errno == EINTR) { + continue; + } + + perror("Futex wait"); + GB_PANIC("Failed in futex wait!\n"); + } else if (ret == 0) { + if (*addr != val) { + return; + } + } + } +} + #elif defined(GB_SYSTEM_OSX) typedef std::atomic Futex; -- cgit v1.2.3 From 223b66f42264eff24e63667ff9ccf3c2b78358b3 Mon Sep 17 00:00:00 2001 From: Colin Davidson Date: Thu, 29 Dec 2022 11:06:35 -0800 Subject: oops if->elif --- src/threading.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/threading.cpp') diff --git a/src/threading.cpp b/src/threading.cpp index 1de277259..cefe0ce8b 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -474,7 +474,7 @@ gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { } } -#if defined(GB_SYSTEM_FREEBSD) +#elif defined(GB_SYSTEM_FREEBSD) #include #include -- cgit v1.2.3 From 98e5523f2f6d36960d32041a2d5a1de96ef6e5a5 Mon Sep 17 00:00:00 2001 From: Colin Davidson Date: Thu, 29 Dec 2022 11:46:43 -0800 Subject: cover openbsd too --- src/threading.cpp | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) (limited to 'src/threading.cpp') diff --git a/src/threading.cpp b/src/threading.cpp index cefe0ce8b..c065663b2 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -504,6 +504,40 @@ gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { } } +#elif defined(GB_SYSTEM_OPENBSD) + +#include + +typedef std::atomic Futex; +typedef volatile int32_t Footex; + +gb_internal void tpool_wake_addr(Futex *addr) { + int ret = futex((volatile uint32_t *)addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, NULL, NULL); + if (ret == -1) { + perror("Futex wake"); + GB_PANIC("futex wake fail"); + } +} + +gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { + for (;;) { + int ret = futex((volatile uint32_t *)addr, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, NULL, NULL); + if (ret != -1) { + if (*addr != val) { + return; + } + + } else { + if (errno == ETIMEDOUT || errno == EINTR) { + continue; + } + + perror("Futex wait"); + GB_PANIC("Failed in futex wait!\n"); + } + } +} + #elif defined(GB_SYSTEM_OSX) typedef std::atomic Futex; -- cgit v1.2.3 From 27ba1d596c5b68f856b4e74c72bf28439daf4807 Mon Sep 17 00:00:00 2001 From: Colin Davidson Date: Thu, 29 Dec 2022 12:00:16 -0800 Subject: rework openbsd futexes a little --- src/threading.cpp | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) (limited to 'src/threading.cpp') diff --git a/src/threading.cpp b/src/threading.cpp index c065663b2..493e57c91 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -512,22 +512,29 @@ typedef std::atomic Futex; typedef volatile int32_t Footex; gb_internal void tpool_wake_addr(Futex *addr) { - int ret = futex((volatile uint32_t *)addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, NULL, NULL); - if (ret == -1) { - perror("Futex wake"); - GB_PANIC("futex wake fail"); + for (;;) { + int ret = futex((volatile uint32_t *)addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, NULL, NULL); + if (ret == -1) { + if (errno == ETIMEDOUT || errno == EINTR) { + continue; + } + + perror("Futex wake"); + GB_PANIC("futex wake fail"); + } else if (ret == 1) { + return; + } } } gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { for (;;) { int ret = futex((volatile uint32_t *)addr, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, NULL, NULL); - if (ret != -1) { + if (ret == -1) { if (*addr != val) { return; } - } else { if (errno == ETIMEDOUT || errno == EINTR) { continue; } -- cgit v1.2.3