From 99121d6ff2b02f3d16b791eb103bb9f9e8b96475 Mon Sep 17 00:00:00 2001 From: Tetralux Date: Sat, 26 Oct 2019 22:35:36 +0000 Subject: Implement core:thread and core:sync on Unix using pthreads Also do some cleanup and refactoring of the thread, sync and time APIs. - remove 'semaphore_release' because 'post' and 'wait' is easier to understand - change 'semaphore_wait' to '*_wait_for' to match Condition - pthreads can be given a stack, but doing so requires the user to set up the guard pages manually. BE WARNED. The alignment requirements of the stack are also platform-dependant; it may need to be page size aligned on some systems. Unclear which systems, however. See 'os.get_page_size', and 'mem.make_aligned'. HOWEVER: I was unable to get custom stacks with guard pages working reliably, so while you can do it, the API does not support it. - add 'os.get_page_size', 'mem.make_aligned', and 'mem.new_aligned'. - removed thread return values because windows and linux are not consistent; windows returns 'i32' and pthreads return 'void*'; besides which, if you really wanted to communicate how the thread exited, you probably wouldn't do it with the thread's exit code. - fixed 'thread.is_done' on Windows; it didn't report true immediately after calling 'thread.join'. - moved time related stuff out of 'core:os' to 'core:time'. - add 'mem.align_backward' - fixed default allocator alignment The heap on Windows, and calloc on Linux, both have no facility to request alignment. It's a bit of hack, but the heap_allocator now overallocates; `size + alignment` bytes, and aligns things to at least 2. It does both of these things to ensure that there is at least two bytes before the payload, which it uses to store how much padding it needed to insert in order to fulfil the alignment requested. - make conditions more sane by matching the Windows behaviour. The fact that they were signalled now lingers until a thread tries to wait, causing them to just pass by uninterrupted, without sleeping or locking the underlying mutex, as it would otherwise need to do. This means that a thread no longer has to be waiting in order to be signalled, which avoids timing bugs that causes deadlocks that are hard to debug and fix. See the comment on the `sync.Condition.flag` field. - add thread priority: `thread.create(worker_proc, .High)` --- core/sync/sync.odin | 27 ++++++++++++ core/sync/sync_darwin.odin | 39 +++++++++++++++++ core/sync/sync_linux.odin | 100 +++++++------------------------------------- core/sync/sync_unix.odin | 99 +++++++++++++++++++++++++++++++++++++++++++ core/sync/sync_windows.odin | 71 ++++++++++--------------------- 5 files changed, 203 insertions(+), 133 deletions(-) create mode 100644 core/sync/sync.odin create mode 100644 core/sync/sync_darwin.odin create mode 100644 core/sync/sync_unix.odin (limited to 'core/sync') diff --git a/core/sync/sync.odin b/core/sync/sync.odin new file mode 100644 index 000000000..5a0512275 --- /dev/null +++ b/core/sync/sync.odin @@ -0,0 +1,27 @@ +package sync + +foreign { + @(link_name="llvm.x86.sse2.pause") + yield_processor :: proc() --- +} + +Ticket_Mutex :: struct { + ticket: u64, + serving: u64, +} + +ticket_mutex_init :: proc(m: ^Ticket_Mutex) { + atomic_store(&m.ticket, 0, .Relaxed); + atomic_store(&m.serving, 0, .Relaxed); +} + +ticket_mutex_lock :: inline proc(m: ^Ticket_Mutex) { + ticket := atomic_add(&m.ticket, 1, .Relaxed); + for ticket != m.serving { + yield_processor(); + } +} + +ticket_mutex_unlock :: inline proc(m: ^Ticket_Mutex) { + atomic_add(&m.serving, 1, .Relaxed); +} diff --git a/core/sync/sync_darwin.odin b/core/sync/sync_darwin.odin new file mode 100644 index 000000000..2c86e21db --- /dev/null +++ b/core/sync/sync_darwin.odin @@ -0,0 +1,39 @@ +package sync + +import "core:sys/darwin" + +import "core:c" + +// The Darwin docs say it best: +// A semaphore is much like a lock, except that a finite number of threads can hold it simultaneously. +// Semaphores can be thought of as being much like piles of tokens; multiple threads can take these tokens, +// but when there are none left, a thread must wait until another thread returns one. +Semaphore :: struct #align 16 { + handle: darwin.semaphore_t, +} +// TODO(tetra): Only marked with alignment because we cannot mark distinct integers with alignments. +// See core/sys/unix/pthread_linux.odin/pthread_t. + +semaphore_init :: proc(s: ^Semaphore, initial_count := 0) { + ct := darwin.mach_task_self(); + res := darwin.semaphore_create(ct, &s.handle, 0, c.int(initial_count)); + assert(res == 0); +} + +semaphore_destroy :: proc(s: ^Semaphore) { + ct := darwin.mach_task_self(); + res := darwin.semaphore_destroy(ct, s.handle); + assert(res == 0); + s.handle = {}; +} + +semaphore_post :: proc(s: ^Semaphore, count := 1) { + assert(count == 1); + res := darwin.semaphore_signal(s.handle); + assert(res == 0); +} + +semaphore_wait_for :: proc(s: ^Semaphore) { + res := darwin.semaphore_wait(s.handle); + assert(res == 0); +} diff --git a/core/sync/sync_linux.odin b/core/sync/sync_linux.odin index dcb2ee8e9..dc761f6aa 100644 --- a/core/sync/sync_linux.odin +++ b/core/sync/sync_linux.odin @@ -1,98 +1,28 @@ package sync -/* +import "core:sys/unix" -import "core:atomics" -import "core:os" - -Semaphore :: struct { - // _handle: win32.Handle, -} - -Mutex :: struct { - _semaphore: Semaphore, - _counter: i32, - _owner: i32, - _recursion: i32, -} - -current_thread_id :: proc() -> i32 { - return i32(os.current_thread_id()); +// The Darwin docs say it best: +// A semaphore is much like a lock, except that a finite number of threads can hold it simultaneously. +// Semaphores can be thought of as being much like piles of tokens; multiple threads can take these tokens, +// but when there are none left, a thread must wait until another thread returns one. +Semaphore :: struct #align 16 { + handle: unix.sem_t, } -semaphore_init :: proc(s: ^Semaphore) { - // s._handle = win32.CreateSemaphoreA(nil, 0, 1<<31-1, nil); +semaphore_init :: proc(s: ^Semaphore, initial_count := 0) { + assert(unix.sem_init(&s.handle, 0, u32(initial_count)) == 0); } semaphore_destroy :: proc(s: ^Semaphore) { - // win32.CloseHandle(s._handle); + assert(unix.sem_destroy(&s.handle) == 0); + s.handle = {}; } -semaphore_post :: proc(s: ^Semaphore, count: int) { - // win32.ReleaseSemaphore(s._handle, cast(i32)count, nil); +semaphore_post :: proc(s: ^Semaphore, count := 1) { + assert(unix.sem_post(&s.handle) == 0); } -semaphore_release :: inline proc(s: ^Semaphore) { - semaphore_post(s, 1); +semaphore_wait_for :: proc(s: ^Semaphore) { + assert(unix.sem_wait(&s.handle) == 0); } - -semaphore_wait :: proc(s: ^Semaphore) { - // win32.WaitForSingleObject(s._handle, win32.INFINITE); -} - - -mutex_init :: proc(m: ^Mutex) { - atomics.store(&m._counter, 0); - atomics.store(&m._owner, current_thread_id()); - semaphore_init(&m._semaphore); - m._recursion = 0; -} -mutex_destroy :: proc(m: ^Mutex) { - semaphore_destroy(&m._semaphore); -} -mutex_lock :: proc(m: ^Mutex) { - thread_id := current_thread_id(); - if atomics.fetch_add(&m._counter, 1) > 0 { - if thread_id != atomics.load(&m._owner) { - semaphore_wait(&m._semaphore); - } - } - atomics.store(&m._owner, thread_id); - m._recursion += 1; -} -mutex_try_lock :: proc(m: ^Mutex) -> bool { - thread_id := current_thread_id(); - if atomics.load(&m._owner) == thread_id { - atomics.fetch_add(&m._counter, 1); - } else { - expected: i32 = 0; - if atomics.load(&m._counter) != 0 { - return false; - } - if atomics.compare_exchange(&m._counter, expected, 1) == 0 { - return false; - } - atomics.store(&m._owner, thread_id); - } - m._recursion += 1; - return true; -} -mutex_unlock :: proc(m: ^Mutex) { - recursion: i32; - thread_id := current_thread_id(); - assert(thread_id == atomics.load(&m._owner)); - - m._recursion -= 1; - recursion = m._recursion; - if recursion == 0 { - atomics.store(&m._owner, thread_id); - } - - if atomics.fetch_add(&m._counter, -1) > 1 { - if recursion == 0 { - semaphore_release(&m._semaphore); - } - } -} - -*/ diff --git a/core/sync/sync_unix.odin b/core/sync/sync_unix.odin new file mode 100644 index 000000000..71ba54128 --- /dev/null +++ b/core/sync/sync_unix.odin @@ -0,0 +1,99 @@ +// +build linux, darwin +package sync + +import "core:sys/unix" + +// A lock that can only be held by one thread at once. +Mutex :: struct { + handle: unix.pthread_mutex_t, +} + +// Blocks until signalled, and then lets past exactly +// one thread. +Condition :: struct { + handle: unix.pthread_cond_t, + + // NOTE(tetra, 2019-11-11): Used to mimic the more sane behavior of Windows' AutoResetEvent. + // This means that you may signal the condition before anyone is waiting to cause the + // next thread that tries to wait to just pass by uninterrupted, without sleeping. + // Without this, signalling a condition will only wake up a thread which is already waiting, + // but not one that is about to wait, which can cause your program to become out of sync in + // ways that are hard to debug or fix. + flag: bool, // atomically mutated + + mutex: Mutex, +} + + + +mutex_init :: proc(m: ^Mutex) { + // NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the mutex. + attrs: unix.pthread_mutexattr_t; + assert(unix.pthread_mutexattr_init(&attrs) == 0); + defer unix.pthread_mutexattr_destroy(&attrs); // ignores destruction error + + assert(unix.pthread_mutex_init(&m.handle, &attrs) == 0); +} + +mutex_destroy :: proc(m: ^Mutex) { + assert(unix.pthread_mutex_destroy(&m.handle) == 0); + m.handle = {}; +} + +mutex_lock :: proc(m: ^Mutex) { + assert(unix.pthread_mutex_lock(&m.handle) == 0); +} + +// Returns false if someone else holds the lock. +mutex_try_lock :: proc(m: ^Mutex) -> bool { + return unix.pthread_mutex_trylock(&m.handle) == 0; +} + +mutex_unlock :: proc(m: ^Mutex) { + assert(unix.pthread_mutex_unlock(&m.handle) == 0); +} + + +condition_init :: proc(c: ^Condition) { + // NOTE(tetra, 2019-11-01): POSIX OOM if we cannot init the attrs or the condition. + attrs: unix.pthread_condattr_t; + assert(unix.pthread_condattr_init(&attrs) == 0); + defer unix.pthread_condattr_destroy(&attrs); // ignores destruction error + + assert(unix.pthread_cond_init(&c.handle, &attrs) == 0); + + mutex_init(&c.mutex); + c.flag = false; +} + +condition_destroy :: proc(c: ^Condition) { + assert(unix.pthread_cond_destroy(&c.handle) == 0); + mutex_destroy(&c.mutex); + c.handle = {}; +} + +// Awaken exactly one thread who is waiting on the condition. +condition_signal :: proc(c: ^Condition) { + mutex_lock(&c.mutex); + defer mutex_unlock(&c.mutex); + atomic_swap(&c.flag, true, .Sequentially_Consistent); + assert(unix.pthread_cond_signal(&c.handle) == 0); +} + +// Wait for the condition to be signalled. +// Does not block if the condition has been signalled and no one +// has waited on it yet. +condition_wait_for :: proc(c: ^Condition) { + mutex_lock(&c.mutex); + defer mutex_unlock(&c.mutex); + // NOTE(tetra): If a thread comes by and steals the flag immediately after the signal occurs, + // the thread that gets signalled and wakes up, discovers that the flag was taken and goes + // back to sleep. + // Though this overall behavior is the most sane, there may be a better way to do this that means that + // the first thread to wait, gets the flag first. + if atomic_swap(&c.flag, false, .Sequentially_Consistent) do return; + for { + assert(unix.pthread_cond_wait(&c.handle, &c.mutex.handle) == 0); + if atomic_swap(&c.flag, false, .Sequentially_Consistent) do break; + } +} diff --git a/core/sync/sync_windows.odin b/core/sync/sync_windows.odin index b0a9d944c..a99ac8497 100644 --- a/core/sync/sync_windows.odin +++ b/core/sync/sync_windows.odin @@ -1,51 +1,40 @@ +// +build windows package sync import "core:sys/win32" -foreign { - @(link_name="llvm.x86.sse2.pause") - yield_processor :: proc() --- -} - -Semaphore :: struct { - _handle: win32.Handle, -} - +// A lock that can only be held by one thread at once. Mutex :: struct { _critical_section: win32.Critical_Section, } +// Blocks until signalled. +// When signalled, awakens exactly one waiting thread. Condition :: struct { event: win32.Handle, } -Ticket_Mutex :: struct { - ticket: u64, - serving: u64, +// When waited upon, blocks until the internal count is greater than zero, then subtracts one. +// Posting to the semaphore increases the count by one, or the provided amount. +Semaphore :: struct { + _handle: win32.Handle, } -current_thread_id :: proc() -> i32 { - return i32(win32.get_current_thread_id()); -} - -semaphore_init :: proc(s: ^Semaphore) { - s._handle = win32.create_semaphore_w(nil, 0, 1<<31-1, nil); +semaphore_init :: proc(s: ^Semaphore, initial_count := 0) { + s._handle = win32.create_semaphore_w(nil, i32(initial_count), 1<<31-1, nil); } semaphore_destroy :: proc(s: ^Semaphore) { win32.close_handle(s._handle); } -semaphore_post :: proc(s: ^Semaphore, count: int) { +semaphore_post :: proc(s: ^Semaphore, count := 1) { win32.release_semaphore(s._handle, i32(count), nil); } -semaphore_release :: inline proc(s: ^Semaphore) { - semaphore_post(s, 1); -} - -semaphore_wait :: proc(s: ^Semaphore) { +semaphore_wait_for :: proc(s: ^Semaphore) { + // NOTE(tetra, 2019-10-30): wait_for_single_object decrements the count before it returns. result := win32.wait_for_single_object(s._handle, win32.INFINITE); assert(result != win32.WAIT_FAILED); } @@ -73,39 +62,25 @@ mutex_unlock :: proc(m: ^Mutex) { condition_init :: proc(using c: ^Condition) { + // create an auto-reset event. + // NOTE(tetra, 2019-10-30): this will, when signalled, signal exactly one waiting thread + // and then reset itself automatically. event = win32.create_event_w(nil, false, false, nil); assert(event != nil); } -condition_signal :: proc(using c: ^Condition) { - ok := win32.set_event(event); - assert(bool(ok)); -} - -condition_wait_for :: proc(using c: ^Condition) { - result := win32.wait_for_single_object(event, win32.INFINITE); - assert(result != win32.WAIT_FAILED); -} - condition_destroy :: proc(using c: ^Condition) { if event != nil { win32.close_handle(event); } } - -ticket_mutex_init :: proc(m: ^Ticket_Mutex) { - atomic_store(&m.ticket, 0, Ordering.Relaxed); - atomic_store(&m.serving, 0, Ordering.Relaxed); -} - -ticket_mutex_lock :: inline proc(m: ^Ticket_Mutex) { - ticket := atomic_add(&m.ticket, 1, Ordering.Relaxed); - for ticket != m.serving { - yield_processor(); - } +condition_signal :: proc(using c: ^Condition) { + ok := win32.set_event(event); + assert(bool(ok)); } -ticket_mutex_unlock :: inline proc(m: ^Ticket_Mutex) { - atomic_add(&m.serving, 1, Ordering.Relaxed); -} +condition_wait_for :: proc(using c: ^Condition) { + result := win32.wait_for_single_object(event, win32.INFINITE); + assert(result != win32.WAIT_FAILED); +} \ No newline at end of file -- cgit v1.2.3