From 0a05ff05a391673b7f9d095d76485706ba50e0bb Mon Sep 17 00:00:00 2001 From: Laytan Laats Date: Fri, 30 Jan 2026 23:20:31 +0100 Subject: nbio: fix send/recv buffer logic --- core/nbio/impl.odin | 61 +++++++++++++++++++++++++++++++++- core/nbio/impl_linux.odin | 80 ++++++++++++++++++++++++++++++++++----------- core/nbio/impl_posix.odin | 19 ++++------- core/nbio/impl_windows.odin | 22 ++++--------- core/nbio/ops.odin | 38 ++++++++------------- tests/core/nbio/net.odin | 72 ++++++++++++++++++++++++++++++++++++++++ 6 files changed, 219 insertions(+), 73 deletions(-) diff --git a/core/nbio/impl.odin b/core/nbio/impl.odin index 476b8ab43..18bb7cfa9 100644 --- a/core/nbio/impl.odin +++ b/core/nbio/impl.odin @@ -6,9 +6,10 @@ import "base:intrinsics" import "core:container/pool" import "core:net" +import "core:reflect" +import "core:slice" import "core:strings" import "core:time" -import "core:reflect" @(init, private) init_thread_local_cleaner :: proc "contextless" () { @@ -238,6 +239,64 @@ warn :: proc(text: string, location := #caller_location) { context.logger.procedure(context.logger.data, .Warning, text, context.logger.options, location) } +/* +In order to: +1. Not require the caller to allocate their buffers (`op.send.bufs` and `op.recv.bufs` can be stack allocated) +2. Have `op.send.bufs` and `op.recv.bufs` be valid and the same content in the callback as when called +3. Be able to facilitate the `all` option, which requires mutating the slices (advancing them) +4. Constraint single send/recv syscalls to MAX_RW bytes + +We need to copy the input buffers twice, once for a stable copy returned to the user, +and one for the working copy that we mutate with `all` set. +*/ + +Bufs :: struct { + backing: [1][]byte, + working: struct #raw_union { + small: [1][]byte, + big: [][]byte, + }, +} + +bufs_init :: proc(bufs: ^Bufs, orig: ^[][]byte, allocator: runtime.Allocator) -> runtime.Allocator_Error { + if len(orig) > 1 { + backing := make([][]byte, len(orig)*2, allocator) or_return + bufs.working.big = backing[len(orig):] + copy(bufs.working.big, orig^) + copy(backing, orig^) + orig^ = backing[:len(orig)] + return nil + } + + bufs.backing = {orig[0]} + orig^ = bufs.backing[:] + return nil +} + +bufs_delete :: proc(bufs: ^Bufs, orig: [][]byte, allocator: runtime.Allocator) { + if len(orig) > 1 { + backing := raw_data(orig)[:len(orig)*2] + delete(backing, allocator) + } +} + +@(require_results) +bufs_to_process :: proc(bufs: ^Bufs, orig: [][]byte, processed: int) -> (working: [][]byte, total: int) { + if len(orig) > 1 { + // Reset to length and contents of backing, so a previous modification is removed. + (^runtime.Raw_Slice)(&bufs.working.big).len = len(orig) + copy(bufs.working.big, orig) + working = bufs.working.big + } else { + bufs.working.small = {orig[0]} + working = bufs.working.small[:] + } + + working = slice.advance_slices(working, processed) + working, total = constraint_bufs_to_max_rw(working) + return +} + @(require_results) constraint_bufs_to_max_rw :: proc(bufs: [][]byte) -> (constrained: [][]byte, total: int) { for buf in bufs { diff --git a/core/nbio/impl_linux.odin b/core/nbio/impl_linux.odin index 47d47d77c..f4b200830 100644 --- a/core/nbio/impl_linux.odin +++ b/core/nbio/impl_linux.odin @@ -7,7 +7,6 @@ import "core:container/pool" import "core:container/queue" import "core:mem" import "core:net" -import "core:slice" import "core:strings" import "core:sys/linux" import "core:sys/linux/uring" @@ -62,18 +61,27 @@ _Read :: struct {} @(private="package") _Write :: struct {} +Sock_Addr_Ip :: struct #raw_union { + using _: struct { + family: linux.Address_Family, + port: u16be, + }, + using ipv4: linux.Sock_Addr_In, + using ipv6: linux.Sock_Addr_In6, +} + @(private="package") _Send :: struct { - endpoint: linux.Sock_Addr_Any, - msghdr: linux.Msg_Hdr, - small_bufs: [1][]byte, + endpoint: Sock_Addr_Ip, + msghdr: linux.Msg_Hdr, + bufs: Bufs, } @(private="package") _Recv :: struct { - addr_out: linux.Sock_Addr_Any, - msghdr: linux.Msg_Hdr, - small_bufs: [1][]byte, + addr_out: Sock_Addr_Ip, + msghdr: linux.Msg_Hdr, + bufs: Bufs, } @(private="package") @@ -506,13 +514,13 @@ handle_completed :: proc(op: ^Operation, res: i32) { case .Send: if !send_callback(op, res) { return } maybe_callback(op) - if len(op.send.bufs) > 1 { delete(op.send.bufs, op.l.allocator) } + bufs_delete(&op.send._impl.bufs, op.send.bufs, op.l.allocator) cleanup(op) return case .Recv: if !recv_callback(op, res) { return } maybe_callback(op) - if len(op.recv.bufs) > 1 { delete(op.recv.bufs, op.l.allocator) } + bufs_delete(&op.recv._impl.bufs, op.recv.bufs, op.l.allocator) cleanup(op) return case .Open: @@ -678,7 +686,7 @@ accept_callback :: proc(op: ^Operation, res: i32) { op.accept.client = TCP_Socket(res) // net.set_blocking(net.TCP_Socket(op.accept.client), false) - op.accept.client_endpoint = sockaddr_storage_to_endpoint(&op.accept._impl.sockaddr) + op.accept.client_endpoint = sockaddr_storage_to_endpoint_any(&op.accept._impl.sockaddr) } dial_exec :: proc(op: ^Operation) { @@ -698,7 +706,7 @@ dial_exec :: proc(op: ^Operation) { } op.dial.socket = sock.(TCP_Socket) - op.dial._impl.sockaddr = endpoint_to_sockaddr(op.dial.endpoint) + op.dial._impl.sockaddr = endpoint_to_sockaddr_any(op.dial.endpoint) } enqueue(op, uring.connect( @@ -786,8 +794,7 @@ recv_exec :: proc(op: ^Operation) { return } - bufs := slice.advance_slices(op.recv.bufs, op.recv.received) - bufs, _ = constraint_bufs_to_max_rw(bufs) + bufs, _ := bufs_to_process(&op.recv._impl.bufs, op.recv.bufs, op.recv.received) op.recv._impl.msghdr.iov = transmute([]linux.IO_Vec)bufs sock: linux.Fd @@ -858,7 +865,7 @@ recv_callback :: proc(op: ^Operation, res: i32) -> bool { } case UDP_Socket: - op.recv.source = sockaddr_storage_to_endpoint(&op.recv._impl.addr_out) + op.recv.source = sockaddr_storage_to_endpoint_ip(&op.recv._impl.addr_out) } return true @@ -872,8 +879,7 @@ send_exec :: proc(op: ^Operation) { return } - bufs := slice.advance_slices(op.send.bufs, op.send.sent) - bufs, _ = constraint_bufs_to_max_rw(bufs) + bufs, _ := bufs_to_process(&op.send._impl.bufs, op.send.bufs, op.send.sent) op.send._impl.msghdr.iov = transmute([]linux.IO_Vec)bufs sock: linux.Fd @@ -882,7 +888,7 @@ send_exec :: proc(op: ^Operation) { sock = linux.Fd(socket) case UDP_Socket: sock = linux.Fd(socket) - op.send._impl.endpoint = endpoint_to_sockaddr(op.send.endpoint) + op.send._impl.endpoint = endpoint_to_sockaddr_ip(op.send.endpoint) op.send._impl.msghdr.name = &op.send._impl.endpoint op.send._impl.msghdr.namelen = size_of(op.send._impl.endpoint) } @@ -1378,7 +1384,25 @@ stat_callback :: proc(op: ^Operation, res: i32) { } @(require_results) -sockaddr_storage_to_endpoint :: proc(addr: ^linux.Sock_Addr_Any) -> (ep: Endpoint) { +sockaddr_storage_to_endpoint_ip :: proc(addr: ^Sock_Addr_Ip) -> (ep: Endpoint) { + #partial switch addr.family { + case .INET: + return Endpoint { + address = IP4_Address(addr.sin_addr), + port = int(addr.sin_port), + } + case .INET6: + return Endpoint { + address = IP6_Address(transmute([8]u16be)addr.sin6_addr), + port = int(addr.sin6_port), + } + case: + return {} + } +} + +@(require_results) +sockaddr_storage_to_endpoint_any :: proc(addr: ^linux.Sock_Addr_Any) -> (ep: Endpoint) { #partial switch addr.family { case .INET: return Endpoint { @@ -1396,7 +1420,25 @@ sockaddr_storage_to_endpoint :: proc(addr: ^linux.Sock_Addr_Any) -> (ep: Endpoin } @(require_results) -endpoint_to_sockaddr :: proc(ep: Endpoint) -> (sockaddr: linux.Sock_Addr_Any) { +endpoint_to_sockaddr_ip :: proc(ep: Endpoint) -> (sockaddr: Sock_Addr_Ip) { + switch a in ep.address { + case IP4_Address: + sockaddr.sin_family = .INET + sockaddr.sin_port = u16be(ep.port) + sockaddr.sin_addr = cast([4]u8)a + return + case IP6_Address: + sockaddr.sin6_family = .INET6 + sockaddr.sin6_port = u16be(ep.port) + sockaddr.sin6_addr = transmute([16]u8)a + return + } + + unreachable() +} + +@(require_results) +endpoint_to_sockaddr_any :: proc(ep: Endpoint) -> (sockaddr: linux.Sock_Addr_Any) { switch a in ep.address { case IP4_Address: sockaddr.sin_family = .INET diff --git a/core/nbio/impl_posix.odin b/core/nbio/impl_posix.odin index 9b4863710..f76cf2849 100644 --- a/core/nbio/impl_posix.odin +++ b/core/nbio/impl_posix.odin @@ -7,7 +7,6 @@ import "core:container/pool" import "core:container/queue" import "core:mem" import "core:net" -import "core:slice" import "core:strings" import "core:sys/posix" import "core:time" @@ -64,12 +63,12 @@ _Dial :: struct {} @(private="package") _Recv :: struct { - small_bufs: [1][]byte, + bufs: Bufs, } @(private="package") _Send :: struct { - small_bufs: [1][]byte, + bufs: Bufs, } @(private="package") @@ -575,14 +574,14 @@ handle_completed :: proc(op: ^Operation) { case .Send: if send_exec(op) == .Done { maybe_callback(op) - bufs_destroy(op.send.bufs, op.l.allocator) + bufs_delete(&op.send._impl.bufs, op.send.bufs, op.l.allocator) cleanup(op) } return case .Recv: if recv_exec(op) == .Done { maybe_callback(op) - bufs_destroy(op.recv.bufs, op.l.allocator) + bufs_delete(&op.recv._impl.bufs, op.recv.bufs, op.l.allocator) cleanup(op) } return @@ -778,10 +777,7 @@ send_exec :: proc(op: ^Operation) -> Op_Result { return .Done } - total: int - bufs := slice.advance_slices(op.send.bufs, op.send.sent) - bufs, total = constraint_bufs_to_max_rw(op.send.bufs) - + bufs, total := bufs_to_process(&op.send._impl.bufs, op.send.bufs, op.send.sent) sock, n := sendv(op.send.socket, bufs, op.send.endpoint) if n < 0 { if posix.errno() == .EWOULDBLOCK { @@ -840,10 +836,7 @@ recv_exec :: proc(op: ^Operation) -> Op_Result { return .Done } - total: int - bufs := slice.advance_slices(op.recv.bufs, op.recv.received) - bufs, total = constraint_bufs_to_max_rw(op.recv.bufs) - + bufs, total := bufs_to_process(&op.recv._impl.bufs, op.recv.bufs, op.recv.received) _, is_tcp := op.recv.socket.(net.TCP_Socket) sock, n := recvv(op.recv.socket, bufs, &op.recv.source) diff --git a/core/nbio/impl_windows.odin b/core/nbio/impl_windows.odin index f75193fe4..adf2d5351 100644 --- a/core/nbio/impl_windows.odin +++ b/core/nbio/impl_windows.odin @@ -69,15 +69,15 @@ _Write :: struct {} @(private="package") _Send :: struct { - small_bufs: [1][]byte, + bufs: Bufs, } @(private="package") _Recv :: struct { source: win.SOCKADDR_STORAGE_LH, source_len: win.INT, - small_bufs: [1][]byte, flags: win.DWORD, + bufs: Bufs, } @(private="package") @@ -320,9 +320,7 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) { result = recv_callback(op) if result == .Done { maybe_callback(op) - if len(op.recv.bufs) > 1 { - delete(op.recv.bufs, op.l.allocator) - } + bufs_delete(&op.recv._impl.bufs, op.recv.bufs, op.l.allocator) cleanup(op) return } @@ -332,9 +330,7 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) { result = send_callback(op) if result == .Done { maybe_callback(op) - if len(op.send.bufs) > 1 { - delete(op.send.bufs, op.l.allocator) - } + bufs_delete(&op.send._impl.bufs, op.send.bufs, op.l.allocator) cleanup(op) return } @@ -1213,9 +1209,7 @@ recv_exec :: proc(op: ^Operation) -> Op_Result { return .Done } - bufs := slice.advance_slices(op.recv.bufs, op.recv.received) - bufs, _ = constraint_bufs_to_max_rw(op.recv.bufs) - + bufs, _ := bufs_to_process(&op.recv._impl.bufs, op.recv.bufs, op.recv.received) win_bufs := ([^]win.WSABUF)(intrinsics.alloca(size_of(win.WSABUF) * len(bufs), align_of(win.WSABUF))) for buf, i in bufs { assert(i64(len(buf)) < i64(max(u32))) @@ -1333,9 +1327,7 @@ send_exec :: proc(op: ^Operation) -> Op_Result { return .Done } - bufs := slice.advance_slices(op.send.bufs, op.send.sent) - bufs, _ = constraint_bufs_to_max_rw(op.send.bufs) - + bufs, _ := bufs_to_process(&op.send._impl.bufs, op.send.bufs, op.send.sent) win_bufs := ([^]win.WSABUF)(intrinsics.alloca(size_of(win.WSABUF) * len(bufs), align_of(win.WSABUF))) for buf, i in bufs { assert(i64(len(buf)) < i64(max(u32))) @@ -1853,4 +1845,4 @@ timeouts_cmp :: #force_inline proc(a, b: ^Operation) -> slice.Ordering { assert(a == b) return .Equal } -} \ No newline at end of file +} diff --git a/core/nbio/ops.odin b/core/nbio/ops.odin index e028c4a76..382dca747 100644 --- a/core/nbio/ops.odin +++ b/core/nbio/ops.odin @@ -5,7 +5,6 @@ import "base:intrinsics" import "core:container/pool" import "core:time" import "core:slice" -import "core:mem" NO_TIMEOUT: time.Duration: -1 @@ -480,7 +479,7 @@ Recv :: struct { // The socket to receive from. socket: Any_Socket, // The buffers to receive data into. - // The outer slice is copied internally, but the backing arrays must remain alive. + // The outer slice is copied internally, but the backing data must remain alive. // It is safe to access `bufs` during the callback. bufs: [][]byte, // If true, the operation waits until all buffers are filled (TCP only). @@ -546,17 +545,11 @@ prep_recv :: #force_inline proc( op.recv.expires = time.time_add(now(), timeout) } - if len(op.recv.bufs) == 1 { - op.recv._impl.small_bufs = {op.recv.bufs[0]} - op.recv.bufs = op.recv._impl.small_bufs[:] - } else { - err: mem.Allocator_Error - if op.recv.bufs, err = slice.clone(op.recv.bufs, op.l.allocator); err != nil { - switch _ in op.recv.socket { - case TCP_Socket: op.recv.err = TCP_Recv_Error.Insufficient_Resources - case UDP_Socket: op.recv.err = UDP_Recv_Error.Insufficient_Resources - case: unreachable() - } + if err := bufs_init(&op.recv._impl.bufs, &op.recv.bufs, op.l.allocator); err != nil { + switch _ in op.recv.socket { + case TCP_Socket: op.recv.err = TCP_Recv_Error.Insufficient_Resources + case UDP_Socket: op.recv.err = UDP_Recv_Error.Insufficient_Resources + case: unreachable() } } @@ -714,7 +707,8 @@ Send :: struct { // The socket to send to. socket: Any_Socket, // The buffers to send. - // The outer slice is copied internally, but the backing arrays must remain alive. + // The outer slice is copied internally, but the backing data must remain alive. + // It is safe to access `bufs` during the callback. bufs: [][]byte `fmt:"-"`, // The destination endpoint to send to (UDP only). endpoint: Endpoint, @@ -773,17 +767,11 @@ prep_send :: proc( op.send.expires = time.time_add(now(), timeout) } - if len(op.send.bufs) == 1 { - op.send._impl.small_bufs = {op.send.bufs[0]} - op.send.bufs = op.send._impl.small_bufs[:] - } else { - err: mem.Allocator_Error - if op.send.bufs, err = slice.clone(op.send.bufs, op.l.allocator); err != nil { - switch _ in op.send.socket { - case TCP_Socket: op.send.err = TCP_Send_Error.Insufficient_Resources - case UDP_Socket: op.send.err = UDP_Send_Error.Insufficient_Resources - case: unreachable() - } + if err := bufs_init(&op.send._impl.bufs, &op.send.bufs, op.l.allocator); err != nil { + switch _ in op.send.socket { + case TCP_Socket: op.send.err = TCP_Send_Error.Insufficient_Resources + case UDP_Socket: op.send.err = UDP_Send_Error.Insufficient_Resources + case: unreachable() } } diff --git a/tests/core/nbio/net.odin b/tests/core/nbio/net.odin index 688ee0b45..77e44f73b 100644 --- a/tests/core/nbio/net.odin +++ b/tests/core/nbio/net.odin @@ -398,3 +398,75 @@ sendfile :: proc(t: ^testing.T) { ev(t, nbio.run(), nil) } } + +@(test) +vectored :: proc(t: ^testing.T) { + if event_loop_guard(t) { + testing.set_fail_timeout(t, time.Minute) + + sock, ep := open_next_available_local_port(t) + + to_send := [?][]byte{ + {'H', 'e', 'l', 'l'}, + {'o', 'p', 'e'}, + {'!'}, + } + + to_recv := [?][]byte{ + {0, 0, 0, 0}, + {0, 0, 0}, + {0}, + } + + // Server + { + nbio.accept_poly2(sock, t, &to_send, on_accept) + + on_accept :: proc(op: ^nbio.Operation, t: ^testing.T, to_send: ^[3][]byte) { + ev(t, op.accept.err, nil) + e(t, op.accept.client != 0) + to_send_copy := to_send^[:] + nbio.send_poly3(op.accept.client, to_send_copy, t, op.accept.socket, to_send, on_send) + } + + on_send :: proc(op: ^nbio.Operation, t: ^testing.T, server: net.TCP_Socket, to_send: ^[3][]byte) { + ev(t, op.send.err, nil) + ev(t, op.send.sent, 8) + + expected := to_send^ + for buf, i in expected { + ev(t, string(op.send.bufs[i]), string(buf)) + } + + nbio.close(op.send.socket.(net.TCP_Socket)) + nbio.close(server) + } + } + + // Client + { + nbio.dial_poly3(ep, t, &to_recv, &to_send, on_dial) + + on_dial :: proc(op: ^nbio.Operation, t: ^testing.T, to_recv: ^[3][]byte, expected: ^[3][]byte) { + ev(t, op.dial.err, nil) + + to_recv_copy := to_recv^[:] + nbio.recv_poly2(op.dial.socket, to_recv_copy, t, expected, on_recv, all=true) + } + + on_recv :: proc(op: ^nbio.Operation, t: ^testing.T, expected: ^[3][]byte) { + ev(t, op.recv.err, nil) + ev(t, op.recv.received, 8) + + expected := expected^[:] + for buf, i in expected { + ev(t, string(op.recv.bufs[i]), string(buf)) + } + + nbio.close(op.recv.socket.(net.TCP_Socket)) + } + } + + ev(t, nbio.run(), nil) + } +} -- cgit v1.2.3