diff options
| author | Jeroen van Rijn <Kelimion@users.noreply.github.com> | 2022-04-27 14:37:15 +0200 |
|---|---|---|
| committer | Jeroen van Rijn <Kelimion@users.noreply.github.com> | 2022-04-27 14:37:15 +0200 |
| commit | c4e0d1efa1ec655bae9134b95a0fcd060cc7bbea (patch) | |
| tree | c29bd0b78138e8d67aebe34ac689d13e32d9d15f /core/sync | |
| parent | 6e61abc7d06f22129f93110a9f652c3eec21f0c6 (diff) | |
| parent | 9349dfba8fec53f52f77a0c8928e115ec93ff447 (diff) | |
Merge branch 'master' into xml
Diffstat (limited to 'core/sync')
| -rw-r--r-- | core/sync/atomic.odin | 205 | ||||
| -rw-r--r-- | core/sync/barrier.odin | 81 | ||||
| -rw-r--r-- | core/sync/channel.odin | 889 | ||||
| -rw-r--r-- | core/sync/channel_unix.odin | 16 | ||||
| -rw-r--r-- | core/sync/channel_windows.odin | 33 | ||||
| -rw-r--r-- | core/sync/extended.odin (renamed from core/sync/sync2/extended.odin) | 103 | ||||
| -rw-r--r-- | core/sync/futex_darwin.odin (renamed from core/sync/sync2/futex_darwin.odin) | 2 | ||||
| -rw-r--r-- | core/sync/futex_freebsd.odin | 75 | ||||
| -rw-r--r-- | core/sync/futex_linux.odin (renamed from core/sync/sync2/futex_linux.odin) | 8 | ||||
| -rw-r--r-- | core/sync/futex_openbsd.odin | 78 | ||||
| -rw-r--r-- | core/sync/futex_windows.odin (renamed from core/sync/sync2/futex_windows.odin) | 2 | ||||
| -rw-r--r-- | core/sync/primitives.odin (renamed from core/sync/sync2/primitives.odin) | 52 | ||||
| -rw-r--r-- | core/sync/primitives_atomic.odin (renamed from core/sync/sync2/primitives_atomic.odin) | 76 | ||||
| -rw-r--r-- | core/sync/primitives_darwin.odin (renamed from core/sync/sync2/primitives_darwin.odin) | 2 | ||||
| -rw-r--r-- | core/sync/primitives_freebsd.odin | 46 | ||||
| -rw-r--r-- | core/sync/primitives_internal.odin | 125 | ||||
| -rw-r--r-- | core/sync/primitives_linux.odin | 47 | ||||
| -rw-r--r-- | core/sync/primitives_openbsd.odin | 46 | ||||
| -rw-r--r-- | core/sync/primitives_windows.odin (renamed from core/sync/sync2/primitives_windows.odin) | 2 | ||||
| -rw-r--r-- | core/sync/sema_internal.odin (renamed from core/sync/sync2/sema_internal.odin) | 13 | ||||
| -rw-r--r-- | core/sync/sync.odin | 123 | ||||
| -rw-r--r-- | core/sync/sync2/atomic.odin | 79 | ||||
| -rw-r--r-- | core/sync/sync2/primitives_internal.odin | 184 | ||||
| -rw-r--r-- | core/sync/sync2/primitives_linux.odin | 9 | ||||
| -rw-r--r-- | core/sync/sync2/primitives_pthreads.odin | 58 | ||||
| -rw-r--r-- | core/sync/sync_darwin.odin | 54 | ||||
| -rw-r--r-- | core/sync/sync_freebsd.odin | 40 | ||||
| -rw-r--r-- | core/sync/sync_linux.odin | 36 | ||||
| -rw-r--r-- | core/sync/sync_unix.odin | 248 | ||||
| -rw-r--r-- | core/sync/sync_util.odin (renamed from core/sync/sync2/sync_util.odin) | 30 | ||||
| -rw-r--r-- | core/sync/sync_windows.odin | 180 | ||||
| -rw-r--r-- | core/sync/wait_group.odin | 58 |
32 files changed, 595 insertions, 2405 deletions
diff --git a/core/sync/atomic.odin b/core/sync/atomic.odin index 21dcea178..f537764c4 100644 --- a/core/sync/atomic.odin +++ b/core/sync/atomic.odin @@ -2,167 +2,44 @@ package sync import "core:intrinsics" -Ordering :: enum { - Relaxed, // Monotonic - Release, - Acquire, - Acquire_Release, - Sequentially_Consistent, -} - -strongest_failure_ordering_table := [Ordering]Ordering{ - .Relaxed = .Relaxed, - .Release = .Relaxed, - .Acquire = .Acquire, - .Acquire_Release = .Acquire, - .Sequentially_Consistent = .Sequentially_Consistent, -} - -strongest_failure_ordering :: #force_inline proc(order: Ordering) -> Ordering { - return strongest_failure_ordering_table[order] -} - -fence :: #force_inline proc($order: Ordering) { - when order == .Relaxed { #panic("there is no such thing as a relaxed fence") } - else when order == .Release { intrinsics.atomic_fence_rel() } - else when order == .Acquire { intrinsics.atomic_fence_acq() } - else when order == .Acquire_Release { intrinsics.atomic_fence_acqrel() } - else when order == .Sequentially_Consistent { intrinsics.atomic_fence() } - else { #panic("unknown order") } -} - - -atomic_store :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) { - when order == .Relaxed { intrinsics.atomic_store_relaxed(dst, val) } - else when order == .Release { intrinsics.atomic_store_rel(dst, val) } - else when order == .Sequentially_Consistent { intrinsics.atomic_store(dst, val) } - else when order == .Acquire { #panic("there is not such thing as an acquire store") } - else when order == .Acquire_Release { #panic("there is not such thing as an acquire/release store") } - else { #panic("unknown order") } -} - -atomic_load :: #force_inline proc(dst: ^$T, $order: Ordering) -> T { - when order == .Relaxed { return intrinsics.atomic_load_relaxed(dst) } - else when order == .Acquire { return intrinsics.atomic_load_acq(dst) } - else when order == .Sequentially_Consistent { return intrinsics.atomic_load(dst) } - else when order == .Release { #panic("there is no such thing as a release load") } - else when order == .Acquire_Release { #panic("there is no such thing as an acquire/release load") } - else { #panic("unknown order") } -} - -atomic_swap :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { - when order == .Relaxed { return intrinsics.atomic_xchg_relaxed(dst, val) } - else when order == .Release { return intrinsics.atomic_xchg_rel(dst, val) } - else when order == .Acquire { return intrinsics.atomic_xchg_acq(dst, val) } - else when order == .Acquire_Release { return intrinsics.atomic_xchg_acqrel(dst, val) } - else when order == .Sequentially_Consistent { return intrinsics.atomic_xchg(dst, val) } - else { #panic("unknown order") } -} - -atomic_compare_exchange :: #force_inline proc(dst: ^$T, old, new: T, $success, $failure: Ordering) -> (val: T, ok: bool) { - when failure == .Relaxed { - when success == .Relaxed { return intrinsics.atomic_cxchg_relaxed(dst, old, new) } - else when success == .Acquire { return intrinsics.atomic_cxchg_acq_failrelaxed(dst, old, new) } - else when success == .Acquire_Release { return intrinsics.atomic_cxchg_acqrel_failrelaxed(dst, old, new) } - else when success == .Sequentially_Consistent { return intrinsics.atomic_cxchg_failrelaxed(dst, old, new) } - else when success == .Release { return intrinsics.atomic_cxchg_rel(dst, old, new) } - else { #panic("an unknown ordering combination") } - } else when failure == .Acquire { - when success == .Release { return intrinsics.atomic_cxchg_acqrel(dst, old, new) } - else when success == .Acquire { return intrinsics.atomic_cxchg_acq(dst, old, new) } - else { #panic("an unknown ordering combination") } - } else when failure == .Sequentially_Consistent { - when success == .Sequentially_Consistent { return intrinsics.atomic_cxchg(dst, old, new) } - else { #panic("an unknown ordering combination") } - } else when failure == .Acquire_Release { - #panic("there is not such thing as an acquire/release failure ordering") - } else when failure == .Release { - when success == .Acquire { return instrinsics.atomic_cxchg_failacq(dst, old, new) } - else { #panic("an unknown ordering combination") } - } else { - return T{}, false - } - -} - -atomic_compare_exchange_weak :: #force_inline proc(dst: ^$T, old, new: T, $success, $failure: Ordering) -> (val: T, ok: bool) { - when failure == .Relaxed { - when success == .Relaxed { return intrinsics.atomic_cxchgweak_relaxed(dst, old, new) } - else when success == .Acquire { return intrinsics.atomic_cxchgweak_acq_failrelaxed(dst, old, new) } - else when success == .Acquire_Release { return intrinsics.atomic_cxchgweak_acqrel_failrelaxed(dst, old, new) } - else when success == .Sequentially_Consistent { return intrinsics.atomic_cxchgweak_failrelaxed(dst, old, new) } - else when success == .Release { return intrinsics.atomic_cxchgweak_rel(dst, old, new) } - else { #panic("an unknown ordering combination") } - } else when failure == .Acquire { - when success == .Release { return intrinsics.atomic_cxchgweak_acqrel(dst, old, new) } - else when success == .Acquire { return intrinsics.atomic_cxchgweak_acq(dst, old, new) } - else { #panic("an unknown ordering combination") } - } else when failure == .Sequentially_Consistent { - when success == .Sequentially_Consistent { return intrinsics.atomic_cxchgweak(dst, old, new) } - else { #panic("an unknown ordering combination") } - } else when failure == .Acquire_Release { - #panic("there is not such thing as an acquire/release failure ordering") - } else when failure == .Release { - when success == .Acquire { return intrinsics.atomic_cxchgweak_failacq(dst, old, new) } - else { #panic("an unknown ordering combination") } - } else { - return T{}, false - } - -} - - -atomic_add :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { - when order == .Relaxed { return intrinsics.atomic_add_relaxed(dst, val) } - else when order == .Release { return intrinsics.atomic_add_rel(dst, val) } - else when order == .Acquire { return intrinsics.atomic_add_acq(dst, val) } - else when order == .Acquire_Release { return intrinsics.atomic_add_acqrel(dst, val) } - else when order == .Sequentially_Consistent { return intrinsics.atomic_add(dst, val) } - else { #panic("unknown order") } -} - -atomic_sub :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { - when order == .Relaxed { return intrinsics.atomic_sub_relaxed(dst, val) } - else when order == .Release { return intrinsics.atomic_sub_rel(dst, val) } - else when order == .Acquire { return intrinsics.atomic_sub_acq(dst, val) } - else when order == .Acquire_Release { return intrinsics.atomic_sub_acqrel(dst, val) } - else when order == .Sequentially_Consistent { return intrinsics.atomic_sub(dst, val) } - else { #panic("unknown order") } -} - -atomic_and :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { - when order == .Relaxed { return intrinsics.atomic_and_relaxed(dst, val) } - else when order == .Release { return intrinsics.atomic_and_rel(dst, val) } - else when order == .Acquire { return intrinsics.atomic_and_acq(dst, val) } - else when order == .Acquire_Release { return intrinsics.atomic_and_acqrel(dst, val) } - else when order == .Sequentially_Consistent { return intrinsics.atomic_and(dst, val) } - else { #panic("unknown order") } -} - -atomic_nand :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { - when order == .Relaxed { return intrinsics.atomic_nand_relaxed(dst, val) } - else when order == .Release { return intrinsics.atomic_nand_rel(dst, val) } - else when order == .Acquire { return intrinsics.atomic_nand_acq(dst, val) } - else when order == .Acquire_Release { return intrinsics.atomic_nand_acqrel(dst, val) } - else when order == .Sequentially_Consistent { return intrinsics.atomic_nand(dst, val) } - else { #panic("unknown order") } -} - -atomic_or :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { - when order == .Relaxed { return intrinsics.atomic_or_relaxed(dst, val) } - else when order == .Release { return intrinsics.atomic_or_rel(dst, val) } - else when order == .Acquire { return intrinsics.atomic_or_acq(dst, val) } - else when order == .Acquire_Release { return intrinsics.atomic_or_acqrel(dst, val) } - else when order == .Sequentially_Consistent { return intrinsics.atomic_or(dst, val) } - else { #panic("unknown order") } -} - -atomic_xor :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { - when order == .Relaxed { return intrinsics.atomic_xor_relaxed(dst, val) } - else when order == .Release { return intrinsics.atomic_xor_rel(dst, val) } - else when order == .Acquire { return intrinsics.atomic_xor_acq(dst, val) } - else when order == .Acquire_Release { return intrinsics.atomic_xor_acqrel(dst, val) } - else when order == .Sequentially_Consistent { return intrinsics.atomic_xor(dst, val) } - else { #panic("unknown order") } -} - +cpu_relax :: intrinsics.cpu_relax + +/* +Atomic_Memory_Order :: enum { + Relaxed = 0, + Consume = 1, + Acquire = 2, + Release = 3, + Acq_Rel = 4, + Seq_Cst = 5, +} +*/ +Atomic_Memory_Order :: intrinsics.Atomic_Memory_Order + + +atomic_thread_fence :: intrinsics.atomic_thread_fence +atomic_signal_fence :: intrinsics.atomic_signal_fence +atomic_store :: intrinsics.atomic_store +atomic_store_explicit :: intrinsics.atomic_store_explicit +atomic_load :: intrinsics.atomic_load +atomic_load_explicit :: intrinsics.atomic_load_explicit +atomic_add :: intrinsics.atomic_add +atomic_add_explicit :: intrinsics.atomic_add_explicit +atomic_sub :: intrinsics.atomic_sub +atomic_sub_explicit :: intrinsics.atomic_sub_explicit +atomic_and :: intrinsics.atomic_and +atomic_and_explicit :: intrinsics.atomic_and_explicit +atomic_nand :: intrinsics.atomic_nand +atomic_nand_explicit :: intrinsics.atomic_nand_explicit +atomic_or :: intrinsics.atomic_or +atomic_or_explicit :: intrinsics.atomic_or_explicit +atomic_xor :: intrinsics.atomic_xor +atomic_xor_explicit :: intrinsics.atomic_xor_explicit +atomic_exchange :: intrinsics.atomic_exchange +atomic_exchange_explicit :: intrinsics.atomic_exchange_explicit + +// Returns value and optional ok boolean +atomic_compare_exchange_strong :: intrinsics.atomic_compare_exchange_strong +atomic_compare_exchange_strong_explicit :: intrinsics.atomic_compare_exchange_strong_explicit +atomic_compare_exchange_weak :: intrinsics.atomic_compare_exchange_weak +atomic_compare_exchange_weak_explicit :: intrinsics.atomic_compare_exchange_weak_explicit
\ No newline at end of file diff --git a/core/sync/barrier.odin b/core/sync/barrier.odin deleted file mode 100644 index 997fde82d..000000000 --- a/core/sync/barrier.odin +++ /dev/null @@ -1,81 +0,0 @@ -package sync - - -// A barrier enabling multiple threads to synchronize the beginning of some computation -/* - * Example: - * - * package example - * - * import "core:fmt" - * import "core:sync" - * import "core:thread" - * - * barrier := &sync.Barrier{}; - * - * main :: proc() { - * fmt.println("Start"); - * - * THREAD_COUNT :: 4; - * threads: [THREAD_COUNT]^thread.Thread; - * - * sync.barrier_init(barrier, THREAD_COUNT); - * defer sync.barrier_destroy(barrier); - * - * - * for _, i in threads { - * threads[i] = thread.create_and_start(proc(t: ^thread.Thread) { - * // Same messages will be printed together but without any interleaving - * fmt.println("Getting ready!"); - * sync.barrier_wait(barrier); - * fmt.println("Off their marks they go!"); - * }); - * } - * - * for t in threads { - * thread.destroy(t); // join and free thread - * } - * fmt.println("Finished"); - * } - * - */ -Barrier :: struct { - mutex: Blocking_Mutex, - cond: Condition, - index: int, - generation_id: int, - thread_count: int, -} - -barrier_init :: proc(b: ^Barrier, thread_count: int) { - blocking_mutex_init(&b.mutex) - condition_init(&b.cond, &b.mutex) - b.index = 0 - b.generation_id = 0 - b.thread_count = thread_count -} - -barrier_destroy :: proc(b: ^Barrier) { - blocking_mutex_destroy(&b.mutex) - condition_destroy(&b.cond) -} - -// Block the current thread until all threads have rendezvoused -// Barrier can be reused after all threads rendezvoused once, and can be used continuously -barrier_wait :: proc(b: ^Barrier) -> (is_leader: bool) { - blocking_mutex_lock(&b.mutex) - defer blocking_mutex_unlock(&b.mutex) - local_gen := b.generation_id - b.index += 1 - if b.index < b.thread_count { - for local_gen == b.generation_id && b.index < b.thread_count { - condition_wait_for(&b.cond) - } - return false - } - - b.index = 0 - b.generation_id += 1 - condition_broadcast(&b.cond) - return true -} diff --git a/core/sync/channel.odin b/core/sync/channel.odin deleted file mode 100644 index 82b9504f4..000000000 --- a/core/sync/channel.odin +++ /dev/null @@ -1,889 +0,0 @@ -package sync - -import "core:mem" -import "core:time" -import "core:intrinsics" -import "core:math/rand" - -_, _ :: time, rand - -Channel_Direction :: enum i8 { - Both = 0, - Send = +1, - Recv = -1, -} - -Channel :: struct($T: typeid, $Direction := Channel_Direction.Both) { - using _internal: ^Raw_Channel, -} - -channel_init :: proc(ch: ^$C/Channel($T, $D), cap := 0, allocator := context.allocator) { - context.allocator = allocator - ch._internal = raw_channel_create(size_of(T), align_of(T), cap) - return -} - -channel_make :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Both)) { - context.allocator = allocator - ch._internal = raw_channel_create(size_of(T), align_of(T), cap) - return -} - -channel_make_send :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Send)) { - context.allocator = allocator - ch._internal = raw_channel_create(size_of(T), align_of(T), cap) - return -} -channel_make_recv :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Recv)) { - context.allocator = allocator - ch._internal = raw_channel_create(size_of(T), align_of(T), cap) - return -} - -channel_destroy :: proc(ch: $C/Channel($T, $D)) { - raw_channel_destroy(ch._internal) -} - -channel_as_send :: proc(ch: $C/Channel($T, .Both)) -> (res: Channel(T, .Send)) { - res._internal = ch._internal - return -} - -channel_as_recv :: proc(ch: $C/Channel($T, .Both)) -> (res: Channel(T, .Recv)) { - res._internal = ch._internal - return -} - - -channel_len :: proc(ch: $C/Channel($T, $D)) -> int { - return ch._internal.len if ch._internal != nil else 0 -} -channel_cap :: proc(ch: $C/Channel($T, $D)) -> int { - return ch._internal.cap if ch._internal != nil else 0 -} - - -channel_send :: proc(ch: $C/Channel($T, $D), msg: T, loc := #caller_location) where D >= .Both { - msg := msg - _ = raw_channel_send_impl(ch._internal, &msg, /*block*/true, loc) -} -channel_try_send :: proc(ch: $C/Channel($T, $D), msg: T, loc := #caller_location) -> bool where D >= .Both { - msg := msg - return raw_channel_send_impl(ch._internal, &msg, /*block*/false, loc) -} - -channel_recv :: proc(ch: $C/Channel($T, $D), loc := #caller_location) -> (msg: T) where D <= .Both { - c := ch._internal - if c == nil { - panic(message="cannot recv message; channel is nil", loc=loc) - } - mutex_lock(&c.mutex) - raw_channel_recv_impl(c, &msg, loc) - mutex_unlock(&c.mutex) - return -} -channel_try_recv :: proc(ch: $C/Channel($T, $D), loc := #caller_location) -> (msg: T, ok: bool) where D <= .Both { - c := ch._internal - if c != nil && mutex_try_lock(&c.mutex) { - if c.len > 0 { - raw_channel_recv_impl(c, &msg, loc) - ok = true - } - mutex_unlock(&c.mutex) - } - return -} -channel_try_recv_ptr :: proc(ch: $C/Channel($T, $D), msg: ^T, loc := #caller_location) -> (ok: bool) where D <= .Both { - res: T - res, ok = channel_try_recv(ch, loc) - if ok && msg != nil { - msg^ = res - } - return -} - - -channel_is_nil :: proc(ch: $C/Channel($T, $D)) -> bool { - return ch._internal == nil -} -channel_is_open :: proc(ch: $C/Channel($T, $D)) -> bool { - c := ch._internal - return c != nil && !c.closed -} - - -channel_eq :: proc(a, b: $C/Channel($T, $D)) -> bool { - return a._internal == b._internal -} -channel_ne :: proc(a, b: $C/Channel($T, $D)) -> bool { - return a._internal != b._internal -} - - -channel_can_send :: proc(ch: $C/Channel($T, $D)) -> (ok: bool) where D >= .Both { - return raw_channel_can_send(ch._internal) -} -channel_can_recv :: proc(ch: $C/Channel($T, $D)) -> (ok: bool) where D <= .Both { - return raw_channel_can_recv(ch._internal) -} - - -channel_peek :: proc(ch: $C/Channel($T, $D)) -> int { - c := ch._internal - if c == nil { - return -1 - } - if intrinsics.atomic_load(&c.closed) { - return -1 - } - return intrinsics.atomic_load(&c.len) -} - - -channel_close :: proc(ch: $C/Channel($T, $D), loc := #caller_location) { - raw_channel_close(ch._internal, loc) -} - - -channel_iterator :: proc(ch: $C/Channel($T, $D)) -> (msg: T, ok: bool) where D <= .Both { - c := ch._internal - if c == nil { - return - } - - if !c.closed || c.len > 0 { - msg, ok = channel_recv(ch), true - } - return -} -channel_drain :: proc(ch: $C/Channel($T, $D)) where D >= .Both { - raw_channel_drain(ch._internal) -} - - -channel_move :: proc(dst: $C1/Channel($T, $D1) src: $C2/Channel(T, $D2)) where D1 <= .Both, D2 >= .Both { - for msg in channel_iterator(src) { - channel_send(dst, msg) - } -} - - -Raw_Channel_Wait_Queue :: struct { - next: ^Raw_Channel_Wait_Queue, - state: ^uintptr, -} - - -Raw_Channel :: struct { - closed: bool, - ready: bool, // ready to recv - data_offset: u16, // data is stored at the end of this data structure - elem_size: u32, - len, cap: int, - read, write: int, - mutex: Mutex, - cond: Condition, - allocator: mem.Allocator, - - sendq: ^Raw_Channel_Wait_Queue, - recvq: ^Raw_Channel_Wait_Queue, -} - -raw_channel_wait_queue_insert :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) { - val.next = head^ - head^ = val -} -raw_channel_wait_queue_remove :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) { - p := head - for p^ != nil && p^ != val { - p = &p^.next - } - if p != nil { - p^ = p^.next - } -} - - -raw_channel_create :: proc(elem_size, elem_align: int, cap := 0) -> ^Raw_Channel { - assert(int(u32(elem_size)) == elem_size) - - s := size_of(Raw_Channel) - s = mem.align_forward_int(s, elem_align) - data_offset := uintptr(s) - s += elem_size * max(cap, 1) - - a := max(elem_align, align_of(Raw_Channel)) - - c := (^Raw_Channel)(mem.alloc(s, a)) - if c == nil { - return nil - } - - c.data_offset = u16(data_offset) - c.elem_size = u32(elem_size) - c.len, c.cap = 0, max(cap, 0) - c.read, c.write = 0, 0 - mutex_init(&c.mutex) - condition_init(&c.cond, &c.mutex) - c.allocator = context.allocator - c.closed = false - - return c -} - - -raw_channel_destroy :: proc(c: ^Raw_Channel) { - if c == nil { - return - } - context.allocator = c.allocator - intrinsics.atomic_store(&c.closed, true) - - condition_destroy(&c.cond) - mutex_destroy(&c.mutex) - free(c) -} - -raw_channel_close :: proc(c: ^Raw_Channel, loc := #caller_location) { - if c == nil { - panic(message="cannot close nil channel", loc=loc) - } - mutex_lock(&c.mutex) - defer mutex_unlock(&c.mutex) - intrinsics.atomic_store(&c.closed, true) - - // Release readers and writers - raw_channel_wait_queue_broadcast(c.recvq) - raw_channel_wait_queue_broadcast(c.sendq) - condition_broadcast(&c.cond) -} - - - -raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, block: bool, loc := #caller_location) -> bool { - send :: proc(c: ^Raw_Channel, src: rawptr) { - data := uintptr(c) + uintptr(c.data_offset) - dst := data + uintptr(c.write * int(c.elem_size)) - mem.copy(rawptr(dst), src, int(c.elem_size)) - c.len += 1 - c.write = (c.write + 1) % max(c.cap, 1) - } - - switch { - case c == nil: - panic(message="cannot send message; channel is nil", loc=loc) - case c.closed: - panic(message="cannot send message; channel is closed", loc=loc) - } - - mutex_lock(&c.mutex) - defer mutex_unlock(&c.mutex) - - if c.cap > 0 { - if !block && c.len >= c.cap { - return false - } - - for c.len >= c.cap { - condition_wait_for(&c.cond) - } - } else if c.len > 0 { // TODO(bill): determine correct behaviour - if !block { - return false - } - condition_wait_for(&c.cond) - } else if c.len == 0 && !block { - return false - } - - send(c, msg) - condition_signal(&c.cond) - raw_channel_wait_queue_signal(c.recvq) - - return true -} - -raw_channel_recv_impl :: proc(c: ^Raw_Channel, res: rawptr, loc := #caller_location) { - recv :: proc(c: ^Raw_Channel, dst: rawptr, loc := #caller_location) { - if c.len < 1 { - panic(message="cannot recv message; channel is empty", loc=loc) - } - c.len -= 1 - - data := uintptr(c) + uintptr(c.data_offset) - src := data + uintptr(c.read * int(c.elem_size)) - mem.copy(dst, rawptr(src), int(c.elem_size)) - c.read = (c.read + 1) % max(c.cap, 1) - } - - if c == nil { - panic(message="cannot recv message; channel is nil", loc=loc) - } - intrinsics.atomic_store(&c.ready, true) - for c.len < 1 { - raw_channel_wait_queue_signal(c.sendq) - condition_wait_for(&c.cond) - } - intrinsics.atomic_store(&c.ready, false) - recv(c, res, loc) - if c.cap > 0 { - if c.len == c.cap - 1 { - // NOTE(bill): Only signal on the last one - condition_signal(&c.cond) - } - } else { - condition_signal(&c.cond) - } -} - - -raw_channel_can_send :: proc(c: ^Raw_Channel) -> (ok: bool) { - if c == nil { - return false - } - mutex_lock(&c.mutex) - switch { - case c.closed: - ok = false - case c.cap > 0: - ok = c.ready && c.len < c.cap - case: - ok = c.ready && c.len == 0 - } - mutex_unlock(&c.mutex) - return -} -raw_channel_can_recv :: proc(c: ^Raw_Channel) -> (ok: bool) { - if c == nil { - return false - } - mutex_lock(&c.mutex) - ok = c.len > 0 - mutex_unlock(&c.mutex) - return -} - - -raw_channel_drain :: proc(c: ^Raw_Channel) { - if c == nil { - return - } - mutex_lock(&c.mutex) - c.len = 0 - c.read = 0 - c.write = 0 - mutex_unlock(&c.mutex) -} - - - -MAX_SELECT_CHANNELS :: 64 -SELECT_MAX_TIMEOUT :: max(time.Duration) - -Select_Command :: enum { - Recv, - Send, -} - -Select_Channel :: struct { - channel: ^Raw_Channel, - command: Select_Command, -} - - - -select :: proc(channels: ..Select_Channel) -> (index: int) { - return select_timeout(SELECT_MAX_TIMEOUT, ..channels) -} -select_timeout :: proc(timeout: time.Duration, channels: ..Select_Channel) -> (index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels") - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - - backing: [MAX_SELECT_CHANNELS]int - queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue - candidates := backing[:] - cap := len(channels) - candidates = candidates[:cap] - - count := u32(0) - for c, i in channels { - if c.channel == nil { - continue - } - switch c.command { - case .Recv: - if raw_channel_can_recv(c.channel) { - candidates[count] = i - count += 1 - } - case .Send: - if raw_channel_can_send(c.channel) { - candidates[count] = i - count += 1 - } - } - } - - if count == 0 { - wait_state: uintptr = 0 - for _, i in channels { - q := &queues[i] - q.state = &wait_state - } - - for c, i in channels { - if c.channel == nil { - continue - } - q := &queues[i] - switch c.command { - case .Recv: raw_channel_wait_queue_insert(&c.channel.recvq, q) - case .Send: raw_channel_wait_queue_insert(&c.channel.sendq, q) - } - } - raw_channel_wait_queue_wait_on(&wait_state, timeout) - for c, i in channels { - if c.channel == nil { - continue - } - q := &queues[i] - switch c.command { - case .Recv: raw_channel_wait_queue_remove(&c.channel.recvq, q) - case .Send: raw_channel_wait_queue_remove(&c.channel.sendq, q) - } - } - - for c, i in channels { - switch c.command { - case .Recv: - if raw_channel_can_recv(c.channel) { - candidates[count] = i - count += 1 - } - case .Send: - if raw_channel_can_send(c.channel) { - candidates[count] = i - count += 1 - } - } - } - if count == 0 && timeout == SELECT_MAX_TIMEOUT { - index = -1 - return - } - - assert(count != 0) - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - return -} - -select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels") - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - - backing: [MAX_SELECT_CHANNELS]int - queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue - candidates := backing[:] - cap := len(channels) - candidates = candidates[:cap] - - count := u32(0) - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i - count += 1 - } - } - - if count == 0 { - state: uintptr - for c, i in channels { - q := &queues[i] - q.state = &state - raw_channel_wait_queue_insert(&c.recvq, q) - } - raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT) - for c, i in channels { - q := &queues[i] - raw_channel_wait_queue_remove(&c.recvq, q) - } - - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i - count += 1 - } - } - assert(count != 0) - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - return -} - -select_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels") - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - - queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue - candidates: [MAX_SELECT_CHANNELS]int - - count := u32(0) - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i - count += 1 - } - } - - if count == 0 { - state: uintptr - for c, i in channels { - q := &queues[i] - q.state = &state - raw_channel_wait_queue_insert(&c.recvq, q) - } - raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT) - for c, i in channels { - q := &queues[i] - raw_channel_wait_queue_remove(&c.recvq, q) - } - - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i - count += 1 - } - } - assert(count != 0) - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - msg = channel_recv(channels[index]) - - return -} - -select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels") - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - - backing: [MAX_SELECT_CHANNELS]int - queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue - candidates := backing[:] - cap := len(channels) - candidates = candidates[:cap] - - count := u32(0) - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i - count += 1 - } - } - - if count == 0 { - state: uintptr - for c, i in channels { - q := &queues[i] - q.state = &state - raw_channel_wait_queue_insert(&c.recvq, q) - } - raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT) - for c, i in channels { - q := &queues[i] - raw_channel_wait_queue_remove(&c.recvq, q) - } - - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i - count += 1 - } - } - assert(count != 0) - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - - if msg != nil { - channel_send(channels[index], msg) - } - - return -} - -select_send :: proc(channels: ..^Raw_Channel) -> (index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels") - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - candidates: [MAX_SELECT_CHANNELS]int - queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue - - count := u32(0) - for c, i in channels { - if raw_channel_can_send(c) { - candidates[count] = i - count += 1 - } - } - - if count == 0 { - state: uintptr - for c, i in channels { - q := &queues[i] - q.state = &state - raw_channel_wait_queue_insert(&c.sendq, q) - } - raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT) - for c, i in channels { - q := &queues[i] - raw_channel_wait_queue_remove(&c.sendq, q) - } - - for c, i in channels { - if raw_channel_can_send(c) { - candidates[count] = i - count += 1 - } - } - assert(count != 0) - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - return -} - -select_try :: proc(channels: ..Select_Channel) -> (index: int) { - switch len(channels) { - case 0: - panic("sync: select with no channels") - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - - backing: [MAX_SELECT_CHANNELS]int - candidates := backing[:] - cap := len(channels) - candidates = candidates[:cap] - - count := u32(0) - for c, i in channels { - switch c.command { - case .Recv: - if raw_channel_can_recv(c.channel) { - candidates[count] = i - count += 1 - } - case .Send: - if raw_channel_can_send(c.channel) { - candidates[count] = i - count += 1 - } - } - } - - if count == 0 { - index = -1 - return - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - return -} - - -select_try_recv :: proc(channels: ..^Raw_Channel) -> (index: int) { - switch len(channels) { - case 0: - index = -1 - return - case 1: - index = -1 - if raw_channel_can_recv(channels[0]) { - index = 0 - } - return - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - candidates: [MAX_SELECT_CHANNELS]int - - count := u32(0) - for c, i in channels { - if raw_channel_can_recv(c) { - candidates[count] = i - count += 1 - } - } - - if count == 0 { - index = -1 - return - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - return -} - - -select_try_send :: proc(channels: ..^Raw_Channel) -> (index: int) #no_bounds_check { - switch len(channels) { - case 0: - return -1 - case 1: - if raw_channel_can_send(channels[0]) { - return 0 - } - return -1 - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - candidates: [MAX_SELECT_CHANNELS]int - - count := u32(0) - for c, i in channels { - if raw_channel_can_send(c) { - candidates[count] = i - count += 1 - } - } - - if count == 0 { - index = -1 - return - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - return -} - -select_try_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int) { - switch len(channels) { - case 0: - index = -1 - return - case 1: - ok: bool - if msg, ok = channel_try_recv(channels[0]); ok { - index = 0 - } - return - } - - assert(len(channels) <= MAX_SELECT_CHANNELS) - candidates: [MAX_SELECT_CHANNELS]int - - count := u32(0) - for c, i in channels { - if channel_can_recv(c) { - candidates[count] = i - count += 1 - } - } - - if count == 0 { - index = -1 - return - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - msg = channel_recv(channels[index]) - return -} - -select_try_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int) { - index = -1 - switch len(channels) { - case 0: - return - case 1: - if channel_try_send(channels[0], msg) { - index = 0 - } - return - } - - - assert(len(channels) <= MAX_SELECT_CHANNELS) - candidates: [MAX_SELECT_CHANNELS]int - - count := u32(0) - for c, i in channels { - if raw_channel_can_send(c) { - candidates[count] = i - count += 1 - } - } - - if count == 0 { - index = -1 - return - } - - t := time.now() - r := rand.create(transmute(u64)t) - i := rand.uint32(&r) - - index = candidates[i % count] - channel_send(channels[index], msg) - return -} - diff --git a/core/sync/channel_unix.odin b/core/sync/channel_unix.odin deleted file mode 100644 index d6bac2d71..000000000 --- a/core/sync/channel_unix.odin +++ /dev/null @@ -1,16 +0,0 @@ -// +build linux, darwin, freebsd -package sync - -import "core:time" - -raw_channel_wait_queue_wait_on :: proc(state: ^uintptr, timeout: time.Duration) { - // stub -} - -raw_channel_wait_queue_signal :: proc(q: ^Raw_Channel_Wait_Queue) { - // stub -} - -raw_channel_wait_queue_broadcast :: proc(q: ^Raw_Channel_Wait_Queue) { - // stub -} diff --git a/core/sync/channel_windows.odin b/core/sync/channel_windows.odin deleted file mode 100644 index 5d469ffff..000000000 --- a/core/sync/channel_windows.odin +++ /dev/null @@ -1,33 +0,0 @@ -package sync - -import "core:intrinsics" -import win32 "core:sys/windows" -import "core:time" - -raw_channel_wait_queue_wait_on :: proc(state: ^uintptr, timeout: time.Duration) { - ms: win32.DWORD = win32.INFINITE - if max(time.Duration) != SELECT_MAX_TIMEOUT { - ms = win32.DWORD((max(time.duration_nanoseconds(timeout), 0) + 999999)/1000000) - } - - v := intrinsics.atomic_load(state) - for v == 0 { - win32.WaitOnAddress(state, &v, size_of(state^), ms) - v = intrinsics.atomic_load(state) - } - intrinsics.atomic_store(state, 0) -} - -raw_channel_wait_queue_signal :: proc(q: ^Raw_Channel_Wait_Queue) { - for x := q; x != nil; x = x.next { - intrinsics.atomic_add(x.state, 1) - win32.WakeByAddressSingle(x.state) - } -} - -raw_channel_wait_queue_broadcast :: proc(q: ^Raw_Channel_Wait_Queue) { - for x := q; x != nil; x = x.next { - intrinsics.atomic_add(x.state, 1) - win32.WakeByAddressAll(x.state) - } -} diff --git a/core/sync/sync2/extended.odin b/core/sync/extended.odin index d6a99fe04..2cca6f961 100644 --- a/core/sync/sync2/extended.odin +++ b/core/sync/extended.odin @@ -1,4 +1,4 @@ -package sync2 +package sync import "core:time" @@ -67,44 +67,41 @@ wait_group_wait_with_timeout :: proc(wg: ^Wait_Group, duration: time.Duration) - -// A barrier enabling multiple threads to synchronize the beginning of some computation /* - * Example: - * - * package example - * - * import "core:fmt" - * import "core:sync" - * import "core:thread" - * - * barrier := &sync.Barrier{} - * - * main :: proc() { - * fmt.println("Start") - * - * THREAD_COUNT :: 4 - * threads: [THREAD_COUNT]^thread.Thread - * - * sync.barrier_init(barrier, THREAD_COUNT) - * defer sync.barrier_destroy(barrier) - * - * - * for _, i in threads { - * threads[i] = thread.create_and_start(proc(t: ^thread.Thread) { - * // Same messages will be printed together but without any interleaving - * fmt.println("Getting ready!") - * sync.barrier_wait(barrier) - * fmt.println("Off their marks they go!") - * }) - * } - * - * for t in threads { - * thread.destroy(t) // join and free thread - * } - * fmt.println("Finished") - * } - * - */ +A barrier enabling multiple threads to synchronize the beginning of some computation + +Example: + package example + + import "core:fmt" + import "core:sync" + import "core:thread" + + barrier := &sync.Barrier{} + + main :: proc() { + fmt.println("Start") + + THREAD_COUNT :: 4 + threads: [THREAD_COUNT]^thread.Thread + + sync.barrier_init(barrier, THREAD_COUNT) + + for _, i in threads { + threads[i] = thread.create_and_start(proc(t: ^thread.Thread) { + // Same messages will be printed together but without any interleaving + fmt.println("Getting ready!") + sync.barrier_wait(barrier) + fmt.println("Off their marks they go!") + }) + } + + for t in threads { + thread.destroy(t) // join and free thread + } + fmt.println("Finished") + } +*/ Barrier :: struct { mutex: Mutex, cond: Cond, @@ -149,10 +146,10 @@ Auto_Reset_Event :: struct { } auto_reset_event_signal :: proc(e: ^Auto_Reset_Event) { - old_status := atomic_load_relaxed(&e.status) + old_status := atomic_load_explicit(&e.status, .Relaxed) for { new_status := old_status + 1 if old_status < 1 else 1 - if _, ok := atomic_compare_exchange_weak_release(&e.status, old_status, new_status); ok { + if _, ok := atomic_compare_exchange_weak_explicit(&e.status, old_status, new_status, .Release, .Relaxed); ok { break } @@ -163,7 +160,7 @@ auto_reset_event_signal :: proc(e: ^Auto_Reset_Event) { } auto_reset_event_wait :: proc(e: ^Auto_Reset_Event) { - old_status := atomic_sub_acquire(&e.status, 1) + old_status := atomic_sub_explicit(&e.status, 1, .Acquire) if old_status < 1 { sema_wait(&e.sema) } @@ -177,14 +174,14 @@ Ticket_Mutex :: struct { } ticket_mutex_lock :: #force_inline proc(m: ^Ticket_Mutex) { - ticket := atomic_add_relaxed(&m.ticket, 1) - for ticket != atomic_load_acquire(&m.serving) { + ticket := atomic_add_explicit(&m.ticket, 1, .Relaxed) + for ticket != atomic_load_explicit(&m.serving, .Acquire) { cpu_relax() } } ticket_mutex_unlock :: #force_inline proc(m: ^Ticket_Mutex) { - atomic_add_relaxed(&m.serving, 1) + atomic_add_explicit(&m.serving, 1, .Relaxed) } @(deferred_in=ticket_mutex_unlock) ticket_mutex_guard :: proc(m: ^Ticket_Mutex) -> bool { @@ -199,18 +196,18 @@ Benaphore :: struct { } benaphore_lock :: proc(b: ^Benaphore) { - if atomic_add_acquire(&b.counter, 1) > 1 { + if atomic_add_explicit(&b.counter, 1, .Acquire) > 1 { sema_wait(&b.sema) } } benaphore_try_lock :: proc(b: ^Benaphore) -> bool { - v, _ := atomic_compare_exchange_strong_acquire(&b.counter, 1, 0) + v, _ := atomic_compare_exchange_strong_explicit(&b.counter, 0, 1, .Acquire, .Acquire) return v == 0 } benaphore_unlock :: proc(b: ^Benaphore) { - if atomic_sub_release(&b.counter, 1) > 0 { + if atomic_sub_explicit(&b.counter, 1, .Release) > 0 { sema_post(&b.sema) } } @@ -230,7 +227,7 @@ Recursive_Benaphore :: struct { recursive_benaphore_lock :: proc(b: ^Recursive_Benaphore) { tid := current_thread_id() - if atomic_add_acquire(&b.counter, 1) > 1 { + if atomic_add_explicit(&b.counter, 1, .Acquire) > 1 { if tid != b.owner { sema_wait(&b.sema) } @@ -243,10 +240,10 @@ recursive_benaphore_lock :: proc(b: ^Recursive_Benaphore) { recursive_benaphore_try_lock :: proc(b: ^Recursive_Benaphore) -> bool { tid := current_thread_id() if b.owner == tid { - atomic_add_acquire(&b.counter, 1) + atomic_add_explicit(&b.counter, 1, .Acquire) } - if v, _ := atomic_compare_exchange_strong_acquire(&b.counter, 1, 0); v != 0 { + if v, _ := atomic_compare_exchange_strong_explicit(&b.counter, 0, 1, .Acquire, .Acquire); v != 0 { return false } // inside the lock @@ -263,7 +260,7 @@ recursive_benaphore_unlock :: proc(b: ^Recursive_Benaphore) { if recursion == 0 { b.owner = 0 } - if atomic_sub_release(&b.counter, 1) > 0 { + if atomic_sub_explicit(&b.counter, 1, .Release) > 0 { if recursion == 0 { sema_post(&b.sema) } @@ -296,12 +293,12 @@ once_do :: proc(o: ^Once, fn: proc()) { defer mutex_unlock(&o.m) if !o.done { fn() - atomic_store_release(&o.done, true) + atomic_store_explicit(&o.done, true, .Release) } } - if atomic_load_acquire(&o.done) == false { + if atomic_load_explicit(&o.done, .Acquire) == false { do_slow(o, fn) } } diff --git a/core/sync/sync2/futex_darwin.odin b/core/sync/futex_darwin.odin index 9dad8d375..88e354827 100644 --- a/core/sync/sync2/futex_darwin.odin +++ b/core/sync/futex_darwin.odin @@ -1,6 +1,6 @@ //+private //+build darwin -package sync2 +package sync import "core:c" import "core:time" diff --git a/core/sync/futex_freebsd.odin b/core/sync/futex_freebsd.odin new file mode 100644 index 000000000..2e1d065bc --- /dev/null +++ b/core/sync/futex_freebsd.odin @@ -0,0 +1,75 @@ +//+private +//+build freebsd +package sync + +import "core:c" +import "core:os" +import "core:time" + +UMTX_OP_WAIT :: 2 +UMTX_OP_WAKE :: 3 + +foreign import libc "system:c" + +foreign libc { + _umtx_op :: proc "c" (obj: rawptr, op: c.int, val: c.ulong, uaddr: rawptr, uaddr2: rawptr) -> c.int --- +} + +_futex_wait :: proc(f: ^Futex, expected: u32) -> bool { + timeout := os.Unix_File_Time{ + seconds = 5, + nanoseconds = 0, + } + + for { + res := _umtx_op(f, UMTX_OP_WAIT, c.ulong(expected), nil, &timeout) + + if res != -1 { + return true + } + + if os.Errno(os.get_last_error()) == os.ETIMEDOUT { + continue + } + + panic("_futex_wait failure") + } + unreachable() +} + +_futex_wait_with_timeout :: proc(f: ^Futex, expected: u32, duration: time.Duration) -> bool { + if duration <= 0 { + return false + } + + res := _umtx_op(f, UMTX_OP_WAIT, c.ulong(expected), nil, &os.Unix_File_Time{ + seconds = (os.time_t)(duration/1e9), + nanoseconds = (c.long)(duration%1e9), + }) + + if res != -1 { + return true + } + + if os.Errno(os.get_last_error()) == os.ETIMEDOUT { + return false + } + + panic("_futex_wait_with_timeout failure") +} + +_futex_signal :: proc(f: ^Futex) { + res := _umtx_op(f, UMTX_OP_WAKE, 1, nil, nil) + + if res == -1 { + panic("_futex_signal failure") + } +} + +_futex_broadcast :: proc(f: ^Futex) { + res := _umtx_op(f, UMTX_OP_WAKE, c.ulong(max(i32)), nil, nil) + + if res == -1 { + panic("_futex_broadcast failure") + } +} diff --git a/core/sync/sync2/futex_linux.odin b/core/sync/futex_linux.odin index fca28cace..c429a9d64 100644 --- a/core/sync/sync2/futex_linux.odin +++ b/core/sync/futex_linux.odin @@ -1,6 +1,6 @@ //+private //+build linux -package sync2 +package sync import "core:c" import "core:time" @@ -14,12 +14,6 @@ FUTEX_PRIVATE_FLAG :: 128 FUTEX_WAIT_PRIVATE :: (FUTEX_WAIT | FUTEX_PRIVATE_FLAG) FUTEX_WAKE_PRIVATE :: (FUTEX_WAKE | FUTEX_PRIVATE_FLAG) -foreign import libc "system:c" - -foreign libc { - __errno_location :: proc "c" () -> ^c.int --- -} - ESUCCESS :: 0 EINTR :: -4 EAGAIN :: -11 diff --git a/core/sync/futex_openbsd.odin b/core/sync/futex_openbsd.odin new file mode 100644 index 000000000..6ac9d3efb --- /dev/null +++ b/core/sync/futex_openbsd.odin @@ -0,0 +1,78 @@ +//+private +//+build openbsd +package sync + +import "core:c" +import "core:os" +import "core:time" + +FUTEX_WAIT :: 1 +FUTEX_WAKE :: 2 + +FUTEX_PRIVATE_FLAG :: 128 + +FUTEX_WAIT_PRIVATE :: (FUTEX_WAIT | FUTEX_PRIVATE_FLAG) +FUTEX_WAKE_PRIVATE :: (FUTEX_WAKE | FUTEX_PRIVATE_FLAG) + +foreign import libc "system:c" + +foreign libc { + @(link_name="futex") + _unix_futex :: proc "c" (f: ^Futex, op: c.int, val: u32, timeout: rawptr) -> c.int --- +} + +_futex_wait :: proc(f: ^Futex, expected: u32) -> bool { + res := _unix_futex(f, FUTEX_WAIT_PRIVATE, expected, nil) + + if res != -1 { + return true + } + + if os.Errno(os.get_last_error()) == os.ETIMEDOUT { + return false + } + + panic("futex_wait failure") +} + +_futex_wait_with_timeout :: proc(f: ^Futex, expected: u32, duration: time.Duration) -> bool { + if duration <= 0 { + return false + } + + timespec_t :: struct { + tv_sec: c.long, + tv_nsec: c.long, + } + + res := _unix_futex(f, FUTEX_WAIT_PRIVATE, expected, ×pec_t{ + tv_sec = (c.long)(duration/1e9), + tv_nsec = (c.long)(duration%1e9), + }) + + if res != -1 { + return true + } + + if os.Errno(os.get_last_error()) == os.ETIMEDOUT { + return false + } + + panic("futex_wait_with_timeout failure") +} + +_futex_signal :: proc(f: ^Futex) { + res := _unix_futex(f, FUTEX_WAKE_PRIVATE, 1, nil) + + if res == -1 { + panic("futex_wake_single failure") + } +} + +_futex_broadcast :: proc(f: ^Futex) { + res := _unix_futex(f, FUTEX_WAKE_PRIVATE, u32(max(i32)), nil) + + if res == -1 { + panic("_futex_wake_all failure") + } +} diff --git a/core/sync/sync2/futex_windows.odin b/core/sync/futex_windows.odin index 200a119ff..1c9d8b845 100644 --- a/core/sync/sync2/futex_windows.odin +++ b/core/sync/futex_windows.odin @@ -1,6 +1,6 @@ //+private //+build windows -package sync2 +package sync import "core:time" diff --git a/core/sync/sync2/primitives.odin b/core/sync/primitives.odin index d9e013664..483f85343 100644 --- a/core/sync/sync2/primitives.odin +++ b/core/sync/primitives.odin @@ -1,4 +1,4 @@ -package sync2 +package sync import "core:time" @@ -29,12 +29,12 @@ mutex_try_lock :: proc(m: ^Mutex) -> bool { return _mutex_try_lock(m) } -// Example: -// -// if mutex_guard(&m) { -// ... -// } -// +/* +Example: + if mutex_guard(&m) { + ... + } +*/ @(deferred_in=mutex_unlock) mutex_guard :: proc(m: ^Mutex) -> bool { mutex_lock(m) @@ -80,25 +80,24 @@ rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) { rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool { return _rw_mutex_try_shared_lock(rw) } - -// Example: -// -// if rw_mutex_guard(&m) { -// ... -// } -// +/* +Example: + if rw_mutex_guard(&m) { + ... + } +*/ @(deferred_in=rw_mutex_unlock) rw_mutex_guard :: proc(m: ^RW_Mutex) -> bool { rw_mutex_lock(m) return true } -// Example: -// -// if rw_mutex_shared_guard(&m) { -// ... -// } -// +/* +Example: + if rw_mutex_shared_guard(&m) { + ... + } +*/ @(deferred_in=rw_mutex_shared_unlock) rw_mutex_shared_guard :: proc(m: ^RW_Mutex) -> bool { rw_mutex_shared_lock(m) @@ -127,13 +126,12 @@ recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool { return _recursive_mutex_try_lock(m) } - -// Example: -// -// if recursive_mutex_guard(&m) { -// ... -// } -// +/* +Example: + if recursive_mutex_guard(&m) { + ... + } +*/ @(deferred_in=recursive_mutex_unlock) recursive_mutex_guard :: proc(m: ^Recursive_Mutex) -> bool { recursive_mutex_lock(m) diff --git a/core/sync/sync2/primitives_atomic.odin b/core/sync/primitives_atomic.odin index a13b73a99..11fff4e60 100644 --- a/core/sync/sync2/primitives_atomic.odin +++ b/core/sync/primitives_atomic.odin @@ -1,4 +1,4 @@ -package sync2 +package sync import "core:time" @@ -24,7 +24,7 @@ atomic_mutex_lock :: proc(m: ^Atomic_Mutex) { new_state := curr_state // Make a copy of it spin_lock: for spin in 0..<i32(100) { - state, ok := atomic_compare_exchange_weak_acquire(&m.state, .Unlocked, new_state) + state, ok := atomic_compare_exchange_weak_explicit(&m.state, .Unlocked, new_state, .Acquire, .Consume) if ok { return } @@ -38,8 +38,11 @@ atomic_mutex_lock :: proc(m: ^Atomic_Mutex) { } } + // Set just in case 100 iterations did not do it + new_state = .Waiting + for { - if atomic_exchange_acquire(&m.state, .Waiting) == .Unlocked { + if atomic_exchange_explicit(&m.state, .Waiting, .Acquire) == .Unlocked { return } @@ -49,11 +52,7 @@ atomic_mutex_lock :: proc(m: ^Atomic_Mutex) { } - switch v := atomic_exchange_acquire(&m.state, .Locked); v { - case .Unlocked: - // Okay - case: fallthrough - case .Locked, .Waiting: + if v := atomic_exchange_explicit(&m.state, .Locked, .Acquire); v != .Unlocked { lock_slow(m, v) } } @@ -66,7 +65,7 @@ atomic_mutex_unlock :: proc(m: ^Atomic_Mutex) { } - switch atomic_exchange_release(&m.state, .Unlocked) { + switch atomic_exchange_explicit(&m.state, .Unlocked, .Release) { case .Unlocked: unreachable() case .Locked: @@ -78,17 +77,16 @@ atomic_mutex_unlock :: proc(m: ^Atomic_Mutex) { // atomic_mutex_try_lock tries to lock m, will return true on success, and false on failure atomic_mutex_try_lock :: proc(m: ^Atomic_Mutex) -> bool { - _, ok := atomic_compare_exchange_strong_acquire(&m.state, .Unlocked, .Locked) + _, ok := atomic_compare_exchange_strong_explicit(&m.state, .Unlocked, .Locked, .Acquire, .Consume) return ok } - -// Example: -// -// if atomic_mutex_guard(&m) { -// ... -// } -// +/* +Example: + if atomic_mutex_guard(&m) { + ... + } +*/ @(deferred_in=atomic_mutex_unlock) atomic_mutex_guard :: proc(m: ^Atomic_Mutex) -> bool { atomic_mutex_lock(m) @@ -193,25 +191,24 @@ atomic_rw_mutex_try_shared_lock :: proc(rw: ^Atomic_RW_Mutex) -> bool { return false } - -// Example: -// -// if atomic_rw_mutex_guard(&m) { -// ... -// } -// +/* +Example: + if atomic_rw_mutex_guard(&m) { + ... + } +*/ @(deferred_in=atomic_rw_mutex_unlock) atomic_rw_mutex_guard :: proc(m: ^Atomic_RW_Mutex) -> bool { atomic_rw_mutex_lock(m) return true } -// Example: -// -// if atomic_rw_mutex_shared_guard(&m) { -// ... -// } -// +/* +Example: + if atomic_rw_mutex_shared_guard(&m) { + ... + } +*/ @(deferred_in=atomic_rw_mutex_shared_unlock) atomic_rw_mutex_shared_guard :: proc(m: ^Atomic_RW_Mutex) -> bool { atomic_rw_mutex_shared_lock(m) @@ -270,13 +267,12 @@ atomic_recursive_mutex_try_lock :: proc(m: ^Atomic_Recursive_Mutex) -> bool { return true } - -// Example: -// -// if atomic_recursive_mutex_guard(&m) { -// ... -// } -// +/* +Example: + if atomic_recursive_mutex_guard(&m) { + ... + } +*/ @(deferred_in=atomic_recursive_mutex_unlock) atomic_recursive_mutex_guard :: proc(m: ^Atomic_Recursive_Mutex) -> bool { atomic_recursive_mutex_lock(m) @@ -294,7 +290,7 @@ Queue_Item :: struct { @(private="file") queue_item_wait :: proc(item: ^Queue_Item) { - for atomic_load_acquire(&item.futex) == 0 { + for atomic_load_explicit(&item.futex, .Acquire) == 0 { futex_wait(&item.futex, 0) cpu_relax() } @@ -302,7 +298,7 @@ queue_item_wait :: proc(item: ^Queue_Item) { @(private="file") queue_item_wait_with_timeout :: proc(item: ^Queue_Item, duration: time.Duration) -> bool { start := time.tick_now() - for atomic_load_acquire(&item.futex) == 0 { + for atomic_load_explicit(&item.futex, .Acquire) == 0 { remaining := duration - time.tick_since(start) if remaining < 0 { return false @@ -316,7 +312,7 @@ queue_item_wait_with_timeout :: proc(item: ^Queue_Item, duration: time.Duration) } @(private="file") queue_item_signal :: proc(item: ^Queue_Item) { - atomic_store_release(&item.futex, 1) + atomic_store_explicit(&item.futex, 1, .Release) futex_signal(&item.futex) } diff --git a/core/sync/sync2/primitives_darwin.odin b/core/sync/primitives_darwin.odin index 66995bd01..514f66f3e 100644 --- a/core/sync/sync2/primitives_darwin.odin +++ b/core/sync/primitives_darwin.odin @@ -1,6 +1,6 @@ //+build darwin //+private -package sync2 +package sync import "core:c" import "core:time" diff --git a/core/sync/primitives_freebsd.odin b/core/sync/primitives_freebsd.odin new file mode 100644 index 000000000..b88fca181 --- /dev/null +++ b/core/sync/primitives_freebsd.odin @@ -0,0 +1,46 @@ +//+build freebsd +//+private +package sync + +import "core:os" +import "core:time" + +_current_thread_id :: proc "contextless" () -> int { + return os.current_thread_id() +} + +_Mutex :: struct { + mutex: Atomic_Mutex, +} + +_mutex_lock :: proc(m: ^Mutex) { + atomic_mutex_lock(&m.impl.mutex) +} + +_mutex_unlock :: proc(m: ^Mutex) { + atomic_mutex_unlock(&m.impl.mutex) +} + +_mutex_try_lock :: proc(m: ^Mutex) -> bool { + return atomic_mutex_try_lock(&m.impl.mutex) +} + +_Cond :: struct { + cond: Atomic_Cond, +} + +_cond_wait :: proc(c: ^Cond, m: ^Mutex) { + atomic_cond_wait(&c.impl.cond, &m.impl.mutex) +} + +_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, duration: time.Duration) -> bool { + return atomic_cond_wait_with_timeout(&c.impl.cond, &m.impl.mutex, duration) +} + +_cond_signal :: proc(c: ^Cond) { + atomic_cond_signal(&c.impl.cond) +} + +_cond_broadcast :: proc(c: ^Cond) { + atomic_cond_broadcast(&c.impl.cond) +} diff --git a/core/sync/primitives_internal.odin b/core/sync/primitives_internal.odin new file mode 100644 index 000000000..de9aca991 --- /dev/null +++ b/core/sync/primitives_internal.odin @@ -0,0 +1,125 @@ +//+private +package sync + +when #config(ODIN_SYNC_RECURSIVE_MUTEX_USE_FUTEX, true) { + _Recursive_Mutex :: struct { + owner: Futex, + recursion: i32, + } + + _recursive_mutex_lock :: proc(m: ^Recursive_Mutex) { + tid := Futex(current_thread_id()) + for { + prev_owner := atomic_compare_exchange_strong_explicit(&m.impl.owner, 0, tid, .Acquire, .Acquire) + switch prev_owner { + case 0, tid: + m.impl.recursion += 1 + // inside the lock + return + } + + futex_wait(&m.impl.owner, u32(prev_owner)) + } + } + + _recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) { + m.impl.recursion -= 1 + if m.impl.recursion != 0 { + return + } + atomic_exchange_explicit(&m.impl.owner, 0, .Release) + + futex_signal(&m.impl.owner) + // outside the lock + + } + + _recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool { + tid := Futex(current_thread_id()) + prev_owner := atomic_compare_exchange_strong_explicit(&m.impl.owner, 0, tid, .Acquire, .Acquire) + switch prev_owner { + case 0, tid: + m.impl.recursion += 1 + // inside the lock + return true + } + return false + } +} else { + _Recursive_Mutex :: struct { + owner: int, + recursion: int, + mutex: Mutex, + } + + _recursive_mutex_lock :: proc(m: ^Recursive_Mutex) { + tid := current_thread_id() + if tid != m.impl.owner { + mutex_lock(&m.impl.mutex) + } + // inside the lock + m.impl.owner = tid + m.impl.recursion += 1 + } + + _recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) { + tid := current_thread_id() + assert(tid == m.impl.owner) + m.impl.recursion -= 1 + recursion := m.impl.recursion + if recursion == 0 { + m.impl.owner = 0 + } + if recursion == 0 { + mutex_unlock(&m.impl.mutex) + } + // outside the lock + + } + + _recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool { + tid := current_thread_id() + if m.impl.owner == tid { + return mutex_try_lock(&m.impl.mutex) + } + if !mutex_try_lock(&m.impl.mutex) { + return false + } + // inside the lock + m.impl.owner = tid + m.impl.recursion += 1 + return true + } +} + + +when ODIN_OS != .Windows { + _RW_Mutex :: struct { + mutex: Atomic_RW_Mutex, + } + + _rw_mutex_lock :: proc(rw: ^RW_Mutex) { + atomic_rw_mutex_lock(&rw.impl.mutex) + } + + _rw_mutex_unlock :: proc(rw: ^RW_Mutex) { + atomic_rw_mutex_unlock(&rw.impl.mutex) + } + + _rw_mutex_try_lock :: proc(rw: ^RW_Mutex) -> bool { + return atomic_rw_mutex_try_lock(&rw.impl.mutex) + } + + _rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) { + atomic_rw_mutex_shared_lock(&rw.impl.mutex) + } + + _rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) { + atomic_rw_mutex_shared_unlock(&rw.impl.mutex) + } + + _rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool { + return atomic_rw_mutex_try_shared_lock(&rw.impl.mutex) + } + +}
\ No newline at end of file diff --git a/core/sync/primitives_linux.odin b/core/sync/primitives_linux.odin new file mode 100644 index 000000000..0a9f0cc33 --- /dev/null +++ b/core/sync/primitives_linux.odin @@ -0,0 +1,47 @@ +//+build linux +//+private +package sync + +import "core:sys/unix" +import "core:time" + +_current_thread_id :: proc "contextless" () -> int { + return unix.sys_gettid() +} + + +_Mutex :: struct { + mutex: Atomic_Mutex, +} + +_mutex_lock :: proc(m: ^Mutex) { + atomic_mutex_lock(&m.impl.mutex) +} + +_mutex_unlock :: proc(m: ^Mutex) { + atomic_mutex_unlock(&m.impl.mutex) +} + +_mutex_try_lock :: proc(m: ^Mutex) -> bool { + return atomic_mutex_try_lock(&m.impl.mutex) +} + +_Cond :: struct { + cond: Atomic_Cond, +} + +_cond_wait :: proc(c: ^Cond, m: ^Mutex) { + atomic_cond_wait(&c.impl.cond, &m.impl.mutex) +} + +_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, duration: time.Duration) -> bool { + return atomic_cond_wait_with_timeout(&c.impl.cond, &m.impl.mutex, duration) +} + +_cond_signal :: proc(c: ^Cond) { + atomic_cond_signal(&c.impl.cond) +} + +_cond_broadcast :: proc(c: ^Cond) { + atomic_cond_broadcast(&c.impl.cond) +} diff --git a/core/sync/primitives_openbsd.odin b/core/sync/primitives_openbsd.odin new file mode 100644 index 000000000..7794016f8 --- /dev/null +++ b/core/sync/primitives_openbsd.odin @@ -0,0 +1,46 @@ +//+build openbsd +//+private +package sync + +import "core:os" +import "core:time" + +_current_thread_id :: proc "contextless" () -> int { + return os.current_thread_id() +} + +_Mutex :: struct { + mutex: Atomic_Mutex, +} + +_mutex_lock :: proc(m: ^Mutex) { + atomic_mutex_lock(&m.impl.mutex) +} + +_mutex_unlock :: proc(m: ^Mutex) { + atomic_mutex_unlock(&m.impl.mutex) +} + +_mutex_try_lock :: proc(m: ^Mutex) -> bool { + return atomic_mutex_try_lock(&m.impl.mutex) +} + +_Cond :: struct { + cond: Atomic_Cond, +} + +_cond_wait :: proc(c: ^Cond, m: ^Mutex) { + atomic_cond_wait(&c.impl.cond, &m.impl.mutex) +} + +_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, duration: time.Duration) -> bool { + return atomic_cond_wait_with_timeout(&c.impl.cond, &m.impl.mutex, duration) +} + +_cond_signal :: proc(c: ^Cond) { + atomic_cond_signal(&c.impl.cond) +} + +_cond_broadcast :: proc(c: ^Cond) { + atomic_cond_broadcast(&c.impl.cond) +} diff --git a/core/sync/sync2/primitives_windows.odin b/core/sync/primitives_windows.odin index ca7a5c9ee..055167892 100644 --- a/core/sync/sync2/primitives_windows.odin +++ b/core/sync/primitives_windows.odin @@ -1,6 +1,6 @@ //+build windows //+private -package sync2 +package sync import "core:time" import win32 "core:sys/windows" diff --git a/core/sync/sync2/sema_internal.odin b/core/sync/sema_internal.odin index 64fc4ed96..e4a3c0bfc 100644 --- a/core/sync/sync2/sema_internal.odin +++ b/core/sync/sema_internal.odin @@ -1,4 +1,5 @@ -package sync2 +//+private +package sync import "core:time" @@ -9,7 +10,7 @@ when #config(ODIN_SYNC_SEMA_USE_FUTEX, true) { } _sema_post :: proc(s: ^Sema, count := 1) { - atomic_add(&s.impl.count, Futex(count)) + atomic_add_explicit(&s.impl.count, Futex(count), .Release) if count == 1 { futex_signal(&s.impl.count) } else { @@ -19,12 +20,12 @@ when #config(ODIN_SYNC_SEMA_USE_FUTEX, true) { _sema_wait :: proc(s: ^Sema) { for { - original_count := atomic_load(&s.impl.count) + original_count := atomic_load_explicit(&s.impl.count, .Relaxed) for original_count == 0 { futex_wait(&s.impl.count, u32(original_count)) original_count = s.impl.count } - if original_count == atomic_compare_exchange_strong(&s.impl.count, original_count-1, original_count) { + if original_count == atomic_compare_exchange_strong_explicit(&s.impl.count, original_count, original_count-1, .Acquire, .Acquire) { return } } @@ -36,7 +37,7 @@ when #config(ODIN_SYNC_SEMA_USE_FUTEX, true) { } for { - original_count := atomic_load(&s.impl.count) + original_count := atomic_load_explicit(&s.impl.count, .Relaxed) for start := time.tick_now(); original_count == 0; /**/ { remaining := duration - time.tick_since(start) if remaining < 0 { @@ -48,7 +49,7 @@ when #config(ODIN_SYNC_SEMA_USE_FUTEX, true) { } original_count = s.impl.count } - if original_count == atomic_compare_exchange_strong(&s.impl.count, original_count-1, original_count) { + if original_count == atomic_compare_exchange_strong_explicit(&s.impl.count, original_count, original_count-1, .Acquire, .Acquire) { return true } } diff --git a/core/sync/sync.odin b/core/sync/sync.odin deleted file mode 100644 index 05c86a868..000000000 --- a/core/sync/sync.odin +++ /dev/null @@ -1,123 +0,0 @@ -package sync - -import "core:intrinsics" - -cpu_relax :: #force_inline proc "contextless" () { - intrinsics.cpu_relax() -} - -Condition_Mutex_Ptr :: union{^Mutex, ^Blocking_Mutex} - - -Ticket_Mutex :: struct { - ticket: u64, - serving: u64, -} - -ticket_mutex_init :: proc(m: ^Ticket_Mutex) { - atomic_store(&m.ticket, 0, .Relaxed) - atomic_store(&m.serving, 0, .Relaxed) -} - -ticket_mutex_lock :: #force_inline proc(m: ^Ticket_Mutex) { - ticket := atomic_add(&m.ticket, 1, .Relaxed) - for ticket != atomic_load(&m.serving, .Acquire) { - intrinsics.cpu_relax() - } -} - -ticket_mutex_unlock :: #force_inline proc(m: ^Ticket_Mutex) { - atomic_add(&m.serving, 1, .Relaxed) -} - - -Benaphore :: struct { - counter: int, - sema: Semaphore, -} - -benaphore_init :: proc(b: ^Benaphore) { - intrinsics.atomic_store(&b.counter, 0) - semaphore_init(&b.sema) -} - -benaphore_destroy :: proc(b: ^Benaphore) { - semaphore_destroy(&b.sema) -} - -benaphore_lock :: proc(b: ^Benaphore) { - if intrinsics.atomic_add_acq(&b.counter, 1) > 1 { - semaphore_wait_for(&b.sema) - } -} - -benaphore_try_lock :: proc(b: ^Benaphore) -> bool { - v, _ := intrinsics.atomic_cxchg_acq(&b.counter, 1, 0) - return v == 0 -} - -benaphore_unlock :: proc(b: ^Benaphore) { - if intrinsics.atomic_sub_rel(&b.counter, 1) > 0 { - semaphore_post(&b.sema) - } -} - -Recursive_Benaphore :: struct { - counter: int, - owner: int, - recursion: int, - sema: Semaphore, -} - -recursive_benaphore_init :: proc(b: ^Recursive_Benaphore) { - intrinsics.atomic_store(&b.counter, 0) - semaphore_init(&b.sema) -} - -recursive_benaphore_destroy :: proc(b: ^Recursive_Benaphore) { - semaphore_destroy(&b.sema) -} - -recursive_benaphore_lock :: proc(b: ^Recursive_Benaphore) { - tid := current_thread_id() - if intrinsics.atomic_add_acq(&b.counter, 1) > 1 { - if tid != b.owner { - semaphore_wait_for(&b.sema) - } - } - // inside the lock - b.owner = tid - b.recursion += 1 -} - -recursive_benaphore_try_lock :: proc(b: ^Recursive_Benaphore) -> bool { - tid := current_thread_id() - if b.owner == tid { - intrinsics.atomic_add_acq(&b.counter, 1) - } else { - v, _ := intrinsics.atomic_cxchg_acq(&b.counter, 1, 0) - if v != 0 { - return false - } - // inside the lock - b.owner = tid - } - b.recursion += 1 - return true -} - -recursive_benaphore_unlock :: proc(b: ^Recursive_Benaphore) { - tid := current_thread_id() - assert(tid == b.owner) - b.recursion -= 1 - recursion := b.recursion - if recursion == 0 { - b.owner = 0 - } - if intrinsics.atomic_sub_rel(&b.counter, 1) > 0 { - if recursion == 0 { - semaphore_post(&b.sema) - } - } - // outside the lock -} diff --git a/core/sync/sync2/atomic.odin b/core/sync/sync2/atomic.odin deleted file mode 100644 index fe19f17c8..000000000 --- a/core/sync/sync2/atomic.odin +++ /dev/null @@ -1,79 +0,0 @@ -package sync2 - -import "core:intrinsics" - -cpu_relax :: intrinsics.cpu_relax - -atomic_fence :: intrinsics.atomic_fence -atomic_fence_acquire :: intrinsics.atomic_fence_acq -atomic_fence_release :: intrinsics.atomic_fence_rel -atomic_fence_acqrel :: intrinsics.atomic_fence_acqrel - -atomic_store :: intrinsics.atomic_store -atomic_store_release :: intrinsics.atomic_store_rel -atomic_store_relaxed :: intrinsics.atomic_store_relaxed -atomic_store_unordered :: intrinsics.atomic_store_unordered - -atomic_load :: intrinsics.atomic_load -atomic_load_acquire :: intrinsics.atomic_load_acq -atomic_load_relaxed :: intrinsics.atomic_load_relaxed -atomic_load_unordered :: intrinsics.atomic_load_unordered - -atomic_add :: intrinsics.atomic_add -atomic_add_acquire :: intrinsics.atomic_add_acq -atomic_add_release :: intrinsics.atomic_add_rel -atomic_add_acqrel :: intrinsics.atomic_add_acqrel -atomic_add_relaxed :: intrinsics.atomic_add_relaxed -atomic_sub :: intrinsics.atomic_sub -atomic_sub_acquire :: intrinsics.atomic_sub_acq -atomic_sub_release :: intrinsics.atomic_sub_rel -atomic_sub_acqrel :: intrinsics.atomic_sub_acqrel -atomic_sub_relaxed :: intrinsics.atomic_sub_relaxed -atomic_and :: intrinsics.atomic_and -atomic_and_acquire :: intrinsics.atomic_and_acq -atomic_and_release :: intrinsics.atomic_and_rel -atomic_and_acqrel :: intrinsics.atomic_and_acqrel -atomic_and_relaxed :: intrinsics.atomic_and_relaxed -atomic_nand :: intrinsics.atomic_nand -atomic_nand_acquire :: intrinsics.atomic_nand_acq -atomic_nand_release :: intrinsics.atomic_nand_rel -atomic_nand_acqrel :: intrinsics.atomic_nand_acqrel -atomic_nand_relaxed :: intrinsics.atomic_nand_relaxed -atomic_or :: intrinsics.atomic_or -atomic_or_acquire :: intrinsics.atomic_or_acq -atomic_or_release :: intrinsics.atomic_or_rel -atomic_or_acqrel :: intrinsics.atomic_or_acqrel -atomic_or_relaxed :: intrinsics.atomic_or_relaxed -atomic_xor :: intrinsics.atomic_xor -atomic_xor_acquire :: intrinsics.atomic_xor_acq -atomic_xor_release :: intrinsics.atomic_xor_rel -atomic_xor_acqrel :: intrinsics.atomic_xor_acqrel -atomic_xor_relaxed :: intrinsics.atomic_xor_relaxed - -atomic_exchange :: intrinsics.atomic_xchg -atomic_exchange_acquire :: intrinsics.atomic_xchg_acq -atomic_exchange_release :: intrinsics.atomic_xchg_rel -atomic_exchange_acqrel :: intrinsics.atomic_xchg_acqrel -atomic_exchange_relaxed :: intrinsics.atomic_xchg_relaxed - -// Returns value and optional ok boolean -atomic_compare_exchange_strong :: intrinsics.atomic_cxchg -atomic_compare_exchange_strong_acquire :: intrinsics.atomic_cxchg_acq -atomic_compare_exchange_strong_release :: intrinsics.atomic_cxchg_rel -atomic_compare_exchange_strong_acqrel :: intrinsics.atomic_cxchg_acqrel -atomic_compare_exchange_strong_relaxed :: intrinsics.atomic_cxchg_relaxed -atomic_compare_exchange_strong_failrelaxed :: intrinsics.atomic_cxchg_failrelaxed -atomic_compare_exchange_strong_failacquire :: intrinsics.atomic_cxchg_failacq -atomic_compare_exchange_strong_acquire_failrelaxed :: intrinsics.atomic_cxchg_acq_failrelaxed -atomic_compare_exchange_strong_acqrel_failrelaxed :: intrinsics.atomic_cxchg_acqrel_failrelaxed - -// Returns value and optional ok boolean -atomic_compare_exchange_weak :: intrinsics.atomic_cxchgweak -atomic_compare_exchange_weak_acquire :: intrinsics.atomic_cxchgweak_acq -atomic_compare_exchange_weak_release :: intrinsics.atomic_cxchgweak_rel -atomic_compare_exchange_weak_acqrel :: intrinsics.atomic_cxchgweak_acqrel -atomic_compare_exchange_weak_relaxed :: intrinsics.atomic_cxchgweak_relaxed -atomic_compare_exchange_weak_failrelaxed :: intrinsics.atomic_cxchgweak_failrelaxed -atomic_compare_exchange_weak_failacquire :: intrinsics.atomic_cxchgweak_failacq -atomic_compare_exchange_weak_acquire_failrelaxed :: intrinsics.atomic_cxchgweak_acq_failrelaxed -atomic_compare_exchange_weak_acqrel_failrelaxed :: intrinsics.atomic_cxchgweak_acqrel_failrelaxed diff --git a/core/sync/sync2/primitives_internal.odin b/core/sync/sync2/primitives_internal.odin deleted file mode 100644 index ae7e2599c..000000000 --- a/core/sync/sync2/primitives_internal.odin +++ /dev/null @@ -1,184 +0,0 @@ -//+private -package sync2 - -when #config(ODIN_SYNC_RECURSIVE_MUTEX_USE_FUTEX, true) { - _Recursive_Mutex :: struct { - owner: Futex, - recursion: i32, - } - - _recursive_mutex_lock :: proc(m: ^Recursive_Mutex) { - tid := Futex(current_thread_id()) - for { - prev_owner := atomic_compare_exchange_strong_acquire(&m.impl.owner, tid, 0) - switch prev_owner { - case 0, tid: - m.impl.recursion += 1 - // inside the lock - return - } - - futex_wait(&m.impl.owner, u32(prev_owner)) - } - } - - _recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) { - m.impl.recursion -= 1 - if m.impl.recursion != 0 { - return - } - atomic_exchange_release(&m.impl.owner, 0) - - futex_signal(&m.impl.owner) - // outside the lock - - } - - _recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool { - tid := Futex(current_thread_id()) - prev_owner := atomic_compare_exchange_strong_acquire(&m.impl.owner, tid, 0) - switch prev_owner { - case 0, tid: - m.impl.recursion += 1 - // inside the lock - return true - } - return false - } -} else { - _Recursive_Mutex :: struct { - owner: int, - recursion: int, - mutex: Mutex, - } - - _recursive_mutex_lock :: proc(m: ^Recursive_Mutex) { - tid := current_thread_id() - if tid != m.impl.owner { - mutex_lock(&m.impl.mutex) - } - // inside the lock - m.impl.owner = tid - m.impl.recursion += 1 - } - - _recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) { - tid := current_thread_id() - assert(tid == m.impl.owner) - m.impl.recursion -= 1 - recursion := m.impl.recursion - if recursion == 0 { - m.impl.owner = 0 - } - if recursion == 0 { - mutex_unlock(&m.impl.mutex) - } - // outside the lock - - } - - _recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool { - tid := current_thread_id() - if m.impl.owner == tid { - return mutex_try_lock(&m.impl.mutex) - } - if !mutex_try_lock(&m.impl.mutex) { - return false - } - // inside the lock - m.impl.owner = tid - m.impl.recursion += 1 - return true - } -} - - -when ODIN_OS != "windows" { - RW_Mutex_State :: distinct uint - RW_Mutex_State_Half_Width :: size_of(RW_Mutex_State)*8/2 - RW_Mutex_State_Is_Writing :: RW_Mutex_State(1) - RW_Mutex_State_Writer :: RW_Mutex_State(1)<<1 - RW_Mutex_State_Reader :: RW_Mutex_State(1)<<RW_Mutex_State_Half_Width - - RW_Mutex_State_Writer_Mask :: RW_Mutex_State(1<<(RW_Mutex_State_Half_Width-1) - 1) << 1 - RW_Mutex_State_Reader_Mask :: RW_Mutex_State(1<<(RW_Mutex_State_Half_Width-1) - 1) << RW_Mutex_State_Half_Width - - - _RW_Mutex :: struct { - // NOTE(bill): pthread_rwlock_t cannot be used since pthread_rwlock_destroy is required on some platforms - // TODO(bill): Can we determine which platforms exactly? - state: RW_Mutex_State, - mutex: Mutex, - sema: Sema, - } - - _rw_mutex_lock :: proc(rw: ^RW_Mutex) { - _ = atomic_add(&rw.impl.state, RW_Mutex_State_Writer) - mutex_lock(&rw.impl.mutex) - - state := atomic_or(&rw.impl.state, RW_Mutex_State_Writer) - if state & RW_Mutex_State_Reader_Mask != 0 { - sema_wait(&rw.impl.sema) - } - } - - _rw_mutex_unlock :: proc(rw: ^RW_Mutex) { - _ = atomic_and(&rw.impl.state, ~RW_Mutex_State_Is_Writing) - mutex_unlock(&rw.impl.mutex) - } - - _rw_mutex_try_lock :: proc(rw: ^RW_Mutex) -> bool { - if mutex_try_lock(&rw.impl.mutex) { - state := atomic_load(&rw.impl.state) - if state & RW_Mutex_State_Reader_Mask == 0 { - _ = atomic_or(&rw.impl.state, RW_Mutex_State_Is_Writing) - return true - } - - mutex_unlock(&rw.impl.mutex) - } - return false - } - - _rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) { - state := atomic_load(&rw.impl.state) - for state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 { - ok: bool - state, ok = atomic_compare_exchange_weak(&rw.impl.state, state, state + RW_Mutex_State_Reader) - if ok { - return - } - } - - mutex_lock(&rw.impl.mutex) - _ = atomic_add(&rw.impl.state, RW_Mutex_State_Reader) - mutex_unlock(&rw.impl.mutex) - } - - _rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) { - state := atomic_sub(&rw.impl.state, RW_Mutex_State_Reader) - - if (state & RW_Mutex_State_Reader_Mask == RW_Mutex_State_Reader) && - (state & RW_Mutex_State_Is_Writing != 0) { - sema_post(&rw.impl.sema) - } - } - - _rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool { - state := atomic_load(&rw.impl.state) - if state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 { - _, ok := atomic_compare_exchange_strong(&rw.impl.state, state, state + RW_Mutex_State_Reader) - if ok { - return true - } - } - if mutex_try_lock(&rw.impl.mutex) { - _ = atomic_add(&rw.impl.state, RW_Mutex_State_Reader) - mutex_unlock(&rw.impl.mutex) - return true - } - - return false - } - -}
\ No newline at end of file diff --git a/core/sync/sync2/primitives_linux.odin b/core/sync/sync2/primitives_linux.odin deleted file mode 100644 index 89ed97985..000000000 --- a/core/sync/sync2/primitives_linux.odin +++ /dev/null @@ -1,9 +0,0 @@ -//+build linux -//+private -package sync2 - -import "core:sys/unix" - -_current_thread_id :: proc "contextless" () -> int { - return unix.sys_gettid() -} diff --git a/core/sync/sync2/primitives_pthreads.odin b/core/sync/sync2/primitives_pthreads.odin deleted file mode 100644 index 8d2c3986d..000000000 --- a/core/sync/sync2/primitives_pthreads.odin +++ /dev/null @@ -1,58 +0,0 @@ -//+build linux, freebsd -//+private -package sync2 - -import "core:time" -import "core:sys/unix" - -_Mutex_State :: enum i32 { - Unlocked = 0, - Locked = 1, - Waiting = 2, -} -_Mutex :: struct { - pthread_mutex: unix.pthread_mutex_t, -} - -_mutex_lock :: proc(m: ^Mutex) { - err := unix.pthread_mutex_lock(&m.impl.pthread_mutex) - assert(err == 0) -} - -_mutex_unlock :: proc(m: ^Mutex) { - err := unix.pthread_mutex_unlock(&m.impl.pthread_mutex) - assert(err == 0) -} - -_mutex_try_lock :: proc(m: ^Mutex) -> bool { - err := unix.pthread_mutex_trylock(&m.impl.pthread_mutex) - return err == 0 -} - -_Cond :: struct { - pthread_cond: unix.pthread_cond_t, -} - -_cond_wait :: proc(c: ^Cond, m: ^Mutex) { - err := unix.pthread_cond_wait(&c.impl.pthread_cond, &m.impl.pthread_mutex) - assert(err == 0) -} - - -_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, duration: time.Duration) -> bool { - tv_sec := i64(duration/1e9) - tv_nsec := i64(duration%1e9) - err := unix.pthread_cond_timedwait(&c.impl.pthread_cond, &m.impl.pthread_mutex, &{tv_sec, tv_nsec}) - return err == 0 -} - - -_cond_signal :: proc(c: ^Cond) { - err := unix.pthread_cond_signal(&c.impl.pthread_cond) - assert(err == 0) -} - -_cond_broadcast :: proc(c: ^Cond) { - err := unix.pthread_cond_broadcast(&c.impl.pthread_cond) - assert(err == 0) -} diff --git a/core/sync/sync_darwin.odin b/core/sync/sync_darwin.odin deleted file mode 100644 index f3bb4d5a3..000000000 --- a/core/sync/sync_darwin.odin +++ /dev/null @@ -1,54 +0,0 @@ -package sync - -import "core:sys/darwin" - -import "core:c" - -foreign import pthread "System.framework" - -current_thread_id :: proc "contextless" () -> int { - tid: u64 - // NOTE(Oskar): available from OSX 10.6 and iOS 3.2. - // For older versions there is `syscall(SYS_thread_selfid)`, but not really - // the same thing apparently. - foreign pthread { pthread_threadid_np :: proc "c" (rawptr, ^u64) -> c.int --- } - pthread_threadid_np(nil, &tid) - return int(tid) -} - - -// The Darwin docs say it best: -// A semaphore is much like a lock, except that a finite number of threads can hold it simultaneously. -// Semaphores can be thought of as being much like piles of tokens; multiple threads can take these tokens, -// but when there are none left, a thread must wait until another thread returns one. -Semaphore :: struct #align 16 { - handle: darwin.semaphore_t, -} -// TODO(tetra): Only marked with alignment because we cannot mark distinct integers with alignments. -// See core/sys/unix/pthread_linux.odin/pthread_t. - -semaphore_init :: proc(s: ^Semaphore, initial_count := 0) { - ct := darwin.mach_task_self() - res := darwin.semaphore_create(ct, &s.handle, 0, c.int(initial_count)) - assert(res == 0) -} - -semaphore_destroy :: proc(s: ^Semaphore) { - ct := darwin.mach_task_self() - res := darwin.semaphore_destroy(ct, s.handle) - assert(res == 0) - s.handle = {} -} - -semaphore_post :: proc(s: ^Semaphore, count := 1) { - // NOTE: SPEED: If there's one syscall to do this, we should use it instead of the loop. - for in 0..<count { - res := darwin.semaphore_signal(s.handle) - assert(res == 0) - } -} - -semaphore_wait_for :: proc(s: ^Semaphore) { - res := darwin.semaphore_wait(s.handle) - assert(res == 0) -} diff --git a/core/sync/sync_freebsd.odin b/core/sync/sync_freebsd.odin deleted file mode 100644 index 240308b7d..000000000 --- a/core/sync/sync_freebsd.odin +++ /dev/null @@ -1,40 +0,0 @@ -package sync - -import "core:sys/unix" -import "core:intrinsics" - - -current_thread_id :: proc "contextless" () -> int { - SYS_GETTID :: 186; - return int(intrinsics.syscall(SYS_GETTID)); -} - - -// The Darwin docs say it best: -// A semaphore is much like a lock, except that a finite number of threads can hold it simultaneously. -// Semaphores can be thought of as being much like piles of tokens; multiple threads can take these tokens, -// but when there are none left, a thread must wait until another thread returns one. -Semaphore :: struct #align 16 { - handle: unix.sem_t, -} - -semaphore_init :: proc(s: ^Semaphore, initial_count := 0) { - assert(unix.sem_init(&s.handle, 0, u32(initial_count)) == 0); -} - -semaphore_destroy :: proc(s: ^Semaphore) { - assert(unix.sem_destroy(&s.handle) == 0); - s.handle = {}; -} - -semaphore_post :: proc(s: ^Semaphore, count := 1) { - // NOTE: SPEED: If there's one syscall to do this, we should use it instead of the loop. - for in 0..<count { - assert(unix.sem_post(&s.handle) == 0); - } -} - -semaphore_wait_for :: proc(s: ^Semaphore) { - assert(unix.sem_wait(&s.handle) == 0); -} - diff --git a/core/sync/sync_linux.odin b/core/sync/sync_linux.odin deleted file mode 100644 index 340437c11..000000000 --- a/core/sync/sync_linux.odin +++ /dev/null @@ -1,36 +0,0 @@ -package sync - -import "core:sys/unix" - -current_thread_id :: proc "contextless" () -> int { - return unix.sys_gettid() -} - - -// The Darwin docs say it best: -// A semaphore is much like a lock, except that a finite number of threads can hold it simultaneously. -// Semaphores can be thought of as being much like piles of tokens; multiple threads can take these tokens, -// but when there are none left, a thread must wait until another thread returns one. -Semaphore :: struct #align 16 { - handle: unix.sem_t, -} - -semaphore_init :: proc(s: ^Semaphore, initial_count := 0) { - assert(unix.sem_init(&s.handle, 0, u32(initial_count)) == 0) -} - -semaphore_destroy :: proc(s: ^Semaphore) { - assert(unix.sem_destroy(&s.handle) == 0) - s.handle = {} -} - -semaphore_post :: proc(s: ^Semaphore, count := 1) { - // NOTE: SPEED: If there's one syscall to do this, we should use it instead of the loop. - for in 0..<count { - assert(unix.sem_post(&s.handle) == 0) - } -} - -semaphore_wait_for :: proc(s: ^Semaphore) { - assert(unix.sem_wait(&s.handle) == 0) -} diff --git a/core/sync/sync_unix.odin b/core/sync/sync_unix.odin deleted file mode 100644 index 114625483..000000000 --- a/core/sync/sync_unix.odin +++ /dev/null @@ -1,248 +0,0 @@ -// +build linux, darwin, freebsd -package sync - -import "core:sys/unix" -import "core:time" - -// A recursive lock that can only be held by one thread at once -Mutex :: struct { - handle: unix.pthread_mutex_t, -} - - -mutex_init :: proc(m: ^Mutex) { - // NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the mutex. - attrs: unix.pthread_mutexattr_t - assert(unix.pthread_mutexattr_init(&attrs) == 0) - defer unix.pthread_mutexattr_destroy(&attrs) // ignores destruction error - unix.pthread_mutexattr_settype(&attrs, unix.PTHREAD_MUTEX_RECURSIVE) - - assert(unix.pthread_mutex_init(&m.handle, &attrs) == 0) -} - -mutex_destroy :: proc(m: ^Mutex) { - assert(unix.pthread_mutex_destroy(&m.handle) == 0) - m.handle = {} -} - -mutex_lock :: proc(m: ^Mutex) { - assert(unix.pthread_mutex_lock(&m.handle) == 0) -} - -// Returns false if someone else holds the lock. -mutex_try_lock :: proc(m: ^Mutex) -> bool { - return unix.pthread_mutex_trylock(&m.handle) == 0 -} - -mutex_unlock :: proc(m: ^Mutex) { - assert(unix.pthread_mutex_unlock(&m.handle) == 0) -} - - -Blocking_Mutex :: struct { - handle: unix.pthread_mutex_t, -} - - -blocking_mutex_init :: proc(m: ^Blocking_Mutex) { - // NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the mutex. - attrs: unix.pthread_mutexattr_t - assert(unix.pthread_mutexattr_init(&attrs) == 0) - defer unix.pthread_mutexattr_destroy(&attrs) // ignores destruction error - - assert(unix.pthread_mutex_init(&m.handle, &attrs) == 0) -} - -blocking_mutex_destroy :: proc(m: ^Blocking_Mutex) { - assert(unix.pthread_mutex_destroy(&m.handle) == 0) - m.handle = {} -} - -blocking_mutex_lock :: proc(m: ^Blocking_Mutex) { - assert(unix.pthread_mutex_lock(&m.handle) == 0) -} - -// Returns false if someone else holds the lock. -blocking_mutex_try_lock :: proc(m: ^Blocking_Mutex) -> bool { - return unix.pthread_mutex_trylock(&m.handle) == 0 -} - -blocking_mutex_unlock :: proc(m: ^Blocking_Mutex) { - assert(unix.pthread_mutex_unlock(&m.handle) == 0) -} - - - -// Blocks until signalled, and then lets past exactly -// one thread. -Condition :: struct { - handle: unix.pthread_cond_t, - mutex: Condition_Mutex_Ptr, - - // NOTE(tetra, 2019-11-11): Used to mimic the more sane behavior of Windows' AutoResetEvent. - // This means that you may signal the condition before anyone is waiting to cause the - // next thread that tries to wait to just pass by uninterrupted, without sleeping. - // Without this, signalling a condition will only wake up a thread which is already waiting, - // but not one that is about to wait, which can cause your program to become out of sync in - // ways that are hard to debug or fix. - flag: bool, // atomically mutated -} - -condition_init :: proc(c: ^Condition, mutex: Condition_Mutex_Ptr) -> bool { - // NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the condition. - attrs: unix.pthread_condattr_t - if unix.pthread_condattr_init(&attrs) != 0 { - return false - } - defer unix.pthread_condattr_destroy(&attrs) // ignores destruction error - - c.flag = false - c.mutex = mutex - return unix.pthread_cond_init(&c.handle, &attrs) == 0 -} - -condition_destroy :: proc(c: ^Condition) { - assert(unix.pthread_cond_destroy(&c.handle) == 0) - c.handle = {} -} - -// Awaken exactly one thread who is waiting on the condition -condition_signal :: proc(c: ^Condition) -> bool { - switch m in c.mutex { - case ^Mutex: - mutex_lock(m) - defer mutex_unlock(m) - atomic_swap(&c.flag, true, .Sequentially_Consistent) - return unix.pthread_cond_signal(&c.handle) == 0 - case ^Blocking_Mutex: - blocking_mutex_lock(m) - defer blocking_mutex_unlock(m) - atomic_swap(&c.flag, true, .Sequentially_Consistent) - return unix.pthread_cond_signal(&c.handle) == 0 - } - return false -} - -// Awaken all threads who are waiting on the condition -condition_broadcast :: proc(c: ^Condition) -> bool { - return unix.pthread_cond_broadcast(&c.handle) == 0 -} - -// Wait for the condition to be signalled. -// Does not block if the condition has been signalled and no one -// has waited on it yet. -condition_wait_for :: proc(c: ^Condition) -> bool { - switch m in c.mutex { - case ^Mutex: - mutex_lock(m) - defer mutex_unlock(m) - // NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs, - // the thread that gets signalled and wakes up, discovers that the flag was taken and goes - // back to sleep. - // Though this overall behavior is the most sane, there may be a better way to do this that means that - // the first thread to wait, gets the flag first. - if atomic_swap(&c.flag, false, .Sequentially_Consistent) { - return true - } - for { - if unix.pthread_cond_wait(&c.handle, &m.handle) != 0 { - return false - } - if atomic_swap(&c.flag, false, .Sequentially_Consistent) { - return true - } - } - return false - - case ^Blocking_Mutex: - blocking_mutex_lock(m) - defer blocking_mutex_unlock(m) - // NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs, - // the thread that gets signalled and wakes up, discovers that the flag was taken and goes - // back to sleep. - // Though this overall behavior is the most sane, there may be a better way to do this that means that - // the first thread to wait, gets the flag first. - if atomic_swap(&c.flag, false, .Sequentially_Consistent) { - return true - } - for { - if unix.pthread_cond_wait(&c.handle, &m.handle) != 0 { - return false - } - if atomic_swap(&c.flag, false, .Sequentially_Consistent) { - return true - } - } - return false - } - return false -} - -// Wait for the condition to be signalled. -// Does not block if the condition has been signalled and no one -// has waited on it yet. -condition_wait_for_timeout :: proc(c: ^Condition, duration: time.Duration) -> bool { - switch m in c.mutex { - case ^Mutex: - mutex_lock(m) - defer mutex_unlock(m) - // NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs, - // the thread that gets signalled and wakes up, discovers that the flag was taken and goes - // back to sleep. - // Though this overall behavior is the most sane, there may be a better way to do this that means that - // the first thread to wait, gets the flag first. - if atomic_swap(&c.flag, false, .Sequentially_Consistent) { - return true - } - - ns := time.duration_nanoseconds(duration) - timeout: time.TimeSpec - timeout.tv_sec = ns / 1e9 - timeout.tv_nsec = ns % 1e9 - - for { - if unix.pthread_cond_timedwait(&c.handle, &m.handle, &timeout) != 0 { - return false - } - if atomic_swap(&c.flag, false, .Sequentially_Consistent) { - return true - } - } - return false - - case ^Blocking_Mutex: - blocking_mutex_lock(m) - defer blocking_mutex_unlock(m) - // NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs, - // the thread that gets signalled and wakes up, discovers that the flag was taken and goes - // back to sleep. - // Though this overall behavior is the most sane, there may be a better way to do this that means that - // the first thread to wait, gets the flag first. - if atomic_swap(&c.flag, false, .Sequentially_Consistent) { - return true - } - - ns := time.duration_nanoseconds(duration) - - timeout: time.TimeSpec - timeout.tv_sec = ns / 1e9 - timeout.tv_nsec = ns % 1e9 - - for { - if unix.pthread_cond_timedwait(&c.handle, &m.handle, &timeout) != 0 { - return false - } - if atomic_swap(&c.flag, false, .Sequentially_Consistent) { - return true - } - } - return false - } - return false -} - - - -thread_yield :: proc() { - unix.sched_yield() -} diff --git a/core/sync/sync2/sync_util.odin b/core/sync/sync_util.odin index 853c3c685..4add9064d 100644 --- a/core/sync/sync2/sync_util.odin +++ b/core/sync/sync_util.odin @@ -1,12 +1,11 @@ -package sync2 - - -// Example: -// -// if guard(&m) { -// ... -// } -// +package sync + +/* +Example: + if guard(&m) { + ... + } +*/ guard :: proc{ mutex_guard, rw_mutex_guard, @@ -17,13 +16,12 @@ guard :: proc{ atomic_recursive_mutex_guard, atomic_rw_mutex_guard, } - -// Example: -// -// if shared_guard(&m) { -// ... -// } -// +/* +Example: + if shared_guard(&m) { + ... + } +*/ shared_guard :: proc{ rw_mutex_shared_guard, atomic_rw_mutex_shared_guard, diff --git a/core/sync/sync_windows.odin b/core/sync/sync_windows.odin deleted file mode 100644 index 0a7cf71b2..000000000 --- a/core/sync/sync_windows.odin +++ /dev/null @@ -1,180 +0,0 @@ -// +build windows -package sync - -import win32 "core:sys/windows" -import "core:time" - -current_thread_id :: proc "contextless" () -> int { - return int(win32.GetCurrentThreadId()) -} - - -// When waited upon, blocks until the internal count is greater than zero, then subtracts one. -// Posting to the semaphore increases the count by one, or the provided amount. -Semaphore :: struct { - _handle: win32.HANDLE, -} - -semaphore_init :: proc(s: ^Semaphore, initial_count := 0) { - s._handle = win32.CreateSemaphoreW(nil, i32(initial_count), 1<<31-1, nil) -} - -semaphore_destroy :: proc(s: ^Semaphore) { - win32.CloseHandle(s._handle) -} - -semaphore_post :: proc(s: ^Semaphore, count := 1) { - win32.ReleaseSemaphore(s._handle, i32(count), nil) -} - -semaphore_wait_for :: proc(s: ^Semaphore) { - // NOTE(tetra, 2019-10-30): wait_for_single_object decrements the count before it returns. - result := win32.WaitForSingleObject(s._handle, win32.INFINITE) - assert(result != win32.WAIT_FAILED) -} - - -Mutex :: struct { - _critical_section: win32.CRITICAL_SECTION, -} - - -mutex_init :: proc(m: ^Mutex, spin_count := 0) { - win32.InitializeCriticalSectionAndSpinCount(&m._critical_section, u32(spin_count)) -} - -mutex_destroy :: proc(m: ^Mutex) { - win32.DeleteCriticalSection(&m._critical_section) -} - -mutex_lock :: proc(m: ^Mutex) { - win32.EnterCriticalSection(&m._critical_section) -} - -mutex_try_lock :: proc(m: ^Mutex) -> bool { - return bool(win32.TryEnterCriticalSection(&m._critical_section)) -} - -mutex_unlock :: proc(m: ^Mutex) { - win32.LeaveCriticalSection(&m._critical_section) -} - -Blocking_Mutex :: struct { - _handle: win32.SRWLOCK, -} - - -blocking_mutex_init :: proc(m: ^Blocking_Mutex) { - win32.InitializeSRWLock(&m._handle) -} - -blocking_mutex_destroy :: proc(m: ^Blocking_Mutex) { - // -} - -blocking_mutex_lock :: proc(m: ^Blocking_Mutex) { - win32.AcquireSRWLockExclusive(&m._handle) -} - -blocking_mutex_try_lock :: proc(m: ^Blocking_Mutex) -> bool { - return bool(win32.TryAcquireSRWLockExclusive(&m._handle)) -} - -blocking_mutex_unlock :: proc(m: ^Blocking_Mutex) { - win32.ReleaseSRWLockExclusive(&m._handle) -} - - -// Blocks until signalled. -// When signalled, awakens exactly one waiting thread. -Condition :: struct { - _handle: win32.CONDITION_VARIABLE, - - mutex: Condition_Mutex_Ptr, -} - - -condition_init :: proc(c: ^Condition, mutex: Condition_Mutex_Ptr) -> bool { - assert(mutex != nil) - win32.InitializeConditionVariable(&c._handle) - c.mutex = mutex - return true -} - -condition_destroy :: proc(c: ^Condition) { - // -} - -condition_signal :: proc(c: ^Condition) -> bool { - if c._handle.ptr == nil { - return false - } - win32.WakeConditionVariable(&c._handle) - return true -} - -condition_broadcast :: proc(c: ^Condition) -> bool { - if c._handle.ptr == nil { - return false - } - win32.WakeAllConditionVariable(&c._handle) - return true -} - -condition_wait_for :: proc(c: ^Condition) -> bool { - switch m in &c.mutex { - case ^Mutex: - return cast(bool)win32.SleepConditionVariableCS(&c._handle, &m._critical_section, win32.INFINITE) - case ^Blocking_Mutex: - return cast(bool)win32.SleepConditionVariableSRW(&c._handle, &m._handle, win32.INFINITE, 0) - } - return false -} -condition_wait_for_timeout :: proc(c: ^Condition, duration: time.Duration) -> bool { - ms := win32.DWORD((max(time.duration_nanoseconds(duration), 0) + 999999)/1000000) - switch m in &c.mutex { - case ^Mutex: - return cast(bool)win32.SleepConditionVariableCS(&c._handle, &m._critical_section, ms) - case ^Blocking_Mutex: - return cast(bool)win32.SleepConditionVariableSRW(&c._handle, &m._handle, ms, 0) - } - return false -} - - - - -RW_Lock :: struct { - _handle: win32.SRWLOCK, -} - -rw_lock_init :: proc(l: ^RW_Lock) { - l._handle = win32.SRWLOCK_INIT -} -rw_lock_destroy :: proc(l: ^RW_Lock) { - // -} -rw_lock_read :: proc(l: ^RW_Lock) { - win32.AcquireSRWLockShared(&l._handle) -} -rw_lock_try_read :: proc(l: ^RW_Lock) -> bool { - return bool(win32.TryAcquireSRWLockShared(&l._handle)) -} -rw_lock_write :: proc(l: ^RW_Lock) { - win32.AcquireSRWLockExclusive(&l._handle) -} -rw_lock_try_write :: proc(l: ^RW_Lock) -> bool { - return bool(win32.TryAcquireSRWLockExclusive(&l._handle)) -} -rw_lock_read_unlock :: proc(l: ^RW_Lock) { - win32.ReleaseSRWLockShared(&l._handle) -} -rw_lock_write_unlock :: proc(l: ^RW_Lock) { - win32.ReleaseSRWLockExclusive(&l._handle) -} - - -thread_yield :: proc() { - win32.SwitchToThread() -} - diff --git a/core/sync/wait_group.odin b/core/sync/wait_group.odin deleted file mode 100644 index 63d882ed1..000000000 --- a/core/sync/wait_group.odin +++ /dev/null @@ -1,58 +0,0 @@ -package sync - -import "core:intrinsics" - -Wait_Group :: struct { - counter: int, - mutex: Blocking_Mutex, - cond: Condition, -} - -wait_group_init :: proc(wg: ^Wait_Group) { - wg.counter = 0 - blocking_mutex_init(&wg.mutex) - condition_init(&wg.cond, &wg.mutex) -} - - -wait_group_destroy :: proc(wg: ^Wait_Group) { - condition_destroy(&wg.cond) - blocking_mutex_destroy(&wg.mutex) -} - -wait_group_add :: proc(wg: ^Wait_Group, delta: int) { - if delta == 0 { - return - } - - blocking_mutex_lock(&wg.mutex) - defer blocking_mutex_unlock(&wg.mutex) - - intrinsics.atomic_add(&wg.counter, delta) - if wg.counter < 0 { - panic("sync.Wait_Group negative counter") - } - if wg.counter == 0 { - condition_broadcast(&wg.cond) - if wg.counter != 0 { - panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait") - } - } -} - -wait_group_done :: proc(wg: ^Wait_Group) { - wait_group_add(wg, -1) -} - -wait_group_wait :: proc(wg: ^Wait_Group) { - blocking_mutex_lock(&wg.mutex) - defer blocking_mutex_unlock(&wg.mutex) - - if wg.counter != 0 { - condition_wait_for(&wg.cond) - if wg.counter != 0 { - panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait") - } - } -} - |