1 change: 0 additions & 1 deletion pkg/kv/kvserver/asim/event/BUILD.bazel
@@ -14,7 +14,6 @@ go_library(
"//pkg/kv/kvserver/asim/config",
"//pkg/kv/kvserver/asim/history",
"//pkg/kv/kvserver/asim/state",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/asim/event/event.go
@@ -16,7 +16,7 @@ import (

// Event outlines the necessary behaviours that event structs must implement.
// Some key implementations of the interface include assertionEvent,
// SetSpanConfigEvent, AddNodeEvent, SetNodeLivenessEvent,
// SetSpanConfigEvent, AddNodeEvent, SetNodeStatusEvent,
// SetCapacityOverrideEvent.
type Event interface {
// Func returns a closure associated with event which could be an assertion
62 changes: 49 additions & 13 deletions pkg/kv/kvserver/asim/event/mutation_event.go
@@ -12,7 +12,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
)
@@ -31,12 +30,29 @@ type AddNodeEvent struct {
LocalityString string
}

// SetNodeLivenessEvent represents a mutation event responsible for setting
// liveness status of a node identified by the NodeID to the specified
// LivenessStatus.
// SetStoreStatusEvent represents a mutation event for setting the liveness
// of a single store.
type SetStoreStatusEvent struct {
StoreID state.StoreID
Status state.StoreStatus
}

// SetNodeStatusEvent represents a mutation event for setting the membership
// and draining signals of a node.
type SetNodeStatusEvent struct {
NodeID state.NodeID
Status state.NodeStatus
}

// SetNodeLivenessEvent represents a mutation event for setting the liveness
// of all stores on a node at once. This exists because DSL commands are parsed
// before the simulation runs, at which point we don't yet know which stores
// exist on each node - that's determined when the cluster is built. So we
// can't expand "node=X liveness=Y" into individual store events at parse time;
// we must defer to runtime when the state knows the node's stores.
type SetNodeLivenessEvent struct {
NodeId state.NodeID
LivenessStatus livenesspb.NodeLivenessStatus
NodeID state.NodeID
Liveness state.LivenessState
}

// SetCapacityOverrideEvent represents a mutation event responsible for updating
@@ -63,6 +79,8 @@ type SetSimulationSettingsEvent struct {

var _ Event = &SetSpanConfigEvent{}
var _ Event = &AddNodeEvent{}
var _ Event = &SetStoreStatusEvent{}
var _ Event = &SetNodeStatusEvent{}
var _ Event = &SetNodeLivenessEvent{}
var _ Event = &SetCapacityOverrideEvent{}
var _ Event = &SetNodeLocalityEvent{}
@@ -149,17 +167,35 @@ func (ae AddNodeEvent) String() string {
return buf.String()
}

func (sne SetNodeLivenessEvent) Func() EventFunc {
func (ssse SetStoreStatusEvent) Func() EventFunc {
return MutationFunc(func(ctx context.Context, s state.State) {
s.SetStoreStatus(ssse.StoreID, ssse.Status)
})
}

func (ssse SetStoreStatusEvent) String() string {
return fmt.Sprintf("set s%d liveness=%v", ssse.StoreID, ssse.Status.Liveness)
}

func (snse SetNodeStatusEvent) Func() EventFunc {
return MutationFunc(func(ctx context.Context, s state.State) {
s.SetNodeStatus(snse.NodeID, snse.Status)
})
}

func (snse SetNodeStatusEvent) String() string {
return fmt.Sprintf("set n%d membership=%v, draining=%t",
snse.NodeID, snse.Status.Membership, snse.Status.Draining)
}

func (snle SetNodeLivenessEvent) Func() EventFunc {
return MutationFunc(func(ctx context.Context, s state.State) {
s.SetNodeLiveness(
sne.NodeId,
sne.LivenessStatus,
)
s.SetAllStoresLiveness(snle.NodeID, snle.Liveness)
})
}

func (sne SetNodeLivenessEvent) String() string {
return fmt.Sprintf("set n%d to %v", sne.NodeId, sne.LivenessStatus)
func (snle SetNodeLivenessEvent) String() string {
return fmt.Sprintf("set n%d liveness=%v (all stores)", snle.NodeID, snle.Liveness)
}

func (sce SetCapacityOverrideEvent) Func() EventFunc {
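
The deferred-expansion rationale in the SetNodeLivenessEvent comment above lends itself to a small illustration. Below is a minimal, self-contained Go sketch of the pattern, assuming simplified stand-ins for the asim types (the LivenessState values, State interface, and fake state here are illustrative, not the package's real API): a node-level event carries only a NodeID at parse time and fans out to individual stores only when it runs against the built cluster.

package main

import "fmt"

type (
	NodeID        int
	StoreID       int
	LivenessState int
)

// Assumed values for the sketch; the real enum lives in asim/state.
const (
	LivenessLive LivenessState = iota
	LivenessDead
)

// State exposes just enough surface for the sketch: at runtime the
// state knows which stores live on each node; the DSL parser does not.
type State interface {
	StoresOnNode(n NodeID) []StoreID
	SetStoreLiveness(s StoreID, l LivenessState)
}

// SetNodeLivenessEvent is created at parse time with only a NodeID.
// Expansion to per-store updates is deferred until the event runs.
type SetNodeLivenessEvent struct {
	NodeID   NodeID
	Liveness LivenessState
}

func (e SetNodeLivenessEvent) Run(s State) {
	for _, storeID := range s.StoresOnNode(e.NodeID) {
		s.SetStoreLiveness(storeID, e.Liveness)
	}
}

// fakeState is a toy State implementation for the usage example.
type fakeState struct {
	stores map[NodeID][]StoreID
	live   map[StoreID]LivenessState
}

func (f *fakeState) StoresOnNode(n NodeID) []StoreID             { return f.stores[n] }
func (f *fakeState) SetStoreLiveness(s StoreID, l LivenessState) { f.live[s] = l }

func main() {
	s := &fakeState{
		stores: map[NodeID][]StoreID{1: {1, 2}},
		live:   map[StoreID]LivenessState{},
	}
	// Parsed long before the cluster exists; expanded only now.
	SetNodeLivenessEvent{NodeID: 1, Liveness: LivenessDead}.Run(s)
	fmt.Println(s.live) // map[1:1 2:1]
}
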
11 changes: 5 additions & 6 deletions pkg/kv/kvserver/asim/queue/lease_queue_test.go
@@ -13,7 +13,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gossip"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/stretchr/testify/require"
)
@@ -50,7 +49,7 @@ func TestLeaseQueue(t *testing.T) {
replicaCounts map[state.StoreID]int
spanConfig roachpb.SpanConfig
initialRF int
nonLiveNodes map[state.NodeID]livenesspb.NodeLivenessStatus
nodeStatusOverrides map[state.NodeID]state.NodeStatus // membership, draining
nodeLocalities map[state.NodeID]roachpb.Locality
ticks []int64
expectedLeaseCounts map[int64]map[int]int
@@ -111,8 +110,8 @@ func TestLeaseQueue(t *testing.T) {
1: singleLocality("region", "a"),
2: singleLocality("region", "b"),
},
nonLiveNodes: map[state.NodeID]livenesspb.NodeLivenessStatus{
2: livenesspb.NodeLivenessStatus_DRAINING},
nodeStatusOverrides: map[state.NodeID]state.NodeStatus{
2: {Draining: true}},
ticks: []int64{5, 10, 15},
expectedLeaseCounts: map[int64]map[int]int{
5: {1: 10, 2: 0},
@@ -150,8 +149,8 @@
)
s.TickClock(start)

for nodeID, livenessStatus := range tc.nonLiveNodes {
s.SetNodeLiveness(nodeID, livenessStatus)
for nodeID, status := range tc.nodeStatusOverrides {
s.SetNodeStatus(nodeID, status)
}

for nodeID, locality := range tc.nodeLocalities {
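
The draining override above is what moves n2's lease count to zero. Below is a hedged sketch of the kind of target filter that produces this behaviour; the NodeStatus shape follows the diff, but validLeaseTargets and the reduction of membership to a single flag are assumptions for illustration, not the allocator's actual code path.

package main

import "fmt"

type NodeID int

// NodeStatus mirrors the shape used in the diff: membership plus a
// draining flag. Membership is omitted here for brevity.
type NodeStatus struct {
	Draining bool
}

// validLeaseTargets filters out draining nodes, which is why the test
// expects n2's lease count to drop to zero. Illustrative only.
func validLeaseTargets(candidates []NodeID, statuses map[NodeID]NodeStatus) []NodeID {
	var out []NodeID
	for _, n := range candidates {
		if !statuses[n].Draining {
			out = append(out, n)
		}
	}
	return out
}

func main() {
	statuses := map[NodeID]NodeStatus{2: {Draining: true}}
	fmt.Println(validLeaseTargets([]NodeID{1, 2}, statuses)) // [1]
}
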
32 changes: 18 additions & 14 deletions pkg/kv/kvserver/asim/queue/replicate_queue_test.go
@@ -107,14 +107,15 @@ func TestReplicateQueue(t *testing.T) {
}

testCases := []struct {
desc string
replicaCounts map[state.StoreID]int
spanConfig roachpb.SpanConfig
initialRF int
nonLiveNodes map[state.NodeID]livenesspb.NodeLivenessStatus
nodeLocalities map[state.NodeID]roachpb.Locality
ticks []int64
expectedReplCounts map[int64]map[int]int
desc string
replicaCounts map[state.StoreID]int
spanConfig roachpb.SpanConfig
initialRF int
nodeLivenessOverrides map[state.NodeID]state.LivenessState // liveness for all stores
nodeStatusOverrides map[state.NodeID]state.NodeStatus // membership, draining
nodeLocalities map[state.NodeID]roachpb.Locality
ticks []int64
expectedReplCounts map[int64]map[int]int
}{
{
// NB: Expect no action, range counts are balanced.
@@ -208,8 +209,8 @@
NumReplicas: 3,
NumVoters: 3,
},
nonLiveNodes: map[state.NodeID]livenesspb.NodeLivenessStatus{
3: livenesspb.NodeLivenessStatus_DEAD},
nodeLivenessOverrides: map[state.NodeID]state.LivenessState{
3: state.LivenessDead},
ticks: []int64{5, 10, 15},
expectedReplCounts: map[int64]map[int]int{
5: {1: 1, 2: 1, 3: 1, 4: 0},
@@ -227,8 +228,8 @@
NumReplicas: 3,
NumVoters: 3,
},
nonLiveNodes: map[state.NodeID]livenesspb.NodeLivenessStatus{
3: livenesspb.NodeLivenessStatus_DECOMMISSIONING},
nodeStatusOverrides: map[state.NodeID]state.NodeStatus{
3: {Membership: livenesspb.MembershipStatus_DECOMMISSIONING}},
ticks: []int64{5, 10, 15, 20, 25},
expectedReplCounts: map[int64]map[int]int{
5: {1: 10, 2: 10, 3: 10, 4: 0},
@@ -299,8 +300,11 @@
)
s.TickClock(start)

for nodeID, livenessStatus := range tc.nonLiveNodes {
s.SetNodeLiveness(nodeID, livenessStatus)
for nodeID, liveness := range tc.nodeLivenessOverrides {
s.SetAllStoresLiveness(nodeID, liveness)
}
for nodeID, status := range tc.nodeStatusOverrides {
s.SetNodeStatus(nodeID, status)
}

for nodeID, locality := range tc.nodeLocalities {
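
Splitting nonLiveNodes into nodeLivenessOverrides and nodeStatusOverrides reflects two independent triggers for replica movement in the tests above: a dead store (liveness) and a decommissioning node (membership). A compact Go sketch of that distinction follows, with hypothetical names standing in for the real enums and decision logic:

package main

import "fmt"

type LivenessState int

const (
	LivenessLive LivenessState = iota
	LivenessDead
)

// MembershipStatus is a simplified stand-in for livenesspb.MembershipStatus.
type MembershipStatus int

const (
	MembershipActive MembershipStatus = iota
	MembershipDecommissioning
)

// shouldMoveReplica captures the two independent reasons the replicate
// queue acts in the tests above: the store is dead (liveness override),
// or its node is leaving the cluster (membership override). Illustrative
// decision logic, not the allocator's actual code path.
func shouldMoveReplica(storeLiveness LivenessState, nodeMembership MembershipStatus) bool {
	return storeLiveness == LivenessDead || nodeMembership == MembershipDecommissioning
}

func main() {
	fmt.Println(shouldMoveReplica(LivenessDead, MembershipActive))          // true: dead store, replace replica
	fmt.Println(shouldMoveReplica(LivenessLive, MembershipDecommissioning)) // true: node leaving, move replica
	fmt.Println(shouldMoveReplica(LivenessLive, MembershipActive))          // false: no action
}
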
73 changes: 55 additions & 18 deletions pkg/kv/kvserver/asim/state/impl.go
@@ -46,7 +46,7 @@ type state struct {
stores map[StoreID]*store
load map[RangeID]ReplicaLoad
loadsplits map[StoreID]LoadSplitter
nodeLiveness MockNodeLiveness
statusTracker StatusTracker
capacityChangeListeners []CapacityChangeListener
newCapacityListeners []NewCapacityListener
configChangeListeners []ConfigChangeListener
@@ -81,9 +81,11 @@ func newState(settings *config.SimulationSettings) *state {
usageInfo: newClusterUsageInfo(),
settings: settings,
}
s.nodeLiveness = MockNodeLiveness{
clock: hlc.NewClockForTesting(s.clock),
statusMap: map[NodeID]livenesspb.NodeLivenessStatus{},
s.statusTracker = StatusTracker{
clock: hlc.NewClockForTesting(s.clock),
storeStatusMap: map[StoreID]StoreStatus{},
nodeStatusMap: map[NodeID]NodeStatus{},
storeToNode: map[StoreID]NodeID{},
}

s.load = map[RangeID]ReplicaLoad{FirstRangeID: NewReplicaLoadCounter(s.clock)}
@@ -436,7 +438,7 @@ func (s *state) AddNode(nodeCPUCapacity int64, locality roachpb.Locality) Node {
as: mmaintegration.NewAllocatorSync(sp, mmAllocator, s.settings.ST, nil),
}
s.nodes[nodeID] = node
s.SetNodeLiveness(nodeID, livenesspb.NodeLivenessStatus_LIVE)
s.SetNodeStatus(nodeID, NodeStatus{Membership: livenesspb.MembershipStatus_ACTIVE})
s.SetNodeLocality(nodeID, locality)
s.SetNodeCPURateCapacity(nodeID, nodeCPUCapacity)
return node
@@ -587,6 +589,10 @@ func (s *state) AddStore(nodeID NodeID) (Store, bool) {
node.stores = append(node.stores, storeID)
s.stores[storeID] = store

// Register the store with the liveness tracker, associating
// this store with its node.
s.statusTracker.registerStore(storeID, nodeID)

// Add a range load splitter for this store.
s.loadsplits[storeID] = NewSplitDecider(s.settings)

@@ -1177,34 +1183,65 @@ func (s *state) NextReplicasFn(storeID StoreID) func() []Replica {
return nextReplFn
}

// SetNodeLiveness sets the liveness status of the node with ID NodeID to be
// the status given.
func (s *state) SetNodeLiveness(nodeID NodeID, status livenesspb.NodeLivenessStatus) {
s.nodeLiveness.statusMap[nodeID] = status
// SetStoreStatus sets the liveness for a store.
func (s *state) SetStoreStatus(storeID StoreID, status StoreStatus) {
// NB: the store->node map entry was created when the store
// was created, so we don't need to create it here.
s.statusTracker.storeStatusMap[storeID] = status
}

// StoreStatus returns the liveness status for a store.
func (s *state) StoreStatus(storeID StoreID) StoreStatus {
return s.statusTracker.storeStatusMap[storeID]
}

// SetNodeStatus sets the membership and draining signals for a node.
func (s *state) SetNodeStatus(nodeID NodeID, status NodeStatus) {
s.statusTracker.nodeStatusMap[nodeID] = status
}

// NodeStatus returns the membership and draining signals for a node.
func (s *state) NodeStatus(nodeID NodeID) NodeStatus {
return s.statusTracker.nodeStatusMap[nodeID]
}

// SetAllStoresLiveness sets the liveness for all stores on a node at once.
func (s *state) SetAllStoresLiveness(nodeID NodeID, liveness LivenessState) {
node, ok := s.nodes[nodeID]
if !ok {
return
}
for _, storeID := range node.stores {
s.statusTracker.storeStatusMap[storeID] = StoreStatus{Liveness: liveness}
}
}

// NodeLiveness returns the aggregated liveness for a node, which is
// the "worst" state across its stores. In effect, if one store is doing
// poorly, we report the node as doing equally poorly. This is needed for
// the single-metric allocator, which thinks about liveness at the node level.
func (s *state) NodeLiveness(nodeID NodeID) LivenessState {
return s.statusTracker.worstLivenessForStoresOnNode(nodeID)
}

// NodeLivenessFn returns a function, that when called will return the
// liveness of the Node with ID NodeID.
// TODO(kvoli): Find a better home for this method, required by the storepool.
func (s *state) NodeLivenessFn() storepool.NodeLivenessFunc {
return func(nid roachpb.NodeID) livenesspb.NodeLivenessStatus {
return s.nodeLiveness.statusMap[NodeID(nid)]
return s.statusTracker.convertToNodeVitality(NodeID(nid), s.statusTracker.clock.Now()).LivenessStatus()
}
}

// NodeCountFn returns a function, that when called will return the current
// number of nodes that exist in this state.
// TODO(kvoli): Find a better home for this method, required by the storepool.
// TODO(wenyihu6): introduce the concept of membership separated from the
// liveness map.
func (s *state) NodeCountFn() storepool.NodeCountFunc {
return func() int {
count := 0
for _, status := range s.nodeLiveness.statusMap {
// Nodes with a liveness status other than decommissioned or
// decommissioning are considered active members (see
// liveness.MembershipStatus).
if status != livenesspb.NodeLivenessStatus_DECOMMISSIONED && status != livenesspb.NodeLivenessStatus_DECOMMISSIONING {
for _, ns := range s.statusTracker.nodeStatusMap {
// Only nodes with ACTIVE membership are counted.
if ns.Membership.Active() {
count++
}
}
@@ -1356,7 +1393,7 @@ func (s *state) Scan(
// state of ranges.
func (s *state) Report() roachpb.SpanConfigConformanceReport {
reporter := spanconfigreporter.New(
s.nodeLiveness, s, s, s,
s.statusTracker, s, s, s,
s.settings.ST, &spanconfig.TestingKnobs{})
report, err := reporter.SpanConfigConformance(context.Background(), []roachpb.Span{{}})
if err != nil {
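
The worst-state aggregation described in the NodeLiveness comment is easy to sketch. The following Go sketch assumes a LivenessState ordering where larger values are worse, plus a simplified StatusTracker; both are illustrative stand-ins, not the real asim types:

package main

import "fmt"

type (
	NodeID        int
	StoreID       int
	LivenessState int
)

// Ordered from best to worst so the maximum is the "worst" state.
// The ordering and names are assumptions for this sketch.
const (
	LivenessLive LivenessState = iota
	LivenessUnavailable
	LivenessDead
)

type StatusTracker struct {
	storeLiveness map[StoreID]LivenessState
	storeToNode   map[StoreID]NodeID
}

// worstLivenessForStoresOnNode reports the worst liveness among the
// node's stores, matching the aggregation described in the comment on
// NodeLiveness: one struggling store drags down the whole node.
func (t StatusTracker) worstLivenessForStoresOnNode(n NodeID) LivenessState {
	worst := LivenessLive
	for storeID, nodeID := range t.storeToNode {
		if nodeID != n {
			continue
		}
		if l := t.storeLiveness[storeID]; l > worst {
			worst = l
		}
	}
	return worst
}

func main() {
	t := StatusTracker{
		storeLiveness: map[StoreID]LivenessState{1: LivenessLive, 2: LivenessDead},
		storeToNode:   map[StoreID]NodeID{1: 1, 2: 1},
	}
	fmt.Println(t.worstLivenessForStoresOnNode(1) == LivenessDead) // true
}
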