author    gingerBill <gingerBill@users.noreply.github.com>  2025-01-06 13:43:01 +0000
committer GitHub <noreply@github.com>  2025-01-06 13:43:01 +0000
commit    6e49bbb66853b5d824ac5bbd534ae3e81c4f39aa (patch)
tree      50886a3be8f2fcfab053e07cfe9e15f50fa5f9f6 /src/threading.cpp
parent    bd96cd0af761994210018ca647eb843dfeb71494 (diff)
parent    98efb03934b464a1b23759b5695a12ff37588357 (diff)

Merge branch 'master' into d3d11-annotations

Diffstat (limited to 'src/threading.cpp'):
 src/threading.cpp | 384
 1 file changed, 364 insertions(+), 20 deletions(-)
diff --git a/src/threading.cpp b/src/threading.cpp
index 3ddc05b0a..af8fd803c 100644
--- a/src/threading.cpp
+++ b/src/threading.cpp
@@ -46,6 +46,18 @@ typedef struct WorkerTask {
void *data;
} WorkerTask;
+typedef struct TaskRingBuffer {
+ std::atomic<isize> size;
+ std::atomic<WorkerTask *> buffer;
+} TaskRingBuffer;
+
+typedef struct TaskQueue {
+ std::atomic<isize> top;
+ std::atomic<isize> bottom;
+
+ std::atomic<TaskRingBuffer *> ring;
+} TaskQueue;
+
struct Thread {
#if defined(GB_SYSTEM_WINDOWS)
void *win32_handle;
@@ -54,13 +66,13 @@ struct Thread {
#endif
isize idx;
+ isize stack_size;
- WorkerTask *queue;
- size_t capacity;
- std::atomic<uint64_t> head_and_tail;
-
- isize stack_size;
+ struct TaskQueue queue;
struct ThreadPool *pool;
+
+ struct Arena *permanent_arena;
+ struct Arena *temporary_arena;
};
typedef std::atomic<i32> Futex;
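The new TaskQueue replaces the single fixed-capacity array and the packed head_and_tail word. A top/bottom index pair over a swappable ring is the classic layout of a Chase-Lev work-stealing deque: the owning thread pushes and pops at bottom while other threads steal from top. A minimal sketch of the owner-side pop against this layout (the name task_queue_take and the memory orderings are illustrative, not taken from this commit; the push/steal counterparts are not shown here):

    // Sketch only: assumes Chase-Lev semantics for TaskQueue.
    gb_internal bool task_queue_take(TaskQueue *q, WorkerTask *task) {
    	isize b = q->bottom.load(std::memory_order_relaxed) - 1;
    	TaskRingBuffer *ring = q->ring.load(std::memory_order_relaxed);
    	q->bottom.store(b, std::memory_order_relaxed);
    	std::atomic_thread_fence(std::memory_order_seq_cst);
    	isize t = q->top.load(std::memory_order_relaxed);
    	if (t > b) { // deque is empty
    		q->bottom.store(b + 1, std::memory_order_relaxed);
    		return false;
    	}
    	*task = ring->buffer.load()[b & (ring->size.load() - 1)];
    	if (t == b) { // last element: race the stealers for it
    		bool won = q->top.compare_exchange_strong(t, t + 1);
    		q->bottom.store(b + 1, std::memory_order_relaxed);
    		return won;
    	}
    	return true;
    }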
@@ -107,6 +119,20 @@ gb_internal void thread_set_name (Thread *t, char const *name);
gb_internal void yield_thread(void);
gb_internal void yield_process(void);
+struct Wait_Signal {
+ Futex futex;
+};
+
+gb_internal void wait_signal_until_available(Wait_Signal *ws) {
+ if (ws->futex.load() == 0) {
+ futex_wait(&ws->futex, 0);
+ }
+}
+
+gb_internal void wait_signal_set(Wait_Signal *ws) {
+ ws->futex.store(1);
+ futex_broadcast(&ws->futex);
+}
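Wait_Signal is a one-shot latch: wait_signal_until_available sleeps only while the word is still 0 (futex_wait returns once the value changes), and wait_signal_set publishes the 1 before waking every waiter, so late arrivals skip the wait entirely. A usage sketch (the names are illustrative):

    // Sketch: a one-shot startup gate built on Wait_Signal.
    static Wait_Signal workers_may_start;

    // worker threads:
    //     wait_signal_until_available(&workers_may_start); // blocks while futex == 0
    // main thread, once setup is done:
    //     wait_signal_set(&workers_may_start);             // store 1, wake all waiters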
struct MutexGuard {
MutexGuard() = delete;
@@ -119,17 +145,25 @@ struct MutexGuard {
explicit MutexGuard(RecursiveMutex *rm) noexcept : rm{rm} {
mutex_lock(this->rm);
}
+ explicit MutexGuard(RwMutex *rwm) noexcept : rwm{rwm} {
+ rw_mutex_lock(this->rwm);
+ }
explicit MutexGuard(BlockingMutex &bm) noexcept : bm{&bm} {
mutex_lock(this->bm);
}
explicit MutexGuard(RecursiveMutex &rm) noexcept : rm{&rm} {
mutex_lock(this->rm);
}
+ explicit MutexGuard(RwMutex &rwm) noexcept : rwm{&rwm} {
+ rw_mutex_lock(this->rwm);
+ }
~MutexGuard() noexcept {
if (this->bm) {
mutex_unlock(this->bm);
} else if (this->rm) {
mutex_unlock(this->rm);
+ } else if (this->rwm) {
+ rw_mutex_unlock(this->rwm);
}
}
@@ -137,10 +171,12 @@ struct MutexGuard {
BlockingMutex *bm;
RecursiveMutex *rm;
+ RwMutex *rwm;
};
#define MUTEX_GUARD_BLOCK(m) if (MutexGuard GB_DEFER_3(_mutex_guard_){m})
#define MUTEX_GUARD(m) mutex_lock(m); defer (mutex_unlock(m))
+#define RW_MUTEX_GUARD(m) rw_mutex_lock(m); defer (rw_mutex_unlock(m))
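The RwMutex overloads always take the write side via rw_mutex_lock, mirroring the existing guards. A usage sketch of the two scope-bound forms (the mutex name is illustrative):

    // Sketch: scope-bound exclusive locking of an RwMutex.
    RwMutex entities_mutex;

    MUTEX_GUARD_BLOCK(&entities_mutex) {
    	// rw_mutex_lock() ran on entry; rw_mutex_unlock() runs when this block exits
    }

    // or bound to the enclosing scope:
    // RW_MUTEX_GUARD(&entities_mutex);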
struct RecursiveMutex {
@@ -210,7 +246,7 @@ gb_internal void semaphore_wait(Semaphore *s) {
original_count = s->count().load(std::memory_order_relaxed);
}
- if (!s->count().compare_exchange_strong(original_count, original_count-1, std::memory_order_acquire, std::memory_order_acquire)) {
+ if (s->count().compare_exchange_strong(original_count, original_count-1, std::memory_order_acquire, std::memory_order_acquire)) {
return;
}
}
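The semaphore change removes a negation that inverted the CAS result: compare_exchange_strong returns true when the swap succeeded, so the old code returned from semaphore_wait when the decrement had failed (leaving the count untouched) and kept looping when it had succeeded. The corrected function is the canonical acquire loop; a sketch with the wait-for-nonzero part inferred from the context lines above:

    // Sketch of the corrected shape (the surrounding loop is inferred).
    gb_internal void semaphore_wait(Semaphore *s) {
    	for (;;) {
    		i32 original_count = s->count().load(std::memory_order_relaxed);
    		while (original_count == 0) {
    			futex_wait(&s->count(), original_count); // sleep until a post
    			original_count = s->count().load(std::memory_order_relaxed);
    		}
    		if (s->count().compare_exchange_strong(original_count, original_count-1, std::memory_order_acquire, std::memory_order_acquire)) {
    			return; // we won the decrement: semaphore acquired
    		}
    		// CAS lost a race; reload and try again
    	}
    }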
@@ -466,6 +502,12 @@ gb_internal u32 thread_current_id(void) {
__asm__("mov %%fs:0x10,%0" : "=r"(thread_id));
#elif defined(GB_SYSTEM_LINUX)
thread_id = gettid();
+#elif defined(GB_SYSTEM_HAIKU)
+ thread_id = find_thread(NULL);
+#elif defined(GB_SYSTEM_FREEBSD)
+ thread_id = pthread_getthreadid_np();
+#elif defined(GB_SYSTEM_NETBSD)
+ thread_id = (u32)_lwp_self();
#else
#error Unsupported architecture for thread_current_id()
#endif
@@ -487,6 +529,9 @@ gb_internal gb_inline void yield_thread(void) {
_mm_pause();
#elif defined(GB_CPU_ARM)
__asm__ volatile ("yield" : : : "memory");
+#elif defined(GB_CPU_RISCV)
+ // I guess?
+ __asm__ volatile ("nop" : : : "memory");
#else
#error Unknown architecture
#endif
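A plain nop is a safe fallback, but RISC-V's Zihintpause extension defines a real spin-wait hint: pause, encoded as a FENCE with pred=W and succ=0 (0x0100000f), which decodes as an ordinary fence, and is therefore harmless, on cores without the extension. A possible refinement, not part of this commit:

    #elif defined(GB_CPU_RISCV)
    	// Zihintpause spin-wait hint; decodes as a harmless FENCE on cores
    	// that do not implement the extension.
    	__asm__ volatile (".4byte 0x0100000f" : : : "memory");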
@@ -521,6 +566,20 @@ gb_internal void *internal_thread_proc(void *arg) {
}
#endif
+gb_internal TaskRingBuffer *task_ring_init(isize size) {
+ TaskRingBuffer *ring = gb_alloc_item(heap_allocator(), TaskRingBuffer);
+ ring->size = size;
+ ring->buffer = gb_alloc_array(heap_allocator(), WorkerTask, ring->size);
+ return ring;
+}
+
+gb_internal void thread_queue_destroy(TaskQueue *q) {
+ gb_free(heap_allocator(), (*q->ring).buffer);
+ gb_free(heap_allocator(), q->ring);
+}
+
+gb_internal void thread_init_arenas(Thread *t);
+
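task_ring_init keeps the old power-of-two requirement because slots can then be selected with a mask instead of a modulo, and since both fields are atomics, a grown replacement ring can be published while other threads are still reading the old one. A sketch of the indexing (task_ring_get is an illustrative name, not from this commit):

    // Sketch: masked indexing into a power-of-two ring.
    gb_internal WorkerTask *task_ring_get(TaskRingBuffer *ring, isize i) {
    	isize       size = ring->size.load(std::memory_order_relaxed);
    	WorkerTask *buf  = ring->buffer.load(std::memory_order_relaxed);
    	return &buf[i & (size - 1)]; // valid because size is a power of 2
    }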
gb_internal void thread_init(ThreadPool *pool, Thread *t, isize idx) {
gb_zero_item(t);
#if defined(GB_SYSTEM_WINDOWS)
@@ -529,13 +588,13 @@ gb_internal void thread_init(ThreadPool *pool, Thread *t, isize idx) {
t->posix_handle = 0;
#endif
- t->capacity = 1 << 14; // must be a power of 2
- t->queue = gb_alloc_array(heap_allocator(), WorkerTask, t->capacity);
- t->head_and_tail = 0;
+ // Size must be a power of 2
+ t->queue.ring = task_ring_init(1 << 14);
t->pool = pool;
t->idx = idx;
-}
+ thread_init_arenas(t);
+}
gb_internal void thread_init_and_start(ThreadPool *pool, Thread *t, isize idx) {
thread_init(pool, t, idx);
@@ -568,7 +627,7 @@ gb_internal void thread_join_and_destroy(Thread *t) {
t->posix_handle = 0;
#endif
- gb_free(heap_allocator(), t->queue);
+ thread_queue_destroy(&t->queue);
}
gb_internal void thread_set_name(Thread *t, char const *name) {
@@ -600,15 +659,23 @@ gb_internal void thread_set_name(Thread *t, char const *name) {
pthread_setname_np(name);
#elif defined(GB_SYSTEM_FREEBSD) || defined(GB_SYSTEM_OPENBSD)
pthread_set_name_np(t->posix_handle, name);
+#elif defined(GB_SYSTEM_NETBSD)
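+	// NetBSD's pthread_setname_np() takes a printf-style format string plus
+	// a single argument, hence the extra "%s" here.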
+ pthread_setname_np(t->posix_handle, "%s", (void*)name);
#else
// TODO(bill): Test if this works
pthread_setname_np(t->posix_handle, name);
#endif
}
-#if defined(GB_SYSTEM_LINUX)
-#include <linux/futex.h>
+#if defined(GB_SYSTEM_LINUX) || defined(GB_SYSTEM_NETBSD)
+
#include <sys/syscall.h>
+#ifdef GB_SYSTEM_LINUX
+ #include <linux/futex.h>
+#else
+ #include <sys/futex.h>
+ #define SYS_futex SYS___futex
+#endif
gb_internal void futex_signal(Futex *addr) {
int ret = syscall(SYS_futex, addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, NULL, NULL, 0);
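NetBSD exposes its futex syscall as SYS___futex in <sys/syscall.h>, so the alias above lets the Linux-style call sites compile unchanged. The wait side is outside this hunk; for reference, the conventional FUTEX_WAIT loop that pairs with the futex_signal shown here looks like this (a sketch, not the file's actual code):

    // Sketch: the standard FUTEX_WAIT loop matching the wake above.
    gb_internal void futex_wait(Futex *addr, Footex val) {
    	for (;;) {
    		int ret = syscall(SYS_futex, addr, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, NULL, NULL, 0);
    		if (ret == -1) {
    			if (errno == EAGAIN) {
    				return;   // *addr no longer held val; nothing to wait for
    			}
    			if (errno == EINTR) {
    				continue; // interrupted by a signal; wait again
    			}
    			GB_PANIC("Failed in futex wait!\n");
    		}
    		return; // woken by futex_signal/futex_broadcast
    	}
    }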
@@ -660,7 +727,7 @@ gb_internal void futex_broadcast(Futex *addr) {
gb_internal void futex_wait(Futex *addr, Footex val) {
for (;;) {
int ret = _umtx_op(addr, UMTX_OP_WAIT_UINT, val, 0, NULL);
- if (ret == 0) {
+ if (ret == -1) {
if (errno == ETIMEDOUT || errno == EINTR) {
continue;
}
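This corrects the error check for _umtx_op, which returns 0 on success and -1 on failure with errno set. The old `ret == 0` consulted a stale errno after a successful wait, so a leftover ETIMEDOUT or EINTR could make the loop spin forever, while genuine failures fell through unexamined. The corrected shape (panic placement inferred from the pattern the other platforms use):

    for (;;) {
    	int ret = _umtx_op(addr, UMTX_OP_WAIT_UINT, val, 0, NULL);
    	if (ret == -1) {                        // failure: errno is now meaningful
    		if (errno == ETIMEDOUT || errno == EINTR) {
    			continue;                       // benign: wait again
    		}
    		GB_PANIC("Failed in futex wait!\n");
    	}
    	return;                                 // 0: woken normally
    }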
@@ -732,15 +799,59 @@ gb_internal void futex_wait(Futex *f, Footex val) {
#elif defined(GB_SYSTEM_OSX)
-#define UL_COMPARE_AND_WAIT 0x00000001
-#define ULF_NO_ERRNO 0x01000000
+// IMPORTANT NOTE(laytan): We use the `OS_SYNC_*_SHARED` and `UL_COMPARE_AND_WAIT_SHARED` flags here.
+// These flags tell the kernel that we are using these futexes across different processes, which
+// causes it to opt out of some optimisations.
+//
+// BUT this is not actually the case! We should be using the normal non-shared versions and letting
+// the kernel optimise (I've measured that to be about 10% faster at the parsing/type-checking stages).
+//
+// However, we have reports of people on macOS running into kernel panics, and the shared flags seem
+// to fix it for them. Which means there is probably a bug in one of the kernel's non-shared
+// optimisations causing the panic.
+//
+// The panic also doesn't seem to happen on plain M1 CPUs, and happens more on later CPUs and the
+// Pro/Max series, probably because they have more going on in terms of threads etc.
+
+#if __has_include(<os/os_sync_wait_on_address.h>)
+ #define DARWIN_WAIT_ON_ADDRESS_AVAILABLE
+ #include <os/os_sync_wait_on_address.h>
+#endif
+
+#define UL_COMPARE_AND_WAIT 0x00000001
+#define UL_COMPARE_AND_WAIT_SHARED 0x00000003
+#define ULF_NO_ERRNO 0x01000000
extern "C" int __ulock_wait(uint32_t operation, void *addr, uint64_t value, uint32_t timeout); /* timeout is specified in microseconds */
extern "C" int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value);
gb_internal void futex_signal(Futex *f) {
+ #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE
+ if (__builtin_available(macOS 14.4, *)) {
+ for (;;) {
+ int ret = os_sync_wake_by_address_any(f, sizeof(Futex), OS_SYNC_WAKE_BY_ADDRESS_SHARED);
+ if (ret >= 0) {
+ return;
+ }
+ if (errno == EINTR || errno == EFAULT) {
+ continue;
+ }
+ if (errno == ENOENT) {
+ return;
+ }
+ GB_PANIC("Failed in futex wake %d %d!\n", ret, errno);
+ }
+ } else {
+ #endif
+ // UL_COMPARE_AND_WAIT_SHARED is only available on macOS 10.15+
+ int WAIT_FLAG;
+ if (__builtin_available(macOS 10.15, *)) {
+ WAIT_FLAG = UL_COMPARE_AND_WAIT_SHARED;
+ } else {
+ WAIT_FLAG = UL_COMPARE_AND_WAIT;
+ }
+
for (;;) {
- int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, f, 0);
+ int ret = __ulock_wake(WAIT_FLAG | ULF_NO_ERRNO, f, 0);
if (ret >= 0) {
return;
}
@@ -752,12 +863,40 @@ gb_internal void futex_signal(Futex *f) {
}
GB_PANIC("Failed in futex wake!\n");
}
+ #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE
+ }
+ #endif
}
gb_internal void futex_broadcast(Futex *f) {
+ #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE
+ if (__builtin_available(macOS 14.4, *)) {
+ for (;;) {
+ int ret = os_sync_wake_by_address_all(f, sizeof(Footex), OS_SYNC_WAKE_BY_ADDRESS_SHARED);
+ if (ret >= 0) {
+ return;
+ }
+ if (errno == EINTR || errno == EFAULT) {
+ continue;
+ }
+ if (errno == ENOENT) {
+ return;
+ }
+				GB_PANIC("Failed in futex wake %d %d!\n", ret, errno);
+ }
+ } else {
+ #endif
+ // UL_COMPARE_AND_WAIT_SHARED is only available on macOS 10.15+
+ int WAIT_FLAG;
+ if (__builtin_available(macOS 10.15, *)) {
+ WAIT_FLAG = UL_COMPARE_AND_WAIT_SHARED;
+ } else {
+ WAIT_FLAG = UL_COMPARE_AND_WAIT;
+ }
+
for (;;) {
enum { ULF_WAKE_ALL = 0x00000100 };
- int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO | ULF_WAKE_ALL, f, 0);
+ int ret = __ulock_wake(WAIT_FLAG | ULF_NO_ERRNO | ULF_WAKE_ALL, f, 0);
if (ret == 0) {
return;
}
@@ -769,11 +908,42 @@ gb_internal void futex_broadcast(Futex *f) {
}
GB_PANIC("Failed in futex wake!\n");
}
+ #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE
+ }
+ #endif
}
gb_internal void futex_wait(Futex *f, Footex val) {
+ #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE
+ if (__builtin_available(macOS 14.4, *)) {
+ for (;;) {
+ int ret = os_sync_wait_on_address(f, cast(uint64_t)(val), sizeof(Footex), OS_SYNC_WAIT_ON_ADDRESS_SHARED);
+ if (ret >= 0) {
+ if (*f != val) {
+ return;
+ }
+ continue;
+ }
+ if (errno == EINTR || errno == EFAULT) {
+ continue;
+ }
+ if (errno == ENOENT) {
+ return;
+ }
+ GB_PANIC("Failed in futex wait %d %d!\n", ret, errno);
+ }
+ } else {
+ #endif
+ // UL_COMPARE_AND_WAIT_SHARED is only available on macOS 10.15+
+ int WAIT_FLAG;
+ if (__builtin_available(macOS 10.15, *)) {
+ WAIT_FLAG = UL_COMPARE_AND_WAIT_SHARED;
+ } else {
+ WAIT_FLAG = UL_COMPARE_AND_WAIT;
+ }
+
for (;;) {
- int ret = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, f, val, 0);
+ int ret = __ulock_wait(WAIT_FLAG | ULF_NO_ERRNO, f, val, 0);
if (ret >= 0) {
if (*f != val) {
return;
@@ -789,7 +959,11 @@ gb_internal void futex_wait(Futex *f, Footex val) {
GB_PANIC("Failed in futex wait!\n");
}
+ #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE
+ }
+ #endif
}
+
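The 10.15 availability check and flag selection are now repeated verbatim in all three __ulock fallbacks; a possible follow-up (not part of this commit) would hoist them into a helper:

    // Sketch: factoring out the repeated flag selection.
    gb_internal uint32_t darwin_ulock_op(void) {
    	// UL_COMPARE_AND_WAIT_SHARED is only available on macOS 10.15+
    	if (__builtin_available(macOS 10.15, *)) {
    		return UL_COMPARE_AND_WAIT_SHARED;
    	}
    	return UL_COMPARE_AND_WAIT;
    }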
#elif defined(GB_SYSTEM_WINDOWS)
gb_internal void futex_signal(Futex *f) {
@@ -805,8 +979,178 @@ gb_internal void futex_wait(Futex *f, Footex val) {
WaitOnAddress(f, (void *)&val, sizeof(val), INFINITE);
} while (f->load() == val);
}
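WaitOnAddress compares the value at the address against the compare buffer itself and returns immediately on mismatch, so the do/while needs no pre-check; the loop condition handles spurious wakes, where the call returns although the value still equals val. The same pattern extends naturally to a timed wait (futex_wait_timeout is an illustrative name, not in this file):

    // Sketch: timed variant; WaitOnAddress returns FALSE with
    // GetLastError() == ERROR_TIMEOUT when the wait times out.
    gb_internal bool futex_wait_timeout(Futex *f, Footex val, DWORD timeout_ms) {
    	while (f->load() == val) {
    		if (!WaitOnAddress(f, (void *)&val, sizeof(val), timeout_ms) &&
    		    GetLastError() == ERROR_TIMEOUT) {
    			return false; // value unchanged within the timeout
    		}
    	}
    	return true;
    }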
+
+#elif defined(GB_SYSTEM_HAIKU)
+
+// Futex implementation taken from https://tavianator.com/2023/futex.html
+
+#include <pthread.h>
+#include <atomic>
+
+struct _Spinlock {
+ std::atomic_flag state;
+
+ void init() {
+ state.clear();
+ }
+
+ void lock() {
+ while (state.test_and_set(std::memory_order_acquire)) {
+ #if defined(GB_CPU_X86)
+ _mm_pause();
+ #else
+ (void)0; // spin...
+ #endif
+ }
+ }
+
+ void unlock() {
+ state.clear(std::memory_order_release);
+ }
+};
+
+struct Futex_Waitq;
+
+struct Futex_Waiter {
+ _Spinlock lock;
+ pthread_t thread;
+ Futex *futex;
+ Futex_Waitq *waitq;
+ Futex_Waiter *prev, *next;
+};
+
+struct Futex_Waitq {
+ _Spinlock lock;
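+	// `list` is a dummy head node: the queue is a circular doubly-linked
+	// list, so an empty queue has head->next == head->prev == head.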
+ Futex_Waiter list;
+
+ void init() {
+ auto head = &list;
+ head->prev = head->next = head;
+ }
+};
+
+// FIXME: A single global wait queue may scale badly as waiter counts grow.
+// A possible solution is a hash map keyed on the futex address, but the
+// current attempt at that leads to deadlocks.
+
+Futex_Waitq g_waitq = {
+ .lock = ATOMIC_FLAG_INIT,
+ .list = {
+ .prev = &g_waitq.list,
+ .next = &g_waitq.list,
+ },
+};
+
+Futex_Waitq *get_waitq(Futex *f) {
+ // Future hash map method...
+ return &g_waitq;
+}
+
+void futex_signal(Futex *f) {
+ auto waitq = get_waitq(f);
+
+ waitq->lock.lock();
+
+ auto head = &waitq->list;
+ for (auto waiter = head->next; waiter != head; waiter = waiter->next) {
+ if (waiter->futex != f) {
+ continue;
+ }
+ waitq->lock.unlock();
+ pthread_kill(waiter->thread, SIGCONT);
+ return;
+ }
+
+ waitq->lock.unlock();
+}
+
+void futex_broadcast(Futex *f) {
+ auto waitq = get_waitq(f);
+
+ waitq->lock.lock();
+
+ auto head = &waitq->list;
+ for (auto waiter = head->next; waiter != head; waiter = waiter->next) {
+ if (waiter->futex != f) {
+ continue;
+ }
+ if (waiter->next == head) {
+ waitq->lock.unlock();
+ pthread_kill(waiter->thread, SIGCONT);
+ return;
+ } else {
+ pthread_kill(waiter->thread, SIGCONT);
+ }
+ }
+
+ waitq->lock.unlock();
+}
+
+void futex_wait(Futex *f, Footex val) {
+ Futex_Waiter waiter;
+ waiter.thread = pthread_self();
+ waiter.futex = f;
+
+ auto waitq = get_waitq(f);
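+	// Inlined lock acquire that keeps re-checking the futex value while
+	// spinning: if the value has already changed, there is nothing to wait
+	// for, and returning here avoids queueing at all.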
+ while (waitq->lock.state.test_and_set(std::memory_order_acquire)) {
+ if (f->load(std::memory_order_relaxed) != val) {
+ return;
+ }
+ #if defined(GB_CPU_X86)
+ _mm_pause();
+ #else
+ (void)0; // spin...
+ #endif
+ }
+
+ waiter.waitq = waitq;
+ waiter.lock.init();
+ waiter.lock.lock();
+
+ auto head = &waitq->list;
+ waiter.prev = head->prev;
+ waiter.next = head;
+ waiter.prev->next = &waiter;
+ waiter.next->prev = &waiter;
+
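+	// Block SIGCONT before sleeping: a wake that lands before the mask is in
+	// place is caught by the value re-check below, and one that lands after
+	// is held pending and reliably consumed by sigwait().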
+ sigset_t old_mask, mask;
+ sigemptyset(&mask);
+ sigaddset(&mask, SIGCONT);
+ pthread_sigmask(SIG_BLOCK, &mask, &old_mask);
+
+ if (f->load(std::memory_order_relaxed) == val) {
+ waiter.lock.unlock();
+ waitq->lock.unlock();
+
+ int sig;
+ sigwait(&mask, &sig);
+
+ waitq->lock.lock();
+ waiter.lock.lock();
+
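+		// If an implementation ever migrates waiters between queues, chase the
+		// waiter's current queue while re-acquiring both locks in a consistent
+		// order; with the single global queue this loop never runs.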
+ while (waitq != waiter.waitq) {
+ auto req = waiter.waitq;
+ waiter.lock.unlock();
+ waitq->lock.unlock();
+ waitq = req;
+ waitq->lock.lock();
+ waiter.lock.lock();
+ }
+ }
+
+ waiter.prev->next = waiter.next;
+ waiter.next->prev = waiter.prev;
+
+ pthread_sigmask(SIG_SETMASK, &old_mask, NULL);
+
+ waiter.lock.unlock();
+ waitq->lock.unlock();
+}
+
#endif
#if defined(GB_SYSTEM_WINDOWS)
#pragma warning(pop)
-#endif
\ No newline at end of file
+#endif