Dynamic thread pool in C++

The name of the pictureThe name of the pictureThe name of the pictureClash Royale CLAN TAG#URR8PPP





.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;







up vote
1
down vote

favorite












I have made a thread pool which will dynamically create threads based on how many you need.



#pragma once

#include <thread>
#include <atomic>
#include <mutex>
#include <vector>
#include <functional>
#include <condition_variable>

class thread_pool
public:
thread_pool() : stop(false), busy_workers(0)

~thread_pool()
stop = true;
task_available.notify_all();
for (auto& worker : workers)
if (worker.joinable())
worker.join();




void run_task(std::function<void()> task)

std::lock_guard<std::mutex> task_lock task_mu ;
current_task = std::move(task);



std::lock_guard<std::mutex> workers_lock workers_mu ;
if (workers.size() == busy_workers++)
workers.emplace_back(work);
return;



task_available.notify_one();

private:
std::atomic_bool stop;
std::atomic_size_t busy_workers;
std::vector<std::thread> workers;
std::function<void()> current_task;
std::condition_variable task_available;
std::mutex task_mu;
std::mutex workers_mu;

std::function<void()> work = [&]()
while (true)
std::unique_lock<std::mutex> task_lock task_mu ;
task_available.wait(task_lock, [&]() return current_task );

if (!current_task && stop) return;

auto task = std::move(current_task);
task_lock.unlock();

task();
busy_workers--;

;
;


The only problem I have with it is the threads will be destroyed before all the tasks have been completed, even though I join all the threads in the destructor.







share|improve this question

























    up vote
    1
    down vote

    favorite












    I have made a thread pool which will dynamically create threads based on how many you need.



    #pragma once

    #include <thread>
    #include <atomic>
    #include <mutex>
    #include <vector>
    #include <functional>
    #include <condition_variable>

    class thread_pool
    public:
    thread_pool() : stop(false), busy_workers(0)

    ~thread_pool()
    stop = true;
    task_available.notify_all();
    for (auto& worker : workers)
    if (worker.joinable())
    worker.join();




    void run_task(std::function<void()> task)

    std::lock_guard<std::mutex> task_lock task_mu ;
    current_task = std::move(task);



    std::lock_guard<std::mutex> workers_lock workers_mu ;
    if (workers.size() == busy_workers++)
    workers.emplace_back(work);
    return;



    task_available.notify_one();

    private:
    std::atomic_bool stop;
    std::atomic_size_t busy_workers;
    std::vector<std::thread> workers;
    std::function<void()> current_task;
    std::condition_variable task_available;
    std::mutex task_mu;
    std::mutex workers_mu;

    std::function<void()> work = [&]()
    while (true)
    std::unique_lock<std::mutex> task_lock task_mu ;
    task_available.wait(task_lock, [&]() return current_task );

    if (!current_task && stop) return;

    auto task = std::move(current_task);
    task_lock.unlock();

    task();
    busy_workers--;

    ;
    ;


    The only problem I have with it is the threads will be destroyed before all the tasks have been completed, even though I join all the threads in the destructor.







    share|improve this question





















      up vote
      1
      down vote

      favorite









      up vote
      1
      down vote

      favorite











      I have made a thread pool which will dynamically create threads based on how many you need.



      #pragma once

      #include <thread>
      #include <atomic>
      #include <mutex>
      #include <vector>
      #include <functional>
      #include <condition_variable>

      class thread_pool
      public:
      thread_pool() : stop(false), busy_workers(0)

      ~thread_pool()
      stop = true;
      task_available.notify_all();
      for (auto& worker : workers)
      if (worker.joinable())
      worker.join();




      void run_task(std::function<void()> task)

      std::lock_guard<std::mutex> task_lock task_mu ;
      current_task = std::move(task);



      std::lock_guard<std::mutex> workers_lock workers_mu ;
      if (workers.size() == busy_workers++)
      workers.emplace_back(work);
      return;



      task_available.notify_one();

      private:
      std::atomic_bool stop;
      std::atomic_size_t busy_workers;
      std::vector<std::thread> workers;
      std::function<void()> current_task;
      std::condition_variable task_available;
      std::mutex task_mu;
      std::mutex workers_mu;

      std::function<void()> work = [&]()
      while (true)
      std::unique_lock<std::mutex> task_lock task_mu ;
      task_available.wait(task_lock, [&]() return current_task );

      if (!current_task && stop) return;

      auto task = std::move(current_task);
      task_lock.unlock();

      task();
      busy_workers--;

      ;
      ;


      The only problem I have with it is the threads will be destroyed before all the tasks have been completed, even though I join all the threads in the destructor.







      share|improve this question











      I have made a thread pool which will dynamically create threads based on how many you need.



      #pragma once

      #include <thread>
      #include <atomic>
      #include <mutex>
      #include <vector>
      #include <functional>
      #include <condition_variable>

      class thread_pool
      public:
      thread_pool() : stop(false), busy_workers(0)

      ~thread_pool()
      stop = true;
      task_available.notify_all();
      for (auto& worker : workers)
      if (worker.joinable())
      worker.join();




      void run_task(std::function<void()> task)

      std::lock_guard<std::mutex> task_lock task_mu ;
      current_task = std::move(task);



      std::lock_guard<std::mutex> workers_lock workers_mu ;
      if (workers.size() == busy_workers++)
      workers.emplace_back(work);
      return;



      task_available.notify_one();

      private:
      std::atomic_bool stop;
      std::atomic_size_t busy_workers;
      std::vector<std::thread> workers;
      std::function<void()> current_task;
      std::condition_variable task_available;
      std::mutex task_mu;
      std::mutex workers_mu;

      std::function<void()> work = [&]()
      while (true)
      std::unique_lock<std::mutex> task_lock task_mu ;
      task_available.wait(task_lock, [&]() return current_task );

      if (!current_task && stop) return;

      auto task = std::move(current_task);
      task_lock.unlock();

      task();
      busy_workers--;

      ;
      ;


      The only problem I have with it is the threads will be destroyed before all the tasks have been completed, even though I join all the threads in the destructor.









      share|improve this question










      share|improve this question




      share|improve this question









      asked Jan 7 at 22:28









      Michael Smith

      1508




      1508




















          1 Answer
          1






          active

          oldest

          votes

















          up vote
          3
          down vote



          accepted










          Bugs



          You have several race conditions:



          There is no guarantee that a worker stays joinable between these two calls.



           if (worker.joinable()) {
          worker.join();


          This is a multi-threaded environment you can not assume run_task is being called from only one thread. You must assume that it can be called from any number of threads.



           
          std::lock_guard<std::mutex> task_lock task_mu ;
          // There is no guarantee that `current_task` stays the same
          // between here and when you create a new worker thread.
          // As soon as the scope is exited another thread can enter
          // and overwrite `current_task` before the first thread
          // pushes `work` onto the thread queue or an existing worker
          // picks it up
          current_task = std::move(task);



          Design



          Usually thread pools are created with a fixed number of threads. Jobs are added to a queue. When a thread becomes available it pulls a job from the queue or waits on a condition variable until a job is available.



          I dislike the design of adding threads. As this could spawn lots of threads when there is a sudden high workload that are never released. Threads are expensive resources to create. Also creating many threads does not mean more parallelism (it just means more swapping). A machine usually has an upper bound of available parallelism allocating more threads than this is usually counter productive).






          share|improve this answer





















            Your Answer




            StackExchange.ifUsing("editor", function ()
            return StackExchange.using("mathjaxEditing", function ()
            StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix)
            StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
            );
            );
            , "mathjax-editing");

            StackExchange.ifUsing("editor", function ()
            StackExchange.using("externalEditor", function ()
            StackExchange.using("snippets", function ()
            StackExchange.snippets.init();
            );
            );
            , "code-snippets");

            StackExchange.ready(function()
            var channelOptions =
            tags: "".split(" "),
            id: "196"
            ;
            initTagRenderer("".split(" "), "".split(" "), channelOptions);

            StackExchange.using("externalEditor", function()
            // Have to fire editor after snippets, if snippets enabled
            if (StackExchange.settings.snippets.snippetsEnabled)
            StackExchange.using("snippets", function()
            createEditor();
            );

            else
            createEditor();

            );

            function createEditor()
            StackExchange.prepareEditor(
            heartbeatType: 'answer',
            convertImagesToLinks: false,
            noModals: false,
            showLowRepImageUploadWarning: true,
            reputationToPostImages: null,
            bindNavPrevention: true,
            postfix: "",
            onDemand: true,
            discardSelector: ".discard-answer"
            ,immediatelyShowMarkdownHelp:true
            );



            );








             

            draft saved


            draft discarded


















            StackExchange.ready(
            function ()
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f184538%2fdynamic-thread-pool-in-c%23new-answer', 'question_page');

            );

            Post as a guest






























            1 Answer
            1






            active

            oldest

            votes








            1 Answer
            1






            active

            oldest

            votes









            active

            oldest

            votes






            active

            oldest

            votes








            up vote
            3
            down vote



            accepted










            Bugs



            You have several race conditions:



            There is no guarantee that a worker stays joinable between these two calls.



             if (worker.joinable()) {
            worker.join();


            This is a multi-threaded environment you can not assume run_task is being called from only one thread. You must assume that it can be called from any number of threads.



             
            std::lock_guard<std::mutex> task_lock task_mu ;
            // There is no guarantee that `current_task` stays the same
            // between here and when you create a new worker thread.
            // As soon as the scope is exited another thread can enter
            // and overwrite `current_task` before the first thread
            // pushes `work` onto the thread queue or an existing worker
            // picks it up
            current_task = std::move(task);



            Design



            Usually thread pools are created with a fixed number of threads. Jobs are added to a queue. When a thread becomes available it pulls a job from the queue or waits on a condition variable until a job is available.



            I dislike the design of adding threads. As this could spawn lots of threads when there is a sudden high workload that are never released. Threads are expensive resources to create. Also creating many threads does not mean more parallelism (it just means more swapping). A machine usually has an upper bound of available parallelism allocating more threads than this is usually counter productive).






            share|improve this answer

























              up vote
              3
              down vote



              accepted










              Bugs



              You have several race conditions:



              There is no guarantee that a worker stays joinable between these two calls.



               if (worker.joinable()) {
              worker.join();


              This is a multi-threaded environment you can not assume run_task is being called from only one thread. You must assume that it can be called from any number of threads.



               
              std::lock_guard<std::mutex> task_lock task_mu ;
              // There is no guarantee that `current_task` stays the same
              // between here and when you create a new worker thread.
              // As soon as the scope is exited another thread can enter
              // and overwrite `current_task` before the first thread
              // pushes `work` onto the thread queue or an existing worker
              // picks it up
              current_task = std::move(task);



              Design



              Usually thread pools are created with a fixed number of threads. Jobs are added to a queue. When a thread becomes available it pulls a job from the queue or waits on a condition variable until a job is available.



              I dislike the design of adding threads. As this could spawn lots of threads when there is a sudden high workload that are never released. Threads are expensive resources to create. Also creating many threads does not mean more parallelism (it just means more swapping). A machine usually has an upper bound of available parallelism allocating more threads than this is usually counter productive).






              share|improve this answer























                up vote
                3
                down vote



                accepted







                up vote
                3
                down vote



                accepted






                Bugs



                You have several race conditions:



                There is no guarantee that a worker stays joinable between these two calls.



                 if (worker.joinable()) {
                worker.join();


                This is a multi-threaded environment you can not assume run_task is being called from only one thread. You must assume that it can be called from any number of threads.



                 
                std::lock_guard<std::mutex> task_lock task_mu ;
                // There is no guarantee that `current_task` stays the same
                // between here and when you create a new worker thread.
                // As soon as the scope is exited another thread can enter
                // and overwrite `current_task` before the first thread
                // pushes `work` onto the thread queue or an existing worker
                // picks it up
                current_task = std::move(task);



                Design



                Usually thread pools are created with a fixed number of threads. Jobs are added to a queue. When a thread becomes available it pulls a job from the queue or waits on a condition variable until a job is available.



                I dislike the design of adding threads. As this could spawn lots of threads when there is a sudden high workload that are never released. Threads are expensive resources to create. Also creating many threads does not mean more parallelism (it just means more swapping). A machine usually has an upper bound of available parallelism allocating more threads than this is usually counter productive).






                share|improve this answer













                Bugs



                You have several race conditions:



                There is no guarantee that a worker stays joinable between these two calls.



                 if (worker.joinable()) {
                worker.join();


                This is a multi-threaded environment you can not assume run_task is being called from only one thread. You must assume that it can be called from any number of threads.



                 
                std::lock_guard<std::mutex> task_lock task_mu ;
                // There is no guarantee that `current_task` stays the same
                // between here and when you create a new worker thread.
                // As soon as the scope is exited another thread can enter
                // and overwrite `current_task` before the first thread
                // pushes `work` onto the thread queue or an existing worker
                // picks it up
                current_task = std::move(task);



                Design



                Usually thread pools are created with a fixed number of threads. Jobs are added to a queue. When a thread becomes available it pulls a job from the queue or waits on a condition variable until a job is available.



                I dislike the design of adding threads. As this could spawn lots of threads when there is a sudden high workload that are never released. Threads are expensive resources to create. Also creating many threads does not mean more parallelism (it just means more swapping). A machine usually has an upper bound of available parallelism allocating more threads than this is usually counter productive).







                share|improve this answer













                share|improve this answer



                share|improve this answer











                answered Jan 8 at 20:16









                Martin York

                70.9k481244




                70.9k481244






















                     

                    draft saved


                    draft discarded


























                     


                    draft saved


                    draft discarded














                    StackExchange.ready(
                    function ()
                    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f184538%2fdynamic-thread-pool-in-c%23new-answer', 'question_page');

                    );

                    Post as a guest













































































                    Popular posts from this blog

                    Greedy Best First Search implementation in Rust

                    Function to Return a JSON Like Objects Using VBA Collections and Arrays

                    C++11 CLH Lock Implementation