Python task scheduler
Clash Royale CLAN TAG#URR8PPP
.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;
up vote
4
down vote
favorite
Submit tasks to a scheduler, allowing them to be run at a future time. Tasks can be submitted in any order, and non-running tasks can be cancelled.
I used a min-heap to prioritize the next task, and a threading.Condition
to communicate between
- a "watcher" thread that sleeps until the next task needs to be run, or its sleep time needs to be shortened (new "sooner" task added)
- the main thread, which writes to the min-heap
Scheduling is O(log n)
, cancellation is O(n)
, and getting the soonest task is O(1)
.
import heapq
import logging
import functools
import time
import threading
from datetime import datetime, timedelta
from collections import namedtuple
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
Task = namedtuple('Task', ['start', 'name', 'fn'])
class Scheduler(object):
'''Class that schedules functions to be run in a separate thread at some future
time. Supports cancellation of functions that haven't yet started.
'''
def __init__(self):
self._cv = threading.Condition(threading.Lock())
self._minheap =
self._timeout = None
self._start()
def cancel(self, name):
with self._cv:
try:
task = [task for task in self._minheap if task.name == name][0]
except IndexError:
return
self._minheap.remove(task)
heapq.heapify(self._minheap)
self._cv.notify()
logger.info('canceled '.format(task.name))
def schedule(self, name, fn, start):
task = Task(start, name, fn)
logger.info('scheduling task: '.format(name))
with self._cv:
heapq.heappush(self._minheap, task)
self._cv.notify()
logger.info('scheduled task: '.format(name))
def _get_next_timeout(self):
if not self._minheap:
return None
return (self._minheap[0].start - datetime.now()).total_seconds()
def _start(self):
def run():
while True:
self._cv.acquire()
logger.info('waiting with timeout: '.format(self._timeout))
not_expired = self._cv.wait(timeout=self._timeout)
if self._timeout is None:
logger.info('no timeout found; using min element')
self._timeout = self._get_next_timeout()
self._cv.release()
elif not_expired:
logger.info('already waiting but woken up; comparing current with min element')
self._timeout = min(self._timeout, self._get_next_timeout())
self._cv.release()
else:
logger.info('timed out; running next task')
next_task = heapq.heappop(self._minheap)
self._timeout = self._get_next_timeout()
self._cv.release()
threading.Thread(target=next_task.fn, name=next_task.name).start()
threading.Thread(target=run, name='timer').start()
def main():
logging.basicConfig(level=logging.INFO, format='%(threadName)-10s: %(message)s')
start = datetime.now()
def task():
logger.info('running, elapsed: '.format((datetime.now() - start).total_seconds()))
s = Scheduler()
s.schedule('task-1', functools.partial(task), start + timedelta(seconds=1))
s.schedule('task-2', functools.partial(task), start + timedelta(seconds=2))
s.cancel('task-2')
s.schedule('task-3', functools.partial(task), start + timedelta(seconds=3))
# note that task-4 precedes task-3, but is registered after task-3
s.schedule('task-4', functools.partial(task), start + timedelta(seconds=2.5))
time.sleep(5)
now = datetime.now()
s.schedule('task-5', functools.partial(task), now + timedelta(seconds=5))
s.schedule('task-6', functools.partial(task), now + timedelta(seconds=4))
s.schedule('task-7', functools.partial(task), now + timedelta(seconds=3.5))
if __name__ == '__main__':
main()
Output:
â ~/c/dsa [10265e2] (masterâ¡)
(n) p3 py/epi/19_7.py
timer : waiting with timeout: None
MainThread: scheduling task: task-1
MainThread: scheduled task: task-1
MainThread: scheduling task: task-2
MainThread: scheduled task: task-2
MainThread: canceled task-2
MainThread: scheduling task: task-3
MainThread: scheduled task: task-3
MainThread: scheduling task: task-4
MainThread: scheduled task: task-4
timer : no timeout found; using min element
timer : waiting with timeout: 0.999214
timer : timed out; running next task
task-1 : running, elapsed: 1.006024
timer : waiting with timeout: 1.494409
timer : timed out; running next task
task-4 : running, elapsed: 2.506384
timer : waiting with timeout: 0.49432
timer : timed out; running next task
task-3 : running, elapsed: 3.005836
timer : waiting with timeout: None
MainThread: scheduling task: task-5
MainThread: scheduled task: task-5
timer : no timeout found; using min element
MainThread: scheduling task: task-6
timer : waiting with timeout: 4.999305
MainThread: scheduled task: task-6
timer : already waiting but woken up; comparing current with min element
MainThread: scheduling task: task-7
timer : waiting with timeout: 3.998729
MainThread: scheduled task: task-7
timer : already waiting but woken up; comparing current with min element
timer : waiting with timeout: 3.498098
timer : timed out; running next task
task-7 : running, elapsed: 8.509112
timer : waiting with timeout: 0.493943
timer : timed out; running next task
task-6 : running, elapsed: 9.008533
timer : waiting with timeout: 0.994441
timer : timed out; running next task
task-5 : running, elapsed: 10.005569
timer : waiting with timeout: None
Concurrency is hard, so I'm very interested in hearing folks' thoughts. Thanks!
python multithreading
add a comment |Â
up vote
4
down vote
favorite
Submit tasks to a scheduler, allowing them to be run at a future time. Tasks can be submitted in any order, and non-running tasks can be cancelled.
I used a min-heap to prioritize the next task, and a threading.Condition
to communicate between
- a "watcher" thread that sleeps until the next task needs to be run, or its sleep time needs to be shortened (new "sooner" task added)
- the main thread, which writes to the min-heap
Scheduling is O(log n)
, cancellation is O(n)
, and getting the soonest task is O(1)
.
import heapq
import logging
import functools
import time
import threading
from datetime import datetime, timedelta
from collections import namedtuple
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
Task = namedtuple('Task', ['start', 'name', 'fn'])
class Scheduler(object):
'''Class that schedules functions to be run in a separate thread at some future
time. Supports cancellation of functions that haven't yet started.
'''
def __init__(self):
self._cv = threading.Condition(threading.Lock())
self._minheap =
self._timeout = None
self._start()
def cancel(self, name):
with self._cv:
try:
task = [task for task in self._minheap if task.name == name][0]
except IndexError:
return
self._minheap.remove(task)
heapq.heapify(self._minheap)
self._cv.notify()
logger.info('canceled '.format(task.name))
def schedule(self, name, fn, start):
task = Task(start, name, fn)
logger.info('scheduling task: '.format(name))
with self._cv:
heapq.heappush(self._minheap, task)
self._cv.notify()
logger.info('scheduled task: '.format(name))
def _get_next_timeout(self):
if not self._minheap:
return None
return (self._minheap[0].start - datetime.now()).total_seconds()
def _start(self):
def run():
while True:
self._cv.acquire()
logger.info('waiting with timeout: '.format(self._timeout))
not_expired = self._cv.wait(timeout=self._timeout)
if self._timeout is None:
logger.info('no timeout found; using min element')
self._timeout = self._get_next_timeout()
self._cv.release()
elif not_expired:
logger.info('already waiting but woken up; comparing current with min element')
self._timeout = min(self._timeout, self._get_next_timeout())
self._cv.release()
else:
logger.info('timed out; running next task')
next_task = heapq.heappop(self._minheap)
self._timeout = self._get_next_timeout()
self._cv.release()
threading.Thread(target=next_task.fn, name=next_task.name).start()
threading.Thread(target=run, name='timer').start()
def main():
logging.basicConfig(level=logging.INFO, format='%(threadName)-10s: %(message)s')
start = datetime.now()
def task():
logger.info('running, elapsed: '.format((datetime.now() - start).total_seconds()))
s = Scheduler()
s.schedule('task-1', functools.partial(task), start + timedelta(seconds=1))
s.schedule('task-2', functools.partial(task), start + timedelta(seconds=2))
s.cancel('task-2')
s.schedule('task-3', functools.partial(task), start + timedelta(seconds=3))
# note that task-4 precedes task-3, but is registered after task-3
s.schedule('task-4', functools.partial(task), start + timedelta(seconds=2.5))
time.sleep(5)
now = datetime.now()
s.schedule('task-5', functools.partial(task), now + timedelta(seconds=5))
s.schedule('task-6', functools.partial(task), now + timedelta(seconds=4))
s.schedule('task-7', functools.partial(task), now + timedelta(seconds=3.5))
if __name__ == '__main__':
main()
Output:
â ~/c/dsa [10265e2] (masterâ¡)
(n) p3 py/epi/19_7.py
timer : waiting with timeout: None
MainThread: scheduling task: task-1
MainThread: scheduled task: task-1
MainThread: scheduling task: task-2
MainThread: scheduled task: task-2
MainThread: canceled task-2
MainThread: scheduling task: task-3
MainThread: scheduled task: task-3
MainThread: scheduling task: task-4
MainThread: scheduled task: task-4
timer : no timeout found; using min element
timer : waiting with timeout: 0.999214
timer : timed out; running next task
task-1 : running, elapsed: 1.006024
timer : waiting with timeout: 1.494409
timer : timed out; running next task
task-4 : running, elapsed: 2.506384
timer : waiting with timeout: 0.49432
timer : timed out; running next task
task-3 : running, elapsed: 3.005836
timer : waiting with timeout: None
MainThread: scheduling task: task-5
MainThread: scheduled task: task-5
timer : no timeout found; using min element
MainThread: scheduling task: task-6
timer : waiting with timeout: 4.999305
MainThread: scheduled task: task-6
timer : already waiting but woken up; comparing current with min element
MainThread: scheduling task: task-7
timer : waiting with timeout: 3.998729
MainThread: scheduled task: task-7
timer : already waiting but woken up; comparing current with min element
timer : waiting with timeout: 3.498098
timer : timed out; running next task
task-7 : running, elapsed: 8.509112
timer : waiting with timeout: 0.493943
timer : timed out; running next task
task-6 : running, elapsed: 9.008533
timer : waiting with timeout: 0.994441
timer : timed out; running next task
task-5 : running, elapsed: 10.005569
timer : waiting with timeout: None
Concurrency is hard, so I'm very interested in hearing folks' thoughts. Thanks!
python multithreading
Looks like you got a good start. I'm assuming this is intended to run for Python 3?
â Mast
May 22 at 5:55
Yes. Note that when I only notified when a new min element was found, the ordering was sometimes incorrect. I'm trying to figure this out...
â yangmillstheory
May 22 at 5:58
This was solved below by Gareth.
â yangmillstheory
May 22 at 15:56
If this is a problem from somewhere cab you please send the source?
â kuskmen
May 25 at 12:46
1
It's from EPI.
â yangmillstheory
May 25 at 15:26
add a comment |Â
up vote
4
down vote
favorite
up vote
4
down vote
favorite
Submit tasks to a scheduler, allowing them to be run at a future time. Tasks can be submitted in any order, and non-running tasks can be cancelled.
I used a min-heap to prioritize the next task, and a threading.Condition
to communicate between
- a "watcher" thread that sleeps until the next task needs to be run, or its sleep time needs to be shortened (new "sooner" task added)
- the main thread, which writes to the min-heap
Scheduling is O(log n)
, cancellation is O(n)
, and getting the soonest task is O(1)
.
import heapq
import logging
import functools
import time
import threading
from datetime import datetime, timedelta
from collections import namedtuple
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
Task = namedtuple('Task', ['start', 'name', 'fn'])
class Scheduler(object):
'''Class that schedules functions to be run in a separate thread at some future
time. Supports cancellation of functions that haven't yet started.
'''
def __init__(self):
self._cv = threading.Condition(threading.Lock())
self._minheap =
self._timeout = None
self._start()
def cancel(self, name):
with self._cv:
try:
task = [task for task in self._minheap if task.name == name][0]
except IndexError:
return
self._minheap.remove(task)
heapq.heapify(self._minheap)
self._cv.notify()
logger.info('canceled '.format(task.name))
def schedule(self, name, fn, start):
task = Task(start, name, fn)
logger.info('scheduling task: '.format(name))
with self._cv:
heapq.heappush(self._minheap, task)
self._cv.notify()
logger.info('scheduled task: '.format(name))
def _get_next_timeout(self):
if not self._minheap:
return None
return (self._minheap[0].start - datetime.now()).total_seconds()
def _start(self):
def run():
while True:
self._cv.acquire()
logger.info('waiting with timeout: '.format(self._timeout))
not_expired = self._cv.wait(timeout=self._timeout)
if self._timeout is None:
logger.info('no timeout found; using min element')
self._timeout = self._get_next_timeout()
self._cv.release()
elif not_expired:
logger.info('already waiting but woken up; comparing current with min element')
self._timeout = min(self._timeout, self._get_next_timeout())
self._cv.release()
else:
logger.info('timed out; running next task')
next_task = heapq.heappop(self._minheap)
self._timeout = self._get_next_timeout()
self._cv.release()
threading.Thread(target=next_task.fn, name=next_task.name).start()
threading.Thread(target=run, name='timer').start()
def main():
logging.basicConfig(level=logging.INFO, format='%(threadName)-10s: %(message)s')
start = datetime.now()
def task():
logger.info('running, elapsed: '.format((datetime.now() - start).total_seconds()))
s = Scheduler()
s.schedule('task-1', functools.partial(task), start + timedelta(seconds=1))
s.schedule('task-2', functools.partial(task), start + timedelta(seconds=2))
s.cancel('task-2')
s.schedule('task-3', functools.partial(task), start + timedelta(seconds=3))
# note that task-4 precedes task-3, but is registered after task-3
s.schedule('task-4', functools.partial(task), start + timedelta(seconds=2.5))
time.sleep(5)
now = datetime.now()
s.schedule('task-5', functools.partial(task), now + timedelta(seconds=5))
s.schedule('task-6', functools.partial(task), now + timedelta(seconds=4))
s.schedule('task-7', functools.partial(task), now + timedelta(seconds=3.5))
if __name__ == '__main__':
main()
Output:
â ~/c/dsa [10265e2] (masterâ¡)
(n) p3 py/epi/19_7.py
timer : waiting with timeout: None
MainThread: scheduling task: task-1
MainThread: scheduled task: task-1
MainThread: scheduling task: task-2
MainThread: scheduled task: task-2
MainThread: canceled task-2
MainThread: scheduling task: task-3
MainThread: scheduled task: task-3
MainThread: scheduling task: task-4
MainThread: scheduled task: task-4
timer : no timeout found; using min element
timer : waiting with timeout: 0.999214
timer : timed out; running next task
task-1 : running, elapsed: 1.006024
timer : waiting with timeout: 1.494409
timer : timed out; running next task
task-4 : running, elapsed: 2.506384
timer : waiting with timeout: 0.49432
timer : timed out; running next task
task-3 : running, elapsed: 3.005836
timer : waiting with timeout: None
MainThread: scheduling task: task-5
MainThread: scheduled task: task-5
timer : no timeout found; using min element
MainThread: scheduling task: task-6
timer : waiting with timeout: 4.999305
MainThread: scheduled task: task-6
timer : already waiting but woken up; comparing current with min element
MainThread: scheduling task: task-7
timer : waiting with timeout: 3.998729
MainThread: scheduled task: task-7
timer : already waiting but woken up; comparing current with min element
timer : waiting with timeout: 3.498098
timer : timed out; running next task
task-7 : running, elapsed: 8.509112
timer : waiting with timeout: 0.493943
timer : timed out; running next task
task-6 : running, elapsed: 9.008533
timer : waiting with timeout: 0.994441
timer : timed out; running next task
task-5 : running, elapsed: 10.005569
timer : waiting with timeout: None
Concurrency is hard, so I'm very interested in hearing folks' thoughts. Thanks!
python multithreading
Submit tasks to a scheduler, allowing them to be run at a future time. Tasks can be submitted in any order, and non-running tasks can be cancelled.
I used a min-heap to prioritize the next task, and a threading.Condition
to communicate between
- a "watcher" thread that sleeps until the next task needs to be run, or its sleep time needs to be shortened (new "sooner" task added)
- the main thread, which writes to the min-heap
Scheduling is O(log n)
, cancellation is O(n)
, and getting the soonest task is O(1)
.
import heapq
import logging
import functools
import time
import threading
from datetime import datetime, timedelta
from collections import namedtuple
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
Task = namedtuple('Task', ['start', 'name', 'fn'])
class Scheduler(object):
'''Class that schedules functions to be run in a separate thread at some future
time. Supports cancellation of functions that haven't yet started.
'''
def __init__(self):
self._cv = threading.Condition(threading.Lock())
self._minheap =
self._timeout = None
self._start()
def cancel(self, name):
with self._cv:
try:
task = [task for task in self._minheap if task.name == name][0]
except IndexError:
return
self._minheap.remove(task)
heapq.heapify(self._minheap)
self._cv.notify()
logger.info('canceled '.format(task.name))
def schedule(self, name, fn, start):
task = Task(start, name, fn)
logger.info('scheduling task: '.format(name))
with self._cv:
heapq.heappush(self._minheap, task)
self._cv.notify()
logger.info('scheduled task: '.format(name))
def _get_next_timeout(self):
if not self._minheap:
return None
return (self._minheap[0].start - datetime.now()).total_seconds()
def _start(self):
def run():
while True:
self._cv.acquire()
logger.info('waiting with timeout: '.format(self._timeout))
not_expired = self._cv.wait(timeout=self._timeout)
if self._timeout is None:
logger.info('no timeout found; using min element')
self._timeout = self._get_next_timeout()
self._cv.release()
elif not_expired:
logger.info('already waiting but woken up; comparing current with min element')
self._timeout = min(self._timeout, self._get_next_timeout())
self._cv.release()
else:
logger.info('timed out; running next task')
next_task = heapq.heappop(self._minheap)
self._timeout = self._get_next_timeout()
self._cv.release()
threading.Thread(target=next_task.fn, name=next_task.name).start()
threading.Thread(target=run, name='timer').start()
def main():
logging.basicConfig(level=logging.INFO, format='%(threadName)-10s: %(message)s')
start = datetime.now()
def task():
logger.info('running, elapsed: '.format((datetime.now() - start).total_seconds()))
s = Scheduler()
s.schedule('task-1', functools.partial(task), start + timedelta(seconds=1))
s.schedule('task-2', functools.partial(task), start + timedelta(seconds=2))
s.cancel('task-2')
s.schedule('task-3', functools.partial(task), start + timedelta(seconds=3))
# note that task-4 precedes task-3, but is registered after task-3
s.schedule('task-4', functools.partial(task), start + timedelta(seconds=2.5))
time.sleep(5)
now = datetime.now()
s.schedule('task-5', functools.partial(task), now + timedelta(seconds=5))
s.schedule('task-6', functools.partial(task), now + timedelta(seconds=4))
s.schedule('task-7', functools.partial(task), now + timedelta(seconds=3.5))
if __name__ == '__main__':
main()
Output:
â ~/c/dsa [10265e2] (masterâ¡)
(n) p3 py/epi/19_7.py
timer : waiting with timeout: None
MainThread: scheduling task: task-1
MainThread: scheduled task: task-1
MainThread: scheduling task: task-2
MainThread: scheduled task: task-2
MainThread: canceled task-2
MainThread: scheduling task: task-3
MainThread: scheduled task: task-3
MainThread: scheduling task: task-4
MainThread: scheduled task: task-4
timer : no timeout found; using min element
timer : waiting with timeout: 0.999214
timer : timed out; running next task
task-1 : running, elapsed: 1.006024
timer : waiting with timeout: 1.494409
timer : timed out; running next task
task-4 : running, elapsed: 2.506384
timer : waiting with timeout: 0.49432
timer : timed out; running next task
task-3 : running, elapsed: 3.005836
timer : waiting with timeout: None
MainThread: scheduling task: task-5
MainThread: scheduled task: task-5
timer : no timeout found; using min element
MainThread: scheduling task: task-6
timer : waiting with timeout: 4.999305
MainThread: scheduled task: task-6
timer : already waiting but woken up; comparing current with min element
MainThread: scheduling task: task-7
timer : waiting with timeout: 3.998729
MainThread: scheduled task: task-7
timer : already waiting but woken up; comparing current with min element
timer : waiting with timeout: 3.498098
timer : timed out; running next task
task-7 : running, elapsed: 8.509112
timer : waiting with timeout: 0.493943
timer : timed out; running next task
task-6 : running, elapsed: 9.008533
timer : waiting with timeout: 0.994441
timer : timed out; running next task
task-5 : running, elapsed: 10.005569
timer : waiting with timeout: None
Concurrency is hard, so I'm very interested in hearing folks' thoughts. Thanks!
python multithreading
asked May 22 at 5:44
yangmillstheory
1344
1344
Looks like you got a good start. I'm assuming this is intended to run for Python 3?
â Mast
May 22 at 5:55
Yes. Note that when I only notified when a new min element was found, the ordering was sometimes incorrect. I'm trying to figure this out...
â yangmillstheory
May 22 at 5:58
This was solved below by Gareth.
â yangmillstheory
May 22 at 15:56
If this is a problem from somewhere cab you please send the source?
â kuskmen
May 25 at 12:46
1
It's from EPI.
â yangmillstheory
May 25 at 15:26
add a comment |Â
Looks like you got a good start. I'm assuming this is intended to run for Python 3?
â Mast
May 22 at 5:55
Yes. Note that when I only notified when a new min element was found, the ordering was sometimes incorrect. I'm trying to figure this out...
â yangmillstheory
May 22 at 5:58
This was solved below by Gareth.
â yangmillstheory
May 22 at 15:56
If this is a problem from somewhere cab you please send the source?
â kuskmen
May 25 at 12:46
1
It's from EPI.
â yangmillstheory
May 25 at 15:26
Looks like you got a good start. I'm assuming this is intended to run for Python 3?
â Mast
May 22 at 5:55
Looks like you got a good start. I'm assuming this is intended to run for Python 3?
â Mast
May 22 at 5:55
Yes. Note that when I only notified when a new min element was found, the ordering was sometimes incorrect. I'm trying to figure this out...
â yangmillstheory
May 22 at 5:58
Yes. Note that when I only notified when a new min element was found, the ordering was sometimes incorrect. I'm trying to figure this out...
â yangmillstheory
May 22 at 5:58
This was solved below by Gareth.
â yangmillstheory
May 22 at 15:56
This was solved below by Gareth.
â yangmillstheory
May 22 at 15:56
If this is a problem from somewhere cab you please send the source?
â kuskmen
May 25 at 12:46
If this is a problem from somewhere cab you please send the source?
â kuskmen
May 25 at 12:46
1
1
It's from EPI.
â yangmillstheory
May 25 at 15:26
It's from EPI.
â yangmillstheory
May 25 at 15:26
add a comment |Â
1 Answer
1
active
oldest
votes
up vote
3
down vote
accepted
1. Review
The docstrings could be improved. It would be nice if the docstring for the
Scheduler
class explained (briefly) how to use it, and there should be docstrings for theschedule
andcancel
methods.When using the
logging
module, it's not normally necessary to format the log messages yourself. Instead, pass the format string and arguments, and let the logger do the formatting if and when it needs to (depending on the log level, it might never need to format a particular message). So instead of:logger.info('waiting with timeout: '.format(self._timeout))
write:
logger.info("waiting with timeout %f", self._timeout)
If there are multiple tasks with the same name, then the
cancel
method cancels the one that appears closest to the start of the heap. This is not necessarily the one with the soonest start time (because heaps are arranged as a tree, not as a sorted list). This behaviour seems quite obscure to me and seems likely to lead to unreliable programs.I think it would be easier for the user to write reliable programs if you adopted one of these three approaches: (i)ÃÂ cancel all tasks with matching names so that the behaviour doesn't depend on the arrangement of the heap; or (ii)ÃÂ require all tasks in the queue to have unique names; or (iii)ÃÂ have the
schedule
method return some object representing the task, so that this can later be passed tocancel
to uniquely identify the particular task to be cancelled.Cancellation takes time proportional to the number of tasks in the heap. This could be improved to (amortized) $O(log n)$ by (i)ÃÂ having the
schedule
method return some object representing the task as suggesting above; (ii)ÃÂ passing this task object to thecancel
method; (iii)ÃÂ leaving the cancelled task in the heap but marking it as cancelled; (iv)ÃÂ discarding cancelled tasks when they are popped from the heap.The attributes of a
Task
arestart
,name
,fn
, but the arguments toschedule
arename
,fn
,start
. This kind of inconsistency risks confusion or error. (You needstart
to be first so that tasks compare according to their start times, but there are other ways to achieve this.)If you attempt to schedule two tasks with the same start time and the same name, then you get
TypeError: '<' not supported between instances of 'function' and 'function'
.The name
_minheap
describes the data structure, but it is more important to describe the purpose (the data structure is an implementation detail). So I would use a name like_tasks
.The
_timeout
attribute is only used in therun
function, so it could be a local variable there instead of an attribute on theScheduler
object.The
_get_next_timeout
method is only used in therun
function, so it could be a local function there instead of a method on theScheduler
class. (It must only be called with the lock held, so it is not suitable for a stand-alone method.)The
_start
method is only called in the__init__
method, so could be inlined there.The logic in
run
seems wrong to me. The following sequence of events is possible: (i)ÃÂ The timer thread calls_get_next_timeout
which returns a timeout of 10 seconds, say. (ii)ÃÂ The timer thread released the condition variable. (iii)ÃÂ Another thread callsschedule
with a task that should be run in 5 seconds time and notifies the condition variable (but nothing is waiting on the condition variable). (iv)ÃÂ The timer thread acquires the condition variable. (v)ÃÂ The timer thread waits on the old timeout of 10 seconds. But it should have computed a new timeout of 5 seconds and waited on that instead.To fix this, it is necessary to compute the timeout between acquiring the condition variable and waiting on it.
The use of
acquire
andrelease
on the condition variable seems risky, as an exception at the wrong time would leave the lock held. Normally one uses the context manager interface to avoid this risk. I guess that you didn't do this because you wanted to launch the task with the lock not held. But it only takes a small rearrangement of code to make this work.
2. Revised code
This is a version with nameless tasks, but if you prefer your tasks to have names, then it will be straightfoward for you to add them.
class Scheduler:
"""A schedule of tasks to be run in background threads. Call the
schedule method to schedule a task to run at a particular time.
Call the task's cancel method to cancel it if it has not already
started running.
"""
@functools.total_ordering
class _Task:
"A scheduled task."
def __init__(self, fn, start):
"Create task that will run fn at or after the datetime start."
self.fn = fn
self.start = start
self.cancelled = False
def __le__(self, other):
# Tasks compare according to their start time.
return self.start <= other.start
@property
def timeout(self):
"Return time remaining in seconds before task should start."
return (self.start - datetime.now()).total_seconds()
def cancel(self):
"Cancel task if it has not already started running."
self.cancelled = True
logger.info("canceled %s", self)
def __init__(self):
cv = self._cv = threading.Condition(threading.Lock())
tasks = self._tasks =
def run():
while True:
with cv:
while True:
timeout = None
while tasks and tasks[0].cancelled:
heapq.heappop(tasks)
if tasks:
timeout = tasks[0].timeout
if timeout <= 0:
task = heapq.heappop(tasks)
break
cv.wait(timeout=timeout)
logger.info("starting task %s", task)
threading.Thread(target=task.fn).start()
threading.Thread(target=run, name='Scheduler').start()
def schedule(self, fn, start):
"""Schedule a task that will run fn at or after start (which must be a
datetime object) and return an object representing that task.
"""
task = self._Task(fn, start)
logger.info("scheduling task %s", task)
with self._cv:
heapq.heappush(self._tasks, task)
self._cv.notify()
logger.info("scheduled task %s", task)
return task
This is an great review and a model for how reviews should be done. Thanks @GarethRees!
â yangmillstheory
May 22 at 14:18
Can thetimeout <= 0
condition ever be true?
â yangmillstheory
May 22 at 14:27
Yes, it is true whentasks[0]
is due (or overdue) to run.
â Gareth Rees
May 22 at 14:34
How does the revised code address point 11?
â yangmillstheory
May 22 at 14:46
By recomputing the timeout after each wait (with the lock held).
â Gareth Rees
May 22 at 14:59
 |Â
show 3 more comments
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
3
down vote
accepted
1. Review
The docstrings could be improved. It would be nice if the docstring for the
Scheduler
class explained (briefly) how to use it, and there should be docstrings for theschedule
andcancel
methods.When using the
logging
module, it's not normally necessary to format the log messages yourself. Instead, pass the format string and arguments, and let the logger do the formatting if and when it needs to (depending on the log level, it might never need to format a particular message). So instead of:logger.info('waiting with timeout: '.format(self._timeout))
write:
logger.info("waiting with timeout %f", self._timeout)
If there are multiple tasks with the same name, then the
cancel
method cancels the one that appears closest to the start of the heap. This is not necessarily the one with the soonest start time (because heaps are arranged as a tree, not as a sorted list). This behaviour seems quite obscure to me and seems likely to lead to unreliable programs.I think it would be easier for the user to write reliable programs if you adopted one of these three approaches: (i)ÃÂ cancel all tasks with matching names so that the behaviour doesn't depend on the arrangement of the heap; or (ii)ÃÂ require all tasks in the queue to have unique names; or (iii)ÃÂ have the
schedule
method return some object representing the task, so that this can later be passed tocancel
to uniquely identify the particular task to be cancelled.Cancellation takes time proportional to the number of tasks in the heap. This could be improved to (amortized) $O(log n)$ by (i)ÃÂ having the
schedule
method return some object representing the task as suggesting above; (ii)ÃÂ passing this task object to thecancel
method; (iii)ÃÂ leaving the cancelled task in the heap but marking it as cancelled; (iv)ÃÂ discarding cancelled tasks when they are popped from the heap.The attributes of a
Task
arestart
,name
,fn
, but the arguments toschedule
arename
,fn
,start
. This kind of inconsistency risks confusion or error. (You needstart
to be first so that tasks compare according to their start times, but there are other ways to achieve this.)If you attempt to schedule two tasks with the same start time and the same name, then you get
TypeError: '<' not supported between instances of 'function' and 'function'
.The name
_minheap
describes the data structure, but it is more important to describe the purpose (the data structure is an implementation detail). So I would use a name like_tasks
.The
_timeout
attribute is only used in therun
function, so it could be a local variable there instead of an attribute on theScheduler
object.The
_get_next_timeout
method is only used in therun
function, so it could be a local function there instead of a method on theScheduler
class. (It must only be called with the lock held, so it is not suitable for a stand-alone method.)The
_start
method is only called in the__init__
method, so could be inlined there.The logic in
run
seems wrong to me. The following sequence of events is possible: (i)ÃÂ The timer thread calls_get_next_timeout
which returns a timeout of 10 seconds, say. (ii)ÃÂ The timer thread released the condition variable. (iii)ÃÂ Another thread callsschedule
with a task that should be run in 5 seconds time and notifies the condition variable (but nothing is waiting on the condition variable). (iv)ÃÂ The timer thread acquires the condition variable. (v)ÃÂ The timer thread waits on the old timeout of 10 seconds. But it should have computed a new timeout of 5 seconds and waited on that instead.To fix this, it is necessary to compute the timeout between acquiring the condition variable and waiting on it.
The use of
acquire
andrelease
on the condition variable seems risky, as an exception at the wrong time would leave the lock held. Normally one uses the context manager interface to avoid this risk. I guess that you didn't do this because you wanted to launch the task with the lock not held. But it only takes a small rearrangement of code to make this work.
2. Revised code
This is a version with nameless tasks, but if you prefer your tasks to have names, then it will be straightfoward for you to add them.
class Scheduler:
"""A schedule of tasks to be run in background threads. Call the
schedule method to schedule a task to run at a particular time.
Call the task's cancel method to cancel it if it has not already
started running.
"""
@functools.total_ordering
class _Task:
"A scheduled task."
def __init__(self, fn, start):
"Create task that will run fn at or after the datetime start."
self.fn = fn
self.start = start
self.cancelled = False
def __le__(self, other):
# Tasks compare according to their start time.
return self.start <= other.start
@property
def timeout(self):
"Return time remaining in seconds before task should start."
return (self.start - datetime.now()).total_seconds()
def cancel(self):
"Cancel task if it has not already started running."
self.cancelled = True
logger.info("canceled %s", self)
def __init__(self):
cv = self._cv = threading.Condition(threading.Lock())
tasks = self._tasks =
def run():
while True:
with cv:
while True:
timeout = None
while tasks and tasks[0].cancelled:
heapq.heappop(tasks)
if tasks:
timeout = tasks[0].timeout
if timeout <= 0:
task = heapq.heappop(tasks)
break
cv.wait(timeout=timeout)
logger.info("starting task %s", task)
threading.Thread(target=task.fn).start()
threading.Thread(target=run, name='Scheduler').start()
def schedule(self, fn, start):
"""Schedule a task that will run fn at or after start (which must be a
datetime object) and return an object representing that task.
"""
task = self._Task(fn, start)
logger.info("scheduling task %s", task)
with self._cv:
heapq.heappush(self._tasks, task)
self._cv.notify()
logger.info("scheduled task %s", task)
return task
This is an great review and a model for how reviews should be done. Thanks @GarethRees!
â yangmillstheory
May 22 at 14:18
Can thetimeout <= 0
condition ever be true?
â yangmillstheory
May 22 at 14:27
Yes, it is true whentasks[0]
is due (or overdue) to run.
â Gareth Rees
May 22 at 14:34
How does the revised code address point 11?
â yangmillstheory
May 22 at 14:46
By recomputing the timeout after each wait (with the lock held).
â Gareth Rees
May 22 at 14:59
 |Â
show 3 more comments
up vote
3
down vote
accepted
1. Review
The docstrings could be improved. It would be nice if the docstring for the
Scheduler
class explained (briefly) how to use it, and there should be docstrings for theschedule
andcancel
methods.When using the
logging
module, it's not normally necessary to format the log messages yourself. Instead, pass the format string and arguments, and let the logger do the formatting if and when it needs to (depending on the log level, it might never need to format a particular message). So instead of:logger.info('waiting with timeout: '.format(self._timeout))
write:
logger.info("waiting with timeout %f", self._timeout)
If there are multiple tasks with the same name, then the
cancel
method cancels the one that appears closest to the start of the heap. This is not necessarily the one with the soonest start time (because heaps are arranged as a tree, not as a sorted list). This behaviour seems quite obscure to me and seems likely to lead to unreliable programs.I think it would be easier for the user to write reliable programs if you adopted one of these three approaches: (i)ÃÂ cancel all tasks with matching names so that the behaviour doesn't depend on the arrangement of the heap; or (ii)ÃÂ require all tasks in the queue to have unique names; or (iii)ÃÂ have the
schedule
method return some object representing the task, so that this can later be passed tocancel
to uniquely identify the particular task to be cancelled.Cancellation takes time proportional to the number of tasks in the heap. This could be improved to (amortized) $O(log n)$ by (i)ÃÂ having the
schedule
method return some object representing the task as suggesting above; (ii)ÃÂ passing this task object to thecancel
method; (iii)ÃÂ leaving the cancelled task in the heap but marking it as cancelled; (iv)ÃÂ discarding cancelled tasks when they are popped from the heap.The attributes of a
Task
arestart
,name
,fn
, but the arguments toschedule
arename
,fn
,start
. This kind of inconsistency risks confusion or error. (You needstart
to be first so that tasks compare according to their start times, but there are other ways to achieve this.)If you attempt to schedule two tasks with the same start time and the same name, then you get
TypeError: '<' not supported between instances of 'function' and 'function'
.The name
_minheap
describes the data structure, but it is more important to describe the purpose (the data structure is an implementation detail). So I would use a name like_tasks
.The
_timeout
attribute is only used in therun
function, so it could be a local variable there instead of an attribute on theScheduler
object.The
_get_next_timeout
method is only used in therun
function, so it could be a local function there instead of a method on theScheduler
class. (It must only be called with the lock held, so it is not suitable for a stand-alone method.)The
_start
method is only called in the__init__
method, so could be inlined there.The logic in
run
seems wrong to me. The following sequence of events is possible: (i)ÃÂ The timer thread calls_get_next_timeout
which returns a timeout of 10 seconds, say. (ii)ÃÂ The timer thread released the condition variable. (iii)ÃÂ Another thread callsschedule
with a task that should be run in 5 seconds time and notifies the condition variable (but nothing is waiting on the condition variable). (iv)ÃÂ The timer thread acquires the condition variable. (v)ÃÂ The timer thread waits on the old timeout of 10 seconds. But it should have computed a new timeout of 5 seconds and waited on that instead.To fix this, it is necessary to compute the timeout between acquiring the condition variable and waiting on it.
The use of
acquire
andrelease
on the condition variable seems risky, as an exception at the wrong time would leave the lock held. Normally one uses the context manager interface to avoid this risk. I guess that you didn't do this because you wanted to launch the task with the lock not held. But it only takes a small rearrangement of code to make this work.
2. Revised code
This is a version with nameless tasks, but if you prefer your tasks to have names, then it will be straightfoward for you to add them.
class Scheduler:
"""A schedule of tasks to be run in background threads. Call the
schedule method to schedule a task to run at a particular time.
Call the task's cancel method to cancel it if it has not already
started running.
"""
@functools.total_ordering
class _Task:
"A scheduled task."
def __init__(self, fn, start):
"Create task that will run fn at or after the datetime start."
self.fn = fn
self.start = start
self.cancelled = False
def __le__(self, other):
# Tasks compare according to their start time.
return self.start <= other.start
@property
def timeout(self):
"Return time remaining in seconds before task should start."
return (self.start - datetime.now()).total_seconds()
def cancel(self):
"Cancel task if it has not already started running."
self.cancelled = True
logger.info("canceled %s", self)
def __init__(self):
cv = self._cv = threading.Condition(threading.Lock())
tasks = self._tasks =
def run():
while True:
with cv:
while True:
timeout = None
while tasks and tasks[0].cancelled:
heapq.heappop(tasks)
if tasks:
timeout = tasks[0].timeout
if timeout <= 0:
task = heapq.heappop(tasks)
break
cv.wait(timeout=timeout)
logger.info("starting task %s", task)
threading.Thread(target=task.fn).start()
threading.Thread(target=run, name='Scheduler').start()
def schedule(self, fn, start):
"""Schedule a task that will run fn at or after start (which must be a
datetime object) and return an object representing that task.
"""
task = self._Task(fn, start)
logger.info("scheduling task %s", task)
with self._cv:
heapq.heappush(self._tasks, task)
self._cv.notify()
logger.info("scheduled task %s", task)
return task
This is an great review and a model for how reviews should be done. Thanks @GarethRees!
â yangmillstheory
May 22 at 14:18
Can thetimeout <= 0
condition ever be true?
â yangmillstheory
May 22 at 14:27
Yes, it is true whentasks[0]
is due (or overdue) to run.
â Gareth Rees
May 22 at 14:34
How does the revised code address point 11?
â yangmillstheory
May 22 at 14:46
By recomputing the timeout after each wait (with the lock held).
â Gareth Rees
May 22 at 14:59
 |Â
show 3 more comments
up vote
3
down vote
accepted
up vote
3
down vote
accepted
1. Review
The docstrings could be improved. It would be nice if the docstring for the
Scheduler
class explained (briefly) how to use it, and there should be docstrings for theschedule
andcancel
methods.When using the
logging
module, it's not normally necessary to format the log messages yourself. Instead, pass the format string and arguments, and let the logger do the formatting if and when it needs to (depending on the log level, it might never need to format a particular message). So instead of:logger.info('waiting with timeout: '.format(self._timeout))
write:
logger.info("waiting with timeout %f", self._timeout)
If there are multiple tasks with the same name, then the
cancel
method cancels the one that appears closest to the start of the heap. This is not necessarily the one with the soonest start time (because heaps are arranged as a tree, not as a sorted list). This behaviour seems quite obscure to me and seems likely to lead to unreliable programs.I think it would be easier for the user to write reliable programs if you adopted one of these three approaches: (i)ÃÂ cancel all tasks with matching names so that the behaviour doesn't depend on the arrangement of the heap; or (ii)ÃÂ require all tasks in the queue to have unique names; or (iii)ÃÂ have the
schedule
method return some object representing the task, so that this can later be passed tocancel
to uniquely identify the particular task to be cancelled.Cancellation takes time proportional to the number of tasks in the heap. This could be improved to (amortized) $O(log n)$ by (i)ÃÂ having the
schedule
method return some object representing the task as suggesting above; (ii)ÃÂ passing this task object to thecancel
method; (iii)ÃÂ leaving the cancelled task in the heap but marking it as cancelled; (iv)ÃÂ discarding cancelled tasks when they are popped from the heap.The attributes of a
Task
arestart
,name
,fn
, but the arguments toschedule
arename
,fn
,start
. This kind of inconsistency risks confusion or error. (You needstart
to be first so that tasks compare according to their start times, but there are other ways to achieve this.)If you attempt to schedule two tasks with the same start time and the same name, then you get
TypeError: '<' not supported between instances of 'function' and 'function'
.The name
_minheap
describes the data structure, but it is more important to describe the purpose (the data structure is an implementation detail). So I would use a name like_tasks
.The
_timeout
attribute is only used in therun
function, so it could be a local variable there instead of an attribute on theScheduler
object.The
_get_next_timeout
method is only used in therun
function, so it could be a local function there instead of a method on theScheduler
class. (It must only be called with the lock held, so it is not suitable for a stand-alone method.)The
_start
method is only called in the__init__
method, so could be inlined there.The logic in
run
seems wrong to me. The following sequence of events is possible: (i)ÃÂ The timer thread calls_get_next_timeout
which returns a timeout of 10 seconds, say. (ii)ÃÂ The timer thread released the condition variable. (iii)ÃÂ Another thread callsschedule
with a task that should be run in 5 seconds time and notifies the condition variable (but nothing is waiting on the condition variable). (iv)ÃÂ The timer thread acquires the condition variable. (v)ÃÂ The timer thread waits on the old timeout of 10 seconds. But it should have computed a new timeout of 5 seconds and waited on that instead.To fix this, it is necessary to compute the timeout between acquiring the condition variable and waiting on it.
The use of
acquire
andrelease
on the condition variable seems risky, as an exception at the wrong time would leave the lock held. Normally one uses the context manager interface to avoid this risk. I guess that you didn't do this because you wanted to launch the task with the lock not held. But it only takes a small rearrangement of code to make this work.
2. Revised code
This is a version with nameless tasks, but if you prefer your tasks to have names, then it will be straightfoward for you to add them.
class Scheduler:
"""A schedule of tasks to be run in background threads. Call the
schedule method to schedule a task to run at a particular time.
Call the task's cancel method to cancel it if it has not already
started running.
"""
@functools.total_ordering
class _Task:
"A scheduled task."
def __init__(self, fn, start):
"Create task that will run fn at or after the datetime start."
self.fn = fn
self.start = start
self.cancelled = False
def __le__(self, other):
# Tasks compare according to their start time.
return self.start <= other.start
@property
def timeout(self):
"Return time remaining in seconds before task should start."
return (self.start - datetime.now()).total_seconds()
def cancel(self):
"Cancel task if it has not already started running."
self.cancelled = True
logger.info("canceled %s", self)
def __init__(self):
cv = self._cv = threading.Condition(threading.Lock())
tasks = self._tasks =
def run():
while True:
with cv:
while True:
timeout = None
while tasks and tasks[0].cancelled:
heapq.heappop(tasks)
if tasks:
timeout = tasks[0].timeout
if timeout <= 0:
task = heapq.heappop(tasks)
break
cv.wait(timeout=timeout)
logger.info("starting task %s", task)
threading.Thread(target=task.fn).start()
threading.Thread(target=run, name='Scheduler').start()
def schedule(self, fn, start):
"""Schedule a task that will run fn at or after start (which must be a
datetime object) and return an object representing that task.
"""
task = self._Task(fn, start)
logger.info("scheduling task %s", task)
with self._cv:
heapq.heappush(self._tasks, task)
self._cv.notify()
logger.info("scheduled task %s", task)
return task
1. Review
The docstrings could be improved. It would be nice if the docstring for the
Scheduler
class explained (briefly) how to use it, and there should be docstrings for theschedule
andcancel
methods.When using the
logging
module, it's not normally necessary to format the log messages yourself. Instead, pass the format string and arguments, and let the logger do the formatting if and when it needs to (depending on the log level, it might never need to format a particular message). So instead of:logger.info('waiting with timeout: '.format(self._timeout))
write:
logger.info("waiting with timeout %f", self._timeout)
If there are multiple tasks with the same name, then the
cancel
method cancels the one that appears closest to the start of the heap. This is not necessarily the one with the soonest start time (because heaps are arranged as a tree, not as a sorted list). This behaviour seems quite obscure to me and seems likely to lead to unreliable programs.I think it would be easier for the user to write reliable programs if you adopted one of these three approaches: (i)ÃÂ cancel all tasks with matching names so that the behaviour doesn't depend on the arrangement of the heap; or (ii)ÃÂ require all tasks in the queue to have unique names; or (iii)ÃÂ have the
schedule
method return some object representing the task, so that this can later be passed tocancel
to uniquely identify the particular task to be cancelled.Cancellation takes time proportional to the number of tasks in the heap. This could be improved to (amortized) $O(log n)$ by (i)ÃÂ having the
schedule
method return some object representing the task as suggesting above; (ii)ÃÂ passing this task object to thecancel
method; (iii)ÃÂ leaving the cancelled task in the heap but marking it as cancelled; (iv)ÃÂ discarding cancelled tasks when they are popped from the heap.The attributes of a
Task
arestart
,name
,fn
, but the arguments toschedule
arename
,fn
,start
. This kind of inconsistency risks confusion or error. (You needstart
to be first so that tasks compare according to their start times, but there are other ways to achieve this.)If you attempt to schedule two tasks with the same start time and the same name, then you get
TypeError: '<' not supported between instances of 'function' and 'function'
.The name
_minheap
describes the data structure, but it is more important to describe the purpose (the data structure is an implementation detail). So I would use a name like_tasks
.The
_timeout
attribute is only used in therun
function, so it could be a local variable there instead of an attribute on theScheduler
object.The
_get_next_timeout
method is only used in therun
function, so it could be a local function there instead of a method on theScheduler
class. (It must only be called with the lock held, so it is not suitable for a stand-alone method.)The
_start
method is only called in the__init__
method, so could be inlined there.The logic in
run
seems wrong to me. The following sequence of events is possible: (i)ÃÂ The timer thread calls_get_next_timeout
which returns a timeout of 10 seconds, say. (ii)ÃÂ The timer thread released the condition variable. (iii)ÃÂ Another thread callsschedule
with a task that should be run in 5 seconds time and notifies the condition variable (but nothing is waiting on the condition variable). (iv)ÃÂ The timer thread acquires the condition variable. (v)ÃÂ The timer thread waits on the old timeout of 10 seconds. But it should have computed a new timeout of 5 seconds and waited on that instead.To fix this, it is necessary to compute the timeout between acquiring the condition variable and waiting on it.
The use of
acquire
andrelease
on the condition variable seems risky, as an exception at the wrong time would leave the lock held. Normally one uses the context manager interface to avoid this risk. I guess that you didn't do this because you wanted to launch the task with the lock not held. But it only takes a small rearrangement of code to make this work.
2. Revised code
This is a version with nameless tasks, but if you prefer your tasks to have names, then it will be straightfoward for you to add them.
class Scheduler:
"""A schedule of tasks to be run in background threads. Call the
schedule method to schedule a task to run at a particular time.
Call the task's cancel method to cancel it if it has not already
started running.
"""
@functools.total_ordering
class _Task:
"A scheduled task."
def __init__(self, fn, start):
"Create task that will run fn at or after the datetime start."
self.fn = fn
self.start = start
self.cancelled = False
def __le__(self, other):
# Tasks compare according to their start time.
return self.start <= other.start
@property
def timeout(self):
"Return time remaining in seconds before task should start."
return (self.start - datetime.now()).total_seconds()
def cancel(self):
"Cancel task if it has not already started running."
self.cancelled = True
logger.info("canceled %s", self)
def __init__(self):
cv = self._cv = threading.Condition(threading.Lock())
tasks = self._tasks =
def run():
while True:
with cv:
while True:
timeout = None
while tasks and tasks[0].cancelled:
heapq.heappop(tasks)
if tasks:
timeout = tasks[0].timeout
if timeout <= 0:
task = heapq.heappop(tasks)
break
cv.wait(timeout=timeout)
logger.info("starting task %s", task)
threading.Thread(target=task.fn).start()
threading.Thread(target=run, name='Scheduler').start()
def schedule(self, fn, start):
"""Schedule a task that will run fn at or after start (which must be a
datetime object) and return an object representing that task.
"""
task = self._Task(fn, start)
logger.info("scheduling task %s", task)
with self._cv:
heapq.heappush(self._tasks, task)
self._cv.notify()
logger.info("scheduled task %s", task)
return task
answered May 22 at 10:31
Gareth Rees
41.1k394166
41.1k394166
This is an great review and a model for how reviews should be done. Thanks @GarethRees!
â yangmillstheory
May 22 at 14:18
Can thetimeout <= 0
condition ever be true?
â yangmillstheory
May 22 at 14:27
Yes, it is true whentasks[0]
is due (or overdue) to run.
â Gareth Rees
May 22 at 14:34
How does the revised code address point 11?
â yangmillstheory
May 22 at 14:46
By recomputing the timeout after each wait (with the lock held).
â Gareth Rees
May 22 at 14:59
 |Â
show 3 more comments
This is an great review and a model for how reviews should be done. Thanks @GarethRees!
â yangmillstheory
May 22 at 14:18
Can thetimeout <= 0
condition ever be true?
â yangmillstheory
May 22 at 14:27
Yes, it is true whentasks[0]
is due (or overdue) to run.
â Gareth Rees
May 22 at 14:34
How does the revised code address point 11?
â yangmillstheory
May 22 at 14:46
By recomputing the timeout after each wait (with the lock held).
â Gareth Rees
May 22 at 14:59
This is an great review and a model for how reviews should be done. Thanks @GarethRees!
â yangmillstheory
May 22 at 14:18
This is an great review and a model for how reviews should be done. Thanks @GarethRees!
â yangmillstheory
May 22 at 14:18
Can the
timeout <= 0
condition ever be true?â yangmillstheory
May 22 at 14:27
Can the
timeout <= 0
condition ever be true?â yangmillstheory
May 22 at 14:27
Yes, it is true when
tasks[0]
is due (or overdue) to run.â Gareth Rees
May 22 at 14:34
Yes, it is true when
tasks[0]
is due (or overdue) to run.â Gareth Rees
May 22 at 14:34
How does the revised code address point 11?
â yangmillstheory
May 22 at 14:46
How does the revised code address point 11?
â yangmillstheory
May 22 at 14:46
By recomputing the timeout after each wait (with the lock held).
â Gareth Rees
May 22 at 14:59
By recomputing the timeout after each wait (with the lock held).
â Gareth Rees
May 22 at 14:59
 |Â
show 3 more comments
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f194922%2fpython-task-scheduler%23new-answer', 'question_page');
);
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Looks like you got a good start. I'm assuming this is intended to run for Python 3?
â Mast
May 22 at 5:55
Yes. Note that when I only notified when a new min element was found, the ordering was sometimes incorrect. I'm trying to figure this out...
â yangmillstheory
May 22 at 5:58
This was solved below by Gareth.
â yangmillstheory
May 22 at 15:56
If this is a problem from somewhere cab you please send the source?
â kuskmen
May 25 at 12:46
1
It's from EPI.
â yangmillstheory
May 25 at 15:26