aboutsummaryrefslogtreecommitdiff
path: root/core/nbio/mpsc.odin
blob: 7f88829b4544bf2a3e7d1be2bc0755541b58ed07 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#+private
package nbio

import "base:runtime"

import "core:sync"

Multi_Producer_Single_Consumer :: struct {
	count:  int,
	head:   int,
	tail:   int,
	buffer: []rawptr,
	mask:   int,
}

mpsc_init :: proc(mpscq: ^Multi_Producer_Single_Consumer, cap: int, allocator: runtime.Allocator) -> runtime.Allocator_Error {
	assert(runtime.is_power_of_two_int(cap), "cap must be a power of 2")
	mpscq.buffer = make([]rawptr, cap, allocator) or_return
	mpscq.mask   = cap-1
	sync.atomic_thread_fence(.Release)
	return nil
}

mpsc_destroy :: proc(mpscq: ^Multi_Producer_Single_Consumer, allocator: runtime.Allocator) {
	delete(mpscq.buffer, allocator)
}

mpsc_enqueue :: proc(mpscq: ^Multi_Producer_Single_Consumer, obj: rawptr) -> bool {
	count := sync.atomic_add_explicit(&mpscq.count, 1, .Acquire)
	if count >= len(mpscq.buffer) {
		sync.atomic_sub_explicit(&mpscq.count, 1, .Release)
		return false
	}

	head := sync.atomic_add_explicit(&mpscq.head, 1, .Acquire)
	assert(mpscq.buffer[head & mpscq.mask] == nil)
	rv := sync.atomic_exchange_explicit(&mpscq.buffer[head & mpscq.mask], obj, .Release)
	assert(rv == nil)
	return true
}

mpsc_dequeue :: proc(mpscq: ^Multi_Producer_Single_Consumer) -> rawptr {
	ret := sync.atomic_exchange_explicit(&mpscq.buffer[mpscq.tail], nil, .Acquire)
	if ret == nil {
		return nil
	}

	mpscq.tail += 1
	if mpscq.tail >= len(mpscq.buffer) {
		mpscq.tail = 0
	}
	r := sync.atomic_sub_explicit(&mpscq.count, 1, .Release)
	assert(r > 0)
	return ret
}

mpsc_count :: proc(mpscq: ^Multi_Producer_Single_Consumer) -> int {
	return sync.atomic_load_explicit(&mpscq.count, .Relaxed)
}

mpsc_cap :: proc(mpscq: ^Multi_Producer_Single_Consumer) -> int {
	return len(mpscq.buffer)
}