Skip to content

Commit 7571549

Browse files
craig[bot]pav-kv
andcommitted
Merge #130171
130171: replica_rac2: integrate LogTracker r=sumeerbhola a=pav-kv This PR integrates the `LogTracker` into RACv2 `Processor`. The design sketch is below. - `LogTracker` is created when `raft.RawNode` is initialized. It reads the initial stable state from `RawNode.LogMark()`, and initializes the stable and admitted indices to match this mark. - `LogTracker` observes all log storage appends in `handleRaftReady`, and all log and snapshot syncs in `OnLogSync` and `OnSnapSync` handlers. This guarantees that the stable mark in `LogTracker` is always accurate. - `LogTracker` observes all entries subject to admission control, and their corresponding admissions. This allows updating the admitted vector accurately. The admitted vector is sent to the leader from two places: - In `sendRaftMessage`, any successful `MsgAppResp` message is intercepted, and the corresponding `RaftMessageRequest` is annotated with the admitted vector if it is in the coordinate system of the receiver, i.e. has the same leader term. This flow supports the fast path when logical admission happens without delays: by the time the `MsgAppResp` is sent, the entries are already admitted, and the admitted vector has advanced. - In `handleRaftReady`, the admitted vector is sent to the `Piggybacker`, which then attaches it to any `RaftMessageRequestBatch` going to the same node as the receiver replica. This serves cases when admission is lagging the log syncs, and there might be no `MsgAppResp` to attach the vector to. Such admissions are batched into one `Ready` cycle for efficiency reasons. Part of #129508 Co-authored-by: Pavel Kalinnikov <[email protected]>
2 parents c6956d5 + 51043f9 commit 7571549

File tree

10 files changed

+454
-310
lines changed

10 files changed

+454
-310
lines changed

pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (a AdmittedState) String() string {
6767
}
6868

6969
func (a AdmittedState) SafeFormat(w redact.SafePrinter, _ rune) {
70-
w.Printf("admitted=t%d/%s", a.Term, a.Admitted)
70+
w.Printf("admitted=t%d/%v", a.Term, a.Admitted)
7171
}
7272

7373
func (a PiggybackedAdmittedState) String() string {

pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ go_library(
66
"admission.go",
77
"close_scheduler.go",
88
"doc.go",
9+
"log_tracker.go",
910
"processor.go",
1011
"raft_node.go",
1112
],
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Copyright 2024 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.txt.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0, included in the file
9+
// licenses/APL.txt.
10+
11+
package replica_rac2
12+
13+
import (
14+
"context"
15+
"fmt"
16+
17+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
18+
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
19+
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
20+
)
21+
22+
// logTracker wraps rac2.LogTracker with a mutex and state that helps track
23+
// admitted vector changes and schedule their delivery to the leader. The
24+
// semantics and requirements for all the methods is equivalent to the
25+
// corresponding methods of rac2.LogTracker.
26+
//
27+
// The logTracker has its own mutex in order to avoid interference with objects
28+
// that use wider mutexes such as raftMu.
29+
type logTracker struct {
30+
syncutil.Mutex
31+
lt rac2.LogTracker
32+
// dirty is true when the admitted vector has changed and should be sent to
33+
// the leader.
34+
dirty bool
35+
// scheduled is true when the admitted vector change has been scheduled for
36+
// processing by raft Ready.
37+
scheduled bool
38+
}
39+
40+
func (l *logTracker) init(stable rac2.LogMark) {
41+
l.Lock()
42+
defer l.Unlock()
43+
l.lt = rac2.NewLogTracker(stable)
44+
}
45+
46+
// admitted returns the current admitted vector, and a bool indicating whether
47+
// this is the first call observing this particular admitted vector. The caller
48+
// may decide not to send this vector to the leader if it is not new (since it
49+
// has already been sent).
50+
//
51+
// The passed-in bool indicates whether this call is made from the Ready
52+
// handler. In this case the scheduled flag is reset, which allows the next
53+
// logAdmitted call to return true and allow scheduling a Ready iteration again.
54+
// This flow avoids unnecessary Ready scheduling events.
55+
func (l *logTracker) admitted(sched bool) (av rac2.AdmittedVector, dirty bool) {
56+
l.Lock()
57+
defer l.Unlock()
58+
dirty, l.dirty = l.dirty, false
59+
if sched {
60+
l.scheduled = false
61+
}
62+
av = l.lt.Admitted()
63+
return av, dirty
64+
}
65+
66+
func (l *logTracker) append(ctx context.Context, after uint64, to rac2.LogMark) {
67+
l.Lock()
68+
defer l.Unlock()
69+
if l.lt.Append(ctx, after, to) {
70+
l.dirty = true
71+
}
72+
}
73+
74+
func (l *logTracker) register(ctx context.Context, at rac2.LogMark, pri raftpb.Priority) {
75+
l.Lock()
76+
defer l.Unlock()
77+
l.lt.Register(ctx, at, pri)
78+
}
79+
80+
func (l *logTracker) logSynced(ctx context.Context, stable rac2.LogMark) {
81+
l.Lock()
82+
defer l.Unlock()
83+
if l.lt.LogSynced(ctx, stable) {
84+
l.dirty = true
85+
}
86+
}
87+
88+
// logAdmitted returns true if the admitted vector has advanced and must be
89+
// scheduled for delivery to the leader. At the moment, this schedules a Ready
90+
// handling cycle.
91+
//
92+
// The returned bool helps to avoid scheduling Ready many times in a row, in
93+
// situations when there are many consecutive logAdmitted calls. The next
94+
// scheduling event will be allowed after the next admitted(true) call.
95+
func (l *logTracker) logAdmitted(ctx context.Context, at rac2.LogMark, pri raftpb.Priority) bool {
96+
l.Lock()
97+
defer l.Unlock()
98+
if !l.lt.LogAdmitted(ctx, at, pri) {
99+
return false
100+
}
101+
l.dirty = true
102+
if !l.scheduled {
103+
l.scheduled = true
104+
return true
105+
}
106+
return false
107+
}
108+
109+
func (l *logTracker) snapSynced(ctx context.Context, mark rac2.LogMark) {
110+
l.Lock()
111+
defer l.Unlock()
112+
if l.lt.SnapSynced(ctx, mark) {
113+
l.dirty = true
114+
}
115+
}
116+
117+
func (l *logTracker) debugString() string {
118+
l.Lock()
119+
defer l.Unlock()
120+
var flags string
121+
if l.dirty {
122+
flags += "+dirty"
123+
}
124+
if l.scheduled {
125+
flags += "+sched"
126+
}
127+
if len(flags) != 0 {
128+
flags = " [" + flags + "]"
129+
}
130+
return fmt.Sprintf("LogTracker%s: %s", flags, l.lt.DebugString())
131+
}

0 commit comments

Comments
 (0)