Skip to content

Commit

Permalink
fix: remove data race
Browse files Browse the repository at this point in the history
  • Loading branch information
proullon committed Apr 18, 2020
1 parent 50c1b42 commit fa4143c
Showing 1 changed file with 80 additions and 67 deletions.
147 changes: 80 additions & 67 deletions workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,12 @@ type WorkerPool struct {
velocity map[int]int
ops map[int]int
sizeindex int
vm sync.Mutex

responses *list.List
rm sync.Mutex
respmu sync.RWMutex

sync.WaitGroup
sync.RWMutex
mu sync.RWMutex
}

func WithMaxWorker(max int) OptFunc {
Expand Down Expand Up @@ -133,46 +132,54 @@ func New(jobfnc JobFnc, opts ...OptFunc) (*WorkerPool, error) {
return wp, nil
}

// VelocityValues returns map of recorded velocity for each used velocity percentil
func (wp *WorkerPool) VelocityValues() map[int]int {
c := make(map[int]int)
wp.vm.Lock()
wp.mu.RLock()
for k, v := range wp.velocity {
c[k] = v
}
wp.vm.Unlock()
wp.mu.RUnlock()
return c
}

func (wp *WorkerPool) CurrentVelocityValues() (int, int) {
wp.vm.Lock()
p := wp.SizePercentil[wp.sizeindex]
v := wp.velocity[wp.SizePercentil[wp.sizeindex]]
wp.vm.Unlock()
i := wp.index()
wp.mu.RLock()
p := wp.SizePercentil[i]
v := wp.velocity[wp.SizePercentil[i]]
wp.mu.RUnlock()

return p, v
}

// Stop WorkerPool. All worker goroutine will exit, but stacked responses can still be consumed
func (wp *WorkerPool) Stop() {
wp.mu.Lock()
defer wp.mu.Unlock()
wp.stopped = true
wp.status = Stopped
close(wp.jobch)
}

// Pause all workers with killing them
func (wp *WorkerPool) Pause() {
wp.Lock()
wp.mu.Lock()
wp.status = Paused
}

// Resume all workers
func (wp *WorkerPool) Resume() {
wp.Unlock()
wp.status = Running
wp.mu.Unlock()
}

// Status return WorkerPool current status
func (wp *WorkerPool) Status() Status {
return wp.status
wp.mu.RLock()
st := wp.status
wp.mu.RUnlock()
return st
}

// Feed payload to worker
Expand All @@ -181,50 +188,44 @@ func (wp *WorkerPool) Feed(payload interface{}) {
wp.Add(1)
}

// AvailableResponses returns the current number of stacked responses. Consume ReturnChannel to read them
func (wp *WorkerPool) AvailableResponses() int {
wp.rm.Lock()
wp.respmu.RLock()
n := wp.responses.Len()
wp.rm.Unlock()
wp.respmu.RUnlock()
return n
}

func (wp *WorkerPool) index() int {
wp.mu.RLock()
si := wp.sizeindex
wp.mu.RUnlock()
return si
}

func (wp *WorkerPool) evaluate(d time.Duration, err error) bool {
if err != nil {
return wp.errexit(err)
return wp.exit()
}

if d > wp.MaxDuration*time.Second {
return wp.timeexit(d)
return wp.exit()
}

wp.RLock()
defer wp.RUnlock()
if wp.active > wp.wanted {
wp.active--
return true
wp.mu.RLock()
active := wp.active
wanted := wp.wanted
wp.mu.RUnlock()
if active > wanted {
return wp.exit()
}

return false
}

func (wp *WorkerPool) errexit(err error) bool {

wp.Lock()
defer wp.Unlock()

// leave at least 1 worker
if wp.active == 1 {
return false
}

wp.active--
return true
}

func (wp *WorkerPool) timeexit(t time.Duration) bool {

wp.Lock()
defer wp.Unlock()
func (wp *WorkerPool) exit() bool {
wp.mu.Lock()
defer wp.mu.Unlock()

// leave at least 1 worker
if wp.active == 1 {
Expand Down Expand Up @@ -270,57 +271,70 @@ func (wp *WorkerPool) worker() {
}
}

func (wp *WorkerPool) isStopped() bool {
wp.mu.RLock()
stopped := wp.stopped
wp.mu.RUnlock()
return stopped
}

func (wp *WorkerPool) tick() {
wp.Lock()
defer wp.Unlock()
wp.mu.Lock()
defer wp.mu.Unlock()

wp.ops[wp.SizePercentil[wp.sizeindex]]++
}

func (wp *WorkerPool) velocityRoutine() {
for {
if wp.stopped {
if wp.isStopped() {
return
}

wp.Lock()
wp.mu.Lock()
wp.ops[wp.SizePercentil[wp.sizeindex]] = 0
wp.Unlock()
wp.mu.Unlock()

time.Sleep(time.Duration(wp.EvaluationTime) * time.Second)

wp.vm.Lock()
wp.velocity[wp.SizePercentil[wp.sizeindex]] = wp.ops[wp.SizePercentil[wp.sizeindex]] / wp.EvaluationTime
wp.vm.Unlock()
i := wp.index()

wp.mu.Lock()
wp.velocity[wp.SizePercentil[i]] = wp.ops[wp.SizePercentil[i]] / wp.EvaluationTime
wp.mu.Unlock()

wp.mu.RLock()
// always increase from first value
if wp.sizeindex == 0 && wp.sizeindex < len(wp.SizePercentil)-1 {
// fmt.Printf("VelocityRoutine: increasing because sizeindex=0\n")
wp.setsize(wp.sizeindex + 1)
if i == 0 && i < len(wp.SizePercentil)-1 {
wp.mu.RUnlock()
wp.setsize(i + 1)
continue
}
// if velocity is 0, decrease
if wp.velocity[wp.SizePercentil[wp.sizeindex]] == 0 {
wp.setsize(wp.sizeindex - 1)
if wp.velocity[wp.SizePercentil[i]] == 0 {
wp.mu.RUnlock()
wp.setsize(i - 1)
continue
}
// if velocity increased, then increase worker pool size
if wp.sizeindex < len(wp.SizePercentil)-1 && wp.velocity[wp.SizePercentil[wp.sizeindex]] > wp.velocity[wp.SizePercentil[wp.sizeindex-1]] {
// fmt.Printf("VelocityRoutine: increasing because velocity(%d%%)=%d \n")
wp.setsize(wp.sizeindex + 1)
if i < len(wp.SizePercentil)-1 && wp.velocity[wp.SizePercentil[i]] > wp.velocity[wp.SizePercentil[i-1]] {
wp.mu.RUnlock()
wp.setsize(i + 1)
continue
}
// if velocity decreased then decrease worker pool size
if wp.sizeindex > 0 && wp.velocity[wp.SizePercentil[wp.sizeindex]] < wp.velocity[wp.SizePercentil[wp.sizeindex-1]] {
wp.setsize(wp.sizeindex - 1)
if wp.sizeindex > 0 && wp.velocity[wp.SizePercentil[i]] < wp.velocity[wp.SizePercentil[i-1]] {
wp.mu.RUnlock()
wp.setsize(i - 1)
continue
}
wp.mu.RUnlock()
}
}

func (wp *WorkerPool) setsize(i int) {
wp.Lock()
defer wp.Unlock()
wp.mu.Lock()
defer wp.mu.Unlock()

wp.sizeindex = i
wp.wanted = int(wp.MaxWorker / 100 * wp.SizePercentil[i])
Expand All @@ -336,17 +350,16 @@ func (wp *WorkerPool) setsize(i int) {
}

func (wp *WorkerPool) pushResponse(r Response) {
wp.rm.Lock()
wp.respmu.Lock()
wp.responses.PushBack(r)
wp.rm.Unlock()
wp.respmu.Unlock()
}

func (wp *WorkerPool) responseRoutine() {
var n int
for {
n = wp.AvailableResponses()

if wp.stopped && n == 0 {
if wp.isStopped() && n == 0 {
close(wp.ReturnChannel)
return
}
Expand All @@ -356,17 +369,17 @@ func (wp *WorkerPool) responseRoutine() {
continue
}

wp.rm.Lock()
wp.respmu.RLock()
e := wp.responses.Front()
wp.rm.Unlock()
wp.respmu.RUnlock()
r, ok := e.Value.(Response)
if !ok {
continue
}
wp.ReturnChannel <- r

wp.rm.Lock()
wp.respmu.Lock()
wp.responses.Remove(e)
wp.rm.Unlock()
wp.respmu.Unlock()
}
}

0 comments on commit fa4143c

Please sign in to comment.