Non-graceful worker recovery issues and workflow issue (#411)
* changes for adding atomic checks while sending and reloading data during worker recovery

* fixing the workflow issues
rasamala83 authored Feb 5, 2025
1 parent ddc8bd4 commit 3e7926c
Showing 11 changed files with 367 additions and 163 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/allcov.yml
@@ -21,10 +21,10 @@ jobs:
- name: Baseline for later scripted manual setup of go1.20
uses: actions/setup-go@v4
with:
go-version: 1.19
go-version: 1.20.14
- name: allCovSh
run: tests/unittest/allcover.sh
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
with:
name: coverage web page
path: /home/runner/go/allcover.htm
2 changes: 1 addition & 1 deletion .github/workflows/gv-oracle.yml
@@ -21,6 +21,6 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.15
go-version: 1.20.14
- name: System Test
run: tests/unittest3/testall.sh
13 changes: 12 additions & 1 deletion .github/workflows/jdbc-ci-maven.yml
@@ -16,11 +16,22 @@ jobs:

steps:
- uses: actions/checkout@v3

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1

- name: Install Docker Compose
run: |
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
- name: Verify Docker Compose installation
run: docker-compose --version

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.19
go-version: 1.20.14

- name: whereami
run: pwd
47 changes: 25 additions & 22 deletions lib/adaptivequemgr.go
@@ -21,8 +21,8 @@ import (
"errors"
"fmt"
"math/rand"
"sync/atomic"
"strings"
"sync/atomic"
"time"

"github.com/paypal/hera/cal"
@@ -118,28 +118,31 @@ type BindCount struct {
Workers map[string]*WorkerClient // lookup by ticket
}

func bindEvictNameOk(bindName string) (bool) {
func bindEvictNameOk(bindName string) bool {
commaNames := GetConfig().BindEvictionNames
if len(commaNames) == 0 {
// for tests, allow all names to be subject to bind eviction
return true
}
commaNames = strings.ToLower(commaNames)
bindName = strings.ToLower(bindName)
for _, okSubname := range strings.Split(commaNames,",") {
for _, okSubname := range strings.Split(commaNames, ",") {
if strings.Contains(bindName, okSubname) {
return true
}
}
return false
}

/* A bad query with multiple binds will add independent bind throttles to all
bind name and values */
func (mgr *adaptiveQueueManager) doBindEviction() (int) {
/*
A bad query with multiple binds will add independent bind throttles to all
bind name and values
*/
func (mgr *adaptiveQueueManager) doBindEviction() int {
throttleCount := 0
GetBindEvict().lock.Lock()
for _,keyValues := range GetBindEvict().BindThrottle {
for _, keyValues := range GetBindEvict().BindThrottle {
throttleCount += len(keyValues)
}
GetBindEvict().lock.Unlock()
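
The `bindEvictNameOk` change in the hunk above is formatting only, but the rule it implements is easy to miss: `BindEvictionNames` is a comma-separated list of substrings, matched case-insensitively against the bind name, and an empty config means every bind name is eligible for eviction. A minimal standalone sketch of that matching, with a hypothetical config value (the real function reads the value from `GetConfig()`):

```go
package main

import (
	"fmt"
	"strings"
)

// bindEvictNameOk mirrors the helper in lib/adaptivequemgr.go; the config
// lookup is replaced by an explicit parameter so the sketch is self-contained.
func bindEvictNameOk(bindName, commaNames string) bool {
	if len(commaNames) == 0 {
		return true // no filter configured: every bind name is eligible
	}
	commaNames = strings.ToLower(commaNames)
	bindName = strings.ToLower(bindName)
	for _, okSubname := range strings.Split(commaNames, ",") {
		if strings.Contains(bindName, okSubname) {
			return true
		}
	}
	return false
}

func main() {
	// Hypothetical BindEvictionNames value, not taken from a real Hera config.
	cfg := "customer_id,amount"
	fmt.Println(bindEvictNameOk("CUSTOMER_ID_1", cfg)) // true: contains "customer_id"
	fmt.Println(bindEvictNameOk("order_date", cfg))    // false: matches neither substring
}
```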
@@ -172,14 +175,14 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
}
continue
}
contextBinds := parseBinds(request)
sqlsrcPrefix := worker.clientHostPrefix.Load().(string)
sqlsrcApp := worker.clientApp.Load().(string)
contextBinds := parseBinds(request)
sqlsrcPrefix := worker.clientHostPrefix.Load().(string)
sqlsrcApp := worker.clientApp.Load().(string)
if sqlsrcPrefix != "" {
contextBinds[SrcPrefixAppKey] = fmt.Sprintf("%s%s", sqlsrcPrefix, sqlsrcApp)
if logger.GetLogger().V(logger.Debug) {
msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey])
logger.GetLogger().Log(logger.Debug, msg)
msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey])
logger.GetLogger().Log(logger.Debug, msg)
}
}
for bindName0, bindValue := range contextBinds {
@@ -200,8 +203,8 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
}
concatKey := fmt.Sprintf("%d|%s|%s", sqlhash, bindName, bindValue)
if logger.GetLogger().V(logger.Debug) {
msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey)
logger.GetLogger().Log(logger.Debug, msg)
msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey)
logger.GetLogger().Log(logger.Debug, msg)
}
entry, ok := bindCounts[concatKey]
if !ok {
@@ -210,7 +213,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
Name: bindName,
Value: bindValue,
Workers: make(map[string]*WorkerClient),
}
}
bindCounts[concatKey] = entry
}

@@ -227,7 +230,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
bindName := entry.Name
bindValue := entry.Value

if len(entry.Workers) < int( float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers) ) {
if len(entry.Workers) < int(float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers)) {
continue
}
// evict sqlhash, bindvalue
@@ -241,7 +244,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {

if mgr.dispatchedWorkers[worker] != ticket ||
worker.Status == wsFnsh ||
worker.isUnderRecovery == 1 /* Recover() uses compare & swap */ {
atomic.LoadInt32(&worker.isUnderRecovery) == 1 /* Recover() uses compare & swap */ {

continue
}
@@ -274,10 +277,10 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
throttle.incrAllowEveryX()
} else {
throttle := BindThrottle{
Name: bindName,
Value: bindValue,
Sqlhash: sqlhash,
AllowEveryX: 3*len(entry.Workers) + 1,
Name: bindName,
Value: bindValue,
Sqlhash: sqlhash,
AllowEveryX: 3*len(entry.Workers) + 1,
}
now := time.Now()
throttle.RecentAttempt.Store(&now)
@@ -464,7 +467,7 @@ func (mgr *adaptiveQueueManager) getWorkerToRecover() (*WorkerClient, bool) {
}
}
} else {
if worker != nil && worker.Status == wsFnsh {
if worker != nil && worker.Status == wsFnsh {
if logger.GetLogger().V(logger.Warning) {
logger.GetLogger().Log(logger.Warning, "worker.pid state is in FNSH, so skipping", worker.pid)
}
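
Among the hunks above, the key behavioral change is in `doBindEviction`: the skip condition now reads `worker.isUnderRecovery` with `atomic.LoadInt32`, pairing correctly with the compare-and-swap the diff comment attributes to `Recover()`, where the previous plain field read could race with it. A minimal sketch of that load/CAS pairing, with illustrative names rather than the exact Hera types:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// worker is a stand-in for Hera's WorkerClient; only the recovery flag matters here.
type worker struct {
	isUnderRecovery int32
}

// startRecovery marks the worker as under recovery exactly once, mirroring the
// compare-and-swap the diff comment attributes to Recover().
func (w *worker) startRecovery() bool {
	return atomic.CompareAndSwapInt32(&w.isUnderRecovery, 0, 1)
}

// skipForEviction is the reader side: an atomic load rather than a plain field
// read, so the check cannot race with the CAS above.
func (w *worker) skipForEviction() bool {
	return atomic.LoadInt32(&w.isUnderRecovery) == 1
}

func main() {
	w := &worker{}
	fmt.Println(w.skipForEviction()) // false: not under recovery yet
	w.startRecovery()
	fmt.Println(w.skipForEviction()) // true: the eviction loop would skip this worker
}
```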
18 changes: 14 additions & 4 deletions lib/util.go
@@ -79,8 +79,8 @@ func IsPidRunning(pid int) (isRunning bool) {
}

/*
1st return value: the number
2nd return value: the number of digits
1st return value: the number
2nd return value: the number of digits
*/
func atoi(bf []byte) (int, int) {
sz := len(bf)
@@ -96,8 +96,8 @@ }
}

/*
1st return value: the number
2nd return value: the number of digits
1st return value: the number
2nd return value: the number of digits
*/
func atoui(str string) (uint64, int) {
sz := len(str)
@@ -164,3 +164,13 @@ func ExtractSQLHash(request *netstring.Netstring) (uint32, bool) {
}
return 0, false
}

// Contains reports whether the given value is present in the slice.
func Contains[T comparable](slice []T, value T) bool {
for _, val := range slice {
if val == value {
return true
}
}
return false
}
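
A quick usage sketch for the new generic helper; the call sites below are hypothetical (the commit only adds the function), but they show why it is declared over any `comparable` type:

```go
package main

import "fmt"

// Contains reports whether value is present in slice; same shape as the helper
// added to lib/util.go in this commit.
func Contains[T comparable](slice []T, value T) bool {
	for _, val := range slice {
		if val == value {
			return true
		}
	}
	return false
}

func main() {
	// Hypothetical call sites: the same function works for pids and for names.
	pids := []int32{101, 204, 319}
	fmt.Println(Contains(pids, int32(204))) // true

	names := []string{"worker", "proxy"}
	fmt.Println(Contains(names, "watchdog")) // false
}
```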
116 changes: 63 additions & 53 deletions lib/workerbroker.go
@@ -19,13 +19,11 @@ package lib

import (
"errors"
"github.com/paypal/hera/utility/logger"
"os"
"os/signal"
"sync"
"syscall"

"github.com/paypal/hera/utility"
"github.com/paypal/hera/utility/logger"
)

// HeraWorkerType defines the possible worker type
@@ -64,7 +62,7 @@ type WorkerBroker struct {
// and restart the stopped workers.
//
pidworkermap map[int32]*WorkerClient
lock sync.Mutex
lock sync.Mutex

//
// loaded from cfg once and used later.
Expand Down Expand Up @@ -204,7 +202,9 @@ func (broker *WorkerBroker) GetWorkerPoolCfgs() (pCfgs []map[HeraWorkerType]*Wor

// GetWorkerPool gets the worker pool object for the type and id
// ids holds optional parameters.
// ids[0] == instance id; ids[1] == shard id.
//
// ids[0] == instance id; ids[1] == shard id.
//
// if a particular id is not set, it defaults to 0.
// TODO: interchange sid <--> instId since instId is not yet used
func (broker *WorkerBroker) GetWorkerPool(wType HeraWorkerType, ids ...int) (workerbroker *WorkerPool, err error) {
@@ -273,59 +273,69 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
// we can get all the pids in this call. double the size in case we
// get non-hera defunct processes. +1 in case racing causes mapsize=0.
//
var arraySize = 2*len(broker.pidworkermap) + 1
var defunctPids = make([]int32, arraySize)
if logger.GetLogger().V(logger.Verbose) {
logger.GetLogger().Log(logger.Verbose, "Wait SIGCHLD len=", arraySize-1, ", pwmap:", broker.pidworkermap)
}
if arraySize > 0 {
utility.ReapDefunctPids(defunctPids)
}
if logger.GetLogger().V(logger.Info) {
logger.GetLogger().Log(logger.Info, "exited worker", defunctPids)
}
broker.lock.Lock()
for i := 0; i < arraySize; i++ {
//
// last valid entry in stoppedpids is followed by one or more zeros.
//
if defunctPids[i] == 0 {
defunctPids := make([]int32, 0)
for {
var status syscall.WaitStatus

//Reap exited children in non-blocking mode
pid, err := syscall.Wait4(-1, &status, syscall.WNOHANG, nil)
if pid > 0 {
if logger.GetLogger().V(logger.Verbose) {
logger.GetLogger().Log(logger.Verbose, "received worker exit signal for pid:", pid, " status: ", status)
}
defunctPids = append(defunctPids, int32(pid))
} else if pid == 0 {
break
} else {
if errors.Is(err, syscall.ECHILD) {
break
} else {
logger.GetLogger().Log(logger.Warning, "error in wait signal: ", err)
}
}
var workerclient = broker.pidworkermap[defunctPids[i]]
if workerclient != nil {
delete(broker.pidworkermap, defunctPids[i])
pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID)
if err != nil {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err)
}

if len(defunctPids) > 0 {
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "worker exit signal received from pids :", defunctPids)
}
broker.lock.Lock()
for _, pid := range defunctPids {
var workerclient = broker.pidworkermap[pid]
if workerclient != nil {
delete(broker.pidworkermap, pid)
pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID)
if err != nil {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err)
}
} else {
//
// a worker could be terminated while serving a request.
// in these cases, doRead() in workerclient will get an
// EOF and exit. doSession() in coordinator will get the
// worker outCh closed event and exit, at which point
// coordinator itself calls returnworker to set connstate
// from assign to idle.
// no need to publish the following event again.
//
//if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) {
// GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle})
//}
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "worker (id=", workerclient.ID, "pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.")
}
workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
pool.RestartWorker(workerclient)
}
} else {
//
// a worker could be terminated while serving a request.
// in these cases, doRead() in workerclient will get an
// EOF and exit. doSession() in coordinator will get the
// worker outCh closed event and exit, at which point
// coordinator itself calls returnworker to set connstate
// from assign to idle.
// no need to publish the following event again.
//
//if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) {
// GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle})
//}
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "worker (pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.")
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Exited worker pid =", pid, " not found")
}
workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
pool.RestartWorker(workerclient)
}
} else {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Exited worker pid =", defunctPids[i], " not found")
}
}
broker.lock.Unlock()
}
broker.lock.Unlock()
case syscall.SIGTERM:
if logger.GetLogger().V(logger.Debug) {
logger.GetLogger().Log(logger.Debug, "Got SIGTERM")
@@ -365,8 +375,8 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
}

/*
resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of
the number of workers changed
resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of
the number of workers changed
*/
func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, shardID int) {
broker.poolCfgs[0][wType].maxWorkerCnt = maxWorkers
@@ -381,7 +391,7 @@ func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, sha
}

/*
changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools
changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools
*/
func (broker *WorkerBroker) changeMaxWorkers() {
wW := GetNumWWorkers(0)
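
The rewritten monitor above no longer pre-sizes a pid array for `utility.ReapDefunctPids`; it drains every exited child directly with non-blocking `syscall.Wait4` calls until the call reports no more reapable children (`pid == 0`) or no children at all (`ECHILD`). A stripped-down, Unix-only sketch of that loop, with the logging and pool bookkeeping omitted:

```go
package main

import (
	"errors"
	"fmt"
	"os/exec"
	"syscall"
	"time"
)

// reapDefunct drains every already-exited child without blocking, following
// the WNOHANG loop this commit introduces in startWorkerMonitor.
func reapDefunct() []int32 {
	var pids []int32
	for {
		var status syscall.WaitStatus
		pid, err := syscall.Wait4(-1, &status, syscall.WNOHANG, nil)
		switch {
		case pid > 0:
			pids = append(pids, int32(pid)) // one child reaped; loop, there may be more
		case pid == 0:
			return pids // children exist, but none have exited yet
		default:
			if !errors.Is(err, syscall.ECHILD) {
				fmt.Println("wait4 error:", err) // stand-in for the logger call
			}
			return pids // ECHILD: no children left to wait for
		}
	}
}

func main() {
	// Spawn a short-lived child so there is something to reap (demo only).
	cmd := exec.Command("true")
	_ = cmd.Start()
	time.Sleep(100 * time.Millisecond) // give it time to exit before reaping
	fmt.Println("reaped pids:", reapDefunct())
}
```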