Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
119350: util/circuit: allow nil event handler r=erikgrinaker a=erikgrinaker

Extracted from #118943.

---

Users may not need an event handler, allow it to be unset.

Epic: none
Release note: None

119351: kvserver: add `Replica.GetMutexForTesting()` r=erikgrinaker a=erikgrinaker

Extracted from #118943.

---

This allows testing replica stalls and deadlocks from outside of the kvserver package.

Epic: none
Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
craig[bot] and erikgrinaker committed Feb 20, 2024
3 parents 66460bf + 4c94699 + 8f405f3 commit a78e197
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 11 deletions.
1 change: 0 additions & 1 deletion pkg/jobs/joberror/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func TestErrBreakerOpenIsRetriable(t *testing.T) {
AsyncProbe: func(_ func(error), done func()) {
done() // never untrip
},
EventHandler: &circuit.EventLogger{Log: func(redact.StringBuilder) {}},
})
br.Report(errors.New("test error"))
require.False(t, IsPermanentBulkJobError(br.Signal().Err()))
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvserverbase/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Store interface {

// GetReplicaMutexForTesting returns the mutex of the replica with the given
// range ID, or nil if no replica was found. This is used for testing.
// Returns a syncutil.RWMutex rather than ReplicaMutex to avoid import cycles.
GetReplicaMutexForTesting(rangeID roachpb.RangeID) *syncutil.RWMutex
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2464,3 +2464,8 @@ func (r *Replica) ReadProtectedTimestampsForTesting(ctx context.Context) (err er
ts, err = r.readProtectedTimestampsRLocked(ctx)
return err
}

// GetMutexForTesting returns the replica's mutex, for use in tests.
func (r *Replica) GetMutexForTesting() *ReplicaMutex {
return &r.mu.ReplicaMutex
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/stores_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *baseStore) SetQueueActive(active bool, queue string) error {
func (s *baseStore) GetReplicaMutexForTesting(rangeID roachpb.RangeID) *syncutil.RWMutex {
store := (*Store)(s)
if repl := store.GetReplicaIfExists(rangeID); repl != nil {
return (*syncutil.RWMutex)(&repl.mu.ReplicaMutex)
return (*syncutil.RWMutex)(repl.GetMutexForTesting())
}
return nil
}
3 changes: 0 additions & 3 deletions pkg/rpc/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,6 @@ func (rpcCtx *Context) newPeer(k peerKey) *peer {
p.launch(ctx, report, done)
})
},
// Use a noop EventHandler; we do our own logging in the probe since we'll
// have better information.
EventHandler: &circuit.EventLogger{Log: func(buf redact.StringBuilder) {}},
})
p.b = b
c := newConnectionToNodeID(k, b.Signal)
Expand Down
17 changes: 13 additions & 4 deletions pkg/util/circuit/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ func (b *Breaker) Report(err error) {
}()

opts := b.Opts()
opts.EventHandler.OnTrip(b, prevErr, storeErr)
if opts.EventHandler != nil {
opts.EventHandler.OnTrip(b, prevErr, storeErr)
}
if prevErr == nil {
// If the breaker wasn't previously tripped, trigger the probe to give the
// Breaker a shot at healing right away. If the breaker is already tripped,
Expand All @@ -163,7 +165,10 @@ func (b *Breaker) Report(err error) {
// Outside of testing, there should be no reason to call this
// as it is the probe's job to reset the breaker if appropriate.
func (b *Breaker) Reset() {
b.Opts().EventHandler.OnReset(b)
opts := b.Opts()
if opts.EventHandler != nil {
opts.EventHandler.OnReset(b)
}
b.mu.Lock()
defer b.mu.Unlock()
// Avoid replacing errAndCh if it wasn't tripped. Otherwise,
Expand Down Expand Up @@ -255,7 +260,9 @@ func (b *Breaker) maybeTriggerProbe(force bool) {
opts := *b.mu.Options // ok to leak out from under the lock
b.mu.Unlock()

opts.EventHandler.OnProbeLaunched(b)
if opts.EventHandler != nil {
opts.EventHandler.OnProbeLaunched(b)
}
var once sync.Once
opts.AsyncProbe(
func(err error) {
Expand All @@ -269,7 +276,9 @@ func (b *Breaker) maybeTriggerProbe(force bool) {
// Avoid potential problems when probe calls done() multiple times.
// It shouldn't do that, but mistakes happen.
once.Do(func() {
opts.EventHandler.OnProbeDone(b)
if opts.EventHandler != nil {
opts.EventHandler.OnProbeDone(b)
}
b.mu.Lock()
defer b.mu.Unlock()
b.mu.probing = false
Expand Down
1 change: 0 additions & 1 deletion pkg/util/circuit/circuitbreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,6 @@ func BenchmarkBreaker_Signal(b *testing.B) {
AsyncProbe: func(_ func(error), done func()) {
done() // never untrip
},
EventHandler: &EventLogger{Log: func(redact.StringBuilder) {}},
})

// The point of this benchmark is to verify the absence of allocations when
Expand Down
3 changes: 2 additions & 1 deletion pkg/util/circuit/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type Options struct {
AsyncProbe func(report func(error), done func())

// EventHandler receives events from the Breaker. For an implementation that
// performs unstructured logging, see EventLogger.
// performs unstructured logging, see EventLogger. Can be nil if no event
// handler is needed.
EventHandler EventHandler

// signalInterceptor gets to see and change the return value of the Signal
Expand Down

0 comments on commit a78e197

Please sign in to comment.