aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/nbio/impl.odin61
-rw-r--r--core/nbio/impl_linux.odin80
-rw-r--r--core/nbio/impl_posix.odin19
-rw-r--r--core/nbio/impl_windows.odin22
-rw-r--r--core/nbio/ops.odin38
-rw-r--r--tests/core/nbio/net.odin72
6 files changed, 219 insertions, 73 deletions
diff --git a/core/nbio/impl.odin b/core/nbio/impl.odin
index 476b8ab43..18bb7cfa9 100644
--- a/core/nbio/impl.odin
+++ b/core/nbio/impl.odin
@@ -6,9 +6,10 @@ import "base:intrinsics"
import "core:container/pool"
import "core:net"
+import "core:reflect"
+import "core:slice"
import "core:strings"
import "core:time"
-import "core:reflect"
@(init, private)
init_thread_local_cleaner :: proc "contextless" () {
@@ -238,6 +239,64 @@ warn :: proc(text: string, location := #caller_location) {
context.logger.procedure(context.logger.data, .Warning, text, context.logger.options, location)
}
+/*
+In order to:
+1. Not require the caller to allocate their buffers (`op.send.bufs` and `op.recv.bufs` can be stack allocated)
+2. Have `op.send.bufs` and `op.recv.bufs` be valid and the same content in the callback as when called
+3. Be able to facilitate the `all` option, which requires mutating the slices (advancing them)
+4. Constraint single send/recv syscalls to MAX_RW bytes
+
+We need to copy the input buffers twice, once for a stable copy returned to the user,
+and one for the working copy that we mutate with `all` set.
+*/
+
+Bufs :: struct {
+ backing: [1][]byte,
+ working: struct #raw_union {
+ small: [1][]byte,
+ big: [][]byte,
+ },
+}
+
+bufs_init :: proc(bufs: ^Bufs, orig: ^[][]byte, allocator: runtime.Allocator) -> runtime.Allocator_Error {
+ if len(orig) > 1 {
+ backing := make([][]byte, len(orig)*2, allocator) or_return
+ bufs.working.big = backing[len(orig):]
+ copy(bufs.working.big, orig^)
+ copy(backing, orig^)
+ orig^ = backing[:len(orig)]
+ return nil
+ }
+
+ bufs.backing = {orig[0]}
+ orig^ = bufs.backing[:]
+ return nil
+}
+
+bufs_delete :: proc(bufs: ^Bufs, orig: [][]byte, allocator: runtime.Allocator) {
+ if len(orig) > 1 {
+ backing := raw_data(orig)[:len(orig)*2]
+ delete(backing, allocator)
+ }
+}
+
+@(require_results)
+bufs_to_process :: proc(bufs: ^Bufs, orig: [][]byte, processed: int) -> (working: [][]byte, total: int) {
+ if len(orig) > 1 {
+ // Reset to length and contents of backing, so a previous modification is removed.
+ (^runtime.Raw_Slice)(&bufs.working.big).len = len(orig)
+ copy(bufs.working.big, orig)
+ working = bufs.working.big
+ } else {
+ bufs.working.small = {orig[0]}
+ working = bufs.working.small[:]
+ }
+
+ working = slice.advance_slices(working, processed)
+ working, total = constraint_bufs_to_max_rw(working)
+ return
+}
+
@(require_results)
constraint_bufs_to_max_rw :: proc(bufs: [][]byte) -> (constrained: [][]byte, total: int) {
for buf in bufs {
diff --git a/core/nbio/impl_linux.odin b/core/nbio/impl_linux.odin
index 47d47d77c..f4b200830 100644
--- a/core/nbio/impl_linux.odin
+++ b/core/nbio/impl_linux.odin
@@ -7,7 +7,6 @@ import "core:container/pool"
import "core:container/queue"
import "core:mem"
import "core:net"
-import "core:slice"
import "core:strings"
import "core:sys/linux"
import "core:sys/linux/uring"
@@ -62,18 +61,27 @@ _Read :: struct {}
@(private="package")
_Write :: struct {}
+Sock_Addr_Ip :: struct #raw_union {
+ using _: struct {
+ family: linux.Address_Family,
+ port: u16be,
+ },
+ using ipv4: linux.Sock_Addr_In,
+ using ipv6: linux.Sock_Addr_In6,
+}
+
@(private="package")
_Send :: struct {
- endpoint: linux.Sock_Addr_Any,
- msghdr: linux.Msg_Hdr,
- small_bufs: [1][]byte,
+ endpoint: Sock_Addr_Ip,
+ msghdr: linux.Msg_Hdr,
+ bufs: Bufs,
}
@(private="package")
_Recv :: struct {
- addr_out: linux.Sock_Addr_Any,
- msghdr: linux.Msg_Hdr,
- small_bufs: [1][]byte,
+ addr_out: Sock_Addr_Ip,
+ msghdr: linux.Msg_Hdr,
+ bufs: Bufs,
}
@(private="package")
@@ -506,13 +514,13 @@ handle_completed :: proc(op: ^Operation, res: i32) {
case .Send:
if !send_callback(op, res) { return }
maybe_callback(op)
- if len(op.send.bufs) > 1 { delete(op.send.bufs, op.l.allocator) }
+ bufs_delete(&op.send._impl.bufs, op.send.bufs, op.l.allocator)
cleanup(op)
return
case .Recv:
if !recv_callback(op, res) { return }
maybe_callback(op)
- if len(op.recv.bufs) > 1 { delete(op.recv.bufs, op.l.allocator) }
+ bufs_delete(&op.recv._impl.bufs, op.recv.bufs, op.l.allocator)
cleanup(op)
return
case .Open:
@@ -678,7 +686,7 @@ accept_callback :: proc(op: ^Operation, res: i32) {
op.accept.client = TCP_Socket(res)
// net.set_blocking(net.TCP_Socket(op.accept.client), false)
- op.accept.client_endpoint = sockaddr_storage_to_endpoint(&op.accept._impl.sockaddr)
+ op.accept.client_endpoint = sockaddr_storage_to_endpoint_any(&op.accept._impl.sockaddr)
}
dial_exec :: proc(op: ^Operation) {
@@ -698,7 +706,7 @@ dial_exec :: proc(op: ^Operation) {
}
op.dial.socket = sock.(TCP_Socket)
- op.dial._impl.sockaddr = endpoint_to_sockaddr(op.dial.endpoint)
+ op.dial._impl.sockaddr = endpoint_to_sockaddr_any(op.dial.endpoint)
}
enqueue(op, uring.connect(
@@ -786,8 +794,7 @@ recv_exec :: proc(op: ^Operation) {
return
}
- bufs := slice.advance_slices(op.recv.bufs, op.recv.received)
- bufs, _ = constraint_bufs_to_max_rw(bufs)
+ bufs, _ := bufs_to_process(&op.recv._impl.bufs, op.recv.bufs, op.recv.received)
op.recv._impl.msghdr.iov = transmute([]linux.IO_Vec)bufs
sock: linux.Fd
@@ -858,7 +865,7 @@ recv_callback :: proc(op: ^Operation, res: i32) -> bool {
}
case UDP_Socket:
- op.recv.source = sockaddr_storage_to_endpoint(&op.recv._impl.addr_out)
+ op.recv.source = sockaddr_storage_to_endpoint_ip(&op.recv._impl.addr_out)
}
return true
@@ -872,8 +879,7 @@ send_exec :: proc(op: ^Operation) {
return
}
- bufs := slice.advance_slices(op.send.bufs, op.send.sent)
- bufs, _ = constraint_bufs_to_max_rw(bufs)
+ bufs, _ := bufs_to_process(&op.send._impl.bufs, op.send.bufs, op.send.sent)
op.send._impl.msghdr.iov = transmute([]linux.IO_Vec)bufs
sock: linux.Fd
@@ -882,7 +888,7 @@ send_exec :: proc(op: ^Operation) {
sock = linux.Fd(socket)
case UDP_Socket:
sock = linux.Fd(socket)
- op.send._impl.endpoint = endpoint_to_sockaddr(op.send.endpoint)
+ op.send._impl.endpoint = endpoint_to_sockaddr_ip(op.send.endpoint)
op.send._impl.msghdr.name = &op.send._impl.endpoint
op.send._impl.msghdr.namelen = size_of(op.send._impl.endpoint)
}
@@ -1378,7 +1384,25 @@ stat_callback :: proc(op: ^Operation, res: i32) {
}
@(require_results)
-sockaddr_storage_to_endpoint :: proc(addr: ^linux.Sock_Addr_Any) -> (ep: Endpoint) {
+sockaddr_storage_to_endpoint_ip :: proc(addr: ^Sock_Addr_Ip) -> (ep: Endpoint) {
+ #partial switch addr.family {
+ case .INET:
+ return Endpoint {
+ address = IP4_Address(addr.sin_addr),
+ port = int(addr.sin_port),
+ }
+ case .INET6:
+ return Endpoint {
+ address = IP6_Address(transmute([8]u16be)addr.sin6_addr),
+ port = int(addr.sin6_port),
+ }
+ case:
+ return {}
+ }
+}
+
+@(require_results)
+sockaddr_storage_to_endpoint_any :: proc(addr: ^linux.Sock_Addr_Any) -> (ep: Endpoint) {
#partial switch addr.family {
case .INET:
return Endpoint {
@@ -1396,7 +1420,25 @@ sockaddr_storage_to_endpoint :: proc(addr: ^linux.Sock_Addr_Any) -> (ep: Endpoin
}
@(require_results)
-endpoint_to_sockaddr :: proc(ep: Endpoint) -> (sockaddr: linux.Sock_Addr_Any) {
+endpoint_to_sockaddr_ip :: proc(ep: Endpoint) -> (sockaddr: Sock_Addr_Ip) {
+ switch a in ep.address {
+ case IP4_Address:
+ sockaddr.sin_family = .INET
+ sockaddr.sin_port = u16be(ep.port)
+ sockaddr.sin_addr = cast([4]u8)a
+ return
+ case IP6_Address:
+ sockaddr.sin6_family = .INET6
+ sockaddr.sin6_port = u16be(ep.port)
+ sockaddr.sin6_addr = transmute([16]u8)a
+ return
+ }
+
+ unreachable()
+}
+
+@(require_results)
+endpoint_to_sockaddr_any :: proc(ep: Endpoint) -> (sockaddr: linux.Sock_Addr_Any) {
switch a in ep.address {
case IP4_Address:
sockaddr.sin_family = .INET
diff --git a/core/nbio/impl_posix.odin b/core/nbio/impl_posix.odin
index 9b4863710..f76cf2849 100644
--- a/core/nbio/impl_posix.odin
+++ b/core/nbio/impl_posix.odin
@@ -7,7 +7,6 @@ import "core:container/pool"
import "core:container/queue"
import "core:mem"
import "core:net"
-import "core:slice"
import "core:strings"
import "core:sys/posix"
import "core:time"
@@ -64,12 +63,12 @@ _Dial :: struct {}
@(private="package")
_Recv :: struct {
- small_bufs: [1][]byte,
+ bufs: Bufs,
}
@(private="package")
_Send :: struct {
- small_bufs: [1][]byte,
+ bufs: Bufs,
}
@(private="package")
@@ -575,14 +574,14 @@ handle_completed :: proc(op: ^Operation) {
case .Send:
if send_exec(op) == .Done {
maybe_callback(op)
- bufs_destroy(op.send.bufs, op.l.allocator)
+ bufs_delete(&op.send._impl.bufs, op.send.bufs, op.l.allocator)
cleanup(op)
}
return
case .Recv:
if recv_exec(op) == .Done {
maybe_callback(op)
- bufs_destroy(op.recv.bufs, op.l.allocator)
+ bufs_delete(&op.recv._impl.bufs, op.recv.bufs, op.l.allocator)
cleanup(op)
}
return
@@ -778,10 +777,7 @@ send_exec :: proc(op: ^Operation) -> Op_Result {
return .Done
}
- total: int
- bufs := slice.advance_slices(op.send.bufs, op.send.sent)
- bufs, total = constraint_bufs_to_max_rw(op.send.bufs)
-
+ bufs, total := bufs_to_process(&op.send._impl.bufs, op.send.bufs, op.send.sent)
sock, n := sendv(op.send.socket, bufs, op.send.endpoint)
if n < 0 {
if posix.errno() == .EWOULDBLOCK {
@@ -840,10 +836,7 @@ recv_exec :: proc(op: ^Operation) -> Op_Result {
return .Done
}
- total: int
- bufs := slice.advance_slices(op.recv.bufs, op.recv.received)
- bufs, total = constraint_bufs_to_max_rw(op.recv.bufs)
-
+ bufs, total := bufs_to_process(&op.recv._impl.bufs, op.recv.bufs, op.recv.received)
_, is_tcp := op.recv.socket.(net.TCP_Socket)
sock, n := recvv(op.recv.socket, bufs, &op.recv.source)
diff --git a/core/nbio/impl_windows.odin b/core/nbio/impl_windows.odin
index f75193fe4..adf2d5351 100644
--- a/core/nbio/impl_windows.odin
+++ b/core/nbio/impl_windows.odin
@@ -69,15 +69,15 @@ _Write :: struct {}
@(private="package")
_Send :: struct {
- small_bufs: [1][]byte,
+ bufs: Bufs,
}
@(private="package")
_Recv :: struct {
source: win.SOCKADDR_STORAGE_LH,
source_len: win.INT,
- small_bufs: [1][]byte,
flags: win.DWORD,
+ bufs: Bufs,
}
@(private="package")
@@ -320,9 +320,7 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) {
result = recv_callback(op)
if result == .Done {
maybe_callback(op)
- if len(op.recv.bufs) > 1 {
- delete(op.recv.bufs, op.l.allocator)
- }
+ bufs_delete(&op.recv._impl.bufs, op.recv.bufs, op.l.allocator)
cleanup(op)
return
}
@@ -332,9 +330,7 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) {
result = send_callback(op)
if result == .Done {
maybe_callback(op)
- if len(op.send.bufs) > 1 {
- delete(op.send.bufs, op.l.allocator)
- }
+ bufs_delete(&op.send._impl.bufs, op.send.bufs, op.l.allocator)
cleanup(op)
return
}
@@ -1213,9 +1209,7 @@ recv_exec :: proc(op: ^Operation) -> Op_Result {
return .Done
}
- bufs := slice.advance_slices(op.recv.bufs, op.recv.received)
- bufs, _ = constraint_bufs_to_max_rw(op.recv.bufs)
-
+ bufs, _ := bufs_to_process(&op.recv._impl.bufs, op.recv.bufs, op.recv.received)
win_bufs := ([^]win.WSABUF)(intrinsics.alloca(size_of(win.WSABUF) * len(bufs), align_of(win.WSABUF)))
for buf, i in bufs {
assert(i64(len(buf)) < i64(max(u32)))
@@ -1333,9 +1327,7 @@ send_exec :: proc(op: ^Operation) -> Op_Result {
return .Done
}
- bufs := slice.advance_slices(op.send.bufs, op.send.sent)
- bufs, _ = constraint_bufs_to_max_rw(op.send.bufs)
-
+ bufs, _ := bufs_to_process(&op.send._impl.bufs, op.send.bufs, op.send.sent)
win_bufs := ([^]win.WSABUF)(intrinsics.alloca(size_of(win.WSABUF) * len(bufs), align_of(win.WSABUF)))
for buf, i in bufs {
assert(i64(len(buf)) < i64(max(u32)))
@@ -1853,4 +1845,4 @@ timeouts_cmp :: #force_inline proc(a, b: ^Operation) -> slice.Ordering {
assert(a == b)
return .Equal
}
-} \ No newline at end of file
+}
diff --git a/core/nbio/ops.odin b/core/nbio/ops.odin
index e028c4a76..382dca747 100644
--- a/core/nbio/ops.odin
+++ b/core/nbio/ops.odin
@@ -5,7 +5,6 @@ import "base:intrinsics"
import "core:container/pool"
import "core:time"
import "core:slice"
-import "core:mem"
NO_TIMEOUT: time.Duration: -1
@@ -480,7 +479,7 @@ Recv :: struct {
// The socket to receive from.
socket: Any_Socket,
// The buffers to receive data into.
- // The outer slice is copied internally, but the backing arrays must remain alive.
+ // The outer slice is copied internally, but the backing data must remain alive.
// It is safe to access `bufs` during the callback.
bufs: [][]byte,
// If true, the operation waits until all buffers are filled (TCP only).
@@ -546,17 +545,11 @@ prep_recv :: #force_inline proc(
op.recv.expires = time.time_add(now(), timeout)
}
- if len(op.recv.bufs) == 1 {
- op.recv._impl.small_bufs = {op.recv.bufs[0]}
- op.recv.bufs = op.recv._impl.small_bufs[:]
- } else {
- err: mem.Allocator_Error
- if op.recv.bufs, err = slice.clone(op.recv.bufs, op.l.allocator); err != nil {
- switch _ in op.recv.socket {
- case TCP_Socket: op.recv.err = TCP_Recv_Error.Insufficient_Resources
- case UDP_Socket: op.recv.err = UDP_Recv_Error.Insufficient_Resources
- case: unreachable()
- }
+ if err := bufs_init(&op.recv._impl.bufs, &op.recv.bufs, op.l.allocator); err != nil {
+ switch _ in op.recv.socket {
+ case TCP_Socket: op.recv.err = TCP_Recv_Error.Insufficient_Resources
+ case UDP_Socket: op.recv.err = UDP_Recv_Error.Insufficient_Resources
+ case: unreachable()
}
}
@@ -714,7 +707,8 @@ Send :: struct {
// The socket to send to.
socket: Any_Socket,
// The buffers to send.
- // The outer slice is copied internally, but the backing arrays must remain alive.
+ // The outer slice is copied internally, but the backing data must remain alive.
+ // It is safe to access `bufs` during the callback.
bufs: [][]byte `fmt:"-"`,
// The destination endpoint to send to (UDP only).
endpoint: Endpoint,
@@ -773,17 +767,11 @@ prep_send :: proc(
op.send.expires = time.time_add(now(), timeout)
}
- if len(op.send.bufs) == 1 {
- op.send._impl.small_bufs = {op.send.bufs[0]}
- op.send.bufs = op.send._impl.small_bufs[:]
- } else {
- err: mem.Allocator_Error
- if op.send.bufs, err = slice.clone(op.send.bufs, op.l.allocator); err != nil {
- switch _ in op.send.socket {
- case TCP_Socket: op.send.err = TCP_Send_Error.Insufficient_Resources
- case UDP_Socket: op.send.err = UDP_Send_Error.Insufficient_Resources
- case: unreachable()
- }
+ if err := bufs_init(&op.send._impl.bufs, &op.send.bufs, op.l.allocator); err != nil {
+ switch _ in op.send.socket {
+ case TCP_Socket: op.send.err = TCP_Send_Error.Insufficient_Resources
+ case UDP_Socket: op.send.err = UDP_Send_Error.Insufficient_Resources
+ case: unreachable()
}
}
diff --git a/tests/core/nbio/net.odin b/tests/core/nbio/net.odin
index 688ee0b45..77e44f73b 100644
--- a/tests/core/nbio/net.odin
+++ b/tests/core/nbio/net.odin
@@ -398,3 +398,75 @@ sendfile :: proc(t: ^testing.T) {
ev(t, nbio.run(), nil)
}
}
+
+@(test)
+vectored :: proc(t: ^testing.T) {
+ if event_loop_guard(t) {
+ testing.set_fail_timeout(t, time.Minute)
+
+ sock, ep := open_next_available_local_port(t)
+
+ to_send := [?][]byte{
+ {'H', 'e', 'l', 'l'},
+ {'o', 'p', 'e'},
+ {'!'},
+ }
+
+ to_recv := [?][]byte{
+ {0, 0, 0, 0},
+ {0, 0, 0},
+ {0},
+ }
+
+ // Server
+ {
+ nbio.accept_poly2(sock, t, &to_send, on_accept)
+
+ on_accept :: proc(op: ^nbio.Operation, t: ^testing.T, to_send: ^[3][]byte) {
+ ev(t, op.accept.err, nil)
+ e(t, op.accept.client != 0)
+ to_send_copy := to_send^[:]
+ nbio.send_poly3(op.accept.client, to_send_copy, t, op.accept.socket, to_send, on_send)
+ }
+
+ on_send :: proc(op: ^nbio.Operation, t: ^testing.T, server: net.TCP_Socket, to_send: ^[3][]byte) {
+ ev(t, op.send.err, nil)
+ ev(t, op.send.sent, 8)
+
+ expected := to_send^
+ for buf, i in expected {
+ ev(t, string(op.send.bufs[i]), string(buf))
+ }
+
+ nbio.close(op.send.socket.(net.TCP_Socket))
+ nbio.close(server)
+ }
+ }
+
+ // Client
+ {
+ nbio.dial_poly3(ep, t, &to_recv, &to_send, on_dial)
+
+ on_dial :: proc(op: ^nbio.Operation, t: ^testing.T, to_recv: ^[3][]byte, expected: ^[3][]byte) {
+ ev(t, op.dial.err, nil)
+
+ to_recv_copy := to_recv^[:]
+ nbio.recv_poly2(op.dial.socket, to_recv_copy, t, expected, on_recv, all=true)
+ }
+
+ on_recv :: proc(op: ^nbio.Operation, t: ^testing.T, expected: ^[3][]byte) {
+ ev(t, op.recv.err, nil)
+ ev(t, op.recv.received, 8)
+
+ expected := expected^[:]
+ for buf, i in expected {
+ ev(t, string(op.recv.bufs[i]), string(buf))
+ }
+
+ nbio.close(op.recv.socket.(net.TCP_Socket))
+ }
+ }
+
+ ev(t, nbio.run(), nil)
+ }
+}