Go task scheduler

The name of the pictureThe name of the pictureThe name of the pictureClash Royale CLAN TAG#URR8PPP





.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;







up vote
0
down vote

favorite












This is the same problem as the Python task scheduler, but in Go.



I chose not to use a sync.Cond (I couldn't seem to get that right), but instead use channels.



package main

import (
"bufio"
"container/heap"
"log"
"os"
"sync"
"time"
)

// Task represents a scheduled cancellable function.
type Task struct
start time.Time
index int
fn func()
cancelled bool


// Cancel prevents a Task from being run, if it hasn't run yet.
func (t *Task) Cancel()
t.cancelled = true


// A min-heap of tasks, ordered by start time.
type tasks *Task

func (ts tasks) Len() int
return len(ts)


func (ts tasks) Less(i, j int) bool
return ts[i].start.Before(ts[j].start)


func (ts tasks) Swap(i, j int)
ts[i], ts[j] = ts[j], ts[i]
ts[i].index = i
ts[j].index = j


func (ts *tasks) Push(t interface)
task := t.(*Task)
n := len(*ts)
task.index = n
*ts = append(*ts, task)


func (ts *tasks) Pop() interface
old := *ts
n := len(*ts)
t := old[n-1]
t.index = -1 // for safety
*ts = old[0 : n-1]
return t




var (
mu sync.Mutex
ts tasks
ch chan bool
)

func init()
ch = make(chan bool)
ts = make(*Task, 0)
heap.Init(&ts)


func schedule(fn func(), start time.Time) *Task
t := &Taskstart: start, fn: fn

log.Printf("scheduling task %vn", *t)

mu.Lock()
heap.Push(&ts, t)
go func()
ch <- true
()
mu.Unlock()

log.Printf("scheduled task %vn", *t)

return t


func dispatcher()
for
// default timeout of 1 day so the loop isn't busy
timeout := time.Duration(24 * time.Hour)

mu.Lock()
for len(ts) != 0 && ts[0].cancelled
heap.Pop(&ts)

if len(ts) != 0
task := ts[0]
timeout = task.start.Sub(time.Now())

mu.Unlock()

select
case <-time.After(timeout):
mu.Lock()
task := heap.Pop(&ts).(*Task)
mu.Unlock()
go task.fn()
case <-ch:





Testing this via print debugging gives me the right console output. Is there a better way to structure the code? Are there any hidden race conditions?



Here's my test:



func main() 
var wg sync.WaitGroup

start := time.Now()

task := func(j int) func()
return func()
log.Printf("running %d: %v n", j, time.Since(start))



wg.Add(1)
go func()
log.Println("Press ENTER to quit.")
r := bufio.NewReader(os.Stdin)
r.ReadRune()
close(ch)
log.Print("Goodbye!")
wg.Done()
()
go dispatcher()
go func()
schedule(task(1), start.Add(1*time.Second))
t := schedule(task(2), start.Add(2*time.Second))
t.Cancel()
schedule(task(3), start.Add(3*time.Second))
schedule(task(4), start.Add(2500*time.Millisecond))

time.Sleep(5 * time.Second)
newStart := time.Now()
schedule(task(5), newStart.Add(5*time.Second))
schedule(task(6), newStart.Add(4*time.Second))
schedule(task(7), newStart.Add(3500*time.Millisecond))
()

wg.Wait()







share|improve this question





















  • This code does not work. Task struct is missing, tasks is not a type, heap is not defined... Please update your question to fix this issues
    – felix
    May 23 at 6:24










  • Updated; thanks.
    – yangmillstheory
    May 23 at 15:01
















up vote
0
down vote

favorite












This is the same problem as the Python task scheduler, but in Go.



I chose not to use a sync.Cond (I couldn't seem to get that right), but instead use channels.



package main

import (
"bufio"
"container/heap"
"log"
"os"
"sync"
"time"
)

// Task represents a scheduled cancellable function.
type Task struct
start time.Time
index int
fn func()
cancelled bool


// Cancel prevents a Task from being run, if it hasn't run yet.
func (t *Task) Cancel()
t.cancelled = true


// A min-heap of tasks, ordered by start time.
type tasks *Task

func (ts tasks) Len() int
return len(ts)


func (ts tasks) Less(i, j int) bool
return ts[i].start.Before(ts[j].start)


func (ts tasks) Swap(i, j int)
ts[i], ts[j] = ts[j], ts[i]
ts[i].index = i
ts[j].index = j


func (ts *tasks) Push(t interface)
task := t.(*Task)
n := len(*ts)
task.index = n
*ts = append(*ts, task)


func (ts *tasks) Pop() interface
old := *ts
n := len(*ts)
t := old[n-1]
t.index = -1 // for safety
*ts = old[0 : n-1]
return t




var (
mu sync.Mutex
ts tasks
ch chan bool
)

func init()
ch = make(chan bool)
ts = make(*Task, 0)
heap.Init(&ts)


func schedule(fn func(), start time.Time) *Task
t := &Taskstart: start, fn: fn

log.Printf("scheduling task %vn", *t)

mu.Lock()
heap.Push(&ts, t)
go func()
ch <- true
()
mu.Unlock()

log.Printf("scheduled task %vn", *t)

return t


func dispatcher()
for
// default timeout of 1 day so the loop isn't busy
timeout := time.Duration(24 * time.Hour)

mu.Lock()
for len(ts) != 0 && ts[0].cancelled
heap.Pop(&ts)

if len(ts) != 0
task := ts[0]
timeout = task.start.Sub(time.Now())

mu.Unlock()

select
case <-time.After(timeout):
mu.Lock()
task := heap.Pop(&ts).(*Task)
mu.Unlock()
go task.fn()
case <-ch:





Testing this via print debugging gives me the right console output. Is there a better way to structure the code? Are there any hidden race conditions?



Here's my test:



func main() 
var wg sync.WaitGroup

start := time.Now()

task := func(j int) func()
return func()
log.Printf("running %d: %v n", j, time.Since(start))



wg.Add(1)
go func()
log.Println("Press ENTER to quit.")
r := bufio.NewReader(os.Stdin)
r.ReadRune()
close(ch)
log.Print("Goodbye!")
wg.Done()
()
go dispatcher()
go func()
schedule(task(1), start.Add(1*time.Second))
t := schedule(task(2), start.Add(2*time.Second))
t.Cancel()
schedule(task(3), start.Add(3*time.Second))
schedule(task(4), start.Add(2500*time.Millisecond))

time.Sleep(5 * time.Second)
newStart := time.Now()
schedule(task(5), newStart.Add(5*time.Second))
schedule(task(6), newStart.Add(4*time.Second))
schedule(task(7), newStart.Add(3500*time.Millisecond))
()

wg.Wait()







share|improve this question





















  • This code does not work. Task struct is missing, tasks is not a type, heap is not defined... Please update your question to fix this issues
    – felix
    May 23 at 6:24










  • Updated; thanks.
    – yangmillstheory
    May 23 at 15:01












up vote
0
down vote

favorite









up vote
0
down vote

favorite











This is the same problem as the Python task scheduler, but in Go.



I chose not to use a sync.Cond (I couldn't seem to get that right), but instead use channels.



package main

import (
"bufio"
"container/heap"
"log"
"os"
"sync"
"time"
)

// Task represents a scheduled cancellable function.
type Task struct
start time.Time
index int
fn func()
cancelled bool


// Cancel prevents a Task from being run, if it hasn't run yet.
func (t *Task) Cancel()
t.cancelled = true


// A min-heap of tasks, ordered by start time.
type tasks *Task

func (ts tasks) Len() int
return len(ts)


func (ts tasks) Less(i, j int) bool
return ts[i].start.Before(ts[j].start)


func (ts tasks) Swap(i, j int)
ts[i], ts[j] = ts[j], ts[i]
ts[i].index = i
ts[j].index = j


func (ts *tasks) Push(t interface)
task := t.(*Task)
n := len(*ts)
task.index = n
*ts = append(*ts, task)


func (ts *tasks) Pop() interface
old := *ts
n := len(*ts)
t := old[n-1]
t.index = -1 // for safety
*ts = old[0 : n-1]
return t




var (
mu sync.Mutex
ts tasks
ch chan bool
)

func init()
ch = make(chan bool)
ts = make(*Task, 0)
heap.Init(&ts)


func schedule(fn func(), start time.Time) *Task
t := &Taskstart: start, fn: fn

log.Printf("scheduling task %vn", *t)

mu.Lock()
heap.Push(&ts, t)
go func()
ch <- true
()
mu.Unlock()

log.Printf("scheduled task %vn", *t)

return t


func dispatcher()
for
// default timeout of 1 day so the loop isn't busy
timeout := time.Duration(24 * time.Hour)

mu.Lock()
for len(ts) != 0 && ts[0].cancelled
heap.Pop(&ts)

if len(ts) != 0
task := ts[0]
timeout = task.start.Sub(time.Now())

mu.Unlock()

select
case <-time.After(timeout):
mu.Lock()
task := heap.Pop(&ts).(*Task)
mu.Unlock()
go task.fn()
case <-ch:





Testing this via print debugging gives me the right console output. Is there a better way to structure the code? Are there any hidden race conditions?



Here's my test:



func main() 
var wg sync.WaitGroup

start := time.Now()

task := func(j int) func()
return func()
log.Printf("running %d: %v n", j, time.Since(start))



wg.Add(1)
go func()
log.Println("Press ENTER to quit.")
r := bufio.NewReader(os.Stdin)
r.ReadRune()
close(ch)
log.Print("Goodbye!")
wg.Done()
()
go dispatcher()
go func()
schedule(task(1), start.Add(1*time.Second))
t := schedule(task(2), start.Add(2*time.Second))
t.Cancel()
schedule(task(3), start.Add(3*time.Second))
schedule(task(4), start.Add(2500*time.Millisecond))

time.Sleep(5 * time.Second)
newStart := time.Now()
schedule(task(5), newStart.Add(5*time.Second))
schedule(task(6), newStart.Add(4*time.Second))
schedule(task(7), newStart.Add(3500*time.Millisecond))
()

wg.Wait()







share|improve this question













This is the same problem as the Python task scheduler, but in Go.



I chose not to use a sync.Cond (I couldn't seem to get that right), but instead use channels.



package main

import (
"bufio"
"container/heap"
"log"
"os"
"sync"
"time"
)

// Task represents a scheduled cancellable function.
type Task struct
start time.Time
index int
fn func()
cancelled bool


// Cancel prevents a Task from being run, if it hasn't run yet.
func (t *Task) Cancel()
t.cancelled = true


// A min-heap of tasks, ordered by start time.
type tasks *Task

func (ts tasks) Len() int
return len(ts)


func (ts tasks) Less(i, j int) bool
return ts[i].start.Before(ts[j].start)


func (ts tasks) Swap(i, j int)
ts[i], ts[j] = ts[j], ts[i]
ts[i].index = i
ts[j].index = j


func (ts *tasks) Push(t interface)
task := t.(*Task)
n := len(*ts)
task.index = n
*ts = append(*ts, task)


func (ts *tasks) Pop() interface
old := *ts
n := len(*ts)
t := old[n-1]
t.index = -1 // for safety
*ts = old[0 : n-1]
return t




var (
mu sync.Mutex
ts tasks
ch chan bool
)

func init()
ch = make(chan bool)
ts = make(*Task, 0)
heap.Init(&ts)


func schedule(fn func(), start time.Time) *Task
t := &Taskstart: start, fn: fn

log.Printf("scheduling task %vn", *t)

mu.Lock()
heap.Push(&ts, t)
go func()
ch <- true
()
mu.Unlock()

log.Printf("scheduled task %vn", *t)

return t


func dispatcher()
for
// default timeout of 1 day so the loop isn't busy
timeout := time.Duration(24 * time.Hour)

mu.Lock()
for len(ts) != 0 && ts[0].cancelled
heap.Pop(&ts)

if len(ts) != 0
task := ts[0]
timeout = task.start.Sub(time.Now())

mu.Unlock()

select
case <-time.After(timeout):
mu.Lock()
task := heap.Pop(&ts).(*Task)
mu.Unlock()
go task.fn()
case <-ch:





Testing this via print debugging gives me the right console output. Is there a better way to structure the code? Are there any hidden race conditions?



Here's my test:



func main() 
var wg sync.WaitGroup

start := time.Now()

task := func(j int) func()
return func()
log.Printf("running %d: %v n", j, time.Since(start))



wg.Add(1)
go func()
log.Println("Press ENTER to quit.")
r := bufio.NewReader(os.Stdin)
r.ReadRune()
close(ch)
log.Print("Goodbye!")
wg.Done()
()
go dispatcher()
go func()
schedule(task(1), start.Add(1*time.Second))
t := schedule(task(2), start.Add(2*time.Second))
t.Cancel()
schedule(task(3), start.Add(3*time.Second))
schedule(task(4), start.Add(2500*time.Millisecond))

time.Sleep(5 * time.Second)
newStart := time.Now()
schedule(task(5), newStart.Add(5*time.Second))
schedule(task(6), newStart.Add(4*time.Second))
schedule(task(7), newStart.Add(3500*time.Millisecond))
()

wg.Wait()









share|improve this question












share|improve this question




share|improve this question








edited May 23 at 15:06
























asked May 23 at 5:47









yangmillstheory

1344




1344











  • This code does not work. Task struct is missing, tasks is not a type, heap is not defined... Please update your question to fix this issues
    – felix
    May 23 at 6:24










  • Updated; thanks.
    – yangmillstheory
    May 23 at 15:01
















  • This code does not work. Task struct is missing, tasks is not a type, heap is not defined... Please update your question to fix this issues
    – felix
    May 23 at 6:24










  • Updated; thanks.
    – yangmillstheory
    May 23 at 15:01















This code does not work. Task struct is missing, tasks is not a type, heap is not defined... Please update your question to fix this issues
– felix
May 23 at 6:24




This code does not work. Task struct is missing, tasks is not a type, heap is not defined... Please update your question to fix this issues
– felix
May 23 at 6:24












Updated; thanks.
– yangmillstheory
May 23 at 15:01




Updated; thanks.
– yangmillstheory
May 23 at 15:01















active

oldest

votes











Your Answer




StackExchange.ifUsing("editor", function ()
return StackExchange.using("mathjaxEditing", function ()
StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix)
StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
);
);
, "mathjax-editing");

StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");

StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "196"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);

else
createEditor();

);

function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
convertImagesToLinks: false,
noModals: false,
showLowRepImageUploadWarning: true,
reputationToPostImages: null,
bindNavPrevention: true,
postfix: "",
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);



);








 

draft saved


draft discarded


















StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f194989%2fgo-task-scheduler%23new-answer', 'question_page');

);

Post as a guest



































active

oldest

votes













active

oldest

votes









active

oldest

votes






active

oldest

votes










 

draft saved


draft discarded


























 


draft saved


draft discarded














StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f194989%2fgo-task-scheduler%23new-answer', 'question_page');

);

Post as a guest













































































Popular posts from this blog

Greedy Best First Search implementation in Rust

Function to Return a JSON Like Objects Using VBA Collections and Arrays

C++11 CLH Lock Implementation