author    gingerBill <bill@gingerbill.org>  2021-04-11 15:18:28 +0100
committer gingerBill <bill@gingerbill.org>  2021-04-11 15:18:28 +0100
commit    2db1fe74299766c9a29a33c39299d07e12556bb2 (patch)
tree      00abf6383909f56c437a91a5da911ed39f337230 /core/sync/sync2
parent    5bc9e4e4f78206d9a46a32505b27c314d93e61ff (diff)
New redesign of core:sync (stored under core:sync/sync2 for the time being)
Diffstat (limited to 'core/sync/sync2')
-rw-r--r--  core/sync/sync2/atomic.odin               170
-rw-r--r--  core/sync/sync2/channel.odin              887
-rw-r--r--  core/sync/sync2/channel_unix.odin          17
-rw-r--r--  core/sync/sync2/channel_windows.odin       35
-rw-r--r--  core/sync/sync2/extended.odin             215
-rw-r--r--  core/sync/sync2/primitives.odin           185
-rw-r--r--  core/sync/sync2/primitives_atomic.odin    244
-rw-r--r--  core/sync/sync2/primitives_pthreads.odin  155
-rw-r--r--  core/sync/sync2/primitives_windows.odin    73
9 files changed, 1981 insertions, 0 deletions
diff --git a/core/sync/sync2/atomic.odin b/core/sync/sync2/atomic.odin
new file mode 100644
index 000000000..8240c0fcd
--- /dev/null
+++ b/core/sync/sync2/atomic.odin
@@ -0,0 +1,170 @@
+package sync2
+
+import "intrinsics"
+
+// TODO(bill): Is this even a good design? The intrinsics seem to be more than good enough and just as clean
+
+Ordering :: enum {
+ Relaxed, // Monotonic
+ Release,
+ Acquire,
+ Acquire_Release,
+ Sequentially_Consistent,
+}
+
+strongest_failure_ordering_table := [Ordering]Ordering{
+ .Relaxed = .Relaxed,
+ .Release = .Relaxed,
+ .Acquire = .Acquire,
+ .Acquire_Release = .Acquire,
+ .Sequentially_Consistent = .Sequentially_Consistent,
+};
+
+strongest_failure_ordering :: #force_inline proc(order: Ordering) -> Ordering {
+ return strongest_failure_ordering_table[order];
+}
+
+fence :: #force_inline proc($order: Ordering) {
+ when order == .Relaxed { #panic("there is no such thing as a relaxed fence"); }
+ else when order == .Release { intrinsics.atomic_fence_rel(); }
+ else when order == .Acquire { intrinsics.atomic_fence_acq(); }
+ else when order == .Acquire_Release { intrinsics.atomic_fence_acqrel(); }
+ else when order == .Sequentially_Consistent { intrinsics.atomic_fence(); }
+ else { #panic("unknown order"); }
+}
+
+
+atomic_store :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) {
+ when order == .Relaxed { intrinsics.atomic_store_relaxed(dst, val); }
+ else when order == .Release { intrinsics.atomic_store_rel(dst, val); }
+ else when order == .Sequentially_Consistent { intrinsics.atomic_store(dst, val); }
+	else when order == .Acquire { #panic("there is no such thing as an acquire store"); }
+	else when order == .Acquire_Release { #panic("there is no such thing as an acquire/release store"); }
+ else { #panic("unknown order"); }
+}
+
+atomic_load :: #force_inline proc(dst: ^$T, $order: Ordering) -> T {
+ when order == .Relaxed { return intrinsics.atomic_load_relaxed(dst); }
+ else when order == .Acquire { return intrinsics.atomic_load_acq(dst); }
+ else when order == .Sequentially_Consistent { return intrinsics.atomic_load(dst); }
+ else when order == .Release { #panic("there is no such thing as a release load"); }
+ else when order == .Acquire_Release { #panic("there is no such thing as an acquire/release load"); }
+ else { #panic("unknown order"); }
+}
+
+atomic_exchange :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T {
+ when order == .Relaxed { return intrinsics.atomic_xchg_relaxed(dst, val); }
+ else when order == .Release { return intrinsics.atomic_xchg_rel(dst, val); }
+ else when order == .Acquire { return intrinsics.atomic_xchg_acq(dst, val); }
+ else when order == .Acquire_Release { return intrinsics.atomic_xchg_acqrel(dst, val); }
+ else when order == .Sequentially_Consistent { return intrinsics.atomic_xchg(dst, val); }
+ else { #panic("unknown order"); }
+}
+
+atomic_compare_exchange :: #force_inline proc(dst: ^$T, old, new: T, $success, $failure: Ordering) -> (val: T, ok: bool) {
+ when failure == .Relaxed {
+ when success == .Relaxed { return intrinsics.atomic_cxchg_relaxed(dst, old, new); }
+ else when success == .Acquire { return intrinsics.atomic_cxchg_acq_failrelaxed(dst, old, new); }
+ else when success == .Acquire_Release { return intrinsics.atomic_cxchg_acqrel_failrelaxed(dst, old, new); }
+ else when success == .Sequentially_Consistent { return intrinsics.atomic_cxchg_failrelaxed(dst, old, new); }
+ else when success == .Release { return intrinsics.atomic_cxchg_rel(dst, old, new); }
+ else { #panic("an unknown ordering combination"); }
+ } else when failure == .Acquire {
+ when success == .Release { return intrinsics.atomic_cxchg_acqrel(dst, old, new); }
+ else when success == .Acquire { return intrinsics.atomic_cxchg_acq(dst, old, new); }
+ else { #panic("an unknown ordering combination"); }
+ } else when failure == .Sequentially_Consistent {
+ when success == .Sequentially_Consistent { return intrinsics.atomic_cxchg(dst, old, new); }
+ else { #panic("an unknown ordering combination"); }
+ } else when failure == .Acquire_Release {
+		#panic("there is no such thing as an acquire/release failure ordering");
+	} else when failure == .Release {
+		when success == .Acquire { return intrinsics.atomic_cxchg_failacq(dst, old, new); }
+ else { #panic("an unknown ordering combination"); }
+ } else {
+ return T{}, false;
+ }
+
+}
+
+atomic_compare_exchange_weak :: #force_inline proc(dst: ^$T, old, new: T, $success, $failure: Ordering) -> (val: T, ok: bool) {
+ when failure == .Relaxed {
+ when success == .Relaxed { return intrinsics.atomic_cxchgweak_relaxed(dst, old, new); }
+ else when success == .Acquire { return intrinsics.atomic_cxchgweak_acq_failrelaxed(dst, old, new); }
+ else when success == .Acquire_Release { return intrinsics.atomic_cxchgweak_acqrel_failrelaxed(dst, old, new); }
+ else when success == .Sequentially_Consistent { return intrinsics.atomic_cxchgweak_failrelaxed(dst, old, new); }
+ else when success == .Release { return intrinsics.atomic_cxchgweak_rel(dst, old, new); }
+ else { #panic("an unknown ordering combination"); }
+ } else when failure == .Acquire {
+ when success == .Release { return intrinsics.atomic_cxchgweak_acqrel(dst, old, new); }
+ else when success == .Acquire { return intrinsics.atomic_cxchgweak_acq(dst, old, new); }
+ else { #panic("an unknown ordering combination"); }
+ } else when failure == .Sequentially_Consistent {
+ when success == .Sequentially_Consistent { return intrinsics.atomic_cxchgweak(dst, old, new); }
+ else { #panic("an unknown ordering combination"); }
+ } else when failure == .Acquire_Release {
+		#panic("there is no such thing as an acquire/release failure ordering");
+ } else when failure == .Release {
+ when success == .Acquire { return intrinsics.atomic_cxchgweak_failacq(dst, old, new); }
+ else { #panic("an unknown ordering combination"); }
+ } else {
+ return T{}, false;
+ }
+
+}
+
+
+atomic_add :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T {
+ when order == .Relaxed { return intrinsics.atomic_add_relaxed(dst, val); }
+ else when order == .Release { return intrinsics.atomic_add_rel(dst, val); }
+ else when order == .Acquire { return intrinsics.atomic_add_acq(dst, val); }
+ else when order == .Acquire_Release { return intrinsics.atomic_add_acqrel(dst, val); }
+ else when order == .Sequentially_Consistent { return intrinsics.atomic_add(dst, val); }
+ else { #panic("unknown order"); }
+}
+
+atomic_sub :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T {
+ when order == .Relaxed { return intrinsics.atomic_sub_relaxed(dst, val); }
+ else when order == .Release { return intrinsics.atomic_sub_rel(dst, val); }
+ else when order == .Acquire { return intrinsics.atomic_sub_acq(dst, val); }
+ else when order == .Acquire_Release { return intrinsics.atomic_sub_acqrel(dst, val); }
+ else when order == .Sequentially_Consistent { return intrinsics.atomic_sub(dst, val); }
+ else { #panic("unknown order"); }
+}
+
+atomic_and :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T {
+ when order == .Relaxed { return intrinsics.atomic_and_relaxed(dst, val); }
+ else when order == .Release { return intrinsics.atomic_and_rel(dst, val); }
+ else when order == .Acquire { return intrinsics.atomic_and_acq(dst, val); }
+ else when order == .Acquire_Release { return intrinsics.atomic_and_acqrel(dst, val); }
+ else when order == .Sequentially_Consistent { return intrinsics.atomic_and(dst, val); }
+ else { #panic("unknown order"); }
+}
+
+atomic_nand :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T {
+ when order == .Relaxed { return intrinsics.atomic_nand_relaxed(dst, val); }
+ else when order == .Release { return intrinsics.atomic_nand_rel(dst, val); }
+ else when order == .Acquire { return intrinsics.atomic_nand_acq(dst, val); }
+ else when order == .Acquire_Release { return intrinsics.atomic_nand_acqrel(dst, val); }
+ else when order == .Sequentially_Consistent { return intrinsics.atomic_nand(dst, val); }
+ else { #panic("unknown order"); }
+}
+
+atomic_or :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T {
+ when order == .Relaxed { return intrinsics.atomic_or_relaxed(dst, val); }
+ else when order == .Release { return intrinsics.atomic_or_rel(dst, val); }
+ else when order == .Acquire { return intrinsics.atomic_or_acq(dst, val); }
+ else when order == .Acquire_Release { return intrinsics.atomic_or_acqrel(dst, val); }
+ else when order == .Sequentially_Consistent { return intrinsics.atomic_or(dst, val); }
+ else { #panic("unknown order"); }
+}
+
+atomic_xor :: #force_inline proc(dst: ^$T, val: T, $order: Ordering) -> T {
+ when order == .Relaxed { return intrinsics.atomic_xor_relaxed(dst, val); }
+ else when order == .Release { return intrinsics.atomic_xor_rel(dst, val); }
+ else when order == .Acquire { return intrinsics.atomic_xor_acq(dst, val); }
+ else when order == .Acquire_Release { return intrinsics.atomic_xor_acqrel(dst, val); }
+ else when order == .Sequentially_Consistent { return intrinsics.atomic_xor(dst, val); }
+ else { #panic("unknown order"); }
+}
+
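+// Example (illustrative sketch only, not part of this file): how the wrappers above
+// are called with a compile-time Ordering; `counter` and `flag` are hypothetical
+// shared variables.
+/*
+	counter: int;
+	flag:    bool;
+
+	atomic_store(&counter, 10, .Relaxed);
+	atomic_add(&counter, 5, .Release);
+	if atomic_load(&flag, .Acquire) {
+		// success ordering .Acquire with failure ordering .Relaxed
+		_, _ = atomic_compare_exchange(&counter, 15, 0, .Acquire, .Relaxed);
+	}
+*/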
diff --git a/core/sync/sync2/channel.odin b/core/sync/sync2/channel.odin
new file mode 100644
index 000000000..782b1d86a
--- /dev/null
+++ b/core/sync/sync2/channel.odin
@@ -0,0 +1,887 @@
+package sync2
+
+// TODO(bill): The Channel implementation needs a complete rewrite for this new package sync design
+// Especially how the `select` things work
+
+import "core:mem"
+import "core:time"
+import "intrinsics"
+import "core:math/rand"
+
+_, _ :: time, rand;
+
+Channel_Direction :: enum i8 {
+ Both = 0,
+ Send = +1,
+ Recv = -1,
+}
+
+Channel :: struct(T: typeid, Direction := Channel_Direction.Both) {
+ using _internal: ^Raw_Channel,
+}
+
+channel_init :: proc(ch: ^$C/Channel($T, $D), cap := 0, allocator := context.allocator) {
+ context.allocator = allocator;
+ ch._internal = raw_channel_create(size_of(T), align_of(T), cap);
+ return;
+}
+
+channel_make :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Both)) {
+ context.allocator = allocator;
+ ch._internal = raw_channel_create(size_of(T), align_of(T), cap);
+ return;
+}
+
+channel_make_send :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Send)) {
+ context.allocator = allocator;
+ ch._internal = raw_channel_create(size_of(T), align_of(T), cap);
+ return;
+}
+channel_make_recv :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T, .Recv)) {
+ context.allocator = allocator;
+ ch._internal = raw_channel_create(size_of(T), align_of(T), cap);
+ return;
+}
+
+channel_destroy :: proc(ch: $C/Channel($T, $D)) {
+ raw_channel_destroy(ch._internal);
+}
+
+channel_as_send :: proc(ch: $C/Channel($T, .Both)) -> (res: Channel(T, .Send)) {
+ res._internal = ch._internal;
+ return;
+}
+
+channel_as_recv :: proc(ch: $C/Channel($T, .Both)) -> (res: Channel(T, .Recv)) {
+ res._internal = ch._internal;
+ return;
+}
+
+
+channel_len :: proc(ch: $C/Channel($T, $D)) -> int {
+ return ch._internal.len if ch._internal != nil else 0;
+}
+channel_cap :: proc(ch: $C/Channel($T, $D)) -> int {
+ return ch._internal.cap if ch._internal != nil else 0;
+}
+
+
+channel_send :: proc(ch: $C/Channel($T, $D), msg: T, loc := #caller_location) where D >= .Both {
+ msg := msg;
+ _ = raw_channel_send_impl(ch._internal, &msg, /*block*/true, loc);
+}
+channel_try_send :: proc(ch: $C/Channel($T, $D), msg: T, loc := #caller_location) -> bool where D >= .Both {
+ msg := msg;
+ return raw_channel_send_impl(ch._internal, &msg, /*block*/false, loc);
+}
+
+channel_recv :: proc(ch: $C/Channel($T, $D), loc := #caller_location) -> (msg: T) where D <= .Both {
+ c := ch._internal;
+ if c == nil {
+ panic(message="cannot recv message; channel is nil", loc=loc);
+ }
+ mutex_lock(&c.mutex);
+ raw_channel_recv_impl(c, &msg, loc);
+ mutex_unlock(&c.mutex);
+ return;
+}
+channel_try_recv :: proc(ch: $C/Channel($T, $D), loc := #caller_location) -> (msg: T, ok: bool) where D <= .Both {
+ c := ch._internal;
+ if c != nil && mutex_try_lock(&c.mutex) {
+ if c.len > 0 {
+ raw_channel_recv_impl(c, &msg, loc);
+ ok = true;
+ }
+ mutex_unlock(&c.mutex);
+ }
+ return;
+}
+channel_try_recv_ptr :: proc(ch: $C/Channel($T, $D), msg: ^T, loc := #caller_location) -> (ok: bool) where D <= .Both {
+ res: T;
+ res, ok = channel_try_recv(ch, loc);
+ if ok && msg != nil {
+ msg^ = res;
+ }
+ return;
+}
+
+
+channel_is_nil :: proc(ch: $C/Channel($T, $D)) -> bool {
+ return ch._internal == nil;
+}
+channel_is_open :: proc(ch: $C/Channel($T, $D)) -> bool {
+ c := ch._internal;
+ return c != nil && !c.closed;
+}
+
+
+channel_eq :: proc(a, b: $C/Channel($T, $D)) -> bool {
+ return a._internal == b._internal;
+}
+channel_ne :: proc(a, b: $C/Channel($T, $D)) -> bool {
+ return a._internal != b._internal;
+}
+
+
+channel_can_send :: proc(ch: $C/Channel($T, $D)) -> (ok: bool) where D >= .Both {
+ return raw_channel_can_send(ch._internal);
+}
+channel_can_recv :: proc(ch: $C/Channel($T, $D)) -> (ok: bool) where D <= .Both {
+ return raw_channel_can_recv(ch._internal);
+}
+
+
+channel_peek :: proc(ch: $C/Channel($T, $D)) -> int {
+ c := ch._internal;
+ if c == nil {
+ return -1;
+ }
+ if intrinsics.atomic_load(&c.closed) {
+ return -1;
+ }
+ return intrinsics.atomic_load(&c.len);
+}
+
+
+channel_close :: proc(ch: $C/Channel($T, $D), loc := #caller_location) {
+ raw_channel_close(ch._internal, loc);
+}
+
+
+channel_iterator :: proc(ch: $C/Channel($T, $D)) -> (msg: T, ok: bool) where D <= .Both {
+ c := ch._internal;
+ if c == nil {
+ return;
+ }
+
+ if !c.closed || c.len > 0 {
+ msg, ok = channel_recv(ch), true;
+ }
+ return;
+}
+channel_drain :: proc(ch: $C/Channel($T, $D)) where D >= .Both {
+ raw_channel_drain(ch._internal);
+}
+
+
+channel_move :: proc(dst: $C1/Channel($T, $D1), src: $C2/Channel(T, $D2)) where D1 >= .Both, D2 <= .Both {
+ for msg in channel_iterator(src) {
+ channel_send(dst, msg);
+ }
+}
+
+
+Raw_Channel_Wait_Queue :: struct {
+ next: ^Raw_Channel_Wait_Queue,
+ state: ^uintptr,
+}
+
+
+Raw_Channel :: struct {
+ closed: bool,
+ ready: bool, // ready to recv
+ data_offset: u16, // data is stored at the end of this data structure
+ elem_size: u32,
+ len, cap: int,
+ read, write: int,
+ mutex: Mutex,
+ cond: Cond,
+ allocator: mem.Allocator,
+
+ sendq: ^Raw_Channel_Wait_Queue,
+ recvq: ^Raw_Channel_Wait_Queue,
+}
+
+raw_channel_wait_queue_insert :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) {
+ val.next = head^;
+ head^ = val;
+}
+raw_channel_wait_queue_remove :: proc(head: ^^Raw_Channel_Wait_Queue, val: ^Raw_Channel_Wait_Queue) {
+ p := head;
+ for p^ != nil && p^ != val {
+ p = &p^.next;
+ }
+ if p != nil {
+ p^ = p^.next;
+ }
+}
+
+
+raw_channel_create :: proc(elem_size, elem_align: int, cap := 0) -> ^Raw_Channel {
+ assert(int(u32(elem_size)) == elem_size);
+
+ s := size_of(Raw_Channel);
+ s = mem.align_forward_int(s, elem_align);
+ data_offset := uintptr(s);
+ s += elem_size * max(cap, 1);
+
+ a := max(elem_align, align_of(Raw_Channel));
+
+ c := (^Raw_Channel)(mem.alloc(s, a));
+ if c == nil {
+ return nil;
+ }
+
+ c.data_offset = u16(data_offset);
+ c.elem_size = u32(elem_size);
+ c.len, c.cap = 0, max(cap, 0);
+ c.read, c.write = 0, 0;
+ c.allocator = context.allocator;
+ c.closed = false;
+
+ return c;
+}
+
+
+raw_channel_destroy :: proc(c: ^Raw_Channel) {
+ if c == nil {
+ return;
+ }
+ context.allocator = c.allocator;
+ intrinsics.atomic_store(&c.closed, true);
+ free(c);
+}
+
+raw_channel_close :: proc(c: ^Raw_Channel, loc := #caller_location) {
+ if c == nil {
+ panic(message="cannot close nil channel", loc=loc);
+ }
+ mutex_lock(&c.mutex);
+ defer mutex_unlock(&c.mutex);
+ intrinsics.atomic_store(&c.closed, true);
+
+ // Release readers and writers
+ raw_channel_wait_queue_broadcast(c.recvq);
+ raw_channel_wait_queue_broadcast(c.sendq);
+ cond_broadcast(&c.cond);
+}
+
+
+
+raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, block: bool, loc := #caller_location) -> bool {
+ send :: proc(c: ^Raw_Channel, src: rawptr) {
+ data := uintptr(c) + uintptr(c.data_offset);
+ dst := data + uintptr(c.write * int(c.elem_size));
+ mem.copy(rawptr(dst), src, int(c.elem_size));
+ c.len += 1;
+ c.write = (c.write + 1) % max(c.cap, 1);
+ }
+
+ switch {
+ case c == nil:
+ panic(message="cannot send message; channel is nil", loc=loc);
+ case c.closed:
+ panic(message="cannot send message; channel is closed", loc=loc);
+ }
+
+ mutex_lock(&c.mutex);
+ defer mutex_unlock(&c.mutex);
+
+ if c.cap > 0 {
+ if !block && c.len >= c.cap {
+ return false;
+ }
+
+ for c.len >= c.cap {
+ cond_wait(&c.cond, &c.mutex);
+ }
+ } else if c.len > 0 { // TODO(bill): determine correct behaviour
+ if !block {
+ return false;
+ }
+ cond_wait(&c.cond, &c.mutex);
+ } else if c.len == 0 && !block {
+ return false;
+ }
+
+ send(c, msg);
+ cond_signal(&c.cond);
+ raw_channel_wait_queue_signal(c.recvq);
+
+ return true;
+}
+
+raw_channel_recv_impl :: proc(c: ^Raw_Channel, res: rawptr, loc := #caller_location) {
+ recv :: proc(c: ^Raw_Channel, dst: rawptr, loc := #caller_location) {
+ if c.len < 1 {
+ panic(message="cannot recv message; channel is empty", loc=loc);
+ }
+ c.len -= 1;
+
+ data := uintptr(c) + uintptr(c.data_offset);
+ src := data + uintptr(c.read * int(c.elem_size));
+ mem.copy(dst, rawptr(src), int(c.elem_size));
+ c.read = (c.read + 1) % max(c.cap, 1);
+ }
+
+ if c == nil {
+ panic(message="cannot recv message; channel is nil", loc=loc);
+ }
+ intrinsics.atomic_store(&c.ready, true);
+ for c.len < 1 {
+ raw_channel_wait_queue_signal(c.sendq);
+ cond_wait(&c.cond, &c.mutex);
+ }
+ intrinsics.atomic_store(&c.ready, false);
+ recv(c, res, loc);
+ if c.cap > 0 {
+ if c.len == c.cap - 1 {
+ // NOTE(bill): Only signal on the last one
+ cond_signal(&c.cond);
+ }
+ } else {
+ cond_signal(&c.cond);
+ }
+}
+
+
+raw_channel_can_send :: proc(c: ^Raw_Channel) -> (ok: bool) {
+ if c == nil {
+ return false;
+ }
+ mutex_lock(&c.mutex);
+ switch {
+ case c.closed:
+ ok = false;
+ case c.cap > 0:
+ ok = c.ready && c.len < c.cap;
+ case:
+ ok = c.ready && c.len == 0;
+ }
+ mutex_unlock(&c.mutex);
+ return;
+}
+raw_channel_can_recv :: proc(c: ^Raw_Channel) -> (ok: bool) {
+ if c == nil {
+ return false;
+ }
+ mutex_lock(&c.mutex);
+ ok = c.len > 0;
+ mutex_unlock(&c.mutex);
+ return;
+}
+
+
+raw_channel_drain :: proc(c: ^Raw_Channel) {
+ if c == nil {
+ return;
+ }
+ mutex_lock(&c.mutex);
+ c.len = 0;
+ c.read = 0;
+ c.write = 0;
+ mutex_unlock(&c.mutex);
+}
+
+
+
+MAX_SELECT_CHANNELS :: 64;
+SELECT_MAX_TIMEOUT :: max(time.Duration);
+
+Select_Command :: enum {
+ Recv,
+ Send,
+}
+
+Select_Channel :: struct {
+ channel: ^Raw_Channel,
+ command: Select_Command,
+}
+
+
+
+select :: proc(channels: ..Select_Channel) -> (index: int) {
+ return select_timeout(SELECT_MAX_TIMEOUT, ..channels);
+}
+select_timeout :: proc(timeout: time.Duration, channels: ..Select_Channel) -> (index: int) {
+ switch len(channels) {
+ case 0:
+ panic("sync: select with no channels");
+ }
+
+ assert(len(channels) <= MAX_SELECT_CHANNELS);
+
+ backing: [MAX_SELECT_CHANNELS]int;
+ queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue;
+ candidates := backing[:];
+ cap := len(channels);
+ candidates = candidates[:cap];
+
+ count := u32(0);
+ for c, i in channels {
+ if c.channel == nil {
+ continue;
+ }
+ switch c.command {
+ case .Recv:
+ if raw_channel_can_recv(c.channel) {
+ candidates[count] = i;
+ count += 1;
+ }
+ case .Send:
+ if raw_channel_can_send(c.channel) {
+ candidates[count] = i;
+ count += 1;
+ }
+ }
+ }
+
+ if count == 0 {
+ wait_state: uintptr = 0;
+ for _, i in channels {
+ q := &queues[i];
+ q.state = &wait_state;
+ }
+
+ for c, i in channels {
+ if c.channel == nil {
+ continue;
+ }
+ q := &queues[i];
+ switch c.command {
+ case .Recv: raw_channel_wait_queue_insert(&c.channel.recvq, q);
+ case .Send: raw_channel_wait_queue_insert(&c.channel.sendq, q);
+ }
+ }
+ raw_channel_wait_queue_wait_on(&wait_state, timeout);
+ for c, i in channels {
+ if c.channel == nil {
+ continue;
+ }
+ q := &queues[i];
+ switch c.command {
+ case .Recv: raw_channel_wait_queue_remove(&c.channel.recvq, q);
+ case .Send: raw_channel_wait_queue_remove(&c.channel.sendq, q);
+ }
+ }
+
+ for c, i in channels {
+ switch c.command {
+ case .Recv:
+ if raw_channel_can_recv(c.channel) {
+ candidates[count] = i;
+ count += 1;
+ }
+ case .Send:
+ if raw_channel_can_send(c.channel) {
+ candidates[count] = i;
+ count += 1;
+ }
+ }
+ }
+		if count == 0 && timeout != SELECT_MAX_TIMEOUT {
+ index = -1;
+ return;
+ }
+
+ assert(count != 0);
+ }
+
+ t := time.now();
+ r := rand.create(transmute(u64)t);
+ i := rand.uint32(&r);
+
+ index = candidates[i % count];
+ return;
+}
+
+select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) {
+ switch len(channels) {
+ case 0:
+ panic("sync: select with no channels");
+ }
+
+ assert(len(channels) <= MAX_SELECT_CHANNELS);
+
+ backing: [MAX_SELECT_CHANNELS]int;
+ queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue;
+ candidates := backing[:];
+ cap := len(channels);
+ candidates = candidates[:cap];
+
+ count := u32(0);
+ for c, i in channels {
+ if raw_channel_can_recv(c) {
+ candidates[count] = i;
+ count += 1;
+ }
+ }
+
+ if count == 0 {
+ state: uintptr;
+ for c, i in channels {
+ q := &queues[i];
+ q.state = &state;
+ raw_channel_wait_queue_insert(&c.recvq, q);
+ }
+ raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT);
+ for c, i in channels {
+ q := &queues[i];
+ raw_channel_wait_queue_remove(&c.recvq, q);
+ }
+
+ for c, i in channels {
+ if raw_channel_can_recv(c) {
+ candidates[count] = i;
+ count += 1;
+ }
+ }
+ assert(count != 0);
+ }
+
+ t := time.now();
+ r := rand.create(transmute(u64)t);
+ i := rand.uint32(&r);
+
+ index = candidates[i % count];
+ return;
+}
+
+select_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int) {
+ switch len(channels) {
+ case 0:
+ panic("sync: select with no channels");
+ }
+
+ assert(len(channels) <= MAX_SELECT_CHANNELS);
+
+ queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue;
+ candidates: [MAX_SELECT_CHANNELS]int;
+
+ count := u32(0);
+ for c, i in channels {
+ if raw_channel_can_recv(c) {
+ candidates[count] = i;
+ count += 1;
+ }
+ }
+
+ if count == 0 {
+ state: uintptr;
+ for c, i in channels {
+ q := &queues[i];
+ q.state = &state;
+ raw_channel_wait_queue_insert(&c.recvq, q);
+ }
+ raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT);
+ for c, i in channels {
+ q := &queues[i];
+ raw_channel_wait_queue_remove(&c.recvq, q);
+ }
+
+ for c, i in channels {
+ if raw_channel_can_recv(c) {
+ candidates[count] = i;
+ count += 1;
+ }
+ }
+ assert(count != 0);
+ }
+
+ t := time.now();
+ r := rand.create(transmute(u64)t);
+ i := rand.uint32(&r);
+
+ index = candidates[i % count];
+ msg = channel_recv(channels[index]);
+
+ return;
+}
+
+select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int) {
+ switch len(channels) {
+ case 0:
+ panic("sync: select with no channels");
+ }
+
+ assert(len(channels) <= MAX_SELECT_CHANNELS);
+
+ backing: [MAX_SELECT_CHANNELS]int;
+ queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue;
+ candidates := backing[:];
+ cap := len(channels);
+ candidates = candidates[:cap];
+
+ count := u32(0);
+ for c, i in channels {
+		if raw_channel_can_send(c) {
+ candidates[count] = i;
+ count += 1;
+ }
+ }
+
+ if count == 0 {
+ state: uintptr;
+ for c, i in channels {
+ q := &queues[i];
+ q.state = &state;
+			raw_channel_wait_queue_insert(&c.sendq, q);
+ }
+ raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT);
+ for c, i in channels {
+ q := &queues[i];
+			raw_channel_wait_queue_remove(&c.sendq, q);
+ }
+
+ for c, i in channels {
+			if raw_channel_can_send(c) {
+ candidates[count] = i;
+ count += 1;
+ }
+ }
+ assert(count != 0);
+ }
+
+ t := time.now();
+ r := rand.create(transmute(u64)t);
+ i := rand.uint32(&r);
+
+ index = candidates[i % count];
+
+ if msg != nil {
+ channel_send(channels[index], msg);
+ }
+
+ return;
+}
+
+select_send :: proc(channels: ..^Raw_Channel) -> (index: int) {
+ switch len(channels) {
+ case 0:
+ panic("sync: select with no channels");
+ }
+
+ assert(len(channels) <= MAX_SELECT_CHANNELS);
+ candidates: [MAX_SELECT_CHANNELS]int;
+ queues: [MAX_SELECT_CHANNELS]Raw_Channel_Wait_Queue;
+
+ count := u32(0);
+ for c, i in channels {
+ if raw_channel_can_send(c) {
+ candidates[count] = i;
+ count += 1;
+ }
+ }
+
+ if count == 0 {
+ state: uintptr;
+ for c, i in channels {
+ q := &queues[i];
+ q.state = &state;
+ raw_channel_wait_queue_insert(&c.sendq, q);
+ }
+ raw_channel_wait_queue_wait_on(&state, SELECT_MAX_TIMEOUT);
+ for c, i in channels {
+ q := &queues[i];
+ raw_channel_wait_queue_remove(&c.sendq, q);
+ }
+
+ for c, i in channels {
+ if raw_channel_can_send(c) {
+ candidates[count] = i;
+ count += 1;
+ }
+ }
+ assert(count != 0);
+ }
+
+ t := time.now();
+ r := rand.create(transmute(u64)t);
+ i := rand.uint32(&r);
+
+ index = candidates[i % count];
+ return;
+}
+
+select_try :: proc(channels: ..Select_Channel) -> (index: int) {
+ switch len(channels) {
+ case 0:
+ panic("sync: select with no channels");
+ }
+
+ assert(len(channels) <= MAX_SELECT_CHANNELS);
+
+ backing: [MAX_SELECT_CHANNELS]int;
+ candidates := backing[:];
+ cap := len(channels);
+ candidates = candidates[:cap];
+
+ count := u32(0);
+ for c, i in channels {
+ switch c.command {
+ case .Recv:
+ if raw_channel_can_recv(c.channel) {
+ candidates[count] = i;
+ count += 1;
+ }
+ case .Send:
+ if raw_channel_can_send(c.channel) {
+ candidates[count] = i;
+ count += 1;
+ }
+ }
+ }
+
+ if count == 0 {
+ index = -1;
+ return;
+ }
+
+ t := time.now();
+ r := rand.create(transmute(u64)t);
+ i := rand.uint32(&r);
+
+ index = candidates[i % count];
+ return;
+}
+
+
+select_try_recv :: proc(channels: ..^Raw_Channel) -> (index: int) {
+ switch len(channels) {
+ case 0:
+ index = -1;
+ return;
+ case 1:
+ index = -1;
+ if raw_channel_can_recv(channels[0]) {
+ index = 0;
+ }
+ return;
+ }
+
+ assert(len(channels) <= MAX_SELECT_CHANNELS);
+ candidates: [MAX_SELECT_CHANNELS]int;
+
+ count := u32(0);
+ for c, i in channels {
+ if raw_channel_can_recv(c) {
+ candidates[count] = i;
+ count += 1;
+ }
+ }
+
+ if count == 0 {
+ index = -1;
+ return;
+ }
+
+ t := time.now();
+ r := rand.create(transmute(u64)t);
+ i := rand.uint32(&r);
+
+ index = candidates[i % count];
+ return;
+}
+
+
+select_try_send :: proc(channels: ..^Raw_Channel) -> (index: int) #no_bounds_check {
+ switch len(channels) {
+ case 0:
+ return -1;
+ case 1:
+ if raw_channel_can_send(channels[0]) {
+ return 0;
+ }
+ return -1;
+ }
+
+ assert(len(channels) <= MAX_SELECT_CHANNELS);
+ candidates: [MAX_SELECT_CHANNELS]int;
+
+ count := u32(0);
+ for c, i in channels {
+ if raw_channel_can_send(c) {
+ candidates[count] = i;
+ count += 1;
+ }
+ }
+
+ if count == 0 {
+ index = -1;
+ return;
+ }
+
+ t := time.now();
+ r := rand.create(transmute(u64)t);
+ i := rand.uint32(&r);
+
+ index = candidates[i % count];
+ return;
+}
+
+select_try_recv_msg :: proc(channels: ..$C/Channel($T, $D)) -> (msg: T, index: int) {
+ switch len(channels) {
+ case 0:
+ index = -1;
+ return;
+ case 1:
+		index = -1;
+		ok: bool;
+ if msg, ok = channel_try_recv(channels[0]); ok {
+ index = 0;
+ }
+ return;
+ }
+
+ assert(len(channels) <= MAX_SELECT_CHANNELS);
+ candidates: [MAX_SELECT_CHANNELS]int;
+
+ count := u32(0);
+ for c, i in channels {
+ if channel_can_recv(c) {
+ candidates[count] = i;
+ count += 1;
+ }
+ }
+
+ if count == 0 {
+ index = -1;
+ return;
+ }
+
+ t := time.now();
+ r := rand.create(transmute(u64)t);
+ i := rand.uint32(&r);
+
+ index = candidates[i % count];
+ msg = channel_recv(channels[index]);
+ return;
+}
+
+select_try_send_msg :: proc(msg: $T, channels: ..$C/Channel(T, $D)) -> (index: int) {
+ index = -1;
+ switch len(channels) {
+ case 0:
+ return;
+ case 1:
+ if channel_try_send(channels[0], msg) {
+ index = 0;
+ }
+ return;
+ }
+
+
+ assert(len(channels) <= MAX_SELECT_CHANNELS);
+ candidates: [MAX_SELECT_CHANNELS]int;
+
+ count := u32(0);
+ for c, i in channels {
+ if raw_channel_can_send(c) {
+ candidates[count] = i;
+ count += 1;
+ }
+ }
+
+ if count == 0 {
+ index = -1;
+ return;
+ }
+
+ t := time.now();
+ r := rand.create(transmute(u64)t);
+ i := rand.uint32(&r);
+
+ index = candidates[i % count];
+ channel_send(channels[index], msg);
+ return;
+}
+
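+// Example (illustrative sketch, single-threaded for brevity): filling a buffered
+// channel, closing it, and draining it with channel_iterator; in real use the sender
+// and receiver would live on separate threads (`fmt` is assumed to be imported).
+/*
+	ch := channel_make(int, 4);
+	defer channel_destroy(ch);
+
+	channel_send(ch, 1);
+	channel_send(ch, 2);
+	channel_send(ch, 3);
+	channel_close(ch);
+
+	for msg in channel_iterator(ch) {
+		fmt.println(msg); // prints 1, 2, 3
+	}
+*/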
diff --git a/core/sync/sync2/channel_unix.odin b/core/sync/sync2/channel_unix.odin
new file mode 100644
index 000000000..7429b67db
--- /dev/null
+++ b/core/sync/sync2/channel_unix.odin
@@ -0,0 +1,17 @@
+//+build linux, darwin, freebsd
+//+private
+package sync2
+
+import "core:time"
+
+raw_channel_wait_queue_wait_on :: proc(state: ^uintptr, timeout: time.Duration) {
+ // stub
+}
+
+raw_channel_wait_queue_signal :: proc(q: ^Raw_Channel_Wait_Queue) {
+ // stub
+}
+
+raw_channel_wait_queue_broadcast :: proc(q: ^Raw_Channel_Wait_Queue) {
+ // stub
+}
diff --git a/core/sync/sync2/channel_windows.odin b/core/sync/sync2/channel_windows.odin
new file mode 100644
index 000000000..a38a9cc2c
--- /dev/null
+++ b/core/sync/sync2/channel_windows.odin
@@ -0,0 +1,35 @@
+//+build windows
+//+private
+package sync2
+
+import "intrinsics"
+import win32 "core:sys/windows"
+import "core:time"
+
+raw_channel_wait_queue_wait_on :: proc(state: ^uintptr, timeout: time.Duration) {
+ ms: win32.DWORD = win32.INFINITE;
+	if timeout != SELECT_MAX_TIMEOUT {
+ ms = win32.DWORD((max(time.duration_nanoseconds(timeout), 0) + 999999)/1000000);
+ }
+
+ v := intrinsics.atomic_load(state);
+ for v == 0 {
+ win32.WaitOnAddress(state, &v, size_of(state^), ms);
+ v = intrinsics.atomic_load(state);
+ }
+ intrinsics.atomic_store(state, 0);
+}
+
+raw_channel_wait_queue_signal :: proc(q: ^Raw_Channel_Wait_Queue) {
+ for x := q; x != nil; x = x.next {
+ intrinsics.atomic_add(x.state, 1);
+ win32.WakeByAddressSingle(x.state);
+ }
+}
+
+raw_channel_wait_queue_broadcast :: proc(q: ^Raw_Channel_Wait_Queue) {
+ for x := q; x != nil; x = x.next {
+ intrinsics.atomic_add(x.state, 1);
+ win32.WakeByAddressAll(x.state);
+ }
+}
diff --git a/core/sync/sync2/extended.odin b/core/sync/sync2/extended.odin
new file mode 100644
index 000000000..3c439b225
--- /dev/null
+++ b/core/sync/sync2/extended.odin
@@ -0,0 +1,215 @@
+package sync2
+
+import "core:runtime"
+import "intrinsics"
+
+// A Wait_Group waits for a collection of threads to finish
+//
+// A Wait_Group must not be copied after first use
+Wait_Group :: struct {
+ counter: int,
+ mutex: Mutex,
+ cond: Cond,
+}
+
+wait_group_add :: proc(wg: ^Wait_Group, delta: int) {
+ if delta == 0 {
+ return;
+ }
+
+ mutex_lock(&wg.mutex);
+ defer mutex_unlock(&wg.mutex);
+
+ intrinsics.atomic_add(&wg.counter, delta);
+ if wg.counter < 0 {
+ panic("sync.Wait_Group negative counter");
+ }
+ if wg.counter == 0 {
+ cond_broadcast(&wg.cond);
+ if wg.counter != 0 {
+ panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait");
+ }
+ }
+}
+
+wait_group_done :: proc(wg: ^Wait_Group) {
+ wait_group_add(wg, -1);
+}
+
+wait_group_wait :: proc(wg: ^Wait_Group) {
+ mutex_lock(&wg.mutex);
+ defer mutex_unlock(&wg.mutex);
+
+ if wg.counter != 0 {
+ cond_wait(&wg.cond, &wg.mutex);
+ if wg.counter != 0 {
+ panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait");
+ }
+ }
+}
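+
+// Example (illustrative sketch): waiting for a batch of workers; each worker calls
+// wait_group_done when it finishes. Thread creation is elided here; see the Barrier
+// example below for a full listing. `WORKER_COUNT` is a hypothetical constant.
+/*
+	wg: Wait_Group;
+	wait_group_add(&wg, WORKER_COUNT);
+
+	// ... spawn WORKER_COUNT threads, each ending with wait_group_done(&wg) ...
+
+	wait_group_wait(&wg); // blocks until the counter reaches zero
+*/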
+
+
+
+// A barrier enabling multiple threads to synchronize the beginning of some computation
+/*
+ * Example:
+ *
+ * package example
+ *
+ * import "core:fmt"
+ * import "core:sync"
+ * import "core:thread"
+ *
+ * barrier := &sync.Barrier{};
+ *
+ * main :: proc() {
+ * fmt.println("Start");
+ *
+ * THREAD_COUNT :: 4;
+ * threads: [THREAD_COUNT]^thread.Thread;
+ *
+ * sync.barrier_init(barrier, THREAD_COUNT);
+ * defer sync.barrier_destroy(barrier);
+ *
+ *
+ * for _, i in threads {
+ * threads[i] = thread.create_and_start(proc(t: ^thread.Thread) {
+ * // Same messages will be printed together but without any interleaving
+ * fmt.println("Getting ready!");
+ * sync.barrier_wait(barrier);
+ * fmt.println("Off their marks they go!");
+ * });
+ * }
+ *
+ * for t in threads {
+ * thread.destroy(t); // join and free thread
+ * }
+ * fmt.println("Finished");
+ * }
+ *
+ */
+Barrier :: struct {
+ mutex: Mutex,
+ cond: Cond,
+ index: int,
+ generation_id: int,
+ thread_count: int,
+}
+
+barrier_init :: proc(b: ^Barrier, thread_count: int) {
+ b.index = 0;
+ b.generation_id = 0;
+ b.thread_count = thread_count;
+}
+
+// Blocks the current thread until all threads have rendezvoused
+// The Barrier can be reused after all threads have rendezvoused once, so it can be used continuously
+barrier_wait :: proc(b: ^Barrier) -> (is_leader: bool) {
+ mutex_lock(&b.mutex);
+ defer mutex_unlock(&b.mutex);
+ local_gen := b.generation_id;
+ b.index += 1;
+ if b.index < b.thread_count {
+ for local_gen == b.generation_id && b.index < b.thread_count {
+ cond_wait(&b.cond, &b.mutex);
+ }
+ return false;
+ }
+
+ b.index = 0;
+ b.generation_id += 1;
+ cond_broadcast(&b.cond);
+ return true;
+}
+
+
+
+Ticket_Mutex :: struct {
+ ticket: uint,
+ serving: uint,
+}
+
+ticket_mutex_lock :: #force_inline proc(m: ^Ticket_Mutex) {
+ ticket := intrinsics.atomic_add_relaxed(&m.ticket, 1);
+ for ticket != intrinsics.atomic_load_acq(&m.serving) {
+ intrinsics.cpu_relax();
+ }
+}
+
+ticket_mutex_unlock :: #force_inline proc(m: ^Ticket_Mutex) {
+ intrinsics.atomic_add_relaxed(&m.serving, 1);
+}
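+
+// Example (illustrative sketch): a Ticket_Mutex serves lockers in FIFO ticket order,
+// and its zero value is ready to use.
+/*
+	m: Ticket_Mutex;
+
+	ticket_mutex_lock(&m);
+	// ... critical section ...
+	ticket_mutex_unlock(&m);
+*/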
+
+
+
+Benaphore :: struct {
+ counter: int,
+ sema: Sema,
+}
+
+benaphore_lock :: proc(b: ^Benaphore) {
+ if intrinsics.atomic_add_acq(&b.counter, 1) > 1 {
+ sema_wait(&b.sema);
+ }
+}
+
+benaphore_try_lock :: proc(b: ^Benaphore) -> bool {
+ v, _ := intrinsics.atomic_cxchg_acq(&b.counter, 1, 0);
+ return v == 0;
+}
+
+benaphore_unlock :: proc(b: ^Benaphore) {
+ if intrinsics.atomic_sub_rel(&b.counter, 1) > 0 {
+ sema_post(&b.sema);
+ }
+}
+
+Recursive_Benaphore :: struct {
+ counter: int,
+ owner: int,
+ recursion: int,
+ sema: Sema,
+}
+
+recursive_benaphore_lock :: proc(b: ^Recursive_Benaphore) {
+ tid := runtime.current_thread_id();
+ if intrinsics.atomic_add_acq(&b.counter, 1) > 1 {
+ if tid != b.owner {
+ sema_wait(&b.sema);
+ }
+ }
+ // inside the lock
+ b.owner = tid;
+ b.recursion += 1;
+}
+
+recursive_benaphore_try_lock :: proc(b: ^Recursive_Benaphore) -> bool {
+ tid := runtime.current_thread_id();
+ if b.owner == tid {
+ intrinsics.atomic_add_acq(&b.counter, 1);
+ }
+
+ if v, _ := intrinsics.atomic_cxchg_acq(&b.counter, 1, 0); v != 0 {
+ return false;
+ }
+ // inside the lock
+ b.owner = tid;
+ b.recursion += 1;
+ return true;
+}
+
+recursive_benaphore_unlock :: proc(b: ^Recursive_Benaphore) {
+ tid := runtime.current_thread_id();
+ assert(tid == b.owner);
+ b.recursion -= 1;
+ recursion := b.recursion;
+ if recursion == 0 {
+ b.owner = 0;
+ }
+ if intrinsics.atomic_sub_rel(&b.counter, 1) > 0 {
+ if recursion == 0 {
+ sema_post(&b.sema);
+ }
+ }
+ // outside the lock
+}
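+
+// Example (illustrative sketch): a Benaphore only touches its semaphore under
+// contention, so an uncontended lock/unlock pair is just an atomic add and sub.
+/*
+	b: Benaphore;
+
+	benaphore_lock(&b);
+	// ... critical section ...
+	benaphore_unlock(&b);
+*/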
diff --git a/core/sync/sync2/primitives.odin b/core/sync/sync2/primitives.odin
new file mode 100644
index 000000000..dd6688a50
--- /dev/null
+++ b/core/sync/sync2/primitives.odin
@@ -0,0 +1,185 @@
+package sync2
+
+import "core:time"
+import "core:runtime"
+
+// A Mutex is a mutual exclusion lock
+// The zero value for a Mutex is an unlocked mutex
+//
+// A Mutex must not be copied after first use
+Mutex :: struct {
+ impl: _Mutex,
+}
+
+// mutex_lock locks m
+mutex_lock :: proc(m: ^Mutex) {
+ _mutex_lock(m);
+}
+
+// mutex_unlock unlocks m
+mutex_unlock :: proc(m: ^Mutex) {
+ _mutex_unlock(m);
+}
+
+// mutex_try_lock tries to lock m; it returns true on success and false on failure
+mutex_try_lock :: proc(m: ^Mutex) -> bool {
+ return _mutex_try_lock(m);
+}
+
+// A RW_Mutex is a reader/writer mutual exclusion lock
+// The lock can be held by an arbitrary number of readers or a single writer
+// The zero value for a RW_Mutex is an unlocked mutex
+//
+// A RW_Mutex must not be copied after first use
+RW_Mutex :: struct {
+ impl: _RW_Mutex,
+}
+
+// rw_mutex_lock locks rw for writing (with a single writer)
+// If the mutex is already locked for reading or writing, rw_mutex_lock blocks until the mutex is available.
+rw_mutex_lock :: proc(rw: ^RW_Mutex) {
+ _rw_mutex_lock(rw);
+}
+
+// rw_mutex_unlock unlocks rw for writing (with a single writer)
+rw_mutex_unlock :: proc(rw: ^RW_Mutex) {
+ _rw_mutex_unlock(rw);
+}
+
+// rw_mutex_try_lock tries to lock rw for writing (with a single writer)
+rw_mutex_try_lock :: proc(rw: ^RW_Mutex) -> bool {
+ return _rw_mutex_try_lock(rw);
+}
+
+// rw_mutex_shared_lock locks rw for reading (with an arbitrary number of readers)
+rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) {
+ _rw_mutex_shared_lock(rw);
+}
+
+// rw_mutex_shared_unlock unlocks rw for reading (with an arbitrary number of readers)
+rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) {
+ _rw_mutex_shared_unlock(rw);
+}
+
+// rw_mutex_try_shared_lock tries to lock rw for reading (with an arbitrary number of readers)
+rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool {
+ return _rw_mutex_try_shared_lock(rw);
+}
+
+
+// A Recursive_Mutex is a recursive mutual exclusion lock
+// The zero value for a Recursive_Mutex is an unlocked mutex
+//
+// A Recursive_Mutex must not be copied after first use
+Recursive_Mutex :: struct {
+ // TODO(bill): Is this implementation too lazy?
+ // Can this be made to work on all OSes without construction and destruction, i.e. Zero is Initialized
+ // CRITICAL_SECTION would be a perfect candidate for this on Windows but that cannot be "dumb"
+
+ owner: int,
+ recursion: int,
+ mutex: Mutex,
+}
+
+recursive_mutex_lock :: proc(m: ^Recursive_Mutex) {
+ tid := runtime.current_thread_id();
+ if tid != m.owner {
+ mutex_lock(&m.mutex);
+ }
+ // inside the lock
+ m.owner = tid;
+ m.recursion += 1;
+}
+
+recursive_mutex_unlock :: proc(m: ^Recursive_Mutex) {
+ tid := runtime.current_thread_id();
+ assert(tid == m.owner);
+ m.recursion -= 1;
+ recursion := m.recursion;
+ if recursion == 0 {
+ m.owner = 0;
+ }
+ if recursion == 0 {
+ mutex_unlock(&m.mutex);
+ }
+ // outside the lock
+
+}
+
+recursive_mutex_try_lock :: proc(m: ^Recursive_Mutex) -> bool {
+ tid := runtime.current_thread_id();
+ if m.owner == tid {
+ return mutex_try_lock(&m.mutex);
+ }
+ if !mutex_try_lock(&m.mutex) {
+ return false;
+ }
+ // inside the lock
+ m.owner = tid;
+ m.recursion += 1;
+ return true;
+}
+
+
+
+// Cond implements a condition variable, a rendezvous point for threads
+// waiting for, or signalling, the occurrence of an event
+//
+// A Cond must not be copied after first use
+Cond :: struct {
+ impl: _Cond,
+}
+
+cond_wait :: proc(c: ^Cond, m: ^Mutex) {
+ _cond_wait(c, m);
+}
+
+cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool {
+ return _cond_wait_with_timeout(c, m, timeout);
+}
+
+cond_signal :: proc(c: ^Cond) {
+ _cond_signal(c);
+}
+
+cond_broadcast :: proc(c: ^Cond) {
+ _cond_broadcast(c);
+}
+
+
+
+// A Sema is a counting semaphore: waiting blocks until the internal count is greater than zero, then decrements it by one.
+// Posting to the semaphore increases the count by one, or by the provided amount.
+//
+// A Sema must not be copied after first use
+Sema :: struct {
+ // TODO(bill): Is this implementation too lazy?
+ // Can this be made to work on all OSes without construction and destruction, i.e. Zero is Initialized
+
+ mutex: Mutex,
+ cond: Cond,
+ count: int,
+}
+
+
+sema_wait :: proc(s: ^Sema) {
+ mutex_lock(&s.mutex);
+ defer mutex_unlock(&s.mutex);
+
+ for s.count == 0 {
+ cond_wait(&s.cond, &s.mutex);
+ }
+
+ s.count -= 1;
+ if s.count > 0 {
+ cond_signal(&s.cond);
+ }
+}
+
+sema_post :: proc(s: ^Sema, count := 1) {
+ mutex_lock(&s.mutex);
+ defer mutex_unlock(&s.mutex);
+
+ s.count += count;
+ cond_signal(&s.cond);
+}
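+
+// Example (illustrative sketch): the usual pattern for Cond, waiting on a predicate
+// in a loop while holding the Mutex; `m`, `c`, `queue_is_empty`, and `pop` are
+// hypothetical and not part of this package.
+/*
+	mutex_lock(&m);
+	for queue_is_empty() {
+		cond_wait(&c, &m);
+	}
+	item := pop();
+	mutex_unlock(&m);
+*/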
diff --git a/core/sync/sync2/primitives_atomic.odin b/core/sync/sync2/primitives_atomic.odin
new file mode 100644
index 000000000..6133ed77b
--- /dev/null
+++ b/core/sync/sync2/primitives_atomic.odin
@@ -0,0 +1,244 @@
+//+build linux, darwin, freebsd
+//+private
+package sync2
+
+when !#config(ODIN_SYNC_USE_PTHREADS, false) {
+
+import "intrinsics"
+import "core:time"
+
+_Mutex_State :: enum i32 {
+ Unlocked = 0,
+ Locked = 1,
+ Waiting = 2,
+}
+_Mutex :: struct {
+ state: _Mutex_State,
+}
+
+_mutex_lock :: proc(m: ^Mutex) {
+	if v := intrinsics.atomic_xchg_acq(&m.impl.state, .Locked); v != .Unlocked {
+		_mutex_lock_slow(m, v);
+	}
+}
+
+_mutex_unlock :: proc(m: ^Mutex) {
+ switch intrinsics.atomic_xchg_rel(&m.impl.state, .Unlocked) {
+ case .Unlocked:
+ unreachable();
+ case .Locked:
+ // Okay
+ case .Waiting:
+ _mutex_unlock_slow(m);
+ }
+}
+
+_mutex_try_lock :: proc(m: ^Mutex) -> bool {
+ _, ok := intrinsics.atomic_cxchg_acq(&m.impl.state, .Unlocked, .Locked);
+ return ok;
+}
+
+
+
+_mutex_lock_slow :: proc(m: ^Mutex, curr_state: _Mutex_State) {
+ new_state := curr_state; // Make a copy of it
+
+ spin_lock: for spin in 0..<i32(100) {
+ state, ok := intrinsics.atomic_cxchgweak_acq(&m.impl.state, .Unlocked, new_state);
+ if ok {
+ return;
+ }
+
+ if state == .Waiting {
+ break spin_lock;
+ }
+
+ for i := min(spin+1, 32); i > 0; i -= 1 {
+ intrinsics.cpu_relax();
+ }
+ }
+
+ for {
+ if intrinsics.atomic_xchg_acq(&m.impl.state, .Waiting) == .Unlocked {
+ return;
+ }
+
+ // TODO(bill): Use a Futex here for Linux to improve performance and error handling
+ intrinsics.cpu_relax();
+ }
+}
+
+
+_mutex_unlock_slow :: proc(m: ^Mutex) {
+ // TODO(bill): Use a Futex here for Linux to improve performance and error handling
+}
+
+
+RW_Mutex_State :: distinct uint;
+RW_Mutex_State_Half_Width :: size_of(RW_Mutex_State)*8/2;
+RW_Mutex_State_Is_Writing :: RW_Mutex_State(1);
+RW_Mutex_State_Writer :: RW_Mutex_State(1)<<1;
+RW_Mutex_State_Reader :: RW_Mutex_State(1)<<RW_Mutex_State_Half_Width;
+
+RW_Mutex_State_Writer_Mask :: RW_Mutex_State(1<<(RW_Mutex_State_Half_Width-1) - 1) << 1;
+RW_Mutex_State_Reader_Mask :: RW_Mutex_State(1<<(RW_Mutex_State_Half_Width-1) - 1) << RW_Mutex_State_Half_Width;
+
+
+_RW_Mutex :: struct {
+ state: RW_Mutex_State,
+ mutex: Mutex,
+ sema: Sema,
+}
+
+_rw_mutex_lock :: proc(rw: ^RW_Mutex) {
+ _ = intrinsics.atomic_add(&rw.impl.state, RW_Mutex_State_Writer);
+ mutex_lock(&rw.impl.mutex);
+
+	state := intrinsics.atomic_or(&rw.impl.state, RW_Mutex_State_Is_Writing);
+ if state & RW_Mutex_State_Reader_Mask != 0 {
+ sema_wait(&rw.impl.sema);
+ }
+}
+
+_rw_mutex_unlock :: proc(rw: ^RW_Mutex) {
+ _ = intrinsics.atomic_and(&rw.impl.state, ~RW_Mutex_State_Is_Writing);
+ mutex_unlock(&rw.impl.mutex);
+}
+
+_rw_mutex_try_lock :: proc(rw: ^RW_Mutex) -> bool {
+ if mutex_try_lock(&rw.impl.mutex) {
+ state := intrinsics.atomic_load(&rw.impl.state);
+ if state & RW_Mutex_State_Reader_Mask == 0 {
+ _ = intrinsics.atomic_or(&rw.impl.state, RW_Mutex_State_Is_Writing);
+ return true;
+ }
+
+ mutex_unlock(&rw.impl.mutex);
+ }
+ return false;
+}
+
+_rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) {
+ state := intrinsics.atomic_load(&rw.impl.state);
+ for state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 {
+ ok: bool;
+ state, ok = intrinsics.atomic_cxchgweak(&rw.impl.state, state, state + RW_Mutex_State_Reader);
+ if ok {
+ return;
+ }
+ }
+
+ mutex_lock(&rw.impl.mutex);
+ _ = intrinsics.atomic_add(&rw.impl.state, RW_Mutex_State_Reader);
+ mutex_unlock(&rw.impl.mutex);
+}
+
+_rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) {
+ state := intrinsics.atomic_sub(&rw.impl.state, RW_Mutex_State_Reader);
+
+ if (state & RW_Mutex_State_Reader_Mask == RW_Mutex_State_Reader) &&
+ (state & RW_Mutex_State_Is_Writing != 0) {
+ sema_post(&rw.impl.sema);
+ }
+}
+
+_rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool {
+ state := intrinsics.atomic_load(&rw.impl.state);
+ if state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 {
+ _, ok := intrinsics.atomic_cxchg(&rw.impl.state, state, state + RW_Mutex_State_Reader);
+ if ok {
+ return true;
+ }
+ }
+ if mutex_try_lock(&rw.impl.mutex) {
+ _ = intrinsics.atomic_add(&rw.impl.state, RW_Mutex_State_Reader);
+ mutex_unlock(&rw.impl.mutex);
+ return true;
+ }
+
+ return false;
+}
+
+
+
+Queue_Item :: struct {
+ next: ^Queue_Item,
+ futex: i32,
+}
+
+queue_item_wait :: proc(item: ^Queue_Item) {
+ for intrinsics.atomic_load_acq(&item.futex) == 0 {
+ // TODO(bill): Use a Futex here for Linux to improve performance and error handling
+ intrinsics.cpu_relax();
+ }
+}
+queue_item_signal :: proc(item: ^Queue_Item) {
+ intrinsics.atomic_store_rel(&item.futex, 1);
+ // TODO(bill): Use a Futex here for Linux to improve performance and error handling
+}
+
+
+_Cond :: struct {
+ queue_mutex: Mutex,
+ queue_head: ^Queue_Item,
+ pending: bool,
+}
+
+_cond_wait :: proc(c: ^Cond, m: ^Mutex) {
+ waiter := &Queue_Item{};
+
+ mutex_lock(&c.impl.queue_mutex);
+ waiter.next = c.impl.queue_head;
+ c.impl.queue_head = waiter;
+
+ intrinsics.atomic_store(&c.impl.pending, true);
+ mutex_unlock(&c.impl.queue_mutex);
+
+ mutex_unlock(m);
+ queue_item_wait(waiter);
+ mutex_lock(m);
+}
+
+_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool {
+ // TODO(bill): _cond_wait_with_timeout for unix
+ return false;
+}
+
+_cond_signal :: proc(c: ^Cond) {
+ if !intrinsics.atomic_load(&c.impl.pending) {
+ return;
+ }
+
+ mutex_lock(&c.impl.queue_mutex);
+ waiter := c.impl.queue_head;
+ if c.impl.queue_head != nil {
+ c.impl.queue_head = c.impl.queue_head.next;
+ }
+ intrinsics.atomic_store(&c.impl.pending, c.impl.queue_head != nil);
+ mutex_unlock(&c.impl.queue_mutex);
+
+ if waiter != nil {
+ queue_item_signal(waiter);
+ }
+}
+
+_cond_broadcast :: proc(c: ^Cond) {
+ if !intrinsics.atomic_load(&c.impl.pending) {
+ return;
+ }
+
+ intrinsics.atomic_store(&c.impl.pending, false);
+
+ mutex_lock(&c.impl.queue_mutex);
+ waiters := c.impl.queue_head;
+ c.impl.queue_head = nil;
+ mutex_unlock(&c.impl.queue_mutex);
+
+ for waiters != nil {
+ queue_item_signal(waiters);
+ waiters = waiters.next;
+ }
+}
+
+
+} // !ODIN_SYNC_USE_PTHREADS
diff --git a/core/sync/sync2/primitives_pthreads.odin b/core/sync/sync2/primitives_pthreads.odin
new file mode 100644
index 000000000..cb580f03f
--- /dev/null
+++ b/core/sync/sync2/primitives_pthreads.odin
@@ -0,0 +1,155 @@
+//+build linux, darwin, freebsd
+//+private
+package sync2
+
+when #config(ODIN_SYNC_USE_PTHREADS, false) {
+
+import "intrinsics"
+import "core:time"
+import "core:sys/unix"
+
+_Mutex_State :: enum i32 {
+ Unlocked = 0,
+ Locked = 1,
+ Waiting = 2,
+}
+_Mutex :: struct {
+ pthread_mutex: unix.pthread_mutex_t,
+}
+
+_mutex_lock :: proc(m: ^Mutex) {
+ err := unix.pthread_mutex_lock(&m.impl.pthread_mutex);
+ assert(err == 0);
+}
+
+_mutex_unlock :: proc(m: ^Mutex) {
+ err := unix.pthread_mutex_unlock(&m.impl.pthread_mutex);
+ assert(err == 0);
+}
+
+_mutex_try_lock :: proc(m: ^Mutex) -> bool {
+ err := unix.pthread_mutex_trylock(&m.impl.pthread_mutex);
+ return err == 0;
+}
+
+
+
+RW_Mutex_State :: distinct uint;
+RW_Mutex_State_Half_Width :: size_of(RW_Mutex_State)*8/2;
+RW_Mutex_State_Is_Writing :: RW_Mutex_State(1);
+RW_Mutex_State_Writer :: RW_Mutex_State(1)<<1;
+RW_Mutex_State_Reader :: RW_Mutex_State(1)<<RW_Mutex_State_Half_Width;
+
+RW_Mutex_State_Writer_Mask :: RW_Mutex_State(1<<(RW_Mutex_State_Half_Width-1) - 1) << 1;
+RW_Mutex_State_Reader_Mask :: RW_Mutex_State(1<<(RW_Mutex_State_Half_Width-1) - 1) << RW_Mutex_State_Half_Width;
+
+
+_RW_Mutex :: struct {
+ // NOTE(bill): pthread_rwlock_t cannot be used since pthread_rwlock_destroy is required on some platforms
+ // TODO(bill): Can we determine which platforms exactly?
+ state: RW_Mutex_State,
+ mutex: Mutex,
+ sema: Sema,
+}
+
+_rw_mutex_lock :: proc(rw: ^RW_Mutex) {
+ _ = intrinsics.atomic_add(&rw.impl.state, RW_Mutex_State_Writer);
+ mutex_lock(&rw.impl.mutex);
+
+	state := intrinsics.atomic_or(&rw.impl.state, RW_Mutex_State_Is_Writing);
+ if state & RW_Mutex_State_Reader_Mask != 0 {
+ sema_wait(&rw.impl.sema);
+ }
+}
+
+_rw_mutex_unlock :: proc(rw: ^RW_Mutex) {
+ _ = intrinsics.atomic_and(&rw.impl.state, ~RW_Mutex_State_Is_Writing);
+ mutex_unlock(&rw.impl.mutex);
+}
+
+_rw_mutex_try_lock :: proc(rw: ^RW_Mutex) -> bool {
+ if mutex_try_lock(&rw.impl.mutex) {
+ state := intrinsics.atomic_load(&rw.impl.state);
+ if state & RW_Mutex_State_Reader_Mask == 0 {
+ _ = intrinsics.atomic_or(&rw.impl.state, RW_Mutex_State_Is_Writing);
+ return true;
+ }
+
+ mutex_unlock(&rw.impl.mutex);
+ }
+ return false;
+}
+
+_rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) {
+ state := intrinsics.atomic_load(&rw.impl.state);
+ for state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 {
+ ok: bool;
+ state, ok = intrinsics.atomic_cxchgweak(&rw.impl.state, state, state + RW_Mutex_State_Reader);
+ if ok {
+ return;
+ }
+ }
+
+ mutex_lock(&rw.impl.mutex);
+ _ = intrinsics.atomic_add(&rw.impl.state, RW_Mutex_State_Reader);
+ mutex_unlock(&rw.impl.mutex);
+}
+
+_rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) {
+ state := intrinsics.atomic_sub(&rw.impl.state, RW_Mutex_State_Reader);
+
+ if (state & RW_Mutex_State_Reader_Mask == RW_Mutex_State_Reader) &&
+ (state & RW_Mutex_State_Is_Writing != 0) {
+ sema_post(&rw.impl.sema);
+ }
+}
+
+_rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool {
+ state := intrinsics.atomic_load(&rw.impl.state);
+ if state & (RW_Mutex_State_Is_Writing|RW_Mutex_State_Writer_Mask) == 0 {
+ _, ok := intrinsics.atomic_cxchg(&rw.impl.state, state, state + RW_Mutex_State_Reader);
+ if ok {
+ return true;
+ }
+ }
+ if mutex_try_lock(&rw.impl.mutex) {
+ _ = intrinsics.atomic_add(&rw.impl.state, RW_Mutex_State_Reader);
+ mutex_unlock(&rw.impl.mutex);
+ return true;
+ }
+
+ return false;
+}
+
+_Cond :: struct {
+ pthread_cond: unix.pthread_cond_t,
+}
+
+_cond_wait :: proc(c: ^Cond, m: ^Mutex) {
+ err := unix.pthread_cond_wait(&c.impl.pthread_cond, &m.impl.pthread_mutex);
+ assert(err == 0);
+}
+
+_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool {
+ ns := time.duration_nanoseconds(timeout);
+ timeout_timespec := &time.TimeSpec{
+ tv_sec = ns / 1e9,
+ tv_nsec = ns % 1e9,
+ };
+ err := unix.pthread_cond_timedwait(&c.impl.pthread_cond, &m.impl.pthread_mutex, timeout_timespec);
+ // TODO(bill):
+ return err == 0;
+}
+
+_cond_signal :: proc(c: ^Cond) {
+ err := unix.pthread_cond_signal(&c.impl.pthread_cond);
+ assert(err == 0);
+}
+
+_cond_broadcast :: proc(c: ^Cond) {
+ err := unix.pthread_cond_broadcast(&c.impl.pthread_cond);
+ assert(err == 0);
+}
+
+
+} // ODIN_SYNC_USE_PTHREADS
diff --git a/core/sync/sync2/primitives_windows.odin b/core/sync/sync2/primitives_windows.odin
new file mode 100644
index 000000000..02b6cd733
--- /dev/null
+++ b/core/sync/sync2/primitives_windows.odin
@@ -0,0 +1,73 @@
+//+build windows
+//+private
+package sync2
+
+import "core:time"
+import win32 "core:sys/windows"
+
+_Mutex :: struct {
+ srwlock: win32.SRWLOCK,
+}
+
+_mutex_lock :: proc(m: ^Mutex) {
+ win32.AcquireSRWLockExclusive(&m.impl.srwlock);
+}
+
+_mutex_unlock :: proc(m: ^Mutex) {
+ win32.ReleaseSRWLockExclusive(&m.impl.srwlock);
+}
+
+_mutex_try_lock :: proc(m: ^Mutex) -> bool {
+ return bool(win32.TryAcquireSRWLockExclusive(&m.impl.srwlock));
+}
+
+_RW_Mutex :: struct {
+ srwlock: win32.SRWLOCK,
+}
+
+_rw_mutex_lock :: proc(rw: ^RW_Mutex) {
+ win32.AcquireSRWLockExclusive(&rw.impl.srwlock);
+}
+
+_rw_mutex_unlock :: proc(rw: ^RW_Mutex) {
+ win32.ReleaseSRWLockExclusive(&rw.impl.srwlock);
+}
+
+_rw_mutex_try_lock :: proc(rw: ^RW_Mutex) -> bool {
+ return bool(win32.TryAcquireSRWLockExclusive(&rw.impl.srwlock));
+}
+
+_rw_mutex_shared_lock :: proc(rw: ^RW_Mutex) {
+ win32.AcquireSRWLockShared(&rw.impl.srwlock);
+}
+
+_rw_mutex_shared_unlock :: proc(rw: ^RW_Mutex) {
+ win32.ReleaseSRWLockShared(&rw.impl.srwlock);
+}
+
+_rw_mutex_try_shared_lock :: proc(rw: ^RW_Mutex) -> bool {
+ return bool(win32.TryAcquireSRWLockShared(&rw.impl.srwlock));
+}
+
+
+
+_Cond :: struct {
+ cond: win32.CONDITION_VARIABLE,
+}
+
+_cond_wait :: proc(c: ^Cond, m: ^Mutex) {
+ _ = win32.SleepConditionVariableSRW(&c.impl.cond, &m.impl.srwlock, win32.INFINITE, 0);
+}
+
+_cond_wait_with_timeout :: proc(c: ^Cond, m: ^Mutex, timeout: time.Duration) -> bool {
+ ms := win32.DWORD((max(time.duration_nanoseconds(timeout), 0) + 999999)/1000000);
+ return cast(bool)win32.SleepConditionVariableSRW(&c.impl.cond, &m.impl.srwlock, ms, 0);
+}
+
+_cond_signal :: proc(c: ^Cond) {
+ win32.WakeConditionVariable(&c.impl.cond);
+}
+
+_cond_broadcast :: proc(c: ^Cond) {
+ win32.WakeAllConditionVariable(&c.impl.cond);
+}