aboutsummaryrefslogtreecommitdiff
path: root/core/sync
diff options
context:
space:
mode:
authorgingerBill <gingerBill@users.noreply.github.com>2019-12-01 11:33:23 +0000
committerGitHub <noreply@github.com>2019-12-01 11:33:23 +0000
commit3fd5c3cd851d8f4dfd441141ca7e96889f069933 (patch)
tree67f47e79f5c5bb80a3ed1b1e9d79a61c08c0a29d /core/sync
parent0c0c83ee295fe8787a4bdc8b826a5432abba2ca9 (diff)
parent99121d6ff2b02f3d16b791eb103bb9f9e8b96475 (diff)
Merge pull request #458 from Tetralux/linux-threads
Implement core:thread and core:sync on Unix using pthreads
Diffstat (limited to 'core/sync')
-rw-r--r--core/sync/sync.odin27
-rw-r--r--core/sync/sync_darwin.odin39
-rw-r--r--core/sync/sync_linux.odin100
-rw-r--r--core/sync/sync_unix.odin99
-rw-r--r--core/sync/sync_windows.odin71
5 files changed, 203 insertions, 133 deletions
diff --git a/core/sync/sync.odin b/core/sync/sync.odin
new file mode 100644
index 000000000..5a0512275
--- /dev/null
+++ b/core/sync/sync.odin
@@ -0,0 +1,27 @@
+package sync
+
+foreign {
+ @(link_name="llvm.x86.sse2.pause")
+ yield_processor :: proc() ---
+}
+
+Ticket_Mutex :: struct {
+ ticket: u64,
+ serving: u64,
+}
+
+ticket_mutex_init :: proc(m: ^Ticket_Mutex) {
+ atomic_store(&m.ticket, 0, .Relaxed);
+ atomic_store(&m.serving, 0, .Relaxed);
+}
+
+ticket_mutex_lock :: inline proc(m: ^Ticket_Mutex) {
+ ticket := atomic_add(&m.ticket, 1, .Relaxed);
+ for ticket != m.serving {
+ yield_processor();
+ }
+}
+
+ticket_mutex_unlock :: inline proc(m: ^Ticket_Mutex) {
+ atomic_add(&m.serving, 1, .Relaxed);
+}
diff --git a/core/sync/sync_darwin.odin b/core/sync/sync_darwin.odin
new file mode 100644
index 000000000..2c86e21db
--- /dev/null
+++ b/core/sync/sync_darwin.odin
@@ -0,0 +1,39 @@
+package sync
+
+import "core:sys/darwin"
+
+import "core:c"
+
+// The Darwin docs say it best:
+// A semaphore is much like a lock, except that a finite number of threads can hold it simultaneously.
+// Semaphores can be thought of as being much like piles of tokens; multiple threads can take these tokens,
+// but when there are none left, a thread must wait until another thread returns one.
+Semaphore :: struct #align 16 {
+ handle: darwin.semaphore_t,
+}
+// TODO(tetra): Only marked with alignment because we cannot mark distinct integers with alignments.
+// See core/sys/unix/pthread_linux.odin/pthread_t.
+
+semaphore_init :: proc(s: ^Semaphore, initial_count := 0) {
+ ct := darwin.mach_task_self();
+ res := darwin.semaphore_create(ct, &s.handle, 0, c.int(initial_count));
+ assert(res == 0);
+}
+
+semaphore_destroy :: proc(s: ^Semaphore) {
+ ct := darwin.mach_task_self();
+ res := darwin.semaphore_destroy(ct, s.handle);
+ assert(res == 0);
+ s.handle = {};
+}
+
+semaphore_post :: proc(s: ^Semaphore, count := 1) {
+ assert(count == 1);
+ res := darwin.semaphore_signal(s.handle);
+ assert(res == 0);
+}
+
+semaphore_wait_for :: proc(s: ^Semaphore) {
+ res := darwin.semaphore_wait(s.handle);
+ assert(res == 0);
+}
diff --git a/core/sync/sync_linux.odin b/core/sync/sync_linux.odin
index dcb2ee8e9..dc761f6aa 100644
--- a/core/sync/sync_linux.odin
+++ b/core/sync/sync_linux.odin
@@ -1,98 +1,28 @@
package sync
-/*
+import "core:sys/unix"
-import "core:atomics"
-import "core:os"
-
-Semaphore :: struct {
- // _handle: win32.Handle,
-}
-
-Mutex :: struct {
- _semaphore: Semaphore,
- _counter: i32,
- _owner: i32,
- _recursion: i32,
-}
-
-current_thread_id :: proc() -> i32 {
- return i32(os.current_thread_id());
+// The Darwin docs say it best:
+// A semaphore is much like a lock, except that a finite number of threads can hold it simultaneously.
+// Semaphores can be thought of as being much like piles of tokens; multiple threads can take these tokens,
+// but when there are none left, a thread must wait until another thread returns one.
+Semaphore :: struct #align 16 {
+ handle: unix.sem_t,
}
-semaphore_init :: proc(s: ^Semaphore) {
- // s._handle = win32.CreateSemaphoreA(nil, 0, 1<<31-1, nil);
+semaphore_init :: proc(s: ^Semaphore, initial_count := 0) {
+ assert(unix.sem_init(&s.handle, 0, u32(initial_count)) == 0);
}
semaphore_destroy :: proc(s: ^Semaphore) {
- // win32.CloseHandle(s._handle);
+ assert(unix.sem_destroy(&s.handle) == 0);
+ s.handle = {};
}
-semaphore_post :: proc(s: ^Semaphore, count: int) {
- // win32.ReleaseSemaphore(s._handle, cast(i32)count, nil);
+semaphore_post :: proc(s: ^Semaphore, count := 1) {
+ assert(unix.sem_post(&s.handle) == 0);
}
-semaphore_release :: inline proc(s: ^Semaphore) {
- semaphore_post(s, 1);
+semaphore_wait_for :: proc(s: ^Semaphore) {
+ assert(unix.sem_wait(&s.handle) == 0);
}
-
-semaphore_wait :: proc(s: ^Semaphore) {
- // win32.WaitForSingleObject(s._handle, win32.INFINITE);
-}
-
-
-mutex_init :: proc(m: ^Mutex) {
- atomics.store(&m._counter, 0);
- atomics.store(&m._owner, current_thread_id());
- semaphore_init(&m._semaphore);
- m._recursion = 0;
-}
-mutex_destroy :: proc(m: ^Mutex) {
- semaphore_destroy(&m._semaphore);
-}
-mutex_lock :: proc(m: ^Mutex) {
- thread_id := current_thread_id();
- if atomics.fetch_add(&m._counter, 1) > 0 {
- if thread_id != atomics.load(&m._owner) {
- semaphore_wait(&m._semaphore);
- }
- }
- atomics.store(&m._owner, thread_id);
- m._recursion += 1;
-}
-mutex_try_lock :: proc(m: ^Mutex) -> bool {
- thread_id := current_thread_id();
- if atomics.load(&m._owner) == thread_id {
- atomics.fetch_add(&m._counter, 1);
- } else {
- expected: i32 = 0;
- if atomics.load(&m._counter) != 0 {
- return false;
- }
- if atomics.compare_exchange(&m._counter, expected, 1) == 0 {
- return false;
- }
- atomics.store(&m._owner, thread_id);
- }
- m._recursion += 1;
- return true;
-}
-mutex_unlock :: proc(m: ^Mutex) {
- recursion: i32;
- thread_id := current_thread_id();
- assert(thread_id == atomics.load(&m._owner));
-
- m._recursion -= 1;
- recursion = m._recursion;
- if recursion == 0 {
- atomics.store(&m._owner, thread_id);
- }
-
- if atomics.fetch_add(&m._counter, -1) > 1 {
- if recursion == 0 {
- semaphore_release(&m._semaphore);
- }
- }
-}
-
-*/
diff --git a/core/sync/sync_unix.odin b/core/sync/sync_unix.odin
new file mode 100644
index 000000000..71ba54128
--- /dev/null
+++ b/core/sync/sync_unix.odin
@@ -0,0 +1,99 @@
+// +build linux, darwin
+package sync
+
+import "core:sys/unix"
+
+// A lock that can only be held by one thread at once.
+Mutex :: struct {
+ handle: unix.pthread_mutex_t,
+}
+
+// Blocks until signalled, and then lets past exactly
+// one thread.
+Condition :: struct {
+ handle: unix.pthread_cond_t,
+
+ // NOTE(tetra, 2019-11-11): Used to mimic the more sane behavior of Windows' AutoResetEvent.
+ // This means that you may signal the condition before anyone is waiting to cause the
+ // next thread that tries to wait to just pass by uninterrupted, without sleeping.
+ // Without this, signalling a condition will only wake up a thread which is already waiting,
+ // but not one that is about to wait, which can cause your program to become out of sync in
+ // ways that are hard to debug or fix.
+ flag: bool, // atomically mutated
+
+ mutex: Mutex,
+}
+
+
+
+mutex_init :: proc(m: ^Mutex) {
+ // NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the mutex.
+ attrs: unix.pthread_mutexattr_t;
+ assert(unix.pthread_mutexattr_init(&attrs) == 0);
+ defer unix.pthread_mutexattr_destroy(&attrs); // ignores destruction error
+
+ assert(unix.pthread_mutex_init(&m.handle, &attrs) == 0);
+}
+
+mutex_destroy :: proc(m: ^Mutex) {
+ assert(unix.pthread_mutex_destroy(&m.handle) == 0);
+ m.handle = {};
+}
+
+mutex_lock :: proc(m: ^Mutex) {
+ assert(unix.pthread_mutex_lock(&m.handle) == 0);
+}
+
+// Returns false if someone else holds the lock.
+mutex_try_lock :: proc(m: ^Mutex) -> bool {
+ return unix.pthread_mutex_trylock(&m.handle) == 0;
+}
+
+mutex_unlock :: proc(m: ^Mutex) {
+ assert(unix.pthread_mutex_unlock(&m.handle) == 0);
+}
+
+
+condition_init :: proc(c: ^Condition) {
+ // NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the condition.
+ attrs: unix.pthread_condattr_t;
+ assert(unix.pthread_condattr_init(&attrs) == 0);
+ defer unix.pthread_condattr_destroy(&attrs); // ignores destruction error
+
+ assert(unix.pthread_cond_init(&c.handle, &attrs) == 0);
+
+ mutex_init(&c.mutex);
+ c.flag = false;
+}
+
+condition_destroy :: proc(c: ^Condition) {
+ assert(unix.pthread_cond_destroy(&c.handle) == 0);
+ mutex_destroy(&c.mutex);
+ c.handle = {};
+}
+
+// Awaken exactly one thread who is waiting on the condition.
+condition_signal :: proc(c: ^Condition) {
+ mutex_lock(&c.mutex);
+ defer mutex_unlock(&c.mutex);
+ atomic_swap(&c.flag, true, .Sequentially_Consistent);
+ assert(unix.pthread_cond_signal(&c.handle) == 0);
+}
+
+// Wait for the condition to be signalled.
+// Does not block if the condition has been signalled and no one
+// has waited on it yet.
+condition_wait_for :: proc(c: ^Condition) {
+ mutex_lock(&c.mutex);
+ defer mutex_unlock(&c.mutex);
+ // NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs,
+ // the thread that gets signalled and wakes up, discovers that the flag was taken and goes
+ // back to sleep.
+ // Though this overall behavior is the most sane, there may be a better way to do this that means that
+ // the first thread to wait, gets the flag first.
+ if atomic_swap(&c.flag, false, .Sequentially_Consistent) do return;
+ for {
+ assert(unix.pthread_cond_wait(&c.handle, &c.mutex.handle) == 0);
+ if atomic_swap(&c.flag, false, .Sequentially_Consistent) do break;
+ }
+}
diff --git a/core/sync/sync_windows.odin b/core/sync/sync_windows.odin
index b0a9d944c..a99ac8497 100644
--- a/core/sync/sync_windows.odin
+++ b/core/sync/sync_windows.odin
@@ -1,51 +1,40 @@
+// +build windows
package sync
import "core:sys/win32"
-foreign {
- @(link_name="llvm.x86.sse2.pause")
- yield_processor :: proc() ---
-}
-
-Semaphore :: struct {
- _handle: win32.Handle,
-}
-
+// A lock that can only be held by one thread at once.
Mutex :: struct {
_critical_section: win32.Critical_Section,
}
+// Blocks until signalled.
+// When signalled, awakens exactly one waiting thread.
Condition :: struct {
event: win32.Handle,
}
-Ticket_Mutex :: struct {
- ticket: u64,
- serving: u64,
+// When waited upon, blocks until the internal count is greater than zero, then subtracts one.
+// Posting to the semaphore increases the count by one, or the provided amount.
+Semaphore :: struct {
+ _handle: win32.Handle,
}
-current_thread_id :: proc() -> i32 {
- return i32(win32.get_current_thread_id());
-}
-
-semaphore_init :: proc(s: ^Semaphore) {
- s._handle = win32.create_semaphore_w(nil, 0, 1<<31-1, nil);
+semaphore_init :: proc(s: ^Semaphore, initial_count := 0) {
+ s._handle = win32.create_semaphore_w(nil, i32(initial_count), 1<<31-1, nil);
}
semaphore_destroy :: proc(s: ^Semaphore) {
win32.close_handle(s._handle);
}
-semaphore_post :: proc(s: ^Semaphore, count: int) {
+semaphore_post :: proc(s: ^Semaphore, count := 1) {
win32.release_semaphore(s._handle, i32(count), nil);
}
-semaphore_release :: inline proc(s: ^Semaphore) {
- semaphore_post(s, 1);
-}
-
-semaphore_wait :: proc(s: ^Semaphore) {
+semaphore_wait_for :: proc(s: ^Semaphore) {
+ // NOTE(tetra, 2019-10-30): wait_for_single_object decrements the count before it returns.
result := win32.wait_for_single_object(s._handle, win32.INFINITE);
assert(result != win32.WAIT_FAILED);
}
@@ -73,39 +62,25 @@ mutex_unlock :: proc(m: ^Mutex) {
condition_init :: proc(using c: ^Condition) {
+ // create an auto-reset event.
+ // NOTE(tetra, 2019-10-30): this will, when signalled, signal exactly one waiting thread
+ // and then reset itself automatically.
event = win32.create_event_w(nil, false, false, nil);
assert(event != nil);
}
-condition_signal :: proc(using c: ^Condition) {
- ok := win32.set_event(event);
- assert(bool(ok));
-}
-
-condition_wait_for :: proc(using c: ^Condition) {
- result := win32.wait_for_single_object(event, win32.INFINITE);
- assert(result != win32.WAIT_FAILED);
-}
-
condition_destroy :: proc(using c: ^Condition) {
if event != nil {
win32.close_handle(event);
}
}
-
-ticket_mutex_init :: proc(m: ^Ticket_Mutex) {
- atomic_store(&m.ticket, 0, Ordering.Relaxed);
- atomic_store(&m.serving, 0, Ordering.Relaxed);
-}
-
-ticket_mutex_lock :: inline proc(m: ^Ticket_Mutex) {
- ticket := atomic_add(&m.ticket, 1, Ordering.Relaxed);
- for ticket != m.serving {
- yield_processor();
- }
+condition_signal :: proc(using c: ^Condition) {
+ ok := win32.set_event(event);
+ assert(bool(ok));
}
-ticket_mutex_unlock :: inline proc(m: ^Ticket_Mutex) {
- atomic_add(&m.serving, 1, Ordering.Relaxed);
-}
+condition_wait_for :: proc(using c: ^Condition) {
+ result := win32.wait_for_single_object(event, win32.INFINITE);
+ assert(result != win32.WAIT_FAILED);
+} \ No newline at end of file