1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
|
// thread_pool.cpp
// Forward declarations. ThreadPool is defined further down in this file;
// WorkerTask's definition is not part of this section.
struct WorkerTask;
struct ThreadPool;
// Pointer to the calling thread's own Thread record. It is assigned in
// thread_pool_init (for the thread that sets up the pool) and at the top of
// thread_pool_thread_proc (for each worker). Declared with gb_thread_local,
// so presumably every thread sees its own copy - confirm against the macro.
gb_global gb_thread_local Thread *current_thread;
// Accessor for the `current_thread` pointer of the calling thread.
// Returns whatever `current_thread` currently holds; no validation is done.
gb_internal Thread *get_current_thread(void) {
	Thread *self = current_thread;
	return self;
}
// Public thread-pool interface; the definitions follow later in this file.
// Allocates worker_count + 1 thread slots, registers the caller as slot 0 and starts the workers.
gb_internal void thread_pool_init(ThreadPool *pool, isize worker_count, char const *worker_name);
// Clears the running flag, wakes and joins every worker, then frees the thread slots.
gb_internal void thread_pool_destroy(ThreadPool *pool);
// Wraps (proc, data) in a WorkerTask and pushes it onto the calling thread's queue; always returns true.
gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data);
// Runs tasks from the calling thread's queue and sleeps in between until the pool's tasks_left counter is zero.
gb_internal void thread_pool_wait(ThreadPool *pool);
// Outcome of trying to fetch a task from a queue
// (returned by thread_pool_queue_take and thread_pool_queue_steal).
enum GrabState {
// A task was copied out. Must stay 0: callers loop on `!thread_pool_queue_take(...)`.
Grab_Success = 0,
// No task was obtained because the queue was empty. thread_pool_queue_take
// also returns this when it loses the race for the last entry.
Grab_Empty = 1,
// thread_pool_queue_steal saw a task but lost the compare-exchange on `top`.
Grab_Failed = 2,
};
// Values stored in ThreadPool::tasks_available.
enum BroadcastWaitState {
// Written by thread_pool_queue_push (via compare-exchange) and by thread_pool_destroy
// right before they broadcast on the futex.
Nobody_Waiting = 0,
// Written by a worker in thread_pool_thread_proc just before it calls futex_wait.
Someone_Waiting = 1,
};
// Shared state of the pool; every Thread reaches it through its `pool` pointer.
struct ThreadPool {
// Allocator used for `threads` (set to permanent_allocator() in thread_pool_init).
gbAllocator threads_allocator;
// Slot 0 is the thread that called thread_pool_init; slots 1.. are the started workers.
Slice<Thread> threads;
// Worker loop condition: true after thread_pool_init, false once thread_pool_destroy begins.
std::atomic<bool> running;
// Holds a BroadcastWaitState; idle workers futex_wait on it and pushes broadcast on it.
Futex tasks_available;
// Number of tasks pushed but not yet finished: incremented in thread_pool_queue_push,
// decremented after each do_work call. thread_pool_wait sleeps on it.
Futex tasks_left;
};
// Index (`idx`) of the calling thread's Thread record.
// Falls back to 0 when `current_thread` has not been set on this thread.
gb_internal isize current_thread_index(void) {
	if (!current_thread) {
		return 0;
	}
	return current_thread->idx;
}
// Sets up `pool` with `worker_count` started worker threads plus one slot
// (index 0) that represents the calling thread.
//
// pool:         pool to initialise.
// worker_count: number of worker threads to start.
// worker_name:  not used by this function.
gb_internal void thread_pool_init(ThreadPool *pool, isize worker_count, char const *worker_name) {
	// One extra slot so the calling thread gets a Thread record of its own.
	isize total_threads = worker_count + 1;
	pool->threads_allocator = permanent_allocator();
	slice_init(&pool->threads, pool->threads_allocator, total_threads);

	// The running flag has to be set before any worker starts, since the
	// worker loop in thread_pool_thread_proc is conditioned on it.
	pool->running.store(true, std::memory_order_seq_cst);

	// Register the calling thread as slot 0; it is initialised but not started.
	Thread *main_thread = &pool->threads[0];
	thread_init(pool, main_thread, 0);
	current_thread = main_thread;

	// Every remaining slot is a worker that is initialised and started.
	for_array_off(worker_idx, 1, pool->threads) {
		thread_init_and_start(pool, &pool->threads[worker_idx], worker_idx);
	}
}
// Shuts the pool down: clears the running flag, then wakes and joins each
// worker (slots 1..) in turn, and finally frees the thread slot storage.
gb_internal void thread_pool_destroy(ThreadPool *pool) {
	pool->running.store(false, std::memory_order_seq_cst);

	for_array_off(worker_idx, 1, pool->threads) {
		Thread *worker = &pool->threads[worker_idx];

		// Reset the wait flag and broadcast so a worker blocked in futex_wait
		// wakes up and observes that `running` is now false.
		pool->tasks_available.store(Nobody_Waiting);
		futex_broadcast(&worker->pool->tasks_available);

		thread_join_and_destroy(worker);
	}

	gb_free(pool->threads_allocator, pool->threads.data);
}
// Creates a ring with twice the capacity of `ring` and copies the live
// entries, i.e. logical indices [top, bottom), into it. Each entry keeps its
// logical index and is re-wrapped modulo the new size.
// The old ring is not released here. Returns the new ring.
TaskRingBuffer *task_ring_grow(TaskRingBuffer *ring, isize bottom, isize top) {
	TaskRingBuffer *grown = task_ring_init(ring->size * 2);

	isize idx = top;
	while (idx < bottom) {
		grown->buffer[idx % grown->size] = ring->buffer[idx % ring->size];
		idx += 1;
	}

	return grown;
}
// Appends `task` at the bottom end of `thread`'s queue, increments the pool's
// tasks_left counter, and wakes sleeping workers if one has flagged itself as waiting.
// In this file it is only called from thread_pool_add_task with `current_thread`,
// i.e. a thread pushes onto its own queue.
void thread_pool_queue_push(Thread *thread, WorkerTask task) {
isize bot = thread->queue.bottom.load(std::memory_order_relaxed);
isize top = thread->queue.top.load(std::memory_order_acquire);
TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_relaxed);
// Number of entries currently between top and bottom.
isize size = bot - top;
if (size > (cur_ring->size - 1)) {
// Queue is full
// Swap in a ring of twice the size that holds the same [top, bot) entries.
thread->queue.ring = task_ring_grow(thread->queue.ring, bot, top);
cur_ring = thread->queue.ring.load(std::memory_order_relaxed);
}
cur_ring->buffer[bot % cur_ring->size] = task;
// Release fence between the slot write and the bottom store, so the slot
// contents are ordered before the new bottom value becomes visible.
std::atomic_thread_fence(std::memory_order_release);
thread->queue.bottom.store(bot + 1, std::memory_order_relaxed);
thread->pool->tasks_left.fetch_add(1, std::memory_order_release);
// Only broadcast when a worker has stored Someone_Waiting; the compare-exchange
// resets the flag to Nobody_Waiting so later pushes skip the broadcast.
i32 state = Someone_Waiting;
if (thread->pool->tasks_available.compare_exchange_strong(state, Nobody_Waiting)) {
futex_broadcast(&thread->pool->tasks_available);
}
}
// Tries to pop the most recently pushed task from the bottom end of `thread`'s queue.
// Returns Grab_Success with the task copied into *task, or Grab_Empty otherwise.
// Note that *task is also overwritten when the race for the last entry is lost
// (the copy happens before the compare-exchange).
// NOTE(review): the structure looks like the owner-side pop of a Chase-Lev style
// work-stealing deque - confirm before relying on that literature for the orderings.
GrabState thread_pool_queue_take(Thread *thread, WorkerTask *task) {
// Tentatively claim the bottom entry by publishing bottom - 1 before reading top.
isize bot = thread->queue.bottom.load(std::memory_order_relaxed) - 1;
TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_relaxed);
thread->queue.bottom.store(bot, std::memory_order_relaxed);
// Full fence between the bottom store and the top load.
std::atomic_thread_fence(std::memory_order_seq_cst);
isize top = thread->queue.top.load(std::memory_order_relaxed);
if (top <= bot) {
// Queue is not empty
*task = cur_ring->buffer[bot % cur_ring->size];
if (top == bot) {
// Only one entry left in queue
// A stealer may be going for the same entry; whoever advances top wins it.
if (!thread->queue.top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
// Race failed
// Restore bottom; the entry was taken by someone else.
thread->queue.bottom.store(bot + 1, std::memory_order_relaxed);
return Grab_Empty;
}
// We won the last entry; restore bottom so that bottom == top (empty queue).
thread->queue.bottom.store(bot + 1, std::memory_order_relaxed);
return Grab_Success;
}
// We got a task without hitting a race
return Grab_Success;
} else {
// Queue is empty
// Undo the tentative decrement.
thread->queue.bottom.store(bot + 1, std::memory_order_relaxed);
return Grab_Empty;
}
}
// Tries to take the oldest task from the top end of `thread`'s queue.
// Returns Grab_Empty when top >= bottom, Grab_Success when the compare-exchange
// on top succeeds, and Grab_Failed when it does not.
// *task is overwritten whenever the queue looked non-empty, including on Grab_Failed.
GrabState thread_pool_queue_steal(Thread *thread, WorkerTask *task) {
// top is read before bottom, with a full fence in between.
isize top = thread->queue.top.load(std::memory_order_acquire);
std::atomic_thread_fence(std::memory_order_seq_cst);
isize bot = thread->queue.bottom.load(std::memory_order_acquire);
GrabState ret = Grab_Empty;
if (top < bot) {
// Queue is not empty
TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_consume);
// Copy the entry first; it only counts as ours if the compare-exchange below succeeds.
*task = cur_ring->buffer[top % cur_ring->size];
if (!thread->queue.top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
// Race failed
ret = Grab_Failed;
} else {
ret = Grab_Success;
}
}
return ret;
}
// Queues `proc(data)` for execution by pushing it onto the calling thread's
// own queue (`current_thread`). The `pool` argument is not consulted here.
// Always returns true.
gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
	WorkerTask new_task = {};
	new_task.data    = data;
	new_task.do_work = proc;

	thread_pool_queue_push(current_thread, new_task);
	return true;
}
// Blocks until pool->tasks_left reaches zero. While waiting, the calling thread
// runs the tasks in its own queue (`current_thread`) and otherwise sleeps on the
// tasks_left futex. It does not steal from other threads' queues.
gb_internal void thread_pool_wait(ThreadPool *pool) {
WorkerTask task;
while (pool->tasks_left.load(std::memory_order_acquire)) {
// if we've got tasks on our queue, run them
// (Grab_Success is 0, so the loop runs for as long as a take succeeds.)
while (!thread_pool_queue_take(current_thread, &task)) {
task.do_work(task.data);
pool->tasks_left.fetch_sub(1, std::memory_order_release);
}
// is this mem-barriered enough?
// This *must* be executed in this order, so the futex wakes immediately
// if rem_tasks has changed since we checked last, otherwise the program
// will permanently sleep
Footex rem_tasks = pool->tasks_left.load(std::memory_order_acquire);
if (rem_tasks == 0) {
return;
}
// Sleep on the snapshot value; workers call futex_signal on tasks_left
// when they observe the counter at zero.
futex_wait(&pool->tasks_left, rem_tasks);
}
}
// Worker thread entry point. `thread` (supplied through THREAD_PROC) becomes
// this thread's `current_thread`. Until the pool's running flag is cleared,
// each iteration of the main loop:
//   1. runs every task that can be taken from this worker's own queue,
//   2. if tasks are still outstanding, visits the other queues once and
//      tries to steal a single task,
//   3. otherwise flags itself as waiting and sleeps on tasks_available.
// Returns 0 when the loop exits.
gb_internal THREAD_PROC(thread_pool_thread_proc) {
	WorkerTask task;
	current_thread = thread;
	ThreadPool *pool = current_thread->pool;
	// debugf("worker id: %td\n", current_thread->idx);

	while (pool->running.load(std::memory_order_seq_cst)) {
		// If we've got tasks to process, work through them
		// (Grab_Success is 0, so the loop runs for as long as a take succeeds.)
		usize finished_tasks = 0;

		while (!thread_pool_queue_take(current_thread, &task)) {
			task.do_work(task.data);
			pool->tasks_left.fetch_sub(1, std::memory_order_release);

			finished_tasks += 1;
		}
		// If we did work and the counter is now zero, wake a thread sleeping
		// on tasks_left (see thread_pool_wait).
		if (finished_tasks > 0 && pool->tasks_left.load(std::memory_order_acquire) == 0) {
			futex_signal(&pool->tasks_left);
		}

		// If there's still work somewhere and we don't have it, steal it
		if (pool->tasks_left.load(std::memory_order_acquire)) {
			usize idx = cast(usize)current_thread->idx;
			for_array(i, pool->threads) {
				if (pool->tasks_left.load(std::memory_order_acquire) == 0) {
					break;
				}

				// Visit the queues round-robin, starting with the slot after ours.
				idx = (idx + 1) % cast(usize)pool->threads.count;

				Thread *victim = &pool->threads.data[idx];
				WorkerTask stolen;

				GrabState ret = thread_pool_queue_steal(victim, &stolen);
				switch (ret) {
				case Grab_Empty:
					// Nothing in this queue, try the next one.
					continue;
				case Grab_Success:
					stolen.do_work(stolen.data);
					pool->tasks_left.fetch_sub(1, std::memory_order_release);
					if (pool->tasks_left.load(std::memory_order_acquire) == 0) {
						futex_signal(&pool->tasks_left);
					}

					/*fallthrough*/
				case Grab_Failed:
					// Whether we ran a task or lost the race, restart the main
					// loop (own queue first) instead of going to sleep.
					goto main_loop_continue;
				}
			}
		}

		// if we've done all our work, and there's nothing to steal, go to sleep
		// NOTE(review): a push that completes between the tasks_left check above
		// and this store finds Nobody_Waiting and skips its broadcast, so this
		// worker can sleep with a task pending until the next push or pool
		// shutdown - confirm that this is acceptable.
		pool->tasks_available.store(Someone_Waiting);
		// Re-check after publishing the flag so a concurrent thread_pool_destroy is not missed.
		if (!pool->running) { break; }
		futex_wait(&pool->tasks_available, Someone_Waiting);

		main_loop_continue:;
	}

	return 0;
}
|