diff options
Diffstat (limited to 'src/threading.cpp')
| -rw-r--r-- | src/threading.cpp | 199 |
1 files changed, 148 insertions, 51 deletions
diff --git a/src/threading.cpp b/src/threading.cpp index e92ed5e31..98b7aa0c2 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -11,24 +11,34 @@ struct RecursiveMutex; struct Semaphore; struct Condition; struct Thread; +struct ThreadPool; #define THREAD_PROC(name) isize name(struct Thread *thread) -typedef THREAD_PROC(ThreadProc); +gb_internal THREAD_PROC(thread_pool_thread_proc); + +#define WORKER_TASK_PROC(name) isize name(void *data) +typedef WORKER_TASK_PROC(WorkerTaskProc); + +typedef struct WorkerTask { + WorkerTaskProc *do_work; + void *data; +} WorkerTask; struct Thread { #if defined(GB_SYSTEM_WINDOWS) - void * win32_handle; + void *win32_handle; #else - pthread_t posix_handle; + pthread_t posix_handle; #endif + + isize idx; - ThreadProc * proc; - void * user_data; - isize user_index; - isize volatile return_value; + WorkerTask *queue; + size_t capacity; + std::atomic<uint64_t> head_and_tail; - isize stack_size; - std::atomic<bool> is_running; + isize stack_size; + struct ThreadPool *pool; }; @@ -59,10 +69,9 @@ gb_internal void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32 gb_internal u32 thread_current_id(void); -gb_internal void thread_init_and_start (Thread *t, ThreadProc *proc, void *data); -gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, void *data, isize stack_size); +gb_internal void thread_init (ThreadPool *pool, Thread *t, isize idx); +gb_internal void thread_init_and_start (ThreadPool *pool, Thread *t, isize idx); gb_internal void thread_join_and_destroy(Thread *t); -gb_internal bool thread_is_running (Thread const *t); gb_internal void thread_set_name (Thread *t, char const *name); gb_internal void yield_thread(void); @@ -325,47 +334,45 @@ gb_internal gb_inline void yield(void) { #endif } -gb_internal void private__thread_run(Thread *t) { - t->return_value = t->proc(t); -} - #if defined(GB_SYSTEM_WINDOWS) - gb_internal DWORD __stdcall internal_thread_proc(void *arg) { - Thread *t = cast(Thread *)arg; - t->is_running.store(true); - private__thread_run(t); - return 0; - } +gb_internal DWORD __stdcall internal_thread_proc(void *arg) { + Thread *t = cast(Thread *)arg; + thread_pool_thread_proc(t); + return 0; +} #else - gb_internal void *internal_thread_proc(void *arg) { - #if (GB_SYSTEM_LINUX) - // NOTE: Don't permit any signal delivery to threads on Linux. - sigset_t mask = {}; - sigfillset(&mask); - GB_ASSERT_MSG(pthread_sigmask(SIG_BLOCK, &mask, nullptr) == 0, "failed to block signals"); - #endif - - Thread *t = cast(Thread *)arg; - t->is_running.store(true); - private__thread_run(t); - return NULL; - } +gb_internal void *internal_thread_proc(void *arg) { +#if (GB_SYSTEM_LINUX) + // NOTE: Don't permit any signal delivery to threads on Linux. + sigset_t mask = {}; + sigfillset(&mask); + GB_ASSERT_MSG(pthread_sigmask(SIG_BLOCK, &mask, nullptr) == 0, "failed to block signals"); +#endif + + Thread *t = cast(Thread *)arg; + thread_pool_thread_proc(t); + return NULL; +} #endif -gb_internal void thread_init_and_start(Thread *t, ThreadProc *proc, void *user_data) { thread_init_and_start_with_stack(t, proc, user_data, 0); } - -gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, void *user_data, isize stack_size) { +gb_internal void thread_init(ThreadPool *pool, Thread *t, isize idx) { gb_zero_item(t); #if defined(GB_SYSTEM_WINDOWS) t->win32_handle = INVALID_HANDLE_VALUE; #else t->posix_handle = 0; #endif - GB_ASSERT(!t->is_running.load()); - GB_ASSERT(proc != NULL); - t->proc = proc; - t->user_data = user_data; - t->stack_size = stack_size; + + t->capacity = 1 << 14; // must be a power of 2 + t->queue = (WorkerTask *)calloc(sizeof(WorkerTask), t->capacity); + t->head_and_tail = 0; + t->pool = pool; + t->idx = idx; +} + +gb_internal void thread_init_and_start(ThreadPool *pool, Thread *t, isize idx) { + thread_init(pool, t, idx); + isize stack_size = 0; #if defined(GB_SYSTEM_WINDOWS) t->win32_handle = CreateThread(NULL, stack_size, internal_thread_proc, t, 0, NULL); @@ -385,10 +392,6 @@ gb_internal void thread_init_and_start_with_stack(Thread *t, ThreadProc *proc, v } gb_internal void thread_join_and_destroy(Thread *t) { - if (!t->is_running.load()) { - return; - } - #if defined(GB_SYSTEM_WINDOWS) WaitForSingleObject(t->win32_handle, INFINITE); CloseHandle(t->win32_handle); @@ -397,11 +400,8 @@ gb_internal void thread_join_and_destroy(Thread *t) { pthread_join(t->posix_handle, NULL); t->posix_handle = 0; #endif - t->is_running.store(false); } -gb_internal bool thread_is_running(Thread const *t) { return t->is_running.load(); } - gb_internal void thread_set_name(Thread *t, char const *name) { #if defined(GB_COMPILER_MSVC) #pragma pack(push, 8) @@ -437,7 +437,104 @@ gb_internal void thread_set_name(Thread *t, char const *name) { #endif } +#if defined(GB_SYSTEM_LINUX) +#include <linux/futex.h> +#include <sys/syscall.h> + +typedef std::atomic<int32_t> Futex; +typedef volatile int32_t Footex; + +gb_internal void tpool_wake_addr(Futex *addr) { + for (;;) { + int ret = syscall(SYS_futex, addr, FUTEX_WAKE, 1, NULL, NULL, 0); + if (ret == -1) { + perror("Futex wake"); + GB_PANIC("Failed in futex wake!\n"); + } else if (ret > 0) { + return; + } + } +} + +gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { + for (;;) { + int ret = syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL, NULL, 0); + if (ret == -1) { + if (errno != EAGAIN) { + perror("Futex wait"); + GB_PANIC("Failed in futex wait!\n"); + } else { + return; + } + } else if (ret == 0) { + if (*addr != val) { + return; + } + } + } +} +#elif defined(GB_SYSTEM_OSX) + +typedef std::atomic<int64_t> Futex; +typedef volatile int64_t Footex; + +#define UL_COMPARE_AND_WAIT 0x00000001 +#define ULF_NO_ERRNO 0x01000000 + +int __ulock_wait(uint32_t operation, void *addr, uint64_t value, uint32_t timeout); /* timeout is specified in microseconds */ +int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value); + +gb_internal void tpool_wake_addr(Futex *addr) { + for (;;) { + int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, addr, 0); + if (ret >= 0) { + return; + } + if (ret == EINTR || ret == EFAULT) { + continue; + } + if (ret == ENOENT) { + return; + } + GB_PANIC("Failed in futex wake!\n"); + } +} + +gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { + for (;;) { + int ret = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, addr, val, 0); + if (ret >= 0) { + if (*addr != val) { + return; + } + continue; + } + if (ret == EINTR || ret == EFAULT) { + continue; + } + if (ret == ENOENT) { + return; + } + + GB_PANIC("Failed in futex wait!\n"); + } +} +#elif defined(GB_SYSTEM_WINDOWS) +typedef std::atomic<int64_t> Futex; +typedef volatile int64_t Footex; + +gb_internal void tpool_wake_addr(Futex *addr) { + WakeByAddressSingle((void *)addr); +} + +gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { + for (;;) { + int ret = WaitOnAddress(addr, (void *)&val, sizeof(val), INFINITE); + if (*addr != val) break; + } +} +#endif #if defined(GB_SYSTEM_WINDOWS) #pragma warning(pop) -#endif
\ No newline at end of file +#endif |