Skip to content

Commit babaab7

Browse files
craig[bot]arulajmani
andcommitted
Merge #130025
130025: raft: introduce and make use of SupportTracker r=nvanbenschoten a=arulajmani This patch adds a new type of tracker called a SupportTracker. Then, leaders make use of this tracker to record fortification support from followers on successful MsgFortifyLeaderResp messages. Closes #125264 Release note: None Co-authored-by: Arul Ajmani <[email protected]>
2 parents 870c5fb + d498ae3 commit babaab7

14 files changed

+658
-36
lines changed

pkg/raft/node_test.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,10 @@ func TestNodePropose(t *testing.T) {
162162
n.Propose(context.TODO(), []byte("somedata"))
163163
n.Stop()
164164

165-
require.Len(t, msgs, 1)
166-
assert.Equal(t, raftpb.MsgProp, msgs[0].Type)
167-
assert.Equal(t, []byte("somedata"), msgs[0].Entries[0].Data)
165+
require.Len(t, msgs, 2)
166+
assert.Equal(t, raftpb.MsgFortifyLeaderResp, msgs[0].Type)
167+
assert.Equal(t, raftpb.MsgProp, msgs[1].Type)
168+
assert.Equal(t, []byte("somedata"), msgs[1].Entries[0].Data)
168169
}
169170

170171
// TestDisableProposalForwarding ensures that proposals are not forwarded to
@@ -230,9 +231,10 @@ func TestNodeProposeConfig(t *testing.T) {
230231
n.ProposeConfChange(context.TODO(), cc)
231232
n.Stop()
232233

233-
require.Len(t, msgs, 1)
234-
assert.Equal(t, raftpb.MsgProp, msgs[0].Type)
235-
assert.Equal(t, ccdata, msgs[0].Entries[0].Data)
234+
require.Len(t, msgs, 2)
235+
assert.Equal(t, raftpb.MsgFortifyLeaderResp, msgs[0].Type)
236+
assert.Equal(t, raftpb.MsgProp, msgs[1].Type)
237+
assert.Equal(t, ccdata, msgs[1].Entries[0].Data)
236238
}
237239

238240
// TestNodeProposeAddDuplicateNode ensures that two proposes to add the same node should
@@ -378,7 +380,8 @@ func TestNodeProposeWaitDropped(t *testing.T) {
378380
assert.Equal(t, ErrProposalDropped, n.Propose(context.Background(), droppingMsg))
379381

380382
n.Stop()
381-
require.Empty(t, msgs)
383+
require.Len(t, msgs, 1)
384+
assert.Equal(t, raftpb.MsgFortifyLeaderResp, msgs[0].Type)
382385
}
383386

384387
// TestNodeTick ensures that node.Tick() will increase the

pkg/raft/quorum/quorum_test.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,37 +35,37 @@ func TestLeadSupportExpiration(t *testing.T) {
3535
testCases := []struct {
3636
ids []pb.PeerID
3737
support map[pb.PeerID]hlc.Timestamp
38-
expQSE hlc.Timestamp
38+
exp hlc.Timestamp
3939
}{
4040
{
4141
ids: []pb.PeerID{1, 2, 3},
4242
support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15)},
43-
expQSE: ts(15),
43+
exp: ts(15),
4444
},
4545
{
4646
ids: []pb.PeerID{1, 2, 3, 4},
4747
support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15), 4: ts(20)},
48-
expQSE: ts(15),
48+
exp: ts(15),
4949
},
5050
{
5151
ids: []pb.PeerID{1, 2, 3, 4, 5},
5252
support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15), 4: ts(20), 5: ts(20)},
53-
expQSE: ts(20),
53+
exp: ts(20),
5454
},
5555
{
5656
ids: []pb.PeerID{1, 2, 3},
5757
support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20)},
58-
expQSE: ts(10),
58+
exp: ts(10),
5959
},
6060
{
6161
ids: []pb.PeerID{1, 2, 3},
6262
support: map[pb.PeerID]hlc.Timestamp{1: ts(10)},
63-
expQSE: hlc.Timestamp{},
63+
exp: hlc.Timestamp{},
6464
},
6565
{
6666
ids: []pb.PeerID{},
6767
support: map[pb.PeerID]hlc.Timestamp{},
68-
expQSE: hlc.MaxTimestamp,
68+
exp: hlc.MaxTimestamp,
6969
},
7070
}
7171

@@ -75,14 +75,14 @@ func TestLeadSupportExpiration(t *testing.T) {
7575
m[id] = struct{}{}
7676
}
7777

78-
require.Equal(t, tc.expQSE, m.LeadSupportExpiration(tc.support))
78+
require.Equal(t, tc.exp, m.LeadSupportExpiration(tc.support))
7979
}
8080
}
8181

82-
// TestComputeQSEJointConfig ensures that the QSE is calculated correctly for
83-
// joint configurations. In particular, it's the minimum of the two majority
84-
// configs.
85-
func TestComputeQSEJointConfig(t *testing.T) {
82+
// TestLeadSupportExpirationJointConfig ensures that the LeadSupportExpiration
83+
// is calculated correctly for joint configurations. In particular, it's the
84+
// minimum of the two majority configs.
85+
func TestLeadSupportExpirationJointConfig(t *testing.T) {
8686
defer leaktest.AfterTest(t)()
8787
defer log.Scope(t).Close(t)
8888

@@ -96,31 +96,31 @@ func TestComputeQSEJointConfig(t *testing.T) {
9696
cfg1 []pb.PeerID
9797
cfg2 []pb.PeerID
9898
support map[pb.PeerID]hlc.Timestamp
99-
expQSE hlc.Timestamp
99+
exp hlc.Timestamp
100100
}{
101101
{
102102
cfg1: []pb.PeerID{1, 2, 3},
103103
cfg2: []pb.PeerID{},
104104
support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15)},
105-
expQSE: ts(15), // cfg2 is empty, should behave like the (cfg1) majority config case
105+
exp: ts(15), // cfg2 is empty, should behave like the (cfg1) majority config case
106106
},
107107
{
108108
cfg1: []pb.PeerID{},
109109
cfg2: []pb.PeerID{1, 2, 3},
110110
support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15)},
111-
expQSE: ts(15), // cfg1 is empty, should behave like the (cfg2) majority config case
111+
exp: ts(15), // cfg1 is empty, should behave like the (cfg2) majority config case
112112
},
113113
{
114114
cfg1: []pb.PeerID{3, 4, 5},
115115
cfg2: []pb.PeerID{1, 2, 3},
116116
support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15), 4: ts(20), 5: ts(25)},
117-
expQSE: ts(15), // lower of the two
117+
exp: ts(15), // lower of the two
118118
},
119119
{
120120
cfg1: []pb.PeerID{3, 4, 5},
121121
cfg2: []pb.PeerID{1, 2, 3},
122122
support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15), 4: ts(10), 5: ts(10)},
123-
expQSE: ts(10), // lower of the two; this time, cfg2 has the lower qse
123+
exp: ts(10), // lower of the two; this time, cfg2 has the lower expiration
124124
},
125125
}
126126

@@ -136,6 +136,6 @@ func TestComputeQSEJointConfig(t *testing.T) {
136136
j[1][id] = struct{}{}
137137
}
138138

139-
require.Equal(t, tc.expQSE, j.LeadSupportExpiration(tc.support))
139+
require.Equal(t, tc.exp, j.LeadSupportExpiration(tc.support))
140140
}
141141
}

pkg/raft/raft.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ type raft struct {
336336
config quorum.Config
337337
trk tracker.ProgressTracker
338338
electionTracker tracker.ElectionTracker
339+
supportTracker tracker.SupportTracker
339340

340341
state StateType
341342

@@ -469,7 +470,8 @@ func newRaft(c *Config) *raft {
469470
}
470471
lastID := r.raftLog.lastEntryID()
471472

472-
r.electionTracker = tracker.MakeVoteTracker(&r.config)
473+
r.electionTracker = tracker.MakeElectionTracker(&r.config)
474+
r.supportTracker = tracker.MakeSupportTracker(&r.config, r.storeLiveness)
473475

474476
cfg, progressMap, err := confchange.Restore(confchange.Changer{
475477
Config: quorum.MakeEmptyConfig(),
@@ -726,8 +728,14 @@ func (r *raft) sendFortify(to pb.PeerID) {
726728
epoch, live := r.storeLiveness.SupportFor(r.lead)
727729
if live {
728730
r.leadEpoch = epoch
729-
// TODO(arul): For now, we're not recording any support on the leader. Do
730-
// this once we implement handleFortifyResp correctly.
731+
// The leader needs to persist the LeadEpoch durably before it can start
732+
// supporting itself. We do so by sending a self-addressed
733+
// MsgFortifyLeaderResp message so that it is added to the msgsAfterAppend
734+
// slice and delivered back to this node only after LeadEpoch is
735+
// persisted. At that point, this node can record support without
736+
// discrimination for who is providing support (itself vs. other
737+
// follower).
738+
r.send(pb.Message{To: r.id, Type: pb.MsgFortifyLeaderResp, LeadEpoch: epoch})
731739
} else {
732740
r.logger.Infof(
733741
"%x leader at term %d does not support itself in the liveness fabric", r.id, r.Term,
@@ -858,6 +866,7 @@ func (r *raft) reset(term uint64) {
858866
r.abortLeaderTransfer()
859867

860868
r.electionTracker.ResetVotes()
869+
r.supportTracker.Reset()
861870
r.trk.Visit(func(id pb.PeerID, pr *tracker.Progress) {
862871
*pr = tracker.Progress{
863872
Match: 0,
@@ -2096,8 +2105,14 @@ func (r *raft) handleFortify(m pb.Message) {
20962105

20972106
func (r *raft) handleFortifyResp(m pb.Message) {
20982107
assertTrue(r.state == StateLeader, "only leaders should be handling fortification responses")
2099-
// TODO(arul): record support once
2100-
// https://github.com/cockroachdb/cockroach/issues/125264 lands.
2108+
if m.Reject {
2109+
// Couldn't successfully fortify the follower. Typically, this happens when
2110+
// the follower isn't supporting the leader's store in StoreLiveness or the
2111+
// follower is down. We'll try to fortify the follower again later in
2112+
// tickHeartbeat.
2113+
return
2114+
}
2115+
r.supportTracker.RecordSupport(m.From, m.LeadEpoch)
21012116
}
21022117

21032118
// deFortify (conceptually) revokes previously provided fortification support to

pkg/raft/rafttest/interaction_env_handler.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,17 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
262262
// Explanation:
263263
// 1 (from_store) grants support for 2 (for_store) at a higher epoch.
264264
err = env.handleGrantSupport(t, d)
265+
case "print-support-state":
266+
// Prints the support state being tracked by a raft leader. Empty on a
267+
// follower.
268+
//
269+
// print-support-state id
270+
// Arguments are:
271+
// id - id of the raft peer whose support map to print.
272+
//
273+
// Example:
274+
// print-support-state 1
275+
err = env.handlePrintSupportState(t, d)
265276

266277
default:
267278
err = fmt.Errorf("unknown command")

pkg/raft/rafttest/interaction_env_handler_raftstate.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ package rafttest
1919

2020
import (
2121
"fmt"
22+
"testing"
2223

2324
"github.com/cockroachdb/cockroach/pkg/raft"
2425
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
26+
"github.com/cockroachdb/datadriven"
2527
)
2628

2729
// isVoter checks whether node id is in the voter list within st.
@@ -51,3 +53,11 @@ func (env *InteractionEnv) handleRaftState() error {
5153
}
5254
return nil
5355
}
56+
57+
// handlePrintSupportState pretty-prints the support map being tracked by a raft
58+
// peer.
59+
func (env *InteractionEnv) handlePrintSupportState(t *testing.T, d datadriven.TestData) error {
60+
idx := firstAsNodeIdx(t, d)
61+
fmt.Fprint(env.Output, env.Nodes[idx].TestingSupportStateString())
62+
return nil
63+
}

pkg/raft/rawnode.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -499,9 +499,6 @@ func (rn *RawNode) LeadSupportStatus() LeadSupportStatus {
499499
return getLeadSupportStatus(rn.raft)
500500
}
501501

502-
// TODO(nvanbenschoten): remove this one the method is used.
503-
var _ = (*RawNode).LeadSupportStatus
504-
505502
// ProgressType indicates the type of replica a Progress corresponds to.
506503
type ProgressType byte
507504

@@ -544,3 +541,7 @@ func (rn *RawNode) ForgetLeader() error {
544541
func (rn *RawNode) TestingStepDown() error {
545542
return rn.raft.testingStepDown()
546543
}
544+
545+
func (rn *RawNode) TestingSupportStateString() string {
546+
return rn.raft.supportTracker.String()
547+
}

pkg/raft/status.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func getStatus(r *raft) Status {
131131
// NOTE: we assign to LeadSupportUntil even if RaftState is not currently
132132
// StateLeader. The replica may have been the leader and stepped down to a
133133
// follower before its lead support ran out.
134-
s.LeadSupportUntil = hlc.Timestamp{} // TODO(arul): populate this field
134+
s.LeadSupportUntil = r.supportTracker.LeadSupportUntil()
135135
return s
136136
}
137137

@@ -155,7 +155,7 @@ func getSparseStatus(r *raft) SparseStatus {
155155
func getLeadSupportStatus(r *raft) LeadSupportStatus {
156156
var s LeadSupportStatus
157157
s.BasicStatus = getBasicStatus(r)
158-
s.LeadSupportUntil = hlc.Timestamp{} // TODO(arul): populate this field
158+
s.LeadSupportUntil = r.supportTracker.LeadSupportUntil()
159159
return s
160160
}
161161

pkg/raft/testdata/async_storage_writes.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ stabilize
9494
1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""]
9595
1->AppendThread MsgStorageAppend Term:1 Log:1/11 Commit:10 Vote:1 Lead:1 LeadEpoch:1 Entries:[1/11 EntryNormal ""] Responses:[
9696
1->1 MsgAppResp Term:1 Log:0/11 Commit:10
97+
1->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:1
9798
AppendThread->1 MsgStorageAppendResp Term:0 Log:1/11
9899
]
99100
> 2 receiving messages
@@ -107,6 +108,7 @@ stabilize
107108
1->AppendThread MsgStorageAppend Term:1 Log:1/11 Commit:10 Vote:1 Lead:1 LeadEpoch:1 Entries:[1/11 EntryNormal ""]
108109
Responses:
109110
1->1 MsgAppResp Term:1 Log:0/11 Commit:10
111+
1->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:1
110112
AppendThread->1 MsgStorageAppendResp Term:0 Log:1/11
111113
> 2 handling Ready
112114
Ready MustSync=true:
@@ -132,6 +134,7 @@ stabilize
132134
]
133135
> 1 receiving messages
134136
1->1 MsgAppResp Term:1 Log:0/11 Commit:10
137+
1->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:1
135138
AppendThread->1 MsgStorageAppendResp Term:0 Log:1/11
136139
> 2 processing append thread
137140
Processing:

pkg/raft/testdata/async_storage_writes_append_aba_race.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ Messages:
214214
3->7 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
215215
3->AppendThread MsgStorageAppend Term:2 Log:2/12 Commit:11 Vote:3 Lead:3 LeadEpoch:1 Entries:[2/12 EntryNormal ""] Responses:[
216216
3->3 MsgAppResp Term:2 Log:0/12 Commit:11
217+
3->3 MsgFortifyLeaderResp Term:2 Log:0/0 LeadEpoch:1
217218
AppendThread->3 MsgStorageAppendResp Term:0 Log:2/12
218219
]
219220

@@ -383,6 +384,7 @@ Messages:
383384
4->7 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]
384385
4->AppendThread MsgStorageAppend Term:3 Log:3/12 Commit:11 Vote:4 Lead:4 LeadEpoch:1 Entries:[3/12 EntryNormal ""] Responses:[
385386
4->4 MsgAppResp Term:3 Log:0/12 Commit:11
387+
4->4 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1
386388
AppendThread->4 MsgStorageAppendResp Term:0 Log:3/12
387389
]
388390

0 commit comments

Comments
 (0)