aboutsummaryrefslogtreecommitdiff
path: root/core/sync
diff options
context:
space:
mode:
authorgingerBill <bill@gingerbill.org>2020-07-14 16:37:29 +0100
committergingerBill <bill@gingerbill.org>2020-07-14 16:37:29 +0100
commitfc65aee307ad7aec09bab5e7d3a1b4941ce24c68 (patch)
tree25ec09b3edb441e8c060783b544d59d80c9c27d1 /core/sync
parentede135a08f71d00e8ea0a1d083baae9267753c7a (diff)
Update sync.Channel
Diffstat (limited to 'core/sync')
-rw-r--r--core/sync/channel.odin481
1 files changed, 288 insertions, 193 deletions
diff --git a/core/sync/channel.odin b/core/sync/channel.odin
index be8a96f47..059b5f747 100644
--- a/core/sync/channel.odin
+++ b/core/sync/channel.odin
@@ -2,290 +2,385 @@ package sync
import "core:mem"
import "core:time"
+import "core:fmt"
+import "core:intrinsics"
import "core:math/rand"
_, _ :: time, rand;
-chan :: struct(T: typeid) {
- qlen: uint,
- qcap: uint,
- closed: b32,
- sendx: uint,
- recvx: uint,
- mutex: Blocking_Mutex,
- allocator: mem.Allocator,
- buf: [0]T,
+Channel :: struct(T: typeid) {
+ using _internal: ^Raw_Channel,
}
-makechan :: proc($T: typeid, cap: int, allocator := context.allocator) -> ^chan(T) {
- chan_size :: size_of(chan(T));
- chan_align :: align_of(chan(T));
+channel_init :: proc(ch: ^$C/Channel($T), cap := 0, allocator := context.allocator) {
+ context.allocator = allocator;
+ ch._internal = raw_channel_create(size_of(T), align_of(T), cap);
+ return;
+}
- mem := uintptr(cap) * size_of(T);
- c := cast(^chan(T))mem.alloc(chan_size+mem, chan_align, allocator);
- c.allocator = allocator;
- c.qlen = 0;
- c.qcap = uint(cap);
- blocking_mutex_init(&c.mutex);
- return c;
+channel_make :: proc($T: typeid, cap := 0, allocator := context.allocator) -> (ch: Channel(T)) {
+ context.allocator = allocator;
+ ch._internal = raw_channel_create(size_of(T), align_of(T), cap);
+ return;
}
-chanbuf :: proc(c: ^$C/chan($T)) -> []T #no_bounds_check {
- return c.buf[0:c.qcap];
+
+channel_destroy :: proc(ch: $C/Channel($T)) {
+ raw_channel_destroy(ch._internal);
}
+channel_len :: proc(ch: $C/Channel($T)) -> int {
+ return ch._internal.len;
+}
+channel_cap :: proc(ch: $C/Channel($T)) -> int {
+ return ch._internal.cap;
+}
-/*
+channel_send :: proc(ch: $C/Channel($T), msg: T, loc := #caller_location) {
+ msg := msg;
+ _ = raw_channel_send_impl(ch._internal, &msg, false, loc);
+}
+channel_try_send :: proc(ch: $C/Channel($T), msg: T, loc := #caller_location) -> bool {
+ msg := msg;
+ return raw_channel_send_impl(ch._internal, &msg, true, loc);
+}
-Channel :: struct(T: typeid) {
- using internal: ^_Channel_Internal(T),
+channel_recv :: proc(ch: $C/Channel($T), loc := #caller_location) -> (msg: T) {
+ c := ch._internal;
+ mutex_lock(&c.mutex);
+ raw_channel_recv_impl(c, &msg, loc);
+ mutex_unlock(&c.mutex);
+ return;
+}
+channel_try_recv :: proc(ch: $C/Channel($T), loc := #caller_location) -> (msg: T, ok: bool) {
+ c := ch._internal;
+ if mutex_try_lock(&c.mutex) {
+ if c.len > 0 {
+ raw_channel_recv_impl(c, &msg, loc);
+ ok = true;
+ }
+ mutex_unlock(&c.mutex);
+ }
+ return;
}
-_Channel_Internal :: struct(T: typeid) {
- allocator: mem.Allocator,
+channel_is_nil :: proc(ch: $C/Channel($T)) -> bool {
+ return ch._internal == nil;
+}
- queue: [dynamic]T,
- unbuffered_msg: T, // Will be used as the backing to the queue if no `cap` is given
+channel_eq :: proc(a, b: $C/Channel($T)) -> bool {
+ return a._internal == b._internal;
+}
+channel_ne :: proc(a, b: $C/Channel($T)) -> bool {
+ return a._internal != b._internal;
+}
- mutex: Mutex,
- r_cond: Condition,
- w_cond: Condition,
- closed: bool,
- r_waiting: int,
- w_waiting: int,
+channel_can_send :: proc(ch: $C/Channel($T)) -> (ok: bool) {
+ return raw_channel_can_send(ch._internal);
}
-
-channel_init :: proc(c: ^$C/Channel($T), cap: int = 0, allocator := context.allocator) {
- c^ = cast(C)channel_make(T, cap, allocator);
+channel_can_recv :: proc(ch: $C/Channel($T)) -> (ok: bool) {
+ return raw_channel_can_recv(ch._internal);
}
-channel_make :: proc($T: typeid, cap: int = 0, allocator := context.allocator) -> (ch: Channel(T)) {
- ch.internal = new(_Channel_Internal(T), allocator);
- if ch.internal == nil {
- return {};
+
+
+channel_peek :: proc(ch: $C/Channel($T)) -> int {
+ c := ch._internal;
+ if c == nil {
+ return -1;
}
- ch.allocator = allocator;
-
- mutex_init(&ch.mutex);
- condition_init(&ch.r_cond, &ch.mutex);
- condition_init(&ch.w_cond, &ch.mutex);
- ch.closed = false;
- ch.r_waiting = 0;
- ch.w_waiting = 0;
- ch.unbuffered_msg = T{};
-
- if cap > 0 {
- ch.queue = make([dynamic]T, 0, cap, ch.allocator);
- } else {
- d := mem.Raw_Dynamic_Array{
- data = &ch.unbuffered_msg,
- len = 0,
- cap = 1,
- allocator = mem.nil_allocator(),
- };
- ch.queue = transmute([dynamic]T)d;
+ if intrinsics.atomic_load(&c.closed) {
+ return -1;
}
- return ch;
+ return intrinsics.atomic_load(&c.len);
}
-channel_destroy :: proc(ch: $C/Channel($T)) {
- channel_close(ch);
- if channel_is_buffered(ch) {
- delete(ch.queue);
+channel_close :: proc(ch: $C/Channel($T), loc := #caller_location) {
+ c := ch._internal;
+ if c == nil {
+ panic(message="cannot close nil channel", loc=loc);
}
-
- mutex_destroy(&ch.mutex);
- condition_destroy(&ch.r_cond);
- condition_destroy(&ch.w_cond);
- free(ch.internal, ch.allocator);
+ intrinsics.atomic_store(&c.closed, true);
}
-channel_close :: proc(ch: $C/Channel($T)) -> (ok: bool) {
- mutex_lock(&ch.mutex);
- if !ch.closed {
- ch.closed = true;
- condition_broadcast(&ch.r_cond);
- condition_broadcast(&ch.w_cond);
- ok = true;
+channel_iterator :: proc(ch: $C/Channel($T)) -> (val: T, open: bool) {
+ c := ch._internal;
+ switch {
+ case c == nil:
+ return;
+ case intrinsics.atomic_load(&c.closed):
+ if channel_can_recv(ch) {
+ val = channel_recv(ch);
+ open = true;
+ }
+ case:
+ val = channel_recv(ch);
+ open = true;
}
-
- mutex_unlock(&ch.mutex);
return;
}
-channel_write :: proc(ch: $C/Channel($T), msg: T) -> (ok: bool) {
- mutex_lock(&ch.mutex);
- defer mutex_unlock(&ch.mutex);
- if ch.closed {
+channel_select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) {
+ backing: [64]int;
+ candidates := backing[:];
+ if len(channels) > len(backing) {
+ candidates = make([]int, len(channels), context.temp_allocator);
+ }
+
+ count := u32(0);
+ for c, i in channels {
+ if raw_channel_can_recv(c) {
+ candidates[i] = i;
+ count += 1;
+ }
+ }
+
+ if count == 0 {
+ index = -1;
return;
}
+ t := time.now();
+ r := rand.create(transmute(u64)t);
+ i := rand.uint32(&r);
- for len(ch.queue) == cap(ch.queue) {
- ch.w_waiting += 1;
- condition_wait_for(&ch.w_cond);
- ch.w_waiting -= 1;
+ index = candidates[i % count];
+ return;
+}
+
+
+channel_select_send :: proc(channels: ..^Raw_Channel) -> (index: int) {
+ backing: [64]int;
+ candidates := backing[:];
+ if len(channels) > len(backing) {
+ candidates = make([]int, len(channels), context.temp_allocator);
}
- if len(ch.queue) < cap(ch.queue) {
- append(&ch.queue, msg);
- ok = true;
+ count := u32(0);
+ for c, i in channels {
+ if raw_channel_can_send(c) {
+ candidates[i] = i;
+ count += 1;
+ }
}
- if ch.r_waiting > 0 {
- condition_signal(&ch.r_cond);
+ if count == 0 {
+ index = -1;
+ return;
}
+ t := time.now();
+ r := rand.create(transmute(u64)t);
+ i := rand.uint32(&r);
+
+ index = candidates[i % count];
return;
}
-channel_read :: proc(ch: $C/Channel($T)) -> (msg: T, ok: bool) #optional_ok {
- mutex_lock(&ch.mutex);
- defer mutex_unlock(&ch.mutex);
+channel_select_recv_msg :: proc(channels: ..$C/Channel($T)) -> (msg: T, index: int) {
+ backing: [64]int;
+ candidates := backing[:];
+ if len(channels) > len(backing) {
+ candidates = make([]int, len(channels), context.temp_allocator);
+ }
- for len(ch.queue) == 0 {
- if ch.closed {
- return;
+ count := u32(0);
+ for c, i in channels {
+ if channel_can_recv(c) {
+ candidates[i] = i;
+ count += 1;
}
-
- ch.r_waiting += 1;
- condition_wait_for(&ch.r_cond);
- ch.r_waiting -= 1;
}
- msg, ok = pop_front(&ch.queue);
-
- if ch.w_waiting > 0 {
- condition_signal(&ch.w_cond);
+ if count == 0 {
+ index = -1;
+ return;
}
+ t := time.now();
+ r := rand.create(transmute(u64)t);
+ i := rand.uint32(&r);
+
+ index = candidates[i % count];
+ msg = channel_recv(channels[index]);
return;
}
-channel_size :: proc(ch: $C/Channel($T)) -> (size: int) {
- if channel_is_buffered(ch) {
- mutex_lock(&ch.mutex);
- size = len(ch.queue);
- mutex_unlock(&ch.mutex);
+channel_select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T)) -> (index: int) {
+ backing: [64]int;
+ candidates := backing[:];
+ if len(channels) > len(backing) {
+ candidates = make([]int, len(channels), context.temp_allocator);
+ }
+
+ count := u32(0);
+ for c, i in channels {
+ if raw_channel_can_send(c) {
+ candidates[i] = i;
+ count += 1;
+ }
+ }
+
+ if count == 0 {
+ index = -1;
+ return;
}
+
+ t := time.now();
+ r := rand.create(transmute(u64)t);
+ i := rand.uint32(&r);
+
+ index = candidates[i % count];
+ channel_send(channels[index], msg);
return;
}
-channel_is_closed :: proc(ch: $C/Channel($T)) -> bool {
- mutex_lock(&ch.mutex);
- closed := ch.closed;
- mutex_unlock(&ch.mutex);
- return closed;
-}
-channel_is_buffered :: proc(ch: $C/Channel($T)) -> bool {
- q := transmute(mem.Raw_Dynamic_Array)ch.queue;
- return q.cap != 0 && (q.data != &ch.unbuffered_msg);
-}
-channel_can_write :: proc(ch: $C/Channel($T)) -> bool {
- mutex_lock(&ch.mutex);
- defer mutex_unlock(&ch.mutex);
- return len(ch.queue) < cap(ch.queue);
-}
-channel_can_read :: proc(ch: $C/Channel($T)) -> bool {
- mutex_lock(&ch.mutex);
- defer mutex_unlock(&ch.mutex);
- return len(ch.queue) > 0;
-}
-channel_can_read_write :: proc(ch: $C/Channel($T)) -> bool {
- mutex_lock(&ch.mutex);
- defer mutex_unlock(&ch.mutex);
- return 0 < len(ch.queue) && len(ch.queue) < cap(ch.queue);
+
+Raw_Channel :: struct {
+ data: rawptr,
+ elem_size: int,
+ len, cap: int,
+ read, write: int,
+ mutex: Mutex,
+ cond: Condition,
+ allocator: mem.Allocator,
+ closed: bool,
+ ready: bool, // ready to recv
}
-channel_iterator :: proc(ch: $C/Channel($T)) -> (elem: T, ok: bool) {
- mutex_lock(&ch.mutex);
- defer mutex_unlock(&ch.mutex);
- if len(ch.queue) > 0 {
- return channel_read(ch);
+raw_channel_create :: proc(elem_size, elem_align, cap: int) -> ^Raw_Channel {
+ s := size_of(Raw_Channel);
+ s = mem.align_forward_int(s, elem_align);
+ data_offset := uintptr(s);
+ s += elem_size * max(cap, 1);
+
+ a := max(elem_align, align_of(Raw_Channel));
+
+ c := (^Raw_Channel)(mem.alloc(s, a));
+ if c == nil {
+ return nil;
}
- return T{}, false;
+ c.data = rawptr(uintptr(c) + data_offset);
+ c.elem_size = elem_size;
+ c.len, c.cap = 0, max(cap, 0);
+ c.read, c.write = 0, 0;
+ mutex_init(&c.mutex);
+ condition_init(&c.cond, &c.mutex);
+ c.allocator = context.allocator;
+ c.closed = false;
+
+ return c;
}
+raw_channel_destroy :: proc(c: ^Raw_Channel) {
+ if c == nil {
+ return;
+ }
+ context.allocator = c.allocator;
+ c.closed = true;
-channel_select :: proc(readers, writers: []$C/Channel($T), write_msgs: []T) -> (read_msg: T, index: int) {
- Candidate :: struct {
- ch: C,
- msg: T,
- index: int,
- read: bool,
- };
+ condition_destroy(&c.cond);
+ mutex_destroy(&c.mutex);
+ free(c);
+}
- count := 0;
- candidates := make([]Candidate, len(readers) + len(writers));
- defer delete(candidates);
- for c, i in readers {
- if channel_can_read(c) {
- candidates[count] = {
- ch = c,
- index = i,
- read = true,
- };
- count += 1;
- }
+raw_channel_send_impl :: proc(c: ^Raw_Channel, msg: rawptr, no_block: bool, loc := #caller_location) -> bool {
+ send :: proc(c: ^Raw_Channel, src: rawptr) {
+ dst := uintptr(c.data) + uintptr(c.write * c.elem_size);
+ mem.copy(rawptr(dst), src, c.elem_size);
+ c.len += 1;
+ c.write = (c.write + 1) % max(c.cap, 1);
}
- for c, i in writers {
- if channel_can_write(c) {
- candidates[count] = {
- ch = c,
- index = count,
- read = false,
- msg = write_msgs[i],
- };
- count += 1;
- }
+ switch {
+ case c == nil:
+ panic(message="cannot send message; channel is nil", loc=loc);
+ case c.closed:
+ panic(message="cannot send message; channel is closed", loc=loc);
}
- if count == 0 {
- return T{}, -1;
- }
+ mutex_lock(&c.mutex);
+ if c.cap > 0 {
+ if no_block && c.len >= c.cap {
+ mutex_unlock(&c.mutex);
+ return false;
+ }
- // Randomize the input
- r := rand.create(time.read_cycle_counter());
- s := candidates[rand.int_max(count, &r)];
- if s.read {
- ok: bool;
- if read_msg, ok = channel_read(s.ch); !ok {
- index = -1;
- return;
+ for c.len >= c.cap {
+ condition_wait_for(&c.cond);
}
- } else {
- if !channel_write(s.ch, s.msg) {
- index = -1;
- return;
+ }
+
+ send(c, msg);
+ mutex_unlock(&c.mutex);
+ condition_signal(&c.cond);
+
+ return true;
+}
+
+raw_channel_recv_impl :: proc(c: ^Raw_Channel, res: rawptr, loc := #caller_location) {
+ recv :: proc(c: ^Raw_Channel, dst: rawptr, loc := #caller_location) {
+ if c.len < 1 {
+ panic(message="cannot recv message; channel is empty", loc=loc);
}
+ c.len -= 1;
+ src := uintptr(c.data) + uintptr(c.read * c.elem_size);
+ mem.copy(dst, rawptr(src), c.elem_size);
+ c.read = (c.read + 1) % max(c.cap, 1);
}
- index = s.index;
- return;
+ if c == nil {
+ panic(message="cannot recv message; channel is nil", loc=loc);
+ }
+ intrinsics.atomic_store(&c.ready, true);
+ for c.len < 1 {
+ condition_wait_for(&c.cond);
+ }
+ intrinsics.atomic_store(&c.ready, false);
+ recv(c, res, loc);
+ if c.cap > 0 && c.len == c.cap - 1 {
+ condition_signal(&c.cond);
+ }
}
-channel_select_write :: proc(writers: []$C/Channel($T), write_msgs: []T) -> (read_msg: T, index: int) {
- return channel_select([]C{}, writers, msg);
+raw_channel_can_send :: proc(c: ^Raw_Channel) -> (ok: bool) {
+ if c == nil {
+ return false;
+ }
+ mutex_lock(&c.mutex);
+ switch {
+ case c.closed:
+ ok = false;
+ case c.cap > 0:
+ ok = c.len < c.cap;
+ case:
+ ok = !c.ready;
+ }
+ mutex_unlock(&c.mutex);
+ return;
}
-channel_select_read :: proc(readers: []$C/Channel($T)) -> (index: int) {
- _, index = channel_select(readers, []C{}, nil);
+raw_channel_can_recv :: proc(c: ^Raw_Channel) -> (ok: bool) {
+ if c == nil {
+ return false;
+ }
+ mutex_lock(&c.mutex);
+ ok = c.len > 0;
+ mutex_unlock(&c.mutex);
return;
}
-*/