aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorJack Mordaunt <jackmordaunt.dev@gmail.com>2025-06-12 15:06:27 -0300
committerJack Mordaunt <jackmordaunt.dev@gmail.com>2025-06-12 17:35:48 -0300
commit2d12e265ccb51ce6385f56a53e2ea261eb92ac82 (patch)
tree59322b0eebbe8a710f3e743f0b051244fda21702 /tests
parentc29168f76f05e98e7532c65eda253e14992f8ddf (diff)
tests/core/sync/chan: add test for contended try_send
This test ensures that contending threads racing to try_send against a single blocking read will result in exactly one winner without any senders blocking.
Diffstat (limited to 'tests')
-rw-r--r--tests/core/sync/chan/test_core_sync_chan.odin149
1 files changed, 149 insertions, 0 deletions
diff --git a/tests/core/sync/chan/test_core_sync_chan.odin b/tests/core/sync/chan/test_core_sync_chan.odin
index 52b1f7d31..ae7456d99 100644
--- a/tests/core/sync/chan/test_core_sync_chan.odin
+++ b/tests/core/sync/chan/test_core_sync_chan.odin
@@ -4,6 +4,7 @@ import "base:runtime"
import "base:intrinsics"
import "core:log"
import "core:math/rand"
+import "core:sync"
import "core:sync/chan"
import "core:testing"
import "core:thread"
@@ -227,6 +228,154 @@ test_full_buffered_closed_chan_deadlock :: proc(t: ^testing.T) {
testing.expect(t, !chan.send(ch, 32))
}
+// Ensures that try_send for unbuffered channels works as expected.
+// If 1 reader of a channel, and 3 try_senders, only one of the senders
+// will succeed and none of them will block.
+@test
+test_unbuffered_try_send_chan_contention :: proc(t: ^testing.T) {
+ testing.set_fail_timeout(t, FAIL_TIME)
+
+ start, start_alloc_err := chan.create(chan.Chan(any), context.allocator)
+ assert(start_alloc_err == nil, "allocation failed")
+ defer chan.destroy(start)
+
+ trigger, trigger_alloc_err := chan.create(chan.Chan(any), context.allocator)
+ assert(trigger_alloc_err == nil, "allocation failed")
+ defer chan.destroy(trigger)
+
+ results, results_alloc_err := chan.create(chan.Chan(int), 3, context.allocator)
+ assert(results_alloc_err == nil, "allocation failed")
+ defer chan.destroy(results)
+
+ ch, ch_alloc_err := chan.create(chan.Chan(int), context.allocator)
+ assert(ch_alloc_err == nil, "allocation failed")
+ defer chan.destroy(ch)
+
+ // There are no readers or writers, so calling recv or send would block!
+ testing.expect_value(t, chan.can_send(ch), false)
+ testing.expect_value(t, chan.can_recv(ch), false)
+
+ // Non-blocking operations should not block, and should return false.
+ testing.expect_value(t, chan.try_send(ch, -1), false)
+ if v, ok := chan.try_recv(ch); ok {
+ testing.expect_value(t, ok, false)
+ testing.expect_value(t, v, 0)
+ }
+
+ // Spinup several threads contending to send on an unbuffered channel.
+ contenders: [3]^thread.Thread
+ wait: sync.Wait_Group
+
+ for ii in 0..<len(contenders) {
+ sync.wait_group_add(&wait, 1)
+ Context :: struct {
+ id: int,
+ start: chan.Chan(any),
+ trigger: chan.Chan(any),
+ results: chan.Chan(int),
+ ch: chan.Chan(int),
+ wg: ^sync.Wait_Group,
+ }
+ ctx := Context {
+ id = ii,
+ start = start,
+ trigger = trigger,
+ results = results,
+ ch = ch,
+ wg = &wait,
+ }
+ contenders[ii] = thread.create_and_start_with_poly_data(ctx, proc(ctx: Context) {
+ defer sync.wait_group_done(ctx.wg)
+
+ assert(!chan.can_send(ctx.ch), "channel shouldn't be ready for non-blocking send yet")
+ assert(chan.send(ctx.start, "ready"))
+
+ log.debugf("contender %v: ready", ctx.id)
+
+ // Wait for trigger to be closed so that all contenders have the same opportunity.
+ _, _ = chan.recv(ctx.trigger)
+
+ log.debugf("contender %v: racing", ctx.id)
+
+ // Attempt to send a value. We are competing against the other contenders.
+ ok := chan.try_send(ctx.ch, 42)
+ if ok {
+ log.debugf("contender %v: sent!", ctx.id)
+ assert(chan.send(ctx.results, 1))
+ } else {
+ log.debugf("contender %v: too-slow", ctx.id)
+ assert(chan.send(ctx.results, -1))
+ }
+ }, init_context = context)
+ }
+
+ // Spinup a closer thread that will close the results channel once all
+ // contenders are done. This lets the test thread check for spurious results by
+ // draining the results until closed.
+ results_closer := thread.create_and_start_with_poly_data2(&wait, results, proc(wg: ^sync.Wait_Group, results: chan.Chan(int)) {
+ sync.wait_group_wait(wg)
+ assert(chan.close(results))
+ })
+
+ // Wait for contenders to be ready.
+ for _ in 0..<len(contenders) {
+ if data, ok := chan.recv(start); !ok {
+ testing.expect_value(t, ok, true)
+ testing.expect_value(t, data.(string), "ready")
+ }
+ }
+
+ // Fire the trigger when the test thread is ready to receive.
+ trigger_closer := thread.create_and_start_with_poly_data2(trigger, ch, proc(trigger: chan.Chan(any), ch: chan.Chan(int)) {
+ for !chan.can_send(ch) {
+ thread.yield()
+ }
+ assert(chan.close(trigger))
+ })
+
+ // Blocking read, wait for a sender.
+ if v, ok := chan.recv(ch); !ok {
+ testing.expect_value(t, ok, true)
+ testing.expect_value(t, v, 42)
+ }
+
+ did_send_count: int
+ did_not_send_count: int
+
+ // Let the contenders fight to send a value.
+ for {
+ data, ok := chan.recv(results)
+ if !ok {
+ break
+ }
+
+ log.debugf("data: %v, ok: %v", data, ok)
+
+ switch data {
+ case 1:
+ did_send_count += 1
+ case -1:
+ did_not_send_count += 1
+ case:
+ testing.fail_now(t, "got spurious result")
+ }
+ }
+
+ thread.join(trigger_closer)
+ thread.join(results_closer)
+ thread.join_multiple(..contenders[:])
+
+ defer for tr in contenders {
+ thread.destroy(tr)
+ }
+ defer thread.destroy(trigger_closer)
+ defer thread.destroy(results_closer)
+
+ // Expect that one got to send and the others did not.
+ testing.expect_value(t, did_send_count, 1)
+ testing.expect_value(t, did_not_send_count, len(contenders)-1)
+}
+
// This test guarantees a buffered channel's messages can still be received
// even after closing. This is currently how the API works. If that changes,
// this test will need to change.