package sync_chan import "base:builtin" import "base:intrinsics" import "base:runtime" import "core:sync" import "core:math/rand" when ODIN_TEST { /* Hook for testing _try_select_raw allowing the test harness to manipulate the channels prior to the select actually operating on them. */ __try_select_raw_pause : proc() = nil } /* Determines what operations `Chan` supports. */ Direction :: enum { Send = -1, Both = 0, Recv = +1, } /* A typed wrapper around `Raw_Chan` which should be used preferably. Note: all procedures accepting `Raw_Chan` also accept `Chan`. **Inputs** - `$T`: The type of the messages - `Direction`: what `Direction` the channel supports Example: import "core:sync/chan" chan_example :: proc() { // Create an unbuffered channel with messages of type int, // supporting both sending and receiving. // Creating unidirectional channels, although possible, is useless. c, _ := chan.create(chan.Chan(int), context.allocator) defer chan.destroy(c) // This channel can now only be used for receiving messages recv_only_channel: chan.Chan(int, .Recv) = chan.as_recv(c) // This channel can now only be used for sending messages send_only_channel: chan.Chan(int, .Send) = chan.as_send(c) } */ Chan :: struct($T: typeid, $D: Direction = Direction.Both) { #subtype impl: ^Raw_Chan `fmt:"-"`, } /* `Raw_Chan` allows for thread-safe communication using fixed-size messages. This is the low-level implementation of `Chan`, which does not include the concept of Direction. Example: import "core:sync/chan" raw_chan_example :: proc() { // Create an unbuffered channel with messages of type int, c, _ := chan.create_raw(size_of(int), align_of(int), context.allocator) defer chan.destroy(c) } */ Raw_Chan :: struct { // Shared allocator: runtime.Allocator, allocation_size: int, msg_size: u16, closed: b16, // guarded by `mutex` mutex: sync.Mutex, r_cond: sync.Cond, w_cond: sync.Cond, r_waiting: int, // guarded by `mutex` w_waiting: int, // guarded by `mutex` did_read: bool, // lets a sender know if the value was read // Buffered queue: ^Raw_Queue, // Unbuffered unbuffered_data: rawptr, } /* Creates a buffered or unbuffered `Chan` instance. *Allocates Using Provided Allocator* **Inputs** - `$C`: Type of `Chan` to create - [`cap`: The capacity of the channel] omit for creating unbuffered channels - `allocator`: The allocator to use **Returns**: - The initialized `Chan` - An `Allocator_Error` Example: import "core:sync/chan" create_example :: proc() { unbuffered: chan.Chan(int) buffered: chan.Chan(int) err: runtime.Allocator_Error unbuffered, err = chan.create(chan.Chan(int), context.allocator) assert(err == .None) defer chan.destroy(unbuffered) buffered, err = chan.create(chan.Chan(int), 10, context.allocator) assert(err == .None) defer chan.destroy(buffered) } */ create :: proc{ create_unbuffered, create_buffered, } /* Creates an unbuffered version of the specified `Chan` type. *Allocates Using Provided Allocator* **Inputs** - `$C`: Type of `Chan` to create - `allocator`: The allocator to use **Returns**: - The initialized `Chan` - An `Allocator_Error` Example: import "core:sync/chan" create_unbuffered_example :: proc() { c, err := chan.create_unbuffered(chan.Chan(int), context.allocator) assert(err == .None) defer chan.destroy(c) } */ @(require_results) create_unbuffered :: proc($C: typeid/Chan($T), allocator: runtime.Allocator) -> (c: C, err: runtime.Allocator_Error) where size_of(T) <= int(max(u16)) { c.impl, err = create_raw_unbuffered(size_of(T), align_of(T), allocator) return } /* Creates a buffered version of the specified `Chan` type. *Allocates Using Provided Allocator* **Inputs** - `$C`: Type of `Chan` to create - `cap`: The capacity of the channel - `allocator`: The allocator to use **Returns**: - The initialized `Chan` - An `Allocator_Error` Example: import "core:sync/chan" create_buffered_example :: proc() { c, err := chan.create_buffered(chan.Chan(int), 10, context.allocator) assert(err == .None) defer chan.destroy(c) } */ @(require_results) create_buffered :: proc($C: typeid/Chan($T), #any_int cap: int, allocator: runtime.Allocator) -> (c: C, err: runtime.Allocator_Error) where size_of(T) <= int(max(u16)) { c.impl, err = create_raw_buffered(size_of(T), align_of(T), cap, allocator) return } /* Creates a buffered or unbuffered `Raw_Chan` for messages of the specified size and alignment. *Allocates Using Provided Allocator* **Inputs** - `msg_size`: The size of the messages the messages being sent - `msg_alignment`: The alignment of the messages being sent - [`cap`: The capacity of the channel] omit for creating unbuffered channels - `allocator`: The allocator to use **Returns**: - The initialized `Raw_Chan` - An `Allocator_Error` Example: import "core:sync/chan" create_raw_example :: proc() { unbuffered: ^chan.Raw_Chan buffered: ^chan.Raw_Chan err: runtime.Allocator_Error unbuffered, err = chan.create_raw(size_of(int), align_of(int), context.allocator) assert(err == .None) defer chan.destroy(unbuffered) buffered, err = chan.create_raw(size_of(int), align_of(int), 10, context.allocator) assert(err == .None) defer chan.destroy(buffered) } */ create_raw :: proc{ create_raw_unbuffered, create_raw_buffered, } /* Creates an unbuffered `Raw_Chan` for messages of the specified size and alignment. *Allocates Using Provided Allocator* **Inputs** - `msg_size`: The size of the messages the messages being sent - `msg_alignment`: The alignment of the messages being sent - `allocator`: The allocator to use **Returns**: - The initialized `Raw_Chan` - An `Allocator_Error` Example: import "core:sync/chan" create_raw_unbuffered_example :: proc() { unbuffered, err := chan.create_raw(size_of(int), align_of(int), context.allocator) assert(err == .None) defer chan.destroy(unbuffered) } */ @(require_results) create_raw_unbuffered :: proc(#any_int msg_size, msg_alignment: int, allocator: runtime.Allocator, loc := #caller_location) -> (c: ^Raw_Chan, err: runtime.Allocator_Error) { assert(msg_size <= int(max(u16))) align := max(align_of(Raw_Chan), msg_alignment) size := runtime.align_forward_int(size_of(Raw_Chan), align) offset := size size += msg_size size = runtime.align_forward_int(size, align) data := runtime.mem_alloc(size, align, allocator, loc) or_return c = (^Raw_Chan)(raw_data(data)) c.allocator = allocator c.allocation_size = size c.unbuffered_data = raw_data(data[offset:]) c.msg_size = u16(msg_size) return } /* Creates a buffered `Raw_Chan` for messages of the specified size and alignment. *Allocates Using Provided Allocator* **Inputs** - `msg_size`: The size of the messages the messages being sent - `msg_alignment`: The alignment of the messages being sent - `cap`: The capacity of the channel - `allocator`: The allocator to use **Returns**: - The initialized `Raw_Chan` - An `Allocator_Error` Example: import "core:sync/chan" create_raw_unbuffered_example :: proc() { c, err := chan.create_raw_buffered(size_of(int), align_of(int), 10, context.allocator) assert(err == .None) defer chan.destroy(c) } */ @(require_results) create_raw_buffered :: proc(#any_int msg_size, msg_alignment: int, #any_int cap: int, allocator: runtime.Allocator, loc := #caller_location) -> (c: ^Raw_Chan, err: runtime.Allocator_Error) { assert(msg_size <= int(max(u16))) if cap <= 0 { return create_raw_unbuffered(msg_size, msg_alignment, allocator) } align := max(align_of(Raw_Chan), msg_alignment, align_of(Raw_Queue)) size := runtime.align_forward_int(size_of(Raw_Chan), align) q_offset := size size = runtime.align_forward_int(q_offset + size_of(Raw_Queue), msg_alignment) offset := size size += msg_size * cap size = runtime.align_forward_int(size, align) data := runtime.mem_alloc(size, align, allocator, loc) or_return ptr := raw_data(data) c = (^Raw_Chan)(raw_data(data)) c.allocator = allocator c.allocation_size = size bptr := ([^]byte)(ptr) c.queue = (^Raw_Queue)(bptr[q_offset:]) c.msg_size = u16(msg_size) raw_queue_init(c.queue, ([^]byte)(bptr[offset:]), cap, msg_size) return } /* Destroys the Channel. **Inputs** - `c`: The channel to destroy **Returns**: - An `Allocator_Error` */ destroy :: proc(c: ^Raw_Chan, loc := #caller_location) -> (err: runtime.Allocator_Error) { if c != nil { allocator := c.allocator err = runtime.mem_free_with_size(c, c.allocation_size, allocator, loc) } return } /* Creates a version of a channel that can only be used for sending not receiving. **Inputs** - `c`: The channel **Returns**: - An `Allocator_Error` Example: import "core:sync/chan" as_send_example :: proc() { // this procedure takes a channel that can only // be used for sending not receiving. producer :: proc(c: chan.Chan(int, .Send)) { chan.send(c, 112) // compile-time error: // value, ok := chan.recv(c) } c, err := chan.create(chan.Chan(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) producer(chan.as_send(c)) } */ @(require_results) as_send :: #force_inline proc "contextless" (c: $C/Chan($T, $D)) -> (s: Chan(T, .Send)) where C.D <= .Both { return transmute(type_of(s))c } /* Creates a version of a channel that can only be used for receiving not sending. **Inputs** - `c`: The channel **Returns**: - An `Allocator_Error` Example: import "core:sync/chan" as_recv_example :: proc() { consumer :: proc(c: chan.Chan(int, .Recv)) { value, ok := chan.recv(c) // compile-time error: // chan.send(c, 22) } c, err := chan.create(chan.Chan(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) chan.send(c, 112) consumer(chan.as_recv(c)) } */ @(require_results) as_recv :: #force_inline proc "contextless" (c: $C/Chan($T, $D)) -> (r: Chan(T, .Recv)) where C.D >= .Both { return transmute(type_of(r))c } /* Sends the specified message, blocking the current thread if: - the channel is unbuffered - the channel's buffer is full until the channel is being read from or the channel is closed. `send` will return `false` when attempting to send on an already closed channel. **Inputs** - `c`: The channel - `data`: The message to send **Returns** - `true` if the message was sent, `false` when the channel was already closed Example: import "core:sync/chan" send_example :: proc() { c, err := chan.create(chan.Chan(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) assert(chan.send(c, 2)) // this would block since the channel has a buffersize of 1 // assert(chan.send(c, 2)) // sending on a closed channel returns false chan.close(c) assert(! chan.send(c, 2)) } */ send :: proc "contextless" (c: $C/Chan($T, $D), data: T) -> (ok: bool) where C.D <= .Both { data := data ok = send_raw(c, &data) return } /* Tries sending the specified message which is: - blocking: given the channel is unbuffered - non-blocking: given the channel is buffered **Inputs** - `c`: The channel - `data`: The message to send **Returns** - `true` if the message was sent, `false` when the channel was already closed or the channel's buffer was full Example: import "core:sync/chan" try_send_example :: proc() { c, err := chan.create(chan.Chan(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) assert(chan.try_send(c, 2), "there is enough space") assert(!chan.try_send(c, 2), "the buffer is already full") } */ @(require_results) try_send :: proc "contextless" (c: $C/Chan($T, $D), data: T) -> (ok: bool) where C.D <= .Both { data := data ok = try_send_raw(c, &data) return } /* Reads a message from the channel, blocking the current thread if: - the channel is unbuffered - the channel's buffer is empty until the channel is being written to or the channel is closed. `recv` will return `false` when attempting to receive a message on an already closed channel. **Inputs** - `c`: The channel **Returns** - The message - `true` if a message was received, `false` when the channel was already closed Example: import "core:sync/chan" recv_example :: proc() { c, err := chan.create(chan.Chan(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) assert(chan.send(c, 2)) value, ok := chan.recv(c) assert(ok, "the value was received") // this would block since the channel is now empty // value, ok = chan.recv(c) // reading from a closed channel returns false chan.close(c) value, ok = chan.recv(c) assert(!ok, "the channel is closed") } */ @(require_results) recv :: proc "contextless" (c: $C/Chan($T)) -> (data: T, ok: bool) where C.D >= .Both { ok = recv_raw(c, &data) return } /* Tries reading a message from the channel in a non-blocking fashion. **Inputs** - `c`: The channel **Returns** - The message - `true` if a message was received, `false` when the channel was already closed or no message was available Example: import "core:sync/chan" try_recv_example :: proc() { c, err := chan.create(chan.Chan(int), context.allocator) assert(err == .None) defer chan.destroy(c) _, ok := chan.try_recv(c) assert(!ok, "there is not value to read") } */ @(require_results) try_recv :: proc "contextless" (c: $C/Chan($T)) -> (data: T, ok: bool) where C.D >= .Both { ok = try_recv_raw(c, &data) return } /* Sends the specified message, blocking the current thread if: - the channel is unbuffered - the channel's buffer is full until the channel is being read from or the channel is closed. `send_raw` will return `false` when attempting to send on an already closed channel. Note: The message referenced by `msg_out` must match the size and alignment used when the `Raw_Chan` was created. **Inputs** - `c`: The channel - `msg_out`: Pointer to the data to send **Returns** - `true` if the message was sent, `false` when the channel was already closed Example: import "core:sync/chan" send_raw_example :: proc() { c, err := chan.create_raw(size_of(int), align_of(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) value := 2 assert(chan.send_raw(c, &value)) // this would block since the channel has a buffersize of 1 // assert(chan.send_raw(c, &value)) // sending on a closed channel returns false chan.close(c) assert(! chan.send_raw(c, &value)) } */ @(require_results) send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) { if c == nil { return } if c.queue != nil { // buffered sync.guard(&c.mutex) for !c.closed && c.queue.len == c.queue.cap { c.w_waiting += 1 sync.wait(&c.w_cond, &c.mutex) c.w_waiting -= 1 } if c.closed { return false } ok = raw_queue_push(c.queue, msg_in) if c.r_waiting > 0 { sync.signal(&c.r_cond) } } else if c.unbuffered_data != nil { // unbuffered sync.guard(&c.mutex) if c.closed { return false } c.did_read = false defer c.did_read = false intrinsics.mem_copy(c.unbuffered_data, msg_in, int(c.msg_size)) c.w_waiting += 1 if c.r_waiting > 0 { sync.signal(&c.r_cond) } sync.wait(&c.w_cond, &c.mutex) if c.closed && !c.did_read { return false } ok = true } return } /* Reads a message from the channel, blocking the current thread if: - the channel is unbuffered - the channel's buffer is empty until the channel is being written to or the channel is closed. `recv_raw` will return `false` when attempting to receive a message on an already closed channel. Note: The location pointed to by `msg_out` must match the size and alignment used when the `Raw_Chan` was created. **Inputs** - `c`: The channel - `msg_out`: Pointer to where the message should be stored **Returns** - `true` if a message was received, `false` when the channel was already closed Example: import "core:sync/chan" recv_raw_example :: proc() { c, err := chan.create_raw(size_of(int), align_of(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) value := 2 assert(chan.send_raw(c, &value)) assert(chan.recv_raw(c, &value)) // this would block since the channel is now empty // assert(chan.recv_raw(c, &value)) // reading from a closed channel returns false chan.close(c) assert(! chan.recv_raw(c, &value)) } */ @(require_results) recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) { if c == nil { return } if c.queue != nil { // buffered sync.guard(&c.mutex) for c.queue.len == 0 { if c.closed { return } c.r_waiting += 1 sync.wait(&c.r_cond, &c.mutex) c.r_waiting -= 1 } msg := raw_queue_pop(c.queue) if msg != nil { intrinsics.mem_copy(msg_out, msg, int(c.msg_size)) } if c.w_waiting > 0 { sync.signal(&c.w_cond) } ok = true } else if c.unbuffered_data != nil { // unbuffered sync.guard(&c.mutex) for !c.closed && c.w_waiting == 0 { c.r_waiting += 1 sync.wait(&c.r_cond, &c.mutex) c.r_waiting -= 1 } if c.closed { return } intrinsics.mem_copy(msg_out, c.unbuffered_data, int(c.msg_size)) c.w_waiting -= 1 c.did_read = true sync.signal(&c.w_cond) ok = true } return } /* Tries sending the specified message which is: - blocking: given the channel is unbuffered - non-blocking: given the channel is buffered Note: The message referenced by `msg_out` must match the size and alignment used when the `Raw_Chan` was created. **Inputs** - `c`: the channel - `msg_out`: pointer to the data to send **Returns** - `true` if the message was sent, `false` when the channel was already closed or the channel's buffer was full Example: import "core:sync/chan" try_send_raw_example :: proc() { c, err := chan.create_raw(size_of(int), align_of(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) value := 2 assert(chan.try_send_raw(c, &value), "there is enough space") assert(!chan.try_send_raw(c, &value), "the buffer is already full") } */ @(require_results) try_send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) { if c == nil { return false } if c.queue != nil { // buffered sync.guard(&c.mutex) if c.queue.len == c.queue.cap { return false } if c.closed { return false } ok = raw_queue_push(c.queue, msg_in) if c.r_waiting > 0 { sync.signal(&c.r_cond) } } else if c.unbuffered_data != nil { // unbuffered sync.guard(&c.mutex) if c.closed || c.r_waiting - c.w_waiting <= 0 { return false } intrinsics.mem_copy(c.unbuffered_data, msg_in, int(c.msg_size)) c.w_waiting += 1 if c.r_waiting > 0 { sync.signal(&c.r_cond) } sync.wait(&c.w_cond, &c.mutex) ok = true } return } /* Reads a message from the channel if one is available. Note: The location pointed to by `msg_out` must match the size and alignment used when the `Raw_Chan` was created. **Inputs** - `c`: The channel - `msg_out`: Pointer to where the message should be stored **Returns** - `true` if a message was received, `false` when the channel was already closed or no message was available Example: import "core:sync/chan" try_recv_raw_example :: proc() { c, err := chan.create_raw(size_of(int), align_of(int), context.allocator) assert(err == .None) defer chan.destroy(c) value: int assert(!chan.try_recv_raw(c, &value)) } */ @(require_results) try_recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> bool { if c == nil { return false } if c.queue != nil { // buffered sync.guard(&c.mutex) if c.queue.len == 0 { return false } msg := raw_queue_pop(c.queue) if msg != nil { intrinsics.mem_copy(msg_out, msg, int(c.msg_size)) } if c.w_waiting > 0 { sync.signal(&c.w_cond) } return true } else if c.unbuffered_data != nil { // unbuffered sync.guard(&c.mutex) if c.closed || c.w_waiting - c.r_waiting <= 0 { return false } intrinsics.mem_copy(msg_out, c.unbuffered_data, int(c.msg_size)) c.w_waiting -= 1 sync.signal(&c.w_cond) return true } return false } /* Checks if the given channel is buffered. **Inputs** - `c`: The channel **Returns**: - `true` if the channel is buffered, `false` otherwise Example: import "core:sync/chan" is_buffered_example :: proc() { c, _ := chan.create(chan.Chan(int), 1, context.allocator) defer chan.destroy(c) assert(chan.is_buffered(c)) } */ @(require_results) is_buffered :: proc "contextless" (c: ^Raw_Chan) -> bool { return c != nil && c.queue != nil } /* Checks if the given channel is unbuffered. **Inputs** - `c`: The channel **Returns**: - `true` if the channel is unbuffered, `false` otherwise Example: import "core:sync/chan" is_buffered_example :: proc() { c, _ := chan.create(chan.Chan(int), context.allocator) defer chan.destroy(c) assert(chan.is_unbuffered(c)) } */ @(require_results) is_unbuffered :: proc "contextless" (c: ^Raw_Chan) -> bool { return c != nil && c.unbuffered_data != nil } /* Returns the number of elements currently in the channel. Note: Unbuffered channels will always return `0` because they cannot hold elements. **Inputs** - `c`: The channel **Returns**: - Number of elements Example: import "core:sync/chan" import "core:fmt" len_example :: proc() { c, _ := chan.create(chan.Chan(int), 2, context.allocator) defer chan.destroy(c) fmt.println(chan.len(c)) assert(chan.send(c, 1)) // add an element fmt.println(chan.len(c)) } Output: 0 1 */ @(require_results) len :: proc "contextless" (c: ^Raw_Chan) -> int { if c != nil && c.queue != nil { sync.guard(&c.mutex) return c.queue.len } return 0 } /* Returns the number of elements the channel could hold. Note: Unbuffered channels will always return `0` because they cannot hold elements. **Inputs** - `c`: The channel **Returns**: - Number of elements Example: import "core:sync/chan" import "core:fmt" cap_example :: proc() { c, _ := chan.create(chan.Chan(int), 2, context.allocator) defer chan.destroy(c) fmt.println(chan.cap(c)) } Output: 2 */ @(require_results) cap :: proc "contextless" (c: ^Raw_Chan) -> int { if c != nil && c.queue != nil { sync.guard(&c.mutex) return c.queue.cap } return 0 } /* Closes the channel, preventing new messages from being added. **Inputs** - `c`: The channel **Returns**: - `true` if the channel was closed by this operation, `false` if it was already closed Example: import "core:sync/chan" close_example :: proc() { c, _ := chan.create(chan.Chan(int), 2, context.allocator) defer chan.destroy(c) // Sending a message to an open channel assert(chan.send(c, 1), "allowed to send") // Closing the channel successfully assert(chan.close(c), "successfully closed") // Trying to send a message after the channel is closed (should fail) assert(!chan.send(c, 1), "not allowed to send after close") // Trying to close the channel again (should fail since it's already closed) assert(!chan.close(c), "was already closed") } */ close :: proc "contextless" (c: ^Raw_Chan) -> bool { if c == nil { return false } sync.guard(&c.mutex) if c.closed { return false } c.closed = true sync.broadcast(&c.r_cond) sync.broadcast(&c.w_cond) return true } /* Returns if the channel is closed or not **Inputs** - `c`: The channel **Returns**: - `true` if the channel is closed, `false` otherwise */ @(require_results) is_closed :: proc "contextless" (c: ^Raw_Chan) -> bool { if c == nil { return true } sync.guard(&c.mutex) return bool(c.closed) } /* Returns whether a message can be read without blocking the current thread. Specifically, it checks if the channel is buffered and not full, or if there is already a writer attempting to send a message. **Inputs** - `c`: The channel **Returns** - `true` if a message can be read, `false` otherwise Example: import "core:sync/chan" can_recv_example :: proc() { c, err := chan.create(chan.Chan(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) assert(!chan.can_recv(c), "the cannel is empty") assert(chan.send(c, 2)) assert(chan.can_recv(c), "there is message to read") } */ @(require_results) can_recv :: proc "contextless" (c: ^Raw_Chan) -> bool { sync.guard(&c.mutex) if is_buffered(c) { return c.queue.len > 0 } return c.w_waiting - c.r_waiting > 0 } /* Returns whether a message can be sent without blocking the current thread. Specifically, it checks if the channel is buffered and not full, or if there is already a reader waiting for a message. **Inputs** - `c`: The channel **Returns** - `true` if a message can be sent, `false` otherwise Example: import "core:sync/chan" can_send_example :: proc() { c, err := chan.create(chan.Chan(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) assert(chan.can_send(c), "the channel's buffer is not full") assert(chan.send(c, 2)) assert(!chan.can_send(c), "the channel's buffer is full") } */ @(require_results) can_send :: proc "contextless" (c: ^Raw_Chan) -> bool { sync.guard(&c.mutex) if is_buffered(c) { return c.queue.len < c.queue.cap } return c.r_waiting - c.w_waiting > 0 } /* Specifies the direction of the selected channel. */ Select_Status :: enum { None, Recv, Send, } /* Attempts to either send or receive messages on the specified channels without blocking. `try_select_raw` first identifies which channels have messages ready to be received and which are available for sending. It then randomly selects one operation (either a send or receive) to perform. If no channels have messages ready, the procedure is a noop. Note: Each message in `send_msgs` corresponds to the send channel at the same index in `sends`. If the message is nil, corresponding send channel will be skipped. **Inputs** - `recv`: A slice of channels to read from - `sends`: A slice of channels to send messages on - `send_msgs`: A slice of messages to send - `recv_out`: A pointer to the location where, when receiving, the message should be stored **Returns** - Position of the available channel which was used for receiving or sending - `true` if sending/receiving was successfull, `false` if the channel was closed or no channel was available Example: import "core:sync/chan" import "core:fmt" try_select_raw_example :: proc() { c, err := chan.create(chan.Chan(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) // sending value '1' on the channel value1 := 1 msgs := [?]rawptr{&value1} send_chans := [?]^chan.Raw_Chan{c} // for simplicity the same channel used for sending is also used for receiving receive_chans := [?]^chan.Raw_Chan{c} // where the value from the read should be stored received_value: int idx, ok := chan.try_select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value) fmt.println("SELECT: ", idx, ok) fmt.println("RECEIVED VALUE ", received_value) idx, ok = chan.try_select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value) fmt.println("SELECT: ", idx, ok) fmt.println("RECEIVED VALUE ", received_value) // closing of a channel also affects the select operation chan.close(c) idx, ok = chan.try_select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value) fmt.println("SELECT: ", idx, ok) } Output: SELECT: 0 Send RECEIVED VALUE 0 SELECT: 0 Recv RECEIVED VALUE 1 SELECT: -1 None */ @(require_results) try_select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs: []rawptr, recv_out: rawptr) -> (select_idx: int, status: Select_Status) #no_bounds_check { Select_Op :: struct { idx: int, // local to the slice that was given is_recv: bool, } candidate_count := builtin.len(recvs)+builtin.len(sends) candidates := ([^]Select_Op)(intrinsics.alloca(candidate_count*size_of(Select_Op), align_of(Select_Op))) try_loop: for { count := 0 for c, i in recvs { if !c.closed && can_recv(c) { candidates[count] = { is_recv = true, idx = i, } count += 1 } } for c, i in sends { if i > builtin.len(send_msgs)-1 || send_msgs[i] == nil { continue } if !c.closed && can_send(c) { candidates[count] = { is_recv = false, idx = i, } count += 1 } } if count == 0 { return -1, .None } when ODIN_TEST { if __try_select_raw_pause != nil { __try_select_raw_pause() } } candidate_idx := rand.int_max(count) if count > 0 else 0 sel := candidates[candidate_idx] if sel.is_recv { status = .Recv if !try_recv_raw(recvs[sel.idx], recv_out) { continue try_loop } } else { status = .Send if !try_send_raw(sends[sel.idx], send_msgs[sel.idx]) { continue try_loop } } return sel.idx, status } } @(require_results, deprecated = "use try_select_raw") select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs: []rawptr, recv_out: rawptr) -> (select_idx: int, status: Select_Status) #no_bounds_check { return try_select_raw(recvs, sends, send_msgs, recv_out) } /* `Raw_Queue` is a non-thread-safe queue implementation designed to store messages of fixed size and alignment. Note: For most use cases, it is recommended to use `core:container/queue` instead, as `Raw_Queue` is used internally by `Raw_Chan` and may not provide the desired level of convenience for typical applications. */ @(private) Raw_Queue :: struct { data: [^]byte, len: int, cap: int, next: int, size: int, // element size } /* Initializes a `Raw_Queue` **Inputs** - `q`: A pointert to the `Raw_Queue` to initialize - `data`: The pointer to backing slice storing the messages - `cap`: The capacity of the queue - `size`: The size of a message Example: import "core:sync/chan" raw_queue_init_example :: proc() { // use a stack allocated array as backing storage storage: [100]int rq: chan.Raw_Queue chan.raw_queue_init(&rq, &storage, cap(storage), size_of(int)) } */ @(private) raw_queue_init :: proc "contextless" (q: ^Raw_Queue, data: rawptr, cap: int, size: int) { q.data = ([^]byte)(data) q.len = 0 q.cap = cap q.next = 0 q.size = size } /* Add an element to the queue. Note: The message referenced by `data` must match the size and alignment used when the `Raw_Queue` was initialized. **Inputs** - `q`: A pointert to the `Raw_Queue` - `data`: The pointer to message to add **Returns** - `true` if the element was added, `false` when the queue is already full Example: import "core:sync/chan" raw_queue_push_example :: proc() { storage: [100]int rq: chan.Raw_Queue chan.raw_queue_init(&rq, &storage, cap(storage), size_of(int)) value := 2 assert(chan.raw_queue_push(&rq, &value), "there was enough space") } */ @(private, require_results) raw_queue_push :: proc "contextless" (q: ^Raw_Queue, data: rawptr) -> bool { if q.len == q.cap { return false } pos := q.next + q.len if pos >= q.cap { pos -= q.cap } val_ptr := q.data[pos*q.size:] intrinsics.mem_copy(val_ptr, data, q.size) q.len += 1 return true } /* Removes and returns the first element of the queue. Note: The returned element is only guaranteed to be valid until the next `raw_queue_push` operation. Accessing it after that point may result in undefined behavior. **Inputs** - `c`: A pointer to the `Raw_Queue`. **Returns** - A pointer to the first element in the queue, or `nil` if the queue is empty. Example: import "core:sync/chan" raw_queue_pop_example :: proc() { storage: [100]int rq: chan.Raw_Queue chan.raw_queue_init(&rq, &storage, cap(storage), size_of(int)) assert(chan.raw_queue_pop(&rq) == nil, "queue was empty") // add an element to the queue value := 2 assert(chan.raw_queue_push(&rq, &value), "there was enough space") assert((cast(^int)chan.raw_queue_pop(&rq))^ == 2, "retrieved the element") } */ @(private, require_results) raw_queue_pop :: proc "contextless" (q: ^Raw_Queue) -> (data: rawptr) { if q.len > 0 { data = q.data[q.next*q.size:] q.next += 1 q.len -= 1 if q.next >= q.cap { q.next -= q.cap } } return }