diff options
Diffstat (limited to 'src/threading.cpp')
| -rw-r--r-- | src/threading.cpp | 384 |
1 files changed, 364 insertions, 20 deletions
diff --git a/src/threading.cpp b/src/threading.cpp index 3ddc05b0a..af8fd803c 100644 --- a/src/threading.cpp +++ b/src/threading.cpp @@ -46,6 +46,18 @@ typedef struct WorkerTask { void *data; } WorkerTask; +typedef struct TaskRingBuffer { + std::atomic<isize> size; + std::atomic<WorkerTask *> buffer; +} TaskRingBuffer; + +typedef struct TaskQueue { + std::atomic<isize> top; + std::atomic<isize> bottom; + + std::atomic<TaskRingBuffer *> ring; +} TaskQueue; + struct Thread { #if defined(GB_SYSTEM_WINDOWS) void *win32_handle; @@ -54,13 +66,13 @@ struct Thread { #endif isize idx; + isize stack_size; - WorkerTask *queue; - size_t capacity; - std::atomic<uint64_t> head_and_tail; - - isize stack_size; + struct TaskQueue queue; struct ThreadPool *pool; + + struct Arena *permanent_arena; + struct Arena *temporary_arena; }; typedef std::atomic<i32> Futex; @@ -107,6 +119,20 @@ gb_internal void thread_set_name (Thread *t, char const *name); gb_internal void yield_thread(void); gb_internal void yield_process(void); +struct Wait_Signal { + Futex futex; +}; + +gb_internal void wait_signal_until_available(Wait_Signal *ws) { + if (ws->futex.load() == 0) { + futex_wait(&ws->futex, 0); + } +} + +gb_internal void wait_signal_set(Wait_Signal *ws) { + ws->futex.store(1); + futex_broadcast(&ws->futex); +} struct MutexGuard { MutexGuard() = delete; @@ -119,17 +145,25 @@ struct MutexGuard { explicit MutexGuard(RecursiveMutex *rm) noexcept : rm{rm} { mutex_lock(this->rm); } + explicit MutexGuard(RwMutex *rwm) noexcept : rwm{rwm} { + rw_mutex_lock(this->rwm); + } explicit MutexGuard(BlockingMutex &bm) noexcept : bm{&bm} { mutex_lock(this->bm); } explicit MutexGuard(RecursiveMutex &rm) noexcept : rm{&rm} { mutex_lock(this->rm); } + explicit MutexGuard(RwMutex &rwm) noexcept : rwm{&rwm} { + rw_mutex_lock(this->rwm); + } ~MutexGuard() noexcept { if (this->bm) { mutex_unlock(this->bm); } else if (this->rm) { mutex_unlock(this->rm); + } else if (this->rwm) { + rw_mutex_unlock(this->rwm); } } @@ -137,10 +171,12 @@ struct MutexGuard { BlockingMutex *bm; RecursiveMutex *rm; + RwMutex *rwm; }; #define MUTEX_GUARD_BLOCK(m) if (MutexGuard GB_DEFER_3(_mutex_guard_){m}) #define MUTEX_GUARD(m) mutex_lock(m); defer (mutex_unlock(m)) +#define RW_MUTEX_GUARD(m) rw_mutex_lock(m); defer (rw_mutex_unlock(m)) struct RecursiveMutex { @@ -210,7 +246,7 @@ gb_internal void semaphore_wait(Semaphore *s) { original_count = s->count().load(std::memory_order_relaxed); } - if (!s->count().compare_exchange_strong(original_count, original_count-1, std::memory_order_acquire, std::memory_order_acquire)) { + if (s->count().compare_exchange_strong(original_count, original_count-1, std::memory_order_acquire, std::memory_order_acquire)) { return; } } @@ -466,6 +502,12 @@ gb_internal u32 thread_current_id(void) { __asm__("mov %%fs:0x10,%0" : "=r"(thread_id)); #elif defined(GB_SYSTEM_LINUX) thread_id = gettid(); +#elif defined(GB_SYSTEM_HAIKU) + thread_id = find_thread(NULL); +#elif defined(GB_SYSTEM_FREEBSD) + thread_id = pthread_getthreadid_np(); +#elif defined(GB_SYSTEM_NETBSD) + thread_id = (u32)_lwp_self(); #else #error Unsupported architecture for thread_current_id() #endif @@ -487,6 +529,9 @@ gb_internal gb_inline void yield_thread(void) { _mm_pause(); #elif defined(GB_CPU_ARM) __asm__ volatile ("yield" : : : "memory"); +#elif defined(GB_CPU_RISCV) + // I guess? + __asm__ volatile ("nop" : : : "memory"); #else #error Unknown architecture #endif @@ -521,6 +566,20 @@ gb_internal void *internal_thread_proc(void *arg) { } #endif +gb_internal TaskRingBuffer *task_ring_init(isize size) { + TaskRingBuffer *ring = gb_alloc_item(heap_allocator(), TaskRingBuffer); + ring->size = size; + ring->buffer = gb_alloc_array(heap_allocator(), WorkerTask, ring->size); + return ring; +} + +gb_internal void thread_queue_destroy(TaskQueue *q) { + gb_free(heap_allocator(), (*q->ring).buffer); + gb_free(heap_allocator(), q->ring); +} + +gb_internal void thread_init_arenas(Thread *t); + gb_internal void thread_init(ThreadPool *pool, Thread *t, isize idx) { gb_zero_item(t); #if defined(GB_SYSTEM_WINDOWS) @@ -529,13 +588,13 @@ gb_internal void thread_init(ThreadPool *pool, Thread *t, isize idx) { t->posix_handle = 0; #endif - t->capacity = 1 << 14; // must be a power of 2 - t->queue = gb_alloc_array(heap_allocator(), WorkerTask, t->capacity); - t->head_and_tail = 0; + // Size must be a power of 2 + t->queue.ring = task_ring_init(1 << 14); t->pool = pool; t->idx = idx; -} + thread_init_arenas(t); +} gb_internal void thread_init_and_start(ThreadPool *pool, Thread *t, isize idx) { thread_init(pool, t, idx); @@ -568,7 +627,7 @@ gb_internal void thread_join_and_destroy(Thread *t) { t->posix_handle = 0; #endif - gb_free(heap_allocator(), t->queue); + thread_queue_destroy(&t->queue); } gb_internal void thread_set_name(Thread *t, char const *name) { @@ -600,15 +659,23 @@ gb_internal void thread_set_name(Thread *t, char const *name) { pthread_setname_np(name); #elif defined(GB_SYSTEM_FREEBSD) || defined(GB_SYSTEM_OPENBSD) pthread_set_name_np(t->posix_handle, name); +#elif defined(GB_SYSTEM_NETBSD) + pthread_setname_np(t->posix_handle, "%s", (void*)name); #else // TODO(bill): Test if this works pthread_setname_np(t->posix_handle, name); #endif } -#if defined(GB_SYSTEM_LINUX) -#include <linux/futex.h> +#if defined(GB_SYSTEM_LINUX) || defined(GB_SYSTEM_NETBSD) + #include <sys/syscall.h> +#ifdef GB_SYSTEM_LINUX + #include <linux/futex.h> +#else + #include <sys/futex.h> + #define SYS_futex SYS___futex +#endif gb_internal void futex_signal(Futex *addr) { int ret = syscall(SYS_futex, addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, NULL, NULL, 0); @@ -660,7 +727,7 @@ gb_internal void futex_broadcast(Futex *addr) { gb_internal void futex_wait(Futex *addr, Footex val) { for (;;) { int ret = _umtx_op(addr, UMTX_OP_WAIT_UINT, val, 0, NULL); - if (ret == 0) { + if (ret == -1) { if (errno == ETIMEDOUT || errno == EINTR) { continue; } @@ -732,15 +799,59 @@ gb_internal void futex_wait(Futex *f, Footex val) { #elif defined(GB_SYSTEM_OSX) -#define UL_COMPARE_AND_WAIT 0x00000001 -#define ULF_NO_ERRNO 0x01000000 +// IMPORTANT NOTE(laytan): We use `OS_SYNC_*_SHARED` and `UL_COMPARE_AND_WAIT_SHARED` flags here. +// these flags tell the kernel that we are using these futexes across different processes which +// causes it to opt-out of some optimisations. +// +// BUT this is not actually the case! We should be using the normal non-shared version and letting +// the kernel optimize (I've measured it to be about 10% faster at the parsing/type checking stages). +// +// However we have reports of people on MacOS running into kernel panics, and this seems to fix it for them. +// Which means there is probably a bug in the kernel in one of these non-shared optimisations causing the panic. +// +// The panic also doesn't seem to happen on normal M1 CPUs, and happen more on later CPUs or pro/max series. +// Probably because they have more going on in terms of threads etc. + +#if __has_include(<os/os_sync_wait_on_address.h>) + #define DARWIN_WAIT_ON_ADDRESS_AVAILABLE + #include <os/os_sync_wait_on_address.h> +#endif + +#define UL_COMPARE_AND_WAIT 0x00000001 +#define UL_COMPARE_AND_WAIT_SHARED 0x00000003 +#define ULF_NO_ERRNO 0x01000000 extern "C" int __ulock_wait(uint32_t operation, void *addr, uint64_t value, uint32_t timeout); /* timeout is specified in microseconds */ extern "C" int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value); gb_internal void futex_signal(Futex *f) { + #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE + if (__builtin_available(macOS 14.4, *)) { + for (;;) { + int ret = os_sync_wake_by_address_any(f, sizeof(Futex), OS_SYNC_WAKE_BY_ADDRESS_SHARED); + if (ret >= 0) { + return; + } + if (errno == EINTR || errno == EFAULT) { + continue; + } + if (errno == ENOENT) { + return; + } + GB_PANIC("Failed in futex wake %d %d!\n", ret, errno); + } + } else { + #endif + // UL_COMPARE_AND_WAIT_SHARED is only available on macOS 10.15+ + int WAIT_FLAG; + if (__builtin_available(macOS 10.15, *)) { + WAIT_FLAG = UL_COMPARE_AND_WAIT_SHARED; + } else { + WAIT_FLAG = UL_COMPARE_AND_WAIT; + } + for (;;) { - int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, f, 0); + int ret = __ulock_wake(WAIT_FLAG | ULF_NO_ERRNO, f, 0); if (ret >= 0) { return; } @@ -752,12 +863,40 @@ gb_internal void futex_signal(Futex *f) { } GB_PANIC("Failed in futex wake!\n"); } + #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE + } + #endif } gb_internal void futex_broadcast(Futex *f) { + #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE + if (__builtin_available(macOS 14.4, *)) { + for (;;) { + int ret = os_sync_wake_by_address_all(f, sizeof(Footex), OS_SYNC_WAKE_BY_ADDRESS_SHARED); + if (ret >= 0) { + return; + } + if (errno == EINTR || errno == EFAULT) { + continue; + } + if (errno == ENOENT) { + return; + } + GB_PANIC("Failed in futext wake %d %d!\n", ret, errno); + } + } else { + #endif + // UL_COMPARE_AND_WAIT_SHARED is only available on macOS 10.15+ + int WAIT_FLAG; + if (__builtin_available(macOS 10.15, *)) { + WAIT_FLAG = UL_COMPARE_AND_WAIT_SHARED; + } else { + WAIT_FLAG = UL_COMPARE_AND_WAIT; + } + for (;;) { enum { ULF_WAKE_ALL = 0x00000100 }; - int ret = __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO | ULF_WAKE_ALL, f, 0); + int ret = __ulock_wake(WAIT_FLAG | ULF_NO_ERRNO | ULF_WAKE_ALL, f, 0); if (ret == 0) { return; } @@ -769,11 +908,42 @@ gb_internal void futex_broadcast(Futex *f) { } GB_PANIC("Failed in futex wake!\n"); } + #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE + } + #endif } gb_internal void futex_wait(Futex *f, Footex val) { + #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE + if (__builtin_available(macOS 14.4, *)) { + for (;;) { + int ret = os_sync_wait_on_address(f, cast(uint64_t)(val), sizeof(Footex), OS_SYNC_WAIT_ON_ADDRESS_SHARED); + if (ret >= 0) { + if (*f != val) { + return; + } + continue; + } + if (errno == EINTR || errno == EFAULT) { + continue; + } + if (errno == ENOENT) { + return; + } + GB_PANIC("Failed in futex wait %d %d!\n", ret, errno); + } + } else { + #endif + // UL_COMPARE_AND_WAIT_SHARED is only available on macOS 10.15+ + int WAIT_FLAG; + if (__builtin_available(macOS 10.15, *)) { + WAIT_FLAG = UL_COMPARE_AND_WAIT_SHARED; + } else { + WAIT_FLAG = UL_COMPARE_AND_WAIT; + } + for (;;) { - int ret = __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, f, val, 0); + int ret = __ulock_wait(WAIT_FLAG | ULF_NO_ERRNO, f, val, 0); if (ret >= 0) { if (*f != val) { return; @@ -789,7 +959,11 @@ gb_internal void futex_wait(Futex *f, Footex val) { GB_PANIC("Failed in futex wait!\n"); } + #ifdef DARWIN_WAIT_ON_ADDRESS_AVAILABLE + } + #endif } + #elif defined(GB_SYSTEM_WINDOWS) gb_internal void futex_signal(Futex *f) { @@ -805,8 +979,178 @@ gb_internal void futex_wait(Futex *f, Footex val) { WaitOnAddress(f, (void *)&val, sizeof(val), INFINITE); } while (f->load() == val); } + +#elif defined(GB_SYSTEM_HAIKU) + +// Futex implementation taken from https://tavianator.com/2023/futex.html + +#include <pthread.h> +#include <atomic> + +struct _Spinlock { + std::atomic_flag state; + + void init() { + state.clear(); + } + + void lock() { + while (state.test_and_set(std::memory_order_acquire)) { + #if defined(GB_CPU_X86) + _mm_pause(); + #else + (void)0; // spin... + #endif + } + } + + void unlock() { + state.clear(std::memory_order_release); + } +}; + +struct Futex_Waitq; + +struct Futex_Waiter { + _Spinlock lock; + pthread_t thread; + Futex *futex; + Futex_Waitq *waitq; + Futex_Waiter *prev, *next; +}; + +struct Futex_Waitq { + _Spinlock lock; + Futex_Waiter list; + + void init() { + auto head = &list; + head->prev = head->next = head; + } +}; + +// FIXME: This approach may scale badly in the future, +// possible solution - hash map (leads to deadlocks now). + +Futex_Waitq g_waitq = { + .lock = ATOMIC_FLAG_INIT, + .list = { + .prev = &g_waitq.list, + .next = &g_waitq.list, + }, +}; + +Futex_Waitq *get_waitq(Futex *f) { + // Future hash map method... + return &g_waitq; +} + +void futex_signal(Futex *f) { + auto waitq = get_waitq(f); + + waitq->lock.lock(); + + auto head = &waitq->list; + for (auto waiter = head->next; waiter != head; waiter = waiter->next) { + if (waiter->futex != f) { + continue; + } + waitq->lock.unlock(); + pthread_kill(waiter->thread, SIGCONT); + return; + } + + waitq->lock.unlock(); +} + +void futex_broadcast(Futex *f) { + auto waitq = get_waitq(f); + + waitq->lock.lock(); + + auto head = &waitq->list; + for (auto waiter = head->next; waiter != head; waiter = waiter->next) { + if (waiter->futex != f) { + continue; + } + if (waiter->next == head) { + waitq->lock.unlock(); + pthread_kill(waiter->thread, SIGCONT); + return; + } else { + pthread_kill(waiter->thread, SIGCONT); + } + } + + waitq->lock.unlock(); +} + +void futex_wait(Futex *f, Footex val) { + Futex_Waiter waiter; + waiter.thread = pthread_self(); + waiter.futex = f; + + auto waitq = get_waitq(f); + while (waitq->lock.state.test_and_set(std::memory_order_acquire)) { + if (f->load(std::memory_order_relaxed) != val) { + return; + } + #if defined(GB_CPU_X86) + _mm_pause(); + #else + (void)0; // spin... + #endif + } + + waiter.waitq = waitq; + waiter.lock.init(); + waiter.lock.lock(); + + auto head = &waitq->list; + waiter.prev = head->prev; + waiter.next = head; + waiter.prev->next = &waiter; + waiter.next->prev = &waiter; + + waiter.prev->next = &waiter; + waiter.next->prev = &waiter; + + sigset_t old_mask, mask; + sigemptyset(&mask); + sigaddset(&mask, SIGCONT); + pthread_sigmask(SIG_BLOCK, &mask, &old_mask); + + if (f->load(std::memory_order_relaxed) == val) { + waiter.lock.unlock(); + waitq->lock.unlock(); + + int sig; + sigwait(&mask, &sig); + + waitq->lock.lock(); + waiter.lock.lock(); + + while (waitq != waiter.waitq) { + auto req = waiter.waitq; + waiter.lock.unlock(); + waitq->lock.unlock(); + waitq = req; + waitq->lock.lock(); + waiter.lock.lock(); + } + } + + waiter.prev->next = waiter.next; + waiter.next->prev = waiter.prev; + + pthread_sigmask(SIG_SETMASK, &old_mask, NULL); + + waiter.lock.unlock(); + waitq->lock.unlock(); +} + #endif #if defined(GB_SYSTEM_WINDOWS) #pragma warning(pop) -#endif
\ No newline at end of file +#endif |