package sync2 import "core:time" Atomic_Mutex_State :: enum i32 { Unlocked = 0, Locked = 1, Waiting = 2, } // An Atomic_Mutex is a mutual exclusion lock // The zero value for a Atomic_Mutex is an unlocked mutex // // An Atomic_Mutex must not be copied after first use Atomic_Mutex :: struct { state: Atomic_Mutex_State, } // atomic_mutex_lock locks m atomic_mutex_lock :: proc(m: ^Atomic_Mutex) { @(cold) lock_slow :: proc(m: ^Atomic_Mutex, curr_state: Atomic_Mutex_State) { new_state := curr_state; // Make a copy of it spin_lock: for spin in 0.. 0; i -= 1 { cpu_relax(); } } for { if atomic_exchange_acquire(&m.state, .Waiting) == .Unlocked { return; } // TODO(bill): Use a Futex here for Linux to improve performance and error handling cpu_relax(); } } switch v := atomic_exchange_acquire(&m.state, .Locked); v { case .Unlocked: // Okay case: fallthrough; case .Locked, .Waiting: lock_slow(m, v); } } // atomic_mutex_unlock unlocks m atomic_mutex_unlock :: proc(m: ^Atomic_Mutex) { @(cold) unlock_slow :: proc(m: ^Atomic_Mutex) { // TODO(bill): Use a Futex here for Linux to improve performance and error handling } switch atomic_exchange_release(&m.state, .Unlocked) { case .Unlocked: unreachable(); case .Locked: // Okay case .Waiting: unlock_slow(m); } } // atomic_mutex_try_lock tries to lock m, will return true on success, and false on failure atomic_mutex_try_lock :: proc(m: ^Atomic_Mutex) -> bool { _, ok := atomic_compare_exchange_strong_acquire(&m.state, .Unlocked, .Locked); return ok; } // Example: // // if atomic_mutex_guard(&m) { // ... // } // @(deferred_in=atomic_mutex_unlock) atomic_mutex_guard :: proc(m: ^Atomic_Mutex) -> bool { atomic_mutex_lock(m); return true; } Atomic_RW_Mutex_State :: distinct uint; Atomic_RW_Mutex_State_Half_Width :: size_of(Atomic_RW_Mutex_State)*8/2; Atomic_RW_Mutex_State_Is_Writing :: Atomic_RW_Mutex_State(1); Atomic_RW_Mutex_State_Writer :: Atomic_RW_Mutex_State(1)<<1; Atomic_RW_Mutex_State_Reader :: Atomic_RW_Mutex_State(1)< bool { if atomic_mutex_try_lock(&rw.mutex) { state := atomic_load(&rw.state); if state & Atomic_RW_Mutex_State_Reader_Mask == 0 { _ = atomic_or(&rw.state, Atomic_RW_Mutex_State_Is_Writing); return true; } atomic_mutex_unlock(&rw.mutex); } return false; } // atomic_rw_mutex_shared_lock locks rw for reading (with arbitrary number of readers) atomic_rw_mutex_shared_lock :: proc(rw: ^Atomic_RW_Mutex) { state := atomic_load(&rw.state); for state & (Atomic_RW_Mutex_State_Is_Writing|Atomic_RW_Mutex_State_Writer_Mask) == 0 { ok: bool; state, ok = atomic_compare_exchange_weak(&rw.state, state, state + Atomic_RW_Mutex_State_Reader); if ok { return; } } atomic_mutex_lock(&rw.mutex); _ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Reader); atomic_mutex_unlock(&rw.mutex); } // atomic_rw_mutex_shared_unlock unlocks rw for reading (with arbitrary number of readers) atomic_rw_mutex_shared_unlock :: proc(rw: ^Atomic_RW_Mutex) { state := atomic_sub(&rw.state, Atomic_RW_Mutex_State_Reader); if (state & Atomic_RW_Mutex_State_Reader_Mask == Atomic_RW_Mutex_State_Reader) && (state & Atomic_RW_Mutex_State_Is_Writing != 0) { atomic_sema_post(&rw.sema); } } // atomic_rw_mutex_try_shared_lock tries to lock rw for reading (with arbitrary number of readers) atomic_rw_mutex_try_shared_lock :: proc(rw: ^Atomic_RW_Mutex) -> bool { state := atomic_load(&rw.state); if state & (Atomic_RW_Mutex_State_Is_Writing|Atomic_RW_Mutex_State_Writer_Mask) == 0 { _, ok := atomic_compare_exchange_strong(&rw.state, state, state + Atomic_RW_Mutex_State_Reader); if ok { return true; } } if atomic_mutex_try_lock(&rw.mutex) { _ = atomic_add(&rw.state, Atomic_RW_Mutex_State_Reader); atomic_mutex_unlock(&rw.mutex); return true; } return false; } // Example: // // if atomic_rw_mutex_guard(&m) { // ... // } // @(deferred_in=atomic_rw_mutex_unlock) atomic_rw_mutex_guard :: proc(m: ^Atomic_RW_Mutex) -> bool { atomic_rw_mutex_lock(m); return true; } // Example: // // if atomic_rw_mutex_shared_guard(&m) { // ... // } // @(deferred_in=atomic_rw_mutex_shared_unlock) atomic_rw_mutex_shared_guard :: proc(m: ^Atomic_RW_Mutex) -> bool { atomic_rw_mutex_shared_lock(m); return true; } // An Atomic_Recursive_Mutex is a recursive mutual exclusion lock // The zero value for a Recursive_Mutex is an unlocked mutex // // An Atomic_Recursive_Mutex must not be copied after first use Atomic_Recursive_Mutex :: struct { owner: int, recursion: int, mutex: Mutex, } atomic_recursive_mutex_lock :: proc(m: ^Atomic_Recursive_Mutex) { tid := current_thread_id(); if tid != m.owner { mutex_lock(&m.mutex); } // inside the lock m.owner = tid; m.recursion += 1; } atomic_recursive_mutex_unlock :: proc(m: ^Atomic_Recursive_Mutex) { tid := current_thread_id(); assert(tid == m.owner); m.recursion -= 1; recursion := m.recursion; if recursion == 0 { m.owner = 0; } if recursion == 0 { mutex_unlock(&m.mutex); } // outside the lock } atomic_recursive_mutex_try_lock :: proc(m: ^Atomic_Recursive_Mutex) -> bool { tid := current_thread_id(); if m.owner == tid { return mutex_try_lock(&m.mutex); } if !mutex_try_lock(&m.mutex) { return false; } // inside the lock m.owner = tid; m.recursion += 1; return true; } // Example: // // if atomic_recursive_mutex_guard(&m) { // ... // } // @(deferred_in=atomic_recursive_mutex_unlock) atomic_recursive_mutex_guard :: proc(m: ^Atomic_Recursive_Mutex) -> bool { atomic_recursive_mutex_lock(m); return true; } @(private="file") Queue_Item :: struct { next: ^Queue_Item, futex: i32, } @(private="file") queue_item_wait :: proc(item: ^Queue_Item) { for atomic_load_acquire(&item.futex) == 0 { // TODO(bill): Use a Futex here for Linux to improve performance and error handling cpu_relax(); } } @(private="file") queue_item_signal :: proc(item: ^Queue_Item) { atomic_store_release(&item.futex, 1); // TODO(bill): Use a Futex here for Linux to improve performance and error handling } // Atomic_Cond implements a condition variable, a rendezvous point for threads // waiting for signalling the occurence of an event // // An Atomic_Cond must not be copied after first use Atomic_Cond :: struct { queue_mutex: Atomic_Mutex, queue_head: ^Queue_Item, pending: bool, } atomic_cond_wait :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex) { waiter := &Queue_Item{}; atomic_mutex_lock(&c.queue_mutex); waiter.next = c.queue_head; c.queue_head = waiter; atomic_store(&c.pending, true); atomic_mutex_unlock(&c.queue_mutex); atomic_mutex_unlock(m); queue_item_wait(waiter); atomic_mutex_lock(m); } atomic_cond_wait_with_timeout :: proc(c: ^Atomic_Cond, m: ^Atomic_Mutex, timeout: time.Duration) -> bool { // TODO(bill): _cond_wait_with_timeout for unix return false; } atomic_cond_signal :: proc(c: ^Atomic_Cond) { if !atomic_load(&c.pending) { return; } atomic_mutex_lock(&c.queue_mutex); waiter := c.queue_head; if c.queue_head != nil { c.queue_head = c.queue_head.next; } atomic_store(&c.pending, c.queue_head != nil); atomic_mutex_unlock(&c.queue_mutex); if waiter != nil { queue_item_signal(waiter); } } atomic_cond_broadcast :: proc(c: ^Atomic_Cond) { if !atomic_load(&c.pending) { return; } atomic_store(&c.pending, false); atomic_mutex_lock(&c.queue_mutex); waiters := c.queue_head; c.queue_head = nil; atomic_mutex_unlock(&c.queue_mutex); for waiters != nil { queue_item_signal(waiters); waiters = waiters.next; } } // When waited upon, blocks until the internal count is greater than zero, then subtracts one. // Posting to the semaphore increases the count by one, or the provided amount. // // An Atomic_Sema must not be copied after first use Atomic_Sema :: struct { mutex: Atomic_Mutex, cond: Atomic_Cond, count: int, } atomic_sema_wait :: proc(s: ^Atomic_Sema) { atomic_mutex_lock(&s.mutex); defer atomic_mutex_unlock(&s.mutex); for s.count == 0 { atomic_cond_wait(&s.cond, &s.mutex); } s.count -= 1; if s.count > 0 { atomic_cond_signal(&s.cond); } } atomic_sema_post :: proc(s: ^Atomic_Sema, count := 1) { atomic_mutex_lock(&s.mutex); defer atomic_mutex_unlock(&s.mutex); s.count += count; atomic_cond_signal(&s.cond); }