aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/sync/chan/chan.odin860
-rw-r--r--core/sync/chan/doc.odin68
2 files changed, 928 insertions, 0 deletions
diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin
index c470d15f3..37f841ee2 100644
--- a/core/sync/chan/chan.odin
+++ b/core/sync/chan/chan.odin
@@ -7,16 +7,62 @@ import "core:mem"
import "core:sync"
import "core:math/rand"
+/*
+Determines what operations `Chan` supports.
+*/
Direction :: enum {
Send = -1,
Both = 0,
Recv = +1,
}
+/*
+A typed wrapper around `Raw_Chan` which should be used
+preferably.
+
+Note: all procedures accepting `Raw_Chan` also accept `Chan`.
+
+**Inputs**
+- `$T`: The type of the messages
+- `Direction`: what `Direction` the channel supports
+
+Example:
+
+ import "core:sync/chan"
+
+ chan_example :: proc() {
+ // Create an unbuffered channel with messages of type int,
+ // supporting both sending and receiving.
+ // Creating unidirectional channels, although possible, is useless.
+ c, _ := chan.create(chan.Chan(int), context.allocator)
+ defer chan.destroy(c)
+
+ // This channel can now only be used for receiving messages
+ recv_only_channel: chan.Chan(int, .Recv) = chan.as_recv(c)
+ // This channel can now only be used for sending messages
+ send_only_channel: chan.Chan(int, .Send) = chan.as_send(c)
+ }
+*/
Chan :: struct($T: typeid, $D: Direction = Direction.Both) {
#subtype impl: ^Raw_Chan `fmt:"-"`,
}
+/*
+`Raw_Chan` allows for thread-safe communication using fixed-size messages.
+This is the low-level implementation of `Chan`, which does not include
+the concept of Direction.
+
+Example:
+
+ import "core:sync/chan"
+
+ raw_chan_example :: proc() {
+ // Create an unbuffered channel with messages of type int,
+ c, _ := chan.create_raw(size_of(int), align_of(int), context.allocator)
+ defer chan.destroy(c)
+ }
+
+*/
Raw_Chan :: struct {
// Shared
allocator: runtime.Allocator,
@@ -36,12 +82,66 @@ Raw_Chan :: struct {
unbuffered_data: rawptr,
}
+/*
+Creates a buffered or unbuffered `Chan` instance.
+
+*Allocates Using Provided Allocator*
+
+**Inputs**
+- `$C`: Type of `Chan` to create
+- [`cap`: The capacity of the channel] omit for creating unbuffered channels
+- `allocator`: The allocator to use
+
+**Returns**:
+- The initialized `Chan`
+- An `Allocator_Error`
+
+Example:
+ import "core:sync/chan"
+
+ create_example :: proc() {
+ unbuffered: chan.Chan(int)
+ buffered: chan.Chan(int)
+ err: runtime.Allocator_Error
+
+ unbuffered, err = chan.create(chan.Chan(int), context.allocator)
+ assert(err == .None)
+ defer chan.destroy(unbuffered)
+
+ buffered, err = chan.create(chan.Chan(int), 10, context.allocator)
+ assert(err == .None)
+ defer chan.destroy(buffered)
+ }
+*/
create :: proc{
create_unbuffered,
create_buffered,
}
+/*
+Creates an unbuffered version of the specified `Chan` type.
+
+*Allocates Using Provided Allocator*
+
+**Inputs**
+- `$C`: Type of `Chan` to create
+- `allocator`: The allocator to use
+
+**Returns**:
+- The initialized `Chan`
+- An `Allocator_Error`
+
+Example:
+
+ import "core:sync/chan"
+
+ create_unbuffered_example :: proc() {
+ c, err := chan.create_unbuffered(chan.Chan(int), context.allocator)
+ assert(err == .None)
+ defer chan.destroy(c)
+ }
+*/
@(require_results)
create_unbuffered :: proc($C: typeid/Chan($T), allocator: runtime.Allocator) -> (c: C, err: runtime.Allocator_Error)
where size_of(T) <= int(max(u16)) {
@@ -49,6 +149,28 @@ create_unbuffered :: proc($C: typeid/Chan($T), allocator: runtime.Allocator) ->
return
}
+/*
+Creates a buffered version of the specified `Chan` type.
+
+*Allocates Using Provided Allocator*
+
+**Inputs**
+- `$C`: Type of `Chan` to create
+- `cap`: The capacity of the channel
+- `allocator`: The allocator to use
+
+**Returns**:
+- The initialized `Chan`
+- An `Allocator_Error`
+
+Example:
+
+ create_buffered_example :: proc() {
+ c, err := chan.create_buffered(chan.Chan(int), 10, context.allocator)
+ assert(err == .None)
+ defer chan.destroy(c)
+ }
+*/
@(require_results)
create_buffered :: proc($C: typeid/Chan($T), #any_int cap: int, allocator: runtime.Allocator) -> (c: C, err: runtime.Allocator_Error)
where size_of(T) <= int(max(u16)) {
@@ -56,11 +178,71 @@ create_buffered :: proc($C: typeid/Chan($T), #any_int cap: int, allocator: runti
return
}
+/*
+Creates a buffered or unbuffered `Raw_Chan` for messages of the specified
+size and alignment.
+
+*Allocates Using Provided Allocator*
+
+**Inputs**
+- `msg_size`: The size of the messages the messages being sent
+- `msg_alignment`: The alignment of the messages being sent
+- [`cap`: The capacity of the channel] omit for creating unbuffered channels
+- `allocator`: The allocator to use
+
+**Returns**:
+- The initialized `Raw_Chan`
+- An `Allocator_Error`
+
+Example:
+
+ import "core:sync/chan"
+
+ create_raw_example :: proc() {
+ unbuffered: ^chan.Raw_Chan
+ buffered: ^chan.Raw_Chan
+ err: runtime.Allocator_Error
+
+ unbuffered, err = chan.create_raw(size_of(int), align_of(int), context.allocator)
+ assert(err == .None)
+ defer chan.destroy(unbuffered)
+
+ buffered, err = chan.create_raw(size_of(int), align_of(int), 10, context.allocator)
+ assert(err == .None)
+ defer chan.destroy(buffered)
+ }
+*/
create_raw :: proc{
create_raw_unbuffered,
create_raw_buffered,
}
+/*
+Creates an unbuffered `Raw_Chan` for messages of the specified
+size and alignment.
+
+*Allocates Using Provided Allocator*
+
+**Inputs**
+- `msg_size`: The size of the messages the messages being sent
+- `msg_alignment`: The alignment of the messages being sent
+- `allocator`: The allocator to use
+
+**Returns**:
+- The initialized `Raw_Chan`
+- An `Allocator_Error`
+
+Example:
+
+ import "core:sync/chan"
+ import "base:runtime"
+
+ create_raw_unbuffered_example :: proc() {
+ unbuffered, err := chan.create_raw(size_of(int), align_of(int), context.allocator)
+ assert(err == .None)
+ defer chan.destroy(unbuffered)
+ }
+*/
@(require_results)
create_raw_unbuffered :: proc(#any_int msg_size, msg_alignment: int, allocator: runtime.Allocator) -> (c: ^Raw_Chan, err: runtime.Allocator_Error) {
assert(msg_size <= int(max(u16)))
@@ -80,6 +262,32 @@ create_raw_unbuffered :: proc(#any_int msg_size, msg_alignment: int, allocator:
return
}
+/*
+Creates a buffered `Raw_Chan` for messages of the specified
+size and alignment.
+
+*Allocates Using Provided Allocator*
+
+**Inputs**
+- `msg_size`: The size of the messages the messages being sent
+- `msg_alignment`: The alignment of the messages being sent
+- `cap`: The capacity of the channel
+- `allocator`: The allocator to use
+
+**Returns**:
+- The initialized `Raw_Chan`
+- An `Allocator_Error`
+
+Example:
+
+ import "core:sync/chan"
+
+ create_raw_unbuffered_example :: proc() {
+ c, err := chan.create_raw_buffered(size_of(int), align_of(int), 10, context.allocator)
+ assert(err == .None)
+ defer chan.destroy(c)
+ }
+*/
@(require_results)
create_raw_buffered :: proc(#any_int msg_size, msg_alignment: int, #any_int cap: int, allocator: runtime.Allocator) -> (c: ^Raw_Chan, err: runtime.Allocator_Error) {
assert(msg_size <= int(max(u16)))
@@ -110,6 +318,16 @@ create_raw_buffered :: proc(#any_int msg_size, msg_alignment: int, #any_int cap:
return
}
+
+/*
+Destroys the Channel.
+
+**Inputs**
+- `c`: The channel to destroy
+
+**Returns**:
+- An `Allocator_Error`
+*/
destroy :: proc(c: ^Raw_Chan) -> (err: runtime.Allocator_Error) {
if c != nil {
allocator := c.allocator
@@ -118,22 +336,142 @@ destroy :: proc(c: ^Raw_Chan) -> (err: runtime.Allocator_Error) {
return
}
+/*
+Creates a version of a channel that can only be used for sending
+not receiving.
+
+**Inputs**
+- `c`: The channel
+
+**Returns**:
+- An `Allocator_Error`
+
+Example:
+
+ import "core:sync/chan"
+
+ as_send_example :: proc() {
+ // this procedure takes a channel that can only
+ // be used for sending not receiving.
+ producer :: proc(c: chan.Chan(int, .Send)) {
+ chan.send(c, 112)
+
+ // compile-time error:
+ // value, ok := chan.recv(c)
+ }
+
+ c, err := chan.create(chan.Chan(int), 1, context.allocator)
+ assert(err == .None)
+ defer chan.destroy(c)
+
+ producer(chan.as_send(c))
+ }
+*/
@(require_results)
as_send :: #force_inline proc "contextless" (c: $C/Chan($T, $D)) -> (s: Chan(T, .Send)) where C.D <= .Both {
return transmute(type_of(s))c
}
+
+/*
+Creates a version of a channel that can only be used for receiving
+not sending.
+
+**Inputs**
+- `c`: The channel
+
+**Returns**:
+- An `Allocator_Error`
+
+Example:
+
+ import "core:sync/chan"
+
+ as_recv_example :: proc() {
+ consumer :: proc(c: chan.Chan(int, .Recv)) {
+ value, ok := chan.recv(c)
+
+ // compile-time error:
+ // chan.send(c, 22)
+ }
+
+ c, err := chan.create(chan.Chan(int), 1, context.allocator)
+ assert(err == .None)
+ defer chan.destroy(c)
+
+ chan.send(c, 112)
+ consumer(chan.as_recv(c))
+ }
+*/
@(require_results)
as_recv :: #force_inline proc "contextless" (c: $C/Chan($T, $D)) -> (r: Chan(T, .Recv)) where C.D >= .Both {
return transmute(type_of(r))c
}
+/*
+Sends the specified message, blocking the current thread if:
+- the channel is unbuffered
+- the channel's buffer is full
+until the channel is being read from. `send` will return
+`false` when attempting to send on an already closed channel.
+
+**Inputs**
+- `c`: The channel
+- `data`: The message to send
+
+**Returns**
+- `true` if the message was sent, `false` when the channel was already closed
+
+Example:
+
+ import "core:sync/chan"
+
+ send_example :: proc() {
+ c, err := chan.create(chan.Chan(int), 1, context.allocator)
+ assert(err == .None)
+ defer chan.destroy(c)
+
+ assert(chan.send(c, 2))
+ // this would block since the channel has a buffersize of 1
+ // assert(chan.send(c, 2))
+
+ // sending on a closed channel returns false
+ chan.close(c)
+ assert(! chan.send(c, 2))
+ }
+*/
send :: proc "contextless" (c: $C/Chan($T, $D), data: T) -> (ok: bool) where C.D <= .Both {
data := data
ok = send_raw(c, &data)
return
}
+/*
+Tries sending the specified message which is:
+- blocking: given the channel is unbuffered
+- non-blocking: given the channel is buffered
+
+**Inputs**
+- `c`: The channel
+- `data`: The message to send
+
+**Returns**
+- `true` if the message was sent, `false` when the channel was
+already closed or the channel's buffer was full
+
+Example:
+
+ import "core:sync/chan"
+
+ try_send_example :: proc() {
+ c, err := chan.create(chan.Chan(int), 1, context.allocator)
+ assert(err == .None)
+ defer chan.destroy(c)
+
+ assert(chan.try_send(c, 2), "there is enough space")
+ assert(!chan.try_send(c, 2), "the buffer is already full")
+ }
+*/
@(require_results)
try_send :: proc "contextless" (c: $C/Chan($T, $D), data: T) -> (ok: bool) where C.D <= .Both {
data := data
@@ -141,6 +479,43 @@ try_send :: proc "contextless" (c: $C/Chan($T, $D), data: T) -> (ok: bool) where
return
}
+/*
+Reads a message from the channel, blocking the current thread if:
+- the channel is unbuffered
+- the channel's buffer is empty
+until the channel is being written to. `recv` will return
+`false` when attempting to receive a message on an already closed channel.
+
+**Inputs**
+- `c`: The channel
+
+**Returns**
+- The message
+- `true` if a message was received, `false` when the channel was already closed
+
+Example:
+
+ import "core:sync/chan"
+
+ recv_example :: proc() {
+ c, err := chan.create(chan.Chan(int), 1, context.allocator)
+ assert(err == .None)
+ defer chan.destroy(c)
+
+ assert(chan.send(c, 2))
+
+ value, ok := chan.recv(c)
+ assert(ok, "the value was received")
+
+ // this would block since the channel is now empty
+ // value, ok = chan.recv(c)
+
+ // reading from a closed channel returns false
+ chan.close(c)
+ value, ok = chan.recv(c)
+ assert(!ok, "the channel is closed")
+ }
+*/
@(require_results)
recv :: proc "contextless" (c: $C/Chan($T)) -> (data: T, ok: bool) where C.D >= .Both {
ok = recv_raw(c, &data)
@@ -148,6 +523,29 @@ recv :: proc "contextless" (c: $C/Chan($T)) -> (data: T, ok: bool) where C.D >=
}
+/*
+Tries reading a message from the channel in a non-blocking fashion.
+
+**Inputs**
+- `c`: The channel
+
+**Returns**
+- The message
+- `true` if a message was received, `false` when the channel was already closed or no message was available
+
+Example:
+
+ import "core:sync/chan"
+
+ try_recv_example :: proc() {
+ c, err := chan.create(chan.Chan(int), context.allocator)
+ assert(err == .None)
+ defer chan.destroy(c)
+
+ _, ok := chan.try_recv(c)
+ assert(!ok, "there is not value to read")
+ }
+*/
@(require_results)
try_recv :: proc "contextless" (c: $C/Chan($T)) -> (data: T, ok: bool) where C.D >= .Both {
ok = try_recv_raw(c, &data)
@@ -155,6 +553,43 @@ try_recv :: proc "contextless" (c: $C/Chan($T)) -> (data: T, ok: bool) where C.D
}
+/*
+Sends the specified message, blocking the current thread if:
+- the channel is unbuffered
+- the channel's buffer is full
+until the channel is being read from. `send_raw` will return
+`false` when attempting to send on an already closed channel.
+
+Note: The message referenced by `msg_out` must match the size
+and alignment used when the `Raw_Chan` was created.
+
+**Inputs**
+- `c`: The channel
+- `msg_out`: Pointer to the data to send
+
+**Returns**
+- `true` if the message was sent, `false` when the channel was already closed
+
+Example:
+
+ import "core:sync/chan"
+
+ send_raw_example :: proc() {
+ c, err := chan.create_raw(size_of(int), align_of(int), 1, context.allocator)
+ assert(err == .None)
+ defer chan.destroy(c)
+
+ value := 2
+ assert(chan.send_raw(c, &value))
+
+ // this would block since the channel has a buffersize of 1
+ // assert(chan.send_raw(c, &value))
+
+ // sending on a closed channel returns false
+ chan.close(c)
+ assert(! chan.send_raw(c, &value))
+ }
+*/
@(require_results)
send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) {
if c == nil {
@@ -194,6 +629,45 @@ send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) {
return
}
+/*
+Reads a message from the channel, blocking the current thread if:
+- the channel is unbuffered
+- the channel's buffer is empty
+until the channel is being written to. `recv_raw` will return
+`false` when attempting to receive a message on an already closed channel.
+
+Note: The location pointed to by `msg_out` must match the size
+and alignment used when the `Raw_Chan` was created.
+
+**Inputs**
+- `c`: The channel
+- `msg_out`: Pointer to where the message should be stored
+
+**Returns**
+- `true` if a message was received, `false` when the channel was already closed
+
+Example:
+
+ import "core:sync/chan"
+
+ recv_raw_example :: proc() {
+ c, err := chan.create_raw(size_of(int), align_of(int), 1, context.allocator)
+ assert(err == .None)
+ defer chan.destroy(c)
+
+ value := 2
+ assert(chan.send_raw(c, &value))
+
+ assert(chan.recv_raw(c, &value))
+
+ // this would block since the channel is now empty
+ // assert(chan.recv_raw(c, &value))
+
+ // reading from a closed channel returns false
+ chan.close(c)
+ assert(! chan.recv_raw(c, &value))
+ }
+*/
@(require_results)
recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) {
if c == nil {
@@ -244,6 +718,36 @@ recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) {
}
+/*
+Tries sending the specified message which is:
+- blocking: given the channel is unbuffered
+- non-blocking: given the channel is buffered
+
+Note: The message referenced by `msg_out` must match the size
+and alignment used when the `Raw_Chan` was created.
+
+**Inputs**
+- `c`: the channel
+- `msg_out`: pointer to the data to send
+
+**Returns**
+- `true` if the message was sent, `false` when the channel was
+already closed or the channel's buffer was full
+
+Example:
+
+ import "core:sync/chan"
+
+ try_send_raw_example :: proc() {
+ c, err := chan.create_raw(size_of(int), align_of(int), 1, context.allocator)
+ assert(err == .None)
+ defer chan.destroy(c)
+
+ value := 2
+ assert(chan.try_send_raw(c, &value), "there is enough space")
+ assert(!chan.try_send_raw(c, &value), "the buffer is already full")
+ }
+*/
@(require_results)
try_send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) {
if c == nil {
@@ -281,6 +785,32 @@ try_send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool)
return
}
+/*
+Reads a message from the channel if one is available.
+
+Note: The location pointed to by `msg_out` must match the size
+and alignment used when the `Raw_Chan` was created.
+
+**Inputs**
+- `c`: The channel
+- `msg_out`: Pointer to where the message should be stored
+
+**Returns**
+- `true` if a message was received, `false` when the channel was already closed or no message was available
+
+Example:
+
+ import "core:sync/chan"
+
+ try_recv_raw_example :: proc() {
+ c, err := chan.create_raw(size_of(int), align_of(int), context.allocator)
+ assert(err == .None)
+ defer chan.destroy(c)
+
+ value: int
+ assert(!chan.try_recv_raw(c, &value))
+ }
+*/
@(require_results)
try_recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> bool {
if c == nil {
@@ -319,16 +849,85 @@ try_recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> bool {
+/*
+Checks if the given channel is buffered.
+
+**Inputs**
+- `c`: The channel
+
+**Returns**:
+- `true` if the channel is buffered, `false` otherwise
+
+Example:
+
+ import "core:sync/chan"
+
+ is_buffered_example :: proc() {
+ c, _ := chan.create(chan.Chan(int), 1, context.allocator)
+ defer chan.destroy(c)
+ assert(chan.is_buffered(c))
+ }
+*/
@(require_results)
is_buffered :: proc "contextless" (c: ^Raw_Chan) -> bool {
return c != nil && c.queue != nil
}
+/*
+Checks if the given channel is unbuffered.
+
+**Inputs**
+- `c`: The channel
+
+**Returns**:
+- `true` if the channel is unbuffered, `false` otherwise
+
+Example:
+
+ import "core:sync/chan"
+
+ is_buffered_example :: proc() {
+ c, _ := chan.create(chan.Chan(int), context.allocator)
+ defer chan.destroy(c)
+ assert(chan.is_unbuffered(c))
+ }
+*/
@(require_results)
is_unbuffered :: proc "contextless" (c: ^Raw_Chan) -> bool {
return c != nil && c.unbuffered_data != nil
}
+/*
+Returns the number of elements currently in the channel.
+
+Note: Unbuffered channels will always return `0`
+because they cannot hold elements.
+
+**Inputs**
+- `c`: The channel
+
+**Returns**:
+- Number of elements
+
+Example:
+
+ import "core:sync/chan"
+ import "core:fmt"
+
+ len_example :: proc() {
+ c, _ := chan.create(chan.Chan(int), 2, context.allocator)
+ defer chan.destroy(c)
+
+ fmt.println(chan.len(c))
+ assert(chan.send(c, 1)) // add an element
+ fmt.println(chan.len(c))
+ }
+
+Output:
+
+ 0
+ 1
+*/
@(require_results)
len :: proc "contextless" (c: ^Raw_Chan) -> int {
if c != nil && c.queue != nil {
@@ -338,6 +937,34 @@ len :: proc "contextless" (c: ^Raw_Chan) -> int {
return 0
}
+/*
+Returns the number of elements the channel could hold.
+
+Note: Unbuffered channels will always return `0`
+because they cannot hold elements.
+
+**Inputs**
+- `c`: The channel
+
+**Returns**:
+- Number of elements
+
+Example:
+
+ import "core:sync/chan"
+ import "core:fmt"
+
+ cap_example :: proc() {
+ c, _ := chan.create(chan.Chan(int), 2, context.allocator)
+ defer chan.destroy(c)
+
+ fmt.println(chan.cap(c))
+ }
+
+Output:
+
+ 2
+*/
@(require_results)
cap :: proc "contextless" (c: ^Raw_Chan) -> int {
if c != nil && c.queue != nil {
@@ -347,6 +974,36 @@ cap :: proc "contextless" (c: ^Raw_Chan) -> int {
return 0
}
+/*
+Closes the channel, preventing new messages from being added.
+
+**Inputs**
+- `c`: The channel
+
+**Returns**:
+- `true` if the channel was closed by this operation, `false` if it was already closed
+
+Example:
+
+ import "core:sync/chan"
+
+ close_example :: proc() {
+ c, _ := chan.create(chan.Chan(int), 2, context.allocator)
+ defer chan.destroy(c)
+
+ // Sending a message to an open channel
+ assert(chan.send(c, 1), "allowed to send")
+
+ // Closing the channel successfully
+ assert(chan.close(c), "successfully closed")
+
+ // Trying to send a message after the channel is closed (should fail)
+ assert(!chan.send(c, 1), "not allowed to send after close")
+
+ // Trying to close the channel again (should fail since it's already closed)
+ assert(!chan.close(c), "was already closed")
+ }
+*/
close :: proc "contextless" (c: ^Raw_Chan) -> bool {
if c == nil {
return false
@@ -361,6 +1018,15 @@ close :: proc "contextless" (c: ^Raw_Chan) -> bool {
return true
}
+/*
+Returns if the channel is closed or not
+
+**Inputs**
+- `c`: The channel
+
+**Returns**:
+- `true` if the channel is closed, `false` otherwise
+*/
@(require_results)
is_closed :: proc "contextless" (c: ^Raw_Chan) -> bool {
if c == nil {
@@ -372,7 +1038,14 @@ is_closed :: proc "contextless" (c: ^Raw_Chan) -> bool {
+/*
+`Raw_Queue` is a non-thread-safe queue implementation designed to store messages
+of fixed size and alignment.
+Note: For most use cases, it is recommended to use `core:container/queue` instead,
+as `Raw_Queue` is used internally by `Raw_Chan` and may not provide the desired
+level of convenience for typical applications.
+*/
Raw_Queue :: struct {
data: [^]byte,
len: int,
@@ -381,6 +1054,27 @@ Raw_Queue :: struct {
size: int, // element size
}
+/*
+Initializes a `Raw_Queue`
+
+**Inputs**
+- `q`: A pointert to the `Raw_Queue` to initialize
+- `data`: The pointer to backing slice storing the messages
+- `cap`: The capacity of the queue
+- `size`: The size of a message
+
+Example:
+
+ import "core:sync/chan"
+
+ raw_queue_init_example :: proc() {
+ // use a stack allocated array as backing storage
+ storage: [100]int
+
+ rq: chan.Raw_Queue
+ chan.raw_queue_init(&rq, &storage, cap(storage), size_of(int))
+ }
+*/
raw_queue_init :: proc "contextless" (q: ^Raw_Queue, data: rawptr, cap: int, size: int) {
q.data = ([^]byte)(data)
q.len = 0
@@ -389,7 +1083,32 @@ raw_queue_init :: proc "contextless" (q: ^Raw_Queue, data: rawptr, cap: int, siz
q.size = size
}
+/*
+Add an element to the queue.
+
+Note: The message referenced by `data` must match the size
+and alignment used when the `Raw_Queue` was initialized.
+**Inputs**
+- `q`: A pointert to the `Raw_Queue`
+- `data`: The pointer to message to add
+
+**Returns**
+- `true` if the element was added, `false` when the queue is already full
+
+Example:
+
+ import "core:sync/chan"
+
+ raw_queue_push_example :: proc() {
+ storage: [100]int
+ rq: chan.Raw_Queue
+ chan.raw_queue_init(&rq, &storage, cap(storage), size_of(int))
+
+ value := 2
+ assert(chan.raw_queue_push(&rq, &value), "there was enough space")
+ }
+*/
@(require_results)
raw_queue_push :: proc "contextless" (q: ^Raw_Queue, data: rawptr) -> bool {
if q.len == q.cap {
@@ -406,6 +1125,37 @@ raw_queue_push :: proc "contextless" (q: ^Raw_Queue, data: rawptr) -> bool {
return true
}
+/*
+Removes and returns the first element of the queue.
+
+Note: The returned element is only guaranteed to be valid until the next
+`raw_queue_push` operation. Accessing it after that point may result in
+undefined behavior.
+
+**Inputs**
+- `c`: A pointer to the `Raw_Queue`.
+
+**Returns**
+- A pointer to the first element in the queue, or `nil` if the queue is empty.
+
+Example:
+
+ import "core:sync/chan"
+
+ raw_queue_pop_example :: proc() {
+ storage: [100]int
+ rq: chan.Raw_Queue
+ chan.raw_queue_init(&rq, &storage, cap(storage), size_of(int))
+
+ assert(chan.raw_queue_pop(&rq) == nil, "queue was empty")
+
+ // add an element to the queue
+ value := 2
+ assert(chan.raw_queue_push(&rq, &value), "there was enough space")
+
+ assert((cast(^int)chan.raw_queue_pop(&rq))^ == 2, "retrieved the element")
+ }
+*/
@(require_results)
raw_queue_pop :: proc "contextless" (q: ^Raw_Queue) -> (data: rawptr) {
if q.len > 0 {
@@ -419,7 +1169,30 @@ raw_queue_pop :: proc "contextless" (q: ^Raw_Queue) -> (data: rawptr) {
return
}
+/*
+Returns whether a message is ready to be read, i.e.,
+if a call to `recv` or `recv_raw` would block
+
+**Inputs**
+- `c`: The channel
+
+**Returns**
+- `true` if a message can be read, `false` otherwise
+
+Example:
+ import "core:sync/chan"
+
+ can_recv_example :: proc() {
+ c, err := chan.create(chan.Chan(int), 1, context.allocator)
+ assert(err == .None)
+ defer chan.destroy(c)
+
+ assert(!chan.can_recv(c), "the cannel is empty")
+ assert(chan.send(c, 2))
+ assert(chan.can_recv(c), "there is message to read")
+ }
+*/
@(require_results)
can_recv :: proc "contextless" (c: ^Raw_Chan) -> bool {
sync.guard(&c.mutex)
@@ -430,6 +1203,31 @@ can_recv :: proc "contextless" (c: ^Raw_Chan) -> bool {
}
+/*
+Returns whether a message can be sent without blocking the current
+thread. Specifically, it checks if the channel is buffered and not full,
+or if there is already a reader waiting for a message.
+
+**Inputs**
+- `c`: The channel
+
+**Returns**
+- `true` if a message can be send, `false` otherwise
+
+Example:
+
+ import "core:sync/chan"
+
+ can_send_example :: proc() {
+ c, err := chan.create(chan.Chan(int), 1, context.allocator)
+ assert(err == .None)
+ defer chan.destroy(c)
+
+ assert(chan.can_send(c), "the channel's buffer is not full")
+ assert(chan.send(c, 2))
+ assert(!chan.can_send(c), "the channel's buffer is full")
+ }
+*/
@(require_results)
can_send :: proc "contextless" (c: ^Raw_Chan) -> bool {
sync.guard(&c.mutex)
@@ -440,7 +1238,69 @@ can_send :: proc "contextless" (c: ^Raw_Chan) -> bool {
}
+/*
+Attempts to either send or receive messages on the specified channels.
+
+`select_raw` first identifies which channels have messages ready to be received
+and which are available for sending. It then randomly selects one operation
+(either a send or receive) to perform.
+
+Note: Each message in `send_msgs` corresponds to the send channel at the same index in `sends`.
+
+**Inputs**
+- `recv`: A slice of channels to read from
+- `sends`: A slice of channels to send messages on
+- `send_msgs`: A slice of messages to send
+- `recv_out`: A pointer to the location where, when receiving, the message should be stored
+
+**Returns**
+- Position of the available channel which was used for receiving or sending
+- `true` if sending/receiving was successfull, `false` if the channel was closed or no channel was available
+
+Example:
+
+ import "core:sync/chan"
+ import "core:fmt"
+
+ select_raw_example :: proc() {
+ c, err := chan.create(chan.Chan(int), 1, context.allocator)
+ assert(err == .None)
+ defer chan.destroy(c)
+
+ // sending value '1' on the channel
+ value1 := 1
+ msgs := [?]rawptr{&value1}
+ send_chans := [?]^chan.Raw_Chan{c}
+
+ // for simplicity the same channel used for sending is also used for receiving
+ receive_chans := [?]^chan.Raw_Chan{c}
+ // where the value from the read should be stored
+ received_value: int
+
+ idx, ok := chan.select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value)
+ fmt.println("SELECT: ", idx, ok)
+ fmt.println("RECEIVED VALUE ", received_value)
+
+ idx, ok = chan.select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value)
+ fmt.println("SELECT: ", idx, ok)
+ fmt.println("RECEIVED VALUE ", received_value)
+
+ // closing of a channel also affects the select operation
+ chan.close(c)
+
+ idx, ok = chan.select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value)
+ fmt.println("SELECT: ", idx, ok)
+ }
+
+Output:
+
+ SELECT: 0 true
+ RECEIVED VALUE 0
+ SELECT: 0 true
+ RECEIVED VALUE 1
+ SELECT: 0 false
+*/
@(require_results)
select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs: []rawptr, recv_out: rawptr) -> (select_idx: int, ok: bool) #no_bounds_check {
Select_Op :: struct {
diff --git a/core/sync/chan/doc.odin b/core/sync/chan/doc.odin
new file mode 100644
index 000000000..9cc624477
--- /dev/null
+++ b/core/sync/chan/doc.odin
@@ -0,0 +1,68 @@
+/*
+This package provides both high-level and low-level channel types
+for thread-safe communication.
+
+While channels are essentially thread-safe queues under the hood,
+their primary purpose is to facilitate safe communication between
+multiple readers and multiple writers.
+Although they can be used like queues, channels are designed with
+synchronization and concurrent messaging patterns in mind.
+
+Provided types:
+- `Chan` a high level channel
+- `Raw_Chan` a low level channel
+- `Raw_Queue` a low level non-threadsafe queue implementation used internally
+
+Example:
+
+ import "core:sync/chan"
+ import "core:fmt"
+
+ // The consumer reads from the channel until it's closed.
+ // Closing the channel acts as a signal to stop.
+ consumer :: proc(recv_chan: chan.Chan(int, .Recv)) {
+ for {
+ value, ok := chan.recv(recv_chan)
+ if !ok {
+ break // More idiomatic than return here
+ }
+ fmt.println("[CONSUMER] Received:", value)
+ }
+ fmt.println("[CONSUMER] Channel closed, stopping.")
+ }
+
+ // The producer sends `count` number of messages.
+ producer :: proc(send_chan: chan.Chan(int, .Send), count: int) {
+ for i in 0..<count {
+ fmt.println("[PRODUCER] Sending:", i)
+ success := chan.send(send_chan, i)
+ if !success {
+ fmt.println("[PRODUCER] Failed to send, channel may be closed.")
+ return
+ }
+ }
+
+ // Signal that production is complete by closing the channel.
+ chan.close(send_chan)
+ fmt.println("[PRODUCER] Done producing, channel closed.")
+ }
+
+ chan_example :: proc() {
+ // Create an unbuffered channel for int messages
+ c, err := chan.create(chan.Chan(int), context.allocator)
+ assert(err == .None)
+ defer chan.destroy(c)
+
+ // Start the consumer thread
+ consumer_thread := thread.create_and_start_with_poly_data(chan.as_recv(c), consumer)
+ defer thread.destroy(consumer_thread)
+
+ // Start the producer thread with 5 messages (change count as needed)
+ producer_thread := thread.create_and_start_with_poly_data2(chan.as_send(c), 5, producer)
+ defer thread.destroy(producer_thread)
+
+ // Wait for both threads to complete
+ thread.join_multiple(consumer_thread, producer_thread)
+ }
+*/
+package sync_chan