Go task scheduler
Clash Royale CLAN TAG#URR8PPP
.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;
up vote
0
down vote
favorite
This is the same problem as the Python task scheduler, but in Go.
I chose not to use a sync.Cond
(I couldn't seem to get that right), but instead use channels.
package main
import (
"bufio"
"container/heap"
"log"
"os"
"sync"
"time"
)
// Task represents a scheduled cancellable function.
type Task struct
start time.Time
index int
fn func()
cancelled bool
// Cancel prevents a Task from being run, if it hasn't run yet.
func (t *Task) Cancel()
t.cancelled = true
// A min-heap of tasks, ordered by start time.
type tasks *Task
func (ts tasks) Len() int
return len(ts)
func (ts tasks) Less(i, j int) bool
return ts[i].start.Before(ts[j].start)
func (ts tasks) Swap(i, j int)
ts[i], ts[j] = ts[j], ts[i]
ts[i].index = i
ts[j].index = j
func (ts *tasks) Push(t interface)
task := t.(*Task)
n := len(*ts)
task.index = n
*ts = append(*ts, task)
func (ts *tasks) Pop() interface
old := *ts
n := len(*ts)
t := old[n-1]
t.index = -1 // for safety
*ts = old[0 : n-1]
return t
var (
mu sync.Mutex
ts tasks
ch chan bool
)
func init()
ch = make(chan bool)
ts = make(*Task, 0)
heap.Init(&ts)
func schedule(fn func(), start time.Time) *Task
t := &Taskstart: start, fn: fn
log.Printf("scheduling task %vn", *t)
mu.Lock()
heap.Push(&ts, t)
go func()
ch <- true
()
mu.Unlock()
log.Printf("scheduled task %vn", *t)
return t
func dispatcher()
for
// default timeout of 1 day so the loop isn't busy
timeout := time.Duration(24 * time.Hour)
mu.Lock()
for len(ts) != 0 && ts[0].cancelled
heap.Pop(&ts)
if len(ts) != 0
task := ts[0]
timeout = task.start.Sub(time.Now())
mu.Unlock()
select
case <-time.After(timeout):
mu.Lock()
task := heap.Pop(&ts).(*Task)
mu.Unlock()
go task.fn()
case <-ch:
Testing this via print debugging gives me the right console output. Is there a better way to structure the code? Are there any hidden race conditions?
Here's my test:
func main()
var wg sync.WaitGroup
start := time.Now()
task := func(j int) func()
return func()
log.Printf("running %d: %v n", j, time.Since(start))
wg.Add(1)
go func()
log.Println("Press ENTER to quit.")
r := bufio.NewReader(os.Stdin)
r.ReadRune()
close(ch)
log.Print("Goodbye!")
wg.Done()
()
go dispatcher()
go func()
schedule(task(1), start.Add(1*time.Second))
t := schedule(task(2), start.Add(2*time.Second))
t.Cancel()
schedule(task(3), start.Add(3*time.Second))
schedule(task(4), start.Add(2500*time.Millisecond))
time.Sleep(5 * time.Second)
newStart := time.Now()
schedule(task(5), newStart.Add(5*time.Second))
schedule(task(6), newStart.Add(4*time.Second))
schedule(task(7), newStart.Add(3500*time.Millisecond))
()
wg.Wait()
multithreading go concurrency
add a comment |Â
up vote
0
down vote
favorite
This is the same problem as the Python task scheduler, but in Go.
I chose not to use a sync.Cond
(I couldn't seem to get that right), but instead use channels.
package main
import (
"bufio"
"container/heap"
"log"
"os"
"sync"
"time"
)
// Task represents a scheduled cancellable function.
type Task struct
start time.Time
index int
fn func()
cancelled bool
// Cancel prevents a Task from being run, if it hasn't run yet.
func (t *Task) Cancel()
t.cancelled = true
// A min-heap of tasks, ordered by start time.
type tasks *Task
func (ts tasks) Len() int
return len(ts)
func (ts tasks) Less(i, j int) bool
return ts[i].start.Before(ts[j].start)
func (ts tasks) Swap(i, j int)
ts[i], ts[j] = ts[j], ts[i]
ts[i].index = i
ts[j].index = j
func (ts *tasks) Push(t interface)
task := t.(*Task)
n := len(*ts)
task.index = n
*ts = append(*ts, task)
func (ts *tasks) Pop() interface
old := *ts
n := len(*ts)
t := old[n-1]
t.index = -1 // for safety
*ts = old[0 : n-1]
return t
var (
mu sync.Mutex
ts tasks
ch chan bool
)
func init()
ch = make(chan bool)
ts = make(*Task, 0)
heap.Init(&ts)
func schedule(fn func(), start time.Time) *Task
t := &Taskstart: start, fn: fn
log.Printf("scheduling task %vn", *t)
mu.Lock()
heap.Push(&ts, t)
go func()
ch <- true
()
mu.Unlock()
log.Printf("scheduled task %vn", *t)
return t
func dispatcher()
for
// default timeout of 1 day so the loop isn't busy
timeout := time.Duration(24 * time.Hour)
mu.Lock()
for len(ts) != 0 && ts[0].cancelled
heap.Pop(&ts)
if len(ts) != 0
task := ts[0]
timeout = task.start.Sub(time.Now())
mu.Unlock()
select
case <-time.After(timeout):
mu.Lock()
task := heap.Pop(&ts).(*Task)
mu.Unlock()
go task.fn()
case <-ch:
Testing this via print debugging gives me the right console output. Is there a better way to structure the code? Are there any hidden race conditions?
Here's my test:
func main()
var wg sync.WaitGroup
start := time.Now()
task := func(j int) func()
return func()
log.Printf("running %d: %v n", j, time.Since(start))
wg.Add(1)
go func()
log.Println("Press ENTER to quit.")
r := bufio.NewReader(os.Stdin)
r.ReadRune()
close(ch)
log.Print("Goodbye!")
wg.Done()
()
go dispatcher()
go func()
schedule(task(1), start.Add(1*time.Second))
t := schedule(task(2), start.Add(2*time.Second))
t.Cancel()
schedule(task(3), start.Add(3*time.Second))
schedule(task(4), start.Add(2500*time.Millisecond))
time.Sleep(5 * time.Second)
newStart := time.Now()
schedule(task(5), newStart.Add(5*time.Second))
schedule(task(6), newStart.Add(4*time.Second))
schedule(task(7), newStart.Add(3500*time.Millisecond))
()
wg.Wait()
multithreading go concurrency
This code does not work.Task
struct is missing,tasks
is not a type,heap
is not defined... Please update your question to fix this issues
â felix
May 23 at 6:24
Updated; thanks.
â yangmillstheory
May 23 at 15:01
add a comment |Â
up vote
0
down vote
favorite
up vote
0
down vote
favorite
This is the same problem as the Python task scheduler, but in Go.
I chose not to use a sync.Cond
(I couldn't seem to get that right), but instead use channels.
package main
import (
"bufio"
"container/heap"
"log"
"os"
"sync"
"time"
)
// Task represents a scheduled cancellable function.
type Task struct
start time.Time
index int
fn func()
cancelled bool
// Cancel prevents a Task from being run, if it hasn't run yet.
func (t *Task) Cancel()
t.cancelled = true
// A min-heap of tasks, ordered by start time.
type tasks *Task
func (ts tasks) Len() int
return len(ts)
func (ts tasks) Less(i, j int) bool
return ts[i].start.Before(ts[j].start)
func (ts tasks) Swap(i, j int)
ts[i], ts[j] = ts[j], ts[i]
ts[i].index = i
ts[j].index = j
func (ts *tasks) Push(t interface)
task := t.(*Task)
n := len(*ts)
task.index = n
*ts = append(*ts, task)
func (ts *tasks) Pop() interface
old := *ts
n := len(*ts)
t := old[n-1]
t.index = -1 // for safety
*ts = old[0 : n-1]
return t
var (
mu sync.Mutex
ts tasks
ch chan bool
)
func init()
ch = make(chan bool)
ts = make(*Task, 0)
heap.Init(&ts)
func schedule(fn func(), start time.Time) *Task
t := &Taskstart: start, fn: fn
log.Printf("scheduling task %vn", *t)
mu.Lock()
heap.Push(&ts, t)
go func()
ch <- true
()
mu.Unlock()
log.Printf("scheduled task %vn", *t)
return t
func dispatcher()
for
// default timeout of 1 day so the loop isn't busy
timeout := time.Duration(24 * time.Hour)
mu.Lock()
for len(ts) != 0 && ts[0].cancelled
heap.Pop(&ts)
if len(ts) != 0
task := ts[0]
timeout = task.start.Sub(time.Now())
mu.Unlock()
select
case <-time.After(timeout):
mu.Lock()
task := heap.Pop(&ts).(*Task)
mu.Unlock()
go task.fn()
case <-ch:
Testing this via print debugging gives me the right console output. Is there a better way to structure the code? Are there any hidden race conditions?
Here's my test:
func main()
var wg sync.WaitGroup
start := time.Now()
task := func(j int) func()
return func()
log.Printf("running %d: %v n", j, time.Since(start))
wg.Add(1)
go func()
log.Println("Press ENTER to quit.")
r := bufio.NewReader(os.Stdin)
r.ReadRune()
close(ch)
log.Print("Goodbye!")
wg.Done()
()
go dispatcher()
go func()
schedule(task(1), start.Add(1*time.Second))
t := schedule(task(2), start.Add(2*time.Second))
t.Cancel()
schedule(task(3), start.Add(3*time.Second))
schedule(task(4), start.Add(2500*time.Millisecond))
time.Sleep(5 * time.Second)
newStart := time.Now()
schedule(task(5), newStart.Add(5*time.Second))
schedule(task(6), newStart.Add(4*time.Second))
schedule(task(7), newStart.Add(3500*time.Millisecond))
()
wg.Wait()
multithreading go concurrency
This is the same problem as the Python task scheduler, but in Go.
I chose not to use a sync.Cond
(I couldn't seem to get that right), but instead use channels.
package main
import (
"bufio"
"container/heap"
"log"
"os"
"sync"
"time"
)
// Task represents a scheduled cancellable function.
type Task struct
start time.Time
index int
fn func()
cancelled bool
// Cancel prevents a Task from being run, if it hasn't run yet.
func (t *Task) Cancel()
t.cancelled = true
// A min-heap of tasks, ordered by start time.
type tasks *Task
func (ts tasks) Len() int
return len(ts)
func (ts tasks) Less(i, j int) bool
return ts[i].start.Before(ts[j].start)
func (ts tasks) Swap(i, j int)
ts[i], ts[j] = ts[j], ts[i]
ts[i].index = i
ts[j].index = j
func (ts *tasks) Push(t interface)
task := t.(*Task)
n := len(*ts)
task.index = n
*ts = append(*ts, task)
func (ts *tasks) Pop() interface
old := *ts
n := len(*ts)
t := old[n-1]
t.index = -1 // for safety
*ts = old[0 : n-1]
return t
var (
mu sync.Mutex
ts tasks
ch chan bool
)
func init()
ch = make(chan bool)
ts = make(*Task, 0)
heap.Init(&ts)
func schedule(fn func(), start time.Time) *Task
t := &Taskstart: start, fn: fn
log.Printf("scheduling task %vn", *t)
mu.Lock()
heap.Push(&ts, t)
go func()
ch <- true
()
mu.Unlock()
log.Printf("scheduled task %vn", *t)
return t
func dispatcher()
for
// default timeout of 1 day so the loop isn't busy
timeout := time.Duration(24 * time.Hour)
mu.Lock()
for len(ts) != 0 && ts[0].cancelled
heap.Pop(&ts)
if len(ts) != 0
task := ts[0]
timeout = task.start.Sub(time.Now())
mu.Unlock()
select
case <-time.After(timeout):
mu.Lock()
task := heap.Pop(&ts).(*Task)
mu.Unlock()
go task.fn()
case <-ch:
Testing this via print debugging gives me the right console output. Is there a better way to structure the code? Are there any hidden race conditions?
Here's my test:
func main()
var wg sync.WaitGroup
start := time.Now()
task := func(j int) func()
return func()
log.Printf("running %d: %v n", j, time.Since(start))
wg.Add(1)
go func()
log.Println("Press ENTER to quit.")
r := bufio.NewReader(os.Stdin)
r.ReadRune()
close(ch)
log.Print("Goodbye!")
wg.Done()
()
go dispatcher()
go func()
schedule(task(1), start.Add(1*time.Second))
t := schedule(task(2), start.Add(2*time.Second))
t.Cancel()
schedule(task(3), start.Add(3*time.Second))
schedule(task(4), start.Add(2500*time.Millisecond))
time.Sleep(5 * time.Second)
newStart := time.Now()
schedule(task(5), newStart.Add(5*time.Second))
schedule(task(6), newStart.Add(4*time.Second))
schedule(task(7), newStart.Add(3500*time.Millisecond))
()
wg.Wait()
multithreading go concurrency
edited May 23 at 15:06
asked May 23 at 5:47
yangmillstheory
1344
1344
This code does not work.Task
struct is missing,tasks
is not a type,heap
is not defined... Please update your question to fix this issues
â felix
May 23 at 6:24
Updated; thanks.
â yangmillstheory
May 23 at 15:01
add a comment |Â
This code does not work.Task
struct is missing,tasks
is not a type,heap
is not defined... Please update your question to fix this issues
â felix
May 23 at 6:24
Updated; thanks.
â yangmillstheory
May 23 at 15:01
This code does not work.
Task
struct is missing, tasks
is not a type, heap
is not defined... Please update your question to fix this issuesâ felix
May 23 at 6:24
This code does not work.
Task
struct is missing, tasks
is not a type, heap
is not defined... Please update your question to fix this issuesâ felix
May 23 at 6:24
Updated; thanks.
â yangmillstheory
May 23 at 15:01
Updated; thanks.
â yangmillstheory
May 23 at 15:01
add a comment |Â
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f194989%2fgo-task-scheduler%23new-answer', 'question_page');
);
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
This code does not work.
Task
struct is missing,tasks
is not a type,heap
is not defined... Please update your question to fix this issuesâ felix
May 23 at 6:24
Updated; thanks.
â yangmillstheory
May 23 at 15:01