Pool of Python coroutines

Task



Control how many coroutines run simultaneously, with the ability to stop the process from the inside.
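
For comparison, the concurrency limit on its own is often expressed with asyncio.Semaphore from the standard library. This is a swapped-in technique, not the pool below: run_limited and fake_download are hypothetical names, the example assumes the coroutines are known up front, and it does not cover the "stop from inside" part of the task.

```python
import asyncio


async def run_limited(coros, limit: int):
    """Await every coroutine in `coros`, never more than `limit` at once.

    Returns the results (in input order) and the peak concurrency observed.
    """
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def guarded(coro):
        nonlocal in_flight, peak
        async with semaphore:  # waits while `limit` coroutines are already running
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await coro
            finally:
                in_flight -= 1

    results = await asyncio.gather(*(guarded(c) for c in coros))
    return results, peak


async def fake_download(n: int) -> int:
    await asyncio.sleep(0.01)  # stand-in for network I/O
    return n * 2


if __name__ == "__main__":
    results, peak = asyncio.run(run_limited([fake_download(i) for i in range(10)], limit=3))
    print(results, peak)
```

Here at most three fake_download coroutines are in flight at any moment, so peak ends up as 3.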



Use case



When scraping websites, you want to control how much memory the program consumes and how much load it puts on the target.



Usage



  1. Create an instance.

  2. Call attach_coroutines_generator to attach a generator (any object that supports next()) which produces new coroutines from a queue that those coroutines can also access (for example, every coroutine downloads a page's HTML and queues new downloads for the extracted links).

  3. Call the run method.
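
One detail worth keeping in mind for step 2: next() is called on the attached object again after it has raised StopIteration. A real generator function is finished for good after its first StopIteration, so work queued later is never seen, while a class with a hand-written __next__ (like SmartQueue in the example usage) can raise StopIteration while empty and still return items afterwards. A minimal demonstration; generator_source, ResumableSource and drain are hypothetical names:

```python
from collections import deque


def generator_source(queue):
    # A generator *function*: once it returns, it stays exhausted.
    while queue:
        yield queue.popleft()


class ResumableSource:
    """Iterator that raises StopIteration while empty but can yield again later."""

    def __init__(self, queue):
        self.queue = queue

    def __iter__(self):
        return self

    def __next__(self):
        if self.queue:
            return self.queue.popleft()
        raise StopIteration


def drain(source):
    """Collect items until the source raises StopIteration."""
    items = []
    while True:
        try:
            items.append(next(source))
        except StopIteration:
            return items


q1 = deque([1, 2])
gen = generator_source(q1)
print(drain(gen))  # [1, 2]
q1.append(3)       # work discovered later
print(drain(gen))  # [] (the generator stays exhausted)

q2 = deque([1, 2])
res = ResumableSource(q2)
print(drain(res))  # [1, 2]
q2.append(3)
print(drain(res))  # [3] (the new item is picked up)
```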

More description



The _sleep method, the _do_stop attribute and stop_gracefully all exist so that code inside the _runner method can stop the run call (because a future can't be cancelled from its own child).



When a runner catches StopIteration, it sleeps for delay seconds.



When a runner notices that every other runner is asleep, it sets the _do_stop attribute so that all runners break out of their while loops.
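
The sleep-and-stop logic described above can be sketched in a self-contained form. This is a simplified, hypothetical miniature (run_until_idle, countdown and main are illustrative names) that uses a plain deque as the coroutine source and closure variables instead of the class attributes: a runner that finds the source empty sleeps for delay seconds, unless every other runner is already asleep, in which case no coroutine is in flight that could queue more work, so it sets the stop flag.

```python
import asyncio
from collections import deque


async def run_until_idle(source: deque, pool_size: int, delay: float) -> None:
    """Await coroutines popped from `source` using `pool_size` runners."""
    sleeping = 0  # number of runners currently waiting in asyncio.sleep()
    stop = False  # plays the role of RunnersPool._do_stop

    async def runner() -> None:
        nonlocal sleeping, stop
        while not stop:
            if source:
                await source.popleft()  # the coroutine may append more work
                continue
            if sleeping + 1 == pool_size:
                # Everyone else is asleep, so no coroutine is in flight that
                # could still queue work: stop instead of sleeping.
                stop = True
                break
            sleeping += 1
            await asyncio.sleep(delay)
            sleeping -= 1

    await asyncio.gather(*(runner() for _ in range(pool_size)))


async def main() -> list:
    source = deque()
    done = []

    async def countdown(n: int) -> None:
        await asyncio.sleep(0.01)  # stand-in for a download
        done.append(n)
        if n > 0:
            source.append(countdown(n - 1))  # "queue new downloads"

    for i in range(1, 4):
        source.append(countdown(i))
    await run_until_idle(source, pool_size=2, delay=0.05)
    return done


if __name__ == "__main__":
    print(sorted(asyncio.run(main())))  # [0, 0, 0, 1, 1, 1, 2, 2, 3]
```

With pool_size=2, all nine countdown steps run before both runners exit; the runner that was asleep leaves its loop after its current sleep ends.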



Questions



  • I'm not sure about appropriate naming and typing.

  • Security issues?

  • Are there other or better ways to do the same thing?
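
Regarding the last question, one commonly used standard-library alternative is a fixed set of worker tasks reading from an asyncio.Queue, with queue.join() as the termination condition instead of polling with a delay. A sketch under the assumption that each item is a plain integer standing in for a URL; crawl and worker are hypothetical names:

```python
import asyncio


async def crawl(seeds, worker_count: int) -> list:
    """Process `seeds` and any follow-up items with `worker_count` workers.

    queue.join() returns once every item put on the queue (including items
    added by workers) has been marked with task_done(); the workers are then
    cancelled because they would otherwise wait on queue.get() forever.
    """
    queue = asyncio.Queue()
    processed = []
    for seed in seeds:
        queue.put_nowait(seed)

    async def worker() -> None:
        while True:
            n = await queue.get()
            try:
                await asyncio.sleep(0.01)    # stand-in for downloading a page
                processed.append(n)
                if n > 0:
                    queue.put_nowait(n - 1)  # "links" discovered on the page
            finally:
                queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(worker_count)]
    await queue.join()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return processed


if __name__ == "__main__":
    print(sorted(asyncio.run(crawl([1, 2, 3], worker_count=2))))
```

Because a worker puts follow-up items on the queue before calling task_done(), the queue's unfinished-task count cannot drop to zero while work is still being discovered, so join() returns only when everything is done.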

Code



import asyncio
from typing import Coroutine, Iterator, Optional


class RunnersPool:

    def __init__(self, pool_size: int, delay: float = 1., enable_auto_stop: bool = True):
        self._pool_size = pool_size
        self._delay = delay
        self._auto_stop_enabled = enable_auto_stop

        self._do_stop = False
        self._active_counter = 0
        self._delayed_counter = 0

        self._coroutines_generator: Optional[Iterator[Coroutine]] = None
        self._runners: Optional[asyncio.Future] = None

    # ----------------
    # runner methods

    async def _runner(self) -> None:
        self._active_counter += 1
        while True:
            try:
                coroutine = next(self._coroutines_generator)
            except StopIteration:
                # Nothing queued at the moment: sleep, or trigger the auto-stop.
                await self._sleep()
                # Leave the loop once a stop was requested, either by the
                # auto-stop in _sleep or by an explicit stop_gracefully() call.
                if not self.is_running:
                    break
            else:
                try:
                    await coroutine
                except Exception as exc:
                    self.handle_exception(exc, coroutine)
        self._active_counter -= 1

    async def _sleep(self) -> None:
        # If every other runner is already asleep, no coroutine is running that
        # could queue more work, so request a stop instead of sleeping.
        # Only do this when auto-stop is enabled: otherwise this branch returns
        # without awaiting anything and the runner busy-loops, blocking the loop.
        if self._auto_stop_enabled and self._delayed_counter + 1 == self._pool_size:
            self.stop_gracefully()
            return

        self._delayed_counter += 1
        await asyncio.sleep(self._delay)
        self._delayed_counter -= 1

    # end
    # ----------------

    async def run(self) -> None:
        if self._coroutines_generator is None:
            raise RuntimeError('coroutines_generator not attached yet')

        self._runners = asyncio.gather(*[
            self._runner() for _ in range(self._pool_size)
        ])
        await self._runners

    def handle_exception(self, exception: Exception, coroutine: Coroutine) -> None:
        # Default behaviour: re-raise, so the error propagates out of run().
        raise exception

    def cancel(self) -> None:
        if self._runners is not None:
            self._runners.cancel()

    def stop_gracefully(self) -> None:
        self._do_stop = True

    def attach_coroutines_generator(self, coroutines_generator: Iterator[Coroutine]) -> None:
        # next() is called on this object, so it must be an iterator,
        # not merely an iterable.
        self._coroutines_generator = coroutines_generator

    @property
    def pool_size(self) -> int:
        return self._pool_size

    @property
    def delay(self) -> float:
        return self._delay

    @property
    def is_running(self) -> bool:
        return not self._do_stop

    @property
    def auto_stop_enabled(self) -> bool:
        return self._auto_stop_enabled

    @property
    def delayed_runners(self) -> int:
        return self._delayed_counter

    @property
    def active_runners(self) -> int:
        return self._active_counter


Example usage



import asyncio
import collections


class SmartQueue:

    def __init__(self):
        self.queue = collections.deque()

    def put(self, x):
        self.queue.append(x)

    def get(self):
        return self.queue.popleft()

    def __iter__(self):
        return self

    def __next__(self):
        if self.queue:
            return self.get()
        else:
            raise StopIteration


async def main():
    sq = SmartQueue()

    async def cor(num):
        await asyncio.sleep(1)
        print(num)
        if num > 0:
            sq.put(cor(num - 1))

    for i in range(1, 10 + 1):
        sq.put(cor(i))

    r = RunnersPool(5)
    r.attach_coroutines_generator(sq)

    await r.run()


if __name__ == '__main__':
    # asyncio.run creates the event loop, runs main() and closes the loop.
    asyncio.run(main())






  • From experience, coroutines are not built to act like this; they're event-based processors (they process something when the next event fires off). You're adding all the functionality that is already inherent in threads. I also have a pet peeve (pithy, I know), people putting "smart" in front of things :-)
    – C. Harley
    Aug 2 at 2:39










  • @C.Harley Pool exists to delay coroutines execution by using queue. In case of scraping you have too many start urls to run them all simultaneously (because of a lot of connections and memory usage). And about "SmartQueue" - it's just a quick way to name something )
    – Illia Ananich
    Aug 2 at 16:16










  • Yes, I can see what you're doing, however I'm pointing out that Coroutines have a completely different intent and use. I'd recommend studying the difference and trying to do your assigned task using Threads. Give it a try and see the differences in task allocation, interaction with the queue and built-in functionality.
    – C. Harley
    Aug 3 at 2:08
















edited Aug 1 at 14:54 by Ludisposed

asked Aug 1 at 14:27 by Illia Ananich










