Python task scheduler

The name of the pictureThe name of the pictureThe name of the pictureClash Royale CLAN TAG#URR8PPP





.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;







up vote
4
down vote

favorite
1












Submit tasks to a scheduler, allowing them to be run at a future time. Tasks can be submitted in any order, and non-running tasks can be cancelled.



I used a min-heap to prioritize the next task, and a threading.Condition to communicate between



  • a "watcher" thread that sleeps until the next task needs to be run, or its sleep time needs to be shortened (new "sooner" task added)

  • the main thread, which writes to the min-heap

Scheduling is O(log n), cancellation is O(n), and getting the soonest task is O(1).



import heapq
import logging
import functools
import time
import threading
from datetime import datetime, timedelta
from collections import namedtuple


logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

Task = namedtuple('Task', ['start', 'name', 'fn'])


class Scheduler(object):
'''Class that schedules functions to be run in a separate thread at some future
time. Supports cancellation of functions that haven't yet started.
'''
def __init__(self):
self._cv = threading.Condition(threading.Lock())
self._minheap =
self._timeout = None
self._start()

def cancel(self, name):
with self._cv:
try:
task = [task for task in self._minheap if task.name == name][0]
except IndexError:
return
self._minheap.remove(task)
heapq.heapify(self._minheap)
self._cv.notify()
logger.info('canceled '.format(task.name))

def schedule(self, name, fn, start):
task = Task(start, name, fn)
logger.info('scheduling task: '.format(name))
with self._cv:
heapq.heappush(self._minheap, task)
self._cv.notify()
logger.info('scheduled task: '.format(name))

def _get_next_timeout(self):
if not self._minheap:
return None
return (self._minheap[0].start - datetime.now()).total_seconds()

def _start(self):
def run():
while True:
self._cv.acquire()
logger.info('waiting with timeout: '.format(self._timeout))
not_expired = self._cv.wait(timeout=self._timeout)
if self._timeout is None:
logger.info('no timeout found; using min element')
self._timeout = self._get_next_timeout()
self._cv.release()
elif not_expired:
logger.info('already waiting but woken up; comparing current with min element')
self._timeout = min(self._timeout, self._get_next_timeout())
self._cv.release()
else:
logger.info('timed out; running next task')
next_task = heapq.heappop(self._minheap)
self._timeout = self._get_next_timeout()
self._cv.release()
threading.Thread(target=next_task.fn, name=next_task.name).start()

threading.Thread(target=run, name='timer').start()


def main():
logging.basicConfig(level=logging.INFO, format='%(threadName)-10s: %(message)s')

start = datetime.now()

def task():
logger.info('running, elapsed: '.format((datetime.now() - start).total_seconds()))

s = Scheduler()
s.schedule('task-1', functools.partial(task), start + timedelta(seconds=1))
s.schedule('task-2', functools.partial(task), start + timedelta(seconds=2))
s.cancel('task-2')
s.schedule('task-3', functools.partial(task), start + timedelta(seconds=3))
# note that task-4 precedes task-3, but is registered after task-3
s.schedule('task-4', functools.partial(task), start + timedelta(seconds=2.5))
time.sleep(5)
now = datetime.now()
s.schedule('task-5', functools.partial(task), now + timedelta(seconds=5))
s.schedule('task-6', functools.partial(task), now + timedelta(seconds=4))
s.schedule('task-7', functools.partial(task), now + timedelta(seconds=3.5))


if __name__ == '__main__':
main()


Output:



❗ ~/c/dsa [10265e2] (master⚡)
(n) p3 py/epi/19_7.py
timer : waiting with timeout: None
MainThread: scheduling task: task-1
MainThread: scheduled task: task-1
MainThread: scheduling task: task-2
MainThread: scheduled task: task-2
MainThread: canceled task-2
MainThread: scheduling task: task-3
MainThread: scheduled task: task-3
MainThread: scheduling task: task-4
MainThread: scheduled task: task-4
timer : no timeout found; using min element
timer : waiting with timeout: 0.999214
timer : timed out; running next task
task-1 : running, elapsed: 1.006024
timer : waiting with timeout: 1.494409
timer : timed out; running next task
task-4 : running, elapsed: 2.506384
timer : waiting with timeout: 0.49432
timer : timed out; running next task
task-3 : running, elapsed: 3.005836
timer : waiting with timeout: None
MainThread: scheduling task: task-5
MainThread: scheduled task: task-5
timer : no timeout found; using min element
MainThread: scheduling task: task-6
timer : waiting with timeout: 4.999305
MainThread: scheduled task: task-6
timer : already waiting but woken up; comparing current with min element
MainThread: scheduling task: task-7
timer : waiting with timeout: 3.998729
MainThread: scheduled task: task-7
timer : already waiting but woken up; comparing current with min element
timer : waiting with timeout: 3.498098
timer : timed out; running next task
task-7 : running, elapsed: 8.509112
timer : waiting with timeout: 0.493943
timer : timed out; running next task
task-6 : running, elapsed: 9.008533
timer : waiting with timeout: 0.994441
timer : timed out; running next task
task-5 : running, elapsed: 10.005569
timer : waiting with timeout: None


Concurrency is hard, so I'm very interested in hearing folks' thoughts. Thanks!







share|improve this question



















  • Looks like you got a good start. I'm assuming this is intended to run for Python 3?
    – Mast
    May 22 at 5:55










  • Yes. Note that when I only notified when a new min element was found, the ordering was sometimes incorrect. I'm trying to figure this out...
    – yangmillstheory
    May 22 at 5:58










  • This was solved below by Gareth.
    – yangmillstheory
    May 22 at 15:56










  • If this is a problem from somewhere cab you please send the source?
    – kuskmen
    May 25 at 12:46






  • 1




    It's from EPI.
    – yangmillstheory
    May 25 at 15:26
















up vote
4
down vote

favorite
1












Submit tasks to a scheduler, allowing them to be run at a future time. Tasks can be submitted in any order, and non-running tasks can be cancelled.



I used a min-heap to prioritize the next task, and a threading.Condition to communicate between



  • a "watcher" thread that sleeps until the next task needs to be run, or its sleep time needs to be shortened (new "sooner" task added)

  • the main thread, which writes to the min-heap

Scheduling is O(log n), cancellation is O(n), and getting the soonest task is O(1).



import heapq
import logging
import functools
import time
import threading
from datetime import datetime, timedelta
from collections import namedtuple


logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

Task = namedtuple('Task', ['start', 'name', 'fn'])


class Scheduler(object):
'''Class that schedules functions to be run in a separate thread at some future
time. Supports cancellation of functions that haven't yet started.
'''
def __init__(self):
self._cv = threading.Condition(threading.Lock())
self._minheap =
self._timeout = None
self._start()

def cancel(self, name):
with self._cv:
try:
task = [task for task in self._minheap if task.name == name][0]
except IndexError:
return
self._minheap.remove(task)
heapq.heapify(self._minheap)
self._cv.notify()
logger.info('canceled '.format(task.name))

def schedule(self, name, fn, start):
task = Task(start, name, fn)
logger.info('scheduling task: '.format(name))
with self._cv:
heapq.heappush(self._minheap, task)
self._cv.notify()
logger.info('scheduled task: '.format(name))

def _get_next_timeout(self):
if not self._minheap:
return None
return (self._minheap[0].start - datetime.now()).total_seconds()

def _start(self):
def run():
while True:
self._cv.acquire()
logger.info('waiting with timeout: '.format(self._timeout))
not_expired = self._cv.wait(timeout=self._timeout)
if self._timeout is None:
logger.info('no timeout found; using min element')
self._timeout = self._get_next_timeout()
self._cv.release()
elif not_expired:
logger.info('already waiting but woken up; comparing current with min element')
self._timeout = min(self._timeout, self._get_next_timeout())
self._cv.release()
else:
logger.info('timed out; running next task')
next_task = heapq.heappop(self._minheap)
self._timeout = self._get_next_timeout()
self._cv.release()
threading.Thread(target=next_task.fn, name=next_task.name).start()

threading.Thread(target=run, name='timer').start()


def main():
logging.basicConfig(level=logging.INFO, format='%(threadName)-10s: %(message)s')

start = datetime.now()

def task():
logger.info('running, elapsed: '.format((datetime.now() - start).total_seconds()))

s = Scheduler()
s.schedule('task-1', functools.partial(task), start + timedelta(seconds=1))
s.schedule('task-2', functools.partial(task), start + timedelta(seconds=2))
s.cancel('task-2')
s.schedule('task-3', functools.partial(task), start + timedelta(seconds=3))
# note that task-4 precedes task-3, but is registered after task-3
s.schedule('task-4', functools.partial(task), start + timedelta(seconds=2.5))
time.sleep(5)
now = datetime.now()
s.schedule('task-5', functools.partial(task), now + timedelta(seconds=5))
s.schedule('task-6', functools.partial(task), now + timedelta(seconds=4))
s.schedule('task-7', functools.partial(task), now + timedelta(seconds=3.5))


if __name__ == '__main__':
main()


Output:



❗ ~/c/dsa [10265e2] (master⚡)
(n) p3 py/epi/19_7.py
timer : waiting with timeout: None
MainThread: scheduling task: task-1
MainThread: scheduled task: task-1
MainThread: scheduling task: task-2
MainThread: scheduled task: task-2
MainThread: canceled task-2
MainThread: scheduling task: task-3
MainThread: scheduled task: task-3
MainThread: scheduling task: task-4
MainThread: scheduled task: task-4
timer : no timeout found; using min element
timer : waiting with timeout: 0.999214
timer : timed out; running next task
task-1 : running, elapsed: 1.006024
timer : waiting with timeout: 1.494409
timer : timed out; running next task
task-4 : running, elapsed: 2.506384
timer : waiting with timeout: 0.49432
timer : timed out; running next task
task-3 : running, elapsed: 3.005836
timer : waiting with timeout: None
MainThread: scheduling task: task-5
MainThread: scheduled task: task-5
timer : no timeout found; using min element
MainThread: scheduling task: task-6
timer : waiting with timeout: 4.999305
MainThread: scheduled task: task-6
timer : already waiting but woken up; comparing current with min element
MainThread: scheduling task: task-7
timer : waiting with timeout: 3.998729
MainThread: scheduled task: task-7
timer : already waiting but woken up; comparing current with min element
timer : waiting with timeout: 3.498098
timer : timed out; running next task
task-7 : running, elapsed: 8.509112
timer : waiting with timeout: 0.493943
timer : timed out; running next task
task-6 : running, elapsed: 9.008533
timer : waiting with timeout: 0.994441
timer : timed out; running next task
task-5 : running, elapsed: 10.005569
timer : waiting with timeout: None


Concurrency is hard, so I'm very interested in hearing folks' thoughts. Thanks!







share|improve this question



















  • Looks like you got a good start. I'm assuming this is intended to run for Python 3?
    – Mast
    May 22 at 5:55










  • Yes. Note that when I only notified when a new min element was found, the ordering was sometimes incorrect. I'm trying to figure this out...
    – yangmillstheory
    May 22 at 5:58










  • This was solved below by Gareth.
    – yangmillstheory
    May 22 at 15:56










  • If this is a problem from somewhere cab you please send the source?
    – kuskmen
    May 25 at 12:46






  • 1




    It's from EPI.
    – yangmillstheory
    May 25 at 15:26












up vote
4
down vote

favorite
1









up vote
4
down vote

favorite
1






1





Submit tasks to a scheduler, allowing them to be run at a future time. Tasks can be submitted in any order, and non-running tasks can be cancelled.



I used a min-heap to prioritize the next task, and a threading.Condition to communicate between



  • a "watcher" thread that sleeps until the next task needs to be run, or its sleep time needs to be shortened (new "sooner" task added)

  • the main thread, which writes to the min-heap

Scheduling is O(log n), cancellation is O(n), and getting the soonest task is O(1).



import heapq
import logging
import functools
import time
import threading
from datetime import datetime, timedelta
from collections import namedtuple


logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

Task = namedtuple('Task', ['start', 'name', 'fn'])


class Scheduler(object):
'''Class that schedules functions to be run in a separate thread at some future
time. Supports cancellation of functions that haven't yet started.
'''
def __init__(self):
self._cv = threading.Condition(threading.Lock())
self._minheap =
self._timeout = None
self._start()

def cancel(self, name):
with self._cv:
try:
task = [task for task in self._minheap if task.name == name][0]
except IndexError:
return
self._minheap.remove(task)
heapq.heapify(self._minheap)
self._cv.notify()
logger.info('canceled '.format(task.name))

def schedule(self, name, fn, start):
task = Task(start, name, fn)
logger.info('scheduling task: '.format(name))
with self._cv:
heapq.heappush(self._minheap, task)
self._cv.notify()
logger.info('scheduled task: '.format(name))

def _get_next_timeout(self):
if not self._minheap:
return None
return (self._minheap[0].start - datetime.now()).total_seconds()

def _start(self):
def run():
while True:
self._cv.acquire()
logger.info('waiting with timeout: '.format(self._timeout))
not_expired = self._cv.wait(timeout=self._timeout)
if self._timeout is None:
logger.info('no timeout found; using min element')
self._timeout = self._get_next_timeout()
self._cv.release()
elif not_expired:
logger.info('already waiting but woken up; comparing current with min element')
self._timeout = min(self._timeout, self._get_next_timeout())
self._cv.release()
else:
logger.info('timed out; running next task')
next_task = heapq.heappop(self._minheap)
self._timeout = self._get_next_timeout()
self._cv.release()
threading.Thread(target=next_task.fn, name=next_task.name).start()

threading.Thread(target=run, name='timer').start()


def main():
logging.basicConfig(level=logging.INFO, format='%(threadName)-10s: %(message)s')

start = datetime.now()

def task():
logger.info('running, elapsed: '.format((datetime.now() - start).total_seconds()))

s = Scheduler()
s.schedule('task-1', functools.partial(task), start + timedelta(seconds=1))
s.schedule('task-2', functools.partial(task), start + timedelta(seconds=2))
s.cancel('task-2')
s.schedule('task-3', functools.partial(task), start + timedelta(seconds=3))
# note that task-4 precedes task-3, but is registered after task-3
s.schedule('task-4', functools.partial(task), start + timedelta(seconds=2.5))
time.sleep(5)
now = datetime.now()
s.schedule('task-5', functools.partial(task), now + timedelta(seconds=5))
s.schedule('task-6', functools.partial(task), now + timedelta(seconds=4))
s.schedule('task-7', functools.partial(task), now + timedelta(seconds=3.5))


if __name__ == '__main__':
main()


Output:



❗ ~/c/dsa [10265e2] (master⚡)
(n) p3 py/epi/19_7.py
timer : waiting with timeout: None
MainThread: scheduling task: task-1
MainThread: scheduled task: task-1
MainThread: scheduling task: task-2
MainThread: scheduled task: task-2
MainThread: canceled task-2
MainThread: scheduling task: task-3
MainThread: scheduled task: task-3
MainThread: scheduling task: task-4
MainThread: scheduled task: task-4
timer : no timeout found; using min element
timer : waiting with timeout: 0.999214
timer : timed out; running next task
task-1 : running, elapsed: 1.006024
timer : waiting with timeout: 1.494409
timer : timed out; running next task
task-4 : running, elapsed: 2.506384
timer : waiting with timeout: 0.49432
timer : timed out; running next task
task-3 : running, elapsed: 3.005836
timer : waiting with timeout: None
MainThread: scheduling task: task-5
MainThread: scheduled task: task-5
timer : no timeout found; using min element
MainThread: scheduling task: task-6
timer : waiting with timeout: 4.999305
MainThread: scheduled task: task-6
timer : already waiting but woken up; comparing current with min element
MainThread: scheduling task: task-7
timer : waiting with timeout: 3.998729
MainThread: scheduled task: task-7
timer : already waiting but woken up; comparing current with min element
timer : waiting with timeout: 3.498098
timer : timed out; running next task
task-7 : running, elapsed: 8.509112
timer : waiting with timeout: 0.493943
timer : timed out; running next task
task-6 : running, elapsed: 9.008533
timer : waiting with timeout: 0.994441
timer : timed out; running next task
task-5 : running, elapsed: 10.005569
timer : waiting with timeout: None


Concurrency is hard, so I'm very interested in hearing folks' thoughts. Thanks!







share|improve this question











Submit tasks to a scheduler, allowing them to be run at a future time. Tasks can be submitted in any order, and non-running tasks can be cancelled.



I used a min-heap to prioritize the next task, and a threading.Condition to communicate between



  • a "watcher" thread that sleeps until the next task needs to be run, or its sleep time needs to be shortened (new "sooner" task added)

  • the main thread, which writes to the min-heap

Scheduling is O(log n), cancellation is O(n), and getting the soonest task is O(1).



import heapq
import logging
import functools
import time
import threading
from datetime import datetime, timedelta
from collections import namedtuple


logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

Task = namedtuple('Task', ['start', 'name', 'fn'])


class Scheduler(object):
'''Class that schedules functions to be run in a separate thread at some future
time. Supports cancellation of functions that haven't yet started.
'''
def __init__(self):
self._cv = threading.Condition(threading.Lock())
self._minheap =
self._timeout = None
self._start()

def cancel(self, name):
with self._cv:
try:
task = [task for task in self._minheap if task.name == name][0]
except IndexError:
return
self._minheap.remove(task)
heapq.heapify(self._minheap)
self._cv.notify()
logger.info('canceled '.format(task.name))

def schedule(self, name, fn, start):
task = Task(start, name, fn)
logger.info('scheduling task: '.format(name))
with self._cv:
heapq.heappush(self._minheap, task)
self._cv.notify()
logger.info('scheduled task: '.format(name))

def _get_next_timeout(self):
if not self._minheap:
return None
return (self._minheap[0].start - datetime.now()).total_seconds()

def _start(self):
def run():
while True:
self._cv.acquire()
logger.info('waiting with timeout: '.format(self._timeout))
not_expired = self._cv.wait(timeout=self._timeout)
if self._timeout is None:
logger.info('no timeout found; using min element')
self._timeout = self._get_next_timeout()
self._cv.release()
elif not_expired:
logger.info('already waiting but woken up; comparing current with min element')
self._timeout = min(self._timeout, self._get_next_timeout())
self._cv.release()
else:
logger.info('timed out; running next task')
next_task = heapq.heappop(self._minheap)
self._timeout = self._get_next_timeout()
self._cv.release()
threading.Thread(target=next_task.fn, name=next_task.name).start()

threading.Thread(target=run, name='timer').start()


def main():
logging.basicConfig(level=logging.INFO, format='%(threadName)-10s: %(message)s')

start = datetime.now()

def task():
logger.info('running, elapsed: '.format((datetime.now() - start).total_seconds()))

s = Scheduler()
s.schedule('task-1', functools.partial(task), start + timedelta(seconds=1))
s.schedule('task-2', functools.partial(task), start + timedelta(seconds=2))
s.cancel('task-2')
s.schedule('task-3', functools.partial(task), start + timedelta(seconds=3))
# note that task-4 precedes task-3, but is registered after task-3
s.schedule('task-4', functools.partial(task), start + timedelta(seconds=2.5))
time.sleep(5)
now = datetime.now()
s.schedule('task-5', functools.partial(task), now + timedelta(seconds=5))
s.schedule('task-6', functools.partial(task), now + timedelta(seconds=4))
s.schedule('task-7', functools.partial(task), now + timedelta(seconds=3.5))


if __name__ == '__main__':
main()


Output:



❗ ~/c/dsa [10265e2] (master⚡)
(n) p3 py/epi/19_7.py
timer : waiting with timeout: None
MainThread: scheduling task: task-1
MainThread: scheduled task: task-1
MainThread: scheduling task: task-2
MainThread: scheduled task: task-2
MainThread: canceled task-2
MainThread: scheduling task: task-3
MainThread: scheduled task: task-3
MainThread: scheduling task: task-4
MainThread: scheduled task: task-4
timer : no timeout found; using min element
timer : waiting with timeout: 0.999214
timer : timed out; running next task
task-1 : running, elapsed: 1.006024
timer : waiting with timeout: 1.494409
timer : timed out; running next task
task-4 : running, elapsed: 2.506384
timer : waiting with timeout: 0.49432
timer : timed out; running next task
task-3 : running, elapsed: 3.005836
timer : waiting with timeout: None
MainThread: scheduling task: task-5
MainThread: scheduled task: task-5
timer : no timeout found; using min element
MainThread: scheduling task: task-6
timer : waiting with timeout: 4.999305
MainThread: scheduled task: task-6
timer : already waiting but woken up; comparing current with min element
MainThread: scheduling task: task-7
timer : waiting with timeout: 3.998729
MainThread: scheduled task: task-7
timer : already waiting but woken up; comparing current with min element
timer : waiting with timeout: 3.498098
timer : timed out; running next task
task-7 : running, elapsed: 8.509112
timer : waiting with timeout: 0.493943
timer : timed out; running next task
task-6 : running, elapsed: 9.008533
timer : waiting with timeout: 0.994441
timer : timed out; running next task
task-5 : running, elapsed: 10.005569
timer : waiting with timeout: None


Concurrency is hard, so I'm very interested in hearing folks' thoughts. Thanks!









share|improve this question










share|improve this question




share|improve this question









asked May 22 at 5:44









yangmillstheory

1344




1344











  • Looks like you got a good start. I'm assuming this is intended to run for Python 3?
    – Mast
    May 22 at 5:55










  • Yes. Note that when I only notified when a new min element was found, the ordering was sometimes incorrect. I'm trying to figure this out...
    – yangmillstheory
    May 22 at 5:58










  • This was solved below by Gareth.
    – yangmillstheory
    May 22 at 15:56










  • If this is a problem from somewhere cab you please send the source?
    – kuskmen
    May 25 at 12:46






  • 1




    It's from EPI.
    – yangmillstheory
    May 25 at 15:26
















  • Looks like you got a good start. I'm assuming this is intended to run for Python 3?
    – Mast
    May 22 at 5:55










  • Yes. Note that when I only notified when a new min element was found, the ordering was sometimes incorrect. I'm trying to figure this out...
    – yangmillstheory
    May 22 at 5:58










  • This was solved below by Gareth.
    – yangmillstheory
    May 22 at 15:56










  • If this is a problem from somewhere cab you please send the source?
    – kuskmen
    May 25 at 12:46






  • 1




    It's from EPI.
    – yangmillstheory
    May 25 at 15:26















Looks like you got a good start. I'm assuming this is intended to run for Python 3?
– Mast
May 22 at 5:55




Looks like you got a good start. I'm assuming this is intended to run for Python 3?
– Mast
May 22 at 5:55












Yes. Note that when I only notified when a new min element was found, the ordering was sometimes incorrect. I'm trying to figure this out...
– yangmillstheory
May 22 at 5:58




Yes. Note that when I only notified when a new min element was found, the ordering was sometimes incorrect. I'm trying to figure this out...
– yangmillstheory
May 22 at 5:58












This was solved below by Gareth.
– yangmillstheory
May 22 at 15:56




This was solved below by Gareth.
– yangmillstheory
May 22 at 15:56












If this is a problem from somewhere cab you please send the source?
– kuskmen
May 25 at 12:46




If this is a problem from somewhere cab you please send the source?
– kuskmen
May 25 at 12:46




1




1




It's from EPI.
– yangmillstheory
May 25 at 15:26




It's from EPI.
– yangmillstheory
May 25 at 15:26










1 Answer
1






active

oldest

votes

















up vote
3
down vote



accepted










1. Review



  1. The docstrings could be improved. It would be nice if the docstring for the Scheduler class explained (briefly) how to use it, and there should be docstrings for the schedule and cancel methods.



  2. When using the logging module, it's not normally necessary to format the log messages yourself. Instead, pass the format string and arguments, and let the logger do the formatting if and when it needs to (depending on the log level, it might never need to format a particular message). So instead of:



    logger.info('waiting with timeout: '.format(self._timeout))


    write:



    logger.info("waiting with timeout %f", self._timeout)



  3. If there are multiple tasks with the same name, then the cancel method cancels the one that appears closest to the start of the heap. This is not necessarily the one with the soonest start time (because heaps are arranged as a tree, not as a sorted list). This behaviour seems quite obscure to me and seems likely to lead to unreliable programs.



    I think it would be easier for the user to write reliable programs if you adopted one of these three approaches: (i) cancel all tasks with matching names so that the behaviour doesn't depend on the arrangement of the heap; or (ii) require all tasks in the queue to have unique names; or (iii) have the schedule method return some object representing the task, so that this can later be passed to cancel to uniquely identify the particular task to be cancelled.



  4. Cancellation takes time proportional to the number of tasks in the heap. This could be improved to (amortized) $O(log n)$ by (i) having the schedule method return some object representing the task as suggesting above; (ii) passing this task object to the cancel method; (iii) leaving the cancelled task in the heap but marking it as cancelled; (iv) discarding cancelled tasks when they are popped from the heap.


  5. The attributes of a Task are start, name, fn, but the arguments to schedule are name, fn, start. This kind of inconsistency risks confusion or error. (You need start to be first so that tasks compare according to their start times, but there are other ways to achieve this.)


  6. If you attempt to schedule two tasks with the same start time and the same name, then you get TypeError: '<' not supported between instances of 'function' and 'function'.


  7. The name _minheap describes the data structure, but it is more important to describe the purpose (the data structure is an implementation detail). So I would use a name like _tasks.


  8. The _timeout attribute is only used in the run function, so it could be a local variable there instead of an attribute on the Scheduler object.


  9. The _get_next_timeout method is only used in the run function, so it could be a local function there instead of a method on the Scheduler class. (It must only be called with the lock held, so it is not suitable for a stand-alone method.)


  10. The _start method is only called in the __init__ method, so could be inlined there.



  11. The logic in run seems wrong to me. The following sequence of events is possible: (i) The timer thread calls _get_next_timeout which returns a timeout of 10 seconds, say. (ii) The timer thread released the condition variable. (iii) Another thread calls schedule with a task that should be run in 5 seconds time and notifies the condition variable (but nothing is waiting on the condition variable). (iv) The timer thread acquires the condition variable. (v) The timer thread waits on the old timeout of 10 seconds. But it should have computed a new timeout of 5 seconds and waited on that instead.



    To fix this, it is necessary to compute the timeout between acquiring the condition variable and waiting on it.



  12. The use of acquire and release on the condition variable seems risky, as an exception at the wrong time would leave the lock held. Normally one uses the context manager interface to avoid this risk. I guess that you didn't do this because you wanted to launch the task with the lock not held. But it only takes a small rearrangement of code to make this work.


2. Revised code



This is a version with nameless tasks, but if you prefer your tasks to have names, then it will be straightfoward for you to add them.



class Scheduler:
"""A schedule of tasks to be run in background threads. Call the
schedule method to schedule a task to run at a particular time.
Call the task's cancel method to cancel it if it has not already
started running.

"""

@functools.total_ordering
class _Task:
"A scheduled task."

def __init__(self, fn, start):
"Create task that will run fn at or after the datetime start."
self.fn = fn
self.start = start
self.cancelled = False

def __le__(self, other):
# Tasks compare according to their start time.
return self.start <= other.start

@property
def timeout(self):
"Return time remaining in seconds before task should start."
return (self.start - datetime.now()).total_seconds()

def cancel(self):
"Cancel task if it has not already started running."
self.cancelled = True
logger.info("canceled %s", self)

def __init__(self):
cv = self._cv = threading.Condition(threading.Lock())
tasks = self._tasks =

def run():
while True:
with cv:
while True:
timeout = None
while tasks and tasks[0].cancelled:
heapq.heappop(tasks)
if tasks:
timeout = tasks[0].timeout
if timeout <= 0:
task = heapq.heappop(tasks)
break
cv.wait(timeout=timeout)
logger.info("starting task %s", task)
threading.Thread(target=task.fn).start()

threading.Thread(target=run, name='Scheduler').start()

def schedule(self, fn, start):
"""Schedule a task that will run fn at or after start (which must be a
datetime object) and return an object representing that task.

"""
task = self._Task(fn, start)
logger.info("scheduling task %s", task)
with self._cv:
heapq.heappush(self._tasks, task)
self._cv.notify()
logger.info("scheduled task %s", task)
return task





share|improve this answer





















  • This is an great review and a model for how reviews should be done. Thanks @GarethRees!
    – yangmillstheory
    May 22 at 14:18











  • Can the timeout <= 0 condition ever be true?
    – yangmillstheory
    May 22 at 14:27










  • Yes, it is true when tasks[0] is due (or overdue) to run.
    – Gareth Rees
    May 22 at 14:34










  • How does the revised code address point 11?
    – yangmillstheory
    May 22 at 14:46











  • By recomputing the timeout after each wait (with the lock held).
    – Gareth Rees
    May 22 at 14:59











Your Answer




StackExchange.ifUsing("editor", function ()
return StackExchange.using("mathjaxEditing", function ()
StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix)
StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
);
);
, "mathjax-editing");

StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");

StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "196"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);

else
createEditor();

);

function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
convertImagesToLinks: false,
noModals: false,
showLowRepImageUploadWarning: true,
reputationToPostImages: null,
bindNavPrevention: true,
postfix: "",
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);



);








 

draft saved


draft discarded


















StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f194922%2fpython-task-scheduler%23new-answer', 'question_page');

);

Post as a guest






























1 Answer
1






active

oldest

votes








1 Answer
1






active

oldest

votes









active

oldest

votes






active

oldest

votes








up vote
3
down vote



accepted










1. Review



  1. The docstrings could be improved. It would be nice if the docstring for the Scheduler class explained (briefly) how to use it, and there should be docstrings for the schedule and cancel methods.



  2. When using the logging module, it's not normally necessary to format the log messages yourself. Instead, pass the format string and arguments, and let the logger do the formatting if and when it needs to (depending on the log level, it might never need to format a particular message). So instead of:



    logger.info('waiting with timeout: '.format(self._timeout))


    write:



    logger.info("waiting with timeout %f", self._timeout)



  3. If there are multiple tasks with the same name, then the cancel method cancels the one that appears closest to the start of the heap. This is not necessarily the one with the soonest start time (because heaps are arranged as a tree, not as a sorted list). This behaviour seems quite obscure to me and seems likely to lead to unreliable programs.



    I think it would be easier for the user to write reliable programs if you adopted one of these three approaches: (i) cancel all tasks with matching names so that the behaviour doesn't depend on the arrangement of the heap; or (ii) require all tasks in the queue to have unique names; or (iii) have the schedule method return some object representing the task, so that this can later be passed to cancel to uniquely identify the particular task to be cancelled.



  4. Cancellation takes time proportional to the number of tasks in the heap. This could be improved to (amortized) $O(log n)$ by (i) having the schedule method return some object representing the task as suggesting above; (ii) passing this task object to the cancel method; (iii) leaving the cancelled task in the heap but marking it as cancelled; (iv) discarding cancelled tasks when they are popped from the heap.


  5. The attributes of a Task are start, name, fn, but the arguments to schedule are name, fn, start. This kind of inconsistency risks confusion or error. (You need start to be first so that tasks compare according to their start times, but there are other ways to achieve this.)


  6. If you attempt to schedule two tasks with the same start time and the same name, then you get TypeError: '<' not supported between instances of 'function' and 'function'.


  7. The name _minheap describes the data structure, but it is more important to describe the purpose (the data structure is an implementation detail). So I would use a name like _tasks.


  8. The _timeout attribute is only used in the run function, so it could be a local variable there instead of an attribute on the Scheduler object.


  9. The _get_next_timeout method is only used in the run function, so it could be a local function there instead of a method on the Scheduler class. (It must only be called with the lock held, so it is not suitable for a stand-alone method.)


  10. The _start method is only called in the __init__ method, so could be inlined there.



  11. The logic in run seems wrong to me. The following sequence of events is possible: (i) The timer thread calls _get_next_timeout which returns a timeout of 10 seconds, say. (ii) The timer thread released the condition variable. (iii) Another thread calls schedule with a task that should be run in 5 seconds time and notifies the condition variable (but nothing is waiting on the condition variable). (iv) The timer thread acquires the condition variable. (v) The timer thread waits on the old timeout of 10 seconds. But it should have computed a new timeout of 5 seconds and waited on that instead.



    To fix this, it is necessary to compute the timeout between acquiring the condition variable and waiting on it.



  12. The use of acquire and release on the condition variable seems risky, as an exception at the wrong time would leave the lock held. Normally one uses the context manager interface to avoid this risk. I guess that you didn't do this because you wanted to launch the task with the lock not held. But it only takes a small rearrangement of code to make this work.


2. Revised code



This is a version with nameless tasks, but if you prefer your tasks to have names, then it will be straightfoward for you to add them.



class Scheduler:
"""A schedule of tasks to be run in background threads. Call the
schedule method to schedule a task to run at a particular time.
Call the task's cancel method to cancel it if it has not already
started running.

"""

@functools.total_ordering
class _Task:
"A scheduled task."

def __init__(self, fn, start):
"Create task that will run fn at or after the datetime start."
self.fn = fn
self.start = start
self.cancelled = False

def __le__(self, other):
# Tasks compare according to their start time.
return self.start <= other.start

@property
def timeout(self):
"Return time remaining in seconds before task should start."
return (self.start - datetime.now()).total_seconds()

def cancel(self):
"Cancel task if it has not already started running."
self.cancelled = True
logger.info("canceled %s", self)

def __init__(self):
cv = self._cv = threading.Condition(threading.Lock())
tasks = self._tasks =

def run():
while True:
with cv:
while True:
timeout = None
while tasks and tasks[0].cancelled:
heapq.heappop(tasks)
if tasks:
timeout = tasks[0].timeout
if timeout <= 0:
task = heapq.heappop(tasks)
break
cv.wait(timeout=timeout)
logger.info("starting task %s", task)
threading.Thread(target=task.fn).start()

threading.Thread(target=run, name='Scheduler').start()

def schedule(self, fn, start):
"""Schedule a task that will run fn at or after start (which must be a
datetime object) and return an object representing that task.

"""
task = self._Task(fn, start)
logger.info("scheduling task %s", task)
with self._cv:
heapq.heappush(self._tasks, task)
self._cv.notify()
logger.info("scheduled task %s", task)
return task





share|improve this answer





















  • This is an great review and a model for how reviews should be done. Thanks @GarethRees!
    – yangmillstheory
    May 22 at 14:18











  • Can the timeout <= 0 condition ever be true?
    – yangmillstheory
    May 22 at 14:27










  • Yes, it is true when tasks[0] is due (or overdue) to run.
    – Gareth Rees
    May 22 at 14:34










  • How does the revised code address point 11?
    – yangmillstheory
    May 22 at 14:46











  • By recomputing the timeout after each wait (with the lock held).
    – Gareth Rees
    May 22 at 14:59















up vote
3
down vote



accepted










1. Review



  1. The docstrings could be improved. It would be nice if the docstring for the Scheduler class explained (briefly) how to use it, and there should be docstrings for the schedule and cancel methods.



  2. When using the logging module, it's not normally necessary to format the log messages yourself. Instead, pass the format string and arguments, and let the logger do the formatting if and when it needs to (depending on the log level, it might never need to format a particular message). So instead of:



    logger.info('waiting with timeout: '.format(self._timeout))


    write:



    logger.info("waiting with timeout %f", self._timeout)



  3. If there are multiple tasks with the same name, then the cancel method cancels the one that appears closest to the start of the heap. This is not necessarily the one with the soonest start time (because heaps are arranged as a tree, not as a sorted list). This behaviour seems quite obscure to me and seems likely to lead to unreliable programs.



    I think it would be easier for the user to write reliable programs if you adopted one of these three approaches: (i) cancel all tasks with matching names so that the behaviour doesn't depend on the arrangement of the heap; or (ii) require all tasks in the queue to have unique names; or (iii) have the schedule method return some object representing the task, so that this can later be passed to cancel to uniquely identify the particular task to be cancelled.



  4. Cancellation takes time proportional to the number of tasks in the heap. This could be improved to (amortized) $O(log n)$ by (i) having the schedule method return some object representing the task as suggesting above; (ii) passing this task object to the cancel method; (iii) leaving the cancelled task in the heap but marking it as cancelled; (iv) discarding cancelled tasks when they are popped from the heap.


  5. The attributes of a Task are start, name, fn, but the arguments to schedule are name, fn, start. This kind of inconsistency risks confusion or error. (You need start to be first so that tasks compare according to their start times, but there are other ways to achieve this.)


  6. If you attempt to schedule two tasks with the same start time and the same name, then you get TypeError: '<' not supported between instances of 'function' and 'function'.


  7. The name _minheap describes the data structure, but it is more important to describe the purpose (the data structure is an implementation detail). So I would use a name like _tasks.


  8. The _timeout attribute is only used in the run function, so it could be a local variable there instead of an attribute on the Scheduler object.


  9. The _get_next_timeout method is only used in the run function, so it could be a local function there instead of a method on the Scheduler class. (It must only be called with the lock held, so it is not suitable for a stand-alone method.)


  10. The _start method is only called in the __init__ method, so could be inlined there.



  11. The logic in run seems wrong to me. The following sequence of events is possible: (i) The timer thread calls _get_next_timeout which returns a timeout of 10 seconds, say. (ii) The timer thread released the condition variable. (iii) Another thread calls schedule with a task that should be run in 5 seconds time and notifies the condition variable (but nothing is waiting on the condition variable). (iv) The timer thread acquires the condition variable. (v) The timer thread waits on the old timeout of 10 seconds. But it should have computed a new timeout of 5 seconds and waited on that instead.



    To fix this, it is necessary to compute the timeout between acquiring the condition variable and waiting on it.



  12. The use of acquire and release on the condition variable seems risky, as an exception at the wrong time would leave the lock held. Normally one uses the context manager interface to avoid this risk. I guess that you didn't do this because you wanted to launch the task with the lock not held. But it only takes a small rearrangement of code to make this work.


2. Revised code



This is a version with nameless tasks, but if you prefer your tasks to have names, then it will be straightfoward for you to add them.



class Scheduler:
"""A schedule of tasks to be run in background threads. Call the
schedule method to schedule a task to run at a particular time.
Call the task's cancel method to cancel it if it has not already
started running.

"""

@functools.total_ordering
class _Task:
"A scheduled task."

def __init__(self, fn, start):
"Create task that will run fn at or after the datetime start."
self.fn = fn
self.start = start
self.cancelled = False

def __le__(self, other):
# Tasks compare according to their start time.
return self.start <= other.start

@property
def timeout(self):
"Return time remaining in seconds before task should start."
return (self.start - datetime.now()).total_seconds()

def cancel(self):
"Cancel task if it has not already started running."
self.cancelled = True
logger.info("canceled %s", self)

def __init__(self):
cv = self._cv = threading.Condition(threading.Lock())
tasks = self._tasks =

def run():
while True:
with cv:
while True:
timeout = None
while tasks and tasks[0].cancelled:
heapq.heappop(tasks)
if tasks:
timeout = tasks[0].timeout
if timeout <= 0:
task = heapq.heappop(tasks)
break
cv.wait(timeout=timeout)
logger.info("starting task %s", task)
threading.Thread(target=task.fn).start()

threading.Thread(target=run, name='Scheduler').start()

def schedule(self, fn, start):
"""Schedule a task that will run fn at or after start (which must be a
datetime object) and return an object representing that task.

"""
task = self._Task(fn, start)
logger.info("scheduling task %s", task)
with self._cv:
heapq.heappush(self._tasks, task)
self._cv.notify()
logger.info("scheduled task %s", task)
return task





share|improve this answer





















  • This is an great review and a model for how reviews should be done. Thanks @GarethRees!
    – yangmillstheory
    May 22 at 14:18











  • Can the timeout <= 0 condition ever be true?
    – yangmillstheory
    May 22 at 14:27










  • Yes, it is true when tasks[0] is due (or overdue) to run.
    – Gareth Rees
    May 22 at 14:34










  • How does the revised code address point 11?
    – yangmillstheory
    May 22 at 14:46











  • By recomputing the timeout after each wait (with the lock held).
    – Gareth Rees
    May 22 at 14:59













up vote
3
down vote



accepted







up vote
3
down vote



accepted






1. Review



  1. The docstrings could be improved. It would be nice if the docstring for the Scheduler class explained (briefly) how to use it, and there should be docstrings for the schedule and cancel methods.



  2. When using the logging module, it's not normally necessary to format the log messages yourself. Instead, pass the format string and arguments, and let the logger do the formatting if and when it needs to (depending on the log level, it might never need to format a particular message). So instead of:



    logger.info('waiting with timeout: '.format(self._timeout))


    write:



    logger.info("waiting with timeout %f", self._timeout)



  3. If there are multiple tasks with the same name, then the cancel method cancels the one that appears closest to the start of the heap. This is not necessarily the one with the soonest start time (because heaps are arranged as a tree, not as a sorted list). This behaviour seems quite obscure to me and seems likely to lead to unreliable programs.



    I think it would be easier for the user to write reliable programs if you adopted one of these three approaches: (i) cancel all tasks with matching names so that the behaviour doesn't depend on the arrangement of the heap; or (ii) require all tasks in the queue to have unique names; or (iii) have the schedule method return some object representing the task, so that this can later be passed to cancel to uniquely identify the particular task to be cancelled.



  4. Cancellation takes time proportional to the number of tasks in the heap. This could be improved to (amortized) $O(log n)$ by (i) having the schedule method return some object representing the task as suggesting above; (ii) passing this task object to the cancel method; (iii) leaving the cancelled task in the heap but marking it as cancelled; (iv) discarding cancelled tasks when they are popped from the heap.


  5. The attributes of a Task are start, name, fn, but the arguments to schedule are name, fn, start. This kind of inconsistency risks confusion or error. (You need start to be first so that tasks compare according to their start times, but there are other ways to achieve this.)


  6. If you attempt to schedule two tasks with the same start time and the same name, then you get TypeError: '<' not supported between instances of 'function' and 'function'.


  7. The name _minheap describes the data structure, but it is more important to describe the purpose (the data structure is an implementation detail). So I would use a name like _tasks.


  8. The _timeout attribute is only used in the run function, so it could be a local variable there instead of an attribute on the Scheduler object.


  9. The _get_next_timeout method is only used in the run function, so it could be a local function there instead of a method on the Scheduler class. (It must only be called with the lock held, so it is not suitable for a stand-alone method.)


  10. The _start method is only called in the __init__ method, so could be inlined there.



  11. The logic in run seems wrong to me. The following sequence of events is possible: (i) The timer thread calls _get_next_timeout which returns a timeout of 10 seconds, say. (ii) The timer thread released the condition variable. (iii) Another thread calls schedule with a task that should be run in 5 seconds time and notifies the condition variable (but nothing is waiting on the condition variable). (iv) The timer thread acquires the condition variable. (v) The timer thread waits on the old timeout of 10 seconds. But it should have computed a new timeout of 5 seconds and waited on that instead.



    To fix this, it is necessary to compute the timeout between acquiring the condition variable and waiting on it.



  12. The use of acquire and release on the condition variable seems risky, as an exception at the wrong time would leave the lock held. Normally one uses the context manager interface to avoid this risk. I guess that you didn't do this because you wanted to launch the task with the lock not held. But it only takes a small rearrangement of code to make this work.


2. Revised code



This is a version with nameless tasks, but if you prefer your tasks to have names, then it will be straightfoward for you to add them.



class Scheduler:
"""A schedule of tasks to be run in background threads. Call the
schedule method to schedule a task to run at a particular time.
Call the task's cancel method to cancel it if it has not already
started running.

"""

@functools.total_ordering
class _Task:
"A scheduled task."

def __init__(self, fn, start):
"Create task that will run fn at or after the datetime start."
self.fn = fn
self.start = start
self.cancelled = False

def __le__(self, other):
# Tasks compare according to their start time.
return self.start <= other.start

@property
def timeout(self):
"Return time remaining in seconds before task should start."
return (self.start - datetime.now()).total_seconds()

def cancel(self):
"Cancel task if it has not already started running."
self.cancelled = True
logger.info("canceled %s", self)

def __init__(self):
cv = self._cv = threading.Condition(threading.Lock())
tasks = self._tasks =

def run():
while True:
with cv:
while True:
timeout = None
while tasks and tasks[0].cancelled:
heapq.heappop(tasks)
if tasks:
timeout = tasks[0].timeout
if timeout <= 0:
task = heapq.heappop(tasks)
break
cv.wait(timeout=timeout)
logger.info("starting task %s", task)
threading.Thread(target=task.fn).start()

threading.Thread(target=run, name='Scheduler').start()

def schedule(self, fn, start):
"""Schedule a task that will run fn at or after start (which must be a
datetime object) and return an object representing that task.

"""
task = self._Task(fn, start)
logger.info("scheduling task %s", task)
with self._cv:
heapq.heappush(self._tasks, task)
self._cv.notify()
logger.info("scheduled task %s", task)
return task





share|improve this answer













1. Review



  1. The docstrings could be improved. It would be nice if the docstring for the Scheduler class explained (briefly) how to use it, and there should be docstrings for the schedule and cancel methods.



  2. When using the logging module, it's not normally necessary to format the log messages yourself. Instead, pass the format string and arguments, and let the logger do the formatting if and when it needs to (depending on the log level, it might never need to format a particular message). So instead of:



    logger.info('waiting with timeout: '.format(self._timeout))


    write:



    logger.info("waiting with timeout %f", self._timeout)



  3. If there are multiple tasks with the same name, then the cancel method cancels the one that appears closest to the start of the heap. This is not necessarily the one with the soonest start time (because heaps are arranged as a tree, not as a sorted list). This behaviour seems quite obscure to me and seems likely to lead to unreliable programs.



    I think it would be easier for the user to write reliable programs if you adopted one of these three approaches: (i) cancel all tasks with matching names so that the behaviour doesn't depend on the arrangement of the heap; or (ii) require all tasks in the queue to have unique names; or (iii) have the schedule method return some object representing the task, so that this can later be passed to cancel to uniquely identify the particular task to be cancelled.



  4. Cancellation takes time proportional to the number of tasks in the heap. This could be improved to (amortized) $O(log n)$ by (i) having the schedule method return some object representing the task as suggesting above; (ii) passing this task object to the cancel method; (iii) leaving the cancelled task in the heap but marking it as cancelled; (iv) discarding cancelled tasks when they are popped from the heap.


  5. The attributes of a Task are start, name, fn, but the arguments to schedule are name, fn, start. This kind of inconsistency risks confusion or error. (You need start to be first so that tasks compare according to their start times, but there are other ways to achieve this.)


  6. If you attempt to schedule two tasks with the same start time and the same name, then you get TypeError: '<' not supported between instances of 'function' and 'function'.


  7. The name _minheap describes the data structure, but it is more important to describe the purpose (the data structure is an implementation detail). So I would use a name like _tasks.


  8. The _timeout attribute is only used in the run function, so it could be a local variable there instead of an attribute on the Scheduler object.


  9. The _get_next_timeout method is only used in the run function, so it could be a local function there instead of a method on the Scheduler class. (It must only be called with the lock held, so it is not suitable for a stand-alone method.)


  10. The _start method is only called in the __init__ method, so could be inlined there.



  11. The logic in run seems wrong to me. The following sequence of events is possible: (i) The timer thread calls _get_next_timeout which returns a timeout of 10 seconds, say. (ii) The timer thread released the condition variable. (iii) Another thread calls schedule with a task that should be run in 5 seconds time and notifies the condition variable (but nothing is waiting on the condition variable). (iv) The timer thread acquires the condition variable. (v) The timer thread waits on the old timeout of 10 seconds. But it should have computed a new timeout of 5 seconds and waited on that instead.



    To fix this, it is necessary to compute the timeout between acquiring the condition variable and waiting on it.



  12. The use of acquire and release on the condition variable seems risky, as an exception at the wrong time would leave the lock held. Normally one uses the context manager interface to avoid this risk. I guess that you didn't do this because you wanted to launch the task with the lock not held. But it only takes a small rearrangement of code to make this work.


2. Revised code



This is a version with nameless tasks, but if you prefer your tasks to have names, then it will be straightfoward for you to add them.



class Scheduler:
"""A schedule of tasks to be run in background threads. Call the
schedule method to schedule a task to run at a particular time.
Call the task's cancel method to cancel it if it has not already
started running.

"""

@functools.total_ordering
class _Task:
"A scheduled task."

def __init__(self, fn, start):
"Create task that will run fn at or after the datetime start."
self.fn = fn
self.start = start
self.cancelled = False

def __le__(self, other):
# Tasks compare according to their start time.
return self.start <= other.start

@property
def timeout(self):
"Return time remaining in seconds before task should start."
return (self.start - datetime.now()).total_seconds()

def cancel(self):
"Cancel task if it has not already started running."
self.cancelled = True
logger.info("canceled %s", self)

def __init__(self):
cv = self._cv = threading.Condition(threading.Lock())
tasks = self._tasks =

def run():
while True:
with cv:
while True:
timeout = None
while tasks and tasks[0].cancelled:
heapq.heappop(tasks)
if tasks:
timeout = tasks[0].timeout
if timeout <= 0:
task = heapq.heappop(tasks)
break
cv.wait(timeout=timeout)
logger.info("starting task %s", task)
threading.Thread(target=task.fn).start()

threading.Thread(target=run, name='Scheduler').start()

def schedule(self, fn, start):
"""Schedule a task that will run fn at or after start (which must be a
datetime object) and return an object representing that task.

"""
task = self._Task(fn, start)
logger.info("scheduling task %s", task)
with self._cv:
heapq.heappush(self._tasks, task)
self._cv.notify()
logger.info("scheduled task %s", task)
return task






share|improve this answer













share|improve this answer



share|improve this answer











answered May 22 at 10:31









Gareth Rees

41.1k394166




41.1k394166











  • This is an great review and a model for how reviews should be done. Thanks @GarethRees!
    – yangmillstheory
    May 22 at 14:18











  • Can the timeout <= 0 condition ever be true?
    – yangmillstheory
    May 22 at 14:27










  • Yes, it is true when tasks[0] is due (or overdue) to run.
    – Gareth Rees
    May 22 at 14:34










  • How does the revised code address point 11?
    – yangmillstheory
    May 22 at 14:46











  • By recomputing the timeout after each wait (with the lock held).
    – Gareth Rees
    May 22 at 14:59

















  • This is an great review and a model for how reviews should be done. Thanks @GarethRees!
    – yangmillstheory
    May 22 at 14:18











  • Can the timeout <= 0 condition ever be true?
    – yangmillstheory
    May 22 at 14:27










  • Yes, it is true when tasks[0] is due (or overdue) to run.
    – Gareth Rees
    May 22 at 14:34










  • How does the revised code address point 11?
    – yangmillstheory
    May 22 at 14:46











  • By recomputing the timeout after each wait (with the lock held).
    – Gareth Rees
    May 22 at 14:59
















This is an great review and a model for how reviews should be done. Thanks @GarethRees!
– yangmillstheory
May 22 at 14:18





This is an great review and a model for how reviews should be done. Thanks @GarethRees!
– yangmillstheory
May 22 at 14:18













Can the timeout <= 0 condition ever be true?
– yangmillstheory
May 22 at 14:27




Can the timeout <= 0 condition ever be true?
– yangmillstheory
May 22 at 14:27












Yes, it is true when tasks[0] is due (or overdue) to run.
– Gareth Rees
May 22 at 14:34




Yes, it is true when tasks[0] is due (or overdue) to run.
– Gareth Rees
May 22 at 14:34












How does the revised code address point 11?
– yangmillstheory
May 22 at 14:46





How does the revised code address point 11?
– yangmillstheory
May 22 at 14:46













By recomputing the timeout after each wait (with the lock held).
– Gareth Rees
May 22 at 14:59





By recomputing the timeout after each wait (with the lock held).
– Gareth Rees
May 22 at 14:59













 

draft saved


draft discarded


























 


draft saved


draft discarded














StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f194922%2fpython-task-scheduler%23new-answer', 'question_page');

);

Post as a guest













































































Popular posts from this blog

Greedy Best First Search implementation in Rust

Function to Return a JSON Like Objects Using VBA Collections and Arrays

C++11 CLH Lock Implementation