pool of python coroutines
Clash Royale CLAN TAG#URR8PPP
.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;
up vote
4
down vote
favorite
Task
control how many coroutines run simultaneously, with the ability to stop the process from inside.
Use case
when scraping websites you want to control how much memory the program consumes and how much load it puts on the target.
Usage
- create an instance
- call
attach_coroutines_generator
to attach some generator which will create new coroutines from queue which is accessible from these coroutines (for example, every coroutine downloads html and queues new downloads for extracted links) - call
run
method
More description
_sleep
method, _do_stop
and stop_gracefully
all exists to allow code inside _runner
method stop run
method call (because we can't cancel future from its child).
When a runner catches StopIteration
it is put on sleep for delay
seconds.
When some "runner" realizes that every other runner is put on sleep, it updates _do_stop
attribute so all runners will break their while-loops.
Questions
- I'm not sure about appropriate naming and typing.
- Security issues?
- Is there other/better ways to do the same?
Code
import asyncio
import collections
from typing import Coroutine, Iterable, Iterator
class RunnersPool:
    """Run coroutines pulled from an attached iterator with bounded concurrency.

    ``pool_size`` "runner" tasks repeatedly pull the next coroutine from the
    attached source and await it.  When the source is temporarily exhausted a
    runner sleeps for ``delay`` seconds and retries (the coroutines themselves
    may enqueue new work in the meantime).  When *every* runner is idle at the
    same time, no coroutine is left that could produce new work, so the pool
    stops itself (unless auto-stop is disabled).
    """

    def __init__(self, pool_size: int, delay: float = 1., enable_auto_stop=True):
        self._pool_size = pool_size
        self._delay = delay
        self._auto_stop_enabled = enable_auto_stop
        self._do_stop = False          # once True, every runner exits its loop
        self._active_counter = 0       # runners currently inside _runner()
        self._delayed_counter = 0      # runners currently sleeping on empty source
        self._coroutines_generator = None
        self._runners = None           # gather() future over all runner tasks

    # ----------------
    # runner methods
    async def _runner(self) -> None:
        """Pull and await coroutines until the pool is asked to stop."""
        self._active_counter += 1
        try:
            while True:
                try:
                    coroutine = next(self._coroutines_generator)
                except StopIteration:
                    # No work right now: back off, then re-check the stop flag.
                    await self._sleep()
                    if not self.is_running:
                        break
                else:
                    try:
                        await coroutine
                    except Exception as exc:
                        self.handle_exception(exc, coroutine)
        finally:
            # Keep the counter correct even if this runner is cancelled.
            self._active_counter -= 1

    async def _sleep(self) -> None:
        """Sleep for ``delay``; auto-stop when every runner is idle at once.

        If all other runners are already sleeping, nothing can enqueue new
        work, so (when auto-stop is enabled) set the stop flag instead of
        sleeping forever.  Without the ``_auto_stop_enabled`` guard the stop
        flag would make this method return immediately on every later call,
        turning the pool into a busy-spin loop when auto-stop is disabled.
        """
        if self._auto_stop_enabled and self._delayed_counter + 1 == self._pool_size:
            self.stop_gracefully()
            return
        self._delayed_counter += 1
        try:
            await asyncio.sleep(self._delay)
        finally:
            # Decrement even on cancellation so the idle count stays accurate.
            self._delayed_counter -= 1
    # end
    # ----------------

    async def run(self):
        """Start ``pool_size`` runners and wait for all of them to finish.

        Raises:
            RuntimeError: if no coroutines generator was attached yet.
        """
        if not self._coroutines_generator:
            raise RuntimeError('coroutines_generator not attached yet')
        self._runners = asyncio.gather(*[
            self._runner() for _ in range(self._pool_size)
        ])
        await self._runners

    def handle_exception(self, exception: Exception, coroutine: Coroutine):
        """Hook for subclasses; by default re-raise the coroutine's error."""
        raise exception

    def cancel(self):
        """Hard-cancel all runners (only valid after run() was called)."""
        self._runners.cancel()

    def stop_gracefully(self):
        """Ask every runner to exit after its current coroutine finishes."""
        self._do_stop = True

    def attach_coroutines_generator(self, coroutines_generator: Iterator[Coroutine]):
        """Attach the iterator that yields coroutines to run (``next()`` is used)."""
        self._coroutines_generator = coroutines_generator

    @property
    def pool_size(self) -> int:
        return self._pool_size

    @property
    def delay(self) -> float:
        return self._delay

    @property
    def is_running(self) -> bool:
        return not self._do_stop

    @property
    def auto_stop_enabled(self) -> bool:
        return self._auto_stop_enabled

    @property
    def delayed_runners(self) -> int:
        return self._delayed_counter

    @property
    def active_runners(self) -> int:
        return self._active_counter
Example usage
class SmartQueue:
    """FIFO queue that also acts as an iterator.

    ``next()`` pops the oldest item, or raises StopIteration when the queue
    is *currently* empty -- items may still be ``put()`` later and iteration
    can then resume, which is what RunnersPool relies on.
    """

    def __init__(self):
        self.queue = collections.deque()

    def put(self, x):
        """Append *x* to the tail of the queue."""
        self.queue.append(x)

    def get(self):
        """Pop and return the head of the queue (IndexError when empty)."""
        return self.queue.popleft()

    def __iter__(self):
        # Make the queue usable in for-loops / list() as well as via next().
        return self

    def __next__(self):
        if self.queue:
            return self.get()
        raise StopIteration
async def main():
    """Demo: a pool of 5 runners drains a queue whose coroutines enqueue more work."""
    queue = SmartQueue()

    async def countdown(value):
        # Each coroutine prints its number and schedules its predecessor.
        await asyncio.sleep(1)
        print(value)
        if value > 0:
            queue.put(countdown(value - 1))

    for start in range(1, 11):
        queue.put(countdown(start))

    pool = RunnersPool(5)
    pool.attach_coroutines_generator(queue)
    await pool.run()
if __name__ == '__main__':
    # Own the event loop explicitly and close it afterwards so its resources
    # are released even if main() raises.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
python python-3.x async-await
add a comment |Â
up vote
4
down vote
favorite
Task
control how much coroutines are running simultaneously with ability to stop the process from inside.
Use case
when scraping websites you want to control how much memory program consumes and how much load on target it produces.
Usage
- create an instance
- call
attach_coroutines_generator
to attach some generator which will create new coroutines from queue which is accessible from these coroutines (for example, every coroutine downloads html and queues new downloads for extracted links) - call
run
method
More description
_sleep
method, _do_stop
and stop_gracefully
all exists to allow code inside _runner
method stop run
method call (because we can't cancel future from its child).
When runner catches StopIterationError
it is put on sleep for delay
seconds.
When some "runner" realizes that every other runner is put on sleep, it updates _do_stop
attribute so all runners will break their while-loops.
Questions
- I'm not sure about appropriate naming and typing.
- Security issues?
- Is there other/better ways to do the same?
Code
import asyncio
class RunnersPool:
def __init__(self, pool_size: int, delay: float =1., enable_auto_stop=True):
self._pool_size = pool_size
self._delay = delay
self._auto_stop_enabled = enable_auto_stop
self._do_stop = False
self._active_counter = 0
self._delayed_counter = 0
self._coroutines_generator = None
self._runners = None
# ----------------
# runner methods
async def _runner(self) -> None:
self._active_counter += 1
while True:
try:
coroutine = next(self._coroutines_generator)
except StopIteration:
await self._sleep()
if not self.is_running and self._auto_stop_enabled:
break
else:
try:
await coroutine
except Exception as exc:
self.handle_exception(exc, coroutine)
self._active_counter -= 1
async def _sleep(self) -> None:
if self._delayed_counter + 1 == self._pool_size:
self.stop_gracefully()
return
self._delayed_counter += 1
await asyncio.sleep(self._delay)
self._delayed_counter -= 1
# end
# ----------------
async def run(self):
if not self._coroutines_generator:
raise RuntimeError('coroutines_generator not attached yet')
self._runners = asyncio.gather(*[
self._runner() for _ in range(self._pool_size)
])
await self._runners
def handle_exception(self, exception: Exception, coroutine: Coroutine):
raise exception
def cancel(self):
self._runners.cancel()
def stop_gracefully(self):
self._do_stop = True
def attach_coroutines_generator(self, coroutines_generator: Iterable[Coroutine]):
self._coroutines_generator = coroutines_generator
@property
def pool_size(self) -> int:
return self._pool_size
@property
def delay(self) -> float:
return self._delay
@property
def is_running(self) -> bool:
return not self._do_stop
@property
def auto_stop_enabled(self) -> bool:
return self._auto_stop_enabled
@property
def delayed_runners(self) -> int:
return self._delayed_counter
@property
def active_runners(self) -> int:
return self._active_counter
Example usage
class SmartQueue:
def __init__(self):
self.queue = collections.deque()
def put(self, x):
self.queue.append(x)
def get(self):
return self.queue.popleft()
def __next__(self):
if self.queue:
return self.get()
else:
raise StopIteration
async def main():
sq = SmartQueue()
async def cor(num):
await asyncio.sleep(1)
print(num)
if num > 0:
sq.put(cor(num - 1))
for i in range(1, 10+1):
sq.put(cor(i))
r = RunnersPool(5)
r.attach_coroutines_generator(sq)
await r.run()
if __name__ == '__main__':
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
python python-3.x async-await
From experience, coroutines are not built to act like this; they're event-based processors (they process something when the next event fires off). You're adding all the functionality that is already inherent in threads. I also have a pet peeve (pithy, I know), people putting "smart" in front of things :-)
â C. Harley
Aug 2 at 2:39
@C.Harley The pool exists to delay coroutine execution by using a queue. In the case of scraping you have too many start URLs to run them all simultaneously (because of the large number of connections and high memory usage). And about "SmartQueue" - it's just a quick way to name something.
â Illia Ananich
Aug 2 at 16:16
Yes, I can see what you're doing, however I'm pointing out that Coroutines have a completely different intent and use. I'd recommend studying the difference and trying to do your assigned task using Threads. Give it a try and see the differences in task allocation, interaction with the queue and built-in functionality.
â C. Harley
Aug 3 at 2:08
add a comment |Â
up vote
4
down vote
favorite
up vote
4
down vote
favorite
Task
control how much coroutines are running simultaneously with ability to stop the process from inside.
Use case
when scraping websites you want to control how much memory program consumes and how much load on target it produces.
Usage
- create an instance
- call
attach_coroutines_generator
to attach some generator which will create new coroutines from queue which is accessible from these coroutines (for example, every coroutine downloads html and queues new downloads for extracted links) - call
run
method
More description
_sleep
method, _do_stop
and stop_gracefully
all exists to allow code inside _runner
method stop run
method call (because we can't cancel future from its child).
When runner catches StopIterationError
it is put on sleep for delay
seconds.
When some "runner" realizes that every other runner is put on sleep, it updates _do_stop
attribute so all runners will break their while-loops.
Questions
- I'm not sure about appropriate naming and typing.
- Security issues?
- Is there other/better ways to do the same?
Code
import asyncio
class RunnersPool:
def __init__(self, pool_size: int, delay: float =1., enable_auto_stop=True):
self._pool_size = pool_size
self._delay = delay
self._auto_stop_enabled = enable_auto_stop
self._do_stop = False
self._active_counter = 0
self._delayed_counter = 0
self._coroutines_generator = None
self._runners = None
# ----------------
# runner methods
async def _runner(self) -> None:
self._active_counter += 1
while True:
try:
coroutine = next(self._coroutines_generator)
except StopIteration:
await self._sleep()
if not self.is_running and self._auto_stop_enabled:
break
else:
try:
await coroutine
except Exception as exc:
self.handle_exception(exc, coroutine)
self._active_counter -= 1
async def _sleep(self) -> None:
if self._delayed_counter + 1 == self._pool_size:
self.stop_gracefully()
return
self._delayed_counter += 1
await asyncio.sleep(self._delay)
self._delayed_counter -= 1
# end
# ----------------
async def run(self):
if not self._coroutines_generator:
raise RuntimeError('coroutines_generator not attached yet')
self._runners = asyncio.gather(*[
self._runner() for _ in range(self._pool_size)
])
await self._runners
def handle_exception(self, exception: Exception, coroutine: Coroutine):
raise exception
def cancel(self):
self._runners.cancel()
def stop_gracefully(self):
self._do_stop = True
def attach_coroutines_generator(self, coroutines_generator: Iterable[Coroutine]):
self._coroutines_generator = coroutines_generator
@property
def pool_size(self) -> int:
return self._pool_size
@property
def delay(self) -> float:
return self._delay
@property
def is_running(self) -> bool:
return not self._do_stop
@property
def auto_stop_enabled(self) -> bool:
return self._auto_stop_enabled
@property
def delayed_runners(self) -> int:
return self._delayed_counter
@property
def active_runners(self) -> int:
return self._active_counter
Example usage
class SmartQueue:
def __init__(self):
self.queue = collections.deque()
def put(self, x):
self.queue.append(x)
def get(self):
return self.queue.popleft()
def __next__(self):
if self.queue:
return self.get()
else:
raise StopIteration
async def main():
sq = SmartQueue()
async def cor(num):
await asyncio.sleep(1)
print(num)
if num > 0:
sq.put(cor(num - 1))
for i in range(1, 10+1):
sq.put(cor(i))
r = RunnersPool(5)
r.attach_coroutines_generator(sq)
await r.run()
if __name__ == '__main__':
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
python python-3.x async-await
Task
control how much coroutines are running simultaneously with ability to stop the process from inside.
Use case
when scraping websites you want to control how much memory program consumes and how much load on target it produces.
Usage
- create an instance
- call
attach_coroutines_generator
to attach some generator which will create new coroutines from queue which is accessible from these coroutines (for example, every coroutine downloads html and queues new downloads for extracted links) - call
run
method
More description
_sleep
method, _do_stop
and stop_gracefully
all exists to allow code inside _runner
method stop run
method call (because we can't cancel future from its child).
When runner catches StopIterationError
it is put on sleep for delay
seconds.
When some "runner" realizes that every other runner is put on sleep, it updates _do_stop
attribute so all runners will break their while-loops.
Questions
- I'm not sure about appropriate naming and typing.
- Security issues?
- Is there other/better ways to do the same?
Code
import asyncio
class RunnersPool:
def __init__(self, pool_size: int, delay: float =1., enable_auto_stop=True):
self._pool_size = pool_size
self._delay = delay
self._auto_stop_enabled = enable_auto_stop
self._do_stop = False
self._active_counter = 0
self._delayed_counter = 0
self._coroutines_generator = None
self._runners = None
# ----------------
# runner methods
async def _runner(self) -> None:
self._active_counter += 1
while True:
try:
coroutine = next(self._coroutines_generator)
except StopIteration:
await self._sleep()
if not self.is_running and self._auto_stop_enabled:
break
else:
try:
await coroutine
except Exception as exc:
self.handle_exception(exc, coroutine)
self._active_counter -= 1
async def _sleep(self) -> None:
if self._delayed_counter + 1 == self._pool_size:
self.stop_gracefully()
return
self._delayed_counter += 1
await asyncio.sleep(self._delay)
self._delayed_counter -= 1
# end
# ----------------
async def run(self):
if not self._coroutines_generator:
raise RuntimeError('coroutines_generator not attached yet')
self._runners = asyncio.gather(*[
self._runner() for _ in range(self._pool_size)
])
await self._runners
def handle_exception(self, exception: Exception, coroutine: Coroutine):
raise exception
def cancel(self):
self._runners.cancel()
def stop_gracefully(self):
self._do_stop = True
def attach_coroutines_generator(self, coroutines_generator: Iterable[Coroutine]):
self._coroutines_generator = coroutines_generator
@property
def pool_size(self) -> int:
return self._pool_size
@property
def delay(self) -> float:
return self._delay
@property
def is_running(self) -> bool:
return not self._do_stop
@property
def auto_stop_enabled(self) -> bool:
return self._auto_stop_enabled
@property
def delayed_runners(self) -> int:
return self._delayed_counter
@property
def active_runners(self) -> int:
return self._active_counter
Example usage
class SmartQueue:
def __init__(self):
self.queue = collections.deque()
def put(self, x):
self.queue.append(x)
def get(self):
return self.queue.popleft()
def __next__(self):
if self.queue:
return self.get()
else:
raise StopIteration
async def main():
sq = SmartQueue()
async def cor(num):
await asyncio.sleep(1)
print(num)
if num > 0:
sq.put(cor(num - 1))
for i in range(1, 10+1):
sq.put(cor(i))
r = RunnersPool(5)
r.attach_coroutines_generator(sq)
await r.run()
if __name__ == '__main__':
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
python python-3.x async-await
edited Aug 1 at 14:54
Ludisposed
5,66721656
5,66721656
asked Aug 1 at 14:27
Illia Ananich
734
734
From experience, coroutines are not built to act like this; they're event-based processors (they process something when the next event fires off). You're adding all the functionality that is already inherent in threads. I also have a pet peeve (pithy, I know), people putting "smart" in front of things :-)
â C. Harley
Aug 2 at 2:39
@C.Harley Pool exists to delay coroutines execution by using queue. In case of scraping you have too many start urls to run them all simultaneously (because of a lot of connections and memory usage). And about "SmartQueue" - it's just a quck way to name something )
â Illia Ananich
Aug 2 at 16:16
Yes, I can see what you're doing, however I'm pointing out that Coroutines have a completely different intent and use. I'd recommend studying the difference and trying to do your assigned task using Threads. Give it a try and see the differences in task allocation, interaction with the queue and built-in functionality.
â C. Harley
Aug 3 at 2:08
add a comment |Â
From experience, coroutines are not built to act like this; they're event-based processors (they process something when the next event fires off). You're adding all the functionality that is already inherent in threads. I also have a pet peeve (pithy, I know), people putting "smart" in front of things :-)
â C. Harley
Aug 2 at 2:39
@C.Harley Pool exists to delay coroutines execution by using queue. In case of scraping you have too many start urls to run them all simultaneously (because of a lot of connections and memory usage). And about "SmartQueue" - it's just a quck way to name something )
â Illia Ananich
Aug 2 at 16:16
Yes, I can see what you're doing, however I'm pointing out that Coroutines have a completely different intent and use. I'd recommend studying the difference and trying to do your assigned task using Threads. Give it a try and see the differences in task allocation, interaction with the queue and built-in functionality.
â C. Harley
Aug 3 at 2:08
From experience, coroutines are not built to act like this; they're event-based processors (they process something when the next event fires off). You're adding all the functionality that is already inherent in threads. I also have a pet peeve (pithy, I know), people putting "smart" in front of things :-)
â C. Harley
Aug 2 at 2:39
From experience, coroutines are not built to act like this; they're event-based processors (they process something when the next event fires off). You're adding all the functionality that is already inherent in threads. I also have a pet peeve (pithy, I know), people putting "smart" in front of things :-)
â C. Harley
Aug 2 at 2:39
@C.Harley Pool exists to delay coroutines execution by using queue. In case of scraping you have too many start urls to run them all simultaneously (because of a lot of connections and memory usage). And about "SmartQueue" - it's just a quck way to name something )
â Illia Ananich
Aug 2 at 16:16
@C.Harley Pool exists to delay coroutines execution by using queue. In case of scraping you have too many start urls to run them all simultaneously (because of a lot of connections and memory usage). And about "SmartQueue" - it's just a quck way to name something )
â Illia Ananich
Aug 2 at 16:16
Yes, I can see what you're doing, however I'm pointing out that Coroutines have a completely different intent and use. I'd recommend studying the difference and trying to do your assigned task using Threads. Give it a try and see the differences in task allocation, interaction with the queue and built-in functionality.
â C. Harley
Aug 3 at 2:08
Yes, I can see what you're doing, however I'm pointing out that Coroutines have a completely different intent and use. I'd recommend studying the difference and trying to do your assigned task using Threads. Give it a try and see the differences in task allocation, interaction with the queue and built-in functionality.
â C. Harley
Aug 3 at 2:08
add a comment |Â
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f200754%2fpool-of-python-coroutines%23new-answer', 'question_page');
);
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
From experience, coroutines are not built to act like this; they're event-based processors (they process something when the next event fires off). You're adding all the functionality that is already inherent in threads. I also have a pet peeve (pithy, I know), people putting "smart" in front of things :-)
â C. Harley
Aug 2 at 2:39
@C.Harley Pool exists to delay coroutines execution by using queue. In case of scraping you have too many start urls to run them all simultaneously (because of a lot of connections and memory usage). And about "SmartQueue" - it's just a quck way to name something )
â Illia Ananich
Aug 2 at 16:16
Yes, I can see what you're doing, however I'm pointing out that Coroutines have a completely different intent and use. I'd recommend studying the difference and trying to do your assigned task using Threads. Give it a try and see the differences in task allocation, interaction with the queue and built-in functionality.
â C. Harley
Aug 3 at 2:08