diff options
| author | Jack Mordaunt <jackmordaunt.dev@gmail.com> | 2025-06-12 15:06:27 -0300 |
|---|---|---|
| committer | Jack Mordaunt <jackmordaunt.dev@gmail.com> | 2025-06-12 17:35:48 -0300 |
| commit | 2d12e265ccb51ce6385f56a53e2ea261eb92ac82 (patch) | |
| tree | 59322b0eebbe8a710f3e743f0b051244fda21702 | |
| parent | c29168f76f05e98e7532c65eda253e14992f8ddf (diff) | |
tests/core/sync/chan: add test for contended try_send
This test ensures that contending threads racing to try_send against a
single blocking read will result in exactly one winner without any
senders blocking.
| -rw-r--r-- | tests/core/sync/chan/test_core_sync_chan.odin | 149 |
1 files changed, 149 insertions, 0 deletions
diff --git a/tests/core/sync/chan/test_core_sync_chan.odin b/tests/core/sync/chan/test_core_sync_chan.odin index 52b1f7d31..ae7456d99 100644 --- a/tests/core/sync/chan/test_core_sync_chan.odin +++ b/tests/core/sync/chan/test_core_sync_chan.odin @@ -4,6 +4,7 @@ import "base:runtime" import "base:intrinsics" import "core:log" import "core:math/rand" +import "core:sync" import "core:sync/chan" import "core:testing" import "core:thread" @@ -227,6 +228,154 @@ test_full_buffered_closed_chan_deadlock :: proc(t: ^testing.T) { testing.expect(t, !chan.send(ch, 32)) } +// Ensures that try_send for unbuffered channels works as expected. +// If 1 reader of a channel, and 3 try_senders, only one of the senders +// will succeed and none of them will block. +@test +test_unbuffered_try_send_chan_contention :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + start, start_alloc_err := chan.create(chan.Chan(any), context.allocator) + assert(start_alloc_err == nil, "allocation failed") + defer chan.destroy(start) + + trigger, trigger_alloc_err := chan.create(chan.Chan(any), context.allocator) + assert(trigger_alloc_err == nil, "allocation failed") + defer chan.destroy(trigger) + + results, results_alloc_err := chan.create(chan.Chan(int), 3, context.allocator) + assert(results_alloc_err == nil, "allocation failed") + defer chan.destroy(results) + + ch, ch_alloc_err := chan.create(chan.Chan(int), context.allocator) + assert(ch_alloc_err == nil, "allocation failed") + defer chan.destroy(ch) + + // There are no readers or writers, so calling recv or send would block! + testing.expect_value(t, chan.can_send(ch), false) + testing.expect_value(t, chan.can_recv(ch), false) + + // Non-blocking operations should not block, and should return false. + testing.expect_value(t, chan.try_send(ch, -1), false) + if v, ok := chan.try_recv(ch); ok { + testing.expect_value(t, ok, false) + testing.expect_value(t, v, 0) + } + + // Spinup several threads contending to send on an unbuffered channel. + contenders: [3]^thread.Thread + wait: sync.Wait_Group + + for ii in 0..<len(contenders) { + sync.wait_group_add(&wait, 1) + Context :: struct { + id: int, + start: chan.Chan(any), + trigger: chan.Chan(any), + results: chan.Chan(int), + ch: chan.Chan(int), + wg: ^sync.Wait_Group, + } + ctx := Context { + id = ii, + start = start, + trigger = trigger, + results = results, + ch = ch, + wg = &wait, + } + contenders[ii] = thread.create_and_start_with_poly_data(ctx, proc(ctx: Context) { + defer sync.wait_group_done(ctx.wg) + + assert(!chan.can_send(ctx.ch), "channel shouldn't be ready for non-blocking send yet") + assert(chan.send(ctx.start, "ready")) + + log.debugf("contender %v: ready", ctx.id) + + // Wait for trigger to be closed so that all contenders have the same opportunity. + _, _ = chan.recv(ctx.trigger) + + log.debugf("contender %v: racing", ctx.id) + + // Attempt to send a value. We are competing against the other contenders. + ok := chan.try_send(ctx.ch, 42) + if ok { + log.debugf("contender %v: sent!", ctx.id) + assert(chan.send(ctx.results, 1)) + } else { + log.debugf("contender %v: too-slow", ctx.id) + assert(chan.send(ctx.results, -1)) + } + }, init_context = context) + } + + // Spinup a closer thread that will close the results channel once all + // contenders are done. This lets the test thread check for spurious results by + // draining the results until closed. + results_closer := thread.create_and_start_with_poly_data2(&wait, results, proc(wg: ^sync.Wait_Group, results: chan.Chan(int)) { + sync.wait_group_wait(wg) + assert(chan.close(results)) + }) + + // Wait for contenders to be ready. + for _ in 0..<len(contenders) { + if data, ok := chan.recv(start); !ok { + testing.expect_value(t, ok, true) + testing.expect_value(t, data.(string), "ready") + } + } + + // Fire the trigger when the test thread is ready to receive. + trigger_closer := thread.create_and_start_with_poly_data2(trigger, ch, proc(trigger: chan.Chan(any), ch: chan.Chan(int)) { + for !chan.can_send(ch) { + thread.yield() + } + assert(chan.close(trigger)) + }) + + // Blocking read, wait for a sender. + if v, ok := chan.recv(ch); !ok { + testing.expect_value(t, ok, true) + testing.expect_value(t, v, 42) + } + + did_send_count: int + did_not_send_count: int + + // Let the contenders fight to send a value. + for { + data, ok := chan.recv(results) + if !ok { + break + } + + log.debugf("data: %v, ok: %v", data, ok) + + switch data { + case 1: + did_send_count += 1 + case -1: + did_not_send_count += 1 + case: + testing.fail_now(t, "got spurious result") + } + } + + thread.join(trigger_closer) + thread.join(results_closer) + thread.join_multiple(..contenders[:]) + + defer for tr in contenders { + thread.destroy(tr) + } + defer thread.destroy(trigger_closer) + defer thread.destroy(results_closer) + + // Expect that one got to send and the others did not. + testing.expect_value(t, did_send_count, 1) + testing.expect_value(t, did_not_send_count, len(contenders)-1) +} + // This test guarantees a buffered channel's messages can still be received // even after closing. This is currently how the API works. If that changes, // this test will need to change. |