Worker, Pool and Jobs

I created a snippet of code that allows me to create a worker pool, and from that worker pool I can run a bunch of jobs.



I can configure each job to have a retry limit, and set a callback that will be invoked at the end of each job with a boolean argument that represents whether the job ultimately failed or succeeded.



This is the first "concurrent" piece of code I've ever written, so it may not be the best.



It works perfectly fine as I expect it to, and passes all my tests, even when I run hundreds of tests over and over.



I would just like someone with practiced eyes in this area to look at my code and tell me where there may be some flaws or areas of improvement.



This is how to use the code:



    log := logger.New()
    w := worker.NewWorker(log)
    go w.Run(
        w.NewJob(func() error {
            return errors.New("foo")
        }),
        w.NewJob(func() error {
            return nil
        }).SetFinally(func(success bool) {
            if success {
                // ...
            }
        }),
    )
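
The retry limit mentioned above is configured the same way. For example (someHandler is just a placeholder for any func() error):

    w.NewJob(someHandler).SetRetryLimit(5) // values above MaxRetryLimit (10) are clamped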


This is the package (if you want to run it yourself, just remove the logger logic, or create a mock of the logger, and it will run just fine):



    package worker

    import (
        "fmt"
        "math"
        "math/rand"
        "sync"
        "time"

        "core/logger"
    )

    const (
        // MaxConcurrentRunners is the limit of jobs that can be run concurrently.
        MaxConcurrentRunners = 1000

        // MaxRetryLimit is the maximum number of retries for a failed job.
        MaxRetryLimit = 10

        // DefaultRetryLimit is the default number of retries for a failed job.
        DefaultRetryLimit = 3
    )

    // Worker is just an alias for Pool. It's nice to have so that a type can be used
    // like worker.Worker instead of worker.Pool.
    type Worker interface {
        Pool
    }

    // Pool is an interface for a worker pool.
    type Pool interface {
        NewJob(handler func() error) Job
        Run(jobs ...Job)
        Flushed() bool
    }

    // Job is an interface for a job.
    type Job interface {
        ID() string
        SetRetryLimit(retryLimit int) Job
        RetryLimit() int
        IncrementAttempts()
        CurrentAttempts() int
        SetHandler(handler func() error)
        Handler() func() error
        SetFinally(finally func(success bool)) Job
        Finally(success bool) Job
        Done() chan bool
        SignalDone()
    }

    // workerPool represents a worker pool.
    type workerPool struct {
        id        string
        log       logger.Logger
        semaphore chan struct{}
    }


    // NewWorker returns a new pool, which implements the Worker interface.
    func NewWorker(log logger.Logger) Worker {
        return NewPool(log)
    }

    // NewPool returns a new worker pool instance.
    func NewPool(log logger.Logger) Pool {
        w := new(workerPool)
        w.id = randomString()
        w.log = log
        w.semaphore = make(chan struct{}, MaxConcurrentRunners)

        return w
    }

    // NewJob creates a new job for a worker pool.
    func (w *workerPool) NewJob(handler func() error) Job {
        j := new(job)
        j.id = randomString()
        j.retryLimit = DefaultRetryLimit
        j.handler = handler
        j.done = make(chan bool)

        return j
    }


    // ID will return the ID of a pool.
    func (w *workerPool) ID() string {
        return w.id
    }

    // Flushed checks whether the worker pool is flushed or not (has no active jobs in the buffer).
    func (w *workerPool) Flushed() bool {
        return len(w.semaphore) == 0
    }


    // Run will begin processing the jobs.
    func (w *workerPool) Run(jobs ...Job) {
        // Cache the count of jobs.
        l := len(jobs)

        // Create a new wait group and set the counter to the count of jobs.
        wg := new(sync.WaitGroup)
        wg.Add(l)

        // Process each job.
        for _, job := range jobs {
            // Block if the pool buffer is full.
            w.semaphore <- struct{}{}

            go func(job Job) {
                // Log start of job processing.
                w.log.Info(fmt.Sprintf("Worker pool (%s): Started job (%s)", w.ID(), job.ID()))

                // Execute the job.
                go func() {
                    w.run(wg, job)
                }()

                // Wait for the job to be signaled as complete.
                <-job.Done()

                // Release a slot in the pool buffer.
                <-w.semaphore

                // Decrement the wait group.
                wg.Done()

                // Log end of job processing.
                w.log.Info(fmt.Sprintf("Worker pool (%s): Completed job (%s)", w.ID(), job.ID()))
            }(job)
        }

        // Wait for the wait group counter to be depleted.
        wg.Wait()
    }


    // run will process the job until it succeeds or reaches the maximum retries.
    func (w *workerPool) run(wg *sync.WaitGroup, job Job) {
        defer func() {
            job.SignalDone()
        }()

        // Execute job.
        if err := job.Handler()(); err != nil {
            for {
                // Increment counter.
                job.IncrementAttempts()

                // Wait retry period.
                timer := time.NewTimer(ExponentialBackoff(job.CurrentAttempts()))
                <-timer.C

                // Execute job.
                if err := job.Handler()(); err != nil {
                    // Maximum attempts reached without success.
                    if job.CurrentAttempts() >= job.RetryLimit() {
                        job.Finally(false)
                        w.log.Error(err)
                        return
                    }

                    continue
                } else {
                    break
                }
            }
        }

        job.Finally(true)
    }


    // job represents a job for a worker pool.
    type job struct {
        id              string
        retryLimit      int
        currentAttempts int
        handler         func() error
        finally         func(success bool)
        done            chan bool
    }

    // ID will return the ID of a job.
    func (j *job) ID() string {
        return j.id
    }

    // Done returns a channel that signals when the job is done.
    func (j *job) Done() chan bool {
        return j.done
    }

    // SignalDone will signal when a job is done. This can also be used from outside the
    // worker to cancel a job, etc.
    func (j *job) SignalDone() {
        j.done <- true
    }

    // SetRetryLimit will set the job's retry limit.
    func (j *job) SetRetryLimit(retryLimit int) Job {
        if retryLimit <= 0 {
            j.retryLimit = DefaultRetryLimit
        } else if retryLimit > MaxRetryLimit {
            j.retryLimit = MaxRetryLimit
        } else {
            j.retryLimit = retryLimit
        }

        return j
    }

    // RetryLimit will get the job's retry limit.
    func (j *job) RetryLimit() int {
        return j.retryLimit
    }

    // CurrentAttempts will get the job's current attempts.
    func (j *job) CurrentAttempts() int {
        return j.currentAttempts
    }

    // IncrementAttempts increments the number of attempts on this job.
    func (j *job) IncrementAttempts() {
        j.currentAttempts++
    }

    // SetHandler will set the job's handler.
    func (j *job) SetHandler(handler func() error) {
        j.handler = handler
    }

    // Handler will get the job's handler.
    func (j *job) Handler() func() error {
        return j.handler
    }

    // SetFinally will set the finally function of the job, which will be called upon job completion.
    func (j *job) SetFinally(finally func(success bool)) Job {
        j.finally = finally

        return j
    }

    // Finally will call the finally callback, if one is set.
    func (j *job) Finally(success bool) Job {
        if j.finally != nil {
            j.finally(success)
        }

        return j
    }


    // ExponentialBackoff will give a duration using exponential backoff.
    //
    // Example failedAttempts:
    // 1: 500ms
    // 2: 1s
    // 3: 2s
    // 4: 4s
    // 5: 8s
    // 6: 16s
    // 7: 32s
    // 8: 1m4s
    // 9: 2m8s
    // 10: 4m16s
    func ExponentialBackoff(failedAttempts int) time.Duration {
        return time.Duration(float64(time.Second) * math.Pow(2, float64(failedAttempts)) * .25)
    }

    // randomString will generate a random string.
    func randomString() string {
        const chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

        l := len(chars)
        res := make([]byte, 30)
        for i := range res {
            res[i] = chars[rand.Intn(l)]
        }

        return string(res)
    }
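
To illustrate the "mock of the logger" mentioned above and the kind of test this package could pass, here is a minimal sketch (in a _test.go file importing errors, sync/atomic and testing). The nopLogger type and the test are assumptions, not part of the original package; they assume logger.Logger only requires the Info and Error methods the pool actually calls.

    // nopLogger is a hypothetical stand-in for core/logger's Logger interface,
    // assuming it only needs the two methods the pool uses.
    type nopLogger struct{}

    func (nopLogger) Info(msg string) {}
    func (nopLogger) Error(err error) {}

    // TestRunRetriesAndReportsFailure is a sketch of a test: a job that always
    // fails should be retried and its finally callback should report failure.
    // The default backoff means this test waits roughly 500ms.
    func TestRunRetriesAndReportsFailure(t *testing.T) {
        w := NewWorker(nopLogger{})

        var calls, failed int32
        j := w.NewJob(func() error {
            atomic.AddInt32(&calls, 1)
            return errors.New("always fails")
        }).SetRetryLimit(1).SetFinally(func(success bool) {
            if !success {
                atomic.AddInt32(&failed, 1)
            }
        })

        w.Run(j) // Run blocks until all jobs have signalled completion.

        if atomic.LoadInt32(&failed) != 1 {
            t.Fatal("expected the finally callback to report failure")
        }
        if atomic.LoadInt32(&calls) < 2 {
            t.Fatalf("expected at least one retry, got %d calls", atomic.LoadInt32(&calls))
        }
    }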







asked Mar 2 at 2:00 by Lansana, edited Mar 19 at 1:36

      227211




      227211




















          1 Answer






OK, let's go through this step by step. Beginning with the snippet of how you use the code:

    log := logger.New()
    w := worker.NewWorker(log)
    go w.Run(
        w.NewJob(func() error {
            return errors.New("foo")
        }),
        w.NewJob(func() error {
            return nil
        }).SetFinally(func(success bool) {
            if success {
                // ...
            }
        }),
    )

The second line really bugs me. This is referred to as stuttering names. There's a whole document about naming guidelines, but the gist of it is that, if you have a package called worker, the types and functions it exposes shouldn't repeat that name. If you read worker.New or worker.NewWorker, both contain the same information. One is shorter, and easier to read.
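
A minimal sketch of that rename against the question's package (only the constructor name changes; NewPool stays as it is):

    // New replaces NewWorker, so the call site reads worker.New(log)
    // instead of worker.NewWorker(log).
    func New(log logger.Logger) Worker {
        return NewPool(log)
    }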



Another thing that I'm struggling to make sense of is why the NewJob function is a method on the worker. It's not even using the receiver, so why not export it as a plain function in the first place? I'm also too much reminded of ECMAScript and other languages that have event-based asynchronous calls when I see you call SetFinally on the job object. This is actually more of a problem than you might think, and can cause concurrency issues (i.e. race conditions). Imagine I write something like this:

    w := worker.New(log) // assume NewWorker is renamed
    aJob := w.NewJob(func() error {
        if err := db.DoSomethingComplex(); err != nil {
            return err
        }
        return nil
    })
    go w.Run(aJob)
    // a whole lot of stuff taking a long time
    aJob.SetFinally(func(success bool) {
        log.Debugf("Job success %v", success)
    })
    // and a whole list of calls that change the behaviour of aJob

Now what will happen if the call inside the job fails? I've set the "finally" handler after having started the worker, so I have no reliable way of predicting what happens when. This is a classic example of code that is not thread-safe.

What's worse: you're exposing a SetHandler function on the Job interface. What would happen if the worker tries to access the handler field just as someone calls SetHandler on the same object? Surely a job should be immutable at its core: the call which it represents.
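
A contrived sketch of that hazard, using the question's API as-is: the pool reads the job's handler while the caller overwrites it, which the race detector (go run -race / go test -race) would flag as a data race.

    j := w.NewJob(func() error { return nil })
    go w.Run(j)                                                  // the pool reads j's handler concurrently...
    j.SetHandler(func() error { return errors.New("changed") }) // ...while we overwrite it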



What you need is a way to create an immutable job object. You want to be able to set default values (like default retries etc.), but you don't want the caller to pass all those values to the function that actually creates a job. That's fair, and it's easy to do by using variadic arguments.

    type JobOption func(*job)

    // SetFinally returns a JobOption, part of the variadic args configuring jobs
    func SetFinally(f func(bool)) JobOption {
        return func(j *job) {
            j.finally = f
        }
    }

    func Job(call func() error, jopts ...JobOption) job {
        j := job{
            handler: call,
            // set all defaults here
        }
        for _, o := range jopts {
            o(&j)
        }
        return j
    }

So in this case, we're returning a job value, but none of the fields are exported. There are no functions on a job either that would allow the fields to be updated at a later stage. We simply require an actual function to call, we can specify all defaults, and the caller is free to pass any number of configs through as callbacks, in no particular order. To create the second job from your snippet, the call would look something like this:

    job := worker.Job(
        func() error { return nil },
        worker.SetFinally(func(s bool) {
            if s {
                // ...
            }
        }),
    )

You can add worker.SetRetryAttempts in the same way, or any other configuration you want the caller to have control over. The return value is a job object. It's also returned by value instead of a pointer. The main reason for this is that I don't really see a reason to pass it as a pointer, nothing more. There are no calls that work by pointer receiver, there are no exposed fields, and the entire object is, essentially, a read-only value at this point.
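
For example, a retry option in the same style might look like the following sketch; it reuses the retryLimit field and the DefaultRetryLimit/MaxRetryLimit constants from the question, and the name SetRetryAttempts is only a suggestion.

    // SetRetryAttempts clamps and stores the retry limit, mirroring the
    // validation the original SetRetryLimit method performed.
    func SetRetryAttempts(n int) JobOption {
        return func(j *job) {
            switch {
            case n <= 0:
                j.retryLimit = DefaultRetryLimit
            case n > MaxRetryLimit:
                j.retryLimit = MaxRetryLimit
            default:
                j.retryLimit = n
            }
        }
    }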




I've been looking at the worker implementation meanwhile (because we've only been focusing on the first snippet so far). Before I get to the nitty-gritty there, there's something that has been bugging me: you have a tendency to overuse new(). I prefer to be explicit when I create something and use literals. Not only do they more clearly signal when/where you're creating a pointer, it's also shorter to write. Take your implementation of NewJob, for example:

    j := new(job)
    j.id = randomString()
    j.retryLimit = DefaultRetryLimit
    j.handler = handler
    j.done = make(chan bool)

    return j

Compared to the one-liner:

    return &job{
        id:         randomString(),
        retryLimit: DefaultRetryLimit,
        handler:    handler,
        done:       make(chan bool),
    }

I prefer the latter: the & immediately makes it clear a pointer is being returned, and the object literal makes it clear what that literal actually holds.




The last thing I'll add (for now, because I've been going on for quite a while now) is your use of the semaphore channel. It simply doesn't make sense to me... you create a buffered channel of empty structs to have a limit on the number of "in flight" jobs. Fair enough, if you want that sort of thing, but why use a semaphore if you have a channel in the first place? Wouldn't it make much, much more sense to simply do something like this:

    type w struct {
        // although I don't really see the point in the ID fields everywhere
        id             string
        log            logger.Logger
        concurrentJobs int
        pipeline       chan job // channel of jobs to work on
    }

    // variadic args to specify the concurrent runners might be nice
    func NewPool(l logger.Logger) Pool {
        return &w{
            id:             randomString(), // use a uuid package at the very least
            log:            l,
            concurrentJobs: MaxConcurrentRunners,
            pipeline:       make(chan job, MaxConcurrentRunners),
        }
    }

Then when running jobs, just start with:

    wg := &sync.WaitGroup{}
    wg.Add(w.concurrentJobs) // yes, not the number of jobs, but the number of routines
    for i := 0; i < w.concurrentJobs; i++ {
        go w.run(wg) // note I'm not passing a job here, just the waitgroup
    }

    for _, job := range jobs {
        w.pipeline <- job // push onto channel
    }

    // all jobs have been pushed onto the pipeline, close it
    close(w.pipeline)
    wg.Wait() // wait for everything to finish
    // all done, reopen for business
    w.pipeline = make(chan job, w.concurrentJobs)


Now I don't need to use the job done channel anymore, because the run routines that I spin up at the start will take care of everything for me. What may strike you as odd is that I'm closing and reopening the channel. That's because, should I limit the concurrent jobs to 2, but pass in 4 jobs, I need the 2 routines to keep on reading from the channel. They can't assume that they'll only ever perform one job and be done with it. Let's look at how I'd implement that run function:

    func (w *workerPool) run(wg *sync.WaitGroup) {
        defer wg.Done() // no need to wrap a defer call in an anonymous function BTW

        // keep reading from the pipeline until the channel closes
        for job := range w.pipeline {
            if err := job.handler(); err != nil {
                // retry loop, you may want to break this out into a separate func
            }
            job.Finally(true)
        }
    }

Now this run function will continuously read jobs from the channel, so you don't want to return from it if the job fails. You should call Finally with the correct value in the error loop should the call fail entirely. But at least this approach means that the WaitGroup will actually represent the number of routines you're waiting for to finish (not the number of jobs you're processing). It also means that you don't need the double channels (the job.Done() channel that blocked the semaphore read, which made the routine executing the job essentially a blocking routine, owing to the fact that the job.Done channel isn't buffered).
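
A minimal sketch of such a retry helper (the name retry and its exact signature are assumptions; it reuses ExponentialBackoff and the job fields from the question). The error branch in the loop above would then call job.Finally(w.retry(job, err)) and continue, keeping job.Finally(true) for the success path.

    // retry re-runs the job's handler with exponential backoff until it succeeds
    // or the retry limit is reached, and reports whether the job ultimately succeeded.
    func (w *workerPool) retry(j job, firstErr error) bool {
        err := firstErr
        for attempt := 1; attempt <= j.retryLimit; attempt++ {
            time.Sleep(ExponentialBackoff(attempt))
            if err = j.handler(); err == nil {
                return true
            }
        }
        w.log.Error(err)
        return false
    }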




The last comment I would like to make is: why are you calling this a worker pool? At best it's a fire-and-forget job queue. It's not pooling anything. If you ever need to pool resources, make sure to check out the sync.Pool type, BTW.
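
For reference, sync.Pool usage looks roughly like this; the buffer example is only an illustration of the standard-library type, unrelated to the job queue above (it assumes bytes and sync are imported).

    var bufPool = sync.Pool{
        New: func() interface{} { return new(bytes.Buffer) },
    }

    func withBuffer() {
        buf := bufPool.Get().(*bytes.Buffer)
        defer func() {
            buf.Reset()
            bufPool.Put(buf)
        }()
        // ... use buf ...
    }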






• Wow thanks for the in-depth answer. Read through it all just now, but I will have to take a deeper look at this when I get home. I agree with a lot of points (such as my naming conventions and other non-idiomatic Go things I am doing), however some things, such as the buffering of channels, don't quite make sense to me as to why my way is bad. But this is likely because I am inexperienced with such topics, so I will have to just let it marinate in my mind for a moment.
  – Lansana, Apr 2 at 15:19

• I will likely be back with a few questions... :)
  – Lansana, Apr 2 at 15:27










          Your Answer




          StackExchange.ifUsing("editor", function ()
          return StackExchange.using("mathjaxEditing", function ()
          StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix)
          StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
          );
          );
          , "mathjax-editing");

          StackExchange.ifUsing("editor", function ()
          StackExchange.using("externalEditor", function ()
          StackExchange.using("snippets", function ()
          StackExchange.snippets.init();
          );
          );
          , "code-snippets");

          StackExchange.ready(function()
          var channelOptions =
          tags: "".split(" "),
          id: "196"
          ;
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function()
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled)
          StackExchange.using("snippets", function()
          createEditor();
          );

          else
          createEditor();

          );

          function createEditor()
          StackExchange.prepareEditor(
          heartbeatType: 'answer',
          convertImagesToLinks: false,
          noModals: false,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: null,
          bindNavPrevention: true,
          postfix: "",
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          );



          );








           

          draft saved


          draft discarded


















          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f188652%2fworker-pool-and-jobs%23new-answer', 'question_page');

          );

          Post as a guest






























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes








          up vote
          3
          down vote



          accepted










          OK, let's go through this step by step. Beginning with the snippet of how you use the code:



          log := logger.New()
          w := worker.NewWorker(log)
          go w.Run(
          w.NewJob(func() error
          return errors.New("foo")
          ),
          w.NewJob(func() error
          return nil
          ).SetFinally(func(success bool)
          if success
          // ...

          ),
          )


          The second line really bugs me. This is referred to as stuttering names. There's a whole document about naming guidelines, but the gist of it is that, if you have a package called worker, the types and functions it exposes shouldn't repeat that name. If you read worker.New or worker.NewWorker, both contain the same information. One is shorter, and easier to read.



          Another thing that I'm struggling to make sense of is why the NewJob function is a receiver function on a worker? It's not even using the receiver, so why not export it as a function in the first place? I'm also too much reminded of ECMAScript and other languages that have event-based asynchronous calls when I see you call SetFinally on the job object. This is actually more of a problem than you might think, and can cause concurrency issues (ie race conditions). Imagine I write something like this:



          w := worker.New(log) // assume rename NewWorker
          aJob := w.NewJob(func() error
          if err := db.DoSomethingComplex(); err != nil
          return err

          return nil
          )
          go w.Run(aJob)
          // a whole lot of stuff taking a long time
          aJob.SetFinally(func(success bool)
          log.Debugf("Job success %v", success)
          )
          // and a whole list of calls that change the behaviour of aJob


          Now what will happen if the call inside the job fails? I've set the "finally" handler after having started the worker. I've reliable way of predicting what happens when. This is a classic example of code that is not thread-safe.

          What's worse: you're exposing a SetHandler function on the Job interface. What would happen if the worker tries to access the handler field just as someone calls SetHandler on the same object? Surely a job should be immutable at the core: the call which it represents.



          What you need is a way to create an immutable job object. You want to be able to set default values (like default retries etc...), but you don't want the caller to pass all those values to the function that actually creates a job. That's fair, and it's easy to do by using variadic arguments.



          type JobOption func(*job)

          // SetFinally returns JobOption, part of variadic args configuring jobs
          func SetFinally(f func(bool)) JobOption
          return func(j *job)
          j.finally = f



          func Job(call func() error, jopts ...JobOption) job
          j := job
          handler: call,
          // set all defaults here

          for _, o := range jopts
          o(&j)

          j



          So in this case, we're returning a job value, but none of the fields are exported. There's no functions on a job either that would allow the fields to be updated at a later stage. We simply require an actual function to call, can specify all defaults, and the caller is free to pass any number of configs through as callbacks in no particular order. To create the second job from your snippet, the call would look something like this:



          job := worker.Job(
          func() error return nil ,
          worker.SetFinally(func (s bool)
          if s
          // ...

          ),
          )


          You can add worker.SetRetryAttempts in the same way, or any other configuration you want the caller to have control over. The return value is a job object. It's also returned by value instead of a pointer. The main reason for this is that I don't really see a reason to pass it as a pointer, nothing more. There's no calls that work by pointer receiver, there's no exposed fields, and there entire object is, essentially, a read-only value at this point.




          I've been looking at the worker implementation meanwhile (because we've only been focussing on the first snippet so far). Before I get to the nitty-gritty there, There's something that has been bugging me: You have a tendency to overuse new(). I prefer to be explicit when I create something and use literals. Not only do they more clearly signal when/where you're creating a pointer, it's also shorter to write. Take your implementation of NewJob, for example:



          j := new(job)
          j.id = randomString()
          j.retryLimit = DefaultRetryLimit
          j.handler = handler
          j.done = make(chan bool)

          return j


          Compared to the one-liner:



          return &job
          id: randomString(),
          retryLimit: DefaultRetryLimit,
          handler: handler,
          done: make(chan bool),



          I prefer the latter, the & immediately makes it clear a pointer is being returned, and the object literal makes it clear what that literal actually holds.




          The last thing I'll add (for now, because I've been going on for quite a while now), is your use of the semaphore channel. It simply doesn't make sense to me... you create a buffered channel of empty structs to have a limit on the number of "in flight" jobs. Fair enough, if you want that sort of thing, but why use a semaphore if you have a channel in the first place? Wouldn't it make much, much more sense to simply do something like this:



          type w struct 
          // although I don't really see the point in the ID fields everywhere
          id string
          log logger.Logger
          concurrentJobs int
          pipeline chan job // channel of jobs to work on


          // variadic args to specify the concurrent runners might be nice
          func NewPool(l logger.Logger) Pool
          return &w
          id: randomString(), // use a uuid package at the very least
          log: l,
          concurrentJobs: MaxConcurrentRunners,
          pipeline: make(chan job, MaxConcurrentRunners),




          Then when running jobs, just start with:



          wg := &sync.WaitGroup
          wg.Add(w.concurrentJobs) // yes, not number of jobs, but number of routines
          for i := 0; i < w.concurrentJobs; i++
          go w.run(wg) // not I'm not passing a job here, just the waitgroup


          for _, job := range jobs
          w.pipeline <- job // push onto channel

          // all jobs have been pushed onto pipeline, close it
          close(w.pipeline)
          wg.Wait() // wait for everything to finish
          // all done, reopen for business
          w.pipeline = make(chan job, w.concurrentJobs)


          Now I don't need to use the job done channel anymore, because the run routines that I spin up at the start will take care of everything for me. What may strike you as odd is that I'm closing and reopening the channel. That's because, should I limit the concurrent jobs to 2, but pass in 4 jobs, I need the 2 routines to keep on reading from the channel. They can't assume that they'll only ever perform one job and be done with it. Let's look at how I'd implement that run function:



          func (w *workerPool) run(wg *sync.WaitGroup) 
          defer wg.Done() // no need to wrap a defer call in an anonymous function BTW

          // keep reading from pipeline until channel closes
          for job := range w.pipeline
          if err := job.handler(); err != nil
          // retry loop, you may want to break this out into separate func

          job.Finally(true)




          now this run function will continuously read jobs from the channel, so you don't want to return from it if the job fails. you should call Finally with the correct value in the error loop should the call fail entirely. But at least this approach means that the WaitGroup will actually represent the number of actual routines you're waiting for to finish (not the number of jobs you're processing). It also means that you don't need the double channels (the job.Done() channel that blocked the semaphore read, which made the routine executing the job essentially a blocking routine, owing to the fact that the job.Done channel isn't buffered.




          Last comment I would like to make is why are you calling this a worker pool? At best it's a fire-and-forget job-queue. It's not pooling anything. If you ever need to pool resources, make sure to check the sync.Pool type BTW.






          share|improve this answer





















          • Wow thanks for the in-depth answer. Read through it all just now, but I will have to take a deeper look at this when I get home. I agree with a lot of points though (such as my naming conventions and other non-idiomatic Go things I am doing), however some things such as the buffering of channels and so on that I'm using doesn't quite make much sense to me as to why my way is bad. But this is likely because I am inexperienced with such topics, so will have to just let it marinate in my mind for a moment.
            – Lansana
            Apr 2 at 15:19










          • I will likely be back with a few questions... :)
            – Lansana
            Apr 2 at 15:27














          up vote
          3
          down vote



          accepted










          OK, let's go through this step by step. Beginning with the snippet of how you use the code:



          log := logger.New()
          w := worker.NewWorker(log)
          go w.Run(
          w.NewJob(func() error
          return errors.New("foo")
          ),
          w.NewJob(func() error
          return nil
          ).SetFinally(func(success bool)
          if success
          // ...

          ),
          )


          The second line really bugs me. This is referred to as stuttering names. There's a whole document about naming guidelines, but the gist of it is that, if you have a package called worker, the types and functions it exposes shouldn't repeat that name. If you read worker.New or worker.NewWorker, both contain the same information. One is shorter, and easier to read.



          Another thing that I'm struggling to make sense of is why the NewJob function is a receiver function on a worker? It's not even using the receiver, so why not export it as a function in the first place? I'm also too much reminded of ECMAScript and other languages that have event-based asynchronous calls when I see you call SetFinally on the job object. This is actually more of a problem than you might think, and can cause concurrency issues (ie race conditions). Imagine I write something like this:



          w := worker.New(log) // assume rename NewWorker
          aJob := w.NewJob(func() error
          if err := db.DoSomethingComplex(); err != nil
          return err

          return nil
          )
          go w.Run(aJob)
          // a whole lot of stuff taking a long time
          aJob.SetFinally(func(success bool)
          log.Debugf("Job success %v", success)
          )
          // and a whole list of calls that change the behaviour of aJob


          Now what will happen if the call inside the job fails? I've set the "finally" handler after having started the worker. I've reliable way of predicting what happens when. This is a classic example of code that is not thread-safe.

          What's worse: you're exposing a SetHandler function on the Job interface. What would happen if the worker tries to access the handler field just as someone calls SetHandler on the same object? Surely a job should be immutable at the core: the call which it represents.



          What you need is a way to create an immutable job object. You want to be able to set default values (like default retries etc...), but you don't want the caller to pass all those values to the function that actually creates a job. That's fair, and it's easy to do by using variadic arguments.



          type JobOption func(*job)

          // SetFinally returns JobOption, part of variadic args configuring jobs
          func SetFinally(f func(bool)) JobOption
          return func(j *job)
          j.finally = f



          func Job(call func() error, jopts ...JobOption) job
          j := job
          handler: call,
          // set all defaults here

          for _, o := range jopts
          o(&j)

          j



          So in this case, we're returning a job value, but none of the fields are exported. There's no functions on a job either that would allow the fields to be updated at a later stage. We simply require an actual function to call, can specify all defaults, and the caller is free to pass any number of configs through as callbacks in no particular order. To create the second job from your snippet, the call would look something like this:



          job := worker.Job(
          func() error return nil ,
          worker.SetFinally(func (s bool)
          if s
          // ...

          ),
          )


          You can add worker.SetRetryAttempts in the same way, or any other configuration you want the caller to have control over. The return value is a job object. It's also returned by value instead of a pointer. The main reason for this is that I don't really see a reason to pass it as a pointer, nothing more. There's no calls that work by pointer receiver, there's no exposed fields, and there entire object is, essentially, a read-only value at this point.




          I've been looking at the worker implementation meanwhile (because we've only been focussing on the first snippet so far). Before I get to the nitty-gritty there, There's something that has been bugging me: You have a tendency to overuse new(). I prefer to be explicit when I create something and use literals. Not only do they more clearly signal when/where you're creating a pointer, it's also shorter to write. Take your implementation of NewJob, for example:



          j := new(job)
          j.id = randomString()
          j.retryLimit = DefaultRetryLimit
          j.handler = handler
          j.done = make(chan bool)

          return j


Compared to a single return statement with a composite literal:



return &job{
    id:         randomString(),
    retryLimit: DefaultRetryLimit,
    handler:    handler,
    done:       make(chan bool),
}



I prefer the latter: the & immediately makes it clear a pointer is being returned, and the composite literal shows exactly what that object holds.




The last thing I'll add (for now, because I've been going on for quite a while) is your use of the semaphore channel. It simply doesn't make sense to me: you create a buffered channel of empty structs to put a limit on the number of "in flight" jobs. Fair enough if you want that sort of thing, but why use a semaphore when you already have channels at your disposal? Wouldn't it make much, much more sense to simply do something like this:



type w struct {
    // although I don't really see the point in the ID fields everywhere
    id             string
    log            logger.Logger
    concurrentJobs int
    pipeline       chan job // channel of jobs to work on
}

// variadic args to specify the number of concurrent runners might be nice
func NewPool(l logger.Logger) Pool {
    return &w{
        id:             randomString(), // use a uuid package at the very least
        log:            l,
        concurrentJobs: MaxConcurrentRunners,
        pipeline:       make(chan job, MaxConcurrentRunners),
    }
}




          Then when running jobs, just start with:



wg := &sync.WaitGroup{}
wg.Add(w.concurrentJobs) // yes, the number of routines, not the number of jobs
for i := 0; i < w.concurrentJobs; i++ {
    go w.run(wg) // note I'm not passing a job here, just the waitgroup
}

for _, job := range jobs {
    w.pipeline <- job // push onto the channel
}
// all jobs have been pushed onto the pipeline, close it
close(w.pipeline)
wg.Wait() // wait for everything to finish
// all done, reopen for business
w.pipeline = make(chan job, w.concurrentJobs)


Now I don't need the job's done channel anymore, because the run routines that I spin up at the start will take care of everything for me. What may strike you as odd is that I'm closing and reopening the channel. That's because, should I limit the concurrent jobs to 2 but pass in 4 jobs, I need the 2 routines to keep reading from the channel; they can't assume that they'll only ever perform one job and be done with it. Let's look at how I'd implement that run function:



func (w *workerPool) run(wg *sync.WaitGroup) {
    defer wg.Done() // no need to wrap a defer call in an anonymous function BTW

    // keep reading from the pipeline until the channel closes
    for job := range w.pipeline {
        if err := job.handler(); err != nil {
            // retry loop, you may want to break this out into a separate func
        }
        job.Finally(true)
    }
}




Now this run function will continuously read jobs from the channel, so you don't want to return from it if a job fails. You should call Finally with the correct value in the retry loop should the call fail entirely. But at least this approach means that the WaitGroup actually represents the number of routines you're waiting on to finish (not the number of jobs you're processing). It also means that you don't need the double channels (the job.Done() channel that blocked until the semaphore read, which made the routine executing the job essentially a blocking routine, owing to the fact that the job.Done channel isn't buffered).
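As a rough sketch of what that separate retry func could look like: the handler, finally and retryLimit fields are the ones from the job struct above, while the linear back-off, the hypothetical execute name and the Debugf call are purely illustrative (it also needs a time import):

// execute runs a single job, retrying up to retryLimit extra times,
// and always reports the real outcome to the finally callback.
func (w *workerPool) execute(j job) {
    var err error
    for attempt := 0; attempt <= j.retryLimit; attempt++ {
        if err = j.handler(); err == nil {
            break
        }
        w.log.Debugf("job failed (attempt %d): %v", attempt+1, err)
        time.Sleep(time.Duration(attempt+1) * 100 * time.Millisecond) // naive back-off
    }
    if j.finally != nil {
        j.finally(err == nil)
    }
}

With something like that in place, the loop body in run collapses to a single w.execute(job) call and the hard-coded true disappears.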




The last comment I'd like to make: why are you calling this a worker pool? At best it's a fire-and-forget job queue; it's not pooling anything. If you ever do need to pool resources, make sure to check out the sync.Pool type, by the way.
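For reference, and unrelated to the code under review, this is roughly what actual resource pooling with sync.Pool looks like (a throwaway example recycling byte buffers):

package main

import (
    "bytes"
    "fmt"
    "sync"
)

// bufPool hands out reusable buffers instead of allocating a new one per call.
var bufPool = sync.Pool{
    New: func() interface{} { return new(bytes.Buffer) },
}

func render(name string) string {
    buf := bufPool.Get().(*bytes.Buffer)
    defer func() {
        buf.Reset()      // scrub before reuse
        bufPool.Put(buf) // hand the buffer back to the pool
    }()
    fmt.Fprintf(buf, "hello, %s", name)
    return buf.String()
}

func main() {
    fmt.Println(render("worker"))
}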






answered Apr 2 at 15:01 – Elias Van Ootegem
          • Wow thanks for the in-depth answer. Read through it all just now, but I will have to take a deeper look at this when I get home. I agree with a lot of points though (such as my naming conventions and other non-idiomatic Go things I am doing), however some things such as the buffering of channels and so on that I'm using doesn't quite make much sense to me as to why my way is bad. But this is likely because I am inexperienced with such topics, so will have to just let it marinate in my mind for a moment.
            – Lansana
            Apr 2 at 15:19










          • I will likely be back with a few questions... :)
            – Lansana
            Apr 2 at 15:27















