| field | value | date |
|---|---|---|
| author | gingerBill <gingerBill@users.noreply.github.com> | 2023-01-01 13:28:36 +0000 |
| committer | GitHub <noreply@github.com> | 2023-01-01 13:28:36 +0000 |
| commit | 168cec1e9d56563719039d766fc6bf776f3cf5ee (patch) | |
| tree | 27edc49833bfac6d1054bc8e3ea07d2191648f19 /src/thread_pool.cpp | |
| parent | c1384afe2fd705ce075277aa8dc6bc259dc94cdc (diff) | |
| parent | 27ba1d596c5b68f856b4e74c72bf28439daf4807 (diff) | |
Merge pull request #2283 from colrdavidson/threadpool-swap
move to work-stealing threadpool
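
In the new design every thread, including the main thread, owns its own task queue; a worker drains that queue first and, only when it runs dry, walks its siblings' queues looking for work to steal. Below is a minimal sketch of that steal order, using hypothetical toy types (`ToyQueue`, `worker_step`) rather than the commit's `Thread`/`WorkerTask` machinery:

```cpp
// Toy work-stealing step (illustrative only): drain our own queue, then
// scan the other queues round-robin, starting just past our own index.
#include <deque>
#include <functional>
#include <mutex>
#include <vector>

struct ToyQueue {
	std::mutex m;
	std::deque<std::function<void()>> tasks;

	bool pop(std::function<void()> &out) {
		std::lock_guard<std::mutex> lock(m);
		if (tasks.empty()) return false;
		out = std::move(tasks.front());
		tasks.pop_front();
		return true;
	}
};

void worker_step(std::vector<ToyQueue> &queues, size_t self) {
	std::function<void()> task;

	// 1. Run everything sitting on our own queue.
	while (queues[self].pop(task)) task();

	// 2. Nothing left locally: try each sibling once, nearest first.
	for (size_t i = 1; i < queues.size(); i++) {
		size_t victim = (self + i) % queues.size();
		if (queues[victim].pop(task)) {
			task(); // run one stolen task, then return to draining our own queue
			return;
		}
	}
}
```

Starting the scan just past the thief's own index spreads stealing pressure across all victims instead of funneling every idle thread onto thread 0.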
Diffstat (limited to 'src/thread_pool.cpp')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | src/thread_pool.cpp | 252 |

1 file changed, 143 insertions, 109 deletions
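
The queue itself (see `thread_pool_queue_push`/`thread_pool_queue_pop` in the diff below) is a fixed-capacity ring buffer whose head and tail are packed into one 64-bit atomic, so both indices are updated by a single compare-exchange and no lock is needed on the hot path. A standalone sketch of the same trick, assuming hypothetical names (`CAPACITY`, `slots`, `push`) and an `int` payload in place of `WorkerTask`:

```cpp
// Packed head/tail ring buffer: head lives in the high 32 bits, tail in
// the low 32 bits of a single 64-bit atomic.
#include <atomic>
#include <cstdint>

constexpr uint64_t CAPACITY = 1 << 14; // must be a power of two
constexpr uint64_t MASK = CAPACITY - 1;

std::atomic<uint64_t> head_and_tail{0};
int slots[CAPACITY];

bool push(int value) {
	uint64_t capture, new_capture;
	do {
		capture = head_and_tail.load();
		uint64_t head = (capture >> 32) & MASK;
		uint64_t tail = (uint32_t)capture & MASK;

		uint64_t new_head = (head + 1) & MASK;
		if (new_head == tail) {
			return false; // full; the commit panics here instead
		}

		// Write the slot inside the retry loop: if another pusher wins
		// the CAS race, we loop and re-write a slot we still own.
		slots[head] = value;
		new_capture = (new_head << 32) | tail;
	} while (!head_and_tail.compare_exchange_weak(capture, new_capture));
	return true;
}
```

Writing the slot before the compare-exchange, as the commit's own comment stresses, is what prevents publishing a head index whose slot another racing push has already reclaimed.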
```diff
diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp
index 6df991d7d..3565ef25a 100644
--- a/src/thread_pool.cpp
+++ b/src/thread_pool.cpp
@@ -3,164 +3,198 @@ struct WorkerTask;
 struct ThreadPool;
 
-#define WORKER_TASK_PROC(name) isize name(void *data)
-typedef WORKER_TASK_PROC(WorkerTaskProc);
+gb_thread_local Thread *current_thread;
 
 gb_internal void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name);
 gb_internal void thread_pool_destroy(ThreadPool *pool);
 gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data);
 gb_internal void thread_pool_wait(ThreadPool *pool);
-
-struct WorkerTask {
-	WorkerTask *    next;
-	WorkerTaskProc *do_work;
-	void *          data;
-	ThreadPool *    pool;
-};
-
 
 struct ThreadPool {
 	gbAllocator allocator;
-	BlockingMutex mutex;
-	Condition task_cond;
-
+
 	Slice<Thread> threads;
-
-	WorkerTask *task_queue;
-
-	std::atomic<isize> ready;
-	std::atomic<bool> stop;
-
-};
+	std::atomic<bool> running;
 
-gb_internal THREAD_PROC(thread_pool_thread_proc);
+	BlockingMutex task_lock;
+	Condition tasks_available;
+
+	Futex tasks_left;
+};
 
 gb_internal void thread_pool_init(ThreadPool *pool, gbAllocator const &a, isize thread_count, char const *worker_name) {
+	mutex_init(&pool->task_lock);
+	condition_init(&pool->tasks_available);
+
 	pool->allocator = a;
-	pool->stop = false;
-	mutex_init(&pool->mutex);
-	condition_init(&pool->task_cond);
-
-	slice_init(&pool->threads, a, thread_count);
-	for_array(i, pool->threads) {
+	slice_init(&pool->threads, a, thread_count + 1);
+
+	// setup the main thread
+	thread_init(pool, &pool->threads[0], 0);
+	current_thread = &pool->threads[0];
+
+	for_array_off(i, 1, pool->threads) {
 		Thread *t = &pool->threads[i];
-		thread_init_and_start(t, thread_pool_thread_proc, pool);
+		thread_init_and_start(pool, t, i);
 	}
+
+	pool->running = true;
 }
 
 gb_internal void thread_pool_destroy(ThreadPool *pool) {
-	mutex_lock(&pool->mutex);
-	pool->stop = true;
-	condition_broadcast(&pool->task_cond);
-	mutex_unlock(&pool->mutex);
+	pool->running = false;
 
-	for_array(i, pool->threads) {
+	for_array_off(i, 1, pool->threads) {
 		Thread *t = &pool->threads[i];
+		condition_broadcast(&pool->tasks_available);
 		thread_join_and_destroy(t);
 	}
 
+	for_array(i, pool->threads) {
+		free(pool->threads[i].queue);
+	}
 	gb_free(pool->allocator, pool->threads.data);
 
-	mutex_destroy(&pool->mutex);
-	condition_destroy(&pool->task_cond);
+	mutex_destroy(&pool->task_lock);
+	condition_destroy(&pool->tasks_available);
 }
 
-gb_internal bool thread_pool_queue_empty(ThreadPool *pool) {
-	return pool->task_queue == nullptr;
-}
+void thread_pool_queue_push(Thread *thread, WorkerTask task) {
+	uint64_t capture;
+	uint64_t new_capture;
+	do {
+		capture = thread->head_and_tail.load();
+
+		uint64_t mask = thread->capacity - 1;
+		uint64_t head = (capture >> 32) & mask;
+		uint64_t tail = ((uint32_t)capture) & mask;
+
+		uint64_t new_head = (head + 1) & mask;
+		if (new_head == tail) {
+			GB_PANIC("Thread Queue Full!\n");
+		}
+
+		// This *must* be done in here, to avoid a potential race condition where we no longer own the slot by the time we're assigning
+		thread->queue[head] = task;
+		new_capture = (new_head << 32) | tail;
+	} while (!thread->head_and_tail.compare_exchange_weak(capture, new_capture));
 
-gb_internal WorkerTask *thread_pool_queue_pop(ThreadPool *pool) {
-	GB_ASSERT(pool->task_queue != nullptr);
-	WorkerTask *task = pool->task_queue;
-	pool->task_queue = task->next;
-	return task;
+	thread->pool->tasks_left.fetch_add(1);
+	condition_broadcast(&thread->pool->tasks_available);
 }
 
-gb_internal void thread_pool_queue_push(ThreadPool *pool, WorkerTask *task) {
-	GB_ASSERT(task != nullptr);
-	task->next = pool->task_queue;
-	pool->task_queue = task;
+
+bool thread_pool_queue_pop(Thread *thread, WorkerTask *task) {
+	uint64_t capture;
+	uint64_t new_capture;
+	do {
+		capture = thread->head_and_tail.load();
+
+		uint64_t mask = thread->capacity - 1;
+		uint64_t head = (capture >> 32) & mask;
+		uint64_t tail = ((uint32_t)capture) & mask;
+
+		uint64_t new_tail = (tail + 1) & mask;
+		if (tail == head) {
+			return false;
+		}
+
+		// Making a copy of the task before we increment the tail, avoiding the same potential race condition as above
+		*task = thread->queue[tail];
+
+		new_capture = (head << 32) | new_tail;
+	} while (!thread->head_and_tail.compare_exchange_weak(capture, new_capture));
+
+	return true;
 }
 
 gb_internal bool thread_pool_add_task(ThreadPool *pool, WorkerTaskProc *proc, void *data) {
-	GB_ASSERT(proc != nullptr);
-	WorkerTask *task = gb_alloc_item(permanent_allocator(), WorkerTask);
-	if (task == nullptr) {
-		GB_PANIC("Out of memory");
-		return false;
-	}
-	task->pool = pool;
-	task->do_work = proc;
-	task->data = data;
+	WorkerTask task = {};
+	task.do_work = proc;
+	task.data = data;
 
-	mutex_lock(&pool->mutex);
-	thread_pool_queue_push(pool, task);
-	GB_ASSERT(pool->ready >= 0);
-	pool->ready.fetch_add(1);
-	condition_broadcast(&pool->task_cond);
-	mutex_unlock(&pool->mutex);
+	thread_pool_queue_push(current_thread, task);
 	return true;
 }
 
+gb_internal void thread_pool_wait(ThreadPool *pool) {
+	WorkerTask task;
 
-gb_internal void thread_pool_do_task(WorkerTask *task) {
-	task->do_work(task->data);
-}
+	while (pool->tasks_left) {
 
-gb_internal void thread_pool_wait(ThreadPool *pool) {
-	if (pool->threads.count == 0) {
-		while (!thread_pool_queue_empty(pool)) {
-			thread_pool_do_task(thread_pool_queue_pop(pool));
-			pool->ready.fetch_sub(1);
+		// if we've got tasks on our queue, run them
+		while (thread_pool_queue_pop(current_thread, &task)) {
+			task.do_work(task.data);
+			pool->tasks_left.fetch_sub(1);
 		}
-		GB_ASSERT(pool->ready == 0);
-		return;
-	}
 
-	for (;;) {
-		mutex_lock(&pool->mutex);
-		while (!pool->stop && pool->ready > 0 && thread_pool_queue_empty(pool)) {
-			condition_wait(&pool->task_cond, &pool->mutex);
-		}
-		if ((pool->stop || pool->ready == 0) && thread_pool_queue_empty(pool)) {
-			mutex_unlock(&pool->mutex);
-			return;
-		}
-		WorkerTask *task = thread_pool_queue_pop(pool);
-		mutex_unlock(&pool->mutex);
-
-		thread_pool_do_task(task);
-		if (--pool->ready == 0) {
-			mutex_lock(&pool->mutex);
-			condition_broadcast(&pool->task_cond);
-			mutex_unlock(&pool->mutex);
+		// is this mem-barriered enough?
+		// This *must* be executed in this order, so the futex wakes immediately
+		// if rem_tasks has changed since we checked last, otherwise the program
+		// will permanently sleep
+		Futex rem_tasks = pool->tasks_left.load();
+		if (!rem_tasks) {
+			break;
 		}
+
+		tpool_wait_on_addr(&pool->tasks_left, rem_tasks);
 	}
 }
 
-
 gb_internal THREAD_PROC(thread_pool_thread_proc) {
-	ThreadPool *pool = cast(ThreadPool *)thread->user_data;
-
+	WorkerTask task;
+	current_thread = thread;
+	ThreadPool *pool = current_thread->pool;
+
 	for (;;) {
-		mutex_lock(&pool->mutex);
+work_start:
+		if (!pool->running) {
+			break;
+		}
 
-		while (!pool->stop && thread_pool_queue_empty(pool)) {
-			condition_wait(&pool->task_cond, &pool->mutex);
+		// If we've got tasks to process, work through them
+		size_t finished_tasks = 0;
+		while (thread_pool_queue_pop(current_thread, &task)) {
+			task.do_work(task.data);
+			pool->tasks_left.fetch_sub(1);
+
+			finished_tasks += 1;
 		}
 
-		if (pool->stop && thread_pool_queue_empty(pool)) {
-			mutex_unlock(&pool->mutex);
-			return 0;
+		if (finished_tasks > 0 && !pool->tasks_left) {
+			tpool_wake_addr(&pool->tasks_left);
 		}
 
-		WorkerTask *task = thread_pool_queue_pop(pool);
-		mutex_unlock(&pool->mutex);
-
-		thread_pool_do_task(task);
-		if (--pool->ready == 0) {
-			mutex_lock(&pool->mutex);
-			condition_broadcast(&pool->task_cond);
-			mutex_unlock(&pool->mutex);
+		// If there's still work somewhere and we don't have it, steal it
+		if (pool->tasks_left) {
+			isize idx = current_thread->idx;
+			for_array(i, pool->threads) {
+				if (!pool->tasks_left) {
+					break;
+				}
+
+				idx = (idx + 1) % pool->threads.count;
+				Thread *thread = &pool->threads[idx];
+
+				WorkerTask task;
+				if (!thread_pool_queue_pop(thread, &task)) {
+					continue;
+				}
+
+				task.do_work(task.data);
+				pool->tasks_left.fetch_sub(1);
+
+				if (!pool->tasks_left) {
+					tpool_wake_addr(&pool->tasks_left);
+				}
+
+				goto work_start;
+			}
 		}
+
+		// if we've done all our work, and there's nothing to steal, go to sleep
+		mutex_lock(&pool->task_lock);
+		condition_wait(&pool->tasks_available, &pool->task_lock);
+		mutex_unlock(&pool->task_lock);
 	}
-}
\ No newline at end of file
+
+	return 0;
+}
```
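
The comment block in the new `thread_pool_wait` is guarding against a lost wakeup: `tasks_left` must be re-read immediately before sleeping, and the futex wait must compare against that same observed value, otherwise a worker could finish the last task between the check and the sleep and the waiter would never be woken. The implementations of `tpool_wait_on_addr`/`tpool_wake_addr` are not part of this diff; the sketch below substitutes C++20 `std::atomic` `wait`/`notify_all` for them (an assumption, not the commit's code) to show the ordering:

```cpp
// Load-then-wait: wait(old) blocks only while the value still equals
// old, so re-reading the counter right before sleeping closes the race.
#include <atomic>

std::atomic<int> tasks_left{0};

void wait_for_all_tasks() {
	for (;;) {
		int remaining = tasks_left.load();
		if (remaining == 0) {
			break; // all work finished
		}
		// Returns immediately if a worker already changed tasks_left;
		// this is the property the commit's comment relies on.
		tasks_left.wait(remaining);
	}
}

void finish_one_task() {
	if (tasks_left.fetch_sub(1) == 1) {
		tasks_left.notify_all(); // analogue of tpool_wake_addr
	}
}
```

A plain "check, then sleep unconditionally" version would deadlock exactly as the comment warns: the wake can fire between the check and the sleep, and nothing fires it again.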