diff options
| author | gingerBill <gingerBill@users.noreply.github.com> | 2023-01-11 22:14:53 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-01-11 22:14:53 +0000 |
| commit | 320062157f06d979db926fcbf407bbbdcc3028c1 (patch) | |
| tree | 770bb60802ff24cc66c8e5e5837819969dc84cd6 /src/threading.cpp | |
| parent | 86511d44e46b6271b01df2cd1ebb83b5496e143c (diff) | |
| parent | d7d6608142c8e169a7856c9e5965619809653903 (diff) | |
Merge pull request #2288 from odin-lang/compiler-improvements-2023-01
Multithreading Compiler Improvements 2023-01
Diffstat (limited to 'src/threading.cpp')
| -rw-r--r-- | src/threading.cpp | 524 |
1 files changed, 340 insertions, 184 deletions
diff --git a/src/threading.cpp b/src/threading.cpp index 7a7d1a299..52e6b722a 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -8,10 +8,12 @@ struct BlockingMutex; struct RecursiveMutex; +struct RwMutex; struct Semaphore; struct Condition; struct Thread; struct ThreadPool; +struct Parker; #define THREAD_PROC(name) isize name(struct Thread *thread) gb_internal THREAD_PROC(thread_pool_thread_proc); @@ -41,31 +43,40 @@ struct Thread { struct ThreadPool *pool; }; +typedef std::atomic<i32> Futex; +typedef volatile i32 Footex; + +gb_internal void futex_wait(Futex *addr, Footex val); +gb_internal void futex_signal(Futex *addr); +gb_internal void futex_broadcast(Futex *addr); -gb_internal void mutex_init (BlockingMutex *m); -gb_internal void mutex_destroy (BlockingMutex *m); gb_internal void mutex_lock (BlockingMutex *m); gb_internal bool mutex_try_lock(BlockingMutex *m); gb_internal void mutex_unlock (BlockingMutex *m); -gb_internal void mutex_init (RecursiveMutex *m); -gb_internal void mutex_destroy (RecursiveMutex *m); + gb_internal void mutex_lock (RecursiveMutex *m); gb_internal bool mutex_try_lock(RecursiveMutex *m); gb_internal void mutex_unlock (RecursiveMutex *m); -gb_internal void semaphore_init (Semaphore *s); -gb_internal void semaphore_destroy(Semaphore *s); +gb_internal void rw_mutex_lock (RwMutex *m); +gb_internal bool rw_mutex_try_lock (RwMutex *m); +gb_internal void rw_mutex_unlock (RwMutex *m); +gb_internal void rw_mutex_shared_lock (RwMutex *m); +gb_internal bool rw_mutex_try_shared_lock(RwMutex *m); +gb_internal void rw_mutex_shared_unlock (RwMutex *m); + gb_internal void semaphore_post (Semaphore *s, i32 count); gb_internal void semaphore_wait (Semaphore *s); gb_internal void semaphore_release(Semaphore *s) { semaphore_post(s, 1); } -gb_internal void condition_init(Condition *c); -gb_internal void condition_destroy(Condition *c); gb_internal void condition_broadcast(Condition *c); gb_internal void condition_signal(Condition *c); gb_internal void condition_wait(Condition *c, BlockingMutex *m); -gb_internal void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32 timeout_in_ms); + +gb_internal void park(Parker *p); +gb_internal void unpark_one(Parker *p); +gb_internal void unpark_all(Parker *p); gb_internal u32 thread_current_id(void); @@ -79,22 +90,23 @@ gb_internal void yield_process(void); struct MutexGuard { - MutexGuard() = delete; + MutexGuard() = delete; MutexGuard(MutexGuard const &) = delete; + MutexGuard(MutexGuard &&) = delete; - MutexGuard(BlockingMutex *bm) : bm{bm} { + explicit MutexGuard(BlockingMutex *bm) noexcept : bm{bm} { mutex_lock(this->bm); } - MutexGuard(RecursiveMutex *rm) : rm{rm} { + explicit MutexGuard(RecursiveMutex *rm) noexcept : rm{rm} { mutex_lock(this->rm); } - MutexGuard(BlockingMutex &bm) : bm{&bm} { + explicit MutexGuard(BlockingMutex &bm) noexcept : bm{&bm} { mutex_lock(this->bm); } - MutexGuard(RecursiveMutex &rm) : rm{&rm} { + explicit MutexGuard(RecursiveMutex &rm) noexcept : rm{&rm} { mutex_lock(this->rm); } - ~MutexGuard() { + ~MutexGuard() noexcept { if (this->bm) { mutex_unlock(this->bm); } else if (this->rm) { @@ -102,24 +114,87 @@ struct MutexGuard { } } - operator bool() const { return true; } + operator bool() const noexcept { return true; } BlockingMutex *bm; RecursiveMutex *rm; }; #define MUTEX_GUARD_BLOCK(m) if (MutexGuard GB_DEFER_3(_mutex_guard_){m}) -#define MUTEX_GUARD(m) MutexGuard GB_DEFER_3(_mutex_guard_){m} +#define MUTEX_GUARD(m) mutex_lock(m); defer (mutex_unlock(m)) + + +struct RecursiveMutex { + Futex owner; + i32 recursion; +}; + +gb_internal void mutex_lock(RecursiveMutex *m) { + Futex tid; + tid.store(cast(i32)thread_current_id()); + for (;;) { + i32 prev_owner = 0; + m->owner.compare_exchange_strong(prev_owner, tid, std::memory_order_acquire, std::memory_order_acquire); + if (prev_owner == 0 || prev_owner == tid) { + m->recursion++; + // inside the lock + return; + } + futex_wait(&m->owner, prev_owner); + } +} +gb_internal bool mutex_try_lock(RecursiveMutex *m) { + Futex tid; + tid.store(cast(i32)thread_current_id()); + i32 prev_owner = 0; + m->owner.compare_exchange_strong(prev_owner, tid, std::memory_order_acquire, std::memory_order_acquire); + if (prev_owner == 0 || prev_owner == tid) { + m->recursion++; + // inside the lock + return true; + } + return false; +} +gb_internal void mutex_unlock(RecursiveMutex *m) { + m->recursion--; + if (m->recursion != 0) { + return; + } + m->owner.exchange(0, std::memory_order_release); + futex_signal(&m->owner); + // outside the lock +} + +struct Semaphore { + Futex count; +}; + +gb_internal void semaphore_post(Semaphore *s, i32 count) { + s->count.fetch_add(count, std::memory_order_release); + if (s->count == 1) { + futex_signal(&s->count); + } else { + futex_broadcast(&s->count); + } +} +gb_internal void semaphore_wait(Semaphore *s) { + for (;;) { + i32 original_count = s->count.load(std::memory_order_relaxed); + while (original_count == 0) { + futex_wait(&s->count, original_count); + original_count = s->count; + } + if (!s->count.compare_exchange_strong(original_count, original_count-1, std::memory_order_acquire, std::memory_order_acquire)) { + return; + } + } +} #if defined(GB_SYSTEM_WINDOWS) struct BlockingMutex { SRWLOCK srwlock; }; - gb_internal void mutex_init(BlockingMutex *m) { - } - gb_internal void mutex_destroy(BlockingMutex *m) { - } gb_internal void mutex_lock(BlockingMutex *m) { AcquireSRWLockExclusive(&m->srwlock); } @@ -130,50 +205,10 @@ struct MutexGuard { ReleaseSRWLockExclusive(&m->srwlock); } - struct RecursiveMutex { - CRITICAL_SECTION win32_critical_section; - }; - gb_internal void mutex_init(RecursiveMutex *m) { - InitializeCriticalSection(&m->win32_critical_section); - } - gb_internal void mutex_destroy(RecursiveMutex *m) { - DeleteCriticalSection(&m->win32_critical_section); - } - gb_internal void mutex_lock(RecursiveMutex *m) { - EnterCriticalSection(&m->win32_critical_section); - } - gb_internal bool mutex_try_lock(RecursiveMutex *m) { - return TryEnterCriticalSection(&m->win32_critical_section) != 0; - } - gb_internal void mutex_unlock(RecursiveMutex *m) { - LeaveCriticalSection(&m->win32_critical_section); - } - - struct Semaphore { - void *win32_handle; - }; - - gb_internal void semaphore_init(Semaphore *s) { - s->win32_handle = CreateSemaphoreA(NULL, 0, I32_MAX, NULL); - } - gb_internal void semaphore_destroy(Semaphore *s) { - CloseHandle(s->win32_handle); - } - gb_internal void semaphore_post(Semaphore *s, i32 count) { - ReleaseSemaphore(s->win32_handle, count, NULL); - } - gb_internal void semaphore_wait(Semaphore *s) { - WaitForSingleObjectEx(s->win32_handle, INFINITE, FALSE); - } - struct Condition { CONDITION_VARIABLE cond; }; - gb_internal void condition_init(Condition *c) { - } - gb_internal void condition_destroy(Condition *c) { - } gb_internal void condition_broadcast(Condition *c) { WakeAllConditionVariable(&c->cond); } @@ -183,103 +218,192 @@ struct MutexGuard { gb_internal void condition_wait(Condition *c, BlockingMutex *m) { SleepConditionVariableSRW(&c->cond, &m->srwlock, INFINITE, 0); } - gb_internal void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32 timeout_in_ms) { - SleepConditionVariableSRW(&c->cond, &m->srwlock, timeout_in_ms, 0); + + struct RwMutex { + SRWLOCK srwlock; + }; + + gb_internal void rw_mutex_lock(RwMutex *m) { + AcquireSRWLockExclusive(&m->srwlock); + } + gb_internal bool rw_mutex_try_lock(RwMutex *m) { + return !!TryAcquireSRWLockExclusive(&m->srwlock); + } + gb_internal void rw_mutex_unlock(RwMutex *m) { + ReleaseSRWLockExclusive(&m->srwlock); } + gb_internal void rw_mutex_shared_lock(RwMutex *m) { + AcquireSRWLockShared(&m->srwlock); + } + gb_internal bool rw_mutex_try_shared_lock(RwMutex *m) { + return !!TryAcquireSRWLockShared(&m->srwlock); + } + gb_internal void rw_mutex_shared_unlock(RwMutex *m) { + ReleaseSRWLockShared(&m->srwlock); + } #else + enum Internal_Mutex_State : i32 { + Internal_Mutex_State_Unlocked = 0, + Internal_Mutex_State_Locked = 1, + Internal_Mutex_State_Waiting = 2, + }; + struct BlockingMutex { - pthread_mutex_t pthread_mutex; + i32 state_; + + Futex &state() { + return *(Futex *)&this->state_; + } + Futex const &state() const { + return *(Futex const *)&this->state_; + } }; - gb_internal void mutex_init(BlockingMutex *m) { - pthread_mutex_init(&m->pthread_mutex, nullptr); - } - gb_internal void mutex_destroy(BlockingMutex *m) { - pthread_mutex_destroy(&m->pthread_mutex); + + gb_no_inline gb_internal void mutex_lock_slow(BlockingMutex *m, i32 curr_state) { + i32 new_state = curr_state; + for (i32 spin = 0; spin < 100; spin++) { + i32 state = Internal_Mutex_State_Unlocked; + bool ok = m->state().compare_exchange_weak(state, new_state, std::memory_order_acquire, std::memory_order_consume); + if (ok) { + return; + } + if (state == Internal_Mutex_State_Waiting) { + break; + } + for (i32 i = gb_min(spin+1, 32); i > 0; i--) { + yield_thread(); + } + } + + // Set just in case 100 iterations did not do it + new_state = Internal_Mutex_State_Waiting; + + for (;;) { + if (m->state().exchange(Internal_Mutex_State_Waiting, std::memory_order_acquire) == Internal_Mutex_State_Unlocked) { + return; + } + futex_wait(&m->state(), new_state); + yield_thread(); + } } + gb_internal void mutex_lock(BlockingMutex *m) { - pthread_mutex_lock(&m->pthread_mutex); + i32 v = m->state().exchange(Internal_Mutex_State_Locked, std::memory_order_acquire); + if (v != Internal_Mutex_State_Unlocked) { + mutex_lock_slow(m, v); + } } gb_internal bool mutex_try_lock(BlockingMutex *m) { - return pthread_mutex_trylock(&m->pthread_mutex) == 0; + i32 v = m->state().exchange(Internal_Mutex_State_Locked, std::memory_order_acquire); + return v == Internal_Mutex_State_Unlocked; } - gb_internal void mutex_unlock(BlockingMutex *m) { - pthread_mutex_unlock(&m->pthread_mutex); + + gb_no_inline gb_internal void mutex_unlock_slow(BlockingMutex *m) { + futex_signal(&m->state()); } - struct RecursiveMutex { - pthread_mutex_t pthread_mutex; - pthread_mutexattr_t pthread_mutexattr; - }; - gb_internal void mutex_init(RecursiveMutex *m) { - pthread_mutexattr_init(&m->pthread_mutexattr); - pthread_mutexattr_settype(&m->pthread_mutexattr, PTHREAD_MUTEX_RECURSIVE); - pthread_mutex_init(&m->pthread_mutex, &m->pthread_mutexattr); - } - gb_internal void mutex_destroy(RecursiveMutex *m) { - pthread_mutex_destroy(&m->pthread_mutex); - } - gb_internal void mutex_lock(RecursiveMutex *m) { - pthread_mutex_lock(&m->pthread_mutex); - } - gb_internal bool mutex_try_lock(RecursiveMutex *m) { - return pthread_mutex_trylock(&m->pthread_mutex) == 0; - } - gb_internal void mutex_unlock(RecursiveMutex *m) { - pthread_mutex_unlock(&m->pthread_mutex); - } - - #if defined(GB_SYSTEM_OSX) - struct Semaphore { - semaphore_t osx_handle; - }; - - gb_internal void semaphore_init (Semaphore *s) { semaphore_create(mach_task_self(), &s->osx_handle, SYNC_POLICY_FIFO, 0); } - gb_internal void semaphore_destroy(Semaphore *s) { semaphore_destroy(mach_task_self(), s->osx_handle); } - gb_internal void semaphore_post (Semaphore *s, i32 count) { while (count --> 0) semaphore_signal(s->osx_handle); } - gb_internal void semaphore_wait (Semaphore *s) { semaphore_wait(s->osx_handle); } - #elif defined(GB_SYSTEM_UNIX) - struct Semaphore { - sem_t unix_handle; - }; - - gb_internal void semaphore_init (Semaphore *s) { sem_init(&s->unix_handle, 0, 0); } - gb_internal void semaphore_destroy(Semaphore *s) { sem_destroy(&s->unix_handle); } - gb_internal void semaphore_post (Semaphore *s, i32 count) { while (count --> 0) sem_post(&s->unix_handle); } - void semaphore_wait (Semaphore *s) { int i; do { i = sem_wait(&s->unix_handle); } while (i == -1 && errno == EINTR); } - #else - #error Implement Semaphore for this platform - #endif - + gb_internal void mutex_unlock(BlockingMutex *m) { + i32 v = m->state().exchange(Internal_Mutex_State_Unlocked, std::memory_order_release); + switch (v) { + case Internal_Mutex_State_Unlocked: + GB_PANIC("Unreachable"); + break; + case Internal_Mutex_State_Locked: + // Okay + break; + case Internal_Mutex_State_Waiting: + mutex_unlock_slow(m); + break; + } + } struct Condition { - pthread_cond_t pthread_cond; + i32 state_; + + Futex &state() { + return *(Futex *)&this->state_; + } + Futex const &state() const { + return *(Futex const *)&this->state_; + } }; - - gb_internal void condition_init(Condition *c) { - pthread_cond_init(&c->pthread_cond, NULL); - } - gb_internal void condition_destroy(Condition *c) { - pthread_cond_destroy(&c->pthread_cond); - } + gb_internal void condition_broadcast(Condition *c) { - pthread_cond_broadcast(&c->pthread_cond); + c->state().fetch_add(1, std::memory_order_release); + futex_broadcast(&c->state()); } gb_internal void condition_signal(Condition *c) { - pthread_cond_signal(&c->pthread_cond); + c->state().fetch_add(1, std::memory_order_release); + futex_signal(&c->state()); } gb_internal void condition_wait(Condition *c, BlockingMutex *m) { - pthread_cond_wait(&c->pthread_cond, &m->pthread_mutex); + i32 state = c->state().load(std::memory_order_relaxed); + mutex_unlock(m); + futex_wait(&c->state(), state); + mutex_lock(m); } - gb_internal void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32 timeout_in_ms) { - struct timespec abstime = {}; - abstime.tv_sec = timeout_in_ms/1000; - abstime.tv_nsec = cast(long)(timeout_in_ms%1000)*1e6; - pthread_cond_timedwait(&c->pthread_cond, &m->pthread_mutex, &abstime); - + + struct RwMutex { + // TODO(bill): make this a proper RW mutex + BlockingMutex mutex; + }; + + gb_internal void rw_mutex_lock(RwMutex *m) { + mutex_lock(&m->mutex); + } + gb_internal bool rw_mutex_try_lock(RwMutex *m) { + return mutex_try_lock(&m->mutex); + } + gb_internal void rw_mutex_unlock(RwMutex *m) { + mutex_unlock(&m->mutex); + } + + gb_internal void rw_mutex_shared_lock(RwMutex *m) { + mutex_lock(&m->mutex); + } + gb_internal bool rw_mutex_try_shared_lock(RwMutex *m) { + return mutex_try_lock(&m->mutex); + } + gb_internal void rw_mutex_shared_unlock(RwMutex *m) { + mutex_unlock(&m->mutex); } #endif +struct Parker { + Futex state; +}; +enum ParkerState : u32 { + ParkerState_Empty = 0, + ParkerState_Notified = 1, + ParkerState_Parked = UINT32_MAX, +}; + +gb_internal void park(Parker *p) { + if (p->state.fetch_sub(1, std::memory_order_acquire) == ParkerState_Notified) { + return; + } + for (;;) { + futex_wait(&p->state, ParkerState_Parked); + i32 notified = ParkerState_Empty; + if (p->state.compare_exchange_strong(notified, ParkerState_Empty, std::memory_order_acquire, std::memory_order_acquire)) { + return; + } + } +} + +gb_internal void unpark_one(Parker *p) { + if (p->state.exchange(ParkerState_Notified, std::memory_order_release) == ParkerState_Parked) { + futex_signal(&p->state); + } +} + +gb_internal void unpark_all(Parker *p) { + if (p->state.exchange(ParkerState_Notified, std::memory_order_release) == ParkerState_Parked) { + futex_broadcast(&p->state); + } +} + gb_internal u32 thread_current_id(void) { u32 thread_id; @@ -364,12 +488,13 @@ gb_internal void thread_init(ThreadPool *pool, Thread *t, isize idx) { #endif t->capacity = 1 << 14; // must be a power of 2 - t->queue = (WorkerTask *)calloc(sizeof(WorkerTask), t->capacity); + t->queue = gb_alloc_array(heap_allocator(), WorkerTask, t->capacity); t->head_and_tail = 0; t->pool = pool; t->idx = idx; } + gb_internal void thread_init_and_start(ThreadPool *pool, Thread *t, isize idx) { thread_init(pool, t, idx); isize stack_size = 0; @@ -400,6 +525,8 @@ gb_internal void thread_join_and_destroy(Thread *t) { pthread_join(t->posix_handle, NULL); t->posix_handle = 0; #endif + + gb_free(heap_allocator(), t->queue); } gb_internal void thread_set_name(Thread *t, char const *name) { @@ -441,24 +568,25 @@ gb_internal void thread_set_name(Thread *t, char const *name) { #include <linux/futex.h> #include <sys/syscall.h> -typedef std::atomic<int32_t> Futex; -typedef volatile int32_t Footex; +gb_internal void futex_signal(Futex *addr) { + int ret = syscall(SYS_futex, addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, NULL, NULL, 0); + if (ret == -1) { + perror("Futex wake"); + GB_PANIC("Failed in futex wake!\n"); + } +} -gb_internal void tpool_wake_addr(Futex *addr) { - for (;;) { - int ret = syscall(SYS_futex, addr, FUTEX_WAKE, 1, NULL, NULL, 0); - if (ret == -1) { - perror("Futex wake"); - GB_PANIC("Failed in futex wake!\n"); - } else if (ret > 0) { - return; - } +gb_internal void futex_broadcast(Futex *addr) { + int ret = syscall(SYS_futex, addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT32_MAX, NULL, NULL, 0); + if (ret == -1) { + perror("Futex wake"); + GB_PANIC("Failed in futex wake!\n"); } } -gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { +gb_internal void futex_wait(Futex *addr, Footex val) { for (;;) { - int ret = syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL, NULL, 0); + int ret = syscall(SYS_futex, addr, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, NULL, NULL, 0); if (ret == -1) { if (errno != EAGAIN) { perror("Futex wait"); @@ -479,14 +607,15 @@ gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { #include <sys/types.h> #include <sys/umtx.h> -typedef std::atomic<int32_t> Futex; -typedef volatile int32_t Footex; - -gb_internal void tpool_wake_addr(Futex *addr) { +gb_internal void futex_signal(Futex *addr) { _umtx_op(addr, UMTX_OP_WAKE, 1, 0, 0); } -gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { +gb_internal void futex_broadcast(Futex *addr) { + _umtx_op(addr, UMTX_OP_WAKE, INT32_MAX, 0, 0); +} + +gb_internal void futex_wait(Futex *addr, Footex val) { for (;;) { int ret = _umtx_op(addr, UMTX_OP_WAIT_UINT, val, 0, NULL); if (ret == 0) { @@ -508,12 +637,26 @@ gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { #include <sys/futex.h> -typedef std::atomic<int32_t> Futex; -typedef volatile int32_t Footex; +gb_internal void futex_signal(Futex *f) { + for (;;) { + int ret = futex((volatile uint32_t *)f, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, NULL, NULL); + if (ret == -1) { + if (errno == ETIMEDOUT || errno == EINTR) { + continue; + } + + perror("Futex wake"); + GB_PANIC("futex wake fail"); + } else if (ret == 1) { + return; + } + } +} + -gb_internal void tpool_wake_addr(Futex *addr) { +gb_internal void futex_broadcast(Futex *f) { for (;;) { - int ret = futex((volatile uint32_t *)addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, NULL, NULL); + int ret = futex((volatile uint32_t *)f, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT32_MAX, NULL, NULL); if (ret == -1) { if (errno == ETIMEDOUT || errno == EINTR) { continue; @@ -527,11 +670,11 @@ gb_internal void tpool_wake_addr(Futex *addr) { } } -gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { +gb_internal void futex_wait(Futex *f, Footex val) { for (;;) { - int ret = futex((volatile uint32_t *)addr, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, NULL, NULL); + int ret = futex((volatile uint32_t *)f, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, NULL, NULL); if (ret == -1) { - if (*addr != val) { + if (*f != val) { return; } @@ -547,46 +690,58 @@ gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { #elif defined(GB_SYSTEM_OSX) -typedef std::atomic<int64_t> Futex; -typedef volatile int64_t Footex; - #define UL_COMPARE_AND_WAIT 0x00000001 #define ULF_NO_ERRNO 0x01000000 extern "C" int __ulock_wait(uint32_t operation, void *addr, uint64_t value, uint32_t timeout); /* timeout is specified in microseconds */ extern "C" int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value); -gb_internal void tpool_wake_addr(Futex *addr) { +gb_internal void futex_signal(Futex *f) { for (;;) { - int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, addr, 0); + int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, f, 0); if (ret >= 0) { return; } - ret = -ret; - if (ret == EINTR || ret == EFAULT) { + if (ret == -EINTR || ret == -EFAULT) { + continue; + } + if (ret == -ENOENT) { + return; + } + GB_PANIC("Failed in futex wake!\n"); + } +} + +gb_internal void futex_broadcast(Futex *f) { + for (;;) { + enum { ULF_WAKE_ALL = 0x00000100 }; + int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO | ULF_WAKE_ALL, f, 0); + if (ret == 0) { + return; + } + if (ret == -EINTR || ret == -EFAULT) { continue; } - if (ret == ENOENT) { + if (ret == -ENOENT) { return; } GB_PANIC("Failed in futex wake!\n"); } } -gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { +gb_internal void futex_wait(Futex *f, Footex val) { for (;;) { - int ret = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, addr, val, 0); + int ret = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, f, val, 0); if (ret >= 0) { - if (*addr != val) { + if (*f != val) { return; } continue; } - ret = -ret; - if (ret == EINTR || ret == EFAULT) { - continue; + if (ret == -EINTR || ret == -EFAULT) {continue; + ret = -ret; } - if (ret == ENOENT) { + if (ret == -ENOENT) { return; } @@ -594,18 +749,19 @@ gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { } } #elif defined(GB_SYSTEM_WINDOWS) -typedef std::atomic<int64_t> Futex; -typedef volatile int64_t Footex; -gb_internal void tpool_wake_addr(Futex *addr) { - WakeByAddressSingle((void *)addr); +gb_internal void futex_signal(Futex *f) { + WakeByAddressSingle(f); } -gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) { - for (;;) { - WaitOnAddress(addr, (void *)&val, sizeof(val), INFINITE); - if (*addr != val) break; - } +gb_internal void futex_broadcast(Futex *f) { + WakeByAddressAll(f); +} + +gb_internal void futex_wait(Futex *f, Footex val) { + do { + WaitOnAddress(f, (void *)&val, sizeof(val), INFINITE); + } while (f->load() == val); } #endif |