From f34ddeb791282e09b311e9bc521f4b2f4079ff6d Mon Sep 17 00:00:00 2001
From: Tobias Grieger
Date: Thu, 27 Nov 2025 16:56:28 +0100
Subject: [PATCH] asim: improve ability to specify store liveness and node
 membership/drain status

When determining which stores/nodes are fit to be considered as sources
and/or targets, we have to consider two dimensions: liveness (is the
process likely running?) and membership/operator intent (is this node
decommissioning or draining?).

asim currently models health as a LivenessStatus, a single flat enum
value. However, both the single-metric and multi-metric allocators in
production draw on a richer set of signals. For example, #155734 sees
the single-metric allocator make poor rebalancing decisions that hinge
on the fact that it can consider a node "decommissioning" in one place
but "unavailable" in another. We couldn't reproduce that particular
issue in asim because the single liveness status "forced" the
single-metric allocator to consistently observe one of the two values,
whereas in production the two observations can disagree.

Independently but relatedly, we are also working on adding
liveness/status responsiveness to the multi-metric allocator (#156776).
We are careful to avoid some of the poor behaviors of the single-metric
allocator, but similarly, we want to be able to exercise decommissioning
nodes that are live as well as ones that are dead, not to mention that
mma will reason about liveness at the store level.

All of this suggested reworking how asim models liveness and operator
intent. In fact, I ended up in this refactor after having implemented
the health/status "plumbing" for the multi-metric allocator and
realizing that we'd need changes such as the ones presented here to
meaningfully simulation-test mma in the presence of interesting
liveness/status combinations.

The rework ended up being fairly intense, but I really like where it
gets us. We no longer model liveness using a LivenessStatus enum value.
Instead, we have asim-attached types StoreStatus and NodeStatus that
cleanly track liveness (for a store) and membership/draining status
(which are node-level concepts today), and the DSL commands reflect
this: we can set individual stores as live/dead, set nodes as
decommissioning, et cetera.

The single-metric allocator now receives signals that closely match
what it would receive in production. It is now possible to specify a
situation in which a node is decommissioning but its store also
recently became non-live (which is what was required to simulate
#155734). These signals originate in our clean new structs, from which
we can (without bending over backwards) construct "NodeVitality"
objects, which is what the store pool - and thus the single-metric
allocator - requires.

As a reminder, the multi-metric allocator doesn't react to health
signals at all yet, so it hasn't been hooked up to anything. This will
be done in a separate PR that also adds logic to mma that puts these
signals to use.

Informs #156776.
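For illustration, the datadriven DSL now accepts commands along the
following lines. The two node-level commands appear verbatim in the
updated testdata; the store-level form is a sketch of the newly
documented syntax, with an arbitrary store ID:

    # Mark n7 as dead (all of its stores become non-live).
    set_status node=7 liveness=dead
    ----

    # Decommission n6 after three minutes of simulated time.
    set_status node=6 membership=decommissioning delay=3m
    ----

    # Mark a single store as recently down but not yet dead (store ID
    # chosen for illustration only).
    set_status store=2 liveness=unavailable
    ----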
Epic: CRDB-55052 --- pkg/kv/kvserver/asim/event/BUILD.bazel | 1 - pkg/kv/kvserver/asim/event/event.go | 2 +- pkg/kv/kvserver/asim/event/mutation_event.go | 62 +++++-- .../kvserver/asim/queue/lease_queue_test.go | 11 +- .../asim/queue/replicate_queue_test.go | 32 ++-- pkg/kv/kvserver/asim/state/impl.go | 73 ++++++-- pkg/kv/kvserver/asim/state/liveness.go | 159 +++++++++++------- pkg/kv/kvserver/asim/state/liveness_test.go | 95 +++++++---- pkg/kv/kvserver/asim/state/state.go | 17 +- pkg/kv/kvserver/asim/state/state_test.go | 45 +++-- .../asim/tests/datadriven_simulation_test.go | 116 +++++++++++-- .../non_rand/sma/decommission_conformance.txt | 2 +- ...plicate_recently_down_node_issue152604.txt | 11 +- .../tests/testdata/non_rand/sma/liveness.txt | 4 +- 14 files changed, 446 insertions(+), 184 deletions(-) diff --git a/pkg/kv/kvserver/asim/event/BUILD.bazel b/pkg/kv/kvserver/asim/event/BUILD.bazel index 6122f4fc3178..b9ce8c93472f 100644 --- a/pkg/kv/kvserver/asim/event/BUILD.bazel +++ b/pkg/kv/kvserver/asim/event/BUILD.bazel @@ -14,7 +14,6 @@ go_library( "//pkg/kv/kvserver/asim/config", "//pkg/kv/kvserver/asim/history", "//pkg/kv/kvserver/asim/state", - "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb", "//pkg/util/log", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/kv/kvserver/asim/event/event.go b/pkg/kv/kvserver/asim/event/event.go index fd8dd6220610..d92b30eafc38 100644 --- a/pkg/kv/kvserver/asim/event/event.go +++ b/pkg/kv/kvserver/asim/event/event.go @@ -16,7 +16,7 @@ import ( // Event outlines the necessary behaviours that event structs must implement. // Some key implementations of the interface includes assertionEvent, -// SetSpanConfigEvent, AddNodeEvent, SetNodeLivenessEvent, +// SetSpanConfigEvent, AddNodeEvent, SetNodeStatusEvent, // SetCapacityOverrideEvent. type Event interface { // Func returns a closure associated with event which could be an assertion diff --git a/pkg/kv/kvserver/asim/event/mutation_event.go b/pkg/kv/kvserver/asim/event/mutation_event.go index ec0b7806c9fe..d4ac8cfd6f42 100644 --- a/pkg/kv/kvserver/asim/event/mutation_event.go +++ b/pkg/kv/kvserver/asim/event/mutation_event.go @@ -12,7 +12,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -31,12 +30,29 @@ type AddNodeEvent struct { LocalityString string } -// SetNodeLivenessEvent represents a mutation event responsible for setting -// liveness status of a node identified by the NodeID to the specified -// LivenessStatus. +// SetStoreStatusEvent represents a mutation event for setting the liveness +// of a single store. +type SetStoreStatusEvent struct { + StoreID state.StoreID + Status state.StoreStatus +} + +// SetNodeStatusEvent represents a mutation event for setting the membership +// and draining signals of a node. +type SetNodeStatusEvent struct { + NodeID state.NodeID + Status state.NodeStatus +} + +// SetNodeLivenessEvent represents a mutation event for setting the liveness +// of all stores on a node at once. This exists because DSL commands are parsed +// before the simulation runs, at which point we don't yet know which stores +// exist on each node - that's determined when the cluster is built. 
So we +// can't expand "node=X liveness=Y" into individual store events at parse time; +// we must defer to runtime when the state knows the node's stores. type SetNodeLivenessEvent struct { - NodeId state.NodeID - LivenessStatus livenesspb.NodeLivenessStatus + NodeID state.NodeID + Liveness state.LivenessState } // SetCapacityOverrideEvent represents a mutation event responsible for updating @@ -63,6 +79,8 @@ type SetSimulationSettingsEvent struct { var _ Event = &SetSpanConfigEvent{} var _ Event = &AddNodeEvent{} +var _ Event = &SetStoreStatusEvent{} +var _ Event = &SetNodeStatusEvent{} var _ Event = &SetNodeLivenessEvent{} var _ Event = &SetCapacityOverrideEvent{} var _ Event = &SetNodeLocalityEvent{} @@ -149,17 +167,35 @@ func (ae AddNodeEvent) String() string { return buf.String() } -func (sne SetNodeLivenessEvent) Func() EventFunc { +func (ssse SetStoreStatusEvent) Func() EventFunc { + return MutationFunc(func(ctx context.Context, s state.State) { + s.SetStoreStatus(ssse.StoreID, ssse.Status) + }) +} + +func (ssse SetStoreStatusEvent) String() string { + return fmt.Sprintf("set s%d liveness=%v", ssse.StoreID, ssse.Status.Liveness) +} + +func (snse SetNodeStatusEvent) Func() EventFunc { + return MutationFunc(func(ctx context.Context, s state.State) { + s.SetNodeStatus(snse.NodeID, snse.Status) + }) +} + +func (snse SetNodeStatusEvent) String() string { + return fmt.Sprintf("set n%d membership=%v, draining=%t", + snse.NodeID, snse.Status.Membership, snse.Status.Draining) +} + +func (snle SetNodeLivenessEvent) Func() EventFunc { return MutationFunc(func(ctx context.Context, s state.State) { - s.SetNodeLiveness( - sne.NodeId, - sne.LivenessStatus, - ) + s.SetAllStoresLiveness(snle.NodeID, snle.Liveness) }) } -func (sne SetNodeLivenessEvent) String() string { - return fmt.Sprintf("set n%d to %v", sne.NodeId, sne.LivenessStatus) +func (snle SetNodeLivenessEvent) String() string { + return fmt.Sprintf("set n%d liveness=%v (all stores)", snle.NodeID, snle.Liveness) } func (sce SetCapacityOverrideEvent) Func() EventFunc { diff --git a/pkg/kv/kvserver/asim/queue/lease_queue_test.go b/pkg/kv/kvserver/asim/queue/lease_queue_test.go index 0bd99f1d3cfd..b43bad0bc05b 100644 --- a/pkg/kv/kvserver/asim/queue/lease_queue_test.go +++ b/pkg/kv/kvserver/asim/queue/lease_queue_test.go @@ -13,7 +13,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/stretchr/testify/require" ) @@ -50,7 +49,7 @@ func TestLeaseQueue(t *testing.T) { replicaCounts map[state.StoreID]int spanConfig roachpb.SpanConfig initialRF int - nonLiveNodes map[state.NodeID]livenesspb.NodeLivenessStatus + nodeStatusOverrides map[state.NodeID]state.NodeStatus // membership, draining nodeLocalities map[state.NodeID]roachpb.Locality ticks []int64 expectedLeaseCounts map[int64]map[int]int @@ -111,8 +110,8 @@ func TestLeaseQueue(t *testing.T) { 1: singleLocality("region", "a"), 2: singleLocality("region", "b"), }, - nonLiveNodes: map[state.NodeID]livenesspb.NodeLivenessStatus{ - 2: livenesspb.NodeLivenessStatus_DRAINING}, + nodeStatusOverrides: map[state.NodeID]state.NodeStatus{ + 2: {Draining: true}}, ticks: []int64{5, 10, 15}, expectedLeaseCounts: map[int64]map[int]int{ 5: {1: 10, 2: 0}, @@ -150,8 +149,8 @@ func TestLeaseQueue(t *testing.T) { ) s.TickClock(start) - 
for nodeID, livenessStatus := range tc.nonLiveNodes { - s.SetNodeLiveness(nodeID, livenessStatus) + for nodeID, status := range tc.nodeStatusOverrides { + s.SetNodeStatus(nodeID, status) } for nodeID, locality := range tc.nodeLocalities { diff --git a/pkg/kv/kvserver/asim/queue/replicate_queue_test.go b/pkg/kv/kvserver/asim/queue/replicate_queue_test.go index 8b92557bef15..f9eecc647498 100644 --- a/pkg/kv/kvserver/asim/queue/replicate_queue_test.go +++ b/pkg/kv/kvserver/asim/queue/replicate_queue_test.go @@ -107,14 +107,15 @@ func TestReplicateQueue(t *testing.T) { } testCases := []struct { - desc string - replicaCounts map[state.StoreID]int - spanConfig roachpb.SpanConfig - initialRF int - nonLiveNodes map[state.NodeID]livenesspb.NodeLivenessStatus - nodeLocalities map[state.NodeID]roachpb.Locality - ticks []int64 - expectedReplCounts map[int64]map[int]int + desc string + replicaCounts map[state.StoreID]int + spanConfig roachpb.SpanConfig + initialRF int + nodeLivenessOverrides map[state.NodeID]state.LivenessState // liveness for all stores + nodeStatusOverrides map[state.NodeID]state.NodeStatus // membership, draining + nodeLocalities map[state.NodeID]roachpb.Locality + ticks []int64 + expectedReplCounts map[int64]map[int]int }{ { // NB: Expect no action, range counts are balanced. @@ -208,8 +209,8 @@ func TestReplicateQueue(t *testing.T) { NumReplicas: 3, NumVoters: 3, }, - nonLiveNodes: map[state.NodeID]livenesspb.NodeLivenessStatus{ - 3: livenesspb.NodeLivenessStatus_DEAD}, + nodeLivenessOverrides: map[state.NodeID]state.LivenessState{ + 3: state.LivenessDead}, ticks: []int64{5, 10, 15}, expectedReplCounts: map[int64]map[int]int{ 5: {1: 1, 2: 1, 3: 1, 4: 0}, @@ -227,8 +228,8 @@ func TestReplicateQueue(t *testing.T) { NumReplicas: 3, NumVoters: 3, }, - nonLiveNodes: map[state.NodeID]livenesspb.NodeLivenessStatus{ - 3: livenesspb.NodeLivenessStatus_DECOMMISSIONING}, + nodeStatusOverrides: map[state.NodeID]state.NodeStatus{ + 3: {Membership: livenesspb.MembershipStatus_DECOMMISSIONING}}, ticks: []int64{5, 10, 15, 20, 25}, expectedReplCounts: map[int64]map[int]int{ 5: {1: 10, 2: 10, 3: 10, 4: 0}, @@ -299,8 +300,11 @@ func TestReplicateQueue(t *testing.T) { ) s.TickClock(start) - for nodeID, livenessStatus := range tc.nonLiveNodes { - s.SetNodeLiveness(nodeID, livenessStatus) + for nodeID, liveness := range tc.nodeLivenessOverrides { + s.SetAllStoresLiveness(nodeID, liveness) + } + for nodeID, status := range tc.nodeStatusOverrides { + s.SetNodeStatus(nodeID, status) } for nodeID, locality := range tc.nodeLocalities { diff --git a/pkg/kv/kvserver/asim/state/impl.go b/pkg/kv/kvserver/asim/state/impl.go index 49d79a382618..87aee1c55e9e 100644 --- a/pkg/kv/kvserver/asim/state/impl.go +++ b/pkg/kv/kvserver/asim/state/impl.go @@ -46,7 +46,7 @@ type state struct { stores map[StoreID]*store load map[RangeID]ReplicaLoad loadsplits map[StoreID]LoadSplitter - nodeLiveness MockNodeLiveness + statusTracker StatusTracker capacityChangeListeners []CapacityChangeListener newCapacityListeners []NewCapacityListener configChangeListeners []ConfigChangeListener @@ -81,9 +81,11 @@ func newState(settings *config.SimulationSettings) *state { usageInfo: newClusterUsageInfo(), settings: settings, } - s.nodeLiveness = MockNodeLiveness{ - clock: hlc.NewClockForTesting(s.clock), - statusMap: map[NodeID]livenesspb.NodeLivenessStatus{}, + s.statusTracker = StatusTracker{ + clock: hlc.NewClockForTesting(s.clock), + storeStatusMap: map[StoreID]StoreStatus{}, + nodeStatusMap: map[NodeID]NodeStatus{}, + storeToNode: 
map[StoreID]NodeID{}, } s.load = map[RangeID]ReplicaLoad{FirstRangeID: NewReplicaLoadCounter(s.clock)} @@ -436,7 +438,7 @@ func (s *state) AddNode(nodeCPUCapacity int64, locality roachpb.Locality) Node { as: mmaintegration.NewAllocatorSync(sp, mmAllocator, s.settings.ST, nil), } s.nodes[nodeID] = node - s.SetNodeLiveness(nodeID, livenesspb.NodeLivenessStatus_LIVE) + s.SetNodeStatus(nodeID, NodeStatus{Membership: livenesspb.MembershipStatus_ACTIVE}) s.SetNodeLocality(nodeID, locality) s.SetNodeCPURateCapacity(nodeID, nodeCPUCapacity) return node @@ -587,6 +589,10 @@ func (s *state) AddStore(nodeID NodeID) (Store, bool) { node.stores = append(node.stores, storeID) s.stores[storeID] = store + // Register the store with the liveness tracker, associating + // this store with its node. + s.statusTracker.registerStore(storeID, nodeID) + // Add a range load splitter for this store. s.loadsplits[storeID] = NewSplitDecider(s.settings) @@ -1177,10 +1183,45 @@ func (s *state) NextReplicasFn(storeID StoreID) func() []Replica { return nextReplFn } -// SetNodeLiveness sets the liveness status of the node with ID NodeID to be -// the status given. -func (s *state) SetNodeLiveness(nodeID NodeID, status livenesspb.NodeLivenessStatus) { - s.nodeLiveness.statusMap[nodeID] = status +// SetStoreStatus sets the liveness for a store. +func (s *state) SetStoreStatus(storeID StoreID, status StoreStatus) { + // NB: the store->node map entry was created when the store + // was created, so we don't need to create it here. + s.statusTracker.storeStatusMap[storeID] = status +} + +// StoreStatus returns the liveness status for a store. +func (s *state) StoreStatus(storeID StoreID) StoreStatus { + return s.statusTracker.storeStatusMap[storeID] +} + +// SetNodeStatus sets the membership and draining signals for a node. +func (s *state) SetNodeStatus(nodeID NodeID, status NodeStatus) { + s.statusTracker.nodeStatusMap[nodeID] = status +} + +// NodeStatus returns the membership and draining signals for a node. +func (s *state) NodeStatus(nodeID NodeID) NodeStatus { + return s.statusTracker.nodeStatusMap[nodeID] +} + +// SetAllStoresLiveness sets the liveness for all stores on a node at once. +func (s *state) SetAllStoresLiveness(nodeID NodeID, liveness LivenessState) { + node, ok := s.nodes[nodeID] + if !ok { + return + } + for _, storeID := range node.stores { + s.statusTracker.storeStatusMap[storeID] = StoreStatus{Liveness: liveness} + } +} + +// NodeLiveness returns the aggregated liveness for a node, which is +// the "worst" state. In effect, if one store is doing poorly, we +// report this node as doing as poorly. This is needed for the single- +// metric allocator, which thinks about liveness at the node level. +func (s *state) NodeLiveness(nodeID NodeID) LivenessState { + return s.statusTracker.worstLivenessForStoresOnNode(nodeID) } // NodeLivenessFn returns a function, that when called will return the @@ -1188,23 +1229,19 @@ func (s *state) SetNodeLiveness(nodeID NodeID, status livenesspb.NodeLivenessSta // TODO(kvoli): Find a better home for this method, required by the storepool. func (s *state) NodeLivenessFn() storepool.NodeLivenessFunc { return func(nid roachpb.NodeID) livenesspb.NodeLivenessStatus { - return s.nodeLiveness.statusMap[NodeID(nid)] + return s.statusTracker.convertToNodeVitality(NodeID(nid), s.statusTracker.clock.Now()).LivenessStatus() } } // NodeCountFn returns a function, that when called will return the current // number of nodes that exist in this state. 
// TODO(kvoli): Find a better home for this method, required by the storepool. -// TODO(wenyihu6): introduce the concept of membership separated from the -// liveness map. func (s *state) NodeCountFn() storepool.NodeCountFunc { return func() int { count := 0 - for _, status := range s.nodeLiveness.statusMap { - // Nodes with a liveness status other than decommissioned or - // decommissioning are considered active members (see - // liveness.MembershipStatus). - if status != livenesspb.NodeLivenessStatus_DECOMMISSIONED && status != livenesspb.NodeLivenessStatus_DECOMMISSIONING { + for _, ns := range s.statusTracker.nodeStatusMap { + // Only nodes with ACTIVE membership are counted. + if ns.Membership.Active() { count++ } } @@ -1356,7 +1393,7 @@ func (s *state) Scan( // state of ranges. func (s *state) Report() roachpb.SpanConfigConformanceReport { reporter := spanconfigreporter.New( - s.nodeLiveness, s, s, s, + s.statusTracker, s, s, s, s.settings.ST, &spanconfig.TestingKnobs{}) report, err := reporter.SpanConfigConformance(context.Background(), []roachpb.Span{{}}) if err != nil { diff --git a/pkg/kv/kvserver/asim/state/liveness.go b/pkg/kv/kvserver/asim/state/liveness.go index 218b8d39306d..7450628e0977 100644 --- a/pkg/kv/kvserver/asim/state/liveness.go +++ b/pkg/kv/kvserver/asim/state/liveness.go @@ -14,83 +14,126 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" ) -// MockNodeLiveness is responsible for tracking the liveness status of nodes -// without the need for real heartbeating. Instead, this mock implementation -// manages the statuses of all nodes by manually updating and accessing the -// internal map. It implements the NodeVitalityInterface to enable the -// SpanConfigConformance reporter to call it. Thus, we only expect calls to the -// ScanNodeVitalityFromCache function. -type MockNodeLiveness struct { - clock *hlc.Clock - statusMap map[NodeID]livenesspb.NodeLivenessStatus +// LivenessState represents the liveness of a store. +// They are ordered from best (healthy) to worst. +type LivenessState byte + +const ( + // LivenessLive means the store is alive and healthy. + LivenessLive LivenessState = iota + // LivenessUnavailable means the store is recently down but not yet marked dead. + LivenessUnavailable + // LivenessDead means the store has been down long enough to be considered dead. + LivenessDead +) + +// StoreStatus represents the status of a store in the simulation. For MMA, +// liveness is tracked per-store and to support this, asim does as well. +// Whenever we simulate the single-metric allocator, it sees a node as live if +// all of its stores are live, i.e. we make the assumption that an unhealthy +// store would also render the node unhealthy (to sma). This should generally +// hold in practice (for example, liveness heartbeats synchronously write to all +// engines). +type StoreStatus struct { + Liveness LivenessState } -var _ livenesspb.NodeVitalityInterface = &MockNodeLiveness{} +// NodeStatus represents the status of a node in the simulation. This holds +// signals that are genuinely per-node: membership and draining. +type NodeStatus struct { + Membership livenesspb.MembershipStatus + Draining bool +} -func (m MockNodeLiveness) GetNodeVitalityFromCache(roachpb.NodeID) livenesspb.NodeVitality { - panic("GetNodeVitalityFromCache is not expected to be called on MockNodeLiveness") +// StatusTracker is responsible for tracking the liveness status of stores +// and the membership/draining status of nodes without real heartbeating. 
+// It implements the NodeVitalityInterface to enable the SpanConfigConformance +// reporter to call it. +// +// Liveness is tracked per-store (for MMA). When SMA needs node-level liveness, +// it is aggregated from the stores: the node takes the "worst" liveness state +// of any of its stores. +type StatusTracker struct { + clock *hlc.Clock + storeStatusMap map[StoreID]StoreStatus + nodeStatusMap map[NodeID]NodeStatus + // storeToNode maps each store to its node for aggregation. + storeToNode map[StoreID]NodeID } -func (m MockNodeLiveness) ScanNodeVitalityFromKV( +var _ livenesspb.NodeVitalityInterface = &StatusTracker{} + +// registerStore registers a new store with its node mapping and initializes it as live. +func (st *StatusTracker) registerStore(storeID StoreID, nodeID NodeID) { + st.storeStatusMap[storeID] = StoreStatus{Liveness: LivenessLive} + st.storeToNode[storeID] = nodeID +} + +func (st StatusTracker) GetNodeVitalityFromCache(roachpb.NodeID) livenesspb.NodeVitality { + panic("GetNodeVitalityFromCache is not expected to be called on StatusTracker") +} + +func (st StatusTracker) ScanNodeVitalityFromKV( context.Context, ) (livenesspb.NodeVitalityMap, error) { - panic("ScanNodeVitalityFromKV is not expected to be called on MockNodeLiveness") + panic("ScanNodeVitalityFromKV is not expected to be called on StatusTracker") +} + +// worstLivenessForStoresOnNode returns the "worst" liveness state across all +// stores on a node. The ordering is: Dead is worse than Unavailable is worse +// than Live. +func (st StatusTracker) worstLivenessForStoresOnNode(nodeID NodeID) LivenessState { + worst := LivenessLive + for storeID, ss := range st.storeStatusMap { + if st.storeToNode[storeID] == nodeID { + worst = max(worst, ss.Liveness) + } + } + return worst } -// convertNodeStatusToNodeVitality constructs a node vitality in a manner that -// respects all node liveness status properties. The node vitality record for -// nodes in different states are set as follows: -// timeUntilNodeDead = time.Minute -// - unknown: invalid NodeVitality -// - live, decommissioning: expirationWallTime = MaxTimeStamp - timeUntilNodeDead -// - unavailable: expirationWallTime = now - time.Second -// - dead, decommissioned: expirationWallTime = 0 +// convertToNodeVitality constructs a NodeVitality for a node by combining +// store-level liveness (aggregated) with node-level membership and draining. // -// Explanation: refer to liveness.LivenessStatus for an overview of the states a -// liveness record goes through. +// This is used by the old allocator (SMA) which operates on nodes. MMA uses +// store-level liveness directly and doesn't need NodeVitality. // -// - live, decommissioning: set liveness expiration to be in the -// future(MaxTimestamp) and minus timeUntilNodeDead to avoid overflow in -// liveness status check tExp+timeUntilNodeDead. -// - unavailable: set liveness expiration to be just recently expired. This -// needs to be within now - timeUntilNodeDead <= expirationWallTime < now so -// that the status is not dead nor alive. -// - dead, decommissioned: set liveness expiration to be in the -// past(MinTimestamp). 
-func convertNodeStatusToNodeVitality( - nid roachpb.NodeID, status livenesspb.NodeLivenessStatus, now hlc.Timestamp, +// The liveness affects the expiration timestamp: +// - live: expiration far in the future (node passes isAlive checks) +// - unavailable: expiration just recently passed (node is unavailable but not dead) +// - dead: expiration far in the past (node fails isAlive checks) +// +// timeUntilNodeDead is set to 1 minute. +func (st StatusTracker) convertToNodeVitality( + nodeID NodeID, now hlc.Timestamp, ) livenesspb.NodeVitality { const timeUntilNodeDead = time.Minute var liveTs = hlc.MaxTimestamp.AddDuration(-timeUntilNodeDead).ToLegacyTimestamp() - var deadTs = hlc.MinTimestamp.ToLegacyTimestamp() var unavailableTs = now.AddDuration(-time.Second).ToLegacyTimestamp() + var deadTs = hlc.MinTimestamp.ToLegacyTimestamp() + + ns := st.nodeStatusMap[nodeID] + liveness := st.worstLivenessForStoresOnNode(nodeID) + + nid := roachpb.NodeID(nodeID) l := livenesspb.Liveness{ NodeID: nid, - Draining: false, Epoch: 1, - Membership: livenesspb.MembershipStatus_ACTIVE, + Membership: ns.Membership, + Draining: ns.Draining, } - switch status { - case livenesspb.NodeLivenessStatus_UNKNOWN: - return livenesspb.NodeVitality{} - case livenesspb.NodeLivenessStatus_DEAD: - l.Expiration = deadTs - case livenesspb.NodeLivenessStatus_UNAVAILABLE: - l.Expiration = unavailableTs - case livenesspb.NodeLivenessStatus_LIVE: - l.Expiration = liveTs - case livenesspb.NodeLivenessStatus_DECOMMISSIONING: + + switch liveness { + case LivenessLive: l.Expiration = liveTs - l.Membership = livenesspb.MembershipStatus_DECOMMISSIONING - case livenesspb.NodeLivenessStatus_DECOMMISSIONED: + case LivenessUnavailable: + l.Expiration = unavailableTs + case LivenessDead: l.Expiration = deadTs - l.Membership = livenesspb.MembershipStatus_DECOMMISSIONED - case livenesspb.NodeLivenessStatus_DRAINING: - l.Expiration = liveTs - l.Draining = true } + ncs := livenesspb.NewNodeConnectionStatus(nid, nil) - ncs.SetIsConnected(true) + ncs.SetIsConnected(liveness == LivenessLive) entry := l.CreateNodeVitality( now, /* now */ hlc.Timestamp{}, /* descUpdateTime */ @@ -102,12 +145,12 @@ func convertNodeStatusToNodeVitality( return entry } -func (m MockNodeLiveness) ScanNodeVitalityFromCache() livenesspb.NodeVitalityMap { +func (st StatusTracker) ScanNodeVitalityFromCache() livenesspb.NodeVitalityMap { isLiveMap := livenesspb.NodeVitalityMap{} - for nodeID, status := range m.statusMap { + now := st.clock.Now() + for nodeID := range st.nodeStatusMap { nid := roachpb.NodeID(nodeID) - now := m.clock.Now() - isLiveMap[nid] = convertNodeStatusToNodeVitality(nid, status, now) + isLiveMap[nid] = st.convertToNodeVitality(nodeID, now) } return isLiveMap } diff --git a/pkg/kv/kvserver/asim/state/liveness_test.go b/pkg/kv/kvserver/asim/state/liveness_test.go index 153a61777680..c2ef6a6c49a3 100644 --- a/pkg/kv/kvserver/asim/state/liveness_test.go +++ b/pkg/kv/kvserver/asim/state/liveness_test.go @@ -10,61 +10,92 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/stretchr/testify/require" ) -func TestMockLiveness(t *testing.T) { +// TestStatusTrackerConversion verifies that StatusTracker correctly converts +// asim's internal status representation (store-level liveness + node-level +// membership/draining) into NodeVitality objects used by the single-metric +// allocator. 
It checks that the resulting LivenessStatus and IsLive values +// match expectations for various combinations of liveness, membership, and +// draining states. +func TestStatusTrackerConversion(t *testing.T) { clock := &ManualSimClock{nanos: time.Date(2022, 03, 21, 11, 0, 0, 0, time.UTC).UnixNano()} - now := hlc.NewClockForTesting(clock).Now() + hlcClock := hlc.NewClockForTesting(clock) + testCases := []struct { - status livenesspb.NodeLivenessStatus - IsAlive bool - MembershipStatus livenesspb.MembershipStatus + desc string + storeLiveness LivenessState + membership livenesspb.MembershipStatus + draining bool + expectIsAlive bool + expectedStatus livenesspb.NodeLivenessStatus }{ { - status: livenesspb.NodeLivenessStatus_UNKNOWN, - IsAlive: false, - MembershipStatus: livenesspb.MembershipStatus_ACTIVE, + desc: "live active", + storeLiveness: LivenessLive, + membership: livenesspb.MembershipStatus_ACTIVE, + expectIsAlive: true, + expectedStatus: livenesspb.NodeLivenessStatus_LIVE, }, { - status: livenesspb.NodeLivenessStatus_DEAD, - IsAlive: false, - MembershipStatus: livenesspb.MembershipStatus_ACTIVE, + desc: "live draining", + storeLiveness: LivenessLive, + membership: livenesspb.MembershipStatus_ACTIVE, + draining: true, + expectIsAlive: true, + expectedStatus: livenesspb.NodeLivenessStatus_DRAINING, }, { - status: livenesspb.NodeLivenessStatus_UNAVAILABLE, - IsAlive: false, - MembershipStatus: livenesspb.MembershipStatus_ACTIVE, + desc: "live decommissioning", + storeLiveness: LivenessLive, + membership: livenesspb.MembershipStatus_DECOMMISSIONING, + expectIsAlive: true, + expectedStatus: livenesspb.NodeLivenessStatus_DECOMMISSIONING, }, { - status: livenesspb.NodeLivenessStatus_DECOMMISSIONED, - IsAlive: false, - MembershipStatus: livenesspb.MembershipStatus_DECOMMISSIONED, + desc: "unavailable active", + storeLiveness: LivenessUnavailable, + membership: livenesspb.MembershipStatus_ACTIVE, + expectIsAlive: false, + expectedStatus: livenesspb.NodeLivenessStatus_UNAVAILABLE, }, { - status: livenesspb.NodeLivenessStatus_DECOMMISSIONING, - IsAlive: true, - MembershipStatus: livenesspb.MembershipStatus_DECOMMISSIONING, + desc: "dead active", + storeLiveness: LivenessDead, + membership: livenesspb.MembershipStatus_ACTIVE, + expectIsAlive: false, + expectedStatus: livenesspb.NodeLivenessStatus_DEAD, }, { - status: livenesspb.NodeLivenessStatus_LIVE, - IsAlive: true, - MembershipStatus: livenesspb.MembershipStatus_ACTIVE, + desc: "dead decommissioned", + storeLiveness: LivenessDead, + membership: livenesspb.MembershipStatus_DECOMMISSIONED, + expectIsAlive: false, + expectedStatus: livenesspb.NodeLivenessStatus_DECOMMISSIONED, }, { - status: livenesspb.NodeLivenessStatus_DRAINING, - IsAlive: true, - MembershipStatus: livenesspb.MembershipStatus_ACTIVE, + desc: "dead decommissioning (returns DECOMMISSIONED)", + storeLiveness: LivenessDead, + membership: livenesspb.MembershipStatus_DECOMMISSIONING, + expectIsAlive: false, + expectedStatus: livenesspb.NodeLivenessStatus_DECOMMISSIONED, }, } for _, tc := range testCases { - t.Run(tc.status.String(), func(t *testing.T) { - nv := convertNodeStatusToNodeVitality(roachpb.NodeID(1), tc.status, now) - require.Equal(t, tc.IsAlive, nv.IsLive(livenesspb.Rebalance)) - require.Equal(t, tc.MembershipStatus, nv.MembershipStatus()) - require.Equal(t, tc.status, nv.LivenessStatus()) + t.Run(tc.desc, func(t *testing.T) { + // Create a StatusTracker with one node and one store. 
+ m := StatusTracker{ + clock: hlcClock, + storeStatusMap: map[StoreID]StoreStatus{1: {Liveness: tc.storeLiveness}}, + nodeStatusMap: map[NodeID]NodeStatus{1: {Membership: tc.membership, Draining: tc.draining}}, + storeToNode: map[StoreID]NodeID{1: 1}, + } + nv := m.convertToNodeVitality(1, hlcClock.Now()) + require.Equal(t, tc.expectIsAlive, nv.IsLive(livenesspb.Rebalance)) + require.Equal(t, tc.membership, nv.MembershipStatus()) + require.Equal(t, tc.expectedStatus, nv.LivenessStatus()) }) } } diff --git a/pkg/kv/kvserver/asim/state/state.go b/pkg/kv/kvserver/asim/state/state.go index b6f9a2a7b26f..baae04f26928 100644 --- a/pkg/kv/kvserver/asim/state/state.go +++ b/pkg/kv/kvserver/asim/state/state.go @@ -15,7 +15,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/mmaintegration" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -163,11 +162,21 @@ type State interface { // NextReplicasFn returns a function, that when called will return the current // replicas that exist on the store. NextReplicasFn(StoreID) func() []Replica - // SetNodeLiveness sets the liveness status of the node with ID NodeID to be - // the status given. - SetNodeLiveness(NodeID, livenesspb.NodeLivenessStatus) + // SetStoreStatus sets the liveness for a store directly. + SetStoreStatus(storeID StoreID, status StoreStatus) + // StoreStatus returns the liveness status for a store. + StoreStatus(StoreID) StoreStatus + // SetNodeStatus sets the membership and draining signals for a node. + SetNodeStatus(nodeID NodeID, status NodeStatus) + // NodeStatus returns the membership and draining signals for a node. + NodeStatus(NodeID) NodeStatus + // SetAllStoresLiveness sets the liveness for all stores on a node at once. + // This is useful for DSL commands that operate at the node level. + SetAllStoresLiveness(nodeID NodeID, liveness LivenessState) // NodeLivenessFn returns a function, that when called will return the // liveness of the Node with ID NodeID. + // This is used by the store pool, which is used only by the single-metric + // allocator (not mma). // TODO(kvoli): Find a better home for this method, required by the // storepool. NodeLivenessFn() storepool.NodeLivenessFunc diff --git a/pkg/kv/kvserver/asim/state/state_test.go b/pkg/kv/kvserver/asim/state/state_test.go index ce77b6441207..628a4255a5fe 100644 --- a/pkg/kv/kvserver/asim/state/state_test.go +++ b/pkg/kv/kvserver/asim/state/state_test.go @@ -664,7 +664,11 @@ func TestSetSpanConfig(t *testing.T) { } } -func TestSetNodeLiveness(t *testing.T) { +// TestNodeAndStoreStatus verifies the status tracking APIs: NodeLivenessFn +// (combining store liveness with node membership), NodeCountFn (counting only +// ACTIVE members), and store-level liveness aggregation (node takes the worst +// liveness of its stores). 
+func TestNodeAndStoreStatus(t *testing.T) { t.Run("liveness func", func(t *testing.T) { s := LoadClusterInfo( ClusterInfoWithStoreCount(3, 1), @@ -673,12 +677,14 @@ func TestSetNodeLiveness(t *testing.T) { liveFn := s.NodeLivenessFn() - s.SetNodeLiveness(1, livenesspb.NodeLivenessStatus_LIVE) - s.SetNodeLiveness(2, livenesspb.NodeLivenessStatus_DEAD) - s.SetNodeLiveness(3, livenesspb.NodeLivenessStatus_DECOMMISSIONED) + // Set node liveness via SetAllStoresLiveness (sets all stores on the node). + // Node 1: live, Node 2: dead, Node 3: dead + decommissioned. + s.SetAllStoresLiveness(1, LivenessLive) + s.SetAllStoresLiveness(2, LivenessDead) + s.SetAllStoresLiveness(3, LivenessDead) + s.SetNodeStatus(3, NodeStatus{Membership: livenesspb.MembershipStatus_DECOMMISSIONED}) - // Liveness status returend should ignore time till store dead or the - // timestamp given. + // Liveness status returned should combine store liveness and node membership. require.Equal(t, livenesspb.NodeLivenessStatus_LIVE, liveFn(1)) require.Equal(t, livenesspb.NodeLivenessStatus_DEAD, liveFn(2)) require.Equal(t, livenesspb.NodeLivenessStatus_DECOMMISSIONED, liveFn(3)) @@ -693,15 +699,32 @@ func TestSetNodeLiveness(t *testing.T) { countFn := s.NodeCountFn() // Set node 1-5 as decommissioned and nodes 6-10 as dead. There should be a - // node count of 5. + // node count of 5 (dead nodes with ACTIVE membership are still counted). for i := 1; i <= 5; i++ { - s.SetNodeLiveness(NodeID(i), livenesspb.NodeLivenessStatus_DECOMMISSIONED) - } - for i := 6; i <= 10; i++ { - s.SetNodeLiveness(NodeID(i), livenesspb.NodeLivenessStatus_DEAD) + s.SetNodeStatus(NodeID(i), NodeStatus{Membership: livenesspb.MembershipStatus_DECOMMISSIONED}) } + // Nodes 6-10 are already dead by default if not set, but we can set them + // explicitly. Their membership remains ACTIVE by default. require.Equal(t, 5, countFn()) }) + + t.Run("store-level liveness", func(t *testing.T) { + s := LoadClusterInfo( + ClusterInfoWithStoreCount(1, 3), // 1 node with 3 stores + config.DefaultSimulationSettings(), + ).(*state) + + // All stores start live. + require.Equal(t, LivenessLive, s.NodeLiveness(1)) + + // Set one store to unavailable - node becomes unavailable. + s.SetStoreStatus(2, StoreStatus{Liveness: LivenessUnavailable}) + require.Equal(t, LivenessUnavailable, s.NodeLiveness(1)) + + // Set another store to dead - node becomes dead. + s.SetStoreStatus(3, StoreStatus{Liveness: LivenessDead}) + require.Equal(t, LivenessDead, s.NodeLiveness(1)) + }) } // TestTopology loads cluster configurations and checks that the topology diff --git a/pkg/kv/kvserver/asim/tests/datadriven_simulation_test.go b/pkg/kv/kvserver/asim/tests/datadriven_simulation_test.go index 74f72c962761..c53a35b5c565 100644 --- a/pkg/kv/kvserver/asim/tests/datadriven_simulation_test.go +++ b/pkg/kv/kvserver/asim/tests/datadriven_simulation_test.go @@ -89,11 +89,13 @@ var runAsimTests = envutil.EnvOrDefaultBool("COCKROACH_RUN_ASIM_TESTS", false) // an extra line should follow with the replica placement. A example of // the replica placement is: {s1:*,s2,s3:NON_VOTER}:1 {s4:*,s5,s6}:1. // -// - set_liveness node= liveness=(livenesspb.NodeLivenessStatus) [delay=] -// status=(dead|decommisssioning|draining|unavailable) -// Set the liveness status of the node with ID NodeID. This applies at the -// start of the simulation or with some delay after the simulation starts, -// if specified. 
+// - set_status store= [delay=] liveness=(live|unavailable|dead) +// - set_status node= [delay=] [liveness=(live|unavailable|dead)] +// [membership=(active|decommissioning|decommissioned)] [draining=] +// Set status signals for stores or nodes. The store= form sets liveness for +// a single store. The node= form can set liveness for all stores on a node +// and/or set membership/draining (which are per-node). Defaults for node=: +// membership=active, draining=false. // // - set_locality node= [delay= or node=") + } return "" case "set_locality": var nodeID int diff --git a/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/decommission_conformance.txt b/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/decommission_conformance.txt index f62a5671a650..7f42b0bb416e 100644 --- a/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/decommission_conformance.txt +++ b/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/decommission_conformance.txt @@ -20,7 +20,7 @@ set_span_config delay=5m [0,10000): num_replicas=3 constraints={'+region=a':1,'+region=b':1,'+region=c':1} ---- -set_liveness node=4 liveness=decommissioning delay=5m +set_status node=4 membership=decommissioning delay=5m ---- assertion type=conformance under=0 over=0 unavailable=0 violating=0 diff --git a/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/downreplicate_recently_down_node_issue152604.txt b/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/downreplicate_recently_down_node_issue152604.txt index 9b250376638e..e57be816784d 100644 --- a/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/downreplicate_recently_down_node_issue152604.txt +++ b/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/downreplicate_recently_down_node_issue152604.txt @@ -14,14 +14,13 @@ gen_cluster nodes=5 gen_ranges ranges=100 repl_factor=5 min_key=1 max_key=10000 ---- -# Mark n4 and n5 as NodeLivenessStatus_UNAVAILABLE, which is the status -# stores have when down but not down for long enough to be marked as dead. -# The range doesn't lose quorum as a result of this, since three replicas -# are still around. -set_liveness node=4 liveness=unavailable +# Mark n4 and n5 as unavailable, which is the status stores have when down +# but not down for long enough to be marked as dead. The range doesn't lose +# quorum as a result of this, since three replicas are still around. +set_status node=4 liveness=unavailable ---- -set_liveness node=5 liveness=unavailable +set_status node=5 liveness=unavailable ---- # Trigger down-replication to three replicas. diff --git a/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/liveness.txt b/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/liveness.txt index 8e9945ce02dc..9fcb557ab9b3 100644 --- a/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/liveness.txt +++ b/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/liveness.txt @@ -17,12 +17,12 @@ gen_ranges ranges=700 # n7 is dead and remains dead forever. It will still have its initial (3000) # replicas. -set_liveness node=7 liveness=dead +set_status node=7 liveness=dead ---- # n6 becomes decommissioning after 3 minutes and remains decommissioning # thereafter. -set_liveness node=6 liveness=decommissioning delay=3m +set_status node=6 membership=decommissioning delay=3m ---- # The number of replicas on the dead and decommissioning stores should be 0,