1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
|
// thread_pool.cpp
struct WorkerTask;
struct ThreadPool;
gb_thread_local Thread *current_thread;
gb_internal void thread_pool_init(ThreadPool *pool, isize worker_count, char const *worker_name);
gb_internal void thread_pool_destroy(ThreadPool *pool);
gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data);
gb_internal void thread_pool_wait(ThreadPool *pool);
struct ThreadPool {
gbAllocator threads_allocator;
Slice<Thread> threads;
std::atomic<bool> running;
Futex tasks_available;
Futex tasks_left;
};
gb_internal isize current_thread_index(void) {
return current_thread ? current_thread->idx : 0;
}
gb_internal void thread_pool_init(ThreadPool *pool, isize worker_count, char const *worker_name) {
pool->threads_allocator = permanent_allocator();
slice_init(&pool->threads, pool->threads_allocator, worker_count + 1);
// NOTE: this needs to be initialized before any thread starts
pool->running.store(true, std::memory_order_seq_cst);
// setup the main thread
thread_init(pool, &pool->threads[0], 0);
current_thread = &pool->threads[0];
for_array_off(i, 1, pool->threads) {
Thread *t = &pool->threads[i];
thread_init_and_start(pool, t, i);
}
}
gb_internal void thread_pool_destroy(ThreadPool *pool) {
pool->running.store(false, std::memory_order_seq_cst);
for_array_off(i, 1, pool->threads) {
Thread *t = &pool->threads[i];
pool->tasks_available.fetch_add(1, std::memory_order_acquire);
futex_broadcast(&pool->tasks_available);
thread_join_and_destroy(t);
}
gb_free(pool->threads_allocator, pool->threads.data);
}
TaskRingBuffer *taskring_grow(TaskRingBuffer *ring, ssize_t bottom, ssize_t top) {
TaskRingBuffer *new_ring = taskring_init(ring->size * 2);
for (ssize_t i = top; i < bottom; i++) {
new_ring->buffer[i % new_ring->size] = ring->buffer[i % ring->size];
}
return new_ring;
}
void thread_pool_queue_push(Thread *thread, WorkerTask task) {
ssize_t bot = thread->queue.bottom.load(std::memory_order_relaxed);
ssize_t top = thread->queue.top.load(std::memory_order_acquire);
TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_relaxed);
ssize_t size = bot - top;
if (size > (cur_ring->size - 1)) {
// Queue is full
thread->queue.ring = taskring_grow(thread->queue.ring, bot, top);
cur_ring = thread->queue.ring.load(std::memory_order_relaxed);
}
cur_ring->buffer[bot % cur_ring->size] = task;
std::atomic_thread_fence(std::memory_order_release);
thread->queue.bottom.store(bot + 1, std::memory_order_relaxed);
thread->pool->tasks_left.fetch_add(1, std::memory_order_release);
thread->pool->tasks_available.fetch_add(1, std::memory_order_relaxed);
futex_broadcast(&thread->pool->tasks_available);
}
bool thread_pool_queue_take(Thread *thread, WorkerTask *task) {
ssize_t bot = thread->queue.bottom.load(std::memory_order_relaxed) - 1;
TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_relaxed);
thread->queue.bottom.store(bot, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_seq_cst);
ssize_t top = thread->queue.top.load(std::memory_order_relaxed);
if (top <= bot) {
// Queue is not empty
*task = cur_ring->buffer[bot % cur_ring->size];
if (top == bot) {
// Only one entry left in queue
if (!thread->queue.top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
// Race failed
thread->queue.bottom.store(bot + 1, std::memory_order_relaxed);
return false;
}
thread->queue.bottom.store(bot + 1, std::memory_order_relaxed);
return true;
}
// We got a task without hitting a race
return true;
} else {
// Queue is empty
thread->queue.bottom.store(bot + 1, std::memory_order_relaxed);
return false;
}
}
bool thread_pool_queue_steal(Thread *thread, WorkerTask *task) {
ssize_t top = thread->queue.top.load(std::memory_order_acquire);
std::atomic_thread_fence(std::memory_order_seq_cst);
ssize_t bot = thread->queue.bottom.load(std::memory_order_acquire);
bool ret = false;
if (top < bot) {
// Queue is not empty
TaskRingBuffer *cur_ring = thread->queue.ring.load(std::memory_order_consume);
*task = cur_ring->buffer[top % cur_ring->size];
if (!thread->queue.top.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
// Race failed
ret = false;
} else {
ret = true;
}
}
return ret;
}
gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
WorkerTask task = {};
task.do_work = proc;
task.data = data;
thread_pool_queue_push(current_thread, task);
return true;
}
gb_internal void thread_pool_wait(ThreadPool *pool) {
WorkerTask task;
while (pool->tasks_left.load(std::memory_order_acquire)) {
// if we've got tasks on our queue, run them
while (thread_pool_queue_take(current_thread, &task)) {
task.do_work(task.data);
pool->tasks_left.fetch_sub(1, std::memory_order_release);
}
// is this mem-barriered enough?
// This *must* be executed in this order, so the futex wakes immediately
// if rem_tasks has changed since we checked last, otherwise the program
// will permanently sleep
Footex rem_tasks = pool->tasks_left.load(std::memory_order_acquire);
if (rem_tasks == 0) {
return;
}
futex_wait(&pool->tasks_left, rem_tasks);
}
}
gb_internal THREAD_PROC(thread_pool_thread_proc) {
WorkerTask task;
current_thread = thread;
ThreadPool *pool = current_thread->pool;
// debugf("worker id: %td\n", current_thread->idx);
while (pool->running.load(std::memory_order_seq_cst)) {
// If we've got tasks to process, work through them
usize finished_tasks = 0;
i32 state;
while (thread_pool_queue_take(current_thread, &task)) {
task.do_work(task.data);
pool->tasks_left.fetch_sub(1, std::memory_order_release);
finished_tasks += 1;
}
if (finished_tasks > 0 && pool->tasks_left.load(std::memory_order_acquire) == 0) {
futex_signal(&pool->tasks_left);
}
// If there's still work somewhere and we don't have it, steal it
if (pool->tasks_left.load(std::memory_order_acquire)) {
usize idx = cast(usize)current_thread->idx;
for_array(i, pool->threads) {
if (pool->tasks_left.load(std::memory_order_acquire) == 0) {
break;
}
idx = (idx + 1) % cast(usize)pool->threads.count;
Thread *thread = &pool->threads.data[idx];
WorkerTask task;
if (thread_pool_queue_steal(thread, &task)) {
task.do_work(task.data);
pool->tasks_left.fetch_sub(1, std::memory_order_release);
if (pool->tasks_left.load(std::memory_order_acquire) == 0) {
futex_signal(&pool->tasks_left);
}
goto main_loop_continue;
}
}
}
// if we've done all our work, and there's nothing to steal, go to sleep
state = pool->tasks_available.load(std::memory_order_acquire);
if (!pool->running) { break; }
futex_wait(&pool->tasks_available, state);
main_loop_continue:;
}
return 0;
}
|