diff options
| author | Laytan <laytanlaats@hotmail.com> | 2025-06-20 22:11:39 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-06-20 22:11:39 +0200 |
| commit | 7f648d11d6a53b083108508e46066a7d2916b534 (patch) | |
| tree | b6547603c9fd74b4687b5d8c2da61ee29c1c2639 /tests | |
| parent | 8e782d9a0028987b37d597d79ebf53953ed9592f (diff) | |
| parent | 17927729dd56574de1f547a9d34369bd4731fa41 (diff) | |
Merge pull request #5329 from JackMordaunt/jfm-fix_chan_try_send
chan: fix try_send and send
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/core/sync/chan/test_core_sync_chan.odin | 319 |
1 files changed, 272 insertions, 47 deletions
diff --git a/tests/core/sync/chan/test_core_sync_chan.odin b/tests/core/sync/chan/test_core_sync_chan.odin index a87452eb0..304986ae7 100644 --- a/tests/core/sync/chan/test_core_sync_chan.odin +++ b/tests/core/sync/chan/test_core_sync_chan.odin @@ -4,6 +4,7 @@ import "base:runtime" import "base:intrinsics" import "core:log" import "core:math/rand" +import "core:sync" import "core:sync/chan" import "core:testing" import "core:thread" @@ -33,18 +34,16 @@ Comm :: struct { BUFFER_SIZE :: 8 MAX_RAND :: 32 FAIL_TIME :: 1 * time.Second -SLEEP_TIME :: 1 * time.Millisecond + +// Synchronizes try_select tests that require access to global state. +test_lock: sync.Mutex +__global_context_for_test: rawptr comm_client :: proc(th: ^thread.Thread) { data := cast(^Comm)th.data - manual_buffering := data.manual_buffering n: i64 - for manual_buffering && !chan.can_recv(data.host) { - thread.yield() - } - recv_loop: for msg in chan.recv(data.host) { #partial switch msg.type { case .Add: n += msg.i @@ -56,14 +55,6 @@ comm_client :: proc(th: ^thread.Thread) { case: panic("Unknown message type for client.") } - - for manual_buffering && !chan.can_recv(data.host) { - thread.yield() - } - } - - for manual_buffering && !chan.can_send(data.host) { - thread.yield() } chan.send(data.client, Message{.Result, n}) @@ -72,9 +63,6 @@ comm_client :: proc(th: ^thread.Thread) { send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), manual_buffering: bool = false) -> (expected: i64) { expected = 1 - for manual_buffering && !chan.can_send(host) { - thread.yield() - } chan.send(host, Message{.Add, 1}) log.debug(Message{.Add, 1}) @@ -96,9 +84,6 @@ send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), manual_buffering: expected /= msg.i } - for manual_buffering && !chan.can_send(host) { - thread.yield() - } if manual_buffering { testing.expect(t, chan.len(host) == 0) } @@ -107,9 +92,6 @@ send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), manual_buffering: log.debug(msg) } - for manual_buffering && !chan.can_send(host) { - thread.yield() - } chan.send(host, Message{.End, 0}) log.debug(Message{.End, 0}) chan.close(host) @@ -148,18 +130,15 @@ test_chan_buffered :: proc(t: ^testing.T) { expected := send_messages(t, comm.host, manual_buffering = false) - // Sleep so we can give the other thread enough time to buffer its message. - time.sleep(SLEEP_TIME) - - testing.expect_value(t, chan.len(comm.client), 1) - result, ok := chan.try_recv(comm.client) + result, ok := chan.recv(comm.client) + testing.expect_value(t, ok, true) + testing.expect_value(t, result.i, expected) - // One more sleep to ensure it has enough time to close. - time.sleep(SLEEP_TIME) + // Wait for channel to close. + _, ok = chan.recv(comm.client) + testing.expect(t, !ok, "channel should have been closed") testing.expect_value(t, chan.is_closed(comm.client), true) - testing.expect_value(t, ok, true) - testing.expect_value(t, result.i, expected) log.debug(result, expected) // Make sure sending to closed channels fails. @@ -171,6 +150,8 @@ test_chan_buffered :: proc(t: ^testing.T) { _, ok = chan.recv(comm.client); testing.expect_value(t, ok, false) _, ok = chan.try_recv(comm.host); testing.expect_value(t, ok, false) _, ok = chan.try_recv(comm.client); testing.expect_value(t, ok, false) + + thread.join(reckoner) } @test @@ -193,6 +174,10 @@ test_chan_unbuffered :: proc(t: ^testing.T) { testing.expect(t, !chan.is_buffered(comm.client)) testing.expect(t, chan.is_unbuffered(comm.host)) testing.expect(t, chan.is_unbuffered(comm.client)) + testing.expect(t, !chan.can_send(comm.host)) + testing.expect(t, !chan.can_send(comm.client)) + testing.expect(t, !chan.can_recv(comm.host)) + testing.expect(t, !chan.can_recv(comm.client)) testing.expect_value(t, chan.len(comm.host), 0) testing.expect_value(t, chan.len(comm.client), 0) testing.expect_value(t, chan.cap(comm.host), 0) @@ -203,25 +188,16 @@ test_chan_unbuffered :: proc(t: ^testing.T) { reckoner.data = &comm thread.start(reckoner) - for !chan.can_send(comm.client) { - thread.yield() - } - expected := send_messages(t, comm.host) testing.expect_value(t, chan.is_closed(comm.host), true) - for !chan.can_recv(comm.client) { - thread.yield() - } - - result, ok := chan.try_recv(comm.client) + result, ok := chan.recv(comm.client) testing.expect_value(t, ok, true) testing.expect_value(t, result.i, expected) log.debug(result, expected) - // Sleep so we can give the other thread enough time to close its side - // after we've received its message. - time.sleep(SLEEP_TIME) + _, ok2 := chan.recv(comm.client) + testing.expect(t, !ok2, "read of closed channel should return false") testing.expect_value(t, chan.is_closed(comm.client), true) @@ -234,6 +210,8 @@ test_chan_unbuffered :: proc(t: ^testing.T) { _, ok = chan.recv(comm.client); testing.expect_value(t, ok, false) _, ok = chan.try_recv(comm.host); testing.expect_value(t, ok, false) _, ok = chan.try_recv(comm.client); testing.expect_value(t, ok, false) + + thread.join(reckoner) } @test @@ -250,6 +228,198 @@ test_full_buffered_closed_chan_deadlock :: proc(t: ^testing.T) { testing.expect(t, !chan.send(ch, 32)) } +// Ensures that if a thread is doing a blocking send and the channel +// is closed, it will report false to indicate a failure to complete. +@test +test_fail_blocking_send_on_close :: proc(t: ^testing.T) { + ch, ch_alloc_err := chan.create(chan.Chan(int), context.allocator) + assert(ch_alloc_err == nil, "allocation failed") + defer chan.destroy(ch) + + sender := thread.create_and_start_with_poly_data(ch, proc(ch: chan.Chan(int)) { + assert(!chan.send(ch, 42)) + }) + + for !chan.can_recv(ch) { + thread.yield() + } + + testing.expect(t, chan.close(ch)) + thread.join(sender) + thread.destroy(sender) +} + +// Ensures that if a thread is doing a blocking read and the channel +// is closed, it will report false to indicate a failure to complete. +@test +test_fail_blocking_recv_on_close :: proc(t: ^testing.T) { + ch, ch_alloc_err := chan.create(chan.Chan(int), context.allocator) + assert(ch_alloc_err == nil, "allocation failed") + defer chan.destroy(ch) + + reader := thread.create_and_start_with_poly_data(ch, proc(ch: chan.Chan(int)) { + v, ok := chan.recv(ch) + assert(!ok) + assert(v == 0) + }) + + for !chan.can_send(ch) { + thread.yield() + } + + testing.expect(t, chan.close(ch)) + thread.join(reader) + thread.destroy(reader) +} + +// Ensures that try_send for unbuffered channels works as expected. +// If 1 reader of a channel, and 3 try_senders, only one of the senders +// will succeed and none of them will block. +@test +test_unbuffered_try_send_chan_contention :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + start, start_alloc_err := chan.create(chan.Chan(any), context.allocator) + assert(start_alloc_err == nil, "allocation failed") + defer chan.destroy(start) + + trigger, trigger_alloc_err := chan.create(chan.Chan(any), context.allocator) + assert(trigger_alloc_err == nil, "allocation failed") + defer chan.destroy(trigger) + + results, results_alloc_err := chan.create(chan.Chan(int), 3, context.allocator) + assert(results_alloc_err == nil, "allocation failed") + defer chan.destroy(results) + + ch, ch_alloc_err := chan.create(chan.Chan(int), context.allocator) + assert(ch_alloc_err == nil, "allocation failed") + defer chan.destroy(ch) + + // There are no readers or writers, so calling recv or send would block! + testing.expect_value(t, chan.can_send(ch), false) + testing.expect_value(t, chan.can_recv(ch), false) + + // Non-blocking operations should not block, and should return false. + testing.expect_value(t, chan.try_send(ch, -1), false) + if v, ok := chan.try_recv(ch); ok { + testing.expect_value(t, ok, false) + testing.expect_value(t, v, 0) + } + + // Spinup several threads contending to send on an unbuffered channel. + contenders: [3]^thread.Thread + wait: sync.Wait_Group + + for ii in 0..<len(contenders) { + sync.wait_group_add(&wait, 1) + Context :: struct { + id: int, + start: chan.Chan(any), + trigger: chan.Chan(any), + results: chan.Chan(int), + ch: chan.Chan(int), + wg: ^sync.Wait_Group, + } + ctx := Context { + id = ii, + start = start, + trigger = trigger, + results = results, + ch = ch, + wg = &wait, + } + contenders[ii] = thread.create_and_start_with_poly_data(ctx, proc(ctx: Context) { + defer sync.wait_group_done(ctx.wg) + + assert(!chan.can_send(ctx.ch), "channel shouldn't be ready for non-blocking send yet") + assert(chan.send(ctx.start, "ready")) + + log.debugf("contender %v: ready", ctx.id) + + // Wait for trigger to be closed so that all contenders have the same opportunity. + _, _ = chan.recv(ctx.trigger) + + log.debugf("contender %v: racing", ctx.id) + + // Attempt to send a value. We are competing against the other contenders. + ok := chan.try_send(ctx.ch, 42) + if ok { + log.debugf("contender %v: sent!", ctx.id) + assert(chan.send(ctx.results, 1)) + } else { + log.debugf("contender %v: too-slow", ctx.id) + assert(chan.send(ctx.results, -1)) + } + }, init_context = context) + } + + // Spinup a closer thread that will close the results channel once all + // contenders are done. This lets the test thread check for spurious results by + // draining the results until closed. + results_closer := thread.create_and_start_with_poly_data2(&wait, results, proc(wg: ^sync.Wait_Group, results: chan.Chan(int)) { + sync.wait_group_wait(wg) + assert(chan.close(results)) + }) + + // Wait for contenders to be ready. + for _ in 0..<len(contenders) { + if data, ok := chan.recv(start); !ok { + testing.expect_value(t, ok, true) + testing.expect_value(t, data.(string), "ready") + } + } + + // Fire the trigger when the test thread is ready to receive. + trigger_closer := thread.create_and_start_with_poly_data2(trigger, ch, proc(trigger: chan.Chan(any), ch: chan.Chan(int)) { + for !chan.can_send(ch) { + thread.yield() + } + assert(chan.close(trigger)) + }) + + // Blocking read, wait for a sender. + if v, ok := chan.recv(ch); !ok { + testing.expect_value(t, ok, true) + testing.expect_value(t, v, 42) + } + + did_send_count: int + did_not_send_count: int + + // Let the contenders fight to send a value. + for { + data, ok := chan.recv(results) + if !ok { + break + } + + log.debugf("data: %v, ok: %v", data, ok) + + switch data { + case 1: + did_send_count += 1 + case -1: + did_not_send_count += 1 + case: + testing.fail_now(t, "got spurious result") + } + } + + thread.join(trigger_closer) + thread.join(results_closer) + thread.join_multiple(..contenders[:]) + + defer for tr in contenders { + thread.destroy(tr) + } + defer thread.destroy(trigger_closer) + defer thread.destroy(results_closer) + + // Expect that one got to send and the others did not. + testing.expect_value(t, did_send_count, 1) + testing.expect_value(t, did_not_send_count, len(contenders)-1) +} + // This test guarantees a buffered channel's messages can still be received // even after closing. This is currently how the API works. If that changes, // this test will need to change. @@ -279,6 +449,7 @@ test_accept_message_from_closed_buffered_chan :: proc(t: ^testing.T) { /* @test test_try_select_raw_happy :: proc(t: ^testing.T) { + sync.guard(&test_lock) testing.set_fail_timeout(t, FAIL_TIME) recv1, recv1_err := chan.create(chan.Chan(int), context.allocator) @@ -351,6 +522,7 @@ test_try_select_raw_happy :: proc(t: ^testing.T) { // try_select_raw operation does not block. @test test_try_select_raw_default_state :: proc(t: ^testing.T) { + sync.guard(&test_lock) testing.set_fail_timeout(t, FAIL_TIME) recv1, recv1_err := chan.create(chan.Chan(int), context.allocator) @@ -377,6 +549,7 @@ test_try_select_raw_default_state :: proc(t: ^testing.T) { // thread between calls to can_{send,recv} and try_{send,recv}_raw. @test test_try_select_raw_no_toctou :: proc(t: ^testing.T) { + sync.guard(&test_lock) testing.set_fail_timeout(t, FAIL_TIME) // Trigger will be used to coordinate between the thief and the try_select. @@ -385,9 +558,6 @@ test_try_select_raw_no_toctou :: proc(t: ^testing.T) { assert(trigger_err == nil, "allocation failed") defer chan.destroy(trigger) - @(static) - __global_context_for_test: rawptr - __global_context_for_test = &trigger defer __global_context_for_test = nil @@ -452,3 +622,58 @@ test_try_select_raw_no_toctou :: proc(t: ^testing.T) { thread.join(thief) thread.destroy(thief) } + +// Ensures that a sender will always report correctly whether the value was received +// or not in the event of channel closure. +// +// 1. send thread does a blocking send +// 2. recv and close threads race +// 3. send returns false if close won and reports true if recv won +// +// We know if recv won by whether it sends us the original value on the results channel. +// This test is non-deterministic. +@test +test_send_close_read :: proc(t: ^testing.T) { + trigger, trigger_err := chan.create(chan.Chan(int), context.allocator) + assert(trigger_err == nil, "allocation failed") + defer chan.destroy(trigger) + + ch, alloc_err := chan.create(chan.Chan(int), context.allocator) + assert(alloc_err == nil, "allocation failed") + defer chan.destroy(ch) + + results, results_err := chan.create(chan.Chan(int), 1, context.allocator) + assert(results_err == nil, "allocation failed") + defer chan.destroy(results) + + receiver := thread.create_and_start_with_poly_data3(trigger, results, ch, proc(trigger, results, ch: chan.Chan(int)) { + _, _ = chan.recv(trigger) + v, _ := chan.recv(ch) + assert(chan.send(results, v)) + }) + + closer := thread.create_and_start_with_poly_data2(trigger, ch, proc(trigger, ch: chan.Chan(int)) { + _, _ = chan.recv(trigger) + ok := chan.close(ch) + assert(ok) + }) + + testing.expect(t, chan.close(trigger)) + + did_send := chan.send(ch, 42) + + v, ok := chan.recv(results) + testing.expect(t, ok) + + if v == 42 { + testing.expect(t, did_send) + } else { + testing.expect(t, !did_send) + } + + thread.join_multiple(receiver, closer) + thread.destroy(receiver) + thread.destroy(closer) +} + + |