aboutsummaryrefslogtreecommitdiff
path: root/core/sync
diff options
context:
space:
mode:
authorgingerBill <bill@gingerbill.org>2020-07-15 00:25:37 +0100
committergingerBill <bill@gingerbill.org>2020-07-15 00:25:37 +0100
commit3a1492fc995e4050796dc23eb2afda8e94101536 (patch)
tree11e28ed025e55d2409c78f3c7099cf4d1b20f7e9 /core/sync
parentca818fb857e24c46dd9ffb3709dd661158445e38 (diff)
Add `sync.Wait_Group`
Diffstat (limited to 'core/sync')
-rw-r--r--core/sync/channel.odin34
-rw-r--r--core/sync/wait_group.odin58
2 files changed, 87 insertions, 5 deletions
diff --git a/core/sync/channel.odin b/core/sync/channel.odin
index ed9c526ad..5397bdc2e 100644
--- a/core/sync/channel.odin
+++ b/core/sync/channel.odin
@@ -1,6 +1,5 @@
package sync
-// import "core:fmt"
import "core:mem"
import "core:time"
import "core:intrinsics"
@@ -113,17 +112,32 @@ channel_close :: proc(ch: $C/Channel($T), loc := #caller_location) {
}
-channel_iterator :: proc(ch: $C/Channel($T)) -> (val: T, ok: bool) {
+channel_iterator :: proc(ch: $C/Channel($T)) -> (msg: T, ok: bool) {
c := ch._internal;
if c == nil {
return;
}
if !c.closed || c.len > 0 {
- val, ok = channel_recv(ch), true;
+ msg, ok = channel_recv(ch), true;
}
return;
}
+channel_drain :: proc(ch: $C/Channel($T)) {
+ raw_channel_drain(ch._internal);
+}
+
+
+channel_move :: proc(dst, src: $C/Channel($T)) {
+ // for channel_len(src) > 0 {
+ // msg := channel_recv(src);
+ // channel_send(dst, msg);
+ // }
+ for msg in channel_iterator(src) {
+ channel_send(dst, msg);
+ }
+}
+
channel_select_recv :: proc(channels: ..^Raw_Channel) -> (index: int) {
@@ -247,8 +261,6 @@ channel_select_send_msg :: proc(msg: $T, channels: ..$C/Channel(T)) -> (index: i
-
-
Raw_Channel :: struct {
data: rawptr,
elem_size: int,
@@ -393,3 +405,15 @@ raw_channel_can_recv :: proc(c: ^Raw_Channel) -> (ok: bool) {
mutex_unlock(&c.mutex);
return;
}
+
+
+raw_channel_drain :: proc(c: ^Raw_Channel) {
+ if c == nil {
+ return;
+ }
+ mutex_lock(&c.mutex);
+ c.len = 0;
+ c.read = 0;
+ c.write = 0;
+ mutex_unlock(&c.mutex);
+}
diff --git a/core/sync/wait_group.odin b/core/sync/wait_group.odin
new file mode 100644
index 000000000..477bce9c2
--- /dev/null
+++ b/core/sync/wait_group.odin
@@ -0,0 +1,58 @@
+package sync
+
+import "intrinsics"
+
+Wait_Group :: struct {
+ counter: int,
+ mutex: Blocking_Mutex,
+ cond: Condition,
+}
+
+wait_group_init :: proc(wg: ^Wait_Group) {
+ wg.counter = 0;
+ blocking_mutex_init(&wg.mutex);
+ condition_init(&wg.cond, &wg.mutex);
+}
+
+
+wait_group_destroy :: proc(wg: ^Wait_Group) {
+ condition_destroy(&wg.cond);
+ blocking_mutex_destroy(&wg.mutex);
+}
+
+wait_group_add :: proc(wg: ^Wait_Group, delta: int) {
+ if delta == 0 {
+ return;
+ }
+
+ blocking_mutex_lock(&wg.mutex);
+ defer blocking_mutex_unlock(&wg.mutex);
+
+ intrinsics.atomic_add(&wg.counter, delta);
+ if wg.counter < 0 {
+ panic("sync.Wait_Group negative counter");
+ }
+ if wg.counter == 0 {
+ condition_broadcast(&wg.cond);
+ if wg.counter != 0 {
+ panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait");
+ }
+ }
+}
+
+wait_group_done :: proc(wg: ^Wait_Group) {
+ wait_group_add(wg, -1);
+}
+
+wait_group_wait :: proc(wg: ^Wait_Group) {
+ blocking_mutex_lock(&wg.mutex);
+ defer blocking_mutex_unlock(&wg.mutex);
+
+ if wg.counter != 0 {
+ condition_wait_for(&wg.cond);
+ if wg.counter != 0 {
+ panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait");
+ }
+ }
+}
+