diff --git a/pkg/kv/kvserver/asim/event/BUILD.bazel b/pkg/kv/kvserver/asim/event/BUILD.bazel index 6122f4fc3178..b9ce8c93472f 100644 --- a/pkg/kv/kvserver/asim/event/BUILD.bazel +++ b/pkg/kv/kvserver/asim/event/BUILD.bazel @@ -14,7 +14,6 @@ go_library( "//pkg/kv/kvserver/asim/config", "//pkg/kv/kvserver/asim/history", "//pkg/kv/kvserver/asim/state", - "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb", "//pkg/util/log", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/kv/kvserver/asim/event/event.go b/pkg/kv/kvserver/asim/event/event.go index fd8dd6220610..d92b30eafc38 100644 --- a/pkg/kv/kvserver/asim/event/event.go +++ b/pkg/kv/kvserver/asim/event/event.go @@ -16,7 +16,7 @@ import ( // Event outlines the necessary behaviours that event structs must implement. // Some key implementations of the interface includes assertionEvent, -// SetSpanConfigEvent, AddNodeEvent, SetNodeLivenessEvent, +// SetSpanConfigEvent, AddNodeEvent, SetNodeStatusEvent, // SetCapacityOverrideEvent. type Event interface { // Func returns a closure associated with event which could be an assertion diff --git a/pkg/kv/kvserver/asim/event/mutation_event.go b/pkg/kv/kvserver/asim/event/mutation_event.go index ec0b7806c9fe..d4ac8cfd6f42 100644 --- a/pkg/kv/kvserver/asim/event/mutation_event.go +++ b/pkg/kv/kvserver/asim/event/mutation_event.go @@ -12,7 +12,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -31,12 +30,29 @@ type AddNodeEvent struct { LocalityString string } -// SetNodeLivenessEvent represents a mutation event responsible for setting -// liveness status of a node identified by the NodeID to the specified -// LivenessStatus. +// SetStoreStatusEvent represents a mutation event for setting the liveness +// of a single store. +type SetStoreStatusEvent struct { + StoreID state.StoreID + Status state.StoreStatus +} + +// SetNodeStatusEvent represents a mutation event for setting the membership +// and draining signals of a node. +type SetNodeStatusEvent struct { + NodeID state.NodeID + Status state.NodeStatus +} + +// SetNodeLivenessEvent represents a mutation event for setting the liveness +// of all stores on a node at once. This exists because DSL commands are parsed +// before the simulation runs, at which point we don't yet know which stores +// exist on each node - that's determined when the cluster is built. So we +// can't expand "node=X liveness=Y" into individual store events at parse time; +// we must defer to runtime when the state knows the node's stores. 
type SetNodeLivenessEvent struct { - NodeId state.NodeID - LivenessStatus livenesspb.NodeLivenessStatus + NodeID state.NodeID + Liveness state.LivenessState } // SetCapacityOverrideEvent represents a mutation event responsible for updating @@ -63,6 +79,8 @@ type SetSimulationSettingsEvent struct { var _ Event = &SetSpanConfigEvent{} var _ Event = &AddNodeEvent{} +var _ Event = &SetStoreStatusEvent{} +var _ Event = &SetNodeStatusEvent{} var _ Event = &SetNodeLivenessEvent{} var _ Event = &SetCapacityOverrideEvent{} var _ Event = &SetNodeLocalityEvent{} @@ -149,17 +167,35 @@ func (ae AddNodeEvent) String() string { return buf.String() } -func (sne SetNodeLivenessEvent) Func() EventFunc { +func (ssse SetStoreStatusEvent) Func() EventFunc { + return MutationFunc(func(ctx context.Context, s state.State) { + s.SetStoreStatus(ssse.StoreID, ssse.Status) + }) +} + +func (ssse SetStoreStatusEvent) String() string { + return fmt.Sprintf("set s%d liveness=%v", ssse.StoreID, ssse.Status.Liveness) +} + +func (snse SetNodeStatusEvent) Func() EventFunc { + return MutationFunc(func(ctx context.Context, s state.State) { + s.SetNodeStatus(snse.NodeID, snse.Status) + }) +} + +func (snse SetNodeStatusEvent) String() string { + return fmt.Sprintf("set n%d membership=%v, draining=%t", + snse.NodeID, snse.Status.Membership, snse.Status.Draining) +} + +func (snle SetNodeLivenessEvent) Func() EventFunc { return MutationFunc(func(ctx context.Context, s state.State) { - s.SetNodeLiveness( - sne.NodeId, - sne.LivenessStatus, - ) + s.SetAllStoresLiveness(snle.NodeID, snle.Liveness) }) } -func (sne SetNodeLivenessEvent) String() string { - return fmt.Sprintf("set n%d to %v", sne.NodeId, sne.LivenessStatus) +func (snle SetNodeLivenessEvent) String() string { + return fmt.Sprintf("set n%d liveness=%v (all stores)", snle.NodeID, snle.Liveness) } func (sce SetCapacityOverrideEvent) Func() EventFunc { diff --git a/pkg/kv/kvserver/asim/queue/lease_queue_test.go b/pkg/kv/kvserver/asim/queue/lease_queue_test.go index 0bd99f1d3cfd..b43bad0bc05b 100644 --- a/pkg/kv/kvserver/asim/queue/lease_queue_test.go +++ b/pkg/kv/kvserver/asim/queue/lease_queue_test.go @@ -13,7 +13,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/stretchr/testify/require" ) @@ -50,7 +49,7 @@ func TestLeaseQueue(t *testing.T) { replicaCounts map[state.StoreID]int spanConfig roachpb.SpanConfig initialRF int - nonLiveNodes map[state.NodeID]livenesspb.NodeLivenessStatus + nodeStatusOverrides map[state.NodeID]state.NodeStatus // membership, draining nodeLocalities map[state.NodeID]roachpb.Locality ticks []int64 expectedLeaseCounts map[int64]map[int]int @@ -111,8 +110,8 @@ func TestLeaseQueue(t *testing.T) { 1: singleLocality("region", "a"), 2: singleLocality("region", "b"), }, - nonLiveNodes: map[state.NodeID]livenesspb.NodeLivenessStatus{ - 2: livenesspb.NodeLivenessStatus_DRAINING}, + nodeStatusOverrides: map[state.NodeID]state.NodeStatus{ + 2: {Draining: true}}, ticks: []int64{5, 10, 15}, expectedLeaseCounts: map[int64]map[int]int{ 5: {1: 10, 2: 0}, @@ -150,8 +149,8 @@ func TestLeaseQueue(t *testing.T) { ) s.TickClock(start) - for nodeID, livenessStatus := range tc.nonLiveNodes { - s.SetNodeLiveness(nodeID, livenessStatus) + for nodeID, status := range tc.nodeStatusOverrides { + 
s.SetNodeStatus(nodeID, status) } for nodeID, locality := range tc.nodeLocalities { diff --git a/pkg/kv/kvserver/asim/queue/replicate_queue_test.go b/pkg/kv/kvserver/asim/queue/replicate_queue_test.go index 8b92557bef15..f9eecc647498 100644 --- a/pkg/kv/kvserver/asim/queue/replicate_queue_test.go +++ b/pkg/kv/kvserver/asim/queue/replicate_queue_test.go @@ -107,14 +107,15 @@ func TestReplicateQueue(t *testing.T) { } testCases := []struct { - desc string - replicaCounts map[state.StoreID]int - spanConfig roachpb.SpanConfig - initialRF int - nonLiveNodes map[state.NodeID]livenesspb.NodeLivenessStatus - nodeLocalities map[state.NodeID]roachpb.Locality - ticks []int64 - expectedReplCounts map[int64]map[int]int + desc string + replicaCounts map[state.StoreID]int + spanConfig roachpb.SpanConfig + initialRF int + nodeLivenessOverrides map[state.NodeID]state.LivenessState // liveness for all stores + nodeStatusOverrides map[state.NodeID]state.NodeStatus // membership, draining + nodeLocalities map[state.NodeID]roachpb.Locality + ticks []int64 + expectedReplCounts map[int64]map[int]int }{ { // NB: Expect no action, range counts are balanced. @@ -208,8 +209,8 @@ func TestReplicateQueue(t *testing.T) { NumReplicas: 3, NumVoters: 3, }, - nonLiveNodes: map[state.NodeID]livenesspb.NodeLivenessStatus{ - 3: livenesspb.NodeLivenessStatus_DEAD}, + nodeLivenessOverrides: map[state.NodeID]state.LivenessState{ + 3: state.LivenessDead}, ticks: []int64{5, 10, 15}, expectedReplCounts: map[int64]map[int]int{ 5: {1: 1, 2: 1, 3: 1, 4: 0}, @@ -227,8 +228,8 @@ func TestReplicateQueue(t *testing.T) { NumReplicas: 3, NumVoters: 3, }, - nonLiveNodes: map[state.NodeID]livenesspb.NodeLivenessStatus{ - 3: livenesspb.NodeLivenessStatus_DECOMMISSIONING}, + nodeStatusOverrides: map[state.NodeID]state.NodeStatus{ + 3: {Membership: livenesspb.MembershipStatus_DECOMMISSIONING}}, ticks: []int64{5, 10, 15, 20, 25}, expectedReplCounts: map[int64]map[int]int{ 5: {1: 10, 2: 10, 3: 10, 4: 0}, @@ -299,8 +300,11 @@ func TestReplicateQueue(t *testing.T) { ) s.TickClock(start) - for nodeID, livenessStatus := range tc.nonLiveNodes { - s.SetNodeLiveness(nodeID, livenessStatus) + for nodeID, liveness := range tc.nodeLivenessOverrides { + s.SetAllStoresLiveness(nodeID, liveness) + } + for nodeID, status := range tc.nodeStatusOverrides { + s.SetNodeStatus(nodeID, status) } for nodeID, locality := range tc.nodeLocalities { diff --git a/pkg/kv/kvserver/asim/state/impl.go b/pkg/kv/kvserver/asim/state/impl.go index 49d79a382618..87aee1c55e9e 100644 --- a/pkg/kv/kvserver/asim/state/impl.go +++ b/pkg/kv/kvserver/asim/state/impl.go @@ -46,7 +46,7 @@ type state struct { stores map[StoreID]*store load map[RangeID]ReplicaLoad loadsplits map[StoreID]LoadSplitter - nodeLiveness MockNodeLiveness + statusTracker StatusTracker capacityChangeListeners []CapacityChangeListener newCapacityListeners []NewCapacityListener configChangeListeners []ConfigChangeListener @@ -81,9 +81,11 @@ func newState(settings *config.SimulationSettings) *state { usageInfo: newClusterUsageInfo(), settings: settings, } - s.nodeLiveness = MockNodeLiveness{ - clock: hlc.NewClockForTesting(s.clock), - statusMap: map[NodeID]livenesspb.NodeLivenessStatus{}, + s.statusTracker = StatusTracker{ + clock: hlc.NewClockForTesting(s.clock), + storeStatusMap: map[StoreID]StoreStatus{}, + nodeStatusMap: map[NodeID]NodeStatus{}, + storeToNode: map[StoreID]NodeID{}, } s.load = map[RangeID]ReplicaLoad{FirstRangeID: NewReplicaLoadCounter(s.clock)} @@ -436,7 +438,7 @@ func (s *state) 
AddNode(nodeCPUCapacity int64, locality roachpb.Locality) Node { as: mmaintegration.NewAllocatorSync(sp, mmAllocator, s.settings.ST, nil), } s.nodes[nodeID] = node - s.SetNodeLiveness(nodeID, livenesspb.NodeLivenessStatus_LIVE) + s.SetNodeStatus(nodeID, NodeStatus{Membership: livenesspb.MembershipStatus_ACTIVE}) s.SetNodeLocality(nodeID, locality) s.SetNodeCPURateCapacity(nodeID, nodeCPUCapacity) return node @@ -587,6 +589,10 @@ func (s *state) AddStore(nodeID NodeID) (Store, bool) { node.stores = append(node.stores, storeID) s.stores[storeID] = store + // Register the store with the liveness tracker, associating + // this store with its node. + s.statusTracker.registerStore(storeID, nodeID) + // Add a range load splitter for this store. s.loadsplits[storeID] = NewSplitDecider(s.settings) @@ -1177,10 +1183,45 @@ func (s *state) NextReplicasFn(storeID StoreID) func() []Replica { return nextReplFn } -// SetNodeLiveness sets the liveness status of the node with ID NodeID to be -// the status given. -func (s *state) SetNodeLiveness(nodeID NodeID, status livenesspb.NodeLivenessStatus) { - s.nodeLiveness.statusMap[nodeID] = status +// SetStoreStatus sets the liveness for a store. +func (s *state) SetStoreStatus(storeID StoreID, status StoreStatus) { + // NB: the store->node map entry was created when the store + // was created, so we don't need to create it here. + s.statusTracker.storeStatusMap[storeID] = status +} + +// StoreStatus returns the liveness status for a store. +func (s *state) StoreStatus(storeID StoreID) StoreStatus { + return s.statusTracker.storeStatusMap[storeID] +} + +// SetNodeStatus sets the membership and draining signals for a node. +func (s *state) SetNodeStatus(nodeID NodeID, status NodeStatus) { + s.statusTracker.nodeStatusMap[nodeID] = status +} + +// NodeStatus returns the membership and draining signals for a node. +func (s *state) NodeStatus(nodeID NodeID) NodeStatus { + return s.statusTracker.nodeStatusMap[nodeID] +} + +// SetAllStoresLiveness sets the liveness for all stores on a node at once. +func (s *state) SetAllStoresLiveness(nodeID NodeID, liveness LivenessState) { + node, ok := s.nodes[nodeID] + if !ok { + return + } + for _, storeID := range node.stores { + s.statusTracker.storeStatusMap[storeID] = StoreStatus{Liveness: liveness} + } +} + +// NodeLiveness returns the aggregated liveness for a node, which is +// the "worst" state. In effect, if one store is doing poorly, we +// report this node as doing as poorly. This is needed for the single- +// metric allocator, which thinks about liveness at the node level. +func (s *state) NodeLiveness(nodeID NodeID) LivenessState { + return s.statusTracker.worstLivenessForStoresOnNode(nodeID) } // NodeLivenessFn returns a function, that when called will return the @@ -1188,23 +1229,19 @@ func (s *state) SetNodeLiveness(nodeID NodeID, status livenesspb.NodeLivenessSta // TODO(kvoli): Find a better home for this method, required by the storepool. func (s *state) NodeLivenessFn() storepool.NodeLivenessFunc { return func(nid roachpb.NodeID) livenesspb.NodeLivenessStatus { - return s.nodeLiveness.statusMap[NodeID(nid)] + return s.statusTracker.convertToNodeVitality(NodeID(nid), s.statusTracker.clock.Now()).LivenessStatus() } } // NodeCountFn returns a function, that when called will return the current // number of nodes that exist in this state. // TODO(kvoli): Find a better home for this method, required by the storepool. -// TODO(wenyihu6): introduce the concept of membership separated from the -// liveness map. 
func (s *state) NodeCountFn() storepool.NodeCountFunc { return func() int { count := 0 - for _, status := range s.nodeLiveness.statusMap { - // Nodes with a liveness status other than decommissioned or - // decommissioning are considered active members (see - // liveness.MembershipStatus). - if status != livenesspb.NodeLivenessStatus_DECOMMISSIONED && status != livenesspb.NodeLivenessStatus_DECOMMISSIONING { + for _, ns := range s.statusTracker.nodeStatusMap { + // Only nodes with ACTIVE membership are counted. + if ns.Membership.Active() { count++ } } @@ -1356,7 +1393,7 @@ func (s *state) Scan( // state of ranges. func (s *state) Report() roachpb.SpanConfigConformanceReport { reporter := spanconfigreporter.New( - s.nodeLiveness, s, s, s, + s.statusTracker, s, s, s, s.settings.ST, &spanconfig.TestingKnobs{}) report, err := reporter.SpanConfigConformance(context.Background(), []roachpb.Span{{}}) if err != nil { diff --git a/pkg/kv/kvserver/asim/state/liveness.go b/pkg/kv/kvserver/asim/state/liveness.go index 218b8d39306d..7450628e0977 100644 --- a/pkg/kv/kvserver/asim/state/liveness.go +++ b/pkg/kv/kvserver/asim/state/liveness.go @@ -14,83 +14,126 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" ) -// MockNodeLiveness is responsible for tracking the liveness status of nodes -// without the need for real heartbeating. Instead, this mock implementation -// manages the statuses of all nodes by manually updating and accessing the -// internal map. It implements the NodeVitalityInterface to enable the -// SpanConfigConformance reporter to call it. Thus, we only expect calls to the -// ScanNodeVitalityFromCache function. -type MockNodeLiveness struct { - clock *hlc.Clock - statusMap map[NodeID]livenesspb.NodeLivenessStatus +// LivenessState represents the liveness of a store. +// They are ordered from best (healthy) to worst. +type LivenessState byte + +const ( + // LivenessLive means the store is alive and healthy. + LivenessLive LivenessState = iota + // LivenessUnavailable means the store is recently down but not yet marked dead. + LivenessUnavailable + // LivenessDead means the store has been down long enough to be considered dead. + LivenessDead +) + +// StoreStatus represents the status of a store in the simulation. For MMA, +// liveness is tracked per-store and to support this, asim does as well. +// Whenever we simulate the single-metric allocator, it sees a node as live if +// all of its stores are live, i.e. we make the assumption that an unhealthy +// store would also render the node unhealthy (to sma). This should generally +// hold in practice (for example, liveness heartbeats synchronously write to all +// engines). +type StoreStatus struct { + Liveness LivenessState } -var _ livenesspb.NodeVitalityInterface = &MockNodeLiveness{} +// NodeStatus represents the status of a node in the simulation. This holds +// signals that are genuinely per-node: membership and draining. +type NodeStatus struct { + Membership livenesspb.MembershipStatus + Draining bool +} -func (m MockNodeLiveness) GetNodeVitalityFromCache(roachpb.NodeID) livenesspb.NodeVitality { - panic("GetNodeVitalityFromCache is not expected to be called on MockNodeLiveness") +// StatusTracker is responsible for tracking the liveness status of stores +// and the membership/draining status of nodes without real heartbeating. +// It implements the NodeVitalityInterface to enable the SpanConfigConformance +// reporter to call it. +// +// Liveness is tracked per-store (for MMA). 
When SMA needs node-level liveness, +// it is aggregated from the stores: the node takes the "worst" liveness state +// of any of its stores. +type StatusTracker struct { + clock *hlc.Clock + storeStatusMap map[StoreID]StoreStatus + nodeStatusMap map[NodeID]NodeStatus + // storeToNode maps each store to its node for aggregation. + storeToNode map[StoreID]NodeID } -func (m MockNodeLiveness) ScanNodeVitalityFromKV( +var _ livenesspb.NodeVitalityInterface = &StatusTracker{} + +// registerStore registers a new store with its node mapping and initializes it as live. +func (st *StatusTracker) registerStore(storeID StoreID, nodeID NodeID) { + st.storeStatusMap[storeID] = StoreStatus{Liveness: LivenessLive} + st.storeToNode[storeID] = nodeID +} + +func (st StatusTracker) GetNodeVitalityFromCache(roachpb.NodeID) livenesspb.NodeVitality { + panic("GetNodeVitalityFromCache is not expected to be called on StatusTracker") +} + +func (st StatusTracker) ScanNodeVitalityFromKV( context.Context, ) (livenesspb.NodeVitalityMap, error) { - panic("ScanNodeVitalityFromKV is not expected to be called on MockNodeLiveness") + panic("ScanNodeVitalityFromKV is not expected to be called on StatusTracker") +} + +// worstLivenessForStoresOnNode returns the "worst" liveness state across all +// stores on a node. The ordering is: Dead is worse than Unavailable is worse +// than Live. +func (st StatusTracker) worstLivenessForStoresOnNode(nodeID NodeID) LivenessState { + worst := LivenessLive + for storeID, ss := range st.storeStatusMap { + if st.storeToNode[storeID] == nodeID { + worst = max(worst, ss.Liveness) + } + } + return worst } -// convertNodeStatusToNodeVitality constructs a node vitality in a manner that -// respects all node liveness status properties. The node vitality record for -// nodes in different states are set as follows: -// timeUntilNodeDead = time.Minute -// - unknown: invalid NodeVitality -// - live, decommissioning: expirationWallTime = MaxTimeStamp - timeUntilNodeDead -// - unavailable: expirationWallTime = now - time.Second -// - dead, decommissioned: expirationWallTime = 0 +// convertToNodeVitality constructs a NodeVitality for a node by combining +// store-level liveness (aggregated) with node-level membership and draining. // -// Explanation: refer to liveness.LivenessStatus for an overview of the states a -// liveness record goes through. +// This is used by the old allocator (SMA) which operates on nodes. MMA uses +// store-level liveness directly and doesn't need NodeVitality. // -// - live, decommissioning: set liveness expiration to be in the -// future(MaxTimestamp) and minus timeUntilNodeDead to avoid overflow in -// liveness status check tExp+timeUntilNodeDead. -// - unavailable: set liveness expiration to be just recently expired. This -// needs to be within now - timeUntilNodeDead <= expirationWallTime < now so -// that the status is not dead nor alive. -// - dead, decommissioned: set liveness expiration to be in the -// past(MinTimestamp). -func convertNodeStatusToNodeVitality( - nid roachpb.NodeID, status livenesspb.NodeLivenessStatus, now hlc.Timestamp, +// The liveness affects the expiration timestamp: +// - live: expiration far in the future (node passes isAlive checks) +// - unavailable: expiration just recently passed (node is unavailable but not dead) +// - dead: expiration far in the past (node fails isAlive checks) +// +// timeUntilNodeDead is set to 1 minute. 
+func (st StatusTracker) convertToNodeVitality( + nodeID NodeID, now hlc.Timestamp, ) livenesspb.NodeVitality { const timeUntilNodeDead = time.Minute var liveTs = hlc.MaxTimestamp.AddDuration(-timeUntilNodeDead).ToLegacyTimestamp() - var deadTs = hlc.MinTimestamp.ToLegacyTimestamp() var unavailableTs = now.AddDuration(-time.Second).ToLegacyTimestamp() + var deadTs = hlc.MinTimestamp.ToLegacyTimestamp() + + ns := st.nodeStatusMap[nodeID] + liveness := st.worstLivenessForStoresOnNode(nodeID) + + nid := roachpb.NodeID(nodeID) l := livenesspb.Liveness{ NodeID: nid, - Draining: false, Epoch: 1, - Membership: livenesspb.MembershipStatus_ACTIVE, + Membership: ns.Membership, + Draining: ns.Draining, } - switch status { - case livenesspb.NodeLivenessStatus_UNKNOWN: - return livenesspb.NodeVitality{} - case livenesspb.NodeLivenessStatus_DEAD: - l.Expiration = deadTs - case livenesspb.NodeLivenessStatus_UNAVAILABLE: - l.Expiration = unavailableTs - case livenesspb.NodeLivenessStatus_LIVE: - l.Expiration = liveTs - case livenesspb.NodeLivenessStatus_DECOMMISSIONING: + + switch liveness { + case LivenessLive: l.Expiration = liveTs - l.Membership = livenesspb.MembershipStatus_DECOMMISSIONING - case livenesspb.NodeLivenessStatus_DECOMMISSIONED: + case LivenessUnavailable: + l.Expiration = unavailableTs + case LivenessDead: l.Expiration = deadTs - l.Membership = livenesspb.MembershipStatus_DECOMMISSIONED - case livenesspb.NodeLivenessStatus_DRAINING: - l.Expiration = liveTs - l.Draining = true } + ncs := livenesspb.NewNodeConnectionStatus(nid, nil) - ncs.SetIsConnected(true) + ncs.SetIsConnected(liveness == LivenessLive) entry := l.CreateNodeVitality( now, /* now */ hlc.Timestamp{}, /* descUpdateTime */ @@ -102,12 +145,12 @@ func convertNodeStatusToNodeVitality( return entry } -func (m MockNodeLiveness) ScanNodeVitalityFromCache() livenesspb.NodeVitalityMap { +func (st StatusTracker) ScanNodeVitalityFromCache() livenesspb.NodeVitalityMap { isLiveMap := livenesspb.NodeVitalityMap{} - for nodeID, status := range m.statusMap { + now := st.clock.Now() + for nodeID := range st.nodeStatusMap { nid := roachpb.NodeID(nodeID) - now := m.clock.Now() - isLiveMap[nid] = convertNodeStatusToNodeVitality(nid, status, now) + isLiveMap[nid] = st.convertToNodeVitality(nodeID, now) } return isLiveMap } diff --git a/pkg/kv/kvserver/asim/state/liveness_test.go b/pkg/kv/kvserver/asim/state/liveness_test.go index 153a61777680..c2ef6a6c49a3 100644 --- a/pkg/kv/kvserver/asim/state/liveness_test.go +++ b/pkg/kv/kvserver/asim/state/liveness_test.go @@ -10,61 +10,92 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/stretchr/testify/require" ) -func TestMockLiveness(t *testing.T) { +// TestStatusTrackerConversion verifies that StatusTracker correctly converts +// asim's internal status representation (store-level liveness + node-level +// membership/draining) into NodeVitality objects used by the single-metric +// allocator. It checks that the resulting LivenessStatus and IsLive values +// match expectations for various combinations of liveness, membership, and +// draining states. 
+func TestStatusTrackerConversion(t *testing.T) { clock := &ManualSimClock{nanos: time.Date(2022, 03, 21, 11, 0, 0, 0, time.UTC).UnixNano()} - now := hlc.NewClockForTesting(clock).Now() + hlcClock := hlc.NewClockForTesting(clock) + testCases := []struct { - status livenesspb.NodeLivenessStatus - IsAlive bool - MembershipStatus livenesspb.MembershipStatus + desc string + storeLiveness LivenessState + membership livenesspb.MembershipStatus + draining bool + expectIsAlive bool + expectedStatus livenesspb.NodeLivenessStatus }{ { - status: livenesspb.NodeLivenessStatus_UNKNOWN, - IsAlive: false, - MembershipStatus: livenesspb.MembershipStatus_ACTIVE, + desc: "live active", + storeLiveness: LivenessLive, + membership: livenesspb.MembershipStatus_ACTIVE, + expectIsAlive: true, + expectedStatus: livenesspb.NodeLivenessStatus_LIVE, }, { - status: livenesspb.NodeLivenessStatus_DEAD, - IsAlive: false, - MembershipStatus: livenesspb.MembershipStatus_ACTIVE, + desc: "live draining", + storeLiveness: LivenessLive, + membership: livenesspb.MembershipStatus_ACTIVE, + draining: true, + expectIsAlive: true, + expectedStatus: livenesspb.NodeLivenessStatus_DRAINING, }, { - status: livenesspb.NodeLivenessStatus_UNAVAILABLE, - IsAlive: false, - MembershipStatus: livenesspb.MembershipStatus_ACTIVE, + desc: "live decommissioning", + storeLiveness: LivenessLive, + membership: livenesspb.MembershipStatus_DECOMMISSIONING, + expectIsAlive: true, + expectedStatus: livenesspb.NodeLivenessStatus_DECOMMISSIONING, }, { - status: livenesspb.NodeLivenessStatus_DECOMMISSIONED, - IsAlive: false, - MembershipStatus: livenesspb.MembershipStatus_DECOMMISSIONED, + desc: "unavailable active", + storeLiveness: LivenessUnavailable, + membership: livenesspb.MembershipStatus_ACTIVE, + expectIsAlive: false, + expectedStatus: livenesspb.NodeLivenessStatus_UNAVAILABLE, }, { - status: livenesspb.NodeLivenessStatus_DECOMMISSIONING, - IsAlive: true, - MembershipStatus: livenesspb.MembershipStatus_DECOMMISSIONING, + desc: "dead active", + storeLiveness: LivenessDead, + membership: livenesspb.MembershipStatus_ACTIVE, + expectIsAlive: false, + expectedStatus: livenesspb.NodeLivenessStatus_DEAD, }, { - status: livenesspb.NodeLivenessStatus_LIVE, - IsAlive: true, - MembershipStatus: livenesspb.MembershipStatus_ACTIVE, + desc: "dead decommissioned", + storeLiveness: LivenessDead, + membership: livenesspb.MembershipStatus_DECOMMISSIONED, + expectIsAlive: false, + expectedStatus: livenesspb.NodeLivenessStatus_DECOMMISSIONED, }, { - status: livenesspb.NodeLivenessStatus_DRAINING, - IsAlive: true, - MembershipStatus: livenesspb.MembershipStatus_ACTIVE, + desc: "dead decommissioning (returns DECOMMISSIONED)", + storeLiveness: LivenessDead, + membership: livenesspb.MembershipStatus_DECOMMISSIONING, + expectIsAlive: false, + expectedStatus: livenesspb.NodeLivenessStatus_DECOMMISSIONED, }, } for _, tc := range testCases { - t.Run(tc.status.String(), func(t *testing.T) { - nv := convertNodeStatusToNodeVitality(roachpb.NodeID(1), tc.status, now) - require.Equal(t, tc.IsAlive, nv.IsLive(livenesspb.Rebalance)) - require.Equal(t, tc.MembershipStatus, nv.MembershipStatus()) - require.Equal(t, tc.status, nv.LivenessStatus()) + t.Run(tc.desc, func(t *testing.T) { + // Create a StatusTracker with one node and one store. 
+ m := StatusTracker{ + clock: hlcClock, + storeStatusMap: map[StoreID]StoreStatus{1: {Liveness: tc.storeLiveness}}, + nodeStatusMap: map[NodeID]NodeStatus{1: {Membership: tc.membership, Draining: tc.draining}}, + storeToNode: map[StoreID]NodeID{1: 1}, + } + nv := m.convertToNodeVitality(1, hlcClock.Now()) + require.Equal(t, tc.expectIsAlive, nv.IsLive(livenesspb.Rebalance)) + require.Equal(t, tc.membership, nv.MembershipStatus()) + require.Equal(t, tc.expectedStatus, nv.LivenessStatus()) }) } } diff --git a/pkg/kv/kvserver/asim/state/state.go b/pkg/kv/kvserver/asim/state/state.go index b6f9a2a7b26f..baae04f26928 100644 --- a/pkg/kv/kvserver/asim/state/state.go +++ b/pkg/kv/kvserver/asim/state/state.go @@ -15,7 +15,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/mmaintegration" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -163,11 +162,21 @@ type State interface { // NextReplicasFn returns a function, that when called will return the current // replicas that exist on the store. NextReplicasFn(StoreID) func() []Replica - // SetNodeLiveness sets the liveness status of the node with ID NodeID to be - // the status given. - SetNodeLiveness(NodeID, livenesspb.NodeLivenessStatus) + // SetStoreStatus sets the liveness for a store directly. + SetStoreStatus(storeID StoreID, status StoreStatus) + // StoreStatus returns the liveness status for a store. + StoreStatus(StoreID) StoreStatus + // SetNodeStatus sets the membership and draining signals for a node. + SetNodeStatus(nodeID NodeID, status NodeStatus) + // NodeStatus returns the membership and draining signals for a node. + NodeStatus(NodeID) NodeStatus + // SetAllStoresLiveness sets the liveness for all stores on a node at once. + // This is useful for DSL commands that operate at the node level. + SetAllStoresLiveness(nodeID NodeID, liveness LivenessState) // NodeLivenessFn returns a function, that when called will return the // liveness of the Node with ID NodeID. + // This is used by the store pool, which is used only by the single-metric + // allocator (not mma). // TODO(kvoli): Find a better home for this method, required by the // storepool. NodeLivenessFn() storepool.NodeLivenessFunc diff --git a/pkg/kv/kvserver/asim/state/state_test.go b/pkg/kv/kvserver/asim/state/state_test.go index ce77b6441207..628a4255a5fe 100644 --- a/pkg/kv/kvserver/asim/state/state_test.go +++ b/pkg/kv/kvserver/asim/state/state_test.go @@ -664,7 +664,11 @@ func TestSetSpanConfig(t *testing.T) { } } -func TestSetNodeLiveness(t *testing.T) { +// TestNodeAndStoreStatus verifies the status tracking APIs: NodeLivenessFn +// (combining store liveness with node membership), NodeCountFn (counting only +// ACTIVE members), and store-level liveness aggregation (node takes the worst +// liveness of its stores). 
+func TestNodeAndStoreStatus(t *testing.T) { t.Run("liveness func", func(t *testing.T) { s := LoadClusterInfo( ClusterInfoWithStoreCount(3, 1), @@ -673,12 +677,14 @@ func TestSetNodeLiveness(t *testing.T) { liveFn := s.NodeLivenessFn() - s.SetNodeLiveness(1, livenesspb.NodeLivenessStatus_LIVE) - s.SetNodeLiveness(2, livenesspb.NodeLivenessStatus_DEAD) - s.SetNodeLiveness(3, livenesspb.NodeLivenessStatus_DECOMMISSIONED) + // Set node liveness via SetAllStoresLiveness (sets all stores on the node). + // Node 1: live, Node 2: dead, Node 3: dead + decommissioned. + s.SetAllStoresLiveness(1, LivenessLive) + s.SetAllStoresLiveness(2, LivenessDead) + s.SetAllStoresLiveness(3, LivenessDead) + s.SetNodeStatus(3, NodeStatus{Membership: livenesspb.MembershipStatus_DECOMMISSIONED}) - // Liveness status returend should ignore time till store dead or the - // timestamp given. + // Liveness status returned should combine store liveness and node membership. require.Equal(t, livenesspb.NodeLivenessStatus_LIVE, liveFn(1)) require.Equal(t, livenesspb.NodeLivenessStatus_DEAD, liveFn(2)) require.Equal(t, livenesspb.NodeLivenessStatus_DECOMMISSIONED, liveFn(3)) @@ -693,15 +699,32 @@ func TestSetNodeLiveness(t *testing.T) { countFn := s.NodeCountFn() // Set node 1-5 as decommissioned and nodes 6-10 as dead. There should be a - // node count of 5. + // node count of 5 (dead nodes with ACTIVE membership are still counted). for i := 1; i <= 5; i++ { - s.SetNodeLiveness(NodeID(i), livenesspb.NodeLivenessStatus_DECOMMISSIONED) - } - for i := 6; i <= 10; i++ { - s.SetNodeLiveness(NodeID(i), livenesspb.NodeLivenessStatus_DEAD) + s.SetNodeStatus(NodeID(i), NodeStatus{Membership: livenesspb.MembershipStatus_DECOMMISSIONED}) } + // Nodes 6-10 are already dead by default if not set, but we can set them + // explicitly. Their membership remains ACTIVE by default. require.Equal(t, 5, countFn()) }) + + t.Run("store-level liveness", func(t *testing.T) { + s := LoadClusterInfo( + ClusterInfoWithStoreCount(1, 3), // 1 node with 3 stores + config.DefaultSimulationSettings(), + ).(*state) + + // All stores start live. + require.Equal(t, LivenessLive, s.NodeLiveness(1)) + + // Set one store to unavailable - node becomes unavailable. + s.SetStoreStatus(2, StoreStatus{Liveness: LivenessUnavailable}) + require.Equal(t, LivenessUnavailable, s.NodeLiveness(1)) + + // Set another store to dead - node becomes dead. + s.SetStoreStatus(3, StoreStatus{Liveness: LivenessDead}) + require.Equal(t, LivenessDead, s.NodeLiveness(1)) + }) } // TestTopology loads cluster configurations and checks that the topology diff --git a/pkg/kv/kvserver/asim/tests/datadriven_simulation_test.go b/pkg/kv/kvserver/asim/tests/datadriven_simulation_test.go index 74f72c962761..c53a35b5c565 100644 --- a/pkg/kv/kvserver/asim/tests/datadriven_simulation_test.go +++ b/pkg/kv/kvserver/asim/tests/datadriven_simulation_test.go @@ -89,11 +89,13 @@ var runAsimTests = envutil.EnvOrDefaultBool("COCKROACH_RUN_ASIM_TESTS", false) // an extra line should follow with the replica placement. A example of // the replica placement is: {s1:*,s2,s3:NON_VOTER}:1 {s4:*,s5,s6}:1. // -// - set_liveness node= liveness=(livenesspb.NodeLivenessStatus) [delay=] -// status=(dead|decommisssioning|draining|unavailable) -// Set the liveness status of the node with ID NodeID. This applies at the -// start of the simulation or with some delay after the simulation starts, -// if specified. 
+// - set_status store= [delay=] liveness=(live|unavailable|dead) +// - set_status node= [delay=] [liveness=(live|unavailable|dead)] +// [membership=(active|decommissioning|decommissioned)] [draining=] +// Set status signals for stores or nodes. The store= form sets liveness for +// a single store. The node= form can set liveness for all stores on a node +// and/or set membership/draining (which are per-node). Defaults for node=: +// membership=active, draining=false. // // - set_locality node= [delay= or node=") + } return "" case "set_locality": var nodeID int diff --git a/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/decommission_conformance.txt b/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/decommission_conformance.txt index f62a5671a650..7f42b0bb416e 100644 --- a/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/decommission_conformance.txt +++ b/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/decommission_conformance.txt @@ -20,7 +20,7 @@ set_span_config delay=5m [0,10000): num_replicas=3 constraints={'+region=a':1,'+region=b':1,'+region=c':1} ---- -set_liveness node=4 liveness=decommissioning delay=5m +set_status node=4 membership=decommissioning delay=5m ---- assertion type=conformance under=0 over=0 unavailable=0 violating=0 diff --git a/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/downreplicate_recently_down_node_issue152604.txt b/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/downreplicate_recently_down_node_issue152604.txt index 9b250376638e..e57be816784d 100644 --- a/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/downreplicate_recently_down_node_issue152604.txt +++ b/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/downreplicate_recently_down_node_issue152604.txt @@ -14,14 +14,13 @@ gen_cluster nodes=5 gen_ranges ranges=100 repl_factor=5 min_key=1 max_key=10000 ---- -# Mark n4 and n5 as NodeLivenessStatus_UNAVAILABLE, which is the status -# stores have when down but not down for long enough to be marked as dead. -# The range doesn't lose quorum as a result of this, since three replicas -# are still around. -set_liveness node=4 liveness=unavailable +# Mark n4 and n5 as unavailable, which is the status stores have when down +# but not down for long enough to be marked as dead. The range doesn't lose +# quorum as a result of this, since three replicas are still around. +set_status node=4 liveness=unavailable ---- -set_liveness node=5 liveness=unavailable +set_status node=5 liveness=unavailable ---- # Trigger down-replication to three replicas. diff --git a/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/liveness.txt b/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/liveness.txt index 8e9945ce02dc..9fcb557ab9b3 100644 --- a/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/liveness.txt +++ b/pkg/kv/kvserver/asim/tests/testdata/non_rand/sma/liveness.txt @@ -17,12 +17,12 @@ gen_ranges ranges=700 # n7 is dead and remains dead forever. It will still have its initial (3000) # replicas. -set_liveness node=7 liveness=dead +set_status node=7 liveness=dead ---- # n6 becomes decommissioning after 3 minutes and remains decommissioning # thereafter. -set_liveness node=6 liveness=decommissioning delay=3m +set_status node=6 membership=decommissioning delay=3m ---- # The number of replicas on the dead and decommissioning stores should be 0,
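For reference, a minimal standalone Go sketch of the aggregation rule this patch introduces: per-store liveness is ordered from best (live) to worst (dead), and the node-level liveness reported to the single-metric allocator is simply the worst liveness of any store on that node. The names here (statusTracker, registerStore, plain int IDs) are simplified stand-ins for the patch's StatusTracker, state.StoreID, and state.NodeID, not the actual asim API; the real tracker also carries an hlc clock and node membership/draining for building livenesspb.NodeVitality.

package main

import "fmt"

// LivenessState mirrors the ordering used in the patch: values run from best
// (live) to worst (dead), so the worst state across stores is just the max.
type LivenessState byte

const (
	LivenessLive LivenessState = iota
	LivenessUnavailable
	LivenessDead
)

func (l LivenessState) String() string {
	return [...]string{"live", "unavailable", "dead"}[l]
}

// statusTracker is a stripped-down stand-in for asim's StatusTracker: liveness
// is tracked per store, and node-level liveness is derived on demand.
type statusTracker struct {
	storeLiveness map[int]LivenessState // StoreID -> liveness
	storeToNode   map[int]int           // StoreID -> NodeID
}

// registerStore associates a store with its node and starts it out live,
// mirroring what AddStore does via StatusTracker.registerStore.
func (st *statusTracker) registerStore(storeID, nodeID int) {
	st.storeLiveness[storeID] = LivenessLive
	st.storeToNode[storeID] = nodeID
}

// worstLivenessForStoresOnNode aggregates per-store liveness into the
// node-level signal consumed by the single-metric allocator.
func (st *statusTracker) worstLivenessForStoresOnNode(nodeID int) LivenessState {
	worst := LivenessLive
	for storeID, l := range st.storeLiveness {
		if st.storeToNode[storeID] == nodeID && l > worst {
			worst = l
		}
	}
	return worst
}

func main() {
	st := &statusTracker{
		storeLiveness: map[int]LivenessState{},
		storeToNode:   map[int]int{},
	}
	// One node (n1) with three stores (s1..s3), all initially live.
	for s := 1; s <= 3; s++ {
		st.registerStore(s, 1)
	}
	fmt.Println(st.worstLivenessForStoresOnNode(1)) // live

	// A single unavailable store makes the node unavailable in the
	// node-level view; a dead store makes it dead.
	st.storeLiveness[2] = LivenessUnavailable
	fmt.Println(st.worstLivenessForStoresOnNode(1)) // unavailable
	st.storeLiveness[3] = LivenessDead
	fmt.Println(st.worstLivenessForStoresOnNode(1)) // dead
}

The worst-of aggregation matches the assumption stated in the patch: an unhealthy store would also render its node unhealthy to the single-metric allocator, since liveness heartbeats synchronously write to all engines.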