Skip to content

Commit 2a6dd6b

Browse files
eeldaly and yeya24 authored
Add compactor auto-forget from ring on unhealthy (#6563)
* Add metric name in limiter per-metric exceeded errors Signed-off-by: Essam Eldaly <[email protected]> * Remove unused labels from errors Signed-off-by: Essam Eldaly <[email protected]> * fmt Signed-off-by: Essam Eldaly <[email protected]> * Add for ring to auto forget unhealthy compactors Signed-off-by: Essam Eldaly <[email protected]> * Changed LifecyclerDelegate implementation to more closely match BasicLifecyclerDelegate Signed-off-by: Essam Eldaly <[email protected]> * Changelog fix Signed-off-by: Essam Eldaly <[email protected]> * Changelog fix Signed-off-by: Essam Eldaly <[email protected]> * auto forget compactor default set to 2*HeartbeatTimeout Signed-off-by: Essam Eldaly <[email protected]> * added compactor auto forget test Signed-off-by: Essam Eldaly <[email protected]> * gofmt Signed-off-by: Essam Eldaly <[email protected]> * forget compactor test race condition fix Signed-off-by: Essam Eldaly <[email protected]> * gofmt Signed-off-by: Essam Eldaly <[email protected]> * check-doc Signed-off-by: Essam Eldaly <[email protected]> * removed sleep Signed-off-by: Essam Eldaly <[email protected]> * removed no error check on compactor stop in auto forget test Signed-off-by: Essam Eldaly <[email protected]> * fixed auto forget compactor test Signed-off-by: Essam Eldaly <[email protected]> * fixed auto forget compactor test Signed-off-by: Essam Eldaly <[email protected]> --------- Signed-off-by: Essam Eldaly <[email protected]> Signed-off-by: Ben Ye <[email protected]> Co-authored-by: Ben Ye <[email protected]>
1 parent 49e7ca1 commit 2a6dd6b

File tree

10 files changed

+160
-10
lines changed

10 files changed

+160
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Changelog
22

33
## master / unreleased
4+
* [ENHANCEMENT] Add `compactor.auto-forget-delay` for compactor to auto forget compactors after X minutes without heartbeat. #6533
45

56
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
67
* [ENHANCEMENT] StoreGateway: Emit more histogram buckets on the `cortex_querier_storegateway_refetches_per_query` metric. #6570

docs/blocks-storage/compactor.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,11 @@ compactor:
263263
# CLI flag: -compactor.ring.heartbeat-timeout
264264
[heartbeat_timeout: <duration> | default = 1m]
265265

266+
# Time since last heartbeat before compactor will be removed from ring. 0 to
267+
# disable
268+
# CLI flag: -compactor.auto-forget-delay
269+
[auto_forget_delay: <duration> | default = 2m]
270+
266271
# Minimum time to wait for ring stability at startup. 0 to disable.
267272
# CLI flag: -compactor.ring.wait-stability-min-duration
268273
[wait_stability_min_duration: <duration> | default = 1m]

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2328,6 +2328,11 @@ sharding_ring:
23282328
# CLI flag: -compactor.ring.heartbeat-timeout
23292329
[heartbeat_timeout: <duration> | default = 1m]
23302330
2331+
# Time since last heartbeat before compactor will be removed from ring. 0 to
2332+
# disable
2333+
# CLI flag: -compactor.auto-forget-delay
2334+
[auto_forget_delay: <duration> | default = 2m]
2335+
23312336
# Minimum time to wait for ring stability at startup. 0 to disable.
23322337
# CLI flag: -compactor.ring.wait-stability-min-duration
23332338
[wait_stability_min_duration: <duration> | default = 1m]

pkg/compactor/compactor.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,12 @@ func (c *Compactor) starting(ctx context.Context) error {
656656
// Initialize the compactors ring if sharding is enabled.
657657
if c.compactorCfg.ShardingEnabled {
658658
lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig()
659-
c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ringKey, true, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
659+
var delegate ring.LifecyclerDelegate
660+
delegate = &ring.DefaultLifecyclerDelegate{}
661+
if c.compactorCfg.ShardingRing.AutoForgetDelay > 0 {
662+
delegate = ring.NewLifecyclerAutoForgetDelegate(c.compactorCfg.ShardingRing.AutoForgetDelay, delegate, c.logger)
663+
}
664+
c.ringLifecycler, err = ring.NewLifecyclerWithDelegate(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ringKey, true, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer), delegate)
660665
if err != nil {
661666
return errors.Wrap(err, "unable to initialize compactor ring lifecycler")
662667
}

pkg/compactor/compactor_ring.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type RingConfig struct {
2121
KVStore kv.Config `yaml:"kvstore"`
2222
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
2323
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
24+
AutoForgetDelay time.Duration `yaml:"auto_forget_delay"`
2425

2526
// Wait ring stability.
2627
WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration"`
@@ -54,6 +55,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
5455
cfg.KVStore.RegisterFlagsWithPrefix("compactor.ring.", "collectors/", f)
5556
f.DurationVar(&cfg.HeartbeatPeriod, "compactor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.")
5657
f.DurationVar(&cfg.HeartbeatTimeout, "compactor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which compactors are considered unhealthy within the ring. 0 = never (timeout disabled).")
58+
f.DurationVar(&cfg.AutoForgetDelay, "compactor.auto-forget-delay", 2*cfg.HeartbeatTimeout, "Time since last heartbeat before compactor will be removed from ring. 0 to disable")
5759

5860
// Wait stability flags.
5961
f.DurationVar(&cfg.WaitStabilityMinDuration, "compactor.ring.wait-stability-min-duration", time.Minute, "Minimum time to wait for ring stability at startup. 0 to disable.")

pkg/compactor/compactor_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2113,3 +2113,74 @@ func TestCompactor_FailedWithHaltError(t *testing.T) {
21132113
"cortex_compactor_compaction_error_total",
21142114
))
21152115
}
2116+
2117+
func TestCompactor_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) {
2118+
// Setup user IDs
2119+
userID := "user-0"
2120+
inmem := objstore.WithNoopInstr(objstore.NewInMemBucket())
2121+
2122+
id, err := ulid.New(ulid.Now(), rand.Reader)
2123+
require.NoError(t, err)
2124+
require.NoError(t, inmem.Upload(context.Background(), userID+"/"+id.String()+"/meta.json", strings.NewReader(mockBlockMetaJSON(id.String()))))
2125+
2126+
// Create a shared KV Store
2127+
kvstore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
2128+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
2129+
2130+
// Create two compactors
2131+
var compactors []*Compactor
2132+
2133+
for i := 0; i < 2; i++ {
2134+
// Setup config
2135+
cfg := prepareConfig()
2136+
2137+
cfg.ShardingEnabled = true
2138+
cfg.ShardingRing.InstanceID = fmt.Sprintf("compactor-%d", i)
2139+
cfg.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.%d", i)
2140+
cfg.ShardingRing.WaitStabilityMinDuration = time.Second
2141+
cfg.ShardingRing.WaitStabilityMaxDuration = 5 * time.Second
2142+
cfg.ShardingRing.KVStore.Mock = kvstore
2143+
cfg.ShardingRing.HeartbeatPeriod = 200 * time.Millisecond
2144+
cfg.ShardingRing.UnregisterOnShutdown = false
2145+
cfg.ShardingRing.AutoForgetDelay = 400 * time.Millisecond
2146+
2147+
// Compactor will get its own temp dir for storing local files.
2148+
compactor, _, tsdbPlanner, _, _ := prepare(t, cfg, inmem, nil)
2149+
compactor.logger = log.NewNopLogger()
2150+
2151+
compactors = append(compactors, compactor)
2152+
2153+
// Mock the planner as if there's no compaction to do,
2154+
// in order to simplify tests (all in all, we just want to
2155+
// test our logic and not TSDB compactor which we expect to
2156+
// be already tested).
2157+
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
2158+
}
2159+
2160+
require.Equal(t, 2, len(compactors))
2161+
compactor := compactors[0]
2162+
compactor2 := compactors[1]
2163+
2164+
// Start compactors
2165+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), compactor))
2166+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), compactor2))
2167+
2168+
// Wait until a run has completed.
2169+
cortex_testutil.Poll(t, 20*time.Second, 1.0, func() interface{} {
2170+
return prom_testutil.ToFloat64(compactor2.CompactionRunsCompleted)
2171+
})
2172+
2173+
cortex_testutil.Poll(t, 5000*time.Millisecond, true, func() interface{} {
2174+
healthy, unhealthy, _ := compactor.ring.GetAllInstanceDescs(ring.Reporting)
2175+
return len(healthy) == 2 && len(unhealthy) == 0
2176+
})
2177+
2178+
// Make one compactor unhealthy in ring by stopping the
2179+
// compactor service while UnregisterOnShutdown is false
2180+
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), compactor2))
2181+
2182+
cortex_testutil.Poll(t, 5000*time.Millisecond, true, func() interface{} {
2183+
healthy, unhealthy, _ := compactor.ring.GetAllInstanceDescs(ring.Reporting)
2184+
return len(healthy) == 1 && len(unhealthy) == 0
2185+
})
2186+
}

pkg/ring/basic_lifecycler_delegates.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -141,14 +141,6 @@ func (d *AutoForgetDelegate) OnRingInstanceStopping(lifecycler *BasicLifecycler)
141141
}
142142

143143
func (d *AutoForgetDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler, ringDesc *Desc, instanceDesc *InstanceDesc) {
144-
for id, instance := range ringDesc.Ingesters {
145-
lastHeartbeat := time.Unix(instance.GetTimestamp(), 0)
146-
147-
if time.Since(lastHeartbeat) > d.forgetPeriod {
148-
level.Warn(d.logger).Log("msg", "auto-forgetting instance from the ring because it is unhealthy for a long time", "instance", id, "last_heartbeat", lastHeartbeat.String(), "forget_period", d.forgetPeriod)
149-
ringDesc.RemoveIngester(id)
150-
}
151-
}
152-
144+
AutoForgetFromRing(ringDesc, d.forgetPeriod, d.logger)
153145
d.next.OnRingInstanceHeartbeat(lifecycler, ringDesc, instanceDesc)
154146
}

pkg/ring/lifecycler.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,17 @@ var (
2727
errInvalidTokensGeneratorStrategy = errors.New("invalid token generator strategy")
2828
)
2929

30+
type LifecyclerDelegate interface {
31+
// OnRingInstanceHeartbeat is called while the instance is updating its heartbeat
32+
// in the ring.
33+
OnRingInstanceHeartbeat(lifecycler *Lifecycler, ringDesc *Desc)
34+
}
35+
36+
type DefaultLifecyclerDelegate struct{}
37+
38+
func (d DefaultLifecyclerDelegate) OnRingInstanceHeartbeat(lifecycler *Lifecycler, ringDesc *Desc) {
39+
}
40+
3041
// LifecyclerConfig is the config to build a Lifecycler.
3142
type LifecyclerConfig struct {
3243
RingConfig Config `yaml:"ring"`
@@ -108,6 +119,7 @@ type Lifecycler struct {
108119
cfg LifecyclerConfig
109120
flushTransferer FlushTransferer
110121
KVStore kv.Client
122+
delegate LifecyclerDelegate
111123

112124
actorChan chan func()
113125
autojoinChan chan struct{}
@@ -150,6 +162,22 @@ type Lifecycler struct {
150162
tg TokenGenerator
151163
}
152164

165+
func NewLifecyclerWithDelegate(
166+
cfg LifecyclerConfig,
167+
flushTransferer FlushTransferer,
168+
ringName, ringKey string,
169+
autoJoinOnStartup, flushOnShutdown bool,
170+
logger log.Logger,
171+
reg prometheus.Registerer,
172+
delegate LifecyclerDelegate,
173+
) (*Lifecycler, error) {
174+
l, err := NewLifecycler(cfg, flushTransferer, ringName, ringKey, autoJoinOnStartup, flushOnShutdown, logger, reg)
175+
if l != nil {
176+
l.delegate = delegate
177+
}
178+
return l, err
179+
}
180+
153181
// NewLifecycler creates new Lifecycler. It must be started via StartAsync.
154182
func NewLifecycler(
155183
cfg LifecyclerConfig,
@@ -209,6 +237,7 @@ func NewLifecycler(
209237
lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg),
210238
logger: logger,
211239
tg: tg,
240+
delegate: &DefaultLifecyclerDelegate{},
212241
}
213242

214243
l.lifecyclerMetrics.tokensToOwn.Set(float64(cfg.NumTokens))
@@ -973,6 +1002,7 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error {
9731002
instanceDesc.RegisteredTimestamp = i.getRegisteredAt().Unix()
9741003
ringDesc.Ingesters[i.ID] = instanceDesc
9751004
}
1005+
i.delegate.OnRingInstanceHeartbeat(i, ringDesc)
9761006

9771007
return ringDesc, true, nil
9781008
})

pkg/ring/lifecycler_delegates.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package ring
2+
3+
import (
4+
"time"
5+
6+
"github.com/go-kit/log"
7+
)
8+
9+
// AutoForgetDelegate automatically remove an instance from the ring if the last
10+
// heartbeat is older than a configured period.
11+
type LifecyclerAutoForgetDelegate struct {
12+
next LifecyclerDelegate
13+
logger log.Logger
14+
forgetPeriod time.Duration
15+
}
16+
17+
func NewLifecyclerAutoForgetDelegate(forgetPeriod time.Duration, next LifecyclerDelegate, logger log.Logger) *LifecyclerAutoForgetDelegate {
18+
return &LifecyclerAutoForgetDelegate{
19+
next: next,
20+
logger: logger,
21+
forgetPeriod: forgetPeriod,
22+
}
23+
}
24+
25+
func (d *LifecyclerAutoForgetDelegate) OnRingInstanceHeartbeat(lifecycler *Lifecycler, ringDesc *Desc) {
26+
AutoForgetFromRing(ringDesc, d.forgetPeriod, d.logger)
27+
d.next.OnRingInstanceHeartbeat(lifecycler, ringDesc)
28+
}

pkg/ring/ring.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,3 +1029,14 @@ func (op Operation) ShouldExtendReplicaSetOnState(s InstanceState) bool {
10291029

10301030
// All states are healthy, no states extend replica set.
10311031
var allStatesRingOperation = Operation(0x0000ffff)
1032+
1033+
func AutoForgetFromRing(ringDesc *Desc, forgetPeriod time.Duration, logger log.Logger) {
1034+
for id, instance := range ringDesc.Ingesters {
1035+
lastHeartbeat := time.Unix(instance.GetTimestamp(), 0)
1036+
1037+
if time.Since(lastHeartbeat) > forgetPeriod {
1038+
level.Warn(logger).Log("msg", "auto-forgetting instance from the ring because it is unhealthy for a long time", "instance", id, "last_heartbeat", lastHeartbeat.String(), "forget_period", forgetPeriod)
1039+
ringDesc.RemoveIngester(id)
1040+
}
1041+
}
1042+
}

0 commit comments

Comments
 (0)