Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve rpc errors logging #6362

Open
wants to merge 12 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 31 additions & 25 deletions circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,25 @@ type FunctorCallStatus struct {
Err error
}

func (cr CommandResult) Result() []any {
func (cr *CommandResult) Result() []any {
return cr.res
}

func (cr CommandResult) Error() error {
func (cr *CommandResult) Error() error {
return cr.err
}
func (cr CommandResult) Cancelled() bool {

func (cr *CommandResult) Cancelled() bool {
return cr.cancelled
}

func (cr CommandResult) FunctorCallStatuses() []FunctorCallStatus {
func (cr *CommandResult) FunctorCallStatuses() []FunctorCallStatus {
return cr.functorCallStatuses
}

func (cr *CommandResult) addCallStatus(circuitName string, err error) {
func (cr *CommandResult) addCallStatus(providerName string, err error) {
cr.functorCallStatuses = append(cr.functorCallStatuses, FunctorCallStatus{
Name: circuitName,
Name: providerName,
Timestamp: time.Now(),
Err: err,
})
Expand All @@ -62,8 +63,8 @@ func NewCommand(ctx context.Context, functors []*Functor) *Command {
}
}

func (cmd *Command) Add(ftor *Functor) {
cmd.functors = append(cmd.functors, ftor)
func (cmd *Command) Add(functor *Functor) {
cmd.functors = append(cmd.functors, functor)
}

func (cmd *Command) IsEmpty() bool {
Expand Down Expand Up @@ -94,14 +95,19 @@ func NewCircuitBreaker(config Config) *CircuitBreaker {
}

type Functor struct {
exec FallbackFunc
circuitName string
exec FallbackFunc
circuitName string
providerName string
}

func NewFunctor(exec FallbackFunc, circuitName string) *Functor {
// NewFunctor creates a new Functor with the provided FallbackFunc, circuitName and providerName.
// The circuitName is the name of the circuit to be used by the Functor. If the circuitName is empty,
// or the Functor is the last one in the Command (which includes the case of a single Functor),
// it will be executed without a circuit.
// The providerName is the name recorded in the call statuses of the CommandResult.
func NewFunctor(exec FallbackFunc, circuitName, providerName string) *Functor {
return &Functor{
exec: exec,
circuitName: circuitName,
exec: exec,
circuitName: circuitName,
providerName: providerName,
}
}

Expand All @@ -115,7 +121,7 @@ func accumulateCommandError(result CommandResult, circuitName string, err error)
return result
}

// Executes the command in its circuit if set.
// Execute runs the command in its circuit if set.
// If the command's circuit is not configured, the circuit of the CircuitBreaker is used.
// This is a blocking function.
func (cb *CircuitBreaker) Execute(cmd *Command) CommandResult {
Expand All @@ -137,19 +143,20 @@ func (cb *CircuitBreaker) Execute(cmd *Command) CommandResult {

var err error
circuitName := f.circuitName
providerName := f.providerName
if cb.circuitNameHandler != nil {
circuitName = cb.circuitNameHandler(circuitName)
}

// if last command or the circuit name is empty, execute without circuit
if i == len(cmd.functors)-1 {
if i == len(cmd.functors)-1 || circuitName == "" {
res, execErr := f.exec()
err = execErr
if err == nil {
result.res = res
result.err = nil
}
result.addCallStatus(circuitName, err)
result.addCallStatus(f.providerName, err)
} else {
if hystrix.GetCircuitSettings()[circuitName] == nil {
hystrix.ConfigureCommand(circuitName, hystrix.CommandConfig{
Expand All @@ -168,10 +175,10 @@ func (cb *CircuitBreaker) Execute(cmd *Command) CommandResult {
result.res = res
result.err = nil
}
result.addCallStatus(circuitName, err)
result.addCallStatus(f.providerName, err)

// If the command has been cancelled, we don't count
// the error towars breaking the circuit, and then we break
// the error towards breaking the circuit, and then we break
if cmd.cancel {
result = accumulateCommandError(result, circuitName, err)
result.cancelled = true
Expand All @@ -187,25 +194,24 @@ func (cb *CircuitBreaker) Execute(cmd *Command) CommandResult {
break
}

result = accumulateCommandError(result, circuitName, err)

// Lets abuse every provider with the same amount of MaxConcurrentRequests,
result = accumulateCommandError(result, providerName, err)
// Let's abuse every provider with the same amount of MaxConcurrentRequests,
// keep iterating even in case of ErrMaxConcurrency error
}
return result
}

func (c *CircuitBreaker) SetOverrideCircuitNameHandler(f func(string) string) {
c.circuitNameHandler = f
func (cb *CircuitBreaker) SetOverrideCircuitNameHandler(f func(string) string) {
cb.circuitNameHandler = f
}

// Expects a circuit to exist because a new circuit is always closed.
// Call CircuitExists to check if a circuit exists.
// IsCircuitOpen reports whether the circuit with the given name is open.
// It expects the circuit to already exist, because a new circuit is always
// closed: when hystrix reports that the circuit was created by this lookup,
// false is returned without consulting the circuit state.
// Call CircuitExists to check if a circuit exists.
func IsCircuitOpen(circuitName string) bool {
	circuit, wasCreated, _ := hystrix.GetCircuit(circuitName)
	if wasCreated {
		// Freshly created circuit: treated as closed.
		return false
	}
	return circuit.IsOpen()
}

// CircuitExists checks if a circuit exists.
func CircuitExists(circuitName string) bool {
_, wasCreated, _ := hystrix.GetCircuit(circuitName)
return !wasCreated
Expand Down
57 changes: 28 additions & 29 deletions circuitbreaker/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestCircuitBreaker_ExecuteSuccessSingle(t *testing.T) {
cmd := NewCommand(context.TODO(), []*Functor{
NewFunctor(func() ([]interface{}, error) {
return []any{expectedResult}, nil
}, circuitName)},
}, circuitName, "")},
)

result := cb.Execute(cmd)
Expand All @@ -53,13 +53,13 @@ func TestCircuitBreaker_ExecuteMultipleFallbacksFail(t *testing.T) {
NewFunctor(func() ([]interface{}, error) {
time.Sleep(100 * time.Millisecond) // will cause hystrix: timeout
return []any{success}, nil
}, circuitName+"1"),
}, circuitName+"1", ""),
NewFunctor(func() ([]interface{}, error) {
return nil, errSecProvFailed
}, circuitName+"2"),
}, circuitName+"2", ""),
NewFunctor(func() ([]interface{}, error) {
return nil, errThirdProvFailed
}, circuitName+"3"),
}, circuitName+"3", ""),
})

result := cb.Execute(cmd)
Expand Down Expand Up @@ -87,13 +87,13 @@ func TestCircuitBreaker_ExecuteMultipleFallbacksFailButLastSuccessStress(t *test
cmd := NewCommand(context.TODO(), []*Functor{
NewFunctor(func() ([]interface{}, error) {
return nil, errors.New("provider 1 failed")
}, circuitName+"1"),
}, circuitName+"1", ""),
NewFunctor(func() ([]interface{}, error) {
return nil, errors.New("provider 2 failed")
}, circuitName+"2"),
}, circuitName+"2", ""),
NewFunctor(func() ([]interface{}, error) {
return []any{expectedResult}, nil
}, circuitName+"3"),
}, circuitName+"3", ""),
},
)

Expand All @@ -120,15 +120,15 @@ func TestCircuitBreaker_ExecuteSwitchToWorkingProviderOnVolumeThresholdReached(t
NewFunctor(func() ([]interface{}, error) {
prov1Called++
return nil, errors.New("provider 1 failed")
}, circuitName+"1"),
}, circuitName+"1", ""),
NewFunctor(func() ([]interface{}, error) {
prov2Called++
return nil, errors.New("provider 2 failed")
}, circuitName+"2"),
}, circuitName+"2", ""),
NewFunctor(func() ([]interface{}, error) {
prov3Called++
return []any{expectedResult}, nil
}, circuitName+"3"),
}, circuitName+"3", ""),
})

result := cb.Execute(cmd)
Expand Down Expand Up @@ -160,11 +160,11 @@ func TestCircuitBreaker_ExecuteHealthCheckOnWindowTimeout(t *testing.T) {
NewFunctor(func() ([]interface{}, error) {
prov1Called++
return nil, errors.New("provider 1 failed")
}, circuitName+"1"),
}, circuitName+"1", ""),
NewFunctor(func() ([]interface{}, error) {
prov2Called++
return []any{expectedResult}, nil
}, circuitName+"2"),
}, circuitName+"2", ""),
})

result := cb.Execute(cmd)
Expand All @@ -183,11 +183,11 @@ func TestCircuitBreaker_ExecuteHealthCheckOnWindowTimeout(t *testing.T) {
NewFunctor(func() ([]interface{}, error) {
prov1Called++
return []any{expectedResult}, nil // Now it is working
}, circuitName+"1"),
}, circuitName+"1", ""),
NewFunctor(func() ([]interface{}, error) {
prov2Called++
return []any{expectedResult}, nil
}, circuitName+"2"),
}, circuitName+"2", ""),
})
result := cb.Execute(cmd)
require.NoError(t, result.Error())
Expand All @@ -212,19 +212,18 @@ func TestCircuitBreaker_CommandCancel(t *testing.T) {
prov1Called++
cmd.Cancel()
return nil, expectedErr
}, circuitName+"1"))
}, circuitName+"1", ""))
cmd.Add(NewFunctor(func() ([]interface{}, error) {
prov2Called++
return nil, errors.New("provider 2 failed")
}, circuitName+"2"))
}, circuitName+"2", ""))

result := cb.Execute(cmd)
require.True(t, errors.Is(result.Error(), expectedErr))
require.True(t, result.Cancelled())

assert.Equal(t, 1, prov1Called)
assert.Equal(t, 0, prov2Called)

}

func TestCircuitBreaker_EmptyOrNilCommand(t *testing.T) {
Expand All @@ -247,11 +246,11 @@ func TestCircuitBreaker_CircuitExistsAndClosed(t *testing.T) {
// We add it twice as otherwise it's only used for the fallback
cmd.Add(NewFunctor(func() ([]interface{}, error) {
return nil, nil
}, existCircuit))
}, existCircuit, ""))

cmd.Add(NewFunctor(func() ([]interface{}, error) {
return nil, nil
}, existCircuit))
}, existCircuit, ""))
_ = cb.Execute(cmd)
require.True(t, CircuitExists(existCircuit))
require.False(t, IsCircuitOpen(existCircuit))
Expand All @@ -276,10 +275,10 @@ func TestCircuitBreaker_Fallback(t *testing.T) {
cmd := NewCommand(ctx, nil)
cmd.Add(NewFunctor(func() ([]interface{}, error) {
return nil, expectedErr
}, circuitName+"1"))
}, circuitName+"1", ""))
cmd.Add(NewFunctor(func() ([]interface{}, error) {
return nil, errors.New("provider 2 failed")
}, circuitName+"2"))
}, circuitName+"2", ""))

result := cb.Execute(cmd)
require.NotNil(t, result.Error())
Expand All @@ -297,7 +296,7 @@ func TestCircuitBreaker_Fallback(t *testing.T) {
cmd.Add(NewFunctor(func() ([]interface{}, error) {
prov1Called++
return nil, expectedErr
}, circuitName+"1"))
}, circuitName+"1", ""))

result := cb.Execute(cmd)
require.True(t, errors.Is(result.Error(), expectedErr))
Expand All @@ -310,7 +309,7 @@ func TestCircuitBreaker_SuccessCallStatus(t *testing.T) {

functor := NewFunctor(func() ([]any, error) {
return []any{"success"}, nil
}, "successCircuit")
}, "successCircuit", "")

cmd := NewCommand(context.Background(), []*Functor{functor})

Expand All @@ -337,7 +336,7 @@ func TestCircuitBreaker_ErrorCallStatus(t *testing.T) {
expectedError := errors.New("functor error")
functor := NewFunctor(func() ([]any, error) {
return nil, expectedError
}, "errorCircuit")
}, "errorCircuit", "")

cmd := NewCommand(context.Background(), []*Functor{functor})

Expand All @@ -364,7 +363,7 @@ func TestCircuitBreaker_CancelledResult(t *testing.T) {
functor := NewFunctor(func() ([]any, error) {
time.Sleep(500 * time.Millisecond)
return []any{"should not be returned"}, nil
}, "cancelCircuit")
}, "cancelCircuit", "")

cmd := NewCommand(context.Background(), []*Functor{functor})
cmd.Cancel()
Expand All @@ -388,11 +387,11 @@ func TestCircuitBreaker_MultipleFunctorsResult(t *testing.T) {

functor1 := NewFunctor(func() ([]any, error) {
return nil, errors.New("functor1 error")
}, "circuit1")
}, "circuit1", "")

functor2 := NewFunctor(func() ([]any, error) {
return []any{"success from functor2"}, nil
}, "circuit2")
}, "circuit2", "")

cmd := NewCommand(context.Background(), []*Functor{functor1, functor2})

Expand Down Expand Up @@ -424,11 +423,11 @@ func TestCircuitBreaker_LastFunctorDirectExecution(t *testing.T) {
failingFunctor := NewFunctor(func() ([]any, error) {
time.Sleep(20 * time.Millisecond)
return nil, errors.New("should time out")
}, "circuitName")
}, "circuitName", "")

successFunctor := NewFunctor(func() ([]any, error) {
return []any{"success without circuit"}, nil
}, "circuitName")
}, "circuitName", "")

cmd := NewCommand(context.Background(), []*Functor{failingFunctor, successFunctor})

Expand Down
4 changes: 2 additions & 2 deletions healthmanager/blockchain_health_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type BlockchainHealthManager struct {
aggregator *aggregator.Aggregator
subscriptionManager *SubscriptionManager

providers map[uint64]*ProvidersHealthManager
cancelFuncs map[uint64]context.CancelFunc // Map chainID to cancel functions
providers map[uint64]*ProvidersHealthManager // ChainID to its providers health manager
cancelFuncs map[uint64]context.CancelFunc // Map chainID to cancel functions
lastStatus *BlockchainStatus
wg sync.WaitGroup
}
Expand Down
Loading
Loading