aboutsummaryrefslogtreecommitdiff
path: root/src/threading.cpp
diff options
context:
space:
mode:
authorRilleP <rikkypikki@hotmail.com>2024-04-10 19:10:33 +0200
committerGitHub <noreply@github.com>2024-04-10 19:10:33 +0200
commit95a38d5a96322cd9adbb03bd5b7425b93469e62f (patch)
tree8cdc3b962506ddc4b7b1883f0e2ecb9dce54e2f9 /src/threading.cpp
parent239d4e10762a12e96280bd91003acbf2170cadf2 (diff)
parent13e459980b0143b49762cdfd04dd5cf1bbf83daa (diff)
Merge branch 'master' into parsing-package-fixes
Diffstat (limited to 'src/threading.cpp')
-rw-r--r--src/threading.cpp267
1 files changed, 266 insertions, 1 deletions
diff --git a/src/threading.cpp b/src/threading.cpp
index c283da425..fbe8997d1 100644
--- a/src/threading.cpp
+++ b/src/threading.cpp
@@ -107,6 +107,20 @@ gb_internal void thread_set_name (Thread *t, char const *name);
gb_internal void yield_thread(void);
gb_internal void yield_process(void);
+struct Wait_Signal {
+ Futex futex;
+};
+
+gb_internal void wait_signal_until_available(Wait_Signal *ws) {
+ if (ws->futex.load() == 0) {
+ futex_wait(&ws->futex, 0);
+ }
+}
+
+gb_internal void wait_signal_set(Wait_Signal *ws) {
+ ws->futex.store(1);
+ futex_broadcast(&ws->futex);
+}
struct MutexGuard {
MutexGuard() = delete;
@@ -119,17 +133,25 @@ struct MutexGuard {
explicit MutexGuard(RecursiveMutex *rm) noexcept : rm{rm} {
mutex_lock(this->rm);
}
+ explicit MutexGuard(RwMutex *rwm) noexcept : rwm{rwm} {
+ rw_mutex_lock(this->rwm);
+ }
explicit MutexGuard(BlockingMutex &bm) noexcept : bm{&bm} {
mutex_lock(this->bm);
}
explicit MutexGuard(RecursiveMutex &rm) noexcept : rm{&rm} {
mutex_lock(this->rm);
}
+ explicit MutexGuard(RwMutex &rwm) noexcept : rwm{&rwm} {
+ rw_mutex_lock(this->rwm);
+ }
~MutexGuard() noexcept {
if (this->bm) {
mutex_unlock(this->bm);
} else if (this->rm) {
mutex_unlock(this->rm);
+ } else if (this->rwm) {
+ rw_mutex_unlock(this->rwm);
}
}
@@ -137,10 +159,12 @@ struct MutexGuard {
BlockingMutex *bm;
RecursiveMutex *rm;
+ RwMutex *rwm;
};
#define MUTEX_GUARD_BLOCK(m) if (MutexGuard GB_DEFER_3(_mutex_guard_){m})
#define MUTEX_GUARD(m) mutex_lock(m); defer (mutex_unlock(m))
+#define RW_MUTEX_GUARD(m) rw_mutex_lock(m); defer (rw_mutex_unlock(m))
struct RecursiveMutex {
@@ -466,6 +490,8 @@ gb_internal u32 thread_current_id(void) {
__asm__("mov %%fs:0x10,%0" : "=r"(thread_id));
#elif defined(GB_SYSTEM_LINUX)
thread_id = gettid();
+#elif defined(GB_SYSTEM_HAIKU)
+ thread_id = find_thread(NULL);
#else
#error Unsupported architecture for thread_current_id()
#endif
@@ -732,6 +758,11 @@ gb_internal void futex_wait(Futex *f, Footex val) {
#elif defined(GB_SYSTEM_OSX)
+#if __has_include(<os/os_sync_wait_on_address.h>)
+ #define DARWIN_WAIT_ON_ADDRESS_AVAILABLE
+ #include <os/os_sync_wait_on_address.h>
+#endif
+
#define UL_COMPARE_AND_WAIT 0x00000001
#define ULF_NO_ERRNO 0x01000000
@@ -739,6 +770,23 @@ extern "C" int __ulock_wait(uint32_t operation, void *addr, uint64_t value, uint
extern "C" int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value);
gb_internal void futex_signal(Futex *f) {
+ #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE
+ if (__builtin_available(macOS 14.4, *)) {
+ for (;;) {
+ int ret = os_sync_wake_by_address_any(f, sizeof(Futex), OS_SYNC_WAKE_BY_ADDRESS_NONE);
+ if (ret >= 0) {
+ return;
+ }
+ if (errno == EINTR || errno == EFAULT) {
+ continue;
+ }
+ if (errno == ENOENT) {
+ return;
+ }
+ GB_PANIC("Failed in futex wake %d %d!\n", ret, errno);
+ }
+ } else {
+ #endif
for (;;) {
int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, f, 0);
if (ret >= 0) {
@@ -752,9 +800,29 @@ gb_internal void futex_signal(Futex *f) {
}
GB_PANIC("Failed in futex wake!\n");
}
+ #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE
+ }
+ #endif
}
gb_internal void futex_broadcast(Futex *f) {
+ #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE
+ if (__builtin_available(macOS 14.4, *)) {
+ for (;;) {
+ int ret = os_sync_wake_by_address_all(f, sizeof(Footex), OS_SYNC_WAKE_BY_ADDRESS_NONE);
+ if (ret >= 0) {
+ return;
+ }
+ if (errno == EINTR || errno == EFAULT) {
+ continue;
+ }
+ if (errno == ENOENT) {
+ return;
+ }
+ GB_PANIC("Failed in futext wake %d %d!\n", ret, errno);
+ }
+ } else {
+ #endif
for (;;) {
enum { ULF_WAKE_ALL = 0x00000100 };
int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO | ULF_WAKE_ALL, f, 0);
@@ -769,9 +837,32 @@ gb_internal void futex_broadcast(Futex *f) {
}
GB_PANIC("Failed in futex wake!\n");
}
+ #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE
+ }
+ #endif
}
gb_internal void futex_wait(Futex *f, Footex val) {
+ #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE
+ if (__builtin_available(macOS 14.4, *)) {
+ for (;;) {
+ int ret = os_sync_wait_on_address(f, cast(uint64_t)(val), sizeof(Footex), OS_SYNC_WAIT_ON_ADDRESS_NONE);
+ if (ret >= 0) {
+ if (*f != val) {
+ return;
+ }
+ continue;
+ }
+ if (errno == EINTR || errno == EFAULT) {
+ continue;
+ }
+ if (errno == ENOENT) {
+ return;
+ }
+ GB_PANIC("Failed in futex wait %d %d!\n", ret, errno);
+ }
+ } else {
+ #endif
for (;;) {
int ret = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, f, val, 0);
if (ret >= 0) {
@@ -789,7 +880,11 @@ gb_internal void futex_wait(Futex *f, Footex val) {
GB_PANIC("Failed in futex wait!\n");
}
+ #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE
+ }
+ #endif
}
+
#elif defined(GB_SYSTEM_WINDOWS)
gb_internal void futex_signal(Futex *f) {
@@ -805,8 +900,178 @@ gb_internal void futex_wait(Futex *f, Footex val) {
WaitOnAddress(f, (void *)&val, sizeof(val), INFINITE);
} while (f->load() == val);
}
+
+#elif defined(GB_SYSTEM_HAIKU)
+
+// Futex implementation taken from https://tavianator.com/2023/futex.html
+
+#include <pthread.h>
+#include <atomic>
+
+struct _Spinlock {
+ std::atomic_flag state;
+
+ void init() {
+ state.clear();
+ }
+
+ void lock() {
+ while (state.test_and_set(std::memory_order_acquire)) {
+ #if defined(GB_CPU_X86)
+ _mm_pause();
+ #else
+ (void)0; // spin...
+ #endif
+ }
+ }
+
+ void unlock() {
+ state.clear(std::memory_order_release);
+ }
+};
+
+struct Futex_Waitq;
+
+struct Futex_Waiter {
+ _Spinlock lock;
+ pthread_t thread;
+ Futex *futex;
+ Futex_Waitq *waitq;
+ Futex_Waiter *prev, *next;
+};
+
+struct Futex_Waitq {
+ _Spinlock lock;
+ Futex_Waiter list;
+
+ void init() {
+ auto head = &list;
+ head->prev = head->next = head;
+ }
+};
+
+// FIXME: This approach may scale badly in the future,
+// possible solution - hash map (leads to deadlocks now).
+
+Futex_Waitq g_waitq = {
+ .lock = ATOMIC_FLAG_INIT,
+ .list = {
+ .prev = &g_waitq.list,
+ .next = &g_waitq.list,
+ },
+};
+
+Futex_Waitq *get_waitq(Futex *f) {
+ // Future hash map method...
+ return &g_waitq;
+}
+
+void futex_signal(Futex *f) {
+ auto waitq = get_waitq(f);
+
+ waitq->lock.lock();
+
+ auto head = &waitq->list;
+ for (auto waiter = head->next; waiter != head; waiter = waiter->next) {
+ if (waiter->futex != f) {
+ continue;
+ }
+ waitq->lock.unlock();
+ pthread_kill(waiter->thread, SIGCONT);
+ return;
+ }
+
+ waitq->lock.unlock();
+}
+
+void futex_broadcast(Futex *f) {
+ auto waitq = get_waitq(f);
+
+ waitq->lock.lock();
+
+ auto head = &waitq->list;
+ for (auto waiter = head->next; waiter != head; waiter = waiter->next) {
+ if (waiter->futex != f) {
+ continue;
+ }
+ if (waiter->next == head) {
+ waitq->lock.unlock();
+ pthread_kill(waiter->thread, SIGCONT);
+ return;
+ } else {
+ pthread_kill(waiter->thread, SIGCONT);
+ }
+ }
+
+ waitq->lock.unlock();
+}
+
+void futex_wait(Futex *f, Footex val) {
+ Futex_Waiter waiter;
+ waiter.thread = pthread_self();
+ waiter.futex = f;
+
+ auto waitq = get_waitq(f);
+ while (waitq->lock.state.test_and_set(std::memory_order_acquire)) {
+ if (f->load(std::memory_order_relaxed) != val) {
+ return;
+ }
+ #if defined(GB_CPU_X86)
+ _mm_pause();
+ #else
+ (void)0; // spin...
+ #endif
+ }
+
+ waiter.waitq = waitq;
+ waiter.lock.init();
+ waiter.lock.lock();
+
+ auto head = &waitq->list;
+ waiter.prev = head->prev;
+ waiter.next = head;
+ waiter.prev->next = &waiter;
+ waiter.next->prev = &waiter;
+
+ waiter.prev->next = &waiter;
+ waiter.next->prev = &waiter;
+
+ sigset_t old_mask, mask;
+ sigemptyset(&mask);
+ sigaddset(&mask, SIGCONT);
+ pthread_sigmask(SIG_BLOCK, &mask, &old_mask);
+
+ if (f->load(std::memory_order_relaxed) == val) {
+ waiter.lock.unlock();
+ waitq->lock.unlock();
+
+ int sig;
+ sigwait(&mask, &sig);
+
+ waitq->lock.lock();
+ waiter.lock.lock();
+
+ while (waitq != waiter.waitq) {
+ auto req = waiter.waitq;
+ waiter.lock.unlock();
+ waitq->lock.unlock();
+ waitq = req;
+ waitq->lock.lock();
+ waiter.lock.lock();
+ }
+ }
+
+ waiter.prev->next = waiter.next;
+ waiter.next->prev = waiter.prev;
+
+ pthread_sigmask(SIG_SETMASK, &old_mask, NULL);
+
+ waiter.lock.unlock();
+ waitq->lock.unlock();
+}
+
#endif
#if defined(GB_SYSTEM_WINDOWS)
#pragma warning(pop)
-#endif \ No newline at end of file
+#endif