| author | gingerBill <bill@gingerbill.org> | 2021-04-11 15:18:28 +0100 |
|---|---|---|
| committer | gingerBill <bill@gingerbill.org> | 2021-04-11 15:18:28 +0100 |
| commit | 2db1fe74299766c9a29a33c39299d07e12556bb2 | |
| tree | 00abf6383909f56c437a91a5da911ed39f337230 /core/sync/sync2 | |
| parent | 5bc9e4e4f78206d9a46a32505b27c314d93e61ff | |
New redesign of core:sync (stored under core:sync/sync2 for the time being)
Diffstat (limited to 'core/sync/sync2')
| -rw-r--r-- | core/sync/sync2/atomic.odin | 170 |
| -rw-r--r-- | core/sync/sync2/channel.odin | 887 |
| -rw-r--r-- | core/sync/sync2/channel_unix.odin | 17 |
| -rw-r--r-- | core/sync/sync2/channel_windows.odin | 35 |
| -rw-r--r-- | core/sync/sync2/extended.odin | 215 |
| -rw-r--r-- | core/sync/sync2/primitives.odin | 185 |
| -rw-r--r-- | core/sync/sync2/primitives_atomic.odin | 244 |
| -rw-r--r-- | core/sync/sync2/primitives_pthreads.odin | 155 |
| -rw-r--r-- | core/sync/sync2/primitives_windows.odin | 73 |
9 files changed, 1981 insertions, 0 deletions
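Before the diff itself, a minimal sketch of how the redesigned API is meant to read, pieced together from the procedure names added below (`channel_make`, `channel_send`, `channel_recv`, `channel_close`, `Wait_Group`) and the `core:thread` usage shown in the Barrier doc comment inside `extended.odin`. The `core:sync/sync2` import path, the example package name, the message count, and the producer/consumer shape are illustrative assumptions, not anything this commit documents.

```odin
package sync2_example

import "core:fmt"
import "core:thread"
import sync "core:sync/sync2" // assumed import path for the new package

// Globals, since Odin procedure literals do not capture locals;
// the Barrier doc comment further down uses the same pattern.
ch: sync.Channel(int);
wg: sync.Wait_Group;

N :: 8;

main :: proc() {
	ch = sync.channel_make(int, 4);          // buffered channel of ints, capacity 4
	defer sync.channel_destroy(ch);

	sync.wait_group_add(&wg, 1);

	producer := thread.create_and_start(proc(t: ^thread.Thread) {
		for i in 0..<N {
			sync.channel_send(ch, i);        // blocks while the buffer is full
		}
		sync.channel_close(ch);
		sync.wait_group_done(&wg);
	});
	defer thread.destroy(producer);          // join and free the thread

	for _ in 0..<N {
		fmt.println(sync.channel_recv(ch));  // blocks until a message arrives
	}

	sync.wait_group_wait(&wg);
}
```

A counted receive loop is used here rather than `channel_iterator`, so the sketch does not depend on the ordering of close and receive.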
diff --git a/core/sync/sync2/atomic.odin b/core/sync/sync2/atomic.odin new file mode 100644 index 000000000..8240c0fcd --- /dev/null +++ b/core/sync/sync2/atomic.odin @@ -0,0 +1,170 @@ +package sync2 + +import "intrinsics" + +// TODO(bill): Is this even a good design? The intrinsics seem to be more than good enough and just as clean + +Ordering :: enum { + Relaxed, // Monotonic + Release, + Acquire, + Acquire_Release, + Sequentially_Consistent, +} + +strongest_failure_ordering_table := [Ordering]Ordering{ + .Relaxed = .Relaxed, + .Release = .Relaxed, + .Acquire = .Acquire, + .Acquire_Release = .Acquire, + .Sequentially_Consistent = .Sequentially_Consistent, +}; + +strongest_failure_ordering :: #force_inline proc(order: Ordering) -> Ordering { + return strongest_failure_ordering_table[order]; +} + +fence :: #force_inline proc($order: Ordering) { + when order == .Relaxed { #panic("there is no such thing as a relaxed fence"); } + else when order == .Release { intrinsics.atomic_fence_rel(); } + else when order == .Acquire { intrinsics.atomic_fence_acq(); } + else when order == .Acquire_Release { intrinsics.atomic_fence_acqrel(); } + else when order == .Sequentially_Consistent { intrinsics.atomic_fence(); } + else { #panic("unknown order"); } +} + + +atomic_store :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) { + when order == .Relaxed { intrinsics.atomic_store_relaxed(dst, val); } + else when order == .Release { intrinsics.atomic_store_rel(dst, val); } + else when order == .Sequentially_Consistent { intrinsics.atomic_store(dst, val); } + else when order == .Acquire { #panic("there is not such thing as an acquire store"); } + else when order == .Acquire_Release { #panic("there is not such thing as an acquire/release store"); } + else { #panic("unknown order"); } +} + +atomic_load :: #force_inline proc(dst: ^$T, $order: Ordering) -> T { + when order == .Relaxed { return intrinsics.atomic_load_relaxed(dst); } + else when order == .Acquire { return intrinsics.atomic_load_acq(dst); } + else when order == .Sequentially_Consistent { return intrinsics.atomic_load(dst); } + else when order == .Release { #panic("there is no such thing as a release load"); } + else when order == .Acquire_Release { #panic("there is no such thing as an acquire/release load"); } + else { #panic("unknown order"); } +} + +atomic_exchange :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { + when order == .Relaxed { return intrinsics.atomic_xchg_relaxed(dst, val); } + else when order == .Release { return intrinsics.atomic_xchg_rel(dst, val); } + else when order == .Acquire { return intrinsics.atomic_xchg_acq(dst, val); } + else when order == .Acquire_Release { return intrinsics.atomic_xchg_acqrel(dst, val); } + else when order == .Sequentially_Consistent { return intrinsics.atomic_xchg(dst, val); } + else { #panic("unknown order"); } +} + +atomic_compare_exchange :: #force_inline proc(dst: ^$T, old, new: T, $success, $failure: Ordering) -> (val: T, ok: bool) { + when failure == .Relaxed { + when success == .Relaxed { return intrinsics.atomic_cxchg_relaxed(dst, old, new); } + else when success == .Acquire { return intrinsics.atomic_cxchg_acq_failrelaxed(dst, old, new); } + else when success == .Acquire_Release { return intrinsics.atomic_cxchg_acqrel_failrelaxed(dst, old, new); } + else when success == .Sequentially_Consistent { return intrinsics.atomic_cxchg_failrelaxed(dst, old, new); } + else when success == .Release { return intrinsics.atomic_cxchg_rel(dst, old, new); } + else { #panic("an unknown 
ordering combination"); } + } else when failure == .Acquire { + when success == .Release { return intrinsics.atomic_cxchg_acqrel(dst, old, new); } + else when success == .Acquire { return intrinsics.atomic_cxchg_acq(dst, old, new); } + else { #panic("an unknown ordering combination"); } + } else when failure == .Sequentially_Consistent { + when success == .Sequentially_Consistent { return intrinsics.atomic_cxchg(dst, old, new); } + else { #panic("an unknown ordering combination"); } + } else when failure == .Acquire_Release { + #panic("there is not such thing as an acquire/release failure ordering"); + } else when failure == .Release { + when success == .Acquire { return instrinsics.atomic_cxchg_failacq(dst, old, new); } + else { #panic("an unknown ordering combination"); } + } else { + return T{}, false; + } + +} + +atomic_compare_exchange_weak :: #force_inline proc(dst: ^$T, old, new: T, $success, $failure: Ordering) -> (val: T, ok: bool) { + when failure == .Relaxed { + when success == .Relaxed { return intrinsics.atomic_cxchgweak_relaxed(dst, old, new); } + else when success == .Acquire { return intrinsics.atomic_cxchgweak_acq_failrelaxed(dst, old, new); } + else when success == .Acquire_Release { return intrinsics.atomic_cxchgweak_acqrel_failrelaxed(dst, old, new); } + else when success == .Sequentially_Consistent { return intrinsics.atomic_cxchgweak_failrelaxed(dst, old, new); } + else when success == .Release { return intrinsics.atomic_cxchgweak_rel(dst, old, new); } + else { #panic("an unknown ordering combination"); } + } else when failure == .Acquire { + when success == .Release { return intrinsics.atomic_cxchgweak_acqrel(dst, old, new); } + else when success == .Acquire { return intrinsics.atomic_cxchgweak_acq(dst, old, new); } + else { #panic("an unknown ordering combination"); } + } else when failure == .Sequentially_Consistent { + when success == .Sequentially_Consistent { return intrinsics.atomic_cxchgweak(dst, old, new); } + else { #panic("an unknown ordering combination"); } + } else when failure == .Acquire_Release { + #panic("there is not such thing as an acquire/release failure ordering"); + } else when failure == .Release { + when success == .Acquire { return intrinsics.atomic_cxchgweak_failacq(dst, old, new); } + else { #panic("an unknown ordering combination"); } + } else { + return T{}, false; + } + +} + + +atomic_add :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { + when order == .Relaxed { return intrinsics.atomic_add_relaxed(dst, val); } + else when order == .Release { return intrinsics.atomic_add_rel(dst, val); } + else when order == .Acquire { return intrinsics.atomic_add_acq(dst, val); } + else when order == .Acquire_Release { return intrinsics.atomic_add_acqrel(dst, val); } + else when order == .Sequentially_Consistent { return intrinsics.atomic_add(dst, val); } + else { #panic("unknown order"); } +} + +atomic_sub :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { + when order == .Relaxed { return intrinsics.atomic_sub_relaxed(dst, val); } + else when order == .Release { return intrinsics.atomic_sub_rel(dst, val); } + else when order == .Acquire { return intrinsics.atomic_sub_acq(dst, val); } + else when order == .Acquire_Release { return intrinsics.atomic_sub_acqrel(dst, val); } + else when order == .Sequentially_Consistent { return intrinsics.atomic_sub(dst, val); } + else { #panic("unknown order"); } +} + +atomic_and :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { + when order == .Relaxed { return 
intrinsics.atomic_and_relaxed(dst, val); } + else when order == .Release { return intrinsics.atomic_and_rel(dst, val); } + else when order == .Acquire { return intrinsics.atomic_and_acq(dst, val); } + else when order == .Acquire_Release { return intrinsics.atomic_and_acqrel(dst, val); } + else when order == .Sequentially_Consistent { return intrinsics.atomic_and(dst, val); } + else { #panic("unknown order"); } +} + +atomic_nand :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { + when order == .Relaxed { return intrinsics.atomic_nand_relaxed(dst, val); } + else when order == .Release { return intrinsics.atomic_nand_rel(dst, val); } + else when order == .Acquire { return intrinsics.atomic_nand_acq(dst, val); } + else when order == .Acquire_Release { return intrinsics.atomic_nand_acqrel(dst, val); } + else when order == .Sequentially_Consistent { return intrinsics.atomic_nand(dst, val); } + else { #panic("unknown order"); } +} + +atomic_or :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { + when order == .Relaxed { return intrinsics.atomic_or_relaxed(dst, val); } + else when order == .Release { return intrinsics.atomic_or_rel(dst, val); } + else when order == .Acquire { return intrinsics.atomic_or_acq(dst, val); } + else when order == .Acquire_Release { return intrinsics.atomic_or_acqrel(dst, val); } + else when order == .Sequentially_Consistent { return intrinsics.atomic_or(dst, val); } + else { #panic("unknown order"); } +} + +atomic_xor :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T { + when order == .Relaxed { return intrinsics.atomic_xor_relaxed(dst, val); } + else when order == .Release { return intrinsics.atomic_xor_rel(dst, val); } + else when order == .Acquire { return intrinsics.atomic_xor_acq(dst, val); } + else when order == .Acquire_Release { return intrinsics.atomic_xor_acqrel(dst, val); } + else when order == .Sequentially_Consistent { return intrinsics.atomic_xor(dst, val); } + else { #panic("unknown order"); } +} + diff --git a/core/sync/sync2/channel.odin b/core/sync/sync2/channel.odin new file mode 100644 index 000000000..782b1d86a --- /dev/null +++ b/core/sync/sync2/channel.odin @@ -0,0 +1,887 @@ +package sync2 + +// TODO(bill): The Channel implementation needs a complete rewrite for this new package sync design +// Especially how the `select` things work + +import "core:mem" +import "core:time" +import "intrinsics" +import "core:math/rand" + +_, _ :: time, rand; + +Channel_Direction :: enum i8 { + Both = 0, + Send = +1, + Recv = -1, +} + +Channel :: struct(T: typeid, Direction := Channel_Direction.Both) { + using _internal: ^Raw_Channel, +} + +channel_init :: proc(ch: ^$C/Channel($T, $D), cap := 0, allocator := context.allocator) { + context.allocator = allocator; + ch._internal = raw_channel_create(size_of(T), align_of(T), cap); + return; +} + +channel_make :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Both)) { + context.allocator = allocator; + ch._internal = raw_channel_create(size_of(T), align_of(T), cap); + return; +} + +channel_make_send :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Send)) { + context.allocator = allocator; + ch._internal = raw_channel_create(size_of(T), align_of(T), cap); + return; +} +channel_make_recv :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Recv)) { + context.allocator = allocator; + ch._internal = raw_channel_create(size_of(T), align_of(T), cap); + return; +} + +channel_destroy :: 
proc(ch: $C/Channel($T, $D)) { + raw_channel_destroy(ch._internal); +} + +channel_as_send :: proc(ch: $C/Channel($T, .Both)) -> (res: Channel(T, .Send)) { + res._internal = ch._internal; + return; +} + +channel_as_recv :: proc(ch: $C/Channel($T, .Both)) -> (res: Channel(T, .Recv)) { + res._internal = ch._internal; + return; +} + + +channel_len :: proc(ch: $C/Channel($T, $D)) -> int { + return ch._internal.len if ch._internal != nil else 0; +} +channel_cap :: proc(ch: $C/Channel($T, $D)) -> int { + return ch._internal.cap if ch._internal != nil else 0; +} + + +channel_send :: proc(ch: $C/Channel($T, $D), msg: T, loc := #caller_location) where D >= .Both { + msg := msg; + _ = raw_channel_send_impl(ch._internal, &msg, /*block*/true, loc); +} +channel_try_send :: proc(ch: $C/Channel($T, $D), msg: T, loc := #caller_location) -> bool where D >= .Both { + msg := msg; + return raw_channel_send_impl(ch._internal, &msg, /*block*/false, loc); +} + +channel_recv :: proc(ch: $C/Channel($T, $D), loc := #caller_location) -> (msg: T) where D <= .Both { + c := ch._internal; + if c == nil { + panic(message="cannot recv message; channel is nil", loc=loc); + } + mutex_lock(&c.mutex); + raw_channel_recv_impl(c, &msg, loc); + mutex_unlock(&c.mutex); + return; +} +channel_try_recv :: proc(ch: $C/Channel($T, $D), loc := #caller_location) -> (msg: T, ok: bool) where D <= .Both { + c := ch._internal; + if c != nil && mutex_try_lock(&c.mutex) { + if c.len > 0 { + raw_channel_recv_impl(c, &msg, loc); + ok = true; + } + mutex_unlock(&c.mutex); + } + return; +} +channel_try_recv_ptr :: proc(ch: $C/Channel($T, $D), msg: ^T, loc := #caller_location) -> (ok: bool) where D <= .Both { + res: T; + res, ok = channel_try_recv(ch, loc); + if ok && msg != nil { + msg^ = res; + } + return; +} + + +channel_is_nil :: proc(ch: $C/Channel($T, $D)) -> bool { + return ch._internal == nil; +} +channel_is_open :: proc(ch: $C/Channel($T, $D)) -> bool { + c := ch._internal; + return c != nil && !c.closed; +} + + +channel_eq :: proc(a, b: $C/Channel($T, $D)) -> bool { + return a._internal == b._internal; +} +channel_ne :: proc(a, b: $C/Channel($T, $D)) -> bool { + return a._internal != b._internal; +} + + +channel_can_send :: proc(ch: $C/Channel($T, $D)) -> (ok: bool) where D >= .Both { + return raw_channel_can_send(ch._internal); +} +channel_can_recv :: proc(ch: $C/Channel($T, $D)) -> (ok: bool) where D <= .Both { + return raw_channel_can_recv(ch._internal); +} + + +channel_peek :: proc(ch: $C/Channel($T, $D)) -> int { + c := ch._internal; + if c == nil { + return -1; + } + if intrinsics.atomic_load(&c.closed) { + return -1; + } + return intrinsics.atomic_load(&c.len); +} + + +channel_close :: proc(ch: $C/Channel($T, $D), loc := #caller_location) { + raw_channel_close(ch._internal, loc); +} + + +channel_iterator :: proc(ch: $C/Channel($T, $D)) -> (msg: T, ok: bool) where D <= .Both { + c := ch._internal; + if c == nil { + return; + } + + if !c.closed || c.len > 0 { + msg, ok = channel_recv(ch), true; + } + return; +} +channel_drain :: proc(ch: $C/Channel($T, $D)) where D >= .Both { + raw_channel_drain(ch._internal); +} + + +channel_move :: proc(dst: $C1/Channel($T, $D1) src: $C2/Channel(T, $D2)) where D1 <= .Both, D2 >= .Both { + for msg in channel_iterator(src) { + channel_send(dst, msg); + } +} + + +Raw_Channel_Wait_Queue :: struct { + next: ^Raw_Channel_Wait_Queue, + state: ^uintptr, +} + + +Raw_Channel :: struct { + closed: bool, + ready: bool, // ready to recv + data_offset: u16, // data is stored at the end of this data structure + 
elem_size: u32, + len, cap: int, + read, write: int, + mutex: Mutex, + cond: Cond, + allocator: mem.Allocator, + + sendq: ^Raw_Channel_Wait_Queue, + recvq: ^Raw_Channel_Wait_Queue, +} + +raw_channel_wait_queue_insert :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) { + val.next = head^; + head^ = val; +} +raw_channel_wait_queue_remove :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) { + p := head; + for p^ != nil && p^ != val { + p = &p^.next; + } + if p != nil { + p^ = p^.next; + } +} + + +raw_channel_create :: proc(elem_size, elem_align: int, cap := 0) -> ^Raw_Channel { + assert(int(u32(elem_size)) == elem_size); + + s := size_of(Raw_Channel); + s = mem.align_forward_int(s, elem_align); + data_offset := uintptr(s); + s += elem_size * max(cap, 1); + + a := max(elem_align, align_of(Raw_Channel)); + + c := (^Raw_Channel)(mem.alloc(s, a)); + if c == nil { + return nil; + } + + c.data_offset = u16(data_offset); + c.elem_size = u32(elem_size); + c.len, c.cap = 0, max(cap, 0); + c.read, c.write = 0, 0; + c.allocator = context.allocator; + c.closed = false; + + return c; +} + + +raw_channel_destroy :: proc(c: ^Raw_Channel) { + if c == nil { + return; + } + context.allocator = c.allocator; + intrinsics.atomic_store(&c.closed, true); + free(c); +} + +raw_channel_close :: proc(c: ^Raw_Channel, loc := #caller_location) { + if c == nil { + panic(message="cannot close nil channel", loc=loc); + } + mutex_lock(&c.mutex); + defer mutex_unlock(&c.mutex); + intrinsics.atomic_store(&c.closed, true); + + // Release readers and writers + raw_channel_wait_queue_broadcast(c.recvq); + raw_channel_wait_queue_broadcast(c.sendq); + cond_broadcast(&c.cond); +} + + + +raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, block: bool, loc := #caller_location) -> bool { + send :: proc(c: ^Raw_Channel, src: rawptr) { + data := uintptr(c) + uintptr(c.data_offset); + dst := data + uintptr(c.write * int(c.elem_size)); + mem.copy(rawptr(dst), src, int(c.elem_size)); + c.len += 1; + c.write = (c.write + 1) % max(c.cap, 1); + } + + switch { + case c == nil: + panic(message="cannot send message; channel is nil", loc=loc); + case c.closed: + panic(message="cannot send message; channel is closed", loc=loc); + } + + mutex_lock(&c.mutex); + defer mutex_unlock(&c.mutex); + + if c.cap > 0 { + if !block && c.len >= c.cap { + return false; + } + + for c.len >= c.cap { + cond_wait(&c.cond, &c.mutex); + } + } else if c.len > 0 { // TODO(bill): determine correct behaviour + if !block { + return false; + } + cond_wait(&c.cond, &c.mutex); + } else if c.len == 0 && !block { + return false; + } + + send(c, msg); + cond_signal(&c.cond); + raw_channel_wait_queue_signal(c.recvq); + + return true; +} + +raw_channel_recv_impl :: proc(c: ^Raw_Channel, res: rawptr, loc := #caller_location) { + recv :: proc(c: ^Raw_Channel, dst: rawptr, loc := #caller_location) { + if c.len < 1 { + panic(message="cannot recv message; channel is empty", loc=loc); + } + c.len -= 1; + + data := uintptr(c) + uintptr(c.data_offset); + src := data + uintptr(c.read * int(c.elem_size)); + mem.copy(dst, rawptr(src), int(c.elem_size)); + c.read = (c.read + 1) % max(c.cap, 1); + } + + if c == nil { + panic(message="cannot recv message; channel is nil", loc=loc); + } + intrinsics.atomic_store(&c.ready, true); + for c.len < 1 { + raw_channel_wait_queue_signal(c.sendq); + cond_wait(&c.cond, &c.mutex); + } + intrinsics.atomic_store(&c.ready, false); + recv(c, res, loc); + if c.cap > 0 { + if c.len == c.cap - 1 { + // NOTE(bill): Only 
signal on the last one + cond_signal(&c.cond); + } + } else { + cond_signal(&c.cond); + } +} + + +raw_channel_can_send :: proc(c: ^Raw_Channel) -> (ok: bool) { + if c == nil { + return false; + } + mutex_lock(&c.mutex); + switch { + case c.closed: + ok = false; + case c.cap > 0: + ok = c.ready && c.len < c.cap; + case: + ok = c.ready && c.len == 0; + } + mutex_unlock(&c.mutex); + return; +} +raw_channel_can_recv :: proc(c: ^Raw_Channel) -> (ok: bool) { + if c == nil { + return false; + } + mutex_lock(&c.mutex); + ok = c.len > 0; + mutex_unlock(&c.mutex); + return; +} + + +raw_channel_drain :: proc(c: ^Raw_Channel) { + if c == nil { + return; + } + mutex_lock(&c.mutex); + c.len = 0; + c.read = 0; + c.write = 0; + mutex_unlock(&c.mutex); +} + + + +MAX_SELECT_CHANNELS :: 64; +SELECT_MAX_TIMEOUT :: max(time.Duration); + +Select_Command :: enum { + Recv, + Send, +} + +Select_Channel :: struct { + channel: ^Raw_Channel, + command: Select_Command, +} + + + +select :: proc(channels: ..Select_Channel) -> (index: int) { + return select_timeout(SELECT_MAX_TIMEOUT, ..channels); +} +select_timeout :: proc(timeout: time.Duration, channels: ..Select_Channel) -> (index: int) { + switch len(channels) { + case 0: + panic("sync: select with no channels"); + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + + backing: [MAX_SELECT_CHANNELS]int; + queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue; + candidates := backing[:]; + cap := len(channels); + candidates = candidates[:cap]; + + count := u32(0); + for c, i in channels { + if c.channel == nil { + continue; + } + switch c.command { + case .Recv: + if raw_channel_can_recv(c.channel) { + candidates[count] = i; + count += 1; + } + case .Send: + if raw_channel_can_send(c.channel) { + candidates[count] = i; + count += 1; + } + } + } + + if count == 0 { + wait_state: uintptr = 0; + for _, i in channels { + q := &queues[i]; + q.state = &wait_state; + } + + for c, i in channels { + if c.channel == nil { + continue; + } + q := &queues[i]; + switch c.command { + case .Recv: raw_channel_wait_queue_insert(&c.channel.recvq, q); + case .Send: raw_channel_wait_queue_insert(&c.channel.sendq, q); + } + } + raw_channel_wait_queue_wait_on(&wait_state, timeout); + for c, i in channels { + if c.channel == nil { + continue; + } + q := &queues[i]; + switch c.command { + case .Recv: raw_channel_wait_queue_remove(&c.channel.recvq, q); + case .Send: raw_channel_wait_queue_remove(&c.channel.sendq, q); + } + } + + for c, i in channels { + switch c.command { + case .Recv: + if raw_channel_can_recv(c.channel) { + candidates[count] = i; + count += 1; + } + case .Send: + if raw_channel_can_send(c.channel) { + candidates[count] = i; + count += 1; + } + } + } + if count == 0 && timeout == SELECT_MAX_TIMEOUT { + index = -1; + return; + } + + assert(count != 0); + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + return; +} + +select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) { + switch len(channels) { + case 0: + panic("sync: select with no channels"); + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + + backing: [MAX_SELECT_CHANNELS]int; + queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue; + candidates := backing[:]; + cap := len(channels); + candidates = candidates[:cap]; + + count := u32(0); + for c, i in channels { + if raw_channel_can_recv(c) { + candidates[count] = i; + count += 1; + } + } + + if count == 0 { + state: uintptr; + for c, i in channels { + q := &queues[i]; + q.state = &state; + 
raw_channel_wait_queue_insert(&c.recvq, q); + } + raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT); + for c, i in channels { + q := &queues[i]; + raw_channel_wait_queue_remove(&c.recvq, q); + } + + for c, i in channels { + if raw_channel_can_recv(c) { + candidates[count] = i; + count += 1; + } + } + assert(count != 0); + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + return; +} + +select_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int) { + switch len(channels) { + case 0: + panic("sync: select with no channels"); + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + + queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue; + candidates: [MAX_SELECT_CHANNELS]int; + + count := u32(0); + for c, i in channels { + if raw_channel_can_recv(c) { + candidates[count] = i; + count += 1; + } + } + + if count == 0 { + state: uintptr; + for c, i in channels { + q := &queues[i]; + q.state = &state; + raw_channel_wait_queue_insert(&c.recvq, q); + } + raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT); + for c, i in channels { + q := &queues[i]; + raw_channel_wait_queue_remove(&c.recvq, q); + } + + for c, i in channels { + if raw_channel_can_recv(c) { + candidates[count] = i; + count += 1; + } + } + assert(count != 0); + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + msg = channel_recv(channels[index]); + + return; +} + +select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int) { + switch len(channels) { + case 0: + panic("sync: select with no channels"); + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + + backing: [MAX_SELECT_CHANNELS]int; + queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue; + candidates := backing[:]; + cap := len(channels); + candidates = candidates[:cap]; + + count := u32(0); + for c, i in channels { + if raw_channel_can_recv(c) { + candidates[count] = i; + count += 1; + } + } + + if count == 0 { + state: uintptr; + for c, i in channels { + q := &queues[i]; + q.state = &state; + raw_channel_wait_queue_insert(&c.recvq, q); + } + raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT); + for c, i in channels { + q := &queues[i]; + raw_channel_wait_queue_remove(&c.recvq, q); + } + + for c, i in channels { + if raw_channel_can_recv(c) { + candidates[count] = i; + count += 1; + } + } + assert(count != 0); + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + + if msg != nil { + channel_send(channels[index], msg); + } + + return; +} + +select_send :: proc(channels: ..^Raw_Channel) -> (index: int) { + switch len(channels) { + case 0: + panic("sync: select with no channels"); + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + candidates: [MAX_SELECT_CHANNELS]int; + queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue; + + count := u32(0); + for c, i in channels { + if raw_channel_can_send(c) { + candidates[count] = i; + count += 1; + } + } + + if count == 0 { + state: uintptr; + for c, i in channels { + q := &queues[i]; + q.state = &state; + raw_channel_wait_queue_insert(&c.sendq, q); + } + raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT); + for c, i in channels { + q := &queues[i]; + raw_channel_wait_queue_remove(&c.sendq, q); + } + + for c, i in channels { + if raw_channel_can_send(c) { + candidates[count] = i; + count += 1; + } + } + assert(count != 0); + } + + t := time.now(); + r := 
rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + return; +} + +select_try :: proc(channels: ..Select_Channel) -> (index: int) { + switch len(channels) { + case 0: + panic("sync: select with no channels"); + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + + backing: [MAX_SELECT_CHANNELS]int; + candidates := backing[:]; + cap := len(channels); + candidates = candidates[:cap]; + + count := u32(0); + for c, i in channels { + switch c.command { + case .Recv: + if raw_channel_can_recv(c.channel) { + candidates[count] = i; + count += 1; + } + case .Send: + if raw_channel_can_send(c.channel) { + candidates[count] = i; + count += 1; + } + } + } + + if count == 0 { + index = -1; + return; + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + return; +} + + +select_try_recv :: proc(channels: ..^Raw_Channel) -> (index: int) { + switch len(channels) { + case 0: + index = -1; + return; + case 1: + index = -1; + if raw_channel_can_recv(channels[0]) { + index = 0; + } + return; + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + candidates: [MAX_SELECT_CHANNELS]int; + + count := u32(0); + for c, i in channels { + if raw_channel_can_recv(c) { + candidates[count] = i; + count += 1; + } + } + + if count == 0 { + index = -1; + return; + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + return; +} + + +select_try_send :: proc(channels: ..^Raw_Channel) -> (index: int) #no_bounds_check { + switch len(channels) { + case 0: + return -1; + case 1: + if raw_channel_can_send(channels[0]) { + return 0; + } + return -1; + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + candidates: [MAX_SELECT_CHANNELS]int; + + count := u32(0); + for c, i in channels { + if raw_channel_can_send(c) { + candidates[count] = i; + count += 1; + } + } + + if count == 0 { + index = -1; + return; + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + return; +} + +select_try_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int) { + switch len(channels) { + case 0: + index = -1; + return; + case 1: + ok: bool; + if msg, ok = channel_try_recv(channels[0]); ok { + index = 0; + } + return; + } + + assert(len(channels) <= MAX_SELECT_CHANNELS); + candidates: [MAX_SELECT_CHANNELS]int; + + count := u32(0); + for c, i in channels { + if channel_can_recv(c) { + candidates[count] = i; + count += 1; + } + } + + if count == 0 { + index = -1; + return; + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + msg = channel_recv(channels[index]); + return; +} + +select_try_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int) { + index = -1; + switch len(channels) { + case 0: + return; + case 1: + if channel_try_send(channels[0], msg) { + index = 0; + } + return; + } + + + assert(len(channels) <= MAX_SELECT_CHANNELS); + candidates: [MAX_SELECT_CHANNELS]int; + + count := u32(0); + for c, i in channels { + if raw_channel_can_send(c) { + candidates[count] = i; + count += 1; + } + } + + if count == 0 { + index = -1; + return; + } + + t := time.now(); + r := rand.create(transmute(u64)t); + i := rand.uint32(&r); + + index = candidates[i % count]; + channel_send(channels[index], msg); + return; +} + diff --git a/core/sync/sync2/channel_unix.odin b/core/sync/sync2/channel_unix.odin new file mode 100644 
index 000000000..7429b67db --- /dev/null +++ b/core/sync/sync2/channel_unix.odin @@ -0,0 +1,17 @@ +//+build linux, darwin, freebsd +//+private +package sync2 + +import "core:time" + +raw_channel_wait_queue_wait_on :: proc(state: ^uintptr, timeout: time.Duration) { + // stub +} + +raw_channel_wait_queue_signal :: proc(q: ^Raw_Channel_Wait_Queue) { + // stub +} + +raw_channel_wait_queue_broadcast :: proc(q: ^Raw_Channel_Wait_Queue) { + // stub +} diff --git a/core/sync/sync2/channel_windows.odin b/core/sync/sync2/channel_windows.odin new file mode 100644 index 000000000..a38a9cc2c --- /dev/null +++ b/core/sync/sync2/channel_windows.odin @@ -0,0 +1,35 @@ +//+build windows +//+private +package sync2 + +import "intrinsics" +import win32 "core:sys/windows" +import "core:time" + +raw_channel_wait_queue_wait_on :: proc(state: ^uintptr, timeout: time.Duration) { + ms: win32.DWORD = win32.INFINITE; + if max(time.Duration) != SELECT_MAX_TIMEOUT { + ms = win32.DWORD((max(time.duration_nanoseconds(timeout), 0) + 999999)/1000000); + } + + v := intrinsics.atomic_load(state); + for v == 0 { + win32.WaitOnAddress(state, &v, size_of(state^), ms); + v = intrinsics.atomic_load(state); + } + intrinsics.atomic_store(state, 0); +} + +raw_channel_wait_queue_signal :: proc(q: ^Raw_Channel_Wait_Queue) { + for x := q; x != nil; x = x.next { + intrinsics.atomic_add(x.state, 1); + win32.WakeByAddressSingle(x.state); + } +} + +raw_channel_wait_queue_broadcast :: proc(q: ^Raw_Channel_Wait_Queue) { + for x := q; x != nil; x = x.next { + intrinsics.atomic_add(x.state, 1); + win32.WakeByAddressAll(x.state); + } +} diff --git a/core/sync/sync2/extended.odin b/core/sync/sync2/extended.odin new file mode 100644 index 000000000..3c439b225 --- /dev/null +++ b/core/sync/sync2/extended.odin @@ -0,0 +1,215 @@ +package sync2 + +import "core:runtime" +import "intrinsics" + +// A Wait_Group waits for a collection of threads to finish +// +// A Wait_Group must not be copied after first use +Wait_Group :: struct { + counter: int, + mutex: Mutex, + cond: Cond, +} + +wait_group_add :: proc(wg: ^Wait_Group, delta: int) { + if delta == 0 { + return; + } + + mutex_lock(&wg.mutex); + defer mutex_unlock(&wg.mutex); + + intrinsics.atomic_add(&wg.counter, delta); + if wg.counter < 0 { + panic("sync.Wait_Group negative counter"); + } + if wg.counter == 0 { + cond_broadcast(&wg.cond); + if wg.counter != 0 { + panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait"); + } + } +} + +wait_group_done :: proc(wg: ^Wait_Group) { + wait_group_add(wg, -1); +} + +wait_group_wait :: proc(wg: ^Wait_Group) { + mutex_lock(&wg.mutex); + defer mutex_unlock(&wg.mutex); + + if wg.counter != 0 { + cond_wait(&wg.cond, &wg.mutex); + if wg.counter != 0 { + panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait"); + } + } +} + + + +// A barrier enabling multiple threads to synchronize the beginning of some computation +/* + * Example: + * + * package example + * + * import "core:fmt" + * import "core:sync" + * import "core:thread" + * + * barrier := &sync.Barrier{}; + * + * main :: proc() { + * fmt.println("Start"); + * + * THREAD_COUNT :: 4; + * threads: [THREAD_COUNT]^thread.Thread; + * + * sync.barrier_init(barrier, THREAD_COUNT); + * defer sync.barrier_destroy(barrier); + * + * + * for _, i in threads { + * threads[i] = thread.create_and_start(proc(t: ^thread.Thread) { + * // Same messages will be printed together but without any interleaving + * fmt.println("Getting ready!"); + * 
sync.barrier_wait(barrier); + * fmt.println("Off their marks they go!"); + * }); + * } + * + * for t in threads { + * thread.destroy(t); // join and free thread + * } + * fmt.println("Finished"); + * } + * + */ +Barrier :: struct { + mutex: Mutex, + cond: Cond, + index: int, + generation_id: int, + thread_count: int, +} + +barrier_init :: proc(b: ^Barrier, thread_count: int) { + b.index = 0; + b.generation_id = 0; + b.thread_count = thread_count; +} + +// Block the current thread until all threads have rendezvoused +// Barrier can be reused after all threads rendezvoused once, and can be used continuously +barrier_wait :: proc(b: ^Barrier) -> (is_leader: bool) { + mutex_lock(&b.mutex); + defer mutex_unlock(&b.mutex); + local_gen := b.generation_id; + b.index += 1; + if b.index < b.thread_count { + for local_gen == b.generation_id && b.index < b.thread_count { + cond_wait(&b.cond, &b.mutex); + } + return false; + } + + b.index = 0; + b.generation_id += 1; + cond_broadcast(&b.cond); + return true; +} + + + +Ticket_Mutex :: struct { + ticket: uint, + serving: uint, +} + +ticket_mutex_lock :: #force_inline proc(m: ^Ticket_Mutex) { + ticket := intrinsics.atomic_add_relaxed(&m.ticket, 1); + for ticket != intrinsics.atomic_load_acq(&m.serving) { + intrinsics.cpu_relax(); + } +} + +ticket_mutex_unlock :: #force_inline proc(m: ^Ticket_Mutex) { + intrinsics.atomic_add_relaxed(&m.serving, 1); +} + + + +Benaphore :: struct { + counter: int, + sema: Sema, +} + +benaphore_lock :: proc(b: ^Benaphore) { + if intrinsics.atomic_add_acq(&b.counter, 1) > 1 { + sema_wait(&b.sema); + } +} + +benaphore_try_lock :: proc(b: ^Benaphore) -> bool { + v, _ := intrinsics.atomic_cxchg_acq(&b.counter, 1, 0); + return v == 0; +} + +benaphore_unlock :: proc(b: ^Benaphore) { + if intrinsics.atomic_sub_rel(&b.counter, 1) > 0 { + sema_post(&b.sema); + } +} + +Recursive_Benaphore :: struct { + counter: int, + owner: int, + recursion: int, + sema: Sema, +} + +recursive_benaphore_lock :: proc(b: ^Recursive_Benaphore) { + tid := runtime.current_thread_id(); + if intrinsics.atomic_add_acq(&b.counter, 1) > 1 { + if tid != b.owner { + sema_wait(&b.sema); + } + } + // inside the lock + b.owner = tid; + b.recursion += 1; +} + +recursive_benaphore_try_lock :: proc(b: ^Recursive_Benaphore) -> bool { + tid := runtime.current_thread_id(); + if b.owner == tid { + intrinsics.atomic_add_acq(&b.counter, 1); + } + + if v, _ := intrinsics.atomic_cxchg_acq(&b.counter, 1, 0); v != 0 { + return false; + } + // inside the lock + b.owner = tid; + b.recursion += 1; + return true; +} + +recursive_benaphore_unlock :: proc(b: ^Recursive_Benaphore) { + tid := runtime.current_thread_id(); + assert(tid == b.owner); + b.recursion -= 1; + recursion := b.recursion; + if recursion == 0 { + b.owner = 0; + } + if intrinsics.atomic_sub_rel(&b.counter, 1) > 0 { + if recursion == 0 { + sema_post(&b.sema); + } + } + // outside the lock +} diff --git a/core/sync/sync2/primitives.odin b/core/sync/sync2/primitives.odin new file mode 100644 index 000000000..dd6688a50 --- /dev/null +++ b/core/sync/sync2/primitives.odin @@ -0,0 +1,185 @@ +package sync2 + +import "core:time" +import "core:runtime" + +// A Mutex is a mutual exclusion lock +// The zero value for a Mutex is an unlocked mutex +// +// A Mutex must not be copied after first use +Mutex :: struct { + impl: _Mutex, +} + +// mutex_lock locks m +mutex_lock :: proc(m: ^Mutex) { + _mutex_lock(m); +} + +// mutex_lock unlocks m +mutex_unlock :: proc(m: ^Mutex) { + _mutex_unlock(m); +} + +// mutex_lock tries to lock m, 
will return true on success, and false on failure +mutex_try_lock :: proc(m: ^Mutex) -> bool { + return _mutex_try_lock(m); +} + +// A RW_Mutex is a reader/writer mutual exclusion lock +// The lock can be held by any arbitrary number of readers or a single writer +// The zero value for a RW_Mutex is an unlocked mutex +// +// A RW_Mutex must not be copied after first use +RW_Mutex :: struct { + impl: _RW_Mutex, +} + +// rw_mutex_lock locks rw for writing (with a single writer) +// If the mutex is already locked for reading or writing, the mutex blocks until the mutex is available. +rw_mutex_lock :: proc(rw: ^RW_Mutex) { + _rw_mutex_lock(rw); +} + +// rw_mutex_unlock unlocks rw for writing (with a single writer) +rw_mutex_unlock :: proc(rw: ^RW_Mutex) { + _rw_mutex_unlock(rw); +} + +// rw_mutex_try_lock tries to lock rw for writing (with a single writer) +rw_mutex_try_lock :: proc(rw: ^RW_Mutex) -> bool { + return _rw_mutex_try_lock(rw); +} + +// rw_mutex_shared_lock locks rw for reading (with arbitrary number of readers) +rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) { + _rw_mutex_shared_lock(rw); +} + +// rw_mutex_shared_unlock unlocks rw for reading (with arbitrary number of readers) +rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) { + _rw_mutex_shared_unlock(rw); +} + +// rw_mutex_try_shared_lock tries to lock rw for reading (with arbitrary number of readers) +rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool { + return _rw_mutex_try_shared_lock(rw); +} + + +// A Recusrive_Mutex is a recursive mutual exclusion lock +// The zero value for a Recursive_Mutex is an unlocked mutex +// +// A Recursive_Mutex must not be copied after first use +Recursive_Mutex :: struct { + // TODO(bill): Is this implementation too lazy? + // Can this be made to work on all OSes without construction and destruction, i.e. Zero is Initialized + // CRITICAL_SECTION would be a perfect candidate for this on Windows but that cannot be "dumb" + + owner: int, + recursion: int, + mutex: Mutex, +} + +recursive_mutex_lock :: proc(m: ^Recursive_Mutex) { + tid := runtime.current_thread_id(); + if tid != m.owner { + mutex_lock(&m.mutex); + } + // inside the lock + m.owner = tid; + m.recursion += 1; +} + +recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) { + tid := runtime.current_thread_id(); + assert(tid == m.owner); + m.recursion -= 1; + recursion := m.recursion; + if recursion == 0 { + m.owner = 0; + } + if recursion == 0 { + mutex_unlock(&m.mutex); + } + // outside the lock + +} + +recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool { + tid := runtime.current_thread_id(); + if m.owner == tid { + return mutex_try_lock(&m.mutex); + } + if !mutex_try_lock(&m.mutex) { + return false; + } + // inside the lock + m.owner = tid; + m.recursion += 1; + return true; +} + + + +// Cond implements a condition variable, a rendezvous point for threads +// waiting for signalling the occurence of an event +// +// A Cond must not be copied after first use +Cond :: struct { + impl: _Cond, +} + +cond_wait :: proc(c: ^Cond, m: ^Mutex) { + _cond_wait(c, m); +} + +cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool { + return _cond_wait_with_timeout(c, m, timeout); +} + +cond_signal :: proc(c: ^Cond) { + _cond_signal(c); +} + +cond_broadcast :: proc(c: ^Cond) { + _cond_broadcast(c); +} + + + +// When waited upon, blocks until the internal count is greater than zero, then subtracts one. +// Posting to the semaphore increases the count by one, or the provided amount. 
+// +// A Sema must not be copied after first use +Sema :: struct { + // TODO(bill): Is this implementation too lazy? + // Can this be made to work on all OSes without construction and destruction, i.e. Zero is Initialized + + mutex: Mutex, + cond: Cond, + count: int, +} + + +sema_wait :: proc(s: ^Sema) { + mutex_lock(&s.mutex); + defer mutex_unlock(&s.mutex); + + for s.count == 0 { + cond_wait(&s.cond, &s.mutex); + } + + s.count -= 1; + if s.count > 0 { + cond_signal(&s.cond); + } +} + +sema_post :: proc(s: ^Sema, count := 1) { + mutex_lock(&s.mutex); + defer mutex_unlock(&s.mutex); + + s.count += count; + cond_signal(&s.cond); +} diff --git a/core/sync/sync2/primitives_atomic.odin b/core/sync/sync2/primitives_atomic.odin new file mode 100644 index 000000000..6133ed77b --- /dev/null +++ b/core/sync/sync2/primitives_atomic.odin @@ -0,0 +1,244 @@ +//+build linux, darwin, freebsd +//+private +package sync2 + +when !#config(ODIN_SYNC_USE_PTHREADS, false) { + +import "intrinsics" +import "core:time" + +_Mutex_State :: enum i32 { + Unlocked = 0, + Locked = 1, + Waiting = 2, +} +_Mutex :: struct { + state: _Mutex_State, +} + +_mutex_lock :: proc(m: ^Mutex) { + if intrinsics.atomic_xchg_rel(&m.impl.state, .Unlocked) != .Unlocked { + _mutex_unlock_slow(m); + } +} + +_mutex_unlock :: proc(m: ^Mutex) { + switch intrinsics.atomic_xchg_rel(&m.impl.state, .Unlocked) { + case .Unlocked: + unreachable(); + case .Locked: + // Okay + case .Waiting: + _mutex_unlock_slow(m); + } +} + +_mutex_try_lock :: proc(m: ^Mutex) -> bool { + _, ok := intrinsics.atomic_cxchg_acq(&m.impl.state, .Unlocked, .Locked); + return ok; +} + + + +_mutex_lock_slow :: proc(m: ^Mutex, curr_state: _Mutex_State) { + new_state := curr_state; // Make a copy of it + + spin_lock: for spin in 0..<i32(100) { + state, ok := intrinsics.atomic_cxchgweak_acq(&m.impl.state, .Unlocked, new_state); + if ok { + return; + } + + if state == .Waiting { + break spin_lock; + } + + for i := min(spin+1, 32); i > 0; i -= 1 { + intrinsics.cpu_relax(); + } + } + + for { + if intrinsics.atomic_xchg_acq(&m.impl.state, .Waiting) == .Unlocked { + return; + } + + // TODO(bill): Use a Futex here for Linux to improve performance and error handling + intrinsics.cpu_relax(); + } +} + + +_mutex_unlock_slow :: proc(m: ^Mutex) { + // TODO(bill): Use a Futex here for Linux to improve performance and error handling +} + + +RW_Mutex_State :: distinct uint; +RW_Mutex_State_Half_Width :: size_of(RW_Mutex_State)*8/2; +RW_Mutex_State_Is_Writing :: RW_Mutex_State(1); +RW_Mutex_State_Writer :: RW_Mutex_State(1)<<1; +RW_Mutex_State_Reader :: RW_Mutex_State(1)<<RW_Mutex_State_Half_Width; + +RW_Mutex_State_Writer_Mask :: RW_Mutex_State(1<<(RW_Mutex_State_Half_Width-1) - 1) << 1; +RW_Mutex_State_Reader_Mask :: RW_Mutex_State(1<<(RW_Mutex_State_Half_Width-1) - 1) << RW_Mutex_State_Half_Width; + + +_RW_Mutex :: struct { + state: RW_Mutex_State, + mutex: Mutex, + sema: Sema, +} + +_rw_mutex_lock :: proc(rw: ^RW_Mutex) { + _ = intrinsics.atomic_add(&rw.impl.state, RW_Mutex_State_Writer); + mutex_lock(&rw.impl.mutex); + + state := intrinsics.atomic_or(&rw.impl.state, RW_Mutex_State_Writer); + if state & RW_Mutex_State_Reader_Mask != 0 { + sema_wait(&rw.impl.sema); + } +} + +_rw_mutex_unlock :: proc(rw: ^RW_Mutex) { + _ = intrinsics.atomic_and(&rw.impl.state, ~RW_Mutex_State_Is_Writing); + mutex_unlock(&rw.impl.mutex); +} + +_rw_mutex_try_lock :: proc(rw: ^RW_Mutex) -> bool { + if mutex_try_lock(&rw.impl.mutex) { + state := intrinsics.atomic_load(&rw.impl.state); + if state & 
RW_Mutex_State_Reader_Mask == 0 { + _ = intrinsics.atomic_or(&rw.impl.state, RW_Mutex_State_Is_Writing); + return true; + } + + mutex_unlock(&rw.impl.mutex); + } + return false; +} + +_rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) { + state := intrinsics.atomic_load(&rw.impl.state); + for state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 { + ok: bool; + state, ok = intrinsics.atomic_cxchgweak(&rw.impl.state, state, state + RW_Mutex_State_Reader); + if ok { + return; + } + } + + mutex_lock(&rw.impl.mutex); + _ = intrinsics.atomic_add(&rw.impl.state, RW_Mutex_State_Reader); + mutex_unlock(&rw.impl.mutex); +} + +_rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) { + state := intrinsics.atomic_sub(&rw.impl.state, RW_Mutex_State_Reader); + + if (state & RW_Mutex_State_Reader_Mask == RW_Mutex_State_Reader) && + (state & RW_Mutex_State_Is_Writing != 0) { + sema_post(&rw.impl.sema); + } +} + +_rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool { + state := intrinsics.atomic_load(&rw.impl.state); + if state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 { + _, ok := intrinsics.atomic_cxchg(&rw.impl.state, state, state + RW_Mutex_State_Reader); + if ok { + return true; + } + } + if mutex_try_lock(&rw.impl.mutex) { + _ = intrinsics.atomic_add(&rw.impl.state, RW_Mutex_State_Reader); + mutex_unlock(&rw.impl.mutex); + return true; + } + + return false; +} + + + +Queue_Item :: struct { + next: ^Queue_Item, + futex: i32, +} + +queue_item_wait :: proc(item: ^Queue_Item) { + for intrinsics.atomic_load_acq(&item.futex) == 0 { + // TODO(bill): Use a Futex here for Linux to improve performance and error handling + intrinsics.cpu_relax(); + } +} +queue_item_signal :: proc(item: ^Queue_Item) { + intrinsics.atomic_store_rel(&item.futex, 1); + // TODO(bill): Use a Futex here for Linux to improve performance and error handling +} + + +_Cond :: struct { + queue_mutex: Mutex, + queue_head: ^Queue_Item, + pending: bool, +} + +_cond_wait :: proc(c: ^Cond, m: ^Mutex) { + waiter := &Queue_Item{}; + + mutex_lock(&c.impl.queue_mutex); + waiter.next = c.impl.queue_head; + c.impl.queue_head = waiter; + + intrinsics.atomic_store(&c.impl.pending, true); + mutex_unlock(&c.impl.queue_mutex); + + mutex_unlock(m); + queue_item_wait(waiter); + mutex_lock(m); +} + +_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool { + // TODO(bill): _cond_wait_with_timeout for unix + return false; +} + +_cond_signal :: proc(c: ^Cond) { + if !intrinsics.atomic_load(&c.impl.pending) { + return; + } + + mutex_lock(&c.impl.queue_mutex); + waiter := c.impl.queue_head; + if c.impl.queue_head != nil { + c.impl.queue_head = c.impl.queue_head.next; + } + intrinsics.atomic_store(&c.impl.pending, c.impl.queue_head != nil); + mutex_unlock(&c.impl.queue_mutex); + + if waiter != nil { + queue_item_signal(waiter); + } +} + +_cond_broadcast :: proc(c: ^Cond) { + if !intrinsics.atomic_load(&c.impl.pending) { + return; + } + + intrinsics.atomic_store(&c.impl.pending, false); + + mutex_lock(&c.impl.queue_mutex); + waiters := c.impl.queue_head; + c.impl.queue_head = nil; + mutex_unlock(&c.impl.queue_mutex); + + for waiters != nil { + queue_item_signal(waiters); + waiters = waiters.next; + } +} + + +} // !ODIN_SYNC_USE_PTHREADS diff --git a/core/sync/sync2/primitives_pthreads.odin b/core/sync/sync2/primitives_pthreads.odin new file mode 100644 index 000000000..cb580f03f --- /dev/null +++ b/core/sync/sync2/primitives_pthreads.odin @@ -0,0 +1,155 @@ +//+build linux, darwin, freebsd +//+private +package 
sync2 + +when #config(ODIN_SYNC_USE_PTHREADS, false) { + +import "intrinsics" +import "core:time" +import "core:sys/unix" + +_Mutex_State :: enum i32 { + Unlocked = 0, + Locked = 1, + Waiting = 2, +} +_Mutex :: struct { + pthread_mutex: unix.pthread_mutex_t, +} + +_mutex_lock :: proc(m: ^Mutex) { + err := unix.pthread_mutex_lock(&m.impl.pthread_mutex); + assert(err == 0); +} + +_mutex_unlock :: proc(m: ^Mutex) { + err := unix.pthread_mutex_unlock(&m.impl.pthread_mutex); + assert(err == 0); +} + +_mutex_try_lock :: proc(m: ^Mutex) -> bool { + err := unix.pthread_mutex_trylock(&m.impl.pthread_mutex); + return err == 0; +} + + + +RW_Mutex_State :: distinct uint; +RW_Mutex_State_Half_Width :: size_of(RW_Mutex_State)*8/2; +RW_Mutex_State_Is_Writing :: RW_Mutex_State(1); +RW_Mutex_State_Writer :: RW_Mutex_State(1)<<1; +RW_Mutex_State_Reader :: RW_Mutex_State(1)<<RW_Mutex_State_Half_Width; + +RW_Mutex_State_Writer_Mask :: RW_Mutex_State(1<<(RW_Mutex_State_Half_Width-1) - 1) << 1; +RW_Mutex_State_Reader_Mask :: RW_Mutex_State(1<<(RW_Mutex_State_Half_Width-1) - 1) << RW_Mutex_State_Half_Width; + + +_RW_Mutex :: struct { + // NOTE(bill): pthread_rwlock_t cannot be used since pthread_rwlock_destroy is required on some platforms + // TODO(bill): Can we determine which platforms exactly? + state: RW_Mutex_State, + mutex: Mutex, + sema: Sema, +} + +_rw_mutex_lock :: proc(rw: ^RW_Mutex) { + _ = intrinsics.atomic_add(&rw.impl.state, RW_Mutex_State_Writer); + mutex_lock(&rw.impl.mutex); + + state := intrinsics.atomic_or(&rw.impl.state, RW_Mutex_State_Writer); + if state & RW_Mutex_State_Reader_Mask != 0 { + sema_wait(&rw.impl.sema); + } +} + +_rw_mutex_unlock :: proc(rw: ^RW_Mutex) { + _ = intrinsics.atomic_and(&rw.impl.state, ~RW_Mutex_State_Is_Writing); + mutex_unlock(&rw.impl.mutex); +} + +_rw_mutex_try_lock :: proc(rw: ^RW_Mutex) -> bool { + if mutex_try_lock(&rw.impl.mutex) { + state := intrinsics.atomic_load(&rw.impl.state); + if state & RW_Mutex_State_Reader_Mask == 0 { + _ = intrinsics.atomic_or(&rw.impl.state, RW_Mutex_State_Is_Writing); + return true; + } + + mutex_unlock(&rw.impl.mutex); + } + return false; +} + +_rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) { + state := intrinsics.atomic_load(&rw.impl.state); + for state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 { + ok: bool; + state, ok = intrinsics.atomic_cxchgweak(&rw.impl.state, state, state + RW_Mutex_State_Reader); + if ok { + return; + } + } + + mutex_lock(&rw.impl.mutex); + _ = intrinsics.atomic_add(&rw.impl.state, RW_Mutex_State_Reader); + mutex_unlock(&rw.impl.mutex); +} + +_rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) { + state := intrinsics.atomic_sub(&rw.impl.state, RW_Mutex_State_Reader); + + if (state & RW_Mutex_State_Reader_Mask == RW_Mutex_State_Reader) && + (state & RW_Mutex_State_Is_Writing != 0) { + sema_post(&rw.impl.sema); + } +} + +_rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool { + state := intrinsics.atomic_load(&rw.impl.state); + if state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 { + _, ok := intrinsics.atomic_cxchg(&rw.impl.state, state, state + RW_Mutex_State_Reader); + if ok { + return true; + } + } + if mutex_try_lock(&rw.impl.mutex) { + _ = intrinsics.atomic_add(&rw.impl.state, RW_Mutex_State_Reader); + mutex_unlock(&rw.impl.mutex); + return true; + } + + return false; +} + +_Cond :: struct { + pthread_cond: unix.pthread_cond_t, +} + +_cond_wait :: proc(c: ^Cond, m: ^Mutex) { + err := unix.pthread_cond_wait(&c.impl.pthread_cond, &m.impl.pthread_mutex); + assert(err 
== 0); +} + +_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool { + ns := time.duration_nanoseconds(timeout); + timeout_timespec := &time.TimeSpec{ + tv_sec = ns / 1e9, + tv_nsec = ns % 1e9, + }; + err := unix.pthread_cond_timedwait(&c.impl.pthread_cond, &m.impl.pthread_mutex, timeout_timespec); + // TODO(bill): + return err == 0; +} + +_cond_signal :: proc(c: ^Cond) { + err := unix.pthread_cond_signal(&c.impl.pthread_cond); + assert(err == 0); +} + +_cond_broadcast :: proc(c: ^Cond) { + err := unix.pthread_cond_broadcast(&c.impl.pthread_cond); + assert(err == 0); +} + + +} // ODIN_SYNC_USE_PTHREADS diff --git a/core/sync/sync2/primitives_windows.odin b/core/sync/sync2/primitives_windows.odin new file mode 100644 index 000000000..02b6cd733 --- /dev/null +++ b/core/sync/sync2/primitives_windows.odin @@ -0,0 +1,73 @@ +//+build windows +//+private +package sync2 + +import "core:time" +import win32 "core:sys/windows" + +_Mutex :: struct { + srwlock: win32.SRWLOCK, +} + +_mutex_lock :: proc(m: ^Mutex) { + win32.AcquireSRWLockExclusive(&m.impl.srwlock); +} + +_mutex_unlock :: proc(m: ^Mutex) { + win32.ReleaseSRWLockExclusive(&m.impl.srwlock); +} + +_mutex_try_lock :: proc(m: ^Mutex) -> bool { + return bool(win32.TryAcquireSRWLockExclusive(&m.impl.srwlock)); +} + +_RW_Mutex :: struct { + srwlock: win32.SRWLOCK, +} + +_rw_mutex_lock :: proc(rw: ^RW_Mutex) { + win32.AcquireSRWLockExclusive(&rw.impl.srwlock); +} + +_rw_mutex_unlock :: proc(rw: ^RW_Mutex) { + win32.ReleaseSRWLockExclusive(&rw.impl.srwlock); +} + +_rw_mutex_try_lock :: proc(rw: ^RW_Mutex) -> bool { + return bool(win32.TryAcquireSRWLockExclusive(&rw.impl.srwlock)); +} + +_rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) { + win32.AcquireSRWLockShared(&rw.impl.srwlock); +} + +_rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) { + win32.ReleaseSRWLockShared(&rw.impl.srwlock); +} + +_rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool { + return bool(win32.TryAcquireSRWLockShared(&rw.impl.srwlock)); +} + + + +_Cond :: struct { + cond: win32.CONDITION_VARIABLE, +} + +_cond_wait :: proc(c: ^Cond, m: ^Mutex) { + _ = win32.SleepConditionVariableSRW(&c.impl.cond, &m.impl.srwlock, win32.INFINITE, 0); +} + +_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool { + ms := win32.DWORD((max(time.duration_nanoseconds(timeout), 0) + 999999)/1000000); + return cast(bool)win32.SleepConditionVariableSRW(&c.impl.cond, &m.impl.srwlock, ms, 0); +} + +_cond_signal :: proc(c: ^Cond) { + win32.WakeConditionVariable(&c.impl.cond); +} + +_cond_broadcast :: proc(c: ^Cond) { + win32.WakeAllConditionVariable(&c.impl.cond); +} |