Dynamic thread pool in C++
Clash Royale CLAN TAG#URR8PPP
.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;
up vote
1
down vote
favorite
I have made a thread pool which will dynamically create threads based on how many you need.
#pragma once
#include <thread>
#include <atomic>
#include <mutex>
#include <vector>
#include <functional>
#include <condition_variable>
class thread_pool
public:
thread_pool() : stop(false), busy_workers(0)
~thread_pool()
stop = true;
task_available.notify_all();
for (auto& worker : workers)
if (worker.joinable())
worker.join();
void run_task(std::function<void()> task)
std::lock_guard<std::mutex> task_lock task_mu ;
current_task = std::move(task);
std::lock_guard<std::mutex> workers_lock workers_mu ;
if (workers.size() == busy_workers++)
workers.emplace_back(work);
return;
task_available.notify_one();
private:
std::atomic_bool stop;
std::atomic_size_t busy_workers;
std::vector<std::thread> workers;
std::function<void()> current_task;
std::condition_variable task_available;
std::mutex task_mu;
std::mutex workers_mu;
std::function<void()> work = [&]()
while (true)
std::unique_lock<std::mutex> task_lock task_mu ;
task_available.wait(task_lock, [&]() return current_task );
if (!current_task && stop) return;
auto task = std::move(current_task);
task_lock.unlock();
task();
busy_workers--;
;
;
The only problem I have with it is the threads will be destroyed before all the tasks have been completed, even though I join all the threads in the destructor.
c++ multithreading
add a comment |Â
up vote
1
down vote
favorite
I have made a thread pool which will dynamically create threads based on how many you need.
#pragma once
#include <thread>
#include <atomic>
#include <mutex>
#include <vector>
#include <functional>
#include <condition_variable>
class thread_pool
public:
thread_pool() : stop(false), busy_workers(0)
~thread_pool()
stop = true;
task_available.notify_all();
for (auto& worker : workers)
if (worker.joinable())
worker.join();
void run_task(std::function<void()> task)
std::lock_guard<std::mutex> task_lock task_mu ;
current_task = std::move(task);
std::lock_guard<std::mutex> workers_lock workers_mu ;
if (workers.size() == busy_workers++)
workers.emplace_back(work);
return;
task_available.notify_one();
private:
std::atomic_bool stop;
std::atomic_size_t busy_workers;
std::vector<std::thread> workers;
std::function<void()> current_task;
std::condition_variable task_available;
std::mutex task_mu;
std::mutex workers_mu;
std::function<void()> work = [&]()
while (true)
std::unique_lock<std::mutex> task_lock task_mu ;
task_available.wait(task_lock, [&]() return current_task );
if (!current_task && stop) return;
auto task = std::move(current_task);
task_lock.unlock();
task();
busy_workers--;
;
;
The only problem I have with it is the threads will be destroyed before all the tasks have been completed, even though I join all the threads in the destructor.
c++ multithreading
add a comment |Â
up vote
1
down vote
favorite
up vote
1
down vote
favorite
I have made a thread pool which will dynamically create threads based on how many you need.
#pragma once
#include <thread>
#include <atomic>
#include <mutex>
#include <vector>
#include <functional>
#include <condition_variable>
class thread_pool
public:
thread_pool() : stop(false), busy_workers(0)
~thread_pool()
stop = true;
task_available.notify_all();
for (auto& worker : workers)
if (worker.joinable())
worker.join();
void run_task(std::function<void()> task)
std::lock_guard<std::mutex> task_lock task_mu ;
current_task = std::move(task);
std::lock_guard<std::mutex> workers_lock workers_mu ;
if (workers.size() == busy_workers++)
workers.emplace_back(work);
return;
task_available.notify_one();
private:
std::atomic_bool stop;
std::atomic_size_t busy_workers;
std::vector<std::thread> workers;
std::function<void()> current_task;
std::condition_variable task_available;
std::mutex task_mu;
std::mutex workers_mu;
std::function<void()> work = [&]()
while (true)
std::unique_lock<std::mutex> task_lock task_mu ;
task_available.wait(task_lock, [&]() return current_task );
if (!current_task && stop) return;
auto task = std::move(current_task);
task_lock.unlock();
task();
busy_workers--;
;
;
The only problem I have with it is the threads will be destroyed before all the tasks have been completed, even though I join all the threads in the destructor.
c++ multithreading
I have made a thread pool which will dynamically create threads based on how many you need.
#pragma once
#include <thread>
#include <atomic>
#include <mutex>
#include <vector>
#include <functional>
#include <condition_variable>
class thread_pool
public:
thread_pool() : stop(false), busy_workers(0)
~thread_pool()
stop = true;
task_available.notify_all();
for (auto& worker : workers)
if (worker.joinable())
worker.join();
void run_task(std::function<void()> task)
std::lock_guard<std::mutex> task_lock task_mu ;
current_task = std::move(task);
std::lock_guard<std::mutex> workers_lock workers_mu ;
if (workers.size() == busy_workers++)
workers.emplace_back(work);
return;
task_available.notify_one();
private:
std::atomic_bool stop;
std::atomic_size_t busy_workers;
std::vector<std::thread> workers;
std::function<void()> current_task;
std::condition_variable task_available;
std::mutex task_mu;
std::mutex workers_mu;
std::function<void()> work = [&]()
while (true)
std::unique_lock<std::mutex> task_lock task_mu ;
task_available.wait(task_lock, [&]() return current_task );
if (!current_task && stop) return;
auto task = std::move(current_task);
task_lock.unlock();
task();
busy_workers--;
;
;
The only problem I have with it is the threads will be destroyed before all the tasks have been completed, even though I join all the threads in the destructor.
c++ multithreading
asked Jan 7 at 22:28
Michael Smith
1508
1508
add a comment |Â
add a comment |Â
1 Answer
1
active
oldest
votes
up vote
3
down vote
accepted
Bugs
You have several race conditions:
There is no guarantee that a worker stays joinable between these two calls.
if (worker.joinable()) {
worker.join();
This is a multi-threaded environment you can not assume run_task is being called from only one thread. You must assume that it can be called from any number of threads.
std::lock_guard<std::mutex> task_lock task_mu ;
// There is no guarantee that `current_task` stays the same
// between here and when you create a new worker thread.
// As soon as the scope is exited another thread can enter
// and overwrite `current_task` before the first thread
// pushes `work` onto the thread queue or an existing worker
// picks it up
current_task = std::move(task);
Design
Usually thread pools are created with a fixed number of threads. Jobs are added to a queue. When a thread becomes available it pulls a job from the queue or waits on a condition variable until a job is available.
I dislike the design of adding threads. As this could spawn lots of threads when there is a sudden high workload that are never released. Threads are expensive resources to create. Also creating many threads does not mean more parallelism (it just means more swapping). A machine usually has an upper bound of available parallelism allocating more threads than this is usually counter productive).
add a comment |Â
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
3
down vote
accepted
Bugs
You have several race conditions:
There is no guarantee that a worker stays joinable between these two calls.
if (worker.joinable()) {
worker.join();
This is a multi-threaded environment you can not assume run_task is being called from only one thread. You must assume that it can be called from any number of threads.
std::lock_guard<std::mutex> task_lock task_mu ;
// There is no guarantee that `current_task` stays the same
// between here and when you create a new worker thread.
// As soon as the scope is exited another thread can enter
// and overwrite `current_task` before the first thread
// pushes `work` onto the thread queue or an existing worker
// picks it up
current_task = std::move(task);
Design
Usually thread pools are created with a fixed number of threads. Jobs are added to a queue. When a thread becomes available it pulls a job from the queue or waits on a condition variable until a job is available.
I dislike the design of adding threads. As this could spawn lots of threads when there is a sudden high workload that are never released. Threads are expensive resources to create. Also creating many threads does not mean more parallelism (it just means more swapping). A machine usually has an upper bound of available parallelism allocating more threads than this is usually counter productive).
add a comment |Â
up vote
3
down vote
accepted
Bugs
You have several race conditions:
There is no guarantee that a worker stays joinable between these two calls.
if (worker.joinable()) {
worker.join();
This is a multi-threaded environment you can not assume run_task is being called from only one thread. You must assume that it can be called from any number of threads.
std::lock_guard<std::mutex> task_lock task_mu ;
// There is no guarantee that `current_task` stays the same
// between here and when you create a new worker thread.
// As soon as the scope is exited another thread can enter
// and overwrite `current_task` before the first thread
// pushes `work` onto the thread queue or an existing worker
// picks it up
current_task = std::move(task);
Design
Usually thread pools are created with a fixed number of threads. Jobs are added to a queue. When a thread becomes available it pulls a job from the queue or waits on a condition variable until a job is available.
I dislike the design of adding threads. As this could spawn lots of threads when there is a sudden high workload that are never released. Threads are expensive resources to create. Also creating many threads does not mean more parallelism (it just means more swapping). A machine usually has an upper bound of available parallelism allocating more threads than this is usually counter productive).
add a comment |Â
up vote
3
down vote
accepted
up vote
3
down vote
accepted
Bugs
You have several race conditions:
There is no guarantee that a worker stays joinable between these two calls.
if (worker.joinable()) {
worker.join();
This is a multi-threaded environment you can not assume run_task is being called from only one thread. You must assume that it can be called from any number of threads.
std::lock_guard<std::mutex> task_lock task_mu ;
// There is no guarantee that `current_task` stays the same
// between here and when you create a new worker thread.
// As soon as the scope is exited another thread can enter
// and overwrite `current_task` before the first thread
// pushes `work` onto the thread queue or an existing worker
// picks it up
current_task = std::move(task);
Design
Usually thread pools are created with a fixed number of threads. Jobs are added to a queue. When a thread becomes available it pulls a job from the queue or waits on a condition variable until a job is available.
I dislike the design of adding threads. As this could spawn lots of threads when there is a sudden high workload that are never released. Threads are expensive resources to create. Also creating many threads does not mean more parallelism (it just means more swapping). A machine usually has an upper bound of available parallelism allocating more threads than this is usually counter productive).
Bugs
You have several race conditions:
There is no guarantee that a worker stays joinable between these two calls.
if (worker.joinable()) {
worker.join();
This is a multi-threaded environment you can not assume run_task is being called from only one thread. You must assume that it can be called from any number of threads.
std::lock_guard<std::mutex> task_lock task_mu ;
// There is no guarantee that `current_task` stays the same
// between here and when you create a new worker thread.
// As soon as the scope is exited another thread can enter
// and overwrite `current_task` before the first thread
// pushes `work` onto the thread queue or an existing worker
// picks it up
current_task = std::move(task);
Design
Usually thread pools are created with a fixed number of threads. Jobs are added to a queue. When a thread becomes available it pulls a job from the queue or waits on a condition variable until a job is available.
I dislike the design of adding threads. As this could spawn lots of threads when there is a sudden high workload that are never released. Threads are expensive resources to create. Also creating many threads does not mean more parallelism (it just means more swapping). A machine usually has an upper bound of available parallelism allocating more threads than this is usually counter productive).
answered Jan 8 at 20:16
Martin York
70.9k481244
70.9k481244
add a comment |Â
add a comment |Â
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f184538%2fdynamic-thread-pool-in-c%23new-answer', 'question_page');
);
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password