Pipe based multiprocess filter
This code is a multiprocess version of filter that uses pipes to communicate between the workers and the main process. I am looking for any ways to make this code cleaner. I'm also interested in whether there is anything I can do to make this play nicely with infinite iterators.
from multiprocessing import Process, Pipe
from itertools import cycle, count

def distributer(f, data):
    def worker(pipe):
        while True:
            piece = pipe.recv()
            if f(piece):
                pipe.send(piece)
            pipe.send(False)

    pipes = []
    for _ in range(8):
        parent_conn, child_conn = Pipe()
        p = Process(target=worker, args=(child_conn,))
        pipes.append(parent_conn)
        p.start()
        parent_conn.send(next(data))

    for pipe in cycle(pipes):
        if pipe.poll(.0001):
            ans = pipe.recv()
            if ans:
                yield ans
            pipe.send(next(data))

if __name__ == '__main__':
    def g(x):
        return x % 1000 == 0

    data = count(1)
    for ans in distributer(g, data):
        print(ans)
python multiprocessing
edited Jan 8 at 3:14
asked Jan 8 at 1:59
Oscar Smith
2,625922
1 Answer
I recommend a signature of:

    def distributer(f, data, num_workers=8):
        ...
        for _ in range(num_workers):

Perhaps f is the appropriate identifier, but it leaves me scratching my head a bit. I wouldn't mind a docstring explaining the function's responsibility.
    p = Process(target=worker, args=(child_conn,))

Ordinarily this would be perfect, but given that you're also manipulating pipes, it wouldn't hurt to assign to proc instead.
Your prose suggests that you're filtering, in this case picking out one value in a thousand. Perhaps the appropriate function name would have been filter_worker().

Phrasing it as "reduce" may have been clearer than "filter" (for example, "every thousandth row, return the sum of the previous rows", that sort of thing), if the code is strictly for illustrative purposes.
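For concreteness, the "reduce" illustration could look like this single-process sketch (purely hypothetical, just to show the shape of the idea):

```python
from itertools import count, islice

def running_sum_every_1000(data):
    """Every thousandth item, yield the sum of all items seen so far."""
    total = 0
    for i, x in enumerate(data, 1):
        total += x
        if i % 1000 == 0:
            yield total

# e.g. the first report is sum(1..1000) == 500500
```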
In summary, as a reader I'm looking for a comment or docstring that explains what is interesting about the top-level code doing busy work forever (I see no benchmark timings), and I'd like a docstring on worker() explaining the protocol it implements.
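Pulling those suggestions together (a num_workers parameter, proc instead of p, filter_worker, and docstrings), one possible revision is the sketch below; the daemon=True line is my own addition, so stray workers don't outlive the parent:

```python
from itertools import cycle, count, islice
from multiprocessing import Pipe, Process

def distributer(f, data, num_workers=8):
    """Yield the items of `data` for which f(item) is truthy,
    evaluating f in `num_workers` child processes."""
    def filter_worker(pipe):
        # Protocol: receive one item; reply with the item if it passes
        # the filter, then always reply False to request more work.
        while True:
            piece = pipe.recv()
            if f(piece):
                pipe.send(piece)
            pipe.send(False)

    pipes = []
    for _ in range(num_workers):
        parent_conn, child_conn = Pipe()
        proc = Process(target=filter_worker, args=(child_conn,))
        proc.daemon = True  # assumption: reap workers when the parent exits
        pipes.append(parent_conn)
        proc.start()
        parent_conn.send(next(data))

    for pipe in cycle(pipes):
        if pipe.poll(.0001):
            ans = pipe.recv()
            if ans:
                yield ans
            pipe.send(next(data))
```

Used exactly like the original, e.g. `islice(distributer(g, count(1), num_workers=4), 5)` to take the first five matches from an infinite stream. (Note this relies on fork-style process start, since filter_worker is a nested function.)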
What do you mean by "what is interesting about the top level code doing busy work forever"?
– Oscar Smith
Jan 8 at 3:22
Your code essentially does while (1), but distributes the work among multiple cores. What is missing is some motivating prose describing why this technique is of interest, and why it might generalize to other problems that folks care about.
– J_H
Jan 9 at 4:56
answered Jan 8 at 3:09
J_H
4,317129