From 3e7926cc1b93be08e9740630d689fcdd87acb32b Mon Sep 17 00:00:00 2001
From: Rajesh S <105205300+rasamala83@users.noreply.github.com>
Date: Wed, 5 Feb 2025 11:44:25 +0530
Subject: [PATCH] Non-graceful worker recovery issues and workflow issue (#411)

* changes for adding atomic checks while sending and reloading data during worker recovery

* fixing the workflow issues
---
 .github/workflows/allcov.yml         |   4 +-
 .github/workflows/gv-oracle.yml      |   2 +-
 .github/workflows/jdbc-ci-maven.yml  |  13 +-
 lib/adaptivequemgr.go                |  47 +++---
 lib/util.go                          |  18 ++-
 lib/workerbroker.go                  | 116 ++++++++-------
 lib/workerclient.go                  |  85 ++++++++---
 lib/workerpool.go                    |   1 -
 lib/workerpool_test.go               |   7 +
 .../dmldisconnect/main_test.go       |  97 ++++++------
 tests/unittest/sqlEvict/main_test.go | 140 +++++++++++++++++-
 11 files changed, 367 insertions(+), 163 deletions(-)

diff --git a/.github/workflows/allcov.yml b/.github/workflows/allcov.yml
index 5c5708f6..8fbeca0b 100644
--- a/.github/workflows/allcov.yml
+++ b/.github/workflows/allcov.yml
@@ -21,10 +21,10 @@ jobs:
     - name: Baseline for later scripted manual setup of go1.20
       uses: actions/setup-go@v4
       with:
-        go-version: 1.19
+        go-version: 1.20.14
     - name: allCovSh
       run: tests/unittest/allcover.sh
-    - uses: actions/upload-artifact@v3
+    - uses: actions/upload-artifact@v4
       with:
         name: coverage web page
         path: /home/runner/go/allcover.htm
diff --git a/.github/workflows/gv-oracle.yml b/.github/workflows/gv-oracle.yml
index c387cf2b..f9757eff 100644
--- a/.github/workflows/gv-oracle.yml
+++ b/.github/workflows/gv-oracle.yml
@@ -21,6 +21,6 @@ jobs:
     - name: Set up Go
       uses: actions/setup-go@v3
       with:
-        go-version: 1.15
+        go-version: 1.20.14
     - name: System Test
       run: tests/unittest3/testall.sh
diff --git a/.github/workflows/jdbc-ci-maven.yml b/.github/workflows/jdbc-ci-maven.yml
index 9ac8aad9..cf5d07ec 100644
--- a/.github/workflows/jdbc-ci-maven.yml
+++ b/.github/workflows/jdbc-ci-maven.yml
@@ -16,11 +16,22 @@ jobs:
     steps:
     - uses: actions/checkout@v3
+
+    - name: Set up Docker Buildx
+      uses: docker/setup-buildx-action@v1
+
+    - name: Install Docker Compose
+      run: |
+        sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
+        sudo chmod +x /usr/local/bin/docker-compose
+
+    - name: Verify Docker Compose installation
+      run: docker-compose --version

     - name: Set up Go
       uses: actions/setup-go@v4
       with:
-        go-version: 1.19
+        go-version: 1.20.14

     - name: whereami
       run: pwd
diff --git a/lib/adaptivequemgr.go b/lib/adaptivequemgr.go
index 8e49e2e0..4fdd2860 100644
--- a/lib/adaptivequemgr.go
+++ b/lib/adaptivequemgr.go
@@ -21,8 +21,8 @@ import (
 	"errors"
 	"fmt"
 	"math/rand"
-	"sync/atomic"
 	"strings"
+	"sync/atomic"
 	"time"

 	"github.com/paypal/hera/cal"
@@ -118,7 +118,7 @@ type BindCount struct {
 	Workers map[string]*WorkerClient // lookup by ticket
 }

-func bindEvictNameOk(bindName string) (bool) {
+func bindEvictNameOk(bindName string) bool {
 	commaNames := GetConfig().BindEvictionNames
 	if len(commaNames) == 0 { // for tests, allow all names to be subject to bind eviction
@@ -126,7 +126,7 @@ func bindEvictNameOk(bindName string) (bool) {
 	}
 	commaNames = strings.ToLower(commaNames)
 	bindName = strings.ToLower(bindName)
-	for _, okSubname := range strings.Split(commaNames,",") {
+	for _, okSubname := range strings.Split(commaNames, ",") {
 		if strings.Contains(bindName, okSubname) {
 			return true
 		}
@@ -134,12 +134,15 @@ func bindEvictNameOk(bindName string) (bool) {
 	return false
 }

-/* A bad query with multiple binds will add
independent bind throttles to all -bind name and values */ -func (mgr *adaptiveQueueManager) doBindEviction() (int) { +/* + A bad query with multiple binds will add independent bind throttles to all + +bind name and values +*/ +func (mgr *adaptiveQueueManager) doBindEviction() int { throttleCount := 0 GetBindEvict().lock.Lock() - for _,keyValues := range GetBindEvict().BindThrottle { + for _, keyValues := range GetBindEvict().BindThrottle { throttleCount += len(keyValues) } GetBindEvict().lock.Unlock() @@ -172,14 +175,14 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) { } continue } - contextBinds := parseBinds(request) - sqlsrcPrefix := worker.clientHostPrefix.Load().(string) - sqlsrcApp := worker.clientApp.Load().(string) + contextBinds := parseBinds(request) + sqlsrcPrefix := worker.clientHostPrefix.Load().(string) + sqlsrcApp := worker.clientApp.Load().(string) if sqlsrcPrefix != "" { contextBinds[SrcPrefixAppKey] = fmt.Sprintf("%s%s", sqlsrcPrefix, sqlsrcApp) if logger.GetLogger().V(logger.Debug) { - msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey]) - logger.GetLogger().Log(logger.Debug, msg) + msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey]) + logger.GetLogger().Log(logger.Debug, msg) } } for bindName0, bindValue := range contextBinds { @@ -200,8 +203,8 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) { } concatKey := fmt.Sprintf("%d|%s|%s", sqlhash, bindName, bindValue) if logger.GetLogger().V(logger.Debug) { - msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey) - logger.GetLogger().Log(logger.Debug, msg) + msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey) + logger.GetLogger().Log(logger.Debug, msg) } entry, ok := bindCounts[concatKey] if !ok { @@ -210,7 +213,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) { Name: bindName, Value: bindValue, Workers: make(map[string]*WorkerClient), - } + } bindCounts[concatKey] = entry } @@ -227,7 +230,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) { bindName := entry.Name bindValue := entry.Value - if len(entry.Workers) < int( float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers) ) { + if len(entry.Workers) < int(float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers)) { continue } // evict sqlhash, bindvalue @@ -241,7 +244,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) { if mgr.dispatchedWorkers[worker] != ticket || worker.Status == wsFnsh || - worker.isUnderRecovery == 1 /* Recover() uses compare & swap */ { + atomic.LoadInt32(&worker.isUnderRecovery) == 1 /* Recover() uses compare & swap */ { continue } @@ -274,10 +277,10 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) { throttle.incrAllowEveryX() } else { throttle := BindThrottle{ - Name: bindName, - Value: bindValue, - Sqlhash: sqlhash, - AllowEveryX: 3*len(entry.Workers) + 1, + Name: bindName, + Value: bindValue, + Sqlhash: sqlhash, + AllowEveryX: 3*len(entry.Workers) + 1, } now := time.Now() throttle.RecentAttempt.Store(&now) @@ -464,7 +467,7 @@ func (mgr *adaptiveQueueManager) getWorkerToRecover() (*WorkerClient, bool) { } } } else { - if worker != nil && worker.Status == wsFnsh { + if worker != nil && worker.Status == wsFnsh { if logger.GetLogger().V(logger.Warning) { logger.GetLogger().Log(logger.Warning, "worker.pid state is in FNSH, so skipping", worker.pid) } diff --git a/lib/util.go b/lib/util.go index 
1852810d..1ba6930a 100644 --- a/lib/util.go +++ b/lib/util.go @@ -79,8 +79,8 @@ func IsPidRunning(pid int) (isRunning bool) { } /* - 1st return value: the number - 2nd return value: the number of digits +1st return value: the number +2nd return value: the number of digits */ func atoi(bf []byte) (int, int) { sz := len(bf) @@ -96,8 +96,8 @@ func atoi(bf []byte) (int, int) { } /* - 1st return value: the number - 2nd return value: the number of digits +1st return value: the number +2nd return value: the number of digits */ func atoui(str string) (uint64, int) { sz := len(str) @@ -164,3 +164,13 @@ func ExtractSQLHash(request *netstring.Netstring) (uint32, bool) { } return 0, false } + +// Contains This is utility method to check whether value present in list or not +func Contains[T comparable](slice []T, value T) bool { + for _, val := range slice { + if val == value { + return true + } + } + return false +} diff --git a/lib/workerbroker.go b/lib/workerbroker.go index 0f9df171..05265754 100644 --- a/lib/workerbroker.go +++ b/lib/workerbroker.go @@ -19,13 +19,11 @@ package lib import ( "errors" + "github.com/paypal/hera/utility/logger" "os" "os/signal" "sync" "syscall" - - "github.com/paypal/hera/utility" - "github.com/paypal/hera/utility/logger" ) // HeraWorkerType defines the possible worker type @@ -64,7 +62,7 @@ type WorkerBroker struct { // and restart the stopped workers. // pidworkermap map[int32]*WorkerClient - lock sync.Mutex + lock sync.Mutex // // loaded from cfg once and used later. @@ -204,7 +202,9 @@ func (broker *WorkerBroker) GetWorkerPoolCfgs() (pCfgs []map[HeraWorkerType]*Wor // GetWorkerPool get the worker pool object for the type and id // ids holds optional paramenters. -// ids[0] == instance id; ids[1] == shard id. +// +// ids[0] == instance id; ids[1] == shard id. +// // if a particular id is not set, it defaults to 0. // TODO: interchange sid <--> instId since instId is not yet used func (broker *WorkerBroker) GetWorkerPool(wType HeraWorkerType, ids ...int) (workerbroker *WorkerPool, err error) { @@ -273,59 +273,69 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) { // we can get all the pids in this call. double the size in case we // get none-hera defunct processes. +1 in case racing casue mapsize=0. // - var arraySize = 2*len(broker.pidworkermap) + 1 - var defunctPids = make([]int32, arraySize) - if logger.GetLogger().V(logger.Verbose) { - logger.GetLogger().Log(logger.Verbose, "Wait SIGCHLD len=", arraySize-1, ", pwmap:", broker.pidworkermap) - } - if arraySize > 0 { - utility.ReapDefunctPids(defunctPids) - } - if logger.GetLogger().V(logger.Info) { - logger.GetLogger().Log(logger.Info, "exited worker", defunctPids) - } - broker.lock.Lock() - for i := 0; i < arraySize; i++ { - // - // last valid entry in stoppedpids is followed by one or more zeros. 
- // - if defunctPids[i] == 0 { + defunctPids := make([]int32, 0) + for { + var status syscall.WaitStatus + + //Reap exited children in non-blocking mode + pid, err := syscall.Wait4(-1, &status, syscall.WNOHANG, nil) + if pid > 0 { + if logger.GetLogger().V(logger.Verbose) { + logger.GetLogger().Log(logger.Verbose, "received worker exit signal for pid:", pid, " status: ", status) + } + defunctPids = append(defunctPids, int32(pid)) + } else if pid == 0 { break + } else { + if errors.Is(err, syscall.ECHILD) { + break + } else { + logger.GetLogger().Log(logger.Warning, "error in wait signal: ", err) + } } - var workerclient = broker.pidworkermap[defunctPids[i]] - if workerclient != nil { - delete(broker.pidworkermap, defunctPids[i]) - pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID) - if err != nil { - if logger.GetLogger().V(logger.Alert) { - logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err) + } + + if len(defunctPids) > 0 { + if logger.GetLogger().V(logger.Debug) { + logger.GetLogger().Log(logger.Debug, "worker exit signal received from pids :", defunctPids) + } + broker.lock.Lock() + for _, pid := range defunctPids { + var workerclient = broker.pidworkermap[pid] + if workerclient != nil { + delete(broker.pidworkermap, pid) + pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID) + if err != nil { + if logger.GetLogger().V(logger.Alert) { + logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err) + } + } else { + // + // a worker could be terminated while serving a request. + // in these cases, doRead() in workerclient will get an + // EOF and exit. doSession() in coordinator will get the + // worker outCh closed event and exit, at which point + // coordinator itself calls returnworker to set connstate + // from assign to idle. + // no need to publish the following event again. + // + //if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) { + // GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle}) + //} + if logger.GetLogger().V(logger.Debug) { + logger.GetLogger().Log(logger.Debug, "worker (id=", workerclient.ID, "pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.") + } + workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long + pool.RestartWorker(workerclient) } } else { - // - // a worker could be terminated while serving a request. - // in these cases, doRead() in workerclient will get an - // EOF and exit. doSession() in coordinator will get the - // worker outCh closed event and exit, at which point - // coordinator itself calls returnworker to set connstate - // from assign to idle. - // no need to publish the following event again. - // - //if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) { - // GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle}) - //} - if logger.GetLogger().V(logger.Debug) { - logger.GetLogger().Log(logger.Debug, "worker (pid=", workerclient.pid, ") received signal. 
transits from state ", workerclient.Status, " to terminated.") + if logger.GetLogger().V(logger.Alert) { + logger.GetLogger().Log(logger.Alert, "Exited worker pid =", pid, " not found") } - workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long - pool.RestartWorker(workerclient) - } - } else { - if logger.GetLogger().V(logger.Alert) { - logger.GetLogger().Log(logger.Alert, "Exited worker pid =", defunctPids[i], " not found") } } + broker.lock.Unlock() } - broker.lock.Unlock() case syscall.SIGTERM: if logger.GetLogger().V(logger.Debug) { logger.GetLogger().Log(logger.Debug, "Got SIGTERM") @@ -365,8 +375,8 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) { } /* - resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of - the number of workers changed +resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of +the number of workers changed */ func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, shardID int) { broker.poolCfgs[0][wType].maxWorkerCnt = maxWorkers @@ -381,7 +391,7 @@ func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, sha } /* - changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools +changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools */ func (broker *WorkerBroker) changeMaxWorkers() { wW := GetNumWWorkers(0) diff --git a/lib/workerclient.go b/lib/workerclient.go index 1a86d619..5edbb77f 100644 --- a/lib/workerclient.go +++ b/lib/workerclient.go @@ -21,20 +21,21 @@ import ( "bytes" "errors" "fmt" + "github.com/paypal/hera/cal" + "github.com/paypal/hera/common" + "github.com/paypal/hera/utility/encoding/netstring" + "github.com/paypal/hera/utility/logger" "math/rand" "net" "os" "path/filepath" + "runtime" "strconv" "strings" + "sync" "sync/atomic" "syscall" "time" - - "github.com/paypal/hera/cal" - "github.com/paypal/hera/common" - "github.com/paypal/hera/utility/encoding/netstring" - "github.com/paypal/hera/utility/logger" ) // HeraWorkerStatus defines the posible states the worker can be in @@ -147,12 +148,15 @@ type WorkerClient struct { rqId uint32 // - // under recovery. 0: no; 1: yes. use atomic.CompareAndSwapInt32 to check state. + // under recovery. 0: no; 1: yes. use atomic.CompareAndSwapInt32 to check state and use atomic.LoadInt32 to read state // isUnderRecovery int32 // Throtle workers lifecycle thr Throttler + + //mutex lock to update state from single go-routine + stateLock sync.Mutex } type strandedCalInfo struct { @@ -200,7 +204,7 @@ func NewWorker(wid int, wType HeraWorkerType, instID int, shardID int, moduleNam } // TODO worker.racID = -1 - worker.isUnderRecovery = 0 + atomic.CompareAndSwapInt32(&worker.isUnderRecovery, 1, 0) if worker.ctrlCh != nil { close(worker.ctrlCh) } @@ -210,6 +214,7 @@ func NewWorker(wid int, wType HeraWorkerType, instID int, shardID int, moduleNam // msg. 
if adaptiveqmgr blocks on a non-buffered channel, there is a deadlock when return worker
 	// worker.ctrlCh = make(chan *workerMsg, 5)
+
 	return worker
 }
@@ -633,15 +638,12 @@ type WorkerClientRecoverParam struct {
 func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam WorkerClientRecoverParam, info *strandedCalInfo, param ...int) {
 	if atomic.CompareAndSwapInt32(&worker.isUnderRecovery, 0, 1) {
 		if logger.GetLogger().V(logger.Debug) {
-			logger.GetLogger().Log(logger.Debug, "begin recover worker: ", worker.pid)
+			logger.GetLogger().Log(logger.Debug, "begin recover worker Id: ", worker.ID, " process Id: ", worker.pid)
 		}
 	} else {
 		if logger.GetLogger().V(logger.Debug) {
-			logger.GetLogger().Log(logger.Debug, "worker already underrecovery: ", worker.pid)
+			logger.GetLogger().Log(logger.Debug, "worker already under recovery: ", worker.ID, " process Id: ", worker.pid)
 		}
-		//
-		// defer will not be called.
-		//
 		return
 	}
 	defer func() {
@@ -665,6 +667,9 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor
 		return
 	}
 	priorWorkerStatus := worker.Status
+	if logger.GetLogger().V(logger.Debug) {
+		logger.GetLogger().Log(logger.Debug, fmt.Sprintf("about to recover worker Id: %d, worker process Id: %d as part of the recovery process, setting worker state to wsQuce (quiesce)", worker.ID, worker.pid))
+	}
 	worker.setState(wsQuce)
 	killparam := common.StrandedClientClose
 	if len(param) > 0 {
@@ -677,8 +682,12 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor
 	case <-workerRecoverTimeout:
 		worker.thr.CanRun()
 		worker.setState(wsInit) // Set the worker state to INIT when we decide to Terminate the worker
+		GetStateLog().PublishStateEvent(StateEvent{eType: WorkerStateEvt, shardID: worker.shardID, wType: worker.Type, instID: worker.instID, workerID: worker.ID, newWState: worker.Status})
 		worker.Terminate()
 		worker.callogStranded("RECYCLED", info)
+		if logger.GetLogger().V(logger.Debug) {
+			logger.GetLogger().Log(logger.Debug, fmt.Sprintf("worker Id: %d and process: %d recovered as part of workerRecoverTimeout, set status to INIT", worker.ID, worker.pid))
+		}
 		return
 	case msg, ok := <-worker.channel():
 		if !ok {
@@ -714,8 +723,10 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor
 			logger.GetLogger().Log(logger.Info, "stranded conn recovered", worker.Type, worker.pid)
 		}
 		worker.callogStranded("RECOVERED", info)
-
 		worker.setState(wsFnsh)
+		if logger.GetLogger().V(logger.Debug) {
+			logger.GetLogger().Log(logger.Debug, fmt.Sprintf("worker Id: %d, worker process: %d recovered as part of message from channel, set status to FNSH", worker.ID, worker.pid))
+		}
 		p.ReturnWorker(worker, ticket)
 		//
 		// donot set state to ACPT since worker could already be picked up by another
@@ -904,7 +915,7 @@ func (worker *WorkerClient) doRead() {
 				worker.setState(wsWait)
 			}
 			if eor != common.EORMoreIncomingRequests {
-				worker.outCh <- &workerMsg{data: payload, eor: true, free: (eor == common.EORFree), inTransaction: ((eor == common.EORInTransaction) || (eor == common.EORInCursorInTransaction)), rqId: rqId}
+				worker.outCh <- &workerMsg{data: payload, eor: true, free: (eor == common.EORFree), inTransaction: ((eor == common.EORInTransaction) || (eor == common.EORInCursorInTransaction)), rqId: uint32(rqId)}
 				payload = nil
 			} else {
 				// buffer data to avoid race condition
@@ -941,8 +952,13 @@ func (worker *WorkerClient) doRead() {

 // Write sends a message to the worker
 func (worker *WorkerClient) Write(ns *netstring.Netstring, nsCount uint16) error {
+	if
atomic.LoadInt32(&worker.isUnderRecovery) == 1 {
+		if logger.GetLogger().V(logger.Alert) {
+			logger.GetLogger().Log(logger.Alert, "workerclient write error: worker is under recovery.")
+		}
+		return ErrWorkerFail
+	}
 	worker.setState(wsBusy)
-
 	worker.rqId += uint32(nsCount)

 	//
@@ -966,16 +982,19 @@ func (worker *WorkerClient) Write(ns *netstring.Netstring, nsCount uint16) error

 // setState updates the worker state
 func (worker *WorkerClient) setState(status HeraWorkerStatus) {
-	if worker.Status == status {
+	currentStatus := worker.Status
+	if currentStatus == status {
 		return
 	}
-	if logger.GetLogger().V(logger.Debug) {
-		logger.GetLogger().Log(logger.Debug, "worker pid=", worker.pid, " changing status from", worker.Status, "to", status)
+	if atomic.LoadInt32(&worker.isUnderRecovery) == 1 && (status == wsWait || status == wsBusy) {
+		logger.GetLogger().Log(logger.Warning, "worker : ", worker.ID, "processId: ", worker.pid, " seeing invalid state transition from ", currentStatus, " to ", status)
+		if logger.GetLogger().V(logger.Debug) {
+			worker.printCallStack()
+		}
+		return
 	}
-
-	// TODO: sync atomic set
+	// the transition is valid: update the status and publish the worker state event
 	worker.Status = status
-
 	GetStateLog().PublishStateEvent(StateEvent{eType: WorkerStateEvt, shardID: worker.shardID, wType: worker.Type, instID: worker.instID, workerID: worker.ID, newWState: status})
 }
@@ -1002,3 +1021,27 @@ func (worker *WorkerClient) isProcessRunning() bool {
 	}
 	return true
 }
+
+func (worker *WorkerClient) printCallStack() {
+	// Define a large enough buffer to capture the stack.
+	const depth = 64
+	pcs := make([]uintptr, depth)
+
+	// Collect the stack trace.
+	n := runtime.Callers(2, pcs) // Skip the first 2 callers (runtime and printCallStack itself).
+	frames := runtime.CallersFrames(pcs[:n])
+	indent := 0
+	// Iterate through the frames and print function names and line numbers.
+ var builder strings.Builder + builder.WriteString(fmt.Sprintf("worker Id= %d Process Id= %d Call Stack:", worker.ID, worker.pid)) + for { + frame, more := frames.Next() + builder.WriteString(fmt.Sprintf("%s - %s\n", strings.Repeat(" ", indent), frame.Function)) + builder.WriteString(fmt.Sprintf("%s at %s:%d\n", strings.Repeat(" ", indent), frame.File, frame.Line)) + indent++ + if !more { + break + } + } + logger.GetLogger().Log(logger.Debug, builder.String()) +} diff --git a/lib/workerpool.go b/lib/workerpool.go index 50aab16f..37d96855 100644 --- a/lib/workerpool.go +++ b/lib/workerpool.go @@ -193,7 +193,6 @@ func (pool *WorkerPool) RestartWorker(worker *WorkerClient) (err error) { } pool.activeQ.Remove(worker) pool.poolCond.L.Unlock() - go pool.spawnWorker(worker.ID) return nil } diff --git a/lib/workerpool_test.go b/lib/workerpool_test.go index 4e3db96d..14b8cf03 100644 --- a/lib/workerpool_test.go +++ b/lib/workerpool_test.go @@ -76,6 +76,13 @@ func TestPoolDempotency(t *testing.T) { wd := NewWorker(3, wtypeRW, 0, 0, "cloc", nil) we := NewWorker(4, wtypeRW, 0, 0, "cloc", nil) wf := NewWorker(5, wtypeRW, 0, 0, "cloc", nil) + wa.setState(wsInit) + wb.setState(wsInit) + wc.setState(wsInit) + wd.setState(wsInit) + we.setState(wsInit) + wf.setState(wsInit) + wa.setState(wsAcpt) wb.setState(wsAcpt) wc.setState(wsAcpt) diff --git a/tests/functionaltest/strandedchild_tests/dmldisconnect/main_test.go b/tests/functionaltest/strandedchild_tests/dmldisconnect/main_test.go index e261b35a..f2430977 100644 --- a/tests/functionaltest/strandedchild_tests/dmldisconnect/main_test.go +++ b/tests/functionaltest/strandedchild_tests/dmldisconnect/main_test.go @@ -1,13 +1,14 @@ -package main +package main + import ( "context" "database/sql" "fmt" + "github.com/paypal/hera/tests/functionaltest/testutil" + "github.com/paypal/hera/utility/logger" "os" "testing" "time" - "github.com/paypal/hera/tests/functionaltest/testutil" - "github.com/paypal/hera/utility/logger" ) /* @@ -17,7 +18,6 @@ No setup needed */ - var mx testutil.Mux var tableName string @@ -29,8 +29,8 @@ func cfg() (map[string]string, map[string]string, testutil.WorkerType) { appcfg["log_level"] = "5" appcfg["log_file"] = "hera.log" appcfg["rac_sql_interval"] = "0" - appcfg["opscfg.default.server.idle_timeout_ms"] = "3000" - appcfg["opscfg.default.server.transaction_idle_timeout_ms"] = "5000" + appcfg["opscfg.default.server.idle_timeout_ms"] = "3000" + appcfg["opscfg.default.server.transaction_idle_timeout_ms"] = "5000" appcfg["child.executable"] = "mysqlworker" appcfg["database_type"] = "mysql" @@ -43,18 +43,16 @@ func cfg() (map[string]string, map[string]string, testutil.WorkerType) { func setupDb() error { testutil.RunDML("DROP TABLE IF EXISTS test_simple_table_2") - return testutil.RunDML("CREATE TABLE test_simple_table_2 (accountID VARCHAR(64) PRIMARY KEY, NAME VARCHAR(64), STATUS VARCHAR(64), CONDN VARCHAR(64))") + return testutil.RunDML("CREATE TABLE test_simple_table_2 (accountID VARCHAR(64) PRIMARY KEY, NAME VARCHAR(64), STATUS VARCHAR(64), CONDN VARCHAR(64))") } - func TestMain(m *testing.M) { os.Exit(testutil.UtilMain(m, cfg, setupDb)) } - /* ########################################################################################## - # Perform an insert without commit - # While the query is in transaction, close connection + # Perform an insert without commit + # While the query is in transaction, close connection # Verify worker get stranded and recovered 
########################################################################################## */ @@ -63,54 +61,53 @@ func TestDmlDisconnect(t *testing.T) { logger.GetLogger().Log(logger.Debug, "TestDmlDisconnect begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") hostname := testutil.GetHostname() - fmt.Println ("Hostname: ", hostname); - db, err := sql.Open("hera", hostname + ":31002") - if err != nil { - t.Fatal("Error starting Mux:", err) - return - } + fmt.Println("Hostname: ", hostname) + db, err := sql.Open("hera", hostname+":31002") + if err != nil { + t.Fatal("Error starting Mux:", err) + return + } db.SetMaxIdleConns(0) defer db.Close() - fmt.Println ("Open new connection"); - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - conn, err := db.Conn(ctx) - if err != nil { - t.Fatalf("Error getting connection %s\n", err.Error()) - } + fmt.Println("Open new connection") + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + conn, err := db.Conn(ctx) + if err != nil { + t.Fatalf("Error getting connection %s\n", err.Error()) + } - fmt.Println ("Perform an insert without commit"); - stmt, _ := conn.PrepareContext(ctx, "/*TestBasic*/ insert into test_simple_table_2 (accountID, Name, Status) VALUES (12345, 'Linda Smith' , '111')") + fmt.Println("Perform an insert without commit") + stmt, _ := conn.PrepareContext(ctx, "/*TestBasic*/ insert into test_simple_table_2 (accountID, Name, Status) VALUES (12345, 'Linda Smith' , '111')") _, err = stmt.Exec() if err != nil { - t.Fatalf("Error preparing test (create row in table) %s\n", err.Error()) - } - stmt.Close() - cancel() - fmt.Println ("Close connection while insert query is in transaction"); - conn.Close() - - time.Sleep(1 * time.Second); - fmt.Println ("Verify worker get stranded and recovered"); - if ( testutil.RegexCount("begin recover worker:") < 1) { - t.Fatalf ("Error: should have worker recovered"); - } - - if ( testutil.RegexCount("stranded conn recovered") < 1) { - t.Fatalf ("Error: should have stranded conn recovered"); - } - - fmt.Println ("Verify worker recovery is seen in CALlog") - count := testutil.RegexCountFile ("RECOVER.*dedicated.*0", "cal.log") - if (count < 1) { - t.Fatalf ("Error: should see worker recovery event"); + t.Fatalf("Error preparing test (create row in table) %s\n", err.Error()) } - count = testutil.RegexCountFile ("STRANDED.*RECOVERED.*0", "cal.log") - if (count < 1) { - t.Fatalf ("Error: should see worker recovery event"); + stmt.Close() + cancel() + fmt.Println("Close connection while insert query is in transaction") + conn.Close() + + time.Sleep(1 * time.Second) + fmt.Println("Verify worker get stranded and recovered") + if testutil.RegexCount("begin recover worker") < 1 { + t.Fatalf("Error: should have worker recovered") + } + + if testutil.RegexCount("stranded conn recovered") < 1 { + t.Fatalf("Error: should have stranded conn recovered") + } + + fmt.Println("Verify worker recovery is seen in CALlog") + count := testutil.RegexCountFile("RECOVER.*dedicated.*0", "cal.log") + if count < 1 { + t.Fatalf("Error: should see worker recovery event") + } + count = testutil.RegexCountFile("STRANDED.*RECOVERED.*0", "cal.log") + if count < 1 { + t.Fatalf("Error: should see worker recovery event") } logger.GetLogger().Log(logger.Debug, "TestDmlDisconnect done -------------------------------------------------------------") } - diff --git a/tests/unittest/sqlEvict/main_test.go b/tests/unittest/sqlEvict/main_test.go index 5cf84296..5a704796 100644 --- 
a/tests/unittest/sqlEvict/main_test.go +++ b/tests/unittest/sqlEvict/main_test.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "math/rand" "os" "testing" "time" @@ -33,11 +34,11 @@ func cfg() (map[string]string, map[string]string, testutil.WorkerType) { appcfg["bind_eviction_threshold_pct"] = "50" appcfg["request_backlog_timeout"] = "1000" - appcfg["soft_eviction_probability"] = "100" + appcfg["soft_eviction_probability"] = "10" opscfg := make(map[string]string) - max_conn = 25 - opscfg["opscfg.default.server.max_connections"] = fmt.Sprintf("%d", int(max_conn)) + max_conn = 50 + opscfg["opscfg.default.server.max_connections"] = fmt.Sprintf("%d", 10) opscfg["opscfg.default.server.log_level"] = "5" opscfg["opscfg.default.server.saturation_recover_threshold"] = "10" @@ -99,6 +100,56 @@ func sleepyQ(conn *sql.Conn, delayRow int) error { return nil } +func sleepyDmlQ(conn *sql.Conn, delayRow int) error { + inserQuery := "insert into sleep_info (id,seconds) values (:id, sleep_option(:seconds))" + updateQuery := "update sleep_info set seconds = sleep_option(:seconds) where id=:id" + defer func(conn *sql.Conn) { + err := conn.Close() + if err != nil { + fmt.Printf("Error closing conn %s\n", err.Error()) + } + }(conn) + tx, _ := conn.BeginTx(context.Background(), nil) + inst1, err := conn.PrepareContext(context.Background(), inserQuery) + if err != nil { + fmt.Printf("Error preparing sleepyDmlQ %s\n", err.Error()) + return err + } + defer func(inst1 *sql.Stmt) { + err := inst1.Close() + if err != nil { + fmt.Printf("Error closing insert statement sleepyDmlQ %s\n", err.Error()) + } + }(inst1) + _, err = inst1.ExecContext(context.Background(), sql.Named("id", rand.Int()), sql.Named("seconds", delayRow)) + if err != nil { + fmt.Printf("Error query sleepyDmlQ %s\n", err.Error()) + return err + } + updateStmt, err := conn.PrepareContext(context.Background(), updateQuery) + if err != nil { + fmt.Printf("Error preparing sleepyDmlQ %s\n", err.Error()) + return err + } + defer func(updateStmt *sql.Stmt) { + err := updateStmt.Close() + if err != nil { + fmt.Printf("Error closing update statement sleepyDmlQ %s\n", err.Error()) + } + }(updateStmt) + _, err = updateStmt.ExecContext(context.Background(), sql.Named("id", rand.Int()), sql.Named("seconds", delayRow)) + if err != nil { + fmt.Printf("Error query sleepyDmlQ %s\n", err.Error()) + return err + } + err = tx.Commit() + if err != nil { + fmt.Printf("Error committing sleepyDmlQ %s\n", err.Error()) + return err + } + return nil +} + func simpleEvict() { db, err := sql.Open("hera", "127.0.0.1:31002") if err != nil { @@ -157,10 +208,7 @@ func TestSqlEvict(t *testing.T) { simpleEvict() if testutil.RegexCountFile("HERA-100: backlog timeout", "hera.log") == 0 { t.Fatal("backlog timeout was not triggered") - } // */ - /* if (testutil.RegexCountFile("coordinator dispatchrequest: no worker HERA-102: backlog eviction", "hera.log") == 0) { - t.Fatal("backlog eviction was not triggered") - } // */ + } if testutil.RegexCountFile("coordinator dispatchrequest: no worker HERA-104: saturation soft sql eviction", "hera.log") == 0 { t.Fatal("soft eviction was not triggered") } @@ -168,5 +216,81 @@ func TestSqlEvict(t *testing.T) { t.Fatal("eviction was not triggered") } logger.GetLogger().Log(logger.Debug, "TestSqlEvict stop +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") - time.Sleep(2 * time.Second) + time.Sleep(10 * time.Second) } // */ + +func TestSqlEvictDML(t *testing.T) { + logger.GetLogger().Log(logger.Debug, "TestSqlEvictDML begin 
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + dmlEvict() + if testutil.RegexCountFile("HERA-100: backlog timeout", "hera.log") == 0 { + t.Fatal("backlog timeout was not triggered") + } + if testutil.RegexCountFile("coordinator dispatchrequest: no worker HERA-104: saturation soft sql eviction", "hera.log") == 0 { + t.Fatal("soft eviction was not triggered") + } + if testutil.RegexCountFile("coordinator dispatchrequest: stranded conn HERA-101: saturation kill", "hera.log") == 0 { + t.Fatal("eviction was not triggered") + } + logger.GetLogger().Log(logger.Debug, "TestSqlEvictDML stop +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + time.Sleep(10 * time.Second) +} + +func dmlEvict() { + db, err := sql.Open("hera", "127.0.0.1:31002") + if err != nil { + fmt.Printf("Error db %s\n", err.Error()) + return + } + db.SetConnMaxLifetime(2 * time.Second) + db.SetMaxIdleConns(0) + db.SetMaxOpenConns(22111) + defer func(db *sql.DB) { + err := db.Close() + if err != nil { + fmt.Printf("Error closing db %s\n", err.Error()) + } + }(db) + + conn, err := db.Conn(context.Background()) + if err != nil { + fmt.Printf("Error conn %s\n", err.Error()) + return + } + err = sleepyDmlQ(conn, 1600) + if err != nil { + fmt.Printf("Error Executing first sleepyDmlQ %s\n", err.Error()) + return + } + + for i := 0; i < int(max_conn)+1; i++ { + conn, err := db.Conn(context.Background()) + if err != nil { + fmt.Printf("Error #%d conn %s\n", i, err.Error()) + continue + } + time.Sleep(time.Millisecond * 100) + fmt.Printf("connection count %d\n", i) + go func(index int) { + err := sleepyDmlQ(conn, 1600) + if err != nil { + fmt.Printf("Long query Request Id: %d Error executing the sleepyDmlQ %s\n", index, err.Error()) + } + }(i) + } + + for i := 0; i < 50; i++ { + conn, err := db.Conn(context.Background()) + if err != nil { + fmt.Printf("Error #%d conn %s\n", i, err.Error()) + continue + } + time.Sleep(time.Millisecond * 100) + fmt.Printf("connection count %d\n", i) + go func(index int) { + err := sleepyDmlQ(conn, 1600) + if err != nil { + fmt.Printf("Request id: %d Error executing the sleepyDmlQ %s\n", index, err.Error()) + } + }(i) + } +}
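
The guard this patch adds in lib/workerclient.go and lib/adaptivequemgr.go is a single atomic int32 flag: Recover claims it with atomic.CompareAndSwapInt32 so only one goroutine performs recovery, and senders such as Write check it with atomic.LoadInt32 before touching the worker. The standalone sketch below illustrates that pattern only; demoWorker, beginRecovery, endRecovery, and write are hypothetical names for illustration and are not Hera APIs.

// sketch.go - illustrative only; demoWorker and its methods are hypothetical,
// not part of the Hera code base.
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

type demoWorker struct {
	isUnderRecovery int32 // 0: not recovering, 1: recovery in progress
}

// beginRecovery returns true only for the caller that wins the compare-and-swap,
// so recovery runs exactly once even if several goroutines detect the failure.
func (w *demoWorker) beginRecovery() bool {
	return atomic.CompareAndSwapInt32(&w.isUnderRecovery, 0, 1)
}

// endRecovery clears the flag once the worker is usable again.
func (w *demoWorker) endRecovery() {
	atomic.StoreInt32(&w.isUnderRecovery, 0)
}

// write refuses to send anything while recovery is in progress, mirroring the
// kind of check the patch adds before writing to a worker.
func (w *demoWorker) write(payload string) error {
	if atomic.LoadInt32(&w.isUnderRecovery) == 1 {
		return errors.New("worker is under recovery")
	}
	fmt.Println("sent:", payload)
	return nil
}

func main() {
	w := &demoWorker{}
	if w.beginRecovery() {
		fmt.Println("write during recovery:", w.write("select 1")) // rejected
		w.endRecovery()
	}
	fmt.Println("write after recovery:", w.write("select 1")) // succeeds
}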