Pipe based multiprocess filter

The name of the pictureThe name of the pictureThe name of the pictureClash Royale CLAN TAG#URR8PPP





.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;







up vote
2
down vote

favorite












This code is a multi-threaded version of filter that uses pipes to communicate between the workers and the main thread. I am looking for any ways to make this code cleaner. I'm also interested in if there is anything I can do to make this play nicely with infinite iterators.



from multiprocessing import Process, Pipe
from itertools import cycle, count

def distributer(f, data):
def worker(pipe):
while True:
piece = pipe.recv()
if f(piece):
pipe.send(piece)
pipe.send(False)

pipes =
for _ in range(8):
parent_conn, child_conn = Pipe()
p = Process(target=worker, args=(child_conn,))
pipes.append(parent_conn)
p.start()
parent_conn.send(next(data))

for pipe in cycle(pipes):
if pipe.poll(.0001):
ans = pipe.recv()
if ans:
yield ans
pipe.send(next(data))

if __name__ == '__main__':
def g(x):
return x%1000==0
data = count(1)
for ans in distributer(g, data):
print(ans)






share|improve this question



























    up vote
    2
    down vote

    favorite












    This code is a multi-threaded version of filter that uses pipes to communicate between the workers and the main thread. I am looking for any ways to make this code cleaner. I'm also interested in if there is anything I can do to make this play nicely with infinite iterators.



    from multiprocessing import Process, Pipe
    from itertools import cycle, count

    def distributer(f, data):
    def worker(pipe):
    while True:
    piece = pipe.recv()
    if f(piece):
    pipe.send(piece)
    pipe.send(False)

    pipes =
    for _ in range(8):
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    pipes.append(parent_conn)
    p.start()
    parent_conn.send(next(data))

    for pipe in cycle(pipes):
    if pipe.poll(.0001):
    ans = pipe.recv()
    if ans:
    yield ans
    pipe.send(next(data))

    if __name__ == '__main__':
    def g(x):
    return x%1000==0
    data = count(1)
    for ans in distributer(g, data):
    print(ans)






    share|improve this question























      up vote
      2
      down vote

      favorite









      up vote
      2
      down vote

      favorite











      This code is a multi-threaded version of filter that uses pipes to communicate between the workers and the main thread. I am looking for any ways to make this code cleaner. I'm also interested in if there is anything I can do to make this play nicely with infinite iterators.



      from multiprocessing import Process, Pipe
      from itertools import cycle, count

      def distributer(f, data):
      def worker(pipe):
      while True:
      piece = pipe.recv()
      if f(piece):
      pipe.send(piece)
      pipe.send(False)

      pipes =
      for _ in range(8):
      parent_conn, child_conn = Pipe()
      p = Process(target=worker, args=(child_conn,))
      pipes.append(parent_conn)
      p.start()
      parent_conn.send(next(data))

      for pipe in cycle(pipes):
      if pipe.poll(.0001):
      ans = pipe.recv()
      if ans:
      yield ans
      pipe.send(next(data))

      if __name__ == '__main__':
      def g(x):
      return x%1000==0
      data = count(1)
      for ans in distributer(g, data):
      print(ans)






      share|improve this question













      This code is a multi-threaded version of filter that uses pipes to communicate between the workers and the main thread. I am looking for any ways to make this code cleaner. I'm also interested in if there is anything I can do to make this play nicely with infinite iterators.



      from multiprocessing import Process, Pipe
      from itertools import cycle, count

      def distributer(f, data):
      def worker(pipe):
      while True:
      piece = pipe.recv()
      if f(piece):
      pipe.send(piece)
      pipe.send(False)

      pipes =
      for _ in range(8):
      parent_conn, child_conn = Pipe()
      p = Process(target=worker, args=(child_conn,))
      pipes.append(parent_conn)
      p.start()
      parent_conn.send(next(data))

      for pipe in cycle(pipes):
      if pipe.poll(.0001):
      ans = pipe.recv()
      if ans:
      yield ans
      pipe.send(next(data))

      if __name__ == '__main__':
      def g(x):
      return x%1000==0
      data = count(1)
      for ans in distributer(g, data):
      print(ans)








      share|improve this question












      share|improve this question




      share|improve this question








      edited Jan 8 at 3:14
























      asked Jan 8 at 1:59









      Oscar Smith

      2,625922




      2,625922




















          1 Answer
          1






          active

          oldest

          votes

















          up vote
          2
          down vote













          I recommend a signature of:



          def distributer(f, data, num_workers=8):
          ...
          for _ in range(num_workers):


          Perhaps f is the appropriate identifier. But it leaves me scratching my head a bit. I wouldn't mind a docstring explaining the function's responsibility.



           p = Process(target=worker, args=(child_conn,))


          Ordinarily this would be perfect. But given that you're also manipulating pipes, it wouldn't hurt to assign to proc instead.



          Your prose suggests that you're filtering, in this case picking out one value in a thousand. Perhaps the appropriate function name would have been filter_worker().



          Phrasing it as "reduce" may have been clearer than "filter". For example, every thousandth row return sum of previous rows, that sort of thing, if the code is strictly for illustrative purposes.



          In summary, as a reader I'm looking for a comment / docstring that explains what is interesting about the top level code doing busy work forever (I see no benchmark timings), and I'm interested in a docstring on worker() explaining the protocol it implements.






          share|improve this answer





















          • What do you mean by "what is interesting about the top level code doing busy work forever"
            – Oscar Smith
            Jan 8 at 3:22










          • Your code essentially does while (1) , but distributes the work among multiple cores. What is missing is some motivating prose describing why this technique is of interest, why it might generalize to other problems that folks care about.
            – J_H
            Jan 9 at 4:56










          Your Answer




          StackExchange.ifUsing("editor", function ()
          return StackExchange.using("mathjaxEditing", function ()
          StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix)
          StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
          );
          );
          , "mathjax-editing");

          StackExchange.ifUsing("editor", function ()
          StackExchange.using("externalEditor", function ()
          StackExchange.using("snippets", function ()
          StackExchange.snippets.init();
          );
          );
          , "code-snippets");

          StackExchange.ready(function()
          var channelOptions =
          tags: "".split(" "),
          id: "196"
          ;
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function()
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled)
          StackExchange.using("snippets", function()
          createEditor();
          );

          else
          createEditor();

          );

          function createEditor()
          StackExchange.prepareEditor(
          heartbeatType: 'answer',
          convertImagesToLinks: false,
          noModals: false,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: null,
          bindNavPrevention: true,
          postfix: "",
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          );



          );








           

          draft saved


          draft discarded


















          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f184547%2fpipe-based-multiprocess-filter%23new-answer', 'question_page');

          );

          Post as a guest






























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes








          up vote
          2
          down vote













          I recommend a signature of:



          def distributer(f, data, num_workers=8):
          ...
          for _ in range(num_workers):


          Perhaps f is the appropriate identifier. But it leaves me scratching my head a bit. I wouldn't mind a docstring explaining the function's responsibility.



           p = Process(target=worker, args=(child_conn,))


          Ordinarily this would be perfect. But given that you're also manipulating pipes, it wouldn't hurt to assign to proc instead.



          Your prose suggests that you're filtering, in this case picking out one value in a thousand. Perhaps the appropriate function name would have been filter_worker().



          Phrasing it as "reduce" may have been clearer than "filter". For example, every thousandth row return sum of previous rows, that sort of thing, if the code is strictly for illustrative purposes.



          In summary, as a reader I'm looking for a comment / docstring that explains what is interesting about the top level code doing busy work forever (I see no benchmark timings), and I'm interested in a docstring on worker() explaining the protocol it implements.






          share|improve this answer





















          • What do you mean by "what is interesting about the top level code doing busy work forever"
            – Oscar Smith
            Jan 8 at 3:22










          • Your code essentially does while (1) , but distributes the work among multiple cores. What is missing is some motivating prose describing why this technique is of interest, why it might generalize to other problems that folks care about.
            – J_H
            Jan 9 at 4:56














          up vote
          2
          down vote













          I recommend a signature of:



          def distributer(f, data, num_workers=8):
          ...
          for _ in range(num_workers):


          Perhaps f is the appropriate identifier. But it leaves me scratching my head a bit. I wouldn't mind a docstring explaining the function's responsibility.



           p = Process(target=worker, args=(child_conn,))


          Ordinarily this would be perfect. But given that you're also manipulating pipes, it wouldn't hurt to assign to proc instead.



          Your prose suggests that you're filtering, in this case picking out one value in a thousand. Perhaps the appropriate function name would have been filter_worker().



          Phrasing it as "reduce" may have been clearer than "filter". For example, every thousandth row return sum of previous rows, that sort of thing, if the code is strictly for illustrative purposes.



          In summary, as a reader I'm looking for a comment / docstring that explains what is interesting about the top level code doing busy work forever (I see no benchmark timings), and I'm interested in a docstring on worker() explaining the protocol it implements.






          share|improve this answer





















          • What do you mean by "what is interesting about the top level code doing busy work forever"
            – Oscar Smith
            Jan 8 at 3:22










          • Your code essentially does while (1) , but distributes the work among multiple cores. What is missing is some motivating prose describing why this technique is of interest, why it might generalize to other problems that folks care about.
            – J_H
            Jan 9 at 4:56












          up vote
          2
          down vote










          up vote
          2
          down vote









          I recommend a signature of:



          def distributer(f, data, num_workers=8):
          ...
          for _ in range(num_workers):


          Perhaps f is the appropriate identifier. But it leaves me scratching my head a bit. I wouldn't mind a docstring explaining the function's responsibility.



           p = Process(target=worker, args=(child_conn,))


          Ordinarily this would be perfect. But given that you're also manipulating pipes, it wouldn't hurt to assign to proc instead.



          Your prose suggests that you're filtering, in this case picking out one value in a thousand. Perhaps the appropriate function name would have been filter_worker().



          Phrasing it as "reduce" may have been clearer than "filter". For example, every thousandth row return sum of previous rows, that sort of thing, if the code is strictly for illustrative purposes.



          In summary, as a reader I'm looking for a comment / docstring that explains what is interesting about the top level code doing busy work forever (I see no benchmark timings), and I'm interested in a docstring on worker() explaining the protocol it implements.






          share|improve this answer













          I recommend a signature of:



          def distributer(f, data, num_workers=8):
          ...
          for _ in range(num_workers):


          Perhaps f is the appropriate identifier. But it leaves me scratching my head a bit. I wouldn't mind a docstring explaining the function's responsibility.



           p = Process(target=worker, args=(child_conn,))


          Ordinarily this would be perfect. But given that you're also manipulating pipes, it wouldn't hurt to assign to proc instead.



          Your prose suggests that you're filtering, in this case picking out one value in a thousand. Perhaps the appropriate function name would have been filter_worker().



          Phrasing it as "reduce" may have been clearer than "filter". For example, every thousandth row return sum of previous rows, that sort of thing, if the code is strictly for illustrative purposes.



          In summary, as a reader I'm looking for a comment / docstring that explains what is interesting about the top level code doing busy work forever (I see no benchmark timings), and I'm interested in a docstring on worker() explaining the protocol it implements.







          share|improve this answer













          share|improve this answer



          share|improve this answer











          answered Jan 8 at 3:09









          J_H

          4,317129




          4,317129











          • What do you mean by "what is interesting about the top level code doing busy work forever"
            – Oscar Smith
            Jan 8 at 3:22










          • Your code essentially does while (1) , but distributes the work among multiple cores. What is missing is some motivating prose describing why this technique is of interest, why it might generalize to other problems that folks care about.
            – J_H
            Jan 9 at 4:56
















          • What do you mean by "what is interesting about the top level code doing busy work forever"
            – Oscar Smith
            Jan 8 at 3:22










          • Your code essentially does while (1) , but distributes the work among multiple cores. What is missing is some motivating prose describing why this technique is of interest, why it might generalize to other problems that folks care about.
            – J_H
            Jan 9 at 4:56















          What do you mean by "what is interesting about the top level code doing busy work forever"
          – Oscar Smith
          Jan 8 at 3:22




          What do you mean by "what is interesting about the top level code doing busy work forever"
          – Oscar Smith
          Jan 8 at 3:22












          Your code essentially does while (1) , but distributes the work among multiple cores. What is missing is some motivating prose describing why this technique is of interest, why it might generalize to other problems that folks care about.
          – J_H
          Jan 9 at 4:56




          Your code essentially does while (1) , but distributes the work among multiple cores. What is missing is some motivating prose describing why this technique is of interest, why it might generalize to other problems that folks care about.
          – J_H
          Jan 9 at 4:56












           

          draft saved


          draft discarded


























           


          draft saved


          draft discarded














          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f184547%2fpipe-based-multiprocess-filter%23new-answer', 'question_page');

          );

          Post as a guest













































































          Popular posts from this blog

          Greedy Best First Search implementation in Rust

          Function to Return a JSON Like Objects Using VBA Collections and Arrays

          C++11 CLH Lock Implementation