Processing a file using Producer/Consumer Model

I am trying to write a multi-threading program which implements the Producer/Consumer model. Typically, I want to use one Producer which reads lines from a file and puts them in a BlockingQueue, and have multiple Consumers do some processing after retrieving the lines from the BlockingQueue, and store the results in a new file.



Please give me some feedback on what I should consider to achieve high performance. I've spent weeks reading about concurrency and synchronization because I don't want to miss anything, but I am looking for some external feedback, specifically:



  • What type of BlockingQueue implementation should I use for better performance? I can't use a fixed-size BlockingQueue because we don't know how many lines the file has. Or should I use it even if the Producer will be blocked when the queue is full?

  • If f() is the method that the Consumers use to process the file lines, and knowing that I am using a BlockingQueue, should I synchronize f()? If yes, isn't that going to hurt my application, because the other Consumers will have to wait for the lock to be released?


Here is my code:



    class Producer implements Runnable {
        private String location;
        private BlockingQueue<String> blockingQueue;

        private float numline=0;

        protected transient BufferedReader bufferedReader;
        protected transient BufferedWriter bufferedWriter;

        public Producer (String location, BlockingQueue<String> blockingQueue) {
            this.location=location;
            this.blockingQueue=blockingQueue;

            try {
                bufferedReader = new BufferedReader(new FileReader(location));

                // Create the file where the processed lines will be stored
                createCluster();
            } catch (FileNotFoundException e1) {
                e1.printStackTrace();
            }
        }

        @Override
        public void run() {
            String line=null;
            try {
                while ((line = bufferedReader.readLine()) != null) {
                    // Count the read lines
                    numline++;
                    blockingQueue.put(line);
                }
            } catch (IOException e) {
                System.out.println("Problem reading the log file!");
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void createCluster () {
            try {
                String clusterName=location+".csv";
                bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
                bufferedWriter.write("\n");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }





And this is the Consumer, where multiple threads will take results from the BlockingQueue and do some processing (f()), and then store the results in a new file:



    class Consumer implements Runnable {
        private String location;
        private BlockingQueue<String> blockingQueue;

        protected transient BufferedWriter bufferedWriter;

        private String clusterName;

        public Consumer (String location, BlockingQueue<String> blockingQueue) {
            this.blockingQueue=blockingQueue;
            this.location=location;

            clusterName=location+".csv";
        }

        @Override
        public void run() {
            while (true) {
                try {
                    //Retrieve the lines
                    String line = blockingQueue.take();
                    // Call result=f(line)
                    // TO DO
                    //
                    //bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
                    //BufferedWriter.write(result+ "\n");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }





And the code in my main class, which uses 1 producer and 3 consumers:



    BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

    Producer readingThread = new Producer(location, queue);
    new Thread(readingThread).start();

    Consumer normalizers = new Consumer(location,queue);
    ExecutorService executor = Executors.newFixedThreadPool(3);
    for (int i = 1; i <= 3; i++) {
        executor.submit(normalizers);
    }
    System.out.println("Stopped");
    executor.shutdown();


Finally, this post really confused me. It suggests that if consumers store the results in a file, it will slow down the process. This might be a problem because I want performance and speed.







asked Jun 10 at 14:33 by U. User
edited Jun 10 at 17:00 by Daniel




















1 Answer (accepted)










First of all, using a fixed-size blocking queue is totally OK. Yes, your producer might eventually have to wait for space to become available in the queue, but this is exactly the intended behavior: if the consumers are "fast enough" to keep the queue basically empty, your producer can go full speed. If the consumers run at maximum capacity and still cannot keep up with the queue, it is better to wait for them than to risk running out of memory.
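To make the blocking behavior concrete, here is a minimal sketch, not taken from the question's code. The class name `BackPressureDemo` and its `run` method are made up for illustration. It pushes integers through an `ArrayBlockingQueue` and records the largest queue size the producer observes, which can never exceed the capacity because `put` blocks while the queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the name and method are assumptions for illustration.
class BackPressureDemo {

    /** Moves `items` integers through a bounded queue; returns {consumed, largest queue size seen}. */
    static int[] run(int capacity, int items) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        AtomicInteger maxSize = new AtomicInteger();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.put(i); // blocks while the queue is full
                    maxSize.accumulateAndGet(queue.size(), Math::max);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int consumed = 0;
        try {
            while (consumed < items) {
                queue.take(); // blocks while the queue is empty
                consumed++;
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {consumed, maxSize.get()};
    }

    public static void main(String[] args) {
        int[] result = run(10, 10_000);
        System.out.println("consumed=" + result[0] + ", max queue size=" + result[1]);
    }
}
```

The number of items is independent of the capacity: the file may have any number of lines, and the queue only limits how many of them are held in memory at once.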



Regarding the speed of the queue: it does not matter. As long as I/O is involved, the process is so slow from a CPU perspective that you will never measure a significant difference between queue implementations.



Regarding the complete process: as mentioned in the answer to the post you referred to, using multiple threads to write to a single file is a bad idea and is asking for trouble. Thus, if your f() function is CPU-intensive and benefits from multiple processors, you should instead send its outputs to a single consumer through a second queue.



    Producer (reads file)
            |
        [Queue 1]
            |
            V
    Multiple consumers (in-memory processing)
            |
        [Queue 2]
            |
            V
    Single consumer (writes output file)


That single consumer should keep the file open until the process is finished, to benefit the most from I/O buffering. Note, however, that in such a setup it might actually be a challenge to signal the end of the process. It may prove difficult to "know" when the input file has been fully read AND the queue is empty AND all processing units are idle.



One general note: as you mentioned performance multiple times and this is a common beginner's mistake, please search for "premature optimization is the root of all evil" (Knuth's words; there are quite a number of discussion sites about this). Basically: make it work first, measure, check your algorithms, and only as a last resort introduce complicated stuff like multithreading. Do you actually have a performance problem? Might it be caused by a sub-optimal algorithm in function f()? (Code Review might help here ;-)) If not: stick to single threading, or maybe use a parallel stream. Do not do this for some imagined problem without measurements to back up your view.
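As a rough illustration of the parallel-stream alternative, here is a sketch under stated assumptions. The class `ParallelLines`, its method names, and the `f` parameter are hypothetical stand-ins for the question's processing step. It also assumes that all results fit in memory, because they are collected into a list before being written.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper; names are assumptions for illustration.
class ParallelLines {

    /** Applies f to every line in parallel; the result list keeps the input order. */
    static List<String> transform(Stream<String> lines, UnaryOperator<String> f) {
        return lines.parallel().map(f).collect(Collectors.toList());
    }

    /** Reads the input file, processes its lines in parallel, and writes the output in a single call. */
    static void process(Path input, Path output, UnaryOperator<String> f) {
        try (Stream<String> lines = Files.lines(input)) {
            Files.write(output, transform(lines, f));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A call such as `ParallelLines.process(in, out, String::trim)` needs no queues or explicit threads, which makes it a reasonable baseline to measure before building the full pipeline.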



























• Sorry for the late reply; multi-threading is hard to program, especially for a beginner like me. Thanks for your answer! Regarding your remark about the single consumer and the second queue, where it is hard for the single consumer to "know" when the Consumers have finished consuming the first queue: I've encountered a similar problem with the first queue and the Producer. To solve that, I've used a poison pill that the Producer puts on the first queue to let the Consumers know that it has finished producing. Is this also a solution for the second scenario?
  – U. User, Jun 12 at 14:04

• Not really. As you cannot know which consumer will grab the poison pill or end-marker (or even any number of end-markers), you might just end up passing the end-markers through consumer 1 while consumers 2 and 3 are still busy with their respective tasks.
  – mtj, Jun 12 at 20:06

• So what do you propose to solve this issue?
  – U. User, Jun 12 at 20:35

• Hmm... I guess I misunderstood the "poison pill"... If the meaning is that the consumer terminates after receiving this message, then yes, it should indeed work. Each consumer should send an "I terminated" message to queue 2, and once the single output consumer has received all of these, it can also terminate.
  – mtj, Jun 13 at 5:21

• Nice! Thank you very much.
  – U. User, Jun 13 at 12:38
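The termination scheme agreed on in the comments can be sketched roughly as follows. This is only an illustration under assumptions, not the asker's final code. The class `TwoQueuePipeline`, the `POISON` marker, and the `f` parameter are hypothetical. The sketch also assumes that the producer enqueues one poison pill per worker, so that every worker receives exactly one. Error handling is deliberately minimal.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the two-queue pipeline; all names are assumptions.
class TwoQueuePipeline {
    // A unique instance compared by identity, so no real line can be mistaken for it.
    private static final String POISON = new String("<end>");

    static void process(BufferedReader in, Writer out, int workers, UnaryOperator<String> f) {
        BlockingQueue<String> queue1 = new ArrayBlockingQueue<>(100);
        BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(100);
        ExecutorService pool = Executors.newFixedThreadPool(workers + 1);

        // Producer: reads lines, then enqueues one poison pill per worker.
        pool.submit(() -> {
            try {
                String line;
                while ((line = in.readLine()) != null) {
                    queue1.put(line);
                }
            } finally {
                for (int i = 0; i < workers; i++) {
                    queue1.put(POISON);
                }
            }
            return null;
        });

        // Workers: process lines until the pill arrives, then report "I terminated" on queue 2.
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                try {
                    String line;
                    while ((line = queue1.take()) != POISON) {
                        queue2.put(f.apply(line));
                    }
                } finally {
                    queue2.put(POISON);
                }
                return null;
            });
        }

        // Single writer (the calling thread): keeps the output open until all workers have reported.
        try (BufferedWriter writer = new BufferedWriter(out)) {
            int finished = 0;
            while (finished < workers) {
                String result = queue2.take();
                if (result == POISON) {
                    finished++;
                } else {
                    writer.write(result);
                    writer.write('\n');
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        StringWriter out = new StringWriter();
        process(new BufferedReader(new StringReader("a\nb\nc\n")), out, 3, String::toUpperCase);
        System.out.print(out); // line order may vary between runs
    }
}
```

Because several workers feed queue 2 concurrently, the output lines are not guaranteed to appear in input order. If order matters, each line would need to carry its line number so the writer can reorder the results.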










          Your Answer




          StackExchange.ifUsing("editor", function ()
          return StackExchange.using("mathjaxEditing", function ()
          StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix)
          StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
          );
          );
          , "mathjax-editing");

          StackExchange.ifUsing("editor", function ()
          StackExchange.using("externalEditor", function ()
          StackExchange.using("snippets", function ()
          StackExchange.snippets.init();
          );
          );
          , "code-snippets");

          StackExchange.ready(function()
          var channelOptions =
          tags: "".split(" "),
          id: "196"
          ;
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function()
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled)
          StackExchange.using("snippets", function()
          createEditor();
          );

          else
          createEditor();

          );

          function createEditor()
          StackExchange.prepareEditor(
          heartbeatType: 'answer',
          convertImagesToLinks: false,
          noModals: false,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: null,
          bindNavPrevention: true,
          postfix: "",
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          );



          );








           

          draft saved


          draft discarded


















          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f196233%2fprocessing-a-file-using-producer-consumer-model%23new-answer', 'question_page');

          );

          Post as a guest






























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes








          up vote
          1
          down vote



          accepted










          First of all, using a fixed size blocking queue is totally OK. Yes, eventually your producer might wait for space to be available in the queue, but this is exactly the scenario: if the consumers are "fast enough" to keep the queue basically empty, your producer can go full speed. If the consumers run at maximum capacity and still cannot hold up with the queue, wait for them instead of risking a memory overflow.



          Regarding the speed: it does not matter. As long as I/O is involved, from a CPU perspective, the process is so slow that you will not ever measure a significant difference between queue implementations.



          Regarding the complete process: as mentioned in the answer to the post you referred to, using multiple threads to write a single file is a bad idea and pure looking for trouble. Thus, if you have a scenario where your f() function is CPU intense and benefits from multiple processors, you should rather send your outputs to a single consumer using a second queue.



           Producer (reads file)
          |
          [Queue 1]
          |
          V
          multiple consumers (in memory processing)
          |
          [Queue 2]
          |
          V
          Single Consumer (write output file)


          That single consumer should keep the file open until the process is finished to benefit the most from I/O-buffering. Note however, that in such a setup it might actually be a challenge to signal the end of the process. It may prove difficult to "know" when the input file has been fully read AND the queue is empty AND all processing units are idle.



          One general note: as you mentioned performance multiple times and this is a common beginner's mistake, please search for "premature optimization is the root of all evil" (the words of guru Knuth about this and quite a number of discussion sites.) Basically: make it working first, measure, check your algorithms, and only as a last resort introduce complicated stuff like multiprocessing. Do you actually have a performance problem? Might it be concerned with a sub-optimal algorithm in function f()? (Codereview might help here ;-)) If not: stick to single threading, or maybe to streams using a parallel stream. Do not do this for some imagined problem without measurements to back up your view.






          share|improve this answer





















          • Sorry for the late replay, multi-threading is hard to program especially for a beginner like me. Thank for your answer !!!! Regarding your remark about the problem in the SinglerConsumer and the second Queue in which it hard for the SinglerConsumer to "know" when the Consumers have done consuming the first Queue. I've encountered a similar problem in the first Queue and theProducer. To solve that I've used a Poison Pill that the Producer puts on the first Queue to let know the Consumers that is has finished Producing. Is this a solution to for the second scenario ?
            – U. User
            Jun 12 at 14:04











          • Not really. As you cannot know which consumer will grab the poison pill or end-marker (or even any number of end-markers) you might just end up passing the end-markers through consumer 1 while consumers 2 and 3 are still busy with their respective tasks.
            – mtj
            Jun 12 at 20:06










          • So what do you propose to solve this issue?
            – U. User
            Jun 12 at 20:35






          • 1




            Hmm... I guess I misunderstood the "poison pill"... If the meaning is, that the consumer terminates after receiving this message, yes, it should indeed work. Each consumer should send a "I terminated" message to queue 2, and if the single output consumer received all of these, it can also terminate.
            – mtj
            Jun 13 at 5:21










          • Nice! Thank you very much.
            – U. User
            Jun 13 at 12:38














          up vote
          1
          down vote



          accepted










          First of all, using a fixed size blocking queue is totally OK. Yes, eventually your producer might wait for space to be available in the queue, but this is exactly the scenario: if the consumers are "fast enough" to keep the queue basically empty, your producer can go full speed. If the consumers run at maximum capacity and still cannot hold up with the queue, wait for them instead of risking a memory overflow.



          Regarding the speed: it does not matter. As long as I/O is involved, from a CPU perspective, the process is so slow that you will not ever measure a significant difference between queue implementations.



          Regarding the complete process: as mentioned in the answer to the post you referred to, using multiple threads to write a single file is a bad idea and pure looking for trouble. Thus, if you have a scenario where your f() function is CPU intense and benefits from multiple processors, you should rather send your outputs to a single consumer using a second queue.



           Producer (reads file)
          |
          [Queue 1]
          |
          V
          multiple consumers (in memory processing)
          |
          [Queue 2]
          |
          V
          Single Consumer (write output file)


          That single consumer should keep the file open until the process is finished to benefit the most from I/O-buffering. Note however, that in such a setup it might actually be a challenge to signal the end of the process. It may prove difficult to "know" when the input file has been fully read AND the queue is empty AND all processing units are idle.



          One general note: as you mentioned performance multiple times and this is a common beginner's mistake, please search for "premature optimization is the root of all evil" (the words of guru Knuth about this and quite a number of discussion sites.) Basically: make it working first, measure, check your algorithms, and only as a last resort introduce complicated stuff like multiprocessing. Do you actually have a performance problem? Might it be concerned with a sub-optimal algorithm in function f()? (Codereview might help here ;-)) If not: stick to single threading, or maybe to streams using a parallel stream. Do not do this for some imagined problem without measurements to back up your view.






          share|improve this answer





















          • Sorry for the late replay, multi-threading is hard to program especially for a beginner like me. Thank for your answer !!!! Regarding your remark about the problem in the SinglerConsumer and the second Queue in which it hard for the SinglerConsumer to "know" when the Consumers have done consuming the first Queue. I've encountered a similar problem in the first Queue and theProducer. To solve that I've used a Poison Pill that the Producer puts on the first Queue to let know the Consumers that is has finished Producing. Is this a solution to for the second scenario ?
            – U. User
            Jun 12 at 14:04











          • Not really. As you cannot know which consumer will grab the poison pill or end-marker (or even any number of end-markers) you might just end up passing the end-markers through consumer 1 while consumers 2 and 3 are still busy with their respective tasks.
            – mtj
            Jun 12 at 20:06










          • So what do you propose to solve this issue?
            – U. User
            Jun 12 at 20:35






          • 1




            Hmm... I guess I misunderstood the "poison pill"... If the meaning is, that the consumer terminates after receiving this message, yes, it should indeed work. Each consumer should send a "I terminated" message to queue 2, and if the single output consumer received all of these, it can also terminate.
            – mtj
            Jun 13 at 5:21










          • Nice! Thank you very much.
            – U. User
            Jun 13 at 12:38












          up vote
          1
          down vote



          accepted







          up vote
          1
          down vote



          accepted






          First of all, using a fixed size blocking queue is totally OK. Yes, eventually your producer might wait for space to be available in the queue, but this is exactly the scenario: if the consumers are "fast enough" to keep the queue basically empty, your producer can go full speed. If the consumers run at maximum capacity and still cannot hold up with the queue, wait for them instead of risking a memory overflow.



          Regarding the speed of the queue implementation: it does not matter. As long as I/O is involved, the process is so slow from a CPU perspective that you will never measure a significant difference between queue implementations.



          Regarding the complete process: as mentioned in the answer to the post you referred to, using multiple threads to write a single file is a bad idea and is asking for trouble. Thus, if you have a scenario where your f() function is CPU-intensive and benefits from multiple processors, you should rather send your outputs to a single consumer using a second queue.



          Producer (reads file)
              |
          [Queue 1]
              |
              V
          multiple consumers (in-memory processing)
              |
          [Queue 2]
              |
              V
          Single consumer (writes output file)


          That single consumer should keep the file open until the process is finished to benefit the most from I/O-buffering. Note however, that in such a setup it might actually be a challenge to signal the end of the process. It may prove difficult to "know" when the input file has been fully read AND the queue is empty AND all processing units are idle.
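One possible way to handle the end-of-process signal in this two-queue setup is sketched below, using marker objects ("poison pills"): the producer puts one pill per worker on queue 1, each worker announces its own termination on queue 2, and the writer stops once it has counted all workers. Everything here is an assumption made for illustration: the class `TwoQueuePipeline`, the markers `POISON` and `WORKER_DONE`, the queue capacity of 64, and the placeholder `f()`. An in-memory list stands in for the output file, and the calling thread plays the producer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch; not the code from the question.
class TwoQueuePipeline {
    // Marker objects, compared by identity so no real line can be mistaken for them.
    private static final String POISON = new String("POISON");
    private static final String WORKER_DONE = new String("WORKER_DONE");

    // Placeholder for the CPU-heavy f() mentioned in the question.
    static String f(String line) {
        return line.toUpperCase();
    }

    static List<String> run(List<String> input, int workers) {
        BlockingQueue<String> queue1 = new ArrayBlockingQueue<>(64);
        BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(64);
        List<String> output = new ArrayList<>(); // stands in for the output file

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            threads.add(new Thread(() -> {
                try {
                    String line;
                    // Each worker stops after taking exactly one pill.
                    while ((line = queue1.take()) != POISON) {
                        queue2.put(f(line));
                    }
                    queue2.put(WORKER_DONE); // tell the writer this worker is finished
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        // The single writer is the only thread that touches the output.
        threads.add(new Thread(() -> {
            try {
                int finished = 0;
                while (finished < workers) {
                    String result = queue2.take();
                    if (result == WORKER_DONE) {
                        finished++;
                    } else {
                        output.add(result);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        threads.forEach(Thread::start);

        try {
            for (String line : input) {
                queue1.put(line);
            }
            for (int i = 0; i < workers; i++) {
                queue1.put(POISON); // one pill per worker
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return output;
    }
}
```

Note that with several workers the results arrive at the writer in no particular order; if the output must keep the input order, the lines would need to carry a sequence number, which this sketch does not do.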



          One general note: as you mentioned performance multiple times, and this is a common beginner's mistake, please search for "premature optimization is the root of all evil" (the words of Donald Knuth; you will find quite a number of discussions about it). Basically: make it work first, measure, check your algorithms, and only as a last resort introduce complicated stuff like multi-threading. Do you actually have a performance problem? Might it be caused by a sub-optimal algorithm in function f()? (Code Review might help here ;-)) If not: stick to single threading, or maybe to streams using a parallel stream. Do not do this for some imagined problem without measurements to back up your view.
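For comparison, the parallel-stream alternative mentioned above can be very short. This is only a sketch under assumptions: `ParallelStreamDemo` and `processAll` are made-up names, `f()` is a placeholder for the real per-line work, and the lines are assumed to fit in memory as a list (the lines of a file could be obtained with `Files.lines` or `Files.readAllLines` instead).

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the parallel-stream variant.
class ParallelStreamDemo {
    // Placeholder for the real per-line processing.
    static String f(String line) {
        return new StringBuilder(line).reverse().toString();
    }

    // Applies f() to every line, possibly on several threads.
    // Collecting from an ordered stream keeps the input order.
    static List<String> processAll(List<String> lines) {
        return lines.parallelStream()
                .map(ParallelStreamDemo::f)
                .collect(Collectors.toList());
    }
}
```

There are no queues or end markers to manage here, and switching `parallelStream()` to `stream()` gives the single-threaded version to measure against.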

















          answered Jun 11 at 11:42









          mtj
























           
