diff options
| author | gingerBill <bill@gingerbill.org> | 2021-05-20 21:02:05 +0100 |
|---|---|---|
| committer | gingerBill <bill@gingerbill.org> | 2021-05-20 21:02:05 +0100 |
| commit | fe74b479c69fde6d7651e5dff6429736a170ec5d (patch) | |
| tree | 4409f70127007baba1d0517c1ac1efb62d766ec8 /core/sync/sync2 | |
| parent | 92abddddc5ca4be622e93856c7246159b594e9e9 (diff) | |
Begin changes to sync2
Diffstat (limited to 'core/sync/sync2')
| -rw-r--r-- | core/sync/sync2/atomic.odin | 2 | ||||
| -rw-r--r-- | core/sync/sync2/primitives.odin | 4 | ||||
| -rw-r--r-- | core/sync/sync2/primitives_atomic.odin | 402 | ||||
| -rw-r--r-- | core/sync/sync2/primitives_pthreads.odin | 2 |
4 files changed, 248 insertions, 162 deletions
diff --git a/core/sync/sync2/atomic.odin b/core/sync/sync2/atomic.odin index fa86ec352..efefc8025 100644 --- a/core/sync/sync2/atomic.odin +++ b/core/sync/sync2/atomic.odin @@ -56,6 +56,7 @@ atomic_exchange_release :: intrinsics.atomic_xchg_rel; atomic_exchange_acqrel :: intrinsics.atomic_xchg_acqrel; atomic_exchange_relaxed :: intrinsics.atomic_xchg_relaxed; +// Returns value and optional ok boolean atomic_compare_exchange_strong :: intrinsics.atomic_cxchg; atomic_compare_exchange_strong_acquire :: intrinsics.atomic_cxchg_acq; atomic_compare_exchange_strong_release :: intrinsics.atomic_cxchg_rel; @@ -66,6 +67,7 @@ atomic_compare_exchange_strong_failacquire :: intrinsics.atomic_cxchg_fa atomic_compare_exchange_strong_acquire_failrelaxed :: intrinsics.atomic_cxchg_acq_failrelaxed; atomic_compare_exchange_strong_acqrel_failrelaxed :: intrinsics.atomic_cxchg_acqrel_failrelaxed; +// Returns value and optional ok boolean atomic_compare_exchange_weak :: intrinsics.atomic_cxchgweak; atomic_compare_exchange_weak_acquire :: intrinsics.atomic_cxchgweak_acq; atomic_compare_exchange_weak_release :: intrinsics.atomic_cxchgweak_rel; diff --git a/core/sync/sync2/primitives.odin b/core/sync/sync2/primitives.odin index 1ed83f706..e524586ec 100644 --- a/core/sync/sync2/primitives.odin +++ b/core/sync/sync2/primitives.odin @@ -15,7 +15,7 @@ mutex_lock :: proc(m: ^Mutex) { _mutex_lock(m); } -// mutex_lock unlocks m +// mutex_unlock unlocks m mutex_unlock :: proc(m: ^Mutex) { _mutex_unlock(m); } @@ -103,7 +103,7 @@ rw_mutex_shared_guard :: proc(m: ^RW_Mutex) -> bool { -// A Recusrive_Mutex is a recursive mutual exclusion lock +// A Recursive_Mutex is a recursive mutual exclusion lock // The zero value for a Recursive_Mutex is an unlocked mutex // // A Recursive_Mutex must not be copied after first use diff --git a/core/sync/sync2/primitives_atomic.odin b/core/sync/sync2/primitives_atomic.odin index 7043f8c84..aed01eb1f 100644 --- a/core/sync/sync2/primitives_atomic.odin +++ b/core/sync/sync2/primitives_atomic.odin @@ -1,159 +1,193 @@ -//+build linux, darwin, freebsd -//+private package sync2 -when !#config(ODIN_SYNC_USE_PTHREADS, true) { - import "core:time" import "core:runtime" -_Mutex_State :: enum i32 { +Atomic_Mutex_State :: enum i32 { Unlocked = 0, Locked = 1, Waiting = 2, } -_Mutex :: struct { - state: _Mutex_State, -} -_mutex_lock :: proc(m: ^Mutex) { - if atomic_xchg_rel(&m.impl.state, .Unlocked) != .Unlocked { - _mutex_unlock_slow(m); - } -} - -_mutex_unlock :: proc(m: ^Mutex) { - switch atomic_xchg_rel(&m.impl.state, .Unlocked) { - case .Unlocked: - unreachable(); - case .Locked: - // Okay - case .Waiting: - _mutex_unlock_slow(m); - } -} -_mutex_try_lock :: proc(m: ^Mutex) -> bool { - _, ok := atomic_cxchg_acq(&m.impl.state, .Unlocked, .Locked); - return ok; +// An Atomic_Mutex is a mutual exclusion lock +// The zero value for a Atomic_Mutex is an unlocked mutex +// +// An Atomic_Mutex must not be copied after first use +Atomic_Mutex :: struct { + state: Atomic_Mutex_State, } - -@(cold) -_mutex_lock_slow :: proc(m: ^Mutex, curr_state: _Mutex_State) { - new_state := curr_state; // Make a copy of it - - spin_lock: for spin in 0..<i32(100) { - state, ok := atomic_cxchgweak_acq(&m.impl.state, .Unlocked, new_state); - if ok { - return; +// atomic_mutex_lock locks m +atomic_mutex_lock :: proc(m: ^Atomic_Mutex) { + @(cold) + lock_slow :: proc(m: ^Atomic_Mutex, curr_state: Atomic_Mutex_State) { + new_state := curr_state; // Make a copy of it + + spin_lock: for spin in 0..<i32(100) { + state, ok := atomic_compare_exchange_weak_acquire(&m.state, .Unlocked, new_state); + if ok { + return; + } + + if state == .Waiting { + break spin_lock; + } + + for i := min(spin+1, 32); i > 0; i -= 1 { + cpu_relax(); + } } - if state == .Waiting { - break spin_lock; - } + for { + if atomic_exchange_acquire(&m.state, .Waiting) == .Unlocked { + return; + } - for i := min(spin+1, 32); i > 0; i -= 1 { + // TODO(bill): Use a Futex here for Linux to improve performance and error handling cpu_relax(); } } - for { - if atomic_xchg_acq(&m.impl.state, .Waiting) == .Unlocked { - return; - } + switch v := atomic_exchange_acquire(&m.state, .Locked); v { + case .Unlocked: + // Okay + case: fallthrough; + case .Locked, .Waiting: + lock_slow(m, v); + } +} + +// atomic_mutex_unlock unlocks m +atomic_mutex_unlock :: proc(m: ^Atomic_Mutex) { + @(cold) + unlock_slow :: proc(m: ^Atomic_Mutex) { // TODO(bill): Use a Futex here for Linux to improve performance and error handling - cpu_relax(); } + + + switch atomic_exchange_release(&m.state, .Unlocked) { + case .Unlocked: + unreachable(); + case .Locked: + // Okay + case .Waiting: + unlock_slow(m); + } +} + +// atomic_mutex_try_lock tries to lock m, will return true on success, and false on failure +atomic_mutex_try_lock :: proc(m: ^Atomic_Mutex) -> bool { + _, ok := atomic_compare_exchange_strong_acquire(&m.state, .Unlocked, .Locked); + return ok; } -@(cold) -_mutex_unlock_slow :: proc(m: ^Mutex) { - // TODO(bill): Use a Futex here for Linux to improve performance and error handling +// Example: +// +// if atomic_mutex_guard(&m) { +// ... +// } +// +@(deferred_in=atomic_mutex_unlock) +atomic_mutex_guard :: proc(m: ^Atomic_Mutex) -> bool { + atomic_mutex_lock(m); + return true; } -RW_Mutex_State :: distinct uint; -RW_Mutex_State_Half_Width :: size_of(RW_Mutex_State)*8/2; -RW_Mutex_State_Is_Writing :: RW_Mutex_State(1); -RW_Mutex_State_Writer :: RW_Mutex_State(1)<<1; -RW_Mutex_State_Reader :: RW_Mutex_State(1)<<RW_Mutex_State_Half_Width; +Atomic_RW_Mutex_State :: distinct uint; +Atomic_RW_Mutex_State_Half_Width :: size_of(Atomic_RW_Mutex_State)*8/2; +Atomic_RW_Mutex_State_Is_Writing :: Atomic_RW_Mutex_State(1); +Atomic_RW_Mutex_State_Writer :: Atomic_RW_Mutex_State(1)<<1; +Atomic_RW_Mutex_State_Reader :: Atomic_RW_Mutex_State(1)<<Atomic_RW_Mutex_State_Half_Width; -RW_Mutex_State_Writer_Mask :: RW_Mutex_State(1<<(RW_Mutex_State_Half_Width-1) - 1) << 1; -RW_Mutex_State_Reader_Mask :: RW_Mutex_State(1<<(RW_Mutex_State_Half_Width-1) - 1) << RW_Mutex_State_Half_Width; +Atomic_RW_Mutex_State_Writer_Mask :: Atomic_RW_Mutex_State(1<<(Atomic_RW_Mutex_State_Half_Width-1) - 1) << 1; +Atomic_RW_Mutex_State_Reader_Mask :: Atomic_RW_Mutex_State(1<<(Atomic_RW_Mutex_State_Half_Width-1) - 1) << Atomic_RW_Mutex_State_Half_Width; -_RW_Mutex :: struct { - state: RW_Mutex_State, - mutex: Mutex, - sema: Sema, +// An Atomic_RW_Mutex is a reader/writer mutual exclusion lock +// The lock can be held by any arbitrary number of readers or a single writer +// The zero value for an Atomic_RW_Mutex is an unlocked mutex +// +// An Atomic_RW_Mutex must not be copied after first use +Atomic_RW_Mutex :: struct { + state: Atomic_RW_Mutex_State, + mutex: Atomic_Mutex, + sema: Atomic_Sema, } -_rw_mutex_lock :: proc(rw: ^RW_Mutex) { - _ = atomic_add(&rw.impl.state, RW_Mutex_State_Writer); - mutex_lock(&rw.impl.mutex); +// atomic_rw_mutex_lock locks rw for writing (with a single writer) +// If the mutex is already locked for reading or writing, the mutex blocks until the mutex is available. +atomic_rw_mutex_lock :: proc(rw: ^Atomic_RW_Mutex) { + _ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Writer); + atomic_mutex_lock(&rw.mutex); - state := atomic_or(&rw.impl.state, RW_Mutex_State_Writer); - if state & RW_Mutex_State_Reader_Mask != 0 { - sema_wait(&rw.impl.sema); + state := atomic_or(&rw.state, Atomic_RW_Mutex_State_Writer); + if state & Atomic_RW_Mutex_State_Reader_Mask != 0 { + atomic_sema_wait(&rw.sema); } } -_rw_mutex_unlock :: proc(rw: ^RW_Mutex) { - _ = atomic_and(&rw.impl.state, ~RW_Mutex_State_Is_Writing); - mutex_unlock(&rw.impl.mutex); +// atomic_rw_mutex_unlock unlocks rw for writing (with a single writer) +atomic_rw_mutex_unlock :: proc(rw: ^Atomic_RW_Mutex) { + _ = atomic_and(&rw.state, ~Atomic_RW_Mutex_State_Is_Writing); + atomic_mutex_unlock(&rw.mutex); } -_rw_mutex_try_lock :: proc(rw: ^RW_Mutex) -> bool { - if mutex_try_lock(&rw.impl.mutex) { - state := atomic_load(&rw.impl.state); - if state & RW_Mutex_State_Reader_Mask == 0 { - _ = atomic_or(&rw.impl.state, RW_Mutex_State_Is_Writing); +// atomic_rw_mutex_try_lock tries to lock rw for writing (with a single writer) +atomic_rw_mutex_try_lock :: proc(rw: ^Atomic_RW_Mutex) -> bool { + if atomic_mutex_try_lock(&rw.mutex) { + state := atomic_load(&rw.state); + if state & Atomic_RW_Mutex_State_Reader_Mask == 0 { + _ = atomic_or(&rw.state, Atomic_RW_Mutex_State_Is_Writing); return true; } - mutex_unlock(&rw.impl.mutex); + atomic_mutex_unlock(&rw.mutex); } return false; } -_rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) { - state := atomic_load(&rw.impl.state); - for state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 { +// atomic_rw_mutex_shared_lock locks rw for reading (with arbitrary number of readers) +atomic_rw_mutex_shared_lock :: proc(rw: ^Atomic_RW_Mutex) { + state := atomic_load(&rw.state); + for state & (Atomic_RW_Mutex_State_Is_Writing|Atomic_RW_Mutex_State_Writer_Mask) == 0 { ok: bool; - state, ok = atomic_cxchgweak(&rw.impl.state, state, state + RW_Mutex_State_Reader); + state, ok = atomic_compare_exchange_weak(&rw.state, state, state + Atomic_RW_Mutex_State_Reader); if ok { return; } } - mutex_lock(&rw.impl.mutex); - _ = atomic_add(&rw.impl.state, RW_Mutex_State_Reader); - mutex_unlock(&rw.impl.mutex); + atomic_mutex_lock(&rw.mutex); + _ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Reader); + atomic_mutex_unlock(&rw.mutex); } -_rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) { - state := atomic_sub(&rw.impl.state, RW_Mutex_State_Reader); +// atomic_rw_mutex_shared_unlock unlocks rw for reading (with arbitrary number of readers) +atomic_rw_mutex_shared_unlock :: proc(rw: ^Atomic_RW_Mutex) { + state := atomic_sub(&rw.state, Atomic_RW_Mutex_State_Reader); - if (state & RW_Mutex_State_Reader_Mask == RW_Mutex_State_Reader) && - (state & RW_Mutex_State_Is_Writing != 0) { - sema_post(&rw.impl.sema); + if (state & Atomic_RW_Mutex_State_Reader_Mask == Atomic_RW_Mutex_State_Reader) && + (state & Atomic_RW_Mutex_State_Is_Writing != 0) { + atomic_sema_post(&rw.sema); } } -_rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool { - state := atomic_load(&rw.impl.state); - if state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 { - _, ok := atomic_cxchg(&rw.impl.state, state, state + RW_Mutex_State_Reader); +// atomic_rw_mutex_try_shared_lock tries to lock rw for reading (with arbitrary number of readers) +atomic_rw_mutex_try_shared_lock :: proc(rw: ^Atomic_RW_Mutex) -> bool { + state := atomic_load(&rw.state); + if state & (Atomic_RW_Mutex_State_Is_Writing|Atomic_RW_Mutex_State_Writer_Mask) == 0 { + _, ok := atomic_compare_exchange_strong(&rw.state, state, state + Atomic_RW_Mutex_State_Reader); if ok { return true; } } - if mutex_try_lock(&rw.impl.mutex) { - _ = atomic_add(&rw.impl.state, RW_Mutex_State_Reader); - mutex_unlock(&rw.impl.mutex); + if atomic_mutex_try_lock(&rw.mutex) { + _ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Reader); + atomic_mutex_unlock(&rw.mutex); return true; } @@ -161,127 +195,177 @@ _rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool { } -_Recursive_Mutex :: struct { +// Example: +// +// if atomic_rw_mutex_guard(&m) { +// ... +// } +// +@(deferred_in=atomic_rw_mutex_unlock) +atomic_rw_mutex_guard :: proc(m: ^Atomic_RW_Mutex) -> bool { + atomic_rw_mutex_lock(m); + return true; +} + +// Example: +// +// if atomic_rw_mutex_shared_guard(&m) { +// ... +// } +// +@(deferred_in=atomic_rw_mutex_shared_unlock) +atomic_rw_mutex_shared_guard :: proc(m: ^Atomic_RW_Mutex) -> bool { + atomic_rw_mutex_shared_lock(m); + return true; +} + + + + +// An Atomic_Recursive_Mutex is a recursive mutual exclusion lock +// The zero value for a Recursive_Mutex is an unlocked mutex +// +// An Atomic_Recursive_Mutex must not be copied after first use +Atomic_Recursive_Mutex :: struct { owner: int, recursion: int, mutex: Mutex, } -_recursive_mutex_lock :: proc(m: ^Recursive_Mutex) { +atomic_recursive_mutex_lock :: proc(m: ^Atomic_Recursive_Mutex) { tid := runtime.current_thread_id(); - if tid != m.impl.owner { - mutex_lock(&m.impl.mutex); + if tid != m.owner { + mutex_lock(&m.mutex); } // inside the lock - m.impl.owner = tid; - m.impl.recursion += 1; + m.owner = tid; + m.recursion += 1; } -_recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) { +atomic_recursive_mutex_unlock :: proc(m: ^Atomic_Recursive_Mutex) { tid := runtime.current_thread_id(); - assert(tid == m.impl.owner); - m.impl.recursion -= 1; - recursion := m.impl.recursion; + assert(tid == m.owner); + m.recursion -= 1; + recursion := m.recursion; if recursion == 0 { - m.impl.owner = 0; + m.owner = 0; } if recursion == 0 { - mutex_unlock(&m.impl.mutex); + mutex_unlock(&m.mutex); } // outside the lock } -_recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool { +atomic_recursive_mutex_try_lock :: proc(m: ^Atomic_Recursive_Mutex) -> bool { tid := runtime.current_thread_id(); - if m.impl.owner == tid { - return mutex_try_lock(&m.impl.mutex); + if m.owner == tid { + return mutex_try_lock(&m.mutex); } - if !mutex_try_lock(&m.impl.mutex) { + if !mutex_try_lock(&m.mutex) { return false; } // inside the lock - m.impl.owner = tid; - m.impl.recursion += 1; + m.owner = tid; + m.recursion += 1; return true; } +// Example: +// +// if atomic_recursive_mutex_guard(&m) { +// ... +// } +// +@(deferred_in=atomic_recursive_mutex_unlock) +atomic_recursive_mutex_guard :: proc(m: ^Atomic_Recursive_Mutex) -> bool { + atomic_recursive_mutex_lock(m); + return true; +} + +@(private="file") Queue_Item :: struct { next: ^Queue_Item, futex: i32, } +@(private="file") queue_item_wait :: proc(item: ^Queue_Item) { - for atomic_load_acq(&item.futex) == 0 { + for atomic_load_acquire(&item.futex) == 0 { // TODO(bill): Use a Futex here for Linux to improve performance and error handling cpu_relax(); } } +@(private="file") queue_item_signal :: proc(item: ^Queue_Item) { - atomic_store_rel(&item.futex, 1); + atomic_store_release(&item.futex, 1); // TODO(bill): Use a Futex here for Linux to improve performance and error handling } -_Cond :: struct { - queue_mutex: Mutex, +// Atomic_Cond implements a condition variable, a rendezvous point for threads +// waiting for signalling the occurence of an event +// +// An Atomic_Cond must not be copied after first use +Atomic_Cond :: struct { + queue_mutex: Atomic_Mutex, queue_head: ^Queue_Item, pending: bool, } -_cond_wait :: proc(c: ^Cond, m: ^Mutex) { +atomic_cond_wait :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex) { waiter := &Queue_Item{}; - mutex_lock(&c.impl.queue_mutex); - waiter.next = c.impl.queue_head; - c.impl.queue_head = waiter; + atomic_mutex_lock(&c.queue_mutex); + waiter.next = c.queue_head; + c.queue_head = waiter; - atomic_store(&c.impl.pending, true); - mutex_unlock(&c.impl.queue_mutex); + atomic_store(&c.pending, true); + atomic_mutex_unlock(&c.queue_mutex); - mutex_unlock(m); + atomic_mutex_unlock(m); queue_item_wait(waiter); - mutex_lock(m); + atomic_mutex_lock(m); } -_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool { +atomic_cond_wait_with_timeout :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex, timeout: time.Duration) -> bool { // TODO(bill): _cond_wait_with_timeout for unix return false; } -_cond_signal :: proc(c: ^Cond) { - if !atomic_load(&c.impl.pending) { +atomic_cond_signal :: proc(c: ^Atomic_Cond) { + if !atomic_load(&c.pending) { return; } - mutex_lock(&c.impl.queue_mutex); - waiter := c.impl.queue_head; - if c.impl.queue_head != nil { - c.impl.queue_head = c.impl.queue_head.next; + atomic_mutex_lock(&c.queue_mutex); + waiter := c.queue_head; + if c.queue_head != nil { + c.queue_head = c.queue_head.next; } - atomic_store(&c.impl.pending, c.impl.queue_head != nil); - mutex_unlock(&c.impl.queue_mutex); + atomic_store(&c.pending, c.queue_head != nil); + atomic_mutex_unlock(&c.queue_mutex); if waiter != nil { queue_item_signal(waiter); } } -_cond_broadcast :: proc(c: ^Cond) { - if !atomic_load(&c.impl.pending) { +atomic_cond_broadcast :: proc(c: ^Atomic_Cond) { + if !atomic_load(&c.pending) { return; } - atomic_store(&c.impl.pending, false); + atomic_store(&c.pending, false); - mutex_lock(&c.impl.queue_mutex); - waiters := c.impl.queue_head; - c.impl.queue_head = nil; - mutex_unlock(&c.impl.queue_mutex); + atomic_mutex_lock(&c.queue_mutex); + waiters := c.queue_head; + c.queue_head = nil; + atomic_mutex_unlock(&c.queue_mutex); for waiters != nil { queue_item_signal(waiters); @@ -289,35 +373,35 @@ _cond_broadcast :: proc(c: ^Cond) { } } -_Sema :: struct { - mutex: Mutex, - cond: Cond, +// When waited upon, blocks until the internal count is greater than zero, then subtracts one. +// Posting to the semaphore increases the count by one, or the provided amount. +// +// An Atomic_Sema must not be copied after first use +Atomic_Sema :: struct { + mutex: Atomic_Mutex, + cond: Atomic_Cond, count: int, } -_sema_wait :: proc(s: ^Sema) { - mutex_lock(&s.impl.mutex); - defer mutex_unlock(&s.impl.mutex); +atomic_sema_wait :: proc(s: ^Atomic_Sema) { + atomic_mutex_lock(&s.mutex); + defer atomic_mutex_unlock(&s.mutex); - for s.impl.count == 0 { - cond_wait(&s.impl.cond, &s.impl.mutex); + for s.count == 0 { + atomic_cond_wait(&s.cond, &s.mutex); } - s.impl.count -= 1; - if s.impl.count > 0 { - cond_signal(&s.impl.cond); + s.count -= 1; + if s.count > 0 { + atomic_cond_signal(&s.cond); } } -_sema_post :: proc(s: ^Sema, count := 1) { - mutex_lock(&s.impl.mutex); - defer mutex_unlock(&s.impl.mutex); +atomic_sema_post :: proc(s: ^Atomic_Sema, count := 1) { + atomic_mutex_lock(&s.mutex); + defer atomic_mutex_unlock(&s.mutex); - s.impl.count += count; - cond_signal(&s.impl.cond); + s.count += count; + atomic_cond_signal(&s.cond); } - - - -} // !ODIN_SYNC_USE_PTHREADS diff --git a/core/sync/sync2/primitives_pthreads.odin b/core/sync/sync2/primitives_pthreads.odin index 5fd43d871..7e45e0565 100644 --- a/core/sync/sync2/primitives_pthreads.odin +++ b/core/sync/sync2/primitives_pthreads.odin @@ -1,4 +1,4 @@ -//+build linux, darwin, freebsd +//+build linux, freebsd //+private package sync2 |