diff options
| author | gingerBill <gingerBill@users.noreply.github.com> | 2019-12-01 11:33:23 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-12-01 11:33:23 +0000 |
| commit | 3fd5c3cd851d8f4dfd441141ca7e96889f069933 (patch) | |
| tree | 67f47e79f5c5bb80a3ed1b1e9d79a61c08c0a29d /core/sync | |
| parent | 0c0c83ee295fe8787a4bdc8b826a5432abba2ca9 (diff) | |
| parent | 99121d6ff2b02f3d16b791eb103bb9f9e8b96475 (diff) | |
Merge pull request #458 from Tetralux/linux-threads
Implement core:thread and core:sync on Unix using pthreads
Diffstat (limited to 'core/sync')
| -rw-r--r-- | core/sync/sync.odin | 27 | ||||
| -rw-r--r-- | core/sync/sync_darwin.odin | 39 | ||||
| -rw-r--r-- | core/sync/sync_linux.odin | 100 | ||||
| -rw-r--r-- | core/sync/sync_unix.odin | 99 | ||||
| -rw-r--r-- | core/sync/sync_windows.odin | 71 |
5 files changed, 203 insertions, 133 deletions
diff --git a/core/sync/sync.odin b/core/sync/sync.odin new file mode 100644 index 000000000..5a0512275 --- /dev/null +++ b/core/sync/sync.odin @@ -0,0 +1,27 @@ +package sync + +foreign { + @(link_name="llvm.x86.sse2.pause") + yield_processor :: proc() --- +} + +Ticket_Mutex :: struct { + ticket: u64, + serving: u64, +} + +ticket_mutex_init :: proc(m: ^Ticket_Mutex) { + atomic_store(&m.ticket, 0, .Relaxed); + atomic_store(&m.serving, 0, .Relaxed); +} + +ticket_mutex_lock :: inline proc(m: ^Ticket_Mutex) { + ticket := atomic_add(&m.ticket, 1, .Relaxed); + for ticket != m.serving { + yield_processor(); + } +} + +ticket_mutex_unlock :: inline proc(m: ^Ticket_Mutex) { + atomic_add(&m.serving, 1, .Relaxed); +} diff --git a/core/sync/sync_darwin.odin b/core/sync/sync_darwin.odin new file mode 100644 index 000000000..2c86e21db --- /dev/null +++ b/core/sync/sync_darwin.odin @@ -0,0 +1,39 @@ +package sync + +import "core:sys/darwin" + +import "core:c" + +// The Darwin docs say it best: +// A semaphore is much like a lock, except that a finite number of threads can hold it simultaneously. +// Semaphores can be thought of as being much like piles of tokens; multiple threads can take these tokens, +// but when there are none left, a thread must wait until another thread returns one. +Semaphore :: struct #align 16 { + handle: darwin.semaphore_t, +} +// TODO(tetra): Only marked with alignment because we cannot mark distinct integers with alignments. +// See core/sys/unix/pthread_linux.odin/pthread_t. + +semaphore_init :: proc(s: ^Semaphore, initial_count := 0) { + ct := darwin.mach_task_self(); + res := darwin.semaphore_create(ct, &s.handle, 0, c.int(initial_count)); + assert(res == 0); +} + +semaphore_destroy :: proc(s: ^Semaphore) { + ct := darwin.mach_task_self(); + res := darwin.semaphore_destroy(ct, s.handle); + assert(res == 0); + s.handle = {}; +} + +semaphore_post :: proc(s: ^Semaphore, count := 1) { + assert(count == 1); + res := darwin.semaphore_signal(s.handle); + assert(res == 0); +} + +semaphore_wait_for :: proc(s: ^Semaphore) { + res := darwin.semaphore_wait(s.handle); + assert(res == 0); +} diff --git a/core/sync/sync_linux.odin b/core/sync/sync_linux.odin index dcb2ee8e9..dc761f6aa 100644 --- a/core/sync/sync_linux.odin +++ b/core/sync/sync_linux.odin @@ -1,98 +1,28 @@ package sync -/* +import "core:sys/unix" -import "core:atomics" -import "core:os" - -Semaphore :: struct { - // _handle: win32.Handle, -} - -Mutex :: struct { - _semaphore: Semaphore, - _counter: i32, - _owner: i32, - _recursion: i32, -} - -current_thread_id :: proc() -> i32 { - return i32(os.current_thread_id()); +// The Darwin docs say it best: +// A semaphore is much like a lock, except that a finite number of threads can hold it simultaneously. +// Semaphores can be thought of as being much like piles of tokens; multiple threads can take these tokens, +// but when there are none left, a thread must wait until another thread returns one. +Semaphore :: struct #align 16 { + handle: unix.sem_t, } -semaphore_init :: proc(s: ^Semaphore) { - // s._handle = win32.CreateSemaphoreA(nil, 0, 1<<31-1, nil); +semaphore_init :: proc(s: ^Semaphore, initial_count := 0) { + assert(unix.sem_init(&s.handle, 0, u32(initial_count)) == 0); } semaphore_destroy :: proc(s: ^Semaphore) { - // win32.CloseHandle(s._handle); + assert(unix.sem_destroy(&s.handle) == 0); + s.handle = {}; } -semaphore_post :: proc(s: ^Semaphore, count: int) { - // win32.ReleaseSemaphore(s._handle, cast(i32)count, nil); +semaphore_post :: proc(s: ^Semaphore, count := 1) { + assert(unix.sem_post(&s.handle) == 0); } -semaphore_release :: inline proc(s: ^Semaphore) { - semaphore_post(s, 1); +semaphore_wait_for :: proc(s: ^Semaphore) { + assert(unix.sem_wait(&s.handle) == 0); } - -semaphore_wait :: proc(s: ^Semaphore) { - // win32.WaitForSingleObject(s._handle, win32.INFINITE); -} - - -mutex_init :: proc(m: ^Mutex) { - atomics.store(&m._counter, 0); - atomics.store(&m._owner, current_thread_id()); - semaphore_init(&m._semaphore); - m._recursion = 0; -} -mutex_destroy :: proc(m: ^Mutex) { - semaphore_destroy(&m._semaphore); -} -mutex_lock :: proc(m: ^Mutex) { - thread_id := current_thread_id(); - if atomics.fetch_add(&m._counter, 1) > 0 { - if thread_id != atomics.load(&m._owner) { - semaphore_wait(&m._semaphore); - } - } - atomics.store(&m._owner, thread_id); - m._recursion += 1; -} -mutex_try_lock :: proc(m: ^Mutex) -> bool { - thread_id := current_thread_id(); - if atomics.load(&m._owner) == thread_id { - atomics.fetch_add(&m._counter, 1); - } else { - expected: i32 = 0; - if atomics.load(&m._counter) != 0 { - return false; - } - if atomics.compare_exchange(&m._counter, expected, 1) == 0 { - return false; - } - atomics.store(&m._owner, thread_id); - } - m._recursion += 1; - return true; -} -mutex_unlock :: proc(m: ^Mutex) { - recursion: i32; - thread_id := current_thread_id(); - assert(thread_id == atomics.load(&m._owner)); - - m._recursion -= 1; - recursion = m._recursion; - if recursion == 0 { - atomics.store(&m._owner, thread_id); - } - - if atomics.fetch_add(&m._counter, -1) > 1 { - if recursion == 0 { - semaphore_release(&m._semaphore); - } - } -} - -*/ diff --git a/core/sync/sync_unix.odin b/core/sync/sync_unix.odin new file mode 100644 index 000000000..71ba54128 --- /dev/null +++ b/core/sync/sync_unix.odin @@ -0,0 +1,99 @@ +// +build linux, darwin +package sync + +import "core:sys/unix" + +// A lock that can only be held by one thread at once. +Mutex :: struct { + handle: unix.pthread_mutex_t, +} + +// Blocks until signalled, and then lets past exactly +// one thread. +Condition :: struct { + handle: unix.pthread_cond_t, + + // NOTE(tetra, 2019-11-11): Used to mimic the more sane behavior of Windows' AutoResetEvent. + // This means that you may signal the condition before anyone is waiting to cause the + // next thread that tries to wait to just pass by uninterrupted, without sleeping. + // Without this, signalling a condition will only wake up a thread which is already waiting, + // but not one that is about to wait, which can cause your program to become out of sync in + // ways that are hard to debug or fix. + flag: bool, // atomically mutated + + mutex: Mutex, +} + + + +mutex_init :: proc(m: ^Mutex) { + // NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the mutex. + attrs: unix.pthread_mutexattr_t; + assert(unix.pthread_mutexattr_init(&attrs) == 0); + defer unix.pthread_mutexattr_destroy(&attrs); // ignores destruction error + + assert(unix.pthread_mutex_init(&m.handle, &attrs) == 0); +} + +mutex_destroy :: proc(m: ^Mutex) { + assert(unix.pthread_mutex_destroy(&m.handle) == 0); + m.handle = {}; +} + +mutex_lock :: proc(m: ^Mutex) { + assert(unix.pthread_mutex_lock(&m.handle) == 0); +} + +// Returns false if someone else holds the lock. +mutex_try_lock :: proc(m: ^Mutex) -> bool { + return unix.pthread_mutex_trylock(&m.handle) == 0; +} + +mutex_unlock :: proc(m: ^Mutex) { + assert(unix.pthread_mutex_unlock(&m.handle) == 0); +} + + +condition_init :: proc(c: ^Condition) { + // NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the condition. + attrs: unix.pthread_condattr_t; + assert(unix.pthread_condattr_init(&attrs) == 0); + defer unix.pthread_condattr_destroy(&attrs); // ignores destruction error + + assert(unix.pthread_cond_init(&c.handle, &attrs) == 0); + + mutex_init(&c.mutex); + c.flag = false; +} + +condition_destroy :: proc(c: ^Condition) { + assert(unix.pthread_cond_destroy(&c.handle) == 0); + mutex_destroy(&c.mutex); + c.handle = {}; +} + +// Awaken exactly one thread who is waiting on the condition. +condition_signal :: proc(c: ^Condition) { + mutex_lock(&c.mutex); + defer mutex_unlock(&c.mutex); + atomic_swap(&c.flag, true, .Sequentially_Consistent); + assert(unix.pthread_cond_signal(&c.handle) == 0); +} + +// Wait for the condition to be signalled. +// Does not block if the condition has been signalled and no one +// has waited on it yet. +condition_wait_for :: proc(c: ^Condition) { + mutex_lock(&c.mutex); + defer mutex_unlock(&c.mutex); + // NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs, + // the thread that gets signalled and wakes up, discovers that the flag was taken and goes + // back to sleep. + // Though this overall behavior is the most sane, there may be a better way to do this that means that + // the first thread to wait, gets the flag first. + if atomic_swap(&c.flag, false, .Sequentially_Consistent) do return; + for { + assert(unix.pthread_cond_wait(&c.handle, &c.mutex.handle) == 0); + if atomic_swap(&c.flag, false, .Sequentially_Consistent) do break; + } +} diff --git a/core/sync/sync_windows.odin b/core/sync/sync_windows.odin index b0a9d944c..a99ac8497 100644 --- a/core/sync/sync_windows.odin +++ b/core/sync/sync_windows.odin @@ -1,51 +1,40 @@ +// +build windows package sync import "core:sys/win32" -foreign { - @(link_name="llvm.x86.sse2.pause") - yield_processor :: proc() --- -} - -Semaphore :: struct { - _handle: win32.Handle, -} - +// A lock that can only be held by one thread at once. Mutex :: struct { _critical_section: win32.Critical_Section, } +// Blocks until signalled. +// When signalled, awakens exactly one waiting thread. Condition :: struct { event: win32.Handle, } -Ticket_Mutex :: struct { - ticket: u64, - serving: u64, +// When waited upon, blocks until the internal count is greater than zero, then subtracts one. +// Posting to the semaphore increases the count by one, or the provided amount. +Semaphore :: struct { + _handle: win32.Handle, } -current_thread_id :: proc() -> i32 { - return i32(win32.get_current_thread_id()); -} - -semaphore_init :: proc(s: ^Semaphore) { - s._handle = win32.create_semaphore_w(nil, 0, 1<<31-1, nil); +semaphore_init :: proc(s: ^Semaphore, initial_count := 0) { + s._handle = win32.create_semaphore_w(nil, i32(initial_count), 1<<31-1, nil); } semaphore_destroy :: proc(s: ^Semaphore) { win32.close_handle(s._handle); } -semaphore_post :: proc(s: ^Semaphore, count: int) { +semaphore_post :: proc(s: ^Semaphore, count := 1) { win32.release_semaphore(s._handle, i32(count), nil); } -semaphore_release :: inline proc(s: ^Semaphore) { - semaphore_post(s, 1); -} - -semaphore_wait :: proc(s: ^Semaphore) { +semaphore_wait_for :: proc(s: ^Semaphore) { + // NOTE(tetra, 2019-10-30): wait_for_single_object decrements the count before it returns. result := win32.wait_for_single_object(s._handle, win32.INFINITE); assert(result != win32.WAIT_FAILED); } @@ -73,39 +62,25 @@ mutex_unlock :: proc(m: ^Mutex) { condition_init :: proc(using c: ^Condition) { + // create an auto-reset event. + // NOTE(tetra, 2019-10-30): this will, when signalled, signal exactly one waiting thread + // and then reset itself automatically. event = win32.create_event_w(nil, false, false, nil); assert(event != nil); } -condition_signal :: proc(using c: ^Condition) { - ok := win32.set_event(event); - assert(bool(ok)); -} - -condition_wait_for :: proc(using c: ^Condition) { - result := win32.wait_for_single_object(event, win32.INFINITE); - assert(result != win32.WAIT_FAILED); -} - condition_destroy :: proc(using c: ^Condition) { if event != nil { win32.close_handle(event); } } - -ticket_mutex_init :: proc(m: ^Ticket_Mutex) { - atomic_store(&m.ticket, 0, Ordering.Relaxed); - atomic_store(&m.serving, 0, Ordering.Relaxed); -} - -ticket_mutex_lock :: inline proc(m: ^Ticket_Mutex) { - ticket := atomic_add(&m.ticket, 1, Ordering.Relaxed); - for ticket != m.serving { - yield_processor(); - } +condition_signal :: proc(using c: ^Condition) { + ok := win32.set_event(event); + assert(bool(ok)); } -ticket_mutex_unlock :: inline proc(m: ^Ticket_Mutex) { - atomic_add(&m.serving, 1, Ordering.Relaxed); -} +condition_wait_for :: proc(using c: ^Condition) { + result := win32.wait_for_single_object(event, win32.INFINITE); + assert(result != win32.WAIT_FAILED); +}
\ No newline at end of file |