Diffstat (limited to 'src/threading.cpp')
-rw-r--r--  src/threading.cpp  524
1 file changed, 340 insertions(+), 184 deletions(-)
diff --git a/src/threading.cpp b/src/threading.cpp
index 7a7d1a299..52e6b722a 100644
--- a/src/threading.cpp
+++ b/src/threading.cpp
@@ -8,10 +8,12 @@
struct BlockingMutex;
struct RecursiveMutex;
+struct RwMutex;
struct Semaphore;
struct Condition;
struct Thread;
struct ThreadPool;
+struct Parker;
#define THREAD_PROC(name) isize name(struct Thread *thread)
gb_internal THREAD_PROC(thread_pool_thread_proc);
@@ -41,31 +43,40 @@ struct Thread {
struct ThreadPool *pool;
};
+typedef std::atomic<i32> Futex;
+typedef volatile i32 Footex;
+
+gb_internal void futex_wait(Futex *addr, Footex val);
+gb_internal void futex_signal(Futex *addr);
+gb_internal void futex_broadcast(Futex *addr);
-gb_internal void mutex_init (BlockingMutex *m);
-gb_internal void mutex_destroy (BlockingMutex *m);
gb_internal void mutex_lock (BlockingMutex *m);
gb_internal bool mutex_try_lock(BlockingMutex *m);
gb_internal void mutex_unlock (BlockingMutex *m);
-gb_internal void mutex_init (RecursiveMutex *m);
-gb_internal void mutex_destroy (RecursiveMutex *m);
+
gb_internal void mutex_lock (RecursiveMutex *m);
gb_internal bool mutex_try_lock(RecursiveMutex *m);
gb_internal void mutex_unlock (RecursiveMutex *m);
-gb_internal void semaphore_init (Semaphore *s);
-gb_internal void semaphore_destroy(Semaphore *s);
+gb_internal void rw_mutex_lock (RwMutex *m);
+gb_internal bool rw_mutex_try_lock (RwMutex *m);
+gb_internal void rw_mutex_unlock (RwMutex *m);
+gb_internal void rw_mutex_shared_lock (RwMutex *m);
+gb_internal bool rw_mutex_try_shared_lock(RwMutex *m);
+gb_internal void rw_mutex_shared_unlock (RwMutex *m);
+
gb_internal void semaphore_post (Semaphore *s, i32 count);
gb_internal void semaphore_wait (Semaphore *s);
gb_internal void semaphore_release(Semaphore *s) { semaphore_post(s, 1); }
-gb_internal void condition_init(Condition *c);
-gb_internal void condition_destroy(Condition *c);
gb_internal void condition_broadcast(Condition *c);
gb_internal void condition_signal(Condition *c);
gb_internal void condition_wait(Condition *c, BlockingMutex *m);
-gb_internal void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32 timeout_in_ms);
+
+gb_internal void park(Parker *p);
+gb_internal void unpark_one(Parker *p);
+gb_internal void unpark_all(Parker *p);
gb_internal u32 thread_current_id(void);
@@ -79,22 +90,23 @@ gb_internal void yield_process(void);
struct MutexGuard {
- MutexGuard() = delete;
+ MutexGuard() = delete;
MutexGuard(MutexGuard const &) = delete;
+ MutexGuard(MutexGuard &&) = delete;
- MutexGuard(BlockingMutex *bm) : bm{bm} {
+ explicit MutexGuard(BlockingMutex *bm) noexcept : bm{bm} {
mutex_lock(this->bm);
}
- MutexGuard(RecursiveMutex *rm) : rm{rm} {
+ explicit MutexGuard(RecursiveMutex *rm) noexcept : rm{rm} {
mutex_lock(this->rm);
}
- MutexGuard(BlockingMutex &bm) : bm{&bm} {
+ explicit MutexGuard(BlockingMutex &bm) noexcept : bm{&bm} {
mutex_lock(this->bm);
}
- MutexGuard(RecursiveMutex &rm) : rm{&rm} {
+ explicit MutexGuard(RecursiveMutex &rm) noexcept : rm{&rm} {
mutex_lock(this->rm);
}
- ~MutexGuard() {
+ ~MutexGuard() noexcept {
if (this->bm) {
mutex_unlock(this->bm);
} else if (this->rm) {
@@ -102,24 +114,87 @@ struct MutexGuard {
}
}
- operator bool() const { return true; }
+ operator bool() const noexcept { return true; }
BlockingMutex *bm;
RecursiveMutex *rm;
};
#define MUTEX_GUARD_BLOCK(m) if (MutexGuard GB_DEFER_3(_mutex_guard_){m})
-#define MUTEX_GUARD(m) MutexGuard GB_DEFER_3(_mutex_guard_){m}
+#define MUTEX_GUARD(m) mutex_lock(m); defer (mutex_unlock(m))
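A minimal usage sketch of the two macro forms (example_mutex/example_value are hypothetical; `defer` is the gb helper this codebase already uses): MUTEX_GUARD_BLOCK scopes the lock to the attached block, while the reworked MUTEX_GUARD holds it until the end of the enclosing scope.

    gb_global BlockingMutex example_mutex;
    gb_global isize example_value;

    gb_internal void example_usage(void) {
        MUTEX_GUARD_BLOCK(&example_mutex) {
            example_value += 1; // unlocked when this block exits
        }
        MUTEX_GUARD(&example_mutex); // unlocked when example_usage returns
        example_value += 1;
    }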
+
+
+struct RecursiveMutex {
+ Futex owner;
+ i32 recursion;
+};
+
+gb_internal void mutex_lock(RecursiveMutex *m) {
+ i32 tid = cast(i32)thread_current_id();
+ for (;;) {
+ i32 prev_owner = 0;
+ m->owner.compare_exchange_strong(prev_owner, tid, std::memory_order_acquire, std::memory_order_acquire);
+ if (prev_owner == 0 || prev_owner == tid) {
+ m->recursion++;
+ // inside the lock
+ return;
+ }
+ futex_wait(&m->owner, prev_owner);
+ }
+}
+gb_internal bool mutex_try_lock(RecursiveMutex *m) {
+ i32 tid = cast(i32)thread_current_id();
+ i32 prev_owner = 0;
+ m->owner.compare_exchange_strong(prev_owner, tid, std::memory_order_acquire, std::memory_order_acquire);
+ if (prev_owner == 0 || prev_owner == tid) {
+ m->recursion++;
+ // inside the lock
+ return true;
+ }
+ return false;
+}
+gb_internal void mutex_unlock(RecursiveMutex *m) {
+ m->recursion--;
+ if (m->recursion != 0) {
+ return;
+ }
+ m->owner.exchange(0, std::memory_order_release);
+ futex_signal(&m->owner);
+ // outside the lock
+}
+
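A sketch of the re-entrancy this gives (hypothetical helper names; zero-initialization means owner == 0, i.e. unlocked):

    gb_global RecursiveMutex example_rmutex;

    gb_internal void example_inner(void) {
        mutex_lock(&example_rmutex);   // same owner: recursion becomes 2, no deadlock
        mutex_unlock(&example_rmutex); // recursion back to 1, lock still held
    }
    gb_internal void example_outer(void) {
        mutex_lock(&example_rmutex);   // owner = this thread, recursion = 1
        example_inner();
        mutex_unlock(&example_rmutex); // recursion 0: owner cleared, waiters signalled
    }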
+struct Semaphore {
+ Futex count;
+};
+
+gb_internal void semaphore_post(Semaphore *s, i32 count) {
+ s->count.fetch_add(count, std::memory_order_release);
+ if (count == 1) {
+ futex_signal(&s->count);
+ } else {
+ futex_broadcast(&s->count);
+ }
+}
+gb_internal void semaphore_wait(Semaphore *s) {
+ for (;;) {
+ i32 original_count = s->count.load(std::memory_order_relaxed);
+ while (original_count == 0) {
+ futex_wait(&s->count, original_count);
+ original_count = s->count;
+ }
+ if (s->count.compare_exchange_strong(original_count, original_count-1, std::memory_order_acquire, std::memory_order_acquire)) {
+ return;
+ }
+ }
+}
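A usage sketch for the futex-backed semaphore (hypothetical names): post adds to the count and wakes one sleeper for a post of one, otherwise broadcasts; wait sleeps while the count is zero, then claims a unit with a CAS decrement.

    gb_global Semaphore example_sem;

    gb_internal void example_worker(void) {
        semaphore_wait(&example_sem); // blocks until the count is non-zero, then takes one unit
        // ... handle one unit of work ...
    }
    gb_internal void example_boss(void) {
        semaphore_post(&example_sem, 4); // add four units, broadcast to sleeping workers
    }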
#if defined(GB_SYSTEM_WINDOWS)
struct BlockingMutex {
SRWLOCK srwlock;
};
- gb_internal void mutex_init(BlockingMutex *m) {
- }
- gb_internal void mutex_destroy(BlockingMutex *m) {
- }
gb_internal void mutex_lock(BlockingMutex *m) {
AcquireSRWLockExclusive(&m->srwlock);
}
@@ -130,50 +205,10 @@ struct MutexGuard {
ReleaseSRWLockExclusive(&m->srwlock);
}
- struct RecursiveMutex {
- CRITICAL_SECTION win32_critical_section;
- };
- gb_internal void mutex_init(RecursiveMutex *m) {
- InitializeCriticalSection(&m->win32_critical_section);
- }
- gb_internal void mutex_destroy(RecursiveMutex *m) {
- DeleteCriticalSection(&m->win32_critical_section);
- }
- gb_internal void mutex_lock(RecursiveMutex *m) {
- EnterCriticalSection(&m->win32_critical_section);
- }
- gb_internal bool mutex_try_lock(RecursiveMutex *m) {
- return TryEnterCriticalSection(&m->win32_critical_section) != 0;
- }
- gb_internal void mutex_unlock(RecursiveMutex *m) {
- LeaveCriticalSection(&m->win32_critical_section);
- }
-
- struct Semaphore {
- void *win32_handle;
- };
-
- gb_internal void semaphore_init(Semaphore *s) {
- s->win32_handle = CreateSemaphoreA(NULL, 0, I32_MAX, NULL);
- }
- gb_internal void semaphore_destroy(Semaphore *s) {
- CloseHandle(s->win32_handle);
- }
- gb_internal void semaphore_post(Semaphore *s, i32 count) {
- ReleaseSemaphore(s->win32_handle, count, NULL);
- }
- gb_internal void semaphore_wait(Semaphore *s) {
- WaitForSingleObjectEx(s->win32_handle, INFINITE, FALSE);
- }
-
struct Condition {
CONDITION_VARIABLE cond;
};
- gb_internal void condition_init(Condition *c) {
- }
- gb_internal void condition_destroy(Condition *c) {
- }
gb_internal void condition_broadcast(Condition *c) {
WakeAllConditionVariable(&c->cond);
}
@@ -183,103 +218,192 @@ struct MutexGuard {
gb_internal void condition_wait(Condition *c, BlockingMutex *m) {
SleepConditionVariableSRW(&c->cond, &m->srwlock, INFINITE, 0);
}
- gb_internal void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32 timeout_in_ms) {
- SleepConditionVariableSRW(&c->cond, &m->srwlock, timeout_in_ms, 0);
+
+ struct RwMutex {
+ SRWLOCK srwlock;
+ };
+
+ gb_internal void rw_mutex_lock(RwMutex *m) {
+ AcquireSRWLockExclusive(&m->srwlock);
+ }
+ gb_internal bool rw_mutex_try_lock(RwMutex *m) {
+ return !!TryAcquireSRWLockExclusive(&m->srwlock);
+ }
+ gb_internal void rw_mutex_unlock(RwMutex *m) {
+ ReleaseSRWLockExclusive(&m->srwlock);
}
+ gb_internal void rw_mutex_shared_lock(RwMutex *m) {
+ AcquireSRWLockShared(&m->srwlock);
+ }
+ gb_internal bool rw_mutex_try_shared_lock(RwMutex *m) {
+ return !!TryAcquireSRWLockShared(&m->srwlock);
+ }
+ gb_internal void rw_mutex_shared_unlock(RwMutex *m) {
+ ReleaseSRWLockShared(&m->srwlock);
+ }
#else
+ enum Internal_Mutex_State : i32 {
+ Internal_Mutex_State_Unlocked = 0,
+ Internal_Mutex_State_Locked = 1,
+ Internal_Mutex_State_Waiting = 2,
+ };
+
struct BlockingMutex {
- pthread_mutex_t pthread_mutex;
+ i32 state_;
+
+ Futex &state() {
+ return *(Futex *)&this->state_;
+ }
+ Futex const &state() const {
+ return *(Futex const *)&this->state_;
+ }
};
- gb_internal void mutex_init(BlockingMutex *m) {
- pthread_mutex_init(&m->pthread_mutex, nullptr);
- }
- gb_internal void mutex_destroy(BlockingMutex *m) {
- pthread_mutex_destroy(&m->pthread_mutex);
+
+ gb_no_inline gb_internal void mutex_lock_slow(BlockingMutex *m, i32 curr_state) {
+ i32 new_state = curr_state;
+ for (i32 spin = 0; spin < 100; spin++) {
+ i32 state = Internal_Mutex_State_Unlocked;
+ bool ok = m->state().compare_exchange_weak(state, new_state, std::memory_order_acquire, std::memory_order_consume);
+ if (ok) {
+ return;
+ }
+ if (state == Internal_Mutex_State_Waiting) {
+ break;
+ }
+ for (i32 i = gb_min(spin+1, 32); i > 0; i--) {
+ yield_thread();
+ }
+ }
+
+ // Fall back to the waiting state in case 100 spin iterations did not acquire the lock
+ new_state = Internal_Mutex_State_Waiting;
+
+ for (;;) {
+ if (m->state().exchange(Internal_Mutex_State_Waiting, std::memory_order_acquire) == Internal_Mutex_State_Unlocked) {
+ return;
+ }
+ futex_wait(&m->state(), new_state);
+ yield_thread();
+ }
}
+
gb_internal void mutex_lock(BlockingMutex *m) {
- pthread_mutex_lock(&m->pthread_mutex);
+ i32 v = m->state().exchange(Internal_Mutex_State_Locked, std::memory_order_acquire);
+ if (v != Internal_Mutex_State_Unlocked) {
+ mutex_lock_slow(m, v);
+ }
}
gb_internal bool mutex_try_lock(BlockingMutex *m) {
- return pthread_mutex_trylock(&m->pthread_mutex) == 0;
+ i32 v = m->state().exchange(Internal_Mutex_State_Locked, std::memory_order_acquire);
+ return v == Internal_Mutex_State_Unlocked;
}
- gb_internal void mutex_unlock(BlockingMutex *m) {
- pthread_mutex_unlock(&m->pthread_mutex);
+
+ gb_no_inline gb_internal void mutex_unlock_slow(BlockingMutex *m) {
+ futex_signal(&m->state());
}
- struct RecursiveMutex {
- pthread_mutex_t pthread_mutex;
- pthread_mutexattr_t pthread_mutexattr;
- };
- gb_internal void mutex_init(RecursiveMutex *m) {
- pthread_mutexattr_init(&m->pthread_mutexattr);
- pthread_mutexattr_settype(&m->pthread_mutexattr, PTHREAD_MUTEX_RECURSIVE);
- pthread_mutex_init(&m->pthread_mutex, &m->pthread_mutexattr);
- }
- gb_internal void mutex_destroy(RecursiveMutex *m) {
- pthread_mutex_destroy(&m->pthread_mutex);
- }
- gb_internal void mutex_lock(RecursiveMutex *m) {
- pthread_mutex_lock(&m->pthread_mutex);
- }
- gb_internal bool mutex_try_lock(RecursiveMutex *m) {
- return pthread_mutex_trylock(&m->pthread_mutex) == 0;
- }
- gb_internal void mutex_unlock(RecursiveMutex *m) {
- pthread_mutex_unlock(&m->pthread_mutex);
- }
-
- #if defined(GB_SYSTEM_OSX)
- struct Semaphore {
- semaphore_t osx_handle;
- };
-
- gb_internal void semaphore_init (Semaphore *s) { semaphore_create(mach_task_self(), &s->osx_handle, SYNC_POLICY_FIFO, 0); }
- gb_internal void semaphore_destroy(Semaphore *s) { semaphore_destroy(mach_task_self(), s->osx_handle); }
- gb_internal void semaphore_post (Semaphore *s, i32 count) { while (count --> 0) semaphore_signal(s->osx_handle); }
- gb_internal void semaphore_wait (Semaphore *s) { semaphore_wait(s->osx_handle); }
- #elif defined(GB_SYSTEM_UNIX)
- struct Semaphore {
- sem_t unix_handle;
- };
-
- gb_internal void semaphore_init (Semaphore *s) { sem_init(&s->unix_handle, 0, 0); }
- gb_internal void semaphore_destroy(Semaphore *s) { sem_destroy(&s->unix_handle); }
- gb_internal void semaphore_post (Semaphore *s, i32 count) { while (count --> 0) sem_post(&s->unix_handle); }
- void semaphore_wait (Semaphore *s) { int i; do { i = sem_wait(&s->unix_handle); } while (i == -1 && errno == EINTR); }
- #else
- #error Implement Semaphore for this platform
- #endif
-
+ gb_internal void mutex_unlock(BlockingMutex *m) {
+ i32 v = m->state().exchange(Internal_Mutex_State_Unlocked, std::memory_order_release);
+ switch (v) {
+ case Internal_Mutex_State_Unlocked:
+ GB_PANIC("Unreachable");
+ break;
+ case Internal_Mutex_State_Locked:
+ // Okay
+ break;
+ case Internal_Mutex_State_Waiting:
+ mutex_unlock_slow(m);
+ break;
+ }
+ }
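In outline, the state transitions the futex-based BlockingMutex performs:

    // lock fast path:  exchange(Locked)   — acquired iff the previous state was Unlocked
    // lock slow path:  exchange(Waiting)  — acquired iff it was Unlocked; otherwise sleep
    //                                       on the futex and retry
    // unlock:          exchange(Unlocked) — previous Locked means no sleeper is possible;
    //                                       previous Waiting means someone may sleep, so signal
    //
    // Because the slow path always parks under Waiting (never Locked), an unlocker
    // that observes Locked can safely skip the futex_signal syscall.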
struct Condition {
- pthread_cond_t pthread_cond;
+ i32 state_;
+
+ Futex &state() {
+ return *(Futex *)&this->state_;
+ }
+ Futex const &state() const {
+ return *(Futex const *)&this->state_;
+ }
};
-
- gb_internal void condition_init(Condition *c) {
- pthread_cond_init(&c->pthread_cond, NULL);
- }
- gb_internal void condition_destroy(Condition *c) {
- pthread_cond_destroy(&c->pthread_cond);
- }
+
gb_internal void condition_broadcast(Condition *c) {
- pthread_cond_broadcast(&c->pthread_cond);
+ c->state().fetch_add(1, std::memory_order_release);
+ futex_broadcast(&c->state());
}
gb_internal void condition_signal(Condition *c) {
- pthread_cond_signal(&c->pthread_cond);
+ c->state().fetch_add(1, std::memory_order_release);
+ futex_signal(&c->state());
}
gb_internal void condition_wait(Condition *c, BlockingMutex *m) {
- pthread_cond_wait(&c->pthread_cond, &m->pthread_mutex);
+ i32 state = c->state().load(std::memory_order_relaxed);
+ mutex_unlock(m);
+ futex_wait(&c->state(), state);
+ mutex_lock(m);
}
- gb_internal void condition_wait_with_timeout(Condition *c, BlockingMutex *m, u32 timeout_in_ms) {
- struct timespec abstime = {};
- abstime.tv_sec = timeout_in_ms/1000;
- abstime.tv_nsec = cast(long)(timeout_in_ms%1000)*1e6;
- pthread_cond_timedwait(&c->pthread_cond, &m->pthread_mutex, &abstime);
-
+
+ struct RwMutex {
+ // TODO(bill): make this a proper RW mutex
+ BlockingMutex mutex;
+ };
+
+ gb_internal void rw_mutex_lock(RwMutex *m) {
+ mutex_lock(&m->mutex);
+ }
+ gb_internal bool rw_mutex_try_lock(RwMutex *m) {
+ return mutex_try_lock(&m->mutex);
+ }
+ gb_internal void rw_mutex_unlock(RwMutex *m) {
+ mutex_unlock(&m->mutex);
+ }
+
+ gb_internal void rw_mutex_shared_lock(RwMutex *m) {
+ mutex_lock(&m->mutex);
+ }
+ gb_internal bool rw_mutex_try_shared_lock(RwMutex *m) {
+ return mutex_try_lock(&m->mutex);
+ }
+ gb_internal void rw_mutex_shared_unlock(RwMutex *m) {
+ mutex_unlock(&m->mutex);
}
#endif
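A usage sketch for the new RwMutex API (hypothetical names; note that on non-Windows builds the shared paths currently take the same exclusive BlockingMutex, per the TODO above):

    gb_global RwMutex example_rw;
    gb_global isize example_table_count;

    gb_internal isize example_read(void) {
        rw_mutex_shared_lock(&example_rw); // many readers may hold this at once (on Windows)
        isize n = example_table_count;
        rw_mutex_shared_unlock(&example_rw);
        return n;
    }
    gb_internal void example_write(isize n) {
        rw_mutex_lock(&example_rw);        // exclusive: excludes readers and writers
        example_table_count = n;
        rw_mutex_unlock(&example_rw);
    }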
+struct Parker {
+ Futex state;
+};
+enum ParkerState : u32 {
+ ParkerState_Empty = 0,
+ ParkerState_Notified = 1,
+ ParkerState_Parked = UINT32_MAX,
+};
+
+gb_internal void park(Parker *p) {
+ if (p->state.fetch_sub(1, std::memory_order_acquire) == ParkerState_Notified) {
+ return;
+ }
+ for (;;) {
+ futex_wait(&p->state, ParkerState_Parked);
+ i32 notified = ParkerState_Notified;
+ if (p->state.compare_exchange_strong(notified, ParkerState_Empty, std::memory_order_acquire, std::memory_order_acquire)) {
+ return;
+ }
+ }
+}
+
+gb_internal void unpark_one(Parker *p) {
+ if (p->state.exchange(ParkerState_Notified, std::memory_order_release) == ParkerState_Parked) {
+ futex_signal(&p->state);
+ }
+}
+
+gb_internal void unpark_all(Parker *p) {
+ if (p->state.exchange(ParkerState_Notified, std::memory_order_release) == ParkerState_Parked) {
+ futex_broadcast(&p->state);
+ }
+}
+
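A sketch of the intended park/unpark handshake (hypothetical names; in the style of Rust's std thread parker, this supports a single parked thread, and an unpark that arrives first is consumed by the next park):

    gb_global Parker example_parker; // zero-initialized: ParkerState_Empty

    gb_internal void example_sleeper(void) {
        park(&example_parker); // returns immediately if already notified
        // ... proceed now that the wake-up arrived ...
    }
    gb_internal void example_waker(void) {
        unpark_one(&example_parker); // wake the sleeper, or pre-notify it
    }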
gb_internal u32 thread_current_id(void) {
u32 thread_id;
@@ -364,12 +488,13 @@ gb_internal void thread_init(ThreadPool *pool, Thread *t, isize idx) {
#endif
t->capacity = 1 << 14; // must be a power of 2
- t->queue = (WorkerTask *)calloc(sizeof(WorkerTask), t->capacity);
+ t->queue = gb_alloc_array(heap_allocator(), WorkerTask, t->capacity);
t->head_and_tail = 0;
t->pool = pool;
t->idx = idx;
}
+
gb_internal void thread_init_and_start(ThreadPool *pool, Thread *t, isize idx) {
thread_init(pool, t, idx);
isize stack_size = 0;
@@ -400,6 +525,8 @@ gb_internal void thread_join_and_destroy(Thread *t) {
pthread_join(t->posix_handle, NULL);
t->posix_handle = 0;
#endif
+
+ gb_free(heap_allocator(), t->queue);
}
gb_internal void thread_set_name(Thread *t, char const *name) {
@@ -441,24 +568,25 @@ gb_internal void thread_set_name(Thread *t, char const *name) {
#include <linux/futex.h>
#include <sys/syscall.h>
-typedef std::atomic<int32_t> Futex;
-typedef volatile int32_t Footex;
+gb_internal void futex_signal(Futex *addr) {
+ int ret = syscall(SYS_futex, addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, NULL, NULL, 0);
+ if (ret == -1) {
+ perror("Futex wake");
+ GB_PANIC("Failed in futex wake!\n");
+ }
+}
-gb_internal void tpool_wake_addr(Futex *addr) {
- for (;;) {
- int ret = syscall(SYS_futex, addr, FUTEX_WAKE, 1, NULL, NULL, 0);
- if (ret == -1) {
- perror("Futex wake");
- GB_PANIC("Failed in futex wake!\n");
- } else if (ret > 0) {
- return;
- }
+gb_internal void futex_broadcast(Futex *addr) {
+ int ret = syscall(SYS_futex, addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT32_MAX, NULL, NULL, 0);
+ if (ret == -1) {
+ perror("Futex wake");
+ GB_PANIC("Failed in futex wake!\n");
}
}
-gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
+gb_internal void futex_wait(Futex *addr, Footex val) {
for (;;) {
- int ret = syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL, NULL, 0);
+ int ret = syscall(SYS_futex, addr, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, NULL, NULL, 0);
if (ret == -1) {
if (errno != EAGAIN) {
perror("Futex wait");
@@ -479,14 +607,15 @@ gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
#include <sys/types.h>
#include <sys/umtx.h>
-typedef std::atomic<int32_t> Futex;
-typedef volatile int32_t Footex;
-
-gb_internal void tpool_wake_addr(Futex *addr) {
+gb_internal void futex_signal(Futex *addr) {
_umtx_op(addr, UMTX_OP_WAKE, 1, 0, 0);
}
-gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
+gb_internal void futex_broadcast(Futex *addr) {
+ _umtx_op(addr, UMTX_OP_WAKE, INT32_MAX, 0, 0);
+}
+
+gb_internal void futex_wait(Futex *addr, Footex val) {
for (;;) {
int ret = _umtx_op(addr, UMTX_OP_WAIT_UINT, val, 0, NULL);
if (ret == 0) {
@@ -508,12 +637,26 @@ gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
#include <sys/futex.h>
-typedef std::atomic<int32_t> Futex;
-typedef volatile int32_t Footex;
+gb_internal void futex_signal(Futex *f) {
+ for (;;) {
+ int ret = futex((volatile uint32_t *)f, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, NULL, NULL);
+ if (ret == -1) {
+ if (errno == ETIMEDOUT || errno == EINTR) {
+ continue;
+ }
+
+ perror("Futex wake");
+ GB_PANIC("futex wake fail");
+ } else {
+ return;
+ }
+ }
+}
+
-gb_internal void tpool_wake_addr(Futex *addr) {
+gb_internal void futex_broadcast(Futex *f) {
for (;;) {
- int ret = futex((volatile uint32_t *)addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, NULL, NULL);
+ int ret = futex((volatile uint32_t *)f, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT32_MAX, NULL, NULL);
if (ret == -1) {
if (errno == ETIMEDOUT || errno == EINTR) {
continue;
@@ -527,11 +670,11 @@ gb_internal void tpool_wake_addr(Futex *addr) {
}
}
-gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
+gb_internal void futex_wait(Futex *f, Footex val) {
for (;;) {
- int ret = futex((volatile uint32_t *)addr, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, NULL, NULL);
+ int ret = futex((volatile uint32_t *)f, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, NULL, NULL);
if (ret == -1) {
- if (*addr != val) {
+ if (*f != val) {
return;
}
@@ -547,46 +690,58 @@ gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
#elif defined(GB_SYSTEM_OSX)
-typedef std::atomic<int64_t> Futex;
-typedef volatile int64_t Footex;
-
#define UL_COMPARE_AND_WAIT 0x00000001
#define ULF_NO_ERRNO 0x01000000
extern "C" int __ulock_wait(uint32_t operation, void *addr, uint64_t value, uint32_t timeout); /* timeout is specified in microseconds */
extern "C" int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value);
-gb_internal void tpool_wake_addr(Futex *addr) {
+gb_internal void futex_signal(Futex *f) {
for (;;) {
- int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, addr, 0);
+ int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, f, 0);
if (ret >= 0) {
return;
}
- ret = -ret;
- if (ret == EINTR || ret == EFAULT) {
+ if (ret == -EINTR || ret == -EFAULT) {
+ continue;
+ }
+ if (ret == -ENOENT) {
+ return;
+ }
+ GB_PANIC("Failed in futex wake!\n");
+ }
+}
+
+gb_internal void futex_broadcast(Futex *f) {
+ for (;;) {
+ enum { ULF_WAKE_ALL = 0x00000100 };
+ int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO | ULF_WAKE_ALL, f, 0);
+ if (ret == 0) {
+ return;
+ }
+ if (ret == -EINTR || ret == -EFAULT) {
continue;
}
- if (ret == ENOENT) {
+ if (ret == -ENOENT) {
return;
}
GB_PANIC("Failed in futex wake!\n");
}
}
-gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
+gb_internal void futex_wait(Futex *f, Footex val) {
for (;;) {
- int ret = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, addr, val, 0);
+ int ret = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, f, val, 0);
if (ret >= 0) {
- if (*addr != val) {
+ if (*f != val) {
return;
}
continue;
}
- ret = -ret;
- if (ret == EINTR || ret == EFAULT) {
- continue;
+ if (ret == -EINTR || ret == -EFAULT) {
+ continue;
}
- if (ret == ENOENT) {
+ if (ret == -ENOENT) {
return;
}
@@ -594,18 +749,19 @@ gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
}
}
#elif defined(GB_SYSTEM_WINDOWS)
-typedef std::atomic<int64_t> Futex;
-typedef volatile int64_t Footex;
-gb_internal void tpool_wake_addr(Futex *addr) {
- WakeByAddressSingle((void *)addr);
+gb_internal void futex_signal(Futex *f) {
+ WakeByAddressSingle(f);
}
-gb_internal void tpool_wait_on_addr(Futex *addr, Footex val) {
- for (;;) {
- WaitOnAddress(addr, (void *)&val, sizeof(val), INFINITE);
- if (*addr != val) break;
- }
+gb_internal void futex_broadcast(Futex *f) {
+ WakeByAddressAll(f);
+}
+
+gb_internal void futex_wait(Futex *f, Footex val) {
+ do {
+ WaitOnAddress(f, (void *)&val, sizeof(val), INFINITE);
+ } while (f->load() == val);
}
#endif