From 4c946991d523e9583b915fb0e27984a03942cc64 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sun, 18 Feb 2024 15:58:54 +0000 Subject: [PATCH 1/2] util/circuit: allow nil event handler Users may not need an event handler, allow it to be unset. Epic: none Release note: None --- pkg/jobs/joberror/errors_test.go | 1 - pkg/rpc/peer.go | 3 --- pkg/util/circuit/circuitbreaker.go | 17 +++++++++++++---- pkg/util/circuit/circuitbreaker_test.go | 1 - pkg/util/circuit/options.go | 3 ++- 5 files changed, 15 insertions(+), 10 deletions(-) diff --git a/pkg/jobs/joberror/errors_test.go b/pkg/jobs/joberror/errors_test.go index 1040277a3c01..c72c02ddc928 100644 --- a/pkg/jobs/joberror/errors_test.go +++ b/pkg/jobs/joberror/errors_test.go @@ -25,7 +25,6 @@ func TestErrBreakerOpenIsRetriable(t *testing.T) { AsyncProbe: func(_ func(error), done func()) { done() // never untrip }, - EventHandler: &circuit.EventLogger{Log: func(redact.StringBuilder) {}}, }) br.Report(errors.New("test error")) require.False(t, IsPermanentBulkJobError(br.Signal().Err())) diff --git a/pkg/rpc/peer.go b/pkg/rpc/peer.go index aa3052c30d9f..264071c51227 100644 --- a/pkg/rpc/peer.go +++ b/pkg/rpc/peer.go @@ -185,9 +185,6 @@ func (rpcCtx *Context) newPeer(k peerKey) *peer { p.launch(ctx, report, done) }) }, - // Use a noop EventHandler; we do our own logging in the probe since we'll - // have better information. - EventHandler: &circuit.EventLogger{Log: func(buf redact.StringBuilder) {}}, }) p.b = b c := newConnectionToNodeID(k, b.Signal) diff --git a/pkg/util/circuit/circuitbreaker.go b/pkg/util/circuit/circuitbreaker.go index d9cee657764e..d52834799026 100644 --- a/pkg/util/circuit/circuitbreaker.go +++ b/pkg/util/circuit/circuitbreaker.go @@ -147,7 +147,9 @@ func (b *Breaker) Report(err error) { }() opts := b.Opts() - opts.EventHandler.OnTrip(b, prevErr, storeErr) + if opts.EventHandler != nil { + opts.EventHandler.OnTrip(b, prevErr, storeErr) + } if prevErr == nil { // If the breaker wasn't previously tripped, trigger the probe to give the // Breaker a shot at healing right away. If the breaker is already tripped, @@ -163,7 +165,10 @@ func (b *Breaker) Report(err error) { // Outside of testing, there should be no reason to call this // as it is the probe's job to reset the breaker if appropriate. func (b *Breaker) Reset() { - b.Opts().EventHandler.OnReset(b) + opts := b.Opts() + if opts.EventHandler != nil { + opts.EventHandler.OnReset(b) + } b.mu.Lock() defer b.mu.Unlock() // Avoid replacing errAndCh if it wasn't tripped. Otherwise, @@ -255,7 +260,9 @@ func (b *Breaker) maybeTriggerProbe(force bool) { opts := *b.mu.Options // ok to leak out from under the lock b.mu.Unlock() - opts.EventHandler.OnProbeLaunched(b) + if opts.EventHandler != nil { + opts.EventHandler.OnProbeLaunched(b) + } var once sync.Once opts.AsyncProbe( func(err error) { @@ -269,7 +276,9 @@ func (b *Breaker) maybeTriggerProbe(force bool) { // Avoid potential problems when probe calls done() multiple times. // It shouldn't do that, but mistakes happen. once.Do(func() { - opts.EventHandler.OnProbeDone(b) + if opts.EventHandler != nil { + opts.EventHandler.OnProbeDone(b) + } b.mu.Lock() defer b.mu.Unlock() b.mu.probing = false diff --git a/pkg/util/circuit/circuitbreaker_test.go b/pkg/util/circuit/circuitbreaker_test.go index 1fa9b1887314..cfa8070d3c79 100644 --- a/pkg/util/circuit/circuitbreaker_test.go +++ b/pkg/util/circuit/circuitbreaker_test.go @@ -438,7 +438,6 @@ func BenchmarkBreaker_Signal(b *testing.B) { AsyncProbe: func(_ func(error), done func()) { done() // never untrip }, - EventHandler: &EventLogger{Log: func(redact.StringBuilder) {}}, }) // The point of this benchmark is to verify the absence of allocations when diff --git a/pkg/util/circuit/options.go b/pkg/util/circuit/options.go index 6945343a16c7..72e619812f5b 100644 --- a/pkg/util/circuit/options.go +++ b/pkg/util/circuit/options.go @@ -42,7 +42,8 @@ type Options struct { AsyncProbe func(report func(error), done func()) // EventHandler receives events from the Breaker. For an implementation that - // performs unstructured logging, see EventLogger. + // performs unstructured logging, see EventLogger. Can be nil if no event + // handler is needed. EventHandler EventHandler // signalInterceptor gets to see and change the return value of the Signal From 8f405f38790bde4396917af6b408c594e957eaad Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sun, 18 Feb 2024 16:16:41 +0000 Subject: [PATCH 2/2] kvserver: add `Replica.GetMutexForTesting()` This allows testing replica stalls and deadlocks from outside of the kvserver package. Epic: none Release note: None --- pkg/kv/kvserver/kvserverbase/stores.go | 1 + pkg/kv/kvserver/replica.go | 5 +++++ pkg/kv/kvserver/stores_base.go | 2 +- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/kvserverbase/stores.go b/pkg/kv/kvserver/kvserverbase/stores.go index d17312dc703d..8f9c3452da33 100644 --- a/pkg/kv/kvserver/kvserverbase/stores.go +++ b/pkg/kv/kvserver/kvserverbase/stores.go @@ -42,6 +42,7 @@ type Store interface { // GetReplicaMutexForTesting returns the mutex of the replica with the given // range ID, or nil if no replica was found. This is used for testing. + // Returns a syncutil.RWMutex rather than ReplicaMutex to avoid import cycles. GetReplicaMutexForTesting(rangeID roachpb.RangeID) *syncutil.RWMutex } diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 0478f7994a4e..5a65c8cf9186 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -2464,3 +2464,8 @@ func (r *Replica) ReadProtectedTimestampsForTesting(ctx context.Context) (err er ts, err = r.readProtectedTimestampsRLocked(ctx) return err } + +// GetMutexForTesting returns the replica's mutex, for use in tests. +func (r *Replica) GetMutexForTesting() *ReplicaMutex { + return &r.mu.ReplicaMutex +} diff --git a/pkg/kv/kvserver/stores_base.go b/pkg/kv/kvserver/stores_base.go index 7e4ed5c2f564..a77a86b4dbcc 100644 --- a/pkg/kv/kvserver/stores_base.go +++ b/pkg/kv/kvserver/stores_base.go @@ -99,7 +99,7 @@ func (s *baseStore) SetQueueActive(active bool, queue string) error { func (s *baseStore) GetReplicaMutexForTesting(rangeID roachpb.RangeID) *syncutil.RWMutex { store := (*Store)(s) if repl := store.GetReplicaIfExists(rangeID); repl != nil { - return (*syncutil.RWMutex)(&repl.mu.ReplicaMutex) + return (*syncutil.RWMutex)(repl.GetMutexForTesting()) } return nil }