Worker, Pool and Jobs
Clash Royale CLAN TAG#URR8PPP
.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;
up vote
3
down vote
favorite
I created a snippet of code that allows me to create a worker pool, and from that worker pool I can run a bunch of jobs.
I can configure each job to have a retry limit, and set a callback that will be invoked at the end of each job with a boolean argument that represents whether the job ultimately failed or succeeded.
This is the first "concurrent" piece of code I've ever written, so it may not be the best.
It works perfectly fine as I expect it to, and passes all my tests, even when I run hundreds of tests over and over.
I would just like someone with practiced eyes in this area to look at my code and tell me where there may be some flaws or areas of improvement.
This is how to use the code:
log := logger.New()
w := worker.NewWorker(log)
go w.Run(
w.NewJob(func() error
return errors.New("foo")
),
w.NewJob(func() error
return nil
).SetFinally(func(success bool)
if success
// ...
),
)
This is the package (if you want to run it yourself, just remove the logger
logic or create a mock of the logger, and you can run it just fine):
package worker
import (
"fmt"
"math"
"math/rand"
"sync"
"time"
"core/logger"
)
const (
	// MaxConcurrentRunners is the maximum number of jobs a pool runs concurrently.
	MaxConcurrentRunners = 1000
	// MaxRetryLimit is the maximum number of retries allowed for a failed job.
	MaxRetryLimit = 10
	// DefaultRetryLimit is the number of retries a failed job gets when no
	// valid retry limit has been set.
	DefaultRetryLimit = 3
)

// Worker has exactly the same method set as Pool. It exists so callers can
// write worker.Worker instead of worker.Pool.
type Worker interface {
	Pool
}

// Pool is a bounded pool that runs jobs concurrently, retrying failed ones.
type Pool interface {
	// NewJob creates a job that runs handler.
	NewJob(handler func() error) Job
	// Run processes jobs and returns once every job has finished.
	Run(jobs ...Job)
	// Flushed reports whether no jobs are currently in flight.
	Flushed() bool
}

// Job is a unit of work run by a Pool, together with its retry policy and
// completion callback.
type Job interface {
	// ID returns the job's identifier.
	ID() string
	// SetRetryLimit sets how many times a failed job is retried and returns
	// the job for chaining.
	SetRetryLimit(retryLimit int) Job
	// RetryLimit returns the configured retry limit.
	RetryLimit() int
	// IncrementAttempts records one more retry attempt.
	IncrementAttempts()
	// CurrentAttempts returns the number of retry attempts recorded so far.
	CurrentAttempts() int
	// SetHandler replaces the function the job runs.
	SetHandler(handler func() error)
	// Handler returns the function the job runs.
	Handler() func() error
	// SetFinally sets the callback invoked with the job's final outcome and
	// returns the job for chaining.
	SetFinally(finally func(success bool)) Job
	// Finally invokes the completion callback, if any, and returns the job.
	Finally(success bool) Job
	// Done returns the channel used to signal that the job has finished.
	Done() chan bool
	// SignalDone signals on the Done channel.
	SignalDone()
}
// workerPool represents a worker pool.
type workerPool struct
id string
log logger.Logger
semaphore chan struct
// NewWorker returns a new pool, which the Worker interface implements.
func NewWorker(log logger.Logger) Worker
return NewPool(log)
// NewPool returns a new worker instance.
func NewPool(log logger.Logger) Pool
w := new(workerPool)
w.id = randomString()
w.log = log
w.semaphore = make(chan struct, MaxConcurrentRunners)
return w
// NewJob creates a new job for a worker pool.
func (w *workerPool) NewJob(handler func() error) Job
j := new(job)
j.id = randomString()
j.retryLimit = DefaultRetryLimit
j.handler = handler
j.done = make(chan bool)
return j
// ID will return the ID of a pool.
func (w *workerPool) ID() string
return w.id
// Flushed checks whether the worker pool is flushed or not (has no active jobs in the buffer).
func (w *workerPool) Flushed() bool
return len(w.semaphore) == 0
// DoWork will begin processing the jobs.
func (w *workerPool) Run(jobs ...Job)
// Cache the count of jobs.
l := len(jobs)
// Create a new wait group and set the counter to the count of jobs.
wg := new(sync.WaitGroup)
wg.Add(l)
// Process each job.
for _, job := range jobs
// Block pool buffer is full.
w.semaphore <- struct
go func(job Job)
// Log start of job processing.
w.log.Info(fmt.Sprintf("Worker pool (%s): Started job (%s)", w.ID(), job.ID()))
// Execute the job.
go func()
w.run(wg, job)
()
// Wait for the job to be signaled as complete.
<-job.Done()
// Release a slot in the pool buffer.
<-w.semaphore
// Decrement the wait group.
wg.Done()
// Log end of job processing.
w.log.Info(fmt.Sprintf("Worker pool (%s): Completed job (%s)", w.ID(), job.ID()))
(job)
// Wait for the wait group counter to be depleted.
wg.Wait()
// run will process the job until it succeeds or reaches the maximum retries.
func (w *workerPool) run(wg *sync.WaitGroup, job Job)
defer func()
job.SignalDone()
()
// Execute job.
if err := job.Handler()(); err != nil
for
// Increment counter.
job.IncrementAttempts()
// Wait retry period.
timer := time.NewTimer(ExponentialBackoff(job.CurrentAttempts()))
<-timer.C
// Execute job.
if err := job.Handler()(); err != nil
// Maximum attempts reached without success.
if job.CurrentAttempts() >= job.RetryLimit()
job.Finally(false)
w.log.Error(err)
return
continue
else
break
job.Finally(true)
// job represents a job for a worker pool.
type job struct
id string
retryLimit int
currentAttempts int
handler func() error
finally func(success bool)
done chan bool
// ID will return the ID of a job.
func (j *job) ID() string
return j.id
// Done returns a channel that signals when the job is done
func (j *job) Done() chan bool
return j.done
// SignalDone will signal when a job is done. This can also be used from outside the
// worker to cancel a job, etc.
func (j *job) SignalDone()
j.done <- true
// SetRetryLimit will set the jobs retry limit.
func (j *job) SetRetryLimit(retryLimit int) Job
if retryLimit <= 0
j.retryLimit = DefaultRetryLimit
else if retryLimit > MaxRetryLimit
j.retryLimit = MaxRetryLimit
else
j.retryLimit = retryLimit
return j
// RetryLimit will get the jobs retry limit.
func (j *job) RetryLimit() int
return j.retryLimit
// CurrentAttempts will get the jobs current attempts.
func (j *job) CurrentAttempts() int
return j.currentAttempts
// IncrementAttempts increments the number of attempts on this job.
func (j *job) IncrementAttempts()
j.currentAttempts++
// SetHandler will set the jobs handler.
func (j *job) SetHandler(handler func() error)
j.handler = handler
// Handler will get the jobs handler.
func (j *job) Handler() func() error
return j.handler
// SetFinally will set the finally function of the job, which will be called upon job completion.
func (j *job) SetFinally(finally func(success bool)) Job
j.finally = finally
return j
// Finally will call finally.
func (j *job) Finally(success bool) Job
if j.finally != nil
j.finally(success)
return j
// ExponentialBackoff returns the delay to wait before retry number
// failedAttempts, computed as 250ms * 2^failedAttempts.
//
// Example failedAttempts:
//
//	1: 500ms
//	2: 1s
//	3: 2s
//	4: 4s
//	5: 8s
//	6: 16s
//	7: 32s
//	8: 1m4s
//	9: 2m8s
//	10: 4m16s
//
// Delays too large for time.Duration are capped at the largest representable
// Duration. Without the cap, converting an out-of-range float to an integer
// yields an implementation-specific value.
func ExponentialBackoff(failedAttempts int) time.Duration {
	d := float64(time.Second) * math.Pow(2, float64(failedAttempts)) * .25
	if d >= math.MaxInt64 {
		return time.Duration(math.MaxInt64)
	}
	return time.Duration(d)
}
// randomString returns a 30-character string of ASCII letters and digits; it
// is used for pool and job IDs.
//
// NOTE(review): it draws from the math/rand global source, which is not
// cryptographically secure and, before Go 1.20, is deterministically seeded
// unless rand.Seed is called, so IDs may repeat across process runs.
func randomString() string {
	const (
		chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
		size  = 30
	)
	res := make([]byte, size)
	for i := range res {
		res[i] = chars[rand.Intn(len(chars))]
	}
	return string(res)
}
go concurrency
add a comment |
up vote
3
down vote
favorite
I created a snippet of code that allows me to create a worker pool, and from that worker pool I can run a bunch of jobs.
I can configure each job to have a retry limit, and set a callback that will be invoked at the end of each job with a boolean argument that represents whether the job ultimately failed or succeeded.
This is the first "concurrent" piece of code I've ever written, so it may not be the best.
It works perfectly fine as I expect it to, and passes all my tests, even when I run hundreds of tests over and over.
I would just like someone with practiced eyes in this area to look at my code and tell me where there may be some flaws or areas of improvement.
This is how to use the code:
log := logger.New()
w := worker.NewWorker(log)
go w.Run(
w.NewJob(func() error
return errors.New("foo")
),
w.NewJob(func() error
return nil
).SetFinally(func(success bool)
if success
// ...
),
)
This is the package (if you want to run it yourself, just remove the logger
logic or create a mock of the logger, and you can run it just fine):
package worker
import (
"fmt"
"math"
"math/rand"
"sync"
"time"
"core/logger"
)
const (
	// MaxConcurrentRunners is the maximum number of jobs a pool runs concurrently.
	MaxConcurrentRunners = 1000
	// MaxRetryLimit is the maximum number of retries allowed for a failed job.
	MaxRetryLimit = 10
	// DefaultRetryLimit is the number of retries a failed job gets when no
	// valid retry limit has been set.
	DefaultRetryLimit = 3
)

// Worker has exactly the same method set as Pool. It exists so callers can
// write worker.Worker instead of worker.Pool.
type Worker interface {
	Pool
}

// Pool is a bounded pool that runs jobs concurrently, retrying failed ones.
type Pool interface {
	// NewJob creates a job that runs handler.
	NewJob(handler func() error) Job
	// Run processes jobs and returns once every job has finished.
	Run(jobs ...Job)
	// Flushed reports whether no jobs are currently in flight.
	Flushed() bool
}

// Job is a unit of work run by a Pool, together with its retry policy and
// completion callback.
type Job interface {
	// ID returns the job's identifier.
	ID() string
	// SetRetryLimit sets how many times a failed job is retried and returns
	// the job for chaining.
	SetRetryLimit(retryLimit int) Job
	// RetryLimit returns the configured retry limit.
	RetryLimit() int
	// IncrementAttempts records one more retry attempt.
	IncrementAttempts()
	// CurrentAttempts returns the number of retry attempts recorded so far.
	CurrentAttempts() int
	// SetHandler replaces the function the job runs.
	SetHandler(handler func() error)
	// Handler returns the function the job runs.
	Handler() func() error
	// SetFinally sets the callback invoked with the job's final outcome and
	// returns the job for chaining.
	SetFinally(finally func(success bool)) Job
	// Finally invokes the completion callback, if any, and returns the job.
	Finally(success bool) Job
	// Done returns the channel used to signal that the job has finished.
	Done() chan bool
	// SignalDone signals on the Done channel.
	SignalDone()
}
// workerPool represents a worker pool.
type workerPool struct
id string
log logger.Logger
semaphore chan struct
// NewWorker returns a new pool, which the Worker interface implements.
func NewWorker(log logger.Logger) Worker
return NewPool(log)
// NewPool returns a new worker instance.
func NewPool(log logger.Logger) Pool
w := new(workerPool)
w.id = randomString()
w.log = log
w.semaphore = make(chan struct, MaxConcurrentRunners)
return w
// NewJob creates a new job for a worker pool.
func (w *workerPool) NewJob(handler func() error) Job
j := new(job)
j.id = randomString()
j.retryLimit = DefaultRetryLimit
j.handler = handler
j.done = make(chan bool)
return j
// ID will return the ID of a pool.
func (w *workerPool) ID() string
return w.id
// Flushed checks whether the worker pool is flushed or not (has no active jobs in the buffer).
func (w *workerPool) Flushed() bool
return len(w.semaphore) == 0
// DoWork will begin processing the jobs.
func (w *workerPool) Run(jobs ...Job)
// Cache the count of jobs.
l := len(jobs)
// Create a new wait group and set the counter to the count of jobs.
wg := new(sync.WaitGroup)
wg.Add(l)
// Process each job.
for _, job := range jobs
// Block pool buffer is full.
w.semaphore <- struct
go func(job Job)
// Log start of job processing.
w.log.Info(fmt.Sprintf("Worker pool (%s): Started job (%s)", w.ID(), job.ID()))
// Execute the job.
go func()
w.run(wg, job)
()
// Wait for the job to be signaled as complete.
<-job.Done()
// Release a slot in the pool buffer.
<-w.semaphore
// Decrement the wait group.
wg.Done()
// Log end of job processing.
w.log.Info(fmt.Sprintf("Worker pool (%s): Completed job (%s)", w.ID(), job.ID()))
(job)
// Wait for the wait group counter to be depleted.
wg.Wait()
// run will process the job until it succeeds or reaches the maximum retries.
func (w *workerPool) run(wg *sync.WaitGroup, job Job)
defer func()
job.SignalDone()
()
// Execute job.
if err := job.Handler()(); err != nil
for
// Increment counter.
job.IncrementAttempts()
// Wait retry period.
timer := time.NewTimer(ExponentialBackoff(job.CurrentAttempts()))
<-timer.C
// Execute job.
if err := job.Handler()(); err != nil
// Maximum attempts reached without success.
if job.CurrentAttempts() >= job.RetryLimit()
job.Finally(false)
w.log.Error(err)
return
continue
else
break
job.Finally(true)
// job represents a job for a worker pool.
type job struct
id string
retryLimit int
currentAttempts int
handler func() error
finally func(success bool)
done chan bool
// ID will return the ID of a job.
func (j *job) ID() string
return j.id
// Done returns a channel that signals when the job is done
func (j *job) Done() chan bool
return j.done
// SignalDone will signal when a job is done. This can also be used from outside the
// worker to cancel a job, etc.
func (j *job) SignalDone()
j.done <- true
// SetRetryLimit will set the jobs retry limit.
func (j *job) SetRetryLimit(retryLimit int) Job
if retryLimit <= 0
j.retryLimit = DefaultRetryLimit
else if retryLimit > MaxRetryLimit
j.retryLimit = MaxRetryLimit
else
j.retryLimit = retryLimit
return j
// RetryLimit will get the jobs retry limit.
func (j *job) RetryLimit() int
return j.retryLimit
// CurrentAttempts will get the jobs current attempts.
func (j *job) CurrentAttempts() int
return j.currentAttempts
// IncrementAttempts increments the number of attempts on this job.
func (j *job) IncrementAttempts()
j.currentAttempts++
// SetHandler will set the jobs handler.
func (j *job) SetHandler(handler func() error)
j.handler = handler
// Handler will get the jobs handler.
func (j *job) Handler() func() error
return j.handler
// SetFinally will set the finally function of the job, which will be called upon job completion.
func (j *job) SetFinally(finally func(success bool)) Job
j.finally = finally
return j
// Finally will call finally.
func (j *job) Finally(success bool) Job
if j.finally != nil
j.finally(success)
return j
// ExponentialBackoff returns the delay to wait before retry number
// failedAttempts, computed as 250ms * 2^failedAttempts.
//
// Example failedAttempts:
//
//	1: 500ms
//	2: 1s
//	3: 2s
//	4: 4s
//	5: 8s
//	6: 16s
//	7: 32s
//	8: 1m4s
//	9: 2m8s
//	10: 4m16s
//
// Delays too large for time.Duration are capped at the largest representable
// Duration. Without the cap, converting an out-of-range float to an integer
// yields an implementation-specific value.
func ExponentialBackoff(failedAttempts int) time.Duration {
	d := float64(time.Second) * math.Pow(2, float64(failedAttempts)) * .25
	if d >= math.MaxInt64 {
		return time.Duration(math.MaxInt64)
	}
	return time.Duration(d)
}
// randomString returns a 30-character string of ASCII letters and digits; it
// is used for pool and job IDs.
//
// NOTE(review): it draws from the math/rand global source, which is not
// cryptographically secure and, before Go 1.20, is deterministically seeded
// unless rand.Seed is called, so IDs may repeat across process runs.
func randomString() string {
	const (
		chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
		size  = 30
	)
	res := make([]byte, size)
	for i := range res {
		res[i] = chars[rand.Intn(len(chars))]
	}
	return string(res)
}
go concurrency
add a comment |
up vote
3
down vote
favorite
up vote
3
down vote
favorite
I created a snippet of code that allows me to create a worker pool, and from that worker pool I can run a bunch of jobs.
I can configure each job to have a retry limit, and set a callback that will be invoked at the end of each job with a boolean argument that represents whether the job ultimately failed or succeeded.
This is the first "concurrent" piece of code I've ever written, so it may not be the best.
It works perfectly fine as I expect it to, and passes all my tests, even when I run hundreds of tests over and over.
I would just like someone with practiced eyes in this area to look at my code and tell me where there may be some flaws or areas of improvement.
This is how to use the code:
log := logger.New()
w := worker.NewWorker(log)
go w.Run(
w.NewJob(func() error
return errors.New("foo")
),
w.NewJob(func() error
return nil
).SetFinally(func(success bool)
if success
// ...
),
)
This is the package (if you want to run it yourself, just remove the logger
logic or create a mock of the logger, and you can run it just fine):
package worker
import (
"fmt"
"math"
"math/rand"
"sync"
"time"
"core/logger"
)
const (
	// MaxConcurrentRunners is the maximum number of jobs a pool runs concurrently.
	MaxConcurrentRunners = 1000
	// MaxRetryLimit is the maximum number of retries allowed for a failed job.
	MaxRetryLimit = 10
	// DefaultRetryLimit is the number of retries a failed job gets when no
	// valid retry limit has been set.
	DefaultRetryLimit = 3
)

// Worker has exactly the same method set as Pool. It exists so callers can
// write worker.Worker instead of worker.Pool.
type Worker interface {
	Pool
}

// Pool is a bounded pool that runs jobs concurrently, retrying failed ones.
type Pool interface {
	// NewJob creates a job that runs handler.
	NewJob(handler func() error) Job
	// Run processes jobs and returns once every job has finished.
	Run(jobs ...Job)
	// Flushed reports whether no jobs are currently in flight.
	Flushed() bool
}

// Job is a unit of work run by a Pool, together with its retry policy and
// completion callback.
type Job interface {
	// ID returns the job's identifier.
	ID() string
	// SetRetryLimit sets how many times a failed job is retried and returns
	// the job for chaining.
	SetRetryLimit(retryLimit int) Job
	// RetryLimit returns the configured retry limit.
	RetryLimit() int
	// IncrementAttempts records one more retry attempt.
	IncrementAttempts()
	// CurrentAttempts returns the number of retry attempts recorded so far.
	CurrentAttempts() int
	// SetHandler replaces the function the job runs.
	SetHandler(handler func() error)
	// Handler returns the function the job runs.
	Handler() func() error
	// SetFinally sets the callback invoked with the job's final outcome and
	// returns the job for chaining.
	SetFinally(finally func(success bool)) Job
	// Finally invokes the completion callback, if any, and returns the job.
	Finally(success bool) Job
	// Done returns the channel used to signal that the job has finished.
	Done() chan bool
	// SignalDone signals on the Done channel.
	SignalDone()
}
// workerPool represents a worker pool.
type workerPool struct
id string
log logger.Logger
semaphore chan struct
// NewWorker returns a new pool, which the Worker interface implements.
func NewWorker(log logger.Logger) Worker
return NewPool(log)
// NewPool returns a new worker instance.
func NewPool(log logger.Logger) Pool
w := new(workerPool)
w.id = randomString()
w.log = log
w.semaphore = make(chan struct, MaxConcurrentRunners)
return w
// NewJob creates a new job for a worker pool.
func (w *workerPool) NewJob(handler func() error) Job
j := new(job)
j.id = randomString()
j.retryLimit = DefaultRetryLimit
j.handler = handler
j.done = make(chan bool)
return j
// ID will return the ID of a pool.
func (w *workerPool) ID() string
return w.id
// Flushed checks whether the worker pool is flushed or not (has no active jobs in the buffer).
func (w *workerPool) Flushed() bool
return len(w.semaphore) == 0
// DoWork will begin processing the jobs.
func (w *workerPool) Run(jobs ...Job)
// Cache the count of jobs.
l := len(jobs)
// Create a new wait group and set the counter to the count of jobs.
wg := new(sync.WaitGroup)
wg.Add(l)
// Process each job.
for _, job := range jobs
// Block pool buffer is full.
w.semaphore <- struct
go func(job Job)
// Log start of job processing.
w.log.Info(fmt.Sprintf("Worker pool (%s): Started job (%s)", w.ID(), job.ID()))
// Execute the job.
go func()
w.run(wg, job)
()
// Wait for the job to be signaled as complete.
<-job.Done()
// Release a slot in the pool buffer.
<-w.semaphore
// Decrement the wait group.
wg.Done()
// Log end of job processing.
w.log.Info(fmt.Sprintf("Worker pool (%s): Completed job (%s)", w.ID(), job.ID()))
(job)
// Wait for the wait group counter to be depleted.
wg.Wait()
// run will process the job until it succeeds or reaches the maximum retries.
func (w *workerPool) run(wg *sync.WaitGroup, job Job)
defer func()
job.SignalDone()
()
// Execute job.
if err := job.Handler()(); err != nil
for
// Increment counter.
job.IncrementAttempts()
// Wait retry period.
timer := time.NewTimer(ExponentialBackoff(job.CurrentAttempts()))
<-timer.C
// Execute job.
if err := job.Handler()(); err != nil
// Maximum attempts reached without success.
if job.CurrentAttempts() >= job.RetryLimit()
job.Finally(false)
w.log.Error(err)
return
continue
else
break
job.Finally(true)
// job represents a job for a worker pool.
type job struct
id string
retryLimit int
currentAttempts int
handler func() error
finally func(success bool)
done chan bool
// ID will return the ID of a job.
func (j *job) ID() string
return j.id
// Done returns a channel that signals when the job is done
func (j *job) Done() chan bool
return j.done
// SignalDone will signal when a job is done. This can also be used from outside the
// worker to cancel a job, etc.
func (j *job) SignalDone()
j.done <- true
// SetRetryLimit will set the jobs retry limit.
func (j *job) SetRetryLimit(retryLimit int) Job
if retryLimit <= 0
j.retryLimit = DefaultRetryLimit
else if retryLimit > MaxRetryLimit
j.retryLimit = MaxRetryLimit
else
j.retryLimit = retryLimit
return j
// RetryLimit will get the jobs retry limit.
func (j *job) RetryLimit() int
return j.retryLimit
// CurrentAttempts will get the jobs current attempts.
func (j *job) CurrentAttempts() int
return j.currentAttempts
// IncrementAttempts increments the number of attempts on this job.
func (j *job) IncrementAttempts()
j.currentAttempts++
// SetHandler will set the jobs handler.
func (j *job) SetHandler(handler func() error)
j.handler = handler
// Handler will get the jobs handler.
func (j *job) Handler() func() error
return j.handler
// SetFinally will set the finally function of the job, which will be called upon job completion.
func (j *job) SetFinally(finally func(success bool)) Job
j.finally = finally
return j
// Finally will call finally.
func (j *job) Finally(success bool) Job
if j.finally != nil
j.finally(success)
return j
// ExponentialBackoff returns the delay to wait before retry number
// failedAttempts, computed as 250ms * 2^failedAttempts.
//
// Example failedAttempts:
//
//	1: 500ms
//	2: 1s
//	3: 2s
//	4: 4s
//	5: 8s
//	6: 16s
//	7: 32s
//	8: 1m4s
//	9: 2m8s
//	10: 4m16s
//
// Delays too large for time.Duration are capped at the largest representable
// Duration. Without the cap, converting an out-of-range float to an integer
// yields an implementation-specific value.
func ExponentialBackoff(failedAttempts int) time.Duration {
	d := float64(time.Second) * math.Pow(2, float64(failedAttempts)) * .25
	if d >= math.MaxInt64 {
		return time.Duration(math.MaxInt64)
	}
	return time.Duration(d)
}
// randomString returns a 30-character string of ASCII letters and digits; it
// is used for pool and job IDs.
//
// NOTE(review): it draws from the math/rand global source, which is not
// cryptographically secure and, before Go 1.20, is deterministically seeded
// unless rand.Seed is called, so IDs may repeat across process runs.
func randomString() string {
	const (
		chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
		size  = 30
	)
	res := make([]byte, size)
	for i := range res {
		res[i] = chars[rand.Intn(len(chars))]
	}
	return string(res)
}
go concurrency
I created a snippet of code that allows me to create a worker pool, and from that worker pool I can run a bunch of jobs.
I can configure each job to have a retry limit, and set a callback that will be invoked at the end of each job with a boolean argument that represents whether the job ultimately failed or succeeded.
This is the first "concurrent" piece of code I've ever written, so it may not be the best.
It works perfectly fine as I expect it to, and passes all my tests, even when I run hundreds of tests over and over.
I would just like someone with practiced eyes in this area to look at my code and tell me where there may be some flaws or areas of improvement.
This is how to use the code:
log := logger.New()
w := worker.NewWorker(log)
go w.Run(
w.NewJob(func() error
return errors.New("foo")
),
w.NewJob(func() error
return nil
).SetFinally(func(success bool)
if success
// ...
),
)
This is the package (if you want to run it yourself, just remove the logger
logic or create a mock of the logger, and you can run it just fine):
package worker
import (
"fmt"
"math"
"math/rand"
"sync"
"time"
"core/logger"
)
const (
	// MaxConcurrentRunners is the maximum number of jobs a pool runs concurrently.
	MaxConcurrentRunners = 1000
	// MaxRetryLimit is the maximum number of retries allowed for a failed job.
	MaxRetryLimit = 10
	// DefaultRetryLimit is the number of retries a failed job gets when no
	// valid retry limit has been set.
	DefaultRetryLimit = 3
)

// Worker has exactly the same method set as Pool. It exists so callers can
// write worker.Worker instead of worker.Pool.
type Worker interface {
	Pool
}

// Pool is a bounded pool that runs jobs concurrently, retrying failed ones.
type Pool interface {
	// NewJob creates a job that runs handler.
	NewJob(handler func() error) Job
	// Run processes jobs and returns once every job has finished.
	Run(jobs ...Job)
	// Flushed reports whether no jobs are currently in flight.
	Flushed() bool
}

// Job is a unit of work run by a Pool, together with its retry policy and
// completion callback.
type Job interface {
	// ID returns the job's identifier.
	ID() string
	// SetRetryLimit sets how many times a failed job is retried and returns
	// the job for chaining.
	SetRetryLimit(retryLimit int) Job
	// RetryLimit returns the configured retry limit.
	RetryLimit() int
	// IncrementAttempts records one more retry attempt.
	IncrementAttempts()
	// CurrentAttempts returns the number of retry attempts recorded so far.
	CurrentAttempts() int
	// SetHandler replaces the function the job runs.
	SetHandler(handler func() error)
	// Handler returns the function the job runs.
	Handler() func() error
	// SetFinally sets the callback invoked with the job's final outcome and
	// returns the job for chaining.
	SetFinally(finally func(success bool)) Job
	// Finally invokes the completion callback, if any, and returns the job.
	Finally(success bool) Job
	// Done returns the channel used to signal that the job has finished.
	Done() chan bool
	// SignalDone signals on the Done channel.
	SignalDone()
}
// workerPool represents a worker pool.
type workerPool struct
id string
log logger.Logger
semaphore chan struct
// NewWorker returns a new pool, which the Worker interface implements.
func NewWorker(log logger.Logger) Worker
return NewPool(log)
// NewPool returns a new worker instance.
func NewPool(log logger.Logger) Pool
w := new(workerPool)
w.id = randomString()
w.log = log
w.semaphore = make(chan struct, MaxConcurrentRunners)
return w
// NewJob creates a new job for a worker pool.
func (w *workerPool) NewJob(handler func() error) Job
j := new(job)
j.id = randomString()
j.retryLimit = DefaultRetryLimit
j.handler = handler
j.done = make(chan bool)
return j
// ID will return the ID of a pool.
func (w *workerPool) ID() string
return w.id
// Flushed checks whether the worker pool is flushed or not (has no active jobs in the buffer).
func (w *workerPool) Flushed() bool
return len(w.semaphore) == 0
// DoWork will begin processing the jobs.
func (w *workerPool) Run(jobs ...Job)
// Cache the count of jobs.
l := len(jobs)
// Create a new wait group and set the counter to the count of jobs.
wg := new(sync.WaitGroup)
wg.Add(l)
// Process each job.
for _, job := range jobs
// Block pool buffer is full.
w.semaphore <- struct
go func(job Job)
// Log start of job processing.
w.log.Info(fmt.Sprintf("Worker pool (%s): Started job (%s)", w.ID(), job.ID()))
// Execute the job.
go func()
w.run(wg, job)
()
// Wait for the job to be signaled as complete.
<-job.Done()
// Release a slot in the pool buffer.
<-w.semaphore
// Decrement the wait group.
wg.Done()
// Log end of job processing.
w.log.Info(fmt.Sprintf("Worker pool (%s): Completed job (%s)", w.ID(), job.ID()))
(job)
// Wait for the wait group counter to be depleted.
wg.Wait()
// run will process the job until it succeeds or reaches the maximum retries.
func (w *workerPool) run(wg *sync.WaitGroup, job Job)
defer func()
job.SignalDone()
()
// Execute job.
if err := job.Handler()(); err != nil
for
// Increment counter.
job.IncrementAttempts()
// Wait retry period.
timer := time.NewTimer(ExponentialBackoff(job.CurrentAttempts()))
<-timer.C
// Execute job.
if err := job.Handler()(); err != nil
// Maximum attempts reached without success.
if job.CurrentAttempts() >= job.RetryLimit()
job.Finally(false)
w.log.Error(err)
return
continue
else
break
job.Finally(true)
// job represents a job for a worker pool.
type job struct
id string
retryLimit int
currentAttempts int
handler func() error
finally func(success bool)
done chan bool
// ID will return the ID of a job.
func (j *job) ID() string
return j.id
// Done returns a channel that signals when the job is done
func (j *job) Done() chan bool
return j.done
// SignalDone will signal when a job is done. This can also be used from outside the
// worker to cancel a job, etc.
func (j *job) SignalDone()
j.done <- true
// SetRetryLimit will set the jobs retry limit.
func (j *job) SetRetryLimit(retryLimit int) Job
if retryLimit <= 0
j.retryLimit = DefaultRetryLimit
else if retryLimit > MaxRetryLimit
j.retryLimit = MaxRetryLimit
else
j.retryLimit = retryLimit
return j
// RetryLimit will get the jobs retry limit.
func (j *job) RetryLimit() int
return j.retryLimit
// CurrentAttempts will get the jobs current attempts.
func (j *job) CurrentAttempts() int
return j.currentAttempts
// IncrementAttempts increments the number of attempts on this job.
func (j *job) IncrementAttempts()
j.currentAttempts++
// SetHandler will set the jobs handler.
func (j *job) SetHandler(handler func() error)
j.handler = handler
// Handler will get the jobs handler.
func (j *job) Handler() func() error
return j.handler
// SetFinally will set the finally function of the job, which will be called upon job completion.
func (j *job) SetFinally(finally func(success bool)) Job
j.finally = finally
return j
// Finally will call finally.
func (j *job) Finally(success bool) Job
if j.finally != nil
j.finally(success)
return j
// ExponentialBackoff returns the delay to wait before retry number
// failedAttempts, computed as 250ms * 2^failedAttempts.
//
// Example failedAttempts:
//
//	1: 500ms
//	2: 1s
//	3: 2s
//	4: 4s
//	5: 8s
//	6: 16s
//	7: 32s
//	8: 1m4s
//	9: 2m8s
//	10: 4m16s
//
// Delays too large for time.Duration are capped at the largest representable
// Duration. Without the cap, converting an out-of-range float to an integer
// yields an implementation-specific value.
func ExponentialBackoff(failedAttempts int) time.Duration {
	d := float64(time.Second) * math.Pow(2, float64(failedAttempts)) * .25
	if d >= math.MaxInt64 {
		return time.Duration(math.MaxInt64)
	}
	return time.Duration(d)
}
// randomString returns a 30-character string of ASCII letters and digits; it
// is used for pool and job IDs.
//
// NOTE(review): it draws from the math/rand global source, which is not
// cryptographically secure and, before Go 1.20, is deterministically seeded
// unless rand.Seed is called, so IDs may repeat across process runs.
func randomString() string {
	const (
		chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
		size  = 30
	)
	res := make([]byte, size)
	for i := range res {
		res[i] = chars[rand.Intn(len(chars))]
	}
	return string(res)
}
go concurrency
edited Mar 19 at 1:36
asked Mar 2 at 2:00
Lansana
227211
227211
add a comment |
add a comment |
1 Answer
1
active
oldest
votes
up vote
3
down vote
accepted
OK, let's go through this step by step. Beginning with the snippet of how you use the code:
log := logger.New()
w := worker.NewWorker(log)
go w.Run(
w.NewJob(func() error
return errors.New("foo")
),
w.NewJob(func() error
return nil
).SetFinally(func(success bool)
if success
// ...
),
)
The second line really bugs me. This is referred to as stuttering names. There's a whole document about naming guidelines, but the gist of it is that, if you have a package called worker
, the types and functions it exposes shouldn't repeat that name. If you read worker.New
or worker.NewWorker
, both contain the same information. One is shorter, and easier to read.
Another thing that I'm struggling to make sense of is why the NewJob
function is a receiver function on a worker? It's not even using the receiver, so why not export it as a function in the first place? I'm also too much reminded of ECMAScript and other languages that have event-based asynchronous calls when I see you call SetFinally
on the job object. This is actually more of a problem than you might think, and can cause concurrency issues (ie race conditions). Imagine I write something like this:
w := worker.New(log) // assume rename NewWorker
aJob := w.NewJob(func() error
if err := db.DoSomethingComplex(); err != nil
return err
return nil
)
go w.Run(aJob)
// a whole lot of stuff taking a long time
aJob.SetFinally(func(success bool)
log.Debugf("Job success %v", success)
)
// and a whole list of calls that change the behaviour of aJob
Now what will happen if the call inside the job fails? I've set the "finally" handler after having started the worker. I have no reliable way of predicting what happens when. This is a classic example of code that is not thread-safe.
What's worse: you're exposing a SetHandler
function on the Job interface. What would happen if the worker tries to access the handler field just as someone calls SetHandler
on the same object? Surely a job should be immutable at the core: the call which it represents.
What you need is a way to create an immutable job object. You want to be able to set default values (like default retries etc...), but you don't want the caller to pass all those values to the function that actually creates a job. That's fair, and it's easy to do by using variadic arguments.
type JobOption func(*job)
// SetFinally returns JobOption, part of variadic args configuring jobs
func SetFinally(f func(bool)) JobOption
return func(j *job)
j.finally = f
func Job(call func() error, jopts ...JobOption) job
j := job
handler: call,
// set all defaults here
for _, o := range jopts
o(&j)
j
So in this case, we're returning a job value, but none of the fields are exported. There's no functions on a job either that would allow the fields to be updated at a later stage. We simply require an actual function to call, can specify all defaults, and the caller is free to pass any number of configs through as callbacks in no particular order. To create the second job from your snippet, the call would look something like this:
job := worker.Job(
func() error return nil ,
worker.SetFinally(func (s bool)
if s
// ...
),
)
You can add worker.SetRetryAttempts
in the same way, or any other configuration you want the caller to have control over. The return value is a job object. It's also returned by value instead of a pointer. The main reason for this is that I don't really see a reason to pass it as a pointer, nothing more. There are no calls that work by pointer receiver, there are no exposed fields, and the entire object is, essentially, a read-only value at this point.
I've been looking at the worker implementation meanwhile (because we've only been focusing on the first snippet so far). Before I get to the nitty-gritty there, there's something that has been bugging me: you have a tendency to overuse new()
. I prefer to be explicit when I create something and use literals. Not only do they more clearly signal when/where you're creating a pointer, it's also shorter to write. Take your implementation of NewJob
, for example:
j := new(job)
j.id = randomString()
j.retryLimit = DefaultRetryLimit
j.handler = handler
j.done = make(chan bool)
return j
Compared to the one-liner:
return &job
id: randomString(),
retryLimit: DefaultRetryLimit,
handler: handler,
done: make(chan bool),
I prefer the latter, the &
immediately makes it clear a pointer is being returned, and the object literal makes it clear what that literal actually holds.
The last thing I'll add (for now, because I've been going on for quite a while now), is your use of the semaphore channel. It simply doesn't make sense to me... you create a buffered channel of empty structs to have a limit on the number of "in flight" jobs. Fair enough, if you want that sort of thing, but why use a semaphore if you have a channel in the first place? Wouldn't it make much, much more sense to simply do something like this:
type w struct
// although I don't really see the point in the ID fields everywhere
id string
log logger.Logger
concurrentJobs int
pipeline chan job // channel of jobs to work on
// variadic args to specify the concurrent runners might be nice
func NewPool(l logger.Logger) Pool
return &w
id: randomString(), // use a uuid package at the very least
log: l,
concurrentJobs: MaxConcurrentRunners,
pipeline: make(chan job, MaxConcurrentRunners),
Then when running jobs, just start with:
wg := &sync.WaitGroup
wg.Add(w.concurrentJobs) // yes, not number of jobs, but number of routines
for i := 0; i < w.concurrentJobs; i++
go w.run(wg) // not I'm not passing a job here, just the waitgroup
for _, job := range jobs
w.pipeline <- job // push onto channel
// all jobs have been pushed onto pipeline, close it
close(w.pipeline)
wg.Wait() // wait for everything to finish
// all done, reopen for business
w.pipeline = make(chan job, w.concurrentJobs)
Now I don't need to use the job done channel anymore, because the run routines that I spin up at the start will take care of everything for me. What may strike you as odd is that I'm closing the channel and then replacing it with a fresh one (a closed channel can't be reopened). Closing it is what tells the run routines to stop: should I limit the concurrent jobs to 2 but pass in 4 jobs, the 2 routines need to keep on reading from the channel until it is closed. They can't assume that they'll only ever perform one job and be done with it. Let's look at how I'd implement that run
function:
func (w *workerPool) run(wg *sync.WaitGroup)
defer wg.Done() // no need to wrap a defer call in an anonymous function BTW
// keep reading from pipeline until channel closes
for job := range w.pipeline
if err := job.handler(); err != nil
// retry loop, you may want to break this out into separate func
job.Finally(true)
Now this run function will continuously read jobs from the channel, so you don't want to return from it if the job fails. Instead, you should call Finally
with the correct value in the retry loop should the call fail entirely (the snippet above calls Finally(true) unconditionally only for brevity). But at least this approach means that the WaitGroup will actually represent the number of routines you're waiting for to finish (not the number of jobs you're processing). It also means that you don't need the double channels (the job.Done()
channel that blocked the semaphore read, which made the routine executing the job essentially a blocking routine, owing to the fact that the job.Done channel isn't buffered).
One last comment I would like to make: why are you calling this a worker pool? At best it's a fire-and-forget job queue. It's not pooling anything. If you ever need to pool resources, make sure to check out the sync.Pool
type BTW.
Wow thanks for the in-depth answer. Read through it all just now, but I will have to take a deeper look at this when I get home. I agree with a lot of points though (such as my naming conventions and other non-idiomatic Go things I am doing), however some things such as the buffering of channels and so on that I'm using doesn't quite make much sense to me as to why my way is bad. But this is likely because I am inexperienced with such topics, so will have to just let it marinate in my mind for a moment.
– Lansana
Apr 2 at 15:19
I will likely be back with a few questions... :)
– Lansana
Apr 2 at 15:27
add a comment |
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
3
down vote
accepted
OK, let's go through this step by step. Beginning with the snippet of how you use the code:
log := logger.New()
w := worker.NewWorker(log)
go w.Run(
w.NewJob(func() error
return errors.New("foo")
),
w.NewJob(func() error
return nil
).SetFinally(func(success bool)
if success
// ...
),
)
The second line really bugs me. This is referred to as stuttering names. There's a whole document about naming guidelines, but the gist of it is that, if you have a package called worker
, the types and functions it exposes shouldn't repeat that name. If you read worker.New
or worker.NewWorker
, both contain the same information. One is shorter, and easier to read.
Another thing that I'm struggling to make sense of is why the NewJob
function is a method on a worker? It's not even using the receiver, so why not export it as a plain function in the first place? I'm also strongly reminded of ECMAScript and other languages that have event-based asynchronous calls when I see you call SetFinally
on the job object. This is actually more of a problem than you might think, and can cause concurrency issues (i.e. race conditions). Imagine I write something like this:
w := worker.New(log) // assume rename NewWorker
aJob := w.NewJob(func() error
if err := db.DoSomethingComplex(); err != nil
return err
return nil
)
go w.Run(aJob)
// a whole lot of stuff taking a long time
aJob.SetFinally(func(success bool)
log.Debugf("Job success %v", success)
)
// and a whole list of calls that change the behaviour of aJob
Now what will happen if the call inside the job fails? I've set the "finally" handler after having started the worker. I have no reliable way of predicting what happens when. This is a classic example of code that is not thread-safe.
What's worse: you're exposing a SetHandler
function on the Job interface. What would happen if the worker tries to access the handler field just as someone calls SetHandler
on the same object? Surely a job should be immutable at the core: the call which it represents.
What you need is a way to create an immutable job object. You want to be able to set default values (like default retries etc...), but you don't want the caller to pass all those values to the function that actually creates a job. That's fair, and it's easy to do by using variadic arguments.
type JobOption func(*job)
// SetFinally returns JobOption, part of variadic args configuring jobs
func SetFinally(f func(bool)) JobOption
return func(j *job)
j.finally = f
func Job(call func() error, jopts ...JobOption) job
j := job
handler: call,
// set all defaults here
for _, o := range jopts
o(&j)
j
So in this case, we're returning a job value, but none of the fields are exported. There are no methods on a job either that would allow the fields to be updated at a later stage. We simply require an actual function to call, can specify all defaults, and the caller is free to pass any number of configs through as callbacks in no particular order. To create the second job from your snippet, the call would look something like this:
job := worker.Job(
func() error return nil ,
worker.SetFinally(func (s bool)
if s
// ...
),
)
You can add worker.SetRetryAttempts
in the same way, or any other configuration you want the caller to have control over. The return value is a job object. It's also returned by value instead of a pointer. The main reason for this is that I don't really see a reason to pass it as a pointer, nothing more. There are no methods with pointer receivers, there are no exposed fields, and the entire object is, essentially, a read-only value at this point.
I've been looking at the worker implementation in the meantime (because we've only been focusing on the first snippet so far). Before I get to the nitty-gritty there, there's something that has been bugging me: you have a tendency to overuse new()
. I prefer to be explicit when I create something and use literals. Not only do they more clearly signal when/where you're creating a pointer, it's also shorter to write. Take your implementation of NewJob
, for example:
j := new(job)
j.id = randomString()
j.retryLimit = DefaultRetryLimit
j.handler = handler
j.done = make(chan bool)
return j
Compared to the one-liner:
return &job
id: randomString(),
retryLimit: DefaultRetryLimit,
handler: handler,
done: make(chan bool),
I prefer the latter, the &
immediately makes it clear a pointer is being returned, and the object literal makes it clear what that literal actually holds.
The last thing I'll add (for now, because I've been going on for quite a while now), is your use of the semaphore channel. It simply doesn't make sense to me... you create a buffered channel of empty structs to have a limit on the number of "in flight" jobs. Fair enough, if you want that sort of thing, but why use a semaphore if you have a channel in the first place? Wouldn't it make much, much more sense to simply do something like this:
type w struct
// although I don't really see the point in the ID fields everywhere
id string
log logger.Logger
concurrentJobs int
pipeline chan job // channel of jobs to work on
// variadic args to specify the concurrent runners might be nice
func NewPool(l logger.Logger) Pool
return &w
id: randomString(), // use a uuid package at the very least
log: l,
concurrentJobs: MaxConcurrentRunners,
pipeline: make(chan job, MaxConcurrentRunners),
Then when running jobs, just start with:
wg := &sync.WaitGroup
wg.Add(w.concurrentJobs) // yes, not number of jobs, but number of routines
for i := 0; i < w.concurrentJobs; i++
go w.run(wg) // not I'm not passing a job here, just the waitgroup
for _, job := range jobs
w.pipeline <- job // push onto channel
// all jobs have been pushed onto pipeline, close it
close(w.pipeline)
wg.Wait() // wait for everything to finish
// all done, reopen for business
w.pipeline = make(chan job, w.concurrentJobs)
Now I don't need to use the job done channel anymore, because the run routines that I spin up at the start will take care of everything for me. What may strike you as odd is that I'm closing the channel and then replacing it with a fresh one (a closed channel can't be reopened). Closing it is what tells the run routines to stop: should I limit the concurrent jobs to 2 but pass in 4 jobs, the 2 routines need to keep on reading from the channel until it is closed. They can't assume that they'll only ever perform one job and be done with it. Let's look at how I'd implement that run
function:
func (w *workerPool) run(wg *sync.WaitGroup)
defer wg.Done() // no need to wrap a defer call in an anonymous function BTW
// keep reading from pipeline until channel closes
for job := range w.pipeline
if err := job.handler(); err != nil
// retry loop, you may want to break this out into separate func
job.Finally(true)
Now this run function will continuously read jobs from the channel, so you don't want to return from it if the job fails. Instead, you should call Finally
with the correct value in the retry loop should the call fail entirely (the snippet above calls Finally(true) unconditionally only for brevity). But at least this approach means that the WaitGroup will actually represent the number of routines you're waiting for to finish (not the number of jobs you're processing). It also means that you don't need the double channels (the job.Done()
channel that blocked the semaphore read, which made the routine executing the job essentially a blocking routine, owing to the fact that the job.Done channel isn't buffered).
One last comment I would like to make: why are you calling this a worker pool? At best it's a fire-and-forget job queue. It's not pooling anything. If you ever need to pool resources, make sure to check out the sync.Pool
type BTW.
Wow thanks for the in-depth answer. Read through it all just now, but I will have to take a deeper look at this when I get home. I agree with a lot of points though (such as my naming conventions and other non-idiomatic Go things I am doing), however some things such as the buffering of channels and so on that I'm using doesn't quite make much sense to me as to why my way is bad. But this is likely because I am inexperienced with such topics, so will have to just let it marinate in my mind for a moment.
– Lansana
Apr 2 at 15:19
I will likely be back with a few questions... :)
– Lansana
Apr 2 at 15:27
add a comment |
up vote
3
down vote
accepted
OK, let's go through this step by step. Beginning with the snippet of how you use the code:
log := logger.New()
w := worker.NewWorker(log)
go w.Run(
w.NewJob(func() error
return errors.New("foo")
),
w.NewJob(func() error
return nil
).SetFinally(func(success bool)
if success
// ...
),
)
The second line really bugs me. This is referred to as stuttering names. There's a whole document about naming guidelines, but the gist of it is that, if you have a package called worker
, the types and functions it exposes shouldn't repeat that name. If you read worker.New
or worker.NewWorker
, both contain the same information. One is shorter, and easier to read.
Another thing that I'm struggling to make sense of is why the NewJob
function is a method on a worker? It's not even using the receiver, so why not export it as a plain function in the first place? I'm also strongly reminded of ECMAScript and other languages that have event-based asynchronous calls when I see you call SetFinally
on the job object. This is actually more of a problem than you might think, and can cause concurrency issues (i.e. race conditions). Imagine I write something like this:
w := worker.New(log) // assume rename NewWorker
aJob := w.NewJob(func() error
if err := db.DoSomethingComplex(); err != nil
return err
return nil
)
go w.Run(aJob)
// a whole lot of stuff taking a long time
aJob.SetFinally(func(success bool)
log.Debugf("Job success %v", success)
)
// and a whole list of calls that change the behaviour of aJob
Now what will happen if the call inside the job fails? I've set the "finally" handler after having started the worker. I have no reliable way of predicting what happens when. This is a classic example of code that is not thread-safe.
What's worse: you're exposing a SetHandler
function on the Job interface. What would happen if the worker tries to access the handler field just as someone calls SetHandler
on the same object? Surely a job should be immutable at the core: the call which it represents.
What you need is a way to create an immutable job object. You want to be able to set default values (like default retries etc...), but you don't want the caller to pass all those values to the function that actually creates a job. That's fair, and it's easy to do by using variadic arguments.
type JobOption func(*job)
// SetFinally returns JobOption, part of variadic args configuring jobs
func SetFinally(f func(bool)) JobOption
return func(j *job)
j.finally = f
func Job(call func() error, jopts ...JobOption) job
j := job
handler: call,
// set all defaults here
for _, o := range jopts
o(&j)
j
So in this case, we're returning a job value, but none of the fields are exported. There are no methods on a job either that would allow the fields to be updated at a later stage. We simply require an actual function to call, can specify all defaults, and the caller is free to pass any number of configs through as callbacks in no particular order. To create the second job from your snippet, the call would look something like this:
job := worker.Job(
func() error return nil ,
worker.SetFinally(func (s bool)
if s
// ...
),
)
You can add worker.SetRetryAttempts
in the same way, or any other configuration you want the caller to have control over. The return value is a job object. It's also returned by value instead of a pointer. The main reason for this is that I don't really see a reason to pass it as a pointer, nothing more. There are no methods with pointer receivers, there are no exposed fields, and the entire object is, essentially, a read-only value at this point.
I've been looking at the worker implementation in the meantime (because we've only been focusing on the first snippet so far). Before I get to the nitty-gritty there, there's something that has been bugging me: you have a tendency to overuse new()
. I prefer to be explicit when I create something and use literals. Not only do they more clearly signal when/where you're creating a pointer, it's also shorter to write. Take your implementation of NewJob
, for example:
j := new(job)
j.id = randomString()
j.retryLimit = DefaultRetryLimit
j.handler = handler
j.done = make(chan bool)
return j
Compared to the one-liner:
return &job
id: randomString(),
retryLimit: DefaultRetryLimit,
handler: handler,
done: make(chan bool),
I prefer the latter, the &
immediately makes it clear a pointer is being returned, and the object literal makes it clear what that literal actually holds.
The last thing I'll add (for now, because I've been going on for quite a while now), is your use of the semaphore channel. It simply doesn't make sense to me... you create a buffered channel of empty structs to have a limit on the number of "in flight" jobs. Fair enough, if you want that sort of thing, but why use a semaphore if you have a channel in the first place? Wouldn't it make much, much more sense to simply do something like this:
type w struct
// although I don't really see the point in the ID fields everywhere
id string
log logger.Logger
concurrentJobs int
pipeline chan job // channel of jobs to work on
// variadic args to specify the concurrent runners might be nice
func NewPool(l logger.Logger) Pool
return &w
id: randomString(), // use a uuid package at the very least
log: l,
concurrentJobs: MaxConcurrentRunners,
pipeline: make(chan job, MaxConcurrentRunners),
Then when running jobs, just start with:
wg := &sync.WaitGroup
wg.Add(w.concurrentJobs) // yes, not number of jobs, but number of routines
for i := 0; i < w.concurrentJobs; i++
go w.run(wg) // not I'm not passing a job here, just the waitgroup
for _, job := range jobs
w.pipeline <- job // push onto channel
// all jobs have been pushed onto pipeline, close it
close(w.pipeline)
wg.Wait() // wait for everything to finish
// all done, reopen for business
w.pipeline = make(chan job, w.concurrentJobs)
Now I don't need to use the job done channel anymore, because the run routines that I spin up at the start will take care of everything for me. What may strike you as odd is that I'm closing the channel and then replacing it with a fresh one (a closed channel can't be reopened). Closing it is what tells the run routines to stop: should I limit the concurrent jobs to 2 but pass in 4 jobs, the 2 routines need to keep on reading from the channel until it is closed. They can't assume that they'll only ever perform one job and be done with it. Let's look at how I'd implement that run
function:
func (w *workerPool) run(wg *sync.WaitGroup)
defer wg.Done() // no need to wrap a defer call in an anonymous function BTW
// keep reading from pipeline until channel closes
for job := range w.pipeline
if err := job.handler(); err != nil
// retry loop, you may want to break this out into separate func
job.Finally(true)
Now this run function will continuously read jobs from the channel, so you don't want to return from it if the job fails. Instead, you should call Finally
with the correct value in the retry loop should the call fail entirely (the snippet above calls Finally(true) unconditionally only for brevity). But at least this approach means that the WaitGroup will actually represent the number of routines you're waiting for to finish (not the number of jobs you're processing). It also means that you don't need the double channels (the job.Done()
channel that blocked the semaphore read, which made the routine executing the job essentially a blocking routine, owing to the fact that the job.Done channel isn't buffered).
One last comment I would like to make: why are you calling this a worker pool? At best it's a fire-and-forget job queue. It's not pooling anything. If you ever need to pool resources, make sure to check out the sync.Pool
type BTW.
Wow thanks for the in-depth answer. Read through it all just now, but I will have to take a deeper look at this when I get home. I agree with a lot of points though (such as my naming conventions and other non-idiomatic Go things I am doing), however some things such as the buffering of channels and so on that I'm using doesn't quite make much sense to me as to why my way is bad. But this is likely because I am inexperienced with such topics, so will have to just let it marinate in my mind for a moment.
– Lansana
Apr 2 at 15:19
I will likely be back with a few questions... :)
– Lansana
Apr 2 at 15:27
add a comment |
up vote
3
down vote
accepted
up vote
3
down vote
accepted
OK, let's go through this step by step. Beginning with the snippet of how you use the code:
log := logger.New()
w := worker.NewWorker(log)
go w.Run(
w.NewJob(func() error
return errors.New("foo")
),
w.NewJob(func() error
return nil
).SetFinally(func(success bool)
if success
// ...
),
)
The second line really bugs me. This is referred to as stuttering names. There's a whole document about naming guidelines, but the gist of it is that, if you have a package called worker
, the types and functions it exposes shouldn't repeat that name. If you read worker.New
or worker.NewWorker
, both contain the same information. One is shorter, and easier to read.
Another thing that I'm struggling to make sense of is why the NewJob
function is a method on a worker? It's not even using the receiver, so why not export it as a plain function in the first place? I'm also strongly reminded of ECMAScript and other languages that have event-based asynchronous calls when I see you call SetFinally
on the job object. This is actually more of a problem than you might think, and can cause concurrency issues (i.e. race conditions). Imagine I write something like this:
w := worker.New(log) // assume rename NewWorker
aJob := w.NewJob(func() error
if err := db.DoSomethingComplex(); err != nil
return err
return nil
)
go w.Run(aJob)
// a whole lot of stuff taking a long time
aJob.SetFinally(func(success bool)
log.Debugf("Job success %v", success)
)
// and a whole list of calls that change the behaviour of aJob
Now what will happen if the call inside the job fails? I've set the "finally" handler after having started the worker. I have no reliable way of predicting what happens when. This is a classic example of code that is not thread-safe.
What's worse: you're exposing a SetHandler
function on the Job interface. What would happen if the worker tries to access the handler field just as someone calls SetHandler
on the same object? Surely a job should be immutable at the core: the call which it represents.
What you need is a way to create an immutable job object. You want to be able to set default values (like default retries etc...), but you don't want the caller to pass all those values to the function that actually creates a job. That's fair, and it's easy to do by using variadic arguments.
type JobOption func(*job)
// SetFinally returns JobOption, part of variadic args configuring jobs
func SetFinally(f func(bool)) JobOption
return func(j *job)
j.finally = f
func Job(call func() error, jopts ...JobOption) job
j := job
handler: call,
// set all defaults here
for _, o := range jopts
o(&j)
j
So in this case, we're returning a job value, but none of the fields are exported. There are no methods on a job either that would allow the fields to be updated at a later stage. We simply require an actual function to call, can specify all defaults, and the caller is free to pass any number of configs through as callbacks in no particular order. To create the second job from your snippet, the call would look something like this:
job := worker.Job(
func() error return nil ,
worker.SetFinally(func (s bool)
if s
// ...
),
)
You can add worker.SetRetryAttempts
in the same way, or any other configuration you want the caller to have control over. The return value is a job object. It's also returned by value instead of a pointer. The main reason for this is that I don't really see a reason to pass it as a pointer, nothing more. There are no methods with pointer receivers, there are no exposed fields, and the entire object is, essentially, a read-only value at this point.
I've been looking at the worker implementation in the meantime (because we've only been focusing on the first snippet so far). Before I get to the nitty-gritty there, there's something that has been bugging me: you have a tendency to overuse new()
. I prefer to be explicit when I create something and use literals. Not only do they more clearly signal when/where you're creating a pointer, it's also shorter to write. Take your implementation of NewJob
, for example:
j := new(job)
j.id = randomString()
j.retryLimit = DefaultRetryLimit
j.handler = handler
j.done = make(chan bool)
return j
Compared to the one-liner:
return &job
id: randomString(),
retryLimit: DefaultRetryLimit,
handler: handler,
done: make(chan bool),
I prefer the latter, the &
immediately makes it clear a pointer is being returned, and the object literal makes it clear what that literal actually holds.
The last thing I'll add (for now, because I've been going on for quite a while now), is your use of the semaphore channel. It simply doesn't make sense to me... you create a buffered channel of empty structs to have a limit on the number of "in flight" jobs. Fair enough, if you want that sort of thing, but why use a semaphore if you have a channel in the first place? Wouldn't it make much, much more sense to simply do something like this:
type w struct
// although I don't really see the point in the ID fields everywhere
id string
log logger.Logger
concurrentJobs int
pipeline chan job // channel of jobs to work on
// variadic args to specify the concurrent runners might be nice
func NewPool(l logger.Logger) Pool
return &w
id: randomString(), // use a uuid package at the very least
log: l,
concurrentJobs: MaxConcurrentRunners,
pipeline: make(chan job, MaxConcurrentRunners),
Then when running jobs, just start with:
wg := &sync.WaitGroup
wg.Add(w.concurrentJobs) // yes, not number of jobs, but number of routines
for i := 0; i < w.concurrentJobs; i++
go w.run(wg) // not I'm not passing a job here, just the waitgroup
for _, job := range jobs
w.pipeline <- job // push onto channel
// all jobs have been pushed onto pipeline, close it
close(w.pipeline)
wg.Wait() // wait for everything to finish
// all done, reopen for business
w.pipeline = make(chan job, w.concurrentJobs)
Now I don't need to use the job done channel anymore, because the run routines that I spin up at the start will take care of everything for me. What may strike you as odd is that I'm closing the channel and then replacing it with a fresh one (a closed channel can't be reopened). Closing it is what tells the run routines to stop: should I limit the concurrent jobs to 2 but pass in 4 jobs, the 2 routines need to keep on reading from the channel until it is closed. They can't assume that they'll only ever perform one job and be done with it. Let's look at how I'd implement that run
function:
func (w *workerPool) run(wg *sync.WaitGroup)
defer wg.Done() // no need to wrap a defer call in an anonymous function BTW
// keep reading from pipeline until channel closes
for job := range w.pipeline
if err := job.handler(); err != nil
// retry loop, you may want to break this out into separate func
job.Finally(true)
Now this run function will continuously read jobs from the channel, so you don't want to return from it if the job fails. Instead, you should call Finally
with the correct value in the retry loop should the call fail entirely (the snippet above calls Finally(true) unconditionally only for brevity). But at least this approach means that the WaitGroup will actually represent the number of routines you're waiting for to finish (not the number of jobs you're processing). It also means that you don't need the double channels (the job.Done()
channel that blocked the semaphore read, which made the routine executing the job essentially a blocking routine, owing to the fact that the job.Done channel isn't buffered).
One last comment I would like to make: why are you calling this a worker pool? At best it's a fire-and-forget job queue. It's not pooling anything. If you ever need to pool resources, make sure to check out the sync.Pool
type BTW.
OK, let's go through this step by step. Beginning with the snippet of how you use the code:
log := logger.New()
w := worker.NewWorker(log)
go w.Run(
w.NewJob(func() error
return errors.New("foo")
),
w.NewJob(func() error
return nil
).SetFinally(func(success bool)
if success
// ...
),
)
The second line really bugs me. This is referred to as stuttering names. There's a whole document about naming guidelines, but the gist of it is that, if you have a package called worker
, the types and functions it exposes shouldn't repeat that name. If you read worker.New
or worker.NewWorker
, both contain the same information. One is shorter, and easier to read.
Another thing that I'm struggling to make sense of is why the NewJob
function is a method on a worker? It's not even using the receiver, so why not export it as a plain function in the first place? I'm also strongly reminded of ECMAScript and other languages that have event-based asynchronous calls when I see you call SetFinally
on the job object. This is actually more of a problem than you might think, and can cause concurrency issues (i.e. race conditions). Imagine I write something like this:
w := worker.New(log) // assume rename NewWorker
aJob := w.NewJob(func() error
if err := db.DoSomethingComplex(); err != nil
return err
return nil
)
go w.Run(aJob)
// a whole lot of stuff taking a long time
aJob.SetFinally(func(success bool)
log.Debugf("Job success %v", success)
)
// and a whole list of calls that change the behaviour of aJob
Now what will happen if the call inside the job fails? I've set the "finally" handler after having started the worker. I have no reliable way of predicting what happens when. This is a classic example of code that is not thread-safe.
What's worse: you're exposing a SetHandler
function on the Job interface. What would happen if the worker tries to access the handler field just as someone calls SetHandler
on the same object? Surely a job should be immutable at the core: the call which it represents.
What you need is a way to create an immutable job object. You want to be able to set default values (like default retries etc...), but you don't want the caller to pass all those values to the function that actually creates a job. That's fair, and it's easy to do by using variadic arguments.
type JobOption func(*job)
// SetFinally returns JobOption, part of variadic args configuring jobs
func SetFinally(f func(bool)) JobOption
return func(j *job)
j.finally = f
func Job(call func() error, jopts ...JobOption) job
j := job
handler: call,
// set all defaults here
for _, o := range jopts
o(&j)
j
So in this case, we're returning a job value, but none of the fields are exported. There are no methods on a job either that would allow the fields to be updated at a later stage. We simply require an actual function to call, can specify all defaults, and the caller is free to pass any number of configs through as callbacks in no particular order. To create the second job from your snippet, the call would look something like this:
job := worker.Job(
func() error return nil ,
worker.SetFinally(func (s bool)
if s
// ...
),
)
You can add worker.SetRetryAttempts
in the same way, or any other configuration you want the caller to have control over. The return value is a job object. It's also returned by value instead of a pointer. The main reason for this is that I don't really see a reason to pass it as a pointer, nothing more. There are no methods with pointer receivers, there are no exposed fields, and the entire object is, essentially, a read-only value at this point.
I've been looking at the worker implementation in the meantime (because we've only been focusing on the first snippet so far). Before I get to the nitty-gritty there, there's something that has been bugging me: you have a tendency to overuse new()
. I prefer to be explicit when I create something and use literals. Not only do they more clearly signal when/where you're creating a pointer, it's also shorter to write. Take your implementation of NewJob
, for example:
j := new(job)
j.id = randomString()
j.retryLimit = DefaultRetryLimit
j.handler = handler
j.done = make(chan bool)
return j
Compared to the one-liner:
return &job
id: randomString(),
retryLimit: DefaultRetryLimit,
handler: handler,
done: make(chan bool),
I prefer the latter, the &
immediately makes it clear a pointer is being returned, and the object literal makes it clear what that literal actually holds.
The last thing I'll add (for now, because I've been going on for quite a while now), is your use of the semaphore channel. It simply doesn't make sense to me... you create a buffered channel of empty structs to have a limit on the number of "in flight" jobs. Fair enough, if you want that sort of thing, but why use a semaphore if you have a channel in the first place? Wouldn't it make much, much more sense to simply do something like this:
type w struct
// although I don't really see the point in the ID fields everywhere
id string
log logger.Logger
concurrentJobs int
pipeline chan job // channel of jobs to work on
// variadic args to specify the concurrent runners might be nice
func NewPool(l logger.Logger) Pool
return &w
id: randomString(), // use a uuid package at the very least
log: l,
concurrentJobs: MaxConcurrentRunners,
pipeline: make(chan job, MaxConcurrentRunners),
Then when running jobs, just start with:
wg := &sync.WaitGroup
wg.Add(w.concurrentJobs) // yes, not number of jobs, but number of routines
for i := 0; i < w.concurrentJobs; i++
go w.run(wg) // note: I'm not passing a job here, just the waitgroup
for _, job := range jobs
w.pipeline <- job // push onto channel
// all jobs have been pushed onto pipeline, close it
close(w.pipeline)
wg.Wait() // wait for everything to finish
// all done, reopen for business
w.pipeline = make(chan job, w.concurrentJobs)
Now I don't need the job done channel anymore, because the run routines that I spin up at the start take care of everything for me. What may strike you as odd is that I'm closing and reopening the channel. That's because, should I limit the concurrent jobs to 2 but pass in 4 jobs, I need the 2 routines to keep reading from the channel. They can't assume that they'll only ever perform one job and be done with it. Closing the channel is what tells those routines (which range over it) that no more jobs are coming, so they can return and the WaitGroup can complete. Because a closed channel can't be reused, a fresh one is created for the next call. Let's look at how I'd implement that run
function:
func (w *workerPool) run(wg *sync.WaitGroup)
defer wg.Done() // no need to wrap a defer call in an anonymous function BTW
// keep reading from pipeline until channel closes
for job := range w.pipeline
if err := job.handler(); err != nil
// retry loop, you may want to break this out into separate func
job.Finally(true)
Now this run function will continuously read jobs from the channel, so you don't want to return from it if a job fails. You should call Finally
with the correct value in the retry loop should the call fail entirely. But at least this approach means that the WaitGroup will represent the number of routines you're waiting on (not the number of jobs you're processing). It also means that you don't need the double channels (the job.Done()
channel that blocked the semaphore read, which made the routine executing the job essentially a blocking routine, owing to the fact that the job.Done channel isn't buffered).
The last comment I'd like to make: why are you calling this a worker pool? At best it's a fire-and-forget job queue. It's not pooling anything. If you ever need to pool resources, make sure to check out the sync.Pool
type BTW.
answered Apr 2 at 15:01
Elias Van Ootegem
8,8631944
8,8631944
Wow thanks for the in-depth answer. Read through it all just now, but I will have to take a deeper look at this when I get home. I agree with a lot of points though (such as my naming conventions and other non-idiomatic Go things I am doing), however some things such as the buffering of channels and so on that I'm using doesn't quite make much sense to me as to why my way is bad. But this is likely because I am inexperienced with such topics, so will have to just let it marinate in my mind for a moment.
– Lansana
Apr 2 at 15:19
I will likely be back with a few questions... :)
– Lansana
Apr 2 at 15:27
add a comment |
Wow thanks for the in-depth answer. Read through it all just now, but I will have to take a deeper look at this when I get home. I agree with a lot of points though (such as my naming conventions and other non-idiomatic Go things I am doing), however some things such as the buffering of channels and so on that I'm using doesn't quite make much sense to me as to why my way is bad. But this is likely because I am inexperienced with such topics, so will have to just let it marinate in my mind for a moment.
– Lansana
Apr 2 at 15:19
I will likely be back with a few questions... :)
– Lansana
Apr 2 at 15:27
Wow thanks for the in-depth answer. Read through it all just now, but I will have to take a deeper look at this when I get home. I agree with a lot of points though (such as my naming conventions and other non-idiomatic Go things I am doing), however some things such as the buffering of channels and so on that I'm using doesn't quite make much sense to me as to why my way is bad. But this is likely because I am inexperienced with such topics, so will have to just let it marinate in my mind for a moment.
– Lansana
Apr 2 at 15:19
Wow thanks for the in-depth answer. Read through it all just now, but I will have to take a deeper look at this when I get home. I agree with a lot of points though (such as my naming conventions and other non-idiomatic Go things I am doing), however some things such as the buffering of channels and so on that I'm using doesn't quite make much sense to me as to why my way is bad. But this is likely because I am inexperienced with such topics, so will have to just let it marinate in my mind for a moment.
– Lansana
Apr 2 at 15:19
I will likely be back with a few questions... :)
– Lansana
Apr 2 at 15:27
I will likely be back with a few questions... :)
– Lansana
Apr 2 at 15:27
add a comment |
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f188652%2fworker-pool-and-jobs%23new-answer', 'question_page');
);
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password